presto代码解析一

时间:2022-11-26 00:51:23

1.TaskExecutor

    一个TaskExecutor负责执行多个split的实际操作,首先会构造一个线程池,在线程池中预制多个线程。
    public synchronized void start()
{
checkState(!closed, "TaskExecutor is closed");
for (int i = 0; i < runnerThreads; i++) {
addRunnerThread();
}
}

Runner是实际的线程类,继承了runnable接口,其run方法实现了比较复杂的操作

1 重要类介绍

PrioritizedSplitRunner

1 run方法流程介绍

 首先从pendingSplits中获取到一个被挂起的还没有线程资源的split,把这个split加入到runningSplits中,然后调用该split的

split的数据,处理的流程是“并行遍历”这个driver的operators集合,这个operator集合中包含了处理这个split数据的所有操作符。
在构造完一个线程后,就会尝试去pendingSplits中获取split,这是一个priorityBlockingQuene,如果没有元素,就会阻塞。

2 操作符介绍

2 ScanFilterAndProjectOperator(已挪至专题文章进行介绍)

 这个操作符用于扫描原始数据,从操作符的名字可以看出,这个操作符其实可以完成三部分工作:扫描+过滤+投影

1 重要变量包括:

       1)) CursorProcessor cursorProcessor
代码生成,用于快速扫描数据源
2)) PageProcessor pageProcessor
代码生成,用于快速处理一个page内的数据
3)) boolean finishing
用于标示是否扫面完split内的全部数据

2 重要方法包括:

       1)) Page getOutput()
public Page getOutput()
{
if (!finishing) {
createSourceIfNecessary(); //根据不同的connector创建数据源

if (cursor != null) {
int rowsProcessed = cursorProcessor.process(operatorContext.getSession().toConnectorSession(), cursor, ROWS_PER_PAGE, pageBuilder);//使用代码生成扫描数据源,迭代读取过程

pageSourceMemoryContext.setBytes(cursor.getSystemMemoryUsage());

long bytesProcessed = cursor.getCompletedBytes() - completedBytes;
long elapsedNanos = cursor.getReadTimeNanos() - readTimeNanos;
operatorContext.recordGeneratedInput(bytesProcessed, rowsProcessed, elapsedNanos);
completedBytes = cursor.getCompletedBytes();
readTimeNanos = cursor.getReadTimeNanos();

if (rowsProcessed == 0) {
finishing = true;
}
}
else {
if (currentPage == null) {
currentPage = pageSource.getNextPage();

if (currentPage != null) {
// update operator stats
long endCompletedBytes = pageSource.getCompletedBytes();
long endReadTimeNanos = pageSource.getReadTimeNanos();
operatorContext.recordGeneratedInput(endCompletedBytes - completedBytes, currentPage.getPositionCount(), endReadTimeNanos - readTimeNanos);
completedBytes = endCompletedBytes;
readTimeNanos = endReadTimeNanos;
}

currentPosition = 0;
}

if (currentPage != null) {
switch (processingOptimization) { //根据不用的优化规则,对一个page内的数据进行优化,具体怎么优化还得再看看
case COLUMNAR: {
Page page = pageProcessor.processColumnar(operatorContext.getSession().toConnectorSession(), currentPage, getTypes());
currentPage = null;
currentPosition = 0;
return page;
}
case COLUMNAR_DICTIONARY: {
Page page = pageProcessor.processColumnarDictionary(operatorContext.getSession().toConnectorSession(), currentPage, getTypes());
currentPage = null;
currentPosition = 0;
return page;
}
case DISABLED: {
currentPosition = pageProcessor.process(operatorContext.getSession().toConnectorSession(), currentPage, currentPosition, currentPage.getPositionCount(), pageBuilder);
if (currentPosition == currentPage.getPositionCount()) {
currentPage = null;
currentPosition = 0;
}
break;
}
default:
throw new IllegalStateException(String.format("Found unexpected value %s for processingOptimization", processingOptimization));
}
}

pageSourceMemoryContext.setBytes(pageSource.getSystemMemoryUsage());
}
}

// only return a full page if buffer is full or we are finishing
if (pageBuilder.isEmpty() || (!finishing && !pageBuilder.isFull())) {
pageBuilderMemoryContext.setBytes(pageBuilder.getRetainedSizeInBytes());
return null;
}

Page page = pageBuilder.build();// pagebuilder中包含了扫描出来的数据,每个block以blockbuilder形式暂存,对page进行build时,其实就是使用blockbuilder构建block
pageBuilder.reset();

pageBuilderMemoryContext.setBytes(pageBuilder.getRetainedSizeInBytes());
return page;
}
 2)) addInput()方法   
    public final void addInput(Page page)
{
throw new UnsupportedOperationException();
}
对于扫描操作符来说,不需要添加page,因为该方法一般是操作符作为next操作符时才进行调用,而扫描操作符只可能是第一个操作符,不可能作为下一个操作符使用。

2 HashAggregationOperator

1 重要变量

  List<Type> groupByTypes //分组字段的类型,因为可能有多个分组字段,所以用集合表示
List<Integer> groupByChannels //分组通道,这里说的通道即分组的字段在传入的字段的位置,用整型表示
Step step //聚合的步骤,主要有partial,即部分聚合;final,即最终聚合
List<AccumulatorFactory> accumulatorFactories //聚合器工厂,根据不同的聚合类型产生不同的聚合器
Optional<Integer> hashChannel //进行hash的通道,和分组通道类似,一般根据什么分组,就要根据什么字段组合进行hash
Iterator<Page> outputIterator //输出迭代器

2 重要方法

1)) void addInput(Page page)

 public void addInput(Page page)
{
checkState(!finishing, "Operator is already finishing");
requireNonNull(page, "page is null");
if (aggregationBuilder == null) {
aggregationBuilder = new GroupByHashAggregationBuilder(
accumulatorFactories,
step,
expectedGroups,
groupByTypes,
groupByChannels,
hashChannel,
operatorContext);

// assume initial aggregationBuilder is not full
}
else {
checkState(!aggregationBuilder.isFull(), "Aggregation buffer is full");
}

aggregationBuilder.processPage(page);
}

该方法上个操作符输出的page,进行数据聚集,但是只是简单的进行聚集,并没有使用聚合器进行聚合

2)) Page getOutput()
        if (outputIterator == null || !outputIterator.hasNext()) {
// current output iterator is done
outputIterator = null;

// no data
if (aggregationBuilder == null) {
return null;
}

// only flush if we are finishing or the aggregation builder is full
if (!finishing && !aggregationBuilder.isFull()) {
return null;
}

outputIterator = aggregationBuilder.build();
aggregationBuilder = null;

if (!outputIterator.hasNext()) {
// current output iterator is done
outputIterator = null;
return null;
}
}

return outputIterator.next();
   输出迭代器的computeNext方法,将对聚集好的数据执行不同类型的聚合操作,代码如下:
    public Iterator<Page> build()
{
List<Type> types = new ArrayList<>(groupByHash.getTypes());
for (Aggregator aggregator : aggregators) {
types.add(aggregator.getType());
}

final PageBuilder pageBuilder = new PageBuilder(types);
return new AbstractIterator<Page>()
{
private final int groupCount = groupByHash.getGroupCount();
private int groupId;

@Override
protected Page computeNext()
{
if (groupId >= groupCount) {
return endOfData();
}

pageBuilder.reset();

List<Type> types = groupByHash.getTypes();
while (!pageBuilder.isFull() && groupId < groupCount) {//遍历所有的组,进行聚合,这里的组已经在groupByHash中被筛选出来,用整型表示,例如有5个组,那么组id就是0,1,2,3,4
groupByHash.appendValuesTo(groupId, pageBuilder, 0);//给group by的block和hash的block进行计算并赋值,例如有两个字段进行groupby,那么就有两个groupby字段,一个复合hash字段

pageBuilder.declarePosition();
for (int i = 0; i < aggregators.size(); i++) {//遍历所有的聚合器,对聚合字段进行不同类型的聚合
Aggregator aggregator = aggregators.get(i);
BlockBuilder output = pageBuilder.getBlockBuilder(types.size() + i);
aggregator.evaluate(groupId, output);//真正的聚合计算
}

groupId++;
}

return pageBuilder.build();
}
};
}

3 FilterAndProjectOperator

聚合后的数据可能还需要进行过滤和投影,使用该操作符进行处理

4 TaskOutputOperator

一般作为task的最后一个操作符,用于数据的输出

5 ExchangeOperator

   该操作符主要用于stage间进行数据的交换

1 重要方法

addInput(Page page) 没有实现,是因为该操作符一般作为stage的第一个操作符,它的输入是从其他stage中获取到的。
Page getOutput() //使用客户端直接从上一个stage获取数据

public Page getOutput()
{
Page page = exchangeClient.pollPage();
if (page != null) {
operatorContext.recordGeneratedInput(page.getSizeInBytes(), page.getPositionCount());
}
return page;
}

6 OrderByOperator

排序操作符,用于排序操作,

1 重要变量

1) List sortChannels //根据什么字段进行排序,这里的整型值代表字段在schema中的位置
2) List sortOrder //排序的类型,升序还是将序排列
3) List types //所有相关字段的类型

2 重要方法

1) addInput方法

    public void addInput(Page page)
{
checkState(state == State.NEEDS_INPUT, "Operator is already finishing");
requireNonNull(page, "page is null");

pageIndex.addPage(page);//添加到pageindex中,具体这个pageindex有什么用还不清楚
operatorContext.setMemoryReservation(pageIndex.getEstimatedSize().toBytes());
}

2) getOutput方法

if (state != State.HAS_OUTPUT) {
return null;
}

if (currentPosition >= pageIndex.getPositionCount()) {
state = State.FINISHED;
return null;
}

// iterate through the positions sequentially until we have one full page
pageBuilder.reset();
currentPosition = pageIndex.buildPage(currentPosition, outputChannels, pageBuilder);

// output the page if we have any data
if (pageBuilder.isEmpty()) {
state = State.FINISHED;
return null;
}

Page page = pageBuilder.build();
return page;
}

3) finish方法
这个方法才是真正的排序方法

    public void finish()
{
if (state == State.NEEDS_INPUT) {
state = State.HAS_OUTPUT;

// sort the index
pageIndex.sort(sortChannels, sortOrder);//对page内的数据进行排序
}
}

7 TopNOperator

presto支持limit操作,但是不支持limit n,m操作,limit操作就是使用TopNOperotor操作符完成的
1) addInput方法

    public void addInput(Page page)
{
checkState(!finishing, "Operator is already finishing");
requireNonNull(page, "page is null");
if (topNBuilder == null) {
topNBuilder = new TopNBuilder(
n,
partial,
sortTypes,
sortChannels,
sortOrders,
operatorContext);
}

checkState(!topNBuilder.isFull(), "Aggregation buffer is full");
topNBuilder.processPage(page);//把page添加到一个全局的优先级队列中,该队列的大小为limit的个数,在队列内部的数据就是当前最大或者最小的n个数据,一旦发现有比这n个数据大或者小的数据,就放到队列中来,把不符合要求的踢出队列
}