日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

netty客户端源码

發布時間:2024/7/19 编程问答 31 豆豆
生活随笔收集整理的這篇文章主要介紹了 netty客户端源码,小編覺得挺不錯的,現在分享給大家,幫大家做個參考。

隨筆記錄。

?

// Client code: create the ChannelFactory.
// Two separately created cached thread pools are handed to the factory. The
// constructor excerpt later in this article names the first two parameters
// bossExecutor and workerExecutor; the two-argument overload used here is not shown.

ChannelFactory factory = new NioClientSocketChannelFactory(

????????????????? Executors.newCachedThreadPool(),

????????????????? Executors.newCachedThreadPool());

// NioClientSocketChannelFactory constructor (excerpt; the ellipsis marks code omitted by the article).
// Stores both executors and builds the NioClientSocketPipelineSink kept in the sink field.

public NioClientSocketChannelFactory(

??????????? Executor bossExecutor, Executor workerExecutor,

??????????? int bossCount, int workerCount) {

??????? ...

? ? ? ? // Executor (thread pool) used on the boss side

??????? this.bossExecutor = bossExecutor;

// Executor (thread pool) used on the worker side

??????? this.workerExecutor = workerExecutor;

? ? ? ?// Build the ChannelSink: a NioClientSocketPipelineSink instance

? ? ? ?// Per the original note: bossCount defaults to 1 and workerCount to Runtime.getRuntime().availableProcessors() * 2 (the defaulting code is not shown in this excerpt)

??????? sink = new NioClientSocketPipelineSink(

??????????????? bossExecutor, workerExecutor, bossCount, workerCount);

}

?

// NioClientSocketPipelineSink constructor.
// Keeps the boss executor, then pre-creates bossCount Boss objects and workerCount
// NioWorker objects. Both are numbered from 1 (i + 1). The id passed to NioWorker
// is declared outside this excerpt.

NioClientSocketPipelineSink(Executor bossExecutor, Executor workerExecutor,

??????????? int bossCount, int workerCount) {

??????? this.bossExecutor = bossExecutor;

? ? ??

??????? bosses = new Boss[bossCount];

??????? for (int i = 0; i < bosses.length; i ++) {

??????????? bosses[i] = new Boss(i + 1);

??????? }

???????

??????? workers = new NioWorker[workerCount];

??????? for (int i = 0; i < workers.length; i ++) {

??????????? workers[i] = new NioWorker(id, i + 1, workerExecutor);

??????? }

}

?

// Client code: create the ClientBootstrap, passing the factory to its constructor.
// NOTE(review): the article pairs this with setFactory below; the constructor body itself is not shown.

ClientBootstrap bootstrap = new ClientBootstrap(factory);

// Bootstrap.setFactory (excerpt; the ellipsis marks code omitted by the article).
// Stores the given ChannelFactory in the factory field.

public void setFactory(ChannelFactory factory) {

??????? …

??????? this.factory = factory;

}

?

// Client code: install a ChannelPipelineFactory whose getPipeline() builds and
// returns a ChannelPipeline.
// Handlers are appended in this order: encode (StringEncoder), decode (StringDecoder),
// handler1 (TimeClientHandler).

// (client code)

bootstrap.setPipelineFactory(new ChannelPipelineFactory() {

????????????? public ChannelPipeline getPipeline() {

????????????????? ChannelPipeline pipeline = Channels.pipeline();

????????????????? pipeline.addLast("encode",new StringEncoder());

????????????????? pipeline.addLast("decode",new StringDecoder());

????????????????? pipeline.addLast("handler1",new TimeClientHandler());

????????????????? return pipeline;

????????????? }

????????? });

// DefaultChannelPipeline類addLast方法

// Appends a named handler at the end of the pipeline (excerpt; ellipses mark omitted code).
// The method is declared synchronized.

public synchronized void addLast(String name, ChannelHandler handler) {

??? if (name2ctx.isEmpty()) {

??????? // First handler: per the original note, init sets up name2ctx, head and tail (init is not shown here)

??????? init(name, handler);

??? } else {

??????? …

??????? DefaultChannelHandlerContext oldTail = tail;

??????? DefaultChannelHandlerContext??? newTail =?? new DefaultChannelHandlerContext(oldTail, null, name, handler);

??????? …

??????? // Link the new context after the old tail, make it the new tail, and index it by name

??????? oldTail.next = newTail;

??????? tail = newTail;

??????? name2ctx.put(name, newTail);

??????? …

??? }

}

// Client code: start the connection to 127.0.0.1:8080.
// NOTE(review): this is the single-argument connect; the two-argument method excerpted
// below is presumably what it ends up calling. The delegation is not shown in this article.

bootstrap.connect (new InetSocketAddress("127.0.0.1", 8080));

// connect源代碼解讀

// ClientBootstrap類connect方法

// ClientBootstrap.connect (excerpt; the ellipsis marks code omitted by the article).
// Obtains a pipeline from the pipeline factory, creates a channel for it through the
// channel factory, applies the bootstrap options, binds only when localAddress is
// non-null, and returns the ChannelFuture of ch.connect(remoteAddress).
// A failure in getPipeline() is rethrown as ChannelPipelineException with the cause attached.

public ChannelFuture connect(final SocketAddress remoteAddress,

final SocketAddress localAddress) {

???????? …

??????? ChannelPipeline pipeline;

??????? try {

? ? ? ? ? ?// In this walkthrough the factory returns a DefaultChannelPipeline instance

??????????? pipeline = getPipelineFactory().getPipeline();

??????? } catch (Exception e) {

??????????? throw new ChannelPipelineException("Failed to initialize a pipeline.", e);

??????? }

??????? // Set the options.

? ? ? ? // getFactory() is the NioClientSocketChannelFactory set earlier; newChannel creates a NioClientSocketChannel

??????? Channel ch = getFactory().newChannel(pipeline);

??????? ch.getConfig().setOptions(getOptions());

??????? // Bind.

??????? if (localAddress != null) {

??????????? ch.bind(localAddress);

??????? }

??????? // Connect.

??????? return ch.connect(remoteAddress);

}

// NioClientSocketChannelFactory類newChannel方法

// Creates a NioClientSocketChannel bound to this factory, the given pipeline,
// the factory's sink, and a worker obtained from sink.nextWorker().

public SocketChannel newChannel(ChannelPipeline pipeline) {

????????// this is the NioClientSocketChannelFactory instance

? ? ? ?// pipeline is a DefaultChannelPipeline instance in this walkthrough

? ? ? ?// sink is the NioClientSocketPipelineSink built in the factory constructor

? ? ? ?// per the original note, sink.nextWorker() returns a NioWorker instance (nextWorker is not shown here)

??????? return new NioClientSocketChannel(this, pipeline, sink, sink.nextWorker());

}

// NioClientSocketChannel類構造方法

// NioClientSocketChannel constructor.
// Delegates to the parent constructor with a null parent channel and a socket from
// newSocket(), then fires the channel-open event via fireChannelOpen(this).

NioClientSocketChannel(

??????????? ChannelFactory factory, ChannelPipeline pipeline,

??????????? ChannelSink sink, NioWorker worker) {

? ? ? ? // Creates a new SocketChannel; per the original note newSocket() => SocketChannel.open() (newSocket is not shown here)

??????? super(null, factory, pipeline, sink, newSocket(), worker);

??????? fireChannelOpen(this);

??? }

// 繼續看父類NioSocketChannel構造方法

// NioSocketChannel constructor.
// Passes parent, factory, pipeline and sink up to the superclass, stores the NIO
// socket and the assigned worker, and creates the channel config from socket.socket().

public NioSocketChannel(

??????????? Channel parent, ChannelFactory factory,

??????????? ChannelPipeline pipeline, ChannelSink sink,

??????????? SocketChannel socket, NioWorker worker) {

??????? super(parent, factory, pipeline, sink);

??????? this.socket = socket;

??????? this.worker = worker;

??????? config = new DefaultNioSocketChannelConfig(socket.socket());

}

// 繼續看父類AbstractChannel構造方法

// AbstractChannel constructor.
// Records parent, factory and pipeline, allocates the channel id via allocateId(this),
// and attaches this channel and the sink to the pipeline.

protected AbstractChannel(

??????????? Channel parent, ChannelFactory factory,

??????????? ChannelPipeline pipeline, ChannelSink sink) {

?????????????????? // null on the client path: NioClientSocketChannel passes null as parent

??????? this.parent = parent;

?????????????????? // the NioClientSocketChannelFactory instance on the client path

??????? this.factory = factory;

?????????????????? // a DefaultChannelPipeline instance in this walkthrough

??????? this.pipeline = pipeline;

??????? id = allocateId(this);

??????? pipeline.attach(this, sink);

}

// DefaultChannelPipeline類attach方法

// Binds the pipeline to its channel and sink by storing both in fields
// (excerpt; the ellipsis marks code omitted by the article).

public void attach(Channel channel, ChannelSink sink) {

??????? …

?????????????????? // the NioClientSocketChannel instance on the client path

??????? this.channel = channel;

?????????????????? // the NioClientSocketPipelineSink instance on the client path

??????? this.sink = sink;

}

?

// Follows ch.connect(remoteAddress) from ClientBootstrap.connect.

// Class AbstractChannel: delegates to the static helper Channels.connect, passing this channel.

public ChannelFuture connect(SocketAddress remoteAddress) {

??????? return Channels.connect(this, remoteAddress);

}

// Class Channels: connect.
// Rejects a null remoteAddress with NullPointerException, creates a future for the
// channel, sends a downstream CONNECTED state event through the channel's pipeline,
// and returns that future.

public static ChannelFuture connect(Channel channel, SocketAddress remoteAddress) {

??????? if (remoteAddress == null) {

??????????? throw new NullPointerException("remoteAddress");

??????? }

??????? ChannelFuture future = future(channel, true);

?????????????????? // channel.getPipeline() is a DefaultChannelPipeline in this walkthrough

?????????????????? // Wrap the request in a new DownstreamChannelStateEvent carrying ChannelState.CONNECTED and the remote address

??????? channel.getPipeline().sendDownstream(new DownstreamChannelStateEvent(

??????????????? channel, future, ChannelState.CONNECTED, remoteAddress));

??????? return future;

}

// Class NioClientSocketPipelineSink: eventSunk.
// Dispatches on the event type.
// ChannelStateEvent, by state:
//   OPEN          - closes the channel when the value equals Boolean.FALSE
//   BOUND         - binds to the address when the value is non-null, otherwise closes
//   CONNECTED     - connects to the address when the value is non-null, otherwise closes
//   INTEREST_OPS  - hands the Integer value to the worker's setInterestOps
// MessageEvent: queued on channel.writeBuffer, then the worker is asked to write.
// NOTE(review): how sendDownstream reaches this method is not shown in the article.

public void eventSunk(

??????????? ChannelPipeline pipeline, ChannelEvent e) throws Exception {

??????? if (e instanceof ChannelStateEvent) {

??????????? ChannelStateEvent event = (ChannelStateEvent) e;

??????????? NioClientSocketChannel channel =

??????????????? (NioClientSocketChannel) event.getChannel();

??????????? ChannelFuture future = event.getFuture();

??????????? ChannelState state = event.getState();

??????????? Object value = event.getValue();

?

??????????? switch (state) {

??????????? case OPEN:

??????????????? if (Boolean.FALSE.equals(value)) {

??????????????????? channel.worker.close(channel, future);

??????????????? }

??????????????? break;

??????????? case BOUND:

??????????????? if (value != null) {

??????????????????? bind(channel, future, (SocketAddress) value);

??????????????? } else {

??????????????????? channel.worker.close(channel, future);

??????????????? }

??????????????? break;

??????????? case CONNECTED:

??????????????? if (value != null) {

?????????????????????????????????????????????? // The client's initial connect request takes this branch; value is the remote address

??????????????????? connect(channel, future, (SocketAddress) value);

??????????????? } else {

??????????????????? channel.worker.close(channel, future);

??????????????? }

??????????????? break;

??????????? case INTEREST_OPS:

??????????????? channel.worker.setInterestOps(channel, future, ((Integer) value).intValue());

??????????????? break;

??????????? }

??????? } else if (e instanceof MessageEvent) {

??????????? MessageEvent event = (MessageEvent) e;

??????????? NioSocketChannel channel = (NioSocketChannel) event.getChannel();

??????????? boolean offered = channel.writeBuffer.offer(event);

??????????? assert offered;

??????????? channel.worker.writeFromUserCode(channel);

??????? }

}

// Starts the connection for a client channel.
// If socket.connect returns true the channel is registered with its worker right away.
// Otherwise the connect future is stored on the channel and the channel is handed to
// a boss, which finishes the connection later.
// Any Throwable fails the future, fires exceptionCaught, and closes the channel.

private void connect(

??????????? final NioClientSocketChannel channel, final ChannelFuture cf,

??????????? SocketAddress remoteAddress) {

??????? try {

// channel.socket was created when the NioClientSocketChannel was constructed

// Start the NIO connect. Per the original note the socket was set to non-blocking mode (configureBlocking(false) is not shown here)

// In non-blocking mode connect returns immediately: true if the connection was established at once, false if it is still in progress

// A false return only means the connection was initiated; it is completed later by the boss via OP_CONNECT and finishConnect()

??????????? if (channel.socket.connect(remoteAddress)) {

??????????????? channel.worker.register(channel, cf);

??????????? } else {

??????????????? channel.getCloseFuture().addListener(new ChannelFutureListener() {

??????????????????? public void operationComplete(ChannelFuture f)

??????????????????????????? throws Exception {

??????????????????????? if (!cf.isDone()) {

??????????????????????????? cf.setFailure(new ClosedChannelException());

??????????????????????? }

??????????????????? }

???? ???????????});

??????????????? cf.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);

??????????????? channel.connectFuture = cf;

???????????????????????????????????? // Queue the channel for registration; nextBoss() supplies the Boss (a Runnable, see Boss.run) that will handle it

??????????????? nextBoss().register(channel);

??????????? }

?

??????? } catch (Throwable t) {

??????????? cf.setFailure(t);

??????????? fireExceptionCaught(channel, t);

??????????? channel.worker.close(channel, succeededFuture(channel));

??????? }

}

// Boss (inner class of NioClientSocketPipelineSink): register.
// Under startStopLock: on first use opens a Selector and starts the boss loop on
// bossExecutor, otherwise reuses the existing selector. It then queues a RegisterTask
// for the channel. After releasing the lock it wakes the selector, but only when
// wakenUp flips from false to true.
// Throws ChannelException if the selector cannot be opened.

void register(NioClientSocketChannel channel) {

??? Runnable registerTask = new RegisterTask(this, channel);

??? Selector selector;

?

??? synchronized (startStopLock) {

??????? if (!started) {

??????????? // Open a selector if this worker didn't start yet.

??????????? try {

??????????????? // Open a selector

????????? ??????this.selector = selector =? Selector.open();

??????????? } catch (Throwable t) {

??????????????? throw new ChannelException(

??????????????????????? "Failed to create a selector.", t);

??????????? }

?

??????????? // Start the worker thread with the new Selector.

??????????? boolean success = false;

??????????? try {

??????????????? // Start the thread that consumes the register task queue

// bossExecutor is the Executors.newCachedThreadPool() instance created in the client code

// The started loop performs selector.select(500), see Boss.run

??????????????? DeadLockProofWorker.start(

??????????????????????? bossExecutor,

??????????????????????? new ThreadRenamingRunnable(

??????????????????????????????? this, "New I/O client boss #" + id + '-' + subId));

??????????????? success = true;

??????????? } finally {

??????????????? if (!success) {

????????????????? ??// Release the Selector if the execution fails.

??????????????????? try {

??????????????????????? selector.close();

??????????????????? } catch (Throwable t) {

??????????????????????? logger.warn("Failed to close a selector.", t);

??????????????????? }

??????????????????? this.selector = selector = null;

??????????????????? // The method will return to the caller at this point.

??????????????? }

??????????? }

??????? } else {

??????????? // Use the existing selector if this worker has been started.

????? ??????selector = this.selector;

??????? }

?

??????? assert selector != null && selector.isOpen();

?

??????? started = true;

??????? // Put the register task on the queue

??????? boolean offered = registerTaskQueue.offer(registerTask);

??????? assert offered;

??? }

?

??? if (wakenUp.compareAndSet(false, true)) {

??????? selector.wakeup();

??? }

}

// Class DeadLockProofWorker: start (excerpt; the dots mark code omitted by the article).
// Submits a wrapper task to the parent executor. The wrapper stores the executor in
// PARENT while the given runnable runs and removes it again in a finally block.

public static void start(final Executor parent, final Runnable runnable) {

? // parent is bossExecutor here, i.e. a thread pool

? ? ? ? ......

// Hand the task to the executor so it runs on one of its threads

??????? parent.execute(new Runnable() {

??????????? public void run() {

??????????????? PARENT.set(parent);

??????????????? try {

? ? ? ? ? ? ? ? ? ?// runnable is the ThreadRenamingRunnable instance passed in by Boss.register

??????????????????? runnable.run();

??????????????? } finally {

??????????????????? PARENT.remove();

??????????????? }

??????????? }

??????? });

}

// Class ThreadRenamingRunnable: run (excerpt; the dots mark the omitted renaming code).
// Runs the wrapped runnable and, if the thread was renamed, restores the old thread
// name in a finally block.

public void run() {

? ? ? ?......

??????? // Run the actual runnable and revert the name back when it ends.

??????? try {

? ? ? ? ? // runnable is the Boss instance wrapped in Boss.register

??????????? runnable.run();

??????? } finally {

??????????? if (renamed) {

??????????????? // Revert the name back if the current thread was renamed.

??????????????? // We do not check the exception here because we know it works.

??????????????? currentThread.setName(oldThreadName);

??????????? }

??????? }

}

// Boss (inner class of NioClientSocketPipelineSink): run (excerpt; ellipses mark omitted code).
// Endless loop. Each pass resets wakenUp, selects with a 500 ms timeout, drains the
// register task queue, and processes selected keys when select reported any.

public void run() {

??? boolean shutdown = false;

??? Selector selector = this.selector;

??? long lastConnectTimeoutCheckTimeNanos = System.nanoTime();

??? for (;;) {

??????? wakenUp.set(false);

?

??????? try {

? ? ? ? ? ? // Blocking select with a 500 ms timeout

??????????? int selectedKeyCount = selector.select(500);

?

??????????? if (wakenUp.get()) {

??????????????? selector.wakeup();

??????????? }

? ? ? ? ? ? // Consume the queued register tasks

? ? ? ? ? ? // This is where the NIO register call happens, see RegisterTask.run

???????? ???processRegisterTaskQueue();

?

??????????? if (selectedKeyCount > 0) {

? ? ? ? ? ? ? ? // Handle the keys the selector reported as ready

??????????????? processSelectedKeys(selector.selectedKeys());

??????????? }

??????????? ……

??????? } catch (Throwable t) {

?????????? ……

??????? }

??? }

}

?

// Drains registerTaskQueue: polls tasks and runs each one on the calling thread
// until the queue returns null.

private void processRegisterTaskQueue() {

for (;;) {

???????? // Take the next task; these are the RegisterTask instances queued by registerTaskQueue.offer(registerTask)

??????? final Runnable task = registerTaskQueue.poll();

??????? if (task == null) {

??????????? break;

??????? }

? ? ? ?// Runs the run method of RegisterTask, an inner class of NioClientSocketPipelineSink

??????? task.run();

??? }

}

?

// RegisterTask (inner class of NioClientSocketPipelineSink): run (excerpt; the ellipsis marks omitted code).
// Registers the channel's socket with the boss selector for OP_CONNECT, attaching the
// channel to the key. If the socket is already closed, the channel is closed through its worker.

public void run() {

try {

???????? // NIO registration: connect-readiness for this socket is reported only after it is registered with the selector

??????? channel.socket.register(

??????????????? boss.selector, SelectionKey.OP_CONNECT, channel);

??? } catch (ClosedChannelException e) {

??????? channel.worker.close(channel, succeededFuture(channel));

??? }

?? ……

}

// Iterates over the selected keys, removing each from the set as it is handled.
// Invalid keys are passed to close(k). Connectable keys are passed to connect(k).

private void processSelectedKeys(Set<SelectionKey> selectedKeys) {

??? for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {

??????? SelectionKey k = i.next();

??????? i.remove();

?

??????? if (!k.isValid()) {

??????????? close(k);

??????????? continue;

??????? }

?

??????? if (k.isConnectable()) {

? ? ? ? ? ? // Finish the client connection

??????????? connect(k);

??????? }

??? }

}

?

// Completes a pending connect for the channel attached to the key (excerpt; the dots
// mark the omitted error handling).
// When finishConnect() returns true, the key is cancelled on the boss selector and the
// channel is registered with its NioWorker together with its stored connect future.

private void connect(SelectionKey k) {

??? NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();

try {

???????? // Finish the NIO client connection

??????? if (ch.socket.finishConnect()) {

??????????? k.cancel();

? ? ? ? ? ? // Register with the channel's NioWorker

??????????? ch.worker.register(ch, ch.connectFuture);

??????? }

??? } catch (Throwable t) {

? ? ? ?.......

??? }

}

?

NioWorker類負責讀寫事件的注冊與處理。

未完待續...

轉載于:https://www.cnblogs.com/liuxinan/p/6073424.html

創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎

總結

以上是生活随笔為你收集整理的netty客户端源码的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。