日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Flume Source 实例

發布時間:2024/4/14 编程问答 27 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Flume Source 实例 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
  • Avro?Source

    監聽avro端口,接收外部avro客戶端數據流。跟前面的agent的Avro?Sink可以組成多層拓撲結構。

    1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 a1.sources=s1 a1.sinks=k1 a1.channels=c1 ??? a1.sources.s1.channels=c1 a1.sinks.k1.channel=c1 ??? a1.sources.s1.type=avro a1.sources.s1.bind=vm1 a1.sources.s1.port=41414 ??? a1.sinks.k1.type=logger ??? a1.channels.c1.type=memory a1.channels.c1.capacity=1000 a1.channels.c1.transactionCapacity=100

    啟動命令:

    1 flume-ng agent --conf conf/ --conf-file?conf/a1.conf --name a1 -Dflume.root.logger=INFO,console

    測試方法:

    1 flume-ng avro-client --host vm1 --port 41414 --filename a.log

    ?

  • Exec?Source

    啟動source的時候執行unix命令,并期望不斷獲取數據,比如tail?-f命令。

    Exec?Source有可能會丟失數據,可以考慮使用Spooling?Dircetory?Source。

    1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 a1.sources=s1 a1.sinks=k1 a1.channels=c1 ??? a1.sources.s1.channels=c1 a1.sinks.k1.channel=c1 ??? a1.sources.s1.type=exec a1.sources.s1.command= tail -f /home/flume/a.log ??? a1.sinks.k1.type=logger ??? a1.channels.c1.type=memory a1.channels.c1.capacity=1000 a1.channels.c1.transactionCapacity=100

    執行命令:

    1 flume-ng agent --conf conf/ --conf-file?conf/a1.conf --name a1 -Dflume.root.logger=INFO,console

    測試方法:

    向unix命令監聽的文件追加內容,

    1 echo?'hello, everyone!'?>> a.log
  • Spooling?Directory?Source

    監控某個文件夾,將新產生的文件解析成event,解析方式是可插拔的,默認是LINE,將新文件中的每行數據轉換成一個event。文件解析完成后,該文件名字被追加.completed。

    Spooling?Dircetory?Source相對于Exec?Source更可靠,它不會丟失數據(甚至被重啟或殺死)。為了這種特性,只能是不可修改的、名字不重復的文件才可使用這個source:

    1)監控文件夾內的文件被flume使用后不能進行修改。

    2)文件名字不能重復,如果已經存在了xx.completed,不能在放入名字叫xx的文件。否則報錯,停止。

    3)被監控文件夾里的子文件夾不處理

    1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 a1.sources=s1 a1.sinks=k1 a1.channels=c1 ??? a1.sources.s1.channels=c1 a1.sinks.k1.channel=c1 ??? a1.sources.s1.type=spooldir a1.sources.s1.spoolDir=/home/flume/a ??? a1.sinks.k1.type=logger ??? a1.channels.c1.type=memory a1.channels.c1.capacity=1000 a1.channels.c1.transactionCapacity=100

    啟動命令:

    1 flume-ng agent --conf conf/ --conf-file?conf/a1.conf --name a1 -Dflume.root.logger=INFO,console

    測試方法:

    向/home/flume/a文件夾內放入文件。

  • Netcat?Source

    監聽端口數據。類似于nc?-k?-l?[host]?[port]命令.

    1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 a1.sources=s1 a1.sinks=k1 a1.channels=c1 ??? a1.sources.s1.channels=c1 a1.sinks.k1.channel=c1 ??? a1.sources.s1.type=netcat a1.sources.s1.bind=localhost a1.sources.s1.port=41414 ??? a1.sinks.k1.type=logger ??? a1.channels.c1.type=memory a1.channels.c1.capacity=1000 a1.channels.c1.transactionCapacity=100

    啟動命令:

    1 flume-ng agent --conf conf/ --conf-file?conf/a1.conf --name a1 -Dflume.root.logger=INFO,console

    測試方式:

    1 telnet localhost 41414
  • Sequence?Generator?Source

    不斷生成從0開始的數字,主要用于測試

    1 2 3 4 5 6 7 8 9 10 11 12 13 14 a1.sources=s1 a1.sinks=k1 a1.channels=c1 ??? a1.sources.s1.channels=c1 a1.sinks.k1.channel=c1 ??? a1.sources.s1.type=seq ??? a1.sinks.k1.type=logger ??? a1.channels.c1.type=memory a1.channels.c1.capacity=1000 a1.channels.c1.transactionCapacity=100

    啟動命令:

    1 flume-ng agent --conf conf/ --conf-file?conf/a1.conf --name a1 -Dflume.root.logger=INFO,console
  • Syslog?Source

    讀取syslog數據,它分為:Syslog?TCP?Source、Multiport?Syslog?TCP?Source、Syslog?UDP?Source

    1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 a1.sources=s1 a1.sinks=k1 a1.channels=c1 ??? a1.sources.s1.channels=c1 a1.sinks.k1.channel=c1 ??? a1.sources.s1.type=syslogtcp a1.sources.s1.host=localhost a1.sources.s1.port=41414 ??? a1.sinks.k1.type=logger ??? a1.channels.c1.type=memory a1.channels.c1.capacity=1000 a1.channels.c1.transactionCapacity=100

    啟動命令:

    1 flume-ng agent --conf conf/ --conf-file?conf/a1.conf --name a1 -Dflume.root.logger=INFO,console

    測試方法:

    需要syslog服務

  • Http?Source

    通過Http獲取event,可以是GET、POST方式,GET方式應該在測試時使用。一個HTTP?Request被handler解析成若干個event,這組event在一個事務里。

    如果handler拋異常,http狀態是400;如果channel滿了,http狀態是503。

    1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 a1.sources=s1 a1.sinks=k1 a1.channels=c1 ??? a1.sources.s1.channels=c1 a1.sinks.k1.channel=c1 ??? a1.sources.s1.type=http a1.sources.s1.port=41414 ??? a1.sinks.k1.type=logger ??? a1.channels.c1.type=memory a1.channels.c1.capacity=1000 a1.channels.c1.transactionCapacity=100

    啟動命令:

    1 flume-ng agent --conf conf/ --conf-file?conf/a1.conf --name a1 -Dflume.root.logger=INFO,console

    測試方法:

    1 curl -XPOST http://vm1:41414 -d '[{"headers" : {"timestamp" : "434324343","host" : "random_host.example.com"},"body" : "random_body"},{"headers" : {"namenode" : "namenode.example.com","datanode" : "random_datanode.example.com"},"body" : "really_random_body"}]'

    參數的格式是:

    1 2 3 4 5 6 7 8 9 10 11 12 13 [{ ????"headers" : { ?????????"timestamp" : "434324343", ?????????"host" : "random_host.example.com" ????}, ????"body" : "random_body" }, { ????"headers" : { ?????????"namenode" : "namenode.example.com", ?????????"datanode" : "random_datanode.example.com" ?????}, ????"body" : "really_random_body" }]

    Http?Source?Handler:

    Handler類型說明
    JSONHandler默認值,將文本輸入的每行轉換成一個event
    BlobHandler讀取avro文件,將其中的每條avro記錄轉換成一個event,每個event都附帶著模式信息

  • 轉載于:https://www.cnblogs.com/moonandstar08/p/6284252.html

    總結

    以上是生活随笔為你收集整理的Flume Source 实例的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。