
A summary of the usage of Spark's various key operators (continuously updated)


How to launch:

spark-shell --master yarn

Required import:

import org.apache.spark.{SparkConf, SparkContext}
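
In spark-shell the SparkContext is already available as sc, so the import above mainly matters for a standalone program. A minimal sketch of that setup (the application name below is just a placeholder; the master is normally supplied on the command line as shown above):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("rdd-operator-demo")   // placeholder app name
val sc = new SparkContext(conf)                              // in spark-shell, sc already exists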

Checking an RDD's type, for example:

numRDD.getClass

Two ways to print an RDD's contents:

numRDD.collect()

numRDD.collect.foreach(println)

#---------------------------------------------------------------------------------------------------------------------------------------

Transformation operators:

Variable(s) before | Type before | Value before | Operator | What it does | Type after | Value after
arr | ParallelCollectionRDD | Array(1, 5, 4, 6, 8) | map | applies a function to every element | MapPartitionsRDD | Array(1, 25, 16, 36, 64)
numRDD | ParallelCollectionRDD | Array(1, 5, 4, 6, 8) | filter | keeps only the elements matching a predicate | MapPartitionsRDD | Array(4, 6, 8)
words | ParallelCollectionRDD | Array("hello python","hello hadoop","hello spark") | flatMap | maps each element and flattens the results | MapPartitionsRDD | Array(hello, python, hello, hadoop, hello, spark)
scoreRDD | ParallelCollectionRDD | Array(("class1",95),("class2",85),("class1",75)) | groupByKey | groups the values by key | ShuffledRDD | Array((class2,CompactBuffer(85)), (class1,CompactBuffer(95, 75)))
scoreRDD | ParallelCollectionRDD | Array(("class1",95),("class2",85),("class1",75)) | reduceByKey | merges the values of each key (here, by summing) | ShuffledRDD | Array((class2,85), (class1,170))
arr | ParallelCollectionRDD | Array((7,"aa"),(1,"bb"),(5,"gb")) | sortByKey | sorts by key | ShuffledRDD | depends on ascending or descending order
studentList, scoreList | ParallelCollectionRDD | Array((1,leo), (2,jack), (3,tom)) and Array((1,78), (2,87), (3,94)) | join | joins two pair RDDs on their keys | MapPartitionsRDD | Array((1,(leo,78)), (3,(tom,94)), (2,(jack,87)))
rdd1, rdd2 | ParallelCollectionRDD | Array(("aa",1),("bb",2),("cc",6)) and Array(("aa",3),("dd",4),("aa",5)) | cogroup | groups the values that share a key across both RDDs | Array[(String, (Iterable[Int], Iterable[Int]))] after collect | Array((aa,(CompactBuffer(1),CompactBuffer(5, 3))), (dd,(CompactBuffer(),CompactBuffer(4))), (bb,(CompactBuffer(2),CompactBuffer())), (cc,(CompactBuffer(6),CompactBuffer())))
rdd | ParallelCollectionRDD | Array(3, 1, 90, 3, 5, 12) | sortBy | sorts by a user-supplied rule | ShuffledRDD | depends on ascending or descending order
rdd1, rdd2 | ParallelCollectionRDD | Array(1,5,4,6,8,6) and Array(1,5,2,3,6,8) | intersection | intersection of the two RDDs | MapPartitionsRDD | Array(6, 8, 1, 5)
rdd1, rdd2 | ParallelCollectionRDD | Array(1,5,4,6,8,6) and Array(1,5,2,3,6,8) | union | union of the two RDDs, without deduplication | UnionRDD | Array(1, 5, 4, 6, 8, 6, 1, 5, 2, 3, 6, 8)
idAndName, idAndScore | ParallelCollectionRDD | Array(Tuple2(1, "David"), Tuple2(2, "Mary"), Tuple2(3, "Allin")) and Array(Tuple2(1, 98), Tuple2(2, 90)) | leftOuterJoin | keeps every key of the left RDD, filling in values from the right where they exist | MapPartitionsRDD | Array((2,(Mary,Some(90))), (1,(David,Some(98))), (3,(Allin,None)))
idAndName, idAndScore | ParallelCollectionRDD | Array(Tuple2(1, "David"), Tuple2(2, "Mary"), Tuple2(3, "Allin")) and Array(Tuple2(1, 98), Tuple2(2, 90)) | rightOuterJoin | keeps every key of the right RDD, filling in values from the left where they exist | MapPartitionsRDD | Array((2,(Some(Mary),90)), (1,(Some(David),98)))
a | ParallelCollectionRDD | Array(1, 2, 3, ..., 20) | mapPartitions | applies a custom function to each partition as a whole | MapPartitionsRDD | 3,6,9,12,15,18,21,24,27,30,33,36,39,42,45,48,51,54,57,60
rdd1 | ParallelCollectionRDD | Array(Array(), Array(hello), Array(jason), Array(jim), Array(vin)) (one sub-array per partition) | coalesce | redistributes the data over a different number of partitions | MapPartitionsRDD | Array(Array(), Array(), Array(jason, jim), Array(hello), Array(vin), Array(), Array(), Array(), Array(), Array())
rdd1 | ParallelCollectionRDD | Array(Array(), Array(hello), Array(jason), Array(jim), Array(vin)) (one sub-array per partition) | repartition | redistributes the data over a different number of partitions | MapPartitionsRDD | Array(Array(), Array(), Array(jason, jim), Array(hello), Array(vin), Array(), Array(), Array(), Array(), Array())
result | UnionRDD | Array(1, 5, 4, 6, 8, 6, 1, 5, 2, 3, 6, 8) | distinct | removes duplicates | MapPartitionsRDD | Array(4, 8, 1, 5, 6, 2, 3)
rdd1 | ParallelCollectionRDD | List(1,2,3,4,5,6,7,8,9) | mapPartitionsWithIndex | pairs the partition ID with each element of that partition | MapPartitionsRDD | Array([PartID:0,value=1], [PartID:0,value=2], [PartID:0,value=3], [PartID:0,value=4], [PartID:1,value=5], [PartID:1,value=6], [PartID:1,value=7], [PartID:1,value=8], [PartID:1,value=9])
rdd2 | ParallelCollectionRDD | Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) | aggregate(num)(pfunc1,pfunc2) | folds each partition with pfunc1, then combines the partial results with pfunc2 | Int | 10334
pairRDD | ParallelCollectionRDD | List(("cat",2),("cat", 5),("mouse", 4),("cat", 12),("dog", 12),("mouse", 2)) | aggregateByKey | per key, folds the values inside each partition with the first function, then combines across partitions with the second | RDD[(String, Int)] | see the aggregateByKey example below

Usage examples for the operators above:

Example 0 (mapPartitionsWithIndex):

scala> val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9),2)

rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24

scala> def func1(index:Int,iter:Iterator[Int]):Iterator[String]={
     | iter.toList.map(x=>"[PartID:"+index+",value="+x+"]").iterator}
func1: (index: Int, iter: Iterator[Int])Iterator[String]

scala> rdd1.mapPartitionsWithIndex(func1).collect()
res1: Array[String] = Array([PartID:0,value=1], [PartID:0,value=2], [PartID:0,value=3], [PartID:0,value=4], [PartID:1,value=5], [PartID:1,value=6], [PartID:1,value=7], [PartID:1,value=8], [PartID:1,value=9])


Below is the usage of aggregate [4]:

scala> val rdd2 = sc.parallelize(List(1,2,3,4,5),2)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:24

scala> rdd2.mapPartitionsWithIndex(func1).collect
res3: Array[String] = Array([PartID:0,value=1], [PartID:0,value=2], [PartID:1,value=3], [PartID:1,value=4], [PartID:1,value=5])

scala> rdd2.aggregate(0)(math.max(_,_),_+_)
res4: Int = 7

scala> rdd2.aggregate(10)(math.max(_,_),_+_)
res5: Int = 30

What this code means:

Partition 0 holds: 1, 2

Partition 1 holds: 3, 4, 5

math.max(_,_) is the per-partition function: each partition is folded starting from the zero value num given to aggregate(num), keeping the maximum at every step.

So for rdd2.aggregate(0)(math.max(_,_),_+_):

max(partition0, 0) = 2

max(partition1, 0) = 5

and the combining function _+_ (which also starts from the zero value) gives:

2 + 5 + 0 = 7

Likewise, for rdd2.aggregate(10)(math.max(_,_),_+_):

max(partition0, 10) = 10

max(partition1, 10) = 10

and the combining function _+_ gives:

10 + 10 + 10 = 30


Example 1 (map, filter):

scala> val arr=Array(1,5,4,6,8)

scala> val numRDD=sc.parallelize(arr)    // sc is the SparkContext (created for you by spark-shell)

scala> val resultRDD=numRDD.map(x=>x*x)    // square every element

scala> val resultRDD=numRDD.filter(_%2==0)    // keep only the even elements


Example 2 (flatMap):

scala> val words=Array("hello python","hello hadoop","hello spark")

scala> val wordRDD=sc.parallelize(words)

scala> wordRDD.flatMap(_.split(" ")).collect.foreach(println)


Example 3 (groupByKey, reduceByKey):

scala> val scores = Array(("class1",95),("class2",85),("class1",75))

scala> val scoreRDD=sc.parallelize(scores)

scala> scoreRDD.groupByKey.collect.foreach(_._2.foreach(println))

scala> val result=scoreRDD.groupByKey

scala> scoreRDD.reduceByKey(_+_)
res18: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[18] at reduceByKey at <console>:26

scala> scoreRDD.reduceByKey(_+_).collect()


Example 4 (sortByKey):

scala> val arr=sc.parallelize(Array ((7,"aa"),(1,"bb"),(5,"gb")))

scala> arr.sortByKey(true).collect()    // ascending by key

scala> arr.sortByKey(false).collect()   // descending by key


Example 5 (join):

scala> val studentList = Array(
     |         Tuple2(1,"leo"),
     |         Tuple2(2,"jack"),
     |         Tuple2(3,"tom"))

scala> val scoreList = Array(
     |         Tuple2(1,78),
     |         Tuple2(2,87),
     |         Tuple2(3,94))

scala> val students = sc.parallelize(studentList, 1)
students: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[7] at parallelize at <console>:26

scala> val scores = sc.parallelize(scoreList, 1)
scores: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[8] at parallelize at <console>:26

scala> val studentScores = students.join(scores)
studentScores: org.apache.spark.rdd.RDD[(Int, (String, Int))] = MapPartitionsRDD[11] at join at <console>:29

scala> studentScores.collect()
res6: Array[(Int, (String, Int))] = Array((1,(leo,78)), (3,(tom,94)), (2,(jack,87)))

scala> studentScores.collect.foreach(studentScore => {
     |       println("student id:" + studentScore._1);
     |       println("student name:" + studentScore._2._1)
     |       println("student score:" + studentScore._2._2)
     |       println("===================================")
     |     })
student id:1
student name:leo
student score:78
===================================
student id:3
student name:tom
student score:94
===================================
student id:2
student name:jack
student score:87
===================================


Example 6 (cogroup):

scala> val rdd1 = sc.parallelize(Array(("aa",1),("bb",2),("cc",6)))

scala> val rdd2 = sc.parallelize(Array(("aa",3),("dd",4),("aa",5)))

scala> val rdd3 = rdd1.cogroup(rdd2).collect()

scala> rdd3


Example 7 (sortBy):

scala> val data = List(3,1,90,3,5,12)

scala> val rdd = sc.parallelize(data)

scala> rdd.collect

scala> rdd.sortBy(x => x*x+1).collect
res20: Array[Int] = Array(1, 3, 3, 5, 12, 90)

scala> rdd.sortBy(x => -x).collect
res21: Array[Int] = Array(90, 12, 5, 3, 3, 1)


Example 8 (intersection, union, distinct):

scala> val rdd1=sc.parallelize(Array(1,5,4,6,8,6))

scala> val rdd2=sc.parallelize(Array(1,5,2,3,6,8))

scala> val result=rdd1.intersection(rdd2)

scala> result.collect()

scala> val result=rdd1.union(rdd2)

scala> result.collect()

scala> result.distinct.collect()


Example 9 (leftOuterJoin, rightOuterJoin):

scala> val idAndName = Array(Tuple2(1, "David"), Tuple2(2, "Mary"), Tuple2(3, "Allin"))

scala> val idAndScore = Array(Tuple2(1, 98), Tuple2(2, 90))

scala> val names = sc.parallelize(idAndName)

scala> val scores = sc.parallelize(idAndScore)

scala> val nameAndScore = names.leftOuterJoin(scores)

scala> nameAndScore.collect.foreach(println)
(2,(Mary,Some(90)))
(1,(David,Some(98)))
(3,(Allin,None))

scala> val nameAndScore = names.rightOuterJoin(scores)

scala> nameAndScore.collect()
res38: Array[(Int, (Option[String], Int))] = Array((2,(Some(Mary),90)), (1,(Some(David),98)))


Example 10 (mapPartitions):

scala> val a = sc.parallelize(1 to 20, 5)

scala> class CustomIterator(iter: Iterator[Int]) extends Iterator[Int] {
     |   def hasNext: Boolean = iter.hasNext
     |   def next: Int = {
     |     val cur = iter.next
     |     cur * 3        // triple every element of the partition
     |   }
     | }
defined class CustomIterator

scala> val result = a.mapPartitions(v => new CustomIterator(v))

scala> println(result.collect().mkString(","))

3,6,9,12,15,18,21,24,27,30,33,36,39,42,45,48,51,54,57,60


Example 11 (repartition, coalesce, glom):

scala> val rdd1 = sc.parallelize(List("hello","jason","jim","vin"),5)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[33] at parallelize at <console>:24

scala> val rdd3 = rdd1.repartition(10)
rdd3: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[37] at repartition at <console>:25

scala> val rdd4 = rdd1.coalesce(10,true)
rdd4: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[41] at coalesce at <console>:25

scala> rdd1.glom().collect()
res11: Array[Array[String]] = Array(Array(), Array(hello), Array(jason), Array(jim), Array(vin))

scala> rdd3.glom().collect()
res12: Array[Array[String]] = Array(Array(), Array(), Array(jason, jim), Array(hello), Array(vin), Array(), Array(), Array(), Array(), Array())

scala> rdd4.glom().collect()
res13: Array[Array[String]] = Array(Array(), Array(), Array(jason, jim), Array(hello), Array(vin), Array(), Array(), Array(), Array(), Array())


Action operators:

Variable(s) before | Type before | Value before | Operator | What it does | Type after | Value after
rdd1 | ParallelCollectionRDD | Array(1,23,4,6) | reduce | combines all elements (here, by summing) | Int | 34
rdd1 | ParallelCollectionRDD | Array(1,23,4,6) | collect | brings the RDD's elements back to the driver | Array[Int] | Array(1,23,4,6)
rdd1 | ParallelCollectionRDD | Array(1,23,4,6) | count | counts the elements in the RDD | Long | 4
rdd1 | ParallelCollectionRDD | List("Gnu", "Cat", "Rat", "Dog") | first | returns the first element | String | Gnu
rdd1 | ParallelCollectionRDD | Seq(10, 4, 2, 12, 3) | take(num) | returns the elements at positions 0 to num-1 | Array[Int] | Array(10, 4)
see details below | see details below | see details below | takeSample | see details below | see details below | see details below
rdd1 | ParallelCollectionRDD | Seq(10, 4, 2, 12, 3) | takeOrdered | returns the n smallest elements, in ascending order | Array[Int] | Array(2, 3)
rdd1 | ParallelCollectionRDD | Seq(10, 4, 2, 12, 3) | top | returns the n largest elements, in descending order | Array[Int] | Array(12, 10)
rdd1 | ParallelCollectionRDD | Seq(10, 4, 2, 12, 3) | saveAsTextFile | writes the data out as text files | Unit | the elements are written to the given path
rdd1 | ParallelCollectionRDD | Array(1, 2, 3, 4) | foreach | runs a function on every element | Unit | prints 5,6,7,8 (the function here prints x+4)
rdd2 | ParallelCollectionRDD | Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12) | foreachPartition | runs a function once per partition | Unit | prints 4,4,4 (the size of each of the 3 partitions)
rdd1 | ParallelCollectionRDD | Array((1,1), (2,1), (3,1), (4,1), (5,1), (6,1), (1,1), (2,1), (3,1), (4,1), (5,1), (6,1)) | countByValue | counts how many times each (key,value) pair occurs | scala.collection.Map[(Int, Int),Long] | Map((5,1) -> 2, (3,1) -> 2, (6,1) -> 2, (4,1) -> 2, (1,1) -> 2, (2,1) -> 2)
rdd1 | ParallelCollectionRDD | Array(1,2,3,4,5,6,1,2,3,4,5,6) (mapped to (x,1) pairs) | countByKey | counts how many times each key occurs and returns the counts as a map | scala.collection.Map[Int,Long] | Map(5 -> 2, 1 -> 2, 6 -> 2, 2 -> 2, 3 -> 2, 4 -> 2)
rdd1 | ParallelCollectionRDD | Seq(10, 4, 2, 12, 3) | saveAsObjectFile | writes the data out as a serialized object file | Unit | the elements are written to the given path
rdd1 | ParallelCollectionRDD | Seq(10, 4, 2, 12, 3) | saveAsSequenceFile | writes a key-value RDD out as a Hadoop SequenceFile | Unit | the elements are written to the given path
a, b | ParallelCollectionRDD | List(2,3,4,5) and List("a","b","c","d") | collectAsMap (after zip) | turns the zipped pair RDD into a local map | scala.collection.Map[Int,String] | Map(2 -> a, 5 -> d, 4 -> c, 3 -> b)
rdd1 | ParallelCollectionRDD | List((1,"a"),(4,"he"),(2,"b"),(3,"c")) | lookup | returns all values for a given key | Seq[String] | WrappedArray(he) for lookup(4)

#--------------------------------------------------Appendix-------------------------------------------------------------------------------------

Example 1 (reduce, collect, count, take):

scala> val rdd1 = sc.makeRDD(Array(1,23,4,6))

scala> val result = rdd1.reduce(_+_)

scala> println(rdd1)

scala> rdd1.collect()

scala> val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog"), 2)   // note: the 2 here is the number of partitions

scala> c.getNumPartitions

scala> c.count

scala> var rdd1 = sc.makeRDD(Seq(10, 4, 2, 12, 3))

scala> rdd1.take(2)

Usage of takeSample:

With replacement? | Sample size num vs. RDD size | Code | Result
no | num > RDD size | val a = sc.parallelize(1 to 10); a.takeSample(false,11) | Array(4, 2, 5, 6, 7, 9, 3, 1, 8, 10)
no | num < RDD size | a.takeSample(false,5) | Array(10, 6, 9, 5, 1)
yes | num > RDD size | a.takeSample(true,15) | Array(5, 1, 4, 9, 2, 3, 6, 6, 10, 8, 7, 9, 7, 8, 9)
yes | num < RDD size | a.takeSample(true,3) | Array(7, 4, 6)

scala> var rdd1 = sc.makeRDD(Seq(10, 4, 2, 12, 3))

scala> rdd1.top(2)
res15: Array[Int] = Array(12, 10)

scala> rdd1.takeOrdered(2)
res16: Array[Int] = Array(2, 3)
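
The three save* actions listed in the table above have no example in this appendix; here is a minimal sketch of how they are typically called (the output paths are just placeholders):

scala> val rdd1 = sc.makeRDD(Seq(10, 4, 2, 12, 3))

scala> rdd1.saveAsTextFile("/tmp/rdd1_text")                        // one text part-file per partition
scala> rdd1.saveAsObjectFile("/tmp/rdd1_obj")                       // serialized objects in a SequenceFile
scala> rdd1.map(x => (x, 1)).saveAsSequenceFile("/tmp/rdd1_seq")    // saveAsSequenceFile needs a key-value RDD
scala> sc.textFile("/tmp/rdd1_text").collect()                      // read the text output back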

[1] foreachPartition is an action-style operator that can make this kind of work much more efficient. For example, if you use foreach to write all the data in an RDD to MongoDB, the records are written one at a time, and each function call may open its own database connection; connections are then constantly created and destroyed and performance is very poor. If you instead use foreachPartition to process one whole partition per call, you only need to open a single database connection per partition and can then do a batch insert, which performs much better.
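
A rough sketch of that pattern (DbClient, open and insertBatch are hypothetical placeholders for whatever database driver you actually use, such as a Mongo client; they are not a real API):

rdd.foreachPartition { records =>
  val conn = DbClient.open()           // one connection per partition, not per record
  try {
    conn.insertBatch(records.toSeq)    // write the whole partition in one batch
  } finally {
    conn.close()                       // always release the connection
  }
}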


scala> var rdd1=sc.parallelize(List((1,"a"),(4,"he"),(2,"b"),(3,"c")))

scala> rdd1.lookup(4)
res28: Seq[String] = WrappedArray(he)


Example [2] (collectAsMap):

val a = sc.parallelize(List(2,3,4,5))

val b = sc.parallelize(List("a","b","c","d"))

a.zip(b).collectAsMap

The result is: scala.collection.Map[Int,String] = Map(2 -> a, 5 -> d, 4 -> c, 3 -> b)


[3]

scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,1,2,3,4,5,6))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[34] at parallelize at <console>:29

scala> val map = rdd1.map((_,1)).countByKey()
map: scala.collection.Map[Int,Long] = Map(5 -> 2, 1 -> 2, 6 -> 2, 2 -> 2, 3 -> 2, 4 -> 2)

scala> val map = rdd1.map((_,1)).countByValue()
map: scala.collection.Map[(Int, Int),Long] = Map((5,1) -> 2, (3,1) -> 2, (6,1) -> 2, (4,1) -> 2, (1,1) -> 2, (2,1) -> 2)


[5]
scala> val rdd2 = sc.makeRDD(1 to 10, 3)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at makeRDD at <console>:24

scala> rdd2.getNumPartitions
res26: Int = 3

這里定義了一個擁有 3?個分片的 RDD 。然后 aggregate?的兩個函數參數仍然是使用上面定義的 pfun1?與 pfun2 。

scala> rdd2.aggregate(2)(pfun1, pfun2)
res29: Int = 10334

2 * 1 * 2 * 3       = 12
2 * 4 * 5 * 6       = 240
2 * 7 * 8 * 9 * 10  = 10080

在這里 pfun1 的輸出結果有 3 個值。然后就來應用 combOp 即這里的 pfun2

2 + 12 + 240 + 10080 = 10334

aggregate后面的數據傳遞給pfun1,pfun1的結果作為入口參數傳給pfun2


scala> val pairRDD = sc.parallelize(List(("cat",2),("cat", 5),("mouse", 4),("cat", 12),("dog", 12),("mouse", 2)), 2)
pairRDD: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[10] at parallelize at <console>:24

scala> def func3(index:Int, iter:Iterator[(String,Int)]) = {
     |   iter.toList.map(x => "[PartID:" + index + ",value=" + x + "]").iterator
     | }

scala> pairRDD.mapPartitionsWithIndex(func3).collect

scala> pairRDD.aggregateByKey(0)(math.max(_,_),_+_).collect
res15: Array[(String, Int)] = Array((dog,12), (cat,17), (mouse,6))

scala> pairRDD.aggregateByKey(0)(_+_,_+_).collect
res16: Array[(String, Int)] = Array((dog,12), (cat,19), (mouse,6))


Below is the usage of aggregateByKey:

[4] aggregateByKey is similar to aggregate; the difference is that it operates on <Key, Value> data.
API: PairRDDFunctions.aggregateByKey
def aggregateByKey[U](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U)

Prepare the data:
val pairRDD = sc.parallelize(List(("cat",2),("cat", 5),("mouse", 4),("cat", 12),("dog", 12),("mouse", 2)), 2)

Use func3 again to look at the elements in each partition:
def func3(index:Int, iter:Iterator[(String,Int)]) = {
    iter.toList.map(x => "[PartID:" + index + ",value=" + x + "]").iterator
}

pairRDD.mapPartitionsWithIndex(func3).collect

Result:
Partition 0 ("zoo 0"):
[PartID:0,value=(cat,2)], [PartID:0,value=(cat,5)], [PartID:0,value=(mouse,4)]

Partition 1 ("zoo 1"):
[PartID:1,value=(cat,12)], [PartID:1,value=(dog,12)], [PartID:1,value=(mouse,2)]

Operations:
(1) For each animal, take the largest count in each zoo (partition) and sum those maxima:
        pairRDD.aggregateByKey(0)(math.max(_,_),_+_).collect
        Result:
        Array((dog,12), (cat,17), (mouse,6))

(2) Sum all the animals per key:
        pairRDD.aggregateByKey(0)(_+_,_+_).collect
        Result:
        Array((dog,12), (cat,19), (mouse,6))

        reduceByKey(_+_) gives the same result:
        Array((dog,12), (cat,19), (mouse,6))

Usage of foreach and foreachPartition [6]:

scala> var rdd1=sc.makeRDD(1 to 4,2)

scala> rdd1.foreach{x=>println(x+4)}          // prints 5,6,7,8 on the executors

scala> var rdd2=sc.makeRDD(1 to 12,3)

scala> rdd2.foreachPartition{x=>println("--------------")}   // runs once per partition

scala> rdd2.foreachPartition{x=>println(x.size)}             // prints the size of each partition: 4,4,4

The output of this code is printed on the executors, so you have to look at the executor stdout in the Spark web UI to see it; see [7].
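If you only want to check the partition sizes from the driver, one alternative (not from the original article) is glom, which was also used in Example 11:

scala> rdd2.glom().map(_.size).collect()   // expected: Array(4, 4, 4)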


Reference:

[1] spark foreachPartition foreach

[2] Using Spark's collectAsMap API

[3] Spark action operators: countByKey

[4] Advanced Spark operators: mapPartitionsWithIndex, aggregate, aggregateByKey

[5] Understanding Spark's aggregate method

[6] foreach and foreachPartition: action-type operators

[7] Why Spark's foreach(println) shows no output

