Flume均匀发送数据到kafka的partition配置UUID Interceptor生成key的坑
一、需求
Flume向kafka發(fā)送數(shù)據(jù)時(shí),同一個(gè)flume發(fā)送到kafka的數(shù)據(jù)總是固定在某一個(gè)partition中。而業(yè)務(wù)需求是發(fā)送的數(shù)據(jù)在所有的partition平均分布
?
二、實(shí)現(xiàn)
Flume的官方文檔:
Kafka?Sink?uses?the?topic?and?key?properties?from?the?FlumeEvent?headers?to?send?events?to?Kafka.?If?topic?exists?in?the?headers,?the?event?will?be?sent?to?that?specific?topic,?overriding?the?topic?configured?for?the?Sink.?If?key?exists?in?the?headers,?the?key?will?used?by?Kafka?to?partition?the?data?between?the?topic?partitions.?Events?with?same?key?will?be?sent?to?the?same?partition.?If?the?key?is?null,?events?will?be?sent?to?random?partitions.
kafka-sink是從header里的key參數(shù)的value值來hash到kafka的某個(gè)分區(qū)中。如果key為null,那么就會(huì)隨機(jī)發(fā)布至分區(qū)中。事實(shí)上key為null被指定到kafka的某個(gè)固定分區(qū)。
要partition平均分布數(shù)據(jù),就向header中寫上隨機(jī)的key,然后數(shù)據(jù)才會(huì)真正的向kafka分區(qū)進(jìn)行隨機(jī)發(fā)布。
官方文檔有一個(gè)UUID?Interceptor,會(huì)為每個(gè)event的head添加一個(gè)隨機(jī)唯一的key,向flume添加攔截器達(dá)到隨機(jī)分區(qū)發(fā)送。
在flume添加的配置文件如下:
agent1.sources.nginxlogSource.interceptors?=?UUIDi1 agent1.sources.nginxlogSource.interceptors.UUIDi1.type=org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder agent1.sources.nginxlogSource.interceptors.UUIDi1.headerName=key agent1.sources.nginxlogSource.interceptors.UUIDi1.preserveExisting=false?
三、出現(xiàn)的問題
由于網(wǎng)絡(luò)抖動(dòng),國外nginx機(jī)器連接不上國內(nèi)的kafka集群的部分機(jī)器,nginx機(jī)器的flume通道堵塞,內(nèi)存占有率高,導(dǎo)致Nginx機(jī)器的cpu飆升100%,持續(xù)幾十臺機(jī)器崩潰發(fā)送告警。
故障原因:
flume添加了UUID攔截器,UUID攔截器給Event的header添加了一個(gè)key值,flume在發(fā)送到kafka中根據(jù)key指定了固定分區(qū)。由于網(wǎng)絡(luò)抖動(dòng),該kafka分區(qū)連接不上,分區(qū)的所有數(shù)據(jù)發(fā)送失敗回滾到channel通道,失敗數(shù)據(jù)還是以key指定的分區(qū)進(jìn)行重新發(fā)送,發(fā)送數(shù)據(jù)一直失敗回滾channel通道,直到機(jī)器崩潰故障發(fā)生。
?
四、總結(jié)
不要使用UUID攔截器進(jìn)行固定的分區(qū)發(fā)送,數(shù)據(jù)量大或者網(wǎng)絡(luò)抖動(dòng)容易導(dǎo)致機(jī)器崩潰。應(yīng)該重新編寫kafkaSink,flume在發(fā)送數(shù)據(jù)的時(shí)候隨機(jī)生成一個(gè)key,發(fā)送到不同的分區(qū)。就算失敗回滾到channel通道也會(huì)發(fā)送到新的分區(qū)。
示例:
KeyedMessage<String, byte[]> data = new KeyedMessage<String, byte[]>(eventTopic, UUID.randomUUID().toString(), eventBody); messageList.add(data);?
?
總結(jié)
以上是生活随笔為你收集整理的Flume均匀发送数据到kafka的partition配置UUID Interceptor生成key的坑的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 无线猫路由一体机怎么连接另一个无线路由器
- 下一篇: Hadoop生态Flume(三)拦截器(