javascript
SpringBoot整合kafka(实现producer和consumer)
轉(zhuǎn)載自?SpringBoot整合kafka(實現(xiàn)producer和consumer)
在Windows環(huán)境下安裝運行Kafka:https://www.jianshu.com/p/d64798e81f3b
本文代碼使用的是Spring Boot 2.1.1.RELEASE 版本
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.1.1.RELEASE</version><relativePath/> <!-- lookup parent from repository --> </parent>一、 pom.xml文件,引入依賴
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka-test</artifactId><scope>test</scope></dependency> </dependencies>采用Kafka提供的StringSerializer和StringDeserializer進行序列化和反序列化
1、在application-dev.properties配置生產(chǎn)者
#============== kafka =================== # 指定kafka server的地址,集群配多個,中間,逗號隔開 spring.kafka.bootstrap-servers=127.0.0.1:9092#=============== provider ======================= # 寫入失敗時,重試次數(shù)。當(dāng)leader節(jié)點失效,一個repli節(jié)點會替代成為leader節(jié)點,此時可能出現(xiàn)寫入失敗, # 當(dāng)retris為0時,produce不會重復(fù)。retirs重發(fā),此時repli節(jié)點完全成為leader節(jié)點,不會產(chǎn)生消息丟失。 spring.kafka.producer.retries=0 # 每次批量發(fā)送消息的數(shù)量,produce積累到一定數(shù)據(jù),一次發(fā)送 spring.kafka.producer.batch-size=16384 # produce積累數(shù)據(jù)一次發(fā)送,緩存大小達到buffer.memory就發(fā)送數(shù)據(jù) spring.kafka.producer.buffer-memory=33554432#procedure要求leader在考慮完成請求之前收到的確認數(shù),用于控制發(fā)送記錄在服務(wù)端的持久化,其值可以為如下: #acks = 0 如果設(shè)置為零,則生產(chǎn)者將不會等待來自服務(wù)器的任何確認,該記錄將立即添加到套接字緩沖區(qū)并視為已發(fā)送。在這種情況下,無法保證服務(wù)器已收到記錄,并且重試配置將不會生效(因為客戶端通常不會知道任何故障),為每條記錄返回的偏移量始終設(shè)置為-1。 #acks = 1 這意味著leader會將記錄寫入其本地日志,但無需等待所有副本服務(wù)器的完全確認即可做出回應(yīng),在這種情況下,如果leader在確認記錄后立即失敗,但在將數(shù)據(jù)復(fù)制到所有的副本服務(wù)器之前,則記錄將會丟失。 #acks = all 這意味著leader將等待完整的同步副本集以確認記錄,這保證了只要至少一個同步副本服務(wù)器仍然存活,記錄就不會丟失,這是最強有力的保證,這相當(dāng)于acks = -1的設(shè)置。 #可以設(shè)置的值為:all, -1, 0, 1 spring.kafka.producer.acks=1# 指定消息key和消息體的編解碼方式 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer-
bootstrap.servers:kafka server的地址
-
acks:寫入kafka時,leader負責(zé)一個該partion讀寫,當(dāng)寫入partition時,需要將記錄同步到repli節(jié)點,all是全部同步節(jié)點都返回成功,leader才返回ack。
-
retris:寫入失敗時,重試次數(shù)。當(dāng)leader節(jié)點失效,一個repli節(jié)點會替代成為leader節(jié)點,此時可能出現(xiàn)寫入失敗,當(dāng)retris為0時,produce不會重復(fù)。retirs重發(fā),此時repli節(jié)點完全成為leader節(jié)點,不會產(chǎn)生消息丟失。
-
batch.size:produce積累到一定數(shù)據(jù),一次發(fā)送。
buffer.memory: produce積累數(shù)據(jù)一次發(fā)送,緩存大小達到buffer.memory就發(fā)送數(shù)據(jù)。
-
linger.ms :當(dāng)設(shè)置了緩沖區(qū),消息就不會即時發(fā)送,如果消息總不夠條數(shù)、或者消息不夠buffer大小就不發(fā)送了嗎?當(dāng)消息超過linger時間,也會發(fā)送。
-
key/value serializer:序列化類。
2、生產(chǎn)者向kafka發(fā)送消息
@RestController public class KafkaController {@Autowiredprivate KafkaTemplate<String,Object> kafkaTemplate;@GetMapping("/message/send")public boolean send(@RequestParam String message){kafkaTemplate.send("testTopic",message);return true;}}3、在application-dev.properties配置消費者
#=============== consumer ======================= # 指定默認消費者group id --> 由于在kafka中,同一組中的consumer不會讀取到同一個消息,依靠groud.id設(shè)置組名 spring.kafka.consumer.group-id=testGroup # smallest和largest才有效,如果smallest重新0開始讀取,如果是largest從logfile的offset讀取。一般情況下我們都是設(shè)置smallest spring.kafka.consumer.auto-offset-reset=earliest # enable.auto.commit:true --> 設(shè)置自動提交offset spring.kafka.consumer.enable-auto-commit=true #如果'enable.auto.commit'為true,則消費者偏移自動提交給Kafka的頻率(以毫秒為單位),默認值為5000。 spring.kafka.consumer.auto-commit-interval=100# 指定消息key和消息體的編解碼方式 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer-
Producer是一個接口,聲明了同步send和異步send兩個重要方法。
-
ProducerRecord 消息實體類,每條消息由(topic,key,value,timestamp)四元組封裝。一條消息key可以為空和timestamp可以設(shè)置當(dāng)前時間為默認值。
4、消費者監(jiān)聽topic=testTopic的消息
@Component public class ConsumerListener {@KafkaListener(topics = "testTopic")public void onMessage(String message){//insertIntoDb(buffer);//這里為插入數(shù)據(jù)庫代碼System.out.println(message);}}到此,采用Kafka提供的StringSerializer和StringDeserializer進行序列化和反序列化,因為此種序列化方式無法序列化實體類,顧,下面為自定義序列化和反序列化器進行實體類的消息傳遞
采用自定義序列化和反序列化器進行實體類的序列化和反序列化
和內(nèi)置的StringSerializer字符串序列化一樣,如果要自定義序列化方式,需要實現(xiàn)接口Serializer。假設(shè)每個字段按照下圖所示的方式自定義序列化:
?
1、創(chuàng)建User實體類
public class User implements Serializable {private Long id;private String name;private Integer age;/*** transient 關(guān)鍵字修飾的字段不會被序列化*/private transient String desc;public Long getId() {return id;}public void setId(Long id) {this.id = id;}public String getName() {return name;}public void setName(String name) {this.name = name;}public Integer getAge() {return age;}public void setAge(Integer age) {this.age = age;}public String getDesc() {return desc;}public void setDesc(String desc) {this.desc = desc;}@Overridepublic String toString() {return "User{" +"id=" + id +", name='" + name + '\'' +", age=" + age +", desc='" + desc + '\'' +'}';} }2、創(chuàng)建User序列化器
public class UserSerializable implements Serializer<User> {@Overridepublic void configure(Map<String, ?> map, boolean b) {}@Overridepublic byte[] serialize(String topic, User user) {System.out.println("topic : " + topic + ", user : " + user);byte[] dataArray = null;ByteArrayOutputStream outputStream = null;ObjectOutputStream objectOutputStream = null;try {outputStream = new ByteArrayOutputStream();objectOutputStream = new ObjectOutputStream(outputStream);objectOutputStream.writeObject(user);dataArray = outputStream.toByteArray();} catch (Exception e) {throw new RuntimeException(e);}finally {if(outputStream != null){try {outputStream.close();} catch (IOException e) {e.printStackTrace();}}if(objectOutputStream != null){try {objectOutputStream.close();} catch (IOException e) {e.printStackTrace();}}}return dataArray;}@Overridepublic void close() {} }3、創(chuàng)建User反序列化器
public class UserDeserializer implements Deserializer<User> {@Overridepublic void configure(Map<String, ?> map, boolean b) {}@Overridepublic User deserialize(String topic, byte[] bytes) {User user = null;ByteArrayInputStream inputStream = null;ObjectInputStream objectInputStream = null;try {inputStream = new ByteArrayInputStream(bytes);objectInputStream = new ObjectInputStream(inputStream);user = (User)objectInputStream.readObject();} catch (Exception e) {throw new RuntimeException(e);}finally {if(inputStream != null){try {inputStream.close();} catch (IOException e) {e.printStackTrace();}}if(objectInputStream != null){try {objectInputStream.close();} catch (IOException e) {e.printStackTrace();}}}return user;}@Overridepublic void close() {} }4、修改application-dev.properties配置
A、修改生產(chǎn)者配置的value-serializer
# 指定生產(chǎn)者消息key和消息體的編解碼方式 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=com.yibo.springbootkafkademo.Serializable.UserSerializableB、修改消費者配置的value-deserializer
# 指定消費者消息key和消息體的編解碼方式 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=com.yibo.springbootkafkademo.Serializable.UserDeserializer5、生產(chǎn)者向kafka發(fā)送消息
@RestController public class KafkaController {@Autowiredprivate KafkaTemplate<String,Object> kafkaTemplate;@PostMapping("/user/save")public boolean saveUser(@RequestBody User user){kafkaTemplate.send("userTopic",user);return true;} }6、消費者監(jiān)聽topic=userTopic的消息
@Component public class ConsumerListener {@KafkaListener(topics = "userTopic")public void onMessage(User user){//insertIntoDb(buffer);//這里為插入數(shù)據(jù)庫代碼System.out.println(user);} }總結(jié)
可以看到,自定義Serializer和Deserializer非常痛苦,還有很多類型不支持,非常脆弱。復(fù)雜類型的支持更是一件痛苦的事情,不同版本之間的兼容性問題更是一個極大的挑戰(zhàn)。由于Serializer和Deserializer影響到上下游系統(tǒng),導(dǎo)致牽一發(fā)而動全身。自定義序列化&反序列化實現(xiàn)不是能力的體現(xiàn),而是逗比的體現(xiàn)。所以強烈不建議自定義實現(xiàn)序列化&反序列化,推薦直接使用StringSerializer和StringDeserializer,然后使用json作為標(biāo)準(zhǔn)的數(shù)據(jù)傳輸格式。站在巨人的肩膀上,事半功倍。
?
總結(jié)
以上是生活随笔為你收集整理的SpringBoot整合kafka(实现producer和consumer)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: cf对电脑配置要求高吗(cf对电脑配置要
- 下一篇: gradle idea java ssm