日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

[三]RabbitMQ-客户端源码之ChannelManager

發(fā)布時(shí)間:2024/4/11 编程问答 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 [三]RabbitMQ-客户端源码之ChannelManager 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

歡迎支持筆者新作:《深入理解Kafka:核心設(shè)計(jì)與實(shí)踐原理》和《RabbitMQ實(shí)戰(zhàn)指南》,同時(shí)歡迎關(guān)注筆者的微信公眾號(hào):朱小廝的博客。

歡迎跳轉(zhuǎn)到本文的原文鏈接:https://honeypps.com/mq/rabbitmq-client-source-code-of-channelmanager/


關(guān)于ChannelManager,官方注解:Manages a set of channels, indexed by channel number (1… _channelMax)。

ChannelManager類的代碼量不是很多,主要用來管理Channel的,channelNumber=0的除外,應(yīng)為channelNumber=0是留給Connection的特殊的channelNumber。

下面是ChannelManager的成員變量:

/** Monitor for <code>_channelMap</code> and <code>channelNumberAllocator</code> */ private final Object monitor = new Object();/** Mapping from <code><b>1.._channelMax</b></code> to {@link ChannelN} instance */private final Map<Integer, ChannelN> _channelMap = new HashMap<Integer, ChannelN>();private final IntAllocator channelNumberAllocator;private final ConsumerWorkService workService;private final Set<CountDownLatch> shutdownSet = new HashSet<CountDownLatch>();/** Maximum channel number available on this connection. */ private final int _channelMax; private final ThreadFactory threadFactory;

這上面的成員變量下面會(huì)有涉及。


對(duì)于ChannelManager的使用,是AMQConnection中的成員變量:

/** Object that manages a set of channels */ private volatile ChannelManager _channelManager;

AMQConnection中start()的_channelManager中對(duì)其初始化:

protected ChannelManager instantiateChannelManager(int channelMax, ThreadFactory threadFactory) {return new ChannelManager(this._workService, channelMax, threadFactory); }

再調(diào)用其構(gòu)造函數(shù):

public ChannelManager(ConsumerWorkService workService, int channelMax, ThreadFactory threadFactory) {if (channelMax == 0) {// The framing encoding only allows for unsigned 16-bit integers// for the channel numberchannelMax = (1 << 16) - 1;}_channelMax = channelMax;channelNumberAllocator = new IntAllocator(1, channelMax);this.workService = workService;this.threadFactory = threadFactory; }

這里的ConsumerWorkService也在AMQConnection的start()方法中初始化——initializeConsumerWorkService():

private void initializeConsumerWorkService() {this._workService = new ConsumerWorkService(executor, threadFactory, shutdownTimeout); }

再回到構(gòu)造函數(shù)。

channelMax參數(shù)是在client接收到broker的Connection.Tune幀中的“Channel-Max”參數(shù)之后設(shè)置的,如果為0則表示沒有限制,這里就會(huì)設(shè)置為默認(rèn)的最大值:2的16次方-1。
threadFactory參數(shù)是指:Executors.defaultThreadFactory();

關(guān)于ConsumerWorkService請(qǐng)參考文章末尾處。


使用過RabbitMQ的同學(xué)知道要生產(chǎn)或者消費(fèi)消息之前必須要初始化Channel,如下:

Channel channel = connection.createChannel();

這個(gè)createChannel()是AMQConnection中的方法:

public Channel createChannel(int channelNumber) throws IOException {ensureIsOpen();ChannelManager cm = _channelManager;if (cm == null) return null;return cm.createChannel(this, channelNumber); } public Channel createChannel() throws IOException {ensureIsOpen();ChannelManager cm = _channelManager;if (cm == null) return null;return cm.createChannel(this); }

這里就是調(diào)用了ChannelManager的createChannel方法。

下面是ChannelManager中關(guān)于創(chuàng)建Channel的代碼:

public ChannelN createChannel(AMQConnection connection) throws IOException {ChannelN ch;synchronized (this.monitor) {int channelNumber = channelNumberAllocator.allocate();if (channelNumber == -1) {return null;} else {ch = addNewChannel(connection, channelNumber);}}ch.open(); // now that it's been safely addedreturn ch; }public ChannelN createChannel(AMQConnection connection, int channelNumber) throws IOException {ChannelN ch;synchronized (this.monitor) {if (channelNumberAllocator.reserve(channelNumber)) {ch = addNewChannel(connection, channelNumber);} else {return null;}}ch.open(); // now that it's been safely addedreturn ch; }private ChannelN addNewChannel(AMQConnection connection, int channelNumber) throws IOException {if (_channelMap.containsKey(channelNumber)) {// That number's already allocated! Can't do it// This should never happen unless something has gone// badly wrong with our implementation.throw new IllegalStateException("We have attempted to "+ "create a channel with a number that is already in "+ "use. This should never happen. "+ "Please report this as a bug.");}ChannelN ch = instantiateChannel(connection, channelNumber, this.workService);_channelMap.put(ch.getChannelNumber(), ch);return ch; }protected ChannelN instantiateChannel(AMQConnection connection, int channelNumber, ConsumerWorkService workService) {return new ChannelN(connection, channelNumber, workService); }

上面有兩個(gè)createChannel方法,一個(gè)是帶了channelNumber的,一個(gè)是自動(dòng)分片channelNumber的,分別對(duì)應(yīng)AMQConnection中的兩個(gè)方法。最后都調(diào)用addNewChannel方法。

注意兩個(gè)createChannel方法中都有這樣一句代碼:

ch.open();

這個(gè)是什么呢?其實(shí)是調(diào)用ChannelN的open方法:

/*** Package method: open the channel.* This is only called from {@link ChannelManager}.* @throws IOException if any problem is encountered*/ public void open() throws IOException {// wait for the Channel.OpenOk response, and ignore itexnWrappingRpc(new Channel.Open(UNSPECIFIED_OUT_OF_BAND)); }

這樣就調(diào)用了AMQChannel的rpc方法,向broker發(fā)送了一個(gè)Channel.Open幀。

addNewChannel方法實(shí)際上是創(chuàng)建了一個(gè)ChannelN對(duì)象,然后置其于ChannelManager中的_channelMap中,方便管理。

channelNumberAllocator是channelNumber的分配器,其原理是采用BitSet來實(shí)現(xiàn)channelNumber的分配,有興趣的同學(xué)可以深究進(jìn)去看看。

關(guān)于ChannelN類會(huì)有專門一篇博文來講述,其實(shí)整個(gè)RabbitMQ-client的代碼最關(guān)鍵的就是ChannelN這個(gè)類,需要著重講述。

細(xì)心的朋友可能會(huì)發(fā)現(xiàn)關(guān)于ConsumerWorkService這個(gè),我并沒有做什么闡述。這個(gè)主要牽涉到Channel層面的處理,涉及到的類有AMQConnection, ChannelN, ConsumerDispatcher等。ConsumerWorkService是在AMQConnection中初始化,在ChannelManager中引用。至于這里怎么理解,在ChannelN中這么解釋:
service for managing this channel’s consumer callbacks。意思是管理消費(fèi)回調(diào)的服務(wù)。
綜述,ChannelManager主要用來管理Channel, 包括channelNumber與Channel之間的映射關(guān)系。


附:本系列全集

  • [Conclusion]RabbitMQ-客戶端源碼之總結(jié)
  • [一]RabbitMQ-客戶端源碼之ConnectionFactory
  • [二]RabbitMQ-客戶端源碼之AMQConnection
  • [三]RabbitMQ-客戶端源碼之ChannelManager
  • [四]RabbitMQ-客戶端源碼之Frame
  • [五]RabbitMQ-客戶端源碼之AMQChannel
  • [六]RabbitMQ-客戶端源碼之AMQCommand
  • [七]RabbitMQ-客戶端源碼之AMQPImpl+Method
  • [八]RabbitMQ-客戶端源碼之ChannelN
  • [九]RabbitMQ-客戶端源碼之Consumer
  • 歡迎跳轉(zhuǎn)到本文的原文鏈接:https://honeypps.com/mq/rabbitmq-client-source-code-of-channelmanager/


    歡迎支持筆者新作:《深入理解Kafka:核心設(shè)計(jì)與實(shí)踐原理》和《RabbitMQ實(shí)戰(zhàn)指南》,同時(shí)歡迎關(guān)注筆者的微信公眾號(hào):朱小廝的博客。


    總結(jié)

    以上是生活随笔為你收集整理的[三]RabbitMQ-客户端源码之ChannelManager的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。