Spark _03 RDD Transformations and Actions with the Scala and Java APIs

Published: 2024/2/28

Scala API: Transformation operators & Action operators

[Friendly tip] Read the code listing from the bottom up.

package ddd.henu.transformations

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Lazily executed operators.
  * Contents of words.txt:
  * hello world
  * hello henu
  * hello george
  * hello honey
  * a li
  * ba ba
  */
object TransformationsDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("test")
    val sc = new SparkContext(conf)
    // Reduce log output
    sc.setLogLevel("ERROR")
    val lines = sc.textFile("./data/words.txt")

    // take: fetch the first n records
    val result: Array[String] = lines.take(5)
    result.foreach(println)

    // first: the first record; first is implemented on top of take(1)
    /*
    val first = lines.first()
    println(first)
    */

    // collect: pulls the results back and holds them in JVM memory.
    // Do not call it on large data sets, because it will fail with an OOM error.
    /*
    val result: Array[String] = lines.collect()
    result.foreach(println)
    */

    // count: total number of lines
    /*
    val l: Long = lines.count()
    println(l)
    */

    // sample: sampling. [Note] For this test I copied the data in words.txt until it had 100 records.
    /*
      First parameter (Boolean): true means a sampled record is put back into the
      source data (sampling with replacement); false means it is not.
      Second parameter: the sampling fraction. With 100 records, roughly
      100 * 0.1 records are returned, not exactly 10.
      Third parameter: the seed. With a fixed seed, every run returns the same sample.
    */
    //    val result: RDD[String] = lines.sample(true,0.1)
    //    val result: RDD[String] = lines.sample(true,0.1,100L)
    //    result.foreach(println)

    /*
    val words = lines.flatMap(one => {one.split(" ")})
    val pairWords = words.map(one =>{(one,1)})
    val reduceResult = pairWords.reduceByKey((v1:Int,v2:Int) => (v1+v2))
    // Sorting with sortByKey (roundabout): the pairs have to be swapped twice
    val transRDD: RDD[(Int, String)] = reduceResult.map(tp =>{tp.swap})
    val result: RDD[(Int, String)] = transRDD.sortByKey(false)
    // Swap back, otherwise the records stay of type (Int, String)
    result.map(_.swap).foreach(println)
    */

    // Sort by number of occurrences with sortBy
    // Ascending
    //    reduceResult.sortBy(tp => (tp._2)).foreach(println)
    // Descending
    //    reduceResult.sortBy(tp => (tp._2),false).foreach(println)
    /**
      * Output (ascending):
      * (a,1)
      * (george,1)
      * (li,1)
      * (honey,1)
      * (henu,1)
      * (world,1)
      * (ba,2)
      * (hello,4)
      */

    // filter
    /*
    val rdd1 = lines.flatMap(one => {one.split(" ")})
    rdd1.filter(one => {"hello".equals(one)}).foreach(println)
    */
    /**
      * hello
      * hello
      * hello
      * hello
      */

    // flatMap: one-to-many
    //    lines.flatMap(one => {one.split(" ")}).foreach(println)
    /**
      * hello
      * world
      * hello
      * henu
      * hello
      * george
      * hello
      * honey
      * a
      * li
      * ba
      * ba
      */

    // map: one-to-one
    /*
    lines.map(one =>{one + "#"}).foreach(println)
    */
    /*
    hello world#
    hello henu#
    hello george#
    hello honey#
    a li#
    ba ba#
    */
  }
}
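The `sample` comments in the listing above describe a fraction that yields "roughly" that share of the records and a seed that makes the result repeatable. The plain-Java sketch below illustrates those two properties without Spark. It is a simplified stand-in, not Spark's implementation: it only covers the without-replacement case (`withReplacement = false`), and the class name `SeededSampleSketch` and the method `sample` are hypothetical names chosen for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class SeededSampleSketch {

    // Keeps each element independently with probability `fraction`.
    // This mimics sampling WITHOUT replacement only; it is an illustration,
    // not the algorithm Spark uses internally.
    static <T> List<T> sample(List<T> data, double fraction, long seed) {
        Random rng = new Random(seed);   // a fixed seed gives a fixed sequence of draws
        List<T> picked = new ArrayList<>();
        for (T item : data) {
            if (rng.nextDouble() < fraction) {
                picked.add(item);
            }
        }
        return picked;
    }

    public static void main(String[] args) {
        // 100 records, like the enlarged words.txt mentioned in the listing
        List<Integer> data = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            data.add(i);
        }

        List<Integer> first = sample(data, 0.1, 100L);
        List<Integer> second = sample(data, 0.1, 100L);

        // The size is only approximately 100 * 0.1; it depends on the random draws.
        System.out.println(first.size() + " of " + data.size() + " records sampled");
        // Same seed, same data, same fraction: the two samples are identical.
        System.out.println("same seed, same sample: " + first.equals(second));
    }
}
```

Because every element is decided by an independent random draw, the sample size varies around `fraction * n` instead of hitting it exactly, which is why the listing says "roughly".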

Java API: Transformation operators & Action operators

package eee;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.*;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/**
 * @author George
 * @description
 **/
public class TransformationsDemo {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("test");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // Reduce log output
        sc.setLogLevel("ERROR");
        JavaRDD<String> lines = sc.textFile("./data/words.txt");

        // sample: sampling. Before sampling, copy the data in words.txt to get more records, e.g. 100.
        // Once a seed is passed as the third argument, the sample no longer changes between runs.
        JavaRDD<String> sample = lines.sample(true, 0.1);
        sample.foreach(new VoidFunction<String>() {
            @Override
            public void call(String s) throws Exception {
                System.out.println(s);
            }
        });

        // take
        /*
        List<String> take = lines.take(3);
        for (String s : take) {
            System.out.println(s);
        }
        */

        // first
        /*
        String first = lines.first();
        System.out.println(first);
        */

        // count
        /*
        long count = lines.count();
        System.out.println(count);
        */

        // collect
        /*
        List<String> collect = lines.collect();
        for (String s : collect) {
            System.out.println(s);
        }
        */

        // reduceByKey + sorting: there is no sortBy here, only sortByKey
        /*
        lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
                return Arrays.asList(s.split(" ")).iterator();
            }
        }).mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word,1);
            }
        }).reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1+v2;
            }
        }).mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            @Override
            public Tuple2<Integer, String> call(Tuple2<String, Integer> tp) throws Exception {
                return tp.swap();
            }
        }).sortByKey(false).foreach(new VoidFunction<Tuple2<Integer, String>>() {
            @Override
            public void call(Tuple2<Integer, String> integerStringTuple2) throws Exception {
                System.out.println(integerStringTuple2);
            }
        });
        */

        // flatMap
        /*
        JavaRDD<String> result = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
                return Arrays.asList(s.split(" ")).iterator();
            }
        });
        result.foreach(new VoidFunction<String>() {
            @Override
            public void call(String s) throws Exception {
                System.out.println(s);
            }
        });
        */

        // mapToPair: one-to-one. Use mapToPair when the return value is a key-value pair;
        // use map when the return value is a plain String.
        /*
        JavaPairRDD<String, String> mapToPair = lines.mapToPair(new PairFunction<String, String, String>() {
            @Override
            public Tuple2<String, String> call(String s) throws Exception {
                return new Tuple2<>(s, s + "#");
            }
        });
        */

        // map
        /*
        JavaRDD<String> map = lines.map(new Function<String, String>() {
            @Override
            public String call(String line) throws Exception {
                return line + "*";
            }
        });
        map.foreach(new VoidFunction<String>() {
            @Override
            public void call(String s) throws Exception {
                System.out.println(s);
            }
        });
        */

        // filter
        /*
        JavaRDD<String> result = lines.filter(new Function<String, Boolean>() {
            @Override
            public Boolean call(String line) throws Exception {
                return "hello george".equals(line);
            }
        });
        result.foreach(new VoidFunction<String>() {
            @Override
            public void call(String s) throws Exception {
                System.out.println(s);
            }
        });
        System.out.println(result.count());
        */

        sc.stop();
    }
}
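The word-count chain in the listing above (flatMap, mapToPair, reduceByKey, then a sort by count) is hard to follow through the anonymous classes. As a rough illustration of the same data flow, here is a plain-Java sketch that uses `java.util.stream` instead of Spark, so it runs without a cluster or a Spark dependency. It is an analogy, not Spark code: the class name `WordCountSketch` and its methods are hypothetical, and the input lines are the `words.txt` contents given in the Scala listing's header comment.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class WordCountSketch {

    // Contents of words.txt as listed in the Scala listing's header comment
    static final List<String> LINES = Arrays.asList(
            "hello world", "hello henu", "hello george",
            "hello honey", "a li", "ba ba");

    // Analogue of flatMap + mapToPair + reduceByKey:
    // split every line into words, pair each word with 1, sum the 1s per word.
    static Map<String, Integer> countWords(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.split(" ")))
                .collect(Collectors.toMap(
                        word -> word,          // key: the word
                        word -> 1,             // value: one occurrence
                        Integer::sum,          // merge step, like (v1, v2) -> v1 + v2
                        LinkedHashMap::new));  // keep first-seen order of words
    }

    // Analogue of sorting by count in descending order.
    // No swap is needed here because the comparator looks at the value directly.
    static List<Map.Entry<String, Integer>> sortByCountDescending(Map<String, Integer> counts) {
        List<Map.Entry<String, Integer>> entries = new ArrayList<>(counts.entrySet());
        entries.sort(Map.Entry.<String, Integer>comparingByValue().reversed());
        return entries;
    }

    public static void main(String[] args) {
        for (Map.Entry<String, Integer> e : sortByCountDescending(countWords(LINES))) {
            System.out.println("(" + e.getKey() + "," + e.getValue() + ")");
        }
    }
}
```

With this input the first two printed pairs are `(hello,4)` and `(ba,2)`, the same counts as in the Scala listing's output. In the Spark version the swap before `sortByKey(false)` plays the role that the value comparator plays here.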
