查询执行
查询执行逻辑
所有的查询会依次转化为SqlQueryExecution、SqlStageExecution和SqlTaskExecution,并在不同(除非设置允许在同一Worker上运行来自同一SqlStage的多个任务)的Worker上执行Task。每个Task处理一个或多个Split并传递给下游
Task调度
SourceTask调度
总流程如下:
1. 获取Node和Split的关系
2. 根据这个关系在指定的Node上启动SourceTask处理对应的Split
- assignSplits(nextTaskId, splitAssignment)
- nextTaskId 用于生成新Task的Id
- splitAssignment 类型为
Multimap<Node, Split>
每个Node对应一个或多个Split
该方法主要完成SourceTask的启动和执行,会对splitAssignment进行遍历,并对每个Entry进行如下操作
– 根据Node获取该Node上的Task
– 如果Task为空,则在Node上新建一个Task;否则把对应Split提交给改Node上的Task处理 -
scheduleTask(nextTaskId.getAndIncrement(), node, fragment.getPartitionedSource(), taskSplits.getValue())
该方法在指定的Node上启动并执行一个新Task- 更新当前Stage的所有exchange和outputBuffers
- 更新,获取ExchangeLocations,并与现有的合并
- 更新,获取当前Stage最新的outputBuffers.
- 遍历当前Stage所有Task,并执行以下操作
getNewExchangeLocations()
遍历最新的ExchangeLocations,根据上游Stage的每个TaskLocation生成一个Split,并更新到Task处理的Split列表中- 遍历上游Stage的每个Task的TaskLocation对应URI是否在exchangeLocations中,如果存在则继续遍历,否则将该URI加入到本Stage的exchangeLocations中
updateToNextOutputBuffers()
将最新的outputBuffers更新到Task中- 如果nextOutputBuffers(指向parentTasksAdded修改后的最新OutputBuffers)为空,则返回currentOutputBuffers(指向parentTasksAdded修改前的最新OutputBuffers);否则
currentOutputBuffers = nextOutputBuffers;
nextOutputBuffers = null;
return currentOutputBuffers;
- 如果nextOutputBuffers(指向parentTasksAdded修改后的最新OutputBuffers)为空,则返回currentOutputBuffers(指向parentTasksAdded修改前的最新OutputBuffers);否则
- 将传入的Split与上游Stage中的每个Task输出组合作为整体的Split集合:initialSplits
- 根据相关信息,启动Task并处理initialSplits
- 更新当前Stage的所有exchange和outputBuffers
Fixed Task调度
将Join两边的数据集分成等量个数(initial-hash-partitions)个子集,并将Hash相等的行分布到同一台worker上进行Join。同样通过scheduleTask
方法在对应Node上启动Task,除此之外,还通过subStage.parentTasksAdded(tasks.build(),true)
通知上游Stage,针对当前Stage的所有Task创建或更新OutputBuffers
subStage.parentTasksAdded(tasks.build(),true)
通知上游Stage,针对当前Stage的所有Task创建或更新OutputBuffers- 判断当前Stage(相对于Fixed Stage来说,即上游Stage)的输出是否需要Hash
- 不需要Hash则针对每个taskId创建UnpartitionedPagePartitionFunction对象,并放入newBuffer中
- 需要Hash则对每个taskId创建HashPagePartitionFunction,并放入newBuffers中
- 针对所有FixedTask生成更新后的nextOutputBuffers,并最终用于subStage中的Task生成的数据输出给Fixed Stage中的Task
Single Task 调度
属于SingleStage,一般是将上游Stage的数据汇聚到一个Worker上汇总运算,如全局排序。调用方式与Fixed Task完全一致,只是SingleTask只有一个Task,而FixedTask有多个
Coordinator_Only Task调度
遇到DDL或DML语句时,直接在Coordinator上对元数据进行修改,直接在Coordinator上运行一个Task而不再随机选择节点。
Task执行
运行Task
- TaskExecutor
用于运行实际任务的线程池的包装类,用于处理Worker上运行所有Task中的Split。Worker启动时调用start()
方法运行 -
TaskExecutor.Runner
内部类,处理各个Split,对应TaskExecutor线程池中的一个线程。- 循环获取pendingSplits里的Split进行处理
- 将取出的Split加入runningSplits队列
- 调用各个Split的process()方法,一段时间后返回
- 如果执行完成,则将split从runningSplits队列中移除
- 如果本次执行完成,但Split未完成,则split放回pendingSplits
- 如果本次执行未完成,则将split放入blockedSplits,并添加执行完成的listener,等本次执行完成后放入pendingSplits
- 最终判断循环结束,是否需要添加新的Runner继续运行
- PrioritizedSplitRunner
此处所有的Split都是PrioritizedSplitRunner对象,调用其process()
进行实际处理;在process()
方法中调用processFor
进行实际处理 - DriverSplitRunner
在Presto中SplitRunner的实现类为DriverSplitRunner。其中processFor(T)
传入的T是时间段,默认1s。如果1s后Split没有处理完成,则会返回相应的ListenableFuture,然后由Runner线程继续处理pending队列中的下一个Split -
Driver
作用于Split上的一系列操作为Driver类。对Split的实际处理的Driver.processInternal()
- 如果有未处理读取的Split,则加入SourceOperator中
- 如果只有一个operator,则单独处理
- 如果已经执行完成,返回NotBlocked
- 获得operator
- 通过
isBlocked
判断是否阻塞 - 如果阻塞直接返回ListenableFuture
- 不阻塞则结束当前operator
- 如果有多个operator
- 循环从operators中去两个相邻operator,得到前一个operator输出,并作为下一个operator的输入
-
每次执行Split计算的时候,会遍历该Split上的所有operator,将当前的outputPage作为下个operator的inputPage,并交给下一个operator操作;直到Driver封装的所有operator执行完成
-
Operator对page的处理集中在
addInput(page)
和getOutput()
中,不同的实现类实现方式不一样。