日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Netty writeAndFlush() 流程与异步

發(fā)布時間:2023/12/2 编程问答 27 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Netty writeAndFlush() 流程与异步 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

Netty writeAndFlush()方法分為兩步, 先 write 再 flush

@Overridepublic ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {DefaultChannelHandlerContext next;next = findContextOutbound(MASK_WRITE);ReferenceCountUtil.touch(msg, next);next.invoker.invokeWrite(next, msg, promise);next = findContextOutbound(MASK_FLUSH);next.invoker.invokeFlush(next);return promise;}

以上是DefaultChannelHandlerContext中的writeAndFlush方法, 可見實際上是先調(diào)用了write, 然后調(diào)用flush

1. write

write方法從TailHandler開始, 穿過中間自定義的各種handler以后到達(dá)HeadHandler, 然后調(diào)用了HeadHandler的成員變量Unsafe的write

如下

@Overridepublic void write(Object msg, ChannelPromise promise) {ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;if (outboundBuffer == null) {// If the outboundBuffer is null we know the channel was closed and so// need to fail the future right away. If it is not null the handling of the rest// will be done in flush0()// See https://github.com/netty/netty/issues/2362 safeSetFailure(promise, CLOSED_CHANNEL_EXCEPTION);// release message now to prevent resource-leak ReferenceCountUtil.release(msg);return;}outboundBuffer.addMessage(msg, promise);}

最終會把需要write的msg和promise(也就是一個future, 我們拿到手的future, 添加Listener的也是這個)放入到outboundBuffer中, msg和promise在outboundBuffer中的存在形式是一個自定義的結(jié)構(gòu)體Entry.

也就是說調(diào)用write方法實際上并不是真的將消息寫出去, 而是將消息和此次操作的promise放入到了一個隊列中

2. flush

flush也是從Tail開始, 最后到Head, 最終調(diào)用的也是Head里的unsafe的flush0()方法, 然后flush0()里再調(diào)用doWrite()方法, 如下:

@Overrideprotected void doWrite(ChannelOutboundBuffer in) throws Exception {int writeSpinCount = -1;for (;;) {Object msg = in.current();if (msg == null) {// Wrote all messages. clearOpWrite();break;}if (msg instanceof ByteBuf) {ByteBuf buf = (ByteBuf) msg;int readableBytes = buf.readableBytes();if (readableBytes == 0) {in.remove();continue;}boolean setOpWrite = false;boolean done = false;long flushedAmount = 0;if (writeSpinCount == -1) {writeSpinCount = config().getWriteSpinCount();}for (int i = writeSpinCount - 1; i >= 0; i --) {int localFlushedAmount = doWriteBytes(buf); // 這里才是實際將數(shù)據(jù)寫出去的地方if (localFlushedAmount == 0) {setOpWrite = true;break;}flushedAmount += localFlushedAmount;if (!buf.isReadable()) {done = true;break;}}in.progress(flushedAmount);if (done) {in.remove();} else {incompleteWrite(setOpWrite);break;}} else if (msg instanceof FileRegion) {FileRegion region = (FileRegion) msg;boolean setOpWrite = false;boolean done = false;long flushedAmount = 0;if (writeSpinCount == -1) {writeSpinCount = config().getWriteSpinCount();}for (int i = writeSpinCount - 1; i >= 0; i --) {long localFlushedAmount = doWriteFileRegion(region);if (localFlushedAmount == 0) {setOpWrite = true;break;}flushedAmount += localFlushedAmount;if (region.transfered() >= region.count()) {done = true;break;}}in.progress(flushedAmount);if (done) {in.remove(); // 根據(jù)寫出的數(shù)據(jù)的數(shù)量情況, 來判斷操作是否完成, 如果完成則調(diào)用 in.remove()} else {incompleteWrite(setOpWrite);break;}} else {throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(msg));}}}

紅字部分就是最后將數(shù)據(jù)寫出去的地方, 這里寫數(shù)據(jù)最終調(diào)用的是?GatheringByteChannel 的 write() 方法, 這是個原生Java接口, 具體實現(xiàn)依賴于實現(xiàn)這個接口的Java類, 例如會調(diào)用 NIO 的 SocketChannel 的write()方法, 至此, 實際寫數(shù)據(jù)的過程出現(xiàn)了, SocketChannel可以運行在non-blocking模式, 也就是非阻塞異步模式, write數(shù)據(jù)會馬上返回寫入的數(shù)據(jù)數(shù)量 (并不一定是所有數(shù)據(jù)都寫入成功, 對于是否寫入了所有數(shù)據(jù), Netty有自己的處理邏輯, 也就是上面代碼中的紅字的那段for循環(huán), 具體參看下SocketChannel的javadoc和netty源碼).

當(dāng)所有數(shù)據(jù)寫入SocketChannel成功, 開始調(diào)用in.remove(), 這個 in 就是第一步 1. write 里的那個 outboundBuffer, 他的類型是?ChannelOutboundBuffer, 代碼如下:

public final boolean remove() {if (isEmpty()) {return false;}Entry e = buffer[flushed];Object msg = e.msg;if (msg == null) {return false;}ChannelPromise promise = e.promise;int size = e.pendingSize;e.clear();flushed = flushed + 1 & buffer.length - 1;if (!e.cancelled) {// only release message, notify and decrement if it was not canceled before. safeRelease(msg);safeSuccess(promise); // 這里, 調(diào)用了promise的trySuccess()方法, 觸發(fā)ListenerdecrementPendingOutboundBytes(size);}return true;}

最后會調(diào)用Promise的notifyListeners()操作,?觸發(fā)Listener完成整個異步流程

---------

最后, 回到我們應(yīng)用netty的時候的代碼

@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {ctx.writeAndFlush(new Object()).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (future.isSuccess()) {// do sth} else {// do sth }}});}

這就是整個流程

?

最后提一下, Netty的AbstractNioChannel里封裝了selectionKey, 在accept socket的時候, socket會被注冊到eventLoop()的Selector, 這個selectionKey就會被賦值, ?如下

selectionKey = javaChannel().register(eventLoop().selector, 0, this);

在以后Selector的select()的時候, ?則會通過這個key來獲取到channel, 然后調(diào)用?AbstractChannel 里的?DefaultChannelPipeline 來觸發(fā) Handler 的 connect, read, write 等等事件...

?

轉(zhuǎn)載于:https://www.cnblogs.com/zemliu/p/3667332.html

總結(jié)

以上是生活随笔為你收集整理的Netty writeAndFlush() 流程与异步的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。