日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

DStream实战之Spark Streaming整合fulme实战, Flume向Spark Streaming中push推数据 36

發布時間:2023/12/20 编程问答 41 豆豆
生活随笔 收集整理的這篇文章主要介紹了 DStream实战之Spark Streaming整合fulme实战, Flume向Spark Streaming中push推数据 36 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

前言

本文所需要的安裝包&Flume配置文件,博主都已上傳,鏈接為本文涉及安裝包&Flume配置文件本文涉及的安裝包&Flume配置文件,請自行下載~

  • flume作為日志實時采集的框架, 可以與Spark Streaming實時處理框架進行對接.
  • flume實時產生數據, Spark Streaming做實時處理
  • Spark Streaming對接fluem有兩種方式,一種是Flume將消息Push推給Spark Streaming;還有一種是Spark Streaming從flume中Poll拉取數據.

1. Flume向Spark Streaming中push推數據

1.1 Flume前期準備

  • 安裝flume1.6以上

  • 下載依賴包
    spark-streaming-flume-sink_2.11-2.0.2.jar放入到flume的lib目錄下.

  • 修改flume/lib下的scala依賴包版本
    從spark安裝目錄的jars文件夾下找到scala-library-2.11.8.jar 包, 替換掉flume/lib目錄下自帶的scala-library-2.11.8.jar包.

  • flume的agent, 注意既然是拉取的方式,那么flume向自己所在的機器上產數據就行.

  • 編寫flume-push.conf配置文件
    注意: 因為是Flume主動向Spark Streaming推送數據,所以sink要指定Spark Streaming程序啟動的IP地址和port端口號.

    注意配置文件中指明的hostname和port是spark應用程序所在服務器的ip地址和端口。

#push mode a1.sources = r1 a1.sinks = k1 a1.channels = c1#source a1.sources.r1.channels = c1 a1.sources.r1.type = exec a1.sources.r1.command = tail -f /root/test.txt a1.sources.r1.fileHeader = true#channel a1.channels.c1.type =memory a1.channels.c1.capacity = 20000 a1.channels.c1.transactionCapacity=5000#sinks a1.sinks.k1.channel = c1 a1.sinks.k1.type = avro # Spark Streaming程序啟動的IP地址和端口號 a1.sinks.k1.hostname=172.16.43.63 a1.sinks.k1.port = 9999 a1.sinks.k1.batchSize= 2000

1.2 Spark Streaming前期準備,編寫Spark Streaming程序

  • 導入pom依賴
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-flume_2.11</artifactId><version>2.0.2</version> </dependency>

注意: 程序中需要指定本程序運行機器的IP地址和Port端口號,要和Flume配置文件flume-push.conf中sink指導的一樣

  • 使用scala編寫程序
package cn.acece.sparkStreamingtestimport java.net.InetSocketAddressimport org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent} import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.{SparkConf, SparkContext}/*** sparkStreaming整合flume 推模式Push*/ object SparkStreaming_Flume_Push {//newValues 表示當前批次匯總成的(word,1)中相同單詞的所有的1//runningCount 歷史的所有相同key的value總和def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {val newCount =runningCount.getOrElse(0)+newValues.sumSome(newCount)}def main(args: Array[String]): Unit = {//配置sparkConf參數val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreaming_Flume_Push").setMaster("local[2]")//構建sparkContext對象val sc: SparkContext = new SparkContext(sparkConf)//構建StreamingContext對象,每個批處理的時間間隔val scc: StreamingContext = new StreamingContext(sc, Seconds(5))//設置日志輸出級別sc.setLogLevel("WARN")//設置檢查點目錄scc.checkpoint("./")//flume推數據過來// 當前應用程序部署的服務器ip地址,跟flume配置文件保持一致val flumeStream: ReceiverInputDStream[SparkFlumeEvent] = FlumeUtils.createStream(scc,"172.16.43.63",9999,StorageLevel.MEMORY_AND_DISK)//獲取flume中數據,數據存在event的body中,轉化為Stringval lineStream: DStream[String] = flumeStream.map(x=>new String(x.event.getBody.array()))//實現單詞匯總val result: DStream[(String, Int)] = lineStream.flatMap(_.split(" ")).map((_,1)).updateStateByKey(updateFunction)result.print()scc.start()scc.awaitTermination()}} }

1.3 Flume向Spark Streaming中push推數據, 要先啟動Spark Streaming程序

  • 先啟動Spark Streaming程序,在IDEA中啟動程序
  • 后啟動Flume程序, 先把**/root/data/ata.txt.COMPLETED 重命名為data.txt**,然后執行以下shell命令
flume-ng agent -n a1 \ -c /opt/bigdata/flume/conf \ -f /opt/bigdata/flume/conf/flume-push.conf \ -Dflume.root.logger=INFO,console

1.4 觀察IDEA控制臺輸出


Flume向Spark Streaming中push推數據成功, 完美運行~

總結

以上是生活随笔為你收集整理的DStream实战之Spark Streaming整合fulme实战, Flume向Spark Streaming中push推数据 36的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。