日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > python >内容正文

python

Python操作Kafka爬坑

發(fā)布時間:2023/12/10 python 26 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Python操作Kafka爬坑 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

組內(nèi)做大數(shù)據(jù),需要kafka寫入數(shù)據(jù),最近在看python正好,練練手,網(wǎng)上找了一圈,都是用的pykafka,經(jīng)過一整圈的安裝,最終搞定,代碼如下
#coding:u8
import sys
import time
import random
import datetime
import MySQLdb
import codecs
from pykafka import KafkaClient
import logging
import json
import threading
'''
******************
'''
ad=[]
try:
? ini=file("set.txt")
? ad=ini.readline().splitlines()
? ini.close
except Exception as e:
? print "open settings file Error:",type(e)
? ad=["192.168.1.121:9092"]
print "open ini file"
try:
? client = KafkaClient(hosts = ad[0])
? print "Topics:",client.topics
? topic ?= client.topics["mytopic"]
except Exception as e:
? print "Opening kafka Error:%s" %(type(e))
? sys.exit(1)
print "before threading"


try:
? with tp.get_sync_producer() as producer:
? ? producer.produce(str(dct2))
?except Exception as e:
? ? print "Error:" ,type(e)


? print "ini consumer"
? while 1==1:
? ? print "nn",type(consumer)
? ? for message in consumer:
? ? ? print "mm"
? ? ? if message is not None:
? ? ? ? print message.offset, message.value
except Exception as e:
? print e,type(e)


運行結(jié)果,可以列出topic,寫入的數(shù)據(jù)也沒有報錯信息。但是,消費者取不到數(shù)據(jù),無論是kafka直接取,還是python寫消費者代碼。
后來采用了 kafkapython 正常,代碼如下:


#coding:utf-8
import sys
import time
import random
import datetime
import codecs
import kafka.kafkaProducer
import logging
import json
import threading


ad=[]
try:
? ini=file("set.txt")
? ad=ini.readline().splitlines()
? ini.close
except Exception as e:
? ad=["192.168.1.121:9092,192.168.1.122.9092"]
? #print "open settings file Error:%d,%s" %(e.args[0],e.args[1])
? print "Opening settings file Error:",e,type(e)
print "Opened ini file"
'''
try:
? client = KafkaClient(hosts = ad[0])
? print "Topics:",client.topics
? topic ?= client.topics["mytopic"]
except Exception as e:
? print "Opening kafka Error:%s" %(e.args[0])
? sys.exit(1)
print "before threading"
'''
try:
? producer = KafkaProducer(bootstrap_servers=ad[0], value_serializer=lambda m: json.dumps(m).encode('utf-8'))
except Exception as e:
? print "Opening kafka Error:",e,type(e)
? sys.exit(1)




print "before threading"




threads=[]
for i in range(0,12):
? try:
? ? threads.append(threading.Thread(target=tf,args=(producer,i)))
? ? threads[i].start()
? except Exception as e:
? ? print "Treand error at Thread:%d:%s,%s" %(i,e,type(e))




print "main thread is ended"

代碼均有所節(jié)略。



總結(jié)

以上是生活随笔為你收集整理的Python操作Kafka爬坑的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。