日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Hadoop辅助工具——Flume、Sqoop

發布時間:2024/1/17 编程问答 31 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Hadoop辅助工具——Flume、Sqoop 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

  

前言

  在一個完整的離線大數據處理系統中,除了hdfs+mapreduce+hive組成分析系統的核心之外,還需要數據采集、結果數據導出、任務調度等不可或缺的輔助系統,而這些輔助工具在hadoop生態體系中都有便捷的開源框架,如圖所示:

1. Flume日志采集框架

1.1 Flume介紹

1.1.1 概述

  • Flume是一個分布式、可靠、和高可用的海量日志采集、聚合和傳輸的系統。
  • Flume可以采集文件,socket數據包、文件、文件夾、kafka等各種形式源數據,又可以將采集到的數據(下沉sink)輸出到HDFS、hbase、hive、kafka等眾多外部存儲系統中
  • 一般的采集需求,通過對flume的簡單配置即可實現
  • Flume針對特殊場景也具備良好的自定義擴展能力

  因此,flume可以適用于大部分的日常數據采集場景。

1.1.2 運行機制

  • Flume分布式系統中最核心的角色是agent,flume采集系統就是由一個個agent所連接起來形成。
  • 每一個agent相當于一個數據傳遞員(Source 到 Channel 到 Sink之間傳遞數據的形式是Event事件,Event事件是一個數據流單元),內部有3個組件:
  •     a)?Source:采集組件,用于跟數據源對接,以獲取數據

        b)?Sink:下沉組件,用于往下一級agent傳遞數據或者往最終存儲系統傳遞數據

        c)?Channel:傳輸通道組件,用于從source將數據傳遞到sink

    ?

    1.1.3 Flume采集系統結構圖

    ?1. 簡單結構——單個agent采集數據

    ?

    2. 復雜結構——多級agent之間串聯

    ?

    ?

    1.2 Flume實戰案例

    1.2.1 Flume的安裝部署

      1、Flume的安裝非常簡單,只需要解壓即可,當然,前提是已有hadoop環境。上傳安裝包到數據源所在節點上

      然后解壓 ?tar -zxvf apache-flume-1.6.0-bin.tar.gz

      然后進入flume的目錄,修改conf下的flume-env.sh,在里面配置JAVA_HOME

      2、根據數據采集的需求配置采集方案,描述在配置文件中(文件名可任意自定義)

      3、指定采集方案配置文件,在相應的節點上啟動flume agent

      先用一個最簡單的例子來測試一下程序環境是否正常

      1、先在flume的conf目錄下新建一個配置文件(采集方案)

      vi ??netcat-logger.properties

    # 定義這個agent中各組件的名字 a1.sources = r1 a1.sinks = k1 a1.channels = c1# 描述和配置source組件:r1 a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 44444# 描述和配置sink組件:k1 a1.sinks.k1.type = logger# 描述和配置channel組件,此處使用是內存緩存的方式 a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100# 描述和配置source channel sink之間的連接關系 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1

      2、啟動agent去采集數據

    bin/flume-ng agent -c conf -f conf/netcat-logger.conf -n a1 -Dflume.root.logger=INFO,console

      -c conf ??指定flume自身的配置文件所在目錄

      -f conf/netcat-logger.con ?指定我們所描述的采集方案

      -n a1 ?指定我們這個agent的名字

      3、測試

      先要往agent的source所監聽的端口上發送數據,讓agent有數據可采。隨便在一個能跟agent節點聯網的機器上。

      telnet anget-hostname ?port ??(telnet localhost 44444)?

    1.2.2 采集案例

      1、采集日志目錄中的文件到HDFS

      結構示意圖:

      采集需求:某服務器的某特定目錄下,會不斷產生新的文件,每當有新文件出現,就需要把文件采集到HDFS中去

      根據需求,首先定義以下3大要素

  • 數據源組件,即source ——監控文件目錄 : ?spooldir
  •     spooldir特性:

          1、監視一個目錄,只要目錄中出現新文件,就會采集文件中的內容

          2、采集完成的文件,會被agent自動添加一個后綴:COMPLETED

          3、所監視的目錄中不允許重復出現相同文件名的文件

  • 下沉組件,即sink——HDFS文件系統 ?: ?hdfs sink
  • 通道組件,即channel——可用file channel 也可以用內存channel
  •   配置文件編寫:

    #定義三大組件的名稱 agent1.sources = source1 agent1.sinks = sink1 agent1.channels = channel1# 配置source組件 agent1.sources.source1.type = spooldir agent1.sources.source1.spoolDir = /home/hadoop/logs/ agent1.sources.source1.fileHeader = false#配置攔截器 agent1.sources.source1.interceptors = i1 agent1.sources.source1.interceptors.i1.type = host agent1.sources.source1.interceptors.i1.hostHeader = hostname# 配置sink組件 agent1.sinks.sink1.type = hdfs agent1.sinks.sink1.hdfs.path =hdfs://hdp-node-01:9000/weblog/flume-collection/%y-%m-%d/%H-%M agent1.sinks.sink1.hdfs.filePrefix = access_log agent1.sinks.sink1.hdfs.maxOpenFiles = 5000 agent1.sinks.sink1.hdfs.batchSize= 100 agent1.sinks.sink1.hdfs.fileType = DataStream agent1.sinks.sink1.hdfs.writeFormat =Text agent1.sinks.sink1.hdfs.rollSize = 102400 agent1.sinks.sink1.hdfs.rollCount = 1000000 agent1.sinks.sink1.hdfs.rollInterval = 60 #agent1.sinks.sink1.hdfs.round = true #agent1.sinks.sink1.hdfs.roundValue = 10 #agent1.sinks.sink1.hdfs.roundUnit = minute agent1.sinks.sink1.hdfs.useLocalTimeStamp = true # Use a channel which buffers events in memory agent1.channels.channel1.type = memory agent1.channels.channel1.keep-alive = 120 agent1.channels.channel1.capacity = 500000 agent1.channels.channel1.transactionCapacity = 600# Bind the source and sink to the channel agent1.sources.source1.channels = channel1 agent1.sinks.sink1.channel = channel1

      Channel參數解釋:

      capacity:默認該通道中最大的可以存儲的event數量

      trasactionCapacity:每次最大可以從source中拿到或者送到sink中的event數量

      keep-alive:event添加到通道中或者移出的允許時間

    ?

      測試階段,啟動flume agent的命令:

    bin/flume-ng agent -c ./conf -f ./dir-hdfs.conf -n agent1 -Dflume.root.logger=DEBUG,console

      -D后面跟的是log4j的參數,用于測試觀察

      生產中,啟動flume,應該把flume啟動在后臺:

    nohup bin/flume-ng agent -c ./conf -f ./dir-hdfs.conf -n agent1 1>/dev/null 2>&1 &

    2、采集文件到HDFS

      采集需求:比如業務系統使用log4j生成的日志,日志內容不斷增加,需要把追加到日志文件中的數據實時采集到hdfs

      根據需求,首先定義以下3大要素

  • 采集源,即source——監控文件內容更新 : ?exec ?‘tail -F file’
  • 下沉目標,即sink——HDFS文件系統 ?: ?hdfs sink
  • Source和sink之間的傳遞通道——channel,可用file channel 也可以用 內存channel
  •   配置文件編寫:

    agent1.sources = source1 agent1.sinks = sink1 agent1.channels = channel1# Describe/configure tail -F source1 agent1.sources.source1.type = exec agent1.sources.source1.command = tail -F /home/hadoop/logs/access_log agent1.sources.source1.channels = channel1#configure host for source agent1.sources.source1.interceptors = i1 agent1.sources.source1.interceptors.i1.type = host agent1.sources.source1.interceptors.i1.hostHeader = hostname# Describe sink1 agent1.sinks.sink1.type = hdfs #a1.sinks.k1.channel = c1 agent1.sinks.sink1.hdfs.path =hdfs://hdp-node-01:9000/weblog/flume-collection/%y-%m-%d/%H-%M agent1.sinks.sink1.hdfs.filePrefix = access_log agent1.sinks.sink1.hdfs.maxOpenFiles = 5000 agent1.sinks.sink1.hdfs.batchSize= 100 agent1.sinks.sink1.hdfs.fileType = DataStream agent1.sinks.sink1.hdfs.writeFormat =Text agent1.sinks.sink1.hdfs.rollSize = 102400 agent1.sinks.sink1.hdfs.rollCount = 1000000 agent1.sinks.sink1.hdfs.rollInterval = 60 agent1.sinks.sink1.hdfs.round = true agent1.sinks.sink1.hdfs.roundValue = 10 agent1.sinks.sink1.hdfs.roundUnit = minute agent1.sinks.sink1.hdfs.useLocalTimeStamp = true# Use a channel which buffers events in memory agent1.channels.channel1.type = memory agent1.channels.channel1.keep-alive = 120 agent1.channels.channel1.capacity = 500000 agent1.channels.channel1.transactionCapacity = 600# Bind the source and sink to the channel agent1.sources.source1.channels = channel1 agent1.sinks.sink1.channel = channel1

    3、兩個agent級聯

    1.3 更多source和sink組件

      Flume支持眾多的source和sink類型,詳細手冊可參考官方文檔?http://flume.apache.org/FlumeUserGuide.html

    2. sqoop數據遷移工具

    2.1 概述

      sqoop是apache旗下一款“Hadoop和關系數據庫服務器之間傳送數據”的工具。

      導入數據:MySQL,Oracle導入數據到Hadoop的HDFS、HIVE、HBASE等數據存儲系統;

      導出數據:從Hadoop的文件系統中導出數據到關系數據庫mysql等

    2.2 工作機制

      將導入或導出命令翻譯成mapreduce程序來實現,在翻譯出的mapreduce中主要是對inputformat和outputformat進行定制。

    2.3 sqoop實戰及原理

    2.3.1 sqoop安裝

      安裝sqoop的前提是已經具備java和hadoop的環境

      1、下載并解壓

      最新版下載地址http://ftp.wayne.edu/apache/sqoop/1.4.6/

      2、修改配置文件

        $ cd $SQOOP_HOME/conf

        $ mv sqoop-env-template.sh sqoop-env.sh

      打開sqoop-env.sh并編輯下面幾行:

    export HADOOP_COMMON_HOME=/home/hadoop/apps/hadoop-2.6.1/ export HADOOP_MAPRED_HOME=/home/hadoop/apps/hadoop-2.6.1/ export HIVE_HOME=/home/hadoop/apps/hive-1.2.1

      3、加入mysql的jdbc驅動包

        cp ?~/app/hive/lib/mysql-connector-java-5.1.28.jar ??$SQOOP_HOME/lib/

      4、驗證啟動

        $ cd $SQOOP_HOME/bin

        $ sqoop-version

      預期的輸出:

        15/12/17 14:52:32 INFO sqoop.Sqoop: Running Sqoop version: 1.4.6

        Sqoop 1.4.6?git commit id 5b34accaca7de251fc91161733f906af2eddbe83

        Compiled by abe on Fri Aug 1 11:19:26 PDT 2015

      到這里,整個Sqoop安裝工作完成。

    ?

      驗證sqoop到mysql業務庫之間的連通性:

        bin/sqoop-list-databases --connect jdbc:mysql://localhost:3306 --username root --password root

        bin/sqoop-list-tables --connect jdbc:mysql://localhost:3306/userdb?--username root --password root

    2.4 Sqoop的數據導入

      “導入工具”導入單個表從RDBMS到HDFS。表中的每一行被視為HDFS的記錄。所有記錄都存儲為文本文件的文本數據(或者Avro、sequence文件等二進制數據)?

    2.4.1 語法

      下面的語法用于將數據導入HDFS

    $ sqoop import (generic-args) (import-args)

    2.4.2 示例

    表數據

      在mysql中有一個庫userdb中三個表:emp,?emp_add和emp_conn

      表emp:

    id

    name

    deg

    salary

    dept

    1201

    gopal

    manager

    50,000

    TP

    1202

    manisha

    Proof reader

    50,000

    TP

    1203

    khalil

    php dev

    30,000

    AC

    1204

    prasanth

    php dev

    30,000

    AC

    1205

    kranthi

    admin

    20,000

    TP

      表emp_add:

    id

    hno

    street

    city

    1201

    288A

    vgiri

    jublee

    1202

    108I

    aoc

    sec-bad

    1203

    144Z

    pgutta

    hyd

    1204

    78B

    old city

    sec-bad

    1205

    720X

    hitec

    sec-bad

      表emp_conn:

    id

    phno

    email

    1201

    2356742

    gopal@tp.com

    1202

    1661663

    manisha@tp.com

    1203

    8887776

    khalil@ac.com

    1204

    9988774

    prasanth@ac.com

    1205

    1231231

    kranthi@tp.com

    導入表表數據到HDFS

      下面的命令用于從MySQL數據庫服務器中的emp表導入HDFS

    bin/sqoop import \ --connect jdbc:mysql://hdp-node-01:3306/test \ --username root \ --password root \ --table emp \ --m 1

      如果成功執行,那么會得到下面的輸出

    14/12/22 15:24:54 INFO sqoop.Sqoop: Running Sqoop version: 1.4.5 14/12/22 15:24:56 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset. INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-hadoop/compile/cebe706d23ebb1fd99c1f063ad51ebd7/emp.jar ----------------------------------------------------- O mapreduce.Job: map 0% reduce 0% 14/12/22 15:28:08 INFO mapreduce.Job: map 100% reduce 0% 14/12/22 15:28:16 INFO mapreduce.Job: Job job_1419242001831_0001 completed successfully ----------------------------------------------------- ----------------------------------------------------- 14/12/22 15:28:17 INFO mapreduce.ImportJobBase: Transferred 145 bytes in 177.5849 seconds (0.8165 bytes/sec) 14/12/22 15:28:17 INFO mapreduce.ImportJobBase: Retrieved 5 records.

      為了驗證在HDFS導入的數據,請使用以下命令查看導入的數據

    $ $HADOOP_HOME/bin/hadoop fs -cat /user/hadoop/emp/part-m-00000

      emp表的數據和字段之間用逗號(,)表示。

    1201, gopal, manager, 50000, TP 1202, manisha, preader, 50000, TP 1203, kalil, php dev, 30000, AC 1204, prasanth, php dev, 30000, AC 1205, kranthi, admin, 20000, TP

    導入到HDFS指定目錄

      在導入表數據到HDFS使用Sqoop導入工具,我們可以指定目標目錄。

      以下是指定目標目錄選項的Sqoop導入命令的語法。

    --target-dir <new or exist directory in HDFS>

    下面的命令是用來導入emp_add表數據到'/queryresult'目錄。

    bin/sqoop import \ --connect jdbc:mysql://hdp-node-01:3306/test \ --username root \ --password root \ --target-dir /queryresult \ --fields-terminated-by ‘\001’ \ --table emp --split-by id --m 1

      注意:如果報錯,說emp類找不到,則可以手動從sqoop生成的編譯目錄(/tmp/sqoop-root/compile)中,找到這個emp.class和emp.jar,拷貝到sqoop的lib目錄下:

      如果設置了 --m 1,則意味著只會啟動一個maptask執行數據導入

      如果不設置 --m 1,則默認為啟動4個map task執行數據導入,則需要指定一個列來作為劃分map task任務的依據

    ?

      下面的命令是用來驗證 /queryresult?目錄中 emp_add表導入的數據形式。

    $HADOOP_HOME/bin/hadoop fs -cat /queryresult/part-m-*

      它會用逗號(,)分隔emp_add表的數據和字段。

    1201, 288A, vgiri, jublee 1202, 108I, aoc, sec-bad 1203, 144Z, pgutta, hyd 1204, 78B, oldcity, sec-bad 1205, 720C, hitech, sec-bad

    導入關系表到HIVE

    bin/sqoop import --connect jdbc:mysql://hdp-node-01:3306/test --username root --password root --table emp --hive-import --split-by id --m 1

    導入表數據子集

      我們可以導入表的使用Sqoop導入工具,"where"子句的一個子集。它執行在各自的數據庫服務器相應的SQL查詢,并將結果存儲在HDFS的目標目錄。

      where子句的語法如下。

    --where <condition>

      下面的命令用來導入emp_add表數據的子集。子集查詢檢索員工ID和地址,居住城市為:Secunderabad

    bin/sqoop import \ --connect jdbc:mysql://hdp-node-01:3306/test \ --username root \ --password root \ --where "city ='sec-bad'" \ --target-dir /wherequery \ --table emp_add \--m 1

      按需導入

    bin/sqoop import \ --connect jdbc:mysql://hdp-node-01:3306/test \ --username root \ --password root \ --target-dir /wherequery2 \ --query 'select id,name,deg from emp WHERE id>1207 and $CONDITIONS' \ --split-by id \ --fields-terminated-by '\t' \ --m 2

      下面的命令用來驗證數據從emp_add表導入/wherequery目錄

    $HADOOP_HOME/bin/hadoop fs -cat /wherequery/part-m-*

      它用逗號(,)分隔 emp_add表數據和字段。

    1202, 108I, aoc, sec-bad 1204, 78B, oldcity, sec-bad 1205, 720C, hitech, sec-bad

    增量導入

      增量導入是僅導入新添加的表中的行的技術。

      sqoop支持兩種增量MySql導入到hive的模式,

      一種是append,即通過指定一個遞增的列,比如:

      --incremental append ?--check-column num_id --last-value 0

      另種是可以根據時間戳,比如:

      --incremental lastmodified --check-column created --last-value '2012-02-01 11:0:00'

      就是只導入created 比'2012-02-01 11:0:00'更大的數據。

    ?

      1/ append模式

      它需要添加‘incremental’, ‘check-column’, 和 ‘last-value’選項來執行增量導入。

      下面的語法用于Sqoop導入命令增量選項。

    --incremental <mode> --check-column <column name> --last value <last check column value>

      假設新添加的數據轉換成emp表如下:

      1206, satish p, grp des, 20000, GR

      下面的命令用于在EMP表執行增量導入。

    bin/sqoop import \ --connect jdbc:mysql://hdp-node-01:3306/test \ --username root \ --password root \ --table emp --m 1 \ --incremental append \ --check-column id \ --last-value 1208

      以下命令用于從emp表導入HDFS?emp/?目錄的數據驗證。

    $ $HADOOP_HOME/bin/hadoop fs -cat /user/hadoop/emp/part-m-* 1201, gopal, manager, 50000, TP 1202, manisha, preader, 50000, TP 1203, kalil, php dev, 30000, AC 1204, prasanth, php dev, 30000, AC 1205, kranthi, admin, 20000, TP 1206, satish p, grp des, 20000, GR

      下面的命令是從表emp 用來查看修改或新添加的行

    $HADOOP_HOME/bin/hadoop fs -cat /emp/part-m-*1 1206, satish p, grp des, 20000, GR

    2.5 Sqoop的數據導出

      1/ 將數據從HDFS把文件導出到RDBMS數據庫

      導出前,目標表必須存在于目標數據庫中。

  • 默認操作是從將文件中的數據使用INSERT語句插入到表中
  • 更新模式下,是生成UPDATE語句更新表數據
  •   語法

      以下是export命令語法。

    $ sqoop export (generic-args) (export-args)

      示例

      數據是在HDFS?中“EMP/”目錄的emp_data文件中。所述emp_data如下:

    1201, gopal, manager, 50000, TP 1202, manisha, preader, 50000, TP 1203, kalil, php dev, 30000, AC 1204, prasanth, php dev, 30000, AC 1205, kranthi, admin, 20000, TP 1206, satish p, grp des, 20000, GR

      1、首先需要手動創建mysql中的目標表

    $ mysql mysql> USE db; mysql> CREATE TABLE employee ( id INT NOT NULL PRIMARY KEY, name VARCHAR(20), deg VARCHAR(20),salary INT,dept VARCHAR(10));

      2、然后執行導出命令

    bin/sqoop export \ --connect jdbc:mysql://hdp-node-01:3306/test \ --username root \ --password root \ --table employee \ --export-dir /user/hadoop/emp/

      3、驗證表mysql命令行。

    mysql>select * from employee; 如果給定的數據存儲成功,那么可以找到數據在如下的employee表。 +------+--------------+-------------+-------------------+--------+ | Id | Name | Designation | Salary | Dept | +------+--------------+-------------+-------------------+--------+ | 1201 | gopal | manager | 50000 | TP | | 1202 | manisha | preader | 50000 | TP | | 1203 | kalil | php dev | 30000 | AC | | 1204 | prasanth | php dev | 30000 | AC | | 1205 | kranthi | admin | 20000 | TP | | 1206 | satish p | grp des | 20000 | GR | +------+--------------+-------------+-------------------+--------+

    ?

    轉載于:https://www.cnblogs.com/zhangchao162/p/9896805.html

    創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎

    總結

    以上是生活随笔為你收集整理的Hadoop辅助工具——Flume、Sqoop的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。