Apache Kafka - First Taste of Kafka (02): Single-Node Kafka on CentOS 7, Core Configuration Explained, Basic Command Walkthrough
Table of Contents
- Install JDK
- Install ZooKeeper
- Install Kafka
- Download and extract
- Configure hosts
- Start the Kafka service
- server.properties core configuration explained
- Basic commands
- Create a topic
- Delete a topic
- Send messages
- Consume messages
- List consumer groups
- Check consumer offsets
- Consume multiple topics
- Unicast consumption
- Multicast consumption
- Wrap-up
Install JDK
Kafka is written in Scala and runs on the JVM, so a JDK must be installed before installing Kafka.
The details are covered in a separate post; see: Java - Installing JDK 8 on CentOS 7 (rpm and tar.gz formats) & setting the default Java version among multiple JDKs
```shell
[root@artisan ~]# java -version
java version "1.8.0_221"
Java(TM) SE Runtime Environment (build 1.8.0_221-b11)
Java HotSpot(TM) 64-Bit Server VM (build 25.221-b11, mixed mode)
```
Install ZooKeeper
Kafka is a distributed messaging system that relies on ZooKeeper for coordination, so ZooKeeper is essential. Kafka ships with an embedded ZooKeeper, but using it is not recommended.
ZooKeeper is also written in Java, so it too depends on the JDK.
Download: https://archive.apache.org/dist/zookeeper/
We'll use version 3.4.14 here; in fact, any 3.x release will do.
```shell
[root@artisan zookeeper-3.4.14]# pwd
/usr/local/zookeeper-3.4.14

# Copy the sample config
[root@artisan zookeeper-3.4.14]# cp conf/zoo_sample.cfg conf/zoo.cfg

# Start ZooKeeper
[root@artisan zookeeper-3.4.14]# ./bin/zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper-3.4.14/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

# Check the process; QuorumPeerMain is the ZooKeeper process
[root@artisan zookeeper-3.4.14]# jps
3409 QuorumPeerMain
3425 Jps

# Check the status
[root@artisan zookeeper-3.4.14]# ./bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper-3.4.14/bin/../conf/zoo.cfg
Mode: standalone

# Connect with the CLI client
[root@artisan zookeeper-3.4.14]# ./bin/zkCli.sh
Connecting to localhost:2181
2019-11-17 07:34:32,522 [myid:] - INFO [main:Environment@100] - Client environment:zookeeper.version=3.4.14-4c25d480e66aadd371de8bd2fd8da255ac140bcf, built on 03/06/2019 16:18 GMT
2019-11-17 07:34:32,527 [myid:] - INFO [main:Environment@100] - Client environment:host.name=192.168.18.130
2019-11-17 07:34:32,527 [myid:] - INFO [main:Environment@100] - Client environment:java.version=1.8.0_221
.....
2019-11-17 07:34:32,534 [myid:] - INFO [main:ZooKeeper@442] - Initiating client connection, connectString=localhost:2181 sessionTimeout=30000 watcher=org.apache.zookeeper.ZooKeeperMain$MyWatcher@25f38edc
Welcome to ZooKeeper!
.....
2019-11-17 07:34:33,066 [myid:] - INFO [main-SendThread(localhost:2181):ClientCnxn$SendThread@1299] - Session establishment complete on server localhost/0:0:0:0:0:0:0:1:2181, sessionid = 0x1000014c05c0000, negotiated timeout = 30000

WATCHER::

WatchedEvent state:SyncConnected type:None path:null

# List the root znodes; a fresh install has only /zookeeper
[zk: localhost:2181(CONNECTED) 0] ls /
[zookeeper]
[zk: localhost:2181(CONNECTED) 1]
```
Install Kafka
Download and extract
Download: https://kafka.apache.org/downloads
First, a note on how Kafka release names are structured:
kafka_2.11-1.1.0: 2.11 is the Scala version and 1.1.0 is the Kafka version.
[Updated 2021-01-02]
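The naming scheme can be split apart mechanically. As a small sketch (the archive name below is just the one used in this article), plain shell parameter expansion is enough:

```shell
# Split a Kafka archive name into its Scala and Kafka versions
name="kafka_2.11-1.1.0"
scala_version=${name#kafka_}         # strip the "kafka_" prefix -> 2.11-1.1.0
scala_version=${scala_version%%-*}   # keep what precedes the first "-" -> 2.11
kafka_version=${name#*-}             # keep what follows the first "-" -> 1.1.0
echo "Scala $scala_version, Kafka $kafka_version"   # -> Scala 2.11, Kafka 1.1.0
```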
Configure hosts
On startup, Kafka uses the IP address associated with the Linux hostname, so the hostname-to-IP mapping must be added to /etc/hosts.
```shell
[root@artisan local]# cat /etc/hosts
127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
::1         localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.18.130 artisan
```
The setting this mainly affects lives in server.properties; without the mapping, the plain IP address is used instead.
Start the Kafka service
Startup script syntax: kafka-server-start.sh [-daemon] server.properties
The path to server.properties is a required argument. The -daemon flag runs the broker as a background process; without it, the service stops as soon as the SSH session exits.
```shell
[root@artisan soft_artisan]# pwd
/usr/local/soft_artisan
[root@artisan soft_artisan]# tar -xvzf kafka_2.11-1.1.0.tgz -C /usr/local/
[root@artisan soft_artisan]# cd /usr/local/kafka_2.11-1.1.0/
[root@artisan kafka_2.11-1.1.0]# cd bin

# Start Kafka in the background
[root@artisan bin]# ./kafka-server-start.sh -daemon ../config/server.properties

# Check the process
[root@artisan bin]# jps
3409 QuorumPeerMain
11923 Kafka
11942 Jps
```
Now look at the znodes in ZooKeeper:
```shell
[root@artisan bin]# pwd
/usr/local/zookeeper-3.4.14/bin
[root@artisan bin]# ./zkCli.sh
Connecting to localhost:2181
2019-11-17 10:05:52,083 [myid:] - INFO [main:Environment@100] - Client environment:zookeeper.version=3.4.14
.....

# Everything under the root except /zookeeper was created by Kafka
[zk: localhost:2181(CONNECTED) 0] ls /
[cluster, controller_epoch, controller, brokers, zookeeper, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]
[zk: localhost:2181(CONNECTED) 1] ls /brokers
[ids, topics, seqid]
[zk: localhost:2181(CONNECTED) 2] ls /brokers/ids
[0]
```
server.properties core configuration explained
Official reference: https://kafka.apache.org/documentation/#configuration
Let's go through a few of the most commonly used settings:
| Property | Default | Description |
| --- | --- | --- |
| broker.id | 0 | Unique non-negative ID of this broker within the cluster. As long as broker.id stays the same, a change of the server's IP address does not affect consumers. |
| log.dirs | /tmp/kafka-logs | Where Kafka stores its data. Multiple comma-separated directories are allowed; spreading them across different disks improves read/write performance, e.g. /data/kafka-logs-1,/data/kafka-logs-2 |
| listeners | 9092 | Port on which the broker accepts client connections |
| zookeeper.connect | localhost:2181 | ZooKeeper ensemble address; multiple hosts are comma-separated: hostname1:port1,hostname2:port2,hostname3:port3 |
| log.retention.hours | 168 | How long each log file is kept before deletion. This default retention applies to all topics alike. |
| min.insync.replicas | 1 | When the producer sets acks to -1, the minimum number of in-sync replicas required |
| delete.topic.enable | false | Whether topic deletion is allowed |
Or see: the Apache Kafka series post on server.properties configuration parameters.
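Pulling these settings together, a minimal server.properties sketch for this single-node setup might look like the following. Treat it as an illustration, not a canonical config: the host and port follow this article's examples, and delete.topic.enable=true is what lets the topic-deletion command later in the article actually take effect.

```properties
# Minimal single-node sketch; values follow the article's example host
broker.id=0
listeners=PLAINTEXT://192.168.18.130:9092
log.dirs=/tmp/kafka-logs
zookeeper.connect=192.168.18.130:2181
log.retention.hours=168
min.insync.replicas=1
# Without this, kafka-topics.sh --delete only marks the topic for deletion
delete.topic.enable=true
```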
Basic commands
Official quickstart: https://kafka.apache.org/quickstart
Create a topic
Topics are created and managed with Kafka's bundled kafka-topics.sh script.
Let's first see how to use it: run kafka-topics.sh with no arguments and it prints its usage information.
```shell
[root@artisan bin]# pwd
/usr/local/kafka_2.11-1.1.0/bin
[root@artisan bin]# ./kafka-topics.sh
Create, delete, describe, or change a topic.
Option                                   Description
------                                   -----------
--alter                                  Alter the number of partitions, replica assignment, and/or configuration for the topic.
--config <String: name=value>            A topic configuration override for the topic being created or altered. The following is a list of valid configurations: cleanup.policy, compression.type, delete.retention.ms, file.delete.delay.ms, flush.messages, flush.ms, follower.replication.throttled.replicas, index.interval.bytes, leader.replication.throttled.replicas, max.message.bytes, message.format.version, message.timestamp.difference.max.ms, message.timestamp.type, min.cleanable.dirty.ratio, min.compaction.lag.ms, min.insync.replicas, preallocate, retention.bytes, retention.ms, segment.bytes, segment.index.bytes, segment.jitter.ms, segment.ms, unclean.leader.election.enable. See the Kafka documentation for full details on the topic configs.
--create                                 Create a new topic.
--delete                                 Delete a topic.
--delete-config <String: name>           A topic configuration override to be removed for an existing topic (see the list of configurations under the --config option).
--describe                               List details for the given topics.
--disable-rack-aware                     Disable rack aware replica assignment.
--force                                  Suppress console prompts.
--help                                   Print usage information.
--if-exists                              If set when altering or deleting topics, the action will only execute if the topic exists.
--if-not-exists                          If set when creating topics, the action will only execute if the topic does not already exist.
--list                                   List all available topics.
--partitions <Integer: # of partitions>  The number of partitions for the topic being created or altered (WARNING: if partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected).
--replica-assignment <String: broker_id_for_part1_replica1:broker_id_for_part1_replica2, broker_id_for_part2_replica1:broker_id_for_part2_replica2, ...>
                                         A list of manual partition-to-broker assignments for the topic being created or altered.
--replication-factor <Integer>           The replication factor for each partition in the topic being created.
--topic <String: topic>                  The topic to be created, altered, or described. Can also accept a regular expression, except for the --create option.
--topics-with-overrides                  If set when describing topics, only show topics that have overridden configs.
--unavailable-partitions                 If set when describing topics, only show partitions whose leader is not available.
--under-replicated-partitions            If set when describing topics, only show under-replicated partitions.
--zookeeper <String: hosts>              REQUIRED: The connection string for the zookeeper connection in the form host:port. Multiple hosts can be given to allow fail-over.
```
Following this guide, let's create a topic.
Using --create, create a topic named "artisan" with a single partition and a replication factor of 1:

```shell
[root@artisan bin]# ./kafka-topics.sh --create --zookeeper 192.168.18.130:2181 --replication-factor 1 --partitions 1 --topic artisan
Created topic "artisan".
```

Use the --list option to see the topics that currently exist in Kafka:

```shell
[root@artisan bin]# ./kafka-topics.sh --list --zookeeper 192.168.18.130:2181
artisan
```

Besides creating topics by hand, a topic is also created automatically when a producer publishes a message to a topic that does not yet exist.
Delete a topic

```shell
# Delete
[root@artisan bin]# ./kafka-topics.sh --delete --topic artisan --zookeeper 192.168.18.130:2181
Topic artisan is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

# Verify it is gone
[root@artisan bin]# ./kafka-topics.sh --list --zookeeper 192.168.18.130:2181

# Re-create it
[root@artisan bin]# ./kafka-topics.sh --create --zookeeper 192.168.18.130:2181 --replication-factor 1 --partitions 1 --topic artisan
Created topic "artisan".
```
Send messages
Kafka ships with a console producer client that can read content from a local file or take input directly on the command line, sending it to the Kafka cluster as messages.
By default, each line of input is treated as a separate, independent message.
First start the producer script, then type the messages to send:
```shell
[root@artisan bin]# ./kafka-console-producer.sh --broker-list 192.168.18.130:9092 --topic artisan
>This is a message
>This is another message
>
```
Consume messages
For the consumer side, Kafka likewise ships with a console client that prints the messages it receives; by default it consumes only newly arriving messages.

```shell
./kafka-console-consumer.sh --bootstrap-server 192.168.18.130:9092 --topic artisan
```

To consume messages from the beginning of the log instead, add the --from-beginning flag:

```shell
[root@artisan bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.18.130:9092 --from-beginning --topic artisan
This is a message
This is another message
This is artisan
this is anothhhh
this is artisan
```

If you run the producer and consumer in separate terminal windows, whatever you type in the producer terminal shows up almost immediately in the consumer terminal.
All of these commands accept additional options; run any of them without arguments to see its detailed usage.
List consumer groups

```shell
[root@artisan bin]# ./kafka-consumer-groups.sh --bootstrap-server 192.168.18.130:9092 --list
Note: This will not show information about old Zookeeper-based consumers.
console-consumer-81551
console-consumer-72540
console-consumer-23504
testGroup
```
Check consumer offsets
```shell
[root@artisan bin]# ./kafka-consumer-groups.sh --bootstrap-server 192.168.18.130:9092 --describe --group testGroup
Note: This will not show information about old Zookeeper-based consumers.
Consumer group 'testGroup' has no active members.

TOPIC    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
artisan  0          3               5               2    -            -     -
```
CURRENT-OFFSET is the group's current consumed offset, LOG-END-OFFSET is the end offset of the log (the high watermark, HW), and LAG is how far consumption is lagging behind.
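The three columns are related by a simple identity: LAG = LOG-END-OFFSET - CURRENT-OFFSET. A small shell sketch over the sample row above makes the arithmetic explicit:

```shell
# Sample row from the --describe output: topic, partition, current, end, lag
row="artisan 0 3 5 2"
current=$(echo "$row" | awk '{print $3}')
log_end=$(echo "$row" | awk '{print $4}')
lag=$((log_end - current))
echo "LAG = $lag"   # -> LAG = 2, matching the LAG column
```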
Consume multiple topics
First create another topic, xiaogongjiang:

```shell
[root@artisan bin]# ./kafka-topics.sh --create --zookeeper 192.168.18.130:2181 --replication-factor 1 --partitions 1 --topic xiaogongjiang
Created topic "xiaogongjiang".
```

Start two producers:

```shell
[root@artisan bin]# ./kafka-console-producer.sh --broker-list 192.168.18.130:9092 --topic xiaogongjiang
>send from xiaogongjiang
>
[root@artisan bin]# ./kafka-console-producer.sh --broker-list 192.168.18.130:9092 --topic artisan
>send from artisan
>
```

Consume both topics with a whitelist pattern:

```shell
[root@artisan bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.18.130:9092 --whitelist "artisan|xiaogongjiang"
send from xiaogongjiang
send from artisan
```
Unicast consumption
In unicast mode a message is consumed by exactly one consumer, similar to a queue: just put all the consumers in the same consumer group.
Run the consumer command below in two separate terminals, then send messages to the topic; only one of the two clients receives each message.
Note that producers know nothing about consumer groups; the group is specified only on the consuming side.
Producer
```shell
[root@artisan bin]# ./kafka-console-producer.sh --broker-list 192.168.18.130:9092 --topic artisan
>queue model test
>queue model the second message
```
Consumer 1
```shell
[root@artisan bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.18.130:9092 --consumer-property group.id=artisanGroup --topic artisan
```
Consumer 2
```shell
[root@artisan bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.18.130:9092 --consumer-property group.id=artisanGroup --topic artisan
queue model test
queue model the second message
```
Multicast consumption
In multicast mode a message is consumed by multiple consumers, similar to publish-subscribe. Since Kafka delivers a given message to only one consumer within a consumer group, multicast simply requires that the consumers belong to different groups. Add another consumer in a separate consumer group, and both clients receive the messages.
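The delivery rule behind both unicast and multicast can be stated as: every consumer group receives each message once, and within a group only one member gets it. A toy shell sketch of that rule follows; the group and consumer names are invented, and "the first member" stands in for whichever consumer Kafka would actually assign the partition to.

```shell
# Toy model of Kafka's delivery semantics; names are hypothetical.
# g1 has two members (unicast inside the group), g2 has one;
# both groups receive the message (multicast across groups).
msg="hello"
for group in "g1:c1,c2" "g2:c3"; do
  name=${group%%:*}        # group id
  members=${group#*:}      # its consumers
  assigned=${members%%,*}  # exactly one member per group consumes the message
  echo "$name delivers to $assigned: $msg"
done
```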
Producer

```shell
[root@artisan bin]# ./kafka-console-producer.sh --broker-list 192.168.18.130:9092 --topic artisan
>messge artisan jajaja
>
```

Consumer 1, in the anotherArtisanGroup consumer group:

```shell
[root@artisan bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.18.130:9092 --consumer-property group.id=anotherArtisanGroup --topic artisan
messge artisan jajaja
```

Consumer 2, in the artisanGroup consumer group:

```shell
[root@artisan bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.18.130:9092 --consumer-property group.id=artisanGroup --topic artisan
messge artisan jajaja
```
Wrap-up
That completes the single-node Kafka setup and a tour of its basic usage. Next up, we'll build a three-node Kafka cluster.