日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 前端技术 > javascript >内容正文

javascript

RabbitMQ初步到精通-第十章-RabbitMQ之Spring客户端源码

發布時間:2024/3/12 javascript 40 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RabbitMQ初步到精通-第十章-RabbitMQ之Spring客户端源码 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

目錄

第十章-RabbitMQ之Spring客戶端源碼

????????1. 前言

2. 客戶端消費代碼

2.1 消費的實現方式

2.2 消費中注解解釋

2.3?推測Spring實現過程

3.MQ消費源碼分析

3.1?集成SpringBoot 啟動過程

3.2 Broker投遞消息給客戶端過程

3.3 客戶端消費過程

4. 總結


第十章-RabbitMQ之Spring客戶端源碼

1. 前言

經過前面前面的學習,我們已經掌握了rabbitmq的基本用法,高級用法延遲隊列、死信隊列等,已經研究過了amqp-client的java客戶端源碼,由于我們在使用的時候,一般還是以SpringBoot為主,那經過Spring封裝后的客戶端源碼是是如何實現的呢?

同學們最好需要有研讀過 Spring源碼及SpringBoot 源碼的經驗,會更好銜接一下,不過關系也不大。

由于Spring 體系的龐大,封裝的rabbit客戶端實現的功能也很多,例 創建連接、生產者推送消息,事務,消費者消費等等內容,那我們這次只抽取rabbitmq消費的部分,進行研讀。

集成starter

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

2. 客戶端消費代碼

2.1 消費的實現方式

如之前我們提到的集成SpringBoot后的使用方式:

@RabbitHandler @RabbitListener(queues = "SolarWaterHeater") @RabbitHandler@RabbitListener(queuesToDeclare = @Queue("SolarWaterHeater")) @RabbitHandler@RabbitListener(bindings = @QueueBinding(value = @Queue("SolarWaterHeater-RedWine"),key = "REDWINE",exchange = @Exchange(value = "routing-exchange", type = ExchangeTypes.DIRECT, durable = "false")))

2.2 消費中注解解釋

這里面出現了兩個注解

第一個:RabbitHandler 看下它的解釋:

* Annotation that marks a method to be the target of a Rabbit message * listener within a class that is annotated with {@link RabbitListener}.

如果一天類上面的注解是RabbitListener,那RabbitHandler標注的方法,即是Rabbit的消息監聽者。

@Target({ ElementType.METHOD, ElementType.ANNOTATION_TYPE }) 這個注解只能標注到Method

第二個?RabbitListener

1. Annotation that marks a method to be the target of a Rabbit message listener

標注的方法是一個消息監聽者

2. When defined at the class level, a single message listener container is used to * service all methods annotated with {@code @RabbitHandler}

如果標注到類上,那標注RabbitHandler的方法即是消息監聽

鏈一個:@RabbitListener和@RabbitHandler的使用_sliver1836的博客-CSDN博客

2.3?推測Spring實現過程

所以,我們后續的源碼分析即基于此兩個注解開展。

在開始看代碼之前,我們先想一想,我們之前的使用java amqp客戶端開發消費邏輯的過程,

1、創建連接

2、創建Channel

3、聲明隊列、Exchange、綁定關系

4、監聽方法實現 繼承DefaultConumer

5、basic.Consume 注冊到Broker

6、Broker消息推送,監聽方法實現消費

那現在Spring就靠兩個注解就幫我們實現了消息的消費,有沒有很神奇。頓時感嘆程序猿越來越幸福,寫代碼如此簡單了呢?但有利就有弊,Spring幫我們封裝的太多,而我們知道的底層卻太少了。

閑話少說,到這,大家想一下,如果讓你寫個注解,就去實現上面6個步驟的內容,你該如何去做呢?

開發自定義注解大家都應該做過,大致的邏輯應該是不是可以,在系統啟動的時候,我們就會抓取到標注注解的方法,有此類的方法時,我們認為需要使用mq,我們在后端服務中依次的去執行上面中的6個步驟。這樣把注解的方法實現了監聽,后續監聽消息進行消費。

這里只是一個大概的推測,大家自己自行發揮想像。

3.MQ消費源碼分析

從哪入手呢?首先點開?RabbitListener 的源碼,然后Download源碼。

到這個界面:

我們不再研讀RabbitListener這個注解的功能了,大家自己看。

然后緊接著看到?RabbitListenerAnnotationBeanPostProcessor

這個類有什么特點呢?首先是處理RabbitListener 的處理類,然后呢是一個BeanPostProcessor繼承了BeanPostProcessor 接口-讀過Spring源碼的同學,肯定就能得到最有效的信息了,這個類會在系統初始化的時候,執行postProcessAfterInitialization()這個方法。如果沒讀過Spring源碼的話就先跟著節奏走吧。

從這開始了我們的切入。

3.1?集成SpringBoot 啟動過程

接著上面的步驟呢,我們往上簡單倒一下,

首先 這是一個SpringBoot 項目,通過SpringBoot?的啟動類的Main 方法進行啟動,然后開始掃描各個組件,初始化各種信息,這個不再細聊。【需要讀SpringBoot源碼】

其次呢,SpringBoot 只是對Spring 的封裝,還是需要回到Spring 的類初始化的過程中去。【需要讀Spring源碼】

如下呢,即Spring 的核心初始化方法:無論Spring 再怎么升級,這幾個核心方法基本不會怎么變化了,這里面我們找到 【registerBeanPostProcessors】,從這里面就會觸發到我們上面所說的-

RabbitListenerAnnotationBeanPostProcessor

@Overridepublic void refresh() throws BeansException, IllegalStateException {synchronized (this.startupShutdownMonitor) {// Prepare this context for refreshing.prepareRefresh();// Tell the subclass to refresh the internal bean factory.ConfigurableListableBeanFactory beanFactory = obtainFreshBeanFactory();// Prepare the bean factory for use in this context.prepareBeanFactory(beanFactory);try {// Allows post-processing of the bean factory in context subclasses.postProcessBeanFactory(beanFactory);// Invoke factory processors registered as beans in the context.invokeBeanFactoryPostProcessors(beanFactory);// Register bean processors that intercept bean creation.registerBeanPostProcessors(beanFactory);// Initialize message source for this context.initMessageSource();// Initialize event multicaster for this context.initApplicationEventMulticaster();// Initialize other special beans in specific context subclasses.onRefresh();// Check for listener beans and register them.registerListeners();// Instantiate all remaining (non-lazy-init) singletons.finishBeanFactoryInitialization(beanFactory);// Last step: publish corresponding event.finishRefresh();}catch (BeansException ex) {if (logger.isWarnEnabled()) {logger.warn("Exception encountered during context initialization - " +"cancelling refresh attempt: " + ex);}// Destroy already created singletons to avoid dangling resources.destroyBeans();// Reset 'active' flag.cancelRefresh(ex);// Propagate exception to caller.throw ex;}finally {// Reset common introspection caches in Spring's core, since we// might not ever need metadata for singleton beans anymore...resetCommonCaches();}}}

隨著Spring 的啟動,開始觸發到了RabbitListenerAnnotationBeanPostProcessor 中的?

postProcessAfterInitialization 方法。

代碼:

這就很好解釋了,bean 就是我們的消費類,

解析到了 標有注解的方法 @RabbitListener,然后進行處理。processAmqpListener

@Overridepublic Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {Class<?> targetClass = AopUtils.getTargetClass(bean);final TypeMetadata metadata = this.typeCache.computeIfAbsent(targetClass, this::buildMetadata);for (ListenerMethod lm : metadata.listenerMethods) {for (RabbitListener rabbitListener : lm.annotations) {processAmqpListener(rabbitListener, lm.method, bean, beanName);}}if (metadata.handlerMethods.length > 0) {processMultiMethodListeners(metadata.classAnnotations, metadata.handlerMethods, bean, beanName);}return bean;} protected void processAmqpListener(RabbitListener rabbitListener, Method method, Object bean, String beanName) {// 對應的消費方法Method methodToUse = checkProxy(method, bean);//封裝對象MethodRabbitListenerEndpoint endpoint = new MethodRabbitListenerEndpoint();endpoint.setMethod(methodToUse);// 繼續處理processListener(endpoint, rabbitListener, bean, methodToUse, beanName);}

繼續:

protected void processListener(MethodRabbitListenerEndpoint endpoint, RabbitListener rabbitListener, Object bean,Object adminTarget, String beanName) {endpoint.setBean(bean);endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);endpoint.setId(getEndpointId(rabbitListener));endpoint.setQueueNames(resolveQueues(rabbitListener));endpoint.setConcurrency(resolveExpressionAsStringOrInteger(rabbitListener.concurrency(), "concurrency"));endpoint.setBeanFactory(this.beanFactory);endpoint.setReturnExceptions(resolveExpressionAsBoolean(rabbitListener.returnExceptions()));Object errorHandler = resolveExpression(rabbitListener.errorHandler());if (errorHandler instanceof RabbitListenerErrorHandler) {endpoint.setErrorHandler((RabbitListenerErrorHandler) errorHandler);}else if (errorHandler instanceof String) {String errorHandlerBeanName = (String) errorHandler;if (StringUtils.hasText(errorHandlerBeanName)) {endpoint.setErrorHandler(this.beanFactory.getBean(errorHandlerBeanName, RabbitListenerErrorHandler.class));}}else {throw new IllegalStateException("error handler mut be a bean name or RabbitListenerErrorHandler, not a "+ errorHandler.getClass().toString());}String group = rabbitListener.group();if (StringUtils.hasText(group)) {Object resolvedGroup = resolveExpression(group);if (resolvedGroup instanceof String) {endpoint.setGroup((String) resolvedGroup);}}String autoStartup = rabbitListener.autoStartup();if (StringUtils.hasText(autoStartup)) {endpoint.setAutoStartup(resolveExpressionAsBoolean(autoStartup));}endpoint.setExclusive(rabbitListener.exclusive());String priority = resolve(rabbitListener.priority());if (StringUtils.hasText(priority)) {try {endpoint.setPriority(Integer.valueOf(priority));}catch (NumberFormatException ex) {throw new BeanInitializationException("Invalid priority value for " +rabbitListener + " (must be an integer)", ex);}}// 以上 前面都完成了對 MethodRabbitListenerEndpoint 對象的封裝,封裝的也都是注解中的屬性//此方法內部實際沒執行 跳過resolveAdmin(endpoint, rabbitListener, adminTarget);//跳過RabbitListenerContainerFactory<?> factory = resolveContainerFactory(rabbitListener, adminTarget, beanName);// 屬性填充 放入List ,不重要this.registrar.registerEndpoint(endpoint, factory);}

程序回轉:

這里面來到一個

public void afterSingletonsInstantiated() 方法,這是由于實現了接口SmartInitializingSingleton, 后續得到了處理。

這里面會涉及到兩個類:

1.?RabbitListenerEndpointRegistrar

2.?RabbitListenerEndpointRegistry

有沒有長得很像,這里面是把?RabbitListenerEndpointRegistry 手工注冊到了RabbitListenerEndpointRegistrar 里面,然后進行了一系列初始化,

這里面不再詳細展開了,但這個RabbitListenerEndpointRegistry 很重要,后面還會涉及到它

?RabbitListenerEndpointRegistry 實現了一個Lifecycle接口,后續會調用到它的實現start()

將對應的消費Class 做好了封裝 ,返回,繼續Spring的初始化過程。

來到Spring核心流程?

finishRefresh(); /*** Finish the refresh of this context, invoking the LifecycleProcessor's* onRefresh() method and publishing the* {@link org.springframework.context.event.ContextRefreshedEvent}.*/protected void finishRefresh() {// Clear context-level resource caches (such as ASM metadata from scanning).clearResourceCaches();// Initialize lifecycle processor for this context.initLifecycleProcessor();// Propagate refresh to lifecycle processor first.getLifecycleProcessor().onRefresh();// Publish the final event.publishEvent(new ContextRefreshedEvent(this));// Participate in LiveBeansView MBean, if active.LiveBeansView.registerApplicationContext(this);}

其中第三個方法

getLifecycleProcessor().onRefresh();

這個方法是獲取 lifecycle的處理器,進行lifecycle接口實現類的處理,這就呼應到了上面的?RabbitListenerEndpointRegistry ,他實現了lifecycle的接口。

最終一番流轉終于到了 這個Registry處理邏輯中:

@Overridepublic void start() {for (MessageListenerContainer listenerContainer : getListenerContainers()) {startIfNecessary(listenerContainer);}} /*** Start the specified {@link MessageListenerContainer} if it should be started* on startup or when start is called explicitly after startup.* @param listenerContainer the container.* @see MessageListenerContainer#isAutoStartup()*/private void startIfNecessary(MessageListenerContainer listenerContainer) {if (this.contextRefreshed || listenerContainer.isAutoStartup()) {listenerContainer.start();}} MessageListenerContainer 也是在上面afterSingletonsInstantiated 處理好的,現在要啟動這個監聽者容器。

來到了?AbstractMessageListenerContainer 中的啟動方法:

/*** Start this container.* @see #doStart*/@Overridepublic void start() {if (isRunning()) {return;}if (!this.initialized) {synchronized (this.lifecycleMonitor) {if (!this.initialized) {afterPropertiesSet();}}}try {logger.debug("Starting Rabbit listener container.");configureAdminIfNeeded();checkMismatchedQueues();doStart();}catch (Exception ex) {throw convertRabbitAccessException(ex);}finally {this.lazyLoad = false;}} configureAdminIfNeeded() 獲取RabbitAdmin checkMismatchedQueues() 這個方法就很關鍵了,運行到此時打開我們的抓包工具,這里面開始創建Connection了。 protected void checkMismatchedQueues() {if (this.mismatchedQueuesFatal && this.amqpAdmin != null) {try {this.amqpAdmin.initialize();}catch (AmqpConnectException e) {logger.info("Broker not available; cannot check queue declarations");}catch (AmqpIOException e) {if (RabbitUtils.isMismatchedQueueArgs(e)) {throw new FatalListenerStartupException("Mismatched queues", e);}else {logger.info("Failed to get connection during start(): " + e);}}}else {try {// 創建連接方法Connection connection = getConnectionFactory().createConnection(); // NOSONARif (connection != null) {connection.close();}}catch (Exception e) {logger.info("Broker not available; cannot force queue declarations during start: " + e.getMessage());}}}

有沒有很熟悉

Connection connection = getConnectionFactory().createConnection(); @Overridepublic final Connection createConnection() throws AmqpException {if (this.stopped) {throw new AmqpApplicationContextClosedException("The ApplicationContext is closed and the ConnectionFactory can no longer create connections.");}synchronized (this.connectionMonitor) {if (this.cacheMode == CacheMode.CHANNEL) {if (this.connection.target == null) {this.connection.target = super.createBareConnection();// invoke the listener *after* this.connection is assignedif (!this.checkoutPermits.containsKey(this.connection)) {this.checkoutPermits.put(this.connection, new Semaphore(this.channelCacheSize));}this.connection.closeNotified.set(false);getConnectionListener().onCreate(this.connection);}return this.connection;}else if (this.cacheMode == CacheMode.CONNECTION) {return connectionFromCache();}}return null; // NOSONAR - never reach here - exceptions}

運行完此步,如上的代碼中,兩個重要的點:

1. 此步直接就創建了Connection、

this.connection.target = super.createBareConnection();

看下抓包:

2. 繼續這一步也很關鍵,創建完連接后,會把接下來的 Exchange、Queue、綁定關系根據注解配置中的內容,該創建的都創建一遍。

getConnectionListener().onCreate(this.connection);

直接運行到了

RabbitAdmin.initialize()

看方法頭上的注釋也很清晰

/*** Declares all the exchanges, queues and bindings in the enclosing application context, if any. It should be safe* (but unnecessary) to call this method more than once.*/@Override // NOSONAR complexitypublic void initialize() {if (this.applicationContext == null) {this.logger.debug("no ApplicationContext has been set, cannot auto-declare Exchanges, Queues, and Bindings");return;}this.logger.debug("Initializing declarations");Collection<Exchange> contextExchanges = new LinkedList<Exchange>(this.applicationContext.getBeansOfType(Exchange.class).values());Collection<Queue> contextQueues = new LinkedList<Queue>(this.applicationContext.getBeansOfType(Queue.class).values());Collection<Binding> contextBindings = new LinkedList<Binding>(this.applicationContext.getBeansOfType(Binding.class).values());processLegacyCollections(contextExchanges, contextQueues, contextBindings);processDeclarables(contextExchanges, contextQueues, contextBindings);final Collection<Exchange> exchanges = filterDeclarables(contextExchanges);final Collection<Queue> queues = filterDeclarables(contextQueues);final Collection<Binding> bindings = filterDeclarables(contextBindings);for (Exchange exchange : exchanges) {if ((!exchange.isDurable() || exchange.isAutoDelete()) && this.logger.isInfoEnabled()) {this.logger.info("Auto-declaring a non-durable or auto-delete Exchange ("+ exchange.getName()+ ") durable:" + exchange.isDurable() + ", auto-delete:" + exchange.isAutoDelete() + ". "+ "It will be deleted by the broker if it shuts down, and can be redeclared by closing and "+ "reopening the connection.");}}for (Queue queue : queues) {if ((!queue.isDurable() || queue.isAutoDelete() || queue.isExclusive()) && this.logger.isInfoEnabled()) {this.logger.info("Auto-declaring a non-durable, auto-delete, or exclusive Queue ("+ queue.getName()+ ") durable:" + queue.isDurable() + ", auto-delete:" + queue.isAutoDelete() + ", exclusive:"+ queue.isExclusive() + ". "+ "It will be redeclared if the broker stops and is restarted while the connection factory is "+ "alive, but all messages will be lost.");}}if (exchanges.size() == 0 && queues.size() == 0 && bindings.size() == 0) {this.logger.debug("Nothing to declare");return;}this.rabbitTemplate.execute(channel -> {declareExchanges(channel, exchanges.toArray(new Exchange[exchanges.size()]));declareQueues(channel, queues.toArray(new Queue[queues.size()]));declareBindings(channel, bindings.toArray(new Binding[bindings.size()]));return null;});this.logger.debug("Declarations finished");}

由于我們只創建了Queue,使用默認的Exchange,代碼不貼太多了,只貼聲明Queue的內容:

DeclareOk declareOk = channel.queueDeclare(queue.getName(), queue.isDurable(),queue.isExclusive(), queue.isAutoDelete(), queue.getArguments());

我們看下抓包情況:

?到此呢,Queue也聲明好了。下面呢,下面就該basic.Consume 了吧,把消費者注冊到Broker中去。

好,我們繼續:

繼續代碼又倒回去,倒到:

/*** Start this container.* @see #doStart*/@Overridepublic void start() {if (isRunning()) {return;}if (!this.initialized) {synchronized (this.lifecycleMonitor) {if (!this.initialized) {afterPropertiesSet();}}}try {logger.debug("Starting Rabbit listener container.");configureAdminIfNeeded();checkMismatchedQueues();doStart();}catch (Exception ex) {throw convertRabbitAccessException(ex);}finally {this.lazyLoad = false;}} doStart();

一看doxxx,那一定是要干實際的事情的,很重要對吧,

我們進入到?

SimpleMessageListenerContainer

中的實現方法中:

/*** Re-initializes this container's Rabbit message consumers, if not initialized already. Then submits each consumer* to this container's task executor.*/@Overrideprotected void doStart() {checkListenerContainerAware();super.doStart();synchronized (this.consumersMonitor) {if (this.consumers != null) {throw new IllegalStateException("A stopped container should not have consumers");}int newConsumers = initializeConsumers();if (this.consumers == null) {logger.info("Consumers were initialized and then cleared " +"(presumably the container was stopped concurrently)");return;}if (newConsumers <= 0) {if (logger.isInfoEnabled()) {logger.info("Consumers are already running");}return;}Set<AsyncMessageProcessingConsumer> processors = new HashSet<AsyncMessageProcessingConsumer>();for (BlockingQueueConsumer consumer : this.consumers) {AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);processors.add(processor);getTaskExecutor().execute(processor);if (getApplicationEventPublisher() != null) {getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consumer));}}waitForConsumersToStart(processors);}}

前面幾步意義不大,走到

int newConsumers = initializeConsumers(); protected int initializeConsumers() {int count = 0;synchronized (this.consumersMonitor) {if (this.consumers == null) {this.cancellationLock.reset();this.consumers = new HashSet<BlockingQueueConsumer>(this.concurrentConsumers);for (int i = 0; i < this.concurrentConsumers; i++) {BlockingQueueConsumer consumer = createBlockingQueueConsumer();this.consumers.add(consumer);count++;}}}return count;}

重點來咯,

BlockingQueueConsumer consumer = createBlockingQueueConsumer();

這里把BlockingQueueConsumer做了一個初始化,相關的不再展開。

BlockingQueueConsumer -這將是后續我們非常重要的一個類

繼續重點內容,回到我們上面代碼塊中的內容:

for (BlockingQueueConsumer consumer : this.consumers) {AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);processors.add(processor);getTaskExecutor().execute(processor);if (getApplicationEventPublisher() != null) {getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consumer));}}

這個for循環很重要了,由于我們是一個消費者,循環一次。

初始化一個

AsyncMessageProcessingConsumer

對象。這個對象點進去,大家看下這是個實現了Runnable接口的線程對象。哦哦,真正的核心哦。使用 SimpleAsyncTaskExecutor? ?來new的線程,這個執行器可不是線程池哦,來一個線程就會New一個,大家自行研究。

這里面我們可以得到一個結論,就是一個消費者,就會開啟一個線程進行監聽。

從此開啟了新線程,【打斷點記得Thread模式】

看線程的實現:

@Override // NOSONAR - complexity - many catch blockspublic void run() { // NOSONAR - line countif (!isActive()) {return;}boolean aborted = false;this.consumer.setLocallyTransacted(isChannelLocallyTransacted());String routingLookupKey = getRoutingLookupKey();if (routingLookupKey != null) {SimpleResourceHolder.bind(getRoutingConnectionFactory(), routingLookupKey); // NOSONAR both never null}if (this.consumer.getQueueCount() < 1) {if (logger.isDebugEnabled()) {logger.debug("Consumer stopping; no queues for " + this.consumer);}SimpleMessageListenerContainer.this.cancellationLock.release(this.consumer);if (getApplicationEventPublisher() != null) {getApplicationEventPublisher().publishEvent(new AsyncConsumerStoppedEvent(SimpleMessageListenerContainer.this, this.consumer));}this.start.countDown();return;}try {initialize();while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {mainLoop();}}

摘出核心點:

1、initialize();

private void initialize() throws Throwable { // NOSONARtry {redeclareElementsIfNecessary();this.consumer.start();this.start.countDown();}

初始化內容,

1.? redeclareElementsIfNecessary - 這個是再進行檢查進行Exchange 、Queue、Binding的聲明與前面聲明的方法實現的共用。

2.this.consumer.start();??

public void start() throws AmqpException {if (logger.isDebugEnabled()) {logger.debug("Starting consumer " + this);}this.thread = Thread.currentThread();try {this.resourceHolder = ConnectionFactoryUtils.getTransactionalResourceHolder(this.connectionFactory,this.transactional);this.channel = this.resourceHolder.getChannel();ClosingRecoveryListener.addRecoveryListenerIfNecessary(this.channel); // NOSONAR never null here}catch (AmqpAuthenticationException e) {throw new FatalListenerStartupException("Authentication failure", e);}this.deliveryTags.clear();this.activeObjectCounter.add(this);passiveDeclarations();setQosAndreateConsumers();} 這里面我們看這個方法就行 setQosAndreateConsumers();

Qos是設定消費時每次抓取的數量

并CreadConsumers

private void setQosAndreateConsumers() {if (!this.acknowledgeMode.isAutoAck() && !cancelled()) {// Set basicQos before calling basicConsume (otherwise if we are not acking the broker// will send blocks of 100 messages)try {this.channel.basicQos(this.prefetchCount);}catch (IOException e) {this.activeObjectCounter.release(this);throw new AmqpIOException(e);}}try {if (!cancelled()) {for (String queueName : this.queues) {if (!this.missingQueues.contains(queueName)) {consumeFromQueue(queueName);}}}}catch (IOException e) {throw RabbitExceptionTranslator.convertRabbitAccessException(e);}}

有沒有很熟悉:

this.channel.basicQos(this.prefetchCount);

抓包:

繼續:

consumeFromQueue(queueName); private void consumeFromQueue(String queue) throws IOException {InternalConsumer consumer = new InternalConsumer(this.channel, queue);String consumerTag = this.channel.basicConsume(queue, this.acknowledgeMode.isAutoAck(),(this.tagStrategy != null ? this.tagStrategy.createConsumerTag(queue) : ""), this.noLocal,this.exclusive, this.consumerArgs,consumer);if (consumerTag != null) {this.consumers.put(queue, consumer);if (logger.isDebugEnabled()) {logger.debug("Started on queue '" + queue + "' with tag " + consumerTag + ": " + this);}}else {logger.error("Null consumer tag received for queue " + queue);}}

?有沒有很熟悉:

String consumerTag = this.channel.basicConsume(queue, this.acknowledgeMode.isAutoAck(),(this.tagStrategy != null ? this.tagStrategy.createConsumerTag(queue) : ""), this.noLocal,this.exclusive, this.consumerArgs,consumer);

那這里有有一個核心的類出現了。InternalConsumer

這里轉向 3.2 Broker投遞消息給客戶端? 解釋

到這里呢,我們把消費者注冊到了Broker中去了,看下抓包情況:

?到這呢,所以Broker也就能給我們投遞消息了。

2、mainLoop();

initialize();while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {mainLoop();}

這里也有個mainLoop ,于是想到了,java 的amqp客戶端也存在呢mainLoop ,這里的邏輯難道也和他的邏輯契合的?我們轉向 3.3 客戶端消費過程繼續。

3.2 Broker投遞消息給客戶端過程

上面說到了,已經將消費者注冊到了Broker中去了,但一定注意哦,注冊到Broker 中的,可不是我們使用注解 RabbitListener 標注的實際消費方法哦,而是新創建了一個內部的消費者:InternalConsumer

我們看下他的一個實現

private final class InternalConsumer extends DefaultConsumer {private final String queueName;boolean canceled;InternalConsumer(Channel channel, String queue) {super(channel);this.queueName = queue;}@Overridepublic void handleConsumeOk(String consumerTag) {super.handleConsumeOk(consumerTag);if (logger.isDebugEnabled()) {logger.debug("ConsumeOK: " + BlockingQueueConsumer.this);}if (BlockingQueueConsumer.this.applicationEventPublisher != null) {BlockingQueueConsumer.this.applicationEventPublisher.publishEvent(new ConsumeOkEvent(this, this.queueName, consumerTag));}}@Overridepublic void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {if (logger.isDebugEnabled()) {if (RabbitUtils.isNormalShutdown(sig)) {logger.debug("Received shutdown signal for consumer tag=" + consumerTag + ": " + sig.getMessage());}else {logger.debug("Received shutdown signal for consumer tag=" + consumerTag, sig);}}BlockingQueueConsumer.this.shutdown = sig;// The delivery tags will be invalid if the channel shuts downBlockingQueueConsumer.this.deliveryTags.clear();BlockingQueueConsumer.this.activeObjectCounter.release(BlockingQueueConsumer.this);}@Overridepublic void handleCancel(String consumerTag) throws IOException {if (logger.isWarnEnabled()) {logger.warn("Cancel received for " + consumerTag + " ("+ this.queueName+ "); " + BlockingQueueConsumer.this);}BlockingQueueConsumer.this.consumers.remove(this.queueName);if (!BlockingQueueConsumer.this.consumers.isEmpty()) {basicCancel(false);}else {BlockingQueueConsumer.this.cancelled.set(true);}}@Overridepublic void handleCancelOk(String consumerTag) {if (logger.isDebugEnabled()) {logger.debug("Received cancelOk for tag " + consumerTag + " ("+ this.queueName+ "); " + BlockingQueueConsumer.this);}this.canceled = true;}@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) {if (logger.isDebugEnabled()) {logger.debug("Storing delivery for consumerTag: '"+ consumerTag + "' with deliveryTag: '" + envelope.getDeliveryTag() + "' in "+ BlockingQueueConsumer.this);}try {if (BlockingQueueConsumer.this.abortStarted > 0) {if (!BlockingQueueConsumer.this.queue.offer(new Delivery(consumerTag, envelope, properties, body, this.queueName),BlockingQueueConsumer.this.shutdownTimeout, TimeUnit.MILLISECONDS)) {Channel channelToClose = super.getChannel();RabbitUtils.setPhysicalCloseRequired(channelToClose, true);// Defensive - should never happenBlockingQueueConsumer.this.queue.clear();if (!this.canceled) {getChannel().basicCancel(consumerTag);}try {channelToClose.close();}catch (@SuppressWarnings("unused") TimeoutException e) {// no-op}}}else {BlockingQueueConsumer.this.queue.put(new Delivery(consumerTag, envelope, properties, body, this.queueName));}}catch (@SuppressWarnings("unused") InterruptedException e) {Thread.currentThread().interrupt();}catch (Exception e) {BlockingQueueConsumer.logger.warn("Unexpected exception during delivery", e);}}@Overridepublic String toString() {return "InternalConsumer{" + "queue='" + this.queueName + '\'' +", consumerTag='" + getConsumerTag() + '\'' +'}';}}

哇,內部類,而且繼承了?DefaultConsumer ,這和我們前面學習Rabbitmq工作模式的過程中,自己手動開發的代碼一樣了吧,那我找到 投遞方法:

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,

好親切有木有,所以到這里真相大白咯。Broker將消息投遞到了這里,我們看看他接收到消息搞什么動作?

BlockingQueueConsumer.this.queue.put(new Delivery(consumerTag, envelope, properties, body, this.queueName));

很明顯,和java amqp client 實現一樣,他這也用到了Queue,去存儲了,

this.queue = new LinkedBlockingQueue<Delivery>(prefetchCount);

也是個阻塞Queue哦,看來spring搞了一通,從客戶端那邊的queue里拿來,又放了一次queue。

那放進去了,就等著取唄,看誰來取咯。

3.3 客戶端消費過程

接續上面的?mainLoop(),既然消息又存到了本地的queue中,那mainLoop 的目的豈不是很明確了,那就是死循環的去取消息消息,然后再轉調到我們實際的 加入@RabbitListener 的方法中去呢。究竟是不是呢,驗證下:

private void mainLoop() throws Exception { // NOSONAR Exceptiontry {boolean receivedOk = receiveAndExecute(this.consumer); // At least one message receivedif (SimpleMessageListenerContainer.this.maxConcurrentConsumers != null) {checkAdjust(receivedOk);}long idleEventInterval = getIdleEventInterval();if (idleEventInterval > 0) {if (receivedOk) {updateLastReceive();}else {long now = System.currentTimeMillis();long lastAlertAt = SimpleMessageListenerContainer.this.lastNoMessageAlert.get();long lastReceive = getLastReceive();if (now > lastReceive + idleEventInterval&& now > lastAlertAt + idleEventInterval&& SimpleMessageListenerContainer.this.lastNoMessageAlert.compareAndSet(lastAlertAt, now)) {publishIdleContainerEvent(now - lastReceive);}}}}catch (ListenerExecutionFailedException ex) {// Continue to process, otherwise re-throwif (ex.getCause() instanceof NoSuchMethodException) {throw new FatalListenerExecutionException("Invalid listener", ex);}}catch (AmqpRejectAndDontRequeueException rejectEx) {/** These will normally be wrapped by an LEFE if thrown by the* listener, but we will also honor it if thrown by an* error handler.*/}}

看下重點方法:

boolean receivedOk = receiveAndExecute(this.consumer); private boolean receiveAndExecute(final BlockingQueueConsumer consumer) throws Exception { // NOSONARPlatformTransactionManager transactionManager = getTransactionManager();if (transactionManager != null) {try {if (this.transactionTemplate == null) {this.transactionTemplate =new TransactionTemplate(transactionManager, getTransactionAttribute());}return this.transactionTemplate.execute(status -> { // NOSONAR null never returnedRabbitResourceHolder resourceHolder = ConnectionFactoryUtils.bindResourceToTransaction(new RabbitResourceHolder(consumer.getChannel(), false),getConnectionFactory(), true);// unbound in ResourceHolderSynchronization.beforeCompletion()try {return doReceiveAndExecute(consumer);}catch (RuntimeException e1) {prepareHolderForRollback(resourceHolder, e1);throw e1;}catch (Exception e2) {throw new WrappedTransactionException(e2);}});}catch (WrappedTransactionException e) { // NOSONAR exception flow controlthrow (Exception) e.getCause();}}return doReceiveAndExecute(consumer);}

拋開事務,我們不關注。

return doReceiveAndExecute(consumer); private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Exception { //NOSONARChannel channel = consumer.getChannel();for (int i = 0; i < this.txSize; i++) {logger.trace("Waiting for message from consumer.");Message message = consumer.nextMessage(this.receiveTimeout);if (message == null) {break;}try {executeListener(channel, message);}

重點哦:

Message message = consumer.nextMessage(this.receiveTimeout);

從內部消費者取消息咯

public Message nextMessage(long timeout) throws InterruptedException, ShutdownSignalException {if (logger.isTraceEnabled()) {logger.trace("Retrieving delivery for " + this);}checkShutdown();if (this.missingQueues.size() > 0) {checkMissingQueues();}Message message = handle(this.queue.poll(timeout, TimeUnit.MILLISECONDS));if (message == null && this.cancelled.get()) {throw new ConsumerCancelledException();}return message;}

看到poll 我們就放心了,把消息取出來,包裝成Message對象。

快調頭回來,繼續看:

try {executeListener(channel, message); }

這就要真正處理這個消息了

protected void executeListener(Channel channel, Message messageIn) {if (!isRunning()) {if (logger.isWarnEnabled()) {logger.warn("Rejecting received message because the listener container has been stopped: " + messageIn);}throw new MessageRejectedWhileStoppingException();}try {doExecuteListener(channel, messageIn);}catch (RuntimeException ex) {if (messageIn.getMessageProperties().isFinalRetryForMessageWithNoId()) {if (this.statefulRetryFatalWithNullMessageId) {throw new FatalListenerExecutionException("Illegal null id in message. Failed to manage retry for message: " + messageIn, ex);}else {throw new ListenerExecutionFailedException("Cannot retry message more than once without an ID",new AmqpRejectAndDontRequeueException("Not retryable; rejecting and not requeuing", ex),messageIn);}}handleListenerException(ex);throw ex;}}

代碼不往下貼了,繼續追就可以,最終還是找到了,打標@RabbitListener的那個方法上,得到了執行。真正讓業務邏輯執行到了MQ推送過來的消息,

太不容易了,消息從發送-> Exchange->Queue -> java amqp client? ->spring client - >consume 最終得到了消費。

4. 總結

小結一下,我們從注解RabbitHandler RabbitListener 入手,一步步追蹤到 與Broker鏈接的創建,Queue的聲明,接著,啟動新線程 注冊一個內部的消費者到Broker中,Broker有消息的時候會推送到本地的BlockingQueue中去。

使用MainLoop 消費本地Blockinqueue的內容

貼個小圖:

總結

以上是生活随笔為你收集整理的RabbitMQ初步到精通-第十章-RabbitMQ之Spring客户端源码的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。