![Spark大数据商业实战三部曲:内核解密、商业案例、性能调优(第2版)](https://wfqqreader-1252317822.image.myqcloud.com/cover/174/40375174/b_40375174.jpg)
7.5 Tungsten Sorted Based Shuffle
本节讲解Tungsten Sorted Based Shuffle,包括Tungsten Sorted Based Shuffle概述、Tungsten Sorted Based内核、Tungsten Sorted Based数据读写的源码解析等内容。
7.5.1 概述
基于Tungsten Sort的Shuffle实现机制主要是借助Tungsten项目所做的优化来高效处理Shuffle。
Spark提供了配置属性,用于选择具体的Shuffle实现机制,但需要说明的是,虽然默认情况下Spark默认开启的是基于Sort的Shuffle实现机制(对应spark.shuffle.manager的默认值),但实际上,参考Shuffle的框架内核部分可知基于Sort的Shuffle实现机制与基于Tungsten Sort的Shuffle实现机制都是使用SortShuffleManager,而内部使用的具体的实现机制,是通过提供的两个方法进行判断的。对应非基于Tungsten Sort时,通过SortShuffleWriter.shouldBypassMergeSort方法判断是否需要回退到Hash风格的Shuffle实现机制,当该方法返回的条件不满足时,则通过SortShuffleManager.canUseSerializedShuffle方法判断是否需要采用基于Tungsten Sort的Shuffle实现机制,而当这两个方法返回都为false,即都不满足对应的条件时,会自动采用常规意义上的基于Sort的Shuffle实现机制。
因此,当设置了spark.shuffle.manager=tungsten-sort时,也不能保证就一定采用基于Tungsten Sort的Shuffle实现机制。有兴趣的读者可以参考Spark 1.5及之前的注册方法的实现,该实现中SortShuffleManager的注册方法仅构建了BaseShuffleHandle实例,同时对应的getWriter中也只对应构建了BaseShuffleHandle实例。
7.5.2 Tungsten Sorted Based Shuffle内核
基于Tungsten Sort的Shuffle实现机制的入口点仍然是SortShuffleManager类,与同样在SortShuffleManager类控制下的其他两种实现机制不同的是,基于Tungsten Sort的Shuffle实现机制使用的ShuffleHandle与ShuffleWriter分别为SerializedShuffleHandle与UnsafeShuffleWriter。因此,对应的具体实现机制如图7-12所示。
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P339_57802.jpg?sign=1739193212-pbNt7kvb3wpzwRuTi9II9McxwSJRYT7n-0-7a3fe8164361c6b7e29bbd17d3899cd5)
图7-12 基于TungstenSort的Shuffle实现机制的框架类图
在Sorted Based Shuffle中,SortShuffleManager根据内部采用的不同实现细节,分别给出两种排序模式,而基于TungstenSort的Shuffle实现机制对应的就是序列化排序模式。
从图7-12中可以看到基于Sort的Shuffle实现机制,具体的写入器的选择与注册得到的ShuffleHandle类型有关,参考SortShuffleManager类的registerShuffle方法。
registerShuffle方法中会判断是否满足序列化模式的条件,如果满足,则使用基于TungstenSort的Shuffle实现机制,对应在代码中,表现为使用类型为SerializedShuffleHandle的ShuffleHandle。上述代码进一步说明了在spark.shuffle.manager设置为sort时,内部会自动选择具体的实现机制。对应代码的先后顺序,就是选择的先后顺序。
对应的序列化排序(Serialized sorting)模式需要满足的条件如下所示。
(1)Shuffle依赖中不带聚合操作或没有对输出进行排序的要求。
(2)Shuffle的序列化器支持序列化值的重定位(当前仅支持KryoSerializer以及Spark SQL子框架自定义的序列化器)。
(3)Shuffle过程中的输出分区个数少于16 777 216个。
实际上,使用过程中还有其他一些限制,如引入那个Page形式的内存管理模型后,内部单条记录的长度不能超过128MB(具体内存模型可以参考PackedRecordPointer类)。另外,分区个数的限制也是该内存模型导致的(同样参考PackedRecordPointer类)。
所以,目前使用基于TungstenSort的Shuffle实现机制条件还是比较苛刻的。
7.5.3 Tungsten Sorted Based Shuffle数据读写的源码解析
对应这种SerializedShuffleHandle及其相关的Shuffle数据写入器类型的相关代码,可以参考SortShuffleManager类的getWriter方法。
SortShuffleManager.scala的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P340_254076.jpg?sign=1739193212-ZxDdQqfhSGfjdA8YF08vz983oMG3OF9j-0-f1582ee86404a4403af00198d3724134)
数据写入器类UnsafeShuffleWriter中使用SortShuffleManager实例中的变量shuffleBlockResolver来对逻辑数据块与物理数据块的映射进行解析,而该变量使用的是与基于Hash的Shuffle实现机制不同的解析类,即当前使用的IndexShuffleBlockResolver。
UnsafeShuffleWriter构建时传入了一个与其他两种基于Sorted的Shuffle实现机制不同的参数:context.taskMemoryManager(),在此构建了一个TaskMemoryManager实例并传入UnsafeShuffleWriter。TaskMemoryManager与Task是一对一的关系,负责管理分配给Task的内存。
下面开始解析写数据块的UnsafeShuffleWriter类的源码实现。首先来看其write的方法。
UnsafeShuffleWriter.scala的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P341_254079.jpg?sign=1739193212-9rcNO0cPhqWblPrrilCnnP27HUBaoRQ4-0-48ffaf2cc668a305afa58a412209fad2)
写过程的关键步骤有以下三步。
(1)通过insertRecordIntoSorter(records.next())方法将每条记录插入外部排序器。
(2)closeAndWriteOutput方法写数据文件与索引文件,在写的过程中,会先合并外部排序器在插入过程中生成的Spill中间文件。
(3)sorter.cleanupResources()最后释放外部排序器的资源。
首先查看将每条记录插入外部排序器(ShuffleExternalSorter)时所使用的insertRecordIntoSorter方法。
UnsafeShuffleWriter.scala的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P341_254080.jpg?sign=1739193212-a2TL60keGWCiF1XpurzKbJ8WcwK3Z1zW-0-fbe79efc127b212d0164ab7762cf569f)
下面继续查看第二步写数据文件与索引文件的closeAndWriteOutput方法。
closeAndWriteOutput的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P342_254083.jpg?sign=1739193212-MekatgeAwOwr8xugaXLnIrgc8pvxUvaJ-0-be303bf6f301ff5636ab945c8d93cb79)
closeAndWriteOutput方法主要有以下三步。
(1)触发外部排序器,获取Spill信息。
(2)合并中间的Spill文件,生成数据文件,并返回各个分区对应的数据量信息。
(3)根据各个分区的数据量信息生成数据文件对应的索引文件。
writeIndexFileAndCommit方法和Sorted Based Shuffle机制的实现一样,在此仅分析过程中不同的Spill文件合并步骤,即mergeSpills方法的具体实现。
UnsafeShuffleWriter.scala的mergeSpills方法的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P343_254086.jpg?sign=1739193212-XsUdivicMSW36g2CpgYxp7w0CemvCX09-0-2bdc08f05e25de6ab365796c687f3af9)
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P344_254088.jpg?sign=1739193212-CfJYDNbj5BDmO6ClKqH3uJhGltiSk5mw-0-f880f3702dba897f3232a7ca5a5fd385)
各种合并策略在性能上具有一定差异,会根据具体的条件采用,主要有基于Java NIO(New I/O)和基于普通文件流合并文件的方式。下面简单描述一下基于文件合并流的处理过程。
Spark 2.2.1版本的UnsafeShuffleWriter.scala的mergeSpillsWithFileStream方法的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P344_254089.jpg?sign=1739193212-ciNcyYPYb2KZR337jiAmHtWa15byvmmw-0-8c985b70befceb315ee3992f1ffeda4b)
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P345_254091.jpg?sign=1739193212-z9EAlPYnmDyCY1kfZG81lRu4c2ifRACQ-0-2b73469e78b76a0d5fb10106bb2c487a)
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P346_254093.jpg?sign=1739193212-BWbqtQ9GbEO80ju4dx4zxUHbEI1PjU8e-0-1ddd540175c654ae00788639e8392aec)
Spark 2.4.3版本的UnsafeShuffleWriter.scala源码与Spark 2.2.1版本相比具有如下特点。
上段代码中第15行之前新增构建BufferedOutputStream实例bos的代码。
上段代码中第16行将new FileOutputStream(outputFile)调整为bos。
上段代码中第22行将构建FileInputStream实例,调整为构建NioBufferedFileInputStream实例。
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P346_254094.jpg?sign=1739193212-mgRJ8HtNEemonPdBeIAzFzNT0Wu68ubA-0-e68314196e09c3c2946925681e0d5326)
基于NIO的文件合并流程基本类似,只是底层采用NIO的技术实现。