Integrating Kafka with Spring, and fixing common errors
I. Configuring Kafka
1. Add the Maven dependency
```xml
<!-- Support library for Kafka/Spring integration: producing to and listening on Kafka from Spring -->
<!-- https://mvnrepository.com/artifact/org.springframework.integration/spring-integration-kafka -->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-kafka</artifactId>
    <version>3.2.1.RELEASE</version>
</dependency>
```
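The classes below also use the core Spring for Apache Kafka API (KafkaTemplate, @KafkaListener, and so on). spring-integration-kafka 3.2.x pulls spring-kafka in transitively, but it can also be declared explicitly; the version below is an assumption chosen to match the 3.2.x integration line — check your dependency tree for the exact one.

```xml
<!-- Core Spring for Apache Kafka support; the version here is illustrative -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.3.1.RELEASE</version>
</dependency>
```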
2. Configure KafkaConfiguration (producer and consumer)

The original field initializers for the broker address and auto-startup flag were lost; the @Value annotations and property names below are placeholders, so adjust them to your own configuration.

```java
package com.frame.kafka.config;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.requests.IsolationLevel;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.transaction.KafkaTransactionManager;

import java.util.*;

@Configuration
@EnableKafka
public class KafkaConfiguration {

    /** Broker address; the property name is a placeholder. */
    @Value("${kafka.bootstrap.servers:localhost:9092}")
    private String BOOTSTRAP_SERVERS_CONFIG;

    /** Whether the listener containers start automatically. */
    @Value("${kafka.listener.auto.startup:true}")
    private Boolean IS_OPEN_LISTENER;

    /** Topic whose partition count is increased below; the same property is used in the producer demo. */
    @Value("${integral.update.topic}")
    private String integral_update_topic;

    /**
     * ConcurrentKafkaListenerContainerFactory creates the Kafka listener containers;
     * only the consumer side is configured here.
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // Do not fail on startup when a listened-to topic does not exist yet
        factory.setMissingTopicsFatal(false);
        // Start the containers automatically or not
        factory.setAutoStartup(IS_OPEN_LISTENER);
        return factory;
    }

    /** Consumer factory built from the parameters in consumerProps(). */
    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerProps());
    }

    /** Producer factory built from the parameters in senderProps(). */
    @Bean
    public ProducerFactory<Integer, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(senderProps());
    }

    /** Transaction-capable producer factory. */
    @Bean("tranProducerFactory")
    public ProducerFactory<Integer, String> tranProducerFactory() {
        DefaultKafkaProducerFactory<Integer, String> factory = new DefaultKafkaProducerFactory<>(senderProps());
        factory.transactionCapable();
        factory.setTransactionIdPrefix("tran-");
        return factory;
    }

    /** KafkaTemplate implements the send/receive operations. */
    @Bean
    public KafkaTemplate<Integer, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    /** KafkaTemplate with transaction support. */
    @Bean("tranKafkaTemplate")
    public KafkaTemplate<Integer, String> tranKafkaTemplate() {
        return new KafkaTemplate<>(tranProducerFactory());
    }

    /** Admin tool for managing Kafka. */
    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> props = new HashMap<>();
        // Kafka connection address
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG);
        return new KafkaAdmin(props);
    }

    /** Initialize a topic; this cannot be used to increase the partition count afterwards. */
    @Bean
    public void createTopic() {
        AdminClient adminClient = AdminClient.create(kafkaAdmin().getConfig());
        List<NewTopic> topics = new ArrayList<>();
        // topic "test" with 3 partitions and a replication factor of 1
        topics.add(new NewTopic("test", 3, (short) 1));
        adminClient.createTopics(topics);
    }

    /** Increase the partition count of a topic; partitions can only be added, never removed. */
    @Bean
    public void createPartitions() {
        AdminClient adminClient = AdminClient.create(kafkaAdmin().getConfig());
        Map<String, NewPartitions> newPartitions = new HashMap<>();
        newPartitions.put(integral_update_topic, NewPartitions.increaseTo(3));
        adminClient.createPartitions(newPartitions);
    }

    /** Consumer configuration parameters. */
    private Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        // Connection address
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG);
        // Consumer group id
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        // Auto-commit offsets
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        // Auto-commit interval
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        // Isolation level (must be read_committed when transactions are used)
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT));
        // Session timeout
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        // Key deserializer
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        // Value deserializer
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    /** Producer configuration parameters. */
    private Map<String, Object> senderProps() {
        Map<String, Object> props = new HashMap<>();
        // Connection address
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG);
        // Retries; 0 disables retrying
        props.put(ProducerConfig.RETRIES_CONFIG, 1);
        // acks=0  : the producer does not wait for any response from the broker.
        // acks=1  : the producer gets a success response once the partition leader has written the message.
        // acks=all: the producer gets a success response only after all replicating nodes have received the message.
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        // Enable idempotence
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        // Batch size in bytes
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        // Linger for 1 ms so records can be batched, reducing the number of send requests and raising throughput
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        // Total memory the producer may use to buffer records waiting to be sent
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 1024000);
        // Key serializer
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        // Value serializer
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }
}
```

3. Configure a listener container factory with manual acknowledgement: AckListener
```java
package com.frame.kafka.config;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.requests.IsolationLevel;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

/**
 * AckListener: container factory for consumers that acknowledge manually (Ack mode).
 */
@Component
public class AckListener {

    private static final Logger log = LoggerFactory.getLogger(AckListener.class);

    /** Broker address; the property name is a placeholder. */
    @Value("${kafka.bootstrap.servers:localhost:9092}")
    private String BOOTSTRAP_SERVERS_CONFIG;

    /** Whether the listener containers start automatically. */
    @Value("${kafka.listener.auto.startup:true}")
    private Boolean IS_OPEN_LISTENER;

    private Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG);
        // Isolation level
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT));
        // Disable auto-commit; offsets are acknowledged manually
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    @Bean("ackContainerFactory")
    public ConcurrentKafkaListenerContainerFactory ackContainerFactory() {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        // Acknowledge each record manually and commit immediately
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory(consumerProps()));
        factory.setAutoStartup(IS_OPEN_LISTENER);
        // Do not fail on startup when a listened-to topic does not exist yet
        factory.setMissingTopicsFatal(false);
        return factory;
    }
}
```

4. Configure the batch listener container factory: BatchListener
```java
package com.frame.kafka.config;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.requests.IsolationLevel;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

/**
 * BatchListener: container factory for batch consumption.
 */
@Component
public class BatchListener {

    private static final Logger log = LoggerFactory.getLogger(BatchListener.class);

    /** Broker address; the property name is a placeholder. */
    @Value("${kafka.bootstrap.servers:localhost:9092}")
    private String BOOTSTRAP_SERVERS_CONFIG;

    /** Whether the listener containers start automatically. */
    @Value("${kafka.listener.auto.startup:true}")
    private Boolean IS_OPEN_LISTENER;

    private Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG);
        // Isolation level
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT));
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        // Maximum number of records returned by a single poll
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "5");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    @Bean("batchContainerFactory")
    public ConcurrentKafkaListenerContainerFactory listenerContainer() {
        ConcurrentKafkaListenerContainerFactory container = new ConcurrentKafkaListenerContainerFactory();
        container.setConsumerFactory(new DefaultKafkaConsumerFactory(consumerProps()));
        // Concurrency; should be less than or equal to the topic's partition count
        container.setConcurrency(5);
        // Enable batch listening
        container.setBatchListener(true);
        container.setAutoStartup(IS_OPEN_LISTENER);
        // Do not fail on startup when a listened-to topic does not exist yet
        container.setMissingTopicsFatal(false);
        return container;
    }
}
```

5. Producer test demo: kafaProductTest
```java
package com.frame.web.notify.controller.kafkaTest;

import com.alibaba.fastjson.JSON;
import com.frame.common.dto.TempFormMap;
import com.frame.kafka.handler.KafkaSendResultHandler;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.SendResult;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.concurrent.FailureCallback;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.SuccessCallback;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.util.HashMap;
import java.util.Map;

/**
 * Producer test endpoints.
 *
 * topic:          name of the topic
 * partition:      partition id (starting from 0); the message is sent to that partition
 * timestamp:      timestamp, defaults to the current time
 * key:            message key
 * data:           message payload
 * ProducerRecord: Kafka's wrapper class containing the fields above
 * Message<?>:     Spring's own message wrapper, containing the payload and headers
 */
@RestController
@RequestMapping("/kafka")
public class kafaProductTest {

    @Autowired
    private KafkaTemplate<Integer, String> kafkaTemplate;

    @Autowired
    private KafkaTemplate tranKafkaTemplate;

    @Autowired
    private KafkaSendResultHandler producerListener;

    @Value("${integral.update.topic}")
    private String integral_update;

    /** Plain send test. */
    @SuppressWarnings("rawtypes")
    @RequestMapping(value = "/sendMessage", method = RequestMethod.POST)
    public void sendMessage(HttpServletRequest req, HttpServletResponse res,
                            @RequestBody TempFormMap tempFormMap) throws Exception {
        String message = tempFormMap.getStr("message");
        // Asynchronous send
        ListenableFuture<SendResult<Integer, String>> listenableFuture = kafkaTemplate.send("first", message);
        kafkaTemplate.send("test", message);
        // Synchronous send:
        // SendResult<Integer, String> result = kafkaTemplate.send("first", message).get();
        Thread.sleep(1000);
        // Success callback
        SuccessCallback<SendResult<Integer, String>> successCallback = new SuccessCallback<SendResult<Integer, String>>() {
            @Override
            public void onSuccess(SendResult<Integer, String> result) {
                // Success handling
                System.out.println("onSuccess");
            }
        };
        // Failure callback
        FailureCallback failureCallback = new FailureCallback() {
            @Override
            public void onFailure(Throwable ex) {
                // Failure handling
                System.out.println("onFailure");
            }
        };
        listenableFuture.addCallback(successCallback, failureCallback);
    }

    /** Send a message with an explicit timestamp. */
    @SuppressWarnings("rawtypes")
    @RequestMapping(value = "/sendMessage1", method = RequestMethod.POST)
    public void sendMessage1(HttpServletRequest req, HttpServletResponse res,
                             @RequestBody TempFormMap tempFormMap) throws Exception {
        kafkaTemplate.send("test", 0, System.currentTimeMillis(), 0, "send message with timestamp");
    }

    /** Send a message using ProducerRecord. */
    @SuppressWarnings("rawtypes")
    @RequestMapping(value = "/sendMessage2", method = RequestMethod.POST)
    public void sendMessage2(HttpServletRequest req, HttpServletResponse res,
                             @RequestBody TempFormMap tempFormMap) throws Exception {
        ProducerRecord record = new ProducerRecord(integral_update, JSON.toJSONString(tempFormMap));
        kafkaTemplate.send(record);
    }

    /** Send a message using Spring's Message abstraction. */
    @SuppressWarnings("rawtypes")
    @RequestMapping(value = "/sendMessage3", method = RequestMethod.POST)
    public void sendMessage3(HttpServletRequest req, HttpServletResponse res,
                             @RequestBody TempFormMap tempFormMap) throws Exception {
        Map map = new HashMap();
        map.put(KafkaHeaders.TOPIC, "test");
        map.put(KafkaHeaders.PARTITION_ID, 0);
        map.put(KafkaHeaders.MESSAGE_KEY, 0);
        GenericMessage message = new GenericMessage("use Message to send message", new MessageHeaders(map));
        kafkaTemplate.send(message);
    }

    /** Use KafkaSendResultHandler as a send-result callback. */
    @SuppressWarnings("rawtypes")
    @RequestMapping(value = "/sendMessage4", method = RequestMethod.POST)
    public void sendMessage4(HttpServletRequest req, HttpServletResponse res,
                             @RequestBody TempFormMap tempFormMap) throws Exception {
        kafkaTemplate.setProducerListener(producerListener);
        kafkaTemplate.send("test", "test producer listen");
        // KafkaTemplate sends asynchronously; sleep so the request does not finish before the callback runs
        Thread.sleep(1000);
    }

    /** Transaction started via the @Transactional annotation. */
    @SuppressWarnings("rawtypes")
    @RequestMapping(value = "/sendMessage5", method = RequestMethod.POST)
    @Transactional
    public void sendMessage5(HttpServletRequest req, HttpServletResponse res,
                             @RequestBody TempFormMap tempFormMap) throws Exception {
        tranKafkaTemplate.send("test", "test transactional annotation");
        // The exception rolls back the transaction, so the message is never visible to read_committed consumers
        throw new RuntimeException("fail");
    }

    /** Local (template-managed) transaction. */
    @SuppressWarnings("rawtypes")
    @RequestMapping(value = "/sendMessage6", method = RequestMethod.POST)
    public void sendMessage6(HttpServletRequest req, HttpServletResponse res,
                             @RequestBody TempFormMap tempFormMap) throws Exception {
        tranKafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback() {
            @Override
            public Object doInOperations(KafkaOperations kafkaOperations) {
                kafkaOperations.send("test", "test executeInTransaction");
                throw new RuntimeException("fail");
                // return true;
            }
        });
    }

    /** Batch send test. */
    @SuppressWarnings("rawtypes")
    @RequestMapping(value = "/sendMessage7", method = RequestMethod.POST)
    public void sendMessage7(HttpServletRequest req, HttpServletResponse res,
                             @RequestBody TempFormMap tempFormMap) throws Exception {
        for (int i = 0; i < 12; i++) {
            // kafkaTemplate.send("topic.quick.batch", "test batch listener,dataNum-" + i);
            kafkaTemplate.send("topic.quick.batch.partition", "test batch listener,dataNum-" + i);
        }
    }
}
```

6. Listener demo: DemoListener
```java
package com.frame.web.notify.service;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * Listener demos.
 *
 * Handler parameters:
 *   data           - the type is not restricted; it follows the KafkaTemplate types. A List parameter means batch consumption.
 *   ConsumerRecord - the full record, including headers, partition information, timestamp, etc.
 *   Acknowledgment - interface used for manual acknowledgement (Ack mode)
 *   Consumer       - the consumer instance, for manually committing offsets, controlling the consumption rate, etc.
 *
 * @KafkaListener attributes:
 *   id               - listener id; when groupId is not configured, the id is used as the group id by default
 *   containerFactory - bean name of the listener container factory (ConcurrentKafkaListenerContainerFactory);
 *                      this is what switches a listener between single-record and batch consumption
 *   topics           - topics to listen to; several may be given
 *   topicPartitions  - more detailed subscription: specific partitions of a topic, or start from a given offset
 *   errorHandler     - bean name of the listener error handler
 *   groupId          - consumer group id
 *   idIsGroup        - whether the id is used as the group id
 *   clientIdPrefix   - consumer client id prefix
 *   beanRef          - bean name of the actual listener container; prefix the bean name with "__"
 */
@Component
public class DemoListener {

    private static final Logger log = LoggerFactory.getLogger(DemoListener.class);

    @Autowired
    private KafkaTemplate kafkaTemplate;

    /** Basic consumption: receive the payload from a topic. */
    @KafkaListener(groupId = "dyzh", topics = "test")
    public void listen(String msgData) {
        System.out.println("test-------" + msgData);
        log.info("demo receive : " + msgData);
    }

    /**
     * Same consumer group listening to the same topic: within one group each record is delivered
     * to only one consumer, so this listener and listen() split the partitions between them.
     */
    @KafkaListener(groupId = "dyzh", topics = "test")
    public void listen1(String msgData) {
        System.out.println("test-------" + msgData);
        log.info("demo receive : " + msgData);
    }

    /** A different consumer group listening to the same topic receives its own copy of every record. */
    @KafkaListener(groupId = "dyzhTTTT", topics = "test")
    public void listen2(String msgData) {
        System.out.println("test-------" + msgData);
        log.info("demo receive : " + msgData);
    }

    /** ConsumerRecord carries partition information, headers, payload, etc.; use it when those details are needed. */
    @KafkaListener(id = "consumer", topics = "first")
    public void listen3(ConsumerRecord<Integer, String> record) {
        log.info("topic.quick.consumer receive : " + record.toString());
    }

    /** Batch consumption. */
    @KafkaListener(id = "batch", topics = {"topic.quick.batch"}, containerFactory = "batchContainerFactory")
    public void batchListener(List<String> data) {
        log.info("topic.quick.batch receive : ");
        for (String s : data) {
            log.info(s);
        }
    }

    /**
     * Listen to specific partitions of a topic via the topicPartitions attribute.
     *   @TopicPartition : topic - topic name; partitions - partition ids; partitionOffsets - start offsets
     *   @PartitionOffset: partition - a single partition id; initialOffset - initial offset
     */
    @KafkaListener(id = "batchWithPartition", containerFactory = "batchContainerFactory",
            topicPartitions = {
                    @TopicPartition(topic = "topic.quick.batch.partition", partitions = {"1", "3"}),
                    @TopicPartition(topic = "topic.quick.batch.partition", partitions = {"0", "4"},
                            partitionOffsets = @PartitionOffset(partition = "2", initialOffset = "100"))
            })
    public void batchListenerWithPartition(List<String> data) {
        log.info("topic.quick.batch.partition receive : ");
        for (String s : data) {
            log.info(s);
        }
    }

    /**
     * Obtain headers and payload through annotations:
     *   @Payload                                    - the message payload
     *   @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY)  - the message key
     *   @Header(KafkaHeaders.RECEIVED_PARTITION_ID) - the partition the record came from
     *   @Header(KafkaHeaders.RECEIVED_TOPIC)        - the topic name
     *   @Header(KafkaHeaders.RECEIVED_TIMESTAMP)    - the timestamp
     */
    @KafkaListener(id = "anno", topics = "topic.quick.anno")
    public void annoListener(@Payload String data,
                             @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,
                             @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                             @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                             @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts) {
        log.info("topic.quick.anno receive : \n" +
                "data : " + data + "\n" +
                "key : " + key + "\n" +
                "partitionId : " + partition + "\n" +
                "topic : " + topic + "\n" +
                "timestamp : " + ts + "\n");
    }

    /** Manual acknowledgement (Ack mode). */
    @KafkaListener(id = "ack", topics = "topic.quick.ack", containerFactory = "ackContainerFactory")
    public void ackListener(ConsumerRecord record, Acknowledgment ack) {
        log.info("topic.quick.ack receive : " + record.value());
        ack.acknowledge();
    }

    /**
     * Re-publish an unacknowledged record back onto the topic. This is simple and, combined with headers,
     * can track how many times a record has already been retried.
     */
    @KafkaListener(id = "ack2", topics = "topic.quick.ack", containerFactory = "ackContainerFactory")
    public void ackListener(ConsumerRecord record, Acknowledgment ack, Consumer consumer) {
        log.info("topic.quick.ack receive : " + record.value());
        // Acknowledge even offsets, reject odd ones
        if (record.offset() % 2 == 0) {
            log.info(record.offset() + "--ack");
            ack.acknowledge();
        } else {
            log.info(record.offset() + "--nack");
            kafkaTemplate.send("topic.quick.ack", record.value());
        }
    }

    /**
     * Use Consumer.seek to rewind to the offset of the unacknowledged record and consume it again.
     * This can loop forever if the record can never be processed, because the consumer keeps seeking back to it.
     */
    @KafkaListener(id = "ack3", topics = "topic.quick.ack", containerFactory = "ackContainerFactory")
    public void ackListener2(ConsumerRecord record, Acknowledgment ack, Consumer consumer) {
        log.info("topic.quick.ack receive : " + record.value());
        // Acknowledge even offsets, reject odd ones
        if (record.offset() % 2 == 0) {
            log.info(record.offset() + "--ack");
            ack.acknowledge();
        } else {
            log.info(record.offset() + "--nack");
            consumer.seek(new org.apache.kafka.common.TopicPartition("topic.quick.ack", record.partition()), record.offset());
        }
    }
}
```

II. Common errors and notes
1. Kafka fails to start
```
ERROR [KafkaServer id=0] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
java.net.UnknownHostException: centos6.8002: centos6.8002: unknown error
```
Fix: edit the /etc/hosts file.
Steps: 1) cd /etc  2) vi hosts  3) append the following line, replacing the hostname with the one from the error (centos6.8002 above):

```
127.0.0.1  <hostname>  localhost.localdomain  localhost
```
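For illustration only, the same fix as a couple of shell commands (the hostname centos6.8002 is taken from the error above; use whatever `hostname` prints on your machine):

```bash
hostname   # e.g. prints centos6.8002
echo "127.0.0.1  centos6.8002  localhost.localdomain  localhost" | sudo tee -a /etc/hosts
ping -c 1 centos6.8002   # should now resolve to 127.0.0.1
```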
2.啟動之后topic不存在,監聽器報錯
Kafka error: `Topic(s) [publish] is/are not present and missingTopicsFatal is true`
Cause: by default, a listener container fails to start when a topic it subscribes to does not exist.
Fix: set the listener property missingTopicsFatal to false.
Option 1: with Spring Boot, add this to the configuration file:

```properties
spring.kafka.listener.missing-topics-fatal=false
```

Option 2: with Spring MVC, set it on the listener container factory (as in the KafkaConfiguration class above):

```java
factory.setMissingTopicsFatal(false); // factory is the ConcurrentKafkaListenerContainerFactory
```
3. Kafka transactional messages
To avoid losing messages (the settings are collected in the sketch after this list):

- With more than one broker, set acks to -1 or all.
- Set the consumer isolation level isolation.level to read_committed (the default is read_uncommitted) so that uncommitted records are never read.
- Give the producer a reasonably large retry count (the retries parameter).
- Prefer manual offset commits on the consumer: enable.auto.commit=false.
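A minimal sketch that collects these settings in the same style as the configuration class above; the broker address and group id are placeholders:

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.HashMap;
import java.util.Map;

public class NoLossKafkaProps {

    /** Producer settings aimed at not losing acknowledged messages. */
    public static Map<String, Object> producerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker address
        props.put(ProducerConfig.ACKS_CONFIG, "all");                         // same as -1: wait for all in-sync replicas
        props.put(ProducerConfig.RETRIES_CONFIG, 10);                         // retry generously
        return props;
    }

    /** Consumer settings that pair with transactional producers. */
    public static Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");                    // placeholder group id
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");   // default is read_uncommitted
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);           // commit offsets manually
        return props;
    }
}
```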
Why offsets of transactional messages are not strictly consecutive (they appear to advance in steps of two when each transaction carries a single record):

When Kafka sends a transactional message, one extra record is written to the log to record the outcome of that transaction.
There is no “2-phase commit” with Kafka transactions. These extra records are not real records, they are markers to indicate whether the previous transaction was successful or not. There is nothing to “get” that is meaningful outside of the broker.
If you publish 10 records in a transaction, it will use 11 slots in the log; if you only publish 1 record, it will use 2 slots.
Kafka transactions with Spring transaction management ("XA-style" usage)
With Spring transaction management, application code does not need to fetch the transactional Kafka resources itself via ProducerFactoryUtils.getTransactionalResourceHolder(ProducerFactory, String, java.time.Duration): the Spring KafkaTemplate detects the thread-bound producer and automatically participates in the current transaction. So once a database transaction has been opened there is no need to start another one through KafkaTransactionManager. Note that the consumer must use the isolation level READ_COMMITTED to avoid reading uncommitted records. A sketch of this pattern follows.
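A minimal sketch of this behaviour, assuming the transaction-capable `tranKafkaTemplate` configured earlier and a hypothetical `OrderMapper` DAO (not part of the original example). The @Transactional annotation opens the database transaction; the template detects it and synchronizes a Kafka transaction with it, so the send is only committed if the database commit succeeds.

```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class OrderService {

    @Autowired
    private KafkaTemplate<Integer, String> tranKafkaTemplate; // transaction-capable template from KafkaConfiguration

    @Autowired
    private OrderMapper orderMapper; // hypothetical DAO, shown only for illustration

    /**
     * The database transaction manager drives this transaction; the KafkaTemplate detects it,
     * synchronizes a Kafka transaction with it, and commits the send only after the database commit.
     */
    @Transactional
    public void createOrder(String orderJson) {
        orderMapper.insert(orderJson);                             // database write
        tranKafkaTemplate.send("order.created.topic", orderJson);  // joins the surrounding transaction
        // Any exception thrown here rolls back the insert and aborts the Kafka transaction.
    }
}
```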
A common transaction-related error:
```
java.lang.IllegalStateException: No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record
```
- When sending transactional messages with spring-kafka, make sure the @Transactional annotation is present or a local transaction is started. KafkaTransactionManager and DataSourceTransactionManager both extend AbstractPlatformTransactionManager, so a transactional send must run either under the annotation or inside an explicitly started transaction; a sketch of declaring a KafkaTransactionManager bean is shown below.
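If @Transactional should drive Kafka sends without a database transaction manager in play, a KafkaTransactionManager can be declared against the transaction-capable producer factory. This bean is not part of the KafkaConfiguration class shown above; a sketch, assuming the `tranProducerFactory` defined there (the @Qualifier import is org.springframework.beans.factory.annotation.Qualifier):

```java
// Inside KafkaConfiguration: a platform transaction manager backed by Kafka transactions,
// so @Transactional can open and commit a Kafka transaction around the annotated method.
@Bean
public KafkaTransactionManager<Integer, String> kafkaTransactionManager(
        @Qualifier("tranProducerFactory") ProducerFactory<Integer, String> tranProducerFactory) {
    return new KafkaTransactionManager<>(tranProducerFactory);
}
```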
Transaction fails to start (possible causes, for reference):

```
Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION
```

- Requests arriving too quickly, so a transaction is begun with the same transactional id as one already in progress.
- @Transactional missing and no local transaction started, so no transaction is actually begun.
- The consumer isolation level is read_uncommitted, so uncommitted data is read and the results look wrong.
4. Meaning of the consumer auto.offset.reset parameter
- earliest: if a partition has a committed offset, consume from it; otherwise consume from the beginning of the partition.
- latest: if a partition has a committed offset, consume from it; otherwise consume only records produced after the consumer starts.
- none: if every partition has a committed offset, consume from those offsets; if any partition has no committed offset, throw an exception.
Note:
earliest is usually the recommended default. With it, if Kafka breaks down and is restarted, consumption resumes from the offsets that have not yet been consumed. latest can lose messages: if data keeps being written to the topic while Kafka is having problems, this setting makes consumption restart from the newest offset after the restart, and whatever arrived in between is simply skipped. none has poor compatibility and tends to cause problems; the author has not used it. A minimal way to set the parameter is sketched below.
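For reference, the parameter is set through ConsumerConfig.AUTO_OFFSET_RESET_CONFIG when building the consumer properties, for example as one extra line in the consumerProps() method shown earlier:

```java
// Where to start when a partition has no committed offset: "earliest", "latest" or "none"
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
```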
III. Kafka log inspection tool
Dump Log Segment
Sometimes we need to verify that a log index is correct, or simply print the messages stored in a log file. The kafka.tools.DumpLogSegments class does exactly that; first look at the parameters it accepts:
```
[iteblog@www.iteblog.com /]$ bin/kafka-run-class.sh kafka.tools.DumpLogSegments
Parse a log file and dump its contents to the console, useful for debugging a seemingly corrupt log segment.
Option                               Description
------                               -----------
--deep-iteration                     if set, uses deep instead of shallow iteration
--files <file1, file2, ...>          REQUIRED: The comma separated list of data and index log files to be dumped
--key-decoder-class                  if set, used to deserialize the keys. This class should implement kafka.serializer.Decoder trait.
                                     Custom jar should be available in kafka/libs directory. (default: kafka.serializer.StringDecoder)
--max-message-size <Integer: size>   Size of largest message. (default: 5242880)
--print-data-log                     if set, printing the messages content when dumping data logs
--value-decoder-class                if set, used to deserialize the messages. This class should implement kafka.serializer.Decoder trait.
                                     Custom jar should be available in kafka/libs directory. (default: kafka.serializer.StringDecoder)
--verify-index-only                  if set, just verify the index log without printing its content
```

As the help text shows, --files is required: it takes the absolute paths of the log files inside a topic-partition directory. Those directories live under the location configured by log.dirs in config/server.properties. For example, to inspect /home/q/kafka/kafka_2.10-0.8.2.1/data/test-4/00000000000034245135.log, run a command like the one below:
```
[xxx /]$ /usr/local/kafka_2.12-2.3.1/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --print-data-log --files /usr/local/kafka_2.12-2.3.1/logs/dyzh.pharmacyWallet.topic.dev-0/00000000000000010708.log
Dumping /usr/local/kafka_2.12-2.3.1/logs/dyzh.pharmacyWallet.topic.dev-0/00000000000000010708.log
Starting offset: 10708
baseOffset: 10708 lastOffset: 10708 count: 1 baseSequence: 0 lastSequence: 0 producerId: 85094 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true isControl: false position: 0 CreateTime: 1588921890685 size: 100 magic: 2 compresscodec: NONE crc: 2784045922 isvalid: true
| offset: 10708 CreateTime: 1588921890685 keysize: -1 valuesize: 32 sequence: 0 headerKeys: [] payload: b7ddad61094f4849a17cedfed4dfb119
baseOffset: 10709 lastOffset: 10709 count: 1 baseSequence: -1 lastSequence: -1 producerId: 85094 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true isControl: true position: 100 CreateTime: 1588921890700 size: 78 magic: 2 compresscodec: NONE crc: 811342613 isvalid: true
| offset: 10709 CreateTime: 1588921890700 keysize: 4 valuesize: 6 sequence: -1 headerKeys: [] endTxnMarker: COMMIT coordinatorEpoch: 70
baseOffset: 10710 lastOffset: 10712 count: 3 baseSequence: 0 lastSequence: 2 producerId: 85094 producerEpoch: 1 partitionLeaderEpoch: 0 isTransactional: true isControl: false position: 178 CreateTime: 1588922828302 size: 178 magic: 2 compresscodec: NONE crc: 182076061 isvalid: true
```

As the output shows, the command prints the batch headers and the offset of every message; the message payload itself is only printed because --print-data-log was passed. To inspect several log files at once, separate them with commas. Note the record at offset 10709 with isControl: true and endTxnMarker: COMMIT: that is the transaction marker discussed above, which is why offsets of transactional messages advance by an extra slot.
Kafka uses baseSequence, lastSequence and producerId (visible in the dump above) to detect duplicate batches, which is how idempotence is enforced.
Transactions are built on top of idempotence: when the transactional.id is configured, idempotence must also be enabled. Idempotence, however, can be used on its own and does not depend on transactions.
Idempotence introduces the Producer ID (PID).
Transactions introduce the transactional.id property.
The possible combinations (a producer sketch for the transactional combination follows this list):

- enable.idempotence = true, transactional.id not set: idempotence only.
- enable.idempotence = true, transactional.id set: transactions and idempotence.
- enable.idempotence = false, transactional.id not set: a plain Kafka producer with neither transactions nor idempotence.
- enable.idempotence = false, transactional.id set: no PID can be obtained and an error is raised.
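A sketch of the second combination (idempotence plus transactions). In the Spring setup above the transactional.id is supplied by DefaultKafkaProducerFactory.setTransactionIdPrefix; with the raw Kafka client it is set directly, as shown here (broker address and topic are placeholders):

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class TransactionalProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");        // placeholder broker address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);                   // required when transactions are used
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "demo-tran-1");            // enables the transactional API

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.initTransactions();                       // registers the transactional.id and obtains a PID
        try {
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("test", "hello inside a transaction"));
            producer.commitTransaction();
        } catch (Exception e) {
            producer.abortTransaction();                   // roll back on failure (close instead on fenced/fatal errors)
        } finally {
            producer.close();
        }
    }
}
```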
References:
KafkaTransactionManager
Kafka producer transactions and idempotence