日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 默认站点!

默认站点

當前位置: 首頁 >

activeMQ 本地测试

發(fā)布時間:2023/11/27 31 豆豆
默认站点 收集整理的這篇文章主要介紹了 activeMQ 本地测试 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

參考博主 搭建~?https://www.cnblogs.com/jaycekon/p/6225058.html

ActiveMQ官網(wǎng)下載地址:http://activemq.apache.org/download.html

我下的是windows版本的

下載解壓之后進入D:\config\apache-activemq-5.15.7\bin\win64

雙擊運行activemq.bat,啟動本地MQ服務,

?

?

?started說明啟動成功。

接下來是代碼部分:

生產(chǎn)者:Producer

package com.mqtest;import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import javax.jms.*;
import java.util.concurrent.atomic.AtomicInteger;/*** @author Maggie.Hao* @date 2018/11/5 14:31*/
public class Producer{private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);//ActiveMq 的默認用戶名private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//ActiveMq 的默認登錄密碼private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//ActiveMQ 的鏈接地址private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL;AtomicInteger count = new AtomicInteger(0);//鏈接工廠
    ConnectionFactory connectionFactory;//鏈接對象
    Connection connection;//事務管理
    Session session;ThreadLocal<MessageProducer> threadLocal = new ThreadLocal<>();public void init(){LOGGER.info("Product init");try{//創(chuàng)建一個鏈接工廠//            connectionFactory = new ActiveMQConnectionFactory("admin","demo","tcp://127.0.0.1:61616");connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEN_URL);//從工廠中創(chuàng)建一個鏈接connection = connectionFactory.createConnection();//開啟鏈接
            connection.start();//創(chuàng)建一個事務(這里通過參數(shù)可以設置事務的級別)session = connection.createSession(true, Session.SESSION_TRANSACTED);}catch (JMSException e){LOGGER.error("", e);}}public void sendMessage(String disname){try{//創(chuàng)建一個消息隊列Queue queue = session.createQueue(disname);//消息生產(chǎn)者MessageProducer messageProducer = null;if (threadLocal.get() != null){messageProducer = threadLocal.get();}else{messageProducer = session.createProducer(queue);threadLocal.set(messageProducer);}while (true){Thread.sleep(1000);int num = count.getAndIncrement();//創(chuàng)建一條消息
                TextMessage msg;msg = session.createTextMessage(Thread.currentThread().getName() + "==Productor:我現(xiàn)在正在生產(chǎn)東西!,count:" + num);LOGGER.info("msg:{} + {}", msg, num);//發(fā)送消息
                messageProducer.send(msg);//提交事務
                session.commit();}}catch (JMSException e){LOGGER.error("", e);}catch (InterruptedException e){LOGGER.error("", e);}}
}

消費者:Consumer

package com.mqtest;import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import javax.jms.*;
import java.util.concurrent.atomic.AtomicInteger;/*** @author Maggie.Hao* @date 2018/11/5 14:34*/
public class Consumer{private static final Logger LOGGER = LoggerFactory.getLogger(Consumer.class);private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL;ConnectionFactory connectionFactory;Connection connection;Session session;ThreadLocal<MessageConsumer> threadLocal = new ThreadLocal<>();AtomicInteger count = new AtomicInteger();public void init(){try{connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEN_URL);connection = connectionFactory.createConnection();connection.start();session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);}catch (JMSException e){LOGGER.error("", e);}}public void getMessage(String disname){try{Queue queue = session.createQueue(disname);MessageConsumer consumer = null;if (threadLocal.get() != null){consumer = threadLocal.get();}else{consumer = session.createConsumer(queue);threadLocal.set(consumer);}while (true){Thread.sleep(1000);TextMessage msg = (TextMessage) consumer.receive();if (msg != null){msg.acknowledge();LOGGER.info("{}:Consumer:我是消費者,我正在消費Msg:{}----->{}", Thread.currentThread().getName(), msg.getText(), count.getAndIncrement());}else{break;}}}catch (JMSException e){LOGGER.error("", e);}catch (InterruptedException e){LOGGER.error("", e);}}
}

啟動生產(chǎn)者:

package com.mqtest;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;/*** @author Maggie.Hao* @date 2018/11/5 14:34*/
public class TestProducer{private static final Logger LOGGER = LoggerFactory.getLogger(TestProducer.class);public static void main(String[] args){Producer producer = new Producer();producer.init();TestProducer testMq = new TestProducer();try{Thread.sleep(1000);}catch (InterruptedException e){LOGGER.error("", e);}//Thread 1new Thread(testMq.new ProductorMq(producer)).start();//Thread 2new Thread(testMq.new ProductorMq(producer)).start();//Thread 3new Thread(testMq.new ProductorMq(producer)).start();//Thread 4new Thread(testMq.new ProductorMq(producer)).start();//Thread 5new Thread(testMq.new ProductorMq(producer)).start();}private class ProductorMq implements Runnable{Producer producter;public ProductorMq(Producer producter){this.producter = producter;}@Overridepublic void run(){while (true){try{producter.sendMessage("Jaycekon-MQ");Thread.sleep(10000);}catch (InterruptedException e){LOGGER.error("{}", e);}}}}
}

啟動消費者:

package com.mqtest;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;/*** @author Maggie.Hao* @date 2018/11/5 15:39*/
public class TestConsumer{private static final Logger LOGGER = LoggerFactory.getLogger(TestConsumer.class);public static void main(String[] args){Consumer comsumer = new Consumer();comsumer.init();TestConsumer testConsumer = new TestConsumer();new Thread(testConsumer.new ConsumerMq(comsumer)).start();new Thread(testConsumer.new ConsumerMq(comsumer)).start();new Thread(testConsumer.new ConsumerMq(comsumer)).start();new Thread(testConsumer.new ConsumerMq(comsumer)).start();}private class ConsumerMq implements Runnable{Consumer consumer;public ConsumerMq(Consumer consumer){this.consumer = consumer;}@Overridepublic void run(){while (true){try{consumer.getMessage("Jaycekon-MQ");Thread.sleep(10000);}catch (InterruptedException e){LOGGER.error("", e);}}}}
}

控制臺輸出結(jié)果:

可以在? ?http://127.0.0.1:8161/admin/queues.jsp 查看結(jié)果

用戶名和密碼默認都為:admin?

點擊Queues可以看到我們的消息隊列信息

?

轉(zhuǎn)載于:https://www.cnblogs.com/mengjie1001/p/9916115.html

總結(jié)

以上是默认站点為你收集整理的activeMQ 本地测试的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得默认站点網(wǎng)站內(nèi)容還不錯,歡迎將默认站点推薦給好友。