日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Flume-NG源码阅读之AvroSink

發布時間:2023/12/18 编程问答 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Flume-NG源码阅读之AvroSink 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

  org.apache.flume.sink.AvroSink是用來通過網絡來傳輸數據的,可以將event發送到RPC服務器(比如AvroSource),使用AvroSink和AvroSource可以組成分層結構。它繼承自AbstractRpcSink ?extends AbstractSink?implements Configurable這跟其他的sink一樣都得extends AbstractSink?implements Configurable,所以重點也在confgure、start、process、stop這四個方法,實現了initializeRpcClient(Properties props)方法。

  一、configure(Context context)方法,先獲取配置文件中的主機hostname和端口port;設置clientProps的屬性hosts=h1,hosts.h1=hostname:port;然后將配置信息中的所有信息放入clientProps中;獲取cxnResetInterval表示重復建立連接的時間間隔,默認是0就是不重復建立連接。

  二、start()方法是調用createConnection()建立連接,如果出現異常就調用destroyConnection()掐斷連接,避免資源泄漏。createConnection()方法主要是初始化client = initializeRpcClient(clientProps)以及創建一個線程,并執行在給定延遲cxnResetInterval后執行一次銷毀鏈接destroyConnection(),由于默認cxnResetInterval=0,所以是不會執行這個線程的。這點不是很明白,為什么要銷毀???initializeRpcClient(clientProps)方法會根據配置文件中的信息進行構造相應的RpcClient:首先會獲取"client.type"參數指定的類型可用的有四種(NettyAvroRpcClient(如果沒有"client.type"則使用這個作為默認Client)、FailoverRpcClient、LoadBalancingRpcClient、ThriftRpcClient),實例化之后需要對其在進行必要的配置執行client.configure(properties)進行配置:

  (1)NettyAvroRpcClient.configure(Properties properties)方法首先會獲取鎖,檢查connState連接狀態要保證是沒有配置過的;其次獲取"batch-size"設置batchSize,如果配置的小于1則使用默認值100;獲取“hosts”,如果配置了多個hosts則只使用第一個;獲取"hosts."前綴,如果有多個則使用第一個,再解析出hostname和port,構建一個InetSocketAddress的對象address;獲取連接超時時間"connect-timeout",設置connectTimeout,如果配置的小于1000則使用默認值20000,單位是ms;獲取相應時間"request-timeout",設置requestTimeout,如果配置的小于1000,則使用默認值20000,單位ms;獲取壓縮類型"compression-type",如果有配置壓縮還需要獲取壓縮的等級compressionLevel;最后調用connect()鏈接RPC服務器。

  實際的鏈接在connect(long timeout, TimeUnit tu)方法中,先構造一個線程池callTimeoutPool;然后根據是否有壓縮構造相應的工廠類CompressionChannelFactory(有壓縮配置)或者NioClientSocketChannelFactory(無壓縮配置);構造一個

NettyTransceiver(this.address,socketChannelFactory,tu.toMillis(timeout))收發器對象transceiver;根據transceiver返回一個avroClient;最后設置鏈接狀態為READY。

  (2)FailoverRpcClient.configure(Properties properties)方法會調用configureHosts(Properties properties)方法,這個方法會獲取配置文件中的host列表hosts;獲取最大嘗試次數"max-attempts",設置maxTries,默認是hosts的大小;獲取批量大小

"batch-size",設置batchSize,如果配置的小于1則使用默認大小100;將此client置為活動的isActive=true。可以看出這個client可以使用多個host。

  (3)LoadBalancingRpcClient.configure(Properties properties)會獲取配置文件中的host列表hosts,且不允許少于兩個,否則爆異常;獲取主機選擇器"host-selector",有兩種內置的選擇器:LoadBalancingRpcClient.RoundRobinHostSelector和LoadBalancingRpcClient.RandomOrderHostSelector,默認是ROUND_ROBIN(即RoundRobinHostSelector)輪詢的方式(也可以自定義,要實現LoadBalancingRpcClient.HostSelector接口);獲取"backoff",設置backoff(是否使用推遲算法,就是sink.process出問題后對這個sink設置懲罰時間,在此期間不再認為其可活動)的boolean值(默認false就是不啟用);獲取最大推遲時間"maxBackoff",設置maxBackoff;然后根據選擇器是ROUND_ROBIN還是RANDOM選擇對應的類并實例化selector,最后設置主機selector.setHosts(hosts)。

  這兩個內置選擇器:RoundRobinHostSelector實際使用的是RoundRobinOrderSelector;RandomOrderHostSelector實際使用的是RandomOrderSelector,這兩個都在Flume-NG源碼閱讀之SinkGroups和SinkRunner?這篇文章中有介紹,這里不再說明。

  (4)ThriftRpcClient.configure(Properties properties)會獲取狀態鎖stateLock.lock();獲取配置文件中的host列表中的第一個,只需要一個;獲取批量大小"batch-size",設置batchSize,如果配置的小于1則使用默認大小100;獲取主機名hostname和端口port;獲取響應時間requestTimeout,如果小于1000設置為默認的20000ms;獲取連接池大小"maxConnections",設置connectionPoolSize,如果大小小于1則設置為默認的值5;創建連接池管理對象connectionManager= new ConnectionPoolManager(connectionPoolSize);設置連接狀態為READY,connState = State.READY;最后狀態鎖解鎖stateLock.unlock()。

  這四個Client都是extends AbstractRpcClient?implements RpcClient。

  三、process()方法,代碼如下:

1 public Status process() throws EventDeliveryException { 2 Status status = Status.READY; 3 Channel channel = getChannel(); //獲得channel 4 Transaction transaction = channel.getTransaction(); //創建事務 5 6 try { 7 transaction.begin(); //事務開始 8 9 verifyConnection(); //確保存在鏈接且處于活動狀態,如果鏈接處于非活動狀態銷毀并重建鏈接 10 11 List<Event> batch = Lists.newLinkedList(); 12 13 for (int i = 0; i < client.getBatchSize(); i++) { //保證這批次的event數量不可能超過客戶端批量處理的最大處理數量 14 Event event = channel.take(); 15 16 if (event == null) { //表示channel中沒有數據了 17 break; 18 } 19 20 batch.add(event); //加入event列表 21 } 22 23 int size = batch.size(); //獲取這批次取得的event的數量 24 int batchSize = client.getBatchSize(); //獲取客戶端可以批量處理的大小 25 26 if (size == 0) { 27 sinkCounter.incrementBatchEmptyCount(); 28 status = Status.BACKOFF; 29 } else { 30 if (size < batchSize) { 31 sinkCounter.incrementBatchUnderflowCount(); 32 } else { 33 sinkCounter.incrementBatchCompleteCount(); 34 } 35 sinkCounter.addToEventDrainAttemptCount(size); 36 client.appendBatch(batch); //批量處理event 37 } 38 39 transaction.commit(); //事務提交 40 sinkCounter.addToEventDrainSuccessCount(size); 41 42 } catch (Throwable t) { 43 transaction.rollback(); //事務回滾 44 if (t instanceof Error) { 45 throw (Error) t; 46 } else if (t instanceof ChannelException) { 47 logger.error("Rpc Sink " + getName() + ": Unable to get event from" + 48 " channel " + channel.getName() + ". Exception follows.", t); 49 status = Status.BACKOFF; 50 } else { 51 destroyConnection(); //銷毀鏈接 52 throw new EventDeliveryException("Failed to send events", t); 53 } 54 } finally { 55 transaction.close(); //事務關閉 56 } 57 58 return status; 59 }

  即使本批次event的數量達不到client.getBatchSize()(channel中沒數據了)也會立即發送到RPC服務器。verifyConnection()方法是確保存在鏈接且處于活動狀態,如果鏈接處于非活動狀態銷毀并重建鏈接。如果本批次沒有event,則不會想RPC發送任何數據。client.appendBatch(batch)方法是批量發送event。

  (1)NettyAvroRpcClient.appendBatch(batch)方法會調用appendBatch(events, requestTimeout, TimeUnit.MILLISECONDS)方法,該方法會首先確認鏈接處于READY狀態,否則報錯;然后將每個event重新封裝成AvroFlumeEvent,放入avroEvents列表中;然后構造一個CallFuture和avroEvents一同封裝成一個Callable放入線程池?handshake =?callTimeoutPool.submit(callable)中去執行,其call方法內容是avroClient.appendBatch(avroEvents, callFuture)就是在此批量提交到RPC服務器;然后handshake.get(connectTimeout, TimeUnit.MILLISECONDS)在規定時間等待執行的返回結果以及等待append的完成waitForStatusOK(callFuture, timeout, tu),詳細的可看這里Flume的Avro Sink和Avro Source研究之二 : Avro Sink?,有對于這兩個future更深入的分析。一個批次傳輸的event的數量是min(batchSize,events.size())

  (2)FailoverRpcClient.appendBatch(batch)方法會做最多maxTries次嘗試直到獲取到可以正確發送events的Client,通過localClient=getClient()--》getNextClient()來獲取client,這個方法每次會獲取hosts中的下一個HostInfo,并使用NettyAvroRpcClient來作為RPC Client,這就又回到了(1)中,這個方法還有一個要注意的就是會先從當前的lastCheckedhost+1位置向后找可以使用的Client,如果不行會再從開始到到lastCheckedhost再找,再找不到就報錯。使用localClient.appendBatch(events)來處理events,可參考(1)。

  (3)LoadBalancingRpcClient.appendBatch(batch)方法,首先會獲取可以發送到的RPC服務器的迭代器Iterator<HostInfo>?it = selector.createHostIterator();然后取一個HostInfo,RpcClient client = getClient(host)這個Client和(2)一樣都是NettyAvroRpcClient,但是getClient方法會設置一個保存名字和client映射的clientMap;client.appendBatch(events)執行之后就會跳出循環,下一次appendBatch會選擇下一個client執行。

  (4)ThriftRpcClient.appendBatch(batch)方法,從connectionManager.checkout()獲取一個client,ConnectionPoolManager類主要維護倆對象availableClients用來存放可用的client(是一個ClientWrapper,維護一個ThriftSourceProtocol.Client client 是用來批量處理event的)、checkedOutClients用來存儲從availableClients中拿出的Client表示正在使用的Client;ConnectionPoolManager.checkout()用于從availableClients中remove出client并放入checkedOutClients中,返回這個client;ConnectionPoolManager.checkIn(ClientWrapper client)方法用于將指定的Client從checkedOutClient中remove出并放入availableClients中;ConnectionPoolManager.destroy(ClientWrapper client)用于將checkedOutClients中的指定Client ? remove并close。appendBatch方法中獲得client后,會每次封裝min(batchSize,events.size())個event,把他們封裝成ThriftFlumeEvent加入thriftFlumeEvents列表,然后如果thriftFlumeEvents>0則執行doAppendBatch(client, thriftFlumeEvents).get(requestTimeout,TimeUnit.MILLISECONDS)阻塞等待傳輸完畢。doAppendBatch方法會構建一個Callable其call方法執行client.client.appendBatch(e),將這個Callable放入線程池callTimeoutPool中執行并返回執行結果Future。

  以上四種RpcClient的append(Event event)方法也比較容易理解,不再講述。

  四、stop()方法主要是銷毀鏈接,關閉cxnResetExecutor。

  

  其實flume支持avro和thrift兩種(目前)傳輸,上面的(2)和(3)只不過是對(1)的上層業務做了一次封裝而已,本質上還是一樣的都是avro(基于netty)。同時記住avrosink是支持壓縮的。

  在此,由于博主對avro、netty、thrift并未深入研究過,所以只能從flume層面講解avrosink,對于某些人來說,可能講的并不深入,相關內容請自行學習!!

轉載于:https://www.cnblogs.com/lxf20061900/p/3753630.html

創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎

總結

以上是生活随笔為你收集整理的Flume-NG源码阅读之AvroSink的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。