![Spark大数据商业实战三部曲:内核解密、商业案例、性能调优(第2版)](https://wfqqreader-1252317822.image.myqcloud.com/cover/174/40375174/b_40375174.jpg)
3.5 RDD内部的计算机制
RDD的多个Partition分别由不同的Task处理。Task分为两类:shuffleMapTask、resultTask。本节基于源码对RDD的计算过程进行深度解析。
3.5.1 Task解析
Task是计算运行在集群上的基本计算单位。一个Task负责处理RDD的一个Partition,一个RDD的多个Partition会分别由不同的Task去处理,通过之前对RDD的窄依赖关系的讲解,我们可以发现在RDD的窄依赖中,子RDD中Partition的个数基本都大于等于父RDD中Partition的个数,所以Spark计算中对于每一个Stage分配的Task的数目是基于该Stage中最后一个RDD的Partition的个数来决定的。最后一个RDD如果有100个Partition,则Spark对这个Stage分配100个Task。
Task运行于Executor上,而Executor位于CoarseGrainedExecutorBackend(JVM进程)中。
Spark Job中,根据Task所处Stage的位置,我们将Task分为两类:第一类为shuffleMapTask,指Task所处的Stage不是最后一个Stage,也就是Stage的计算结果还没有输出,而是通过Shuffle交给下一个Stage使用;第二类为resultTask,指Task所处Stage是DAG中最后一个Stage,也就是Stage计算结果需要进行输出等操作,计算到此已经结束;简单地说,Spark Job中除了最后一个Stage的Task为resultTask,其他所有Task都为shuffleMapTask。
3.5.2 计算过程深度解析
Spark中的Job本身内部是由具体的Task构成的,基于Spark程序内部的调度模式,即根据宽依赖的关系,划分不同的Stage,最后一个Stage依赖倒数第二个Stage等,我们从最后一个Stage获取结果;在Stage内部,我们知道有一系列的任务,这些任务被提交到集群上的计算节点进行计算,计算节点执行计算逻辑时,复用位于Executor中线程池中的线程,线程中运行的任务调用具体Task的run方法进行计算,此时,如果调用具体Task的run方法,就需要考虑不同Stage内部具体Task的类型,Spark规定最后一个Stage中的Task的类型为resultTask,因为我们需要获取最后的结果,所以前面所有Stage的Task是shuffleMapTask。
RDD在进行计算前,Driver给其他Executor发送消息,让Executor启动Task,在Executor启动Task成功后,通过消息机制汇报启动成功信息给Driver。Task计算示意图如图3-6所示。
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P78_7361.jpg?sign=1739193310-Z8gcZnH8vBzAoWo2Uq53y7GF3PduTAZy-0-23827cf1ed68389a1638b646fb92e540)
图3-6 Task计算示意图
详细情况如下:Driver中的CoarseGrainedSchedulerBackend给CoarseGrainedExecutor-Backend发送LaunchTask消息。
(1)首先反序列化TaskDescription。
CoarseGrainedExecutorBackend.scala的receive的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P79_253255.jpg?sign=1739193310-e8MJqyBh9WLXFMKTU39CtgxFsCW221A2-0-4d12a2dfae23cd298822dd3a536ca1f8)
launchTask中调用了decode方法,解析读取dataIn、taskId、attemptNumber、executorId、name、index等信息,读取相应的JAR、文件、属性,返回TaskDescription值。
Spark 2.2.1版本的TaskDescription.scala的decode的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P79_253256.jpg?sign=1739193310-9OrJniOVijrKYCz5wHEQeS0A5bTRehdT-0-ae3da83eeed084a6f8e1ff0eac9c7357)
Spark 2.4.3版本的TaskDescription.scala源码与Spark 2.2.1版本相比具有如下特点。
上段代码中第7行之后新增加partitionId的变量。
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P79_253257.jpg?sign=1739193310-faYjNJylyTR0tKYlCKaFGvdeLKzHsEZP-0-26661eb151c2c6816e89ef2e654e03db)
(2)Executor会通过launchTask执行Task。
(3)Executor的launchTask方法创建一个TaskRunner实例在threadPool来运行具体的Task。
Executor.scala的launchTask的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P80_253258.jpg?sign=1739193310-LcUUCn6753FgWvVZmJIbTbCtp7UjM69F-0-868a639de11ccab389d5b52e412cb513)
在TaskRunner的run方法首先会通过statusUpdate给Driver发信息汇报自己的状态,说明自己处于running状态。同时,TaskRunner内部会做一些准备工作,如反序列化Task的依赖,通过网络获取需要的文件、Jar等;然后反序列化Task本身。
Spark 2.2.1版本的Executor.scala的run方法的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P80_253259.jpg?sign=1739193310-UqZhBaWHUvCEksGtJEtAFaaJE8wBooTr-0-fe7fb1fccdf1b79d08cdc1b04d2416e1)
Spark 2.4.3版本的Executor.scala的run源码与Spark 2.2.1版本相比具有如下特点。
上段代码中第15行代码变量taskStart的名称调整为taskStartTime。
1. var taskStartTime: Long = 0
(4)调用反序列化后的Task.run方法来执行任务,并获得执行结果。
Spark 2.2.1版本的Executor.scala的run方法的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P81_253261.jpg?sign=1739193310-F0dxDFVUaadiunTHyDumpKQWPpjfIxqm-0-045aa5b02078fd9b7635d3f21be1c5a2)
Spark 2.4.3版本的Executor.scala的run源码与Spark 2.2.1版本相比具有如下特点。
上段代码中第2行taskStart名称调整为taskStartTime。
上段代码中第7行try方法调整为Utils.tryWithSafeFinally方法。
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P82_253263.jpg?sign=1739193310-lx2zSnQHfb7z85QDpjfeblohDFTMwBgS-0-f1a10e2741470d8af6b1adfb198218ec)
task.run方法调用了runTask的方法,而runTask方法是一个抽象方法,runTask方法内部会调用RDD的iterator()方法,该方法就是针对当前Task对应的Partition进行计算的关键所在,在处理的方法内部会迭代Partition的元素,并交给我们自定义的function进行处理。
Task.scala的run方法的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P82_253264.jpg?sign=1739193310-dZmsC0fEH7Xd4poL77hdV12sXavByoSk-0-18b4de59c25ca33259ee200f9c86dec4)
task有两个子类,分别是ShuffleMapTask和ResultTask,下面分别对两者进行讲解。
1.ShuffleMapTask
ShuffleMapTask.scala的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P82_253267.jpg?sign=1739193310-y09IlVuBYmD0bNn3fYdSv38wHqcDPJ0N-0-17656ab9bb5b8c07728c5e289365a509)
首先,ShuffleMapTask会反序列化RDD及其依赖关系,然后通过调用RDD的iterator方法进行计算,而iterator方法中进行的最终运算的方法是compute()。
RDD.scala的iterator方法的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P83_253269.jpg?sign=1739193310-zcPkKxHXjOnJmMCHXq7wJr2L2uZKRgGF-0-07e43844fc72b846b02e576879b2f3f4)
其中,RDD.scala的computeOrReadCheckpoint的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P83_253270.jpg?sign=1739193310-lmAVt8HRDXdemyuH6gujuqkKakllkEBX-0-9f01e71afae4b411d118693c6d345c35)
RDD的compute方法是一个抽象方法,每个RDD都需要重写的方法。
此时,选择查看MapPartitionsRDD已经实现的compute方法,可以发现compute方法的实现是通过f方法实现的,而f方法就是我们创建MapPartitionsRDD时输入的操作函数。
Spark 2.2.1版本的MapPartitionsRDD.scala的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P83_253271.jpg?sign=1739193310-QHVF1GHkff6VdQJK8jDjVwEAnLHp3lBc-0-4178751337d2966eaa8086d90f55c2d6)
Spark 2.4.3版本MapPartitionsRDD.scala源码与Spark 2.2.1版本相比具有如下特点。
上段代码中第4行之后新增2个参数isFromBarrier、isOrderSensitive,isFromBarrier参数指示此RDD是否从RDDBarrier转换,至少含有一个RDDBarrier的Stage阶段将转变为屏障阶段(BarrierS tage)。isOrderSensitive参数指示函数是否区分顺序。
上段代码中第18行之后新增isBarrier_、getOutputDeterministicLevel方法。%
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P84_253273.jpg?sign=1739193310-dlFI0HMmlnU7RoahAV96ESXKRaoQa33q-0-cdf0400f9c34ed99a97ed7df3e3cccbe)
注意:通过迭代器的不断叠加,将每个RDD的小函数合并成一个大的函数流。
然后在计算具体的Partition之后,通过shuffleManager获得的shuffleWriter把当前Task计算的结果根据具体的shuffleManager实现写入到具体的文件中,操作完成后会把MapStatus发送给Driver端的DAGScheduler的MapOutputTracker。
2.ResultTask
Driver端的DAGScheduler的MapOutputTracker把shuffleMapTask执行的结果交给ResultTask,ResultTask根据前面Stage的执行结果进行shuffle后产生整个job最后的结果。
ResultTask.scala的runTask的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P85_253274.jpg?sign=1739193310-yfcjAYlvug0U4jvf5uzdF1wLcaAV45Cx-0-00af9917262ff8d31173b197f141e03e)
而ResultTask的runTask方法中反序列化生成func函数,最后通过func函数计算出最终的结果。