日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Flume案例实操

發布時間:2024/2/28 编程问答 24 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Flume案例实操 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

案例實操

3.1?Flume實時讀取目錄中文件到HDFS案例

1)案例需求:使用flume監聽整個目錄的文件

2)需求分析:

?

3)實現步驟:

1.創建配置文件flume-dir-hdfs.conf

創建一個文件

[root@linux02 job]$ touch flume-dir-hdfs.conf

打開文件

[root@linux02 job]$ vim flume-dir-hdfs.conf

添加如下內容

a3.sources = r3

a3.sinks = k3

a3.channels = c3

?

# Describe/configure the source

a3.sources.r3.type = spooldir

a3.sources.r3.spoolDir = /opt/module/flume/upload

a3.sources.r3.fileSuffix = .COMPLETED

a3.sources.r3.fileHeader = true

#忽略所有以.tmp結尾的文件,不上傳

a3.sources.r3.ignorePattern = ([^ ]*\.tmp)

?

# Describe the sink

a3.sinks.k3.type = hdfs

a3.sinks.k3.hdfs.path = hdfs://linux01:9000/flume/upload/%Y%m%d/%H

#上傳文件的前綴

a3.sinks.k3.hdfs.filePrefix = upload-

#是否按照時間滾動文件夾

a3.sinks.k3.hdfs.round = true

#多少時間單位創建一個新的文件夾

a3.sinks.k3.hdfs.roundValue = 1

#重新定義時間單位

a3.sinks.k3.hdfs.roundUnit = hour

#是否使用本地時間戳

a3.sinks.k3.hdfs.useLocalTimeStamp = true

?

#積攢多少個Event才flush到HDFS一次

a3.sinks.k3.hdfs.batchSize = 100

#設置文件類型,可支持壓縮

a3.sinks.k3.hdfs.fileType = DataStream

#多久生成一個新的文件

a3.sinks.k3.hdfs.rollInterval = 30

#設置每個文件的滾動大小大概是128M

a3.sinks.k3.hdfs.rollSize = 134217700

#文件的滾動與Event數量無關

a3.sinks.k3.hdfs.rollCount = 0

?

# Use a channel which buffers events in memory

a3.channels.c3.type = memory

a3.channels.c3.capacity = 1000

a3.channels.c3.transactionCapacity = 100

?

# Bind the source and sink to the channel

a3.sources.r3.channels = c3

a3.sinks.k3.channel = c3

?

2. 啟動監控文件夾命令

[root@linux02 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-dir-hdfs.conf

說明:?在使用Spooling Directory Source時

  • 不要在監控目錄中創建并持續修改文件
  • 上傳完成的文件會以.COMPLETED結尾
  • 被監控文件夾每500毫秒掃描一次文件變動
  • 3. 向upload文件夾中添加文件

    /opt/module/flume目錄下創建upload文件

    [root@linux02 flume]$ mkdir upload

    upload文件夾中添加文件

    [root@linux02 upload]$ touch hadoop.txt

    [root@linux02 upload]$ touch hadoop.tmp

    [root@linux02 upload]$ touch hadoop.log

    4. 查看HDFS上的數據

    5. 等待1s,再次查詢upload文件夾

    [root@linux02 upload]$ ll

    總用量 0

    -rw-rw-r--. 1 hadoop hadoop 0 5月 ?20 22:31 bigdata.log.COMPLETED

    -rw-rw-r--. 1 hadoop hadoop 0 5月 ?20 22:31 bigdata.tmp

    -rw-rw-r--. 1 hadoop hadoop 0 5月 ?20 22:31 bigdata.txt.COMPLETED

    3.2 [重點]Flume實時讀取本地文件新增內容到HDFS案例

    1)案例需求:實時監控Hive日志,并上傳到HDFS中

    2)需求分析:

    ?

    3)實現步驟:

    創建flume-file-hdfs.conf文件

    創建文件

    [root@linux02 job]$ touch flume-file-hdfs.conf

    注:要想讀取Linux系統中的文件,就得按照Linux命令的規則執行命令。由于hive日志在Linux系統中所以讀取文件的類型選擇:exec即execute執行的意思。表示執行Linux命令來讀取文件。

    [root@linux02 job]$ vim flume-file-hdfs.conf

    添加如下內容

    # Name the components on this agent

    a2.sources = r2

    a2.sinks = k2

    a2.channels = c2

    ?

    # Describe/configure the source

    a2.sources.r2.type = exec

    a2.sources.r2.command = tail -F /opt/module/hive/logs/hive.log

    a2.sources.r2.shell = /bin/bash -c

    ?

    # Describe the sink

    a2.sinks.k2.type = hdfs

    a2.sinks.k2.hdfs.path = hdfs://linux01:9000/flume/%Y%m%d/%H

    #上傳文件的前綴

    a2.sinks.k2.hdfs.filePrefix = logs-

    #是否按照時間滾動文件夾

    a2.sinks.k2.hdfs.round = true

    #多少時間單位創建一個新的文件夾

    a2.sinks.k2.hdfs.roundValue = 1

    #重新定義時間單位

    a2.sinks.k2.hdfs.roundUnit = hour

    #是否使用本地時間戳

    a2.sinks.k2.hdfs.useLocalTimeStamp = true

    #積攢多少個Event才flush到HDFS一次

    a2.sinks.k2.hdfs.batchSize = 1000

    #設置文件類型,可支持壓縮

    a2.sinks.k2.hdfs.fileType = DataStream

    #多久生成一個新的文件

    a2.sinks.k2.hdfs.rollInterval = 600

    #設置每個文件的滾動大小

    a2.sinks.k2.hdfs.rollSize = 134217700

    #文件的滾動與Event數量無關

    a2.sinks.k2.hdfs.rollCount = 0

    ?

    # Use a channel which buffers events in memory

    a2.channels.c2.type = memory

    a2.channels.c2.capacity = 1000

    a2.channels.c2.transactionCapacity = 100

    ?

    # Bind the source and sink to the channel

    a2.sources.r2.channels = c2

    a2.sinks.k2.channel = c2

    ?

    執行監控配置

    [root@linux02 flume]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/flume-file-hdfs.conf

    開啟hadoop和hive并操作hive產生日志

    [root@linux02 hadoop-2.7.2]$ sbin/start-dfs.sh

    [root@linux02 hadoop-2.7.2]$ sbin/start-yarn.sh

    ?

    [root@linux02 hive]$ bin/hive

    hive (default)>

    在HDFS上查看文件

    3.3?單數據源多出口案例

    單Source多Channel、Sink如圖所示

    圖?單Source多Channel、Sink

  • 案例需求:使用flume-1監控文件變動,flume-1將變動內容傳遞給flume-2,flume-2負責存儲到HDFS。同時flume-1將變動內容傳遞給flume-3,flume-3負責輸出到local filesystem。
  • 2)需求分析:

    3)實現步驟:

    0.準備工作

    在job目錄下創建group1文件夾

    [root@linux02 job]$ cd group1/

    在/opt/module/datas/目錄下創建flume3文件夾

    [root@linux02 datas]$ mkdir flume3

    1.創建flume-file-flume.conf

    配置1個接收日志文件的source和兩個channel、兩個sink,分別輸送給flume-flume-hdfs和flume-flume-dir。

    創建配置文件并打開

    [root@linux02 group1]$ touch flume-file-flume.conf

    [root@linux02 group1]$ vim flume-file-flume.conf

    添加如下內容

    # Name the components on this agent

    a1.sources = r1

    a1.sinks = k1 k2

    a1.channels = c1 c2

    # 將數據流復制給多個channel

    a1.sources.r1.selector.type = replicating

    ?

    # Describe/configure the source

    a1.sources.r1.type = exec

    a1.sources.r1.command = tail -F /opt/module/hive/logs/hive.log

    a1.sources.r1.shell = /bin/bash -c

    ?

    # Describe the sink

    a1.sinks.k1.type = avro

    a1.sinks.k1.hostname = linux02

    a1.sinks.k1.port = 4141

    ?

    a1.sinks.k2.type = avro

    a1.sinks.k2.hostname = linux02

    a1.sinks.k2.port = 4142

    ?

    # Describe the channel

    a1.channels.c1.type = memory

    a1.channels.c1.capacity = 1000

    a1.channels.c1.transactionCapacity = 100

    ?

    a1.channels.c2.type = memory

    a1.channels.c2.capacity = 1000

    a1.channels.c2.transactionCapacity = 100

    ?

    # Bind the source and sink to the channel

    a1.sources.r1.channels = c1 c2

    a1.sinks.k1.channel = c1

    a1.sinks.k2.channel = c2

    :Avro是由Hadoop創始人Doug Cutting創建的一種語言無關的數據序列化和RPC框架。

    注:RPC(Remote Procedure Call)—遠程過程調用,它是一種通過網絡從遠程計算機程序上請求服務,而不需要了解底層網絡技術的協議。

    2.創建flume-flume-hdfs.conf

    配置上級flume輸出的source,輸出是到hdfs的sink。

    創建配置文件并打開

    [root@linux02 group1]$ touch flume-flume-hdfs.conf

    [root@linux02 group1]$ vim flume-flume-hdfs.conf

    添加如下內容

    # Name the components on this agent

    a2.sources = r1

    a2.sinks = k1

    a2.channels = c1

    ?

    # Describe/configure the source

    a2.sources.r1.type = avro

    a2.sources.r1.bind = linux02

    a2.sources.r1.port = 4141

    ?

    # Describe the sink

    a2.sinks.k1.type = hdfs

    a2.sinks.k1.hdfs.path = hdfs://linux01:9000/flume2/%Y%m%d/%H

    #上傳文件的前綴

    a2.sinks.k1.hdfs.filePrefix = flume2-

    #是否按照時間滾動文件夾

    a2.sinks.k1.hdfs.round = true

    #多少時間單位創建一個新的文件夾

    a2.sinks.k1.hdfs.roundValue = 1

    #重新定義時間單位

    a2.sinks.k1.hdfs.roundUnit = hour

    #是否使用本地時間戳

    a2.sinks.k1.hdfs.useLocalTimeStamp = true

    #積攢多少個Event才flush到HDFS一次

    a2.sinks.k1.hdfs.batchSize = 100

    #設置文件類型,可支持壓縮

    a2.sinks.k1.hdfs.fileType = DataStream

    #多久生成一個新的文件

    a2.sinks.k1.hdfs.rollInterval = 600

    #設置每個文件的滾動大小大概是128M

    a2.sinks.k1.hdfs.rollSize = 134217700

    #文件的滾動與Event數量無關

    a2.sinks.k1.hdfs.rollCount = 0

    #最小冗余數

    a2.sinks.k1.hdfs.minBlockReplicas = 1

    ?

    # Describe the channel

    a2.channels.c1.type = memory

    a2.channels.c1.capacity = 1000

    a2.channels.c1.transactionCapacity = 100

    ?

    # Bind the source and sink to the channel

    a2.sources.r1.channels = c1

    a2.sinks.k1.channel = c1

    3.創建flume-flume-dir.conf

    配置上級flume輸出的source,輸出是到本地目錄的sink。

    創建配置文件并打開

    [root@linux02 group1]$ touch flume-flume-dir.conf

    [root@linux02 group1]$ vim flume-flume-dir.conf

    添加如下內容

    # Name the components on this agent

    a3.sources = r1

    a3.sinks = k1

    a3.channels = c2

    ?

    # Describe/configure the source

    a3.sources.r1.type = avro

    a3.sources.r1.bind = linux02

    a3.sources.r1.port = 4142

    ?

    # Describe the sink

    a3.sinks.k1.type = file_roll

    a3.sinks.k1.sink.directory = /opt/module/datas/flume3

    ?

    # Describe the channel

    a3.channels.c2.type = memory

    a3.channels.c2.capacity = 1000

    a3.channels.c2.transactionCapacity = 100

    ?

    # Bind the source and sink to the channel

    a3.sources.r1.channels = c2

    a3.sinks.k1.channel = c2

    提示:輸出的本地目錄必須是已經存在的目錄,如果該目錄不存在,并不會創建新的目錄。

    4.執行配置文件

    分別開啟對應配置文件:flume-flume-dir,flume-flume-hdfs,flume-file-flume。

    [root@linux02 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group1/flume-flume-dir.conf

    [root@linux02 flume]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group1/flume-flume-hdfs.conf

    [root@linux02 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group1/flume-file-flume.conf

    5.啟動hadoop和hive

    [root@linux02 hadoop-2.7.2]$ sbin/start-dfs.sh

    [root@linux02 hadoop-2.7.2]$ sbin/start-yarn.sh

    ?

    [root@linux02 hive]$ bin/hive

    hive (default)>

    ?

    6.檢查HDFS上數據

    ?

    7檢查/opt/module/datas/flume3目錄中數據

    [root@linux02 flume3]$ ll

    總用量 8

    -rw-rw-r--. 1 root?root?5942 5月 ?22 00:09 1526918887550-3

    3.4?多數據源匯總案例

    多Source匯總數據到單Flume如圖所示

    圖?多Flume匯總數據到單Flume

  • 案例需求:
  • linux01上的flume-1監控文件hive.log,

    linux01上的flume-2監控某一個端口的數據流,

    flume-1與flume-2將數據發送給linux01上的flume-3,flume-3將最終數據打印到控制臺

  • 需求分析:
  • ?

    3)實現步驟:

    0.準備工作

    分發flume

    [root@linux02 module]$ scp?flume

    在linux02、linux03以及hadoop104的/opt/module/flume/job目錄下創建一個group2文件夾

    [root@linux02 job]$ mkdir group2

    [root@linux02 job]$ mkdir group2

    [root@linux03 job]$ mkdir group2

    1.創建flume1.conf

    配置source用于監控hive.log文件,配置sink輸出數據到下一級flume。

    在linux02上創建配置文件并打開

    [root@linux02 group2]$ touch flume-file.conf

    [root@linux02 group2]$ vim flume-file.conf

    添加如下內容

    # Name the components on this agent

    a1.sources = r1

    a1.sinks = k1

    a1.channels = c1

    ?

    # Describe/configure the source

    a1.sources.r1.type = exec

    a1.sources.r1.command = tail -F /opt/module/hive/logs/hive.log

    a1.sources.r1.shell = /bin/bash -c

    ?

    # Describe the sink

    a1.sinks.k1.type = avro

    a1.sinks.k1.hostname = linux02

    a1.sinks.k1.port = 4141

    ?

    # Describe the channel

    a1.channels.c1.type = memory

    a1.channels.c1.capacity = 1000

    a1.channels.c1.transactionCapacity = 100

    ?

    # Bind the source and sink to the channel

    a1.sources.r1.channels = c1

    a1.sinks.k1.channel = c1

    2.創建flume2.conf

    配置source監控端口44444數據流,配置sink數據到下一級flume:

    在hadoop104上創建配置文件并打開

    [root@linux03 group2]$ touch flume2.conf

    [root@linux03 group2]$ vim flume2.conf

    添加如下內容

    # Name the components on this agent

    a2.sources = r1

    a2.sinks = k1

    a2.channels = c1

    ?

    # Describe/configure the source

    a2.sources.r1.type = netcat

    a2.sources.r1.bind = linux02

    a2.sources.r1.port = 44444

    ?

    # Describe the sink

    a2.sinks.k1.type = avro

    a2.sinks.k1.hostname = linux02

    a2.sinks.k1.port = 4141

    ?

    # Use a channel which buffers events in memory

    a2.channels.c1.type = memory

    a2.channels.c1.capacity = 1000

    a2.channels.c1.transactionCapacity = 100

    ?

    # Bind the source and sink to the channel

    a2.sources.r1.channels = c1

    a2.sinks.k1.channel = c1

    3.創建flume3.conf

    配置source用于接收flume1與flume2發送過來的數據流,最終合并后sink到控制臺。

    在linux02上創建配置文件并打開

    [root@linux02 group2]$ touch flume3.conf

    [root@linux02 group2]$ vim flume3.conf

    添加如下內容

    # Name the components on this agent

    a3.sources = r1

    a3.sinks = k1

    a3.channels = c1

    ?

    # Describe/configure the source

    a3.sources.r1.type = avro

    a3.sources.r1.bind = linux02

    a3.sources.r1.port = 4141

    ?

    # Describe the sink

    # Describe the sink

    a3.sinks.k1.type = logger

    ?

    # Describe the channel

    a3.channels.c1.type = memory

    a3.channels.c1.capacity = 1000

    a3.channels.c1.transactionCapacity = 100

    ?

    # Bind the source and sink to the channel

    a3.sources.r1.channels = c1

    a3.sinks.k1.channel = c1

    4.執行配置文件

    分別開啟對應配置文件:flume3.conf,flume2.conf,flume1.conf。

    [root@linux02 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group2/flume3.conf -Dflume.root.logger=INFO,console

    [root@linux02 flume]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group2/flume2.conf

    [root@linux03 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group3/flume-file.conf

    5.在linux02上向/opt/module目錄下的group.log追加內容

    [root@linux02 module]$ echo 'hello' > group.log

    6.在linux03上向44444端口發送數據

    [root@linux03 flume]$ telnet hadoop104?44444

    7. ?檢查數據

    總結

    以上是生活随笔為你收集整理的Flume案例实操的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。