Spark大数据商业实战三部曲:内核解密、商业案例、性能调优(第2版)
上QQ阅读APP看书,第一时间看更新

4.2 DAGScheduler解析

DAGScheduler是面向Stage的高层调度器。本节讲解DAG的定义、DAG的实例化、DAGScheduler划分Stage的原理、DAGScheduler划分Stage的具体算法、Stage内部Task获取最佳位置的算法等内容。

4.2.1 DAG的定义

DAGScheduler是面向Stage的高层级的调度器,DAGScheduler把DAG拆分成很多的Tasks,每组的Tasks都是一个Stage,解析时是以Shuffle为边界反向解析构建Stage,每当遇到Shuffle,就会产生新的Stage,然后以一个个TaskSet(每个Stage封装一个TaskSet)的形式提交给底层调度器TaskScheduler。DAGScheduler需要记录哪些RDD被存入磁盘等物化动作,同时要寻求Task的最优化调度,如在Stage内部数据的本地性等。DAGScheduler还需要监视因为Shuffle跨节点输出可能导致的失败,如果发现这个Stage失败,可能就要重新提交该Stage。

为了更好地理解Spark高层调度器DAGScheduler,须综合理解RDD、Application、Driver Program、Job内容,还需要了解以下概念。

(1)Stage:一个Job需要拆分成多组任务来完成,每组任务由Stage封装。与一个Job所有涉及的PartitionRDD类似,Stage之间也有依赖关系。

(2)TaskSet:一组任务就是一个TaskSet,对应一个Stage。其中,一个TaskSet的所有Task之间没有Shuffle依赖,因此互相之间可以并行运行。

(3)Task:一个独立的工作单元,由Driver Program发送到Executor上去执行。通常情况下,一个Task处理RDD的一个Partition的数据。根据Task返回类型的不同,Task又分为ShuffleMapTask和ResultTask。

4.2.2 DAG的实例化

在Spark源码中,DAGScheduler是整个Spark Application的入口,即在SparkContext中声明并实例化。在实例化DAGScheduler之前,已经实例化了SchedulerBackend和底层调度器TaskScheduler,而SchedulerBackend和TaskScheduler是通过SparkContext的方法createTaskScheduler实例化的。DAGScheduler在提交TaskSet给底层调度器的时候是面向TaskScheduler接口的,这符合面向对象中依赖抽象,而不依赖具体实现的原则,带来底层资源调度器的可插拔性,以至于Spark可以运行在众多的部署模式上,如Standalone、Yarn、Mesos、Local及其他自定义的部署模式。

SparkContext.scala的源码中相关的代码如下:

DAGScheduler.scala的源码中相关的代码如下:

4.2.3 DAGScheduler划分Stage的原理

Spark将数据在分布式环境下分区,然后将作业转化为DAG,并分阶段进行DAG的调度和任务的分布式并行处理。DAG将调度提交给DAGScheduler,DAGScheduler调度时会根据是否需要经过Shuffle过程将Job划分为多个Stage。

DAG划分Stage及Stage并行计算示意图如图4-3所示。

其中,实线圆角方框标识的是RDD,方框中的矩形块为RDD的分区。

在图4-3中,RDD A到RDD B之间,以及RDD F到RDD G之间的数据需要经过Shuffle过程,因此RDD A和RDD F分别是Stage 1跟Stage 3和Stage 2跟Stage 3的划分点。而RDD B到RDD G之间,以及RDD C到RDD D到RDD F和RDD E到RDD F之间的数据不需要经过Shuffle过程,因此,RDD G和RDD B的依赖是窄依赖,RDD B和RDD G划分到同一个Stage 3,RDD F和RDD D和RDD E的依赖以及RDD D和RDD C的依赖是窄依赖,RDD C、RDD D、RDD E和RDD F划分到同一个Stage 2。Stage 1和Stage 2是相互独立的,可以并发执行。而由于Stage 3依赖Stage 1和Stage 2的计算结果,所以Stage 3最后执行计算。

图4-3 DAG划分Stage及Stage并行计算示意图

根据以上RDD依赖关系的描述,图4-3中的操作算子中,map和union是窄依赖的操作,因为子RDD(如D)的分区只依赖父RDD(如C)的一个分区,其他常见的窄依赖的操作如filter、flatMap和join(每个分区和已知的分区join)等。groupByKey和join是宽依赖的操作,其他常见的宽依赖的操作如reduceByKey等。

由此可见,在DAGScheduler的调度过程中,Stage阶段的划分是根据是否有Shuffle过程,也就是当存在ShuffleDependency的宽依赖时,需要进行Shuffle,这时才会将作业(Job)划分成多个Stage。

4.2.4 DAGScheduler划分Stage的具体算法

Spark作业调度的时候,在Job提交过程中进行Stage划分以及确定Task的最佳位置。Stage的划分是DAGScheduler工作的核心,涉及作业在集群中怎么运行,Task最佳位置数据本地性的内容。Spark算子的构建是链式的,涉及怎么进行计算,首先是划分Stage,Stage划分以后才是计算的本身;分布式大数据系统追求最大化的数据本地性。数据本地性是指数据进行计算的时候,数据就在内存中,甚至不用计算就直接获得结果。

Spark Application中可以因为不同的Action触发众多的Job。也就是说,一个Application中可以有很多的Job,每个Job是由一个或者多个Stage构成的,后面的Stage依赖于前面的Stage。也就是说,只有前面依赖的Stage计算完毕后,后面的Stage才会运行。

Stage划分的根据是宽依赖。什么时候产生宽依赖呢?例如,reducByKey、groupByKey等。

我们从RDD的collect()方法开始,collect算子是一个Action,会触发Job的运行。

RDD.scala的collect方法的源码调用了runJob方法。

进入SparkContext.scala的runJob方法如下:

继续重载runJob方法如下:

继续重载runJob方法,SparkContext.scala的源码如下:

继续重载runJob方法如下:

进入DAGScheduler.scala的runJob方法,DAGScheduler.scala的源码如下:

DAGScheduler runJob的时候就交给了submitJob,waiter等待作业调度的结果,作业成功或者失败,打印相关的日志信息。进入DAGScheduler的submitJob方法如下:

submitJob方法中,submitJob首先获取rdd.partitions.length,校验运行的时候partitions是否存在。submitJob方法关键的代码是eventProcessLoop.post(JobSubmitted的JobSubmitted,JobSubmitted是一个case class,而不是一个case object,因为application中有很多的Job,不同的Job的JobSubmitted实例不一样,如果使用case object,case object展示的内容是一样的,就像全局唯一变量,而现在我们需要不同的实例,因此使用case class。JobSubmitted的成员finalRDD是最后一个RDD。

由Action(如collect)导致SparkContext.runJob的执行,最终导致DAGScheduler中的submitJob的执行,其核心是通过发送一个case class JobSubmitted对象给eventProcessLoop。其中,JobSubmitted的源码如下:

JobSubmitted是private[scheduler]级别的,用户不可直接调用它。JobSubmitted封装了jobId,封装了最后一个finalRDD,封装了具体对RDD操作的函数func,封装了有哪些partitions要进行计算,也封装了作业监听器listener、状态等内容。

DAGScheduler的submitJob方法关键代码eventProcessLoop.post(JobSubmitted中,将JobSubmitted放入到eventProcessLoop。post就是Java中的post,往一个线程中发一个消息。eventProcessLoop的源码如下:

DAGSchedulerEventProcessLoop继承自EventLoop。

EventLoop中开启了一个线程eventThread,线程设置成Daemon后台运行的方式;run方法里面调用了onReceive(event)方法。post方法就是往eventQueue.put事件队列中放入一个元素。EventLoop的源码如下:

eventProcessLoop是DAGSchedulerEventProcessLoo实例,DAGSchedulerEventProcessLoop继承自EventLoop,具体实现onReceive方法,onReceive方法又调用doOnReceive方法。

doOnReceive收到消息后开始处理。

Spark 2.2.1版本的DAGScheduler.scala的源码如下:

Spark 2.4.3版本的DAGScheduler.scala的源码与Spark 2.2.1版本相比具有如下特点。

 上段代码中第24行及第28行,将filesLost变量名称修改为workerLost。

 上段代码中第29行新增加收到WorkerRemoved消息的处理。

 上段代码中第32行新增加收到SpeculativeTaskSubmitted消息的处理。

总结:EventLoop里面开启一个线程,线程里面不断循环一个队列,post的时候就是将消息放到队列中,由于消息放到队列中,在不断循环,所以可以拿到这个消息,转过来回调方法onReceive(event),在onReceive处理的时候就调用了doOnReceive方法。

关于线程的异步通信:为什么要新开辟一条线程?例如,在DAGScheduler发送消息为何不直接调用doOnReceive,而需要一个消息循环器。DAGScheduler这里自己给自己发消息,不管是自己发消息,还是别人发消息,都采用一条线程去处理,两者处理的逻辑是一致的,扩展性就非常好。使用消息循环器,就能统一处理所有的消息,保证处理的业务逻辑都是一致的。

eventProcessLoop是DAGSchedulerEventProcessLoop的具体实例,而DAGScheduler-EventProcessLoop是EventLoop的子类,具体实现EventLoop的onReceive方法,onReceive方法转过来回调doOnReceive。

在doOnReceive中通过模式匹配的方式把执行路由到case JobSubmitted,调用dagScheduler.handleJobSubmitted方法。

Spark 2.2.1版本的DAGScheduler的handleJobSubmitted的源码如下:

Spark 2.4.3版本的DAGScheduler.scala的源码与Spark 2.2.1版本相比具有如下特点。

 上段代码中第13行代码之后新增BarrierJobSlotsNumberCheckFailed异常情况的处理。

 上段代码中第19行代码之前新增清除内部数据的代码。

Stage开始:每次调用一个runJob就产生一个Job;finalStage是一个ResultStage,最后一个Stage是ResultStage,前面的Stage是ShuffleMapStage。

在handleJobSubmitted中首先创建finalStage,创建finalStage时会建立父Stage的依赖链条。

通过createResultStage创建finalStage,传入的参数包括最后一个finalRDD,操作的函数func,分区partitions、jobId、callSite等内容。创建过程中可能捕获异常。例如,在Hadoop上,底层的hdfs文件被删除了或者被修改了,就出现异常。

Spark 2.2.1版本的createResultStage的源码如下:

Spark 2.4.3版本的DAGScheduler.scala的源码与Spark 2.2.1版本相比具有如下特点。

 上段代码中第6行代码之后新增检查屏障阶段的代码。

三个检查屏障阶段的方法如下。

(1)checkBarrierStageWithDynamicAllocation:如果在启用动态资源分配的情况下运行屏障阶段,将在作业提交时执行检查并快速失败。

(2)checkBarrierStageWithNumSlots:检查屏障阶段是否需要比当前活动插槽总数更多的插槽(以便能够启动屏障阶段的全部任务)。如果试图提交一个障碍阶段,需要比当前总数更多的插槽,检查会失败。如果检查连续失败,超过了作业的配置数,当前作业的提交将失败。

(3)checkBarrierStageWithRDDChainPattern:检查以确保我们不使用不支持的RDD链模式启动屏障阶段,以下模式不支持。

①与RDD具有不同分区数的父RDD(例如union()/coalesce()/first()/take()/PartitionPruning-RDD)。

②依赖多个屏障RDD的RDD(例如barrierRdd1.zip(barrierRdd2))。

createResultStage中,基于作业ID,作业ID(jobId)是作为第三个参数传进来的,创建了ResultStage。

createResultStage的getOrCreateParentStages获取或创建一个给定RDD的父Stages列表,新的Stages将提供firstJobId创建。

getOrCreateParentStages的源码如下:

getOrCreateParentStages调用了getShuffleDependencies(rdd),getShuffleDependencies返回给定RDD的父节点中直接的shuffle依赖。这个函数不会返回更远祖先节点的依赖。例如,如果C shuffle依赖于B,B shuffle依赖于A:A <-- B <-- C。在RDD C中调用getShuffleDependencies函数,将只返回B <-- C的依赖。此功能可用作单元测试。

下面根据DAG划分Stage示意图,如图4-4所示。

图4-4 DAG划分Stage示意图

RDD G在getOrCreateParentStages的getShuffleDependencies的时候同时依赖于RDD B,RDD F;看依赖关系,RDD G和RDD B在同一个Stage里,RDD G和RDD F不在同一个Stage里,根据Shuffle依赖产生了一个新的Stage。如果不是Shuffle级别的依赖,就将其加入waitingForVisit.push(dependency.rdd),waitingForVisit是一个栈Stack,把当前依赖的RDD push进去。然后进行while循环,当waitingForVisit不是空的情况下,将waitingForVisit.pop()的内容弹出来放入到toVisit,如果已经访问过的数据结构visited中没有访问记录,那么toVisit.dependencies再次循环遍历:如果是Shuffle依赖,就加入到parents数据结构;如果是窄依赖,就加入到waitingForVisit。

例如,首先将RDD G放入到waitingForVisit,然后看RDD G的依赖关系,依赖RDD B、RDD F;RDD G和RDD F构成的是宽依赖,所以就加入父Stage里,是一个新的Stage。但如果是窄依赖,就把RDD B放入到栈waitingForVisit中,RDD G和RDD B在同一个Stage中。栈waitingForVisit现在又有新的元素RDD B,然后再次进行循环,获取到宽依赖RDD A,将构成一个新的Stage。RDD G的getShuffleDependencies最终返回HashSet(ShuffleDependency(RDD F),ShuffleDependency(RDD A))。然后getShuffleDependencies(rdd).map遍历调用getOrCreateShuffleMapStage直接创建父Stage。

getShuffleDependencies的源码如下:

getOrCreateParentStages方法中通过getShuffleDependencies(rdd).map进行map转换时用了getOrCreateShuffleMapStage方法。如果在shuffleIdToMapStage数据结构中shuffleId已经存在,那就获取一个shuffle map stage,否则,如果shuffle map stage不存在,除了即将进行计算的更远祖先节点的shuffle map stage,还将创建一个自己的shuffle map stage。

getOrCreateShuffleMapStage的源码如下:

getOrCreateShuffleMapStage方法中:

 如果根据shuffleId模式匹配获取到Stage,就返回Stage。首先从shuffleIdToMapStage中根据shuffleId获取Stage。shuffleIdToMapStage是一个HashMap数据结构,将Shuffle dependency ID对应到ShuffleMapStage的映射关系,shuffleIdToMapStage只包含当前运行作业的映射数据,当Shuffle Stage作业完成时,Shuffle映射数据将被删除,Shuffle的数据将记录在MapOutputTracker中。

 如果根据shuffleId模式匹配没有获取到Stage,调用getMissingAncestorShuffle-Dependencies方法,createShuffleMapStage创建所有即将进行计算的祖先shuffle依赖的Stages。

getMissingAncestorShuffleDependencies查找shuffle依赖中还没有进行shuffleToMapStage注册的祖先节点。

Spark 2.2.1版本DAGScheduler.scala的getMissingAncestorShuffleDependencies源码如下:

Spark 2.4.3版本的DAGScheduler.scala的源码与Spark 2.2.1版本相比具有如下特点。

 上段代码中的第2行、第3行、第6行,将Stack实例修改为ArrayStack实例。

createShuffleMapStage根据Shuffle依赖的分区创建一个ShuffleMapStage,如果前一个Stage已生成相同的Shuffle数据,那Shuffle数据仍是可用的,createShuffleMapStage方法将复制Shuffle数据的位置信息去获取数据,无须再重新生成一次数据。

Spark 2.2.1版本的DAGScheduler.scala的createShuffleMapStage源码如下:

Spark 2.4.3版本的DAGScheduler.scala的源码与Spark 2.2.1版本相比具有如下特点。

 上段代码中第2行之后新增三个检查屏障阶段的方法:checkBarrierStageWithDynamic Allocation、checkBarrierStageWithNumSlots、checkBarrierStageWithRDDChainPattern。

 上段代码中第6行构建ShuffleMapStage实例时,新增传入一个参数mapOutputTracker。

 上段代码中第12行将代码调整为!mapOutputTracker.containsShuffle(shuffleDep.shuffleId))。

 删掉上段代码中第13~22行代码。

回到handleJobSubmitted,创建finalStage以后将提交finalStage。

submitStage提交Stage,首先递归提交即将计算的父Stage。

submitStage的源码如下:

其中调用了getMissingParentStages。

DAGScheduler.scala的源码如下:

Spark 2.4.3版本的DAGScheduler.scala的源码与Spark 2.2.1版本相比具有如下特点。

 上段代码中第5行将构建Stack实例修改为构建ArrayStack实例。

1.  val waitingForVisit = new ArrayStack[RDD[_]]

接下来,我们结合Spark DAG划分Stage示意(见图4-5)进行详细阐述。

图4-5 DAG划分Stage示意图

RDD A到RDD B,以及RDD F到RDD G之间的数据需要经过Shuffle过程,因此,RDD A和RDD F分别是Stage 1跟Stage 3、Stage 2跟Stage 3的划分点。而RDD B到RDD G没有Shuffle,因此,RDD G和RDD B的依赖是窄依赖,RDD B和RDD G划分到同一个Stage 3;RDD C到RDD D、RDD F,RDD E到RDD F之间的数据不需要经过Shuffle,RDD F和RDD D加RDD E的依赖、RDD D和RDD C的依赖是窄依赖,因此,RDD C、RDD D、RDD E和RDD F划分到同一个Stage 2。Stage 1和Stage 2是相互独立的,可以并发执行。而由于Stage 3依赖Stage 1和Stage 2的计算结果,所以Stage 3最后执行计算。

 createResultStage:基于作业ID(jobId)创建ResultStage。调用getOrCreateParentStages创建所有父Stage,返回parents: List[Stage]作为父Stage,将parents传入ResultStage,实例化生成ResultStage。

在DAG划分Stage示意图中,对RDD G调用createResultStage,通过getOrCreate-ParentStages获取所有父List[Stage]:Stage 1、Stage 2,然后创建自己的Stage 3。

 getOrCreateParentStages:获取或创建给定RDD的父Stage列表。将根据提供的firstJobId创建新的Stages。

在DAG划分Stage示意图中,RDD G的getOrCreateParentStages会调用getShuffleDependencies获得RDD G所有直接宽依赖集合HashSet(ShuffleDependency(RDD F),ShuffleDependency(RDD A)),这里是RDD F和RDD A的宽依赖集合,然后遍历集合,对(ShuffleDependency(RDD F), ShuffleDependency(RDD A))分别调用getOrCreateShuffleMapStage。

 对ShuffleDependency(RDD A)调用getOrCreateShuffleMapStage,getOrCreateShuffle-MapStage中根据shuffleDep.shuffleId模式匹配调用getMissingAncestorShuffle-Dependencies,返回为空;对ShuffleDependency(RDD A)调用createShuffleMapStage,RDD A已无父Stage,因此创建Stage 1。

 对ShuffleDependency(RDD F)调用getOrCreateShuffleMapStage,getOrCreateShuffle-MapStage中根据shuffleDep.shuffleId模式匹配调用getMissingAncestorShuffle-Dependencies,返回为空;对ShuffleDependency(RDD F)调用createShuffleMapStage,RDD F之前的RDD C到RDD D、RDD F;RDD E到RDD F之间都没有Shuffle,没有宽依赖就不会产生Stage。因此,RDD F已无父Stage,创建Stage 2。

 最后,把List(Stage 1,Stage 2)作为Stage 3的父Stage,创建Stage 3。Stage 3是ResultStage。

回到DAGScheduler.scala的handleJobSubmitted方法,首先通过createResultStage构建finalStage。

handleJobSubmitted的源码如下:

handleJobSubmitted方法中的ActiveJob是一个普通的数据结构,保存了当前Job的一些信息。

handleJobSubmitted方法日志打印信息:getMissingParentStages(finalStage)),getMissing-ParentStages根据finalStage找父Stage,如果有父Stage,就直接返回;如果没有父Stage,就进行创建。

handleJobSubmitted方法中的submitStage比较重要。submitStage的源码如下:

submitStage首先从activeJobForStage中获得JobID;如果JobID已经定义isDefined,那就获得即将计算的Stage(getMissingParentStages),然后进行升序排列。如果父Stage为空,那么提交submitMissingTasks,DAGScheduler把处理的过程交给具体的TaskScheduler去处理。如果父Stage不为空,将循环递归调用submitStage(parent),从后往前回溯。后面的Stage依赖于前面的Stage。也就是说,只有前面依赖的Stage计算完毕后,后面的Stage才会运行。submitStage一直循环调用,导致的结果是父Stage的父Stage……一直回溯到最左侧的父Stage开始计算。

4.2.5 Stage内部Task获取最佳位置的算法

Task任务本地性算法实现:DAGScheudler的submitMissingTasks方法中体现了如何利用RDD的本地性得到Task的本地性,从而获取Stage内部Task的最佳位置。接下来看一下submitMissingTasks的源码,关注Stage本身的算法以及任务本地性。runningStages是一个HashSet[Stage]数据结构,表示正在运行的Stages,将当前运行的Stage增加到runningStages中,根据Stage进行判断,如果是ShuffleMapStage,则从getPreferredLocs(stage.rdd, id)获取任务本地性信息;如果是ResultStage,则从getPreferredLocs(stage.rdd, p)获取任务本地性信息。

DAGScheduler.scala的源码如下:

在submitMissingTasks中会通过调用以下代码来获得任务的本地性。

partitionsToCompute获得要计算的Partitions的id。

1.  val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()

如果stage是ShuffleMapStage,在代码partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap中,id是partitions的id,使用匿名函数生成一个Tuple,第一个元素值是数据分片的id,第二个元素是把rdd和id传进去,获取位置getPreferredLocs。然后通过toMap转换,返回Map[Int, Seq[TaskLocation]]。第一个值是partitions的id,第二个值是TaskLocation。

具体一个Partition中的数据本地性的算法实现在下述getPreferredLocs代码中。

getPreferredLocsInternal是getPreferredLocs的递归实现:这个方法是线程安全的,它只能被DAGScheduler通过线程安全方法getCacheLocs()使用。

getPreferredLocsInternal的源码如下:

getPreferredLocsInternal代码中:

在visited中把当前的rdd和partition加进去,visited是一个HashSet,如果已经存在了就会出错。

如果partition被缓存(partition被缓存是指数据已经在DAGScheduler中),则在getCacheLocs(rdd)(partition)传入rdd和partition,获取缓存的位置信息。如果获取到缓存位置信息,就返回。

getCacheLocs的源码如下:

getCacheLocs中的cacheLocs是一个HashMap,包含每个RDD的分区上的缓存位置信息。map的key值是RDD的ID,Value是由分区编号索引的数组。每个数组值是RDD分区缓存位置的集合。

1.  private val cacheLocs = new HashMap [Int, IndexedSeq[Seq[TaskLocation]]]

getPreferredLocsInternal方法在具体算法实现的时候首先查询DAGScheduler的内存数据结构中是否存在当前Partition的数据本地性的信息,如果有,则直接返回;如果没有,首先会调用rdd.getPreferedLocations。

如果自定义RDD,那一定要写getPreferedLocations,这是RDD的五大特征之一。例如,想让Spark运行在HBase上或者运行在一种现在还没有直接支持的数据库上面,此时开发者需要自定义RDD。为了保证Task计算的数据本地性,最关键的方式是必须实现RDD的getPreferedLocations。数据不动代码动,以HBase为例,Spark要操作HBase的数据,要求Spark运行在HBase所在的集群中,HBase是高速数据检索的引擎,数据在哪里,Spark也需要运行在哪里。Spark能支持各种来源的数据,核心就在于getPreferedLocations。如果不实现getPreferedLocations,就要从数据库或HBase中将数据抓过来,速度会很慢。

RDD.scala的getPreferedLocations的源码如下:

这是RDD的getPreferredLocations。

1.  protected def getPreferredLocations(split: Partition): Seq[String] = Nil

这样,数据本地性在运行前就已经完成,因为RDD构建的时候已经有元数据的信息。说明:本节代码基于Spark 2.2的源码版本。

DAGScheduler计算数据本地性的时候巧妙地借助了RDD自身的getPreferedLocations中的数据,最大化地优化效率,因为getPreferedLocations中表明了每个Partition的数据本地性,虽然当前Partition可能被persist或者checkpoint,但是persist或者checkpoint默认情况下肯定和getPreferedLocations中的Partition的数据本地性是一致的,所以这就极大地简化了Task数据本地性算法的实现和效率的优化。