日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

本地日志数据实时接入到hadoop集群的数据接入方案

發(fā)布時間:2024/4/17 编程问答 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 本地日志数据实时接入到hadoop集群的数据接入方案 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

1.?概述

本手冊主要介紹了,一個將傳統(tǒng)數(shù)據(jù)接入到Hadoop集群的數(shù)據(jù)接入方案和實施方法。供數(shù)據(jù)接入和集群運維人員參考。

1.1.??整體方案

Flume作為日志收集工具,監(jiān)控一個文件目錄或者一個文件,當有新數(shù)據(jù)加入時,收集新數(shù)據(jù)發(fā)送給KafkaKafka用來做數(shù)據(jù)緩存和消息訂閱。Kafka里面的消息可以定時落地到HDFS上,也可以用Spark?Streaming來做實時處理,然后將處理后的數(shù)據(jù)落地到HDFS上。

1.2.?數(shù)據(jù)接入流程

本數(shù)據(jù)接入方案,分為以下幾個步驟:

l?安裝部署Flume:在每個數(shù)據(jù)采集節(jié)點上安裝數(shù)據(jù)采集工具Flume。詳見“2、安裝部署Flume”。

l?數(shù)據(jù)預處理:生成特定格式的數(shù)據(jù),供Flume采集。詳見“3、數(shù)據(jù)預處理”。

l?Flume采集數(shù)據(jù)到Kafka:?Flume采集數(shù)據(jù)并發(fā)送到Kafka消息隊列。詳見“4、Flume采集數(shù)據(jù)到Kafka”。

l?Kafka數(shù)據(jù)落地:將Kafka數(shù)據(jù)落地到HDFS。詳見“5、Kafka數(shù)據(jù)落地”。

?

2.?安裝部署Flume

若要采集數(shù)據(jù)節(jié)點的本地數(shù)據(jù),每個節(jié)點都需要安裝一個Flume工具,用來做數(shù)據(jù)采集。

2.2.1下載并安裝

到官網(wǎng)去下載最新版本的Flume

下載地址為:http://flume.apache.org/,目前最新版本為1.6.0,需要1.7及以上版本的JDK

1、解壓

tar?-xzvf?apache-flume-1.6.0-bin.tar.gz??-C?/usr/local/

2、安裝JDK1.7

???如果節(jié)點上JDK版本低于1.7,需要安裝1.7或以上版本的JDK

JDK?1.7?下載地址:

http://www.oracle.com/technetwork/java/javase/downloads/jdk7-downloads-1880260.html

Flume目錄下創(chuàng)建一個java目錄,存放JDK

cd?/usr/local/apache-flume-1.6.0-bin

mkdir?java

cd?java

tar?-xzvf?jdk-7u79-linux-x64.tar.gz

?

2.2.2配置Flume系統(tǒng)參數(shù)

修改?flume-env.sh?配置文件,主要是JAVA_HOME變量設置

cd?/usr/local/apache-flume-1.6.0-bin/conf

cpflume-env.sh.templateflume-env.sh

flume-env.sh里面設置FLUME_CLASSPATH變量和JAVA_HOME變量,

示例:

export?JAVA_HOME=/usr/local/apache-flume-1.6.0-bin/java/jdk1.7.0_79

FLUME_CLASSPATH="/usr/local/apache-flume-1.6.0-bin/"

變量具體內(nèi)容根據(jù)實際修改

?

2.2.3添加Flume第三方依賴

添加第三方依賴包flume-plugins-1.0-SNAPSHOT.jar,此包實現(xiàn)了一個Flume攔截器,將Flume采集到的數(shù)據(jù)進行序列化、結(jié)構(gòu)化等預處理,最后每條數(shù)據(jù)生成一條Event數(shù)據(jù)返回。

?

cd?/usr/local/apache-flume-1.6.0-bin

mkdir?plugins.d????--創(chuàng)建依賴目錄,目錄名必須為plugins.d

cd?plugins.d?

mkdir?flume-plugins??????????--項目目錄,目錄名隨意

cd?flume-plugins

mkdir?lib???????????--jar目錄,目錄名必須為lib

將第三方jarflume-plugins-1.0-SNAPSHOT.jar放在lib目錄下

2.2.4添加Hive配置文件

hive-site.xml文件拷貝到/usr/local/apache-flume-1.6.0-bin/conf目錄下,并修改hive元數(shù)據(jù)地址與真實地址對應。如下所示:

?<property>

? <name>hive.metastore.uris</name>

? <value>thrift://m103:9083,thrift://m105:9083</value>

?</property>

?

2.2.5創(chuàng)建Flume?agent配置文件

創(chuàng)建flume啟動配置文件,指定sourcechannelsink?3個組件內(nèi)容。每個組件都有好幾種配置選項,具體配置請查看Flume官網(wǎng)。創(chuàng)建配置文件flume.conf,示例如下:

?

vim?flume.conf

a1.sources?=?x1

a1.sinks?=?y1

a1.channels?=?z1

#?Describe/configure?the?source

a1.sources.x1.type?=?exec

a1.sources.x1.channels?=?z1

a1.sources.x1.command?=?tail?-F?/home/xdf/exec.txt

#?Describe?the?sink

a1.sinks.y1.type?=?logger

#?Use?a?channel?which?buffers?events?in?memory

a1.channels.z1.type?=?memory

a1.channels.z1.capacity?=?1000

a1.channels.z1.transactionCapacity?=?100

#?Bind?the?source?and?sink?to?the?channel

a1.sources.x1.channels?=?z1

a1.sinks.y1.channel?=?z1

?

2.2.6啟動Flume?Agent

生產(chǎn)環(huán)境下,參數(shù)-Dflume.root.logger=INFO,console去掉console,此處只為方便查看測試結(jié)果,選擇將日志打印到控制臺。若Flume?agent正常啟動,說明Flume安裝成功。

?

?

cd?/usr/local/apache-flume-1.6.0-bin

./bin/flume-ng?agent?--conf?conf?--conf-file?flume.conf?--name?a3?-Dflume.root.logger=INFO,console

?

2.2.7?測試

上面配置的example.conf文件,實現(xiàn)的功能是監(jiān)控文件/home/xdf/exec.txt,如果有新數(shù)據(jù)寫入時,Flume就會采集到新數(shù)據(jù)并打印在控制臺上。

測試用例:向/home/xdf/exec.txt文件中寫入內(nèi)容“hello?flume”,查看控制臺是否打印出“hello?flume”。正常輸出如下:

?

echo?'hello?flume'?>>?/home/xdf/exec.txt

2015-06-30?16:01:52,910?(SinkRunner-PollingRunner-DefaultSinkProcessor)?[INFO?-?org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)]?Event:?{?headers:{}?body:?68?65?6C?6C?6F?20?66?6C?75?6D?65?hello?flume?}

?

至此,Flume安裝部署完畢。

3.?數(shù)據(jù)預處理

1、Flume采集數(shù)據(jù)都是按行分割的,一行代表一條記錄。如果原始數(shù)據(jù)不符合要求,需要對數(shù)據(jù)進行預處理。示例如下:

原始數(shù)據(jù)格式為:

out:?===?START?OF?INFORMATION?SECTION?===

out:?Vendor:???????????????TOSHIBA

out:?Product:??????????????MBF2300RC

out:?Revision:?????????????0109

out:?User?Capacity:????????300,000,000,000?bytes?[300?GB]

out:?Logical?block?size:???512?bytes

???經(jīng)過預處理,我們將數(shù)據(jù)變?yōu)橐粭l5個字段的記錄:

TOSHIBA;MBF2300RC;0109;300;512

?

2、如果要將上面數(shù)據(jù)接入到hive中,我們還需要下面幾個處理:

a.?創(chuàng)建一張hive

create?table?test(Vendor?string,Product?string,Revision?string,User_Capacity?string,block?string);

b.?在Kafka節(jié)點上創(chuàng)建一個topic,名字與上面hive表名對應,格式為“hive-數(shù)據(jù)庫名-表名”。示例如下:

bin/kafka-topics?--create?--zookeeper?localhost:2181/kafka?????--topic?hive-xdf-test??--partitions?1?--replication-factor?1

c.?將第一步得到的記錄數(shù)據(jù)與topic整合成一條記錄,用“@@”分割。示例如下:

hive-xdf-test?@@TOSHIBA;MBF2300RC;0109;300;512

d.?Flume采集整合后的一條數(shù)據(jù),通過topic獲取hive表的元數(shù)據(jù),根據(jù)元數(shù)據(jù)對記錄數(shù)據(jù)進行結(jié)構(gòu)化、序列化處理,然后經(jīng)過Kafka存入到hive表中。具體操作參考下面具體步驟所示。

4.?Flume采集數(shù)據(jù)到Kafka

Flume如果要將采集到的數(shù)據(jù)發(fā)送到Kafka,需要指定配置文件(如下:flume_test.conf)的sink類型為KafkaSink,并且指定Kafka?broker?list。配置文件示例如下,紅色標注的為KafkaSink配置項:

vim?flume_test.conf

a3.channels?=?c3

a3.sources?=?r3

a3.sinks?=?k3

?

a3.sources.r3.type?=?exec

a3.sources.r3.channels?=?c3

a3.sources.r3.command?=?tail?-F?/home/xdf/exec.txt

a3.sources.r3.fileHeader?=?false

a3.sources.r3.basenameHeader?=?false

a3.sources.r3.interceptors?=?i3

a3.sources.r3.interceptors.i3.type?=iie.flume.interceptor.CSVInterceptor$Builder

a3.sources.r3.interceptors.i3.separator?=?;

a3.sources.r3.decodeErrorPolicy=IGNORE

?

a3.channels.c3.type?=?memory

a3.channels.c3.capacity?=?10000

a3.channels.c3.transactionCapacity?=?1000

?

a3.sinks.k3.channel?=?c3

#?a3.sinks.k3.type?=?logger

#a3.sinks.k3.batchSize?=?10

a3.sinks.k3.type?=?org.apache.flume.sink.kafka.KafkaSink

a3.sinks.k3.brokerList?=?localhost:9092

?

?

注意:此處有一個攔截器插件的定義,它就是用來做結(jié)構(gòu)化、序列化數(shù)據(jù)預處理的。此插件由上面配置的Flume第三方jar包中獲得。

a3.sources.r3.interceptors.i3.type?=iie.flume.interceptor.CSVInterceptor$Builder

?

5.?Kafka數(shù)據(jù)落地

我們提供了一個Camus工具,來定時將Kafka中的數(shù)據(jù)落地到hive表中。

Camus工具包含以下三個文件:

文件

說明

camus-example-0.1.0-cdh-SNAPSHOT-shaded.jar

程序運行jar

camus.properties

配置文件

camusrun.sh

運行腳本

?

配置文件需要根據(jù)實際情況,修改以下兩個參數(shù)

kafka.whitelist.topics=hive-xdf-test?????????----數(shù)據(jù)對應的topic

kafka.brokers=m105:9092,m103:9092????????????----kafka?broker?lists

需要指定多個topic時,用逗號間隔,示例:

Kafka.whitelist.topics=topic1,topic2,topic3

修改完配置文件后,定時運行camusrun.sh腳本,就會將新生成的數(shù)據(jù)接入到topic所對應的hive表中了。

6.?具體案例

6.1?Smart數(shù)據(jù)接入

6.1.2?創(chuàng)建hive

最終我們要將smart數(shù)據(jù)接入到hive表中,所以我們首先要創(chuàng)建一個滿足smart數(shù)據(jù)結(jié)構(gòu)的hive表。

create?table?smart_data(serial_number?String?,update_time?string,smart_health_status?string?,current_drive_temperature?int,drive_trip_temperature?int,elements_in_grown_defect_list?int,manufactured_time?string?,cycle_count?int????,start_stop_cycles?int????,load_unload_count?int????,load_unload_cycles?int????,blocks_sent_to_initiator?bigint?,blocks_received_from_initiator?bigint?,blocks_read_from_cache?bigint?,num_commands_size_not_larger_than_segment_size?bigint?,num_commands_size_larger_than_segment_size?bigint?,num_hours_powered_up?string??????,num_minutes_next_test?int????,read_corrected_ecc_fast?bigint?,read_corrected_ecc_delayed?bigint?,read_corrected_re?bigint?,read_total_errors_corrected?bigint?,read_correction_algo_invocations?bigint?,read_gigabytes_processed?bigint?,read_total_uncorrected_errors?string?,write_corrected_ecc_fast?bigint?,write_corrected_ecc_delayed?bigint?,write_corrected_re?bigint?,write_total_errors_corrected?bigint?,write_correction_algo_invocations?bigint?,write_gigabytes_processed?bigint?,write_total_uncorrected_errors?string?,verify_corrected_ecc_fast?bigint?,verify_corrected_ecc_delayed?bigint?,verify_corrected_re?bigint?,verify_total_errors_corrected?bigint?,verify_correction_algo_invocations?bigint?,verify_gigabytes_processed?bigint?,verify_total_uncorrected_errors?bigint?,non_medium_error_count?bigint);

6.1.2?創(chuàng)建topic

Flume采集到的數(shù)據(jù)要生成一條條的event數(shù)據(jù)傳給kafka消息系統(tǒng)保存,kafka需要事先創(chuàng)建一個topic來生產(chǎn)和消費指定數(shù)據(jù)。為系統(tǒng)正常運行,我們統(tǒng)一定義topic的名字結(jié)構(gòu)為“hive-數(shù)據(jù)庫名-表名”。需要在kafka集群節(jié)點上創(chuàng)建topic,示例如下:

bin/kafka-topics?--create?--zookeeper?localhost:2181/kafka?????--topic?hive-xdf-smart_data??--partitions?1?

--replication-factor?1

注意:此處的數(shù)據(jù)庫名、表名,必須為上一步創(chuàng)建的hive表,因為Flume會通過此topic名來獲取hive表的元數(shù)據(jù)信息,從而生成對應event數(shù)據(jù)。

6.1.2?配置Flume?agent啟動參數(shù)

生成參數(shù)文件smart_test.conf如下:

vim?smart_test.conf

a3.channels?=?c3

a3.sources?=?r3

a3.sinks?=?k3

?

a3.sources.r3.type?=?exec

a3.sources.r3.channels?=?c3

a3.sources.r3.command?=?tail?-F?/home/xdf/exec.txt

a3.sources.r3.fileHeader?=?false

a3.sources.r3.basenameHeader?=?false

a3.sources.r3.interceptors?=?i3

a3.sources.r3.interceptors.i3.type?=iie.flume.interceptor.CSVInterceptor$Builder

a3.sources.r3.interceptors.i3.separator?=?;

a3.sources.r3.decodeErrorPolicy=IGNORE

?

a3.channels.c3.type?=?memory

a3.channels.c3.capacity?=?10000

a3.channels.c3.transactionCapacity?=?1000

?

a3.sinks.k3.channel?=?c3

#?a3.sinks.k3.type?=?logger

#a3.sinks.k3.batchSize?=?10

a3.sinks.k3.type?=?org.apache.flume.sink.kafka.KafkaSink

a3.sinks.k3.brokerList?=?localhost:9092

?

注意:

1、此處數(shù)據(jù)源sources的類型為exec。具體命令為:

a3.sources.r3.command?=?tail?-F?/home/xdf/exec.txt

我們定時在每個節(jié)點運行一個腳本生成一條smart數(shù)據(jù),將數(shù)據(jù)寫入/home/xdf/exec.txt文件。

?

flume用上面那個命令一直監(jiān)控文件/home/xdf/exec.txt,如有新數(shù)據(jù)寫入,則采集傳輸?shù)絢afka里。

?

2、指定了一個自定義的第三方插件,Flume過濾器CSVInterceptor,將CSV格式的數(shù)據(jù)轉(zhuǎn)化成結(jié)構(gòu)化,序列化的Event格式。

?

3、SinkKafkaSink,數(shù)據(jù)會寫到kafka里面,特別注意:這里需要指定對應的brokerList,示例如下:

a3.sinks.k3.brokerList?=?m103:9092,m105:9092

6.1.3?開啟Flume?Agent

執(zhí)行命令:

cd?/usr/local/apache-flume-1.6.0-bin

./bin/flume-ng?agent?--conf?conf?--conf-file?smart_test.conf?--name?a3?-Dflume.root.logger=INFO

6.1.4?生成Smart數(shù)據(jù)

在每個數(shù)據(jù)節(jié)點上運行createEvent.py腳本,生成一條結(jié)構(gòu)化好的smart數(shù)據(jù)。

腳本有兩個參數(shù)smart_data.loghive-xdf-smart_data,前者為smart命令輸出的原始信息文件,后者是topic名字,即上一步生成的topic名。

python?createEvent.py?smart_data.log?hive-xdf-smart_data?>?

/home/xdf/exec.txt

?

此腳本會解析smart原始信息,生成一條帶topic字段的結(jié)構(gòu)化smart數(shù)據(jù)寫入到/home/xdf/exec.txt文件中,數(shù)據(jù)格式如下:

hive-xdf-smart_data@@EB00PC208HFC;2015-06-23?18:56:09;OK;28;65;0;week?08?of?year?2012;50000;21;200000;69;-1;-1;-1;-1;-1;-1;-1;0;0;0;0;0;0;300744.962;0;0;0;0;0;0;10841.446;0;-1;-1;-1;-1;-1;-1;-1

用符號“@@”將topicsmart數(shù)據(jù)分開,smart數(shù)據(jù)每列間用逗號隔開。

6.1.5?測試時查看Kafka數(shù)據(jù)

查看數(shù)據(jù)是否成功生成到kafka中,可在kafka節(jié)點上,通過下面命令查看:

kafka-console-consumer?--zookeeper?localhost:2181/kafka?--topic?hive-xdf-smart_data?--from-beginning

結(jié)果展示:

6.1.6?Kafka數(shù)據(jù)落地到hive表中

打開camus.properties配置文件,修改以下兩個參數(shù)

kafka.whitelist.topics=hive-xdf-smart_data?????----smart數(shù)據(jù)對應topic

kafka.brokers=m105:9092,m103:9092???????????????----kafka?broker?lists

修改完配置文件后,定時運行camusrun.sh腳本,就會將新生成的smart數(shù)據(jù)接入到topic所對應的hive表中了。

至此,數(shù)據(jù)接入流程完畢。

轉(zhuǎn)載于:https://www.cnblogs.com/xiaodf/p/5027167.html

總結(jié)

以上是生活随笔為你收集整理的本地日志数据实时接入到hadoop集群的数据接入方案的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。