日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Golang中使用kafka

發布時間:2024/4/15 编程问答 27 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Golang中使用kafka 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

golang中比較好用的kafka client有

  • sarama
  • confluent-kafka-go
  • go_kafka_client
  • optiopay-kafka
  • siesta

其中 sarama的使用者應該是最多的, 然后還有一個sarama的cluster版本?sarama-cluster

本文簡單描述下sarama的一些簡單使用

生產者接口

func producer_test() {fmt.Printf("producer_test\n")config := sarama.NewConfig()config.Producer.RequiredAcks = sarama.WaitForAllconfig.Producer.Partitioner = sarama.NewRandomPartitionerconfig.Producer.Return.Successes = trueconfig.Producer.Return.Errors = trueconfig.Version = sarama.V0_11_0_2producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, config)if err != nil {fmt.Printf("producer_test create producer error :%s\n", err.Error())return}defer producer.AsyncClose()// send messagemsg := &sarama.ProducerMessage{Topic: "kafka_go_test",Key: sarama.StringEncoder("go_test"),}value := "this is message"for {fmt.Scanln(&value)msg.Value = sarama.ByteEncoder(value)fmt.Printf("input [%s]\n", value)// send to chainproducer.Input() <- msgselect {case suc := <-producer.Successes():fmt.Printf("offset: %d, timestamp: %s", suc.Offset, suc.Timestamp.String())case fail := <-producer.Errors():fmt.Printf("err: %s\n", fail.Err.Error())}} }

消費者接口

func consumer_test() {fmt.Printf("consumer_test")config := sarama.NewConfig()config.Consumer.Return.Errors = trueconfig.Version = sarama.V0_11_0_2// consumerconsumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)if err != nil {fmt.Printf("consumer_test create consumer error %s\n", err.Error())return}defer consumer.Close()partition_consumer, err := consumer.ConsumePartition("kafka_go_test", 0, sarama.OffsetOldest)if err != nil {fmt.Printf("try create partition_consumer error %s\n", err.Error())return}defer partition_consumer.Close()for {select {case msg := <-partition_consumer.Messages():fmt.Printf("msg offset: %d, partition: %d, timestamp: %s, value: %s\n",msg.Offset, msg.Partition, msg.Timestamp.String(), string(msg.Value))case err := <-partition_consumer.Errors():fmt.Printf("err :%s\n", err.Error())}}}

元數據接口

func metadata_test() {fmt.Printf("metadata test\n")config := sarama.NewConfig()config.Version = sarama.V0_11_0_2client, err := sarama.NewClient([]string{"localhost:9092"}, config)if err != nil {fmt.Printf("metadata_test try create client err :%s\n", err.Error())return}defer client.Close()// get topic settopics, err := client.Topics()if err != nil {fmt.Printf("try get topics err %s\n", err.Error())return}fmt.Printf("topics(%d):\n", len(topics))for _, topic := range topics {fmt.Println(topic)}// get broker setbrokers := client.Brokers()fmt.Printf("broker set(%d):\n", len(brokers))for _, broker := range brokers {fmt.Printf("%s\n", broker.Addr())} }

  

轉載于:https://www.cnblogs.com/596014054-yangdongsheng/p/10446828.html

總結

以上是生活随笔為你收集整理的Golang中使用kafka的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。