日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > windows >内容正文

windows

Flume日志收集系统架构详解--转

發布時間:2025/4/5 windows 27 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Flume日志收集系统架构详解--转 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
?2017-09-06?朱潔?大數據和云計算技術

任何一個生產系統在運行過程中都會產生大量的日志,日志往往隱藏了很多有價值的信息。在沒有分析方法之前,這些日志存儲一段時間后就會被清理。隨著技術的發展和分析能力的提高,日志的價值被重新重視起來。在分析這些日志之前,需要將分散在各個生產系統中的日志收集起來。本節介紹廣泛應用的Flume日志收集系統。

一、概述 ? ?

Flume是Cloudera公司的一款高性能、高可用的分布式日志收集系統,現在已經是Apache的頂級項目。同Flume相似的日志收集系統還有Facebook Scribe、Apache Chuwka。

二、Flume發展歷程 ? ?

Flume?初始的發行版本目前被統稱為Flume OG(Original Generation),屬于Cloudera。但隨著?Flume?功能的擴展,Flume OG?代碼工程臃腫、核心組件設計不合理、核心配置不標準等缺點逐漸暴露出來,尤其是在?Flume OG?的最后一個發行版本0.94.0中,日志傳輸不穩定現象尤為嚴重。為了解決這些問題,2011?年?10?月?22日,Cloudera?完成了?Flume-728,對Flume進行了里程碑式的改動:重構核心組件、核心配置及代碼架構,重構后的版本統稱為?Flume NG(Next Generation);改動的另一原因是將?Flume?納入Apache?旗下,Cloudera Flume?更名為?Apache Flume。

三、Flume架構分析 ? ?
1. 系統特點① 可靠性

當節點出現故障時,日志能夠被傳送到其他節點上而不會丟失。Flume提供了三種級別的可靠性保障,從強到弱依次為:end-to-end(收到數據后,Agent首先將事件寫到磁盤上,當數據傳送成功后,再刪除;如果數據發送失敗,則重新發送)、Store on Failure(這也是Scribe采用的策略,當數據接收方崩潰時,將數據寫到本地,待恢復后繼續發送)、Best Effort(數據發送到接收方后,不會進行確認)。

② 可擴展性

Flume采用了三層架構,分別為Agent、Collector和Storage,每一層均可以水平擴展。其中,所有的Agent和Collector均由Master統一管理,這使得系統容易被監控和維護。并且Master允許有多個(使用ZooKeeper進行管理和負載均衡),這樣就避免了單點故障問題。

③ 可管理性

當有多個Master時,Flume利用ZooKeeper和Gossip保證動態配置數據的一致性。用戶可以在Master上查看各個數據源或者數據流執行情況,并且可以對各個數據源進行配置和動態加載。Flume提供了Web和Shell Script Command兩種形式對數據流進行管理。

④ 功能可擴展性

用戶可以根據需要添加自己的Agent、Collector或Storage。此外,Flume自帶了很多組件,包括各種Agent(如File、Syslog等)、Collector和Storage(如File、HDFS等)。

2. 系統架構

如圖所示是Flume OG的架構。

Flume NG的架構如下圖所示。Flume采用了分層架構,分別為Agent、Collector和Storage。其中,Agent和Collector均由Source和Sink兩部分組成,Source是數據來源,Sink是數據去向。

Flume使用了兩個組件:Master和Node。Node根據在Master Shell或Web中的動態配置,決定其是作為Agent還是作為Collector。

① Agent

Agent的作用是將數據源的數據發送給Collector。Flume自帶了很多直接可用的數據源(Source),如下。

text("filename"):將文件filename作為數據源,按行發送。

tail("filename"):探測filename新產生的數據,按行發送。

fsyslogTcp(5140):監聽TCP的5140端口,并將接收到的數據發送。

tailDir("dirname"[,fileregex=".*"[,startFromEnd=false[,recurseDepth=0]]]):監聽目錄中的文件末尾,使用正則表達式選定需要監聽的文件(不包含目錄),recurseDepth為遞歸監聽其下子目錄的深度,同時提供了很多Sink,如console[("format")],直接將數據顯示在console上。

text("txtfile"):將數據寫到文件txtfile中。

dfs("dfsfile"):將數據寫到HDFS上的dfsfile文件中。

syslogTcp("host",port):將數據通過TCP傳遞給host節點。

agentSink[("machine"[,port])]:等價于agentE2ESink,如果省略machine參數,則默認使用flume.collector.event.host與flume.collector.event.port作為默認collectro。

agentDFOSink[("machine"[,port])]:本地熱備Agent。Agent發現Collector節點故障后,不斷檢查Collector的存活狀態以便重新發送Event,在此期間產生的數據將緩存到本地磁盤中。

agentBESink[("machine"[,port])]:不負責的Agent。如果Collector出現故障,將不作任何處理,它發送的數據也將被直接丟棄。

agentE2EChain:指定多個Collector,以提高可用性。當向主Collector發送Event失效后,將轉向第二個Collector發送;當所有的Collector都失效后,它還會再發送一遍。

② Collector

Collector的作用是將多個Agent的數據匯總后,加載到Storage中。它的Source和Sink與Agent類似。

Source如下。

collectorSource[(port)]:Collector Source,監聽端口匯聚數據。

autoCollectorSource:通過Master協調物理節點自動匯聚數據。

logicalSource:邏輯Source,由Master分配端口并監聽rpcSink。

Sink如下。

collectorSink("fsdir","fsfileprefix",rollmillis):collectorSink,數據通過Collector匯聚之后發送到HDFS,fsdir是HDFS目錄,fsfileprefix為文件前綴碼。

customdfs("hdfspath"[,"format"]):自定義格式DFS。

③ Storage

Storage是存儲系統,可以是一個普通File,也可以是HDFS、Hive、HBase、分布式存儲等。

④ Master

Master負責管理、協調Agent和Collector的配置信息,是Flume集群的控制器。

在Flume中,最重要的抽象是Data Flow(數據流)。Data Flow描述了數據從產生、傳輸、處理到最終寫入目標的一條路徑,如下圖所示。

?

對于Agent數據流配置,就是從哪里得到數據,就把數據發送到哪個Collector。

對于Collector,就是接收Agent發送過來的數據,然后把數據發送到指定的目標機器上。

注:Flume框架對Hadoop和ZooKeeper的依賴只存在于JAR包上,并不要求Flume啟動時必須將Hadoop和ZooKeeper服務同時啟動。

3. 組件介紹

本文所說的Flume基于1.4.0版本。

① Client

路徑:apache-flume-1.4.0-src\flume-ng-clients。

操作最初的數據,把數據發送給Agent。在Client與Agent之間建立數據溝通的方式有兩種。

第一種方式:創建一個iclient繼承Flume已經存在的Source,如AvroSource或者SyslogTcpSource,但是必須保證所傳輸的數據Source可以理解。

第二種方式:寫一個Flume Source通過IPC或者RPC協議直接與已經存在的應用通信,需要轉換成Flume可以識別的事件。

Client SDK:是一個基于RPC協議的SDK庫,可以通過RPC協議使應用與Flume直接建立連接。可以直接調用SDK的api函數而不用關注底層數據是如何交互的,提供append和appendBatch兩個接口,具體的可以看看代碼apache-flume-1.4.0-src\flume-ng-sdk\src\main\java\org\apache\ flume\api\RpcClient.java。

② NettyAvroRpcClient

Avro是默認的RPC協議。NettyAvroRpcClient和ThriftRpcClient分別對RpcClient接口進行了實現,具體實現可以看下代碼apache-flume-1.4.0-src\flume-ng-sdk\src\main\java\org\apache\flume\api\ NettyAvroRpcClient.java和apache-flume-1.4.0-src\flume-ng-sdk\src\main\java\org\apache\flume\api\ ThriftRpcClient.java。

下面給出一個使用SDK與Flume建立連接的樣例如下,實際使用中可以參考實現:

import org.apache.flume.Event;

import org.apache.flume.EventDeliveryException;

import org.apache.flume.api.RpcClient;

import org.apache.flume.api.RpcClientFactory;

import org.apache.flume.event.EventBuilder;

import java.nio.charset.Charset;

public class MyApp {

public static void main(String[] args) {

MyRpcClientFacade client = new MyRpcClientFacade();

// Initialize client with the remote Flume agent's host and port

client.init("host.example.org",41414);

// Send 10 events to the remote Flume agent. That agent should be

// configured to listen with an AvroSource.

String sampleData = "Hello Flume!";

for (int i = 0; i < 10; i++) {

client.sendDataToFlume(sampleData);

}

client.cleanUp();

}

}

class MyRpcClientFacade {

private RpcClient client;

private String hostname;

private int port;

public void init(String hostname,int port) {

// Setup the RPC connection

this.hostname = hostname;

this.port = port;

this.client = RpcClientFactory.getDefaultInstance(hostname,port);

// Use the following method to create a thrift client (instead of the above line):

// this.client = RpcClientFactory.getThriftInstance(hostname,port);

}

public void sendDataToFlume(String data) {

// Create a Flume Event object that encapsulates the sample data

Event event = EventBuilder.withBody(data,Charset.forName("UTF-8"));

// Send the event

try {

client.append(event);

} catch (EventDeliveryException e) {

// clean up and recreate the client

client.close();

client = null;

client = RpcClientFactory.getDefaultInstance(hostname,port);

// Use the following method to create a thrift client (instead of the above line):

// this.client = RpcClientFactory.getThriftInstance(hostname,port);

}

}

public void cleanUp() {

// Close the RPC connection

client.close();

}

}

為了能夠監聽到關聯端口,需要在配置文件中增加端口和Host配置信息(配置文件apache-flume- 1.4.0-src\conf\flume-conf.properties.template)。

client.type = default (for avro) or thrift (for thrift)

hosts = h1 ? ? ? ? ? ? ? ? ? ? ? ? # default client accepts only 1 host

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? # (additional hosts will be ignored)

hosts.h1 = host1.example.org:41414 # host and port must both be specified

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? # (neither has a default)

batch-size = 100 ? ? ? ? ? ? ? ? # Must be >=1 (default:100)

connect-timeout = 20000 ? ? ? ? # Must be >=1000 (default:20000)

request-timeout = 20000 ? ? ? ? # Must be >=1000 (default:20000)

除了以上兩類實現外,FailoverRpcClient.java和LoadBalancingRpcClient.java也分別對RpcClient接口進行了實現。

③ FailoverRpcClient

該接口主要實現了主備切換,采用<host>:<port>的形式,一旦當前連接失敗,就會自動尋找下一個連接。

④ LoadBalancingRpcClient

該接口在有多個Host的時候起到負載均衡的作用。

?

?

?

⑤ Embeded Agent

?

?

Flume允許用戶在自己的Application里內嵌一個Agent。這個內嵌的Agent是一個輕量級的Agent,不支持所有的Source Sink Channel。

?

?

?

⑥ Transaction

?

?

Flume的三個主要組件——Source、Sink、Channel必須使用Transaction來進行消息收發。在Channel的類中會實現Transaction的接口,不管是Source還是Sink,只要連接上Channel,就必須先獲取Transaction對象,如下圖所示。

?

具體使用實例如下,可以供生成環境中參考:

Channel ch = new MemoryChannel();

Transaction txn = ch.getTransaction();

txn.begin();

try {

Event eventToStage = EventBuilder.withBody("Hello Flume!",Charset.forName ("UTF-8"));

ch.put(eventToStage);

txn.commit();

} catch (Throwable t) {

txn.rollback();

if (t instanceof Error) {

throw (Error)t;

}

} finally {

txn.close();

}

?

?

?

⑦ Sink

?

?

Sink的一個重要作用就是從Channel里獲取事件,然后把事件發送給下一個Agent,或者把事件存儲到另外的倉庫內。一個Sink會關聯一個Channel,這是配置在Flume的配置文件里的。SinkRunner.start()函數被調用后,會創建一個線程,該線程負責管理Sink的整個生命周期。Sink需要實現LifecycleAware接口的start()和stop()方法。

Sink.start():初始化Sink,設置Sink的狀態,可以進行事件收發。

Sink.stop():進行必要的cleanup動作。

Sink.process():負責具體的事件操作。

Sink使用參考代碼實例如下:

public class MySink extends AbstractSink implements Configurable {

private String myProp;

@Override

public void configure(Context context) {

String myProp = context.getString("myProp","defaultValue");

// Process the myProp value (e.g. validation)

// Store myProp for later retrieval by process() method

this.myProp = myProp;

}

@Override

public void start() {

// Initialize the connection to the external repository (e.g. HDFS) that

// this Sink will forward Events to ..

}

@Override

public void stop () {

// Disconnect from the external respository and do any

// additional cleanup (e.g. releasing resources or nulling-out

// field values) ..

}

@Override

public Status process() throws EventDeliveryException {

Status status = null;

// Start transaction

Channel ch = getChannel();

Transaction txn = ch.getTransaction();

txn.begin();

try {

// This try clause includes whatever Channel operations you want to do

Event event = ch.take();

// Send the Event to the external repository.

// storeSomeData(e);

txn.commit();

status = Status.READY;

} catch (Throwable t) {

txn.rollback();

// Log exception,handle individual exceptions as needed

status = Status.BACKOFF;

// re-throw all Errors

if (t instanceof Error) {

throw (Error)t;

}

} finally {

txn.close();

}

return status;

}

}

?

?

?

⑧ Source

?

?

Source的作用是從Client端接收事件,然后把事件存儲到Channel中。PollableSourceRunner.start()用于創建一個線程,管理PollableSource的生命周期。同樣也需要實現start()和stop()兩種方法。需要注意的是,還有一類Source,被稱為EventDrivenSource。區別是EventDrivenSource有自己的回調函數用于捕捉事件,并不是每個線程都會驅動一個EventDrivenSource。

以下是一個PollableSource的例子:

public class MySource extends AbstractSource implements Configurable, PollableSource {

private String myProp;

@Override

public void configure(Context context) {

String myProp = context.getString("myProp","defaultValue");

// Process the myProp value (e.g. validation,convert to another type,...)

// Store myProp for later retrieval by process() method

this.myProp = myProp;

}

@Override

public void start() {

// Initialize the connection to the external client

}

@Override

public void stop () {

// Disconnect from external client and do any additional cleanup

// (e.g. releasing resources or nulling-out field values) ..

}

@Override

public Status process() throws EventDeliveryException {

Status status = null;

// Start transaction

Channel ch = getChannel();

Transaction txn = ch.getTransaction();

txn.begin();

try {

// This try clause includes whatever Channel operations you want to do

// Receive new data

Event e = getSomeData();

// Store the Event into this Source's associated Channel(s)

getChannelProcessor().processEvent(e)

txn.commit();

status = Status.READY;

} catch (Throwable t) {

txn.rollback();

// Log exception,handle individual exceptions as needed

status = Status.BACKOFF;

// re-throw all Errors

if (t instanceof Error) {

throw (Error)t;

}

} finally {

txn.close();

}

return status;

}

}

?

?

4. Flume使用模式

?

?

?

Flume的數據流由事件(Event)貫穿始終。事件是Flume的基本數據單位,它攜帶日志數據(字節數組形式)并且攜帶有頭信息,這些Event由Agent外部的Source,比如上圖中的Web Server生成。當Source捕獲事件后會進行特定的格式化,然后Source會把事件推入(單個或多個)Channel中。你可以把Channel看作是一個緩沖區,它將保存事件直到Sink處理完該事件。Sink負責持久化日志或者把事件推向另一個Source。

很直白的設計,其中值得注意的是,Flume提供了大量內置的Source、Channel和Sink類型。不同類型的Source,Channel和Sink可以自由組合。多Agent串聯,如下圖所示。

?

或者多Agent合并,如下圖所示。

如果你以為Flume就這些能耐那就大錯特錯了。Flume支持用戶建立多級流,也就是說,多個agent可以協同工作,并且支持Fan-in、Fan-out、Contextual Routing、Backup Routes。如下圖所示。

?

?

?

參考文獻?? ?

?

?

  • 參考http://www.aboutyun.com/thread-7848-1-1.html官網用戶手冊,http://flume.apache.org/FlumeUserGuide.html。?

  • Github地址https://github.com/apache/flume。?

  • 參考http://flume.apache.org/FlumeUserGuide.html。?

轉載于:https://www.cnblogs.com/davidwang456/p/7483894.html

總結

以上是生活随笔為你收集整理的Flume日志收集系统架构详解--转的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。