日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Mina2.0框架源码剖析(八)

發布時間:2025/4/16 编程问答 14 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Mina2.0框架源码剖析(八) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

這篇來看看AbstractPollingIoConnector抽象類,它用于用于實現客戶端連接的輪詢策略。處理邏輯基本上和上一篇文章說的AbstractPollingIoAcceptor類似,它繼承自AbstractIoConnector,兩個泛型參數分別是所處理的會話和客戶端socket連接。底層的sockets會被不斷檢測,并當有任何一個socket需要被處理時就會被喚醒去處理。這個類封裝了客戶端socketbind,connectdispose等動作,其成員變量Executor用于發起連接請求,另一個AbstractPollingIoProcessor用于處理已經連接客戶端的I/O操作請求,如讀寫和關閉連接。

其最重要的幾個成員變量是:

private?final?Queue<ConnectionRequest>?connectQueue?=?new?ConcurrentLinkedQueue<ConnectionRequest>();//連接隊列
private?final?Queue<ConnectionRequest>?cancelQueue?=?new?ConcurrentLinkedQueue<ConnectionRequest>();//?取消連接隊列

先來看看當服務端調用connect后的處理過程:

????protected?final?ConnectFuture?connect0(
????????????SocketAddress?remoteAddress,?SocketAddress?localAddress,
????????????IoSessionInitializer
<??extends?ConnectFuture>?sessionInitializer)?{
????????H?handle?
=?null;
????????
boolean?success?=?false;
????????
try?{
????????????handle?
=?newHandle(localAddress);
????????????
if?(connect(handle,?remoteAddress))?{//若已經連接服務器成功
????????????????ConnectFuture?future?=?new?DefaultConnectFuture();
????????????????T?session?
=?newSession(processor,?handle);//創建新會話
????????????????finishSessionInitialization(session,?future,?sessionInitializer);//結束會話初始化
????????????????session.getProcessor().add(session);//將剩下的處理交給IoProcessor
????????????????success?=?true;
????????????????
return?future;
????????????}
????????????success?
=?true;
????????}?
catch?(Exception?e)?{
????????????
return?DefaultConnectFuture.newFailedFuture(e);
????????}?
finally?{
????????????
if?(!success?&&?handle?!=?null)?{
????????????????
try?{
????????????????????close(handle);
????????????????}?
catch?(Exception?e)?{
????????????????????ExceptionMonitor.getInstance().exceptionCaught(e);
????????????????}
????????????}
????????}
????????ConnectionRequest?request?
=?new?ConnectionRequest(handle,?sessionInitializer);
????????connectQueue.add(request);
//連接請求加入連接隊列中
????????startupWorker();//開啟工作線程處理連接請求
????????wakeup();//中斷select操作
????????return?request;
????}

?????真正的負責處理客戶端請求的工作都是Worker線程完成的,

private?class?Worker?implements?Runnable?{
????????
public?void?run()?{
????????????
int?nHandles?=?0;
????????????
while?(selectable)?{
????????????????
try?{
??????????????????????
int?timeout?=?(int)Math.min(getConnectTimeoutMillis(),?1000L);//等待超時時間
????????????????????boolean?selected?=?select(timeout);//在超時時限內查看是否有可以被處理的選擇鍵(狀態
????????????????????nHandles?+=?registerNew();//取出連接隊列隊頭的連接請求,將其注冊一個用于連接的新的客戶端socket,?并把它加入連接輪詢池中
????????????????????if?(selected)?{
????????????????????????nHandles
-=?processSessions(selectedHandles());//處理連接請求
????????????????????}
????????????????????processTimedOutSessions(allHandles());
//處理超時連接請求
????????????????????nHandles?-=?cancelKeys();
????????????????????
if?(nHandles?==?0)?{
????????????????????????
synchronized?(lock)?{
????????????????????????????
if?(connectQueue.isEmpty())?{
????????????????????????????????worker?
=?null;
????????????????????????????????
break;
????????????????????????????}
????????????????????????}
????????????????????}
????????????????}?
catch?(Throwable?e)?{
????????????????????ExceptionMonitor.getInstance().exceptionCaught(e);
????????????????????
try?{
????????????????????????Thread.sleep(
1000);
????????????????????}?
catch?(InterruptedException?e1)?{
????????????????????????ExceptionMonitor.getInstance().exceptionCaught(e1);
????????????????????}
????????????????}
????????????}
????????????
if?(selectable?&&?isDisposing())?{
????????????????selectable?
=?false;
????????????????
try?{
????????????????????
if?(createdProcessor)?{
????????????????????????processor.dispose();
????????????????????}
????????????????}?
finally?{
????????????????????
try?{
????????????????????????
synchronized?(disposalLock)?{
????????????????????????????
if?(isDisposing())?{
????????????????????????????????destroy();
????????????????????????????}
????????????????????????}
????????????????????}?
catch?(Exception?e)?{
????????????????????????ExceptionMonitor.getInstance().exceptionCaught(e);
????????????????????}?
finally?{
????????????????????????disposalFuture.setDone();
????????????????????}
????????????????}
????????????}
????????}
????}
private?int?registerNew()?{
????????
int?nHandles?=?0;
????????
for?(;?;)?{
????????????ConnectionRequest?req?
=?connectQueue.poll();//取連接隊列隊頭請求
????????????if?(req?==?null)?{
????????????????
break;
????????????}
????????????H?handle?
=?req.handle;
????????????
try?{
????????????????register(handle,?req);
//注冊一個用于連接的新的客戶端socket,?并把它加入連接輪詢池中
????????????????nHandles?++;
????????????}?
catch?(Exception?e)?{
????????????????req.setException(e);
????????????????
try?{
????????????????????close(handle);
????????????????}?
catch?(Exception?e2)?{
????????????????????ExceptionMonitor.getInstance().exceptionCaught(e2);
????????????????}
????????????}
????????}
????????
return?nHandles;
????}
private?int?processSessions(Iterator<H>?handlers)?{//處理連接請求
????????int?nHandles?=?0;
????????
while?(handlers.hasNext())?{
????????????H?handle?
=?handlers.next();
????????????handlers.remove();
????????????ConnectionRequest?entry?
=?connectionRequest(handle);
????????????
boolean?success?=?false;
????????????
try?{
????????????????
if?(finishConnect(handle))?{//連接請求成功完成,創建一個新會話
????????????????????T?session?=?newSession(processor,?handle);
????????????????????finishSessionInitialization(session,?entry,?entry.getSessionInitializer());
//結束會話初始化
????????????????????session.getProcessor().add(session);//將剩下的工作交給IoProcessor去處理
????????????????????nHandles?++;
????????????????}
????????????????success?
=?true;
????????????}?
catch?(Throwable?e)?{
????????????????entry.setException(e);
????????????}?
finally?{
????????????????
if?(!success)?{//若連接失敗,則將此連接請求放到取消連接隊列中
????????????????????cancelQueue.offer(entry);
????????????????}
????????????}
????????}
????????
return?nHandles;
????}
private?void?processTimedOutSessions(Iterator<H>?handles)?{//處理超時的連接請求
????????long?currentTime?=?System.currentTimeMillis();//當前時間

????????
while?(handles.hasNext())?{
????????????H?handle?
=?handles.next();
????????????ConnectionRequest?entry?
=?connectionRequest(handle);
????????????
if?(currentTime?>=?entry.deadline)?{//當前時間已經超出了連接請求的底限
????????????????entry.setException(
????????????????????????
new?ConnectException("Connection?timed?out."));
????????????????cancelQueue.offer(entry);
//將此連接請求放入取消連接隊列中
????????????}
????????}
????}
private?int?cancelKeys()?{//把取消隊列中的連接請求給cancel掉
????????int?nHandles?=?0;
????????
for?(;?;)?{
????????????ConnectionRequest?req?
=?cancelQueue.poll();
????????????
if?(req?==?null)?{
????????????????
break;
????????????}
????????????H?handle?
=?req.handle;
????????????
try?{
????????????????close(handle);
//關閉對應的客戶端socket
????????????}?catch?(Exception?e)?{
????????????????ExceptionMonitor.getInstance().exceptionCaught(e);
????????????}?
finally?{
????????????????nHandles?
++;
????????????}
????????}
????????
return?nHandles;
????}

?

作者:phinecos(洞庭散人)
出處:http://phinecos.cnblogs.com/
本文版權歸作者和博客園共有,歡迎轉載,但請保留此段聲明,并在文章頁面明顯位置給出原文連接。

總結

以上是生活随笔為你收集整理的Mina2.0框架源码剖析(八)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。