日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

flume + kafka

發布時間:2025/3/15 编程问答 20 豆豆
生活随笔 收集整理的這篇文章主要介紹了 flume + kafka 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

前提:

1、下載 flume?http://flume.apache.org/download.html

2、下載配置 kafka?http://www.cnblogs.com/eggplantpro/articles/8428932.html

3、服務器3臺,我這邊是5臺

s1:10.211.55.16 zk&kafka? ? ? ? ? ?zk是zookeeper

s2:10.211.55.17 zk

s3:10.211.55.18 zk?

s4:10.211.55.19 kafka&flume

s5:10.211.55.20 kafka

安裝:

1、解壓

結構如上,也是中規中矩。顯然,配置文件在 conf 下

2、配置

flume 的配置不同于其他的軟件,flume是一種類型的服務,就是一種配置。conf 也有模板,建議配置對應的 sources 、channel、sink 都去官方指導文檔里去找

http://flume.apache.org/FlumeUserGuide.html

vim flume-kafka.properties

?

這是官方給的demo,我們可以跟著demo改就好了

# example.conf: A single-node Flume configuration# Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1# Describe/configure the source a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 44444# Describe the sink a1.sinks.k1.type = logger# Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1

改好的配置

# example.conf: A single-node Flume configuration# Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1# Describe/configure the source #a1.sources.r1.type = netcat #a1.sources.r1.bind = localhost #a1.sources.r1.port = 44444a1.sources.r1.type = exec # source 的類型是命令 a1.sources.r1.command = tail -F /home/test.log #tail 一個日志 的命令。只要有日志寫入,就會下沉到sink# Describe the sink a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = mxb # 這是 kafka topic的名稱 a1.sinks.k1.kafka.bootstrap.servers = s1:9092 這是kafka的服務器地址和ip a1.sinks.k1.kafka.flumeBatchSize = 20 a1.sinks.k1.kafka.producer.acks = 1 a1.sinks.k1.kafka.producer.linger.ms = 1 a1.sinks.k1.kafka.producer.compression.type = snappy# Use a channel which buffers events in memory a1.channels.c1.type = memory #管道類型是 內存# Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1

 3、啟動

../bin/flume-ng agent --conf conf --conf-file flume-kafka.properties --name a1 -Dflume.root.logger=INFO,console

 4、檢驗

在kafka的服務器上 啟動一個kafka的consumer?

cd /kafka/bin ./kafka-console-consumer.sh --zookeeper s1:2181 --from-beginning --topic mxb

?

因為在flume source是 tail 一個日志,所以我們往?/home/test.log 寫入內容即可

for((i=0;i<5000;i++)) do echo test$i done

執行寫入日志的命令后,在啟動 kafka-consumer 的服務器會看到消費的信息

?

?

?

?

  

轉載于:https://www.cnblogs.com/chouc/p/8429324.html

總結

以上是生活随笔為你收集整理的flume + kafka的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。