日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > python >内容正文

python

云服务器安装kafka及python连接测试

發布時間:2023/12/20 python 19 豆豆
生活随笔 收集整理的這篇文章主要介紹了 云服务器安装kafka及python连接测试 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

云服務器環境:CentOS 7.6

環境準備(版本見解壓命令):

  • java環境jdk
  • zookeeper
  • Kafka

傳送門:kafka安裝包,包括jdk,zookeeper和kafka

jdk環境安裝

將下載的包上傳到服務器/opt/software,解壓

tar -zxvf jdk-8u311-linux-x64.tar.gz tar -xzvf kafka_2.12-2.3.1.tgz tar -zxvf apache-zookeeper-3.5.6.tar.gz

重命名,將三個文件夾移動到/usr/local目錄下并重新命名

mv kafka_2.12-2.3.1 ./kafka mv apache-zookeeper-3.5.6 ./zookeepercp -r jdk1.8.0_311/ /usr/local/ cp -r zookeeper/ /usr/local/ cp -r kafka /usr/local/

打開文件環境變量的文件并配置

vim /etc/profile

jdk和zookeeper配置如下

export JAVA_HOME=/usr/local/jdk1.8.0_311 export JRE_HOME=${JAVA_HOME}/jre export ZOOKEEPER_HOME=/usr/local/zookeeper export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib:$CLASSPATH export JAVA_PATH=${JAVA_HOME}/bin:${JRE_HOME}/bin:${ZOOKEEPER_HOME}/bin: export PATH=$PATH:${JAVA_PATH}

使配置文件生效:

source /etc/profile

查看是否安裝成功,成功如下圖所示

java -version

zookeeper環境配置

cd /usr/local/zookeeper/conf/cp ./zoo_sample.cfg zoo.cfg

打開zoo.cfg配置文件

vim zoo.cfg

修改dataDir和dataLogDir配置

dataDir=/usr/local/zookeeper/dataDir dataLogDir=/usr/local/zookeeper/dataLogDir

在/usr/local/zookeeper/bin/下面啟動zookeeper

./zkServer.sh start./zkServer.sh status

成功如下圖所示:

kafka安裝:

在/usr/local/kafka/config下修改server.properties配置,單機只需要配置logs目錄

# 用于本地代碼測試線上的kafka的配置 listeners=PLAINTEXT://內網ip:9092 advertised.listeners=PLAINTEXT://外網ip:9092# 單機只需配置,省略新建logs文件夾目錄 log.dirs=/usr/local/kafka/logs

在/usr/local/kafka/bin目錄下啟動kafka

cd /usr/local/kafka/bin

啟動kafka:

./kafka-server-start.sh -daemon ../config/server.properties

查看進程狀態:

jps


創建topic:

./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic csdn_test

查看topic列表:

./kafka-topics.sh --list --zookeeper localhost:2181

創建生產者測試:

./kafka-console-producer.sh --broker-list localhost:9092 --topic csdn_test


創建消費者測試:

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic csdn_test --from-beginning

python本地代碼測試

生產者測試:

import json from kafka import KafkaProducerproducer = KafkaProducer(bootstrap_servers='外網ip:9092')msg = "Hello World".encode('utf-8') # 發送內容,必須是bytes類型 producer.send('test_2', msg) # 發送的topic為test producer.close()

前往服務器/usr/local/kafka/bin查看topic列表,看列表中是否有test_2

./kafka-topics.sh --list --zookeeper localhost:2181


如果無法找到這個topic,請參考前面kafka的conf的配置文件server.properties修改如下配置

listeners=PLAINTEXT://內網ip:9092 advertised.listeners=PLAINTEXT://外網ip:9092

消費者測試:

from kafka import KafkaConsumerconsumer = KafkaConsumer('test_2', bootstrap_servers=['外網ip:9092']) for msg in consumer:print(msg)recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)

總結

以上是生活随笔為你收集整理的云服务器安装kafka及python连接测试的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。