日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

kafka完整集群安装

發布時間:2023/12/15 编程问答 37 豆豆
生活随笔 收集整理的這篇文章主要介紹了 kafka完整集群安装 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

kafka集群模式安裝

  • 集群規劃
  • 1.安裝包下載
  • 2.安裝zookeeper集群
  • 3.安裝kafka
    • 1.檢查zookeeper集群是否運行
    • 2.上傳安裝包并解壓
    • 3.修改配置文件
    • 4.同步安裝包到其他服務器
    • 5.啟動和停止kafka
  • 4.集群測試
    • 1.創建一個Topic
    • 2.查看集群中的Topic
    • 3.生產和消費數據測試
      • 模擬生產者來生產數據
      • 模擬消費者消費數據
      • 運行describe topics命令查看分區情況
    • 4. 修改topic分區數
    • 5. 刪除topic

集群規劃

服務器名稱服務器IP安裝的組件
node1192.168.88.11zookeeper,kafka,jdk1.8
node2192.168.88.11zookeeper,kafka,jdk1.8
node3192.168.88.11zookeeper,kafka,jdk1.8

1.安裝包下載

下載鏈接:https://www.apache.org/dyn/closer.cgi?path=/kafka/2.8.0/kafka_2.12-2.8.0.tgz

2.安裝zookeeper集群

請參考
zookeeper集群搭建:https://blog.csdn.net/smartsteps/article/details/119815461

3.安裝kafka

1.檢查zookeeper集群是否運行

每臺機器上執行查看運行狀態,一共一個主節點,兩個從節點./zkServer.sh status node1: Mode: follower node2: Mode: follower node3: Mode: leader

2.上傳安裝包并解壓

在node1上執行 解壓到/home目錄下

tar -zxvf kafka_2.12-2.8.0.tgz -C /home/

3.修改配置文件

創建日志文件件 mkdir /home/kafka_2.12-2.8.0/logs 進入配置文件 cd /home/kafka_2.12-2.8.0/configvi server.properties

修改配置文件

broker.id=0 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/home/kafka_2.12-2.8.0/logs num.partitions=2 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.flush.interval.messages=10000 log.flush.interval.ms=1000 log.retention.hours=168 log.retention.bytes=1073741824 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 zookeeper.connect=node1:2181,node2:2181,node3:2181 zookeeper.connection.timeout.ms=18000 group.initial.rebalance.delay.ms=0 delete.topic.enable=true host.name=node1

4.同步安裝包到其他服務器

scp -r kafka_2.12-2.8.0/ node2:$PWDscp -r kafka_2.12-2.8.0/ node3:$PWD

修改node2 kafka集群的配置

cd /home/kafka_2.12-2.8.0/config vi server.properties broker.id=1 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/home/kafka_2.12-2.8.0/logs num.partitions=2 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.flush.interval.messages=10000 log.flush.interval.ms=1000 log.retention.hours=168 log.retention.bytes=1073741824 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 zookeeper.connect=node1:2181,node2:2181,node3:2181 zookeeper.connection.timeout.ms=18000 group.initial.rebalance.delay.ms=0 delete.topic.enable=true host.name=node2

修改node3 kafka集群的配置

cd /home/kafka_2.12-2.8.0/config vi server.properties broker.id=2 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/home/kafka_2.12-2.8.0/logs num.partitions=2 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.flush.interval.messages=10000 log.flush.interval.ms=1000 log.retention.hours=168 log.retention.bytes=1073741824 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 zookeeper.connect=node1:2181,node2:2181,node3:2181 zookeeper.connection.timeout.ms=18000 group.initial.rebalance.delay.ms=0 delete.topic.enble=true host.name=node3

5.啟動和停止kafka

每臺服務器都要執行

cd /home/kafka_2.12-2.8.0 bin/kafka-server-start.sh config/server.properties

后臺啟動

cd /home/kafka_2.12-2.8.0 nohup bin/kafka-server-start.sh config/server.properties 2>&1 &

停止服務

cd /home/kafka_2.12-2.8.0 bin/kafka-server-stop.sh

4.集群測試

1.創建一個Topic

node1執行以下命令來創建topic
創建了一個名字為test的主題, 有三個分區,有兩個副本

cd /home/kafka_2.12-2.8.0 bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 3 --partitions 3 --topic test

2.查看集群中的Topic

查看kafka當中存在的主題
node1使用以下命令來查看kafka當中存在的topic主題

bin/kafka-topics.sh --list --zookeeper node1:2181,node2:2181,node3:2181

3.生產和消費數據測試

模擬生產者來生產數據

node1服務器執行以下命令來模擬生產者進行生產數據

bin/kafka-console-producer.sh --broker-list node1:9092,node2:9092,node3:9092 --topic test

模擬消費者消費數據

node2服務器執行以下命令來模擬消費者進行消費數據

bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --from-beginning --topic test

運行describe topics命令查看分區情況

bin/kafka-topics.sh --describe --zookeeper node1:2181 --topic test


這是輸出的解釋。第一行給出了所有分區的摘要,每個附加行提供有關一個分區的信息。由于我們只有一個分 區用于此主題,因此只有一行。

“leader”是負責給定分區的所有讀取和寫入的節點。每個節點將成為隨機選擇的分區部分的領導者。(因為在kafka中 如果有多個副本的話,就會存在leader和follower的關系,表示當前這個副本為leader所在的broker是哪一個)
“replicas”是復制此分區日志的節點列表,無論它們是否為領導者,或者即使它們當前處于活動狀態。(所有副本列表 0 ,1,2)
“isr”是“同步”復制品的集合。這是副本列表的子集,該列表當前處于活躍狀態并且已經被領導者捕獲。(可用的列表 數)

4. 修改topic分區數

bin/kafka-topics.sh --zookeeper node1:2181 --alter --topic test --partitions 2

5. 刪除topic

目前刪除topic在默認情況下知識打上一個刪除的標記,在重新啟動kafka后才刪除。如果需要立即刪除,則需要在 server.properties中配置: delete.topic.enable=true

然后執行以下命令進行刪除topic

bin/kafka-topics.sh --zookeeper node1:2181 --delete --topic test

總結

以上是生活随笔為你收集整理的kafka完整集群安装的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。