pykafka连接重要使用pykafka,kafka-python的api开发kafka生产者和消费者
https://pykafka.readthedocs.io/en/latest/api/producer.html
說明文檔
</div><h2 class="header">python通過Pykafka庫來連接kafka并收發消息<div class="ui teal label horizontal" data-tooltip="原創">原</div></h2><div class="extra ui horizontal list meta-wrap"><div class="item"><a href="https://my.oschina.net/u/2263272" class="__user"> <div class="osc-avatar small-portrait _35x35 avatar" title="啊哈關關" data-user-id="2263272"><img src="https://static.oschina.net/uploads/user/1131/2263272_50.jpeg?t=1496295926000" alt="啊哈關關" title="啊哈關關"></div> <span>啊哈關關</span></a>發布于 2016/11/15 11:31</div><div class="item">字數 444</div><div class="item">閱讀 9146</div><div class="item collect-btn " data-id="788006" data-user-id="2263272" data-obj-type="3" data-max="99" data-tag-required="" data-current-user-id="" data-recommend-tags="Kafka,Python,pykafka">收藏 <span data-collect-count="" data-id="788006" data-obj-type="3">1</span></div><div class="item"><a class="normal like article-like " data-id="788006">點贊 <span data-article-like-count="">1</span></a></div><div class="item comment-count"><a href="#comments" class="normal"><i class="comment outline icon"></i> 評論 <span data-article-reply-count="">1</span></a></div></div><div class="tags"><a class="ui horizontal label" href="https://my.oschina.net/u/2263272?q=Kafka">Kafka</a><a class="ui horizontal label" href="https://my.oschina.net/u/2263272?q=Python">Python</a><a class="ui horizontal label" href="https://my.oschina.net/u/2263272?q=pykafka">pykafka</a> </div><div class="content" id="articleContent"><div class="ad-wrap"><p style="margin:0 0 10px 0;"><a data-traceid="blog_detail_above_text_link_1" data-tracepid="blog_detail_above_text_link" style="color:#A00;font-weight:bold;" href="https://my.oschina.net/u/2663968/blog/3120060" target="_blank">同樣是后端開發,年薪50萬和年薪20萬的差距在哪里>>> </a> <img src="https://www.oschina.net/img/hot3.png" align="absmiddle" style="max-height: 32px; max-width: 32px;"></p></div><p><strong>1.安裝pykafka</strong></p>
pip install pykafka?
2.下載安裝
git clone?https://github.com/Parsely/pykafka.git
然后將下載下來的pykafka文件夾下的pykafka文件(pykafka的庫文件)放到/Library/Python/2.7/site-packages/路徑下即可
3.假設你有至少一個卡夫卡實例在本地運行,你可以使用pykafka連接它。
consumer.py 消費者
#!/usr/bin/python # -*- coding:utf-8 -*- from pykafka import KafkaClient
#kafka默認端口為9092
client = KafkaClient(hosts=‘192.168.1.140:9092,192.168.1.141:9092,192.168.1.142:9092’)#這里連接多個客戶端
topic = client.topics[‘test_kafka_topic’]
#從zookeeper消費,zookeeper的默認端口為2181
balanced_consumer = topic.get_balanced_consumer(
consumer_group=‘test_kafka_group’,
auto_commit_enable=True, # 設置為False的時候不需要添加consumer_group,直接連接topic即可取到消息
zookeeper_connect=‘192.168.1.140:2181,192.168.1.141:2181,192.168.1.142:2181’#這里就是連接多個zk
)
for message in balanced_consumer:
# print message
if message is not None:
print message.offset, message.value#打印接收到的消息體的偏移個數和值
producer.py 生產者
#!/usr/bin/python # -*- coding:utf-8 -*-
from pykafka import KafkaClient
client = KafkaClient(hosts =“192.168.1.140:9092,192.168.1.141:9092,192.168.1.142:9092”) #可接受多個client
#查看所有的topic
client.topics
print client.topics
topic = client.topics[‘test_kafka_topic’]#選擇一個topic
message =“test message test message”
#當有了topic之后呢,可以創建一個producer,來發消息,生產kafka數據,通過字符串形式,
with topic.get_sync_producer() as producer:
producer.produce(message)
#The example above would produce to kafka synchronously -
#the call only returns after we have confirmation that the message made it to the cluster.
#以上的例子將產生kafka同步消息,這個調用僅僅在我們已經確認消息已經發送到集群之后
#但生產環境,為了達到高吞吐量,要采用異步的方式,通過delivery_reports =True來啟用隊列接口;
with topic.get_sync_producer() as producer:
producer.produce(‘test message’,partition_key=’{}’.)
producer=topic.get_producer()
producer.produce(message)
print message
?
</div></div><div class="ui hidden divider"></div><p style="text-align:center;">? 著作權歸作者所有</p></div>
版權聲明:本文為博主原創文章,遵循 CC 4.0 BY-SA 版權協議,轉載請附上原文出處鏈接和本聲明。 本文鏈接:https://blog.csdn.net/learn_tech/article/details/81115996 <!--一個博主專欄付費入口結束--><link rel="stylesheet" href="https://csdnimg.cn/release/phoenix/template/css/ck_htmledit_views-d284373521.css"><link rel="stylesheet" href="https://csdnimg.cn/release/phoenix/template/css/ck_htmledit_views-d284373521.css"><div class="htmledit_views" id="content_views"><p><strong>轉載地址:</strong><a href="https://blog.csdn.net/ricky110/article/details/79157043" rel="nofollow" data-token="e71796bcb09ebee8196ab936461e656a">https://blog.csdn.net/ricky110/article/details/79157043</a></p>
總結
以上是生活随笔為你收集整理的pykafka连接重要使用pykafka,kafka-python的api开发kafka生产者和消费者的全部內容,希望文章能夠幫你解決所遇到的問題。