Getting Started with Spark (11): Sorting
1. Sort
Count how many times each word appears in a text file, then output the words ordered by count in descending order; words with the same count are ordered alphabetically (ascending).
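To make the ordering rule concrete before the full Spark program, here is a minimal standalone Java sketch (not part of the original article; the words and counts are invented purely for illustration): higher counts come first, and ties fall back to ascending alphabetical order.

import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class OrderingDemo {
    public static void main(String[] args) {
        // Hypothetical (word, count) pairs, for illustration only.
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>(Arrays.asList(
                new SimpleEntry<>("spark", 7),
                new SimpleEntry<>("stream", 3),
                new SimpleEntry<>("data", 7)));

        pairs.sort((a, b) -> {
            int diff = b.getValue() - a.getValue();                      // count descending
            return diff != 0 ? diff : a.getKey().compareTo(b.getKey());  // tie: word ascending
        });

        // Prints: data=7, spark=7, stream=3
        pairs.forEach(e -> System.out.println(e.getKey() + "=" + e.getValue()));
    }
}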
2. Maven Setup
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.mk</groupId>
    <artifactId>spark-test</artifactId>
    <version>1.0</version>

    <name>spark-test</name>
    <url>http://spark.mk.com</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <scala.version>2.11.1</scala.version>
        <spark.version>2.4.4</spark.version>
        <hadoop.version>2.6.0</hadoop.version>
    </properties>

    <dependencies>
        <!-- Scala dependency -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <!-- Spark dependencies -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <pluginManagement>
            <plugins>
                <plugin>
                    <artifactId>maven-clean-plugin</artifactId>
                    <version>3.1.0</version>
                </plugin>
                <plugin>
                    <artifactId>maven-resources-plugin</artifactId>
                    <version>3.0.2</version>
                </plugin>
                <plugin>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.8.0</version>
                </plugin>
                <plugin>
                    <artifactId>maven-surefire-plugin</artifactId>
                    <version>2.22.1</version>
                </plugin>
                <plugin>
                    <artifactId>maven-jar-plugin</artifactId>
                    <version>3.0.2</version>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>
</project>
3. Code
// SortApp.java
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;

public class SortApp implements SparkConfInfo {

    public static class SortData implements Comparable<SortData>, Serializable {
        private String word;
        private Integer count;

        public SortData(String word, Integer count) {
            this.word = word;
            this.count = count;
        }

        public String getWord() { return word; }
        public void setWord(String word) { this.word = word; }
        public Integer getCount() { return count; }
        public void setCount(Integer count) { this.count = count; }

        @Override
        public int compareTo(SortData o) {
            if (o == null) {
                return 1;
            }
            int diff = o.count - this.count;   // larger count sorts first (descending)
            if (diff != 0)
                return diff;
            if (word == o.word)                // same reference or both null
                return 0;
            if (word == null)
                return -1;
            if (o.word == null)
                return 1;
            return this.word.compareTo(o.word); // same count: word ascending
        }
    }

    public static void main(String[] args) {
        String filePath = "F:\\test\\log.txt";
        SparkSession sparkSession = new SortApp().getSparkConf("sort");

        List<String> wordCounts = sparkSession.sparkContext().textFile(filePath, 4)
                .toJavaRDD()
                .flatMap(v -> Arrays.asList(v.split("[(\\s+)(\r?\n),.。'’]")).iterator())
                .filter(v -> v.matches("[a-zA-Z-]+"))
                .map(String::toLowerCase)
                .mapToPair(v -> new Tuple2<>(v, 1))
                .reduceByKey(Integer::sum)
                .map(v -> new SortData(v._1, v._2))
                .sortBy(v -> v, true, 4)        // natural order of SortData: count desc, word asc
                .map(v -> v.word)
                .collect();

        wordCounts.forEach(v -> System.out.println(v));
        sparkSession.stop();
    }
}

// SparkConfInfo.java
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;

public interface SparkConfInfo {
    default SparkSession getSparkConf(String appName) {
        SparkConf sparkConf = new SparkConf();
        if (System.getProperty("os.name").toLowerCase().contains("win")) {
            sparkConf.setMaster("local[4]");
            System.out.println("Running Spark in local mode");
        } else {
            sparkConf.setMaster("spark://hadoop01:7077,hadoop02:7077,hadoop03:7077");
            // Local IP of the driver; it must be reachable from the Spark cluster, e.g. on the same LAN
            sparkConf.set("spark.driver.host", "192.168.150.1");
            // Path to the jar produced by the project build
            sparkConf.setJars(new String[]{".\\out\\artifacts\\spark_test\\spark-test.jar"});
        }
        return SparkSession.builder()
                .appName(appName)
                .config(sparkConf)
                .getOrCreate();
    }
}
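As a side note, a variant that keeps the counts in the printed output could collect the reduced (word, count) pairs to the driver and sort them there, avoiding the custom Comparable class. This is only a sketch under the assumption that the vocabulary is small enough to collect locally; the class name SortWithCountsApp is hypothetical, and it reuses the SparkConfInfo interface and input file from above.

import org.apache.spark.sql.SparkSession;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SortWithCountsApp implements SparkConfInfo {

    public static void main(String[] args) {
        String filePath = "F:\\test\\log.txt";
        SparkSession sparkSession = new SortWithCountsApp().getSparkConf("sortWithCounts");

        // Same word-count pipeline as above, but the reduced pairs are collected to the driver.
        List<Tuple2<String, Integer>> wordCounts = new ArrayList<>(
                sparkSession.sparkContext().textFile(filePath, 4)
                        .toJavaRDD()
                        .flatMap(v -> Arrays.asList(v.split("[(\\s+)(\r?\n),.。'’]")).iterator())
                        .filter(v -> v.matches("[a-zA-Z-]+"))
                        .map(String::toLowerCase)
                        .mapToPair(v -> new Tuple2<>(v, 1))
                        .reduceByKey(Integer::sum)
                        .collect());

        // Same ordering rule as SortData.compareTo: count descending, then word ascending.
        wordCounts.sort((a, b) -> {
            int diff = b._2 - a._2;
            return diff != 0 ? diff : a._1.compareTo(b._1);
        });

        wordCounts.forEach(t -> System.out.println(t._1 + " " + t._2));
        sparkSession.stop();
    }
}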
File contents

Spark Streaming is an extension of the core Spark API that enables scalable,high-throughput, fault-tolerant stream processing of live 。data streams. Data, can be ,ingested from many sources like Kafka, Flume, Kinesis, or TCP sockets, and can be processed using complex algorithms expressed with high-level functions like map, reduce, join and window. Finally, processed data can be pushed out to filesystems,Spark Streaming provides a high-level abstraction called discretized stream or DStream, which represents a continuous stream of data. DStreams can be created either from input data streams from sources such as Kafka, Flume, and Kinesis, or by applying high-level operations on other DStreams. Internally, a DStream is represented as a sequence of RDDs.This guide shows you how to start writing Spark Streaming programs with DStreams. You can write Spark Streaming programs in Scala, Java or Python (introduced in Spark 1.2), all of which are presented in this guide. You will find tabs throughout this guide that let you choose between code snippets of different languages. databases, and live dashboards. In fact, you can apply Spark’s machine learning and graph processing algorithms on data streams.

Output
spark can of and data you a be in or streaming dstreams from guide high-level stream streams this algorithms as dstream flume is kafka kinesis like live on processed processing programs sources that to which with abstraction all an api apply applying are between by called choose code complex continuous core created dashboards databases different discretized either enables expressed extension fact fault-tolerant filesystems finally find functions graph high-throughput how input internally introduced java join languages learning let machine many map operations other out presented provides pushed python rdds reduce represented represents s scala scalable sequence shows snippets sockets start such tabs tcp the throughout using will window write writing