日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

netty reactor线程模型分析

發布時間:2025/4/5 编程问答 26 豆豆
生活随笔 收集整理的這篇文章主要介紹了 netty reactor线程模型分析 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

netty4線程模型

ServerBootstrap http示例

// Configure the server.EventLoopGroup bossGroup = new EpollEventLoopGroup(1);EventLoopGroup workerGroup = new EpollEventLoopGroup();try {ServerBootstrap b = new ServerBootstrap();b.channel(EpollServerSocketChannel.class);b.option(ChannelOption.SO_BACKLOG, 1024);b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);b.group(bossGroup, workerGroup)// .handler(new LoggingHandler(LogLevel.INFO)).childHandler(new HttpHelloWorldServerInitializer(sslCtx));Channel ch = b.bind(PORT).sync().channel(); /* System.err.println("Open your web browser and navigate to " +(SSL? "https" : "http") + "://127.0.0.1:" + PORT + '/');*/ch.closeFuture().sync();} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}

綁定過程:

private ChannelFuture doBind(final SocketAddress localAddress) {final ChannelFuture regFuture = initAndRegister();final Channel channel = regFuture.channel();if (regFuture.cause() != null) {return regFuture;}if (regFuture.isDone()) {// At this point we know that the registration was complete and successful.ChannelPromise promise = channel.newPromise();doBind0(regFuture, channel, localAddress, promise);return promise;} else {// Registration future is almost always fulfilled already, but just in case it's not.final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);regFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {Throwable cause = future.cause();if (cause != null) {// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an// IllegalStateException once we try to access the EventLoop of the Channel. promise.setFailure(cause);} else {// Registration was successful, so set the correct executor to use.// See https://github.com/netty/netty/issues/2586promise.executor = channel.eventLoop();}doBind0(regFuture, channel, localAddress, promise);}});return promise;}}

初始化過程:

final ChannelFuture initAndRegister() {final Channel channel = channelFactory().newChannel();try {init(channel);} catch (Throwable t) {channel.unsafe().closeForcibly();// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutorreturn new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);}ChannelFuture regFuture = group().register(channel);if (regFuture.cause() != null) {if (channel.isRegistered()) {channel.close();} else {channel.unsafe().closeForcibly();}}// If we are here and the promise is not failed, it's one of the following cases:// 1) If we attempted registration from the event loop, the registration has been completed at this point.// i.e. It's safe to attempt bind() or connect() now because the channel has been registered.// 2) If we attempted registration from the other thread, the registration request has been successfully// added to the event loop's task queue for later execution.// i.e. It's safe to attempt bind() or connect() now:// because bind() or connect() will be executed *after* the scheduled registration task is executed// because register(), bind(), and connect() are all bound to the same thread.return regFuture;}

ServerBootStrap的初始化過程:

@Overridevoid init(Channel channel) throws Exception {final Map<ChannelOption<?>, Object> options = options();synchronized (options) {channel.config().setOptions(options);}final Map<AttributeKey<?>, Object> attrs = attrs();synchronized (attrs) {for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {@SuppressWarnings("unchecked")AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();channel.attr(key).set(e.getValue());}}ChannelPipeline p = channel.pipeline();final EventLoopGroup currentChildGroup = childGroup;final ChannelHandler currentChildHandler = childHandler;final Entry<ChannelOption<?>, Object>[] currentChildOptions;final Entry<AttributeKey<?>, Object>[] currentChildAttrs;synchronized (childOptions) {currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));}synchronized (childAttrs) {currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));}p.addLast(new ChannelInitializer<Channel>() {@Overridepublic void initChannel(Channel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();ChannelHandler handler = handler();if (handler != null) {pipeline.addLast(handler);} pipeline.addLast(new ServerBootstrapAcceptor(currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));}});}

接收器ServerBootstrapAcceptor

@Override@SuppressWarnings("unchecked")public void channelRead(ChannelHandlerContext ctx, Object msg) {final Channel child = (Channel) msg;child.pipeline().addLast(childHandler);for (Entry<ChannelOption<?>, Object> e: childOptions) {try {if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {logger.warn("Unknown channel option: " + e);}} catch (Throwable t) {logger.warn("Failed to set a channel option: " + child, t);}}for (Entry<AttributeKey<?>, Object> e: childAttrs) {child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());}try {childGroup.register(child).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (!future.isSuccess()) {forceClose(child, future.cause());}}});} catch (Throwable t) {forceClose(child, t);}}

ThreadPerChannelEventLoopGroup實現注冊

@Overridepublic ChannelFuture register(Channel channel) {if (channel == null) {throw new NullPointerException("channel");}try { EventLoop l = nextChild();return l.register(channel, new DefaultChannelPromise(channel, l));} catch (Throwable t) {return new FailedChannelFuture(channel, GlobalEventExecutor.INSTANCE, t);}}

獲取子eventLoop

private EventLoop nextChild() throws Exception {if (shuttingDown) {throw new RejectedExecutionException("shutting down");}EventLoop loop = idleChildren.poll();if (loop == null) {if (maxChannels > 0 && activeChildren.size() >= maxChannels) {throw tooManyChannels;}loop = newChild(childArgs);loop.terminationFuture().addListener(childTerminationListener);}activeChildren.add(loop);return loop;}

產生新子eventLoop(SingleThreadEventExecutor.java)

/*** Create a new instance** @param parent the {@link EventExecutorGroup} which is the parent of this instance and belongs to it* @param executor the {@link Executor} which will be used for executing* @param addTaskWakesUp {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the* executor thread*/protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp) {super(parent);if (executor == null) {throw new NullPointerException("executor");}this.addTaskWakesUp = addTaskWakesUp;this.executor = executor;taskQueue = newTaskQueue();}

其執行方法(SingleThreadEventExecutor.java):

@Overridepublic void execute(Runnable task) {if (task == null) {throw new NullPointerException("task");}boolean inEventLoop = inEventLoop();if (inEventLoop) {addTask(task);} else { startThread();addTask(task);if (isShutdown() && removeTask(task)) {reject();}}if (!addTaskWakesUp && wakesUpForTask(task)) {wakeup(inEventLoop);}}

啟動處理線程(SingleThreadEventExecutor.java):

private void startThread() {if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {doStartThread();}}}private void doStartThread() {assert thread == null;executor.execute(new Runnable() {@Overridepublic void run() {thread = Thread.currentThread();if (interrupted) {thread.interrupt();}boolean success = false;updateLastExecutionTime();try {SingleThreadEventExecutor.this.run();success = true;} catch (Throwable t) {logger.warn("Unexpected exception from an event executor: ", t);} finally {for (;;) {int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {break;}}// Check if confirmShutdown() was called at the end of the loop.if (success && gracefulShutdownStartTime == 0) {logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +"before run() implementation terminates.");}try {// Run all remaining tasks and shutdown hooks.for (;;) {if (confirmShutdown()) {break;}}} finally {try {cleanup();} finally {STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);threadLock.release();if (!taskQueue.isEmpty()) {logger.warn("An event executor terminated with " +"non-empty task queue (" + taskQueue.size() + ')');}terminationFuture.setSuccess(null);}}}}});}

其中的run方法由其子類(DefaultEventLoop,EpollEventLoop,NioEventLoop,ThreadPerChannelEventLoop)各種實現,以NioEventLoop為例:

@Overrideprotected void run() {for (;;) {boolean oldWakenUp = wakenUp.getAndSet(false);try {if (hasTasks()) {selectNow();} else {select(oldWakenUp);// 'wakenUp.compareAndSet(false, true)' is always evaluated// before calling 'selector.wakeup()' to reduce the wake-up// overhead. (Selector.wakeup() is an expensive operation.)//// However, there is a race condition in this approach.// The race condition is triggered when 'wakenUp' is set to// true too early.//// 'wakenUp' is set to true too early if:// 1) Selector is waken up between 'wakenUp.set(false)' and// 'selector.select(...)'. (BAD)// 2) Selector is waken up between 'selector.select(...)' and// 'if (wakenUp.get()) { ... }'. (OK)//// In the first case, 'wakenUp' is set to true and the// following 'selector.select(...)' will wake up immediately.// Until 'wakenUp' is set to false again in the next round,// 'wakenUp.compareAndSet(false, true)' will fail, and therefore// any attempt to wake up the Selector will fail, too, causing// the following 'selector.select(...)' call to block// unnecessarily.//// To fix this problem, we wake up the selector again if wakenUp// is true immediately after selector.select(...).// It is inefficient in that it wakes up the selector for both// the first case (BAD - wake-up required) and the second case// (OK - no wake-up required).if (wakenUp.get()) {selector.wakeup();}}cancelledKeys = 0;needsToSelectAgain = false;final int ioRatio = this.ioRatio;if (ioRatio == 100) {processSelectedKeys(); runAllTasks();} else {final long ioStartTime = System.nanoTime();processSelectedKeys();final long ioTime = System.nanoTime() - ioStartTime;runAllTasks(ioTime * (100 - ioRatio) / ioRatio);}if (isShuttingDown()) {closeAll();if (confirmShutdown()) {break;}}} catch (Throwable t) {logger.warn("Unexpected exception in the selector loop.", t);// Prevent possible consecutive immediate failures that lead to// excessive CPU consumption.try {Thread.sleep(1000);} catch (InterruptedException e) {// Ignore. }}}}

運行所有任務(SingleThreadEventExecutor.java)

/*** Poll all tasks from the task queue and run them via {@link Runnable#run()} method. This method stops running* the tasks in the task queue and returns if it ran longer than {@code timeoutNanos}.*/protected boolean runAllTasks(long timeoutNanos) {fetchFromScheduledTaskQueue();Runnable task = pollTask();if (task == null) {return false;}final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;long runTasks = 0;long lastExecutionTime;for (;;) {try {task.run();} catch (Throwable t) {logger.warn("A task raised an exception.", t);}runTasks ++;// Check timeout every 64 tasks because nanoTime() is relatively expensive.// XXX: Hard-coded value - will make it configurable if it is really a problem.if ((runTasks & 0x3F) == 0) {lastExecutionTime = ScheduledFutureTask.nanoTime();if (lastExecutionTime >= deadline) {break;}}task = pollTask();if (task == null) {lastExecutionTime = ScheduledFutureTask.nanoTime();break;}}this.lastExecutionTime = lastExecutionTime;return true;}

小結

  本文從一個簡單的示例程序,一步步分析netty4的線程模型,從ServerBootstrapAcceptor到SingleThreadEventExecutor的源碼,環環相扣,可以根據上面的分析鏈理解

一個請求過來后,netty的處理流程。

?

轉載于:https://www.cnblogs.com/davidwang456/p/5118802.html

總結

以上是生活随笔為你收集整理的netty reactor线程模型分析的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 久久国产成人 | 伊人免费在线观看高清版 | 亚洲欧洲一区二区在线观看 | 欧美日日骚 | 最新中文字幕第一页 | 在线a网站 | 亚洲天堂爱爱 | 日韩爱爱片| 日韩免费| 成人久久毛片 | 揄拍成人国产精品视频 | 国产一区欧美 | 午夜免费小视频 | 色欧美88888久久久久久影院 | 丰满少妇理论片 | 涩涩涩涩涩涩涩涩涩 | 亚洲青色在线 | 日韩欧美国产电影 | 中文字幕一区二区三区门四区五区 | 国产精品天天操 | 国产黄色一级网站 | 精产国品一二三产区m553麻豆 | 美日韩一区二区三区 | 18日本xxxxxxxxx95 国产又好看的毛片 | 在线看不卡av | 久久天天操 | 日本三级韩国三级美三级91 | www日本com| 亚洲国产tv | 中国zzji女人高潮免费 | 中文字幕精品一区二 | 日韩极品视频 | 男女拍拍拍 | 姑娘第5集在线观看免费 | 久久精品高清视频 | 在线免费观看日本 | 国产日韩欧美激情 | 国产人妻人伦精品1国产丝袜 | 中文字幕资源在线 | 久久久久久无码精品人妻一区二区 | 91av看片| 91精品国产乱码久久久张津瑜 | 久久久久香蕉视频 | 日韩孕交 | 久艹视频在线观看 | 男人天堂国产 | 黄色一级播放 | 日本香蕉视频 | 欲涩漫入口免费网站 | 日本在线高清 | 91色精品| 女人扒开屁股让男人桶 | 免费在线黄网站 | 国产成人精品免高潮在线观看 | 欧美成人精品激情在线视频 | www插插插无码免费视频网站 | 一本一道精品欧美中文字幕 | 欧美 日韩 国产 激情 | 日韩高清不卡在线 | a级片在线看 | 日韩久久不卡 | 在线天堂一区 | 日本精品在线观看视频 | 琪琪秋霞午夜被窝电影网 | 日韩午夜影院 | 香蕉成人在线视频 | 亚洲福利视频在线 | 不卡视频国产 | 日韩视频专区 | 黄色小视频免费网站 | 久久精品视频日本 | 国产亚洲精品久久久久丝瓜 | 欧美久久激情 | 九九看片| 校园春色 亚洲色图 | 国产69页 | 激情在线观看视频 | 朝桐光一区二区三区 | 人人爱人人看 | 日日骚视频 | 亚洲天堂视频一区 | 一起艹在线观看 | 99热在线这里只有精品 | 51国产偷自视频区视频 | 狠狠躁夜夜躁人爽 | 丝袜视频在线观看 | 久久精品视频免费看 | 久久综合一区二区三区 | 国产免费叼嘿网站免费 | 超碰中文在线 | 欧美性生活网站 | 香蕉尹人网 | 一级片免费观看 | 国产精品久久久久无码av | 丁香花婷婷| 国产三级午夜理伦三级 | 激情综合网激情 | 99热影院| 久久99国产精品成人 |