日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

关于WebSocket分布式实现的一种方案

發(fā)布時(shí)間:2024/4/13 编程问答 34 豆豆
生活随笔 收集整理的這篇文章主要介紹了 关于WebSocket分布式实现的一种方案 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

WebSocket常用于做后臺消息推送,也可以做簡易的IM聊天,由于WebSocket中的Session沒有實(shí)現(xiàn)序列化接口的,我們無法將session序列化實(shí)現(xiàn)分布式部署,今天就來記錄一種分布式的實(shí)現(xiàn)方案。

實(shí)現(xiàn)原理

首先我們講的這種方式是利用redis訂閱和發(fā)布模式來實(shí)現(xiàn),大致過程:

  • 每個(gè)服務(wù)器記錄連接,保存在內(nèi)存當(dāng)中
  • 當(dāng)需要推送websocket消息的時(shí)候,同時(shí)在redis發(fā)布一個(gè)消息
  • 每個(gè)服務(wù)器訂閱redis的消息,當(dāng)監(jiān)聽到有消息時(shí),每臺服務(wù)器遍歷自己內(nèi)存當(dāng)中的連接進(jìn)行發(fā)送

這樣我們就可以實(shí)現(xiàn)websocket的分布式部署,當(dāng)然redis訂閱和發(fā)布也可以用其他消息隊(duì)列工具類實(shí)現(xiàn)。

實(shí)現(xiàn)過程

這里并不打算貼所有代碼,只記錄關(guān)鍵的一些代碼,其余的可以網(wǎng)上查看相關(guān)資料,基于SpringBoot2.1.8實(shí)現(xiàn)。

pom文件引入redis和websocket

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <!--websocket--> <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId> </dependency>

寫一個(gè)redis發(fā)布器

@Component public class PublishService {@AutowiredStringRedisTemplate redisTemplate;/*** 發(fā)布方法** @param channel 消息發(fā)布訂閱 主題* @param message 消息信息*/public void publish(String channel, Object message) {redisTemplate.convertAndSend(channel, message);} }

寫一個(gè)redis監(jiān)聽器

public class SubscribeListener implements MessageListener {/*** 訂閱接收發(fā)布者的消息*/@Overridepublic void onMessage(Message message, byte[] pattern) {String msg = new String(message.getBody());System.out.println(new String(pattern) + "接收消息:" + msg);//遍歷本地內(nèi)存當(dāng)中的websocket連接...//拿到對應(yīng)的websocket session就可以進(jìn)行推送消息}}

配置下websocket

@Configuration public class WebSocketConfig extends ServerEndpointConfig.Configurator{@Overridepublic void modifyHandshake(ServerEndpointConfig sec,HandshakeRequest request, HandshakeResponse response) {// 主要為了能在websocket打開連接時(shí)獲取httpsession和當(dāng)前登陸用戶,此處跟本文內(nèi)容沒有關(guān)系HttpSession httpSession=(HttpSession) request.getHttpSession();//存入httpsessionsec.getUserProperties().put(HttpSession.class.getName(),httpSession);//存入當(dāng)前用戶sec.getUserProperties().put("user", ShiroUtils.getCurrentUser());super.modifyHandshake(sec, request, response);}/*** 自動(dòng)注冊使用了@ServerEndpoint注解聲明的Websocket endpoint* @return*/@Beanpublic ServerEndpointExporter serverEndpointExporter() {return new ServerEndpointExporter();} }

配置redis消息發(fā)布訂閱

/*** redis 消息監(jiān)聽 用于websocket 分布式處理* @param redisConnectionFactory* @return*/ @Bean public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory){RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);//設(shè)置訂閱topicredisMessageListenerContainer.addMessageListener(new SubscribeListener(), new ChannelTopic("socket_topic"));return redisMessageListenerContainer; }

再寫一個(gè)簡易的存儲工具類,這個(gè)就基于ConcurrentHashMap就能實(shí)現(xiàn),不記錄了。

最后來看websocket ServerEndpoint 方法

@OnOpen public void onOpen(Session session, EndpointConfig config) {log.debug("websocket:打開連接");//將連接存入我們的緩存工具,這個(gè)工具就是簡單的存儲,代碼自己寫一個(gè)CacheSessionMap.put("可以是sessionId,也可以實(shí)當(dāng)前用戶ID",session);}@OnClose public void onClose(Session session) {log.debug("websocket:關(guān)閉連接");//關(guān)閉的連接,我們將其移除CacheSessionMap.remove("可以是sessionId,也可以實(shí)當(dāng)前用戶ID");}@OnMessage public void onMessage(Session session, String message) throws IOException {log.debug("websocket:消息來了");//用我們之前寫的redis消息發(fā)布器,將這個(gè)消息發(fā)布到redispublishService.publish("socket_topic", message); }

總結(jié)

以上是生活随笔為你收集整理的关于WebSocket分布式实现的一种方案的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。