网络与IO知识扫盲(七):仿照Netty工作架构图,手写多路复用模型
生活随笔
收集整理的這篇文章主要介紹了
网络与IO知识扫盲(七):仿照Netty工作架构图,手写多路复用模型
小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.
Netty工作架構(gòu)圖
從圖上看來(lái):
- 一個(gè)線程在 Boss Group 中負(fù)責(zé)接收
- 另外兩個(gè)線程在 Worker Group 中由接收之后的連接分配過去,負(fù)責(zé)讀寫
根據(jù)上圖模型,仿照Netty手寫一個(gè)多路復(fù)用模型
MainThread.java
這里不做關(guān)于 IO 和 業(yè)務(wù)的事情
package com.bjmashibing.system.io.testreactor;public class MainThread {public static void main(String[] args) {//1,創(chuàng)建 IO Thread (一個(gè)或者多個(gè))SelectorThreadGroup boss = new SelectorThreadGroup(3); //混雜模式//boss有自己的線程組SelectorThreadGroup worker = new SelectorThreadGroup(3); //混雜模式//worker有自己的線程組//混雜模式,只有一個(gè)線程負(fù)責(zé)accept,每個(gè)都會(huì)被分配client,進(jìn)行R/W // SelectorThreadGroup stg = new SelectorThreadGroup(3);//2,我應(yīng)該把 監(jiān)聽(9999)的 server 注冊(cè)到某一個(gè) selector上boss.setWorker(worker);//但是,boss得多持有worker的引用:/*** boss里選一個(gè)線程注冊(cè)listen , 觸發(fā)bind,從而,這個(gè)不選中的線程得持有 workerGroup的引用* 因?yàn)槲磥?lái) listen 一旦accept得到client后得去worker中 next出一個(gè)線程分配*/boss.bind(9999);boss.bind(8888);boss.bind(6666);boss.bind(7777);} }SelectorThreadGroup.java
package com.bjmashibing.system.io.testreactor;import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.Channel; import java.nio.channels.ServerSocketChannel; import java.util.concurrent.atomic.AtomicInteger;/*** @author: 馬士兵教育* @create: 2020-06-21 20:37*/ public class SelectorThreadGroup { //天生都是bossSelectorThread[] selectorThreadArray;ServerSocketChannel serverSocketChannel = null;AtomicInteger xid = new AtomicInteger(0);SelectorThreadGroup selectorThreadGroup = this;public void setWorker(SelectorThreadGroup selectorThreadGroup) {this.selectorThreadGroup = selectorThreadGroup;}SelectorThreadGroup(int num) {//num 線程數(shù)selectorThreadArray = new SelectorThread[num];for (int i = 0; i < num; i++) {selectorThreadArray[i] = new SelectorThread(this);new Thread(selectorThreadArray[i]).start();}}public void bind(int port) {try {serverSocketChannel = ServerSocketChannel.open();// 打開服務(wù)器-套接字通道serverSocketChannel.configureBlocking(false);//設(shè)置非阻塞serverSocketChannel.bind(new InetSocketAddress(port));//綁定端口//注冊(cè)到那個(gè)selector上呢? // nextSelectorV2(server);nextSelectorV3(serverSocketChannel);} catch (IOException e) {e.printStackTrace();}}/*** 負(fù)載均衡的方式選擇一個(gè)selector* @param channel*/public void nextSelectorV3(Channel channel) {try {if (channel instanceof ServerSocketChannel) {SelectorThread selectorThread = next(); //listen 選擇了 boss組selectorThreadArray中的一個(gè)線程后,要更新這個(gè)線程的work組selectorThread.linkedBlockingQueue.put(channel);selectorThread.setWorker(selectorThreadGroup);selectorThread.selector.wakeup();} else {SelectorThread selectorThread = nextV3(); //在main線程(當(dāng)前SelectorThreadGroup)中,取到堆里的selectorThread對(duì)象//1,通過隊(duì)列傳遞數(shù)據(jù) 消息selectorThread.linkedBlockingQueue.add(channel);//2,通過打斷阻塞,讓對(duì)應(yīng)的線程去自己在打斷后完成注冊(cè)selectorselectorThread.selector.wakeup();}} catch (InterruptedException e) {e.printStackTrace();}}public void nextSelectorV2(Channel c) {try {if (c instanceof ServerSocketChannel) {selectorThreadArray[0].linkedBlockingQueue.put(c);selectorThreadArray[0].selector.wakeup();} else {SelectorThread st = nextV2(); //在 main線程種,取到堆里的selectorThread對(duì)象//1,通過隊(duì)列傳遞數(shù)據(jù) 消息st.linkedBlockingQueue.add(c);//2,通過打斷阻塞,讓對(duì)應(yīng)的線程去自己在打斷后完成注冊(cè)selectorst.selector.wakeup();}} catch (InterruptedException e) {e.printStackTrace();}}public void nextSelector(Channel c) {SelectorThread st = next(); //在 main線程種,取到堆里的selectorThread對(duì)象//1,通過隊(duì)列傳遞數(shù)據(jù) 消息st.linkedBlockingQueue.add(c);//2,通過打斷阻塞,讓對(duì)應(yīng)的線程去自己在打斷后完成注冊(cè)selectorst.selector.wakeup();// public void nextSelector(Channel c) { // SelectorThread st = next(); //在 main線程種,取到堆里的selectorThread對(duì)象 // // //1,通過隊(duì)列傳遞數(shù)據(jù) 消息 // st.lbq.add(c); // //2,通過打斷阻塞,讓對(duì)應(yīng)的線程去自己在打斷后完成注冊(cè)selector // st.selector.wakeup();//重點(diǎn): c有可能是 server 有可能是client // ServerSocketChannel s = (ServerSocketChannel) c;//呼應(yīng)上, int nums = selector.select(); //阻塞 wakeup() // try { // s.register(st.selector, SelectionKey.OP_ACCEPT); //會(huì)被阻塞的!!!!! // st.selector.wakeup(); //功能是讓 selector的select()方法,立刻返回,不阻塞! // System.out.println("aaaaa"); // } catch (ClosedChannelException e) { // e.printStackTrace(); // }}//無(wú)論 serversocket socket 都復(fù)用這個(gè)方法private SelectorThread next() {int index = xid.incrementAndGet() % selectorThreadArray.length; //輪詢就會(huì)很尷尬,傾斜return selectorThreadArray[index];}private SelectorThread nextV2() {int index = xid.incrementAndGet() % (selectorThreadArray.length - 1); //輪詢就會(huì)很尷尬,傾斜return selectorThreadArray[index + 1];}private SelectorThread nextV3() {int index = xid.incrementAndGet() % selectorThreadGroup.selectorThreadArray.length; //動(dòng)用worker的線程分配return selectorThreadGroup.selectorThreadArray[index];} }SelectorThread.java
package com.bjmashibing.system.io.testreactor;import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.Iterator; import java.util.Set; import java.util.concurrent.LinkedBlockingQueue;/*** @author: 馬士兵教育* @create: 2020-06-21 20:14*/ public class SelectorThread extends ThreadLocal<LinkedBlockingQueue<Channel>> implements Runnable {// 每線程對(duì)應(yīng)一個(gè)selector,// 多線程情況下,該主機(jī),該程序的并發(fā)客戶端被分配到多個(gè)selector上//注意,每個(gè)客戶端,只綁定到其中一個(gè)selector//其實(shí)不會(huì)有交互問題Selector selector = null;// LinkedBlockingQueue<Channel> lbq = new LinkedBlockingQueue<>();LinkedBlockingQueue<Channel> linkedBlockingQueue = get(); //lbq 在接口或者類中是固定使用方式邏輯寫死了。你需要是lbq每個(gè)線程持有自己的獨(dú)立對(duì)象SelectorThreadGroup selectorThreadGroup;@Overrideprotected LinkedBlockingQueue<Channel> initialValue() {return new LinkedBlockingQueue<>();//你要豐富的是這里! pool。。。}SelectorThread(SelectorThreadGroup selectorThreadGroup) {try {this.selectorThreadGroup = selectorThreadGroup;selector = Selector.open();} catch (IOException e) {e.printStackTrace();}}@Overridepublic void run() {//Loopwhile (true) {try {//1,select()int nums = selector.select(); //此方法執(zhí)行阻塞選擇操作。只有在選擇了至少一個(gè)通道、調(diào)用了此選擇器的wakeup方法或中斷了當(dāng)前線程(以最先出現(xiàn)的方式)之后,它才會(huì)返回。//2,處理selectedKeysSystem.out.println("In run(), selector.select() 獲取到的keys數(shù)量為: " + nums);if (nums > 0) {Set<SelectionKey> keys = selector.selectedKeys();System.out.println("In run(), keys is: " + keys);Iterator<SelectionKey> iter = keys.iterator();int loop = 0;while (iter.hasNext()) { //線程處理的過程System.out.println("In run(), loop is: " + loop++);SelectionKey key = iter.next();iter.remove();if (key.isAcceptable()) { //復(fù)雜,接受客戶端的過程(接收之后,要注冊(cè),多線程下,新的客戶端,注冊(cè)到那里呢?)acceptHandler(key);} else if (key.isReadable()) {readHander(key);} else if (key.isWritable()) {}}}//3,處理一些task : listen clientSystem.out.println("處理一些task");if (!linkedBlockingQueue.isEmpty()) { //隊(duì)列是個(gè)啥東西啊? 堆里的對(duì)象,線程的棧是獨(dú)立,堆是共享的System.out.println("linkedBlockingQueue 不是空的,包含 " + linkedBlockingQueue);//只有方法的邏輯,本地變量是線程隔離的Channel c = linkedBlockingQueue.take();if (c instanceof ServerSocketChannel) {ServerSocketChannel server = (ServerSocketChannel) c;server.register(selector, SelectionKey.OP_ACCEPT);System.out.println(Thread.currentThread().getName() + " register server listen");} else if (c instanceof SocketChannel) {SocketChannel client = (SocketChannel) c;ByteBuffer buffer = ByteBuffer.allocateDirect(4096);client.register(selector, SelectionKey.OP_READ, buffer);System.out.println(Thread.currentThread().getName() + " register client: " + client.getRemoteAddress());} else {System.out.println("c 既不是server,也不是client,c=" + c);}} else {System.out.println("linkedBlockingQueue 是空的");}} catch (IOException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();} // catch (InterruptedException e) { // e.printStackTrace(); // }}}private void readHander(SelectionKey key) {System.out.println(Thread.currentThread().getName() + " readHander......");ByteBuffer buffer = (ByteBuffer) key.attachment();SocketChannel client = (SocketChannel) key.channel();buffer.clear();while (true) {try {int num = client.read(buffer);if (num > 0) {buffer.flip(); //將讀到的內(nèi)容翻轉(zhuǎn),然后直接寫出while (buffer.hasRemaining()) {client.write(buffer);}buffer.clear();} else if (num == 0) {break;} else if (num < 0) {//客戶端斷開了System.out.println("client: " + client.getRemoteAddress() + " closed......");key.cancel();break;}} catch (IOException e) {e.printStackTrace();}}}private void acceptHandler(SelectionKey key) {System.out.println(Thread.currentThread().getName() + "::: acceptHandler Begin");ServerSocketChannel server = (ServerSocketChannel) key.channel();try {SocketChannel clientChannel = server.accept();clientChannel.configureBlocking(false);//選擇一個(gè)selector,并調(diào)用wakeup()完成注冊(cè)selectorThreadGroup.nextSelectorV3(clientChannel);System.out.println(Thread.currentThread().getName() + "::: acceptHandler Finish");} catch (IOException e) {e.printStackTrace();}}public void setWorker(SelectorThreadGroup stgWorker) {this.selectorThreadGroup = stgWorker;} }用nc命令連接
截取了一些輸出
"C:\Program Files\Java\jdk-11.0.3\bin\java.exe" "-javaagent:C:\Program Files\JetBrains\IntelliJ IDEA 2019.3.2\lib\idea_rt.jar=7876:C:\Program Files\JetBrains\IntelliJ IDEA 2019.3.2\bin" -Dfile.encoding=UTF-8 -classpath C:\Users\Bug\mashibing\bjmashbing-sysio\target\classes;C:\Users\Bug\.m2\repository\junit\junit\4.12\junit-4.12.jar;C:\Users\Bug\.m2\repository\org\hamcrest\hamcrest-core\1.3\hamcrest-core-1.3.jar;C:\Users\Bug\.m2\repository\io\netty\netty-all\4.1.49.Final\netty-all-4.1.49.Final.jar;C:\Users\Bug\.m2\repository\org\scala-sbt\test-interface\1.0\test-interface-1.0.jar com.bjmashibing.system.io.testreactor.MainThread In run(), selector.select() 獲取到的keys數(shù)量為: 0 處理一些task linkedBlockingQueue 不是空的,包含 [sun.nio.ch.ServerSocketChannelImpl[/0:0:0:0:0:0:0:0:9999]] In run(), selector.select() 獲取到的keys數(shù)量為: 0 處理一些task linkedBlockingQueue 不是空的,包含 [sun.nio.ch.ServerSocketChannelImpl[/0:0:0:0:0:0:0:0:8888]] Thread-1 register server listen In run(), selector.select() 獲取到的keys數(shù)量為: 0 處理一些task Thread-2 register server listen linkedBlockingQueue 不是空的,包含 [sun.nio.ch.ServerSocketChannelImpl[/0:0:0:0:0:0:0:0:6666]] Thread-0 register server listen In run(), selector.select() 獲取到的keys數(shù)量為: 0 處理一些task linkedBlockingQueue 不是空的,包含 [sun.nio.ch.ServerSocketChannelImpl[/0:0:0:0:0:0:0:0:7777]] Thread-1 register server listen In run(), selector.select() 獲取到的keys數(shù)量為: 1 In run(), keys is: [channel=sun.nio.ch.ServerSocketChannelImpl[/0:0:0:0:0:0:0:0:8888], selector=sun.nio.ch.WindowsSelectorImpl@473e262f, interestOps=16, readyOps=16] In run(), loop is: 0 Thread-2::: acceptHandler Begin Thread-2::: acceptHandler Finish 處理一些task In run(), selector.select() 獲取到的keys數(shù)量為: 0 處理一些task linkedBlockingQueue 是空的 linkedBlockingQueue 不是空的,包含 [java.nio.channels.SocketChannel[connected local=/192.168.111.1:8888 remote=/192.168.111.1:7940]] Thread-4 register client: /192.168.111.1:7940 In run(), selector.select() 獲取到的keys數(shù)量為: 1 In run(), keys is: [channel=sun.nio.ch.ServerSocketChannelImpl[/0:0:0:0:0:0:0:0:8888], selector=sun.nio.ch.WindowsSelectorImpl@473e262f, interestOps=16, readyOps=16] In run(), loop is: 0 Thread-2::: acceptHandler Begin Thread-2::: acceptHandler Finish 處理一些task linkedBlockingQueue 是空的 In run(), selector.select() 獲取到的keys數(shù)量為: 0 處理一些task linkedBlockingQueue 不是空的,包含 [java.nio.channels.SocketChannel[connected local=/192.168.111.1:8888 remote=/192.168.111.1:7955]] Thread-5 register client: /192.168.111.1:7955 In run(), selector.select() 獲取到的keys數(shù)量為: 1 In run(), keys is: [channel=java.nio.channels.SocketChannel[connected local=/192.168.111.1:8888 remote=/192.168.111.1:7955], selector=sun.nio.ch.WindowsSelectorImpl@7912fa80, interestOps=1, readyOps=1] In run(), loop is: 0 Thread-5 readHander...... 處理一些task linkedBlockingQueue 是空的 In run(), selector.select() 獲取到的keys數(shù)量為: 1 In run(), keys is: [channel=java.nio.channels.SocketChannel[connected local=/192.168.111.1:8888 remote=/192.168.111.1:7955], selector=sun.nio.ch.WindowsSelectorImpl@7912fa80, interestOps=1, readyOps=1] In run(), loop is: 0 Thread-5 readHander...... 處理一些task linkedBlockingQueue 是空的 In run(), selector.select() 獲取到的keys數(shù)量為: 1 In run(), keys is: [channel=java.nio.channels.SocketChannel[connected local=/192.168.111.1:8888 remote=/192.168.111.1:7940], selector=sun.nio.ch.WindowsSelectorImpl@24d0d04, interestOps=1, readyOps=1] In run(), loop is: 0 Thread-4 readHander...... 處理一些task linkedBlockingQueue 是空的 In run(), selector.select() 獲取到的keys數(shù)量為: 1 In run(), keys is: [channel=sun.nio.ch.ServerSocketChannelImpl[/0:0:0:0:0:0:0:0:8888], selector=sun.nio.ch.WindowsSelectorImpl@473e262f, interestOps=16, readyOps=16] In run(), loop is: 0 Thread-2::: acceptHandler Begin Thread-2::: acceptHandler Finish 處理一些task linkedBlockingQueue 是空的 In run(), selector.select() 獲取到的keys數(shù)量為: 0 處理一些task linkedBlockingQueue 不是空的,包含 [java.nio.channels.SocketChannel[connected local=/192.168.111.1:8888 remote=/192.168.111.1:7961]] Thread-3 register client: /192.168.111.1:7961 In run(), selector.select() 獲取到的keys數(shù)量為: 1 In run(), keys is: [channel=java.nio.channels.SocketChannel[connected local=/192.168.111.1:8888 remote=/192.168.111.1:7961], selector=sun.nio.ch.WindowsSelectorImpl@67ea4302, interestOps=1, readyOps=1] In run(), loop is: 0 Thread-3 readHander...... 處理一些task linkedBlockingQueue 是空的總結(jié)
以上是生活随笔為你收集整理的网络与IO知识扫盲(七):仿照Netty工作架构图,手写多路复用模型的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 多线程与高并发(一):单机高并发应该掌握
- 下一篇: 多线程与高并发(二):解析自旋锁CAS操