日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Netty的断线重连

發布時間:2023/12/14 编程问答 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Netty的断线重连 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

因為工作中經常使用到TCP,所以會頻繁使用到諸如Mina或Netty之類的通信框架,為了方便項目的邏輯調用,經常會在框架的基礎上再一次進行封裝,這樣做其實有畫蛇添足的嫌疑,但也是無奈之舉。

這里主要記載使用Mina和Netty,構建適合項目的一個完整的重連邏輯。
當然,都是作為客戶端,畢竟一般只有客戶端才會做重連。

在這之前,需要考慮幾個問題:

  • 連接行為的結果可以較為方便地獲得,成功或失敗,最好直接有接口回調,可以在回調中進行后續邏輯處理
  • 當前通信連接的活躍狀態需要準確實時而方便地獲得,這樣有利于重連時對連接的判斷
  • 能夠較為靈活的配置Listener或Handler或Filter
  • 支持計數,無論是首次連接失敗多次后不再嘗試連接,還是中途斷開后斷線重連多次后不再嘗試連接,一般不作無休止地重連

從代碼層面看,框架中最好有一個類似Connector的類,能夠暴露合適的接口或方法,提供各種狀態與回調,使通信連接的動向能夠實時把握,然而事情并不是那么美好。

連接結果

由于框架設計的一些原則,一個connector根本不足以暴露這些接口。
對于Mina而言,作為客戶端一般用于連接的連接器是NioSocketConnector
對于Netty而言,則是Bootstrap

下表是一些常見的定義在兩個框架中的對比,不一定準確,但意義相近;

定義MinaNetty
連接器SocketConnectorBootstrap
會話IoSessionChannel
連接結果ConnectFutureChannelFuture
邏輯處理IoHandlerChannelHandler
過濾器IoFilterChannelHandler

對于Mina而言,連接操作是這樣的:

ConnectFuture future = mConnector.connect();future.awaitUninterruptibly();if (future.isConnected()) {//得到會話mSession = future.getSession();}

對于Netty來說,連接可以寫成與Mina幾乎相同的形式:

ChannelFuture future = bootstrap.connect();future.awaitUninterruptibly();if(future.isSuccess()){mChannel = future.channel();}

也可以不阻塞等待,兩種future都可以自行添加Listener監聽異步任務是否完成:

//Minafuture.addListener(new IoFutureListener<IoFuture>() {@Overridepublic void operationComplete(IoFuture ioFuture) {}}); //Nettyfuture.addListener(new GenericFutureListener<ChannelFuture>() {@Overridepublic void operationComplete(ChannelFuture f) throws Exception {}});

畢竟是出自一人之手,部分API真是驚人的相似。
到這里,第一個連接返回結果問題算是有所結論,兩種框架都可以正常返回連接的結果。

會話狀態

而上述代碼中,返回的mSession與mChannel就是得到的會話,這兩種類各自提供了一些接口,可以用于獲得通信連接的實時狀態。
Mina的IoSession這里只取部分方法:

public interface IoSession {IoHandler getHandler();IoSessionConfig getConfig();IoFilterChain getFilterChain();ReadFuture read();WriteFuture write(Object var1);WriteFuture write(Object var1, SocketAddress var2);CloseFuture closeNow();boolean isConnected();boolean isActive();boolean isClosing();boolean isSecured();CloseFuture getCloseFuture();SocketAddress getRemoteAddress();SocketAddress getLocalAddress();SocketAddress getServiceAddress();boolean isIdle(IdleStatus var1);boolean isReaderIdle();boolean isWriterIdle();boolean isBothIdle(); }

再對比看下Netty提供的Channel,這里也只取部分方法展示:

public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> {EventLoop eventLoop();Channel parent();ChannelConfig config();boolean isOpen();boolean isRegistered();boolean isActive();SocketAddress localAddress();SocketAddress remoteAddress();boolean isWritable();Channel.Unsafe unsafe();ChannelPipeline pipeline();public interface Unsafe {SocketAddress localAddress();SocketAddress remoteAddress();void register(EventLoop var1, ChannelPromise var2);void bind(SocketAddress var1, ChannelPromise var2);void connect(SocketAddress var1, SocketAddress var2, ChannelPromise var3);void disconnect(ChannelPromise var1);void close(ChannelPromise var1);void write(Object var1, ChannelPromise var2);void flush();} }

可以看出,無論是IoSession還是Channel,都有相關的API可以知曉通信是否活躍,所以第二個問題在可以獲得IoSession或Channel的情況下,是沒有問題的。

配置Handler

那么再看配置Listener或Handler的相差操作是否靈活。
二者在這方面的差別較為明顯。

對于Mina而言,添加Handler可以直接利用Connector,真正的邏輯Handler只能由setHandler方法添加,且只能為一個,而相關的Filter則要通過getFilterChain()拿到的過濾器集合去添加;對于Mina來說,Handler和Filter是沒有交集的,他們分屬不同的接口IoHandler和IoFilter:

mConnector.setHandler(handler); mConnector.getFilterChain().addLast(CODEC, protocolCodecFilter);

Netty有所不同,netty中所有的handler、filter都是ChannelHandller,這些handler都要在連接行為發生后才能生效,也就是掛載到Channel上的,而不是Bootstrap,一般添加是這樣的:

bootstrap.handler(handler); channel.pipeline().addLast(someHandler); channel.pipeline().addLast(someFilter);

但handler依舊只能添加一個,如果要添加多個handler或filter,就必須獲取到channel,然后進行添加,netty本身提供了一個ChannelInitializer可以用于添加多個channelHandler,一般會這么寫:

bootstrap.handler(new ChannelInitializer<Channel>() {@Overrideprotected void initChannel(Channel channel) throws Exception {channel.pipeline().addLast(handler);channel.pipeline().addLast(someHandler);channel.pipeline().addLast(someFilter);}});

對于Netty來說,Handler和Filter是同一個東西,都是ChannelHandler。

兩者在這方面的區別比較明顯:
一是netty將handler和filter都統一為handler了,
二是netty不能像mina一樣,在未連接之前就可以配置所有的Handler或Filter,netty必須獲得channel也就是連接成功后才能配置多個Filter。

這就造成了一個問題,Mina可以提前就配置監聽器監聽連接的狀態,可以正常監聽中途斷開,也就是在創建Connector后就可以掛載上監聽:

mConnector.getFilterChain().addFirst("reconnect", new IoFilterAdapter() {@Overridepublic void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception { //監聽到斷開,可接入回調接口,做進一步的重連邏輯mConnector.connect();}});

而Netty不能,創建Connector也就是Bootstrap并不能實現類似的掛載,Bootstrap只能掛載一個Handler,而相關的過濾器或監聽只能在Channel出現后再進行掛載,那么就會寫成這樣:

bootstrap.handler(new ChannelInitializer<Channel>() {@Overrideprotected void initChannel(Channel channel) throws Exception {//添加其他Filter或Handler}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {super.channelInactive(ctx);//監聽到斷開,重連bootstrap.connect();}});

這里initChannel方法永遠是最先被調用的,因為在源碼中是這樣的:

//ChannelInitializer.javaprotected abstract void initChannel(C var1) throws Exception;public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {if (this.initChannel(ctx)) {ctx.pipeline().fireChannelRegistered();this.removeState(ctx);} else {ctx.fireChannelRegistered();}}

在這種邏輯下,Mina可以在sessionClosed回調中使用SocketConnetor進行重連,Netty可以在channelInactive回調中使用Bootstrap進行重連。
看起來沒什么毛病。

但需要注意一點,就是Handler的復用問題,也就是對Handler或Filter的檢查,Mina和Netty都有對Handler的重復添加進行過檢查,不過檢查邏輯有細微的差別。
Mina中是這樣檢查的:

//DefaultIoFilterChain.javapublic synchronized void addLast(String name, IoFilter filter) {this.checkAddable(name);this.register(this.tail.prevEntry, name, filter);}private final Map<String, Entry> name2entry = new ConcurrentHashMap();private void checkAddable(String name) {if (this.name2entry.containsKey(name)) {throw new IllegalArgumentException("Other filter is using the same name '" + name + "'");}}

可以看到,Mina只會檢查Filter在Map中對應的key是否被使用過,當然理論上Filter掛載在SocketConnector的FilterChain中,只要配置過一次,就無需再進行配置。

那么Netty呢?
Netty的Handler不是能隨意復用的,要復用必須標明注解@Sharable,否則就會出現異常:

警告: Failed to initialize a channel. Closing: [id: 0x1caafa97] io.netty.channel.ChannelPipelineException: io.netty.handler.timeout.IdleStateHandler is not a @Sharable handler, so can't be added or removed multiple times.

這是因為在源碼進行檢查時,是對Handler本身進行檢查的,handler會有一個added的屬性,一旦被添加使用過,就會置為true,而判斷邏輯會阻止為added=true的handler添加進來 。這樣一來,如果強行添加已經添加過的handler就會拋出異常:

//DefaultChannelPipeline.javapublic final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {AbstractChannelHandlerContext newCtx;synchronized(this) {checkMultiplicity(handler);newCtx = this.newContext(group, this.filterName(name, handler), handler);//省略部分代碼}this.callHandlerAdded0(newCtx);return this;}private static void checkMultiplicity(ChannelHandler handler) {if (handler instanceof ChannelHandlerAdapter) {ChannelHandlerAdapter h = (ChannelHandlerAdapter)handler;if (!h.isSharable() && h.added) {throw new ChannelPipelineException(h.getClass().getName() + " is not a @Sharable handler, so can't be added or removed multiple times.");}h.added = true;}}

這也就說明使用channel.pipeline().addLast(handler)這種方法添加handler時,如果想不同的Channel添加同一個Handler實例,每種handler都必須注解了@Sharable,如果正好要使用IdleStateHandler這種源碼內部的Handler,而IdleStateHandler是沒有注解過@Sharable,那么就會出現上面的異常。
而實際應用中,為了實現心跳,IdleStateHandler是一般都會使用到的。

那么問題來了,Mina每次重新連接,創建新的session,但只要SocketConnector沒有變,所有Handler和Filter自然就沒有變,仍然可用,因為所有Handler和Filter是掛載到SocketConnector的FilterChain中,算是只和Connector相關的;
而Netty,如果重新連接的話,會創建新的Channel,然后會重新調用initChannel,然后利用channel.pipeline().addLast添加Handler,算是掛載到Channel上的,而不是Bootstrap上。

這樣顯示出兩者最大的區別就是,Mina中配置一次即可,而Netty則需要每次產生新的Channel時對其進行重新配置。

所以Netty中的handler想復用的話,就必須加注解,否則就會報異常。如果一定要用到無法注解@Sharable的Handler,比如上面的IdleStateHandler,那就要想辦法每次initChannel時,也新建一個新的IdleStateHandler…
或者,繼承IdleStateHandler,然后加上注解也行,雖然也很丑就是了。

So Bad…

這樣的情況下,可以想辦法,每次都新建,類似這種:

FunctionsChannelHandler functionsChannelHandler = new FunctionsChannelHandler(bootstrap){@Overridepublic ChannelHandler[] handlers() {return new ChannelHandler[]{new NormalClientEncoder(),new IdleStateHandler(20, 10, 20),this,new NormalClientHandler()};}};bootstrap.handler(new ChannelInitializer<Channel>() {@Overrideprotected void initChannel(Channel channel) throws Exception {//添加各種handlerchannel.pipeline().addLast(functionsChannelHandler.handlers());}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {super.channelInactive(ctx);//監聽到斷開}});

因為netty把所有監聽器過濾器邏輯處理都歸為ChannelHandler的原因,把一個handler擴展成一個功能較為豐富的handler是一種不錯的方法 。或者沿用這種思路,使其每次新加Handler時,都是new過的Handler。
應對框架自帶的一些未注解@Sharable的類,也可以繼承之,自行加注解:

@ChannelHandler.Sharable public class HeartHandler extends IdleStateHandler {public HeartHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {super(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds);}public HeartHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) {super(readerIdleTime, writerIdleTime, allIdleTime, unit);}public HeartHandler(boolean observeOutput, long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) {super(observeOutput, readerIdleTime, writerIdleTime, allIdleTime, unit);} }

這樣一來,配置Handler也勉強可算靈活了。

連接計數

對連接計數一般都是開發者編寫的邏輯,主要是應對無休止地連接。
主要應用在兩種場景:
一是首次連接,如果多次連接不成功,那么停止連接,或者另有邏輯;
二是斷線重連,如果多次重連不成功,那么停止連接并銷毀,或者另有邏輯。

因為Mina和Netty都是多線程模型的緣故,計數為了求穩可以直接使用Atom類,當然覺得大材小用也可以直接使用普通int值,畢竟理論上兩次連接中間應該會有一定延時才對。

應用示例

所以最后,都可以對各自的連接器進行二次封裝,然后編寫對自己有利的邏輯。
對于Mina,大概可以寫成這樣:

public class TCPConnector {private static final int BUFFER_SIZE = 10 * 1024;private static final long CONNECT_TIMEOUT_MILLIS = 10 * 1000;private static final int KEEPALIVE_REQUEST_INTERVAL = 10;private static final int KEEPALIVE_REQUEST_TIMEOUT = 40;private static final String RECONNECT = "reconnect";private static final String CODEC = "codec";private static final String HEARTBEAT = "heartbeat";private static final String EXECUTOR = "executor";/*** 連接器*/private SocketConnector mConnector;/*** 會話*/private IoSession mSession;/*** 外用接口*/private IConnectorListener connectorListener;/*** 連接所在線程*/private ExecutorService mExecutor;/*** 重連次數*/private AtomicInteger recconnectCounter;/*** 首次連接次數*/private AtomicInteger connectCounter;private String host;private int port;public interface IConnectorListener {/*** 連接建立成功*/void connectSuccess(IoSession session);/*** 連接建立失敗*/void connectFailed();/*** 連接中途斷掉時*/void sessionClosed(IoSession session);}public TCPConnector() {mConnector = new NioSocketConnector();recconnectCounter = new AtomicInteger(0);connectCounter = new AtomicInteger(0);}/*** 設置目標地址與端口** @param host 目標地址* @param port 目標端口*/public void setHostPort(String host, int port) {L.d("設置地址與端口-" + host + ":" + port);this.host = host;this.port = port;}public String getHost() {return this.host;}/*** 在子線程中啟用連接*/public void connectInThread() {mExecutor.execute(new Runnable() {@Overridepublic void run() {connect();}});}/*** 根據設置的參數連接*/private void connect() {//如果已經連接,則直接【連接成功】if (mSession == null || !mSession.isConnected()) {//連接mConnector.setDefaultRemoteAddress(new InetSocketAddress(host, port));L.i("連接-->" + host + ":" + port);ConnectFuture future = mConnector.connect();//阻塞,等待連接建立響應future.awaitUninterruptibly(CONNECT_TIMEOUT_MILLIS);//響應連接成功或失敗if (future.isConnected()) {//得到會話mSession = future.getSession();if (connectorListener != null) {connectCounter.set(0);connectorListener.connectSuccess(mSession);}} else {if (connectorListener != null) {connectCounter.incrementAndGet();connectorListener.connectFailed();}}} else {if (connectorListener != null) {connectCounter.incrementAndGet();connectorListener.connectSuccess(mSession);}}}/*** 重連*/private void reconnect() {if (mConnector == null)throw new IllegalArgumentException("IoConnector cannot be null");//如果已經連接,則直接【連接成功】if (mSession == null || !mSession.isConnected()) {//連接ConnectFuture future = mConnector.connect();//阻塞,等待連接建立響應future.awaitUninterruptibly();//響應連接成功或失敗if (future.isConnected()) {//得到會話mSession = future.getSession();}}}/*** 重連** @param reconnectTimeoutMills 連接的超時時間* @param reconnectTimes 重連次數*/public void reconnect(final long reconnectTimeoutMills, final int reconnectTimes) {try {recconnectCounter.set(0);while (mConnector != null && !(mSession != null && mSession.isConnected()) && recconnectCounter.incrementAndGet() < reconnectTimes) {reconnect();if (mSession != null && mSession.isConnected()) {break;}else{TimeUnit.MILLISECONDS.sleep(reconnectTimeoutMills);}L.w(Thread.currentThread().getName() + "," + "重連" + host + ":" + port + "(" + recconnectCounter.get() + ")次...");}} catch (InterruptedException e) {e.printStackTrace();} finally {if (mSession != null && mSession.isConnected()) {if (connectorListener != null) {connectorListener.connectSuccess(mSession);}} else {if (connectorListener != null) {connectorListener.connectFailed();}}}}public IoSession getSession() {return mSession;}public IoConnector getConnector() {return mConnector;}public boolean isActive() {return mConnector != null && mConnector.isActive() && mSession != null;}public boolean isConnected() {return mSession != null && mSession.isConnected();}public IConnectorListener getConnectorListener() {return connectorListener;}public int getRecconnectCounter() {return recconnectCounter.get();}public int getConnectCounter() {return connectCounter.get();}public void resetConnectCounter() {connectCounter.set(0);}/*** 斷開連接,釋放資源*/public void disconnect() {if (mConnector != null) {connectorListener = null;mConnector.getFilterChain().clear();mConnector.dispose();mConnector = null;}if (mSession != null) {mSession.closeNow();mSession = null;}if (mExecutor != null) {mExecutor.shutdown();mExecutor = null;}L.w("斷開");}public static class Builder {private TCPConnector newInstance = new TCPConnector();private ProtocolCodecFilter protocolCodecFilter;private KeepAliveFilter keepAliveFilter;public Builder setExecutor(ExecutorService executor) {newInstance.mExecutor = executor;return this;}public Builder setConnectListener(IConnectorListener listener) {newInstance.connectorListener = listener;return this;}public Builder setHost(String host) {newInstance.host = host;return this;}public Builder setPort(int port) {newInstance.port = port;return this;}public Builder setProtocolCodecFilter(ProtocolCodecFactory protocolCodecFactory) {protocolCodecFilter = new ProtocolCodecFilter(protocolCodecFactory);return this;}public Builder setConnectTimeoutMillis(long connectTimeoutMillis) {newInstance.mConnector.setConnectTimeoutMillis(connectTimeoutMillis);return this;}public Builder setKeepAliveFilter(KeepAliveMessageFactory keepAliveMessageFactory, int keepAliveRequestInterval) {keepAliveFilter = new KeepAliveFilter(keepAliveMessageFactory, IdleStatus.BOTH_IDLE, KeepAliveRequestTimeoutHandler.LOG, keepAliveRequestInterval, KEEPALIVE_REQUEST_TIMEOUT);return this;}public Builder setKeepAliveFilter(KeepAliveMessageFactory keepAliveMessageFactory, KeepAliveRequestTimeoutHandler keepAliveRequestTimeoutHandler, int keepAliveRequestInterval, int keepAliveRequestTimeOut) {keepAliveFilter = new KeepAliveFilter(keepAliveMessageFactory, IdleStatus.BOTH_IDLE, keepAliveRequestTimeoutHandler, keepAliveRequestInterval, keepAliveRequestTimeOut);return this;}public Builder setHandlerAdapter(IoHandlerAdapter handler) {newInstance.mConnector.setHandler(handler);return this;}public Builder setReadBuffer(int size) {newInstance.mConnector.getSessionConfig().setReadBufferSize(size);return this;}public Builder setReceiveBuffer(int size) {newInstance.mConnector.getSessionConfig().setReceiveBufferSize(size);return this;}public Builder setSendBuffer(int size) {newInstance.mConnector.getSessionConfig().setSendBufferSize(size);return this;}public TCPConnector build() {//添加重連監聽if (newInstance.connectorListener != null) {newInstance.mConnector.getFilterChain().addFirst(RECONNECT, new IoFilterAdapter() {@Overridepublic void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception {if (newInstance != null && newInstance.connectorListener != null)newInstance.connectorListener.sessionClosed(session);}});}//設置編碼解碼if (protocolCodecFilter != null)newInstance.mConnector.getFilterChain().addLast(CODEC, protocolCodecFilter);//設置心跳if (keepAliveFilter != null)newInstance.mConnector.getFilterChain().addLast(HEARTBEAT, keepAliveFilter);//connector不允許使用OrderedThreadPoolExecutornewInstance.mConnector.getFilterChain().addLast(EXECUTOR, new ExecutorFilter(Executors.newSingleThreadExecutor()));return newInstance;}}@Overridepublic String toString() {return "MinaHelper{" +"mSession=" + mSession +", mConnector=" + mConnector +", connectorListener=" + connectorListener +", mExecutor=" + mExecutor +'}';}@Overridepublic boolean equals(Object o) {if (this == o) return true;if (o == null || getClass() != o.getClass()) return false;TCPConnector connector = (TCPConnector) o;return port == connector.port && (host != null ? host.equals(connector.host) : connector.host == null);}@Overridepublic int hashCode() {int result = host != null ? host.hashCode() : 0;result = 31 * result + port;return result;}

使用起來是這樣的:

HigherGateWayHandler higherGateWayHandler = new HigherGateWayHandler();TCPConnector higherGateWayClient = new TCPConnector.Builder().setExecutor(ThreadPool.singleThread("higher_gateway_client")).setHost(NC.GATEWAT_HIGHER_HOST).setPort(NC.LOWER_PORT).setConnectTimeoutMillis(10 * 1000).setReadBuffer(10 * 1024).setHandlerAdapter(higherGateWayHandler).setProtocolCodecFilter(new HigherGateWayCodecFactory()).setKeepAliveFilter(new KeepAliveHigherGateWay(), higherGateWayHandler, 10, 20).setConnectListener(new TCPConnector.IConnectorListener() {@Overridepublic void connectSuccess(IoSession session) {//連接成功后}@Overridepublic void connectFailed() {//重連失敗if (higherGateWayClient.getRecconnectCounter() == 3) {//重連失敗后}//非重連失敗,優先級連接情況下if (higherGateWayClient.getRecconnectCounter() == 0 && higherGateWayClient.getConnectCounter() > 2) {higherGateWayClient.resetConnectCounter();} else {higherGateWayClient.connectInThread();}}@Overridepublic void sessionClosed(IoSession session) {executors.execute(new Runnable() {@Overridepublic void run() {//重連邏輯higherGateWayClient.reconnect(10 * 1000, 3);}});}}).build();

而Netty,封裝起來會有一點花里胡哨,目前遇到的問題是當重連以后復用IdleStateHandler這種Handler時,就會使得其中的計時機制失效,也就是說,心跳沒用了,暫時不明原因,大概率是其中的線程被銷毀無法再起的原因。那么當前就只能想辦法每次調用initChannel時,創建新的Handler才行:

public class NettyConnector {/*** 連接器*/private Bootstrap bootstrap;/*** 地址*/private String host;private int port;/*** 會話*/private Channel channel;private static final long TIME_OUT = 10;private long connectTimeoutMills;/*** 重連次數*/private AtomicInteger recconnectCounter;/*** 首次連接次數*/private AtomicInteger connectCounter;/*** 以接口引出通信狀態*/public interface IChannelStateListener {void onConnectSuccess(Channel channel);void onConnectFailed();void onDisconnect();}private IChannelStateListener channelStateListener;private NettyConnector(final Builder builder) {recconnectCounter = new AtomicInteger(0);connectCounter = new AtomicInteger(0);connectTimeoutMills = builder.timeoutMills;bootstrap = builder.bootstrap;bootstrap.handler(new ChannelInitializer() {@Overrideprotected void initChannel(Channel channel) throws Exception {channel.pipeline().addLast(new ChannelDisconnectHandler());channel.pipeline().addLast(builder.handlerSet.handlers());}});}public void setRemoteAddress(String host, int port) {L.d("設置地址與端口-" + host + ":" + port);this.host = host;this.port = port;}public void setChannelStateListener(IChannelStateListener listener) {channelStateListener = listener;}public void connect() {if (channel == null || !channel.isActive()) {bootstrap.remoteAddress(this.host, this.port);L.d("第" + (connectCounter.get() + 1) + "次連接" + host + ":" + port + "中......");final long startMills = System.currentTimeMillis();ChannelFuture channelFuture = bootstrap.connect();channelFuture.addListener(new GenericFutureListener<ChannelFuture>() {@Overridepublic void operationComplete(ChannelFuture f) throws Exception {if (f.isSuccess()) {L.d("連接(" + bootstrap.config().remoteAddress() + ")成功");channel = f.channel();if (channelStateListener != null) {connectCounter.set(0);channelStateListener.onConnectSuccess(channel);}} else {long delay = System.currentTimeMillis() - startMills;if (delay > 0) {TimeUnit.MILLISECONDS.sleep(connectTimeoutMills - delay);}L.d("連接(" + bootstrap.config().remoteAddress() + ")失敗");if (channelStateListener != null) {connectCounter.incrementAndGet();channelStateListener.onConnectFailed();}}}});}}private void reconnect() {if (bootstrap == null)throw new IllegalArgumentException("bootstrap cannot be null");//如果已經連接,則直接【連接成功】if (channel == null || !channel.isActive()) {//連接channel = bootstrap.connect().awaitUninterruptibly().channel();}}/*** 重連* @param reconnectTimeoutMills 重連超時時間* @param reconnectTimes 重連次數*/public void reconnect(final long reconnectTimeoutMills, final int reconnectTimes) {try {recconnectCounter.set(0);while (channel != null && !channel.isActive() && recconnectCounter.getAndIncrement() < reconnectTimes) {L.d(Thread.currentThread().getName() + "," + "重連" + bootstrap.config().remoteAddress() + "(" + recconnectCounter.get() + ")次...");reconnect();if (channel.isActive()) {break;} else {TimeUnit.MILLISECONDS.sleep(reconnectTimeoutMills);}L.d(channel.isActive() + "");}} catch (InterruptedException e) {e.printStackTrace();} finally {if (channel != null && channel.isActive()) {if (channelStateListener != null) {channelStateListener.onConnectSuccess(channel);}} else {if (channelStateListener != null) {channelStateListener.onConnectFailed();}}}}public Channel getChannel() {return channel;}public boolean isConnected() {return channel != null && channel.isActive();}public String getAddress() {return host + ":" + port;}public int getConnectFailedTimes() {return connectCounter.get();}public int getReconnectFailedTimes() {return recconnectCounter.get();}public static class Builder {private Bootstrap bootstrap = new Bootstrap();private HandlerSet handlerSet;private long timeoutMills = 10 * 1000;public Builder group(EventLoopGroup loopGroup) {bootstrap.group(loopGroup);return this;}@Deprecatedpublic Builder remoteAddress(String inetHost, int inetPort) {bootstrap.remoteAddress(inetHost, inetPort);return this;}public Builder setConnectTimeoutMills(long timeout) {timeoutMills = timeout;return this;}public Builder handler(HandlerSet handlers) {handlerSet = handlers;return this;}public NettyConnector build() {bootstrap.channel(NioSocketChannel.class);return new NettyConnector(this);}}/*** 主要用于監聽斷開*/class ChannelDisconnectHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {ctx.fireChannelInactive();if (channelStateListener != null) {channelStateListener.onDisconnect();}}}/*** 主要用于創建新的handler,避免復用帶來的一些問題*/@ChannelHandler.Sharablepublic static abstract class HandlerSet extends ChannelInboundHandlerAdapter {public abstract ChannelHandler[] handlers();}}

因為Netty不能像Mina直接在Connector上掛載監聽sessionClosed,只能用一個ChannelDisconnectHandler這樣的東西去監聽是否已經斷開,并通過接口引出結果;
并且因為只能在Channel.Pipeline中才能添加多個Handler的原因,這里用一個HandlerSet強行將所有需要的Handler集合,然后在創建Bootstrap的時候一次性添加進去,想要保證每次都新建,這里就使用抽象方法,讓使用的時候可以自行創建。
注意,由于這里的抽象類HandlerSet每次其實并不是新建的,所有是需要復用的,所以需要加注解@Sharable,但也只需要加它一個就行了,其他都是新建出來的,無需理會。
寫出來就是這樣:

NettyConnector connector = new NettyConnector.Builder().group(new NioEventLoopGroup()).handler(new NettyConnector.HandlerSet() {@Overridepublic ChannelHandler[] handlers() {return new ChannelHandler[]{new HeartHandler(10, 10, 10),new NormalClientEncoder(),new NormalClientHeartBeatHandler(),new NormalClientHandler()};}}).setConnectTimeoutMills(5 * 1000).build();connector.setRemoteAddress("192.168.0.102", 8000);connector.setChannelStateListener(new NettyConnector.IChannelStateListener() {@Overridepublic void onConnectSuccess(Channel channel) {L.d("連接" + channel.remoteAddress().toString() + "成功");}@Overridepublic void onConnectFailed() {L.d("連接" + connector.getAddress() + "失敗");if (connector.getReconnectFailedTimes() == 0 && connector.getConnectFailedTimes() < 3) {connector.connect();}}@Overridepublic void onDisconnect() {L.d(connector.getChannel().remoteAddress().toString() + "已斷開");connector.reconnect(5000, 5);}});

其中的HeartHandler是繼承自IdleStateHandler的。

整個封裝顯得花里胡哨…卻又很丑,不過勉強能用,水平有限。

就這樣吧。

總結

以上是生活随笔為你收集整理的Netty的断线重连的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。