![Spark大数据商业实战三部曲:内核解密、商业案例、性能调优(第2版)](https://wfqqreader-1252317822.image.myqcloud.com/cover/174/40375174/b_40375174.jpg)
5.5 Executor执行结果的处理方式
本节讲解Executor工作原理、ExecutorBackend注册源码解密、Executor实例化内幕、Executor具体工作内幕。
Master让Worker启动,启动了一个Executor所在的进程。在Standalone模式中,Executor所在的进程是CoarseGrainedExecutorBackend。
Master侧:Master发指令给Worker,启动Executor。
Worker侧:Worker接收到Master发过来的指令,通过ExecutorRunner启动另外一个进程来运行Executor。这里是指启动另外一个进程来启动Executor,而不是直接启动Executor。Master向Worker发送指令,Worker为什么启动另外一个进程?在另外一个进程中注册给Driver,然后启动Executor。因为Worker是管理机器上的资源的,所以机器上的资源变动时要汇报给Master。Worker不是用来计算的,不能在Worker中进行计算;Spark集群中有很多应用程序,需要很多Executor,如果不是给每个Executor启动一个对应的进程,而是所有的应用程序进程都在同一个Executor里面,那么一个程序崩溃将导致其他程序也崩溃。
启动CoarseGrainedExecutorBackend。CoarseGrainedExecutorBackend是Executor所在的进程。CoarseGrainedExecutorBackend启动时,须向Driver注册。通过发送RegisterExecutor向Driver注册,注册的内容是RegisterExecutor。
CoarseGrainedExecutorBackend.scala的onStart方法的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P220_253632.jpg?sign=1739193367-bxoDZJnzxeZnjZJjxD64VpVfGf8UTpXZ-0-39590be0a5510b624020b3bdf2501460)
其中,RegisterExecutor是一个case class,源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P220_253633.jpg?sign=1739193367-ilBxL0kVtBugVrrEnHKy5Yn5ScS9P8uD-0-ddac918d5b2f17832d6396f2ccc24c93)
CoarseGrainedExecutorBackend启动时,向Driver发送RegisterExecutor消息进行注册;Driver收到RegisterExecutor消息,在Executor注册成功后会返回消息RegisteredExecutor给CoarseGrainedExecutorBackend。这里注册的Executor和真正工作的Executor没有任何关系,其实注册的是RegisterExecutorBackend。可以将RegisteredExecutor理解为RegisterExecutorBackend。
需要特别注意的是,在CoarseGrainedExecutorBackend启动时向Driver注册Executor,其实质是注册ExecutorBackend实例,和Executor实例之间没有直接关系。
CoarseGrainedExecutorBackend是Executor运行所在的进程名称,CoarseGrained-ExecutorBackend本身不会完成任务的计算。
Executor才是正在处理任务的对象。Executor内部是通过线程池的方式来完成Task的计算的。Executor对象运行于CoarseGrainedExecutorBackend进程。
CoarseGrainedExecutorBackend和Executor是一一对应的。
CoarseGrainedExecutorBackend是一个消息通信体(其具体实现了ThreadSafeRPCEndpoint),可以发送信息给Driver,并可以接受Driver中发过来的指令,如启动Task等。
CoarseGrainedExecutorBackend继承自ThreadSafeRpcEndpoint,CoarseGrainedExecutor-Backend是一个消息通信体,可以收消息,也可以发消息,源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P221_253635.jpg?sign=1739193367-cq7iZDvGTKrGkFJdAlmDUQV2v7A91dVf-0-a3e07b67e58f0e47cc4d860c6b828e71)
CoarseGrainedExecutorBackend发消息给Driver。Driver在StandaloneSchedulerBackend里面(Spark 2.0中已将SparkDeploySchedulerBackend更名为StandaloneSchedulerBackend)。StandaloneSchedulerBackend继承自CoarseGrainedSchedulerBackend,start启动时启动StandaloneAppClient。StandaloneAppClient(Spark 2.0中已将AppClient更名为StandaloneApp-Client)代表应用程序本身。
StandaloneAppClient.scala的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P221_253636.jpg?sign=1739193367-ozo1Y6TJOlxKmkhsTrYX8uirJDXzNOBz-0-4b79145d5d2797c091444dab121c32b0)
在Driver进程中有两个至关重要的Endpoint。
ClientEndpoint:主要负责向Master注册当前的程序,是AppClient的内部成员。
DriverEndpoint:这是整个程序运行时的驱动器,是CoarseGrainedExecutorBackend的内部成员。
CoarseGrainedSchedulerBackend的DriverEndpoint的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P221_253637.jpg?sign=1739193367-mUNJL53etXfOeBZBzRYda4HU691ZBBQe-0-7f102a356957568625275445dfaabfea)
DriverEndpoint会接收到RegisterExecutor消息,并完成在Driver上的注册。
RegisterExecutor中有一个数据结构executorDataMap,是Key-Value的方式。
1. private val executorDataMap = new HashMap[String, ExecutorData]
ExecutorData中的executorEndpoint是RpcEndpointRef。ExecutorData的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P221_253639.jpg?sign=1739193367-ybphsVDQ8fxTlLfr7VlJ0SBO6kLdAUYO-0-e7a93e51525da54ff26d3c56f30582b3)
CoarseGrainedExecutorBackend.scala的RegisteredExecutor的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P222_253641.jpg?sign=1739193367-eDtgL12MC5myUyMpfng35f63adqztvTH-0-cc7285b99cfcb6b3ac029d9da8ce4cdc)
CoarseGrainedExecutorBackend收到RegisteredExecutor消息以后,用new()函数创建一个Executor,而Executor就是一个普通的类。
Executor.scala的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P222_253644.jpg?sign=1739193367-v9TlPCsoKfcPVJzxNS8X9CaIdUDlCn3S-0-f77ff36bc16aaaffdedde71e06d1875d)
回到ExecutorData.scala,其中的RpcEndpointRef是代理句柄,代理CoarseGrainedExecutor-Backend。在Driver中,通过ExecutorData封装并注册ExecutorBackend的信息到Driver的内存数据结构executorMapData中。
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P222_253643.jpg?sign=1739193367-qcH3sbCXbSQNyHobPiyGJvfsQNygk2Sm-0-89681b996c9f4e945a21600971b83dcd)
Executor注册消息提交给DriverEndpoint,通过DriverEndpoint写数据给CoarseGrainedSchedulerBackend里面的数据结构executorMapData。executorMapData是CoarseGrainedSchedulerBackend的成员,因此最终注册给CoarseGrainedSchedulerBackend。CoarseGrainedSchedulerBackend获得Executor(其实是ExecutorBackend)的注册信息。
实际在执行的时候,DriverEndpoint会把信息写入CoarseGrainedSchedulerBackend的内存数据结构executorMapData中,所以最终是注册给了CoarseGrainedSchedulerBackend。也就是说,CoarseGrainedSchedulerBackend掌握了为当前程序分配的所有的ExecutorBackend进程,而在每个ExecutorBackend进行实例中,会通过Executor对象负责具体任务的运行。在运行的时候使用synchronized关键字来保证executorMapData安全地并发写操作。
CoarseGrainedSchedulerBackend.scala的receiveAndReply方法中RegisterExecutor注册的过程,源码如下所示。
Spark 2.2.1版本的CoarseGrainedSchedulerBackend.scala的receiveAndReply方法的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P223_253646.jpg?sign=1739193367-lAyzAcQz4n9ccCgL4FTucJwpB97WiAaT-0-02083fcf75e4eb4cc73ba3ede91b3bad)
Spark 2.4.3版本的CoarseGrainedSchedulerBackend.scala的receiveAndReply方法的源码与Spark 2.2.1版本相比具有如下特点。
上段代码中第8行if判断语句中去掉scheduler.nodeBlacklist != null的判断。
上段代码中第31行构建ExecutorData实例时,传入的executorRef.address调整为executorAddress。
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P224_253649.jpg?sign=1739193367-LBE2LJSPjPlsQt5QNYfxDRgNxWYgfbHk-0-9a165eddbf44850e6ee77fe8f68adbaa)
CoarseGrainedSchedulerBackend.scala中的RegisterExecutor:
先判断executorDataMap是否已经包含executorId,如果已经包含,就会发送注册失败的消息RegisterExecutorFailed,因为已经有重复的executor ID的Executor在运行。
然后进行Executor的注册,获取到executorAddress,在executorRef.address为空的情况下就获取到senderAddress。
定义了3个数据结构:addressToExecutorId、totalCoreCount、totalRegisteredExecutors,其中,addressToExecutorId是DriverEndpoint的数据结构,而totalCoreCount、totalRegisteredExecutors是CoarseGrainedSchedulerBackend的数据结构。addressTo-ExecutorId、totalCoreCount、totalRegisteredExecutors包含Executors注册的信息分别为:RPC地址主机名和端口与ExecutorId的对应关系、集群中的总核数Cores、当前注册的Executors总数等。
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P224_253650.jpg?sign=1739193367-fW1eQDprMHO5QtE3EDQEBqnF58JfZred-0-3c305123e79e27c8471b66ccb58f0464)
然后调用new()函数创建一个ExecutorData,提取出executorRef、executorRef.address、hostname、cores、cores、logUrls等信息。
同步代码块CoarseGrainedSchedulerBackend.this.synchronized:集群中很多Executor向Driver注册,为防止写冲突,因此设计一个同步代码块。在运行时使用synchronized关键字,来保证executorMapData安全地并发写操作。
executorRef.send(RegisteredExecutor)发消息RegisteredExecutor给我们的sender,sender是CoarseGrainedExecutorBackend。而CoarseGrainedExecutorBackend收到消息RegisteredExecutor以后,就调用new()函数创建了Executor。
CoarseGrainedExecutorBackend收到DriverEndpoint发送过来的RegisteredExecutor消息后会启动Executor实例对象,而Executor实例对象事实上是负责真正Task计算的。
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P225_253653.jpg?sign=1739193367-Toe25938jmc6a3HfEFX80OsTYLOUWAmo-0-dd1101a9b5445d07fdf33c502e106939)
下面来看一下Executor.scala,其中的threadPool是一个线程池。
Executor是真正负责Task计算的;其在实例化的时候会实例化一个线程池threadPool来准备Task的计算。threadPool是一个newDaemonCachedThreadPool。newDaemonCached-ThreadPool创建线程池,线程工厂按照需要的格式调用new()函数创建线程。语法实现如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P225_253655.jpg?sign=1739193367-zr0buKnbylSSVQKPenUL1jOXEW6upVdg-0-e96a5a6744229648717ddbf929725f17)
namedThreadFactory的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P225_253656.jpg?sign=1739193367-133gaQBaGCLIHnl5QFLRK8ZrSgNZS1wN-0-f5498228cfe1c311a05aa058376843de)
newCachedThreadPool创建一个线程池,根据需要创建新线程,线程池中的线程可以复用,使用提供的ThreadFactory创建新线程。newCachedThreadPool的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P225_253657.jpg?sign=1739193367-XNKSlRqKnAw1KghLzJIKZfUzN9l3bMJh-0-ae01ce1f05671648a5f8ae4dfbe76415)
创建的threadPool中以多线程并发执行和线程复用的方式来高效地执行Spark发过来的Task。线程池创建好后,接下来是等待Driver发送任务给CoarseGrainedExecutorBackend,不是直接发送给Executor,因为Executor不是一个消息循环体。
Executor具体是如何工作的?
当Driver发送过来Task的时候,其实是发送给了CoarseGrainedExecutorBackend这个RpcEndpoint,而不是直接发送给了Executor(Executor由于不是消息循环体,所以永远也无法直接接收远程发过来的信息)。
Driver向CoarseGrainedExecutorBackend发送LaunchTask,转过来交给线程池中的线程去执行。先判断Executor是否为空,Executor为空,则提示错误,进程就直接退出。如果Executor不为空,则反序列化任务调用Executor的launchTask,其中,attemptNumber是任务可以重试的次数。
ExecutorBackend收到Driver发送的消息,调用launchTask方法,提交给Executor执行。
Executor.scala的launchTask接收到Task执行的命令后,首先将Task封装在TaskRunner里面,然后放到runningTasks。runningTasks是一个简单的数据结构。
1. private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
launchTask最后交给threadPool.execute(tr),交给线程池中的线程执行任务。TaskRunner继承自Runnable,是Java的一个对象。
TaskRunner其实是Java中Runnable接口的具体实现,在真正工作时会交给线程池中的线程去运行,此时会调用run方法来执行Task。
Executor.scala中的Run方法最终调用task.run方法。
Spark 2.2.1版本的Executor.scala的run方法的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P226_253660.jpg?sign=1739193367-BLGKNxzi0L30RBVRWKfWLniEhc8ZTEhj-0-5016471585ec812df6de14b3e63037c8)
Spark 2.4.3版本的Executor.scala的run方法源码与Spark 2.2.1版本相比具有如下特点。
上段代码中第4行的try语句调整为Utils.tryWithSafeFinally语句。
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P226_253661.jpg?sign=1739193367-xVEBBZIRmaX7qIRWXI2uAdmvjBrtoIIw-0-890f52f087dd24926be3a86d3d11d935)
跟进Task.scala中的run方法,在里面调用runTask。
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P226_253662.jpg?sign=1739193367-AS0a4Z6lQ8UlAmMIdPsObSKJj8YkJo0u-0-629b2bf893a8418602d9aba2782fe446)
TaskRunner在调用run方法时会调用Task的run方法,而Task的run方法会调用runTask,实际上,Task有ShuffleMapTask和ResultTask。