java调用kafka
生活随笔
收集整理的這篇文章主要介紹了
java调用kafka
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
pom.xml
? ? ? ? <dependency>
? ? ? ? ? ? <groupId>org.apache.kafka</groupId>
? ? ? ? ? ? <artifactId>kafka-clients</artifactId>
? ? ? ? ? ? <version>2.3.1</version>
? ? ? ? </dependency>
注意調試通過后要添加log-back.xml并設置日志級別到info及以上,否則debug日志實在是多啊……
///ProducerTest.java
package com.whq.demo; import java.util.Properties; import java.util.Random; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; public class ProducerTest {public static String topic = "whq_test";//定義主題public static void main(String[] args) throws InterruptedException {Properties p = new Properties();p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.81.145:9092,192.168.81.145:9093,192.168.81.145:9094");//kafka地址,多個地址用逗號分割p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(p);try {while (true) {String msg = "Hello,你好," + new Random().nextInt(100);ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, msg);kafkaProducer.send(record);System.out.println("消息發送成功:" + msg);Thread.sleep(500);}} finally {kafkaProducer.close();}} }
///ConsumerTest.java
//出現問題:生產者第二次發消息時連接到了127.0.0.1:9093,后來發現是kafka服務器配置問題:
vi config/server.properties
修改
listeners=PLAINTEXT://192.168.81.145:9092 ? ?#注意,這里一定要有客戶端可訪問的ip,否則非本機連接調用第二次會異常
總結
以上是生活随笔為你收集整理的java调用kafka的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: js中的cookie使用和vue-coo
- 下一篇: 实验1.1