日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

大数据 -- zookeeper和kafka集群环境搭建

發(fā)布時(shí)間:2023/12/18 编程问答 25 豆豆
生活随笔 收集整理的這篇文章主要介紹了 大数据 -- zookeeper和kafka集群环境搭建 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

一 運(yùn)行環(huán)境

從阿里云申請(qǐng)三臺(tái)云服務(wù)器,這里我使用了兩個(gè)不同的阿里云賬號(hào)去申請(qǐng)?jiān)品?wù)器。我們配置三臺(tái)主機(jī)名分別為zy1,zy2,zy3。

我們通過(guò)阿里云可以獲取主機(jī)的公網(wǎng)ip地址,如下:

?通過(guò)secureRCT連接主機(jī)106.15.74.155,運(yùn)行ifconfig,可以查看其內(nèi)網(wǎng)ip地址:

1、賬號(hào)1申請(qǐng)了兩臺(tái)云服務(wù)器:

主機(jī)zy1的公網(wǎng)ip為:106.15.74.155,內(nèi)網(wǎng)ip為172.19.182.67。

主機(jī)zy2的公網(wǎng)ip為:47.103.134.70,內(nèi)網(wǎng)ip為172.19.14.178。

2、賬號(hào)2申請(qǐng)了一臺(tái)云服務(wù)器:

主機(jī)zy3的公網(wǎng)ip為:47.97.10.51,內(nèi)網(wǎng)ip為172.16.229.255。

3、阿里云入規(guī)則配置

由于主機(jī)位于不同的局域網(wǎng)下,因此需要進(jìn)行一個(gè)公網(wǎng)端口到內(nèi)網(wǎng)端口的映射。在搭建zookeeper和kafka需要使用到2181,2888 ,3888,9092端口。需要在阿里云中配置入規(guī)則,具體可以參考阿里云官方收藏:同一個(gè)地域、不同賬號(hào)下的實(shí)例實(shí)現(xiàn)內(nèi)網(wǎng)互通?。

注意:如果7.103.134.70配置一個(gè)入端口3888,那么對(duì)該47.103.134.70:3888的訪問(wèn)會(huì)實(shí)際映射到172.19.14.178:3888下。如果是同一局域網(wǎng)下的兩個(gè)主機(jī),是不需要配置這個(gè)的,可以直接互4通。

如果想了解更多,可以參考以下博客:

通過(guò)SSH訪問(wèn)阿里云服務(wù)器的原理可以參考-用SSH訪問(wèn)內(nèi)網(wǎng)主機(jī)的方法

云服務(wù)器主機(jī)內(nèi)網(wǎng)ip和外網(wǎng)ip的區(qū)別

一臺(tái)阿里云2臺(tái)騰訊云服務(wù)器搭建Hadoop集群

4、配置/etc/hosts

以主機(jī)zy1為例:配置如下:

注意zy1對(duì)應(yīng)的ip需要配置為內(nèi)網(wǎng)ip,也就是本機(jī)ip:172.19.182.67。而zy2、zy3配置的都是公網(wǎng)ip。

二 JDK安裝

在每個(gè)主機(jī)下執(zhí)行以下操作:

1、安裝之前先查看一下有無(wú)系統(tǒng)自帶jdk

rpm -qa |grep javarpm -qa |grep jdkrpm -qa |grep gcj

如果有就使用批量卸載命令

rpm -qa | grep java | xargs rpm -e --nodeps

2、直接yum安裝1.8.0版本openjdk

yum install java-1.8.0-openjdk* -y

默認(rèn)jre jdk 安裝路徑是/usr/lib/jvm 下面:

3、配置環(huán)境變量

vim /etc/profile #set java environment , appendexport JAVA_HOME=/usr/lib/jvm/javaexport CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/jre/lib/rt.jarexport PATH=$PATH:$JAVA_HOME/bin

使得配置生效

. /etc/profile

4、查看版本

echo $JAVA_HOME echo $CLASSPATH java -version

三 安裝zookeeper

在主機(jī)zy1下面執(zhí)行以下操作:

1、下載并解壓

wget http://mirrors.hust.edu.cn/apache/zookeeper/zookeeper-3.4.13/zookeeper-3.4.13.tar.gz

創(chuàng)建目錄/opt/bigdata:

mkdir /opt/bigdata

解壓文件到/opt/bigdata:

tar -zxvf ?zookeeper-3.4.13.tar.gz -C /opt/bigdata

跳轉(zhuǎn)目錄:

cd /opt/bigdata/zookeeper-3.4.13/

2、復(fù)制配置文件

cp conf/zoo_sample.cfg conf/zoo.cfg

修改配置文件如下:

vim conf/zoo.cfg

其中部分參數(shù)意義如下:

  • server.1=zy1:2888:3888:server.1 這個(gè)1是服務(wù)器的標(biāo)識(shí)也可以是其他的數(shù)字, 表示這個(gè)是第幾號(hào)服務(wù)器,用來(lái)標(biāo)識(shí)服務(wù)器,這個(gè)標(biāo)識(shí)要寫(xiě)到快照目錄下面myid文件里。第一個(gè)端口是master和slave之間的通信端口,默認(rèn)是2888,第二個(gè)端口是leader選舉的端口,集群剛啟動(dòng)的時(shí)候選舉或者leader掛掉之后進(jìn)行新的選舉的端口默認(rèn)是3888
  • dataDir:快照日志的存儲(chǔ)路徑。
  • dataLogDir:事物日志的存儲(chǔ)路徑,如果不配置這個(gè)那么事物日志會(huì)默認(rèn)存儲(chǔ)到dataDir制定的目錄,這樣會(huì)嚴(yán)重影響zk的性能,當(dāng)zk吞吐量較大的時(shí)候,產(chǎn)生的事物日志、快照日志太多。
  • clientPort:這個(gè)端口就是客戶端連接 zookeeper 服務(wù)器的端口,zookeeper 會(huì)監(jiān)聽(tīng)這個(gè)端口,接受客戶端的訪問(wèn)請(qǐng)求。

3、創(chuàng)建myid文件

創(chuàng)建/opt/bigdata/data/zookeeper/zkdata:

mkdir -vp /opt/bigdata/data/zookeeper/zkdata

創(chuàng)建myid文件:

echo 1 > /opt/bigdata/data/zookeeper/zkdata/myid

4、拷貝zookeeper到主機(jī)zy2、zy3

scp -r /opt/bigdata/zookeeper-3.4.13/ zy2:/opt/bigdata/ scp -r /opt/bigdata/zookeeper-3.4.13/ zy3:/opt/bigdata/

5、創(chuàng)建主機(jī)zy2、zy3的myid文件

zyx主機(jī):

創(chuàng)建/opt/bigdata/data/zookeeper/zkdata:

mkdir -vp /opt/bigdata/data/zookeeper/zkdata

創(chuàng)建myid文件:

echo x > /opt/bigdata/data/zookeeper/zkdata/myid

注意:x表示主機(jī)的編號(hào)。

6、配置環(huán)境變量(每個(gè)主機(jī)都需要配置)

vim /etc/profile #set java environment , appendexport ZOOKEEPER_HOME=/opt/bigdata/zookeeper-3.4.13export PATH=$ZOOKEEPER_HOME/bin:$PATH

使得配置生效

. /etc/profile

7、啟動(dòng)服務(wù)并查看

進(jìn)入到zookeeper目錄下,在每個(gè)主機(jī)下分別執(zhí)行

cd /opt/bigdata/zookeeper-3.4.13 bin/zkServer.sh start

檢查服務(wù)狀態(tài)

bin/zkServer.sh status

可以用“jps”查看zk的進(jìn)程,這個(gè)是zk的整個(gè)工程的main

jps

注意:zk集群一般只有一個(gè)leader,多個(gè)follower,主一般是相應(yīng)客戶端的讀寫(xiě)請(qǐng)求,而從主同步數(shù)據(jù),當(dāng)主掛掉之后就會(huì)從follower里投票選舉一個(gè)leader出來(lái)。

8、客戶端連接

zookeeper服務(wù)開(kāi)啟后,進(jìn)入客戶端的命令:

zkCli.sh

更多常用命令參考博客:Kafka在zookeeper中存儲(chǔ)結(jié)構(gòu)和查看方式。

9、出現(xiàn)錯(cuò)誤常用排錯(cuò)手段

1、防火墻

防火墻沒(méi)有關(guān)閉問(wèn)題。解決方式參考:https://blog.csdn.net/weiyongle1996/article/details/73733228

2、端口沒(méi)有開(kāi)啟

如果/etc/hosts全部配置為公網(wǎng):在zy1運(yùn)行zkServer.sh start,查看端口開(kāi)啟狀態(tài):

netstat -an | grep 3888

則會(huì)發(fā)現(xiàn)無(wú)法開(kāi)啟公網(wǎng)3888端口,我們應(yīng)該打開(kāi)的是內(nèi)網(wǎng)機(jī)器對(duì)應(yīng)的端口。
如果端口已經(jīng)開(kāi)啟,可以通過(guò)telnet ip por判斷該端口是否可以從外部訪問(wèn)。

四 安裝kafka

在主機(jī)zy1下執(zhí)行:

1、下載并解壓

wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.1.1/kafka_2.12-2.1.1.tgz tar -zxvf kafka_2.12-2.1.1.tgz -C /opt/bigdata

重命名:

cd /opt/bigdata/ mv kafka_2.12-2.1.1 kafka

2、修改kafka配置文件

?在/opt/bigdata/kafka下:

vim config/server.properties

各個(gè)參數(shù)意義:

  • broker.id=1? #當(dāng)前機(jī)器在集群中的唯一標(biāo)識(shí),和zookeeper的myid性質(zhì)一樣;
  • listeners=PLAINTEXT://主機(jī):9092? #當(dāng)前kafka對(duì)外提供服務(wù)的主機(jī):端口(默認(rèn)是9092);
  • num.network.threads=3? #這個(gè)是borker進(jìn)行網(wǎng)絡(luò)處理的線程數(shù);
  • num.io.threads=8 #這個(gè)是borker進(jìn)行I/O處理的線程數(shù);
  • log.dirs=/opt/kafka/kafkalogs/ #消息存放的目錄,這個(gè)目錄可以配置為“,”逗號(hào)分割的表達(dá)式,上面的num.io.threads要大于這個(gè)目錄的個(gè)數(shù)這個(gè)目錄,如果配置多個(gè)目錄,新創(chuàng)建的topic他把消息持久化的地方是,當(dāng)前以逗號(hào)分割的目錄中,那個(gè)分區(qū)數(shù)最少就放那一個(gè);
  • socket.send.buffer.bytes=102400 #發(fā)送緩沖區(qū)buffer大小,數(shù)據(jù)不是一下子就發(fā)送的,先回存儲(chǔ)到緩沖區(qū)了到達(dá)一定的大小后在發(fā)送,能提高性能;
  • socket.receive.buffer.bytes=102400 #kafka接收緩沖區(qū)大小,當(dāng)數(shù)據(jù)到達(dá)一定大小后在序列化到磁盤(pán);
  • socket.request.max.bytes=104857600 #這個(gè)參數(shù)是向kafka請(qǐng)求消息或者向kafka發(fā)送消息的請(qǐng)請(qǐng)求的最大數(shù),這個(gè)值不能超過(guò)java的堆棧大小;
  • num.partitions=1 #默認(rèn)的分區(qū)數(shù),一個(gè)topic默認(rèn)1個(gè)分區(qū)數(shù);
  • log.retention.hours=168 #默認(rèn)消息的最大持久化時(shí)間,168小時(shí),7天;
  • message.max.byte=5242880? #消息保存的最大值5M;
  • default.replication.factor=2? #kafka保存消息的副本數(shù),如果一個(gè)副本失效了,另一個(gè)還可以繼續(xù)提供服務(wù);
  • replica.fetch.max.bytes=5242880? #取消息的最大直接數(shù);
  • log.segment.bytes=1073741824 #這個(gè)參數(shù)是:因?yàn)閗afka的消息是以追加的形式落地到文件,當(dāng)超過(guò)這個(gè)值的時(shí)候,kafka會(huì)新起一個(gè)文件;
  • log.retention.check.interval.ms=300000 #每隔300000毫秒去檢查上面配置的log失效時(shí)間(log.retention.hours=168 ),到目錄查看是否有過(guò)期的消息如果有,刪除;
  • log.cleaner.enable=false #是否啟用log壓縮,一般不用啟用,啟用的話可以提高性能;
  • zookeeper.connect=xx:12181,xx:12181,xx:12181#設(shè)置zookeeper的連接端口;

?注意,這里如果希望在java中創(chuàng)建topic也是多個(gè)備份,需要添加一下屬性

#default replication factors for automatically created topics,默認(rèn)值1;

default.replication.factor=3

#When a producer sets acks to "all" (or "-1"), this configuration specifies the minimum number of replicas that must acknowledge a write for the write to be considered successful.

#min.insync.replicas and acks allow you to enforce greater durability guarantees,默認(rèn)值1;

min.insync.replicas=3

上面是參數(shù)的解釋,實(shí)際的修改項(xiàng)為:

broker.id=1

listeners=PLAINTEXT://zy1:9092? ? ? ? ? ?#內(nèi)網(wǎng)地址

advertised.listeners=PLAINTEXT://106.15.74.155:9092? ?#公網(wǎng)地址(不然遠(yuǎn)程客戶端無(wú)法訪問(wèn))

log.dirs=/opt/bigdata/kafka/kafka-logs

#此外,可以在log.retention.hours=168 下面新增下面三項(xiàng):

message.max.byte=5242880

default.replication.factor=2

replica.fetch.max.bytes=5242880

#設(shè)置zookeeper的連接端口

zookeeper.connect=zy1:2181,zy2:2181,zy3:2181?

如果我們需要?jiǎng)h除topic,還需要配置一下內(nèi)容:

delete.topic.enable=true

具體參考博客:kafka安裝及刪除Topic,Kafka0.8.2.1刪除topic邏輯。

3、復(fù)制kafka到zy2、zy3

scp -r /opt/bigdata/kafka zy2:/opt/bigdata/
scp -r /opt/bigdata/kafka zy3:/opt/bigdata/

4、修改zy2、zy3的配置文件server.properties

拷貝文件過(guò)去的其他兩個(gè)節(jié)點(diǎn)需要更改broker.id和listeners,以zy2為例:

5、啟動(dòng)kafka

我們可以根據(jù)Kafka內(nèi)帶的zk集群來(lái)啟動(dòng),但是建議使用獨(dú)立的zk集群:

zkServer.sh start 在/opt/bigdata/kafka下 ,三個(gè)節(jié)點(diǎn)分別執(zhí)行如下命令,啟動(dòng)kafka集群:
bin/kafka-server-start.sh config/server.properties &

運(yùn)行命令后服務(wù)確實(shí)后臺(tái)啟動(dòng)了,但日志會(huì)打印在控制臺(tái),而且關(guān)掉命令行窗口,服務(wù)就會(huì)隨之停止,這個(gè)讓我挺困惑的。后來(lái),參考了其他的啟動(dòng)腳本,通過(guò)測(cè)試和調(diào)試最終找到了完全滿足要求的命令。

bin/kafka-server-start.sh config/server.properties 1>/dev/null 2>&1 &

其中1>/dev/null 2>&1 是將命令產(chǎn)生的輸入和錯(cuò)誤都輸入到空設(shè)備,也就是不輸出的意思。/dev/null代表空設(shè)備。

注意:如果內(nèi)存不足:打開(kāi)kafka安裝位置,在bin目錄下找到kafka-server-start.sh文件,將export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"修改為export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"。

6、驗(yàn)證

思路:以下給出幾條kafka指令。創(chuàng)建一個(gè)topic,一個(gè)節(jié)點(diǎn)作為生產(chǎn)者,兩個(gè)節(jié)點(diǎn)作為消費(fèi)者分別看看能否接收數(shù)據(jù),進(jìn)行驗(yàn)證:

創(chuàng)建及查看topic:

cd /opt/big/data/kafka bin/kafka-topics.sh -list -zookeeper zy1:2181 bin/kafka-topics.sh --create --zookeeper zy1:2181 --replication-factor 3 --partitions 3 --topic zy-test

開(kāi)啟生產(chǎn)者:

bin/kafka-console-producer.sh --broker-list zy1:9092 --topic zy-test

開(kāi)啟消費(fèi)者:

bin/kafka-console-consumer.sh --bootstrap-server zy2:9092 --topic zy-test --from-beginning

?

節(jié)點(diǎn)zy1產(chǎn)生消息,如果消息沒(méi)有清理,在節(jié)點(diǎn)zy2、zy3都可以接收到消息。

7、更多kafka命令

以下是kafka常用命令行總結(jié):??

查看topic的詳細(xì)信息??

bin/kafka-topics.sh -zookeeper zy1:2181 --describe --topic zy-test

可以看到topic包含3個(gè)復(fù)本,每個(gè)副本又分為三個(gè)partition。以zy-test:partition0為例,其leader保存在broker.id=1的主機(jī)上,副本保存在2、3節(jié)點(diǎn)上。其消息保存在配置參數(shù)log.dirs所指定的路徑下:

為topic增加副本??

bin/kafka-reassign-partitions.sh --zookeeper zy1:2181 --reassignment-json-file json/partitions-to-move.json -execute

創(chuàng)建topic?

bin/kafka-topics.sh --create --zookeeper zy1:2181 --replication-factor 3 --partitions 3 --topic zy-test

為topic增加partition??

bin/kafka-topics.sh –-zookeeper zy1:2181 –-alter –-partitions 3 –-topic zy-test

kafka生產(chǎn)者客戶端命令??

bin/kafka-console-producer.sh --broker-list zy1:9092 --topic zy-test

kafka消費(fèi)者客戶端命令??

bin/kafka-console-consumer.sh --bootstrap-server zy2:9092 --topic zy-test --from-beginning

kafka服務(wù)啟動(dòng)??

bin/kafka-server-start.sh config/server.properties 1>/dev/null 2>&1 &

刪除topic??

bin/kafka-topics.sh --zookeeper zy1:2181 --delete --topic zy-test

8、關(guān)閉kafka

bin/kafka-server-stop.sh

五 consumer offsets

由于Zookeeper并不適合大批量的頻繁寫(xiě)入操作,新版Kafka已推薦將consumer的位移信息保存在kafka內(nèi)部的topic中,即__consumer_offsets topic,并且默認(rèn)提供了kafka_consumer_groups.sh腳本供用戶查看consumer信息。

1、獲取消息在topic中的記錄信息

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list zy1:9092 --topic zy-test

輸出結(jié)果每個(gè)字段分別表示topic、partition、untilOffset(當(dāng)前partition的最大偏移);

上面的輸出結(jié)果表明kafka隊(duì)列總有產(chǎn)生過(guò)4條消息(這并不代表kafka隊(duì)列現(xiàn)在一定有4條消息,因?yàn)閗afka有兩種策略可以刪除舊數(shù)據(jù):基于時(shí)間、基于大小)。

由于我使用kafka-console-producer.sh生成了四條消息:zy、19941108、?liuyan、1。因此kafka消息隊(duì)列中存在4條消息。

2、創(chuàng)建一個(gè)console consumer group

bin/kafka-console-consumer.sh --bootstrap-server zy1:9092 --consumer.config config/consumer.properties --topic zy-test --from-beginning

?

再次建立消費(fèi)者:

會(huì)發(fā)現(xiàn)獲取不到數(shù)據(jù)。這是因?yàn)槲覀冎付讼M(fèi)組,第一次消費(fèi)時(shí)從offset為0開(kāi)始消費(fèi),把4條消息全部讀出,此時(shí)offset移動(dòng)到最后,當(dāng)再次使用同一消費(fèi)組讀取數(shù)據(jù),則會(huì)從上次的offset開(kāi)始獲取數(shù)據(jù)。

而使用:

bin/kafka-console-consumer.sh --bootstrap-server zy1:9092 --topic zy-test --from-beginning

每次都會(huì)獲取四條數(shù)據(jù),這是因?yàn)槊看味紩?huì)創(chuàng)建一個(gè)新的消費(fèi)者,這些消費(fèi)者會(huì)被隨機(jī)分配到一個(gè)不同的組,因此每次都是從offset為0開(kāi)始消費(fèi)。

參數(shù)解釋:

3、?獲取該consumer group的group id

bin/kafka-consumer-groups.sh --bootstrap-server zy1:9092 --list

可以看到有三個(gè)消費(fèi)組,前兩個(gè)消費(fèi)者沒(méi)有指定消費(fèi)組,隨機(jī)產(chǎn)生一個(gè)console-consumer-***的group.ig。

第三個(gè)是我們剛剛在config/consumer.properties 中指定的消費(fèi)組。

4、查看消費(fèi)者組的offset

bin/kafka-consumer-groups.sh --bootstrap-server zy1:9092 --describe --group test-consumer-group

如果此時(shí)再使用生產(chǎn)者客戶端生成兩條消息:

再次查看消費(fèi)組test-consumer-group的消費(fèi)情況:

5、KAFKA API指定位移消費(fèi)

由于我們還沒(méi)有介紹KAFKA的API,這塊內(nèi)容就不先介紹,具體參考博客:Kafka消費(fèi)者 之 指定位移消費(fèi)。

六 各個(gè)端口作用

  • 2888:zookeeper集群三臺(tái)主機(jī)心跳端口;
  • 3888:zookeeper集群三臺(tái)主機(jī)選取leader端口,防止其中一個(gè)宕機(jī)了;
  • 2181:zookeeper服務(wù)器監(jiān)聽(tīng)端口,等待消費(fèi)者連接,消費(fèi)者可以從中獲取topic分區(qū)以及消費(fèi)offset等信息(高版本消費(fèi)者offset已經(jīng)保存在kafka內(nèi)部的topic中了);
  • 9092:kafka服務(wù)器綁定端口,等待生產(chǎn)者和消費(fèi)者連接;

因此上面介紹的kafka命令,與topic相關(guān)的使用--zookeeper zy1:2181,與生產(chǎn)者、消費(fèi)者相關(guān)的使用?--bootstrap-server zy1:9092。

參考博客:

[1]kafka和zookeeper集群搭建詳細(xì)步驟

[2]yum安裝jdk環(huán)境變量配置

[3]使用命令讀取kafka的內(nèi)部topic:__consumer_offsets

[4]kafka學(xué)習(xí)筆記:知識(shí)點(diǎn)整理 - cyfonly - 博客園(推薦)

轉(zhuǎn)載于:https://www.cnblogs.com/zyly/p/11327605.html

總結(jié)

以上是生活随笔為你收集整理的大数据 -- zookeeper和kafka集群环境搭建的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。