日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

ServerBootstrap的启动流程

發(fā)布時(shí)間:2025/3/19 编程问答 19 豆豆
生活随笔 收集整理的這篇文章主要介紹了 ServerBootstrap的启动流程 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

一、ServerBootstrap的啟動(dòng)示例代碼?

EventLoopGroup bossEventLoopGroup = new NioEventLoopGroup(new NamedThreadFactory("bossThread",false));EventLoopGroup workEventLoopGroup = new NioEventLoopGroup(new NamedThreadFactory("workThread",false));ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(bossEventLoopGroup,workEventLoopGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG,128).handler(new LoggingHandler(LogLevel.INFO)).childHandler( new PojoServerIntitlizer());try {log.debug(" will start server");ChannelFuture closeFuture = bootstrap.bind(port).sync();log.debug(" server is closing");closeFuture.channel().closeFuture().sync();log.debug(" server is closed");} catch (InterruptedException e) {e.printStackTrace();}finally {bossEventLoopGroup.shutdownGracefully();workEventLoopGroup.shutdownGracefully();log.debug(" release event loop group");}

二、初始化流程

1.ServerBootstrap和bootStrap繼承于AbstractBootStrap,都使用模板方式。AbstractBootStrap類(lèi)有幾個(gè)比較重要的成員變量。

group:ServerSocketChannel的EventLoopGroup,即reactor-accept的BOSS事件循環(huán)線程池組。

channelFactory:創(chuàng)建channel的工廠類(lèi),如創(chuàng)建ServerSocketChannel,SocketChannel,

handler:reactor-accept的事件處理類(lèi)。

public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {volatile EventLoopGroup group;@SuppressWarnings("deprecation")private volatile ChannelFactory<? extends C> channelFactory;private volatile SocketAddress localAddress;private volatile ChannelHandler handler;

public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);private volatile EventLoopGroup childGroup;private volatile ChannelHandler childHandler;

? 2.設(shè)置參數(shù)流程。

bootstrap.group(bossEventLoopGroup,workEventLoopGroup) bossEventLoopGroup設(shè)置AbstractBootStrap的group,workEventLoopGroup設(shè)置為ServerBootstrap的childGroup,

public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {super.group(parentGroup);if (this.childGroup != null) {throw new IllegalStateException("childGroup set already");}this.childGroup = ObjectUtil.checkNotNull(childGroup, "childGroup");return this;} channel(NioServerSocketChannel.class) 生成一個(gè)根據(jù)指定SocketChannel類(lèi)的反射CHANNEL工廠類(lèi),就是根據(jù)傳入的類(lèi)反射生成對(duì)象。這是設(shè)置AbstractBootStrap的類(lèi)的channelFactory public B channel(Class<? extends C> channelClass) {return channelFactory(new ReflectiveChannelFactory<C>(ObjectUtil.checkNotNull(channelClass, "channelClass")));} .handler(new LoggingHandler(LogLevel.INFO)) 這是設(shè)置AbstractBootStrap的類(lèi)的handler public B handler(ChannelHandler handler) {this.handler = ObjectUtil.checkNotNull(handler, "handler");return self();} .childHandler( new PojoServerIntitlizer());這是設(shè)置ServerBootstrap子類(lèi)的childHandler public ServerBootstrap childHandler(ChannelHandler childHandler) {this.childHandler = ObjectUtil.checkNotNull(childHandler, "childHandler");return this;}

三、綁定流程

1.ChannelFuture closeFuture = bootstrap.bind(port).sync();這為起點(diǎn),會(huì)調(diào)用AbstractBootStrap的doBind方法

private ChannelFuture doBind(final SocketAddress localAddress) {final ChannelFuture regFuture = initAndRegister();final Channel channel = regFuture.channel();ChannelPromise promise = channel.newPromise();doBind0(regFuture, channel, localAddress, promise);return promise;}

2.這里面首先調(diào)用initAndRegister,會(huì)先創(chuàng)建ServerSocketChannel,并且將channel注冊(cè)到NioEventLoop的selector中,也就是前文的channel.register(selector,0,eventLoop)

final ChannelFuture initAndRegister() {Channel channel = null;try {channel = channelFactory.newChannel();init(channel);} catch (Throwable t) {}ChannelFuture regFuture = config().group().register(channel);return regFuture;}

3.newChannel就是通過(guò)前面生成的ReflectChannelFactory調(diào)用反射生成一個(gè)NioServerSocketChannel對(duì)象,這里看一下類(lèi)的繼承圖,

?AbstractChannel 有兩個(gè)成員變量unsafe和pipline,unsafe為操作底層網(wǎng)絡(luò)IO的接口。pipline也就是我們的管道事件流。也就是說(shuō)每個(gè)channel會(huì)自帶一個(gè)pipline

protected AbstractChannel(Channel parent) {this.parent = parent;id = newId();unsafe = newUnsafe();pipeline = newChannelPipeline();}protected DefaultChannelPipeline newChannelPipeline() {return new DefaultChannelPipeline(this);}

?3.創(chuàng)建完channel后,我們來(lái)看下初始化流程,這個(gè)會(huì)調(diào)用到子類(lèi)(ServerBootStrap)的init方法,

這個(gè)會(huì)做兩個(gè)事件,

? ?(1).設(shè)置channel的連接選項(xiàng)和屬性。

? ?(2).添加新的handler

這里會(huì)為ServerSocketChannel的管道中新增一個(gè)handler,用來(lái)處理accept-reactor的IO流讀取事件,也就是新連接事件。ServerBootstrapAcceptor這個(gè)類(lèi)就是來(lái)處理新連接,并注冊(cè)accept的socketChannel并注冊(cè)到childGroup的work事件循環(huán)以及內(nèi)部eventLoop的selector中,并且設(shè)置子socketChannel的handler為childHandler.這個(gè)我們留在接收新連接的流程中講解。

void init(Channel channel) {setChannelOptions(channel, this.newOptionsArray(), logger);setAttributes(channel, (Entry[])this.attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));ChannelPipeline p = channel.pipeline();final EventLoopGroup currentChildGroup = this.childGroup;final ChannelHandler currentChildHandler = this.childHandler;p.addLast(new ChannelHandler[]{new ChannelInitializer<Channel>() {public void initChannel(final Channel ch) {final ChannelPipeline pipeline = ch.pipeline();ChannelHandler handler = ServerBootstrap.this.config.handler();if (handler != null) {pipeline.addLast(new ChannelHandler[]{handler});}ch.eventLoop().execute(new Runnable() {public void run() {pipeline.addLast(new ChannelHandler[]{new ServerBootstrap.ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)});}});}}});}

4.現(xiàn)在回到第2步,初始化完成了,現(xiàn)在開(kāi)始將serverSocketChannel注冊(cè)到bossEventLoop的selector中。ChannelFuture regFuture = config().group().register(channel);這個(gè)會(huì)調(diào)用MultithreadEventLoopGroup.register,也就是從事件循環(huán)組的子eventLoop中取下一個(gè)NioEventLoop進(jìn)行注冊(cè)分配。也就是work-reactor的channel注冊(cè)機(jī)制。

@Overridepublic EventLoop next() {return (EventLoop) super.next();}@Overridepublic ChannelFuture register(Channel channel) {return next().register(channel);}

?5.我們的NioEventLoop為SingleThreadEventLoop,這個(gè)會(huì)將channel,當(dāng)前的eventLoop對(duì)象封裝成一個(gè)promise,進(jìn)行注冊(cè)。

public ChannelFuture register(Channel channel) {return this.register((ChannelPromise)(new DefaultChannelPromise(channel, this))); }

6.接著找到 ServerSocketChannel的內(nèi)部的unsafe對(duì)象進(jìn)行注冊(cè)。這個(gè)unsafe就是NioMessageUnsafe

public ChannelFuture register(ChannelPromise promise) {ObjectUtil.checkNotNull(promise, "promise");promise.channel().unsafe().register(this, promise);return promise;}public abstract class AbstractNioMessageChannel extends AbstractNioChannel {@Overrideprotected AbstractNioUnsafe newUnsafe() {return new NioMessageUnsafe();} }

7.由于NioMessageUnsafe繼承于AbstractUnsafe,所以調(diào)用此類(lèi)的register,這個(gè)方法就是首先調(diào)用Channel.register,然后觸發(fā)管道的handlerAdd,channelRegister,channelActive方法。

public final void register(EventLoop eventLoop, final ChannelPromise promise) {ObjectUtil.checkNotNull(eventLoop, "eventLoop");AbstractChannel.this.eventLoop = eventLoop;if (eventLoop.inEventLoop()) {this.register0(promise);} }private void register0(ChannelPromise promise) {try {boolean firstRegistration = this.neverRegistered;AbstractChannel.this.doRegister();this.neverRegistered = false;AbstractChannel.this.registered = true;AbstractChannel.this.pipeline.invokeHandlerAddedIfNeeded();this.safeSetSuccess(promise);AbstractChannel.this.pipeline.fireChannelRegistered();if (AbstractChannel.this.isActive()) {if (firstRegistration) {AbstractChannel.this.pipeline.fireChannelActive();} else if (AbstractChannel.this.config().isAutoRead()) {this.beginRead();}}}}

8.AbstractChannel.this.doRegister();這個(gè)就是把ServerSocketChannel注冊(cè)到NioEventLoop的selector中去。這時(shí)還沒(méi)有監(jiān)聽(tīng)事件。

AbstractNioChannelprotected void doRegister() throws Exception {boolean selected = false;while(true) {try {this.selectionKey = this.javaChannel().register(this.eventLoop().unwrappedSelector(), 0, this);return;} }}

生成的selectKey如下:?

9.invokeHandlerAddedIfNeeded會(huì)調(diào)用到父類(lèi)CHANNEL管道的channelHandler,也就是在我們初始化時(shí)第三步加入的handler

?在這里才會(huì)真正觸發(fā)handler的initChannel方法,生成第三步的ServerBootstrapAcceptor,初始化處理新的子連接的channelReader的處理器。

?10.現(xiàn)在初始化和注冊(cè)完成了,我們?cè)倩氐降谝徊?#xff0c;進(jìn)行doBind0(regFuture, channel, localAddress, promise)

AbstractBootstrap private static void doBind0(final ChannelFuture regFuture, final Channel channel,final SocketAddress localAddress, final ChannelPromise promise) {channel.eventLoop().execute(new Runnable() {@Overridepublic void run() {if (regFuture.isSuccess()) {channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);} else {promise.setFailure(regFuture.cause());}}});}

11.channel.bind,會(huì)調(diào)用內(nèi)部管道的bind,然后到tail的BIND,再到AbstractChannelHandlerContext的bind,因?yàn)閠ail就是繼承于AbstractChannelHandlerContext,這里面的BIND就是從尾部向前找,找一個(gè)帶有bind標(biāo)簽的HANDLER進(jìn)行處理。最終找到了header.ChannelHandlerContext(DefaultChannelPipeline$HeadContext#0, [id: 0x159b9902])

public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {return pipeline.bind(localAddress, promise);}public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {return tail.bind(localAddress, promise);}public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {ObjectUtil.checkNotNull(localAddress, "localAddress");if (isNotValidPromise(promise, false)) {// cancelledreturn promise;}final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);EventExecutor executor = next.executor();next.invokeBind(localAddress, promise);return promise;}

12.最終到了head的BIND,這里面調(diào)用了unsafe本地IO類(lèi)的bind.

final class HeadContext extends AbstractChannelHandlerContextimplements ChannelOutboundHandler, ChannelInboundHandler {@Overridepublic void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {unsafe.bind(localAddress, promise);}

13.最終調(diào)用到AbstractChannel的內(nèi)部類(lèi)AbstractUnsafe的bind,

@Overridepublic final void bind(final SocketAddress localAddress, final ChannelPromise promise) {boolean wasActive = isActive();try {doBind(localAddress);} catch (Throwable t) {safeSetFailure(promise, t);closeIfClosed();return;}if (!wasActive && isActive()) {invokeLater(new Runnable() {@Overridepublic void run() {pipeline.fireChannelActive();}});}safeSetSuccess(promise);}

?14.接著調(diào)用到了AbstractUnsafe的外部類(lèi)的繼承類(lèi)的doBind方法,也就是NioServerSOcketChannel.doBind,這個(gè)就是調(diào)用原生的ServeSocketChannel進(jìn)行bind.

protected void doBind(SocketAddress localAddress) throws Exception {if (PlatformDependent.javaVersion() >= 7) {javaChannel().bind(localAddress, config.getBacklog());} else {javaChannel().socket().bind(localAddress, config.getBacklog());}}

15.綁定成功后會(huì)調(diào)用pipeline.fireChannelActive();這個(gè)是從頭節(jié)點(diǎn)一直向后傳遞事件。在頭節(jié)點(diǎn)的

channelActive方法中會(huì)觸發(fā)readIfIsAutoRead @Overridepublic final ChannelPipeline fireChannelActive() {AbstractChannelHandlerContext.invokeChannelActive(head);return this;}static void invokeChannelActive(final AbstractChannelHandlerContext next) {EventExecutor executor = next.executor();if (executor.inEventLoop()) {next.invokeChannelActive();} else {executor.execute(new Runnable() {@Overridepublic void run() {next.invokeChannelActive();}});}}final class HeadContext extends AbstractChannelHandlerContextimplements ChannelOutboundHandler, ChannelInboundHandler {@Overridepublic void channelActive(ChannelHandlerContext ctx) {ctx.fireChannelActive();readIfIsAutoRead();}

?16.readIfIsAutoRead就是channel如果開(kāi)啟了自動(dòng)讀,則開(kāi)始設(shè)置讀關(guān)注事件到eventLoop的selector中。這個(gè)讀會(huì)從管道的讀-》tail的讀->一直向前找支持讀的handler進(jìn)行處理。這個(gè)就是找到了head的handler

private void readIfIsAutoRead() {if (channel.config().isAutoRead()) {channel.read();}}final class HeadContext extends AbstractChannelHandlerContextimplements ChannelOutboundHandler, ChannelInboundHandler {public void read(ChannelHandlerContext ctx) {unsafe.beginRead();}

?17.headHandler的beginRead就調(diào)用了unsafe.beginRead方法,這個(gè)會(huì)調(diào)用外部channel.

doBeginRead--》AbstractNioChannel AbstractNioChannel protected void doBeginRead() throws Exception {// Channel.read() or ChannelHandlerContext.read() was calledfinal SelectionKey selectionKey = this.selectionKey;if (!selectionKey.isValid()) {return;}readPending = true;final int interestOps = selectionKey.interestOps();if ((interestOps & readInterestOp) == 0) {selectionKey.interestOps(interestOps | readInterestOp);}}

這個(gè)就是將讀事件加到channel的selectKey的關(guān)注事件中。

?

?四、接收新連接流程

1.我們回到上一節(jié)的3,9步,為父類(lèi)NioServerSocketChannel的管道中新增了一個(gè)handler,

ServerBootstrapAcceptor,用來(lái)處理父類(lèi)的接收數(shù)據(jù),也就是新連接請(qǐng)求。當(dāng)有新連接進(jìn)來(lái)時(shí),會(huì)觸發(fā)到ServerBootstrapAcceptor.channelRead

2.ServerBootstrapAcceptor.channelRead 方法是怎么被調(diào)用的呢? 其實(shí)當(dāng)一個(gè) client 連接到 server 時(shí), Java 底層的 NIO ServerSocketChannel 會(huì)有一個(gè) SelectionKey.OP_ACCEPT 就緒事件, 接著就會(huì)調(diào)用到 NioServerSocketChannel.doReadMessages:

NioServerSocketChannel protected int doReadMessages(List<Object> buf) throws Exception {SocketChannel ch = SocketUtils.accept(javaChannel());try {if (ch != null) {buf.add(new NioSocketChannel(this, ch));return 1;}} catch (Throwable t) {logger.warn("Failed to create a new channel from an accepted socket.", t);try {ch.close();} catch (Throwable t2) {logger.warn("Failed to close a socket.", t2);}}return 0;}

在 doReadMessages 中, 通過(guò) javaChannel().accept() 獲取到客戶(hù)端新連接的 SocketChannel, 接著就實(shí)例化一個(gè) NioSocketChannel, 并且傳入 NioServerSocketChannel 對(duì)象(即 this), 由此可知, 我們創(chuàng)建的這個(gè) NioSocketChannel 的父 Channel 就是 NioServerSocketChannel 實(shí)例 .

?3.ServerBootstrapAcceptor.channelRead方法,這個(gè)就是把子channel的管道添加子handler,并且把子channel注冊(cè)到子group的eventLoop的selector中。流程跟父類(lèi)channel注冊(cè)一樣。

public void channelRead(ChannelHandlerContext ctx, Object msg) {final Channel child = (Channel) msg;child.pipeline().addLast(childHandler);setChannelOptions(child, childOptions, logger);setAttributes(child, childAttrs);try {childGroup.register(child)}

?4.還有個(gè)讀完成事件,里面跟上一節(jié)的第16步一樣,都是將當(dāng)前channel的selectKey中加入讀關(guān)注事件,以便接收讀數(shù)據(jù)。

@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) {ctx.fireChannelReadComplete();readIfIsAutoRead();}

?當(dāng)然這個(gè)事件會(huì)反復(fù)調(diào)用,然后判斷加過(guò)讀事件,就不重復(fù)加了。

總結(jié)

以上是生活随笔為你收集整理的ServerBootstrap的启动流程的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。