Spark分区器HashPartitioner和RangePartitioner代码详解
轉(zhuǎn)載:
https://www.iteblog.com/archives/1522.html
在Spark中分區(qū)器直接決定了RDD中分區(qū)的個數(shù);也決定了RDD中每條數(shù)據(jù)經(jīng)過Shuffle過程屬于哪個分區(qū);也決定了Reduce的個數(shù)。這三點(diǎn)看起來是不同的方面的,但其深層的含義是一致的。
我們需要注意的是,只有Key-Value類型的RDD才有分區(qū)的,非Key-Value類型的RDD分區(qū)的值是None的。
在Spark中,存在兩類分區(qū)函數(shù):HashPartitioner和RangePartitioner,它們都是繼承自Partitioner,主要提供了每個RDD有幾個分區(qū)(numPartitions)以及對于給定的值返回一個分區(qū)ID(0~numPartitions-1),也就是決定這個值是屬于那個分區(qū)的。
文章目錄
- 1 HashPartitioner分區(qū)
- 2 RangePartitioner分區(qū)
- 3 確認(rèn)邊界
HashPartitioner分區(qū)
HashPartitioner分區(qū)的原理很簡單,對于給定的key,計(jì)算其hashCode,并除于分區(qū)的個數(shù)取余,如果余數(shù)小于0,則用余數(shù)+分區(qū)的個數(shù),最后返回的值就是這個key所屬的分區(qū)ID。實(shí)現(xiàn)如下:
| ? class HashPartitioner(partitions: Int) extends Partitioner { ??require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.") ? ??def numPartitions: Int = partitions ? ??def getPartition(key: Any): Int = key match { ????case null => 0 ????case _ => Utils.nonNegativeMod(key.hashCode, numPartitions) ??} ? ??override def equals(other: Any): Boolean = other match { ????case h: HashPartitioner => ??????h.numPartitions == numPartitions ????case _ => ??????false ??} ? ??override def hashCode: Int = numPartitions } |
RangePartitioner分區(qū)
從HashPartitioner分區(qū)的實(shí)現(xiàn)原理我們可以看出,其結(jié)果可能導(dǎo)致每個分區(qū)中數(shù)據(jù)量的不均勻,極端情況下會導(dǎo)致某些分區(qū)擁有RDD的全部數(shù)據(jù),這顯然不是我們需要的。而RangePartitioner分區(qū)則盡量保證每個分區(qū)中數(shù)據(jù)量的均勻,而且分區(qū)與分區(qū)之間是有序的,也就是說一個分區(qū)中的元素肯定都是比另一個分區(qū)內(nèi)的元素小或者大;但是分區(qū)內(nèi)的元素是不能保證順序的。簡單的說就是將一定范圍內(nèi)的數(shù)映射到某一個分區(qū)內(nèi)。
前面討論過,RangePartitioner分區(qū)器的主要作用就是將一定范圍內(nèi)的數(shù)映射到某一個分區(qū)內(nèi),所以它的實(shí)現(xiàn)中分界的算法尤為重要。這個算法對應(yīng)的函數(shù)是rangeBounds。這個函數(shù)主要經(jīng)歷了兩個過程:以Spark 1.1版本為界,Spark 1.1版本社區(qū)對rangeBounds函數(shù)進(jìn)行了一次重大的重構(gòu)。
因?yàn)樵赟park 1.1版本之前,RangePartitioner分區(qū)對整個數(shù)據(jù)集進(jìn)行了2次的掃描:一次是計(jì)算RDD中元素的個數(shù);一次是進(jìn)行采樣。具體的代碼如下:
| // An array of upper bounds for the first (partitions - 1) partitions private val rangeBounds: Array[K] = { ????if (partitions == 1) { ??????Array() ????} else { ??????val rddSize = rdd.count() ??????val maxSampleSize = partitions * 20.0 ??????val frac = math.min(maxSampleSize / math.max(rddSize, 1), 1.0) ??????val rddSample = rdd.sample(false, frac, 1).map(_._1).collect().sorted ??????if (rddSample.length == 0) { ????????Array() ??????} else { ????????val bounds = new Array[K](partitions - 1) ????????for (i <- 0 until partitions - 1) { ??????????val index = (rddSample.length - 1) * (i + 1) / partitions ??????????bounds(i) = rddSample(index) ????????} ????????bounds ??????} ????} } |
注意看里面的rddSize的計(jì)算和rdd.sample的計(jì)算。所以如果你進(jìn)行一次sortByKey操作就會對RDD掃描三次!而我們都知道,分區(qū)函數(shù)性能對整個Spark作業(yè)的性能是有直接的影響,而且影響很大,直接影響作業(yè)運(yùn)行的總時間,所以社區(qū)不得不對RangePartitioner中的rangeBounds算法進(jìn)行重構(gòu)。
在閱讀新版本的RangePartitioner之前,建議先去了解一下Reservoir sampling(水塘抽樣),因?yàn)槠渲械膶?shí)現(xiàn)用到了Reservoir sampling算法進(jìn)行采樣。
采樣總數(shù)
在新的rangeBounds算法總,采樣總數(shù)做了一個限制,也就是最大只采樣1e6的樣本(也就是1000000):
| val sampleSize = math.min(20.0 * partitions, 1e6) |
所以如果你的分區(qū)個數(shù)為5,則采樣樣本數(shù)量為100.0
父RDD中每個分區(qū)采樣樣本數(shù)
按照我們的思路,正常情況下,父RDD每個分區(qū)需要采樣的數(shù)據(jù)量應(yīng)該是sampleSize/rdd.partitions.size,但是我們看代碼的時候發(fā)現(xiàn)父RDD每個分區(qū)需要采樣的數(shù)據(jù)量是正常數(shù)的3倍。
| val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.size).toInt |
這是因?yàn)楦窻DD各分區(qū)中的數(shù)據(jù)量可能會出現(xiàn)傾斜的情況,乘于3的目的就是保證數(shù)據(jù)量小的分區(qū)能夠采樣到足夠的數(shù)據(jù),而對于數(shù)據(jù)量大的分區(qū)會進(jìn)行第二次采樣。
采樣算法
這個地方就是RangePartitioner分區(qū)的核心了,其內(nèi)部使用的就是水塘抽樣,而這個抽樣特別適合那種總數(shù)很大而且未知,并無法將所有的數(shù)據(jù)全部存放到主內(nèi)存中的情況。也就是我們不需要事先知道RDD中元素的個數(shù)(不需要調(diào)用rdd.count()了!)。其主要實(shí)現(xiàn)如下:
| ? ? val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition) ? def sketch[K : ClassTag]( ??????rdd: RDD[K], ??????sampleSizePerPartition: Int): (Long, Array[(Int, Int, Array[K])]) = { ????val shift = rdd.id ????// val classTagK = classTag[K] // to avoid serializing the entire partitioner object ????val sketched = rdd.mapPartitionsWithIndex { (idx, iter) => ??????val seed = byteswap32(idx ^ (shift << 16)) ??????val (sample, n) = SamplingUtils.reservoirSampleAndCount( ????????iter, sampleSizePerPartition, seed) ??????Iterator((idx, n, sample)) ????}.collect() ????val numItems = sketched.map(_._2.toLong).sum ????(numItems, sketched) } ? def reservoirSampleAndCount[T: ClassTag]( ??????input: Iterator[T], ??????k: Int, ??????seed: Long = Random.nextLong()) ????: (Array[T], Int) = { ????val reservoir = new Array[T](k) ????// Put the first k elements in the reservoir. ????var i = 0 ????while (i < k && input.hasNext) { ??????val item = input.next() ??????reservoir(i) = item ??????i += 1 ????} ? ????// If we have consumed all the elements, return them. Otherwise do the replacement. ????if (i < k) { ??????// If input size < k, trim the array to return only an array of input size. ??????val trimReservoir = new Array[T](i) ??????System.arraycopy(reservoir, 0, trimReservoir, 0, i) ??????(trimReservoir, i) ????} else { ??????// If input size > k, continue the sampling process. ??????val rand = new XORShiftRandom(seed) ??????while (input.hasNext) { ????????val item = input.next() ????????val replacementIndex = rand.nextInt(i) ????????if (replacementIndex < k) { ??????????reservoir(replacementIndex) = item ????????} ????????i += 1 ??????} ??????(reservoir, i) } } |
RangePartitioner.sketch的第一個參數(shù)是rdd.map(_._1),也就是把父RDD的key傳進(jìn)來,因?yàn)榉謪^(qū)只需要對Key進(jìn)行操作即可。該函數(shù)返回值是val (numItems, sketched) ,其中numItems相當(dāng)于記錄rdd元素的總數(shù);而sketched的類型是Array[(Int, Int, Array[K])],記錄的是分區(qū)的編號、該分區(qū)中總元素的個數(shù)以及從父RDD中每個分區(qū)采樣的數(shù)據(jù)。
sketch函數(shù)對父RDD中的每個分區(qū)進(jìn)行采樣,并記錄下分區(qū)的ID和分區(qū)中數(shù)據(jù)總和。
reservoirSampleAndCount函數(shù)就是典型的水塘抽樣實(shí)現(xiàn),唯一不同的是該算法還記錄下i的值,這個就是該分區(qū)中元素的總和。
我們之前討論過,父RDD各分區(qū)中的數(shù)據(jù)量可能不均勻,在極端情況下,有些分區(qū)內(nèi)的數(shù)據(jù)量會占有整個RDD的絕大多數(shù)的數(shù)據(jù),如果按照水塘抽樣進(jìn)行采樣,會導(dǎo)致該分區(qū)所采樣的數(shù)據(jù)量不足,所以我們需要對該分區(qū)再一次進(jìn)行采樣,而這次采樣使用的就是rdd的sample函數(shù)。實(shí)現(xiàn)如下:
| val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0) val candidates = ArrayBuffer.empty[(K, Float)] val imbalancedPartitions = mutable.Set.empty[Int] sketched.foreach { case (idx, n, sample) => ??if (fraction * n > sampleSizePerPartition) { ????imbalancedPartitions += idx ??} else { ????// The weight is 1 over the sampling probability. ????val weight = (n.toDouble / sample.size).toFloat ????for (key <- sample) { ??????candidates += ((key, weight)) ????} ??} } if (imbalancedPartitions.nonEmpty) { ??// Re-sample imbalanced partitions with the desired sampling probability. ??val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains) ??val seed = byteswap32(-rdd.id - 1) ??val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect() ??val weight = (1.0 / fraction).toFloat ??candidates ++= reSampled.map(x => (x, weight)) } |
我們可以看到,重新采樣的采樣因子和Spark 1.1之前的采樣因子一致。對于滿足于fraction * n > sampleSizePerPartition條件的分區(qū),我們對其再一次采樣。所有采樣完的數(shù)據(jù)全部存放在candidates 中。
確認(rèn)邊界
從上面的采樣算法可以看出,對于不同的分區(qū)weight的值是不一樣的,這個值對應(yīng)的就是每個分區(qū)的采樣間隔。
| def determineBounds[K : Ordering : ClassTag]( ????candidates: ArrayBuffer[(K, Float)], ????partitions: Int): Array[K] = { ??val ordering = implicitly[Ordering[K]] ??val ordered = candidates.sortBy(_._1) ??val numCandidates = ordered.size ??val sumWeights = ordered.map(_._2.toDouble).sum ??val step = sumWeights / partitions ??var cumWeight = 0.0 ??var target = step ??val bounds = ArrayBuffer.empty[K] ??var i = 0 ??var j = 0 ??var previousBound = Option.empty[K] ??while ((i < numCandidates) && (j < partitions - 1)) { ????val (key, weight) = ordered(i) ????cumWeight += weight ????if (cumWeight > target) { ??????// Skip duplicate values. ??????if (previousBound.isEmpty || ordering.gt(key, previousBound.get)) { ????????bounds += key ????????target += step ????????j += 1 ????????previousBound = Some(key) ??????} ????} ????i += 1 ??} ??bounds.toArray } |
這個函數(shù)最后返回的就是分區(qū)的劃分邊界。
注意,按照理想情況,選定的劃分邊界需要保證劃分后的分區(qū)中數(shù)據(jù)量是均勻的,但是這個算法中如果將cumWeight > target修改成cumWeight >= target的時候會保證各分區(qū)之間數(shù)據(jù)量更加均衡??梢钥催@里https://issues.apache.org/jira/browse/SPARK-10184。
定位分區(qū)ID
分區(qū)類的一個重要功能就是對給定的值計(jì)算其屬于哪個分區(qū)。這個算法并沒有太大的變化。
| def getPartition(key: Any): Int = { ??val k = key.asInstanceOf[K] ??var partition = 0 ??if (rangeBounds.length <= 128) { ????// If we have less than 128 partitions naive search ????while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) { ??????partition += 1 ????} ??} else { ????// Determine which binary search method to use only once. ????partition = binarySearch(rangeBounds, k) ????// binarySearch either returns the match location or -[insertion point]-1 ????if (partition < 0) { ??????partition = -partition-1 ????} ????if (partition > rangeBounds.length) { ??????partition = rangeBounds.length ????} ??} ??if (ascending) { ????partition ??} else { ????rangeBounds.length - partition ??} } |
如果分區(qū)邊界數(shù)組的大小小于或等于128的時候直接變量數(shù)組,否則采用二分查找法確定key屬于某個分區(qū)。
總結(jié)
以上是生活随笔為你收集整理的Spark分区器HashPartitioner和RangePartitioner代码详解的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Spark数据倾斜的完美解决
- 下一篇: Spark 资源调度及任务调度