Java + Kafka Message Queue

發(fā)布時(shí)間:2023/12/31 java 20 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Java+Kafka消息队列 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

This article focuses on producing and consuming Kafka messages from the Java side. For installing and deploying Kafka itself, please refer to the related articles.


My recent project uses Spring MVC: it watches a file path, reads the contents of incoming files, and sends them to the message queue, and a separate system then consumes the messages.

當(dāng)然消息隊(duì)列作為消息交換機(jī),本系統(tǒng)既有生產(chǎn)消息也有消費(fèi)消息。不做詳述。


The producer code is much simpler.

package com.dhc.test.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.log4j.Logger;

import java.util.Properties;

public class ProducerHandler {

    private static final Logger logger = Logger.getLogger(ProducerHandler.class.getName());

    private final KafkaProducer<String, String> producer;

    public ProducerHandler(String topic, String message) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("acks", "all");
        props.put("retries", "0");
        props.put("batch.size", "16384");
        props.put("linger.ms", "1");
        props.put("buffer.memory", "33554432");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<String, String>(props);

        // Build the message
        ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, message);
        // Send the message; send() is asynchronous, so flush() makes sure the record
        // actually goes out before this short-lived producer is discarded
        producer.send(record);
        producer.flush();
        logger.info("[kafka] Sent a message to Kafka topic [" + topic + "]");
        logger.info("[kafka] Message body: " + message);
        logger.info("[kafka] Push succeeded");
    }
}
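A minimal usage sketch (the topic name and payload below are placeholders of mine, not from the original article):

package com.dhc.test.kafka;

public class ProducerDemo {

    public static void main(String[] args) {
        // Hypothetical call site: topic and message are placeholder values.
        new ProducerHandler("test", "hello kafka");
    }
}

Note that ProducerHandler creates a new KafkaProducer for every message, which is fine for a demo but expensive in a real service; a long-running application would normally keep a single producer instance, reuse it for all sends, and call close() on shutdown.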

The consumer code:

package com.dhc.test.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.log4j.Logger;

import java.util.List;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ConsumerHandler {

    private static final Logger logger = Logger.getLogger(ConsumerHandler.class.getName());

    private final KafkaConsumer<String, String> consumer;
    private ExecutorService executors;

    public ConsumerHandler(List<String> topics) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("group.id", "test");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(topics);
        execute(1);
    }

    public void execute(int workerNum) {
        executors = new ThreadPoolExecutor(workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
        // Start a background thread that keeps polling Kafka for new messages
        Thread t = new Thread(new Runnable() {
            public void run() {
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(200);
                    for (final ConsumerRecord<String, String> record : records) {
                        logger.info("[Kafka] Received a message on topic [" + record.topic() + "]");
                        logger.info("[Kafka] Message body: " + record.value());
                        // Hand the record off to the worker pool for processing
                        executors.submit(new ConsumerWorker(record));
                    }
                }
            }
        });
        t.start();
    }

    public void shutdown() {
        if (consumer != null) {
            consumer.close();
        }
        if (executors != null) {
            executors.shutdown();
        }
        try {
            if (!executors.awaitTermination(10, TimeUnit.SECONDS)) {
                logger.info("[Kafka] Timeout... ignore for this case");
            }
        } catch (InterruptedException ignored) {
            logger.info("[Kafka] Another thread interrupted this shutdown, ignore for this case.");
            Thread.currentThread().interrupt();
        }
    }
}
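One caveat about shutdown(): KafkaConsumer is not thread-safe, so calling it (and therefore consumer.close()) from a thread other than the one that is polling can fail because the consumer detects concurrent access. wakeup() is the only KafkaConsumer method that is safe to call from another thread. The class below is a sketch of my own, not part of the original article, showing the usual pattern: the other thread calls wakeup(), and the polling thread catches WakeupException and closes the consumer itself.

package com.dhc.test.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical variant of ConsumerHandler with cooperative shutdown via wakeup().
public class ShutdownAwareConsumer {

    private final KafkaConsumer<String, String> consumer;
    private final ExecutorService executors = Executors.newFixedThreadPool(1);

    public ShutdownAwareConsumer(List<String> topics) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("group.id", "test");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(topics);
    }

    // Run this on the polling thread.
    public void run() {
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(200);
                for (ConsumerRecord<String, String> record : records) {
                    executors.submit(new ConsumerWorker(record));
                }
            }
        } catch (WakeupException e) {
            // wakeup() was called from another thread; fall through and close below.
        } finally {
            consumer.close();
            executors.shutdown();
        }
    }

    // Safe to call from any thread (e.g. a JVM shutdown hook):
    // wakeup() aborts a blocking poll() by making it throw WakeupException.
    public void requestShutdown() {
        consumer.wakeup();
    }
}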
The ConsumerWorker is where the business logic for each received record goes:

package com.dhc.test.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.log4j.Logger;

public class ConsumerWorker implements Runnable {

    private static final Logger logger = Logger.getLogger(ConsumerWorker.class.getName());

    private final ConsumerRecord<String, String> consumerRecord;

    public ConsumerWorker(ConsumerRecord<String, String> record) {
        this.consumerRecord = record;
    }

    public void run() {
        // Business handling of the received message goes here
        System.out.println(consumerRecord.value());
    }
}

Start everything from a main method:

package com.dhc.test;

import com.dhc.test.kafka.ConsumerHandler;

import java.util.ArrayList;
import java.util.List;

public class Start {

    public static void main(String[] args) throws Exception {
        // Start the Kafka consumer
        List<String> topics = new ArrayList<String>();
        // Topic(s) to listen on
        topics.add("test");
        new ConsumerHandler(topics);
    }
}
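To tie this back to the scenario described at the beginning (watching a file path and pushing the contents of new files onto the queue), here is a rough sketch of what the producing side could look like. The watched directory, the topic name, and the choice to send each whole file as a single message are assumptions of mine; the original article does not show this part.

package com.dhc.test;

import com.dhc.test.kafka.ProducerHandler;

import java.nio.charset.StandardCharsets;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;

public class FileWatcher {

    public static void main(String[] args) throws Exception {
        // Hypothetical directory to watch; adjust to the real inbound path.
        Path dir = Paths.get("/data/inbound");
        WatchService watcher = FileSystems.getDefault().newWatchService();
        dir.register(watcher, StandardWatchEventKinds.ENTRY_CREATE);

        while (true) {
            WatchKey key = watcher.take(); // blocks until a file system event arrives
            for (WatchEvent<?> event : key.pollEvents()) {
                Path file = dir.resolve((Path) event.context());
                // Read the whole file and push it as a single message (assumption).
                String content = new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
                new ProducerHandler("test", content);
            }
            key.reset();
        }
    }
}

In practice the ENTRY_CREATE event can fire before the writer has finished the file, so a real implementation usually waits for the file to settle or relies on a write-then-rename convention; that detail is outside the scope of this sketch.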
謝謝關(guān)注!

