A Simple Guide to the combineByKey Operator [Understand-After-One-Read Series]
Code first:
val conf = new SparkConf().setMaster("local").setAppName("CbkDemo")
val sc = new SparkContext(conf)
sc.setLogLevel("error")
val rdd: RDD[(String, Double)] = sc.parallelize(Array(
  ("George", 88.0), ("George", 95.0), ("George", 88.0),
  ("KangKang", 93.0), ("KangKang", 95.0), ("KangKang", 98.0),
  ("limu", 98.0)))
// val rdd2: RDD[(String, Double)] = rdd.coalesce(3)

// Summation per key.
/**
 * createCombiner: V => C — takes the current value as its argument; here we can do some
 *   extra work on it (e.g. a type conversion) and return it (this step acts as initialization)
 * mergeValue: (C, V) => C — merges a value V into the previously built element C (the one
 *   produced by createCombiner); this runs within each partition
 * mergeCombiners: (C, C) => C — merges two C elements; this runs across partitions
 */
val res = rdd.combineByKey(
  x => { println(s"$x******"); x },
  (x: Double, y: Double) => { println(s"$x%%%%%%$y"); x + y },
  (x: Double, y: Double) => { println(s"$x@@@@@@$y"); x + y })
res.foreach(println)

Output:
88.0******
88.0%%%%%%95.0
183.0%%%%%%88.0
93.0******
93.0%%%%%%95.0
188.0%%%%%%98.0
98.0******
(George,271.0)
(KangKang,286.0)
(limu,98.0)

Diagram: (figure omitted)
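Notice that no @@@@@@ line appears: with setMaster("local") and no explicit slice count, this RDD ends up with a single partition, so mergeCombiners never fires. If you want to check the partition count yourself, RDD exposes it directly:

println(rdd.getNumPartitions) // 1 here, so the cross-partition merge never runs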
So how do we get the third step to run?
mergeCombiners: (C, C) => C — merges two C elements across partitions. To see it fire, give the RDD more than one partition:

val conf = new SparkConf().setMaster("local").setAppName("CbkDemo")
val sc = new SparkContext(conf)
sc.setLogLevel("error")
// Same data, but George now appears 12 times, and we explicitly ask for 3 partitions.
val rdd: RDD[(String, Double)] = sc.parallelize(Array(
  ("George", 88.0), ("George", 95.0), ("George", 88.0),
  ("George", 88.0), ("George", 95.0), ("George", 88.0),
  ("George", 88.0), ("George", 95.0), ("George", 88.0),
  ("George", 88.0), ("George", 95.0), ("George", 88.0),
  ("KangKang", 93.0), ("KangKang", 95.0), ("KangKang", 98.0),
  ("limu", 98.0)), 3)
// The three functions are identical to the first example.
val res = rdd.combineByKey(
  x => { println(s"$x******"); x },
  (x: Double, y: Double) => { println(s"$x%%%%%%$y"); x + y },
  (x: Double, y: Double) => { println(s"$x@@@@@@$y"); x + y })
res.foreach(println)

Output:
88.0******
88.0%%%%%%95.0
183.0%%%%%%88.0
271.0%%%%%%88.0
359.0%%%%%%95.0
88.0******
88.0%%%%%%88.0
176.0%%%%%%95.0
271.0%%%%%%88.0
359.0%%%%%%88.0
95.0******
95.0%%%%%%88.0
93.0******
93.0%%%%%%95.0
188.0%%%%%%98.0
98.0******
454.0@@@@@@447.0
901.0@@@@@@183.0
(George,1084.0)
(limu,98.0)
(KangKang,286.0)

Diagram: (figure omitted)
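For reference, here is how that trace decomposes, assuming parallelize slices the 16 elements positionally into three partitions of 5, 5, and 6 elements (Spark's default slicing behavior):

Partition 0: George 88, 95, 88, 88, 95 → combines to 454.0
Partition 1: George 88, 88, 95, 88, 88 → combines to 447.0
Partition 2: George 95, 88 → 183.0; KangKang 93, 95, 98 → 286.0; limu 98 → 98.0
mergeCombiners for George: 454.0 + 447.0 = 901.0, then 901.0 + 183.0 = 1084.0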
[Summary]
For a given key within a partition, the first function runs only once, on the first value; every further value for that key goes through the second function. The third function runs only when the same key appears in more than one partition; otherwise it is skipped [because the second step has already produced the final result].
Friendly reminder: the reason the second step's printout never shows the final sum is that, in order to return the value, we placed the println before the addition; in other words, after the print statement there is still one more addition to perform.
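If you would rather see the running sums in the printout as well, a small rewrite of the mergeValue function above does it (this is just a logging variant, not a change in behavior):

(x: Double, y: Double) => {
  val sum = x + y
  println(s"$x%%%%%%$y = $sum") // the result of the addition is now printed too
  sum
}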
Striking while the iron is hot, let's use the same pattern to compute an average:
val conf = new SparkConf().setMaster("local").setAppName("CbkDemo")
val sc = new SparkContext(conf)
sc.setLogLevel("error")
val rdd: RDD[(String, Double)] = sc.parallelize(Array(
  ("George", 88.0), ("George", 95.0), ("George", 88.0),
  ("George", 88.0), ("George", 95.0), ("George", 88.0),
  ("George", 88.0), ("George", 95.0), ("George", 88.0),
  ("George", 88.0), ("George", 95.0), ("George", 88.0),
  ("KangKang", 93.0), ("KangKang", 95.0), ("KangKang", 98.0),
  ("limu", 98.0)), 3)

// Average score per key: the combiner is a (count, sum) pair.
val res: RDD[(String, (Int, Double))] = rdd.combineByKey(
  score => (1, score),
  (total: (Int, Double), newScore) => (total._1 + 1, total._2 + newScore),
  (total: (Int, Double), sum: (Int, Double)) => (total._1 + sum._1, total._2 + sum._2))
val fin: RDD[(String, Double)] = res.map { case (name, (num, score)) => (name, score / num) }
fin.foreach(println)

Output:
(George,90.33333333333333)
(limu,98.0)
(KangKang,95.33333333333333)
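A quick sanity check on the George figure (my own arithmetic, not part of the program output): George appears 12 times, and each block of three scores sums to 88 + 95 + 88 = 271, so

count = 12, sum = 4 × 271.0 = 1084.0, average = 1084.0 / 12 ≈ 90.333…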
To summarize once more [since this operator is not the easiest one to grasp]: combineByKey operates partition by partition. Its first parameter handles data initialization (discussed in detail below); the second is the combine function applied within a single partition; and the third is the function that, once every partition has finished combining, merges all of the intermediate per-partition results.
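For reference, the method's signature in Spark's PairRDDFunctions looks like this (simplified; the full overloads also take a partitioner or a number of partitions):

def combineByKey[C](
    createCombiner: V => C,   // V: the RDD's value type; C: the combiner (accumulator) type
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C): RDD[(K, C)]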
A friendly addendum:
On data initialization
Some people have said that initialization is applied to every element. That is actually wrong: it is applied only to the first value of each key within each partition. What does that mean? Look at the code:
val rdd1 = sc.parallelize(List(1,2,2,3,3,3,3,4,4,4,4,4), 2)
val rdd2 = rdd1.map((_, 1))
val rdd3 = rdd2.combineByKey(-_, (x: Int, y: Int) => x + y, (x: Int, y: Int) => x + y) // createCombiner negates
val rdd4 = rdd2.combineByKey(+_, (x: Int, y: Int) => x + y, (x: Int, y: Int) => x + y) // createCombiner is identity
rdd2.collect
rdd3.collect
rdd4.collect
Array((1,1), (2,1), (2,1), (3,1), (3,1), (3,1), (3,1), (4,1), (4,1), (4,1), (4,1), (4,1))  // rdd2
Array((4,3), (2,0), (1,-1), (3,0))  // rdd3
Array((4,5), (2,2), (1,1), (3,4))   // rdd4
In the code above, (1,1), (2,1), (2,1), (3,1), (3,1), (3,1) land in the first partition and (3,1), (4,1), (4,1), (4,1), (4,1), (4,1) in the second, which leads to the following:
(1, 1): there is only one, so with the value negated the output is naturally (1, -1)
(2, 1): there are two; the first is negated, the second is left unchanged, so combining gives (2, 0)
(3, 1): partition 1 holds three, which by the rule above combine to (3, 1) (that is, -1 + 1 + 1); partition 2 holds one, which becomes (3, -1). The second-level combine performs no initialization, so the two are simply added, giving (3, 0)
(4, 1): same process as above; the result is (4, 3)
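To make that control flow explicit, here is a minimal single-machine sketch of combineByKey's semantics (combineByKeySim is a hypothetical helper written for this post, not a Spark API); it reproduces the rdd3 result above:

// Simulate combineByKey over pre-split "partitions" in plain Scala, no Spark needed.
def combineByKeySim[K, V, C](partitions: Seq[Seq[(K, V)]])(
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C): Map[K, C] = {
  // Steps 1 and 2: within each partition, initialize on the first value of a key,
  // then fold every later value of that key in with mergeValue.
  val perPartition: Seq[Map[K, C]] = partitions.map { part =>
    part.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
      acc.get(k) match {
        case None    => acc + (k -> createCombiner(v)) // first value of k in this partition
        case Some(c) => acc + (k -> mergeValue(c, v))  // subsequent values of k
      }
    }
  }
  // Step 3: merge the per-partition combiners across partitions.
  perPartition.flatten.groupBy(_._1).map { case (k, kcs) =>
    k -> kcs.map(_._2).reduceLeft(mergeCombiners)
  }
}

val parts = Seq(
  Seq((1, 1), (2, 1), (2, 1), (3, 1), (3, 1), (3, 1)), // partition 1
  Seq((3, 1), (4, 1), (4, 1), (4, 1), (4, 1), (4, 1))) // partition 2
println(combineByKeySim(parts)(v => -v, (c: Int, v: Int) => c + v, (a: Int, b: Int) => a + b))
// Map(1 -> -1, 2 -> 0, 3 -> 0, 4 -> 3) (ordering may differ), matching rdd3 above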