pykafka
The most popular Python client libraries for connecting to Kafka are:
1. kafka-python
2. pykafka

kafka-python has the larger user base and is the more mature library.
pykafka is the successor to Samsa; Samsa connected to ZooKeeper first and then used the Kafka cluster.
The main difference: pykafka has built-in ZooKeeper support, while kafka-python has none.
Using kafka-python
Documentation
https://kafka-python.readthedocs.io/en/master/apidoc/modules.html
https://kafka-python.readthedocs.io/en/master/index.html
https://pypi.org/project/kafka-python/
Producer
import json
import time
from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(bootstrap_servers=['192.168.17.64:9092', '192.168.17.65:9092', '192.168.17.68:9092'])

# Assign a topic
topic = 'test'

def test():
    print('begin')
    n = 1
    try:
        while n <= 100:
            producer.send(topic, str(n).encode())
            print('send' + str(n))
            n += 1
            time.sleep(0.5)
    except KafkaError as e:
        print(e)
    finally:
        producer.close()
        print('done')

def test_json():
    msg_dict = {
        "sleep_time": 10,
        "db_config": {
            "database": "test_1",
            "host": "xxxx",
            "user": "root",
            "password": "root"
        },
        "table": "msg",
        "msg": "Hello World"
    }
    msg = json.dumps(msg_dict).encode('utf-8')  # send() expects bytes
    producer.send(topic, msg, partition=0)
    producer.close()

if __name__ == '__main__':
    test()
Possible problem: IOError: [Errno 24] Too many open files, caused by creating KafkaProducer over and over
The code originally created a SimpleProducer inside every controller function. After switching to KafkaProducer, it still created a new KafkaProducer in each controller, like this:

try:
    producer = KafkaProducer(bootstrap_servers=['{kafka_host}:{kafka_port}'.format(
        kafka_host=app.config['KAFKA_HOST'],
        kafka_port=app.config['KAFKA_PORT'])])
    message_string = json.dumps(message)
    response = producer.send(kafka_topic, message_string.encode('utf-8'))
    producer.close()
except KafkaError as e:
    print(e)
The cause: every KafkaProducer you create holds a file descriptor, and it was never released when the controller returned, so eventually no new KafkaProducer could be created.
The fix is to create one global KafkaProducer and share it across all controllers.
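A minimal sketch of the fix, assuming a Flask app (the route, topic, and broker address are made up for illustration): the producer is built once at module level and every handler reuses it.

```python
import json
from flask import Flask
from kafka import KafkaProducer

app = Flask(__name__)
# One module-level producer, created once at import time and shared by
# every request handler, so each request no longer leaks a file descriptor.
producer = KafkaProducer(bootstrap_servers=['192.168.17.64:9092'])  # assumed broker

@app.route('/emit')
def emit():
    message = {'msg': 'Hello World'}
    producer.send('test', json.dumps(message).encode('utf-8'))
    return 'ok'
```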
Caveat: use the blocking get() on send futures with care
The official example contains the following code:

import logging
from kafka import KafkaProducer
from kafka.errors import KafkaError

log = logging.getLogger(__name__)

producer = KafkaProducer(bootstrap_servers=['broker1:1234'])

# Asynchronous by default
future = producer.send('my-topic', b'raw_bytes')

# Block for 'synchronous' sends
try:
    record_metadata = future.get(timeout=10)
except KafkaError:
    # Decide what to do if produce request failed...
    log.exception('produce failed')
KafkaProducer.send returns a future; calling get() on it blocks until the broker responds and then yields the record's RecordMetadata. When sending messages in volume, those get() calls add noticeable latency, so skip get() whenever you don't care whether a given message was delivered. If you still want delivery results without blocking, the future also takes callbacks, as sketched below.
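A minimal non-blocking alternative, assuming a placeholder broker and topic: attach callbacks to the future instead of waiting on it, and flush once at the end.

```python
import logging
from kafka import KafkaProducer

log = logging.getLogger(__name__)
producer = KafkaProducer(bootstrap_servers=['192.168.17.64:9092'])  # assumed broker

def on_send_success(record_metadata):
    # runs on the producer's background thread once the broker acknowledges
    log.info('%s:%d:%d', record_metadata.topic, record_metadata.partition, record_metadata.offset)

def on_send_error(excp):
    log.error('produce failed', exc_info=excp)

for n in range(100):
    producer.send('test', str(n).encode()).add_callback(on_send_success).add_errback(on_send_error)

producer.flush()  # one blocking flush at the end instead of a get() per message
```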
Consumer
#!/bin/env python
from kafka import KafkaConsumer

# connect to the Kafka cluster and pass the topic we want to consume
consumer = KafkaConsumer('test', group_id='test_group', bootstrap_servers=['192.168.17.64:9092', '192.168.17.65:9092', '192.168.17.68:9092'])
try:
    for msg in consumer:
        print(msg)
        print("%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value))
except KeyboardInterrupt as e:
    print(e)
Disabling auto-commit
Set enable_auto_commit to False; auto_offset_reset defaults to reading from the latest offset.

consumer = kafka.KafkaConsumer(
    bootstrap_servers=['192.168.17.64:9092', '192.168.17.65:9092', '192.168.17.68:9092'],
    group_id='test_group_id',
    auto_offset_reset='latest',
    enable_auto_commit=False)
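With auto-commit off you are responsible for committing offsets yourself. A minimal sketch, with the broker list shortened and the processing step reduced to a print:

```python
import kafka

consumer = kafka.KafkaConsumer(bootstrap_servers=['192.168.17.64:9092'],
                               group_id='test_group_id',
                               auto_offset_reset='latest',
                               enable_auto_commit=False)
consumer.subscribe(['test'])
for msg in consumer:
    print(msg.value)   # process the record first
    consumer.commit()  # then commit the consumed offsets synchronously
```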
Sending data in batches
from kafka import KafkaClient
from kafka.producer import SimpleProducer

# KAFKABROKER, PARTNUM and TOPICNAME are module-level config constants.
# Note: SimpleProducer is a legacy API (removed in kafka-python 2.0), and
# `async` is a reserved word on Python 3.7+.
def send_data_2_kafka(datas):
    '''
    Push a batch of records to the Kafka parsing queue.
    '''
    client = KafkaClient(hosts=KAFKABROKER.split(","), timeout=30)
    producer = SimpleProducer(client, async=False)
    curcount = len(datas) // PARTNUM  # integer division: one slice per partition
    for i in range(0, PARTNUM):
        start = i * curcount
        if i != PARTNUM - 1:
            end = (i + 1) * curcount
            curdata = datas[start:end]
            producer.send_messages(TOPICNAME, *curdata)
        else:
            curdata = datas[start:]
            producer.send_messages(TOPICNAME, *curdata)
    producer.stop()
    client.close()
Here PARTNUM is the number of partitions in the topic, which spreads the batch evenly across Kafka's partitions; a sketch of the same idea with the modern API follows.
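Since SimpleProducer is gone from current kafka-python releases, here is a hedged equivalent with KafkaProducer, which batches internally (linger_ms, batch_size) and distributes keyless records across partitions by itself; broker address and topic are assumptions:

```python
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['192.168.17.64:9092'],  # assumed broker
                         linger_ms=5,        # wait up to 5 ms to fill a batch
                         batch_size=32768)   # max bytes per partition batch

datas = [('record-%d' % i).encode() for i in range(1000)]
for d in datas:
    producer.send('test', d)  # no key: the producer spreads records over partitions
producer.flush()
producer.close()
```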
Consumer subscribing to multiple topics
# ======= subscribe to multiple topics ==========
from kafka import KafkaConsumer
from kafka.structs import TopicPartition

consumer = KafkaConsumer(bootstrap_servers=['127.0.0.1:9092'])
consumer.subscribe(topics=('test', 'test0'))  # subscribe to the topics to consume
print(consumer.topics())
print(consumer.position(TopicPartition(topic='test', partition=0)))  # the consumer's current position on this partition
for message in consumer:
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, message.offset, message.key, message.value))
Consumer polling on a schedule
Sometimes we don't need the data in real time, since that can create a performance bottleneck; it is enough to fetch whatever is queued at fixed intervals and process it in bulk. In that case we can pull data actively:
from kafka import KafkaConsumer
import time

consumer = KafkaConsumer(group_id='123456', bootstrap_servers=['10.43.35.25:4531'])
consumer.subscribe(topics=('test_rhj',))
index = 0
while True:
    msg = consumer.poll(timeout_ms=5)  # fetch messages from kafka
    print(msg)
    time.sleep(2)
    index += 1
    print('--------poll index is %s----------' % index)
Each poll picks up everything produced since the previous call: the result may hold several records per partition, or no data at all, in which case it is empty.
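A sketch of unpacking the poll() result, under the same assumed broker and topic as above; it comes back as a dict mapping each TopicPartition to the list of records fetched for it:

```python
from kafka import KafkaConsumer

consumer = KafkaConsumer('test_rhj', group_id='123456',
                         bootstrap_servers=['10.43.35.25:4531'])  # address from the example above
records = consumer.poll(timeout_ms=500, max_records=100)
# {} when nothing arrived; otherwise {TopicPartition: [ConsumerRecord, ...]}
for tp, batch in records.items():
    for record in batch:
        print('%s:%d:%d value=%s' % (tp.topic, tp.partition, record.offset, record.value))
```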
Consumer reading from the earliest offset
from kafka import KafkaConsumer

consumer = KafkaConsumer('test', auto_offset_reset='earliest', bootstrap_servers=['127.0.0.1:9092'])
for message in consumer:
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, message.offset, message.key, message.value))
auto_offset_reset: where to reset the offset; earliest moves to the oldest available message, latest to the newest. The default is latest.
Defined in the source as: {'smallest': 'earliest', 'largest': 'latest'}
Consumer setting the offset manually
# ========== read messages from a given position ===============
from kafka import KafkaConsumer
from kafka.structs import TopicPartition

consumer = KafkaConsumer('test', bootstrap_servers=['127.0.0.1:9092'])
print(consumer.partitions_for_topic("test"))  # partition info for the test topic
print(consumer.topics())  # list of topics
print(consumer.subscription())  # topics this consumer subscribes to
print(consumer.assignment())  # topic/partitions currently assigned to this consumer
print(consumer.beginning_offsets(consumer.assignment()))  # earliest offsets the consumer can read
consumer.seek(TopicPartition(topic='test', partition=0), 5)  # reset the position: consume from offset 5
for message in consumer:
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, message.offset, message.key, message.value))
Pausing and resuming a consumer
# ============== pausing and resuming consumption ===========
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
import time

consumer = KafkaConsumer(bootstrap_servers=['127.0.0.1:9092'])
consumer.subscribe(topics=('test',))
consumer.topics()
consumer.pause(TopicPartition(topic='test', partition=0))  # after pause(), the consumer reads nothing until resume() is called
num = 0
while True:
    print(num)
    print(consumer.paused())  # the partitions currently paused
    msg = consumer.poll(timeout_ms=5)
    print(msg)
    time.sleep(2)
    num = num + 1
    if num == 10:
        print("resume...")
        consumer.resume(TopicPartition(topic='test', partition=0))
        print("resume...")
Using pykafka
Documentation
http://pykafka.readthedocs.io/en/latest/
https://github.com/Parsely/pykafka
Points to note
In a Kafka plus ZooKeeper cluster, Samsa had both producers and consumers connect through ZooKeeper; in the pykafka docs the producer connects directly to the Kafka broker list, and only the consumer goes through ZooKeeper. A minimal sketch of this topology follows.
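The sketch below assumes placeholder broker and ZooKeeper addresses: the producer needs only the broker list, while the balanced consumer can coordinate through ZooKeeper.

```python
from pykafka import KafkaClient

client = KafkaClient(hosts="192.168.17.64:9092,192.168.17.65:9092")  # brokers only
topic = client.topics[b"test"]

producer = topic.get_producer()  # the producer talks straight to the brokers

consumer = topic.get_balanced_consumer(  # only the consumer needs ZooKeeper
    b"test_group",
    zookeeper_connect="192.168.17.64:2181")
```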
Producer
#coding=utf-8
import time
from pykafka import KafkaClient

class KafkaTest(object):
    """
    Exercise the common pykafka producer APIs.
    """
    def __init__(self, host="192.168.0.10:9092"):
        self.host = host
        self.client = KafkaClient(hosts=self.host)

    def producer_partition(self, topic):
        """
        Inspect the producer's partitions, mainly to watch how offsets
        change while messages are produced.
        :return:
        """
        topic = self.client.topics[topic.encode()]
        partitions = topic.partitions
        print(u"all partitions {}".format(partitions))
        earliest_offset = topic.earliest_available_offsets()
        print(u"earliest available offsets {}".format(earliest_offset))
        # check the offsets before producing
        last_offset = topic.latest_available_offsets()
        print(u"latest available offsets {}".format(last_offset))
        # produce a message synchronously
        p = topic.get_producer(sync=True)
        p.produce(str(time.time()).encode())
        # watch how the offsets changed
        last_offset = topic.latest_available_offsets()
        print(u"latest available offsets {}".format(last_offset))

    def producer_designated_partition(self, topic):
        """
        Write messages to a designated partition: pass a partitioner function
        when fetching the producer, and supply an extra key when producing.
        :return:
        """
        def assign_patition(pid, key):
            """
            Pin messages to one partition; here we write to the first one (id=0).
            :param pid: the list of partitions
            :param key:
            :return:
            """
            print("assigning the message a partition {} {}".format(pid, key))
            return pid[0]

        topic = self.client.topics[topic.encode()]
        p = topic.get_producer(sync=True, partitioner=assign_patition)
        p.produce(str(time.time()).encode(), partition_key=b"partition_key_0")

    def async_produce_message(self, topic):
        """
        Produce asynchronously: messages are pushed onto a queue, and another
        thread sends them in bulk once the queued size reaches a threshold
        (min_queued_messages) or a time limit passes (linger_ms, default 5s).
        :return:
        """
        topic = self.client.topics[topic.encode()]
        last_offset = topic.latest_available_offsets()
        print("latest offsets {}".format(last_offset))
        # remember the starting offset
        old_offset = last_offset[0].offset[0]
        p = topic.get_producer(sync=False, partitioner=lambda pid, key: pid[0])
        p.produce(str(time.time()).encode())
        s_time = time.time()
        while True:
            last_offset = topic.latest_available_offsets()
            print("latest available offsets {}".format(last_offset))
            if last_offset[0].offset[0] != old_offset:
                e_time = time.time()
                print('cost time {}'.format(e_time - s_time))
                break
            time.sleep(1)

    def get_produce_message_report(self, topic):
        """
        Check the delivery report of an async send; by default the report
        only becomes available after about 5s.
        """
        topic = self.client.topics[topic.encode()]
        last_offset = topic.latest_available_offsets()
        print("latest offsets {}".format(last_offset))
        p = topic.get_producer(sync=False, delivery_reports=True, partitioner=lambda pid, key: pid[0])
        p.produce(str(time.time()).encode())
        s_time = time.time()
        delivery_report = p.get_delivery_report()
        e_time = time.time()
        print('waited {}s, delivery report {}'.format(e_time - s_time, delivery_report))
        last_offset = topic.latest_available_offsets()
        print("latest offsets {}".format(last_offset))

if __name__ == '__main__':
    host = '192.168.0.10:9092,192.168.0.12:9092,192.168.0.13:9092'
    kafka_ins = KafkaTest(host)
    topic = 'test'
    # kafka_ins.producer_partition(topic)
    # kafka_ins.producer_designated_partition(topic)
    # kafka_ins.async_produce_message(topic)
    kafka_ins.get_produce_message_report(topic)
Possible problem: delivery_report (and sync mode) blocking child processes
When multiple processes share one pykafka client, only one process can actually write data; if delivery_report (or sync mode) is used, the child processes block completely and become unusable. A sketch of one workaround follows.
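One way around it is not to share the client at all: each child builds its own KafkaClient after the fork. A minimal sketch under assumed addresses and topic:

```python
from multiprocessing import Process
from pykafka import KafkaClient

def worker(host, topic_name):
    # build the client inside the child process, after the fork,
    # so no connection state is shared between processes
    client = KafkaClient(hosts=host)
    topic = client.topics[topic_name]
    with topic.get_sync_producer() as producer:
        producer.produce(b"from child process")

if __name__ == '__main__':
    host = '192.168.17.64:9092'  # assumed broker address
    procs = [Process(target=worker, args=(host, b'test')) for _ in range(2)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
```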
Possible problem: Producer.produce accepts a bytes object as message
Sending data with producer.produce fails like this:
#!/bin/env python
from pykafka import KafkaClient
host = '192.168.17.64:9092,192.168.17.65:9092,192.168.17.68:9092'
client = KafkaClient(hosts = host)
topic = client.topics["test"]
with topic.get_sync_producer() as producer:
    for i in range(100):
        producer.produce('test message ' + str(i ** 2))
The error:
Traceback (most recent call last):
  File "TaxiKafkaProduce.py", line 15, in <module>
    producer.produce(('test message ' + str(i ** 2)))
  File "/root/anaconda3/lib/python3.6/site-packages/pykafka/producer.py", line 325, in produce
    "got '%s'", type(message))
TypeError: ("Producer.produce accepts a bytes object as message, but it got '%s'", <class 'str'>)
Kafka transports bytes, not strings, so call encode() on the strings in both places, client.topics and producer.produce(), as follows:
#!/bin/env python
from pykafka import KafkaClient

host = '192.168.17.64:9092,192.168.17.65:9092,192.168.17.68:9092'
client = KafkaClient(hosts=host)
topic = client.topics["test".encode()]
# this produces synchronously: the call returns only once the message is confirmed delivered to the cluster
with topic.get_sync_producer() as producer:
    for i in range(100):
        producer.produce(('test message ' + str(i ** 2)).encode())
Synchronous and asynchronous
from pykafka import KafkaClient
import queue

# accepts multiple broker hosts
client = KafkaClient(hosts="192.168.17.64:9092,192.168.17.65:9092,192.168.17.68:9092")
# list all topics
print(client.topics)
topic = client.topics[b'test_kafka_topic']  # pick a topic
message = b"test message test message"

Once you have a topic, you can create a producer on it to send messages and produce Kafka data (as bytes):

with topic.get_sync_producer() as producer:
    producer.produce(message)

The example above produces synchronously: the call returns only once we have confirmation that the message reached the cluster. In production, to achieve high throughput, use the asynchronous mode instead and enable the queue interface with delivery_reports=True:

producer = topic.get_producer(sync=False, delivery_reports=True)
producer.produce(message)
try:
    msg, exc = producer.get_delivery_report(block=False)
    if exc is not None:
        print('Failed to deliver msg {}: {}'.format(msg.partition_key, repr(exc)))
    else:
        print('Successfully delivered msg {}'.format(msg.partition_key))
except queue.Empty:
    pass
Consumer
pykafka offers two kinds of consumers: simple and balanced.
The simple consumer fits consuming chosen partitions without automatic rebalancing (you manage the assignment yourself).
Pick the balanced consumer when you want partitions assigned automatically.
#coding=utf-8
from pykafka import KafkaClient

class KafkaTest(object):
    def __init__(self, host="192.168.237.129:9092"):
        self.host = host
        self.client = KafkaClient(hosts=self.host)

    def simple_consumer(self, topic, offset=0):
        """
        Consume from an explicitly designated partition.
        :param offset:
        :return:
        """
        topic = self.client.topics[topic.encode()]
        partitions = topic.partitions
        last_offset = topic.latest_available_offsets()
        print("latest available offsets {}".format(last_offset))  # all partitions
        consumer = topic.get_simple_consumer(b"simple_consumer_group", partitions=[partitions[0]])  # consume a single chosen partition
        offset_list = consumer.held_offsets
        print("consumer partition offsets {}".format(offset_list))  # offsets held by this consumer
        consumer.reset_offsets([(partitions[0], offset)])  # set the offset
        msg = consumer.consume()
        print("consumed: {}".format(msg.value.decode()))
        msg = consumer.consume()
        print("consumed: {}".format(msg.value.decode()))
        msg = consumer.consume()
        print("consumed: {}".format(msg.value.decode()))
        offset = consumer.held_offsets
        print("consumer partition offsets {}".format(offset))  # 3

    def balance_consumer(self, topic, offset=0):
        """
        Consume kafka with a balanced consumer.
        :return:
        """
        topic = self.client.topics["kafka_test".encode()]
        # with managed=True the new-style rebalancing is used and no zk is needed;
        # with False, rebalancing is done through zk, which must then be available
        consumer = topic.get_balanced_consumer(b"consumer_group_balanced2", managed=True)
        partitions = topic.partitions
        print("partitions {}".format(partitions))
        earliest_offsets = topic.earliest_available_offsets()
        print("earliest available offsets {}".format(earliest_offsets))
        last_offsets = topic.latest_available_offsets()
        print("latest available offsets {}".format(last_offsets))
        offset = consumer.held_offsets
        print("consumer partition offsets {}".format(offset))
        while True:
            msg = consumer.consume()
            offset = consumer.held_offsets
            print("{}, consumer partition offsets {}".format(msg.value.decode(), offset))

if __name__ == '__main__':
    host = '192.168.17.64:9092,192.168.17.65:9092,192.168.17.68:9092'
    kafka_ins = KafkaTest(host)
    topic = 'test'
    # kafka_ins.simple_consumer(topic)
    kafka_ins.balance_consumer(topic)
Connecting through ZooKeeper

balanced_consumer = topic.get_balanced_consumer(
    consumer_group='testgroup',
    auto_commit_enable=True,  # when set to False there is no need to pass consumer_group
    zookeeper_connect='myZkClusterNode1.com:2181,myZkClusterNode2.com:2181/myZkChroot'  # connecting to multiple zk nodes
)
Using consumer_group and consumer_id
# -*- coding: utf8 -*-
from pykafka import KafkaClient

host = '192.168.17.64:9092,192.168.17.65:9092,192.168.17.68:9092'
client = KafkaClient(hosts=host)
print(client.topics)

# consumer
topic = client.topics['test'.encode()]
consumer = topic.get_simple_consumer(consumer_group='test_group',
                                     # when auto_commit_enable is False there is no need for a consumer_group; you can read from the topic directly
                                     auto_commit_enable=True,
                                     auto_commit_interval_ms=1,
                                     # connecting to multiple zk nodes
                                     zookeeper_connect='192.168.17.64:2181,192.168.17.65:2181,192.168.17.68:2181',
                                     consumer_id='test_id')
for message in consumer:
    if message is not None:
        # print the offset and value of each received message
        print(message.offset, message.value)
Possible problem: AttributeError: 'SimpleConsumer' object has no attribute '_consumer_group'
Kafka needs bytes, not str, on the wire, so prefixing the str literals with b fixes it, as follows:
# -*- coding: utf8 -*-
from pykafka import KafkaClient

host = '192.168.17.64:9092,192.168.17.65:9092,192.168.17.68:9092'
client = KafkaClient(hosts=host)
print(client.topics)

# consumer
topic = client.topics['test'.encode()]
consumer = topic.get_simple_consumer(consumer_group=b'test_group', auto_commit_enable=True, auto_commit_interval_ms=1, consumer_id=b'test_id')
for message in consumer:
    if message is not None:
        print(message.offset, message.value.decode('utf-8'))
Avoiding duplicate consumption: discard messages that have already been consumed
When you don't want to re-consume historical data, use the auto_commit_enable parameter:
consumer = topic.get_simple_consumer(consumer_group=b'test_group', auto_commit_enable=True, auto_commit_interval_ms=1, consumer_id=b'test_id')
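If committing every millisecond feels too aggressive, an alternative sketch is to turn auto-commit off and call commit_offsets() yourself once a message has actually been handled (broker address assumed):

```python
from pykafka import KafkaClient

client = KafkaClient(hosts='192.168.17.64:9092')  # assumed broker
topic = client.topics[b'test']
consumer = topic.get_simple_consumer(consumer_group=b'test_group',
                                     auto_commit_enable=False)
for message in consumer:
    if message is not None:
        print(message.offset, message.value)
        consumer.commit_offsets()  # commit explicitly after processing
```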