使用jedis实现Redis消息队列(MQ)的发布(publish)和消息监听(subscribe)
生活随笔
收集整理的這篇文章主要介紹了
使用jedis实现Redis消息队列(MQ)的发布(publish)和消息监听(subscribe)
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
前言:
本文基于jedis 2.9.0.jar、commons-pool2-2.4.2.jar以及json-20160810.jar
其中jedis連接池需要依賴commons-pool2包,json包用于對象實例和json字符串的相互轉換
1、jedis的消息隊列方法簡述
1.1、發布消息方法
(其中,channel是對應消息通道,message是對應消息體)
jedis.publish(channel, message);
1.2、監聽消息方法
(其中,jedisPubSub用于處理監聽到的消息,channels是對應的通道)
jedis.subscribe(jedisPubSub, channels);
2、發布消息
/*** 從jedis連接池獲取jedis操作實例* @return*/public static Jedis getJedis() {return RedisPoolManager.getJedis();}/*** 推入消息到redis消息通道* * @param String* channel* @param String* message*/public static void publish(String channel, String message) {Jedis jedis = null;try {jedis = getJedis();jedis.publish(channel, message);} finally {jedis.close();}}/*** 推入消息到redis消息通道* * @param byte[]* channel* @param byte[]* message*/public void publish(byte[] channel, byte[] message) {Jedis jedis = null;try {jedis = getJedis();jedis.publish(channel, message);} finally {jedis.close();}}
3、監聽消息
3.1、監聽消息主體方法
/*** 監聽消息通道* @param jedisPubSub - 監聽任務* @param channels - 要監聽的消息通道*/public static void subscribe(BinaryJedisPubSub jedisPubSub, byte[]... channels) {Jedis jedis = null;try {jedis = getJedis();jedis.subscribe(jedisPubSub, channels);} finally {jedis.close();}}/*** 監聽消息通道* @param jedisPubSub - 監聽任務* @param channels - 要監聽的消息通道*/public static void subscribe(JedisPubSub jedisPubSub, String... channels) {Jedis jedis = null;try {jedis = getJedis();jedis.subscribe(jedisPubSub, channels);} finally {jedis.close();}}
3.2、處理監聽到的消息任務
class Tasker implements Runnable {private String[] channel = null;//監聽的消息通道private JedisPubSub jedisPubSub = null;//消息處理任務public Tasker(JedisPubSub jedisPubSub, String ...channel) {this.jedisPubSub = jedisPubSub;this.channel = channel;}@Overridepublic void run() {// 監聽channel通道的消息RedisMQ.subscribe(jedisPubSub, channel);}}
3.3、處理監聽到的消息主體類實現
package cn.eguid.livePushServer.redisManager;import java.util.Map;import org.json.JSONObject;import cc.eguid.livepush.PushManager; import redis.clients.jedis.JedisPubSub;public class RedisMQHandler extends JedisPubSub{PushManager pushManager = null;public RedisMQHandler(PushManager pushManager) {super();this.pushManager = pushManager;}@Override// 接收到消息后進行分發執行public void onMessage(String channel, String message) {JSONObject jsonObj = new JSONObject(message);System.out.println(channel+","+message);if ("push".equals(channel)) {Map<String,Object> map=jsonObj.toMap();System.out.println("接收到一條推流消息,準備推流:"+map); // String appName=pushManager.push(map);//推流完成后還需要發布一個成功消息到返回隊列} else if ("close".equals(channel)) {String appName=jsonObj.getString("appName");System.out.println("接收到一條關閉消息,準備關閉應用:"+appName); // pushManager.closePush(appName);}} }4、測試消息隊列發布和監聽
public static void main(String[] args) throws InterruptedException {PushManager pushManager= new PushManagerImpl();Thread t1 = new Thread(new Tasker(new RedisMQHandler (pushManager), "push"));Thread t2 = new Thread(new Tasker(new RedisMQHandler (pushManager), "close"));t1.start();t2.start();LivePushEntity livePushInfo=new LivePushEntity();livePushInfo.setAppName("test1");JSONObject json=new JSONObject(livePushInfo);publish("push",json.toString());publish("close", json.toString());Thread.sleep(2000);publish("push", json.toString());publish("close",json.toString());Thread.sleep(2000);publish("push", json.toString());publish("close",json.toString());}
轉載于:https://www.cnblogs.com/eguid/p/6821593.html
總結
以上是生活随笔為你收集整理的使用jedis实现Redis消息队列(MQ)的发布(publish)和消息监听(subscribe)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: [迷宫中的算法实践]迷宫生成算法——Pr
- 下一篇: MyBatis关联查询,表字段相同,re