Netty的断线重连
因為工作中經常使用到TCP,所以會頻繁使用到諸如Mina或Netty之類的通信框架,為了方便項目的邏輯調用,經常會在框架的基礎上再一次進行封裝,這樣做其實有畫蛇添足的嫌疑,但也是無奈之舉。
這里主要記載使用Mina和Netty,構建適合項目的一個完整的重連邏輯。
當然,都是作為客戶端,畢竟一般只有客戶端才會做重連。
在這之前,需要考慮幾個問題:
- 連接行為的結果可以較為方便地獲得,成功或失敗,最好直接有接口回調,可以在回調中進行后續邏輯處理
- 當前通信連接的活躍狀態需要準確實時而方便地獲得,這樣有利于重連時對連接的判斷
- 能夠較為靈活的配置Listener或Handler或Filter
- 支持計數,無論是首次連接失敗多次后不再嘗試連接,還是中途斷開后斷線重連多次后不再嘗試連接,一般不作無休止地重連
從代碼層面看,框架中最好有一個類似Connector的類,能夠暴露合適的接口或方法,提供各種狀態與回調,使通信連接的動向能夠實時把握,然而事情并不是那么美好。
連接結果
由于框架設計的一些原則,一個connector根本不足以暴露這些接口。
對于Mina而言,作為客戶端一般用于連接的連接器是NioSocketConnector;
對于Netty而言,則是Bootstrap;
下表是一些常見的定義在兩個框架中的對比,不一定準確,但意義相近;
| 連接器 | SocketConnector | Bootstrap |
| 會話 | IoSession | Channel |
| 連接結果 | ConnectFuture | ChannelFuture |
| 邏輯處理 | IoHandler | ChannelHandler |
| 過濾器 | IoFilter | ChannelHandler |
對于Mina而言,連接操作是這樣的:
ConnectFuture future = mConnector.connect();future.awaitUninterruptibly();if (future.isConnected()) {//得到會話mSession = future.getSession();}對于Netty來說,連接可以寫成與Mina幾乎相同的形式:
ChannelFuture future = bootstrap.connect();future.awaitUninterruptibly();if(future.isSuccess()){mChannel = future.channel();}也可以不阻塞等待,兩種future都可以自行添加Listener監聽異步任務是否完成:
//Minafuture.addListener(new IoFutureListener<IoFuture>() {@Overridepublic void operationComplete(IoFuture ioFuture) {}}); //Nettyfuture.addListener(new GenericFutureListener<ChannelFuture>() {@Overridepublic void operationComplete(ChannelFuture f) throws Exception {}});畢竟是出自一人之手,部分API真是驚人的相似。
到這里,第一個連接返回結果問題算是有所結論,兩種框架都可以正常返回連接的結果。
會話狀態
而上述代碼中,返回的mSession與mChannel就是得到的會話,這兩種類各自提供了一些接口,可以用于獲得通信連接的實時狀態。
Mina的IoSession這里只取部分方法:
再對比看下Netty提供的Channel,這里也只取部分方法展示:
public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> {EventLoop eventLoop();Channel parent();ChannelConfig config();boolean isOpen();boolean isRegistered();boolean isActive();SocketAddress localAddress();SocketAddress remoteAddress();boolean isWritable();Channel.Unsafe unsafe();ChannelPipeline pipeline();public interface Unsafe {SocketAddress localAddress();SocketAddress remoteAddress();void register(EventLoop var1, ChannelPromise var2);void bind(SocketAddress var1, ChannelPromise var2);void connect(SocketAddress var1, SocketAddress var2, ChannelPromise var3);void disconnect(ChannelPromise var1);void close(ChannelPromise var1);void write(Object var1, ChannelPromise var2);void flush();} }可以看出,無論是IoSession還是Channel,都有相關的API可以知曉通信是否活躍,所以第二個問題在可以獲得IoSession或Channel的情況下,是沒有問題的。
配置Handler
那么再看配置Listener或Handler的相差操作是否靈活。
二者在這方面的差別較為明顯。
對于Mina而言,添加Handler可以直接利用Connector,真正的邏輯Handler只能由setHandler方法添加,且只能為一個,而相關的Filter則要通過getFilterChain()拿到的過濾器集合去添加;對于Mina來說,Handler和Filter是沒有交集的,他們分屬不同的接口IoHandler和IoFilter:
mConnector.setHandler(handler); mConnector.getFilterChain().addLast(CODEC, protocolCodecFilter);Netty有所不同,netty中所有的handler、filter都是ChannelHandller,這些handler都要在連接行為發生后才能生效,也就是掛載到Channel上的,而不是Bootstrap,一般添加是這樣的:
bootstrap.handler(handler); channel.pipeline().addLast(someHandler); channel.pipeline().addLast(someFilter);但handler依舊只能添加一個,如果要添加多個handler或filter,就必須獲取到channel,然后進行添加,netty本身提供了一個ChannelInitializer可以用于添加多個channelHandler,一般會這么寫:
bootstrap.handler(new ChannelInitializer<Channel>() {@Overrideprotected void initChannel(Channel channel) throws Exception {channel.pipeline().addLast(handler);channel.pipeline().addLast(someHandler);channel.pipeline().addLast(someFilter);}});對于Netty來說,Handler和Filter是同一個東西,都是ChannelHandler。
兩者在這方面的區別比較明顯:
一是netty將handler和filter都統一為handler了,
二是netty不能像mina一樣,在未連接之前就可以配置所有的Handler或Filter,netty必須獲得channel也就是連接成功后才能配置多個Filter。
這就造成了一個問題,Mina可以提前就配置監聽器監聽連接的狀態,可以正常監聽中途斷開,也就是在創建Connector后就可以掛載上監聽:
mConnector.getFilterChain().addFirst("reconnect", new IoFilterAdapter() {@Overridepublic void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception { //監聽到斷開,可接入回調接口,做進一步的重連邏輯mConnector.connect();}});而Netty不能,創建Connector也就是Bootstrap并不能實現類似的掛載,Bootstrap只能掛載一個Handler,而相關的過濾器或監聽只能在Channel出現后再進行掛載,那么就會寫成這樣:
bootstrap.handler(new ChannelInitializer<Channel>() {@Overrideprotected void initChannel(Channel channel) throws Exception {//添加其他Filter或Handler}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {super.channelInactive(ctx);//監聽到斷開,重連bootstrap.connect();}});這里initChannel方法永遠是最先被調用的,因為在源碼中是這樣的:
//ChannelInitializer.javaprotected abstract void initChannel(C var1) throws Exception;public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {if (this.initChannel(ctx)) {ctx.pipeline().fireChannelRegistered();this.removeState(ctx);} else {ctx.fireChannelRegistered();}}在這種邏輯下,Mina可以在sessionClosed回調中使用SocketConnetor進行重連,Netty可以在channelInactive回調中使用Bootstrap進行重連。
看起來沒什么毛病。
但需要注意一點,就是Handler的復用問題,也就是對Handler或Filter的檢查,Mina和Netty都有對Handler的重復添加進行過檢查,不過檢查邏輯有細微的差別。
Mina中是這樣檢查的:
可以看到,Mina只會檢查Filter在Map中對應的key是否被使用過,當然理論上Filter掛載在SocketConnector的FilterChain中,只要配置過一次,就無需再進行配置。
那么Netty呢?
Netty的Handler不是能隨意復用的,要復用必須標明注解@Sharable,否則就會出現異常:
這是因為在源碼進行檢查時,是對Handler本身進行檢查的,handler會有一個added的屬性,一旦被添加使用過,就會置為true,而判斷邏輯會阻止為added=true的handler添加進來 。這樣一來,如果強行添加已經添加過的handler就會拋出異常:
//DefaultChannelPipeline.javapublic final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {AbstractChannelHandlerContext newCtx;synchronized(this) {checkMultiplicity(handler);newCtx = this.newContext(group, this.filterName(name, handler), handler);//省略部分代碼}this.callHandlerAdded0(newCtx);return this;}private static void checkMultiplicity(ChannelHandler handler) {if (handler instanceof ChannelHandlerAdapter) {ChannelHandlerAdapter h = (ChannelHandlerAdapter)handler;if (!h.isSharable() && h.added) {throw new ChannelPipelineException(h.getClass().getName() + " is not a @Sharable handler, so can't be added or removed multiple times.");}h.added = true;}}這也就說明使用channel.pipeline().addLast(handler)這種方法添加handler時,如果想不同的Channel添加同一個Handler實例,每種handler都必須注解了@Sharable,如果正好要使用IdleStateHandler這種源碼內部的Handler,而IdleStateHandler是沒有注解過@Sharable,那么就會出現上面的異常。
而實際應用中,為了實現心跳,IdleStateHandler是一般都會使用到的。
那么問題來了,Mina每次重新連接,創建新的session,但只要SocketConnector沒有變,所有Handler和Filter自然就沒有變,仍然可用,因為所有Handler和Filter是掛載到SocketConnector的FilterChain中,算是只和Connector相關的;
而Netty,如果重新連接的話,會創建新的Channel,然后會重新調用initChannel,然后利用channel.pipeline().addLast添加Handler,算是掛載到Channel上的,而不是Bootstrap上。
這樣顯示出兩者最大的區別就是,Mina中配置一次即可,而Netty則需要每次產生新的Channel時對其進行重新配置。
所以Netty中的handler想復用的話,就必須加注解,否則就會報異常。如果一定要用到無法注解@Sharable的Handler,比如上面的IdleStateHandler,那就要想辦法每次initChannel時,也新建一個新的IdleStateHandler…
或者,繼承IdleStateHandler,然后加上注解也行,雖然也很丑就是了。
So Bad…
這樣的情況下,可以想辦法,每次都新建,類似這種:
FunctionsChannelHandler functionsChannelHandler = new FunctionsChannelHandler(bootstrap){@Overridepublic ChannelHandler[] handlers() {return new ChannelHandler[]{new NormalClientEncoder(),new IdleStateHandler(20, 10, 20),this,new NormalClientHandler()};}};bootstrap.handler(new ChannelInitializer<Channel>() {@Overrideprotected void initChannel(Channel channel) throws Exception {//添加各種handlerchannel.pipeline().addLast(functionsChannelHandler.handlers());}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {super.channelInactive(ctx);//監聽到斷開}});因為netty把所有監聽器過濾器邏輯處理都歸為ChannelHandler的原因,把一個handler擴展成一個功能較為豐富的handler是一種不錯的方法 。或者沿用這種思路,使其每次新加Handler時,都是new過的Handler。
應對框架自帶的一些未注解@Sharable的類,也可以繼承之,自行加注解:
這樣一來,配置Handler也勉強可算靈活了。
連接計數
對連接計數一般都是開發者編寫的邏輯,主要是應對無休止地連接。
主要應用在兩種場景:
一是首次連接,如果多次連接不成功,那么停止連接,或者另有邏輯;
二是斷線重連,如果多次重連不成功,那么停止連接并銷毀,或者另有邏輯。
因為Mina和Netty都是多線程模型的緣故,計數為了求穩可以直接使用Atom類,當然覺得大材小用也可以直接使用普通int值,畢竟理論上兩次連接中間應該會有一定延時才對。
應用示例
所以最后,都可以對各自的連接器進行二次封裝,然后編寫對自己有利的邏輯。
對于Mina,大概可以寫成這樣:
使用起來是這樣的:
HigherGateWayHandler higherGateWayHandler = new HigherGateWayHandler();TCPConnector higherGateWayClient = new TCPConnector.Builder().setExecutor(ThreadPool.singleThread("higher_gateway_client")).setHost(NC.GATEWAT_HIGHER_HOST).setPort(NC.LOWER_PORT).setConnectTimeoutMillis(10 * 1000).setReadBuffer(10 * 1024).setHandlerAdapter(higherGateWayHandler).setProtocolCodecFilter(new HigherGateWayCodecFactory()).setKeepAliveFilter(new KeepAliveHigherGateWay(), higherGateWayHandler, 10, 20).setConnectListener(new TCPConnector.IConnectorListener() {@Overridepublic void connectSuccess(IoSession session) {//連接成功后}@Overridepublic void connectFailed() {//重連失敗if (higherGateWayClient.getRecconnectCounter() == 3) {//重連失敗后}//非重連失敗,優先級連接情況下if (higherGateWayClient.getRecconnectCounter() == 0 && higherGateWayClient.getConnectCounter() > 2) {higherGateWayClient.resetConnectCounter();} else {higherGateWayClient.connectInThread();}}@Overridepublic void sessionClosed(IoSession session) {executors.execute(new Runnable() {@Overridepublic void run() {//重連邏輯higherGateWayClient.reconnect(10 * 1000, 3);}});}}).build();而Netty,封裝起來會有一點花里胡哨,目前遇到的問題是當重連以后復用IdleStateHandler這種Handler時,就會使得其中的計時機制失效,也就是說,心跳沒用了,暫時不明原因,大概率是其中的線程被銷毀無法再起的原因。那么當前就只能想辦法每次調用initChannel時,創建新的Handler才行:
public class NettyConnector {/*** 連接器*/private Bootstrap bootstrap;/*** 地址*/private String host;private int port;/*** 會話*/private Channel channel;private static final long TIME_OUT = 10;private long connectTimeoutMills;/*** 重連次數*/private AtomicInteger recconnectCounter;/*** 首次連接次數*/private AtomicInteger connectCounter;/*** 以接口引出通信狀態*/public interface IChannelStateListener {void onConnectSuccess(Channel channel);void onConnectFailed();void onDisconnect();}private IChannelStateListener channelStateListener;private NettyConnector(final Builder builder) {recconnectCounter = new AtomicInteger(0);connectCounter = new AtomicInteger(0);connectTimeoutMills = builder.timeoutMills;bootstrap = builder.bootstrap;bootstrap.handler(new ChannelInitializer() {@Overrideprotected void initChannel(Channel channel) throws Exception {channel.pipeline().addLast(new ChannelDisconnectHandler());channel.pipeline().addLast(builder.handlerSet.handlers());}});}public void setRemoteAddress(String host, int port) {L.d("設置地址與端口-" + host + ":" + port);this.host = host;this.port = port;}public void setChannelStateListener(IChannelStateListener listener) {channelStateListener = listener;}public void connect() {if (channel == null || !channel.isActive()) {bootstrap.remoteAddress(this.host, this.port);L.d("第" + (connectCounter.get() + 1) + "次連接" + host + ":" + port + "中......");final long startMills = System.currentTimeMillis();ChannelFuture channelFuture = bootstrap.connect();channelFuture.addListener(new GenericFutureListener<ChannelFuture>() {@Overridepublic void operationComplete(ChannelFuture f) throws Exception {if (f.isSuccess()) {L.d("連接(" + bootstrap.config().remoteAddress() + ")成功");channel = f.channel();if (channelStateListener != null) {connectCounter.set(0);channelStateListener.onConnectSuccess(channel);}} else {long delay = System.currentTimeMillis() - startMills;if (delay > 0) {TimeUnit.MILLISECONDS.sleep(connectTimeoutMills - delay);}L.d("連接(" + bootstrap.config().remoteAddress() + ")失敗");if (channelStateListener != null) {connectCounter.incrementAndGet();channelStateListener.onConnectFailed();}}}});}}private void reconnect() {if (bootstrap == null)throw new IllegalArgumentException("bootstrap cannot be null");//如果已經連接,則直接【連接成功】if (channel == null || !channel.isActive()) {//連接channel = bootstrap.connect().awaitUninterruptibly().channel();}}/*** 重連* @param reconnectTimeoutMills 重連超時時間* @param reconnectTimes 重連次數*/public void reconnect(final long reconnectTimeoutMills, final int reconnectTimes) {try {recconnectCounter.set(0);while (channel != null && !channel.isActive() && recconnectCounter.getAndIncrement() < reconnectTimes) {L.d(Thread.currentThread().getName() + "," + "重連" + bootstrap.config().remoteAddress() + "(" + recconnectCounter.get() + ")次...");reconnect();if (channel.isActive()) {break;} else {TimeUnit.MILLISECONDS.sleep(reconnectTimeoutMills);}L.d(channel.isActive() + "");}} catch (InterruptedException e) {e.printStackTrace();} finally {if (channel != null && channel.isActive()) {if (channelStateListener != null) {channelStateListener.onConnectSuccess(channel);}} else {if (channelStateListener != null) {channelStateListener.onConnectFailed();}}}}public Channel getChannel() {return channel;}public boolean isConnected() {return channel != null && channel.isActive();}public String getAddress() {return host + ":" + port;}public int getConnectFailedTimes() {return connectCounter.get();}public int getReconnectFailedTimes() {return recconnectCounter.get();}public static class Builder {private Bootstrap bootstrap = new Bootstrap();private HandlerSet handlerSet;private long timeoutMills = 10 * 1000;public Builder group(EventLoopGroup loopGroup) {bootstrap.group(loopGroup);return this;}@Deprecatedpublic Builder remoteAddress(String inetHost, int inetPort) {bootstrap.remoteAddress(inetHost, inetPort);return this;}public Builder setConnectTimeoutMills(long timeout) {timeoutMills = timeout;return this;}public Builder handler(HandlerSet handlers) {handlerSet = handlers;return this;}public NettyConnector build() {bootstrap.channel(NioSocketChannel.class);return new NettyConnector(this);}}/*** 主要用于監聽斷開*/class ChannelDisconnectHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {ctx.fireChannelInactive();if (channelStateListener != null) {channelStateListener.onDisconnect();}}}/*** 主要用于創建新的handler,避免復用帶來的一些問題*/@ChannelHandler.Sharablepublic static abstract class HandlerSet extends ChannelInboundHandlerAdapter {public abstract ChannelHandler[] handlers();}}因為Netty不能像Mina直接在Connector上掛載監聽sessionClosed,只能用一個ChannelDisconnectHandler這樣的東西去監聽是否已經斷開,并通過接口引出結果;
并且因為只能在Channel.Pipeline中才能添加多個Handler的原因,這里用一個HandlerSet強行將所有需要的Handler集合,然后在創建Bootstrap的時候一次性添加進去,想要保證每次都新建,這里就使用抽象方法,讓使用的時候可以自行創建。
注意,由于這里的抽象類HandlerSet每次其實并不是新建的,所有是需要復用的,所以需要加注解@Sharable,但也只需要加它一個就行了,其他都是新建出來的,無需理會。
寫出來就是這樣:
其中的HeartHandler是繼承自IdleStateHandler的。
整個封裝顯得花里胡哨…卻又很丑,不過勉強能用,水平有限。
就這樣吧。
總結
以上是生活随笔為你收集整理的Netty的断线重连的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: MM41/MM42/MM43零售物料主数
- 下一篇: mGBA-0.9.2 免费开源的gba模