
Message Middleware --- Kafka Quick Start

Published: 2024/7/23
This article, compiled by 生活随笔, introduces Message Middleware --- Kafka Quick Start and is shared here for reference.

Message Middleware --- Kafka Quick Start

Message middleware series: https://blog.51cto.com/u_9291927/category33

GitHub: scorpiostudio/HelloKafka

  • Kafka Quick Start (1) --- Kafka Introduction: https://blog.51cto.com/9291927/2493953
  • Kafka Quick Start (2) --- Kafka Architecture: https://blog.51cto.com/9291927/2497814
  • Kafka Quick Start (3) --- Kafka Core Technology: https://blog.51cto.com/9291927/2497820
  • Kafka Quick Start (4) --- Kafka Advanced Features: https://blog.51cto.com/9291927/2497828
  • Kafka Quick Start (5) --- Kafka Administration: https://blog.51cto.com/9291927/2497842
  • Kafka Quick Start (6) --- Kafka Cluster Deployment: https://blog.51cto.com/9291927/2498428
  • Kafka Quick Start (7) --- Kafka Monitoring: https://blog.51cto.com/9291927/2498434
  • Kafka Quick Start (8) --- Confluent Kafka Introduction: https://blog.51cto.com/9291927/2499090
  • Kafka Quick Start (9) --- C Client: https://blog.51cto.com/9291927/2502001
  • Kafka Quick Start (10) --- C++ Client: https://blog.51cto.com/9291927/2502063
  • Kafka Quick Start (11) --- RdKafka Source Code Analysis: https://blog.51cto.com/9291927/2504489
  • Kafka Quick Start (12) --- Python Client: https://blog.51cto.com/9291927/2504495

Python 3 Study (54): Using the confluent-kafka Module

From:https://blog.csdn.net/liao392781/article/details/90487438

confluent-kafka is a Python module that provides a lightweight wrapper around librdkafka, a high-performance Kafka library written in C/C++, so its performance speaks for itself; in practice it outperforms kafka-python. confluent-kafka-python is Confluent's Python client for Apache Kafka and the Confluent Platform.

Features:

  • High performance: confluent-kafka-python is a lightweight wrapper around librdkafka (https://github.com/edenhill/librdkafka), a finely tuned C client.
  • Reliability: there are many subtle details to get right when writing an Apache Kafka client. Confluent gets them right in one place (librdkafka) and leverages that work across all of its clients, including confluent-kafka-go (https://github.com/confluentinc/confluent-kafka-go) and confluent-kafka-dotnet (https://github.com/confluentinc/confluent-kafka-dotnet).
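The core idiom behind this client is asynchronous delivery: produce() only enqueues a message, and the delivery-report callback fires later, from poll() or flush(). The sketch below illustrates that contract with a hypothetical in-memory MockProducer built only from the standard library; it is not the real confluent-kafka API, just a model of its callback timing.

```python
# Pure-Python illustration of the asynchronous delivery-report pattern:
# produce() only enqueues, the callback fires later from poll()/flush().
# MockProducer is a hypothetical stand-in, NOT the confluent-kafka API.
from collections import deque


class MockProducer:
    def __init__(self):
        self._pending = deque()  # messages whose delivery is not yet reported

    def produce(self, topic, value, callback=None):
        # Asynchronous: just enqueue; the callback is NOT invoked here.
        self._pending.append((topic, value, callback))

    def poll(self, timeout=0):
        # Serve any completed delivery reports (here: everything pending).
        served = 0
        while self._pending:
            topic, value, cb = self._pending.popleft()
            if cb is not None:
                cb(None, (topic, value))  # err=None signals success
            served += 1
        return served

    def flush(self):
        # Block until all outstanding delivery reports have been served.
        return self.poll()


delivered = []
p = MockProducer()
p.produce('my_topic', b'hello', callback=lambda err, msg: delivered.append(msg))
assert delivered == []  # nothing reported yet: produce() is asynchronous
p.poll(0)
assert delivered == [('my_topic', b'hello')]
```

This is why the real producer code below calls p.poll(0) inside its loop and p.flush() before exiting: without those calls, delivery reports would never be served.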

Sample code:

# -*- coding: utf-8 -*-
# @Author :
# @Date :
# @File : kafka_operate.py
# @description : XXX
import time
import datetime

from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer, AvroConsumer
from confluent_kafka.avro.serializer import SerializerError
from confluent_kafka import Producer, Consumer, KafkaError


def delivery_report(err, msg):
    """Called once for each message produced to indicate delivery result.
    Triggered by poll() or flush()."""
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))


def kafka_producer():
    p = Producer({'bootstrap.servers': 'mybroker1,mybroker2'})
    while True:
        try:
            current_date = str(datetime.datetime.now().replace(microsecond=0))
            data = current_date
            # Trigger any available delivery report callbacks from previous produce() calls
            p.poll(0)
            # Asynchronously produce a message; the delivery report callback
            # will be triggered from poll() above, or flush() below, when the
            # message has been successfully delivered or failed permanently.
            p.produce('my_topic', data.encode('utf-8'), callback=delivery_report)
            time.sleep(1)
        except BaseException as be:
            print(be)
            break
    # Wait for any outstanding messages to be delivered and delivery report
    # callbacks to be triggered.
    p.flush()


def kafka_consumer():
    c = Consumer({
        'bootstrap.servers': 'mybroker',
        'group.id': 'mygroup',
        'auto.offset.reset': 'earliest'
    })
    c.subscribe(['my_topic'])
    while True:
        msg = c.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            print("Consumer error: {}".format(msg.error()))
            continue
        print('Received message: {}'.format(msg.value().decode('utf-8')))
    c.close()


def kafka_avro_producer():
    value_schema_str = """
    {
        "namespace": "my.test",
        "name": "value",
        "type": "record",
        "fields": [
            {"name": "name", "type": "string"}
        ]
    }
    """
    key_schema_str = """
    {
        "namespace": "my.test",
        "name": "key",
        "type": "record",
        "fields": [
            {"name": "name", "type": "string"}
        ]
    }
    """
    value_schema = avro.loads(value_schema_str)
    key_schema = avro.loads(key_schema_str)
    value = {"name": "Value"}
    key = {"name": "Key"}
    avro_producer = AvroProducer({
        'bootstrap.servers': 'mybroker,mybroker2',
        'schema.registry.url': 'http://schem_registry_host:port'
    }, default_key_schema=key_schema, default_value_schema=value_schema)
    avro_producer.produce(topic='my_topic', value=value, key=key)
    avro_producer.flush()


def kafka_avro_consumer():
    c = AvroConsumer({
        'bootstrap.servers': 'mybroker,mybroker2',
        'group.id': 'groupid',
        'schema.registry.url': 'http://127.0.0.1:8081'
    })
    c.subscribe(['my_topic'])
    while True:
        try:
            msg = c.poll(10)
        except SerializerError as e:
            print("Message deserialization failed for {}: {}".format(msg, e))
            break
        if msg is None:
            continue
        if msg.error():
            print("AvroConsumer error: {}".format(msg.error()))
            continue
        print(msg.value())
    c.close()


if __name__ == '__main__':
    pass
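The Avro schemas passed to AvroProducer in the example above are ordinary JSON documents, so their shape can be sanity-checked with the standard library's json module before they are handed to avro.loads(). A minimal sketch, reusing the value schema from the example:

```python
import json

# The value schema string from the example above. Avro record schemas are
# plain JSON, so json.loads() can verify their structure up front.
value_schema_str = """
{
    "namespace": "my.test",
    "name": "value",
    "type": "record",
    "fields": [
        {"name": "name", "type": "string"}
    ]
}
"""

schema = json.loads(value_schema_str)
assert schema["type"] == "record"
field_names = [f["name"] for f in schema["fields"]]
assert field_names == ["name"]
# A message like {"name": "Value"} supplies exactly these record fields,
# which is why the example produces value={"name": "Value"}.
```

Checking the JSON this way catches quoting and comma mistakes early, with a clearer error than a serialization failure at produce time.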

