本地日志数据实时接入到hadoop集群的数据接入方案
1.?概述
本手冊主要介紹了,一個將傳統(tǒng)數(shù)據(jù)接入到Hadoop集群的數(shù)據(jù)接入方案和實施方法。供數(shù)據(jù)接入和集群運維人員參考。
1.1.??整體方案
Flume作為日志收集工具,監(jiān)控一個文件目錄或者一個文件,當有新數(shù)據(jù)加入時,收集新數(shù)據(jù)發(fā)送給Kafka。Kafka用來做數(shù)據(jù)緩存和消息訂閱。Kafka里面的消息可以定時落地到HDFS上,也可以用Spark?Streaming來做實時處理,然后將處理后的數(shù)據(jù)落地到HDFS上。
1.2.?數(shù)據(jù)接入流程
本數(shù)據(jù)接入方案,分為以下幾個步驟:
l?安裝部署Flume:在每個數(shù)據(jù)采集節(jié)點上安裝數(shù)據(jù)采集工具Flume。詳見“2、安裝部署Flume”。
l?數(shù)據(jù)預處理:生成特定格式的數(shù)據(jù),供Flume采集。詳見“3、數(shù)據(jù)預處理”。
l?Flume采集數(shù)據(jù)到Kafka:?Flume采集數(shù)據(jù)并發(fā)送到Kafka消息隊列。詳見“4、Flume采集數(shù)據(jù)到Kafka”。
l?Kafka數(shù)據(jù)落地:將Kafka數(shù)據(jù)落地到HDFS。詳見“5、Kafka數(shù)據(jù)落地”。
?
2.?安裝部署Flume
若要采集數(shù)據(jù)節(jié)點的本地數(shù)據(jù),每個節(jié)點都需要安裝一個Flume工具,用來做數(shù)據(jù)采集。
2.2.1下載并安裝
到官網(wǎng)去下載最新版本的Flume
下載地址為:http://flume.apache.org/,目前最新版本為1.6.0,需要1.7及以上版本的JDK。
1、解壓
tar?-xzvf?apache-flume-1.6.0-bin.tar.gz??-C?/usr/local/
2、安裝JDK1.7
???如果節(jié)點上JDK版本低于1.7,需要安裝1.7或以上版本的JDK
JDK?1.7?下載地址:
http://www.oracle.com/technetwork/java/javase/downloads/jdk7-downloads-1880260.html
在Flume目錄下創(chuàng)建一個java目錄,存放JDK
cd?/usr/local/apache-flume-1.6.0-bin
mkdir?java
cd?java
tar?-xzvf?jdk-7u79-linux-x64.tar.gz
?
2.2.2配置Flume系統(tǒng)參數(shù)
修改?flume-env.sh?配置文件,主要是JAVA_HOME變量設置
cd?/usr/local/apache-flume-1.6.0-bin/conf
cpflume-env.sh.templateflume-env.sh
在flume-env.sh里面設置FLUME_CLASSPATH變量和JAVA_HOME變量,
示例:
export?JAVA_HOME=/usr/local/apache-flume-1.6.0-bin/java/jdk1.7.0_79
FLUME_CLASSPATH="/usr/local/apache-flume-1.6.0-bin/"
變量具體內(nèi)容根據(jù)實際修改
?
2.2.3添加Flume第三方依賴
添加第三方依賴包flume-plugins-1.0-SNAPSHOT.jar,此包實現(xiàn)了一個Flume攔截器,將Flume采集到的數(shù)據(jù)進行序列化、結(jié)構(gòu)化等預處理,最后每條數(shù)據(jù)生成一條Event數(shù)據(jù)返回。
?
cd?/usr/local/apache-flume-1.6.0-bin
mkdir?plugins.d????--創(chuàng)建依賴目錄,目錄名必須為plugins.d
cd?plugins.d?
mkdir?flume-plugins??????????--項目目錄,目錄名隨意
cd?flume-plugins
mkdir?lib???????????--jar包目錄,目錄名必須為lib
將第三方jar包flume-plugins-1.0-SNAPSHOT.jar放在lib目錄下
2.2.4添加Hive配置文件
將hive-site.xml文件拷貝到/usr/local/apache-flume-1.6.0-bin/conf目錄下,并修改hive元數(shù)據(jù)地址與真實地址對應。如下所示:
?<property>
? <name>hive.metastore.uris</name>
? <value>thrift://m103:9083,thrift://m105:9083</value>
?</property>
?
2.2.5創(chuàng)建Flume?agent配置文件
創(chuàng)建flume啟動配置文件,指定source,channel,sink?3個組件內(nèi)容。每個組件都有好幾種配置選項,具體配置請查看Flume官網(wǎng)。創(chuàng)建配置文件flume.conf,示例如下:
?
vim?flume.conf
a1.sources?=?x1
a1.sinks?=?y1
a1.channels?=?z1
#?Describe/configure?the?source
a1.sources.x1.type?=?exec
a1.sources.x1.channels?=?z1
a1.sources.x1.command?=?tail?-F?/home/xdf/exec.txt
#?Describe?the?sink
a1.sinks.y1.type?=?logger
#?Use?a?channel?which?buffers?events?in?memory
a1.channels.z1.type?=?memory
a1.channels.z1.capacity?=?1000
a1.channels.z1.transactionCapacity?=?100
#?Bind?the?source?and?sink?to?the?channel
a1.sources.x1.channels?=?z1
a1.sinks.y1.channel?=?z1
?
2.2.6啟動Flume?Agent
生產(chǎn)環(huán)境下,參數(shù)-Dflume.root.logger=INFO,console去掉console,此處只為方便查看測試結(jié)果,選擇將日志打印到控制臺。若Flume?agent正常啟動,說明Flume安裝成功。
?
?
cd?/usr/local/apache-flume-1.6.0-bin
./bin/flume-ng?agent?--conf?conf?--conf-file?flume.conf?--name?a3?-Dflume.root.logger=INFO,console
?
2.2.7?測試
上面配置的example.conf文件,實現(xiàn)的功能是監(jiān)控文件/home/xdf/exec.txt,如果有新數(shù)據(jù)寫入時,Flume就會采集到新數(shù)據(jù)并打印在控制臺上。
測試用例:向/home/xdf/exec.txt文件中寫入內(nèi)容“hello?flume”,查看控制臺是否打印出“hello?flume”。正常輸出如下:
?
echo?'hello?flume'?>>?/home/xdf/exec.txt
2015-06-30?16:01:52,910?(SinkRunner-PollingRunner-DefaultSinkProcessor)?[INFO?-?org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)]?Event:?{?headers:{}?body:?68?65?6C?6C?6F?20?66?6C?75?6D?65?hello?flume?}
?
至此,Flume安裝部署完畢。
3.?數(shù)據(jù)預處理
1、Flume采集數(shù)據(jù)都是按行分割的,一行代表一條記錄。如果原始數(shù)據(jù)不符合要求,需要對數(shù)據(jù)進行預處理。示例如下:
原始數(shù)據(jù)格式為:
out:?===?START?OF?INFORMATION?SECTION?===
out:?Vendor:???????????????TOSHIBA
out:?Product:??????????????MBF2300RC
out:?Revision:?????????????0109
out:?User?Capacity:????????300,000,000,000?bytes?[300?GB]
out:?Logical?block?size:???512?bytes
???經(jīng)過預處理,我們將數(shù)據(jù)變?yōu)橐粭l5個字段的記錄:
TOSHIBA;MBF2300RC;0109;300;512
?
2、如果要將上面數(shù)據(jù)接入到hive中,我們還需要下面幾個處理:
a.?創(chuàng)建一張hive表
create?table?test(Vendor?string,Product?string,Revision?string,User_Capacity?string,block?string);
b.?在Kafka節(jié)點上創(chuàng)建一個topic,名字與上面hive表名對應,格式為“hive-數(shù)據(jù)庫名-表名”。示例如下:
bin/kafka-topics?--create?--zookeeper?localhost:2181/kafka?????--topic?hive-xdf-test??--partitions?1?--replication-factor?1
c.?將第一步得到的記錄數(shù)據(jù)與topic整合成一條記錄,用“@@”分割。示例如下:
hive-xdf-test?@@TOSHIBA;MBF2300RC;0109;300;512
d.?Flume采集整合后的一條數(shù)據(jù),通過topic獲取hive表的元數(shù)據(jù),根據(jù)元數(shù)據(jù)對記錄數(shù)據(jù)進行結(jié)構(gòu)化、序列化處理,然后經(jīng)過Kafka存入到hive表中。具體操作參考下面具體步驟所示。
4.?Flume采集數(shù)據(jù)到Kafka
Flume如果要將采集到的數(shù)據(jù)發(fā)送到Kafka,需要指定配置文件(如下:flume_test.conf)的sink類型為KafkaSink,并且指定Kafka?的broker?list。配置文件示例如下,紅色標注的為KafkaSink配置項:
vim?flume_test.conf
a3.channels?=?c3
a3.sources?=?r3
a3.sinks?=?k3
?
a3.sources.r3.type?=?exec
a3.sources.r3.channels?=?c3
a3.sources.r3.command?=?tail?-F?/home/xdf/exec.txt
a3.sources.r3.fileHeader?=?false
a3.sources.r3.basenameHeader?=?false
a3.sources.r3.interceptors?=?i3
a3.sources.r3.interceptors.i3.type?=iie.flume.interceptor.CSVInterceptor$Builder
a3.sources.r3.interceptors.i3.separator?=?;
a3.sources.r3.decodeErrorPolicy=IGNORE
?
a3.channels.c3.type?=?memory
a3.channels.c3.capacity?=?10000
a3.channels.c3.transactionCapacity?=?1000
?
a3.sinks.k3.channel?=?c3
#?a3.sinks.k3.type?=?logger
#a3.sinks.k3.batchSize?=?10
a3.sinks.k3.type?=?org.apache.flume.sink.kafka.KafkaSink
a3.sinks.k3.brokerList?=?localhost:9092
?
?
注意:此處有一個攔截器插件的定義,它就是用來做結(jié)構(gòu)化、序列化數(shù)據(jù)預處理的。此插件由上面配置的Flume第三方jar包中獲得。
a3.sources.r3.interceptors.i3.type?=iie.flume.interceptor.CSVInterceptor$Builder
?
5.?Kafka數(shù)據(jù)落地
我們提供了一個Camus工具,來定時將Kafka中的數(shù)據(jù)落地到hive表中。
Camus工具包含以下三個文件:
| 文件 | 說明 |
| camus-example-0.1.0-cdh-SNAPSHOT-shaded.jar | 程序運行jar包 |
| camus.properties | 配置文件 |
| camusrun.sh | 運行腳本 |
?
配置文件需要根據(jù)實際情況,修改以下兩個參數(shù)
kafka.whitelist.topics=hive-xdf-test?????????----數(shù)據(jù)對應的topic
kafka.brokers=m105:9092,m103:9092????????????----kafka?broker?lists
需要指定多個topic時,用逗號間隔,示例:
Kafka.whitelist.topics=topic1,topic2,topic3
修改完配置文件后,定時運行camusrun.sh腳本,就會將新生成的數(shù)據(jù)接入到topic所對應的hive表中了。
6.?具體案例
6.1?Smart數(shù)據(jù)接入
6.1.2?創(chuàng)建hive表
最終我們要將smart數(shù)據(jù)接入到hive表中,所以我們首先要創(chuàng)建一個滿足smart數(shù)據(jù)結(jié)構(gòu)的hive表。
create?table?smart_data(serial_number?String?,update_time?string,smart_health_status?string?,current_drive_temperature?int,drive_trip_temperature?int,elements_in_grown_defect_list?int,manufactured_time?string?,cycle_count?int????,start_stop_cycles?int????,load_unload_count?int????,load_unload_cycles?int????,blocks_sent_to_initiator?bigint?,blocks_received_from_initiator?bigint?,blocks_read_from_cache?bigint?,num_commands_size_not_larger_than_segment_size?bigint?,num_commands_size_larger_than_segment_size?bigint?,num_hours_powered_up?string??????,num_minutes_next_test?int????,read_corrected_ecc_fast?bigint?,read_corrected_ecc_delayed?bigint?,read_corrected_re?bigint?,read_total_errors_corrected?bigint?,read_correction_algo_invocations?bigint?,read_gigabytes_processed?bigint?,read_total_uncorrected_errors?string?,write_corrected_ecc_fast?bigint?,write_corrected_ecc_delayed?bigint?,write_corrected_re?bigint?,write_total_errors_corrected?bigint?,write_correction_algo_invocations?bigint?,write_gigabytes_processed?bigint?,write_total_uncorrected_errors?string?,verify_corrected_ecc_fast?bigint?,verify_corrected_ecc_delayed?bigint?,verify_corrected_re?bigint?,verify_total_errors_corrected?bigint?,verify_correction_algo_invocations?bigint?,verify_gigabytes_processed?bigint?,verify_total_uncorrected_errors?bigint?,non_medium_error_count?bigint);
6.1.2?創(chuàng)建topic
Flume采集到的數(shù)據(jù)要生成一條條的event數(shù)據(jù)傳給kafka消息系統(tǒng)保存,kafka需要事先創(chuàng)建一個topic來生產(chǎn)和消費指定數(shù)據(jù)。為系統(tǒng)正常運行,我們統(tǒng)一定義topic的名字結(jié)構(gòu)為“hive-數(shù)據(jù)庫名-表名”。需要在kafka集群節(jié)點上創(chuàng)建topic,示例如下:
bin/kafka-topics?--create?--zookeeper?localhost:2181/kafka?????--topic?hive-xdf-smart_data??--partitions?1?
--replication-factor?1
注意:此處的數(shù)據(jù)庫名、表名,必須為上一步創(chuàng)建的hive表,因為Flume會通過此topic名來獲取hive表的元數(shù)據(jù)信息,從而生成對應event數(shù)據(jù)。
6.1.2?配置Flume?agent啟動參數(shù)
生成參數(shù)文件smart_test.conf如下:
vim?smart_test.conf
a3.channels?=?c3
a3.sources?=?r3
a3.sinks?=?k3
?
a3.sources.r3.type?=?exec
a3.sources.r3.channels?=?c3
a3.sources.r3.command?=?tail?-F?/home/xdf/exec.txt
a3.sources.r3.fileHeader?=?false
a3.sources.r3.basenameHeader?=?false
a3.sources.r3.interceptors?=?i3
a3.sources.r3.interceptors.i3.type?=iie.flume.interceptor.CSVInterceptor$Builder
a3.sources.r3.interceptors.i3.separator?=?;
a3.sources.r3.decodeErrorPolicy=IGNORE
?
a3.channels.c3.type?=?memory
a3.channels.c3.capacity?=?10000
a3.channels.c3.transactionCapacity?=?1000
?
a3.sinks.k3.channel?=?c3
#?a3.sinks.k3.type?=?logger
#a3.sinks.k3.batchSize?=?10
a3.sinks.k3.type?=?org.apache.flume.sink.kafka.KafkaSink
a3.sinks.k3.brokerList?=?localhost:9092
?
注意:
1、此處數(shù)據(jù)源sources的類型為exec。具體命令為:
a3.sources.r3.command?=?tail?-F?/home/xdf/exec.txt
我們定時在每個節(jié)點運行一個腳本生成一條smart數(shù)據(jù),將數(shù)據(jù)寫入/home/xdf/exec.txt文件。
?
flume用上面那個命令一直監(jiān)控文件/home/xdf/exec.txt,如有新數(shù)據(jù)寫入,則采集傳輸?shù)絢afka里。
?
2、指定了一個自定義的第三方插件,Flume過濾器CSVInterceptor,將CSV格式的數(shù)據(jù)轉(zhuǎn)化成結(jié)構(gòu)化,序列化的Event格式。
?
3、Sink為KafkaSink,數(shù)據(jù)會寫到kafka里面,特別注意:這里需要指定對應的brokerList,示例如下:
a3.sinks.k3.brokerList?=?m103:9092,m105:9092
6.1.3?開啟Flume?Agent
執(zhí)行命令:
cd?/usr/local/apache-flume-1.6.0-bin
./bin/flume-ng?agent?--conf?conf?--conf-file?smart_test.conf?--name?a3?-Dflume.root.logger=INFO
6.1.4?生成Smart數(shù)據(jù)
在每個數(shù)據(jù)節(jié)點上運行createEvent.py腳本,生成一條結(jié)構(gòu)化好的smart數(shù)據(jù)。
腳本有兩個參數(shù)smart_data.log,hive-xdf-smart_data,前者為smart命令輸出的原始信息文件,后者是topic名字,即上一步生成的topic名。
python?createEvent.py?smart_data.log?hive-xdf-smart_data?>?
/home/xdf/exec.txt
?
此腳本會解析smart原始信息,生成一條帶topic字段的結(jié)構(gòu)化smart數(shù)據(jù)寫入到/home/xdf/exec.txt文件中,數(shù)據(jù)格式如下:
hive-xdf-smart_data@@EB00PC208HFC;2015-06-23?18:56:09;OK;28;65;0;week?08?of?year?2012;50000;21;200000;69;-1;-1;-1;-1;-1;-1;-1;0;0;0;0;0;0;300744.962;0;0;0;0;0;0;10841.446;0;-1;-1;-1;-1;-1;-1;-1
用符號“@@”將topic跟smart數(shù)據(jù)分開,smart數(shù)據(jù)每列間用逗號隔開。
6.1.5?測試時查看Kafka數(shù)據(jù)
查看數(shù)據(jù)是否成功生成到kafka中,可在kafka節(jié)點上,通過下面命令查看:
kafka-console-consumer?--zookeeper?localhost:2181/kafka?--topic?hive-xdf-smart_data?--from-beginning
結(jié)果展示:
6.1.6?Kafka數(shù)據(jù)落地到hive表中
打開camus.properties配置文件,修改以下兩個參數(shù)
kafka.whitelist.topics=hive-xdf-smart_data?????----smart數(shù)據(jù)對應topic
kafka.brokers=m105:9092,m103:9092???????????????----kafka?broker?lists
修改完配置文件后,定時運行camusrun.sh腳本,就會將新生成的smart數(shù)據(jù)接入到topic所對應的hive表中了。
至此,數(shù)據(jù)接入流程完畢。
轉(zhuǎn)載于:https://www.cnblogs.com/xiaodf/p/5027167.html
總結(jié)
以上是生活随笔為你收集整理的本地日志数据实时接入到hadoop集群的数据接入方案的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: daterangepicker 日期范围
- 下一篇: Entity Framework 使用注