日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

简单的combineByKey算子【看完就懂系列】

發(fā)布時(shí)間:2024/2/28 编程问答 25 豆豆
生活随笔 收集整理的這篇文章主要介紹了 简单的combineByKey算子【看完就懂系列】 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

代碼先行:?

val conf = new SparkConf().setMaster("local").setAppName("CbkDemo")val sc = new SparkContext(conf)sc.setLogLevel("error")val rdd: RDD[(String, Double)] = sc.parallelize(Array(("George", 88.0), ("George", 95.0), ("George", 88.0),("KangKang", 93.0),("KangKang", 95.0), ("KangKang", 98.0),("limu", 98.0))) // val rdd2: RDD[(String, Double)] = rdd.coalesce(3)//求和/*** createCombiner: V => C ,這個(gè)函數(shù)把當(dāng)前的值作為參數(shù),此時(shí)我們可以對(duì)其做些附加操作(類型轉(zhuǎn)換)并把它返回?(這一步類似于初始化操作)* mergeValue: (C, V) => C,該函數(shù)把元素V合并到之前的元素C(createCombiner)上 (這個(gè)操作在每個(gè)分區(qū)內(nèi)進(jìn)行)* mergeCombiners: (C, C) => C,該函數(shù)把2個(gè)元素C合并 (這個(gè)操作在不同分區(qū)間進(jìn)行)*/val res = rdd.combineByKey(x => {println(s"$x******");x},(x: Double, y: Double) => {println(s"$x%%%%%%$y");x+y},(x: Double, y: Double) => {println(s"$x@@@@@@$y");x+y})res.foreach(println)

輸出結(jié)果:

88.0****** 88.0%%%%%%95.0 183.0%%%%%%88.0 93.0****** 93.0%%%%%%95.0 188.0%%%%%%98.0 98.0****** (George,271.0) (KangKang,286.0) (limu,98.0)

圖示:

那么怎么走第三部呢?

mergeCombiners: (C, C) => C,該函數(shù)把2個(gè)元素C合并 (這個(gè)操作在不同分區(qū)間進(jìn)行) val conf = new SparkConf().setMaster("local").setAppName("CbkDemo")val sc = new SparkContext(conf)sc.setLogLevel("error")val rdd: RDD[(String, Double)] = sc.parallelize(Array(("George", 88.0), ("George", 95.0), ("George", 88.0),("George", 88.0), ("George", 95.0), ("George", 88.0),("George", 88.0), ("George", 95.0), ("George", 88.0),("George", 88.0), ("George", 95.0), ("George", 88.0),("KangKang", 93.0),("KangKang", 95.0), ("KangKang", 98.0),("limu", 98.0)),3) // val rdd2: RDD[(String, Double)] = rdd.coalesce(3)//求和/*** createCombiner: V => C ,這個(gè)函數(shù)把當(dāng)前的值作為參數(shù),此時(shí)我們可以對(duì)其做些附加操作(類型轉(zhuǎn)換)并把它返回?(這一步類似于初始化操作)* mergeValue: (C, V) => C,該函數(shù)把元素V合并到之前的元素C(createCombiner)上 (這個(gè)操作在每個(gè)分區(qū)內(nèi)進(jìn)行)* mergeCombiners: (C, C) => C,該函數(shù)把2個(gè)元素C合并 (這個(gè)操作在不同分區(qū)間進(jìn)行)*/val res = rdd.combineByKey(x => {println(s"$x******");x},(x: Double, y: Double) => {println(s"$x%%%%%%$y");x+y},(x: Double, y: Double) => {println(s"$x@@@@@@$y");x+y})res.foreach(println)

結(jié)果展示:

88.0****** 88.0%%%%%%95.0 183.0%%%%%%88.0 271.0%%%%%%88.0 359.0%%%%%%95.0 88.0****** 88.0%%%%%%88.0 176.0%%%%%%95.0 271.0%%%%%%88.0 359.0%%%%%%88.0 95.0****** 95.0%%%%%%88.0 93.0****** 93.0%%%%%%95.0 188.0%%%%%%98.0 98.0****** 454.0@@@@@@447.0 901.0@@@@@@183.0 (George,1084.0) (limu,98.0) (KangKang,286.0)

圖示:

【總結(jié)】

方法的第一個(gè)操作在相同分區(qū)相同key的時(shí)候只操作一次,然后一直進(jìn)行第二個(gè)操作,如果不同分區(qū)中有相同的key值則進(jìn)行第三步操作,否則不執(zhí)行第三步操作。【因?yàn)榈诙讲僮饕呀?jīng)把結(jié)果算出來(lái)了】

友情提示:之所以我們的輸出第二步操作時(shí)沒(méi)有輸出最終結(jié)果,原因在于,為了返回值。我們把輸出語(yǔ)句放在了前面,也就是說(shuō)輸出語(yǔ)句后,還有一步加的操作。


?

趁熱打鐵:

val conf = new SparkConf().setMaster("local").setAppName("CbkDemo")val sc = new SparkContext(conf)sc.setLogLevel("error")val rdd: RDD[(String, Double)] = sc.parallelize(Array(("George", 88.0), ("George", 95.0), ("George", 88.0),("George", 88.0), ("George", 95.0), ("George", 88.0),("George", 88.0), ("George", 95.0), ("George", 88.0),("George", 88.0), ("George", 95.0), ("George", 88.0),("KangKang", 93.0),("KangKang", 95.0), ("KangKang", 98.0),("limu", 98.0)),3)//求平均數(shù)val res: RDD[(String, (Int, Double))] = rdd.combineByKey(score => (1, score),(total: (Int, Double), newScore) => (total._1 + 1, total._2 + newScore),(total: (Int, Double), sum: (Int, Double)) => (total._1 + sum._1, total._2 + sum._2))val fin: RDD[(String, Double)] = res.map{case (name,(num,score)) => (name,score/num)}fin.foreach(println)

輸出結(jié)果:

(George,90.33333333333333) (limu,98.0) (KangKang,95.33333333333333)

再此總結(jié):【誰(shuí)讓它不是很好理解】

combineByKey

是針對(duì)不同partition進(jìn)行操作的。它的第一個(gè)參數(shù)用于數(shù)據(jù)初始化(后面著重講),第二個(gè)是針對(duì)一個(gè)partition的combine操作函數(shù),第三個(gè)是在所有partition都combine完畢后,針對(duì)所有臨時(shí)結(jié)果進(jìn)行combine操作的函數(shù)。

友情補(bǔ)充:

關(guān)于數(shù)據(jù)初始化

之前有人說(shuō),初始化是對(duì)每個(gè)數(shù)據(jù)進(jìn)行操作,這其實(shí)是錯(cuò)誤的。應(yīng)該是針對(duì)每個(gè)partition中,每個(gè)key下的第一個(gè)數(shù)據(jù)進(jìn)行操作。這句話怎么理解呢?看代碼:

val rdd1 = sc.parallelize(List(1,2,2,3,3,3,3,4,4,4,4,4), 2)

val rdd2 = rdd1.map((_, 1))

val rdd3 = rdd2.combineByKey(-_, (x:Int, y:Int) => x + y, (x:Int, y:Int) => x + y)

val rdd4 = rdd2.combineByKey(+_, (x:Int, y:Int) => x + y, (x:Int, y:Int) => x + y)

?

rdd2.collect

rdd3.collect

rdd4.collect

?

Array((1,1), (2,1), (2,1), (3,1), (3,1), (3,1), (3,1), (4,1), (4,1), (4,1), (4,1), (4,1))

Array((4,3), (2,0), (1,-1), (3,0))

Array((4,5), (2,2), (1,1), (3,4)) ?

?

在上述代碼中,(1,1), (2,1), (2,1), (3,1), (3,1), (3,1) 被劃分到第一個(gè)partition,(3,1), (4,1), (4,1), (4,1), (4,1), (4,1) 被劃分到第二個(gè)。于是有如下操作:

?

(1, 1):由于只有1個(gè),所以在值取負(fù)的情況下,自然輸出(1, -1)

(2, 1):由于有2個(gè),第一個(gè)取負(fù),第二個(gè)不變,因此combine后為(2, 0)

(3, 1):partition1中有3個(gè),參照上述規(guī)則,combine后為(3, 1),partition2中有1個(gè),因此combine后為(3, -1)。在第二次combine時(shí),不會(huì)有初始化操作,因此直接相加,結(jié)果為(3, 0)

(4, 1):過(guò)程同上,結(jié)果為(4, 3)

總結(jié)

以上是生活随笔為你收集整理的简单的combineByKey算子【看完就懂系列】的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。