Preface
The installation packages and the Flume configuration file needed for this article have been uploaded by the author; download them yourself from the linked resource (installation packages & Flume configuration file for this article).
- Flume, as a framework for real-time log collection, can be integrated with the Spark Streaming real-time processing framework.
- Flume produces data in real time, and Spark Streaming processes it in real time.
- Spark Streaming can be connected to Flume in two ways: either Flume pushes messages to Spark Streaming, or Spark Streaming polls (pulls) data from Flume. A short sketch contrasting the two entry points follows this list; this article implements the push approach.
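To make the difference concrete, here is a minimal sketch of the two receiver-creation calls provided by the spark-streaming-flume package (versions as used in this article). The hostnames and ports are placeholder assumptions; only the push variant is actually built out in the rest of this article.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object PushVsPollSketch {
  def main(args: Array[String]): Unit = {
    // local[4]: two receivers plus processing need more than two cores.
    val conf = new SparkConf().setAppName("PushVsPollSketch").setMaster("local[4]")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Push: Flume's avro sink connects to a receiver started by this program,
    // so the address must be the machine this program runs on.
    val pushed = FlumeUtils.createStream(ssc, "172.16.43.63", 9999, StorageLevel.MEMORY_AND_DISK)

    // Poll: Spark Streaming pulls batches from Flume's SparkSink, so the
    // address is the Flume agent's machine (hostname/port are placeholders).
    val polled = FlumeUtils.createPollingStream(ssc, "node01", 8888, StorageLevel.MEMORY_AND_DISK)

    pushed.count().print()
    polled.count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```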
1. Flume pushes data to Spark Streaming
1.1 Flume preparation
- Install Flume 1.6 or later.
- Download the dependency jar spark-streaming-flume-sink_2.11-2.0.2.jar and put it into Flume's lib directory.
- Replace the Scala dependency in flume/lib: take scala-library-2.11.8.jar from the jars folder of the Spark installation directory and use it to replace the scala-library jar that ships with flume/lib (a shell sketch of these jar steps appears after the configuration file below).
- Write the Flume agent. Since this is the push approach, Flume actively sends data out, so the sink must point at the machine where the Spark Streaming program runs.
- Write the flume-push.conf configuration file.
  Note: because Flume actively pushes data to Spark Streaming, the sink must specify the IP address and port on which the Spark Streaming program listens; in other words, the hostname and port in the configuration file are the IP address and port of the server where the Spark application runs.
```properties
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -f /root/test.txt
a1.sources.r1.fileHeader = true

a1.channels.c1.type = memory
a1.channels.c1.capacity = 20000
a1.channels.c1.transactionCapacity = 5000

a1.sinks.k1.channel = c1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 172.16.43.63
a1.sinks.k1.port = 9999
a1.sinks.k1.batchSize = 2000
```
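For reference, the jar preparation steps listed above can be scripted roughly like this. The Spark home path and the name of Flume's bundled scala-library jar are assumptions, so adjust them to your own installation; the Flume home /opt/bigdata/flume matches the startup command used later in this article.

```bash
# Copy the Spark sink jar into Flume's lib directory
# (the download location ~/ is an assumption).
cp ~/spark-streaming-flume-sink_2.11-2.0.2.jar /opt/bigdata/flume/lib/

# Replace Flume's bundled scala-library jar with the one shipped with Spark.
# SPARK_HOME and the exact bundled jar name are assumptions; check your install.
rm /opt/bigdata/flume/lib/scala-library-*.jar
cp ${SPARK_HOME}/jars/scala-library-2.11.8.jar /opt/bigdata/flume/lib/
```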
1.2 Spark Streaming preparation: write the Spark Streaming program
Add the spark-streaming-flume dependency to the project's pom.xml:
```xml
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-flume_2.11</artifactId>
    <version>2.0.2</version>
</dependency>
```
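The program below also assumes that Spark core and Spark Streaming themselves are on the classpath; if they are not already in the pom, dependencies along these lines (matching the Scala 2.11 / Spark 2.0.2 versions used here) should work:

```xml
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.0.2</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.0.2</version>
</dependency>
```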
Note: the program must specify the IP address and port of the machine it runs on, and they must match the hostname and port configured for the sink in flume-push.conf.
```scala
package cn.acece.sparkStreamingtest

import java.net.InetSocketAddress

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object SparkStreaming_Flume_Push {

  // Add the counts from the current batch (newValues) to the running total
  // kept by updateStateByKey for each word.
  def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
    val newCount = runningCount.getOrElse(0) + newValues.sum
    Some(newCount)
  }

  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf()
      .setAppName("SparkStreaming_Flume_Push")
      .setMaster("local[2]")
    val sc: SparkContext = new SparkContext(sparkConf)
    val scc: StreamingContext = new StreamingContext(sc, Seconds(5))
    sc.setLogLevel("WARN")
    // updateStateByKey needs a checkpoint directory to store the running state.
    scc.checkpoint("./")

    // Receiver that Flume's avro sink pushes events to; the IP and port must be
    // those of the machine running this program and must match flume-push.conf.
    val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
      FlumeUtils.createStream(scc, "172.16.43.63", 9999, StorageLevel.MEMORY_AND_DISK)

    // Turn each Flume event body into a line of text.
    val lineStream: DStream[String] = flumeStream.map(x => new String(x.event.getBody.array()))

    // Stateful word count across all batches.
    val result: DStream[(String, Int)] = lineStream
      .flatMap(_.split(" "))
      .map((_, 1))
      .updateStateByKey(updateFunction)

    result.print()
    scc.start()
    scc.awaitTermination()
  }
}
```
1.3 Push data from Flume to Spark Streaming: start the Spark Streaming program first
- First start the Spark Streaming program (run it from IDEA).
- Then start the Flume agent: first rename **/root/data/ata.txt.COMPLETED to data.txt**, then run the following shell command:
```shell
flume-ng agent -n a1 \
-c /opt/bigdata/flume/conf \
-f /opt/bigdata/flume/conf/flume-push.conf \
-Dflume.root.logger=INFO,console
```
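Once both sides are running, you can feed the exec source by appending lines to the file it tails; a minimal sketch (the words are just sample data):

```bash
# Append a few test lines to the file tailed by the exec source.
echo "hadoop spark flume" >> /root/test.txt
echo "spark streaming flume push" >> /root/test.txt
```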
1.4 Check the output in the IDEA console
Flume pushes data to Spark Streaming successfully, and the program runs as expected.
Summary
With the push approach, Flume's avro sink sends events to the receiver that the Spark Streaming program creates with FlumeUtils.createStream, so the Spark Streaming program must be started before the Flume agent, and the sink's hostname and port must point at the machine where that program runs.