日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

Spark Shuffle Write阶段磁盘文件分析

發(fā)布時(shí)間:2024/1/23 编程问答 34 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spark Shuffle Write阶段磁盘文件分析 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

流程分析

入口處:

org.apache.spark.scheduler.ShuffleMapTask.runTask


override def runTask(context: TaskContext): MapStatus = {// Deserialize the RDD using the broadcast variable. val threadMXBean = ManagementFactory.getThreadMXBean val deserializeStartTime = System.currentTimeMillis()val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {threadMXBean.getCurrentThreadCpuTime} else 0L val ser = SparkEnv.get.closureSerializer.newInstance()val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)_executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime_executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime} else 0L var writer: ShuffleWriter[Any, Any] = null try {val manager = SparkEnv.get.shuffleManagerwriter = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])writer.stop(success = true).get} catch {case e: Exception =>try {if (writer != null) {writer.stop(success = false)}} catch {case e: Exception =>log.debug("Could not stop writer", e)}throw e} }

這里manager 拿到的是

先看private[spark] trait ShuffleManager? 是一個(gè)接口,

SortShuffleManager實(shí)現(xiàn)了該接口。

private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager with Logging

org.apache.spark.shuffle.sort.SortShuffleWriter

我們看他是如何拿到可以寫磁盤的那個(gè)sorter的。

override def getWriter[K, V](handle: ShuffleHandle,mapId: Int,context: TaskContext): ShuffleWriter[K, V] = {numMapsForShuffle.putIfAbsent(handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps)val env = SparkEnv.get handle match {case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>new UnsafeShuffleWriter(env.blockManager,shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],context.taskMemoryManager(),unsafeShuffleHandle,mapId,context,env.conf)case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>new BypassMergeSortShuffleWriter(env.blockManager,shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],bypassMergeSortHandle,mapId,context,env.conf)case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)} } 這里case了2種情況:/** * Subclass of [[BaseShuffleHandle]], used to identify when we've chosen to use the * serialized shuffle. 是否序列化 */ private[spark] class SerializedShuffleHandle[K, V](shuffleId: Int,numMaps: Int,dependency: ShuffleDependency[K, V, V])extends BaseShuffleHandle(shuffleId, numMaps, dependency) { }/** * Subclass of [[BaseShuffleHandle]], used to identify when we've chosen to use the * bypass merge sort shuffle path. 繞過(guò)歸并排序的shuffle路徑。 */ private[spark] class BypassMergeSortShuffleHandle[K, V](shuffleId: Int,numMaps: Int,dependency: ShuffleDependency[K, V, V])extends BaseShuffleHandle(shuffleId, numMaps, dependency) { }這里再看看BaseShuffleHandle /** * A basic ShuffleHandle implementation that just captures registerShuffle's parameters. */ private[spark] class BaseShuffleHandle[K, V, C](shuffleId: Int,val numMaps: Int,val dependency: ShuffleDependency[K, V, C])extends ShuffleHandle(shuffleId)
繼續(xù)看abstract class ShuffleHandle(val shuffleId: Int) extends Serializable {}是一個(gè)抽象類,實(shí)現(xiàn)了序列化。

繼續(xù)new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)
類的定義:private[spark] class SortShuffleWriter[K, V, C](shuffleBlockResolver: IndexShuffleBlockResolver,handle: BaseShuffleHandle[K, V, C],mapId: Int,context: TaskContext)extends ShuffleWriter[K, V] with Logging然后write操作
/** Write a bunch of records to this task's output */ 一串 bunch override def write(records: Iterator[Product2[K, V]]): Unit = {sorter = if (dep.mapSideCombine) {require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")new ExternalSorter[K, V, C](context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)} else {// In this case we pass neither an aggregator nor an ordering to the sorter, because we don't // care whether the keys get sorted in each partition; that will be done on the reduce side // if the operation being run is sortByKey. new ExternalSorter[K, V, V](context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)}sorter.insertAll(records)// Don't bother including the time to open the merged output file in the shuffle write time, // because it just opens a single file, so is typically too fast to measure accurately // (see SPARK-3570). val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)val tmp = Utils.tempFileWith(output)try {val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)val partitionLengths = sorter.writePartitionedFile(blockId, tmp)shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)} finally {if (tmp.exists() && !tmp.delete()) {logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")}} } 我們分析的線路假設(shè)需要做mapSideCombine sorter = if (dep.mapSideCombine) { require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!") new ExternalSorter[K, V, C](dep.aggregator, Some(dep.partitioner), dep.keyOrdering, de.serializer)

接著將map的輸出放到sorter當(dāng)中:

sorter.insertAll(records) //備注一下sorter位置 //private var sorter: ExternalSorter[K, V, _] = null //import org.apache.spark.util.collection.ExternalSorter
def insertAll(records: Iterator[Product2[K, V]]): Unit = {// TODO: stop combining if we find that the reduction factor isn't high val shouldCombine = aggregator.isDefinedif (shouldCombine) {// Combine values in-memory first using our AppendOnlyMap val mergeValue = aggregator.get.mergeValueval createCombiner = aggregator.get.createCombinervar kv: Product2[K, V] = null val update = (hadValue: Boolean, oldValue: C) => {if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)}while (records.hasNext) {addElementsRead()kv = records.next()map.changeValue((getPartition(kv._1), kv._1), update)maybeSpillCollection(usingMap = true)}} else {// Stick values into our buffer while (records.hasNext) {addElementsRead()val kv = records.next()buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])maybeSpillCollection(usingMap = false)}} }

其中insertAll 的流程是這樣的:

while (records.hasNext) { addElementsRead() kv = records.next() map.changeValue((getPartition(kv._1), kv._1), update)maybeSpillCollection(usingMap = true)}

private val fileBufferSize = conf.getSizeAsKb("spark.shuffle.file.buffer", "32k").toInt * 1024 // Size of object batches when reading/writing from serializers. // // Objects are written in batches, with each batch using its own serialization stream. This // cuts down on the size of reference-tracking maps constructed when deserializing a stream. // // NOTE: Setting this too low can cause excessive copying when serializing, since some serializers // grow internal data structures by growing + copying every time the number of objects doubles. private val serializerBatchSize = conf.getLong("spark.shuffle.spill.batchSize", 10000)// Data structures to store in-memory objects before we spill. Depending on whether we have an // Aggregator set, we either put objects into an AppendOnlyMap where we combine them, or we // store them in an array buffer. @volatile private var map = new PartitionedAppendOnlyMap[K, C] @volatile private var buffer = new PartitionedPairBuffer[K, C]

里面的map 其實(shí)就是PartitionedAppendOnlyMap,這個(gè)是全內(nèi)存的一個(gè)結(jié)構(gòu)。當(dāng)把這個(gè)寫滿了,才會(huì)觸發(fā)spill操作。你可以看到maybeSpillCollection在PartitionedAppendOnlyMap每次更新后都會(huì)被調(diào)用。

一旦發(fā)生呢個(gè)spill后,產(chǎn)生的文件名稱是:

"temp_shuffle_" + id

邏輯在這:

val (blockId, file) = diskBlockManager.createTempShuffleBlock() def createTempShuffleBlock(): (TempShuffleBlockId, File) = { var blockId = new TempShuffleBlockId(UUID.randomUUID()) while (getFile(blockId).exists()) { blockId = new TempShuffleBlockId(UUID.randomUUID()) } (blockId, getFile(blockId))}

產(chǎn)生的所有 spill文件被被記錄在一個(gè)數(shù)組里:

private val spills = new ArrayBuffer[SpilledFile]

迭代完一個(gè)task對(duì)應(yīng)的partition數(shù)據(jù)后,會(huì)做merge操作,把磁盤上的spill文件和內(nèi)存的,迭代處理,得到一個(gè)新的iterator,這個(gè)iterator的元素會(huì)是這個(gè)樣子的:

(p, mergeWithAggregation( iterators, aggregator.get.mergeCombiners, keyComparator,ordering.isDefined))

其中p 是reduce 對(duì)應(yīng)的partitionId, p對(duì)應(yīng)的所有數(shù)據(jù)都會(huì)在其對(duì)應(yīng)的iterator中。

接著會(huì)獲得最后的輸出文件名:

val outputFile = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)

文件名格式會(huì)是這樣的:

"shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + ".data"

其中reduceId 是一個(gè)固定值NOOP_REDUCE_ID,默認(rèn)為0。

然后開始真實(shí)寫入文件

val partitionLengths = sorter.writePartitionedFile(blockId, context, outputFile)

寫入文件的過(guò)程過(guò)程是這樣的:

for ((id, elements) <- this.partitionedIterator) { if (elements.hasNext) { val writer = blockManager.getDiskWriter(blockId,outputFile, serInstance,fileBufferSize, context.taskMetrics.shuffleWriteMetrics.get) for (elem <- elements) { writer.write(elem._1, elem._2) } writer.commitAndClose() val segment = writer.fileSegment() lengths(id) = segment.length } }

剛剛我們說(shuō)了,這個(gè) this.partitionedIterator 其實(shí)內(nèi)部元素是reduce partitionID -> 實(shí)際record 的 iterator,所以它其實(shí)是順序?qū)懨總€(gè)分區(qū)的記錄,寫完形成一個(gè)fileSegment,并且記錄偏移量。這樣后續(xù)每個(gè)的reduce就可以根據(jù)偏移量拿到自己需要的數(shù)據(jù)。對(duì)應(yīng)的文件名,前面也提到了,是:

"shuffle_" + shuffleId + "_" + mapId + "_" + NOOP_REDUCE_ID + ".data"

剛剛我們說(shuō)偏移量,其實(shí)是存在內(nèi)存里的,所以接著要持久化,通過(guò)下面的writeIndexFile來(lái)完成:

shuffleBlockResolver.writeIndexFile(dep.shuffleId,mapId, partitionLengths)

具體的文件名是:

"shuffle_" + shuffleId + "_" + mapId + "_" + NOOP_REDUCE_ID + ".index"

至此,一個(gè)task的寫入操作完成,對(duì)應(yīng)一個(gè)文件。

最終結(jié)論

所以最后的結(jié)論是,一個(gè)Executor 最終對(duì)應(yīng)的文件數(shù)應(yīng)該是:

MapNum (注:不包含index文件)

同時(shí)持有并且會(huì)進(jìn)行寫入的文件數(shù)最多為::

CoreNum
創(chuàng)作挑戰(zhàn)賽新人創(chuàng)作獎(jiǎng)勵(lì)來(lái)咯,堅(jiān)持創(chuàng)作打卡瓜分現(xiàn)金大獎(jiǎng)

總結(jié)

以上是生活随笔為你收集整理的Spark Shuffle Write阶段磁盘文件分析的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。