
Flume's Avro Sink and Avro Source, Part 1: Avro Source

Posted: 2023/12/19 by 豆豆

Question: What RPC service does Avro Source provide, and how does it provide it?

Question 1.1: How does the Flume Source start a Netty server to provide the RPC service?

From avro-rpc-quickstart on GitHub, we know that a NettyServer serving a specific RPC protocol can be started as shown below. Does the Flume Source provide its RPC service the same way?

server = new NettyServer(new SpecificResponder(Mail.class, new MailImpl()), new InetSocketAddress(65111));

The code in AvroSource that creates the NettyServer is:

  

Responder responder = new SpecificResponder(AvroSourceProtocol.class, this);
NioServerSocketChannelFactory socketChannelFactory = initSocketChannelFactory();
ChannelPipelineFactory pipelineFactory = initChannelPipelineFactory();
server = new NettyServer(responder, new InetSocketAddress(bindAddress, port),
    socketChannelFactory, pipelineFactory, null);

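So AvroSource also uses Avro's NettyServer class directly, but through a different constructor that additionally takes a ChannelFactory and a ChannelPipelineFactory. The last argument, passed as null, is an optional ExecutionHandler, so none is used.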

So what kind of ChannelFactory does AvroSource use?

initSocketChannelFactory() is implemented as:

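private NioServerSocketChannelFactory initSocketChannelFactory() {
  NioServerSocketChannelFactory socketChannelFactory;
  if (maxThreads <= 0) {
    // "threads" not set: boss and worker pools are both unbounded cached pools
    socketChannelFactory = new NioServerSocketChannelFactory(
        Executors.newCachedThreadPool(),   // boss executor
        Executors.newCachedThreadPool());  // worker executor
  } else {
    // "threads" set: the worker pool is capped at maxThreads
    socketChannelFactory = new NioServerSocketChannelFactory(
        Executors.newCachedThreadPool(),          // boss executor
        Executors.newFixedThreadPool(maxThreads)); // worker executor
  }
  return socketChannelFactory;
}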

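So the reason for supplying a ChannelFactory appears to be honoring AvroSource's "threads" parameter, which caps the number of worker threads and therefore the maximum number of threads that handle RPC requests. If "threads" is not set (maxThreads <= 0), the worker pool is a cached thread pool with no upper bound.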
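The branch can be checked in isolation. Below is a minimal sketch, using only java.util.concurrent and no Netty, that mirrors the worker-pool choice. The class and method names (WorkerPoolChoice, workerExecutor, maxWorkers) are my own; only the maxThreads rule comes from the code above.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

// Hypothetical helper mirroring the worker-pool branch of initSocketChannelFactory().
public class WorkerPoolChoice {

    // maxThreads <= 0: unbounded cached pool; otherwise a fixed pool of maxThreads workers.
    static ExecutorService workerExecutor(int maxThreads) {
        if (maxThreads <= 0) {
            return Executors.newCachedThreadPool();
        }
        return Executors.newFixedThreadPool(maxThreads);
    }

    // Upper bound on the number of worker threads the executor may create.
    // Both factory methods above return a ThreadPoolExecutor, so the cast is safe here.
    static int maxWorkers(ExecutorService executor) {
        return ((ThreadPoolExecutor) executor).getMaximumPoolSize();
    }

    public static void main(String[] args) {
        ExecutorService unbounded = workerExecutor(0);
        ExecutorService bounded = workerExecutor(4);
        System.out.println("threads=0 -> max workers " + maxWorkers(unbounded));
        System.out.println("threads=4 -> max workers " + maxWorkers(bounded));
        unbounded.shutdown();
        bounded.shutdown();
    }
}
```

With "threads" unset the maximum is Integer.MAX_VALUE, i.e. effectively unbounded; with threads=4 it is 4. The boss executor stays a cached pool in both branches, which is harmless because each bound port only uses one boss thread.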

See the Javadoc of NioServerSocketChannelFactory:

  

A ServerSocketChannelFactory which creates a server-side NIO-based ServerSocketChannel. It utilizes the non-blocking I/O mode which was introduced with NIO to serve many number of concurrent connections efficiently.

How threads work

There are two types of threads in a NioServerSocketChannelFactory; one is boss thread and the other is worker thread.

Boss threads

Each bound ServerSocketChannel has its own boss thread. For example, if you opened two server ports such as 80 and 443, you will have two boss threads. A boss thread accepts incoming connections until the port is unbound. Once a connection is accepted successfully, the boss thread passes the accepted Channel to one of the worker threads that the NioServerSocketChannelFactory manages.

Worker threads

One NioServerSocketChannelFactory can have one or more worker threads. A worker thread performs non-blocking read and write for one or more Channels in a non-blocking mode.
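A toy model of the hand-off described above, in plain Java with no sockets: one boss per bound port accepts connections and passes each one to a worker. The round-robin choice of worker is an assumption made for this illustration, and the names (BossWorkerModel, onAccepted, connectionsOf) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of the boss/worker split for a single bound port.
public class BossWorkerModel {

    // A worker owns some connections and would do the non-blocking reads/writes on them.
    static final class Worker {
        final int id;
        final List<String> connections = new ArrayList<>();
        Worker(int id) { this.id = id; }
    }

    private final Worker[] workers;
    private final AtomicInteger next = new AtomicInteger();

    BossWorkerModel(int workerCount) {
        workers = new Worker[workerCount];
        for (int i = 0; i < workerCount; i++) {
            workers[i] = new Worker(i);
        }
    }

    // What the boss does after a successful accept: pick a worker (round-robin here)
    // and register the connection with it. The boss itself never reads request data.
    Worker onAccepted(String connection) {
        Worker w = workers[Math.floorMod(next.getAndIncrement(), workers.length)];
        w.connections.add(connection);
        return w;
    }

    int connectionsOf(int workerId) {
        return workers[workerId].connections.size();
    }

    public static void main(String[] args) {
        BossWorkerModel port80 = new BossWorkerModel(3);
        for (int i = 0; i < 7; i++) {
            Worker w = port80.onAccepted("conn-" + i);
            System.out.println("conn-" + i + " -> worker " + w.id);
        }
    }
}
```

The point of the split is that accepting stays cheap and single-threaded per port, while the worker count, which is what AvroSource's "threads" parameter controls, bounds how much I/O and request handling runs in parallel.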

What is a ChannelPipelineFactory for, and why does AvroSource supply its own?

The Javadoc of ChannelPipeline describes it as:

A list of ChannelHandlers which handles or intercepts ChannelEvents of a Channel. ChannelPipeline implements an advanced form of the Intercepting Filter pattern to give a user full control over how an event is handled and how the ChannelHandlers in the pipeline interact with each other.
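The Intercepting Filter idea can be shown without Netty. The hypothetical MiniPipeline below (not Netty's API) runs an event through an ordered list of handlers; each handler may transform the event, pass it on via next.proceed(...), or stop it by not calling next at all.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, minimal Intercepting Filter chain in the spirit of ChannelPipeline.
public class MiniPipeline {

    interface Handler {
        // Handle the event, optionally delegating to the rest of the chain via next.
        String handle(String event, Chain next);
    }

    // Cursor over the handlers that have not run yet.
    static final class Chain {
        private final List<Handler> handlers;
        private final int index;

        Chain(List<Handler> handlers, int index) {
            this.handlers = handlers;
            this.index = index;
        }

        String proceed(String event) {
            if (index >= handlers.size()) {
                return event; // end of the pipeline: the event reaches the application
            }
            return handlers.get(index).handle(event, new Chain(handlers, index + 1));
        }
    }

    private final List<Handler> handlers = new ArrayList<>();

    MiniPipeline addLast(Handler handler) {
        handlers.add(handler);
        return this;
    }

    String fire(String event) {
        return new Chain(handlers, 0).proceed(event);
    }

    public static void main(String[] args) {
        MiniPipeline p = new MiniPipeline()
            .addLast((e, next) -> next.proceed(e.trim()))   // decode-like step
            .addLast((e, next) -> e.isEmpty() ? "<dropped>" : next.proceed(e.toUpperCase()))
            .addLast((e, next) -> "handled:" + next.proceed(e)); // final handler
        System.out.println(p.fire("  append  "));
        System.out.println(p.fire("   "));
    }
}
```

Order matters: a handler added earlier sees the event first and can short-circuit everything after it, which is exactly how optional stages such as SSL or compression can sit in front of the RPC handler.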

So a ChannelPipeline offers a more powerful way of composing interceptors. Let's see which ChannelPipelineFactory AvroSource uses:

private ChannelPipelineFactory initChannelPipelineFactory() {
  ChannelPipelineFactory pipelineFactory;
  boolean enableCompression = compressionType.equalsIgnoreCase("deflate");
  if (enableCompression || enableSsl) {
    pipelineFactory = new SSLCompressionChannelPipelineFactory(enableCompression, enableSsl,
        keystore, keystorePassword, keystoreType);
  } else {
    pipelineFactory = new ChannelPipelineFactory() {
      @Override
      public ChannelPipeline getPipeline() throws Exception {
        return Channels.pipeline();
      }
    };
  }
  return pipelineFactory;
}

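So if compression ("deflate") or SSL is enabled, AvroSource uses SSLCompressionChannelPipelineFactory, a private static inner class of AvroSource. Otherwise it returns a new pipeline from Channels.pipeline(), which contains no handlers at all. That empty pipeline is not useless, though: Avro's NettyServer takes whatever pipeline the factory returns and appends its own handlers to it (the frame decoder and encoder and the Avro request handler), so the custom factory only decides which extra stages sit in front of them.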
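A stdlib-only sketch of that decision, plus what "deflate" means for the bytes on the wire. The names PipelineChoice, extraStages, deflate and inflate are hypothetical, the stage labels are illustrative rather than Flume's handler names or order, and java.util.zip stands in for Netty's zlib handlers.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

// Hypothetical, Netty-free mirror of initChannelPipelineFactory() plus zlib round-trip.
public class PipelineChoice {

    // Only "deflate" (case-insensitive) turns compression on, as in the code above.
    static List<String> extraStages(String compressionType, boolean enableSsl) {
        List<String> stages = new ArrayList<>();
        if (enableSsl) {
            stages.add("ssl");
        }
        if (compressionType.equalsIgnoreCase("deflate")) {
            stages.add("zlib");
        }
        return stages; // empty list corresponds to the plain Channels.pipeline() branch
    }

    // Compress with zlib framing (Deflater's default, nowrap = false).
    static byte[] deflate(byte[] input) {
        Deflater deflater = new Deflater();
        deflater.setInput(input);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[256];
        while (!deflater.finished()) {
            int n = deflater.deflate(buf);
            out.write(buf, 0, n);
        }
        deflater.end();
        return out.toByteArray();
    }

    // Decompress data produced by deflate().
    static byte[] inflate(byte[] input) {
        Inflater inflater = new Inflater();
        inflater.setInput(input);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[256];
        try {
            while (!inflater.finished()) {
                int n = inflater.inflate(buf);
                if (n == 0 && (inflater.needsInput() || inflater.needsDictionary())) {
                    break; // truncated or unsupported input: stop instead of spinning
                }
                out.write(buf, 0, n);
            }
        } catch (DataFormatException e) {
            throw new IllegalArgumentException("not zlib data", e);
        } finally {
            inflater.end();
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        System.out.println(extraStages("none", false));
        System.out.println(extraStages("DEFLATE", true));
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 20; i++) {
            sb.append("event body ");
        }
        byte[] raw = sb.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8);
        byte[] packed = deflate(raw);
        System.out.println(raw.length + " bytes -> " + packed.length + " bytes");
        System.out.println(new String(inflate(packed), java.nio.charset.StandardCharsets.UTF_8).equals(sb.toString()));
    }
}
```

Both ends must agree: if the source enables "deflate", the sending side has to compress too, otherwise the bytes it receives cannot be decoded.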

  

Question 1.2: The server is now running, but what RPC service does it provide?

The key is this line:

  

Responder responder = new SpecificResponder(AvroSourceProtocol.class, this);

According to the Avro API, the two arguments of SpecificResponder are the protocol (here the interface class) and an implementation of that protocol. So AvroSource itself must implement AvroSourceProtocol, and indeed AvroSource is declared as:

  

public class AvroSource extends AbstractSource implements EventDrivenSource,Configurable, AvroSourceProtocol
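Conceptually, a specific responder takes a decoded request (a message name plus arguments) and invokes the matching method on the implementation object. Avro does this with its own protocol metadata and also handles the request and response encoding; the sketch below is a simplified, hypothetical analogue using plain java.lang.reflect, with a toy ToyProtocol interface standing in for AvroSourceProtocol.

```java
import java.lang.reflect.Method;

// Hypothetical, simplified analogue of a specific responder: dispatch by message name.
public class ReflectiveResponder {

    // Toy protocol standing in for AvroSourceProtocol.
    public interface ToyProtocol {
        String append(String event);
        int appendBatch(String[] events);
    }

    // Toy implementation standing in for AvroSource.
    public static ToyProtocol demoImpl() {
        return new ToyProtocol() {
            @Override public String append(String event) { return "OK:" + event; }
            @Override public int appendBatch(String[] events) { return events.length; }
        };
    }

    private final Class<?> protocol;
    private final Object impl;

    public ReflectiveResponder(Class<?> protocol, Object impl) {
        if (!protocol.isInterface() || !protocol.isInstance(impl)) {
            throw new IllegalArgumentException("impl must implement the protocol interface");
        }
        this.protocol = protocol;
        this.impl = impl;
    }

    // Find the protocol method with this name and arity, then invoke it on impl.
    public Object respond(String messageName, Object... args) {
        for (Method m : protocol.getMethods()) {
            if (m.getName().equals(messageName) && m.getParameterCount() == args.length) {
                try {
                    return m.invoke(impl, args);
                } catch (ReflectiveOperationException e) {
                    throw new IllegalStateException("call to " + messageName + " failed", e);
                }
            }
        }
        throw new IllegalArgumentException("unknown message: " + messageName);
    }

    public static void main(String[] args) {
        ReflectiveResponder r = new ReflectiveResponder(ToyProtocol.class, demoImpl());
        System.out.println(r.respond("append", "e1"));
        System.out.println(r.respond("appendBatch", (Object) new String[] {"e1", "e2"}));
    }
}
```

This is why passing `this` works: the responder only needs an object implementing the protocol interface, so AvroSource can be both the Flume source and the RPC implementation.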

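So let's see how AvroSourceProtocol is defined. It lives in the flume-ng-sdk project under src/main/avro, in the file flume.avdl. An .avdl file defines a protocol in Avro IDL; placing it in that particular directory follows the avro-maven-plugin convention.

The avdl file looks like this: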

  

@namespace("org.apache.flume.source.avro")

protocol AvroSourceProtocol {

enum Status {
  OK, FAILED, UNKNOWN
}

record AvroFlumeEvent {
  map<string> headers;
  bytes body;
}

Status append( AvroFlumeEvent event );

Status appendBatch( array<AvroFlumeEvent> events );

}

  

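It defines an enum used as the return value of append and appendBatch. It reports how the Source handled the incoming messages, with three possible states: OK, FAILED and UNKNOWN.

It defines a record type AvroFlumeEvent that matches Flume's definition of an Event: the headers are a set of key-value pairs (a map), and the body is a byte array.

It defines two methods: append, which sends a single AvroFlumeEvent, and appendBatch, which sends a batch of AvroFlumeEvents.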

From this avdl, Avro generates three Java files: an enum Status, a class AvroFlumeEvent, and an interface AvroSourceProtocol. AvroSource implements the AvroSourceProtocol interface and thereby exposes two remote procedures, append and appendBatch.
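For orientation, here is roughly what those three generated types look like, written by hand and heavily simplified. The real generated classes extend Avro base classes such as SpecificRecordBase and carry schema metadata, and the generated interface methods may also declare AvroRemoteException; all of that is omitted. RecordingSource is a hypothetical toy implementation playing AvroSource's role.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hand-written, simplified shapes of the types generated from flume.avdl.
public class GeneratedShapes {

    // enum Status { OK, FAILED, UNKNOWN }
    public enum Status { OK, FAILED, UNKNOWN }

    // record AvroFlumeEvent { map<string> headers; bytes body; }
    public static final class AvroFlumeEvent {
        private final Map<CharSequence, CharSequence> headers;
        private final ByteBuffer body;

        public AvroFlumeEvent(Map<CharSequence, CharSequence> headers, ByteBuffer body) {
            this.headers = headers;
            this.body = body;
        }

        public Map<CharSequence, CharSequence> getHeaders() { return headers; }
        public ByteBuffer getBody() { return body; }
    }

    // Status append(AvroFlumeEvent event); Status appendBatch(array<AvroFlumeEvent> events);
    public interface AvroSourceProtocol {
        Status append(AvroFlumeEvent event);
        Status appendBatch(List<AvroFlumeEvent> events);
    }

    // Hypothetical toy server-side implementation that simply stores what it receives.
    public static final class RecordingSource implements AvroSourceProtocol {
        final List<AvroFlumeEvent> received = new ArrayList<>();

        @Override
        public Status append(AvroFlumeEvent event) {
            if (event.getBody() == null) {
                return Status.FAILED; // toy validation only
            }
            received.add(event);
            return Status.OK;
        }

        @Override
        public Status appendBatch(List<AvroFlumeEvent> events) {
            for (AvroFlumeEvent event : events) {
                if (append(event) != Status.OK) {
                    return Status.FAILED;
                }
            }
            return Status.OK;
        }
    }

    public static void main(String[] args) {
        Map<CharSequence, CharSequence> headers = new HashMap<>();
        headers.put("host", "web-01");
        AvroFlumeEvent e = new AvroFlumeEvent(headers,
            ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8)));
        RecordingSource source = new RecordingSource();
        System.out.println(source.append(e));
        System.out.println(source.appendBatch(Arrays.asList(e, e)));
        System.out.println(source.received.size());
    }
}
```

Note how IDL types map to Java: map<string> becomes Map<CharSequence, CharSequence> and bytes becomes ByteBuffer, which is why append has to convert them before building a Flume Event.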

append is implemented as:

  

@Override
public Status append(AvroFlumeEvent avroEvent) {
  logger.debug("Avro source {}: Received avro event: {}", getName(), avroEvent);
  sourceCounter.incrementAppendReceivedCount();
  sourceCounter.incrementEventReceivedCount();
  Event event = EventBuilder.withBody(avroEvent.getBody().array(),
      toStringMap(avroEvent.getHeaders()));
  try {
    getChannelProcessor().processEvent(event);
  } catch (ChannelException ex) {
    logger.warn("Avro source " + getName() + ": Unable to process event. " +
        "Exception follows.", ex);
    return Status.FAILED;
  }
  sourceCounter.incrementAppendAcceptedCount();
  sourceCounter.incrementEventAcceptedCount();
  return Status.OK;
}

This method builds an Event from the received AvroFlumeEvent. The conversion only bridges mismatched types: avroEvent.getBody() returns a ByteBuffer, while avroEvent.getHeaders() returns a Map<CharSequence, CharSequence>.

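Once the Event is built, it is handed to the ChannelProcessor associated with this Source. If processing throws a ChannelException, append logs a warning and returns Status.FAILED; otherwise it increments the accepted counters and returns Status.OK.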
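To make the flow explicit outside Flume, here is a dependency-free sketch of the same steps. Event, Processor and ProcessingException are hypothetical stand-ins for Flume's Event, ChannelProcessor and ChannelException, toStringMap is my own equivalent of the helper the original calls (not Flume's code), and the two received/accepted counter pairs are collapsed into one each. One deliberate difference: the sketch copies only the buffer's readable bytes instead of calling ByteBuffer.array(), which returns the entire backing array regardless of position and limit.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Simplified, dependency-free model of the append() flow described above.
public class AppendFlow {

    public enum Status { OK, FAILED, UNKNOWN }

    // Stand-in for Flume's ChannelException.
    public static final class ProcessingException extends RuntimeException {
        public ProcessingException(String message) { super(message); }
    }

    // Stand-in for Flume's Event: byte[] body and String headers.
    public static final class Event {
        public final byte[] body;
        public final Map<String, String> headers;

        public Event(byte[] body, Map<String, String> headers) {
            this.body = body;
            this.headers = headers;
        }
    }

    // Stand-in for ChannelProcessor.
    public interface Processor {
        void processEvent(Event event);
    }

    public final AtomicLong received = new AtomicLong();
    public final AtomicLong accepted = new AtomicLong();
    private final Processor processor;

    public AppendFlow(Processor processor) { this.processor = processor; }

    // Map<CharSequence, CharSequence> (Avro strings) -> Map<String, String>.
    public static Map<String, String> toStringMap(Map<CharSequence, CharSequence> in) {
        Map<String, String> out = new HashMap<>();
        for (Map.Entry<CharSequence, CharSequence> entry : in.entrySet()) {
            out.put(entry.getKey().toString(), entry.getValue().toString());
        }
        return out;
    }

    // ByteBuffer -> byte[], copying only the readable region (position..limit).
    public static byte[] toBytes(ByteBuffer buffer) {
        ByteBuffer view = buffer.duplicate(); // leave the caller's position untouched
        byte[] bytes = new byte[view.remaining()];
        view.get(bytes);
        return bytes;
    }

    public Status append(Map<CharSequence, CharSequence> headers, ByteBuffer body) {
        received.incrementAndGet();
        Event event = new Event(toBytes(body), toStringMap(headers));
        try {
            processor.processEvent(event);
        } catch (ProcessingException ex) {
            return Status.FAILED; // not accepted: the accepted counter is unchanged
        }
        accepted.incrementAndGet();
        return Status.OK;
    }

    public static void main(String[] args) {
        Map<CharSequence, CharSequence> headers = new HashMap<>();
        headers.put("host", "web-01");
        ByteBuffer body = ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8));
        AppendFlow ok = new AppendFlow(e ->
            System.out.println("stored " + new String(e.body, StandardCharsets.UTF_8) + " " + e.headers));
        System.out.println(ok.append(headers, body));
        AppendFlow full = new AppendFlow(e -> { throw new ProcessingException("channel full"); });
        System.out.println(full.append(headers, body));
    }
}
```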

appendBatch is implemented very similarly to append.
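A hedged sketch of what "very similar" plausibly means: convert each incoming event the same way, then hand the whole list to the processor in one call. Flume's ChannelProcessor does offer processEventBatch(List<Event>), but whether AvroSource uses it exactly like this is an assumption here, and every name below (AppendBatchFlow, Incoming, BatchProcessor, ProcessingException) is a hypothetical stand-in.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical model of an appendBatch() flow: convert all, process once.
public class AppendBatchFlow {

    public enum Status { OK, FAILED, UNKNOWN }

    public static final class ProcessingException extends RuntimeException {
        public ProcessingException(String message) { super(message); }
    }

    // Incoming Avro-side event: CharSequence headers, ByteBuffer body.
    public static final class Incoming {
        final Map<CharSequence, CharSequence> headers;
        final ByteBuffer body;

        public Incoming(Map<CharSequence, CharSequence> headers, ByteBuffer body) {
            this.headers = headers;
            this.body = body;
        }
    }

    // Converted Flume-side event.
    public static final class Event {
        public final Map<String, String> headers;
        public final byte[] body;

        Event(Map<String, String> headers, byte[] body) {
            this.headers = headers;
            this.body = body;
        }
    }

    // Stand-in for ChannelProcessor.processEventBatch(List<Event>).
    public interface BatchProcessor {
        void processEventBatch(List<Event> events);
    }

    public final AtomicLong received = new AtomicLong();
    public final AtomicLong accepted = new AtomicLong();
    private final BatchProcessor processor;

    public AppendBatchFlow(BatchProcessor processor) { this.processor = processor; }

    public Status appendBatch(List<Incoming> incoming) {
        received.addAndGet(incoming.size());
        List<Event> batch = new ArrayList<>(incoming.size());
        for (Incoming in : incoming) {
            Map<String, String> headers = new HashMap<>();
            in.headers.forEach((k, v) -> headers.put(k.toString(), v.toString()));
            ByteBuffer view = in.body.duplicate();
            byte[] body = new byte[view.remaining()];
            view.get(body);
            batch.add(new Event(headers, body));
        }
        try {
            processor.processEventBatch(batch); // one call for the whole batch
        } catch (ProcessingException ex) {
            return Status.FAILED; // in this sketch the whole batch is reported as failed
        }
        accepted.addAndGet(batch.size());
        return Status.OK;
    }

    public static void main(String[] args) {
        Map<CharSequence, CharSequence> h = new HashMap<>();
        h.put("host", "web-01");
        List<Incoming> in = new ArrayList<>();
        in.add(new Incoming(h, ByteBuffer.wrap(new byte[] {1})));
        in.add(new Incoming(h, ByteBuffer.wrap(new byte[] {2, 3})));
        AppendBatchFlow flow = new AppendBatchFlow(b -> System.out.println("batch of " + b.size()));
        System.out.println(flow.appendBatch(in));
    }
}
```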


Reprinted from: https://www.cnblogs.com/devos/p/3617764.html
