Spark _10_ Supplementary Operators (Part 3)
Supplementary operators
Transformations
- mapPartitionsWithIndex
Similar to mapPartitions, but it also carries the index of each partition.
- repartition
Increases or decreases the number of partitions and always produces a shuffle (it is implemented as coalesce with shuffle = true). To merge multiple partitions into fewer without a shuffle, use coalesce instead.
- coalesce
coalesce is mostly used to decrease the number of partitions. Its second parameter controls whether a shuffle happens while repartitioning:
true produces a shuffle, false does not; the default is false.
If the partition count passed to coalesce is larger than the RDD's current partition count, setting the second parameter to false has no effect; setting it to true gives the same result as repartition, i.e. repartition(numPartitions) = coalesce(numPartitions, true). See the sketch after this list.
- groupByKey
Works on a (K, V) RDD and groups the elements by key: applied to (K, V) records, it returns (K, Iterable<V>).
- zip
Zips the elements of two RDDs (K,V or non-K,V) into a single (K, V) RDD. The two RDDs must have the same number of elements in every partition.
- zipWithIndex
Pairs each element of the RDD with its index within the RDD (starting from 0), producing (K, V) pairs. Both zip and zipWithIndex appear in the sketch below.
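To make the repartition/coalesce relationship and the zip operators concrete, here is a minimal sketch. It assumes a local SparkContext bound to `sc` (as in spark-shell); the sample RDDs and partition counts are made up for illustration and are not part of the original examples.

```scala
// Minimal sketch, assuming an existing SparkContext `sc` (e.g. spark-shell).
val rdd = sc.parallelize(1 to 12, 3)                // 3 partitions

println(rdd.coalesce(6).getNumPartitions)           // 3: without a shuffle, coalesce cannot add partitions
println(rdd.coalesce(6, true).getNumPartitions)     // 6: with a shuffle, same effect as repartition(6)
println(rdd.repartition(6).getNumPartitions)        // 6
println(rdd.coalesce(2).getNumPartitions)           // 2: reducing partitions needs no shuffle

val names  = sc.parallelize(List("a", "b", "c", "d"), 2)
val scores = sc.parallelize(List(90, 80, 70, 60), 2)   // same element count per partition as `names`
names.zip(scores).collect().foreach(println)        // (a,90) (b,80) (c,70) (d,60)
names.zipWithIndex().collect().foreach(println)     // (a,0) (b,1) (c,2) (d,3)
```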
Actions
- countByKey
Works on a (K, V) RDD; counts how many elements of the dataset share each key.
- countByValue
Counts the dataset elements by their full content, returning how many times each distinct element occurs.
- reduce
Aggregates all elements of the dataset with the given aggregation function. All three actions are shown in the sketch after this list.
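A minimal sketch of the three actions, again assuming an existing SparkContext `sc`; the sample pair RDD is made up for illustration:

```scala
// Minimal sketch, assuming an existing SparkContext `sc` (e.g. spark-shell).
val pairs = sc.parallelize(List(("a", 1), ("a", 1), ("a", 2), ("b", 3)))

println(pairs.countByKey())            // e.g. Map(a -> 3, b -> 1): elements counted per key
println(pairs.countByValue())          // e.g. Map((a,1) -> 2, (a,2) -> 1, (b,3) -> 1): whole elements counted
println(pairs.map(_._2).reduce(_ + _)) // 7: all values folded together with the given function
```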
Scala API
```scala
package ddd.henu.transformations

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ListBuffer

object ElseFun3 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("test")
    val sc = new SparkContext(conf)
    sc.setLogLevel("error")

    val rdd1: RDD[String] = sc.parallelize(List[String](
      "geroge1", "geroge2", "geroge3", "geroge4",
      "geroge5", "geroge6", "geroge7", "geroge8",
      "geroge9", "geroge10", "geroge11", "geroge12"), 3)

    /**
     * mapPartitionsWithIndex
     * Like mapPartitions, but it also carries the partition index.
     * Sample output: rdd1 partition = 【0】,value =[geroge1]
     */
    val rdd2 = rdd1.mapPartitionsWithIndex((index, iter) => {
      val list = new ListBuffer[String]()
      while (iter.hasNext) {
        val one = iter.next()
        list.+=(s"rdd1 partition = 【$index】,value =[$one]")
      }
      list.iterator
    })
    // rdd2.foreach(println)

    /**
     * repartition: can increase or decrease the number of partitions; produces a shuffle (wide dependency).
     *
     * coalesce: can increase or decrease the number of partitions.
     * The second parameter decides whether a shuffle happens: false (the default) means no shuffle
     * (narrow dependency), true means shuffle.
     *
     * [Note] Going from fewer partitions to more with coalesce while the shuffle is disabled has no effect.
     *
     * So repartition is usually used to increase partitions and coalesce to decrease them.
     */
    // val rdd3: RDD[String] = rdd2.repartition(2)
    // val rdd3: RDD[String] = rdd2.repartition(4)
    // val rdd3 = rdd2.coalesce(2, true)
    val rdd3 = rdd2.coalesce(2)
    println(s"rdd3.partition is {${rdd3.getNumPartitions}}")

    val rdd4 = rdd3.mapPartitionsWithIndex((index, iter) => {
      val list = new ListBuffer[String]()
      while (iter.hasNext) {
        val one = iter.next()
        list.+=(s"rdd3 partition = 【$index】,value =[$one]")
      }
      list.iterator
    })
    val result = rdd4.collect()
    result.foreach(println)
  }
}
```

```scala
package ddd.henu.transformations

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object ElseFun4 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("test").setMaster("local")
    val sc = new SparkContext(conf)
    sc.setLogLevel("error")

    /**
     * countByKey
     * countByValue
     * reduce
     */
    /*
    val rdd: RDD[Int] = sc.parallelize(List[Int](2, 3, 4, 5, 6))
    // val result = rdd.reduce((v1, v2) => {v1 + v2})
    val result = rdd.reduce(_ + _)
    println(result)
    */

    val rdd = sc.parallelize(List[(String, Int)](
      ("George", 100),
      ("honey", 100),
      ("kk", 100),
      ("ll", 200),
      ("dd", 98),
      ("lxm", 100)))
    // val result: collection.Map[String, Long] = rdd.countByKey()
    // result.foreach(println) // how many times each key occurs
    val result: collection.Map[(String, Int), Long] = rdd.countByValue()
    result.foreach(println) // treats each element as a whole and counts how often it occurs

    /**
     * groupByKey
     */
    // val rdd = sc.parallelize(List[(String, Int)](("george", 22), ("george", 18), ("kk", 20), ("ll", 28)))
    // rdd.groupByKey().foreach(println)

    // Distribute a local Scala collection to form an RDD.
    // val rdd1: RDD[String] = sc.parallelize(List[String]("george", "honey", "kk", "ll"))
    // val rdd2: RDD[String] = sc.parallelize(List[String]("90", "100", "222", "300", "400"))
    // val rdd2: RDD[String] = sc.parallelize(List[String]("100", "222", "300", "400"))

    /**
     * zip, zipWithIndex
     * When the two RDDs do not line up, zip fails with
     * SparkException: Can only zip RDDs with same number of elements in each partition
     */
    // rdd1.zip(rdd2).foreach(println)
    /**
     * (george,100)
     * (honey,222)
     * (kk,300)
     * (ll,400)
     */
    // rdd1.zipWithIndex().foreach(println)
    /**
     * (george,0)
     * (honey,1)
     * (kk,2)
     * (ll,3)
     */
  }
}
```

Java API
```java
package eee;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/**
 * @author George
 * @description
 **/
public class ElseFun3 {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("fun3");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.setLogLevel("error");

        JavaRDD<String> rdd = sc.parallelize(Arrays.asList(
                "geroge1", "geroge2", "geroge3", "geroge4",
                "geroge5", "geroge6", "geroge7", "geroge8",
                "geroge9", "geroge10", "geroge11", "geroge12"), 3);

        /**
         * Return a new RDD by applying a function to each partition of this RDD, while tracking the index
         * of the original partition.
         */
        JavaRDD<String> rdd1 = rdd.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {
            @Override
            public Iterator<String> call(Integer index, Iterator<String> iter) throws Exception {
                List<String> list = new ArrayList<>();
                while (iter.hasNext()) {
                    String currOne = iter.next();
                    list.add("rdd.partition is [" + index + "],value is [" + currOne + "]");
                }
                // Return the list we built, not the input iterator (which is exhausted by now).
                return list.iterator();
            }
        }, true);

        List<String> collect = rdd1.collect();
        for (String s : collect) {
            System.out.println(s);
        }
    }
}
```

[Note] The remaining operators are omitted here. (Being lazy...)