日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > java >内容正文

java

java生产者消费者代码_Java实现Kafka生产者消费者代码实例

發(fā)布時間:2023/12/10 java 20 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java生产者消费者代码_Java实现Kafka生产者消费者代码实例 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

Kafka的結構與RabbitMQ類似,消息生產者向Kafka服務器發(fā)送消息,Kafka接收消息后,再投遞給消費者。

生產者的消費會被發(fā)送到Topic中,Topic中保存著各類數(shù)據,每一條數(shù)據都使用鍵、值進行保存。

每一個Topic中都包含一個或多個物理分區(qū)(Partition),分區(qū)維護著消息的內容和索引,它們有可能被保存在不同服務器。

新建一個Maven項目,pom.xml 加入依賴:

org.apache.kafka

kafka-clients

2.3.0

1、編寫生產者

將消息投遞到Kafka服務器的名稱為“topic1”的Topic中

package com.example.kafkatest;

import org.apache.kafka.clients.producer.KafkaProducer;

import org.apache.kafka.clients.producer.ProducerRecord;

import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class Producer {

public static void main(String[] args) {

//配置信息

Properties props = new Properties();

//kafka服務器地址

props.put("bootstrap.servers", "localhost:9092");

//設置數(shù)據key和value的序列化處理類

props.put("key.serializer", StringSerializer.class);

props.put("value.serializer", StringSerializer.class);

//創(chuàng)建生產者實例

KafkaProducer producer = new KafkaProducer<>(props);

ProducerRecord record = new ProducerRecord("topic1", "userName", "lc");

//發(fā)送記錄

producer.send(record);

producer.close();

}

}

運行后,可打開命令行工具,進入Kafka目錄,執(zhí)行命令查詢服務器的Topic:

bin\windows\kafka-topics.bat --list --zookeeper localhost:2181

結果如下:

2、編寫消費者

本例中,消費者和生產者在同一個項目中,只是使用不同的啟動類。

消費者會為自已指定一個消費者組的標識,每一條發(fā)布到Topic的記錄,都會被交付給消費者組的一個消費者實例。

如果多個消費者實例有相同的消費者組,則這些記錄會分配到各個消費者實例上,以達到負載均衡的目錄。

如果所有的消費者有不同的消費者組,則每一條記錄都會廣播到全部的消費者進行處理。

package com.example.rabbittest;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.apache.kafka.clients.consumer.ConsumerRecords;

import org.apache.kafka.clients.consumer.KafkaConsumer;

import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;

import java.util.Arrays;

import java.util.Properties;

public class Consumer {

public static void main(String[] args) {

//配置信息

Properties props = new Properties();

//kafka服務器地址

props.put("bootstrap.servers", "localhost:9092");

//必須指定消費者組

props.put("group.id", "test");

//設置數(shù)據key和value的序列化處理類

props.put("key.deserializer", StringDeserializer.class);

props.put("value.deserializer", StringDeserializer.class);

//創(chuàng)建消息者實例

KafkaConsumer consumer = new KafkaConsumer<>(props);

//訂閱topic1的消息

consumer.subscribe(Arrays.asList("topic1"));

//到服務器中讀取記錄

while (true){

ConsumerRecords records = consumer.poll(Duration.ofMillis(100));

for(ConsumerRecord record : records){

System.out.println("key:" + record.key() + "" + ",value:" + record.value());

}

}

}

}

運行后,IDEA控制臺其中輸出如下:

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持腳本之家。

總結

以上是生活随笔為你收集整理的java生产者消费者代码_Java实现Kafka生产者消费者代码实例的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。