日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程语言 > java >内容正文

java

Java通过Netty,实现Websocket消息推送简单几步搞定

發(fā)布時(shí)間:2023/12/10 java 28 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Java通过Netty,实现Websocket消息推送简单几步搞定 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

前言

曾幾何時(shí),不知道大家有沒(méi)有在項(xiàng)目里遇到過(guò)需要服務(wù)端給客戶(hù)端推送消息的需求,是否曾經(jīng)苦惱過(guò)、糾結(jié)過(guò),我們知道要想實(shí)現(xiàn)這樣的需求肯定離不開(kāi)websocket長(zhǎng)連接方式,那么到底是該選原生的websocket還是更加高級(jí)的netty框架呢?

在此我極力推薦netty,因?yàn)橐豢詈玫目蚣芤话愣际窃谠幕A(chǔ)上進(jìn)行包裝成更好、更方便、更實(shí)用的東西,很多我們需要自己考慮的問(wèn)題都基本可以不用去考慮,不過(guò)此文不會(huì)去講netty有多么的高深莫測(cè),因?yàn)檫@些概念性的東西隨處可見(jiàn),而是通過(guò)實(shí)戰(zhàn)來(lái)達(dá)到推送消息的目的。

實(shí)戰(zhàn)

一、邏輯架構(gòu)圖

從圖中可以看出本次實(shí)戰(zhàn)的基本流程是客戶(hù)端A請(qǐng)求服務(wù)端核心模塊,核心模塊生產(chǎn)一條消息到消息隊(duì)列,然后服務(wù)端消息模塊消費(fèi)消息,消費(fèi)完之后就將消息推送給客戶(hù)端B,流程很簡(jiǎn)單,沒(méi)有太多技巧,唯一的巧妙之處就在消息模塊這邊的處理上,本文的重點(diǎn)也主要講解消息模塊這一塊,主要包括netty server、netty client、channel的存儲(chǔ)等等。

二、代碼

1、添加依賴(lài)

<dependency><groupId>io.nettygroupId><artifactId>netty-allartifactId><version>4.1.6.Finalversion> dependency>

2、NettyServer類(lèi)

@Service public?class?NettyServer?{public?void?run(int?port){new?Thread(){public?void?run(){runServer(port);}}.start();}private?void?runServer(int?port){Print.info("===============Message服務(wù)端啟動(dòng)===============");EventLoopGroup bossGroup =?new?NioEventLoopGroup();EventLoopGroup workerGroup =?new?NioEventLoopGroup();try?{ServerBootstrap b =?new?ServerBootstrap();b.group(bossGroup, workerGroup);b.channel(NioServerSocketChannel.class);b.childHandler(new?ChannelInitializer() {protected?void?initChannel(SocketChannel ch)?throws?Exception?{ChannelPipeline pipeline = ch.pipeline();pipeline.addLast("codec-http",?new?HttpServerCodec());pipeline.addLast("aggregator",?new?HttpObjectAggregator(65536));pipeline.addLast("handler",?new?MyWebSocketServerHandler());}});Channel ch = b.bind(port).sync().channel();Print.info("Message服務(wù)器啟動(dòng)成功:"?+ ch.toString());ch.closeFuture().sync();}?catch?(Exception e){Print.error("Message服務(wù)運(yùn)行異常:"?+ e.getMessage());e.printStackTrace();}?finally?{bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();Print.info("Message服務(wù)已關(guān)閉");}} }

3、MyWebSocketServerHandler類(lèi)

public?class?MyWebSocketServerHandler?extends?SimpleChannelInboundHandler<Object>{private?static?final?String WEBSOCKET_PATH =?"";private?WebSocketServerHandshaker handshaker;@Override????protected?void?channelRead0(ChannelHandlerContext ctx, Object msg)?throws?Exception?{if?(msg?instanceof?FullHttpRequest){//以http請(qǐng)求形式接入,但是走的是websockethandleHttpRequest(ctx, (FullHttpRequest) msg);}else?if?(msg?instanceof??WebSocketFrame){//處理websocket客戶(hù)端的消息handleWebSocketFrame(ctx, (WebSocketFrame) msg);}}@Override????public?void?channelReadComplete(ChannelHandlerContext ctx)?throws?Exception?{ctx.flush();}private?void?handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req)?throws?Exception?{//要求Upgrade為websocket,過(guò)濾掉get/Postif?(!req.decoderResult().isSuccess()|| (!"websocket".equals(req.headers().get("Upgrade")))) {//若不是websocket方式,則創(chuàng)建BAD_REQUEST的req,返回給客戶(hù)端sendHttpResponse(ctx, req,?new?DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));return;}WebSocketServerHandshakerFactory wsFactory =?new?WebSocketServerHandshakerFactory("ws://localhost:9502/websocket",?null,?false);handshaker = wsFactory.newHandshaker(req);if?(handshaker ==?null) {WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel()); }?else?{handshaker.handshake(ctx.channel(), req); }}private?void?handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame)?{// Check for closing frame if (frame instanceof CloseWebSocketFrame) {handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());?return; }if?(frame?instanceof?PingWebSocketFrame) {ctx.channel().write(new?PongWebSocketFrame(frame.content().retain()));?return; }if?(!(frame?instanceof?TextWebSocketFrame)) {Print.error("數(shù)據(jù)幀類(lèi)型不支持!");?throw?new?UnsupportedOperationException(String.format("%s frame types not supported", frame.getClass().getName())); }// Send the uppercase string back. String request = ((TextWebSocketFrame) frame).text(); Print.info("Netty服務(wù)器接收到的信息: " + request); if (request.equals(Const.HEARTBEAT)){ctx.channel().write(new?TextWebSocketFrame(request));?return; }JSONObject jsonData = JSONObject.parseObject(request); String eventType = jsonData.getString("event_type"); String apiToken = jsonData.getString("api_token");?if?(Const.FRONT.equals(eventType)){Print.info("front event"); ChannelSupervise.updateChannel(apiToken, ctx.channel()); }else?if?(Const.BEHIND.equals(eventType)){Print.info("behind event"); Channel chan = ChannelSupervise.findChannel(apiToken);?if?(null?== chan){Print.error("目標(biāo)用戶(hù)不存在"); }else?{JSONObject jsonMsg =?new?JSONObject(); jsonMsg.put("type", jsonData.get("type")); jsonMsg.put("child_type", jsonData.get("child_type")); jsonMsg.put("title", jsonData.get("title")); jsonMsg.put("body", jsonData.get("body")); ChannelSupervise.sendToSimple(apiToken,?new?TextWebSocketFrame(jsonMsg.toString())); Print.info("向目標(biāo)用戶(hù)發(fā)送成功"); }}else{Print.error("event type error"); }}private?static?void?sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, DefaultFullHttpResponse res)?{// 返回應(yīng)答給客戶(hù)端 if (res.status().code() != 200) {ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8); res.content().writeBytes(buf); buf.release(); }ChannelFuture f = ctx.channel().writeAndFlush(res);?// 如果是非Keep-Alive,關(guān)閉連接 if (!isKeepAlive(req) || res.status().code() != 200) {f.addListener(ChannelFutureListener.CLOSE); }}@Override????public?void?exceptionCaught(ChannelHandlerContext ctx, Throwable cause)?throws?Exception?{cause.printStackTrace(); ctx.close(); }private?static?String?getWebSocketLocation(FullHttpRequest req)?{return?"ws://"?+ req.headers().get(HOST) + WEBSOCKET_PATH; }/** * 接收客戶(hù)端連接事件 */????@Override????public?void?channelActive(ChannelHandlerContext ctx)?throws?Exception?{Print.info("客戶(hù)端與服務(wù)端連接開(kāi)啟:"?+ ctx.channel()); ChannelSupervise.addChannel(null, ctx.channel()); }/** * 接收客戶(hù)端關(guān)閉事件 */????@Override????public?void?channelInactive(ChannelHandlerContext ctx)?throws?Exception?{Print.info("客戶(hù)端與服務(wù)端連接關(guān)閉:"?+ ctx.channel()); ChannelSupervise.removeChannel(ctx.channel()); }}

4、ChannelSupervise類(lèi)

public?class?ChannelSupervise?{private???static?ChannelGroup GlobalGroup =?new?DefaultChannelGroup(GlobalEventExecutor.INSTANCE);private??static?ConcurrentMapChannelMap =?new?ConcurrentHashMap();public??static?void?addChannel(String apiToken, Channel channel){GlobalGroup.add(channel);if?(null?!= apiToken) {ChannelMap.put(apiToken, channel.id());}}public?static?void?updateChannel(String apiToken, Channel channel){Channel chan = GlobalGroup.find(channel.id());if?(null?== chan){addChannel(apiToken, channel);}else?{ChannelMap.put(apiToken, channel.id());}}public?static?void?removeChannel(Channel channel){GlobalGroup.remove(channel);Collectionvalues = ChannelMap.values();values.remove(channel.id());}public?static?Channel?findChannel(String apiToken){ChannelId chanId = ChannelMap.get(apiToken);if?(null?== chanId){return?null;}return?GlobalGroup.find(ChannelMap.get(apiToken));}public?static?void?sendToAll(TextWebSocketFrame tws){GlobalGroup.writeAndFlush(tws);}public?static?void?sendToSimple(String apiToken, TextWebSocketFrame tws){GlobalGroup.find(ChannelMap.get(apiToken)).writeAndFlush(tws);} }

5、NettyClient類(lèi)

@Servicepublic?class?NettyClient?{private?Channel channel;public?void?run(String strUri){new?Thread(){public?void?run(){runClient(strUri);}}.start();private?void?runClient(String strUri)?{EventLoopGroup?group?=?new?NioEventLoopGroup();try?{Bootstrap b =?new?Bootstrap();URI uri =?new?URI(strUri);String protocol = uri.getScheme();if?(!"ws".equals(protocol)) {throw?new?IllegalArgumentException("Unsupported protocol: "?+ protocol);}HttpHeaders customHeaders =?new?DefaultHttpHeaders();customHeaders.add("MyHeader",?"MyValue");// Connect with V13 (RFC 6455 aka HyBi-17). You can change it to V08 or V00.// If you change it to V00, ping is not supported and remember to change// HttpResponseDecoder to WebSocketHttpResponseDecoder in the pipeline.final MyWebSocketClientHandler handler =new?MyWebSocketClientHandler(WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13,?null,?false, customHeaders)); b.group(group); b.channel(NioSocketChannel.class); b.handler(new?ChannelInitializer() {@Overpublic?void?initChannel(SocketChannel ch) throws Exception?{ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("http-codec",?new?HttpClientCodec()); pipeline.addLast("aggregator",?new?HttpObjectAggregator(8192)); pipeline.addLast("ws-handler", handler); }}); Print.info("===============Message客戶(hù)端啟動(dòng)==============="); channel = b.connect(uri.getHost(), uri.getPort()).sync().channel(); handler.handshakeFuture().sync(); channel.closeFuture().sync(); }?catch?(Exception e){Print.error(e.getMessage()); }?finally?{group.shutdownGracefully(); }}

6、MyWebSocketClientHandler類(lèi)

public?class?MyWebSocketClientHandler?extends?SimpleChannelInboundHandler<Object>?{private?final?WebSocketClientHandshaker handshaker;private?ChannelPromise handshakeFuture;public?MyWebSocketClientHandler(WebSocketClientHandshaker handshaker)?{this.handshaker = handshaker;}public?ChannelFuture?handshakeFuture()?{return?handshakeFuture;}@Overridepublic?void?handlerAdded(ChannelHandlerContext ctx)?throws?Exception?{handshakeFuture = ctx.newPromise();}@Overridepublic?void?channelActive(ChannelHandlerContext ctx)?throws?Exception?{handshaker.handshake(ctx.channel());}@Overridepublic?void?channelInactive(ChannelHandlerContext ctx)?throws?Exception?{Print.info("webSocket client disconnected!");}@Overridepublic?void?channelRead(ChannelHandlerContext ctx, Object msg)?throws?Exception?{Channel ch = ctx.channel();if?(!handshaker.isHandshakeComplete()) {handshaker.finishHandshake(ch, (FullHttpResponse) msg);Print.info("websocket client connected!");handshakeFuture.setSuccess();return;}if?(msg?instanceof?FullHttpResponse) {FullHttpResponse response = (FullHttpResponse) msg;throw?new?Exception("Unexpected FullHttpResponse (getStatus="?+ response.getStatus() +?", content="?+ response.content().toString(CharsetUtil.UTF_8) +?')');}WebSocketFrame frame = (WebSocketFrame) msg;if?(frame?instanceof?TextWebSocketFrame) {TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;Print.info("客戶(hù)端收到消息: "?+ textFrame.text());}?else?if?(frame?instanceof?PongWebSocketFrame) {Print.info("websocket client received pong");}?else?if?(frame?instanceof?CloseWebSocketFrame) {Print.info("websocket client received closing");ch.close();}}@Overrideprotected?void?channelRead0(ChannelHandlerContext channelHandlerContext, Object o)?throws?Exception?{}@Overridepublic?void?exceptionCaught(ChannelHandlerContext ctx, Throwable cause)?throws?Exception?{cause.printStackTrace();if?(!handshakeFuture.isDone()) {handshakeFuture.setFailure(cause);}ctx.close();}}

7、啟動(dòng)類(lèi)

@SpringBootApplication @Servicepublic class MessageApplication {@Autowiredprivate NettyServer server;@Autowiredprivate NettyClient client;public?static?void?main(String[] args) {SpringApplication.run(MessageApplication.class, args);}@PostConstructpublic?void?initMessage(){server.run(9502);try?{Thread.sleep(1000);}?catch?(InterruptedException e) {e.printStackTrace();}client.run("ws://localhost:"?+?9502);}

8、客戶(hù)端B測(cè)試頁(yè)面

<html><head><meta?charset="UTF-8"><title>WebSocket Chattitle>head><body><script?type="text/javascript">var?socket;if?(!window.WebSocket) {window.WebSocket =?window.MozWebSocket;}if?(window.WebSocket) {socket =?new?WebSocket("ws://localhost:9502");socket.onmessage =?function(event)?{var?ta =?document.getElementById('responseText');ta.value = ta.value +?'\n'?+ event.data};socket.onopen =?function(event)?{var?ta =?document.getElementById('responseText');ta.value =?"連接開(kāi)啟!";};socket.onclose =?function(event)?{var?ta =?document.getElementById('responseText');ta.value = ta.value +?"連接被關(guān)閉";};}?else?{alert("你的瀏覽器不支持 WebSocket!");}function?send(message)?{if?(!window.WebSocket) {return;}if?(socket.readyState == WebSocket.OPEN) {socket.send(message);}?else?{alert("連接沒(méi)有開(kāi)啟.");}}script><form?onsubmit="return false;"><h3>WebSocket:h3><textarea?id="responseText"?style="width: 500px; height: 300px;">textarea><br><input?type="text"?name="message"??style="width: 300px"?value="1"><input?type="button"?value="發(fā)送消息"?onclick="send(this.form.message.value)"><input?type="button"?onclick="javascript:document.getElementById('responseText').value=''"?value="清空聊天記錄">form><br>body> html>

三、測(cè)試

1、先運(yùn)行啟動(dòng)類(lèi),此時(shí)會(huì)先啟動(dòng)netty服務(wù)器,然后啟動(dòng)一個(gè)netty客戶(hù)端,然后過(guò)30s模擬客戶(hù)端A進(jìn)行消息發(fā)送

2、打開(kāi)測(cè)試頁(yè)面,在底下的輸入框輸入:{"event_type":"front", "api_token":"11111"},表示客戶(hù)端B連接上netty服務(wù)器

測(cè)試結(jié)果如下:

消息模塊:

客戶(hù)端B:

四、結(jié)束語(yǔ)

本文只是拋磚引玉,主要啟發(fā)有類(lèi)似需求的朋友知道怎么去存儲(chǔ)channel,進(jìn)而怎么給指定客戶(hù)推送消息,如果想要進(jìn)行大型項(xiàng)目的高并發(fā)、可靠穩(wěn)定地使用,還需進(jìn)一步地改進(jìn)。

作者:都市心聲

來(lái)源:toutiao.com/i6794445371457143307

文章推薦程序員效率:畫(huà)流程圖常用的工具程序員效率:整理常用的在線筆記軟件遠(yuǎn)程辦公:常用的遠(yuǎn)程協(xié)助軟件,你都知道嗎?51單片機(jī)程序下載、ISP及串口基礎(chǔ)知識(shí)硬件:斷路器、接觸器、繼電器基礎(chǔ)知識(shí)

總結(jié)

以上是生活随笔為你收集整理的Java通过Netty,实现Websocket消息推送简单几步搞定的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。