日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Spark 键值对RDD操作

發布時間:2023/11/29 编程问答 34 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spark 键值对RDD操作 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

https://www.cnblogs.com/yongjian/p/6425772.html

概述

鍵值對RDD是Spark操作中最常用的RDD,它是很多程序的構成要素,因為他們提供了并行操作各個鍵或跨界點重新進行數據分組的操作接口。

?

?

創建

Spark中有許多中創建鍵值對RDD的方式,其中包括

  • 文件讀取時直接返回鍵值對RDD
  • 通過List創建鍵值對RDD

在Scala中,可通過Map函數生成二元組

1 2 3 4 5 6 7 8 9 10 val listRDD = sc.parallelize(List(1,2,3,4,5)) val result = listRDD.map(x => (x,1)) result.foreach(println) //結果 (1,1) (2,1) (3,1) (4,1) (5,1)

?

?

鍵值對RDD的轉化操作

?

基本RDD轉化操作在此同樣適用。但因為鍵值對RDD中包含的是一個個二元組,所以需要傳遞的函數會由原來的操作單個元素改為操作二元組。

下表總結了針對單個鍵值對RDD的轉化操作,以?{ (1,2) , (3,4) , (3,6) }? 為例,f表示傳入的函數

函數名目的示例結果
reduceByKey(f)合并具有相同key的值rdd.reduceByKey( ( x,y) => x+y ){ (1,2) , (3,10) }
groupByKey()對具有相同key的值分組rdd.groupByKey(){ (1,2) , (3, [4,6] ) }
mapValues(f)對鍵值對中的每個值(value)應用一個函數,但不改變鍵(key)rdd.mapValues(x => x+1){ (1,3) , (3,5) , (3,7) }
combineBy Key( createCombiner, mergeValue, mergeCombiners, partitioner)使用不同的返回類型合并具有相同鍵的值下面有詳細講解-
flatMapValues(f)對鍵值對RDD中每個值應用返回一個迭代器的函數,然后對每個元素生成一個對應的鍵值對。常用語符號化rdd.flatMapValues(x => ( x to 5 ))

{ (1, 2) ,? (1, 3) ,?? (1, 4) , (1, 5) ,? (3, 4) , (3, 5) }

keys()獲取所有keyrdd.keys(){1,3,3}
values()獲取所有valuerdd.values(){2,4,6}
sortByKey()根據key排序rdd.sortByKey(){ (1,2) , (3,4) , (3,6) }

?

?

下表總結了針對兩個鍵值對RDD的轉化操作,以rdd1 = { (1,2) , (3,4) , (3,6)?}? rdd2 = { (3,9) }?為例,

函數名目的示例結果
subtractByKey刪掉rdd1中與rdd2的key相同的元素rdd1.subtractByKey(rdd2){ (1,2) }
join內連接rdd1.join(rdd2)

{(3, (4, 9)), (3, (6, 9))}

leftOuterJoin左外鏈接rdd1.leftOuterJoin (rdd2)

{(3,( Some( 4), 9)), (3,( Some( 6), 9))}

rightOuterJoin右外鏈接rdd1.rightOuterJoin(rdd2)

{(1,( 2, None)), (3, (4, Some( 9))), (3, (6, Some( 9)))}

cogroup將兩個RDD鐘相同key的數據分組到一起rdd1.cogroup(rdd2){(1,([ 2],[])), (3, ([4, 6],[ 9]))}

?

?

combineByKey

combineByKey( createCombiner, mergeValue, mergeCombiners, partitioner,mapSideCombine)

combineByKey( createCombiner, mergeValue, mergeCombiners, partitioner)

combineByKey( createCombiner, mergeValue, mergeCombiners)

?

函數功能:

聚合各分區的元素,而每個元素都是二元組。功能與基礎RDD函數aggregate()差不多,可讓用戶返回與輸入數據類型不同的返回值。

combineByKey函數的每個參數分別對應聚合操作的各個階段。所以,理解此函數對Spark如何操作RDD會有很大幫助。

?

參數解析:

createCombiner:分區內?創建組合函數

mergeValue:分區內?合并值函數

mergeCombiners:多分區?合并組合器函數

partitioner:自定義分區數,默認為HashPartitioner

mapSideCombine:是否在map端進行Combine操作,默認為true

?

工作流程:

  • combineByKey會遍歷分區中的所有元素,因此每個元素的key要么沒遇到過,要么和之前某個元素的key相同。
  • 如果這是一個新的元素,函數會調用createCombiner創建那個key對應的累加器初始值
  • 如果這是一個在處理當前分區之前已經遇到的key,會調用mergeCombiners把該key累加器對應的當前value與這個新的value合并
  • ?

    代碼例子:

    //統計男女個數

    1 2 3 4 5 6 7 8 9 10 val conf = new?SparkConf ().setMaster ("local").setAppName ("app_1") ???val sc = new?SparkContext (conf) ???val people = List(("男", "李四"), ("男", "張三"), ("女", "韓梅梅"), ("女", "李思思"), ("男", "馬云")) ???val rdd = sc.parallelize(people,2) ???val result = rdd.combineByKey( ?????(x: String) => (List(x), 1),? //createCombiner ?????(peo: (List[String], Int), x : String) => (x :: peo._1, peo._2 + 1), //mergeValue ?????(sex1: (List[String], Int), sex2: (List[String], Int)) => (sex1._1 ::: sex2._1, sex1._2 + sex2._2)) //mergeCombiners ???result.foreach(println)

    結果

    (男, ( List( 張三,? 李四,? 馬云),3 ) )
    (女, ( List( 李思思,? 韓梅梅),2 ) )

    ?

    流程分解:

    ?

    解析:兩個分區,分區一按順序V1、V2、V3遍歷

    • V1,發現第一個key=男時,調用createCombiner,即 (x: String) => (List(x), 1)
    • V2,第二次碰到key=男的元素,調用mergeValue,即 (peo: (List[String], Int), x : String) => (x :: peo._1, peo._2 + 1)
    • V3,發現第一個key=女,繼續調用createCombiner,即 (x: String) => (List(x), 1)
    • … …
    • 待各V1、V2分區都計算完后,數據進行混洗,調用mergeCombiners,即 (sex1: (List[String], Int), sex2: (List[String], Int)) => (sex1._1 ::: sex2._1, sex1._2 + sex2._2))

    ?


    ?

    add by jan 2017-02-27?18:34:39

    以下例子都基于此RDD

    1 2 3 4 (Hadoop,1) (Spark,1) (Hive,1) (Spark,1)

    reduceByKey(func)

    reduceByKey(func)的功能是,使用func函數合并具有相同鍵的值。

    比如,reduceByKey((a,b) => a+b),有四個鍵值對("spark",1)、("spark",2)、("hadoop",3)和("hadoop",5),對具有相同key的鍵值對進行合并后的結果就是:("spark",3)、("hadoop",8)。可以看出,(a,b) => a+b這個Lamda表達式中,a和b都是指value,比如,對于兩個具有相同key的鍵值對("spark",1)、("spark",2),a就是1,b就是2。

    1 2 3 4 scala> pairRDD.reduceByKey((a,b)=>a+b).foreach(println) (Spark,2) (Hive,1) (Hadoop,1)

      

    groupByKey()

    roupByKey()的功能是,對具有相同鍵的值進行分組。比如,對四個鍵值對("spark",1)、("spark",2)、("hadoop",3)和("hadoop",5),采用groupByKey()后得到的結果是:("spark",(1,2))和("hadoop",(3,5))。

    1 2 3 4 5 6 7 scala> pairRDD.groupByKey() res15:?org.apache.spark.rdd.RDD[(String, Iterable[Int])]?=?ShuffledRDD[15] at groupByKey at <console>:34 //從上面執行結果信息中可以看出,分組后,value被保存到Iterable[Int]中 scala> pairRDD.groupByKey().foreach(println) (Spark,CompactBuffer(1,?1)) (Hive,CompactBuffer(1)) (Hadoop,CompactBuffer(1))

      

    keys

    keys只會把鍵值對RDD中的key返回形成一個新的RDD。比如,對四個鍵值對("spark",1)、("spark",2)、("hadoop",3)和("hadoop",5)構成的RDD,采用keys后得到的結果是一個RDD[Int],內容是{"spark","spark","hadoop","hadoop"}。

    1 2 3 4 5 6 7 scala> pairRDD.keys res17:?org.apache.spark.rdd.RDD[String]?=?MapPartitionsRDD[17] at keys at <console>:34 scala> pairRDD.keys.foreach(println) Hadoop Spark Hive Spark

      

    values

    ?values只會把鍵值對RDD中的value返回形成一個新的RDD。比如,對四個鍵值對("spark",1)、("spark",2)、("hadoop",3)和("hadoop",5)構成的RDD,采用keys后得到的結果是一個RDD[Int],內容是{1,2,3,5}。

    1 2 3 4 5 6 7 8 scala> pairRDD.values res0:?org.apache.spark.rdd.RDD[Int]?=?MapPartitionsRDD[2] at values at <console>:34 ?? scala> pairRDD.values.foreach(println) 1 1 1 1

      

    sortByKey()

    ?sortByKey()的功能是返回一個根據鍵排序的RDD。

    1 2 3 4 5 6 7 scala> pairRDD.sortByKey() res0:?org.apache.spark.rdd.RDD[(String, Int)]?=?ShuffledRDD[2] at sortByKey at <console>:34 scala> pairRDD.sortByKey().foreach(println) (Hadoop,1) (Hive,1) (Spark,1) (Spark,1)

      

    mapValues(func)

    我們經常會遇到一種情形,我們只想對鍵值對RDD的value部分進行處理,而不是同時對key和value進行處理。對于這種情形,Spark提供了mapValues(func),它的功能是,對鍵值對RDD中的每個value都應用一個函數,但是,key不會發生變化。比如,對四個鍵值對("spark",1)、("spark",2)、("hadoop",3)和("hadoop",5)構成的pairRDD,如果執行pairRDD.mapValues(x => x+1),就會得到一個新的鍵值對RDD,它包含下面四個鍵值對("spark",2)、("spark",3)、("hadoop",4)和("hadoop",6)。?

    1 2 3 4 5 6 7 scala> pairRDD.mapValues(x?=> x+1) res2:?org.apache.spark.rdd.RDD[(String, Int)]?=?MapPartitionsRDD[4] at mapValues at <console>:34 scala> pairRDD.mapValues(x?=> x+1).foreach(println) (Hadoop,2) (Spark,2) (Hive,2) (Spark,2)

      

    join

    join(連接)操作是鍵值對常用的操作。“連接”(join)這個概念來自于關系數據庫領域,因此,join的類型也和關系數據庫中的join一樣,包括內連接(join)、左外連接(leftOuterJoin)、右外連接(rightOuterJoin)等。最常用的情形是內連接,所以,join就表示內連接。
    對于內連接,對于給定的兩個輸入數據集(K,V1)和(K,V2),只有在兩個數據集中都存在的key才會被輸出,最終得到一個(K,(V1,V2))類型的數據集。

    比如,pairRDD1是一個鍵值對集合{("spark",1)、("spark",2)、("hadoop",3)和("hadoop",5)},pairRDD2是一個鍵值對集合{("spark","fast")},那么,pairRDD1.join(pairRDD2)的結果就是一個新的RDD,這個新的RDD是鍵值對集合{("spark",1,"fast"),("spark",2,"fast")}。對于這個實例,我們下面在spark-shell中運行一下:

    1 2 3 4 5 6 7 8 9 10 11 12 scala>?val?pairRDD1?=?sc.parallelize(Array(("spark",1),("spark",2),("hadoop",3),("hadoop",5))) pairRDD1:?org.apache.spark.rdd.RDD[(String, Int)]?=?ParallelCollectionRDD[24] at parallelize at <console>:27 ?? scala>?val?pairRDD2?=?sc.parallelize(Array(("spark","fast"))) pairRDD2:?org.apache.spark.rdd.RDD[(String, String)]?=?ParallelCollectionRDD[25] at parallelize at <console>:27 ?? scala> pairRDD1.join(pairRDD2) res9:?org.apache.spark.rdd.RDD[(String, (Int, String))]?=?MapPartitionsRDD[28] at join at <console>:32 ?? scala> pairRDD1.join(pairRDD2).foreach(println) (spark,(1,fast)) (spark,(2,fast))

    總結

    以上是生活随笔為你收集整理的Spark 键值对RDD操作的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。