日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Spark Streaming从Kafka中获取数据,并进行实时单词统计,统计URL出现的次数

發布時間:2024/9/27 编程问答 28 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spark Streaming从Kafka中获取数据,并进行实时单词统计,统计URL出现的次数 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

1、創建Maven項目

創建的過程參考:http://blog.csdn.net/tototuzuoquan/article/details/74571374

2、啟動Kafka

A:安裝kafka集群:http://blog.csdn.net/tototuzuoquan/article/details/73430874
B:創建topic等:http://blog.csdn.net/tototuzuoquan/article/details/73430874

3、編寫Pom文件

<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion><groupId>cn.toto.spark</groupId> <artifactId>bigdata</artifactId> <version>1.0-SNAPSHOT</version><properties><maven.compiler.source>1.7</maven.compiler.source><maven.compiler.target>1.7</maven.compiler.target><encoding>UTF-8</encoding><scala.version>2.10.6</scala.version><spark.version>1.6.2</spark.version><hadoop.version>2.6.4</hadoop.version> </properties><dependencies><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>${scala.version}</version></dependency><dependency><groupId>org.scala-lang</groupId><artifactId>scala-compiler</artifactId><version>${scala.version}</version></dependency><dependency><groupId>org.scala-lang</groupId><artifactId>scala-reflect</artifactId><version>${scala.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.10</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>${hadoop.version}</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.38</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.10</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-mllib_2.10</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_2.10</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.10</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-flume_2.10</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka_2.10</artifactId><version>${spark.version}</version></dependency> </dependencies><build><sourceDirectory>src/main/scala</sourceDirectory><testSourceDirectory>src/test/scala</testSourceDirectory><plugins><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.2.2</version><executions><execution><goals><goal>compile</goal><goal>testCompile</goal></goals><configuration><args><arg>-make:transitive</arg><arg>-dependencyfile</arg><arg>${project.build.directory}/.scala_dependencies</arg></args></configuration></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-surefire-plugin</artifactId><version>2.18.1</version><configuration><useFile>false</useFile><disableXmlReport>true</disableXmlReport><includes><include>**/*Test.*</include><include>**/*Suite.*</include></includes></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>2.4.3</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>cn.toto.spark.FlumeStreamingWordCount</mainClass></transformer></transformers></configuration></execution></executions></plugin></plugins> </build></project>

4.編寫代碼

package cn.toto.sparkimport cn.toto.spark.streams.LoggerLevels import org.apache.spark.{HashPartitioner, SparkConf} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.dstream.ReceiverInputDStream import org.apache.spark.streaming.kafka.KafkaUtils import org.apache.spark.streaming.{Seconds, StreamingContext}/*** Created by toto on 2017/7/13.* 從kafka中讀數據,并且進行單詞數量的計算*/ object KafkaWordCount {/*** String :單詞* Seq[Int] :單詞在當前批次出現的次數* Option[Int] :歷史結果*/val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {//iter.flatMap(it=>Some(it._2.sum + it._3.getOrElse(0)).map(x=>(it._1,x)))iter.flatMap { case (x, y, z) => Some(y.sum + z.getOrElse(0)).map(i => (x, i)) }}def main(args: Array[String]): Unit = {LoggerLevels.setStreamingLogLevels()//這里的args從IDEA中傳入,在Program arguments中填寫如下內容://參數用一個數組來接收://zkQuorum :zookeeper集群的//group :組//topic :kafka的組//numThreads :線程數量//hadoop11:2181,hadoop12:2181,hadoop13:2181 g1 wordcount 1 要注意的是要創建line這個topicval Array(zkQuorum, group, topics, numThreads) = argsval sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")val ssc = new StreamingContext(sparkConf,Seconds(5))ssc.checkpoint("E:\\wordcount\\outcheckpoint")//"alog-2016-04-16,alog-2016-04-17,alog-2016-04-18"//"Array((alog-2016-04-16, 2), (alog-2016-04-17, 2), (alog-2016-04-18, 2))"val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap//保存到內存和磁盤,并且進行序列化val data: ReceiverInputDStream[(String, String)] =KafkaUtils.createStream(ssc, zkQuorum, group, topicMap, StorageLevel.MEMORY_AND_DISK_SER)//從kafka中寫數據其實也是(key,value)形式的,這里的_._2就是valueval words = data.map(_._2).flatMap(_.split(" "))val wordCounts = words.map((_, 1)).updateStateByKey(updateFunc,new HashPartitioner(ssc.sparkContext.defaultParallelism), true)wordCounts.print()ssc.start()ssc.awaitTermination()} }

5.配置IDEA中運行的參數:


配置說明:

hadoop11:2181,hadoop12:2181,hadoop13:2181 g1 wordcount 1 hadoop11:2181,hadoop12:2181,hadoop13:2181 :zookeeper集群地址 g1 :組 wordcount :kafkatopic 1 :線程數為1

6、創建kafka,并在kafka中傳遞參數

啟動kafka

[root@hadoop1 kafka]# pwd /home/tuzq/software/kafka/servers/kafka [root@hadoop1 kafka]# bin/kafka-server-start.sh config/server.properties 1>/dev/null 2>&1 &

創建topic

[root@hadoop1 kafka]# bin/kafka-topics.sh --create --zookeeper hadoop11:2181 --replication-factor 1 --partitions 1 --topic wordcount Created topic "wordcount".

查看主題

bin/kafka-topics.sh --list --zookeeper hadoop11:2181

啟動一個生產者發送消息(我的kafka在hadoop1,hadoop2,hadoop3這幾臺機器上)

[root@hadoop1 kafka]# bin/kafka-console-producer.sh --broker-list hadoop1:9092 --topic wordcount No safe wading in an unknown water Anger begins with folly,and ends in repentance No safe wading in an unknown water Anger begins with folly,and ends in repentance Anger begins with folly,and ends in repentance

使用spark-submit來運行程序

#啟動spark-streaming應用程序 bin/spark-submit --class cn.toto.spark.KafkaWordCount /root/streaming-1.0.jar hadoop11:2181 group1 wordcount 1

7、查看運行結果


8、再如統計URL出現的次數

package cn.toto.sparkimport org.apache.spark.{HashPartitioner, SparkConf} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.kafka.KafkaUtils import org.apache.spark.streaming.{Seconds, StreamingContext}/*** Created by toto on 2017/7/14.*/ object UrlCount {val updateFunc = (iterator: Iterator[(String, Seq[Int], Option[Int])]) => {iterator.flatMap{case(x,y,z)=> Some(y.sum + z.getOrElse(0)).map(n=>(x, n))}}def main(args: Array[String]) {//接收命令行中的參數val Array(zkQuorum, groupId, topics, numThreads, hdfs) = args//創建SparkConf并設置AppNameval conf = new SparkConf().setAppName("UrlCount")//創建StreamingContextval ssc = new StreamingContext(conf, Seconds(2))//設置檢查點ssc.checkpoint(hdfs)//設置topic信息val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap//重Kafka中拉取數據創建DStreamval lines = KafkaUtils.createStream(ssc, zkQuorum ,groupId, topicMap, StorageLevel.MEMORY_AND_DISK).map(_._2)//切分數據,截取用戶點擊的urlval urls = lines.map(x=>(x.split(" ")(6), 1))//統計URL點擊量val result = urls.updateStateByKey(updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true)//將結果打印到控制臺result.print()ssc.start()ssc.awaitTermination()} }

總結

以上是生活随笔為你收集整理的Spark Streaming从Kafka中获取数据,并进行实时单词统计,统计URL出现的次数的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。