Kafka Development Guide: How Transactional Producers Guarantee Exactly-Once Production
Contents
At least once
At most once
Exactly once
Idempotence
Scope of idempotence
How to enable
Code
Transactions
Scope of transactions
How to enable
Code
Kafka's message delivery guarantees come in three levels: at most once, at least once, and exactly once.
At least once
When does the producer send duplicate data? Suppose the producer sends a record and the broker writes it successfully, but for some reason the acknowledgment never reaches the producer. If retries are configured (props.put(ProducerConfig.RETRIES_CONFIG, 5);), the producer sends the record again, and the record ends up duplicated.
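For illustration, here is a minimal at-least-once style configuration sketch. It is a fragment, not part of the original example: the broker address is a placeholder, and it assumes the same imports as the full producer classes shown further below.

// At-least-once leaning settings; "localhost:9092" is a placeholder address.
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.ACKS_CONFIG, "all");  // wait for the broker to confirm the write
props.put(ProducerConfig.RETRIES_CONFIG, 5);   // resend if the acknowledgment is lost -> duplicates possible
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<String, String>(props);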
At most once
The opposite of at least once: disable retries and each record is delivered at most once. If a send fails, the record is lost rather than resent, so at most once trades duplicates for possible data loss.
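Conversely, a minimal at-most-once sketch (again a fragment with a placeholder address, using the same imports as the producer classes below) disables retries; setting acks to "0" additionally makes sends fire-and-forget:

// At-most-once leaning settings; failed sends are dropped, never repeated.
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
props.put(ProducerConfig.RETRIES_CONFIG, 0);  // never resend -> no duplicates, possible loss
props.put(ProducerConfig.ACKS_CONFIG, "0");   // optional: do not even wait for an acknowledgment
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<String, String>(props);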
Exactly once
How do we achieve exactly once? The producer offers two mechanisms: idempotence and transactions.
Idempotence
Scope of idempotence
Idempotence only guarantees that a single producer does not write duplicate data. If the producer restarts, or several producers write to the same topic, duplicates can no longer be ruled out.
How to enable
Set a single configuration property:

props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

Code
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import wiki.hadoop.kafka.config.Constant;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * Idempotent producer.
 *
 * It only guarantees idempotence within a single partition: an idempotent
 * producer ensures no duplicate messages appear in one partition of a topic;
 * it cannot provide idempotence across partitions.
 * It only guarantees idempotence within a single session, i.e. one run of the
 * producer process. Once the process restarts, the guarantee is lost.
 *
 * @author jast
 * @date 2020/4/19 22:38
 */
public class IdempotenceProducer {

    private static Producer<String, String> producer;

    public IdempotenceProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Constant.KAFKA_BROKERS);
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 5);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1024 * 1024 * 1024);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // Enable producer idempotence; everything else stays the same
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        producer = new KafkaProducer<String, String>(props);
    }

    public Producer<String, String> getProducer() {
        return producer;
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        IdempotenceProducer idempotenceProducer = new IdempotenceProducer();
        Producer<String, String> producer = idempotenceProducer.getProducer();
        producer.send(new ProducerRecord<String, String>("test", "1234")).get();
    }
}
Transactions
Scope of transactions
Everything: unlike idempotence, transactions work across multiple partitions and survive producer restarts, because the transactional.id identifies the producer across sessions.
How to enable
Producer settings:

// Enable producer idempotence; everything else stays the same
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// Enable transactions (idempotence must also be on) with a custom id
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "jast-acid");

Consumer settings:

// Only read data from successfully committed transactions
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase());

Code
Producer
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import wiki.hadoop.kafka.config.Constant;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * Kafka transactional producer: exactly-once on the producer side.
 * Either every record in the transaction is committed, or none of them is.
 *
 * @author jast
 * @date 2020/4/21 22:38
 */
public class TransactionProducer {

    private static Producer<String, String> producer;

    public TransactionProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Constant.KAFKA_BROKERS);
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 5);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1024 * 1024 * 1024);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // Enable producer idempotence; everything else stays the same
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        // Enable transactions (idempotence must also be on) with a custom id
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "jast-acid");
        producer = new KafkaProducer<String, String>(props);
    }

    public Producer<String, String> getProducer() {
        return producer;
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        TransactionProducer transactionProducer = new TransactionProducer();
        Producer<String, String> producer = transactionProducer.getProducer();
        // Initialize transactions
        producer.initTransactions();
        boolean flag = true;
        // Loop four times; only the last iteration commits.
        // Expected result: the first three transactions fail, so the
        //   transactional consumer never sees their 1 and 2, but does see
        //   1, 2, 3, 4 from the fourth run; the plain consumer sees 1 and 2
        //   from the aborted runs as well as 1, 2, 3, 4 from the fourth.
        // Run TransactionConsumer to observe this.
        /*
         * Output after the committed transaction:
         * plain consumer received->1 partition:2 offset:3080713
         * transactional consumer received->3 partition:2 offset:3080717
         * plain consumer received->2 partition:1 offset:3081410
         * plain consumer received->1 partition:3 offset:3081465
         * plain consumer received->1 partition:2 offset:3080715
         * plain consumer received->3 partition:2 offset:3080717
         * transactional consumer received->4 partition:1 offset:3081414
         * transactional consumer received->2 partition:0 offset:3081470
         * transactional consumer received->1 partition:3 offset:3081467
         * plain consumer received->2 partition:1 offset:3081412
         * plain consumer received->4 partition:1 offset:3081414
         * plain consumer received->2 partition:0 offset:3081468
         * plain consumer received->2 partition:0 offset:3081470
         * plain consumer received->1 partition:3 offset:3081467
         */
        for (int i = 0; i <= 3; i++) {
            if (i == 3) flag = false;
            try {
                // Begin the transaction
                producer.beginTransaction();
                producer.send(new ProducerRecord<String, String>("test", "1")).get();
                producer.send(new ProducerRecord<String, String>("test", "2")).get();
                // Manufacture an exception on the first three iterations
                if (flag) throw new RuntimeException("simulated failure");
                producer.send(new ProducerRecord<String, String>("test", "3")).get();
                producer.send(new ProducerRecord<String, String>("test", "4")).get();
                // Commit the transaction
                producer.commitTransaction();
            } catch (Exception e) {
                // Abort the transaction
                producer.abortTransaction();
                e.printStackTrace();
            }
        }
    }
}

Consumer
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.requests.IsolationLevel;
import org.apache.kafka.common.serialization.StringDeserializer;
import wiki.hadoop.kafka.config.Constant;

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

/**
 * Consumers used to verify the transactional producer.
 *
 * @author jast
 * @date 2020/4/21 22:54
 */
public class TransactionConsumer {

    /**
     * Transaction-aware consumer: only reads committed data.
     *
     * @param topic topic to subscribe to
     * @param group consumer group id
     * @param max_poll_records max records per poll
     * @param isLatest start from latest (true) or earliest (false) offsets
     * @return KafkaConsumer<String, String>
     */
    public KafkaConsumer<String, String> transactionConsumer(String topic, String group, int max_poll_records, boolean isLatest) {
        Properties props = new Properties();
        //-----------------------------------------------------------------------------------
        // Only read data from successfully committed transactions
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase());
        //-----------------------------------------------------------------------------------
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Constant.KAFKA_BROKERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, group);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, max_poll_records); // records per poll
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); // auto commit (set to false for manual commit)
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 3000);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, isLatest ? "latest" : "earliest");
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 5 * 1024 * 1024);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList(topic));
        return consumer;
    }

    /**
     * Plain consumer: default read_uncommitted isolation, sees aborted data too.
     */
    public KafkaConsumer<String, String> consumer(String topic, String group, int max_poll_records, boolean isLatest) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Constant.KAFKA_BROKERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, group);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, max_poll_records); // records per poll
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); // auto commit (set to false for manual commit)
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 3000);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, isLatest ? "latest" : "earliest");
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 5 * 1024 * 1024);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList(topic));
        return consumer;
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        TransactionConsumer transactionConsumer = new TransactionConsumer();
        TransactionConsumer transactionConsumer2 = new TransactionConsumer();
        KafkaConsumer<String, String> consumer = transactionConsumer.consumer("test", "test", 10, false);
        KafkaConsumer<String, String> consumer2 = transactionConsumer2.transactionConsumer("test", "test2", 10, false);
        // Plain consumer loop
        CompletableFuture.runAsync(() -> {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("plain consumer received->" + record.value()
                            + " partition:" + record.partition() + " offset:" + record.offset());
                }
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        // Transactional (read_committed) consumer loop
        CompletableFuture.runAsync(() -> {
            while (true) {
                ConsumerRecords<String, String> records2 = consumer2.poll(1000);
                for (ConsumerRecord<String, String> record : records2) {
                    System.out.println("transactional consumer received->" + record.value()
                            + " partition:" + record.partition() + " offset:" + record.offset());
                }
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).get();
    }
}
Summary
At least once risks duplicates, at most once risks loss. An idempotent producer removes duplicates within one partition and one producer session with a single configuration flag, and a transactional producer extends the guarantee across partitions and producer restarts, provided consumers read with isolation.level set to read_committed.