Activemq实战
文章目錄
- 消息發(fā)布模式和接口
- JMS發(fā)布消息模式
- JMS API
- 點(diǎn)對點(diǎn)模式
- 同步消費(fèi)
- 異步消費(fèi)
- 總結(jié)
- 發(fā)布/訂閱模式
- 訂閱者類型
- 點(diǎn)對點(diǎn)模式與發(fā)布訂閱模式比較
- JMS
- 消息的可靠性和持久性
- 設(shè)置方式
- 生產(chǎn)者和消費(fèi)者事務(wù)及ACK
- 事務(wù)
- 設(shè)置方式
- 總結(jié)
- ACK
- 非事務(wù)模式下ACK
- 事務(wù)模式下ACK
- 總結(jié)
- spring 整合 Activemq
- spring framework
- spring boot
- mqtt
- 高級特性
- 異步投遞及確認(rèn)
- 延遲投遞和定時投遞
- 消費(fèi)重試機(jī)制
- 重新發(fā)送消息情況
- Java配置
- Spring配置
- sprint boot配置
- 死信隊(duì)列
- 防止重復(fù)調(diào)用
- Activemq應(yīng)用場景
- 參考
- JMS和AMQP
消息發(fā)布模式和接口
JMS發(fā)布消息模式
點(diǎn)對點(diǎn)模式:一個生產(chǎn)者向特定隊(duì)列發(fā)送消息,一個消費(fèi)者讀取消息。一條消息僅能被一個消費(fèi)者消費(fèi)。
發(fā)布/訂閱模式:0個或多個消費(fèi)者 讀取topic消息,一條消息可以被每個消費(fèi)者消費(fèi)。訂閱者必須保持活動狀態(tài)才能訂閱消息。如果訂閱者創(chuàng)建了持久訂閱,則會在訂閱者重新連接時重新發(fā)布。
JMS API
JMS開發(fā)的基本步驟:
點(diǎn)對點(diǎn)模式
同步消費(fèi)
MessageConsumer.receive()方法,會阻塞。
異步消費(fèi)
使用MessageListener 創(chuàng)建異步監(jiān)聽。
總結(jié)
每一條消息只會被一個消費(fèi)者消費(fèi)
消息被消費(fèi)會,不會再被存儲。
多個消費(fèi)者消費(fèi)一個隊(duì)列,未被消費(fèi)的消息,會被輪詢被已經(jīng)活動的消費(fèi)者 消費(fèi)。
發(fā)布/訂閱模式
訂閱者類型
- 活動持久訂閱者。設(shè)置了 Connection 的 ClientId 的訂閱者。
- 離線持久訂閱者。下線了的持久訂閱者
- 活動非持久訂閱者。未設(shè)置ClientId,默認(rèn)情況。
要用持久化訂閱,發(fā)送者要用DelIveryMode.PERSISTENT模式發(fā)送,在連接之前設(shè)定。一定要設(shè)置完成后,再start這個connection
Message Durability 與 Message Persistence 是不同的,Message Durability 僅指發(fā)布訂閱模式下的訂閱者
點(diǎn)對點(diǎn)模式與發(fā)布訂閱模式比較
| 工作模式 | 如果當(dāng)前沒有訂閱者,則所有消息丟棄。 每條消息都會發(fā)送到所有訂閱者 | 沒有消費(fèi)者,消息不會丟棄。 如果有多個消費(fèi)者,消息采用**負(fù)載均衡模式(輪詢)**保證一條消息只會發(fā)送給一個消費(fèi)者 |
| 有無狀態(tài) | 無狀態(tài) | queue數(shù)據(jù)會以文件形式保存,或者配置成DB。 |
| 投遞完整性 | 當(dāng)前沒有訂閱者,則所有消息丟棄 | 消息不會丟棄 |
| 處理效率 | 消息會根據(jù)訂閱者數(shù)量復(fù)制,訂閱者數(shù)量多性能會降低。 | 一條消息發(fā)送到一個消費(fèi)者,消費(fèi)者數(shù)量不影響效率。 |
JMS
發(fā)送ObjectMessage時,如果是自定義的對象,需要設(shè)置trustAllPackages
消息的可靠性和持久性
設(shè)置方式
MessageProducer producer = session.createProducer(topic); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //默認(rèn)是持久性的。注意:只有queue 持久化才有意義。topic的持久化沒有任何意義,因?yàn)槿绻麤]有訂閱者,發(fā)送的消息就是沒有價值的。
生產(chǎn)者和消費(fèi)者事務(wù)及ACK
事務(wù)
設(shè)置方式
session = connection.createSession(useTransaction,Session.AUTO_ACKNOWLEDGE); session.commit();總結(jié)
生產(chǎn)者開啟了事務(wù),必須顯性的提交事務(wù),不然消息不會發(fā)送到broker。
事務(wù)偏生產(chǎn)者,ACK偏消費(fèi)者,消費(fèi)者端不需要設(shè)置事務(wù),如果設(shè)置了事務(wù),則ACK不會提交,會導(dǎo)致重復(fù)消費(fèi)。
ACK
提交,指調(diào)用 session.commit()方法。ACK指調(diào)用了message.acknowledge()方法。
非事務(wù)模式下ACK
生產(chǎn)者設(shè)置為自動ACK,消費(fèi)者設(shè)置為自動ACK,正常
生產(chǎn)者設(shè)置為自動ACK,消費(fèi)者設(shè)置為手動ACK,如果沒有調(diào)用ACK方法,則會重復(fù)消費(fèi)。
生產(chǎn)者設(shè)置為手動ACK,消費(fèi)者設(shè)置為自動ACK,正常
事務(wù)模式下ACK
生產(chǎn)者ACK設(shè)置為自動,消費(fèi)者ACK設(shè)置為自動。已提交,正常
生產(chǎn)者ACK設(shè)置為自動,消費(fèi)者ACK設(shè)置為自動。未提交,會出現(xiàn)重復(fù)消費(fèi)
生產(chǎn)者ACK設(shè)置為自動,消費(fèi)者ACK設(shè)置為手動。已提交,未 ACK ,正常
生產(chǎn)者ACK設(shè)置為自動,消費(fèi)者ACK設(shè)置為手動。未提交,已ACK,會出現(xiàn)重復(fù)消費(fèi)
生產(chǎn)者ACK設(shè)置為手動,消費(fèi)者ACK設(shè)置為自動。已提交,正常
生產(chǎn)者ACK設(shè)置為手動,消費(fèi)者ACK設(shè)置為自動。未提交,會出現(xiàn)重復(fù)消費(fèi)
生產(chǎn)者ACK設(shè)置為手動,消費(fèi)者ACK設(shè)置為自動。未提交,已ACK,會出現(xiàn)重復(fù)消費(fèi)
生產(chǎn)者ACK設(shè)置為手動,消費(fèi)者ACK設(shè)置為自動。未提交,未ACK,會出現(xiàn)重復(fù)消費(fèi)
生產(chǎn)者ACK設(shè)置為手動,消費(fèi)者ACK設(shè)置為手動。已提交,已ACK,正常
生產(chǎn)者ACK設(shè)置為手動,消費(fèi)者ACK設(shè)置為手動。已提交,未ACK,正常
總結(jié)
在非事務(wù)中,消費(fèi)者根據(jù)是否ACK,決定消息是否標(biāo)記未已消費(fèi)。
在事務(wù)中,消費(fèi)者根據(jù)事務(wù)是否提交,決定消息是否標(biāo)記為已消費(fèi)。如果事務(wù)回滾,則消息會被再次投遞。
spring 整合 Activemq
spring framework
<!-- activemq依賴--><dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-all</artifactId><version>5.15.5</version></dependency><dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-pool</artifactId><version>5.15.5</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-jms</artifactId><version>${spring.version}</version></dependency>spring boot
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-activemq</artifactId> </dependency> <!-- 連接池可以單獨(dú)依賴 --> <dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-pool</artifactId><version>5.15.5</version> </dependency> spring:activemq:user: adminpassword: adminbroker-url: tcp://192.168.153.129:61616pool:enabled: truemax-connections: 10jms:pub-sub-domain: false #只能監(jiān)聽隊(duì)列,不能是topic。重點(diǎn):
@EnableJms注解:
@JmsListener 注解:
屬性參考類:ActiveMQProperties ,JmsProperties
mqtt
Activemq默認(rèn)就開啟的
WEB端使用參考:
mqtt.rar
高級特性
異步投遞及確認(rèn)
同步發(fā)送,send()方法會阻塞,直到broker給出一個確認(rèn)消息。非事務(wù)、持久化消息 場景默認(rèn)的方式。
異步發(fā)送,不會阻塞,會提高吞吐量。大部分場景的默認(rèn)方式。
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(); cf.setUseAsyncSend(true);//異步發(fā)送異步發(fā)送,可能會丟失消息,可以在send()方法中帶上AsyncCallback類型參數(shù)
producer.send(message, new AsyncCallback() {@Overridepublic void onSuccess() {try {System.out.println("消息發(fā)送成功:" + message.getStringProperty("msgId"));} catch (JMSException e) {e.printStackTrace();}}@Overridepublic void onException(JMSException e) {try {System.out.println("出現(xiàn)異常:" + message.getStringProperty("msgId"));} catch (JMSException je) {je.printStackTrace();}}});延遲投遞和定時投遞
Activemq 需要開啟 Scheduler,broker的schedulerSupport屬性。
設(shè)置屬性:
| AMQ_SCHEDULED_DELAY | long | 延遲投遞的時間 |
| AMQ_SCHEDULED_PERIOD | long | 重復(fù)投遞的時間間隔 |
| AMQ_SCHEDULED_REPEAT | int | 重復(fù)投遞次數(shù),(不包括第一次) |
| AMQ_SCHEDULED_CRON | String | Cron表達(dá)式 |
示例:
//延遲30秒,投遞10次,間隔10秒: MessageProducer producer = session.createProducer(destination); TextMessage message = session.createTextMessage("test msg"); long delay = 30 * 1000; long period = 10 * 1000; int repeat = 9; message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay); message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period); message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat); producer.send(message);//CRON 表達(dá)式 ,定時投遞。linux crontab 表達(dá)式。 MessageProducer producer = session.createProducer(destination); TextMessage message = session.createTextMessage("test msg"); message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 * * * *"); producer.send(message);//CRON表達(dá)式的優(yōu)先級高于另外三個參數(shù),如果在設(shè)置了CRON的同時,也有repeat和period參數(shù),則會在每次CRON執(zhí)行的時候,重復(fù)投遞repeat次,每次間隔為period。就是說設(shè)置是疊加的效果。例如每小時都會發(fā)生消息被投遞10次,延遲1秒開始,每次間隔1秒: MessageProducer producer = session.createProducer(destination); TextMessage message = session.createTextMessage("test msg"); message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 * * * *"); message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 1000); message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 1000); message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 9); producer.send(message);設(shè)置延時投遞后,會在scheduler中有一條記錄,會記錄任務(wù)信息。投遞repeat次,則會把消息往 指定 destination 中 發(fā)送 repeat + 1 次。
消費(fèi)重試機(jī)制
重新發(fā)送消息情況
消息重發(fā)指的是消息可以被broker重新投遞,不一定是之前的消費(fèi)者。重新投遞后,消費(fèi)者可以重新消費(fèi)此消息。
總結(jié):即broker必須確認(rèn)消息被消費(fèi)了,才不會重發(fā)。1,2 會重新投遞給原來的消費(fèi)者,3,4 則不一定。
重發(fā)配置:http://activemq.apache.org/redelivery-policy
Java配置
// 手動修改RedeliveryPolicy(重發(fā)策略) RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy(); redeliveryPolicy.setMaximumRedeliveries(3);// 修改重發(fā)次數(shù)為3次 activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);// 將重發(fā)策略設(shè)置到ConnectionFactory中Spring配置
<!--定義RedeliveryPolicy(重發(fā)機(jī)制)--> <bean id="activeMQRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy"><property name="useCollisionAvoidance" value="false"/><property name="useExponentialBackOff" value="true"/><property name="maximumRedeliveries" value="3"/><property name="initialRedeliveryDelay" value="1000"/><property name="backOffMultiplier" value="2"/><property name="maximumRedeliveryDelay" value="1000"/> </bean> <!--創(chuàng)建連接工廠--> <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"><property name="brokerURL" value="tcp://localhost:61616"/><!--引用自定義重發(fā)機(jī)制--><property name="redeliveryPolicy" ref="activeMQRedeliveryPolicy"/> </bean>sprint boot配置
@Configuration public class ActiveMQConfig {@Beanpublic JmsListenerContainerFactory<?> queueListenerFactory(@Qualifier("activeMQConnectionFactory") ActiveMQConnectionFactory connectionFactory) {DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();SimpleMessageListenerContainer container=new SimpleMessageListenerContainer();container.setConcurrentConsumers(3);container.setConnectionFactory(connectionFactory);factory.setPubSubDomain(false);factory.setConnectionFactory(connectionFactory);factory.setConcurrency("3-15"); //連接數(shù)factory.setRecoveryInterval(1000L); //重連間隔時間factory.setSessionAcknowledgeMode(4);return factory;}@Beanpublic ActiveMQConnectionFactory activeMQConnectionFactory(){ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();connectionFactory.setTrustAllPackages(true);connectionFactory.setRedeliveryPolicy(redeliveryPolicy());return connectionFactory;}@Beanpublic RedeliveryPolicy redeliveryPolicy(){RedeliveryPolicy redeliveryPolicy=new RedeliveryPolicy();//是否在每次嘗試重新發(fā)送失敗后,增長這個等待時間redeliveryPolicy.setUseExponentialBackOff(true);//重發(fā)次數(shù),默認(rèn)為6次redeliveryPolicy.setMaximumRedeliveries(5);//重發(fā)時間間隔,默認(rèn)為1秒redeliveryPolicy.setInitialRedeliveryDelay(1);//第一次失敗后重新發(fā)送之前等待500毫秒,第二次失敗再等待500 * 2毫秒,這里的2就是valueredeliveryPolicy.setBackOffMultiplier(2);//是否避免消息碰撞redeliveryPolicy.setUseCollisionAvoidance(false);//設(shè)置重發(fā)最大拖延時間-1 表示沒有拖延只有UseExponentialBackOff(true)為true時生效redeliveryPolicy.setMaximumRedeliveryDelay(-1);return redeliveryPolicy;}@Beanpublic JmsTemplate jmsTemplate(ActiveMQConnectionFactory activeMQConnectionFactory){JmsTemplate jmsTemplate=new JmsTemplate();jmsTemplate.setDeliveryMode(1);//進(jìn)行持久化配置 1表示非持久化,2表示持久化jmsTemplate.setConnectionFactory(activeMQConnectionFactory);jmsTemplate.setSessionAcknowledgeMode(4);//客戶端簽收模式return jmsTemplate;}}死信隊(duì)列
設(shè)置每個隊(duì)列的死信隊(duì)列,activemq.xml
<destinationPolicy><policyMap><policyEntries><policyEntry queue=">"><deadLetterStrategy><individualDeadLetterStrategy queuePrefix="DLQ."useQueueForQueueMessages="true"processExpired="false"processNonPersistent="false"/></deadLetterStrategy></policyEntry></policyEntries></policyMap> </destinationPolicy>防止重復(fù)調(diào)用
1、如果是做數(shù)據(jù)庫插入,設(shè)置一個唯一主鍵
2、通過第三方服務(wù)做消費(fèi)記錄,例如Redis。
Activemq應(yīng)用場景
- 異步處理
- 應(yīng)用解耦
- 流量消峰
- 日志處理(異步處理的一種應(yīng)用)
- 消息通信
參考
spring JMS:https://spring.io/guides/gs/messaging-jms/
https://docs.spring.io/spring-framework/docs/current/reference/html/integration.html#jms-using
https://docs.spring.io/spring-boot/docs/current/reference/html/features.html#features.messaging.amqp
jms配置:https://docs.spring.io/spring-boot/docs/current/reference/html/features.html#features.messaging.jms.activemq
重發(fā)配置:http://activemq.apache.org/redelivery-policy
JMS和AMQP
1.大多應(yīng)用中,可通過消息服務(wù)中間件來提升系統(tǒng)異步通信、擴(kuò)展解耦能力 2.消息服務(wù)中兩個重要概念:- 消息代理(message broker)和目的地(destination)- 當(dāng)消息發(fā)送者發(fā)送消息以后,將由消息代理接管,消息代理保證消息傳遞到指定目的地。 3.消息隊(duì)列主要有兩種形式的目的地- 隊(duì)列(queue):點(diǎn)對點(diǎn)消息通信(point-to-point)- 主題(topic):發(fā)布(publish)/訂閱(subscribe)消息通信 4.點(diǎn)對點(diǎn)式:–消息發(fā)送者發(fā)送消息,消息代理將其放入一個隊(duì)列中,消息接收者從隊(duì)列中獲取消息內(nèi)容,消息讀取后被移出隊(duì)列–消息只有唯一的發(fā)送者和接受者,但并不是說只能有一個接收者 5.發(fā)布訂閱式:–發(fā)送者(發(fā)布者)發(fā)送消息到主題,多個接收者(訂閱者)監(jiān)聽(訂閱)這個主題,那么就會在消息到達(dá)時同時收到消息 6.JMS(Java Message Service)JAVA消息服務(wù):–基于JVM消息代理的規(guī)范。ActiveMQ、HornetMQ是JMS實(shí)現(xiàn) 7.AMQP(Advanced Message Queuing Protocol)–高級消息隊(duì)列協(xié)議,也是一個消息代理的規(guī)范,兼容JMS–RabbitMQ是AMQP的實(shí)現(xiàn) 8.Spring支持–spring-jms提供了對JMS的支持–spring-rabbit提供了對AMQP的支持–需要ConnectionFactory的實(shí)現(xiàn)來連接消息代理–提供JmsTemplate、RabbitTemplate來發(fā)送消息–@JmsListener(JMS)、@RabbitListener(AMQP)注解在方法上監(jiān)聽消息代理發(fā)布的消息–@EnableJms、@EnableRabbit開啟支持 9.Spring Boot自動配置–JmsAutoConfiguration–RabbitAutoConfiguration總結(jié)
以上是生活随笔為你收集整理的Activemq实战的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: K8S-statefulset-naco
- 下一篇: Activemq-In-action(二