簡易 IM 雙向通信電腦端 GUI 應用——基於 Netty、WebSocket、JavaFX、多線程技術等
說明
這是一款使用 Netty 來實現 IM 雙向通信的 demo 項目。
通信雙方互為發送方與接收方,且雙方均使用 WebSocket 協議。
通信雙方的客戶端 GUI 界面均使用 JavaFX 實現。在界面的文本輸入框中,可以點擊「發送」按鈕來發送消息,也可以直接按 Enter 鍵發送;若要在文本中另起一行,則需要使用組合鍵 Ctrl + Enter。
通信過程由其它線程在後臺完成,不會阻塞 UI 線程。
運行效果
核心代碼
package org.wangpai.demo.im.netty;import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import io.netty.handler.stream.ChunkedWriteHandler;
import java.net.URI
;
import java.net.URISyntaxException;
public class Client {private String otherServerIp
;private int otherServerPort
;public Client setIp(String otherServerIp
) {this.otherServerIp
= otherServerIp
;return this;}public Client setPort(int otherServerPort
) {this.otherServerPort
= otherServerPort
;return this;}private Channel channel
;private EventLoopGroup workerLoopGroup
= new NioEventLoopGroup();public Client start() {var handshaker
= this.getWebSocketClientHandshaker();var businessHandler
= new WebsocketClientHandler(handshaker
);Bootstrap bootstrap
= new Bootstrap();bootstrap
.group(workerLoopGroup
);bootstrap
.channel(NioSocketChannel.class);bootstrap
.remoteAddress(otherServerIp
, otherServerPort
);bootstrap
.option(ChannelOption.ALLOCATOR
, PooledByteBufAllocator.DEFAULT
);bootstrap
.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch
) {var pipeline
= ch
.pipeline();pipeline
.addLast(new HttpClientCodec());pipeline
.addLast(new HttpObjectAggregator(65535));pipeline
.addLast(new ChunkedWriteHandler());pipeline
.addLast("businessHandler", businessHandler
);}});ChannelFuture future
= bootstrap
.connect();future
.addListener((ChannelFuture futureListener
) -> {if (futureListener
.isSuccess()) {System.out
.println("客戶端連接成功"); } else {System.out
.println("客戶端連接失敗"); }});try {future
.sync();} catch (Exception exception
) {exception
.printStackTrace(); }this.channel
= future
.channel();handshaker
.handshake(channel
);try {businessHandler
.sync();} catch (InterruptedException exception
) {exception
.printStackTrace(); }return this;}private String generateWebsocketUrl(String ip
, int port
, String relativePath
) {return String.format("ws://%s:%d/%s", ip
, port
, relativePath
);}private WebSocketClientHandshaker getWebSocketClientHandshaker() {URI websocketUri
= null;try {websocketUri
= new URI(this.generateWebsocketUrl(this.otherServerIp
, this.otherServerPort
,Protocol.WEBSOCKET_PREFIX_PATH
));} catch (URISyntaxException exception
) {exception
.printStackTrace(); }var httpHeaders
= new DefaultHttpHeaders();var handshaker
= WebSocketClientHandshakerFactory.newHandshaker(websocketUri
, WebSocketVersion.V13
, null, false, httpHeaders
);return handshaker
;}public void send(String msg
) {channel
.writeAndFlush(new TextWebSocketFrame(msg
));}public void destroy() {this.workerLoopGroup
.shutdownGracefully();}private Client() {super();}public static Client getInstance() {return new Client();}
}
package org.wangpai.demo.im.netty;import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketHandshakeException;
import io.netty.util.CharsetUtil;
public class WebsocketClientHandler extends ChannelInboundHandlerAdapter {private WebSocketClientHandshaker handshaker
;private ChannelPromise channelPromise
;public WebsocketClientHandler(WebSocketClientHandshaker handshaker
) {this.handshaker
= handshaker
;}@Overridepublic void channelRead(ChannelHandlerContext ctx
, Object obj
) {if (!this.handshaker
.isHandshakeComplete()) { finishHandshake(ctx
, (FullHttpResponse) obj
);} else if (obj
instanceof FullHttpResponse) {FullHttpResponse response
= (FullHttpResponse) obj
;var msg
= String.format("Unexpected FullHttpResponse, status=%s, content=%s",response
.status(), response
.content().toString(CharsetUtil.UTF_8
));System.out
.println(msg
); } else if (obj
instanceof WebSocketFrame) {handleWebSocketResponse(ctx
, (WebSocketFrame) obj
);} else {}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx
, Throwable cause
) throws Exception {ctx
.close();}@Overridepublic void handlerAdded(ChannelHandlerContext ctx
) {this.channelPromise
= ctx
.newPromise();}public ChannelFuture sync() throws InterruptedException {return this.channelPromise
.sync();}private void finishHandshake(ChannelHandlerContext ctx
, FullHttpResponse response
) {try {this.handshaker
.finishHandshake(ctx
.channel(), response
);this.channelPromise
.setSuccess();} catch (WebSocketHandshakeException exception
) {FullHttpResponse rsp
= response
;String errorMsg
= String.format("WebSocket Client failed to connect, status=%s, reason=%s",rsp
.status(), rsp
.content().toString(CharsetUtil.UTF_8
));this.channelPromise
.setFailure(new Exception(errorMsg
)); }}private void handleWebSocketResponse(ChannelHandlerContext ctx
, WebSocketFrame frame
) {if (frame
instanceof TextWebSocketFrame) {TextWebSocketFrame textFrame
= (TextWebSocketFrame) frame
; } else if (frame
instanceof BinaryWebSocketFrame) {BinaryWebSocketFrame binFrame
= (BinaryWebSocketFrame) frame
; } else if (frame
instanceof PongWebSocketFrame) {} else if (frame
instanceof CloseWebSocketFrame) {ctx
.channel().close();}}
}
package org.wangpai.demo.im.netty;import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import lombok.Setter;
import lombok.experimental.Accessors;
import org.wangpai.demo.im.view.MainFace;
@Accessors(chain
= true)
public class Server {@Setterprivate int port
;@Setterprivate MainFace mainFace
;private EventLoopGroup bossLoopGroup
= new NioEventLoopGroup(1);private EventLoopGroup workerLoopGroup
= new NioEventLoopGroup();public Server start() {ServerBootstrap bootstrap
= new ServerBootstrap();bootstrap
.group(this.bossLoopGroup
, this.workerLoopGroup
);bootstrap
.channel(NioServerSocketChannel.class);bootstrap
.localAddress(port
);bootstrap
.option(ChannelOption.SO_KEEPALIVE
, true);bootstrap
.option(ChannelOption.ALLOCATOR
, PooledByteBufAllocator.DEFAULT
);bootstrap
.childOption(ChannelOption.ALLOCATOR
, PooledByteBufAllocator.DEFAULT
);bootstrap
.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch
) {var pipeline
= ch
.pipeline();pipeline
.addLast(new HttpServerCodec());pipeline
.addLast(new HttpObjectAggregator(65535));pipeline
.addLast(new ChunkedWriteHandler());pipeline
.addLast(new WebSocketServerCompressionHandler());pipeline
.addLast(new WebSocketServerProtocolHandler("/" + Protocol.WEBSOCKET_PREFIX_PATH
));pipeline
.addLast(new TextServerHandler(mainFace
));}});try {bootstrap
.bind().sync().channel().closeFuture().sync();} catch (Exception exception
) {exception
.printStackTrace(); } finally {this.workerLoopGroup
.shutdownGracefully();this.bossLoopGroup
.shutdownGracefully();}return this;}public void destroy() {this.workerLoopGroup
.shutdownGracefully();this.bossLoopGroup
.shutdownGracefully();}private Server() {super();}public static Server getInstance() {return new Server();}
}
package org.wangpai.demo.im.netty;import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.wangpai.demo.im.view.MainFace;
public class TextServerHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {private MainFace mainFace
;public TextServerHandler(MainFace mainFace
) {super();this.mainFace
= mainFace
;}@Overrideprotected void channelRead0(ChannelHandlerContext ctx
, TextWebSocketFrame msg
) throws Exception {this.mainFace
.receive(msg
.text());}@Overridepublic void channelRead(ChannelHandlerContext ctx
, Object msg
) throws Exception {if (msg
instanceof TextWebSocketFrame) {super.channelRead(ctx
, msg
);} else { ctx
.fireChannelRead(msg
);}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx
, Throwable cause
) throws Exception {ctx
.close();}
}
完整代碼
已上傳至 GitCode,可免費下載:https://gitcode.net/wangpaiblog/20211221-im_demo-netty_websocket_javafx
參考知識
-
WebSocket 的通信機制:https://blog.csdn.net/wangpaiblog/article/details/121804329
-
JavaFX 中使用多線程與保證 UI 線程安全:https://blog.csdn.net/wangpaiblog/article/details/120755930
-
如何在 JavaFX 的 TextArea 實現回車發送信息而不換行,但組合鍵 Ctrl + Enter 換行:https://blog.csdn.net/wangpaiblog/article/details/121506912
總結
以上是生活随笔為你收集整理的简易 IM 双向通信电脑端 GUI 应用——基于 Netty、WebSocket、JavaFX 、多线程技术等的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。