日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

java实现Kafka生产者示例

發(fā)布時(shí)間:2025/7/14 编程问答 28 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java实现Kafka生产者示例 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

使用java實(shí)現(xiàn)Kafka的生產(chǎn)者

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869package com.lisg.kafkatest;import java.util.Properties;import kafka.javaapi.producer.Producer;import kafka.producer.KeyedMessage;import kafka.producer.Partitioner;import kafka.producer.ProducerConfig;import kafka.serializer.StringEncoder;/**?* Kafka生產(chǎn)者?* @author lisg?*?*/public class KafkaProducer {????public static void main(String[] args) {?????????????????Properties props = new Properties();????????//根據(jù)這個(gè)配置獲取metadata,不必是kafka集群上的所有broker,但最好至少有兩個(gè)????????props.put("metadata.broker.list", "vm1:9092,vm2:9092");????????//消息傳遞到broker時(shí)的序列化方式????????props.put("serializer.class", StringEncoder.class.getName());????????//zk集群????????props.put("zookeeper.connect", "vm1:2181");????????//是否獲取反饋????????//0是不獲取反饋(消息有可能傳輸失敗)????????//1是獲取消息傳遞給leader后反饋(其他副本有可能接受消息失敗)????????//-1是所有in-sync replicas接受到消息時(shí)的反饋????????props.put("request.required.acks", "1");//????? props.put("partitioner.class", MyPartition.class.getName());?????????????????//創(chuàng)建Kafka的生產(chǎn)者, key是消息的key的類型, value是消息的類型????????Producer<Integer, String> producer = new Producer<Integer, String>(????????????????new ProducerConfig(props));?????????????????int count = 0;????????while(true) {????????????String message = "message-" + ++count;????????????//消息主題是test????????????KeyedMessage<Integer, String> keyedMessage = new KeyedMessage<Integer, String>("test", message);????????????//message可以帶key, 根據(jù)key來將消息分配到指定區(qū), 如果沒有key則隨機(jī)分配到某個(gè)區(qū)//????????? KeyedMessage<Integer, String> keyedMessage = new KeyedMessage<Integer, String>("test", 1, message);????????????producer.send(keyedMessage);????????????System.out.println("send: " + message);????????????try {????????????????Thread.sleep(1000);????????????} catch (InterruptedException e) {????????????????e.printStackTrace();????????????}????????}?????????//????? producer.close();????} }/**?* 自定義分區(qū)類?*?*/class MyPartition implements Partitioner {????public int partition(Object key, int numPartitions) {????????return key.hashCode()%numPartitions;????}?????}




來自為知筆記(Wiz)

附件列表

?

轉(zhuǎn)載于:https://www.cnblogs.com/lishouguang/p/4560559.html

總結(jié)

以上是生活随笔為你收集整理的java实现Kafka生产者示例的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。