日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Kafka配置

發(fā)布時間:2024/4/13 编程问答 25 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Kafka配置 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

服務(wù)器端

安裝相關(guān)

參數(shù)意義
broker.idbroker 的編號,如果集群中有多個broker ,則每個broker 的編號需要設(shè)置的不同
portkafka監(jiān)聽端口,一般默認(rèn)是9092,使用1024以前端口的話就需要root權(quán)限。E.g:開啟線程消費時用到的就是這個端口
zookeeper.connect

用于保存broker元數(shù)據(jù)的zookeeper地址是通過該參數(shù)指定的,當(dāng)producer向kafka topic發(fā)布消息事,指定的就是zookeeper.connect這個路徑。多個用逗號分開,例如:

debugo01:2181,debugo02,debugo03

最佳的實踐方式是再加一個chroot 路徑,這樣既可以明確指明該chroot 路徑下的節(jié)點是為Kafka所用的,也可以實現(xiàn)多個Kafka 集群復(fù)用一套ZooKeeper 集群,這樣可以節(jié)省更多的硬件資源。包含chroot 路徑的配置類似于localhost1:2181 , localhost2:2181,
localhost3:2181/kafka 這種,如果不指定chroot,那么默認(rèn)使用ZooKeeper 的根路徑

zookeeper.connection.timeout.ms連接zk的超時時間
zookeeper.sync.time.msZooKeeper集群中l(wèi)eader和follower之間的同步實際

log.dirs/log.dir

日志存放目錄,多個目錄使用逗號分割。

Kafka是把所有消息都保存在在磁盤上,存放目錄即通過該參數(shù)指定的.

一般情況下,log.dir用來配置單個根目錄,而log.dirs用來配置多個根目錄(以逗號分隔〉,但是Kafka井沒有對此做強制性限制,也就是說,log.dir和log.dirs都可以用來配置單個或多個根目錄。log.dirs的優(yōu)先級比log.dir高,但是如果沒有配置log.dirs,則會以log.dir配置為準(zhǔn)

listeners

客戶端要連接broker的入口地址列表。


配置格式為protocoll://hostnamel:portl,protocol2://hostname2:port2,其中protocol代表協(xié)議類型,Kafka當(dāng)前支持的協(xié)議類型有PLAINTEXT、SSL、SASL_SSL等,
如果未開啟安全認(rèn)證,則使用簡單的PLAINTEXT即可。hostname代表主機名,p。此代表服務(wù)端口,此參數(shù)的默認(rèn)值為null。比如此參數(shù)配置為PLAINTEXT://198.162.0.2:9092,如
果有多個地址,則中間以逗號隔開。如果不指定主機名,則表示綁定默認(rèn)網(wǎng)卡,注意有可能會綁定到127.0.0.1,這樣無法對外提供服務(wù),所以主機名最好不要為空;如果主機名是0.0.0.0,
則表示綁定所有的網(wǎng)卡。

?

advertised.listeners

與此參數(shù)關(guān)聯(lián)的還有advertised.listeners,作用和listeners類似,默認(rèn)值也為null。

不過advertised.listeners主要用于IaaS(InfrastructureasaService)環(huán)境,比如公有云上的機器通常配備有多塊網(wǎng)卡,即包含私網(wǎng)網(wǎng)卡和公網(wǎng)網(wǎng)卡,對于這種情況而言,可以設(shè)置advertised.listeners參數(shù)綁定公網(wǎng)IP供外部客戶端使用,而配置listeners參數(shù)來綁定私網(wǎng)IP地址供broker間通信使用。

??

運行相關(guān)

Producer端

參數(shù)意義
buffer.memoryRecordAccumulator緩存的大小。默認(rèn)值為33554432B,即32MB
max.block.msRecordAccumulator緩存空間不足時,KafkaProducer的send()方法調(diào)用要么被阻塞的時長。超時則拋出異常。默認(rèn)值為60000,即60秒。
batch.size

客戶端發(fā)送消息時,BufferPool中可以放進去的ByteBuffer,大小。默認(rèn)值為16384B,即16KB。

ProducerBatch創(chuàng)建時,如果不超過此值,則根據(jù)此值創(chuàng)建,并能夠通過BufferPool復(fù)用。如果大于此值,則以實際大小創(chuàng)建,不會被復(fù)用。

acks

這個參數(shù)用來指定分區(qū)中必須要有多少個副本收到這條消息,之后生產(chǎn)者才會認(rèn)為這條消息是成功寫入的。acks 是生產(chǎn)者客戶端中一個非常重要的參數(shù),它涉及消息的可靠性和吞吐量之間的權(quán)衡。acks 參數(shù)有3 種類型的值(都是字符串類型):

acks = 1 。默認(rèn)值即為l 。生產(chǎn)者發(fā)送消息之后,只要分區(qū)的leader 副本成功寫入消息,那么它就會收到來自服務(wù)端的成功響應(yīng)
acks = 0 。生產(chǎn)者發(fā)送消息之后不需要等待任何服務(wù)端的響應(yīng)
acks =-1 或acks =all 。生產(chǎn)者在消息發(fā)送之后,需要等待ISR 中的所有副本都成功寫入消息之后才能夠收到來自服務(wù)端的成功響應(yīng)。

max.request.size這個參數(shù)用來限制生產(chǎn)者客戶端能發(fā)送的消息的最大值,默認(rèn)值為1048576B ,即1MB
retries配置生產(chǎn)者重試的次數(shù),默認(rèn)值為0,即在發(fā)生異常的時候不進行任何重試動作
retry.backoff.ms默認(rèn)值為100 ,它用來設(shè)定兩次重試之間的時間間隔
max.in.flight.requests.per.connection在需要保證消息順序的場合建議把參數(shù)配置為1 ,而不是把acks 配置為0 , 不過這樣也會影響整體的吞吐。
compression.type這個參數(shù)用來指定消息的壓縮方式,默認(rèn)值為“ none ”,即默認(rèn)情況下,消息不會被壓縮。該參數(shù)還可以配置為“ gzip ” “ snappy ” 和“ lz4 ” 。
connections.max.idle.ms指定在多久之后關(guān)閉限制的連接,默認(rèn)值是540000 ( ms ) ,即9 分鐘。
linger.ms指定生產(chǎn)者發(fā)送ProducerBatch 之前等待更多消息( ProducerRecord )加入
ProducerBatch 的時間,默認(rèn)值為0。生產(chǎn)者客戶端會在ProducerBatch 被填滿或等待時間超過linger.ms 值時發(fā)迭出去
receive.buffer.bytes設(shè)置Socket 接收消息緩沖區(qū)( SO_REVBUF )的大小,默認(rèn)值為32768B,即32M。如果設(shè)置為-1 ,則使用操作系統(tǒng)的默認(rèn)值。如果Producer與Kafka 處于不同的機房,則可以適地調(diào)大這個參數(shù)值。
send.buffer.bytes設(shè)置Socket 發(fā)送消息緩沖區(qū)(SO_SNDBUF )的大小,默認(rèn)值為131072B ,即128KB 。與receive.buffer.bytes參數(shù)一樣, 如果設(shè)置為l ,則使用操作系統(tǒng)的默認(rèn)值。
request.timeout.ms配置Producer 等待請求響應(yīng)的最長時間,默認(rèn)值為30000ms。請求超時之后可以選擇進行重試。注意這個參數(shù)需要比broker 端參數(shù)replica.lagtime.max.ms 的值要大,這樣可以減少因客戶端重試而引起的消息重復(fù)的概率。

Consumer端

參數(shù)意義
partition.assignment.strategy

分區(qū)分配策略,kafka又兩個默認(rèn)策略,

  • ??? Range:該策略會把主題的若干個連續(xù)的分區(qū)分配給消費者
  • ??? Robin:該策略把主題的所有分區(qū)逐個分配給消費者

分區(qū)策略默認(rèn)是:org.apache.kafka.clients.consumer.RangeAssignor=>Range策略

org.apache.kafka.clients.consumer.RoundRobinAssignor=>Robin策略
?

?

fetch.min.bytes消費者從服務(wù)器獲取記錄的最小字節(jié)數(shù),broker收到消費者拉取數(shù)據(jù)的請求的時候,如果可用數(shù)據(jù)量小于設(shè)置的值,那么broker將會等待有足夠可用的數(shù)據(jù)的時候才返回給消費者,這樣可以降低消費者和broker的工作負(fù)載,因為當(dāng)主題不是很活躍的情況下,就不需要來來回回的處理消息,如果沒有很多可用數(shù)據(jù),但消費者的CPU 使用率卻很高,那么就需要把該屬性的值設(shè)得比默認(rèn)值大。如果消費者的數(shù)量比較多,把該屬性的值設(shè)置得大一點可以降低broker 的工作負(fù)載。
fetch.max.wait.msfetch.min.bytes設(shè)置了broker返回給消費者最小的數(shù)據(jù)量,而fetch.max.wait.ms設(shè)置的則是broker的等待時間,兩個屬性只要滿足了任何一條,broker都會將數(shù)據(jù)返回給消費者,也就是說舉個例子,fetch.min.bytes設(shè)置成1MB,fetch.max.wait.ms設(shè)置成1000ms,那么如果在1000ms時間內(nèi),如果數(shù)據(jù)量達(dá)到了1MB,broker將會把數(shù)據(jù)返回給消費者;如果已經(jīng)過了1000ms,但是數(shù)據(jù)量還沒有達(dá)到1MB,那么broker仍然會把當(dāng)前積累的所有數(shù)據(jù)返回給消費者。
max.partition.fetch.bytes該屬性指定了服務(wù)器從每個分區(qū)里返回給消費者的最大字節(jié)數(shù)。它的默認(rèn)值是lMB , 也
就是說,kafkaConsumer.poll() 方法從每個分區(qū)里返回的記錄最多不超max.partitions.fetch.bytes 指定的字節(jié)。如果一個主題有20 個分區(qū)和5 個消費者,那么每個消費者需要至少4MB 的可用內(nèi)存來接收記錄。在為消費者分配內(nèi)存時,可以給它們多分配一些,因為如果群組里有消費者發(fā)生崩憤,剩下的消費者需要處理更多的分區(qū)。max.partition.fetch.bytes 的值必須比broker 能夠接收的最大消息的字節(jié)數(shù)(通過max.message.size 屬性配置)大, 否則消費者可能無法讀取這些消息,導(dǎo)致消費者一直掛起重試,例如,max.message.size設(shè)置為2MB,而該屬性設(shè)置為1MB,那么當(dāng)一個生產(chǎn)者可能就會生產(chǎn)一條大小為2MB的消息,那么就會出現(xiàn)問題,消費者能從分區(qū)取回的最大消息大小就只有1MB,但是數(shù)據(jù)量是2MB,所以就會導(dǎo)致消費者一直掛起重試。在設(shè)置該屬性時,另一個需要考慮的因素是消費者處理數(shù)據(jù)的時間。消費者需要頻繁調(diào)用poll()方法
來避免會話過期和發(fā)生分區(qū)再均衡,如果單次調(diào)用poll () 返回的數(shù)據(jù)太多,消費者需要更多的時間來處理,可能無怯及時進行下一個輪詢來避免會話過期。如果出現(xiàn)這種情況, 可以把max.partitioin.fetch.bytes 值改小,或者延長會話過期時間。
session.timeout.ms該屬性指定了當(dāng)消費者被認(rèn)為已經(jīng)掛掉之前可以與服務(wù)器斷開連接的時間。默認(rèn)是3s,消費者在3s之內(nèi)沒有再次向服務(wù)器發(fā)送心跳,那么將會被認(rèn)為已經(jīng)死亡。此時,協(xié)調(diào)器將會出發(fā)再均衡,把它的分區(qū)分配給其他的消費者,該屬性與heartbeat.interval.ms緊密相關(guān),該參數(shù)定義了消費者發(fā)送心跳的時間間隔,也就是心跳頻率,一般要同時修改這兩個參數(shù),heartbeat.interval.ms參數(shù)值必須要小于session.timeout.ms,一般是session.timeout.ms的三分之一,比如,session.timeout.ms設(shè)置成3min,那么heartbeat.interval.ms一般設(shè)置成1min,這樣,可以更快的檢測以及恢復(fù)崩潰的節(jié)點,不過長時間的輪詢或垃圾收集可能導(dǎo)致非預(yù)期的再均衡(有一種情況就是網(wǎng)絡(luò)延遲,本身消費者是沒有掛掉的,但是網(wǎng)絡(luò)延遲造成了心跳超時,這樣本不該發(fā)生再均衡,但是因為網(wǎng)絡(luò)原因造成了非預(yù)期的再均衡),把該屬性的值設(shè)置得大一些,可以減少意外的再均衡,不過檢測節(jié)點崩憤-需要更長的時間。
auto.offset.reset該屬性指定了消費者在讀取一個沒有偏移量后者偏移量無效(消費者長時間失效當(dāng)前的偏移量已經(jīng)過時并且被刪除了)的分區(qū)的情況下,應(yīng)該作何處理,默認(rèn)值是latest,也就是從最新記錄讀取數(shù)據(jù)(消費者啟動之后生成的記錄),另一個值是earliest,意思是在偏移量無效的情況下,消費者從起始位置開始讀取數(shù)據(jù)。
enable.auto.commit指定了消費者是否自動提交偏移量,默認(rèn)值是true,為了盡量避免重復(fù)數(shù)據(jù)和數(shù)據(jù)丟失,可以把它設(shè)置為false,有自己控制合適提交偏移量,如果設(shè)置為true, 可以通過設(shè)置 auto.commit.interval.ms屬性來控制提交的頻率
client.id消費者客戶端ID
max.poll.records控制單次調(diào)用call方法能夠返回的記錄數(shù)量,幫助控制在輪詢里需要處理的數(shù)據(jù)量。

receive.buffer.bytes

+

send.buffer.bytes

socket 在讀寫數(shù)據(jù)時用到的TCP 緩沖區(qū)也可以設(shè)置大小。如果它們被設(shè)為-1 ,就使用操作系統(tǒng)的默認(rèn)值。如果生產(chǎn)者或消費者與broker 處于不同的數(shù)據(jù)中心內(nèi),可以適當(dāng)增大這些值,因為跨數(shù)據(jù)中心的網(wǎng)絡(luò)一般都有比較高的延遲和比較低的帶寬

?Topic

?

?

?

?

?

總結(jié)

以上是生活随笔為你收集整理的Kafka配置的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。