![Spark大数据商业实战三部曲:内核解密、商业案例、性能调优(第2版)](https://wfqqreader-1252317822.image.myqcloud.com/cover/174/40375174/b_40375174.jpg)
7.6 Shuffle与Storage模块间的交互
在Spark中,存储模块被抽象成Storage。顾名思义,Storage是存储的意思,代表着Spark中的数据存储系统,负责管理和实现数据块(Block)的存放。其中存取数据的最小单元是Block,数据由不同的Block组成,所有操作都是以Block为单位进行的。从本质上讲,RDD中的Partition和Storage中的Block是等价的,只是所处的模块不同,看待的角度不一样而已。
Storage抽象模块的实现分为两个层次,如图7-13所示。
(1)通信层:通信层是典型的Master-Slave结构,Master和Slave之间传输控制和状态信息。通信层主要由BlockManager、BlockManagerMaster、BlockManagerMasterEndpoint、BlockManagerSlaveEndpoint等类实现。
(2)存储层:负责把数据存储到内存、磁盘或者堆外内存中,有时还需要为数据在远程节点上生成副本,这些都由存储层提供的接口实现。Spark 2.2.0具体的存储层的实现类有DiskStore和MemoryStore。
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P347_59662.jpg?sign=1739480904-VJ8qbpOnSTDUNrdJRKhUGcISkuXMeH52-0-3a7d5c29e4727bbb1c764c298578af79)
图7-13 Storage存储模块
Shuffle模块若要和Storage模块进行交互,需要通过调用统一的操作类BlockManager来完成。如果把整个存储模块看成一个黑盒,BlockManager就是黑盒上留出的一个供外部调用的接口。
7.6.1 Shuffle注册的交互
Spark中BlockManager在Driver端的创建,在SparkContext创建的时候会根据具体的配置创建SparkEnv对象。
Spark 2.2.1版本的SparkContext.scala的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P347_254097.jpg?sign=1739480904-sTOX7NXSBHFlPNX2gJZ2ppJLPAJeHCPU-0-7610f6e4e0fae5efba193b43006b88f0)
Spark 2.4.3版本的SparkContext.scala源码与Spark 2.2.1版本相比具有如下特点。
上段代码中第9行在SparkContext.numDriverCores新增一个参数conf。
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P347_254098.jpg?sign=1739480904-IyvtVHuY07Eg5DquQrG348pu56gW4PiX-0-46a08bf38ce0584241c13a69605ba285)
createSparkEnv方法中,传入SparkConf配置对象、isLocal标志,以及LiveListenerBus,方法中使用SparkEnv对象的createDriverEnv方法创建SparkEnv并返回。在SparkEnv的createDriverEvn方法中,将会创建BlockManager、BlockManagerMaster等对象,完成Storage在Driver端的部署。
SparkEnv中创建BlockManager、BlockManagerMaster关键源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P348_254100.jpg?sign=1739480904-Oee32mtVq4MAAaHJQGIG58uwq0VMhn77-0-a057741f3b46b68700063d49d2ffc9b3)
使用new关键字实例化出BlockManagerMaster,传入BlockManager的构造函数,实例化出BlockManager对象。这里的BlockManagerMaster和BlockManager属于聚合关系。BlockManager主要对外提供统一的访问接口,BlockManagerMaster主要对内提供各节点之间的指令通信服务。
构建BlockManager时,传入shuffleManager参数,shuffleManager是在SparkEnv中创建的,将shuffleManager传入到BlockManager中,BlockManager就拥有shuffleManager的成员变量,从而可以调用shuffleManager的相关方法。
BlockManagerMaster在Driver端和Executors中的创建稍有差别。首先来看在Driver端创建的情形。创建BlockManagerMaster传入的isDriver参数,isDriver为true,表示在Driver端创建,否则视为在Slave节点上创建。
当SparkContext中执行_env.blockManager.initialize(_applicationId)代码时,会调用Driver端BlockManager的initialize方法。Initialize方法的源码如下所示。
SparkContext.scala的源码如下:
1. _env.blockManager.initialize(_applicationId)
BlockManager.scala的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P348_254102.jpg?sign=1739480904-pCN7t8EQxuAWhEZlRvXccSzRPkPMCGpt-0-9bf13fd14a141e20b8fb506959288760)
如上面的源码所示,initialize方法使用appId初始化BlockManager,主要完成以下工作。
(1)初始化BlockTransferService。
(2)初始化ShuffleClient。
(3)创建BlockManagerId。
(4)将BlockManager注册到BlockManagerMaster上。
(5)若ShuffleService可用,则注册ShuffleService。
在BlockManager的initialize方法上右击Find Usages,可以看到initialize方法在两个地方得到调用:一个是SparkContext;另一个是Executor。启动Executor时,会调用BlockManager的initialize方法。Executor中调用initialize方法的源码如下所示。
Spark 2.2.1版本的Executor.scala的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P349_254105.jpg?sign=1739480904-Ffx8UYp9vL0uWoLdf8etAwFvjYSviFfg-0-c640626951d3a245a255cd4d1b7574f4)
Spark 2.4.3版本的Executor.scala源码与Spark 2.2.1版本相比具有如下特点。
上段代码中第5行和第7行互换一下顺序。
上段代码中第7行后新增代码,向度量系统进行env.blockManager.shuffleMetricsSource的注册。
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P350_254108.jpg?sign=1739480904-Mtvg1eoi4l0LIjCmwm7SOOOSijvaPpwF-0-e95a69378abccc5192148d631e408d71)
上面代码中调用了env.blockManager.initialize方法。在initialize方法中,完成BlockManger向Master端BlockManagerMaster的注册。使用方法master.registerBlockManager(id,maxMemory,slaveEndpoint)完成注册,registerBlockManager方法中传入Id、maxMemory、salveEndPoint引用,分别表示Executor中的BlockManager、最大内存、BlockManager中的BlockMangarSlaveEndpoint。BlockManagerSlaveEndpoint是一个RPC端点,使用它完成同BlockManagerMaster的通信。BlockManager收到注册请求后将Executor中注册的BlockManagerInfo存入哈希表中,以便通过BlockManagerSlaveEndpoint向Executor发送控制命令。
ShuffleManager是一个用于shuffle系统的可插拔接口。在Driver端SparkEnv中创建ShuffleManager,在每个Executor上也会创建。基于spark.shuffle.manager进行设置。Driver使用ShuffleManager注册到shuffles系统,Executors(或Driver在本地运行的任务)可以请求读取和写入数据。这将被SparkEnv的SparkConf和isDriver布尔值作为参数。
ShuffleManager.scala的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P350_254109.jpg?sign=1739480904-Rjav8tNCJdGWnanfdv7CDaxXohDNpQAX-0-b34f9d422e721a777207d4d2bfccc939)
Spark Shuffle Pluggable框架ShuffleBlockManager在Spark 1.6.0之后改成了ShuffleBlockResolver。ShuffleBlockResolver具体读取shuffle数据,是一个trait。在ShuffleBlockResolver中已无getBytes方法。getBlockData(blockId: ShuffleBlockId)方法返回的是ManagedBuffer,这是核心。
ShuffleBlockResolver的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P351_254112.jpg?sign=1739480904-uZUG2AYoOUg9v7kZAuHh8jHefiz6MIPv-0-ad3dadca998109e4a97bfc18aa7262a7)
Spark 2.0版本中通过IndexShuffleBlockResolver来具体实现ShuffleBlockResolver(SortBasedShuffle方式),已无FileShuffleBlockManager(Hashshuffle方式)。IndexShuffle-BlockResolver创建和维护逻辑块和物理文件位置之间的shuffle blocks映射关系。来自于相同map task任务的shuffle blocks数据存储在单个合并数据文件中;数据文件中的数据块的偏移量存储在单独的索引文件中。将shuffleBlockId + reduce ID set to 0 + ".后缀"作为数据shuffle data的shuffleBlockId名字。其中,文件名后缀为".data"的是数据文件;文件名后缀为".index"的是索引文件。
7.6.2 Shuffle写数据的交互
基于Sort的Shuffle实现的ShuffleHandle包含BypassMergeSortShuffleHandle与BaseShuffleHandle。两种ShuffleHandle写数据的方法可以参考SortShuffleManager类的getWriter方法,关键代码如下所示。
SortShuffleManager的getWriter的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P352_254114.jpg?sign=1739480904-rVdjeAef3GDRpGr3HHID754rhvhb1FiX-0-3160589175252bf362a2e3cdff638438)
在对应构建的两种数据写入器类BypassMergeSortShuffleWriter与SortShuffleWriter中,都是通过变量shuffleBlockResolver对逻辑数据块与物理数据块的映射进行解析。BypassMergeSortShuffleWriter写数据的具体实现位于实现的write方法中,其中调用的createTempShuffleBlock方法描述了各个分区所生成的中间临时文件的格式与对应的BlockId。SortShuffleWriter写数据的具体实现位于实现的write方法中。
7.6.3 Shuffle读数据的交互
SparkEnv.get.shuffleManager.getReader是SortShuffleManager的getReader,是获取数据的阅读器,getReader方法中创建了一个BlockStoreShuffleReader实例。SortShuffleManager.scala的read()方法的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P352_254115.jpg?sign=1739480904-FqiOQxo7kVMZcKRXIJDf1rSDe6XxlNRn-0-f0058613ed168b65ddc0591c7be1c918)
BlockStoreShuffleReader实例的read方法,首先实例化new ShuffleBlockFetcherIterator。ShuffleBlockFetcherIterator是一个阅读器,里面有一个成员blockManager。blockManager是内存和磁盘上数据读写的统一管理器;ShuffleBlockFetcherIterator.scala的initialize方法中splitLocalRemoteBlocks()划分本地和远程的blocks,Utils.randomize(remoteRequests)把远程请求通过随机的方式添加到队列中,fetchUpToMaxBytes()发送远程请求获取我们的blocks,fetchLocalBlocks()获取本地的blocks。
7.6.4 BlockManager架构原理、运行流程图和源码解密
BlockManager是管理整个Spark运行时数据的读写,包含数据存储本身,在数据存储的基础上进行数据读写。由于Spark是分布式的,所以BlockManager也是分布式的,BlockManager本身相对而言是一个比较大的模块,Spark中有非常多的模块:调度模块、资源管理模块等。BlockManager是另外一个非常重要的模块。BlockManager本身的源码量非常大。本节从BlockManager原理流程对BlockManager做深刻地讲解。在Shuffle读写数据的时候,我们需要读写BlockManager。因此,BlockManager是至关重要的内容。
编写一个业务代码WordCount.scala,通过观察WordCount运行时BlockManager的日志来理解BlockManager的运行。
WordCount.scala的代码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P353_254117.jpg?sign=1739480904-krAzn6yQVYWIQEfju0uR8VLa06pANWWE-0-8adca067dcf6e8999a2b361081e0ad08)
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P354_254119.jpg?sign=1739480904-L8EKQXp70Ml3GDT2RmYdGQvmrRz9npaV-0-2c0cf710cc56b17df6c609a974cdd4f2)
在IDEA中运行一个业务程序WordCount.scala,日志中显示以下内容。
SparkEnv:Registering MapOutputTracker,其中MapOutputTracker中数据的读写都和BlockManager关联。
SparkEnv:Registering BlockManagerMaste,其中Registering BlockManagerMaster由BlockManagerMaster进行注册。
DiskBlockManager:Created local directory C:\Users\dell\AppData\Local\Temp\blockmgr-...其中DiskBlockManager是管理磁盘存储的,里面有我们的数据。可以访问Temp目录下以blockmgr-开头的文件的内容。
WordCount运行结果如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P355_254121.jpg?sign=1739480904-fnohk9oaa0yXHOlmm0PVjndzwMCS36EQ-0-8adf859251061842bda2c62b7ea9d901)
从Application启动的角度观察BlockManager。
(1)Application启动时会在SparkEnv中注册BlockManagerMaster以及MapOutputTracker,其中:
① BlockManagerMaster:对整个集群的Block数据进行管理。
② MapOutputTrackerMaster:跟踪所有的Mapper的输出。
BlockManagerMaster中有一个引用driverEndpoint,isDriver判断是否运行在Driver上。
BlockManagerMaster的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P355_254122.jpg?sign=1739480904-T86nmjntVLF0Q0tjewZFkpxdhwdReEjd-0-f1c08ff3569834c22b806e6ae40a4703)
BlockManagerMaster注册给SparkEnv,SparkEnv在SparkContext中。
SparkContext.scala的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P355_254123.jpg?sign=1739480904-pXMKTCuShtXaHC9rc88ki2OKUkoaLW8x-0-b4ac791008f3d1be75f87b5ea2010d92)
在SparkContext.scala的createSparkEnv方法中调用SparkEnv.createDriverEnv方法。
进入SparkEnv.scala的createDriverEnv方法。
Spark 2.2.1版本的SparkEnv.scala的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P355_254124.jpg?sign=1739480904-qsynfBZRNrsucOxL7clySCZAu1UJiiAu-0-7d95293155cdeefbce0b6103275fd32f)
Spark 2.4.3版本的SparkEnv.scala源码与Spark 2.2.1版本相比具有如下特点。
上段代码中第8行将port调整为Option(port)。
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P356_254127.jpg?sign=1739480904-cfMQnJeZBlovksOh6oYmxalqWewUBfeK-0-915b69d83bf5f765c29afc11ad95fd9a)
SparkEnv.scala的createDriverEnv中调用了create方法,判断是否是Driver。create方法的源码如下。
Spark 2.2.1版本的SparkEnv.scala的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P356_254128.jpg?sign=1739480904-HbvkiZnEZ6WdAxV02fGwNCXiX0iw1s9T-0-5488585531188967539c582d023e3ee2)
Spark 2.4.3版本的SparkEnv.scala源码与Spark 2.2.1版本相比具有如下特点。
上段代码中第6行将port: Int调整为port: Option[Int]。
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P357_254130.jpg?sign=1739480904-3sa2vtIY78RRO1twwSgs6kRUTbtBS9m8-0-90941c19847a544464620dba796f89f6)
在SparkEnv.scala的createDriverEnv中调用new()函数创建一个MapOutputTrackerMaster。MapOutputTrackerMaster的源码如下。
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P357_254131.jpg?sign=1739480904-p1JjjnfMIETI8Vf1CHsypdrtBzw7SuPS-0-3409d33e7c7dbae8d52260cf4d2c20be)
然后看一下blockManagerMaster。在SparkEnv.scala中调用new()函数创建一个blockManagerMaster。
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P357_254132.jpg?sign=1739480904-MlKJETRx1mmR3bqkssXZJtvSyL1Y0E4V-0-7b170f4a073f3b713ecc75ec8d6990e7)
BlockManagerMaster对整个集群的Block数据进行管理,Block是Spark数据管理的单位,与数据存储没有关系,数据可能存在磁盘上,也可能存储在内存中,还可能存储在offline,如Alluxio上,源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P357_254133.jpg?sign=1739480904-GJKyGhJN0oRbYpF1FnSGCtUmb9MNT9fG-0-d7692e56a133ebeeb4708f0d8983ebe0)
构建BlockManagerMaster的时候调用new()函数创建一个BlockManagerMasterEndpoint,这是循环消息体。
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P357_254134.jpg?sign=1739480904-mix2cIb6cHssGkpUmPJmnBe36kHdGGqA-0-8fe76d8d9236e1a766690bd1e6e4bd5d)
(2)BlockManagerMasterEndpoint本身是一个消息体,会负责通过远程消息通信的方式去管理所有节点的BlockManager。
查看WordCount在IDEA中的运行日志,日志中显示BlockManagerMasterEndpoint:Registering block manager,向block manager进行注册。
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P357_254135.jpg?sign=1739480904-UFihRNIr4K52QUNicorNNFeQbv3s3xRX-0-52b84faad2a296dd40deca0787f11ab4)
(3)每启动一个ExecutorBackend,都会实例化BlockManager,并通过远程通信的方式注册给BlockManagerMaster;实质上是Executor中的BlockManager在启动的时候注册给了Driver上的BlockManagerMasterEndpoint。
(4)MemoryStore是BlockManager中专门负责内存数据存储和读写的类。
查看WordCount在IDEA中的运行日志,日志中显示MemoryStore: Block broadcast_0 stored as values in memory,数据存储在内存中。
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P358_254138.jpg?sign=1739480904-8Ges8JW9yCcyHbiugSspUP8LsDRMMeik-0-d2158b44064b63345ba154756732cca0)
Spark读写数据是以block为单位的,MemoryStore将block数据存储在内存中。MemoryStore.scala的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P358_254139.jpg?sign=1739480904-hDozBe2WZpr66ckJBOi8tgR8RZGZirfX-0-1948309a1024e425e4d6b5f2541aa3f7)
(5)DiskStore是BlockManager中专门负责基于磁盘的数据存储和读写的类。
DiskStore.scala的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P358_254140.jpg?sign=1739480904-2BGJeqZUNFzJHMkchVi5LbqXbffUfQvq-0-d7421bcb1bf225e282362a0a4d73e27e)
(6)DiskBlockManager:管理Logical Block与Disk上的Physical Block之间的映射关系并负责磁盘文件的创建、读写等。
查看WordCount在IDEA中的运行日志,日志中显示INFO DiskBlockManager: Created local directory。DiskBlockManager负责磁盘文件的管理。
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P358_254141.jpg?sign=1739480904-BPuAasu0XzM06dDPIbBZI2UKUQPm8TjN-0-97d7610904de234d94f31254c1a41b1d)
DiskBlockManager负责管理逻辑级别和物理级别的映射关系,根据BlockID映射一个文件。在目录spark.local.dir或者SPARK_LOCAL_DIRS中,Block文件进行hash生成。通过createLocalDirs生成本地目录。DiskBlockManager的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P359_254144.jpg?sign=1739480904-xYWWSNfX2bSWPzCRza3sTTMIzucQGCKQ-0-77d34efac0d5e97df31acf50e160c8f9)
从Job运行的角度来观察BlockManager:
查看WordCount.scala的运行日志:日志中显示INFO BlockManagerInfo: Added broadcast_0_piece0 in memory,将BlockManagerInfo的广播变量加入到内存中。
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P359_254145.jpg?sign=1739480904-8hDdk48iyYI7OQciFwaY0poCLLjjh8SF-0-191429e70e9872b6d88bd17b6466d40b)
Driver使用BlockManagerInfo管理ExecutorBackend中BlockManager的元数据,BlockManagerInfo的成员变量包括blockManagerId、系统当前时间timeMs、最大堆内内存maxOnHeapMem、最大堆外内存maxOffHeapMem、slaveEndpoint。
BlockManagerMasterEndpoint.scala的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P359_254146.jpg?sign=1739480904-gZOdyH1RU7RGNNoa7L4WLX9eifX3K3cC-0-e2166517b6bdd00d7b29ab38082e4f68)
集群中每启动一个节点,就创建一个BlockManager,BlockManager是在每个节点(Driver及Executors)上运行的管理器,用于存放和检索本地和远程不同的存储块(内存、磁盘和堆外内存)。BlockManagerInfo中的BlockManagerId标明是哪个BlockManager,slaveEndpoint是消息循环体,用于消息通信。
(1)首先通过MemoryStore存储广播变量。
(2)在Driver中是通过BlockManagerInfo来管理集群中每个ExecutorBackend中的BlockManager中的元数据信息的。
(3)当改变了具体的ExecutorBackend上的Block信息后,就必须发消息给Driver中的BlockManagerMaster来更新相应的BlockManagerInfo。
(4)当执行第二个Stage的时候,第二个Stage会向Driver中的MapOutputTracker-MasterEndpoint发消息请求上一个Stage中相应的输出,此时MapOutputTrackerMaster会把上一个Stage的输出数据的元数据信息发送给当前请求的Stage。图7-14是BlockManager工作原理和运行机制简图。
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P360_62427.jpg?sign=1739480904-CebOBs9NNipWD38INN3D2CvjAJSRsNJx-0-7b7dda6ad57ac86b1970a23e18385f84)
图7-14 BlockManager工作原理和运行机制简图
BlockManagerMasterEndpoint.scala中BlockManagerInfo的getStatus方法如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P360_254148.jpg?sign=1739480904-U2O8NaDaux0WcclW9lW9RoV6acYRVTbW-0-684b22c4473c517f56f6312b7cd1febb)
其中的BlockStatus是一个case class。
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P360_254149.jpg?sign=1739480904-vd98ZOeaqgcNZIHWYe3Tf8dCOFnhtGbe-0-de6d4736a78b8dfe0ebbdd0befbdc437)
BlockTransferService.scala进行网络连接操作,获取远程数据。
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P360_254150.jpg?sign=1739480904-dNzJpQdHUCF3zK2HOSHWMmSYWk1P8NeI-0-868ed53ee7704723471e10ca67a75837)
7.6.5 BlockManager解密进阶
本节讲解BlockManager初始化和注册解密、BlockManagerMaster工作解密、BlockTransferService解密、本地数据读写解密、远程数据读写解密。
BlockManager既可以运行在Driver上,也可以运行在Executor上。在Driver上的BlockManager管理集群中Executor的所有的BlockManager,BlockManager分成Master、Slave结构,一切的调度、一切的工作由Master触发,Executor在启动的时候一定会启动BlockManager。BlockManager主要提供了读和写数据的接口,可以从本地读写数据,也可以从远程读写数据。读写数据可以基于磁盘,也可以基于内存以及OffHeap。OffHeap就是堆外空间(如Alluxio是分布式内存管理系统,与基于内存计算的Spark系统形成天衣无缝的组合,在大数据领域中,Spark+Alluxio+Kafka是非常有用的组合)。
从整个程序运行的角度看,Driver也是Executor的一种,BlockManager可以运行在Driver上,也可以运行在Executor上。BlockManager.scala的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P361_254152.jpg?sign=1739480904-aj26HIQrWDFrnmAQ8kUvGCWUxO5zc6EJ-0-927640d137a6c9d0e7cb7c1c23c334ab)
BlockManager中的成员变量中:BlockManagerMaster对整个集群的BlockManagerMaster进行管理;serializerManager是默认的序列化器;MemoryManager是内存管理;MapOutputTracker是Shuffle输出的时候,要记录ShuffleMapTask输出的位置,以供下一个Stage使用,因此需要进行记录。BlockTransferService是进行网络操作的,如果要连同另外一个BlockManager进行数据读写操作,就需要BlockTransferService。Block是Spark运行时数据的最小抽象单位,可能放入内存中,也可能放入磁盘中,还可能放在Alluxio上。
SecurityManager是安全管理;numUsableCores是可用的Cores。
BlockManager中DiskBlockManager管理磁盘的读写,创建并维护磁盘上逻辑块和物理块之间的逻辑映射位置。一个block被映射到根据BlockId生成的一个文件,块文件哈希列在目录spark.local.dir中(如果设置了SPARK LOCAL DIRS),或在目录(SPARK LOCAL DIRS)中。
然后在BlockManager中创建一个缓存池:block-manager-future以及memoryStore、diskStore。
Shuffle读写数据的时候是通过BlockManager进行管理的。
Spark 2.2.1版本的BlockManager.scala的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P362_254154.jpg?sign=1739480904-blcjPIFhD9Ff6nBY1e3dKuSU9VWlsNzy-0-b012684ef7de186aa111f32e670f8f25)
Spark 2.4.3版本的BlockManager.scala的源码与Spark 2.2.1版本相比具有如下特点。
上段代码中第12行新增加一个参数。conf.get(config.SHUFFLE_REGISTRATION_TIMEOUT)。
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P362_254155.jpg?sign=1739480904-tQD4vH7vY5RFfUULik2InMxUGXEqzgQe-0-19cbff0026bac79f15f48c5d18ea5fdf)
BlockManager.scala中,BlockManager实例对象通过调用initialize方法才能正式工作,传入参数是appId,基于应用程序的ID初始化BlockManager。initialize不是在构造器的时候被使用,因为BlockManager实例化的时候还不知道应用程序的ID,应用程序ID是应用程序启动时,ExecutorBackend向Master注册时候获得的。
BlockManager.scala的initialize方法中的BlockTransferService进行网络通信。ShuffleClient是BlockManagerWorker每次启动时向BlockManagerMaster注册。BlockManager.scala的initialize方法中调用了registerBlockManager,向Master进行注册,告诉BlockManagerMaster把自己注册进去。
BlockManagerMaster.scala的registerBlockManager的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P363_254157.jpg?sign=1739480904-CCj2g98eZCiUrJCA88pDhzb2bCnedwgz-0-ddb33e7d86d1187604505424f8dcdc7f)
registerBlockManager方法的RegisterBlockManager是一个case class。
BlockManagerMessages.scala的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P363_254158.jpg?sign=1739480904-gPZayQooZT6CNACRVGPrMx2VW10NTJlh-0-361e8b4233b37f27fac68f2d3af7266d)
在Executor实例化的时候,要初始化blockManager。blockManager在initialize中将应用程序ID传进去。
Executor.scala中,Executor每隔10s向Master发送心跳消息,如收不到心跳消息,blockManager须重新注册。
Spark 2.1.2版本的Executor.scala的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P363_254159.jpg?sign=1739480904-3AjHQsg1ooRUzFx4xrH6UlDtDS0ZsWES-0-9da256319592c0c5ffab145d84477d69)
Spark 2.4.3版本的Executor.scala的源码与Spark 2.1.1版本相比具有如下特点。
上段代码中第5行RpcTimeout调整为以下代码:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P364_254161.jpg?sign=1739480904-5jAdE75DkOdpBpWGhUACN8pvFoySXexa-0-1ae5a4cacd3638dd2e1cebc36f7ddfd7)
回到BlockManagerMaster.scala的registerBlockManager:
registerBlockManager中RegisterBlockManager传入的slaveEndpoint是:具体的Executor启动时会启动一个BlockManagerSlaveEndpoint,会接收BlockManagerMaster发过来的指令。在initialize方法中通过master.registerBlockManager传入slaveEndpoint,而slaveEndpoint是在rpcEnv.setupEndpoint方法中调用new()函数创建的BlockManagerSlaveEndpoint。
总结如下。
(1)当Executor实例化的时候,会通过BlockManager.initialize来实例化Executor上的BlockManager,并且创建BlockManagerSlaveEndpoint这个消息循环体来接受Driver中BlockManagerMaster发过来的指令,如删除Block等。
1. env.blockManager.initialize(conf.getAppId)
BlockManagerSlaveEndpoint.scala的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P364_254163.jpg?sign=1739480904-zg7JUMfeTIMz2BQgHaBHOJPAWXw1SpYP-0-f44d77a12274a6121f7b99bc30323afd)
(2)当BlockManagerSlaveEndpoint实例化后,Executor上的BlockManager需要向Driver上的BlockManagerMasterEndpoint注册。
BlockManagerMaster的registerBlockManager方法,其中的driverEndpoint是构建BlockManagerMaster时传进去的。
(3)BlockManagerMasterEndpoint接收到Executor上的注册信息并进行处理。
BlockManagerMasterEndpoint.scala的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P364_254164.jpg?sign=1739480904-AiO7W8c2Ik0VGd9IGK1jRO8gmwW03I0L-0-effe200706dc78504e367443f74256cd)
BlockManagerMasterEndpoint的register注册方法,为每个Executor的BlockManager生成对应的BlockManagerInfo。BlockManagerInfo是一个HashMap[BlockManagerId, BlockManagerInfo]。
BlockManagerMasterEndpoint.scala的register注册方法源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P364_254165.jpg?sign=1739480904-vhzYNeju82iS8GoWXwP71Tawxf1VZpTe-0-4ebaef7cff459b7ca31f6245cad3b6e2)
BlockManagerMasterEndpoint中,BlockManagerId是一个class,标明了BlockManager在哪个Executor中,以及host主机名、port端口等信息。
BlockManagerId.scala的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P365_254168.jpg?sign=1739480904-lsaU5xCMIfuDVmvcfqIAzpYtwrv1N53p-0-1f08db55424a080fbd8ae5234aad30df)
BlockManagerMasterEndpoint中,BlockManagerInfo包含内存、slaveEndpoint等信息。
回到BlockManagerMasterEndpoint的register注册方法:如果blockManagerInfo没有包含BlockManagerId,根据BlockManagerId.executorId查询BlockManagerId,如果匹配到旧的BlockManagerId,就进行清理。
BlockManagerMasterEndpoint的removeExecutor方法如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P365_254171.jpg?sign=1739480904-kKFkffrIUn3yQ3wzBawNeMFtJmOmJTKl-0-5947e1c5bba9b4ea77fecf908c7a011f)
进入removeBlockManager方法,从blockManagerIdByExecutor数据结构中清理掉block manager信息,从blockManagerInfo数据结构中清理掉所有的blocks信息。removeBlockManager源码如下。
BlockManagerMasterEndpoint.scala的removeBlockManager的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P366_254173.jpg?sign=1739480904-iImiWh0Qjd0yrpoXsrAYZEepBhvvvc43-0-02e2d26453429b47f911748de76e6fa6)
removeBlockManager中的一行代码blockLocations.remove的remove方法如下。
HashMap.java的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P367_254175.jpg?sign=1739480904-8P5cB2rq2buBaCbFyp80BCnzXEiNjIDl-0-4b0fecbcb906e9e909f893dc86cac3ee)
回到BlockManagerMasterEndpoint的register注册方法:然后在blockManagerIdByExecutor中加入BlockManagerId,将BlockManagerId加入BlockManagerInfo信息,在listenerBus中进行监听,函数返回BlockManagerId,完成注册。
回到BlockManager.scala,在initialize方法通过master.registerBlockManager注册成功以后,将返回值赋值给idFromMaster。Initialize初始化之后,看一下BlockManager.scala中其他的方法。
reportAllBlocks方法:具体的Executor须向Driver不断地汇报自己的状态。
BlockManager.scala的reportAllBlocks方法的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P367_254176.jpg?sign=1739480904-TeUbWkcxPS3mPwtrQCuVCq4TCBCdmxgH-0-508bbb4c0ba0a6bf87024d6c6360213c)
reportAllBlocks方法中调用了getCurrentBlockStatus,包括内存、磁盘等信息。
getCurrentBlockStatus的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P367_254177.jpg?sign=1739480904-rWWqcAnn9aMSy2O9TnkNKbXV8hALzHqZ-0-e9ef564fa079180db03c99d67925783c)
getCurrentBlockStatus方法中的BlockStatus,包含存储级别StorageLevel、内存大小、磁盘大小等信息。
BlockManagerMasterEndpoint.scala的BlockStatus的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P368_254179.jpg?sign=1739480904-8pPluVNBlLt1kPj4mnEOBj6paQRd9Y2w-0-2173893bee490adbfbf45fc06cc71bb2)
回到BlockManager.scala,其中的getLocationBlockIds方法比较重要,根据BlockId获取这个BlockId所在的BlockManager。
BlockManager.scala的getLocationBlockIds的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P368_254180.jpg?sign=1739480904-E8vZUGWK2MCdfb1SsVzYWaidhVKgwc8J-0-ec1933986ce855f24f01c437ee718aff)
getLocationBlockIds方法中根据BlockId通过master.getLocations向Master获取位置信息,因为master管理所有的位置信息。getLocations方法里的driverEndpoint是BlockManagerMasterEndpoint,Executor向BlockManagerMasterEndpoint发送GetLocationsMultipleBlockIds消息。
BlockManagerMaster.scala的getLocations方法的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P368_254181.jpg?sign=1739480904-KPnkC1DyS38lzLiMADy9qzIjpM9tsaWc-0-2cc38b6d9a801ca8a20e244184970e76)
getLocations中的GetLocationsMultipleBlockIds是一个case class。
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P368_254182.jpg?sign=1739480904-TfsH4SLjL5cUF2vEC5oMZqwtG45xRp9u-0-635f945ecc0cf06b9e8b568f6a622568)
在BlockManagerMasterEndpoint侧接收GetLocationsMultipleBlockIds消息。
BlockManagerMasterEndpoint.scala的receiveAndReply方法如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P368_254183.jpg?sign=1739480904-eBSyK0PpV9g36ZlCh1pyGyTU95dFf4hU-0-bb05772ab7c25827b9aed5b998808bf7)
进入getLocationsMultipleBlockIds方法,进行map操作,开始查询位置信息。
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P368_254184.jpg?sign=1739480904-DWqi7yHvmi7Z2bJgsVXNTnDX4hKCDN8A-0-f931be365bbc42293e64664df45acbc9)
进入getLocations方法,首先判断内存缓存结构blockLocations中是否包含blockId,如果已包含,就获取位置信息,否则返回空的信息。
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P369_254186.jpg?sign=1739480904-2lGhzdFYLiNTs0CKXR3KHb9VpWWEnPzN-0-1b64d2d81b6ee1d6717017622218eaa3)
其中,blockLocations是一个重要的数据结构,是一个JHashMap。Key是BlockId。Value是一个HashSet[BlockManagerId],使用HashSet。因为每个BlockId在磁盘上有副本,不同机器的位置不一样,而且不同副本对应的BlockManagerId不一样,位于不同的机器上,所以使用HashSet数据结构。
BlockManagerMasterEndpoint.scala的blockLocations的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P369_254187.jpg?sign=1739480904-X6233T8qgIyzFajwQJ8fcX1ZRc7KWJnn-0-d8b0f99fd421328e6d8f460bf24dc66e)
回到BlockManager.scala,getLocalValues是一个重要的方法,从blockInfoManager中获取本地数据。
首先根据blockId从blockInfoManager中获取BlockInfo信息。
从BlockInfo信息获取level级别,根据level.useMemory && memoryStore.contains(blockId)判断是否在内存中,如果在内存中,就从memoryStore中获取数据。
根据level.useDisk && diskStore.contains(blockId)判断是否在磁盘中,如果在磁盘中,就从diskStore中获取数据。
BlockManager.scala的getLocalValues方法的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P369_254188.jpg?sign=1739480904-3uhl0RYit3sUH6yavoWpS8qpK1PhDomZ-0-475c1f215d969d926a04e41dc5259095)
回到BlockManager.scala,getRemoteValues方法从远程的BlockManager中获取block数据,在JVM中不需要去获取锁。
BlockManager.scala的getRemoteValues方法的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P370_254191.jpg?sign=1739480904-Pe5e6bqfXNQSvZ9VMJWErZJMGHJGvknL-0-203ca3a1682e5b7af3dda5a76dc452c5)
getRemoteValues方法中调用getRemoteBytes,获取远程的数据,如果获取的失败次数超过最大的获取次数(locations.size),就提示失败,返回空值;如果获取到远程数据,就返回。
getRemoteBytes方法调用blockTransferService.fetchBlockSync方法实现远程获取数据。
Spark 2.2.1版本的BlockTransferService.scala的fetchBlockSync方法的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P370_254192.jpg?sign=1739480904-Grmcc9Fz3Da2yJLLZ7NKysGMCKHoMiws-0-e1b48ed8afa2830c2b6bfc91cb14d32c)
Spark 2.4.3版本的BlockTransferService.scala的fetchBlockSync方法的源码与Spark 2.2.1版本相比具有如下特点。
上段代码中第1行新增加一个tempFileManager参数。
上段代码中第9~15行onBlockFetchSuccess方法整体替换为以下的onBlockFetchSuccess代码。
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P371_254195.jpg?sign=1739480904-Q9B5D8e9EnOjv2vKiAMaJmvmc6HRoPhe-0-ffec17da312bbf99f0718f97741487f1)
fetchBlocks方法用于从远程节点异步获取序列块,仅在调用[init]之后可用。注意,这个API需要一个序列,可以实现批处理请求,而不是返回一个future,底层实现可以调用onBlockFetchSuccess来尽快获取块的数据,而不是等待所有块被取出来。
fetchBlockSync中调用fetchBlocks方法,NettyBlockTransferService继承自BlockTransfer-Service,是BlockTransferService实现子类。
Spark 2.2.1版本的NettyBlockTransferService的fetchBlocks的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P371_254196.jpg?sign=1739480904-smr60HqHX3Uv0qpJWe7Bs4Unf1dAu7Z7-0-e84ec1e2e97c53eb0d77cfd037109887)
Spark 2.4.3版本的NettyBlockTransferService.scala源码与Spark 2.2.1版本相比具有如下特点。
上段代码中第7、14行tempShuffleFileManager调整为tempFileManager。Download-FileManager用于创建临时块文件的管理器,用于获取远程数据以减少内存使用。当文件不再使用时,它将清除文件。
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P372_65177.jpg?sign=1739480904-wSLb6aWRGf7Q36h0OuvVI6GL5NrdNqdV-0-e1532f1f4e8050e8d201f036f41f1112)
回到BlockManager.scala,无论是doPutBytes(),还是doPutIterator()方法中,都会使用doPut方法。
BlockManager.scala的doPut方法的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P372_254200.jpg?sign=1739480904-KwmjiNsAEhYAiXUYVPMrOLdnpfTgyECd-0-3697ed65b6d65811ef15577944f7f8e9)
doPut方法中,lockNewBlockForWriting写入一个新的块前先尝试获得适当的锁,如果我们是第一个写块,获得写入锁后继续后续操作。否则,如果另一个线程已经写入块,须等待写入完成,才能获取读取锁,调用new()函数创建一个BlockInfo赋值给putBlockInfo,然后通过putBody(putBlockInfo)将数据存入。putBody是一个匿名函数,输入BlockInfo,输出的是一个泛型Option[T]。putBody函数体内容是doPutIterator方法(doPutBytes方法也类似调用doPut)调用doPut时传入的。
BlockManager.scala的doPutIterator调用doPut方法,在其putBody匿名函数体中进行判断:
如果是level.useMemory,则在memoryStore中放入数据。
如果是level.useDisk,则在diskStore中放入数据。
如果level.replication大于1,则在其他节点中存入副本数据。
其中,Spark 2.2.1版本的BlockManager.scala的replicate方法的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P373_254203.jpg?sign=1739480904-41PUTHprkEIM76acjNuOoXuDUL0y2Eqv-0-cdd8972df791248bc2b54a3d7ae031a3)
Spark 2.4.3版本的BlockManager.scala的replicate方法的源码与Spark 2.2.1版本相比具有如下特点。
上段代码中第15行之后新增代码,构建BlockManagerManagedBuffer的实例buffer。
上段代码中第21行将BlockManagerManagedBuffer替换为buffer。
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P374_254205.jpg?sign=1739480904-KPFQFBoe8BdMBMqzGznnpH7PLF3E2e6X-0-30411eca4f4fc57c663d918ebe99c802)
replicate方法中调用了blockTransferService.uploadBlockSync方法。
BlockTransferService.scala的uploadBlockSync的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P374_254206.jpg?sign=1739480904-dTI8cmj01qeOrYgulhLcPyEZ3GZ7iGu2-0-753df5d217fcd6dc4f47b62e341ddf30)
uploadBlockSync中又调用uploadBlock方法,BlockTransferService.scala的uploadBlock方法无具体实现,NettyBlockTransferService是BlockTransferService的子类,具体实现uploadBlock方法。
Spark 2.2.1版本的NettyBlockTransferService的uploadBlock的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P374_254207.jpg?sign=1739480904-vOSMzGNUQPLC0oUqhnocODDJnF6B05yA-0-682d52f8e132546fba086c2e020c6507)
Spark 2.4.3版本的NettyBlockTransferService.scala源码与Spark 2.2.1版本相比具有如下特点。
上段代码中删掉第16~18行。
上段代码中第19行之前新增代码,新建asStream变量,其将blockData与MAX_REMOTE_ BLOCK_SIZE_FETCH_TO_MEM进行比较。当块的大小高于此阈值时,远程块以字节为单位将被提取到磁盘。这是为了避免一个大的请求占用太多的内存。通过设置特定值(例如200)可以启用这个配置。注意,此配置将影响shuffle获取和块管理器远程块获取。对于启用了外部shuffle服务,此功能只能在高于Spark 2.2的版本,在外部shuffle启用时使用服务。
上段代码中第19行新建一个变量callback。
上段代码中第21行logTrace日志的内容进行调整。
上段代码中第25行logError日志的内容进行调整。
上段代码中第29行之后新增一段代码,如果blockData大于MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM,则调用client.uploadStream方法,将数据作为流发送到远程端,与stream()方法的不同之处在于,这是一个请求向远程端发送数据,而不是从远程端接收数据。如果将blockData小于阈值,则将NIO缓冲区转换或复制到数组中,以便对其进行序列化。
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P375_254210.jpg?sign=1739480904-T5c5Yi9YS7f8CSLxgd9vqFJRrA1Dcr3i-0-c7c2f4d1addb61488a34863842850348)
回到BlockManager.scala,看一下dropFromMemory方法。如果存储级别定位为MEMORY_AND_DISK,那么数据可能放在内存和磁盘中,内存够的情况下不会放到磁盘上;如果内存不够,就放到磁盘上,这时就会调用dropFromMemory。如果存储级别不是定义为MEMORY_AND_DISK,而只是存储在内存中,内存不够时,缓存的数据此时就会丢弃。如果仍需要数据,那就要重新计算。
BlockManager.scala的dropFromMemory的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P376_254213.jpg?sign=1739480904-259J8iYXZfE8q4R6ziyzRm7QhOEd9P8s-0-2a55e6b436cbdbc2fac3b78abbf9d8f7)
总结:dropFromMemory是指在内存不够的时候,尝试释放一部分内存给要使用内存的应用,释放的这部分内存数据需考虑是丢弃,还是放到磁盘上。如果丢弃,如5000个步骤作为一个Stage,前面4000个步骤进行了Cache,Cache时可能有100万个partition分区单位,其中丢弃了100个,丢弃的100个数据就要重新计算;但是,如果设置了同时放到内存和磁盘,此时会放入磁盘中,下次如果需要,就可以从磁盘中读取数据,而不是重新计算。