udp怎么保证不丢包_在 Flink 算子中使用多线程如何保证不丢数据?
分析痛點(diǎn)
筆者線上有一個(gè) Flink 任務(wù)消費(fèi) Kafka 數(shù)據(jù),將數(shù)據(jù)轉(zhuǎn)換后,在 Flink 的 Sink 算子內(nèi)部調(diào)用第三方 api 將數(shù)據(jù)上報(bào)到第三方的數(shù)據(jù)分析平臺。這里使用批量同步 api,即:每 50 條數(shù)據(jù)請求一次第三方接口,可以通過批量 api 來提高請求效率。由于調(diào)用的外網(wǎng)接口,所以每次調(diào)用 api 比較耗時(shí)。假如批次大小為 50,且請求接口的平均響應(yīng)時(shí)間為 50ms,使用同步 api,因此第一次請求響應(yīng)以后才會發(fā)起第二次請求。請求示意圖如下所示:
平均下來,每 50 ms 向第三方服務(wù)器發(fā)送 50 條數(shù)據(jù),也就是每個(gè)并行度 1 秒鐘處理 1000 條數(shù)據(jù)。假設(shè)當(dāng)前業(yè)務(wù)數(shù)據(jù)量為每秒 10 萬條數(shù)據(jù),那么 Flink Sink 算子的并行度需要設(shè)置為 100 才能正常處理線上數(shù)據(jù)。從 Flink 資源分配來講,100 個(gè)并行度需要申請 100 顆 CPU,因此當(dāng)前 Flink 任務(wù)需要占用集群中 100 顆 CPU 以及不少的內(nèi)存資源。請問此時(shí) Flink Sink 算子的 CPU 或者內(nèi)存壓力大嗎?
上述請求示意圖可以看出 Flink 任務(wù)發(fā)出請求到響應(yīng)這 50ms 期間,Flink Sink 算子只是在 wait,并沒有實(shí)質(zhì)性的工作。因此,CPU 使用率肯定很低,當(dāng)前任務(wù)的瓶頸明顯在網(wǎng)絡(luò) IO。最后結(jié)論是 Flink 任務(wù)申請了 100 顆 CPU,導(dǎo)致 yarn 或其他資源調(diào)度框架沒有資源了,但是這 100 顆 CPU 的使用率并不高,這里能不能優(yōu)化通過提高 CPU 的使用率,從而少申請一些 CPU 呢?
同步批量請求優(yōu)化為異步請求
首先可以想到的是將同步請求改為異步請求,使得任務(wù)不會阻塞在網(wǎng)絡(luò)請求這一環(huán)節(jié),請求示意圖如下所示。
異步請求相比同步請求而言,優(yōu)化點(diǎn)在于每次發(fā)出請求時(shí),不需要等待請求響應(yīng)后再發(fā)送下一次請求,而是當(dāng)下一批次的 50 條數(shù)據(jù)準(zhǔn)備好之后,直接向第三方服務(wù)器發(fā)送請求。每次發(fā)送請求后,Flink Sink 算子的客戶端需要注冊監(jiān)聽器來等待響應(yīng),當(dāng)響應(yīng)失敗時(shí)需要做重試或者回滾策略。
通過異步請求的方式,可以優(yōu)化網(wǎng)絡(luò)瓶頸,假如 Flink Sink 算子的單個(gè)并行度平均 10ms 接收到 50 條數(shù)據(jù),那么使用異步 api 的方式平均 1 秒可以處理 5000 條數(shù)據(jù),整個(gè) Flink 任務(wù)的性能提高了 5 倍。對于每秒 10 萬數(shù)據(jù)量的業(yè)務(wù),這里僅需要申請 20 顆 CPU 資源即可。關(guān)于異步 api 的具體使用,可以根據(jù)場景具體設(shè)計(jì),這里不詳細(xì)討論。
多線程 Client 模式
對于一些不支持異步 api 的場景,可能并不能使用上述優(yōu)化方案,同樣,為了提高 CPU 使用率,可以在 Flink Sink 端使用多線程的方案。如下圖所示,可以在 Flink Sink 端開啟 5 個(gè)請求第三方服務(wù)器的 Client 線程:Client1、Client2、Client3、Client4、Client5。
這五個(gè)線程內(nèi)分別使用同步批量請求的 Client,單個(gè) Client 還是保持 50 條記錄為一個(gè)批次,即 50 條記錄請求一次第三方 api。請求第三方 api 耗時(shí)主要在于網(wǎng)絡(luò) IO(性能瓶頸在于網(wǎng)絡(luò)請求延遲),因此如果變成 5 個(gè) Client 線程,每個(gè) Client 的單次請求平均耗時(shí)還能保持在 50ms,除非網(wǎng)絡(luò)請求已經(jīng)達(dá)到了帶寬上限或整個(gè)任務(wù)又遇到其他瓶頸。所以,多線程模式下使用同步批量 api 也能將請求效率提升 5 倍。
說明:多線程的方案,不僅限于請求第三方接口,對于非 CPU 密集型的任務(wù)也可以使用該方案,在降低 CPU 數(shù)量的同時(shí),單個(gè) CPU 承擔(dān)多個(gè)線程的工作,從而提高 CPU 利用率。例如:請求 HBase 的任務(wù)或磁盤 IO 是瓶頸的任務(wù),可以降低任務(wù)的并行度,使得每個(gè)并行度內(nèi)處理多個(gè)線程。
Flink 算子內(nèi)多線程實(shí)現(xiàn)
Sink 算子的單個(gè)并行度內(nèi)現(xiàn)在有 5 個(gè) Client 用于消費(fèi)數(shù)據(jù),但 Sink 算子的數(shù)據(jù)都來自于上游算子。如下圖所示,一個(gè)簡單的實(shí)現(xiàn)方式是 Sink 算子接收到上游數(shù)據(jù)后通過輪循或隨機(jī)的策略將數(shù)據(jù)分發(fā)給 5 個(gè) Client 線程。
但是輪循或者隨機(jī)策略會存在問題,假如 5 個(gè) Client 中 Client3 線程消費(fèi)較慢,會導(dǎo)致給 Client3 分發(fā)數(shù)據(jù)時(shí)被阻塞,從而使得其他正常消費(fèi)的線程 Client1、2、4、5 也被分發(fā)不到數(shù)據(jù)。
為了解決上述問題,可以在 Sink 算子內(nèi)申請一個(gè)數(shù)據(jù)緩沖隊(duì)列,隊(duì)列有先進(jìn)先出(FIFO)的特性。Sink 算子接收到的數(shù)據(jù)直接插入到隊(duì)列尾部,五個(gè) Client 線程不斷地從隊(duì)首取數(shù)據(jù)并消費(fèi),即:Sink 算子先接收的數(shù)據(jù) Client 先消費(fèi),后接收的數(shù)據(jù) Client 后消費(fèi)。
- 若隊(duì)列一直是滿的,說明 Client 線程消費(fèi)較慢、Sink 算子上游生產(chǎn)數(shù)據(jù)較快。
- 若隊(duì)列一直為空,說明 Client 線程消費(fèi)較快、Sink 算子的上游生產(chǎn)數(shù)據(jù)較慢。
五個(gè)線程共用同一個(gè)隊(duì)列完美地解決了單個(gè)線程消費(fèi)慢的問題,當(dāng) Client3 線程阻塞時(shí),不影響其他線程從隊(duì)列中消費(fèi)數(shù)據(jù)。這里使用隊(duì)列還起到了削峰填谷的作用。
代碼實(shí)現(xiàn)
原理明白了,具體代碼如下所示,首先是消費(fèi)數(shù)據(jù)的 Client 線程代碼,代碼邏輯很簡單,一直從 bufferQueue 中 poll 數(shù)據(jù),取出數(shù)據(jù)后,執(zhí)行相應(yīng)的消費(fèi)邏輯即可,在本案例中消費(fèi)邏輯便是 Client 積攢批次并調(diào)用第三方 api。
public class MultiThreadConsumerClient implements Runnable {private LinkedBlockingQueue<String> bufferQueue;public MultiThreadConsumerClient(LinkedBlockingQueue<String> bufferQueue) {this.bufferQueue = bufferQueue;}@Overridepublic void run() {String entity;while (true){// 從 bufferQueue 的隊(duì)首消費(fèi)數(shù)據(jù)entity = bufferQueue.poll();// 執(zhí)行 client 消費(fèi)數(shù)據(jù)的邏輯doSomething(entity);}}// client 消費(fèi)數(shù)據(jù)的邏輯private void doSomething(String entity) {// client 積攢批次并調(diào)用第三方 api} }Sink 算子代碼如下所示,在 open 方法中需要初始化線程池、數(shù)據(jù)緩沖隊(duì)列并創(chuàng)建開啟消費(fèi)者線程,在 invoke 方法中只需要往 bufferQueue 的隊(duì)尾添加數(shù)據(jù)即可。
public class MultiThreadConsumerSink extends RichSinkFunction<String> {// Client 線程的默認(rèn)數(shù)量private final int DEFAULT_CLIENT_THREAD_NUM = 5;// 數(shù)據(jù)緩沖隊(duì)列的默認(rèn)容量private final int DEFAULT_QUEUE_CAPACITY = 5000;private LinkedBlockingQueue<String> bufferQueue;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);// new 一個(gè)容量為 DEFAULT_CLIENT_THREAD_NUM 的線程池ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(DEFAULT_CLIENT_THREAD_NUM, DEFAULT_CLIENT_THREAD_NUM,0L,TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());// new 一個(gè)容量為 DEFAULT_QUEUE_CAPACITY 的數(shù)據(jù)緩沖隊(duì)列this.bufferQueue = Queues.newLinkedBlockingQueue(DEFAULT_QUEUE_CAPACITY);// 創(chuàng)建并開啟消費(fèi)者線程MultiThreadConsumerClient consumerClient = new MultiThreadConsumerClient(bufferQueue);for (int i=0; i < DEFAULT_CLIENT_THREAD_NUM; i++) {threadPoolExecutor.execute(consumerClient);}}@Overridepublic void invoke(String value, Context context) throws Exception {// 往 bufferQueue 的隊(duì)尾添加數(shù)據(jù)bufferQueue.put(value);} }代碼邏輯相對比較簡單,請問上述 Sink 能保證 Exactly Once 嗎?
答:不能保證 Exactly Once,Flink 要想端對端保證 Exactly Once,必須要求外部組件支持事務(wù),這里第三方接口明顯不支持事務(wù)。
那么上述 Sink 能保證 At Lease Once 嗎?言外之意,上述 Sink 會丟數(shù)據(jù)嗎?
答:會丟數(shù)據(jù)。因?yàn)樯鲜霭咐惺褂玫呐?api 來消費(fèi)數(shù)據(jù),假如批量 api 是每積攢 50 條數(shù)據(jù)請求一次第三方接口,當(dāng)做 Checkpoint 時(shí)可能只積攢了 30 條數(shù)據(jù),所以做 Checkpoint 時(shí)內(nèi)存中可能還有數(shù)據(jù)未發(fā)送到外部系統(tǒng)。而且數(shù)據(jù)緩沖隊(duì)列中可能還有緩存的數(shù)據(jù),因此上述 Sink 在做 Checkpoint 時(shí)會出現(xiàn) Checkpoint 之前的數(shù)據(jù)未完全消費(fèi)的情況。
例如,Flink 任務(wù)消費(fèi)的 Kafka 數(shù)據(jù),當(dāng)做 Checkpoint 時(shí),Flink 任務(wù)消費(fèi)到 offset 為 10000 的位置,但實(shí)際上 offset 10000 之前的一小部分?jǐn)?shù)據(jù)可能還在數(shù)據(jù)緩沖隊(duì)列中尚未完全消費(fèi),或者因?yàn)闆]積攢夠一定批次所以數(shù)據(jù)緩存在 client 中,并未請求到第三方。當(dāng)任務(wù)失敗后,Flink 任務(wù)從 Checkpoint 處恢復(fù),會從 offset 為 10000 的位置開始消費(fèi),此時(shí) offset 10000 之前的一小部分緩存在內(nèi)存緩沖隊(duì)列中的數(shù)據(jù)不會再被消費(fèi),于是就出現(xiàn)了丟數(shù)據(jù)情況。
處理丟數(shù)據(jù)情況
如何保證數(shù)據(jù)不丟失呢?很簡單,可以在 Checkpoint 時(shí)強(qiáng)制將數(shù)據(jù)緩沖區(qū)的數(shù)據(jù)全部消費(fèi)完,并對 client 執(zhí)行 flush 操作,保證 client 端不會緩存數(shù)據(jù)。
實(shí)現(xiàn)思路:Sink 算子可以實(shí)現(xiàn) CheckpointedFunction 接口,當(dāng)做 Checkpoint 時(shí),會調(diào)用 snapshotState 方法,方法內(nèi)可以觸發(fā) client 的 flush 操作。但 client 在 MultiThreadConsumerClient 對應(yīng)的五個(gè)線程中,需要考慮線程同步的問題,即:Sink 算子的 snapshotState 方法中做一個(gè)操作,要使得五個(gè) Client 線程感知到當(dāng)前正在執(zhí)行 Checkpoint,此時(shí)應(yīng)該把數(shù)據(jù)緩沖區(qū)的數(shù)據(jù)全部消費(fèi)完,并對 client 執(zhí)行過 flush 操作。
如何實(shí)現(xiàn)呢?需要借助 CyclicBarrier。CyclicBarrier 會讓所有線程都等待某個(gè)操作完成后才會繼續(xù)下一步行動。在這里可以使用 CyclicBarrier,讓 Checkpoint 等待所有的 client 將數(shù)據(jù)緩沖區(qū)的數(shù)據(jù)全部消費(fèi)完并對 client 執(zhí)行過 flush 操作,言外之意,offset 10000 之前的數(shù)據(jù)必須全部消費(fèi)完成才允許 Checkpoint 執(zhí)行完成。這樣就可以保證 Checkpoint 時(shí)不會有數(shù)據(jù)被緩存在內(nèi)存,可以保證數(shù)據(jù)源 offset 10000 之前的數(shù)據(jù)都消費(fèi)完成。
MultiThreadConsumerSink 具體代碼如下所示:
public class MultiThreadConsumerSink extends RichSinkFunction<String> {// Client 線程的默認(rèn)數(shù)量private final int DEFAULT_CLIENT_THREAD_NUM = 5;// 數(shù)據(jù)緩沖隊(duì)列的默認(rèn)容量private final int DEFAULT_QUEUE_CAPACITY = 5000;private LinkedBlockingQueue<String> bufferQueue;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);// new 一個(gè)容量為 DEFAULT_CLIENT_THREAD_NUM 的線程池ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(DEFAULT_CLIENT_THREAD_NUM, DEFAULT_CLIENT_THREAD_NUM,0L,TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());// new 一個(gè)容量為 DEFAULT_QUEUE_CAPACITY 的數(shù)據(jù)緩沖隊(duì)列this.bufferQueue = Queues.newLinkedBlockingQueue(DEFAULT_QUEUE_CAPACITY);// 創(chuàng)建并開啟消費(fèi)者線程MultiThreadConsumerClient consumerClient = new MultiThreadConsumerClient(bufferQueue);for (int i=0; i < DEFAULT_CLIENT_THREAD_NUM; i++) {threadPoolExecutor.execute(consumerClient);}}@Overridepublic void invoke(String value, Context context) throws Exception {// 往 bufferQueue 的隊(duì)尾添加數(shù)據(jù)bufferQueue.put(value);} }MultiThreadConsumerSink 實(shí)現(xiàn)了 CheckpointedFunction 接口,在 open 方法中增加了 CyclicBarrier 的初始化,CyclicBarrier 預(yù)期容量設(shè)置為 client 線程數(shù)加一,表示當(dāng) client 線程數(shù)加一個(gè)線程都執(zhí)行了 await 操作時(shí),所有的線程的 await 方法才會執(zhí)行完成。這里為什么要加一呢?因?yàn)槌?client 線程外, snapshotState 方法中也需要執(zhí)行過 await。
當(dāng)做 Checkpoint 時(shí) snapshotState 方法中執(zhí)行 clientBarrier.await(),等待所有的 client 線程將緩沖區(qū)數(shù)據(jù)消費(fèi)完。snapshotState 方法執(zhí)行過程中 invoke 方法不會被執(zhí)行,即:Checkpoint 過程中數(shù)據(jù)緩沖隊(duì)列不會增加數(shù)據(jù),所以 client 線程很快就可以將緩沖隊(duì)列中的數(shù)據(jù)消費(fèi)完。
MultiThreadConsumerClient 具體代碼如下所示:
public class MultiThreadConsumerSink extends RichSinkFunction<String> implements CheckpointedFunction {private Logger LOG = LoggerFactory.getLogger(MultiThreadConsumerSink.class);// Client 線程的默認(rèn)數(shù)量private final int DEFAULT_CLIENT_THREAD_NUM = 5;// 數(shù)據(jù)緩沖隊(duì)列的默認(rèn)容量private final int DEFAULT_QUEUE_CAPACITY = 5000;private LinkedBlockingQueue<String> bufferQueue;private CyclicBarrier clientBarrier;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);// new 一個(gè)容量為 DEFAULT_CLIENT_THREAD_NUM 的線程池ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(DEFAULT_CLIENT_THREAD_NUM, DEFAULT_CLIENT_THREAD_NUM,0L,TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());// new 一個(gè)容量為 DEFAULT_QUEUE_CAPACITY 的數(shù)據(jù)緩沖隊(duì)列this.bufferQueue = Queues.newLinkedBlockingQueue(DEFAULT_QUEUE_CAPACITY);// barrier 需要攔截 (DEFAULT_CLIENT_THREAD_NUM + 1) 個(gè)線程this.clientBarrier = new CyclicBarrier(DEFAULT_CLIENT_THREAD_NUM + 1);// 創(chuàng)建并開啟消費(fèi)者線程MultiThreadConsumerClient consumerClient = new MultiThreadConsumerClient(bufferQueue, clientBarrier);for (int i=0; i < DEFAULT_CLIENT_THREAD_NUM; i++) {threadPoolExecutor.execute(consumerClient);}}@Overridepublic void invoke(String value, Context context) throws Exception {// 往 bufferQueue 的隊(duì)尾添加數(shù)據(jù)bufferQueue.put(value);}@Overridepublic void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {LOG.info("snapshotState : 所有的 client 準(zhǔn)備 flush !!!");// barrier 開始等待clientBarrier.await();}@Overridepublic void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {}}從數(shù)據(jù)緩沖隊(duì)列中 poll 數(shù)據(jù)時(shí),增加了 timeout 時(shí)間為 50ms。如果從隊(duì)列中拿到數(shù)據(jù),則執(zhí)行消費(fèi)數(shù)據(jù)的邏輯,若拿不到數(shù)據(jù)說明數(shù)據(jù)緩沖隊(duì)列中數(shù)據(jù)消費(fèi)完了。此時(shí)需要判斷是否有等待的 CyclicBarrier,如果有等待的 CyclicBarrier 說明此時(shí)正在執(zhí)行 Checkpoint,所以 client 需要執(zhí)行 flush 操作。flush 完成后,Client 線程執(zhí)行 barrier.await() 操作。當(dāng)所有的 Client 線程都執(zhí)行到 await 時(shí),所有的 barrier.await() 都會被執(zhí)行完。此時(shí) Sink 算子的 snapshotState 方法就會執(zhí)行完。通過這種策略可以保證 Checkpoint 時(shí)將數(shù)據(jù)緩沖區(qū)中的數(shù)據(jù)消費(fèi)完,client 執(zhí)行 flush 操作可以保證 client 端不會緩存數(shù)據(jù)。
總結(jié)
分析到這里,我們設(shè)計(jì)的 Sink 終于可以保證不丟失數(shù)據(jù)了。對 CyclicBarrier 不了解的同學(xué)請 Google 或百度查詢。再次強(qiáng)調(diào)這里多線程的方案,不僅限于請求第三方接口,對于非 CPU 密集型的任務(wù)都可以使用該方案來提高 CPU 利用率,且該方案不僅限于 Sink 算子,各種算子都適用。本文主要希望幫助大家理解 Flink 中使用多線程的優(yōu)化及在 Flink 算子中使用多線程如何保證不丟數(shù)據(jù)。
原文鏈接
本文為阿里云內(nèi)容,未經(jīng)允許不得轉(zhuǎn)載
總結(jié)
以上是生活随笔為你收集整理的udp怎么保证不丢包_在 Flink 算子中使用多线程如何保证不丢数据?的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: HTML的HTTP 中 GET 与 PO
- 下一篇: 大学计算机基础python学多久_基于P