日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪(fǎng)問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

kafka消息消费有延迟_简易实现kafka延迟消息

發(fā)布時(shí)間:2023/12/20 编程问答 31 豆豆
生活随笔 收集整理的這篇文章主要介紹了 kafka消息消费有延迟_简易实现kafka延迟消息 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

背景

當(dāng)前業(yè)務(wù)存在以下場(chǎng)景:在一個(gè)事務(wù)內(nèi)的最后一步是發(fā)送kafka消息,消費(fèi)端收到通知后讀取數(shù)據(jù)并做處理。但是由于kafka幾乎是即時(shí)收到消息,導(dǎo)致偶爾出現(xiàn)“在發(fā)完kafka和提交事務(wù)的間隙,消費(fèi)端收到了消息并讀取到了事務(wù)提交前的數(shù)據(jù)”。

這個(gè)問(wèn)題可以通過(guò)延遲消息來(lái)解決。

發(fā)送端 vs 消費(fèi)端

要做延遲,那么首先要考慮的是:延遲放在發(fā)送端,還是放在消費(fèi)端?最終選擇放在消費(fèi)端:讓數(shù)據(jù)先被kafka存儲(chǔ)起來(lái),數(shù)據(jù)更安全。

想把延遲消息做成一個(gè)服務(wù),不只是支持某一個(gè)場(chǎng)景/業(yè)務(wù),在這種設(shè)計(jì)前提下,讓延遲邏輯放在消費(fèi)端,可以統(tǒng)一調(diào)整邏輯,也方便排查問(wèn)題。

思路

是在整體外面包一層代理:另外創(chuàng)建一個(gè)延遲Topic,延遲消息都發(fā)到延遲Topic里。

有專(zhuān)門(mén)的服務(wù)來(lái)消費(fèi)延遲Topic的消息,取到消息之后存儲(chǔ)起來(lái),定期檢查消息是否已經(jīng)延遲時(shí)間。

已到延遲時(shí)間的消息,重新發(fā)送到原先Topic。

這樣做的好處是,不需要對(duì)kafka做任何改造。

存儲(chǔ)

延遲隊(duì)列消費(fèi)者拉取到消息之后,要怎么存儲(chǔ)?第三方存儲(chǔ),其需要滿(mǎn)足以下幾個(gè)條件:高性能:寫(xiě)入延遲要低,MQ的一個(gè)重要作用是削峰填谷,在選擇臨時(shí)存儲(chǔ)時(shí),寫(xiě)入性能必須要高,關(guān)系型數(shù)據(jù)庫(kù)(如Mysql)通常不滿(mǎn)足需求。

高可靠:延遲消息寫(xiě)入后,不能丟失,需要進(jìn)行持久化,并進(jìn)行備份

存儲(chǔ)成本低:可以支持大量消息存儲(chǔ),(Redis存儲(chǔ)成本太高)。

支持排序: 支持按照某個(gè)字段對(duì)消息進(jìn)行排序,對(duì)于延遲消息需要按照時(shí)間進(jìn)行排序。普通消息通常先發(fā)送的會(huì)被先消費(fèi),延遲消息與普通消息不同,需要進(jìn)行排序。例如先發(fā)一條延遲10s的消息,再發(fā)一條延遲5s的消息,那么后發(fā)送的消息需要被先消費(fèi)。

支持長(zhǎng)時(shí)間保存:一些業(yè)務(wù)的延遲消息,需要延遲幾個(gè)月,甚至更長(zhǎng),所以延遲消息必須能長(zhǎng)時(shí)間保留。不過(guò)通常不建議延遲太長(zhǎng)時(shí)間,存儲(chǔ)成本比較大,且業(yè)務(wù)邏輯可能已經(jīng)發(fā)生變化,已經(jīng)不需要消費(fèi)這些消息。

基于以上條件,選擇了RocksDB來(lái)存儲(chǔ)數(shù)據(jù):高性能嵌入式KV存儲(chǔ)引擎。

數(shù)據(jù)持久化到磁盤(pán)。

基于LMS存儲(chǔ),key自然排序,迭代器(Iterator)根據(jù)key順序遍歷。

代碼

發(fā)送端

消息基類(lèi)public class DelayMessage {

/**

* 事件唯一ID,用于去重檢查

*/

private String eventId = UUIDGenerator.generateString();

/**

* 事件時(shí)間

*/

@JSONField(format = KafkaConstants.DATETIME_FORMAT)

private Date eventTime = new Date();

/**

* 真實(shí)事件時(shí)間

*/

@JSONField(format = KafkaConstants.DATETIME_FORMAT)

private Date actualTime;

/**

* 真實(shí)Topic

*/

private String actualTopic;

public Date getActualTime() {

return actualTime;

}

public T setActualTime(Date actualTime) {

this.actualTime = actualTime;

return (T) this;

}

public String getActualTopic() {

return actualTopic;

}

public T setActualTopic(String actualTopic) {

this.actualTopic = actualTopic;

return (T) this;

}

public Date getEventTime() {

return eventTime;

}

public T setEventTime(Date eventTime) {

this.eventTime = eventTime;

return (T) this;

}

}

消息對(duì)象繼承DelayMessage,將消息發(fā)送到延遲Topic。

延遲服務(wù)消費(fèi)端

接收延遲消息@KafkaListener(topics = {KafkaConstants.KAFKA_TOPIC_MESSAGE_DELAY}, containerFactory = "kafkaContainerFactory")

public boolean onMessage(String json) throws Throwable {

try {

DelayMessage delayMessage = deserialize(json, DelayMessage.class);

if (!isDelay(delayMessage)) {

// 如果接收到消息時(shí),消息已經(jīng)可以發(fā)送了,直接發(fā)送到實(shí)際的隊(duì)列

sendActualTopic(delayMessage, json);

} else {

// 存儲(chǔ)

localStorage(delayMessage, json);

}

} catch (Throwable e) {

log.error("consumer kafka delay message[{}] error!", json, e);

throw e;

}

return true;

}

private void sendActualTopic(DelayMessage delayMessage, String message) {

kafkaSender.send(message, delayMessage.getActualTopic());

}

@SneakyThrows

private void localStorage(DelayMessage delayMessage, String message) {

String key = generateRdbKey(delayMessage);

if (rocksDb.keyMayExist(RocksDbUtils.toByte(key), null)) {

return;

}

rocksDb.put(RocksDbUtils.toByte(key), RocksDbUtils.toByte(message));

}

private String generateRdbKey(DelayMessage delayMessage) {

return delayMessage.getActualTime().getTime() + RDB_KEY_SPLITTER + delayMessage.getEventId();

}

這里要注意生成key的方法:RocksDB是按key自然排序,迭代器遍歷時(shí)是按key順序遍歷。

按時(shí)間來(lái)生成key,遍歷時(shí)遇到第一個(gè)不符合的key,即可結(jié)束遍歷。

key里加上消息ID,用以去重。

處理存儲(chǔ)的延遲消息

啟動(dòng)定時(shí)任務(wù)(ScheduledExecutorService)定時(shí)檢查消息。private void handleRdbMessage() {

try {

try (RocksIterator rocksIterator = rocksDb.newIterator()) {

for (rocksIterator.seekToFirst(); rocksIterator.isValid(); rocksIterator.next()) {

String key = "";

String value = "";

try {

byte[] keyByte = rocksIterator.key();

key = RocksDbUtils.toString(keyByte);

if (!isMessageExpired(key)) {

break;

}

value = RocksDbUtils.toString(rocksIterator.value());

DelayMessage delayMessage = JSON.parseObject(value, DelayMessage.class);

sendActualTopic(delayMessage, value);

rocksDb.delete(keyByte);

} catch (NumberFormatException e) {

// 異常key

log.error("handler kafka rocksdb delay message[{}:{}] NumberFormatException error!", key, value, e);

if (StringUtils.isNotBlank(key)) {

rocksDb.delete(RocksDbUtils.toByte(key));

}

} catch (Exception e) {

log.error("handler kafka rocksdb delay message[{}:{}] error!", key, value, e);

}

}

}

} catch (Exception e) {

// 捕獲異常,否則ScheduledExecutorService會(huì)停止定時(shí)任務(wù)

log.error("handler kafka rocksdb delay message error!", e);

}

}

private boolean isMessageExpired(String rdbKey) {

long actualTime = Long.valueOf(rdbKey.split(RDB_KEY_SPLITTER)[0]);

return actualTime <= System.currentTimeMillis();

}

這里sendActualTopic和rocksDb.delete兩個(gè)操作并不是原子性,但一般kafka消費(fèi)端都會(huì)做防重復(fù),所以也不會(huì)有問(wèn)題。

其他

當(dāng)前僅僅簡(jiǎn)易實(shí)現(xiàn)了延遲隊(duì)列,還有很多需要完成完善的地方,比如:當(dāng)前數(shù)據(jù)分散到不同的消費(fèi)節(jié)點(diǎn)上,如果某一個(gè)節(jié)點(diǎn)服務(wù)器異常導(dǎo)致數(shù)據(jù)丟失,就只能人工介入,從kafka文件里獲取數(shù)據(jù);可通過(guò)部署不同的kafka group來(lái)達(dá)到數(shù)據(jù)備份,通過(guò)選主方式來(lái)決定哪一個(gè)group執(zhí)行業(yè)務(wù)。

一條消息被存儲(chǔ)三份:實(shí)際隊(duì)列,延遲隊(duì)列,RocksDB,可以通過(guò)操作kafka CommitLog的方式,讓RocksDB里僅存儲(chǔ)CommitLog offset 相關(guān)信息,減小RocksDB占用空間。

參考:

總結(jié)

以上是生活随笔為你收集整理的kafka消息消费有延迟_简易实现kafka延迟消息的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。