Spark Streaming real-time WordCount, using DStream, and accumulating word frequencies with updateStateByKey(func)


I. Hands-on practice

1. Implement a real-time WordCount with Spark Streaming
Architecture:

Description: messages are sent from nc on hadoop1:9999; the consuming side receives them and computes the word counts.
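In outline, the data flows through the program like this:

nc (hadoop1:9999) -> ssc.socketTextStream -> flatMap(_.split(" ")) -> map((_, 1)) -> reduceByKey(_ + _) -> print()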

2. Install and start the producer
First, install the nc tool with YUM on a Linux machine (IP: 192.168.10.101):
yum install -y nc

Start a server listening on port 9999:
nc -lk 9999

3. Write the Spark Streaming program
First the POM file (pom.xml):

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>cn.toto.spark</groupId>
    <artifactId>bigdata</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.7</maven.compiler.source>
        <maven.compiler.target>1.7</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.version>2.10.6</scala.version>
        <spark.version>1.6.2</spark.version>
        <hadoop.version>2.6.4</hadoop.version>
    </properties>

    <dependencies>
        <dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>${scala.version}</version></dependency>
        <dependency><groupId>org.scala-lang</groupId><artifactId>scala-compiler</artifactId><version>${scala.version}</version></dependency>
        <dependency><groupId>org.scala-lang</groupId><artifactId>scala-reflect</artifactId><version>${scala.version}</version></dependency>
        <dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.10</artifactId><version>${spark.version}</version></dependency>
        <dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>${hadoop.version}</version></dependency>
        <dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.38</version></dependency>
        <dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.10</artifactId><version>${spark.version}</version></dependency>
        <dependency><groupId>org.apache.spark</groupId><artifactId>spark-mllib_2.10</artifactId><version>${spark.version}</version></dependency>
        <dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_2.10</artifactId><version>${spark.version}</version></dependency>
        <dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.10</artifactId><version>${spark.version}</version></dependency>
        <dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-flume_2.10</artifactId><version>${spark.version}</version></dependency>
        <dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka_2.10</artifactId><version>${spark.version}</version></dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-make:transitive</arg>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.18.1</version>
                <configuration>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <includes>
                        <include>**/*Test.*</include>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>cn.toto.spark.JdbcRDDDemo</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

LoggerLevels.scala, a small utility that lowers the streaming log level:

package cn.toto.spark.streams

import org.apache.log4j.{Level, Logger}
import org.apache.spark.Logging

object LoggerLevels extends Logging {

  def setStreamingLogLevels() {
    val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
    if (!log4jInitialized) {
      logInfo("Setting log level to [WARN] for streaming example." +
        " To override add a custom log4j.properties to the classpath.")
      Logger.getRootLogger.setLevel(Level.WARN)
    }
  }
}

NetworkWordCount.scala, the streaming WordCount itself:

package cn.toto.spark

import cn.toto.spark.streams.LoggerLevels
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Created by toto on 2017/7/13.
  */
object NetworkWordCount {

  def main(args: Array[String]) {
    // Set the log level
    LoggerLevels.setStreamingLogLevels()
    // Create a SparkConf and run in local mode; note that local[2] starts two threads
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    // Set the DStream batch interval to 5 seconds
    val ssc = new StreamingContext(conf, Seconds(5))
    // Read data over the network
    val lines = ssc.socketTextStream("hadoop1", 9999)
    // Split the received lines into words on spaces
    val words = lines.flatMap(_.split(" "))
    // Pair each word with a count of 1
    val pairs = words.map(word => (word, 1))
    // Group by word and sum the occurrences of each word
    val wordCounts = pairs.reduceByKey(_ + _)
    // Print the results to the console
    wordCounts.print()
    // Start the computation
    ssc.start()
    // Wait for it to stop
    ssc.awaitTermination()
  }
}

4. Run the Spark Streaming program. Because the master is set to local mode ("local[2]"), the program can be run directly on the local machine.
Note: the parallelism must be specified. Running locally with setMaster("local[2]") starts two threads, one for the receiver and one for the computation. When running on a cluster, the number of available cores must be greater than 1.
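As a minimal sketch (not part of the original post), one way to keep the same class usable both locally and on a cluster is to treat the hard-coded master only as a fallback; the master URL in the comment below is an assumed example:

package cn.toto.spark

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ConfigurableNetworkWordCount {
  def main(args: Array[String]): Unit = {
    // Use the first argument as the master (e.g. a standalone URL such as spark://hadoop1:7077,
    // which is only a hypothetical example); fall back to local[2] so the receiver and the
    // computation each get a thread when testing locally.
    val master = if (args.nonEmpty) args(0) else "local[2]"
    val conf = new SparkConf().setMaster(master).setAppName("NetworkWordCount")
    val ssc = new StreamingContext(conf, Seconds(5))
    val wordCounts = ssc.socketTextStream("hadoop1", 9999)
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
    wordCounts.print()       // print each batch's counts
    ssc.start()              // start receiving and processing
    ssc.awaitTermination()   // block until stopped
  }
}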

5. Type some words into nc at the Linux command line.

6. Check the result in the IDEA console.
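For illustration only (the timestamp and counts below are made up and depend on what you type), each 5-second batch is printed to the console in roughly this form:

-------------------------------------------
Time: 1499913600000 ms
-------------------------------------------
(hello,2)
(spark,1)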

II. Using DStream

package cn.toto.spark

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Created by toto on 2017/7/13.
  */
object StreamingWordCount {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("StreamingWordCount").setMaster("local[2]")
    // Create the StreamingContext and set the batch interval
    val ssc = new StreamingContext(conf, Seconds(5))
    // Create an input DStream from the socket port
    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop1", 9999)
    val words: DStream[String] = lines.flatMap(_.split(" "))
    val wordAndOne: DStream[(String, Int)] = words.map((_, 1))
    val result: DStream[(String, Int)] = wordAndOne.reduceByKey(_ + _)
    // Print the result
    result.print()
    // Start the program
    ssc.start()
    // Wait for termination
    ssc.awaitTermination()
  }
}
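This program is functionally the same as NetworkWordCount; the difference is that the DStream types are spelled out: socketTextStream returns a ReceiverInputDStream[String], and each transformation (flatMap, map, reduceByKey) yields a new DStream, mirroring the RDD API but applied to every batch.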

The output:


In the examples above, each batch is computed independently: the words typed on the Linux side are counted correctly within every batch, but the counts are not accumulated across batches. To carry the totals forward, use updateStateByKey(func), which updates the stored state for each key with the values from the new batch. The following example implements this cumulative word count:

package cn.toto.spark

import cn.toto.spark.streams.LoggerLevels
import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object NetworkUpdateStateWordCount {

  /**
    * String      : the word
    * Seq[Int]    : the counts recorded for the word in the current batch
    * Option[Int] : the accumulated result from previous batches
    */
  val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {
    //iter.flatMap(it => Some(it._2.sum + it._3.getOrElse(0)).map(x => (it._1, x)))
    iter.flatMap { case (x, y, z) => Some(y.sum + z.getOrElse(0)).map(m => (x, m)) }
  }

  def main(args: Array[String]) {
    LoggerLevels.setStreamingLogLevels()
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkUpdateStateWordCount")
    val ssc = new StreamingContext(conf, Seconds(5))
    // Checkpoint the state (write it to shared storage when running on a cluster)
    ssc.checkpoint("E://workspace//netresult")
    val lines = ssc.socketTextStream("hadoop1", 9999)
    // reduceByKey does not accumulate results across batches
    //val result = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    // updateStateByKey accumulates, but requires a custom update function: updateFunc
    val results = lines.flatMap(_.split(" ")).map((_, 1))
      .updateStateByKey(updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
    results.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
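To make the updateFunc signature concrete, here is a small standalone check (a hypothetical demo, not from the original post) that runs the same logic on hand-made input:

object UpdateFuncDemo {

  // Same logic as the updateFunc above: sum this batch's counts and add the previous total.
  val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {
    iter.flatMap { case (word, newCounts, prevTotal) =>
      Some(newCounts.sum + prevTotal.getOrElse(0)).map(total => (word, total))
    }
  }

  def main(args: Array[String]): Unit = {
    val batch: Iterator[(String, Seq[Int], Option[Int])] = Iterator(
      ("hello", Seq(1, 1), Some(3)), // seen twice in this batch, 3 times before => 5
      ("spark", Seq(1), None)        // first appearance of this word => 1
    )
    updateFunc(batch).foreach(println) // prints (hello,5) and (spark,1)
  }
}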

Type some input into nc:

運行結果如下:

總結
