kafka消息消费有延迟_简易实现kafka延迟消息
背景
當(dāng)前業(yè)務(wù)存在以下場(chǎng)景:在一個(gè)事務(wù)內(nèi)的最后一步是發(fā)送kafka消息,消費(fèi)端收到通知后讀取數(shù)據(jù)并做處理。但是由于kafka幾乎是即時(shí)收到消息,導(dǎo)致偶爾出現(xiàn)“在發(fā)完kafka和提交事務(wù)的間隙,消費(fèi)端收到了消息并讀取到了事務(wù)提交前的數(shù)據(jù)”。
這個(gè)問(wèn)題可以通過(guò)延遲消息來(lái)解決。
發(fā)送端 vs 消費(fèi)端
要做延遲,那么首先要考慮的是:延遲放在發(fā)送端,還是放在消費(fèi)端?最終選擇放在消費(fèi)端:讓數(shù)據(jù)先被kafka存儲(chǔ)起來(lái),數(shù)據(jù)更安全。
想把延遲消息做成一個(gè)服務(wù),不只是支持某一個(gè)場(chǎng)景/業(yè)務(wù),在這種設(shè)計(jì)前提下,讓延遲邏輯放在消費(fèi)端,可以統(tǒng)一調(diào)整邏輯,也方便排查問(wèn)題。
思路
是在整體外面包一層代理:另外創(chuàng)建一個(gè)延遲Topic,延遲消息都發(fā)到延遲Topic里。
有專(zhuān)門(mén)的服務(wù)來(lái)消費(fèi)延遲Topic的消息,取到消息之后存儲(chǔ)起來(lái),定期檢查消息是否已經(jīng)延遲時(shí)間。
已到延遲時(shí)間的消息,重新發(fā)送到原先Topic。
這樣做的好處是,不需要對(duì)kafka做任何改造。
存儲(chǔ)
延遲隊(duì)列消費(fèi)者拉取到消息之后,要怎么存儲(chǔ)?第三方存儲(chǔ),其需要滿(mǎn)足以下幾個(gè)條件:高性能:寫(xiě)入延遲要低,MQ的一個(gè)重要作用是削峰填谷,在選擇臨時(shí)存儲(chǔ)時(shí),寫(xiě)入性能必須要高,關(guān)系型數(shù)據(jù)庫(kù)(如Mysql)通常不滿(mǎn)足需求。
高可靠:延遲消息寫(xiě)入后,不能丟失,需要進(jìn)行持久化,并進(jìn)行備份
存儲(chǔ)成本低:可以支持大量消息存儲(chǔ),(Redis存儲(chǔ)成本太高)。
支持排序: 支持按照某個(gè)字段對(duì)消息進(jìn)行排序,對(duì)于延遲消息需要按照時(shí)間進(jìn)行排序。普通消息通常先發(fā)送的會(huì)被先消費(fèi),延遲消息與普通消息不同,需要進(jìn)行排序。例如先發(fā)一條延遲10s的消息,再發(fā)一條延遲5s的消息,那么后發(fā)送的消息需要被先消費(fèi)。
支持長(zhǎng)時(shí)間保存:一些業(yè)務(wù)的延遲消息,需要延遲幾個(gè)月,甚至更長(zhǎng),所以延遲消息必須能長(zhǎng)時(shí)間保留。不過(guò)通常不建議延遲太長(zhǎng)時(shí)間,存儲(chǔ)成本比較大,且業(yè)務(wù)邏輯可能已經(jīng)發(fā)生變化,已經(jīng)不需要消費(fèi)這些消息。
基于以上條件,選擇了RocksDB來(lái)存儲(chǔ)數(shù)據(jù):高性能嵌入式KV存儲(chǔ)引擎。
數(shù)據(jù)持久化到磁盤(pán)。
基于LMS存儲(chǔ),key自然排序,迭代器(Iterator)根據(jù)key順序遍歷。
代碼
發(fā)送端
消息基類(lèi)public class DelayMessage {
/**
* 事件唯一ID,用于去重檢查
*/
private String eventId = UUIDGenerator.generateString();
/**
* 事件時(shí)間
*/
@JSONField(format = KafkaConstants.DATETIME_FORMAT)
private Date eventTime = new Date();
/**
* 真實(shí)事件時(shí)間
*/
@JSONField(format = KafkaConstants.DATETIME_FORMAT)
private Date actualTime;
/**
* 真實(shí)Topic
*/
private String actualTopic;
public Date getActualTime() {
return actualTime;
}
public T setActualTime(Date actualTime) {
this.actualTime = actualTime;
return (T) this;
}
public String getActualTopic() {
return actualTopic;
}
public T setActualTopic(String actualTopic) {
this.actualTopic = actualTopic;
return (T) this;
}
public Date getEventTime() {
return eventTime;
}
public T setEventTime(Date eventTime) {
this.eventTime = eventTime;
return (T) this;
}
}
消息對(duì)象繼承DelayMessage,將消息發(fā)送到延遲Topic。
延遲服務(wù)消費(fèi)端
接收延遲消息@KafkaListener(topics = {KafkaConstants.KAFKA_TOPIC_MESSAGE_DELAY}, containerFactory = "kafkaContainerFactory")
public boolean onMessage(String json) throws Throwable {
try {
DelayMessage delayMessage = deserialize(json, DelayMessage.class);
if (!isDelay(delayMessage)) {
// 如果接收到消息時(shí),消息已經(jīng)可以發(fā)送了,直接發(fā)送到實(shí)際的隊(duì)列
sendActualTopic(delayMessage, json);
} else {
// 存儲(chǔ)
localStorage(delayMessage, json);
}
} catch (Throwable e) {
log.error("consumer kafka delay message[{}] error!", json, e);
throw e;
}
return true;
}
private void sendActualTopic(DelayMessage delayMessage, String message) {
kafkaSender.send(message, delayMessage.getActualTopic());
}
@SneakyThrows
private void localStorage(DelayMessage delayMessage, String message) {
String key = generateRdbKey(delayMessage);
if (rocksDb.keyMayExist(RocksDbUtils.toByte(key), null)) {
return;
}
rocksDb.put(RocksDbUtils.toByte(key), RocksDbUtils.toByte(message));
}
private String generateRdbKey(DelayMessage delayMessage) {
return delayMessage.getActualTime().getTime() + RDB_KEY_SPLITTER + delayMessage.getEventId();
}
這里要注意生成key的方法:RocksDB是按key自然排序,迭代器遍歷時(shí)是按key順序遍歷。
按時(shí)間來(lái)生成key,遍歷時(shí)遇到第一個(gè)不符合的key,即可結(jié)束遍歷。
key里加上消息ID,用以去重。
處理存儲(chǔ)的延遲消息
啟動(dòng)定時(shí)任務(wù)(ScheduledExecutorService)定時(shí)檢查消息。private void handleRdbMessage() {
try {
try (RocksIterator rocksIterator = rocksDb.newIterator()) {
for (rocksIterator.seekToFirst(); rocksIterator.isValid(); rocksIterator.next()) {
String key = "";
String value = "";
try {
byte[] keyByte = rocksIterator.key();
key = RocksDbUtils.toString(keyByte);
if (!isMessageExpired(key)) {
break;
}
value = RocksDbUtils.toString(rocksIterator.value());
DelayMessage delayMessage = JSON.parseObject(value, DelayMessage.class);
sendActualTopic(delayMessage, value);
rocksDb.delete(keyByte);
} catch (NumberFormatException e) {
// 異常key
log.error("handler kafka rocksdb delay message[{}:{}] NumberFormatException error!", key, value, e);
if (StringUtils.isNotBlank(key)) {
rocksDb.delete(RocksDbUtils.toByte(key));
}
} catch (Exception e) {
log.error("handler kafka rocksdb delay message[{}:{}] error!", key, value, e);
}
}
}
} catch (Exception e) {
// 捕獲異常,否則ScheduledExecutorService會(huì)停止定時(shí)任務(wù)
log.error("handler kafka rocksdb delay message error!", e);
}
}
private boolean isMessageExpired(String rdbKey) {
long actualTime = Long.valueOf(rdbKey.split(RDB_KEY_SPLITTER)[0]);
return actualTime <= System.currentTimeMillis();
}
這里sendActualTopic和rocksDb.delete兩個(gè)操作并不是原子性,但一般kafka消費(fèi)端都會(huì)做防重復(fù),所以也不會(huì)有問(wèn)題。
其他
當(dāng)前僅僅簡(jiǎn)易實(shí)現(xiàn)了延遲隊(duì)列,還有很多需要完成完善的地方,比如:當(dāng)前數(shù)據(jù)分散到不同的消費(fèi)節(jié)點(diǎn)上,如果某一個(gè)節(jié)點(diǎn)服務(wù)器異常導(dǎo)致數(shù)據(jù)丟失,就只能人工介入,從kafka文件里獲取數(shù)據(jù);可通過(guò)部署不同的kafka group來(lái)達(dá)到數(shù)據(jù)備份,通過(guò)選主方式來(lái)決定哪一個(gè)group執(zhí)行業(yè)務(wù)。
一條消息被存儲(chǔ)三份:實(shí)際隊(duì)列,延遲隊(duì)列,RocksDB,可以通過(guò)操作kafka CommitLog的方式,讓RocksDB里僅存儲(chǔ)CommitLog offset 相關(guān)信息,減小RocksDB占用空間。
參考:
總結(jié)
以上是生活随笔為你收集整理的kafka消息消费有延迟_简易实现kafka延迟消息的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 利用JAVA流处理-统计男员工人数;找出
- 下一篇: 父类引用指向子类对象是什么意思