Python操作Kafka爬坑
生活随笔
收集整理的這篇文章主要介紹了
Python操作Kafka爬坑
小編覺得挺不錯的,現在分享給大家,幫大家做個參考。
組內做大數據,需要用kafka寫入數據。最近正好在看python,拿來練練手。網上找了一圈,用的都是pykafka,經過一整圈的安裝,最終搞定,代碼如下:
# coding: utf-8
# pykafka version: list the topics, produce one message to "mytopic" with a
# synchronous producer, then iterate a consumer. ``dct2`` and ``consumer``
# are created in code the original author omitted from this excerpt.
import sys
import time
import random
import datetime
import MySQLdb
import codecs
from pykafka import KafkaClient
import logging
import json
import threading

# ******************

DEFAULT_HOSTS = "192.168.1.121:9092"

# The first line of set.txt, if present and non-empty, overrides the default
# host string.
ad = []
try:
    # ``with`` guarantees the file is closed; the original ``ini.close``
    # (no parentheses) never actually called close().
    with open("set.txt") as ini:
        first_line = ini.readline().strip()
    if first_line:
        ad = [first_line]
except (IOError, OSError, UnicodeError) as e:
    print("open settings file Error: %s" % (type(e),))
if not ad:
    # Also covers an empty set.txt, which previously left ``ad`` empty and
    # made ``ad[0]`` below raise IndexError.
    ad = [DEFAULT_HOSTS]
print("open ini file")

try:
    client = KafkaClient(hosts=ad[0])
    print("Topics: %s" % (client.topics,))
    topic = client.topics["mytopic"]
except Exception as e:
    print("Opening kafka Error:%s" % (type(e),))
    sys.exit(1)
print("before threading")

try:
    try:
        # The original called ``tp.get_sync_producer()``, but the topic
        # object is bound to ``topic`` above; ``tp`` was never defined.
        with topic.get_sync_producer() as producer:
            payload = str(dct2)
            # pykafka sends raw bytes. On Python 2 ``str`` already is bytes;
            # on Python 3 the text must be encoded first.
            if not isinstance(payload, bytes):
                payload = payload.encode("utf-8")
            producer.produce(payload)
    except Exception as e:
        print("Error: %s" % (type(e),))
    print("ini consumer")
    while True:
        print("nn %s" % (type(consumer),))
        for message in consumer:
            print("mm")
            if message is not None:
                print("%s %s" % (message.offset, message.value))
except Exception as e:
    print("%s %s" % (e, type(e)))
運行結果:可以列出topic,寫入數據時也沒有報錯信息。但是,消費者取不到數據——無論是用kafka自帶的消費者直接取,還是用python寫的消費者代碼,都取不到。
后來采用了 kafkapython 正常,代碼如下:
# coding: utf-8
# kafka-python version: 12 worker threads share one KafkaProducer that
# serializes every value to UTF-8 encoded JSON. The worker function ``tf``
# was omitted from this excerpt by the original author.
import sys
import time
import random
import datetime
import codecs
import logging
import json
import threading

# ``import kafka.kafkaProducer`` fails: KafkaProducer is a class exported by
# the ``kafka`` package, not a submodule, and the name is case-sensitive.
from kafka import KafkaProducer

# Fallback brokers as a list of "host:port" strings. The original default
# contained "192.168.1.122.9092" (a dot instead of the port colon), which is
# not a valid address.
DEFAULT_SERVERS = ["192.168.1.121:9092", "192.168.1.122:9092"]
THREAD_COUNT = 12

# The first line of set.txt, if present, is a comma-separated broker list
# that overrides the default.
ad = []
try:
    # ``with`` guarantees the file is closed; the original ``ini.close``
    # (no parentheses) never actually called close().
    with open("set.txt") as ini:
        first_line = ini.readline()
    ad = [host.strip() for host in first_line.split(",") if host.strip()]
except (IOError, OSError, UnicodeError) as e:
    print("Opening settings file Error: %s %s" % (e, type(e)))
if not ad:
    # Also covers an empty set.txt, which previously left ``ad`` empty and
    # made the broker lookup below raise IndexError.
    ad = list(DEFAULT_SERVERS)
print("Opened ini file")

try:
    producer = KafkaProducer(
        bootstrap_servers=ad,
        value_serializer=lambda m: json.dumps(m).encode("utf-8"),
    )
except Exception as e:
    print("Opening kafka Error: %s %s" % (e, type(e)))
    sys.exit(1)
print("before threading")

threads = []
for i in range(THREAD_COUNT):
    try:
        worker = threading.Thread(target=tf, args=(producer, i))
        worker.start()
    except Exception as e:
        print("Thread error at Thread:%d:%s,%s" % (i, e, type(e)))
    else:
        # Only successfully started threads are tracked, so the later join
        # never touches a thread that was not started.
        threads.append(worker)

# KafkaProducer.send() is asynchronous: wait for the workers, then flush any
# buffered records and release the producer's connections before exiting.
for worker in threads:
    worker.join()
producer.flush()
producer.close()
print("main thread is ended")
# coding: utf-8
# pykafka version: list the topics, produce one message to "mytopic" with a
# synchronous producer, then iterate a consumer. ``dct2`` and ``consumer``
# are created in code the original author omitted from this excerpt.
import sys
import time
import random
import datetime
import MySQLdb
import codecs
from pykafka import KafkaClient
import logging
import json
import threading

# ******************

DEFAULT_HOSTS = "192.168.1.121:9092"

# The first line of set.txt, if present and non-empty, overrides the default
# host string.
ad = []
try:
    # ``with`` guarantees the file is closed; the original ``ini.close``
    # (no parentheses) never actually called close().
    with open("set.txt") as ini:
        first_line = ini.readline().strip()
    if first_line:
        ad = [first_line]
except (IOError, OSError, UnicodeError) as e:
    print("open settings file Error: %s" % (type(e),))
if not ad:
    # Also covers an empty set.txt, which previously left ``ad`` empty and
    # made ``ad[0]`` below raise IndexError.
    ad = [DEFAULT_HOSTS]
print("open ini file")

try:
    client = KafkaClient(hosts=ad[0])
    print("Topics: %s" % (client.topics,))
    topic = client.topics["mytopic"]
except Exception as e:
    print("Opening kafka Error:%s" % (type(e),))
    sys.exit(1)
print("before threading")

try:
    try:
        # The original called ``tp.get_sync_producer()``, but the topic
        # object is bound to ``topic`` above; ``tp`` was never defined.
        with topic.get_sync_producer() as producer:
            payload = str(dct2)
            # pykafka sends raw bytes. On Python 2 ``str`` already is bytes;
            # on Python 3 the text must be encoded first.
            if not isinstance(payload, bytes):
                payload = payload.encode("utf-8")
            producer.produce(payload)
    except Exception as e:
        print("Error: %s" % (type(e),))
    print("ini consumer")
    while True:
        print("nn %s" % (type(consumer),))
        for message in consumer:
            print("mm")
            if message is not None:
                print("%s %s" % (message.offset, message.value))
except Exception as e:
    print("%s %s" % (e, type(e)))
運行結果:可以列出topic,寫入數據時也沒有報錯信息。但是,消費者取不到數據——無論是用kafka自帶的消費者直接取,還是用python寫的消費者代碼,都取不到。
后來采用了 kafkapython 正常,代碼如下:
# coding: utf-8
# kafka-python version: 12 worker threads share one KafkaProducer that
# serializes every value to UTF-8 encoded JSON. The worker function ``tf``
# was omitted from this excerpt by the original author.
import sys
import time
import random
import datetime
import codecs
import logging
import json
import threading

# ``import kafka.kafkaProducer`` fails: KafkaProducer is a class exported by
# the ``kafka`` package, not a submodule, and the name is case-sensitive.
from kafka import KafkaProducer

# Fallback brokers as a list of "host:port" strings. The original default
# contained "192.168.1.122.9092" (a dot instead of the port colon), which is
# not a valid address.
DEFAULT_SERVERS = ["192.168.1.121:9092", "192.168.1.122:9092"]
THREAD_COUNT = 12

# The first line of set.txt, if present, is a comma-separated broker list
# that overrides the default.
ad = []
try:
    # ``with`` guarantees the file is closed; the original ``ini.close``
    # (no parentheses) never actually called close().
    with open("set.txt") as ini:
        first_line = ini.readline()
    ad = [host.strip() for host in first_line.split(",") if host.strip()]
except (IOError, OSError, UnicodeError) as e:
    print("Opening settings file Error: %s %s" % (e, type(e)))
if not ad:
    # Also covers an empty set.txt, which previously left ``ad`` empty and
    # made the broker lookup below raise IndexError.
    ad = list(DEFAULT_SERVERS)
print("Opened ini file")

try:
    producer = KafkaProducer(
        bootstrap_servers=ad,
        value_serializer=lambda m: json.dumps(m).encode("utf-8"),
    )
except Exception as e:
    print("Opening kafka Error: %s %s" % (e, type(e)))
    sys.exit(1)
print("before threading")

threads = []
for i in range(THREAD_COUNT):
    try:
        worker = threading.Thread(target=tf, args=(producer, i))
        worker.start()
    except Exception as e:
        print("Thread error at Thread:%d:%s,%s" % (i, e, type(e)))
    else:
        # Only successfully started threads are tracked, so the later join
        # never touches a thread that was not started.
        threads.append(worker)

# KafkaProducer.send() is asynchronous: wait for the workers, then flush any
# buffered records and release the producer's connections before exiting.
for worker in threads:
    worker.join()
producer.flush()
producer.close()
print("main thread is ended")
代碼均有所節略(例如線程函數 tf、變量 dct2 和 consumer 的定義均未列出)。
總結
以上是生活随笔為你收集整理的Python操作Kafka爬坑的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 兴业信用卡随借金占用额度吗
- 下一篇: est.java 2 错误 找不到符号_