日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Flume均匀发送数据到kafka的partition配置UUID Interceptor生成key的坑

發(fā)布時(shí)間:2023/12/3 编程问答 37 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Flume均匀发送数据到kafka的partition配置UUID Interceptor生成key的坑 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

一、需求

Flume向kafka發(fā)送數(shù)據(jù)時(shí),同一個(gè)flume發(fā)送到kafka的數(shù)據(jù)總是固定在某一個(gè)partition中。而業(yè)務(wù)需求是發(fā)送的數(shù)據(jù)在所有的partition平均分布

?

二、實(shí)現(xiàn)

Flume的官方文檔:

Kafka?Sink?uses?the?topic?and?key?properties?from?the?FlumeEvent?headers?to?send?events?to?Kafka.?If?topic?exists?in?the?headers,?the?event?will?be?sent?to?that?specific?topic,?overriding?the?topic?configured?for?the?Sink.?If?key?exists?in?the?headers,?the?key?will?used?by?Kafka?to?partition?the?data?between?the?topic?partitions.?Events?with?same?key?will?be?sent?to?the?same?partition.?If?the?key?is?null,?events?will?be?sent?to?random?partitions.

kafka-sink是從header里的key參數(shù)的value值來hash到kafka的某個(gè)分區(qū)中。如果key為null,那么就會(huì)隨機(jī)發(fā)布至分區(qū)中。事實(shí)上key為null被指定到kafka的某個(gè)固定分區(qū)。

要partition平均分布數(shù)據(jù),就向header中寫上隨機(jī)的key,然后數(shù)據(jù)才會(huì)真正的向kafka分區(qū)進(jìn)行隨機(jī)發(fā)布。

官方文檔有一個(gè)UUID?Interceptor,會(huì)為每個(gè)event的head添加一個(gè)隨機(jī)唯一的key,向flume添加攔截器達(dá)到隨機(jī)分區(qū)發(fā)送。

在flume添加的配置文件如下:

agent1.sources.nginxlogSource.interceptors?=?UUIDi1 agent1.sources.nginxlogSource.interceptors.UUIDi1.type=org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder agent1.sources.nginxlogSource.interceptors.UUIDi1.headerName=key agent1.sources.nginxlogSource.interceptors.UUIDi1.preserveExisting=false

?

三、出現(xiàn)的問題

由于網(wǎng)絡(luò)抖動(dòng),國外nginx機(jī)器連接不上國內(nèi)的kafka集群的部分機(jī)器,nginx機(jī)器的flume通道堵塞,內(nèi)存占有率高,導(dǎo)致Nginx機(jī)器的cpu飆升100%,持續(xù)幾十臺機(jī)器崩潰發(fā)送告警。

故障原因:

flume添加了UUID攔截器,UUID攔截器給Event的header添加了一個(gè)key值,flume在發(fā)送到kafka中根據(jù)key指定了固定分區(qū)。由于網(wǎng)絡(luò)抖動(dòng),該kafka分區(qū)連接不上,分區(qū)的所有數(shù)據(jù)發(fā)送失敗回滾到channel通道,失敗數(shù)據(jù)還是以key指定的分區(qū)進(jìn)行重新發(fā)送,發(fā)送數(shù)據(jù)一直失敗回滾channel通道,直到機(jī)器崩潰故障發(fā)生。

?

四、總結(jié)

不要使用UUID攔截器進(jìn)行固定的分區(qū)發(fā)送,數(shù)據(jù)量大或者網(wǎng)絡(luò)抖動(dòng)容易導(dǎo)致機(jī)器崩潰。應(yīng)該重新編寫kafkaSink,flume在發(fā)送數(shù)據(jù)的時(shí)候隨機(jī)生成一個(gè)key,發(fā)送到不同的分區(qū)。就算失敗回滾到channel通道也會(huì)發(fā)送到新的分區(qū)。

示例:

KeyedMessage<String, byte[]> data = new KeyedMessage<String, byte[]>(eventTopic, UUID.randomUUID().toString(), eventBody); messageList.add(data);

?

?

總結(jié)

以上是生活随笔為你收集整理的Flume均匀发送数据到kafka的partition配置UUID Interceptor生成key的坑的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。