日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Spark _10_补充部分算子【三】

發(fā)布時間:2024/2/28 编程问答 23 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spark _10_补充部分算子【三】 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

補充算子

transformations

  • mapPartitionWithIndex

類似于mapPartitions,除此之外還會攜帶分區(qū)的索引值。

  • repartition

增加或減少分區(qū)。會產(chǎn)生shuffle。(多個分區(qū)分到一個分區(qū)不會產(chǎn)生shuffle

  • coalesce

coalesce常用來減少分區(qū),第二個參數(shù)是減少分區(qū)的過程中是否產(chǎn)生shuffle。

true為產(chǎn)生shuffle,false不產(chǎn)生shuffle。默認(rèn)是false。

如果coalesce設(shè)置的分區(qū)數(shù)比原來的RDD的分區(qū)數(shù)還多的話,第二個參數(shù)設(shè)置為false不會起作用,如果設(shè)置成true,效果和repartition一樣。即repartition(numPartitions) = coalesce(numPartitions,true)

  • groupByKey

作用在K,V格式的RDD上。根據(jù)Key進行分組。作用在(K,V),返回(K,Iterable <V>)。

  • zip

將兩個RDD中的元素(KV格式/非KV格式)變成一個KV格式的RDD,兩個RDD的每個分區(qū)元素個數(shù)必須相同。

  • zipWithIndex

該函數(shù)將RDD中的元素和這個元素在RDD中的索引號(從0開始)組合成(K,V)對。

Action

  • countByKey

?作用到K,V格式的RDD上,根據(jù)Key計數(shù)相同Key的數(shù)據(jù)集元素。

  • countByValue

根據(jù)數(shù)據(jù)集每個元素相同的內(nèi)容來計數(shù)。返回相同內(nèi)容的元素對應(yīng)的條數(shù)。

  • reduce

根據(jù)聚合邏輯聚合數(shù)據(jù)集中的每個元素。

Scala API

package ddd.henu.transformationsimport org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext}import scala.collection.mutable.ListBufferobject ElseFun3 {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local").setAppName("test")val sc = new SparkContext(conf)sc.setLogLevel("error")val rdd1: RDD[String] = sc.parallelize(List[String]("geroge1", "geroge2", "geroge3", "geroge4","geroge5", "geroge6", "geroge7", "geroge8","geroge9", "geroge10", "geroge11", "geroge12"), 3)/*** ?mapPartitionWithIndex* 類似于mapPartitions,除此之外還會攜帶分區(qū)的索引值* rdd1 partition = 【0】,value =[geroge1]*/val rdd2 = rdd1.mapPartitionsWithIndex((index, iter) => {val list = new ListBuffer[String]()while (iter.hasNext) {val one = iter.next()list.+=(s"rdd1 partition = 【$index】,value =[$one]")}list.iterator}) // rdd2.foreach(println)/*** repartition 可以增多分區(qū),可以減少分區(qū) 會產(chǎn)生shuffer,寬依賴** coalesce* 可以增多分區(qū),也可以減少分區(qū) 不會產(chǎn)生shuffer,窄依賴 默認(rèn)false,不產(chǎn)生shuffer* 產(chǎn)生shuffer true** [注]coalesce有少的分區(qū)到多的分區(qū)時,不讓產(chǎn)生shuffer,不起作用** 所以repartition 常用于增多分區(qū),coalesce常用于減少分區(qū)*/ // val rdd3: RDD[String] = rdd2.repartition(2) // val rdd3: RDD[String] = rdd2.repartition(4) // val rdd3 = rdd2.coalesce(2,true)val rdd3 = rdd2.coalesce(2)println(s"rdd3.paration is {${rdd3.getNumPartitions}}")val rdd4 = rdd3.mapPartitionsWithIndex((index, iter) => {val list = new ListBuffer[String]()while (iter.hasNext) {val one = iter.next()list.+=(s"rdd3 partition = 【$index】,value =[$one]")}list.iterator})val result = rdd4.collect()result.foreach(println)}} package ddd.henu.transformationsimport org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext}object ElseFun4 {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("test").setMaster("local")val sc = new SparkContext(conf)sc.setLogLevel("error")/*** countByKey* countByValue* reduce*//*val rdd: RDD[Int] = sc.parallelize(List[Int](2,3,4,5,6)) // val result = rdd.reduce((v1,v2) => {v1+v2})val result = rdd.reduce(_+_)println(result)*/val rdd = sc.parallelize(List[(String, Int)](("George", 100),("honey", 100),("kk", 100),("ll", 200),("dd", 98),("lxm", 100))) // val result: collection.Map[String, Long] = rdd.countByKey() // result.foreach(println) //看key出現(xiàn)幾次val result: collection.Map[(String, Int), Long] = rdd.countByValue()result.foreach(println)//將每一個元素看成一個整體,然后看其出現(xiàn)幾次。/*** groupByKey*/ // val rdd = sc.parallelize(List[(String,Int)](("george",22),("george",18),("kk",20),("ll",28))) // rdd.groupByKey().foreach(println)//Distribute a local Scala collection to form an RDD.//parallelize 描述一個當(dāng)?shù)氐膕cala集合給rdd // val rdd1: RDD[String] = sc.parallelize(List[String]("george","honey","kk","ll")) // val rdd2: RDD[String] = sc.parallelize(List[String]("90","100","222","300","400")) // val rdd2: RDD[String] = sc.parallelize(List[String]("100","222","300","400"))/*** zip,ziPWithIndex* 當(dāng)zip的數(shù)據(jù)不對應(yīng)的時候會報錯SparkException: Can only zip RDDs with same number of elements in each partition*/ // rdd1.zip(rdd2).foreach(println)/*** (george,100)* (honey,222)* (kk,300)* (ll,400)*/ // rdd1.zipWithIndex().foreach(println)/*** (george,0)* (honey,1)* (kk,2)* (ll,3)*/} }

Java API

package eee;import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function2;import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; import java.util.List;/*** @author George* @description**/ public class ElseFun3 {public static void main(String[] args) {SparkConf conf = new SparkConf();conf.setMaster("local");conf.setAppName("fun3");JavaSparkContext sc = new JavaSparkContext(conf);sc.setLogLevel("error");JavaRDD<String> rdd = sc.parallelize(Arrays.asList("geroge1", "geroge2", "geroge3", "geroge4","geroge5", "geroge6", "geroge7", "geroge8","geroge9", "geroge10", "geroge11", "geroge12"), 3);/*** Return a new RDD by applying a function to each partition of this RDD, while tracking the index* of the original partition.* 通過對這個RDD的每個分區(qū)應(yīng)用一個函數(shù)來返回一個新的RDD,同時跟蹤原始分區(qū)的索引。*/JavaRDD<String> rdd1 = rdd.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {@Overridepublic Iterator<String> call(Integer index, Iterator<String> iter) throws Exception {List<String> list = new ArrayList<>();while (iter.hasNext()) {String currOne = iter.next();list.add("rdd.partition is [" + index + "],value is [" + currOne + "]");}return iter;}}, true);List<String> collect = rdd1.collect();for (String s : collect) {System.out.println(s);}} }

【注】其他方法略了。偷懶。。。

?

總結(jié)

以上是生活随笔為你收集整理的Spark _10_补充部分算子【三】的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。