日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 前端技术 > javascript >内容正文

javascript

SpringBoot 自定义Kafka消息序列化和反序列化

發布時間:2025/3/19 javascript 24 豆豆
生活随笔 收集整理的這篇文章主要介紹了 SpringBoot 自定义Kafka消息序列化和反序列化 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

1. 概述

Kafka傳輸自定義的DTO對象時,不能像平時一樣使用StringSerializer和StringDeserializer。這種情況需要自己實現對應DTO的序列化器和反序列化器。假設現在有個 KafkaMsgDto 類,代碼如下:

@Data public class KafkaMsgDto {private String id;private ActionEnum action;public KafkaMsgDto(){}public KafkaMsgDto(String id, ActionEnum action){this.id = id;this.action = action;}public enum ActionEnum{SAVE,DELETE;} }

2. Serializer

public class KafkaMsgSerializer implements Serializer<KafkaMsgDto> {private String encoding = "UTF8";public KafkaMsgSerializer(){}@Overridepublic void configure(Map<String, ?> configs, boolean isKey) {String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";Object encodingValue = configs.get(propertyName);if (encodingValue == null) {encodingValue = configs.get("serializer.encoding");}if (encodingValue instanceof String) {this.encoding = (String)encodingValue;}}@Overridepublic byte[] serialize(String s, KafkaMsgDto data) {try {if (data == null){return null;}return JSON.toJSONString(data).getBytes(this.encoding);} catch (UnsupportedEncodingException var4) {throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + this.encoding);}} }

3. Deserializer

public class KafkaMsgDeserializer implements Deserializer<KafkaMsgDto> {private String encoding = "UTF8";public KafkaMsgDeserializer(){}public void configure(Map<String, ?> configs, boolean isKey) {String propertyName = isKey ? "key.deserializer.encoding" : "value.deserializer.encoding";Object encodingValue = configs.get(propertyName);if (encodingValue == null) {encodingValue = configs.get("deserializer.encoding");}if (encodingValue instanceof String) {this.encoding = (String)encodingValue;}}public KafkaMsgDto deserialize(String topic, byte[] data) {try {if (data == null){return null;}return JSON.parseObject(new String(data, this.encoding), KafkaMsgDto.class);} catch (UnsupportedEncodingException var4) {throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + this.encoding);}} }

4. application.properties

修改對應的序列化、反序列化配置

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=com.train.kafka.serialization.KafkaMsgSerializerspring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=com.train.kafka.serialization.KafkaMsgDeserializer

5. 發送消息

KafkaMsgDto dto = new KafkaMsgDto(id, KafkaMsgDto.ActionEnum.SAVE); kafkaTemplate.send("test-topic", dto);

6. 接收消息

@KafkaListener(topics = "test-topic")public void onListener(KafkaMsgDto dto){if (dto == null){System.out.println("接收到空消息");return;}System.out.println(String.format("接收到消息:%s", dto.toString()));if (dto.getAction() == KafkaMsgDto.ActionEnum.DELETE){//執行刪除業務邏輯}else{//執行保存業務邏輯}}

總結

以上是生活随笔為你收集整理的SpringBoot 自定义Kafka消息序列化和反序列化的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。