日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Kafka分区分配策略(3)——自定义分区分配策略

發(fā)布時間:2024/4/11 编程问答 22 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Kafka分区分配策略(3)——自定义分区分配策略 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

接上文:
1.【Kafka分區(qū)分配策略(1)——RangeAssignor】
2.【Kafka分區(qū)分配策略(2)——RoundRobinAssignor和StickyAssignor】


歡迎支持筆者新作:《深入理解Kafka:核心設計與實踐原理》和《RabbitMQ實戰(zhàn)指南》,同時歡迎關注筆者的微信公眾號:朱小廝的博客。

歡迎跳轉到本文的原文鏈接:https://honeypps.com/mq/kafka-partitions-allocation-strategy-3-self-definition/


自定義分區(qū)分配策略

讀者不僅可以任意選用Kafka所提供的3種分配策略,還可以自定義分配策略來實現更多可選的功能。自定義的分配策略必須要實現org.apache.kafka.clients.consumer.internals.PartitionAssignor接口。PartitionAssignor接口的定義如下:

Subscription subscription(Set<String> topics); String name(); Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions); void onAssignment(Assignment assignment); class Subscription {private final List<String> topics;private final ByteBuffer userData; (省略若干方法……) } class Assignment {private final List<TopicPartition> partitions;private final ByteBuffer userData; (省略若干方法……) }

PartitionAssignor接口中定義了兩個內部類:Subscription和Assignment。

Subscription類用來表示消費者的訂閱信息,類中有兩個屬性:topics和userData,分別表示消費者所訂閱topic列表和用戶自定義信息。PartitionAssignor接口通過subscription()方法來設置消費者自身相關的Subscription信息,注意到此方法中只有一個參數topics,與Subscription類中的topics的相互呼應,但是并沒有有關userData的參數體現。為了增強用戶對分配結果的控制,可以在subscription()方法內部添加一些影響分配的用戶自定義信息賦予userData,比如:權重、ip地址、host或者機架(rack)等等。

舉例,在subscription()這個方法中提供機架信息,標識此消費者所部署的機架位置,在分區(qū)分配時可以根據分區(qū)的leader副本所在的機架位置來實施具體的分配,這樣可以讓消費者與所需拉取消息的broker節(jié)點處于同一機架。參考下圖,消費者consumer1和broker1都部署在機架rack1上,消費者consumer2和broker2都部署在機架rack2上。如果分區(qū)的分配不是機架感知的,那么有可能與圖(上部分)中的分配結果一樣,consumer1消費broker2中的分區(qū),而consumer2消費broker1中的分區(qū);如果分區(qū)的分配是機架感知的,那么就會出現圖(下部分)的分配結果,consumer1消費broker1中的分區(qū),而consumer2消費broker2中的分區(qū),這樣相比于前一種情形而言,既可以減少消費延遲又可以減少跨機架帶寬的占用。

再來說一下Assignment類,它是用來表示分配結果信息的,類中也有兩個屬性:partitions和userData,分別表示所分配到的分區(qū)集合和用戶自定義的數據??梢酝ㄟ^PartitionAssignor接口中的onAssignment()方法是在每個消費者收到消費組leader分配結果時的回調函數,例如在StickyAssignor策略中就是通過這個方法保存當前的分配方案,以備在下次消費組再平衡(rebalance)時可以提供分配參考依據。

接口中的name()方法用來提供分配策略的名稱,對于Kafka提供的3種分配策略而言,RangeAssignor對應的protocol_name為“range”,RoundRobinAssignor對應的protocol_name為“roundrobin”,StickyAssignor對應的protocol_name為“sticky”,所以自定義的分配策略中要注意命名的時候不要與已存在的分配策略發(fā)生沖突。這個命名用來標識分配策略的名稱,在后面所描述的加入消費組以及選舉消費組leader的時候會有涉及。

真正的分區(qū)分配方案的實現是在assign()方法中,方法中的參數metadata表示集群的元數據信息,而subscriptions表示消費組內各個消費者成員的訂閱信息,最終方法返回各個消費者的分配信息。

Kafka中還提供了一個抽象類org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor,它可以簡化PartitionAssignor接口的實現,對assign()方法進行了實現,其中會將Subscription中的userData信息去掉后,在進行分配。Kafka提供的3種分配策略都是繼承自這個抽象類。如果開發(fā)人員在自定義分區(qū)分配策略時需要使用userData信息來控制分區(qū)分配的結果,那么就不能直接繼承AbstractPartitionAssignor這個抽象類,而需要直接實現PartitionAssignor接口。

下面筆者參考Kafka中的RangeAssignor策略來自定義一個隨機的分配策略,這里筆者稱之為RandomAssignor,具體代碼實現如下:

package org.apache.kafka.clients.consumer;import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor; import org.apache.kafka.common.TopicPartition; import java.util.*;/*** Created by 朱小廝 on 2018/7/12. * 歡迎關注筆者的微信公眾號:朱小廝的博客.*/ public class RandomAssignor extends AbstractPartitionAssignor {@Overridepublic String name() {return "random";}@Overridepublic Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,Map<String, Subscription> subscriptions) {Map<String, List<String>> consumersPerTopic = consumersPerTopic(subscriptions);Map<String, List<TopicPartition>> assignment = new HashMap<>();for (String memberId : subscriptions.keySet()) {assignment.put(memberId, new ArrayList<>());}// 針對每一個topic進行分區(qū)分配for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) {String topic = topicEntry.getKey();List<String> consumersForTopic = topicEntry.getValue();int consumerSize = consumersForTopic.size();Integer numPartitionsForTopic = partitionsPerTopic.get(topic);if (numPartitionsForTopic == null) {continue;}// 當前topic下的所有分區(qū)List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);// 將每個分區(qū)隨機分配給一個消費者for (TopicPartition partition : partitions) {int rand = new Random().nextInt(consumerSize);String randomConsumer = consumersForTopic.get(rand);assignment.get(randomConsumer).add(partition);}}return assignment;}// 獲取每個topic所對應的消費者列表,即:[topic, List[consumer]]private Map<String, List<String>> consumersPerTopic( Map<String, Subscription> consumerMetadata) {Map<String, List<String>> res = new HashMap<>();for (Map.Entry<String, Subscription> subscriptionEntry : consumerMetadata.entrySet()) {String consumerId = subscriptionEntry.getKey();for (String topic : subscriptionEntry.getValue().topics())put(res, topic, consumerId);}return res;} }

在使用時,消費者客戶端需要添加相應的Properties參數,示例如下:

properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RandomAssignor.class.getName());

這里只是演示如何自定義實現一個分區(qū)分配策略,RandomAssignor的實現并不是特別的理想,并不見得會比Kafka自身所提供的RangeAssignor策略之類的要好。

歡迎跳轉到本文的原文鏈接:https://honeypps.com/mq/kafka-partitions-allocation-strategy-3-self-definition/


歡迎支持筆者新作:《深入理解Kafka:核心設計與實踐原理》和《RabbitMQ實戰(zhàn)指南》,同時歡迎關注筆者的微信公眾號:朱小廝的博客。


總結

以上是生活随笔為你收集整理的Kafka分区分配策略(3)——自定义分区分配策略的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。