日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 前端技术 > javascript >内容正文

javascript

Spring Kafka生产者/消费者样本

發布時間:2023/12/3 javascript 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spring Kafka生产者/消费者样本 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

我的目的是演示Spring Kafka如何為原始Kafka Producer和Consumer API提供一種易于使用且對具有Spring背景的人熟悉的抽象。

示例場景

示例場景是一個簡單的場景,我有一個系統,該系統生成一條消息,另一個系統對其進行處理

使用Raw Kafka Producer / Consumer API的實施

首先,我使用原始的Kafka Producer和Consumer API來實現此方案。 如果您想看一下代碼,可以在我的github倉庫中找到它 。

制片人

以下設置了一個KafkaProducer實例,該實例用于向Kafka主題發送消息:

KafkaProducer<String, WorkUnit> producer = new KafkaProducer<>(kafkaProps, stringKeySerializer(), workUnitJsonSerializer());

我使用了KafkaProducer構造函數的一種變體,該構造函數采用一個自定義的Serializer將域對象轉換為json表示形式。

一旦有KafkaProducer實例可用,就可以將其用于向Kafka集群發送消息,這里我使用了同步版本的發送器,它等待響應返回。

ProducerRecord<String, WorkUnit> record = new ProducerRecord<>("workunits", workUnit.getId(), workUnit);RecordMetadata recordMetadata = this.workUnitProducer.send(record).get();

消費者

在消費者方面,我們創建了一個KafkaConsumer,其中包含構造函數的一種變體,其中包含一個反序列化器 ,該解串器知道如何讀取json消息并將其轉換為域實例:

KafkaConsumer<String, WorkUnit> consumer= new KafkaConsumer<>(props, stringKeyDeserializer(), workUnitJsonValueDeserializer());

一旦KafkaConsumer實例可用,就可以建立一個監聽器循環,以讀取一批記錄,對其進行處理,并等待更多記錄通過:

consumer.subscribe("workunits);try {while (true) {ConsumerRecords<String, WorkUnit> records = this.consumer.poll(100);for (ConsumerRecord<String, WorkUnit> record : records) {log.info("consuming from topic = {}, partition = {}, offset = {}, key = {}, value = {}",record.topic(), record.partition(), record.offset(), record.key(), record.value());}} } finally {this.consumer.close(); }

使用Spring Kafka的實現

我在github repo中有使用Spring-kafka的實現。

制片人

Spring-Kafka提供了一個KafkaTemplate類,作為KafkaProducer上的包裝器,用于將消息發送到Kafka主題:

@Bean public ProducerFactory<String, WorkUnit> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs(), stringKeySerializer(), workUnitJsonSerializer()); }@Bean public KafkaTemplate<String, WorkUnit> workUnitsKafkaTemplate() {KafkaTemplate<String, WorkUnit> kafkaTemplate = new KafkaTemplate<>(producerFactory());kafkaTemplate.setDefaultTopic("workunits");return kafkaTemplate; }

需要注意的一件事是,盡管我之前實現了一個自定義的Serializer / Deserializer,以將域類型作為json發送,然后將其轉換回去,但是Spring-Kafka開箱即用地為json提供了Seralizer / Deserializer。

并使用KafkaTemplate發送消息:

SendResult<String, WorkUnit> sendResult = workUnitsKafkaTemplate.sendDefault(workUnit.getId(), workUnit).get();RecordMetadata recordMetadata = sendResult.getRecordMetadata();LOGGER.info("topic = {}, partition = {}, offset = {}, workUnit = {}",recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(), workUnit);

消費者

使用者部分使用偵聽器模式實現,對于已為RabbitMQ / ActiveMQ實現偵聽器的任何人,應該熟悉該模式。 首先是設置偵聽器容器的配置:

@Bean public ConcurrentKafkaListenerContainerFactory<String, WorkUnit> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, WorkUnit> factory =new ConcurrentKafkaListenerContainerFactory<>();factory.setConcurrency(1);factory.setConsumerFactory(consumerFactory());return factory; }@Bean public ConsumerFactory<String, WorkUnit> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerProps(), stringKeyDeserializer(), workUnitJsonValueDeserializer()); }

以及響應容器讀取的消息的服務:

@Service public class WorkUnitsConsumer {private static final Logger log = LoggerFactory.getLogger(WorkUnitsConsumer.class);@KafkaListener(topics = "workunits")public void onReceiving(WorkUnit workUnit, @Header(KafkaHeaders.OFFSET) Integer offset,@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,@Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {log.info("Processing topic = {}, partition = {}, offset = {}, workUnit = {}",topic, partition, offset, workUnit);} }

這樣就避免了像設置原始使用者一樣設置偵聽器循環的所有復雜性,并且很好地被偵聽器容器隱藏了。

結論

我已經遍歷了設置批處理大小,確認的變化以及不同的API簽名的許多內部信息。 我的目的只是演示使用原始Kafka API的常見用例,并展示Spring-Kafka包裝器如何簡化它。

如果您有興趣進一步探索, 可以在這里找到原始生產者消費者樣本,在這里可以找到 Spring Kafka 。

翻譯自: https://www.javacodegeeks.com/2016/11/spring-kafka-producerconsumer-sample.html

總結

以上是生活随笔為你收集整理的Spring Kafka生产者/消费者样本的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。