Java + Kafka Message Queue
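This article covers producing to and consuming from a Kafka message queue in Java. For installing and deploying Kafka, please see the related articles.
I have recently been using Spring MVC: the application watches a file path and sends the contents of each file it reads to the message queue, where another system consumes the messages.
Since the message queue acts as a message exchange, this system both produces and consumes messages. I will not go into the details here.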
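The file-watching step described above is not shown in this article, so here is a minimal sketch of one way it could look, using the JDK's `WatchService`. The names `DirectoryForwarder`, `forward`, `watch`, and the `sink` callback are hypothetical and not part of the original system; the sink stands in for whatever actually sends the message to Kafka.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.function.BiConsumer;

// Hypothetical helper: watches a directory and forwards each new file's content to a sink.
final class DirectoryForwarder {

    private final Path dir;
    private final String topic;
    private final BiConsumer<String, String> sink; // receives (topic, message)

    DirectoryForwarder(Path dir, String topic, BiConsumer<String, String> sink) {
        this.dir = dir;
        this.topic = topic;
        this.sink = sink;
    }

    // Reads one file as UTF-8 and hands its content to the sink; non-regular files are skipped.
    void forward(Path file) throws IOException {
        if (!Files.isRegularFile(file)) {
            return;
        }
        String content = new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
        sink.accept(topic, content);
    }

    // Blocks and forwards every file created in the directory.
    // Caveat: ENTRY_CREATE can fire before the writer has finished writing the file.
    void watch() throws IOException, InterruptedException {
        try (WatchService watcher = dir.getFileSystem().newWatchService()) {
            dir.register(watcher, StandardWatchEventKinds.ENTRY_CREATE);
            while (true) {
                WatchKey key = watcher.take();
                for (WatchEvent<?> event : key.pollEvents()) {
                    if (event.kind() == StandardWatchEventKinds.OVERFLOW) {
                        continue; // events were lost; nothing to resolve
                    }
                    forward(dir.resolve((Path) event.context()));
                }
                if (!key.reset()) {
                    break; // the directory is no longer accessible
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Path dir = Paths.get(args.length > 0 ? args[0] : ".");
        // This sink only prints; a real one would publish to Kafka instead.
        new DirectoryForwarder(dir, "test",
                (topic, message) -> System.out.println("[" + topic + "] " + message)).watch();
    }
}
```

In a setup like the one described, the sink could be wired to the producer shown later in this article, for example `(topic, message) -> new ProducerHandler(topic, message)`.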
The producer code is comparatively simple.
package com.dhc.test.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.log4j.Logger;

import java.util.Properties;

public class ProducerHandler {

    private static final Logger logger = Logger.getLogger(ProducerHandler.class.getName());

    private final KafkaProducer<String, String> producer;

    public ProducerHandler(String topic, String message) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("acks", "all");               // wait for all in-sync replicas to acknowledge
        props.put("retries", "0");              // do not resend on failure
        props.put("batch.size", "16384");       // per-partition batch size, in bytes
        props.put("linger.ms", "1");            // wait up to 1 ms for a batch to fill
        props.put("buffer.memory", "33554432"); // 32 MiB of buffer for unsent records
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<String, String>(props);

        // Build the message
        ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, message);
        // Send the message. send() is asynchronous, so close() is called to
        // flush the buffered record and release the producer's resources.
        producer.send(record);
        producer.close();
        logger.info("[kafka] Sent a message to Kafka topic [" + topic + "]");
        logger.info("[kafka] Message content: " + message);
        logger.info("[kafka] Push complete");
    }
}

Consumer code

package com.dhc.test.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.log4j.Logger;

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ConsumerHandler {

    private static final Logger logger = Logger.getLogger(ConsumerHandler.class.getName());

    private final KafkaConsumer<String, String> consumer;
    private ExecutorService executors;

    public ConsumerHandler(List<String> topics) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("group.id", "test");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(topics);
        execute(1);
    }

    public void execute(int workerNum) {
        executors = new ThreadPoolExecutor(workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
        // Start a separate thread to listen for Kafka messages
        Thread t = new Thread(new Runnable() {
            public void run() {
                try {
                    while (true) {
                        // poll(Duration) needs kafka-clients 2.0+; older clients use poll(200)
                        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(200));
                        for (final ConsumerRecord<String, String> record : records) {
                            logger.info("[Kafka] Received a message from Kafka topic [" + record.topic() + "]");
                            logger.info("[Kafka] Message content: " + record.value());
                            executors.submit(new ConsumerWorker(record));
                        }
                    }
                } catch (WakeupException e) {
                    // shutdown() called wakeup(); leave the loop and close below
                } finally {
                    // KafkaConsumer is not thread-safe, so it is closed on the polling thread
                    consumer.close();
                }
            }
        });
        t.start();
    }

    public void shutdown() {
        // wakeup() is the only KafkaConsumer method that is safe to call from another thread
        consumer.wakeup();
        if (executors != null) {
            executors.shutdown();
            try {
                if (!executors.awaitTermination(10, TimeUnit.SECONDS)) {
                    logger.info("[Kafka] Timeout.... Ignore for this case");
                }
            } catch (InterruptedException ignored) {
                logger.info("[Kafka] Other thread interrupted this shutdown, ignore for this case.");
                Thread.currentThread().interrupt();
            }
        }
    }
}

package com.dhc.test.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.log4j.Logger;

public class ConsumerWorker implements Runnable {

    private static final Logger logger = Logger.getLogger(ConsumerWorker.class.getName());

    private final ConsumerRecord<String, String> consumerRecord;

    public ConsumerWorker(ConsumerRecord<String, String> record) {
        this.consumerRecord = record;
    }

    public void run() {
        // The business logic for each received message goes here
        System.out.println(consumerRecord.value());
    }
}

Starting from the main method

package com.dhc.test;

import com.dhc.test.kafka.ConsumerHandler;

import java.util.ArrayList;
import java.util.List;

public class Start {

    public static void main(String[] args) throws Exception {
        // Start the Kafka consumer listener
        List<String> topics = new ArrayList<String>();
        // Topic to listen on
        topics.add("test");
        new ConsumerHandler(topics);
    }
}
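The consumer above hands records to a `ThreadPoolExecutor` built with a bounded `ArrayBlockingQueue` and `CallerRunsPolicy`. When the workers are busy and the queue is full, that combination makes the submitting thread (the polling thread, in the consumer) run the task itself, which slows polling down instead of dropping records. The following sketch shows that behavior without Kafka; the class `CallerRunsDemo` and the method `runOnCaller` are hypothetical names used only for this illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo: counts how many tasks fall back to the submitting thread.
final class CallerRunsDemo {

    // Submits `tasks` tasks while every worker is blocked, and returns how many
    // of them were executed on the submitting thread by CallerRunsPolicy.
    static int runOnCaller(int workers, int queueCapacity, int tasks) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(workers, workers, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(queueCapacity), new ThreadPoolExecutor.CallerRunsPolicy());
        final Thread submitter = Thread.currentThread();
        final AtomicInteger onCaller = new AtomicInteger();
        final CountDownLatch gate = new CountDownLatch(1);

        for (int i = 0; i < tasks; i++) {
            pool.execute(new Runnable() {
                public void run() {
                    if (Thread.currentThread() == submitter) {
                        // Rejected by the pool, so the policy ran it on the caller
                        onCaller.incrementAndGet();
                    } else {
                        // Worker threads stay busy until all tasks have been submitted
                        try {
                            gate.await();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    }
                }
            });
        }

        gate.countDown(); // release the workers so queued tasks can drain
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return onCaller.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Tasks run on the caller: " + runOnCaller(1, 2, 5));
    }
}
```

With one worker, a queue of two, and five tasks, the first task occupies the worker, the next two fill the queue, and the remaining two run on the caller. In the consumer this means a slow `ConsumerWorker` eventually stalls the `poll` loop, which is worth keeping in mind when choosing the queue size.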
Thanks for reading!