日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Spark分区器HashPartitioner和RangePartitioner代码详解

發(fā)布時間:2024/4/15 编程问答 33 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spark分区器HashPartitioner和RangePartitioner代码详解 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

轉(zhuǎn)載:

https://www.iteblog.com/archives/1522.html

  在Spark中分區(qū)器直接決定了RDD中分區(qū)的個數(shù);也決定了RDD中每條數(shù)據(jù)經(jīng)過Shuffle過程屬于哪個分區(qū);也決定了Reduce的個數(shù)。這三點(diǎn)看起來是不同的方面的,但其深層的含義是一致的。

  我們需要注意的是,只有Key-Value類型的RDD才有分區(qū)的,非Key-Value類型的RDD分區(qū)的值是None的。

  在Spark中,存在兩類分區(qū)函數(shù):HashPartitioner和RangePartitioner,它們都是繼承自Partitioner,主要提供了每個RDD有幾個分區(qū)(numPartitions)以及對于給定的值返回一個分區(qū)ID(0~numPartitions-1),也就是決定這個值是屬于那個分區(qū)的。

文章目錄

  • 1 HashPartitioner分區(qū)
  • 2 RangePartitioner分區(qū)
  • 3 確認(rèn)邊界

HashPartitioner分區(qū)

  HashPartitioner分區(qū)的原理很簡單,對于給定的key,計(jì)算其hashCode,并除于分區(qū)的個數(shù)取余,如果余數(shù)小于0,則用余數(shù)+分區(qū)的個數(shù),最后返回的值就是這個key所屬的分區(qū)ID。實(shí)現(xiàn)如下:

?

class HashPartitioner(partitions: Int) extends Partitioner {

??require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

?

??def numPartitions: Int = partitions

?

??def getPartition(key: Any): Int = key match {

????case null => 0

????case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)

??}

?

??override def equals(other: Any): Boolean = other match {

????case h: HashPartitioner =>

??????h.numPartitions == numPartitions

????case _ =>

??????false

??}

?

??override def hashCode: Int = numPartitions

}

RangePartitioner分區(qū)

  從HashPartitioner分區(qū)的實(shí)現(xiàn)原理我們可以看出,其結(jié)果可能導(dǎo)致每個分區(qū)中數(shù)據(jù)量的不均勻,極端情況下會導(dǎo)致某些分區(qū)擁有RDD的全部數(shù)據(jù),這顯然不是我們需要的。而RangePartitioner分區(qū)則盡量保證每個分區(qū)中數(shù)據(jù)量的均勻,而且分區(qū)與分區(qū)之間是有序的,也就是說一個分區(qū)中的元素肯定都是比另一個分區(qū)內(nèi)的元素小或者大;但是分區(qū)內(nèi)的元素是不能保證順序的。簡單的說就是將一定范圍內(nèi)的數(shù)映射到某一個分區(qū)內(nèi)。

  前面討論過,RangePartitioner分區(qū)器的主要作用就是將一定范圍內(nèi)的數(shù)映射到某一個分區(qū)內(nèi),所以它的實(shí)現(xiàn)中分界的算法尤為重要。這個算法對應(yīng)的函數(shù)是rangeBounds。這個函數(shù)主要經(jīng)歷了兩個過程:以Spark 1.1版本為界,Spark 1.1版本社區(qū)對rangeBounds函數(shù)進(jìn)行了一次重大的重構(gòu)。

  因?yàn)樵赟park 1.1版本之前,RangePartitioner分區(qū)對整個數(shù)據(jù)集進(jìn)行了2次的掃描:一次是計(jì)算RDD中元素的個數(shù);一次是進(jìn)行采樣。具體的代碼如下:

// An array of upper bounds for the first (partitions - 1) partitions

private val rangeBounds: Array[K] = {

????if (partitions == 1) {

??????Array()

????} else {

??????val rddSize = rdd.count()

??????val maxSampleSize = partitions * 20.0

??????val frac = math.min(maxSampleSize / math.max(rddSize, 1), 1.0)

??????val rddSample = rdd.sample(false, frac, 1).map(_._1).collect().sorted

??????if (rddSample.length == 0) {

????????Array()

??????} else {

????????val bounds = new Array[K](partitions - 1)

????????for (i <- 0 until partitions - 1) {

??????????val index = (rddSample.length - 1) * (i + 1) / partitions

??????????bounds(i) = rddSample(index)

????????}

????????bounds

??????}

????}

}

  注意看里面的rddSize的計(jì)算和rdd.sample的計(jì)算。所以如果你進(jìn)行一次sortByKey操作就會對RDD掃描三次!而我們都知道,分區(qū)函數(shù)性能對整個Spark作業(yè)的性能是有直接的影響,而且影響很大,直接影響作業(yè)運(yùn)行的總時間,所以社區(qū)不得不對RangePartitioner中的rangeBounds算法進(jìn)行重構(gòu)。

  在閱讀新版本的RangePartitioner之前,建議先去了解一下Reservoir sampling(水塘抽樣),因?yàn)槠渲械膶?shí)現(xiàn)用到了Reservoir sampling算法進(jìn)行采樣。
采樣總數(shù)

  在新的rangeBounds算法總,采樣總數(shù)做了一個限制,也就是最大只采樣1e6的樣本(也就是1000000):

val sampleSize = math.min(20.0 * partitions, 1e6)

所以如果你的分區(qū)個數(shù)為5,則采樣樣本數(shù)量為100.0

父RDD中每個分區(qū)采樣樣本數(shù)

  按照我們的思路,正常情況下,父RDD每個分區(qū)需要采樣的數(shù)據(jù)量應(yīng)該是sampleSize/rdd.partitions.size,但是我們看代碼的時候發(fā)現(xiàn)父RDD每個分區(qū)需要采樣的數(shù)據(jù)量是正常數(shù)的3倍。

val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.size).toInt

這是因?yàn)楦窻DD各分區(qū)中的數(shù)據(jù)量可能會出現(xiàn)傾斜的情況,乘于3的目的就是保證數(shù)據(jù)量小的分區(qū)能夠采樣到足夠的數(shù)據(jù),而對于數(shù)據(jù)量大的分區(qū)會進(jìn)行第二次采樣。

采樣算法

  這個地方就是RangePartitioner分區(qū)的核心了,其內(nèi)部使用的就是水塘抽樣,而這個抽樣特別適合那種總數(shù)很大而且未知,并無法將所有的數(shù)據(jù)全部存放到主內(nèi)存中的情況。也就是我們不需要事先知道RDD中元素的個數(shù)(不需要調(diào)用rdd.count()了!)。其主要實(shí)現(xiàn)如下:

?

?

val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)

?

def sketch[K : ClassTag](

??????rdd: RDD[K],

??????sampleSizePerPartition: Int): (Long, Array[(Int, Int, Array[K])]) = {

????val shift = rdd.id

????// val classTagK = classTag[K] // to avoid serializing the entire partitioner object

????val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>

??????val seed = byteswap32(idx ^ (shift << 16))

??????val (sample, n) = SamplingUtils.reservoirSampleAndCount(

????????iter, sampleSizePerPartition, seed)

??????Iterator((idx, n, sample))

????}.collect()

????val numItems = sketched.map(_._2.toLong).sum

????(numItems, sketched)

}

?

def reservoirSampleAndCount[T: ClassTag](

??????input: Iterator[T],

??????k: Int,

??????seed: Long = Random.nextLong())

????: (Array[T], Int) = {

????val reservoir = new Array[T](k)

????// Put the first k elements in the reservoir.

????var i = 0

????while (i < k && input.hasNext) {

??????val item = input.next()

??????reservoir(i) = item

??????i += 1

????}

?

????// If we have consumed all the elements, return them. Otherwise do the replacement.

????if (i < k) {

??????// If input size < k, trim the array to return only an array of input size.

??????val trimReservoir = new Array[T](i)

??????System.arraycopy(reservoir, 0, trimReservoir, 0, i)

??????(trimReservoir, i)

????} else {

??????// If input size > k, continue the sampling process.

??????val rand = new XORShiftRandom(seed)

??????while (input.hasNext) {

????????val item = input.next()

????????val replacementIndex = rand.nextInt(i)

????????if (replacementIndex < k) {

??????????reservoir(replacementIndex) = item

????????}

????????i += 1

??????}

??????(reservoir, i)

}

}

  RangePartitioner.sketch的第一個參數(shù)是rdd.map(_._1),也就是把父RDD的key傳進(jìn)來,因?yàn)榉謪^(qū)只需要對Key進(jìn)行操作即可。該函數(shù)返回值是val (numItems, sketched) ,其中numItems相當(dāng)于記錄rdd元素的總數(shù);而sketched的類型是Array[(Int, Int, Array[K])],記錄的是分區(qū)的編號、該分區(qū)中總元素的個數(shù)以及從父RDD中每個分區(qū)采樣的數(shù)據(jù)。

  sketch函數(shù)對父RDD中的每個分區(qū)進(jìn)行采樣,并記錄下分區(qū)的ID和分區(qū)中數(shù)據(jù)總和。

  reservoirSampleAndCount函數(shù)就是典型的水塘抽樣實(shí)現(xiàn),唯一不同的是該算法還記錄下i的值,這個就是該分區(qū)中元素的總和。

  我們之前討論過,父RDD各分區(qū)中的數(shù)據(jù)量可能不均勻,在極端情況下,有些分區(qū)內(nèi)的數(shù)據(jù)量會占有整個RDD的絕大多數(shù)的數(shù)據(jù),如果按照水塘抽樣進(jìn)行采樣,會導(dǎo)致該分區(qū)所采樣的數(shù)據(jù)量不足,所以我們需要對該分區(qū)再一次進(jìn)行采樣,而這次采樣使用的就是rdd的sample函數(shù)。實(shí)現(xiàn)如下:

val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)

val candidates = ArrayBuffer.empty[(K, Float)]

val imbalancedPartitions = mutable.Set.empty[Int]

sketched.foreach { case (idx, n, sample) =>

??if (fraction * n > sampleSizePerPartition) {

????imbalancedPartitions += idx

??} else {

????// The weight is 1 over the sampling probability.

????val weight = (n.toDouble / sample.size).toFloat

????for (key <- sample) {

??????candidates += ((key, weight))

????}

??}

}

if (imbalancedPartitions.nonEmpty) {

??// Re-sample imbalanced partitions with the desired sampling probability.

??val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)

??val seed = byteswap32(-rdd.id - 1)

??val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()

??val weight = (1.0 / fraction).toFloat

??candidates ++= reSampled.map(x => (x, weight))

}

我們可以看到,重新采樣的采樣因子和Spark 1.1之前的采樣因子一致。對于滿足于fraction * n > sampleSizePerPartition條件的分區(qū),我們對其再一次采樣。所有采樣完的數(shù)據(jù)全部存放在candidates 中。

確認(rèn)邊界

從上面的采樣算法可以看出,對于不同的分區(qū)weight的值是不一樣的,這個值對應(yīng)的就是每個分區(qū)的采樣間隔。

def determineBounds[K : Ordering : ClassTag](

????candidates: ArrayBuffer[(K, Float)],

????partitions: Int): Array[K] = {

??val ordering = implicitly[Ordering[K]]

??val ordered = candidates.sortBy(_._1)

??val numCandidates = ordered.size

??val sumWeights = ordered.map(_._2.toDouble).sum

??val step = sumWeights / partitions

??var cumWeight = 0.0

??var target = step

??val bounds = ArrayBuffer.empty[K]

??var i = 0

??var j = 0

??var previousBound = Option.empty[K]

??while ((i < numCandidates) && (j < partitions - 1)) {

????val (key, weight) = ordered(i)

????cumWeight += weight

????if (cumWeight > target) {

??????// Skip duplicate values.

??????if (previousBound.isEmpty || ordering.gt(key, previousBound.get)) {

????????bounds += key

????????target += step

????????j += 1

????????previousBound = Some(key)

??????}

????}

????i += 1

??}

??bounds.toArray

}

這個函數(shù)最后返回的就是分區(qū)的劃分邊界。

注意,按照理想情況,選定的劃分邊界需要保證劃分后的分區(qū)中數(shù)據(jù)量是均勻的,但是這個算法中如果將cumWeight > target修改成cumWeight >= target的時候會保證各分區(qū)之間數(shù)據(jù)量更加均衡??梢钥催@里https://issues.apache.org/jira/browse/SPARK-10184。

定位分區(qū)ID

分區(qū)類的一個重要功能就是對給定的值計(jì)算其屬于哪個分區(qū)。這個算法并沒有太大的變化。

def getPartition(key: Any): Int = {

??val k = key.asInstanceOf[K]

??var partition = 0

??if (rangeBounds.length <= 128) {

????// If we have less than 128 partitions naive search

????while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {

??????partition += 1

????}

??} else {

????// Determine which binary search method to use only once.

????partition = binarySearch(rangeBounds, k)

????// binarySearch either returns the match location or -[insertion point]-1

????if (partition < 0) {

??????partition = -partition-1

????}

????if (partition > rangeBounds.length) {

??????partition = rangeBounds.length

????}

??}

??if (ascending) {

????partition

??} else {

????rangeBounds.length - partition

??}

}

如果分區(qū)邊界數(shù)組的大小小于或等于128的時候直接變量數(shù)組,否則采用二分查找法確定key屬于某個分區(qū)。

總結(jié)

以上是生活随笔為你收集整理的Spark分区器HashPartitioner和RangePartitioner代码详解的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。