消息中间件及小案例
一:什么是消息中間件
消息中間件利用高效可靠的消息傳遞機(jī)制進(jìn)行平臺無關(guān)的數(shù)據(jù)交流,并基于數(shù)據(jù)通信來
進(jìn)行分布式系統(tǒng)的集成。通過提供消息傳遞和消息排隊模型,它可以在分布式環(huán)境下擴(kuò)展進(jìn)
程間的通信。對于消息中間件,常見的角色大致也就有 Producer(生產(chǎn)者)、Consumer(消
費(fèi)者)
常見的消息中間件產(chǎn)品:
( (1 )ActiveMQ
ActiveMQ 是 Apache 出品,最流行的,能力強(qiáng)勁的開源消息總線。ActiveMQ 是一個完
全支持 JMS1.1 和 J2EE 1.4 規(guī)范的 JMS Provider 實現(xiàn)。我們在本次課程中介紹 ActiveMQ 的使
用。
(2)RabbitMQ
AMQP 協(xié)議的領(lǐng)導(dǎo)實現(xiàn),支持多種場景。淘寶的 MySQL 集群內(nèi)部有使用它進(jìn)行通訊,
OpenStack 開源云平臺的通信組件,最先在金融行業(yè)得到運(yùn)用。
(3)ZeroMQ
史上最快的消息隊列系統(tǒng)
(4)Kafka
Apache 下的一個子項目 。特點(diǎn):高吞吐,在一臺普通的服務(wù)器上既可以達(dá)到 10W/s
的吞吐速率;完全的分布式系統(tǒng)。適合處理海量數(shù)據(jù)。
1.2 JMS 簡介
1.2.1 什么是 JMS
JMS(Java Messaging Service)是 Java 平臺上有關(guān)面向消息中間件的技術(shù)規(guī)范,它便
于消息系統(tǒng)中的 Java 應(yīng)用程序進(jìn)行消息交換,并且通過提供標(biāo)準(zhǔn)的產(chǎn)生、發(fā)送、接收消息的
接口簡化企業(yè)應(yīng)用的開發(fā)。
JMS 本身只定義了一系列的接口規(guī)范,是一種與廠商無關(guān)的 API,用來訪問消息收發(fā)系
統(tǒng)。它類似于 JDBC(java Database Connectivity):這里,JDBC 是可以用來訪問許多不同關(guān)
系數(shù)據(jù)庫的 API,而 JMS 則提供同樣與廠商無關(guān)的訪問方法,以訪問消息收發(fā)服務(wù)。許多
廠商目前都支持 JMS,包括 IBM 的 MQSeries、BEA 的 Weblogic JMS service 和 Progress 的
北京市昌平區(qū)建材城西路金燕龍辦公樓一層 電話:400-618-9090
SonicMQ,這只是幾個例子。 JMS 使您能夠通過消息收發(fā)服務(wù)(有時稱為消息中介程序或
路由器)從一個 JMS 客戶機(jī)向另一個 JML 客戶機(jī)發(fā)送消息。消息是 JMS 中的一種類型對
象,由兩部分組成:報頭和消息主體。報頭由路由信息以及有關(guān)該消息的元數(shù)據(jù)組成。消息
主體則攜帶著應(yīng)用程序的數(shù)據(jù)或有效負(fù)載。
JMS 定義了五種不同的消息正文格式,以及調(diào)用的消息類型,允許你發(fā)送并接收以一
些不同形式的數(shù)據(jù),提供現(xiàn)有消息格式的一些級別的兼容性。
· TextMessage–一個字符串對象 · MapMessage–一套名稱-值對 · ObjectMessage–一個序列化的
Java 對象 · BytesMessage–一個字節(jié)的數(shù)據(jù)流 · StreamMessage – Java 原始值的數(shù)據(jù)流
1.2.2 JMS 消息傳遞類型
對于消息的傳遞有兩種類型:
一種是點(diǎn)對點(diǎn)的,即一個生產(chǎn)者和一個消費(fèi)者一一對應(yīng);
另一種是發(fā)布/ 訂閱模式,即一個生產(chǎn)者產(chǎn)生消息并進(jìn)行發(fā)送后,可以由多個消費(fèi)者進(jìn)
行接收。
1.3ActiveMQ
1.3.1 簡介
ActiveMQ 是 Apache 出品,最流行的,能力強(qiáng)勁的開源消息總線。
1.3.2 安裝與啟動
官方網(wǎng)站下載:http://activemq.apache.org/ 資源包中也提供軟件包
品優(yōu)購\資源\配套軟件\activeMQ\apache-activemq-5.14.4-bin.zip
解壓,運(yùn)行 bin\win64 下的 activemq.bat ,打開瀏覽器輸入地址
http://localhost:8161/ 即可進(jìn)入 ActiveMQ 管理頁面
點(diǎn)擊進(jìn)入管理頁面
輸入用戶名和密碼 均為 admin
進(jìn)入主界面
點(diǎn)對點(diǎn)消息列表:
列表各列信息含義如下:
Number Of Pending Messages : 等待消費(fèi)的消息 這個是當(dāng)前未出隊列的數(shù)量。
Number Of Consumers : 消費(fèi)者 這個是消費(fèi)者端的消費(fèi)者數(shù)量
Messages Enqueued : 進(jìn)入隊列的消息 進(jìn)入隊列的總數(shù)量,包括出隊列的。
Messages Dequeued : 出了隊列的消息 可以理解為是消費(fèi)這消費(fèi)掉的數(shù)量。
1.4 點(diǎn)對點(diǎn)模式
1.4.1 消息生產(chǎn)者
(1)創(chuàng)建工程 springjms_producer,在 POM 文件中引入 SpringJms 、activeMQ 以及單元測
試相關(guān)依賴
(2)在 src/main/resources 下創(chuàng)建 spring 配置文件 applicationContext-jms-producer.xml
(3)在 cn.itcast.demo 包下創(chuàng)建消息生產(chǎn)者類
@Component public class QueueProducer { @Autowired private JmsTemplate jmsTemplate; @Autowired private Destination queueTextDestination; public void sendTextMessage(String text){ jmsTemplate.convertAndSend( queueTextDestination,text); } }(4)單元測試
在 src/test/java 創(chuàng)建測試類
1.4.2 消息消費(fèi)者
(1)創(chuàng)建工程 springjms_consumer,在 POM 文件中引入依賴 (同上一個工程)
(2)創(chuàng)建配置文件 applicationContext-jms-consumer-queue.xml
(3)編寫監(jiān)聽類
public class MyMessageListener implements MessageListener { public void onMessage(Message message) { TextMessage textMessage=(TextMessage)message; try { System.out.println("接收到消息:"+textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }(4)創(chuàng)建測試類
@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations="classpath:applicationContext-jms-consumer-queue.xml") public class TestQueue { @Test public void testQueue(){ try { System.in.read(); } catch (IOException e) { e.printStackTrace(); } } }1.5 發(fā)布/ 訂閱模式
1.5.1 消息生產(chǎn)者
(1)在工程 springjms_producer 的 applicationContext-jms-producer.xml 增加配置
(3)編寫測試類
import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import cn.itcast.demo.TopicProducer; @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations="classpath:applicationContext-activemq-producer.xml ") public class TestTopic { @Autowired private TopicProducer topicProducer; @Test public void sendTextQueue(){topicProducer.sendTextMessage(); } }1.5.2 消息消費(fèi)者
(1)在 springjms_consumer 工程中創(chuàng)建配置文件 applicationContext-jms-consumer-topic.xml
(2)編寫測試類
@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations="classpath:applicationContext-jms-consumer-topic.xm l") public class TestTopic { @Test public void testTopic(){ try { System.in.read(); } catch (IOException e) { e.printStackTrace(); } } }測試:同時運(yùn)行三個消費(fèi)者工程,在運(yùn)行生產(chǎn)者工程,查看三個消費(fèi)者工程的控制臺輸出。
總結(jié)
- 上一篇: SQL学习笔记——Unknown col
- 下一篇: JPG图片缩小优先的压缩怎么进行