日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 人文社科 > 生活经验 >内容正文

生活经验

pykafka连接重要使用pykafka,kafka-python的api开发kafka生产者和消费者

發布時間:2023/11/28 生活经验 26 豆豆
生活随笔 收集整理的這篇文章主要介紹了 pykafka连接重要使用pykafka,kafka-python的api开发kafka生产者和消费者 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

https://pykafka.readthedocs.io/en/latest/api/producer.html
說明文檔

                                </div><h2 class="header">python通過Pykafka庫來連接kafka并收發消息<div class="ui teal label horizontal" data-tooltip="原創">原</div></h2><div class="extra ui horizontal list meta-wrap"><div class="item"><a href="https://my.oschina.net/u/2263272" class="__user">                        <div class="osc-avatar small-portrait _35x35 avatar" title="啊哈關關" data-user-id="2263272"><img src="https://static.oschina.net/uploads/user/1131/2263272_50.jpeg?t=1496295926000" alt="啊哈關關" title="啊哈關關"></div>&nbsp;<span>啊哈關關</span></a>發布于 2016/11/15 11:31</div><div class="item">字數 444</div><div class="item">閱讀 9146</div><div class="item collect-btn " data-id="788006" data-user-id="2263272" data-obj-type="3" data-max="99" data-tag-required="" data-current-user-id="" data-recommend-tags="Kafka,Python,pykafka">收藏 <span data-collect-count="" data-id="788006" data-obj-type="3">1</span></div><div class="item"><a class="normal like article-like " data-id="788006">點贊 <span data-article-like-count="">1</span></a></div><div class="item comment-count"><a href="#comments" class="normal"><i class="comment outline icon"></i> 評論 <span data-article-reply-count="">1</span></a></div></div><div class="tags"><a class="ui horizontal label" href="https://my.oschina.net/u/2263272?q=Kafka">Kafka</a><a class="ui horizontal label" href="https://my.oschina.net/u/2263272?q=Python">Python</a><a class="ui horizontal label" href="https://my.oschina.net/u/2263272?q=pykafka">pykafka</a>                            </div><div class="content" id="articleContent"><div class="ad-wrap"><p style="margin:0 0 10px 0;"><a data-traceid="blog_detail_above_text_link_1" data-tracepid="blog_detail_above_text_link" style="color:#A00;font-weight:bold;" href="https://my.oschina.net/u/2663968/blog/3120060" target="_blank">同樣是后端開發,年薪50萬和年薪20萬的差距在哪里&gt;&gt;&gt; </a>  <img src="https://www.oschina.net/img/hot3.png" align="absmiddle" style="max-height: 32px; max-width: 32px;"></p></div><p><strong>1.安裝pykafka</strong></p> 

pip install pykafka?

2.下載安裝

git clone?https://github.com/Parsely/pykafka.git

然后將下載下來的pykafka文件夾下的pykafka文件(pykafka的庫文件)放到/Library/Python/2.7/site-packages/路徑下即可

3.假設你有至少一個卡夫卡實例在本地運行,你可以使用pykafka連接它。

consumer.py 消費者

#!/usr/bin/python
# -*- coding:utf-8 -*-
from pykafka import KafkaClient

#kafka默認端口為9092
client = KafkaClient(hosts=‘192.168.1.140:9092,192.168.1.141:9092,192.168.1.142:9092’)#這里連接多個客戶端
topic = client.topics[‘test_kafka_topic’]

#從zookeeper消費,zookeeper的默認端口為2181
balanced_consumer = topic.get_balanced_consumer(
consumer_group=‘test_kafka_group’,
auto_commit_enable=True, # 設置為False的時候不需要添加consumer_group,直接連接topic即可取到消息
zookeeper_connect=‘192.168.1.140:2181,192.168.1.141:2181,192.168.1.142:2181’#這里就是連接多個zk
)

for message in balanced_consumer:
# print message
if message is not None:
print message.offset, message.value#打印接收到的消息體的偏移個數和值

producer.py 生產者

#!/usr/bin/python
# -*- coding:utf-8 -*-

from pykafka import KafkaClient

client = KafkaClient(hosts =“192.168.1.140:9092,192.168.1.141:9092,192.168.1.142:9092”) #可接受多個client
#查看所有的topic
client.topics
print client.topics

topic = client.topics[‘test_kafka_topic’]#選擇一個topic

message =“test message test message”
#當有了topic之后呢,可以創建一個producer,來發消息,生產kafka數據,通過字符串形式,
with topic.get_sync_producer() as producer:
producer.produce(message)
#The example above would produce to kafka synchronously -
#the call only returns after we have confirmation that the message made it to the cluster.
#以上的例子將產生kafka同步消息,這個調用僅僅在我們已經確認消息已經發送到集群之后

#但生產環境,為了達到高吞吐量,要采用異步的方式,通過delivery_reports =True來啟用隊列接口;
with topic.get_sync_producer() as producer:
producer.produce(‘test message’,partition_key=’{}’.)
producer=topic.get_producer()
producer.produce(message)
print message

?

                                    </div></div><div class="ui hidden divider"></div><p style="text-align:center;">? 著作權歸作者所有</p></div>
版權聲明:本文為博主原創文章,遵循 CC 4.0 BY-SA 版權協議,轉載請附上原文出處鏈接和本聲明。 本文鏈接:https://blog.csdn.net/learn_tech/article/details/81115996
         <!--一個博主專欄付費入口結束--><link rel="stylesheet" href="https://csdnimg.cn/release/phoenix/template/css/ck_htmledit_views-d284373521.css"><link rel="stylesheet" href="https://csdnimg.cn/release/phoenix/template/css/ck_htmledit_views-d284373521.css"><div class="htmledit_views" id="content_views"><p><strong>轉載地址:</strong><a href="https://blog.csdn.net/ricky110/article/details/79157043" rel="nofollow" data-token="e71796bcb09ebee8196ab936461e656a">https://blog.csdn.net/ricky110/article/details/79157043</a></p>

總結

以上是生活随笔為你收集整理的pykafka连接重要使用pykafka,kafka-python的api开发kafka生产者和消费者的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。