[三]RabbitMQ-客户端源码之ChannelManager
歡迎支持筆者新作:《深入理解Kafka:核心設(shè)計(jì)與實(shí)踐原理》和《RabbitMQ實(shí)戰(zhàn)指南》,同時(shí)歡迎關(guān)注筆者的微信公眾號(hào):朱小廝的博客。
歡迎跳轉(zhuǎn)到本文的原文鏈接:https://honeypps.com/mq/rabbitmq-client-source-code-of-channelmanager/
關(guān)于ChannelManager,官方注解:Manages a set of channels, indexed by channel number (1… _channelMax)。
ChannelManager類的代碼量不是很多,主要用來管理Channel的,channelNumber=0的除外,應(yīng)為channelNumber=0是留給Connection的特殊的channelNumber。
下面是ChannelManager的成員變量:
/** Monitor for <code>_channelMap</code> and <code>channelNumberAllocator</code> */ private final Object monitor = new Object();/** Mapping from <code><b>1.._channelMax</b></code> to {@link ChannelN} instance */private final Map<Integer, ChannelN> _channelMap = new HashMap<Integer, ChannelN>();private final IntAllocator channelNumberAllocator;private final ConsumerWorkService workService;private final Set<CountDownLatch> shutdownSet = new HashSet<CountDownLatch>();/** Maximum channel number available on this connection. */ private final int _channelMax; private final ThreadFactory threadFactory;這上面的成員變量下面會(huì)有涉及。
對(duì)于ChannelManager的使用,是AMQConnection中的成員變量:
/** Object that manages a set of channels */ private volatile ChannelManager _channelManager;AMQConnection中start()的_channelManager中對(duì)其初始化:
protected ChannelManager instantiateChannelManager(int channelMax, ThreadFactory threadFactory) {return new ChannelManager(this._workService, channelMax, threadFactory); }再調(diào)用其構(gòu)造函數(shù):
public ChannelManager(ConsumerWorkService workService, int channelMax, ThreadFactory threadFactory) {if (channelMax == 0) {// The framing encoding only allows for unsigned 16-bit integers// for the channel numberchannelMax = (1 << 16) - 1;}_channelMax = channelMax;channelNumberAllocator = new IntAllocator(1, channelMax);this.workService = workService;this.threadFactory = threadFactory; }這里的ConsumerWorkService也在AMQConnection的start()方法中初始化——initializeConsumerWorkService():
private void initializeConsumerWorkService() {this._workService = new ConsumerWorkService(executor, threadFactory, shutdownTimeout); }再回到構(gòu)造函數(shù)。
channelMax參數(shù)是在client接收到broker的Connection.Tune幀中的“Channel-Max”參數(shù)之后設(shè)置的,如果為0則表示沒有限制,這里就會(huì)設(shè)置為默認(rèn)的最大值:2的16次方-1。
threadFactory參數(shù)是指:Executors.defaultThreadFactory();
關(guān)于ConsumerWorkService請(qǐng)參考文章末尾處。
使用過RabbitMQ的同學(xué)知道要生產(chǎn)或者消費(fèi)消息之前必須要初始化Channel,如下:
Channel channel = connection.createChannel();這個(gè)createChannel()是AMQConnection中的方法:
public Channel createChannel(int channelNumber) throws IOException {ensureIsOpen();ChannelManager cm = _channelManager;if (cm == null) return null;return cm.createChannel(this, channelNumber); } public Channel createChannel() throws IOException {ensureIsOpen();ChannelManager cm = _channelManager;if (cm == null) return null;return cm.createChannel(this); }這里就是調(diào)用了ChannelManager的createChannel方法。
下面是ChannelManager中關(guān)于創(chuàng)建Channel的代碼:
public ChannelN createChannel(AMQConnection connection) throws IOException {ChannelN ch;synchronized (this.monitor) {int channelNumber = channelNumberAllocator.allocate();if (channelNumber == -1) {return null;} else {ch = addNewChannel(connection, channelNumber);}}ch.open(); // now that it's been safely addedreturn ch; }public ChannelN createChannel(AMQConnection connection, int channelNumber) throws IOException {ChannelN ch;synchronized (this.monitor) {if (channelNumberAllocator.reserve(channelNumber)) {ch = addNewChannel(connection, channelNumber);} else {return null;}}ch.open(); // now that it's been safely addedreturn ch; }private ChannelN addNewChannel(AMQConnection connection, int channelNumber) throws IOException {if (_channelMap.containsKey(channelNumber)) {// That number's already allocated! Can't do it// This should never happen unless something has gone// badly wrong with our implementation.throw new IllegalStateException("We have attempted to "+ "create a channel with a number that is already in "+ "use. This should never happen. "+ "Please report this as a bug.");}ChannelN ch = instantiateChannel(connection, channelNumber, this.workService);_channelMap.put(ch.getChannelNumber(), ch);return ch; }protected ChannelN instantiateChannel(AMQConnection connection, int channelNumber, ConsumerWorkService workService) {return new ChannelN(connection, channelNumber, workService); }上面有兩個(gè)createChannel方法,一個(gè)是帶了channelNumber的,一個(gè)是自動(dòng)分片channelNumber的,分別對(duì)應(yīng)AMQConnection中的兩個(gè)方法。最后都調(diào)用addNewChannel方法。
注意兩個(gè)createChannel方法中都有這樣一句代碼:
ch.open();這個(gè)是什么呢?其實(shí)是調(diào)用ChannelN的open方法:
/*** Package method: open the channel.* This is only called from {@link ChannelManager}.* @throws IOException if any problem is encountered*/ public void open() throws IOException {// wait for the Channel.OpenOk response, and ignore itexnWrappingRpc(new Channel.Open(UNSPECIFIED_OUT_OF_BAND)); }這樣就調(diào)用了AMQChannel的rpc方法,向broker發(fā)送了一個(gè)Channel.Open幀。
addNewChannel方法實(shí)際上是創(chuàng)建了一個(gè)ChannelN對(duì)象,然后置其于ChannelManager中的_channelMap中,方便管理。
channelNumberAllocator是channelNumber的分配器,其原理是采用BitSet來實(shí)現(xiàn)channelNumber的分配,有興趣的同學(xué)可以深究進(jìn)去看看。
關(guān)于ChannelN類會(huì)有專門一篇博文來講述,其實(shí)整個(gè)RabbitMQ-client的代碼最關(guān)鍵的就是ChannelN這個(gè)類,需要著重講述。
細(xì)心的朋友可能會(huì)發(fā)現(xiàn)關(guān)于ConsumerWorkService這個(gè),我并沒有做什么闡述。這個(gè)主要牽涉到Channel層面的處理,涉及到的類有AMQConnection, ChannelN, ConsumerDispatcher等。ConsumerWorkService是在AMQConnection中初始化,在ChannelManager中引用。至于這里怎么理解,在ChannelN中這么解釋:
service for managing this channel’s consumer callbacks。意思是管理消費(fèi)回調(diào)的服務(wù)。
綜述,ChannelManager主要用來管理Channel, 包括channelNumber與Channel之間的映射關(guān)系。
附:本系列全集
歡迎跳轉(zhuǎn)到本文的原文鏈接:https://honeypps.com/mq/rabbitmq-client-source-code-of-channelmanager/
歡迎支持筆者新作:《深入理解Kafka:核心設(shè)計(jì)與實(shí)踐原理》和《RabbitMQ實(shí)戰(zhàn)指南》,同時(shí)歡迎關(guān)注筆者的微信公眾號(hào):朱小廝的博客。
總結(jié)
以上是生活随笔為你收集整理的[三]RabbitMQ-客户端源码之ChannelManager的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: [一]RabbitMQ-客户端源码之Co
- 下一篇: [四]RabbitMQ-客户端源码之Fr