日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

Kafka Producer 发送消息源码阅读

發(fā)布時(shí)間:2025/3/17 编程问答 35 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Kafka Producer 发送消息源码阅读 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

2019獨(dú)角獸企業(yè)重金招聘Python工程師標(biāo)準(zhǔn)>>>

今天看了kafka 發(fā)送消息部分的源碼(0.8.2.1版本的),針對(duì)kafka的消息發(fā)送,分區(qū)策略如下:

1 kafka的分區(qū)策略

?1.1 如果指定了partition,則將消息發(fā)到對(duì)應(yīng)的partition

?1.2 如果沒(méi)有指定partition,但指定了key, 會(huì)根據(jù)key的hash選擇一個(gè)partition, ?

? ?如果如果key名固定,則消息只會(huì)發(fā)到固定的一個(gè)partition上, 所以key不要設(shè)置為固定的值,如果需要設(shè)置,則需要考慮修改kafka的源碼,以支持將數(shù)據(jù)均勻發(fā)到不同的partition上

1.3 如果key,partition都沒(méi)有指定,則采用round-robin即輪循的方式發(fā)到每個(gè)partition

2 消息的發(fā)送都是異步的,發(fā)送過(guò)程如下

涉及到三個(gè)對(duì)象:

2.1?RecordAccumulator

維護(hù)了一個(gè)ConcurrentMap<TopicPartition,?Deque<RecordBatch>>?batches 對(duì)象

一個(gè)partition對(duì)應(yīng)一個(gè)RecordBatch的ArrayDeque? ?

調(diào)用KafkaProducer.send方法發(fā)送消息,最終調(diào)用如下方法:

? ? ? ? ? ?

如果RecordBatch已經(jīng)滿 或 創(chuàng)建了新的RecordBatch,則喚醒發(fā)送對(duì)象Sender

???? ? ? ? ? ? ??

2.2?Sender

?The?background?thread?that?handles?the?sending?of?produce?requests?to?the?Kafka?cluster

Sender通過(guò)kafkaclient將RecordAccumulator 的數(shù)據(jù)批量寫入到server? ??

Sender定義的run方法實(shí)現(xiàn)如下:

? ?

在run(long now)中,實(shí)現(xiàn)邏輯如下:

2.2.1 首先通過(guò)如下條件獲取發(fā)送數(shù)據(jù)的節(jié)點(diǎn)?

2.2.2刪除掉當(dāng)前不能發(fā)送的kafka node

? ??? ??? ??? ?? ? ? ? ? ?

2.2.3 獲取發(fā)送的數(shù)據(jù)列表

? ? 循環(huán)此節(jié)點(diǎn)上是leader的partition

? ? ? ? ? 根據(jù)partition,獲取此partition對(duì)應(yīng)的RecordBatch,并放到此節(jié)點(diǎn)對(duì)應(yīng)的?List<RecordBatch>

? ??? ??? ??? ??? ??? ??? ???? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?

2.2.4組裝請(qǐng)求對(duì)象,發(fā)送到不同的kafka節(jié)點(diǎn)

計(jì)算pollTimeout并發(fā)送請(qǐng)求對(duì)象到不同的kafka節(jié)點(diǎn)

// If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately// loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data// that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes// with sendable data that aren't ready to send since they would cause busy looping.long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);if (result.readyNodes.size() > 0) {log.trace("Nodes with data ready to send: {}", result.readyNodes);log.trace("Created {} produce requests: {}", requests.size(), requests);pollTimeout = 0;}// if some partitions are already ready to be sent, the select time would be 0;// otherwise if some partition already has some data accumulated but not ready yet,// the select time will be the time difference between now and its linger expiry time;// otherwise the select time will be the time difference between now and the metadata expiry time;List<ClientResponse> responses = this.client.poll(requests, pollTimeout, now);

2.2.5 針對(duì)返回的數(shù)據(jù)進(jìn)行處理

// if some partitions are already ready to be sent, the select time would be 0;// otherwise if some partition already has some data accumulated but not ready yet,// the select time will be the time difference between now and its linger expiry time;// otherwise the select time will be the time difference between now and the metadata expiry time;List<ClientResponse> responses = this.client.poll(requests, pollTimeout, now);for (ClientResponse response : responses) {if (response.wasDisconnected())handleDisconnect(response, now);elsehandleResponse(response, now);}

2.3?KafkaClient

其實(shí)現(xiàn)類是:NetworkClient,基于socket方式與server進(jìn)行數(shù)據(jù)交互

3 kafka參數(shù)配置

用于存儲(chǔ)批量數(shù)據(jù)的緩沖大小(對(duì)應(yīng)類:MemoryRecords)?batch-size :?16384

用于整個(gè)client緩存所有發(fā)送對(duì)象的大小(對(duì)應(yīng)類:BufferPool ) :BUFFER_MEMORY ?32?*?1024?*?1024L 即 32M

用于發(fā)送延遲的時(shí)間配置(LINGER_MS),如果設(shè)置為1秒,則記錄先發(fā)送到client緩存中,等待1秒后再發(fā)送數(shù)據(jù),默認(rèn)為0 表示立即發(fā)送

指定數(shù)據(jù)壓縮類型:?compression.type ,支持:none,gzip, snappy, lz4, 默認(rèn)為none

理論上,設(shè)置LINGER_MS 會(huì)提高消息的吞吐量

轉(zhuǎn)載于:https://my.oschina.net/cloudcoder/blog/917309

總結(jié)

以上是生活随笔為你收集整理的Kafka Producer 发送消息源码阅读的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 国产精品丝袜黑色高跟鞋的设计特点 | 又粗又猛又爽又黄的视频 | 伊人一区二区三区 | 成人av电影天堂 | 欧美大浪妇猛交饥渴大叫 | 亚洲免费大全 | 亚洲一二三四在线观看 | 涩涩涩在线视频 | 日本亚洲欧洲色 | 日本人六九视频 | 亚洲中文一区二区 | 亚洲欲| 九草视频在线 | 国产色网站 | 影音先锋色小姐 | 女人18毛片水真多 | 九九热精品视频 | 日本小视频网站 | 中文字幕自拍偷拍 | 一级中国毛片 | 成人性生交大全免 | 嫩操影院| 荒野求生21天去码版网站 | 草碰在线 | 自拍愉拍| 欧美少妇b| 精品一区二区三区久久 | 91视频.com| 日韩成人精品一区二区 | 欧美在线观看视频一区 | 日日操日日操 | 高清国产一区二区三区四区五区 | 中文字幕3区 | 一卡二卡久久 | 日本精品一二三区 | 欧美日韩免费一区二区 | 天天操天天操天天操天天操天天操 | 夜夜欢天天干 | 欧美激情国产在线 | 久久亚洲av无码西西人体 | 久久色图| 成人午夜性视频 | 欧美日皮视频 | 成人久久网| 亚洲AV无码成人精品一区 | 波多野结衣国产在线 | 美女黄视频网站 | 日韩高清中文字幕 | 亚洲 欧美 自拍偷拍 | 在线免费观看国产精品 | 深夜福利免费视频 | 一级黄色视 | 亚洲福利电影网 | 日韩免费专区 | 艳妇av| 精品一区二区三区毛片 | 成人小视频免费在线观看 | 在线观看国产福利 | 日韩有码第一页 | 国产h视频在线观看 | h视频网站在线观看 | jizz日本女人 | 一本高清dvd在线播放 | 插插插干干干 | 国语对白做受按摩的注意事项 | 91九色成人 | 麻豆视频官网 | 五月视频 | 女女h百合无遮羞羞漫画软件 | 激情综合网五月 | 99国产精品久久久久99打野战 | 黄瓜视频在线观看 | www.免费av| 中文字幕亚洲在线观看 | 少妇av在线播放 | 久久国产精品一区二区 | 九九九视频在线观看 | 欧美第三页 | 一区二区伦理 | 中文字幕h | 国产高潮视频在线观看 | 中文字幕无线码 | 欧美日韩国产黄色 | 国产乱码精品一区二区三区不卡 | 欧美日韩不卡一区二区 | 中文字幕一区二区三区乱码在线 | 日韩精品――色哟哟 | 欧美一区二区三区四区视频 | 日韩精品久久久久久久 | 国产探花视频在线观看 | 狠狠操在线视频 | 久久国产黄色片 | 久久看av | 岛国av噜噜噜久久久狠狠av | 天堂av手机版| 亚洲天堂网在线视频 | 中国av免费看 | 超碰在线a | 黄色小视频在线播放 |