日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

activemq的使用场景

發(fā)布時間:2023/12/10 编程问答 27 豆豆
生活随笔 收集整理的這篇文章主要介紹了 activemq的使用场景 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

一、消息隊列概述
消息隊列中間件是分布式系統(tǒng)中重要的組件,主要解決應用耦合,異步消息,流量削鋒等問題。實現(xiàn)高性能,高可用,可伸縮和最終一致性架構。是大型分布式系統(tǒng)不可缺少的中間件。

目前在生產環(huán)境,使用較多的消息隊列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等。

二、消息隊列應用場景
以下介紹消息隊列在實際應用中常用的使用場景。異步處理,應用解耦,流量削鋒和消息通訊四個場景。本篇使用ActiveMQ+SpringBoot來模擬這四個場景。

2.1異步處理
場景說明:汽車觸發(fā)圍欄報警后,需要發(fā)送報警郵件和報警短信。傳統(tǒng)的做法有兩種1.串行的方式;2.并行方式。

(1)串行方式:將報警信息寫入數(shù)據(jù)庫成功后,發(fā)送報警郵件,再發(fā)送報警短信。以上三個任務全部完成后,該報警信息加入統(tǒng)計列表。

(2)并行方式:報警信息寫入數(shù)據(jù)庫成功后,同時發(fā)送報警郵件和短信。

假設三個業(yè)務節(jié)點每個使用50毫秒鐘,不考慮網絡等其他開銷,則串行方式的時間是150毫秒,并行的時間可能是100毫秒。

因為CPU在單位時間內處理的請求數(shù)是一定的,假設CPU1秒內吞吐量是100次。則串行方式1秒內CPU可處理的請求量是7次(1000/150)。并行方式處理的請求量是10次(1000/100)。

小結:如以上案例描述,傳統(tǒng)的方式系統(tǒng)的性能(并發(fā)量,吞吐量,響應時間)會有瓶頸。如何解決這個問題呢?

引入消息隊列,將不是必須的業(yè)務邏輯,異步處理。改造后的架構如下:

代碼示例

①在pom文件中引入activemq依賴

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-activemq</artifactId><version>1.5.6.RELEASE</version></dependency>

②在配置文件中加上activemq的配置

spring.activemq.broker-url=tcp://127.0.0.1:61616 # 在考慮結束之前等待的時間 #spring.activemq.close-timeout=15s # 默認代理URL是否應該在內存中。如果指定了顯式代理,則忽略此值。 spring.activemq.in-memory=true # 是否在回滾回滾消息之前停止消息傳遞。這意味著當啟用此命令時,消息順序不會被保留。 spring.activemq.non-blocking-redelivery=false # 密碼 spring.activemq.password=123456 # 等待消息發(fā)送響應的時間。設置為0等待永遠。 spring.activemq.send-timeout=0 spring.activemq.user=haha # 是否信任所有包 #spring.activemq.packages.trust-all= # 要信任的特定包的逗號分隔列表(當不信任所有包時) #spring.activemq.packages.trusted= # 當連接請求和池滿時是否阻塞。設置false會拋“JMSException異常”。 #spring.activemq.pool.block-if-full=true # 如果池仍然滿,則在拋出異常前阻塞時間。 #spring.activemq.pool.block-if-full-timeout=-1ms # 是否在啟動時創(chuàng)建連接。可以在啟動時用于加熱池。 #spring.activemq.pool.create-connection-on-startup=true # 是否用Pooledconnectionfactory代替普通的ConnectionFactory。 #spring.activemq.pool.enabled=false # 連接過期超時。 #spring.activemq.pool.expiry-timeout=0ms # 連接空閑超時 #spring.activemq.pool.idle-timeout=30s # 連接池最大連接數(shù) #spring.activemq.pool.max-connections=1 # 每個連接的有效會話的最大數(shù)目。 #spring.activemq.pool.maximum-active-session-per-connection=500 # 當有"JMSException"時嘗試重新連接 #spring.activemq.pool.reconnect-on-exception=true # 在空閑連接清除線程之間運行的時間。當為負數(shù)時,沒有空閑連接驅逐線程運行。 #spring.activemq.pool.time-between-expiration-check=-1ms # 是否只使用一個MessageProducer #spring.activemq.pool.use-anonymous-producers=true

③消息生產者

import java.util.Map;import javax.jms.Destination; import javax.jms.Queue;import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsMessagingTemplate; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessagePostProcessor; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component;/*** 報警消息Producer* @author ko**/ @Component //@EnableScheduling public class AlarmProducer {// 也可以注入JmsTemplate,JmsMessagingTemplate對JmsTemplate進行了封裝@Autowiredprivate JmsTemplate jmsTemplate; // private JmsMessagingTemplate jmsTemplate;// @Autowired // private Queue queue;// @Scheduled(fixedDelay=5000) // 5s執(zhí)行一次 只有無參的方法才能用該注解public void sendMessage(Destination destination, String message){ // jmsTemplate.convertAndSend(destinationName, payload, messagePostProcessor);this.jmsTemplate.convertAndSend(destination, message);}// 雙向隊列// @JmsListener(destination="out.queue") //   public void consumerMessage(String text){ //   System.out.println("從out.queue隊列收到的回復報文為:"+text); // } }

⑤controller里寫上測試接口

@Autowiredprivate AlarmProducer alarmProducer;@RequestMapping(value="/chufabaojing",method=RequestMethod.GET)@ApiOperation(value="觸發(fā)報警", notes="觸發(fā)報警")@ApiImplicitParams({@ApiImplicitParam(name = "devicename", value = "name",example = "xxxx", required = true, dataType = "string",paramType="query"),})public String chufabaojing(String devicename){List<String> alarmStrList = new ArrayList<>();alarmStrList.add(devicename+"out fence01");alarmStrList.add(devicename+"out fence02");alarmStrList.add(devicename+"in fence01");alarmStrList.add(devicename+"in fence02");System.out.println("設備"+devicename+"出圍欄報警");// 報警信息寫入數(shù)據(jù)庫System.out.println("報警數(shù)據(jù)寫入數(shù)據(jù)庫。。。");// 寫入消息隊列Destination destination = new ActiveMQQueue("mytest.queue");for (String alarmStr : alarmStrList) {alarmProducer.sendMessage(destination, alarmStr);}// 消息寫進消息隊列里就不管了// 下面兩步驟移到activemq消費者里// 發(fā)送郵件// 發(fā)送短信return "success";}

2.2 應用解耦
場景介紹,在spring cloud分布式微服務項目中,工單管理和設備管理分別是兩個微服務,如果A工單被張三接單了,那么工單狀態(tài)要設為已派單,檢驗員設為張三,設備狀態(tài)要置為在檢。

傳統(tǒng)的做法是,先調用工單管理的工單更新接口,成功之后再調用設備管理的設備更新接口,成功之后再返回操作提示給用戶。這樣做的缺點是應用耦合,如果在派單操作的時候正好設備管理微服務掛了或者阻塞了,那么派單操作就會失敗或者要等待很長時間無反饋。另外如果設備管理的接口有變動,那么工單管理里面的代碼也要改動。

引入消息中間件,派單的時候,工單管理的工單更新接口處理好后把信息寫入消息隊列,然后直接返回操作反饋給用戶。不管工單管理服務正不正常,正常就從消息隊列里訂閱消息處理,不正常就等待回復正常后再訂閱消息處理。

2.3 流量削峰
場景介紹,XX公司的系統(tǒng)原來是針對A地區(qū)的客戶開發(fā)的,現(xiàn)在為了搶占市場,拿下了B和C兩個地區(qū)的客戶,那么新系統(tǒng)上線,就存在如何把B和C的基礎數(shù)據(jù)導入XX公司的系統(tǒng)中來的問題,短時間內要把龐大的舊數(shù)據(jù)改造適合新系統(tǒng)再導入進來,這很容易使系統(tǒng)掛掉,另外每天還有增量數(shù)據(jù)產生。

這時可以引入消息中間件,B和C的客戶只要負責把數(shù)據(jù)規(guī)則放到消息隊列里就好了,XX公司可以有條不紊的從消息隊列里訂閱數(shù)據(jù),可以有效緩解短時間內的高流量壓力,但是這也對消息中間件的可靠性提出了要求。

如果遇到activemq的瓶頸,可以看看activemq集群方案,這篇文章 http://blog.csdn.net/shuangzh115/article/details/50989182

2.4 點對點通訊
類似聊天室的功能。

總結

以上是生活随笔為你收集整理的activemq的使用场景的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。