A Summary of Common Spark Operators and Their Usage (continuously updated)
How to start:
spark-shell --master yarn
Import the dependencies:
import org.apache.spark.{SparkConf, SparkContext}
Check the type of an RDD, for example:
numRDD.getClass
Two ways to print an RDD:
numRDD.collect()
numRDD.collect.foreach(println)
#---------------------------------------------------------------------------------------------------------------------------------------
Transformation operators:
| Variable(s) before | Type before | Value before | Operator | What it does | Type after | Value after |
| --- | --- | --- | --- | --- | --- | --- |
| arr | ParallelCollectionRDD | Array(1, 5, 4, 6, 8) | map | maps every element (here: square it) | MapPartitionsRDD | Array(1, 25, 16, 36, 64) |
| numRDD | ParallelCollectionRDD | Array(1, 5, 4, 6, 8) | filter | keeps only the elements matching a predicate | MapPartitionsRDD | Array(4, 6, 8) |
| words | ParallelCollectionRDD | Array("hello python","hello hadoop","hello spark") | flatMap | maps then flattens (here: split into words) | MapPartitionsRDD | Array(hello, python, hello, hadoop, hello, spark) |
| scoreRDD | ParallelCollectionRDD | Array(("class1",95),("class2",85),("class1",75)) | groupByKey | groups the values by key | ShuffledRDD | Array((class2,CompactBuffer(85)), (class1,CompactBuffer(95, 75))) |
| scoreRDD | ParallelCollectionRDD | Array(("class1",95),("class2",85),("class1",75)) | reduceByKey | merges the values of identical keys (here: sum) | ShuffledRDD | Array((class2,85), (class1,170)) |
| arr | ParallelCollectionRDD | Array((7,"aa"),(1,"bb"),(5,"gb")) | sortByKey | sorts by key | ShuffledRDD | depends on ascending or descending order |
| studentList, scoreList | ParallelCollectionRDD | Array((1,leo), (2,jack), (3,tom)) / Array((1,78), (2,87), (3,94)) | join | inner join of two pair RDDs on the key | MapPartitionsRDD | Array((1,(leo,78)), (3,(tom,94)), (2,(jack,87))) |
| rdd1, rdd2 | ParallelCollectionRDD | Array(("aa",1),("bb",2),("cc",6)) / Array(("aa",3),("dd",4),("aa",5)) | cogroup | groups the values of both RDDs that share the same key | Array[(String, (Iterable[Int], Iterable[Int]))] | Array((aa,(CompactBuffer(1),CompactBuffer(5, 3))), (dd,(CompactBuffer(),CompactBuffer(4))), (bb,(CompactBuffer(2),CompactBuffer())), (cc,(CompactBuffer(6),CompactBuffer()))) |
| rdd | ParallelCollectionRDD | Array(3, 1, 90, 3, 5, 12) | sortBy | sorts by a custom sort key | ShuffledRDD | depends on ascending or descending order |
| rdd1, rdd2 | ParallelCollectionRDD | Array(1,5,4,6,8,6) / Array(1,5,2,3,6,8) | intersection | intersection of the two RDDs | MapPartitionsRDD | Array(6, 8, 1, 5) |
| rdd1, rdd2 | ParallelCollectionRDD | Array(1,5,4,6,8,6) / Array(1,5,2,3,6,8) | union | union, duplicates are kept | UnionRDD | Array(1, 5, 4, 6, 8, 6, 1, 5, 2, 3, 6, 8) |
| idAndName, idAndScore | ParallelCollectionRDD | Array(Tuple2(1, "David"), Tuple2(2, "Mary"), Tuple2(3, "Allin")) / Array(Tuple2(1, 98), Tuple2(2, 90)) | leftOuterJoin | keeps every key of the left RDD and fills in values from the right | MapPartitionsRDD | Array((2,(Mary,Some(90))), (1,(David,Some(98))), (3,(Allin,None))) |
| idAndName, idAndScore | ParallelCollectionRDD | Array(Tuple2(1, "David"), Tuple2(2, "Mary"), Tuple2(3, "Allin")) / Array(Tuple2(1, 98), Tuple2(2, 90)) | rightOuterJoin | keeps every key of the right RDD and fills in values from the left | MapPartitionsRDD | Array((2,(Some(Mary),90)), (1,(Some(David),98))) |
| a | ParallelCollectionRDD | Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20) | mapPartitions | applies a custom function to each partition's iterator | MapPartitionsRDD | 3,6,9,12,15,18,21,24,27,30,33,36,39,42,45,48,51,54,57,60 |
| rdd1 | ParallelCollectionRDD | Array(Array(), Array(hello), Array(jason), Array(jim), Array(vin)) | coalesce | redistributes the data over a different number of partitions (here called with shuffle = true) | MapPartitionsRDD | Array(Array(), Array(), Array(jason, jim), Array(hello), Array(vin), Array(), Array(), Array(), Array(), Array()) |
| rdd1 | ParallelCollectionRDD | Array(Array(), Array(hello), Array(jason), Array(jim), Array(vin)) | repartition | redistributes the data over a different number of partitions (always shuffles) | MapPartitionsRDD | Array(Array(), Array(), Array(jason, jim), Array(hello), Array(vin), Array(), Array(), Array(), Array(), Array()) |
| result | UnionRDD | Array(1, 5, 4, 6, 8, 6, 1, 5, 2, 3, 6, 8) | distinct | removes duplicates | MapPartitionsRDD | Array(4, 8, 1, 5, 6, 2, 3) |
| rdd1 | ParallelCollectionRDD | List(1,2,3,4,5,6,7,8,9) | mapPartitionsWithIndex | tags each element with the ID of its partition | MapPartitionsRDD | Array([PartID:0,value=1], [PartID:0,value=2], [PartID:0,value=3], [PartID:0,value=4], [PartID:1,value=5], [PartID:1,value=6], [PartID:1,value=7], [PartID:1,value=8], [PartID:1,value=9]) |
| rdd2 | ParallelCollectionRDD | Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) | aggregate(zeroValue)(seqOp, combOp) | runs seqOp inside each partition, then combOp on the per-partition results | Int | 10334 |
| pairRDD | ParallelCollectionRDD | List(("cat",2),("cat", 5),("mouse", 4),("cat", 12),("dog", 12),("mouse", 2)) | aggregateByKey | like aggregate, but applied per key | Array[(String, Int)] | see the detailed example below |

Usage examples for the operators above:

Example 0
scala> val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9),2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24
scala> def func1(index:Int,iter:Iterator[Int]):Iterator[String]={
     | iter.toList.map(x=>"[PartID:"+index+",value="+x+"]").iterator}
func1: (index: Int, iter: Iterator[Int])Iterator[String]
scala> rdd1.mapPartitionsWithIndex(func1).collect()
res1: Array[String] = Array([PartID:0,value=1], [PartID:0,value=2], [PartID:0,value=3], [PartID:0,value=4], [PartID:1,value=5], [PartID:1,value=6], [PartID:1,value=7], [PartID:1,value=8], [PartID:1,value=9])

Below is the usage of aggregate [4]:
scala> val rdd2 = sc.parallelize(List(1,2,3,4,5),2)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:24
scala> rdd2.mapPartitionsWithIndex(func1).collect
res3: Array[String] = Array([PartID:0,value=1], [PartID:0,value=2], [PartID:1,value=3], [PartID:1,value=4], [PartID:1,value=5])
scala> rdd2.aggregate(0)(math.max(_,_),_+_)
res4: Int = 7
scala> rdd2.aggregate(10)(math.max(_,_),_+_)
res5: Int = 30
What this code means:
partition 0 holds: 1, 2
partition 1 holds: 3, 4, 5
math.max(_,_) is the seqOp: inside each partition, the elements are folded together with the initial value passed to aggregate, keeping the maximum.
So for rdd2.aggregate(0)(math.max(_,_),_+_):
max(partition 0, 0) = 2
max(partition 1, 0) = 5
and the combOp _+_ then adds the initial value and the per-partition results:
0 + 2 + 5 = 7
Likewise, for rdd2.aggregate(10)(math.max(_,_),_+_):
max(partition 0, 10) = 10
max(partition 1, 10) = 10
and the combOp _+_ gives:
10 + 10 + 10 = 30
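
To make the two phases explicit, here is a minimal local sketch of the same computation (assuming the two-partition split shown above; the value names are only illustrative):
val zero  = 0
val part0 = List(1, 2)                                    // contents of partition 0
val part1 = List(3, 4, 5)                                 // contents of partition 1
val max0  = part0.foldLeft(zero)(math.max(_, _))          // seqOp: max(0, 1, 2)    = 2
val max1  = part1.foldLeft(zero)(math.max(_, _))          // seqOp: max(0, 3, 4, 5) = 5
val total = List(max0, max1).foldLeft(zero)(_ + _)        // combOp: 0 + 2 + 5      = 7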

Example 1
scala> val arr=Array(1,5,4,6,8)
scala> val numRDD=sc.parallelize(arr)   // sc is the SparkContext (created automatically by spark-shell)
scala> val resultRDD=numRDD.map(x=>x*x)   // square every element
scala> val resultRDD=numRDD.filter(_%2==0)

Example 2
scala> val words=Array("hello python","hello hadoop","hello spark")
scala> val wordRDD=sc.parallelize(words)
scala> wordRDD.flatMap(_.split(" ")).collect.foreach(println)

Example 3
scala> val scores = Array(("class1",95),("class2",85),("class1",75))
scala> val scoreRDD=sc.parallelize(scores)
scala> scoreRDD.groupByKey.collect.foreach(_._2.foreach(println))
scala> val result=scoreRDD.groupByKey
scala> scoreRDD.reduceByKey(_+_)
res18: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[18] at reduceByKey at <console>:26
scala> scoreRDD.reduceByKey(_+_).collect()

Example 4
scala> val arr=sc.parallelize(Array ((7,"aa"),(1,"bb"),(5,"gb")))
scala> arr.sortByKey(true).collect()
scala> arr.sortByKey(false).collect()

Example 5
scala> val studentList = Array(
     |         Tuple2(1,"leo"),
     |         Tuple2(2,"jack"),
     |         Tuple2(3,"tom"))
scala> val scoreList = Array(
     |         Tuple2(1,78),
     |         Tuple2(2,87),
     |         Tuple2(3,94))
scala> val students = sc.parallelize(studentList, 1)
students: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[7] at parallelize at <console>:26
scala> val scores = sc.parallelize(scoreList, 1)
scores: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[8] at parallelize at <console>:26
scala> val studentScores = students.join(scores)
studentScores: org.apache.spark.rdd.RDD[(Int, (String, Int))] = MapPartitionsRDD[11] at join at <console>:29
scala> studentScores.collect()
res6: Array[(Int, (String, Int))] = Array((1,(leo,78)), (3,(tom,94)), (2,(jack,87)))
scala> studentScores.collect.foreach(studentScore => {
     |       println("student id:" + studentScore._1)
     |       println("student name:" + studentScore._2._1)
     |       println("student score:" + studentScore._2._2)
     |       println("===================================")
     |     })
student id:1
student name:leo
student score:78
===================================
student id:3
student name:tom
student score:94
===================================
student id:2
student name:jack
student score:87
===================================

Example 6
scala> val rdd1 = sc.parallelize(Array(("aa",1),("bb",2),("cc",6)))
scala> val rdd2 = sc.parallelize(Array(("aa",3),("dd",4),("aa",5)))
scala> val rdd3 = rdd1.cogroup(rdd2).collect()
scala> rdd3

Example 7
scala> val data = List(3,1,90,3,5,12)
scala> val rdd = sc.parallelize(data)
scala> rdd.collect
scala> rdd.sortBy(x => x*x+1).collect
res20: Array[Int] = Array(1, 3, 3, 5, 12, 90)
scala> rdd.sortBy(x => -x).collect
res21: Array[Int] = Array(90, 12, 5, 3, 3, 1)

Example 8
scala> val rdd1=sc.parallelize(Array(1,5,4,6,8,6))
scala> val rdd2=sc.parallelize(Array(1,5,2,3,6,8))
scala> val result=rdd1.intersection(rdd2)
scala> result.collect()
scala> val result=rdd1.union(rdd2)
scala> result.collect()
scala> result.distinct.collect()

Example 9
scala> val idAndName = Array(Tuple2(1, "David"), Tuple2(2, "Mary"), Tuple2(3, "Allin"))
scala> val idAndScore = Array(Tuple2(1, 98), Tuple2(2, 90))
scala> val names = sc.parallelize(idAndName)
scala> val scores = sc.parallelize(idAndScore)
scala> val nameAndScore = names.leftOuterJoin(scores)
scala> nameAndScore.collect.foreach(println)
(2,(Mary,Some(90)))
(1,(David,Some(98)))
(3,(Allin,None))
scala> val nameAndScore = names.rightOuterJoin(scores)
scala> nameAndScore.collect()
res38: Array[(Int, (Option[String], Int))] = Array((2,(Some(Mary),90)), (1,(Some(David),98)))

Example 10
scala> val a = sc.parallelize(1 to 20, 5)
scala> class CustomIterator(iter: Iterator[Int]) extends Iterator[Int] {
         def hasNext: Boolean = iter.hasNext
         def next: Int = { val cur = iter.next; cur * 3 }
       }
defined class CustomIterator
scala> val result = a.mapPartitions(v => new CustomIterator(v))
scala> println(result.collect().mkString(","))
3,6,9,12,15,18,21,24,27,30,33,36,39,42,45,48,51,54,57,60
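
For comparison, the same per-partition transformation can be written without a custom iterator by mapping over each partition's iterator directly (result2 is just an illustrative name):
scala> val result2 = a.mapPartitions(iter => iter.map(_ * 3))   // lazily multiplies each element by 3, same output as above
scala> println(result2.collect().mkString(","))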

Example 11
scala> val rdd1 = sc.parallelize(List("hello","jason","jim","vin"),5)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[33] at parallelize at <console>:24
scala> val rdd3 = rdd1.repartition(10)
rdd3: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[37] at repartition at <console>:25
scala> val rdd4 = rdd1.coalesce(10,true)
rdd4: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[41] at coalesce at <console>:25
scala> rdd1.glom().collect()
res11: Array[Array[String]] = Array(Array(), Array(hello), Array(jason), Array(jim), Array(vin))
scala> rdd3.glom().collect()
res12: Array[Array[String]] = Array(Array(), Array(), Array(jason, jim), Array(hello), Array(vin), Array(), Array(), Array(), Array(), Array())
scala> rdd4.glom().collect()
res13: Array[Array[String]] = Array(Array(), Array(), Array(jason, jim), Array(hello), Array(vin), Array(), Array(), Array(), Array(), Array())
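
A short note on the difference, with a minimal sketch: repartition(n) is simply coalesce(n, shuffle = true), so it always shuffles; coalesce with the default shuffle = false can only merge existing partitions and therefore cannot increase their number.
scala> rdd1.coalesce(2).getNumPartitions       // no shuffle: the 5 partitions are merged down to 2
scala> rdd1.coalesce(10).getNumPartitions      // no shuffle: the count cannot grow, stays at 5
scala> rdd1.repartition(10).getNumPartitions   // same as coalesce(10, true): 10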

Action operators:
| Variable(s) before | Type before | Value before | Operator | What it does | Type after | Value after |
| --- | --- | --- | --- | --- | --- | --- |
| rdd1 | ParallelCollectionRDD | Array(1,23,4,6) | reduce | aggregates all elements (here: sum) | Int | 34 |
| rdd1 | ParallelCollectionRDD | Array(1,23,4,6) | collect | brings the RDD's elements back to the driver | Array[Int] | Array(1,23,4,6) |
| rdd1 | ParallelCollectionRDD | Array(1,23,4,6) | count | counts the elements of the RDD | Long | 4 |
| rdd1 | ParallelCollectionRDD | List("Gnu", "Cat", "Rat", "Dog") | first | returns the first element of the RDD | String | Gnu |
| rdd1 | ParallelCollectionRDD | Seq(10, 4, 2, 12, 3) | take(num) | returns the elements at indices 0 to num-1 | Array[Int] | Array(10, 4) |
| see details below | see details below | see details below | takeSample | see details below | see details below | see details below |
| rdd1 | ParallelCollectionRDD | Seq(10, 4, 2, 12, 3) | takeOrdered | returns the num smallest elements, in ascending order | Array[Int] | Array(2, 3) |
| rdd1 | ParallelCollectionRDD | Seq(10, 4, 2, 12, 3) | top | returns the num largest elements, in descending order | Array[Int] | Array(12, 10) |
| rdd1 | ParallelCollectionRDD | Seq(10, 4, 2, 12, 3) | saveAsTextFile | writes the data to a text file | Unit | the file contains 10, 4, 2, 12, 3 |
| rdd1 | ParallelCollectionRDD | Array(1, 2, 3, 4) | foreach | applies a function to every element (here: println(x+4)) | Unit | prints 5, 6, 7, 8 |
| rdd2 | ParallelCollectionRDD | Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12) | foreachPartition | applies a function once per partition (here: println of the partition size) | Unit | prints 4, 4, 4 (with 3 partitions) |
| rdd1 | ParallelCollectionRDD | Array((1,1), (2,1), (3,1), (4,1), (5,1), (6,1), (1,1), (2,1), (3,1), (4,1), (5,1), (6,1)) | countByValue | counts how often each value (here each key->value pair) occurs | scala.collection.Map[(Int, Int),Long] | Map((5,1) -> 2, (3,1) -> 2, (6,1) -> 2, (4,1) -> 2, (1,1) -> 2, (2,1) -> 2) |
| rdd1 | ParallelCollectionRDD | Array(1,2,3,4,5,6,1,2,3,4,5,6) | countByKey | counts the elements per key and returns a key -> count Map | scala.collection.Map[Int,Long] | Map(5 -> 2, 1 -> 2, 6 -> 2, 2 -> 2, 3 -> 2, 4 -> 2) |
| rdd1 | ParallelCollectionRDD | Seq(10, 4, 2, 12, 3) | saveAsObjectFile | writes the data to a file of serialized objects | Unit | the file contains 10, 4, 2, 12, 3 |
| rdd1 | ParallelCollectionRDD | Seq(10, 4, 2, 12, 3) | saveAsSequenceFile | writes the data to a SequenceFile | Unit | the file contains 10, 4, 2, 12, 3 |
| a, b | ParallelCollectionRDD | List(2,3,4,5) / List("a","b","c","d") | collectAsMap | turns a pair RDD (here: a zip b) into a Map | scala.collection.Map[Int,String] | Map(2 -> a, 5 -> d, 4 -> c, 3 -> b) |
| rdd1 | ParallelCollectionRDD | List((1,"a"),(4,"he"),(2,"b"),(3,"c")) | lookup | returns all values for a given key | Seq[String] | WrappedArray(he) |
#--------------------------------------------------Appendix--------------------------------------------------------------------------------------
Example 1:
scala> val rdd1 = sc.makeRDD(Array(1,23,4,6))
scala> val result = rdd1.reduce(_+_)
scala> println(rdd1)
scala> rdd1.collect()
scala> val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog"), 2)   // note: the 2 here is the number of partitions
scala> c.getNumPartitions
scala> c.count
scala> var rdd1 = sc.makeRDD(Seq(10, 4, 2, 12, 3))
scala> rdd1.take(2)

Usage of takeSample:
| With replacement | Sample size num vs. RDD size | Code | Result |
| --- | --- | --- | --- |
| No | num > RDD size | val a = sc.parallelize(1 to 10); a.takeSample(false,11) | Array(4, 2, 5, 6, 7, 9, 3, 1, 8, 10) |
| No | num < RDD size | a.takeSample(false,5) | Array(10, 6, 9, 5, 1) |
| Yes | num > RDD size | a.takeSample(true,15) | Array(5, 1, 4, 9, 2, 3, 6, 6, 10, 8, 7, 9, 7, 8, 9) |
| Yes | num < RDD size | a.takeSample(true,3) | Array(7, 4, 6) |
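
For reference, takeSample(withReplacement, num, seed) is an action that returns an Array to the driver (the seed argument is optional); when sampling without replacement it can return at most as many elements as the RDD contains:
scala> val a = sc.parallelize(1 to 10)
scala> a.takeSample(false, 11)   // num > RDD size without replacement: returns all 10 elements in random order
scala> a.takeSample(true, 15)    // with replacement: duplicates allowed, so num may exceed the RDD size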
scala> var rdd1 = sc.makeRDD(Seq(10, 4, 2, 12, 3))
scala> rdd1.top(2)
res15: Array[Int] = Array(12, 10)
scala> rdd1.takeOrdered(2)
res16: Array[Int] = Array(2, 3)

[1] foreachPartition is also an action-style operator and can greatly improve efficiency. For example, if you use foreach to write every record of an RDD into MongoDB, the records are written one by one, and each function call may open its own database connection; connections are then created and destroyed constantly and performance is very poor. With foreachPartition, a whole partition is handled in one call: you open a single connection per partition and perform a batched insert, which is much faster.
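
A minimal sketch of that pattern; openConnection and batchInsert are hypothetical placeholders for whatever database client is actually used:
rdd.foreachPartition { partition =>
  val conn = openConnection()             // hypothetical helper: one connection per partition, not per record
  try {
    batchInsert(conn, partition.toSeq)    // hypothetical helper: a single batched write for the whole partition
  } finally {
    conn.close()                          // release the connection once the partition has been written
  }
}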
scala> var rdd1=sc.parallelize(List((1,"a"),(4,"he"),(2,"b"),(3,"c")))
scala> rdd1.lookup(4)
res28: Seq[String] = WrappedArray(he)
Example [2]
val a = sc.parallelize(List(2,3,4,5))
val b = sc.parallelize(List("a","b","c","d"))
a.zip(b).collectAsMap   // result: scala.collection.Map[Int,String] = Map(2 -> a, 5 -> d, 4 -> c, 3 -> b)
[3]
scala> ?val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,1,2,3,4,5,6))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[34] at parallelize at <console>:29
scala> ?val map = rdd1.map((_,1)).countByKey()
map: scala.collection.Map[Int,Long] = Map(5 -> 2, 1 -> 2, 6 -> 2, 2 -> 2, 3 -> 2, 4 -> 2)
scala> ?val map = rdd1.map((_,1)).countByValue()
map: scala.collection.Map[(Int, Int),Long] = Map((5,1) -> 2, (3,1) -> 2, (6,1) -> 2, (4,1) -> 2, (1,1) -> 2, (2,1) -> 2)

[5]
scala> val rdd2 = sc.makeRDD(1 to 10, 3)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at makeRDD at <console>:24
scala> rdd2.getNumPartitions
res26: Int = 3
This defines an RDD with 3 partitions. The two function arguments of aggregate are again the pfun1 and pfun2 defined in the source article [5] (not reproduced here).
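Judging from the arithmetic below, pfun1 multiplies within a partition and pfun2 adds across partitions, so their assumed definitions are something like:
scala> def pfun1(x: Int, y: Int): Int = x * y   // seqOp: product within each partition (assumed)
scala> def pfun2(x: Int, y: Int): Int = x + y   // combOp: sum of the per-partition results (assumed)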
scala> rdd2.aggregate(2)(pfun1, pfun2)
res29: Int = 10334
Inside each partition, pfun1 starts from the zero value 2 and multiplies the elements:
2 * 1 * 2 * 3       = 12
2 * 4 * 5 * 6       = 240
2 * 7 * 8 * 9 * 10  = 10080
pfun1 therefore produces 3 values, one per partition. Then the combOp, i.e. pfun2, is applied, again starting from the zero value:
2 + 12 + 240 + 10080 = 10334
In short: the zero value of aggregate is fed into pfun1 inside every partition, and the per-partition results of pfun1 are then passed (together with the zero value once more) to pfun2.
scala> val pairRDD = sc.parallelize(List(("cat",2),("cat", 5),("mouse", 4),("cat", 12),("dog", 12),("mouse", 2)), 2)
pairRDD: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[10] at parallelize at <console>:24
scala> def func3(index:Int,iter:Iterator[(String,Int)]) = {
     |   iter.toList.map(x =>"[PartID:"+index+",value="+x+"]").iterator
     | }
scala> pairRDD.mapPartitionsWithIndex(func3).collect
scala> pairRDD.aggregateByKey(0)(math.max(_,_),_+_).collect
res15: Array[(String, Int)] = Array((dog,12), (cat,17), (mouse,6))
scala> pairRDD.aggregateByKey(0)(_+_,_+_).collect
res16: Array[(String, Int)] = Array((dog,12), (cat,19), (mouse,6))

Below is the usage of aggregateByKey:
[4] It is similar to aggregate; the difference is that it operates on <Key, Value> data and applies the two functions per key.
API: PairRDDFunctions.aggregateByKey
def aggregateByKey[U](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U)

Prepare the data:
val pairRDD = sc.parallelize(List(("cat",2),("cat", 5),("mouse", 4),("cat", 12),("dog", 12),("mouse", 2)), 2)

Reuse func3 to inspect the elements of each partition:
def func3(index:Int,iter:Iterator[(String,Int)]) = {
    iter.toList.map(x =>"[PartID:"+index+",value="+x+"]").iterator
}

pairRDD.mapPartitionsWithIndex(func3).collect

Result:
Partition 0 ("zoo 0"):
[PartID:0,value=(cat,2)], [PartID:0,value=(cat,5)], [PartID:0,value=(mouse,4)]

Partition 1 ("zoo 1"):
[PartID:1,value=(cat,12)], [PartID:1,value=(dog,12)], [PartID:1,value=(mouse,2)]

Operations:
(1) For each animal, take the largest count within each zoo (partition), then add those maxima across zoos:
        pairRDD.aggregateByKey(0)(math.max(_,_),_+_).collect
        Result:
        Array((dog,12), (cat,17), (mouse,6))

(2) Sum all the animals per key:
        pairRDD.aggregateByKey(0)(_+_,_+_).collect
        Result:
        Array((dog,12), (cat,19), (mouse,6))

        The same result can also be obtained with reduceByKey, as shown below.
        Result: Array((dog,12), (cat,19), (mouse,6))
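
The corresponding reduceByKey call (a one-line sketch) is:
scala> pairRDD.reduceByKey(_ + _).collect   // Array((dog,12), (cat,19), (mouse,6))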

Usage of foreach and foreachPartition [6]:
scala> var rdd1=sc.makeRDD(1 to 4,2)
scala> rdd1.foreach{x=>println(x+4)}
scala> var rdd2=sc.makeRDD(1 to 12,3)
scala> rdd2.foreachPartition{x=>println("--------------")}
scala> rdd2.foreachPartition{x=>println(x.size)}
The output of this code has to be observed in the Spark web UI: the println calls run on the executors, so in yarn mode nothing appears in the driver console; see [7].

References:
[1] spark foreachPartition foreach
[2] Spark API 之 collectAsMap 使用
[3] Spark行動算子之countByKey
[4] Spark高級算子:mapPartitionsWithIndex,aggregate,aggregateByKey
[5] 輕松理解 Spark 的 aggregate 方法
[6] foreach和foreachPartition--Action類算子
[7] spark的foreach(println)看不到輸出結果