查询执行

查询执行逻辑

所有的查询会依次转化为SqlQueryExecution、SqlStageExecution和SqlTaskExecution,并在不同(除非设置允许在同一Worker上运行来自同一SqlStage的多个任务)的Worker上执行Task。每个Task处理一个或多个Split并传递给下游

Task调度

SourceTask调度

总流程如下:
1. 获取Node和Split的关系
2. 根据这个关系在指定的Node上启动SourceTask处理对应的Split

  • assignSplits(nextTaskId, splitAssignment)
    • nextTaskId 用于生成新Task的Id
    • splitAssignment 类型为Multimap<Node, Split> 每个Node对应一个或多个Split

    该方法主要完成SourceTask的启动和执行,会对splitAssignment进行遍历,并对每个Entry进行如下操作
    – 根据Node获取该Node上的Task
    – 如果Task为空,则在Node上新建一个Task;否则把对应Split提交给改Node上的Task处理

  • scheduleTask(nextTaskId.getAndIncrement(), node, fragment.getPartitionedSource(), taskSplits.getValue())
    该方法在指定的Node上启动并执行一个新Task

    1. 更新当前Stage的所有exchange和outputBuffers
      1. 更新,获取ExchangeLocations,并与现有的合并
      2. 更新,获取当前Stage最新的outputBuffers.
      3. 遍历当前Stage所有Task,并执行以下操作
        1. getNewExchangeLocations() 遍历最新的ExchangeLocations,根据上游Stage的每个TaskLocation生成一个Split,并更新到Task处理的Split列表中
          • 遍历上游Stage的每个Task的TaskLocation对应URI是否在exchangeLocations中,如果存在则继续遍历,否则将该URI加入到本Stage的exchangeLocations中
        2. updateToNextOutputBuffers() 将最新的outputBuffers更新到Task中
          • 如果nextOutputBuffers(指向parentTasksAdded修改后的最新OutputBuffers)为空,则返回currentOutputBuffers(指向parentTasksAdded修改前的最新OutputBuffers);否则
            currentOutputBuffers = nextOutputBuffers;
            nextOutputBuffers = null;
            return currentOutputBuffers;
    2. 将传入的Split与上游Stage中的每个Task输出组合作为整体的Split集合:initialSplits
    3. 根据相关信息,启动Task并处理initialSplits

Fixed Task调度

将Join两边的数据集分成等量个数(initial-hash-partitions)个子集,并将Hash相等的行分布到同一台worker上进行Join。同样通过scheduleTask方法在对应Node上启动Task,除此之外,还通过subStage.parentTasksAdded(tasks.build(),true)通知上游Stage,针对当前Stage的所有Task创建或更新OutputBuffers

  • subStage.parentTasksAdded(tasks.build(),true)
    通知上游Stage,针对当前Stage的所有Task创建或更新OutputBuffers

    1. 判断当前Stage(相对于Fixed Stage来说,即上游Stage)的输出是否需要Hash
    2. 不需要Hash则针对每个taskId创建UnpartitionedPagePartitionFunction对象,并放入newBuffer中
    3. 需要Hash则对每个taskId创建HashPagePartitionFunction,并放入newBuffers中
    4. 针对所有FixedTask生成更新后的nextOutputBuffers,并最终用于subStage中的Task生成的数据输出给Fixed Stage中的Task

Single Task 调度

属于SingleStage,一般是将上游Stage的数据汇聚到一个Worker上汇总运算,如全局排序。调用方式与Fixed Task完全一致,只是SingleTask只有一个Task,而FixedTask有多个

Coordinator_Only Task调度

遇到DDL或DML语句时,直接在Coordinator上对元数据进行修改,直接在Coordinator上运行一个Task而不再随机选择节点。

Task执行

运行Task

  • TaskExecutor
    用于运行实际任务的线程池的包装类,用于处理Worker上运行所有Task中的Split。Worker启动时调用start()方法运行

  • TaskExecutor.Runner
    内部类,处理各个Split,对应TaskExecutor线程池中的一个线程。

    • 循环获取pendingSplits里的Split进行处理
    • 将取出的Split加入runningSplits队列
    • 调用各个Split的process()方法,一段时间后返回
    • 如果执行完成,则将split从runningSplits队列中移除
    • 如果本次执行完成,但Split未完成,则split放回pendingSplits
    • 如果本次执行未完成,则将split放入blockedSplits,并添加执行完成的listener,等本次执行完成后放入pendingSplits
    • 最终判断循环结束,是否需要添加新的Runner继续运行
  • PrioritizedSplitRunner
    此处所有的Split都是PrioritizedSplitRunner对象,调用其process()进行实际处理;在process()方法中调用processFor进行实际处理

  • DriverSplitRunner
    在Presto中SplitRunner的实现类为DriverSplitRunner。其中processFor(T)传入的T是时间段,默认1s。如果1s后Split没有处理完成,则会返回相应的ListenableFuture,然后由Runner线程继续处理pending队列中的下一个Split
  • Driver
    作用于Split上的一系列操作为Driver类。对Split的实际处理的Driver.processInternal()

    • 如果有未处理读取的Split,则加入SourceOperator中
    • 如果只有一个operator,则单独处理
      • 如果已经执行完成,返回NotBlocked
      • 获得operator
      • 通过isBlocked判断是否阻塞
      • 如果阻塞直接返回ListenableFuture
      • 不阻塞则结束当前operator
    • 如果有多个operator
      • 循环从operators中去两个相邻operator,得到前一个operator输出,并作为下一个operator的输入
  • 每次执行Split计算的时候,会遍历该Split上的所有operator,将当前的outputPage作为下个operator的inputPage,并交给下一个operator操作;直到Driver封装的所有operator执行完成

  • Operator对page的处理集中在addInput(page)getOutput()中,不同的实现类实现方式不一样。