日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Activemq实战

發(fā)布時間:2024/4/13 编程问答 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Activemq实战 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

文章目錄

  • 消息發(fā)布模式和接口
    • JMS發(fā)布消息模式
    • JMS API
  • 點(diǎn)對點(diǎn)模式
    • 同步消費(fèi)
    • 異步消費(fèi)
    • 總結(jié)
  • 發(fā)布/訂閱模式
    • 訂閱者類型
    • 點(diǎn)對點(diǎn)模式與發(fā)布訂閱模式比較
  • JMS
  • 消息的可靠性和持久性
    • 設(shè)置方式
  • 生產(chǎn)者和消費(fèi)者事務(wù)及ACK
    • 事務(wù)
      • 設(shè)置方式
      • 總結(jié)
    • ACK
      • 非事務(wù)模式下ACK
      • 事務(wù)模式下ACK
      • 總結(jié)
  • spring 整合 Activemq
    • spring framework
    • spring boot
  • mqtt
  • 高級特性
    • 異步投遞及確認(rèn)
    • 延遲投遞和定時投遞
    • 消費(fèi)重試機(jī)制
      • 重新發(fā)送消息情況
      • Java配置
      • Spring配置
      • sprint boot配置
    • 死信隊(duì)列
    • 防止重復(fù)調(diào)用
  • Activemq應(yīng)用場景
  • 參考
    • JMS和AMQP

消息發(fā)布模式和接口

JMS發(fā)布消息模式

點(diǎn)對點(diǎn)模式:一個生產(chǎn)者向特定隊(duì)列發(fā)送消息,一個消費(fèi)者讀取消息。一條消息僅能被一個消費(fèi)者消費(fèi)。

發(fā)布/訂閱模式:0個或多個消費(fèi)者 讀取topic消息,一條消息可以被每個消費(fèi)者消費(fèi)。訂閱者必須保持活動狀態(tài)才能訂閱消息。如果訂閱者創(chuàng)建了持久訂閱,則會在訂閱者重新連接時重新發(fā)布。

JMS API

JMS開發(fā)的基本步驟:

  • 創(chuàng)建一個JMS Connection Factory
  • 通過Connection Factory來創(chuàng)建JMS Connection
  • 啟動JMS Connection
  • 通過Connection創(chuàng)建JMS Session
  • 創(chuàng)建JMS Destination
  • 創(chuàng)建JMS Producer,或者創(chuàng)建JMS Message,并設(shè)置Destination
  • 創(chuàng)建JMS Consumer,或者是注冊一個JMS Message Listener
  • 發(fā)送或者接收Message(s)
  • 關(guān)閉所有的JMS資源(Connection 、Session、Producer、Consumer等)
  • 點(diǎn)對點(diǎn)模式

    同步消費(fèi)

    MessageConsumer.receive()方法,會阻塞。

    異步消費(fèi)

    使用MessageListener 創(chuàng)建異步監(jiān)聽。

    總結(jié)

    每一條消息只會被一個消費(fèi)者消費(fèi)

    消息被消費(fèi)會,不會再被存儲。

    多個消費(fèi)者消費(fèi)一個隊(duì)列,未被消費(fèi)的消息,會被輪詢被已經(jīng)活動的消費(fèi)者 消費(fèi)。

    發(fā)布/訂閱模式

    訂閱者類型

    • 活動持久訂閱者。設(shè)置了 Connection 的 ClientId 的訂閱者。
    • 離線持久訂閱者。下線了的持久訂閱者
    • 活動非持久訂閱者。未設(shè)置ClientId,默認(rèn)情況。

    要用持久化訂閱,發(fā)送者要用DelIveryMode.PERSISTENT模式發(fā)送,在連接之前設(shè)定。一定要設(shè)置完成后,再start這個connection

    Message Durability 與 Message Persistence 是不同的,Message Durability 僅指發(fā)布訂閱模式下的訂閱者

    點(diǎn)對點(diǎn)模式與發(fā)布訂閱模式比較

    比 較點(diǎn)topicqueue
    工作模式如果當(dāng)前沒有訂閱者,則所有消息丟棄。
    每條消息都會發(fā)送到所有訂閱者
    沒有消費(fèi)者,消息不會丟棄。
    如果有多個消費(fèi)者,消息采用**負(fù)載均衡模式(輪詢)**保證一條消息只會發(fā)送給一個消費(fèi)者
    有無狀態(tài)無狀態(tài)queue數(shù)據(jù)會以文件形式保存,或者配置成DB。
    投遞完整性當(dāng)前沒有訂閱者,則所有消息丟棄消息不會丟棄
    處理效率消息會根據(jù)訂閱者數(shù)量復(fù)制,訂閱者數(shù)量多性能會降低。一條消息發(fā)送到一個消費(fèi)者,消費(fèi)者數(shù)量不影響效率。

    JMS

    發(fā)送ObjectMessage時,如果是自定義的對象,需要設(shè)置trustAllPackages

    消息的可靠性和持久性

    設(shè)置方式

    MessageProducer producer = session.createProducer(topic); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //默認(rèn)是持久性的。

    注意:只有queue 持久化才有意義。topic的持久化沒有任何意義,因?yàn)槿绻麤]有訂閱者,發(fā)送的消息就是沒有價值的。

    生產(chǎn)者和消費(fèi)者事務(wù)及ACK

    事務(wù)

    設(shè)置方式

    session = connection.createSession(useTransaction,Session.AUTO_ACKNOWLEDGE); session.commit();

    總結(jié)

    生產(chǎn)者開啟了事務(wù),必須顯性的提交事務(wù),不然消息不會發(fā)送到broker。

    事務(wù)偏生產(chǎn)者,ACK偏消費(fèi)者,消費(fèi)者端不需要設(shè)置事務(wù),如果設(shè)置了事務(wù),則ACK不會提交,會導(dǎo)致重復(fù)消費(fèi)。

    ACK

    提交,指調(diào)用 session.commit()方法。ACK指調(diào)用了message.acknowledge()方法。

    非事務(wù)模式下ACK

    生產(chǎn)者設(shè)置為自動ACK,消費(fèi)者設(shè)置為自動ACK,正常

    生產(chǎn)者設(shè)置為自動ACK,消費(fèi)者設(shè)置為手動ACK,如果沒有調(diào)用ACK方法,則會重復(fù)消費(fèi)。

    生產(chǎn)者設(shè)置為手動ACK,消費(fèi)者設(shè)置為自動ACK,正常

    事務(wù)模式下ACK

    生產(chǎn)者ACK設(shè)置為自動,消費(fèi)者ACK設(shè)置為自動。已提交,正常

    生產(chǎn)者ACK設(shè)置為自動,消費(fèi)者ACK設(shè)置為自動。未提交,會出現(xiàn)重復(fù)消費(fèi)

    生產(chǎn)者ACK設(shè)置為自動,消費(fèi)者ACK設(shè)置為手動。已提交,未 ACK ,正常

    生產(chǎn)者ACK設(shè)置為自動,消費(fèi)者ACK設(shè)置為手動。未提交,已ACK,會出現(xiàn)重復(fù)消費(fèi)

    生產(chǎn)者ACK設(shè)置為手動,消費(fèi)者ACK設(shè)置為自動。已提交,正常

    生產(chǎn)者ACK設(shè)置為手動,消費(fèi)者ACK設(shè)置為自動。未提交,會出現(xiàn)重復(fù)消費(fèi)

    生產(chǎn)者ACK設(shè)置為手動,消費(fèi)者ACK設(shè)置為自動。未提交,已ACK,會出現(xiàn)重復(fù)消費(fèi)

    生產(chǎn)者ACK設(shè)置為手動,消費(fèi)者ACK設(shè)置為自動。未提交,未ACK,會出現(xiàn)重復(fù)消費(fèi)

    生產(chǎn)者ACK設(shè)置為手動,消費(fèi)者ACK設(shè)置為手動。已提交,已ACK,正常

    生產(chǎn)者ACK設(shè)置為手動,消費(fèi)者ACK設(shè)置為手動。已提交,未ACK,正常

    總結(jié)

    非事務(wù)中,消費(fèi)者根據(jù)是否ACK,決定消息是否標(biāo)記未已消費(fèi)。

    事務(wù)中,消費(fèi)者根據(jù)事務(wù)是否提交,決定消息是否標(biāo)記為已消費(fèi)。如果事務(wù)回滾,則消息會被再次投遞。

    spring 整合 Activemq

    spring framework

    <!-- activemq依賴--><dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-all</artifactId><version>5.15.5</version></dependency><dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-pool</artifactId><version>5.15.5</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-jms</artifactId><version>${spring.version}</version></dependency>

    spring boot

    <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-activemq</artifactId> </dependency> <!-- 連接池可以單獨(dú)依賴 --> <dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-pool</artifactId><version>5.15.5</version> </dependency> spring:activemq:user: adminpassword: adminbroker-url: tcp://192.168.153.129:61616pool:enabled: truemax-connections: 10jms:pub-sub-domain: false #只能監(jiān)聽隊(duì)列,不能是topic。

    重點(diǎn):

    @EnableJms注解:

    @JmsListener 注解:

    屬性參考類:ActiveMQProperties ,JmsProperties

    mqtt

    Activemq默認(rèn)就開啟的

    WEB端使用參考:

    mqtt.rar

    高級特性

    異步投遞及確認(rèn)

    同步發(fā)送,send()方法會阻塞,直到broker給出一個確認(rèn)消息。非事務(wù)、持久化消息 場景默認(rèn)的方式。

    異步發(fā)送,不會阻塞,會提高吞吐量。大部分場景的默認(rèn)方式。

    ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(); cf.setUseAsyncSend(true);//異步發(fā)送

    異步發(fā)送,可能會丟失消息,可以在send()方法中帶上AsyncCallback類型參數(shù)

    producer.send(message, new AsyncCallback() {@Overridepublic void onSuccess() {try {System.out.println("消息發(fā)送成功:" + message.getStringProperty("msgId"));} catch (JMSException e) {e.printStackTrace();}}@Overridepublic void onException(JMSException e) {try {System.out.println("出現(xiàn)異常:" + message.getStringProperty("msgId"));} catch (JMSException je) {je.printStackTrace();}}});

    延遲投遞和定時投遞

    Activemq 需要開啟 Scheduler,broker的schedulerSupport屬性。

    設(shè)置屬性:

    Property nametypedescription
    AMQ_SCHEDULED_DELAYlong延遲投遞的時間
    AMQ_SCHEDULED_PERIODlong重復(fù)投遞的時間間隔
    AMQ_SCHEDULED_REPEATint重復(fù)投遞次數(shù),(不包括第一次)
    AMQ_SCHEDULED_CRONStringCron表達(dá)式

    示例:

    //延遲30秒,投遞10次,間隔10秒: MessageProducer producer = session.createProducer(destination); TextMessage message = session.createTextMessage("test msg"); long delay = 30 * 1000; long period = 10 * 1000; int repeat = 9; message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay); message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period); message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat); producer.send(message);//CRON 表達(dá)式 ,定時投遞。linux crontab 表達(dá)式。 MessageProducer producer = session.createProducer(destination); TextMessage message = session.createTextMessage("test msg"); message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 * * * *"); producer.send(message);//CRON表達(dá)式的優(yōu)先級高于另外三個參數(shù),如果在設(shè)置了CRON的同時,也有repeat和period參數(shù),則會在每次CRON執(zhí)行的時候,重復(fù)投遞repeat次,每次間隔為period。就是說設(shè)置是疊加的效果。例如每小時都會發(fā)生消息被投遞10次,延遲1秒開始,每次間隔1秒: MessageProducer producer = session.createProducer(destination); TextMessage message = session.createTextMessage("test msg"); message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 * * * *"); message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 1000); message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 1000); message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 9); producer.send(message);

    設(shè)置延時投遞后,會在scheduler中有一條記錄,會記錄任務(wù)信息。投遞repeat次,則會把消息往 指定 destination 中 發(fā)送 repeat + 1 次。

    消費(fèi)重試機(jī)制

    重新發(fā)送消息情況

    消息重發(fā)指的是消息可以被broker重新投遞,不一定是之前的消費(fèi)者。重新投遞后,消費(fèi)者可以重新消費(fèi)此消息。

  • 事務(wù)會話中,rollback的消息,會被重新投遞。
  • 消費(fèi)端,使用客戶端手動確認(rèn),還未確認(rèn),執(zhí)行session.recover(),還未ACK的消息都會被重新投遞。
  • 所有未ACK消息,通過session.close()關(guān)閉事務(wù),則這些消息會被broker立即重發(fā)。
  • 消息被拉取后,ACK超時,也會重發(fā)
  • 總結(jié):即broker必須確認(rèn)消息被消費(fèi)了,才不會重發(fā)。1,2 會重新投遞給原來的消費(fèi)者,3,4 則不一定。

    重發(fā)配置:http://activemq.apache.org/redelivery-policy

    Java配置

    // 手動修改RedeliveryPolicy(重發(fā)策略) RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy(); redeliveryPolicy.setMaximumRedeliveries(3);// 修改重發(fā)次數(shù)為3次 activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);// 將重發(fā)策略設(shè)置到ConnectionFactory中

    Spring配置

    <!--定義RedeliveryPolicy(重發(fā)機(jī)制)--> <bean id="activeMQRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy"><property name="useCollisionAvoidance" value="false"/><property name="useExponentialBackOff" value="true"/><property name="maximumRedeliveries" value="3"/><property name="initialRedeliveryDelay" value="1000"/><property name="backOffMultiplier" value="2"/><property name="maximumRedeliveryDelay" value="1000"/> </bean> <!--創(chuàng)建連接工廠--> <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"><property name="brokerURL" value="tcp://localhost:61616"/><!--引用自定義重發(fā)機(jī)制--><property name="redeliveryPolicy" ref="activeMQRedeliveryPolicy"/> </bean>

    sprint boot配置

    @Configuration public class ActiveMQConfig {@Beanpublic JmsListenerContainerFactory<?> queueListenerFactory(@Qualifier("activeMQConnectionFactory") ActiveMQConnectionFactory connectionFactory) {DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();SimpleMessageListenerContainer container=new SimpleMessageListenerContainer();container.setConcurrentConsumers(3);container.setConnectionFactory(connectionFactory);factory.setPubSubDomain(false);factory.setConnectionFactory(connectionFactory);factory.setConcurrency("3-15"); //連接數(shù)factory.setRecoveryInterval(1000L); //重連間隔時間factory.setSessionAcknowledgeMode(4);return factory;}@Beanpublic ActiveMQConnectionFactory activeMQConnectionFactory(){ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();connectionFactory.setTrustAllPackages(true);connectionFactory.setRedeliveryPolicy(redeliveryPolicy());return connectionFactory;}@Beanpublic RedeliveryPolicy redeliveryPolicy(){RedeliveryPolicy redeliveryPolicy=new RedeliveryPolicy();//是否在每次嘗試重新發(fā)送失敗后,增長這個等待時間redeliveryPolicy.setUseExponentialBackOff(true);//重發(fā)次數(shù),默認(rèn)為6次redeliveryPolicy.setMaximumRedeliveries(5);//重發(fā)時間間隔,默認(rèn)為1秒redeliveryPolicy.setInitialRedeliveryDelay(1);//第一次失敗后重新發(fā)送之前等待500毫秒,第二次失敗再等待500 * 2毫秒,這里的2就是valueredeliveryPolicy.setBackOffMultiplier(2);//是否避免消息碰撞redeliveryPolicy.setUseCollisionAvoidance(false);//設(shè)置重發(fā)最大拖延時間-1 表示沒有拖延只有UseExponentialBackOff(true)為true時生效redeliveryPolicy.setMaximumRedeliveryDelay(-1);return redeliveryPolicy;}@Beanpublic JmsTemplate jmsTemplate(ActiveMQConnectionFactory activeMQConnectionFactory){JmsTemplate jmsTemplate=new JmsTemplate();jmsTemplate.setDeliveryMode(1);//進(jìn)行持久化配置 1表示非持久化,2表示持久化jmsTemplate.setConnectionFactory(activeMQConnectionFactory);jmsTemplate.setSessionAcknowledgeMode(4);//客戶端簽收模式return jmsTemplate;}}

    死信隊(duì)列

    設(shè)置每個隊(duì)列的死信隊(duì)列,activemq.xml

    <destinationPolicy><policyMap><policyEntries><policyEntry queue=">"><deadLetterStrategy><individualDeadLetterStrategy queuePrefix="DLQ."useQueueForQueueMessages="true"processExpired="false"processNonPersistent="false"/></deadLetterStrategy></policyEntry></policyEntries></policyMap> </destinationPolicy>

    防止重復(fù)調(diào)用

    1、如果是做數(shù)據(jù)庫插入,設(shè)置一個唯一主鍵

    2、通過第三方服務(wù)做消費(fèi)記錄,例如Redis。

    Activemq應(yīng)用場景

    • 異步處理
    • 應(yīng)用解耦
    • 流量消峰
    • 日志處理(異步處理的一種應(yīng)用)
    • 消息通信

    參考

    spring JMS:https://spring.io/guides/gs/messaging-jms/

    https://docs.spring.io/spring-framework/docs/current/reference/html/integration.html#jms-using

    https://docs.spring.io/spring-boot/docs/current/reference/html/features.html#features.messaging.amqp

    jms配置:https://docs.spring.io/spring-boot/docs/current/reference/html/features.html#features.messaging.jms.activemq

    重發(fā)配置:http://activemq.apache.org/redelivery-policy

    JMS和AMQP

    1.大多應(yīng)用中,可通過消息服務(wù)中間件來提升系統(tǒng)異步通信、擴(kuò)展解耦能力 2.消息服務(wù)中兩個重要概念:- 消息代理(message broker)和目的地(destination)- 當(dāng)消息發(fā)送者發(fā)送消息以后,將由消息代理接管,消息代理保證消息傳遞到指定目的地。 3.消息隊(duì)列主要有兩種形式的目的地- 隊(duì)列(queue):點(diǎn)對點(diǎn)消息通信(point-to-point)- 主題(topic):發(fā)布(publish)/訂閱(subscribe)消息通信 4.點(diǎn)對點(diǎn)式:–消息發(fā)送者發(fā)送消息,消息代理將其放入一個隊(duì)列中,消息接收者從隊(duì)列中獲取消息內(nèi)容,消息讀取后被移出隊(duì)列–消息只有唯一的發(fā)送者和接受者,但并不是說只能有一個接收者 5.發(fā)布訂閱式:–發(fā)送者(發(fā)布者)發(fā)送消息到主題,多個接收者(訂閱者)監(jiān)聽(訂閱)這個主題,那么就會在消息到達(dá)時同時收到消息 6.JMS(Java Message Service)JAVA消息服務(wù):–基于JVM消息代理的規(guī)范。ActiveMQ、HornetMQ是JMS實(shí)現(xiàn) 7.AMQP(Advanced Message Queuing Protocol)–高級消息隊(duì)列協(xié)議,也是一個消息代理的規(guī)范,兼容JMS–RabbitMQ是AMQP的實(shí)現(xiàn) 8.Spring支持–spring-jms提供了對JMS的支持–spring-rabbit提供了對AMQP的支持–需要ConnectionFactory的實(shí)現(xiàn)來連接消息代理–提供JmsTemplate、RabbitTemplate來發(fā)送消息–@JmsListener(JMS)、@RabbitListener(AMQP)注解在方法上監(jiān)聽消息代理發(fā)布的消息–@EnableJms、@EnableRabbit開啟支持 9.Spring Boot自動配置–JmsAutoConfiguration–RabbitAutoConfiguration

    總結(jié)

    以上是生活随笔為你收集整理的Activemq实战的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。