并发设计模式
1、Immutability模式:如何利用不變性解決并發問題?
“多個線程同時讀寫同一共享變量存在并發問題”,這里的必要條件之一是讀寫,如果只有讀,而沒有寫,是沒有并發問題的。
解決并發問題,其實最簡單的辦法就是讓共享變量只有讀操作,而沒有寫操作。這個辦法如此重要,以至于被上升到了一種解決并發問題的設計模式:不變性(Immutability)模式。所謂不變性,簡單來講,就是對象一旦被創建之后,狀態就不再發生變化。換句話說,就是變量一旦被賦值,就不允許修改了(沒有寫操作);沒有修改操作,也就是保持了不變性。
(1)快速實現具備不可變性的類
實現一個具備不可變性的類,還是挺簡單的。將一個類所有的屬性都設置成 final 的,并且只允許存在只讀方法,那么這個類基本上就具備不可變性了。更嚴格的做法是這個類本身也是 final 的,也就是不允許繼承。因為子類可以覆蓋父類的方法,有可能改變不可變性,所以推薦你在實際工作中,使用這種更嚴格的做法。
Java SDK 里很多類都具備不可變性,只是由于它們的使用太簡單,最后反而被忽略了。例如經常用到的 String 和 Long、Integer、Double 等基礎類型的包裝類都具備不可變性,這些對象的線程安全性都是靠不可變性來保證的。如果你仔細翻看這些類的聲明、屬性和方法,你會發現它們都嚴格遵守不可變類的三點要求:類和屬性都是 final 的,所有方法均是只讀的。
看到這里你可能會疑惑,Java 的 String 方法也有類似字符替換操作,怎么能說所有方法都是只讀的呢?我們結合 String 的源代碼來解釋一下這個問題,下面的示例代碼源自 Java 1.8 SDK,我略做了修改,僅保留了關鍵屬性 value[]和 replace() 方法,你會發現:String 這個類以及它的屬性 value[]都是 final 的;而 replace() 方法的實現,就的確沒有修改 value[],而是將替換后的字符串作為返回值返回了。
public final class String {private final char value[];// 字符替換String replace(char oldChar, char newChar) {//無需替換,直接返回this if (oldChar == newChar){return this;}int len = value.length;int i = -1;/* avoid getfield opcode */char[] val = value; //定位到需要替換的字符位置while (++i < len) {if (val[i] == oldChar) {break;}}//未找到oldChar,無需替換if (i >= len) {return this;} //創建一個buf[],這是關鍵//用來保存替換后的字符串char buf[] = new char[len];for (int j = 0; j < i; j++) {buf[j] = val[j];}while (i < len) {char c = val[i];buf[i] = (c == oldChar) ? newChar : c;i++;}//創建一個新的字符串返回//原字符串不會發生任何變化return new String(buf, true);} }通過分析 String 的實現,你可能已經發現了,如果具備不可變性的類,需要提供類似修改的功能,具體該怎么操作呢?做法很簡單,那就是創建一個新的不可變對象,這是與可變對象的一個重要區別,可變對象往往是修改自己的屬性。
所有的修改操作都創建一個新的不可變對象,你可能會有這種擔心:是不是創建的對象太多了,有點太浪費內存呢?是的,這樣做的確有些浪費,那如何解決呢?
(2)利用享元模式避免創建重復對象
如果你熟悉面向對象相關的設計模式,相信你一定能想到享元模式(Flyweight Pattern)。利用享元模式可以減少創建對象的數量,從而減少內存占用。Java 語言里面 Long、Integer、Short、Byte 等這些基本數據類型的包裝類都用到了享元模式。
如果你熟悉面向對象相關的設計模式,相信你一定能想到享元模式(Flyweight Pattern)。利用享元模式可以減少創建對象的數量,從而減少內存占用。Java 語言里面 Long、Integer、Short、Byte 等這些基本數據類型的包裝類都用到了享元模式。
下面我們就以 Long 這個類作為例子,看看它是如何利用享元模式來優化對象的創建的。
享元模式本質上其實就是一個對象池,利用享元模式創建對象的邏輯也很簡單:創建之前,首先去對象池里看看是不是存在;如果已經存在,就利用對象池里的對象;如果不存在,就會新創建一個對象,并且把這個新創建出來的對象放進對象池里。
Long 這個類并沒有照搬享元模式,Long 內部維護了一個靜態的對象池,僅緩存了[-128,127]之間的數字,這個對象池在 JVM 啟動的時候就創建好了,而且這個對象池一直都不會變化,也就是說它是靜態的。之所以采用這樣的設計,是因為 Long 這個對象的狀態共有 264 種,實在太多,不宜全部緩存,而[-128,127]之間的數字利用率最高。下面的示例代碼出自 Java 1.8,valueOf() 方法就用到了 LongCache 這個緩存,你可以結合著來加深理解。
Long valueOf(long l) {final int offset = 128;// [-128,127]直接的數字做了緩存if (l >= -128 && l <= 127) { return LongCache.cache[(int)l + offset];}return new Long(l); } //緩存,等價于對象池 //僅緩存[-128,127]直接的數字 static class LongCache {static final Long cache[] = new Long[-(-128) + 127 + 1];static {for(int i=0; i<cache.length; i++)cache[i] = new Long(i-128);} }“Integer 和 String 類型的對象不適合做鎖”,其實基本上所有的基礎類型的包裝類都不適合做鎖,因為它們內部用到了享元模式,這會導致看上去私有的鎖,其實是共有的。例如在下面代碼中,本意是 A 用鎖 al,B 用鎖 bl,各自管理各自的,互不影響。但實際上 al 和 bl 是一個對象,結果 A 和 B 共用的是一把鎖。
class A {Long al=Long.valueOf(1);public void setAX(){synchronized (al) {//省略代碼無數}} } class B {Long bl=Long.valueOf(1);public void setBY(){synchronized (bl) {//省略代碼無數}} }(2)使用 Immutability 模式的注意事項
在使用 Immutability 模式的時候,需要注意以下兩點:
① 對象的所有屬性都是 final 的,并不能保證不可變性;
② 不可變對象也需要正確發布。
在 Java 語言中,final 修飾的屬性一旦被賦值,就不可以再修改,但是如果屬性的類型是普通對象,那么這個普通對象的屬性是可以被修改的。例如下面的代碼中,Bar 的屬性 foo 雖然是 final 的,依然可以通過 setAge() 方法來設置 foo 的屬性 age。所以,在使用 Immutability 模式的時候一定要確認保持不變性的邊界在哪里,是否要求屬性對象也具備不可變性。
class Foo{int age=0;int name="abc"; } final class Bar {final Foo foo;void setAge(int a){foo.age=a;} }下面我們再看看如何正確地發布不可變對象。不可變對象雖然是線程安全的,但是并不意味著引用這些不可變對象的對象就是線程安全的。例如在下面的代碼中,Foo 具備不可變性,線程安全,但是類 Bar 并不是線程安全的,類 Bar 中持有對 Foo 的引用 foo,對 foo 這個引用的修改在多線程中并不能保證可見性和原子性。
//Foo線程安全 final class Foo{final int age=0;final int name="abc"; } //Bar線程不安全 class Bar {Foo foo;void setFoo(Foo f){this.foo=f;} }如果你的程序僅僅需要 foo 保持可見性,無需保證原子性,那么可以將 foo 聲明為 volatile 變量,這樣就能保證可見性。如果你的程序需要保證原子性,那么可以通過原子類來實現。下面的示例代碼是合理庫存的原子化實現,你應該很熟悉了,其中就是用原子類解決了不可變對象引用的原子性問題。
public class SafeWM {class WMRange{final int upper;final int lower;WMRange(int upper,int lower){//省略構造函數實現}}final AtomicReference<WMRange>rf = new AtomicReference<>(new WMRange(0,0));// 設置庫存上限void setUpper(int v){while(true){WMRange or = rf.get();// 檢查參數合法性if(v < or.lower){throw new IllegalArgumentException();}WMRange nr = newWMRange(v, or.lower);if(rf.compareAndSet(or, nr)){return;}}} }利用 Immutability 模式解決并發問題,也許你覺得有點陌生,其實你天天都在享受它的戰果。Java 語言里面的 String 和 Long、Integer、Double 等基礎類型的包裝類都具備不可變性,這些對象的線程安全性都是靠不可變性來保證的。Immutability 模式是最簡單的解決并發問題的方法,建議當你試圖解決一個并發問題時,可以首先嘗試一下 Immutability 模式,看是否能夠快速解決。
具備不變性的對象,只有一種狀態,這個狀態由對象內部所有的不變屬性共同決定。其實還有一種更簡單的不變性對象,那就是無狀態。無狀態對象內部沒有屬性,只有方法。除了無狀態的對象,你可能還聽說過無狀態的服務、無狀態的協議等等。無狀態有很多好處,最核心的一點就是性能。在多線程領域,無狀態對象沒有線程安全問題,無需同步處理,自然性能很好;在分布式領域,無狀態意味著可以無限地水平擴展,所以分布式領域里面性能的瓶頸一定不是出在無狀態的服務節點上。
2、Copy-on-Write模式:不是延時策略的COW
Java 里 String 這個類在實現 replace() 方法的時候,并沒有更改原字符串里面 value[]數組的內容,而是創建了一個新字符串,這種方法在解決不可變對象的修改問題時經常用到。如果你深入地思考這個方法,你會發現它本質上是一種 Copy-on-Write 方法。所謂 Copy-on-Write,經常被縮寫為 COW 或者 CoW,顧名思義就是寫時復制。
不可變對象的寫操作往往都是使用 Copy-on-Write 方法解決的,當然 Copy-on-Write 的應用領域并不局限于 Immutability 模式。下面我們先簡單介紹一下 Copy-on-Write 的應用領域,讓你對它有個更全面的認識。
(1)Copy-on-Write 模式的應用領域
CopyOnWriteArrayList 和 CopyOnWriteArraySet 這兩個 Copy-on-Write 容器,它們背后的設計思想就是 Copy-on-Write;通過 Copy-on-Write 這兩個容器實現的讀操作是無鎖的,由于無鎖,所以將讀操作的性能發揮到了極致。
除了 Java 這個領域,Copy-on-Write 在操作系統領域也有廣泛的應用。
我第一次接觸 Copy-on-Write 其實就是在操作系統領域。類 Unix 的操作系統中創建進程的 API 是 fork(),傳統的 fork() 函數會創建父進程的一個完整副本,例如父進程的地址空間現在用到了 1G 的內存,那么 fork() 子進程的時候要復制父進程整個進程的地址空間(占有 1G 內存)給子進程,這個過程是很耗時的。而 Linux 中的 fork() 函數就聰明得多了,fork() 子進程的時候,并不復制整個進程的地址空間,而是讓父子進程共享同一個地址空間;只用在父進程或者子進程需要寫入的時候才會復制地址空間,從而使父子進程擁有各自的地址空間。
本質上來講,父子進程的地址空間以及數據都是要隔離的,使用 Copy-on-Write 更多地體現的是一種延時策略,只有在真正需要復制的時候才復制,而不是提前復制好,同時 Copy-on-Write 還支持按需復制,所以 Copy-on-Write 在操作系統領域是能夠提升性能的。相比較而言,Java 提供的 Copy-on-Write 容器,由于在修改的同時會復制整個容器,所以在提升讀操作性能的同時,是以內存復制為代價的。這里你會發現,同樣是應用 Copy-on-Write,不同的場景,對性能的影響是不同的。
在操作系統領域,除了創建進程用到了 Copy-on-Write,很多文件系統也同樣用到了,例如 Btrfs (B-Tree File System)、aufs(advanced multi-layered unification filesystem)等。
除了上面我們說的 Java 領域、操作系統領域,很多其他領域也都能看到 Copy-on-Write 的身影:Docker 容器鏡像的設計是 Copy-on-Write,甚至分布式源碼管理系統 Git 背后的設計思想都有 Copy-on-Write……
不過,Copy-on-Write 最大的應用領域還是在函數式編程領域。函數式編程的基礎是不可變性(Immutability),所以函數式編程里面所有的修改操作都需要 Copy-on-Write 來解決。你或許會有疑問,“所有數據的修改都需要復制一份,性能是不是會成為瓶頸呢?”你的擔憂是有道理的,之所以函數式編程早年間沒有興起,性能絕對拖了后腿。但是隨著硬件性能的提升,性能問題已經慢慢變得可以接受了。而且,Copy-on-Write 也遠不像 Java 里的 CopyOnWriteArrayList 那樣笨:整個數組都復制一遍。Copy-on-Write 也是可以按需復制的,如果你感興趣可以參考Purely Functional Data Structures這本書,里面描述了各種具備不變性的數據結構的實現。
CopyOnWriteArrayList 和 CopyOnWriteArraySet 這兩個 Copy-on-Write 容器在修改的時候會復制整個數組,所以如果容器經常被修改或者這個數組本身就非常大的時候,是不建議使用的。反之,如果是修改非常少、數組數量也不大,并且對讀性能要求苛刻的場景,使用 Copy-on-Write 容器效果就非常好了。下面我們結合一個真實的案例來講解一下。
(2)案例
一個 RPC 框架,有點類似 Dubbo,服務提供方是多實例分布式部署的,所以服務的客戶端在調用 RPC 的時候,會選定一個服務實例來調用,這個選定的過程本質上就是在做負載均衡,而做負載均衡的前提是客戶端要有全部的路由信息。例如在下圖中,A 服務的提供方有 3 個實例,分別是 192.168.1.1、192.168.1.2 和 192.168.1.3,客戶端在調用目標服務 A 前,首先需要做的是負載均衡,也就是從這 3 個實例中選出 1 個來,然后再通過 RPC 把請求發送選中的目標實例。
RPC 框架的一個核心任務就是維護服務的路由關系,我們可以把服務的路由關系簡化成下圖所示的路由表。當服務提供方上線或者下線的時候,就需要更新客戶端的這張路由表。
我們首先來分析一下如何用程序來實現。每次 RPC 調用都需要通過負載均衡器來計算目標服務的 IP 和端口號,而負載均衡器需要通過路由表獲取接口的所有路由信息,也就是說,每次 RPC 調用都需要訪問路由表,所以訪問路由表這個操作的性能要求是很高的。不過路由表對數據的一致性要求并不高,一個服務提供方從上線到反饋到客戶端的路由表里,即便有 5 秒鐘,很多時候也都是能接受的(5 秒鐘,對于以納秒作為時鐘周期的 CPU 來說,那何止是一萬年,所以路由表對一致性的要求并不高)。而且路由表是典型的讀多寫少類問題,寫操作的量相比于讀操作,可謂是滄海一粟,少得可憐。
通過以上分析,你會發現一些關鍵詞:對讀的性能要求很高,讀多寫少,弱一致性。它們綜合在一起,你會想到什么呢?CopyOnWriteArrayList 和 CopyOnWriteArraySet 天生就適用這種場景啊。所以下面的示例代碼中,RouteTable 這個類內部我們通過ConcurrentHashMap<string, copyonwritearrayset>這個數據結構來描述路由表,ConcurrentHashMap 的 Key 是接口名,Value 是路由集合,這個路由集合我們用是 CopyOnWriteArraySet。
下面我們再來思考 Router 該如何設計,服務提供方的每一次上線、下線都會更新路由信息,這時候你有兩種選擇。一種是通過更新 Router 的一個狀態位來標識,如果這樣做,那么所有訪問該狀態位的地方都需要同步訪問,這樣很影響性能。另外一種就是采用 Immutability 模式,每次上線、下線都創建新的 Router 對象或者刪除對應的 Router 對象。由于上線、下線的頻率很低,所以后者是最好的選擇。
Router 的實現代碼如下所示,是一種典型 Immutability 模式的實現,需要你注意的是我們重寫了 equals 方法,這樣 CopyOnWriteArraySet 的 add() 和 remove() 方法才能正常工作。
//路由信息 public final class Router{private final String ip;private final Integer port;private final String iface;//構造函數public Router(String ip, Integer port, String iface){this.ip = ip;this.port = port;this.iface = iface;}//重寫equals方法public boolean equals(Object obj){if (obj instanceof Router) {Router r = (Router)obj;return iface.equals(r.iface) &&ip.equals(r.ip) &&port.equals(r.port);}return false;}public int hashCode() {//省略hashCode相關代碼} } //路由表信息 public class RouterTable {//Key:接口名//Value:路由集合ConcurrentHashMap<String, CopyOnWriteArraySet<Router>> rt = new ConcurrentHashMap<>();//根據接口名獲取路由表public Set<Router> get(String iface){return rt.get(iface);}//刪除路由public void remove(Router router) {Set<Router> set=rt.get(router.iface);if (set != null) {set.remove(router);}}//增加路由public void add(Router router) {Set<Router> set = rt.computeIfAbsent(route.iface, r -> new CopyOnWriteArraySet<>());set.add(router);} }目前 Copy-on-Write 在 Java 并發編程領域知名度不是很高,很多人都在無意中把它忽視了,但其實 Copy-on-Write 才是最簡單的并發解決方案。它是如此簡單,以至于 Java 中的基本數據類型 String、Integer、Long 等都是基于 Copy-on-Write 方案實現的。
Copy-on-Write 是一項非常通用的技術方案,在很多領域都有著廣泛的應用。不過,它也有缺點的,那就是消耗內存,每次修改都需要復制一個新的對象出來,好在隨著自動垃圾回收(GC)算法的成熟以及硬件的發展,這種內存消耗已經漸漸可以接受了。所以在實際工作中,如果寫操作非常少,那你就可以嘗試用一下 Copy-on-Write,效果還是不錯的。
3、線程本地存儲模式:沒有共享,就沒有傷害
解決并發問題的一個重要方法:避免共享。
我們曾經一遍一遍又一遍地重復,多個線程同時讀寫同一共享變量存在并發問題。前面兩篇文章我們突破的是寫,沒有寫操作自然沒有并發問題了。其實還可以突破共享變量,沒有共享變量也不會有并發問題,正所謂是沒有共享,就沒有傷害。
那如何避免共享呢?思路其實很簡單,多個人爭一個球總容易出矛盾,那就每個人發一個球。對應到并發編程領域,就是每個線程都擁有自己的變量,彼此之間不共享,也就沒有并發問題了。
線程封閉,其本質上就是避免共享。你已經知道通過局部變量可以做到避免共享,那還有沒有其他方法可以做到呢?有的,Java 語言提供的線程本地存儲(ThreadLocal)就能夠做到。下面我們先看看 ThreadLocal 到底該如何使用。
(1)ThreadLocal 的使用方法
下面這個靜態類 ThreadId 會為每個線程分配一個唯一的線程 Id,如果一個線程前后兩次調用 ThreadId 的 get() 方法,兩次 get() 方法的返回值是相同的。但如果是兩個線程分別調用 ThreadId 的 get() 方法,那么兩個線程看到的 get() 方法的返回值是不同的。若你是初次接觸 ThreadLocal,可能會覺得奇怪,為什么相同線程調用 get() 方法結果就相同,而不同線程調用 get() 方法結果就不同呢?
static class ThreadId {static final AtomicLong nextId=new AtomicLong(0);//定義ThreadLocal變量static final ThreadLocal<Long> tl=ThreadLocal.withInitial(()->nextId.getAndIncrement());//此方法會為每個線程分配一個唯一的Idstatic long get(){return tl.get();} }能有這個奇怪的結果,都是 ThreadLocal 的杰作,不過在詳細解釋 ThreadLocal 的工作原理之前,我們再看一個實際工作中可能遇到的例子來加深一下對 ThreadLocal 的理解。你可能知道 SimpleDateFormat 不是線程安全的,那如果需要在并發場景下使用它,你該怎么辦呢?
其實有一個辦法就是用 ThreadLocal 來解決,下面的示例代碼就是 ThreadLocal 解決方案的具體實現,這段代碼與前面 ThreadId 的代碼高度相似,同樣地,不同線程調用 SafeDateFormat 的 get() 方法將返回不同的 SimpleDateFormat 對象實例,由于不同線程并不共享 SimpleDateFormat,所以就像局部變量一樣,是線程安全的。
static class SafeDateFormat {//定義ThreadLocal變量static final ThreadLocal<DateFormat>tl=ThreadLocal.withInitial(()-> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));static DateFormat get(){return tl.get();} } //不同線程執行下面代碼 //返回的df是不同的 DateFormat df =SafeDateFormat.get();通過上面兩個例子,相信你對 ThreadLocal 的用法以及應用場景都了解了,下面我們就來詳細解釋 ThreadLocal 的工作原理。
(2)ThreadLocal 的工作原理
在解釋 ThreadLocal 的工作原理之前, 你先自己想想:如果讓你來實現 ThreadLocal 的功能,你會怎么設計呢?ThreadLocal 的目標是讓不同的線程有不同的變量 V,那最直接的方法就是創建一個 Map,它的 Key 是線程,Value 是每個線程擁有的變量 V,ThreadLocal 內部持有這樣的一個 Map 就可以了。你可以參考下面的示意圖和示例代碼來理解。
class MyThreadLocal<T> {Map<Thread, T> locals = new ConcurrentHashMap<>();//獲取線程變量 T get() {return locals.get(Thread.currentThread());}//設置線程變量void set(T t) {locals.put(Thread.currentThread(), t);} }那 Java 的 ThreadLocal 是這么實現的嗎?這一次我們的設計思路和 Java 的實現差異很大。Java 的實現里面也有一個 Map,叫做 ThreadLocalMap,不過持有 ThreadLocalMap 的不是 ThreadLocal,而是 Thread。Thread 這個類內部有一個私有屬性 threadLocals,其類型就是 ThreadLocalMap,ThreadLocalMap 的 Key 是 ThreadLocal。你可以結合下面的示意圖和精簡之后的 Java 實現代碼來理解。
class Thread {//內部持有ThreadLocalMapThreadLocal.ThreadLocalMap threadLocals; } class ThreadLocal<T>{public T get() {//首先獲取線程持有的//ThreadLocalMapThreadLocalMap map =Thread.currentThread().threadLocals;//在ThreadLocalMap中//查找變量Entry e = map.getEntry(this);return e.value; }static class ThreadLocalMap{//內部是數組而不是MapEntry[] table;//根據ThreadLocal查找EntryEntry getEntry(ThreadLocal key){//省略查找邏輯}//Entry定義static class Entry extendsWeakReference<ThreadLocal>{Object value;}} }初看上去,我們的設計方案和 Java 的實現僅僅是 Map 的持有方不同而已,我們的設計里面 Map 屬于 ThreadLocal,而 Java 的實現里面 ThreadLocalMap 則是屬于 Thread。這兩種方式哪種更合理呢?很顯然 Java 的實現更合理一些。在 Java 的實現方案里面,ThreadLocal 僅僅是一個代理工具類,內部并不持有任何與線程相關的數據,所有和線程相關的數據都存儲在 Thread 里面,這樣的設計容易理解。而從數據的親緣性上來講,ThreadLocalMap 屬于 Thread 也更加合理。
當然還有一個更加深層次的原因,那就是不容易產生內存泄露。在我們的設計方案中,ThreadLocal 持有的 Map 會持有 Thread 對象的引用,這就意味著,只要 ThreadLocal 對象存在,那么 Map 中的 Thread 對象就永遠不會被回收。ThreadLocal 的生命周期往往都比線程要長,所以這種設計方案很容易導致內存泄露。而 Java 的實現中 Thread 持有 ThreadLocalMap,而且 ThreadLocalMap 里對 ThreadLocal 的引用還是弱引用(WeakReference),所以只要 Thread 對象可以被回收,那么 ThreadLocalMap 就能被回收。Java 的這種實現方案雖然看上去復雜一些,但是更加安全。
Java 的 ThreadLocal 實現應該稱得上深思熟慮了,不過即便如此深思熟慮,還是不能百分百地讓程序員避免內存泄露,例如在線程池中使用 ThreadLocal,如果不謹慎就可能導致內存泄露。
(3)ThreadLocal 與內存泄露
在線程池中使用 ThreadLocal 為什么可能導致內存泄露呢?原因就出在線程池中線程的存活時間太長,往往都是和程序同生共死的,這就意味著 Thread 持有的 ThreadLocalMap 一直都不會被回收,再加上 ThreadLocalMap 中的 Entry 對 ThreadLocal 是弱引用(WeakReference),所以只要 ThreadLocal 結束了自己的生命周期是可以被回收掉的。但是 Entry 中的 Value 卻是被 Entry 強引用的,所以即便 Value 的生命周期結束了,Value 也是無法被回收的,從而導致內存泄露。
那在線程池中,我們該如何正確使用 ThreadLocal 呢?其實很簡單,既然 JVM 不能做到自動釋放對 Value 的強引用,那我們手動釋放就可以了。如何能做到手動釋放呢?估計你馬上想到 try{}finally{}方案了,這個簡直就是手動釋放資源的利器。示例的代碼如下,你可以參考學習。
ExecutorService es; ThreadLocal tl; es.execute(()->{//ThreadLocal增加變量tl.set(obj);try {// 省略業務邏輯代碼}finally {//手動清理ThreadLocal tl.remove();} });(4)InheritableThreadLocal 與繼承性
通過 ThreadLocal 創建的線程變量,其子線程是無法繼承的。也就是說你在線程中通過 ThreadLocal 創建了線程變量 V,而后該線程創建了子線程,你在子線程中是無法通過 ThreadLocal 來訪問父線程的線程變量 V 的。
如果你需要子線程繼承父線程的線程變量,那該怎么辦呢?其實很簡單,Java 提供了 InheritableThreadLocal 來支持這種特性,InheritableThreadLocal 是 ThreadLocal 子類,所以用法和 ThreadLocal 相同,這里就不多介紹了。
不過,我完全不建議你在線程池中使用 InheritableThreadLocal,不僅僅是因為它具有 ThreadLocal 相同的缺點——可能導致內存泄露,更重要的原因是:線程池中線程的創建是動態的,很容易導致繼承關系錯亂,如果你的業務邏輯依賴 InheritableThreadLocal,那么很可能導致業務邏輯計算錯誤,而這個錯誤往往比內存泄露更要命。
線程本地存儲模式本質上是一種避免共享的方案,由于沒有共享,所以自然也就沒有并發問題。如果你需要在并發場景中使用一個線程不安全的工具類,最簡單的方案就是避免共享。避免共享有兩種方案,一種方案是將這個工具類作為局部變量使用,另外一種方案就是線程本地存儲模式。這兩種方案,局部變量方案的缺點是在高并發場景下會頻繁創建對象,而線程本地存儲方案,每個線程只需要創建一個工具類的實例,所以不存在頻繁創建對象的問題。
線程本地存儲模式是解決并發問題的常用方案,所以 Java SDK 也提供了相應的實現:ThreadLocal。通過上面我們的分析,你應該能體會到 Java SDK 的實現已經是深思熟慮了,不過即便如此,仍不能盡善盡美,例如在線程池中使用 ThreadLocal 仍可能導致內存泄漏,所以使用 ThreadLocal 還是需要你打起精神,足夠謹慎。
4、Guarded Suspension模式:等待喚醒機制的規范實現
前不久,同事小灰工作中遇到一個問題,他開發了一個 Web 項目:Web 版的文件瀏覽器,通過它用戶可以在瀏覽器里查看服務器上的目錄和文件。這個項目依賴運維部門提供的文件瀏覽服務,而這個文件瀏覽服務只支持消息隊列(MQ)方式接入。消息隊列在互聯網大廠中用的非常多,主要用作流量削峰和系統解耦。在這種接入方式中,發送消息和消費結果這兩個操作之間是異步的,你可以參考下面的示意圖來理解。
在小灰的這個 Web 項目中,用戶通過瀏覽器發過來一個請求,會被轉換成一個異步消息發送給 MQ,等 MQ 返回結果后,再將這個結果返回至瀏覽器。小灰同學的問題是:給 MQ 發送消息的線程是處理 Web 請求的線程 T1,但消費 MQ 結果的線程并不是線程 T1,那線程 T1 如何等待 MQ 的返回結果呢?為了便于你理解這個場景,我將其代碼化了,示例代碼如下。
class Message{String id;String content; } //該方法可以發送消息 void send(Message msg){//省略相關代碼 } //MQ消息返回后會調用該方法 //該方法的執行線程不同于 //發送消息的線程 void onMessage(Message msg){//省略相關代碼 } //處理瀏覽器發來的請求 Respond handleWebReq(){//創建一消息Message msg1 = new Message("1","{...}");//發送消息send(msg1);//如何等待MQ返回的消息呢?String result = ...; }異步轉同步問題嗎?仔細分析,的確是這樣,不過在那一篇文章中我們只是介紹了最終方案,讓你知其然,但是并沒有介紹這個方案是如何設計出來的,今天咱們再仔細聊聊這個問題,讓你知其所以然,遇到類似問題也能自己設計出方案來。
(1)Guarded Suspension 模式
上面小灰遇到的問題,在現實世界里比比皆是,只是我們一不小心就忽略了。比如,項目組團建要外出聚餐,我們提前預訂了一個包間,然后興沖沖地奔過去,到那兒后大堂經理看了一眼包間,發現服務員正在收拾,就會告訴我們:“您預訂的包間服務員正在收拾,請您稍等片刻。”過了一會,大堂經理發現包間已經收拾完了,于是馬上帶我們去包間就餐。
我們等待包間收拾完的這個過程和小灰遇到的等待 MQ 返回消息本質上是一樣的,都是等待一個條件滿足:就餐需要等待包間收拾完,小灰的程序里要等待 MQ 返回消息。
那我們來看看現實世界里是如何解決這類問題的呢?現實世界里大堂經理這個角色很重要,我們是否等待,完全是由他來協調的。通過類比,相信你也一定有思路了:我們的程序里,也需要這樣一個大堂經理。的確是這樣,那程序世界里的大堂經理該如何設計呢?其實設計方案前人早就搞定了,而且還將其總結成了一個設計模式:Guarded Suspension。所謂 Guarded Suspension,直譯過來就是“保護性地暫停”。那下面我們就來看看,Guarded Suspension 模式是如何模擬大堂經理進行保護性地暫停的。
下圖就是 Guarded Suspension 模式的結構圖,非常簡單,一個對象 GuardedObject,內部有一個成員變量——受保護的對象,以及兩個成員方法——get(Predicate p)和onChanged(T obj)方法。其中,對象 GuardedObject 就是我們前面提到的大堂經理,受保護對象就是餐廳里面的包間;受保護對象的 get() 方法對應的是我們的就餐,就餐的前提條件是包間已經收拾好了,參數 p 就是用來描述這個前提條件的;受保護對象的 onChanged() 方法對應的是服務員把包間收拾好了,通過 onChanged() 方法可以 fire 一個事件,而這個事件往往能改變前提條件 p 的計算結果。下圖中,左側的綠色線程就是需要就餐的顧客,而右側的藍色線程就是收拾包間的服務員。
GuardedObject 的內部實現非常簡單,是管程的一個經典用法,你可以參考下面的示例代碼,核心是:get() 方法通過條件變量的 await() 方法實現等待,onChanged() 方法通過條件變量的 signalAll() 方法實現喚醒功能。邏輯還是很簡單的,所以這里就不再詳細介紹了。
class GuardedObject<T>{//受保護的對象T obj;final Lock lock = new ReentrantLock();final Condition done =lock.newCondition();final int timeout=1;//獲取受保護對象 T get(Predicate<T> p) {lock.lock();try {//MESA管程推薦寫法while(!p.test(obj)){done.await(timeout, TimeUnit.SECONDS);}}catch(InterruptedException e){throw new RuntimeException(e);}finally{lock.unlock();}//返回非空的受保護對象return obj;}//事件通知方法void onChanged(T obj) {lock.lock();try {this.obj = obj;done.signalAll();} finally {lock.unlock();}} }(2)擴展 Guarded Suspension 模式
上面我們介紹了 Guarded Suspension 模式及其實現,這個模式能夠模擬現實世界里大堂經理的角色,那現在我們再來看看這個“大堂經理”能否解決小灰同學遇到的問題。
Guarded Suspension 模式里 GuardedObject 有兩個核心方法,一個是 get() 方法,一個是 onChanged() 方法。很顯然,在處理 Web 請求的方法 handleWebReq() 中,可以調用 GuardedObject 的 get() 方法來實現等待;在 MQ 消息的消費方法 onMessage() 中,可以調用 GuardedObject 的 onChanged() 方法來實現喚醒。
//處理瀏覽器發來的請求 Respond handleWebReq(){//創建一消息Message msg1 = new Message("1","{...}");//發送消息send(msg1);//利用GuardedObject實現等待GuardedObject<Message> go=new GuardObjec<>();Message r = go.get(t->t != null); } void onMessage(Message msg){//如何找到匹配的go?GuardedObject<Message> go=???go.onChanged(msg); }但是在實現的時候會遇到一個問題,handleWebReq() 里面創建了 GuardedObject 對象的實例 go,并調用其 get() 方等待結果,那在 onMessage() 方法中,如何才能夠找到匹配的 GuardedObject 對象呢?這個過程類似服務員告訴大堂經理某某包間已經收拾好了,大堂經理如何根據包間找到就餐的人。現實世界里,大堂經理的頭腦中,有包間和就餐人之間的關系圖,所以服務員說完之后大堂經理立刻就能把就餐人找出來。
我們可以參考大堂經理識別就餐人的辦法,來擴展一下 Guarded Suspension 模式,從而使它能夠很方便地解決小灰同學的問題。在小灰的程序中,每個發送到 MQ 的消息,都有一個唯一性的屬性 id,所以我們可以維護一個 MQ 消息 id 和 GuardedObject 對象實例的關系,這個關系可以類比大堂經理大腦里維護的包間和就餐人的關系。
有了這個關系,我們來看看具體如何實現。下面的示例代碼是擴展 Guarded Suspension 模式的實現,擴展后的 GuardedObject 內部維護了一個 Map,其 Key 是 MQ 消息 id,而 Value 是 GuardedObject 對象實例,同時增加了靜態方法 create() 和 fireEvent();create() 方法用來創建一個 GuardedObject 對象實例,并根據 key 值將其加入到 Map 中,而 fireEvent() 方法則是模擬的大堂經理根據包間找就餐人的邏輯。
class GuardedObject<T>{//受保護的對象T obj;final Lock lock = new ReentrantLock();final Condition done =lock.newCondition();final int timeout=2;//保存所有GuardedObjectfinal static Map<Object, GuardedObject> gos=new ConcurrentHashMap<>();//靜態方法創建GuardedObjectstatic <K> GuardedObject create(K key){GuardedObject go=new GuardedObject();gos.put(key, go);return go;}static <K, T> void fireEvent(K key, T obj){GuardedObject go=gos.remove(key);if (go != null){go.onChanged(obj);}}//獲取受保護對象 T get(Predicate<T> p) {lock.lock();try {//MESA管程推薦寫法while(!p.test(obj)){done.await(timeout, TimeUnit.SECONDS);}}catch(InterruptedException e){throw new RuntimeException(e);}finally{lock.unlock();}//返回非空的受保護對象return obj;}//事件通知方法void onChanged(T obj) {lock.lock();try {this.obj = obj;done.signalAll();} finally {lock.unlock();}} }這樣利用擴展后的 GuardedObject 來解決小灰同學的問題就很簡單了,具體代碼如下所示。
//處理瀏覽器發來的請求 Respond handleWebReq(){int id=序號生成器.get();//創建一消息Message msg1 = new Message(id,"{...}");//創建GuardedObject實例GuardedObject<Message> go=GuardedObject.create(id); //發送消息send(msg1);//等待MQ消息Message r = go.get(t->t != null); } void onMessage(Message msg){//喚醒等待的線程GuardedObject.fireEvent(msg.id, msg); }Guarded Suspension 模式本質上是一種等待喚醒機制的實現,只不過 Guarded Suspension 模式將其規范化了。規范化的好處是你無需重頭思考如何實現,也無需擔心實現程序的可理解性問題,同時也能避免一不小心寫出個 Bug 來。但 Guarded Suspension 模式在解決實際問題的時候,往往還是需要擴展的,擴展的方式有很多,本篇文章就直接對 GuardedObject 的功能進行了增強,Dubbo 中 DefaultFuture 這個類也是采用的這種方式,你可以對比著來看,相信對 DefaultFuture 的實現原理會理解得更透徹。當然,你也可以創建新的類來實現對 Guarded Suspension 模式的擴展。
Guarded Suspension 模式也常被稱作 Guarded Wait 模式、Spin Lock 模式(因為使用了 while 循環去等待),這些名字都很形象,不過它還有一個更形象的非官方名字:多線程版本的 if。單線程場景中,if 語句是不需要等待的,因為在只有一個線程的條件下,如果這個線程被阻塞,那就沒有其他活動線程了,這意味著 if 判斷條件的結果也不會發生變化了。但是多線程場景中,等待就變得有意義了,這種場景下,if 判斷條件的結果是可能發生變化的。所以,用“多線程版本的 if”來理解這個模式會更簡單。
5、Balking模式:再談線程安全的單例模式
“多線程版本的 if”來理解 Guarded Suspension 模式,不同于單線程中的 if,這個“多線程版本的 if”是需要等待的,而且還很執著,必須要等到條件為真。但很顯然這個世界,不是所有場景都需要這么執著,有時候我們還需要快速放棄。
需要快速放棄的一個最常見的例子是各種編輯器提供的自動保存功能。自動保存功能的實現邏輯一般都是隔一定時間自動執行存盤操作,存盤操作的前提是文件做過修改,如果文件沒有執行過修改操作,就需要快速放棄存盤操作。下面的示例代碼將自動保存功能代碼化了,很顯然 AutoSaveEditor 這個類不是線程安全的,因為對共享變量 changed 的讀寫沒有使用同步,那如何保證 AutoSaveEditor 的線程安全性呢?
class AutoSaveEditor{//文件是否被修改過boolean changed=false;//定時任務線程池ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();//定時執行自動保存void startAutoSave(){ses.scheduleWithFixedDelay(()->{autoSave();}, 5, 5, TimeUnit.SECONDS); }//自動存盤操作void autoSave(){if (!changed) {return;}changed = false;//執行存盤操作//省略且實現this.execSave();}//編輯操作void edit(){//省略編輯邏輯......changed = true;} }解決這個問題相信你一定手到擒來了:讀寫共享變量 changed 的方法 autoSave() 和 edit() 都加互斥鎖就可以了。這樣做雖然簡單,但是性能很差,原因是鎖的范圍太大了。那我們可以將鎖的范圍縮小,只在讀寫共享變量 changed 的地方加鎖,實現代碼如下所示。
//自動存盤操作 void autoSave(){synchronized(this){if (!changed) {return;}changed = false;}//執行存盤操作//省略且實現this.execSave(); } //編輯操作 void edit(){//省略編輯邏輯......synchronized(this){changed = true;} }如果你深入地分析一下這個示例程序,你會發現,示例中的共享變量是一個狀態變量,業務邏輯依賴于這個狀態變量的狀態:當狀態滿足某個條件時,執行某個業務邏輯,其本質其實不過就是一個 if 而已,放到多線程場景里,就是一種“多線程版本的 if”。這種“多線程版本的 if”的應用場景還是很多的,所以也有人把它總結成了一種設計模式,叫做 Balking 模式。
(1)Balking 模式的經典實現
Balking 模式本質上是一種規范化地解決“多線程版本的 if”的方案,對于上面自動保存的例子,使用 Balking 模式規范化之后的寫法如下所示,你會發現僅僅是將 edit() 方法中對共享變量 changed 的賦值操作抽取到了 change() 中,這樣的好處是將并發處理邏輯和業務邏輯分開。
boolean changed=false; //自動存盤操作 void autoSave(){synchronized(this){if (!changed) {return;}changed = false;}//執行存盤操作//省略且實現this.execSave(); } //編輯操作 void edit(){//省略編輯邏輯......change(); } //改變狀態 void change(){synchronized(this){changed = true;} }(2)用 volatile 實現 Balking 模式
前面我們用 synchronized 實現了 Balking 模式,這種實現方式最為穩妥,建議你實際工作中也使用這個方案。不過在某些特定場景下,也可以使用 volatile 來實現,但使用 volatile 的前提是對原子性沒有要求。
有一個 RPC 框架路由表的案例,在 RPC 框架中,本地路由表是要和注冊中心進行信息同步的,應用啟動的時候,會將應用依賴服務的路由表從注冊中心同步到本地路由表中,如果應用重啟的時候注冊中心宕機,那么會導致該應用依賴的服務均不可用,因為找不到依賴服務的路由表。為了防止這種極端情況出現,RPC 框架可以將本地路由表自動保存到本地文件中,如果重啟的時候注冊中心宕機,那么就從本地文件中恢復重啟前的路由表。這其實也是一種降級的方案。
自動保存路由表和前面介紹的編輯器自動保存原理是一樣的,也可以用 Balking 模式實現,不過我們這里采用 volatile 來實現,實現的代碼如下所示。之所以可以采用 volatile 來實現,是因為對共享變量 changed 和 rt 的寫操作不存在原子性的要求,而且采用 scheduleWithFixedDelay() 這種調度方式能保證同一時刻只有一個線程執行 autoSave() 方法。
//路由表信息 public class RouterTable {//Key:接口名//Value:路由集合ConcurrentHashMap<String, CopyOnWriteArraySet<Router>> rt = new ConcurrentHashMap<>(); //路由表是否發生變化volatile boolean changed;//將路由表寫入本地文件的線程池ScheduledExecutorService ses=Executors.newSingleThreadScheduledExecutor();//啟動定時任務//將變更后的路由表寫入本地文件public void startLocalSaver(){ses.scheduleWithFixedDelay(()->{autoSave();}, 1, 1, MINUTES);}//保存路由表到本地文件void autoSave() {if (!changed) {return;}changed = false;//將路由表寫入本地文件//省略其方法實現this.save2Local();}//刪除路由public void remove(Router router) {Set<Router> set=rt.get(router.iface);if (set != null) {set.remove(router);//路由表已發生變化changed = true;}}//增加路由public void add(Router router) {Set<Router> set = rt.computeIfAbsent(route.iface, r -> new CopyOnWriteArraySet<>());set.add(router);//路由表已發生變化changed = true;} }Balking 模式有一個非常典型的應用場景就是單次初始化,下面的示例代碼是它的實現。這個實現方案中,我們將 init() 聲明為一個同步方法,這樣同一個時刻就只有一個線程能夠執行 init() 方法;init() 方法在第一次執行完時會將 inited 設置為 true,這樣后續執行 init() 方法的線程就不會再執行 doInit() 了。
class InitTest{boolean inited = false;synchronized void init(){if(inited){return;}//省略doInit的實現doInit();inited=true;} }線程安全的單例模式本質上其實也是單次初始化,所以可以用 Balking 模式來實現線程安全的單例模式,下面的示例代碼是其實現。這個實現雖然功能上沒有問題,但是性能卻很差,因為互斥鎖 synchronized 將 getInstance() 方法串行化了,那有沒有辦法可以優化一下它的性能呢?
class Singleton{private staticSingleton singleton;//構造方法私有化 private Singleton(){}//獲取實例(單例)public synchronized static Singleton getInstance(){if(singleton == null){singleton=new Singleton();}return singleton;} }辦法當然是有的,那就是經典的雙重檢查(Double Check)方案,下面的示例代碼是其詳細實現。在雙重檢查方案中,一旦 Singleton 對象被成功創建之后,就不會執行 synchronized(Singleton.class){}相關的代碼,也就是說,此時 getInstance() 方法的執行路徑是無鎖的,從而解決了性能問題。不過需要你注意的是,這個方案中使用了 volatile 來禁止編譯優化,其原因你可以參考《01 | 可見性、原子性和有序性問題:并發編程 Bug 的源頭》中相關的內容。至于獲取鎖后的二次檢查,則是出于對安全性負責。
class Singleton{private static volatile Singleton singleton;//構造方法私有化 private Singleton() {}//獲取實例(單例)public static Singleton getInstance() {//第一次檢查if(singleton==null){synchronize(Singleton.class){//獲取鎖后二次檢查if(singleton==null){singleton=new Singleton();}}}return singleton;} }Balking 模式和 Guarded Suspension 模式從實現上看似乎沒有多大的關系,Balking 模式只需要用互斥鎖就能解決,而 Guarded Suspension 模式則要用到管程這種高級的并發原語;但是從應用的角度來看,它們解決的都是“線程安全的 if”語義,不同之處在于,Guarded Suspension 模式會等待 if 條件為真,而 Balking 模式不會等待。
Balking 模式的經典實現是使用互斥鎖,你可以使用 Java 語言內置 synchronized,也可以使用 SDK 提供 Lock;如果你對互斥鎖的性能不滿意,可以嘗試采用 volatile 方案,不過使用 volatile 方案需要你更加謹慎。
當然你也可以嘗試使用雙重檢查方案來優化性能,雙重檢查中的第一次檢查,完全是出于對性能的考量:避免執行加鎖操作,因為加鎖操作很耗時。而加鎖之后的二次檢查,則是出于對安全性負責。雙重檢查方案在優化加鎖性能方面經常用到,例如實現緩存按需加載功能時,也用到了雙重檢查方案。
6、Thread-Per-Message模式:最簡單實用的分工方法
我們曾經把并發編程領域的問題總結為三個核心問題:分工、同步和互斥。其中,同步和互斥相關問題更多地源自微觀,而分工問題則是源自宏觀。我們解決問題,往往都是從宏觀入手,在編程領域,軟件的設計過程也是先從概要設計開始,而后才進行詳細設計。同樣,解決并發編程問題,首要問題也是解決宏觀的分工問題。
并發編程領域里,解決分工問題也有一系列的設計模式,比較常用的主要有 Thread-Per-Message 模式、Worker Thread 模式、生產者 - 消費者模式等等。今天我們重點介紹 Thread-Per-Message 模式。
(1)如何理解 Thread-Per-Message 模式
現實世界里,很多事情我們都需要委托他人辦理,一方面受限于我們的能力,總有很多搞不定的事,比如教育小朋友,搞不定怎么辦呢?只能委托學校老師了;另一方面受限于我們的時間,比如忙著寫 Bug,哪有時間買別墅呢?只能委托房產中介了。委托他人代辦有一個非常大的好處,那就是可以專心做自己的事了。
在編程領域也有很多類似的需求,比如寫一個 HTTP Server,很顯然只能在主線程中接收請求,而不能處理 HTTP 請求,因為如果在主線程中處理 HTTP 請求的話,那同一時間只能處理一個請求,太慢了!怎么辦呢?可以利用代辦的思路,創建一個子線程,委托子線程去處理 HTTP 請求。
這種委托他人辦理的方式,在并發編程領域被總結為一種設計模式,叫做 Thread-Per-Message 模式,簡言之就是為每個任務分配一個獨立的線程。這是一種最簡單的分工方法,實現起來也非常簡單。
(2)用 Thread 實現 Thread-Per-Message 模式
Thread-Per-Message 模式的一個最經典的應用場景是網絡編程里服務端的實現,服務端為每個客戶端請求創建一個獨立的線程,當線程處理完請求后,自動銷毀,這是一種最簡單的并發處理網絡請求的方法。
網絡編程里最簡單的程序當數 echo 程序了,echo 程序的服務端會原封不動地將客戶端的請求發送回客戶端。例如,客戶端發送 TCP 請求"Hello World",那么服務端也會返回"Hello World"。
下面我們就以 echo 程序的服務端為例,介紹如何實現 Thread-Per-Message 模式。
在 Java 語言中,實現 echo 程序的服務端還是很簡單的。只需要 30 行代碼就能夠實現,示例代碼如下,我們為每個請求都創建了一個 Java 線程,核心代碼是:new Thread(()->{...}).start()。
final ServerSocketChannel = ServerSocketChannel.open().bind(new InetSocketAddress(8080)); //處理請求 try {while (true) {// 接收請求SocketChannel sc = ssc.accept();// 每個請求都創建一個線程new Thread(()->{try {// 讀SocketByteBuffer rb = ByteBuffer.allocateDirect(1024);sc.read(rb);//模擬處理請求Thread.sleep(2000);// 寫SocketByteBuffer wb = (ByteBuffer)rb.flip();sc.write(wb);// 關閉Socketsc.close();}catch(Exception e){throw new UncheckedIOException(e);}}).start();} } finally {ssc.close(); }如果你熟悉網絡編程,相信你一定會提出一個很尖銳的問題:上面這個 echo 服務的實現方案是不具備可行性的。原因在于 Java 中的線程是一個重量級的對象,創建成本很高,一方面創建線程比較耗時,另一方面線程占用的內存也比較大。所以,為每個請求創建一個新的線程并不適合高并發場景。
于是,你開始質疑 Thread-Per-Message 模式,而且開始重新思索解決方案,這時候很可能你會想到 Java 提供的線程池。你的這個思路沒有問題,但是引入線程池難免會增加復雜度。其實你完全可以換一個角度來思考這個問題,語言、工具、框架本身應該是幫助我們更敏捷地實現方案的,而不是用來否定方案的,Thread-Per-Message 模式作為一種最簡單的分工方案,Java 語言支持不了,顯然是 Java 語言本身的問題。
Java 語言里,Java 線程是和操作系統線程一一對應的,這種做法本質上是將 Java 線程的調度權完全委托給操作系統,而操作系統在這方面非常成熟,所以這種做法的好處是穩定、可靠,但是也繼承了操作系統線程的缺點:創建成本高。為了解決這個缺點,Java 并發包里提供了線程池等工具類。這個思路在很長一段時間里都是很穩妥的方案,但是這個方案并不是唯一的方案。
業界還有另外一種方案,叫做輕量級線程。這個方案在 Java 領域知名度并不高,但是在其他編程語言里卻叫得很響,例如 Go 語言、Lua 語言里的協程,本質上就是一種輕量級的線程。輕量級的線程,創建的成本很低,基本上和創建一個普通對象的成本相似;并且創建的速度和內存占用相比操作系統線程至少有一個數量級的提升,所以基于輕量級線程實現 Thread-Per-Message 模式就完全沒有問題了。
Java 語言目前也已經意識到輕量級線程的重要性了,OpenJDK 有個 Loom 項目,就是要解決 Java 語言的輕量級線程問題,在這個項目中,輕量級線程被叫做 Fiber。下面我們就來看看基于 Fiber 如何實現 Thread-Per-Message 模式。
(3)用 Fiber 實現 Thread-Per-Message 模式
Loom 項目在設計輕量級線程時,充分考量了當前 Java 線程的使用方式,采取的是盡量兼容的態度,所以使用上還是挺簡單的。用 Fiber 實現 echo 服務的示例代碼如下所示,對比 Thread 的實現,你會發現改動量非常小,只需要把 new Thread(()->{...}).start() 換成 Fiber.schedule(()->{}) 就可以了。
final ServerSocketChannel ssc = ServerSocketChannel.open().bind(new InetSocketAddress(8080)); //處理請求 try{while (true) {// 接收請求final SocketChannel sc = ssc.accept();Fiber.schedule(()->{try {// 讀SocketByteBuffer rb = ByteBuffer.allocateDirect(1024);sc.read(rb);//模擬處理請求LockSupport.parkNanos(2000*1000000);// 寫SocketByteBuffer wb = (ByteBuffer)rb.flip()sc.write(wb);// 關閉Socketsc.close();} catch(Exception e){throw new UncheckedIOException(e);}});}//while }finally{ssc.close(); }那使用 Fiber 實現的 echo 服務是否能夠達到預期的效果呢?我們可以在 Linux 環境下做一個簡單的實驗,步驟如下:
1、首先通過 ulimit -u 512 將用戶能創建的最大進程數(包括線程)設置為 512;
2、啟動 Fiber 實現的 echo 程序;
3、利用壓測工具 ab 進行壓測:ab -r -c 20000 -n 200000 http:// 測試機 IP 地址:8080/
壓測執行結果如下:
Concurrency Level: 20000 Time taken for tests: 67.718 seconds Complete requests: 200000 Failed requests: 0 Write errors: 0 Non-2xx responses: 200000 Total transferred: 16400000 bytes HTML transferred: 0 bytes Requests per second: 2953.41 [#/sec] (mean) Time per request: 6771.844 [ms] (mean) Time per request: 0.339 [ms] (mean, across all concurrent requests) Transfer rate: 236.50 [Kbytes/sec] receivedConnection Times (ms)min mean[+/-sd] median max Connect: 0 557 3541.6 1 63127 Processing: 2000 2010 31.8 2003 2615 Waiting: 1986 2008 30.9 2002 2615 Total: 2000 2567 3543.9 2004 65293你會發現即便在 20000 并發下,該程序依然能夠良好運行。同等條件下,Thread 實現的 echo 程序 512 并發都抗不過去,直接就 OOM 了。
如果你通過 Linux 命令 top -Hp pid 查看 Fiber 實現的 echo 程序的進程信息,你可以看到該進程僅僅創建了 16(不同 CPU 核數結果會不同)個操作系統線程。
并發編程領域的分工問題,指的是如何高效地拆解任務并分配給線程。前面我們在并發工具類模塊中已經介紹了不少解決分工問題的工具類,例如 Future、CompletableFuture 、CompletionService、Fork/Join 計算框架等,這些工具類都能很好地解決特定應用場景的問題,所以,這些工具類曾經是 Java 語言引以為傲的。不過這些工具類都繼承了 Java 語言的老毛病:太復雜。
如果你一直從事 Java 開發,估計你已經習以為常了,習慣性地認為這個復雜度是正常的。不過這個世界時刻都在變化,曾經正常的復雜度,現在看來也許就已經沒有必要了,例如 Thread-Per-Message 模式如果使用線程池方案就會增加復雜度。
Thread-Per-Message 模式在 Java 領域并不是那么知名,根本原因在于 Java 語言里的線程是一個重量級的對象,為每一個任務創建一個線程成本太高,尤其是在高并發領域,基本就不具備可行性。不過這個背景條件目前正在發生巨變,Java 語言未來一定會提供輕量級線程,這樣基于輕量級線程實現 Thread-Per-Message 模式就是一個非常靠譜的選擇。
當然,對于一些并發度沒那么高的異步場景,例如定時任務,采用 Thread-Per-Message 模式是完全沒有問題的。實際工作中,我就見過完全基于 Thread-Per-Message 模式實現的分布式調度框架,這個框架為每個定時任務都分配了一個獨立的線程。
7、Worker Thread模式:如何避免重復創建線程?
我們介紹了一種最簡單的分工模式——Thread-Per-Message 模式,對應到現實世界,其實就是委托代辦。這種分工模式如果用 Java Thread 實現,頻繁地創建、銷毀線程非常影響性能,同時無限制地創建線程還可能導致 OOM,所以在 Java 領域使用場景就受限了。
要想有效避免線程的頻繁創建、銷毀以及 OOM 問題,就不得不提今天我們要細聊的,也是 Java 領域使用最多的 Worker Thread 模式。
(1)Worker Thread 模式及其實現
Worker Thread 模式可以類比現實世界里車間的工作模式:車間里的工人,有活兒了,大家一起干,沒活兒了就聊聊天等著。你可以參考下面的示意圖來理解,Worker Thread 模式中 Worker Thread 對應到現實世界里,其實指的就是車間里的工人。不過這里需要注意的是,車間里的工人數量往往是確定的。
那在編程領域該如何模擬車間的這種工作模式呢?或者說如何去實現 Worker Thread 模式呢?通過上面的圖,你很容易就能想到用阻塞隊列做任務池,然后創建固定數量的線程消費阻塞隊列中的任務。其實你仔細想會發現,這個方案就是 Java 語言提供的線程池。
線程池有很多優點,例如能夠避免重復創建、銷毀線程,同時能夠限制創建線程的上限等等。學習完上一篇文章后你已經知道,用 Java 的 Thread 實現 Thread-Per-Message 模式難以應對高并發場景,原因就在于頻繁創建、銷毀 Java 線程的成本有點高,而且無限制地創建線程還可能導致應用 OOM。線程池,則恰好能解決這些問題。
那我們還是以 echo 程序為例,看看如何用線程池來實現。
下面的示例代碼是用線程池實現的 echo 服務端,相比于 Thread-Per-Message 模式的實現,改動非常少,僅僅是創建了一個最多線程數為 500 的線程池 es,然后通過 es.execute() 方法將請求處理的任務提交給線程池處理。
ExecutorService es = Executors.newFixedThreadPool(500); final ServerSocketChannel ssc = ServerSocketChannel.open().bind(new InetSocketAddress(8080)); //處理請求 try {while (true) {// 接收請求SocketChannel sc = ssc.accept();// 將請求處理任務提交給線程池es.execute(()->{try {// 讀SocketByteBuffer rb = ByteBuffer.allocateDirect(1024);sc.read(rb);//模擬處理請求Thread.sleep(2000);// 寫SocketByteBuffer wb = (ByteBuffer)rb.flip();sc.write(wb);// 關閉Socketsc.close();}catch(Exception e){throw new UncheckedIOException(e);}});} } finally {ssc.close();es.shutdown(); }(1)正確地創建線程池
① Java 的線程池既能夠避免無限制地創建線程導致 OOM,也能避免無限制地接收任務導致 OOM。只不過后者經常容易被我們忽略,例如在上面的實現中,就被我們忽略了。所以強烈建議你用創建有界的隊列來接收任務。
② 當請求量大于有界隊列的容量時,就需要合理地拒絕請求。如何合理地拒絕呢?這需要你結合具體的業務場景來制定,即便線程池默認的拒絕策略能夠滿足你的需求,也同樣建議你在創建線程池時,清晰地指明拒絕策略。
③ 同時,為了便于調試和診斷問題,我也強烈建議你在實際工作中給線程賦予一個業務相關的名字。
綜合以上這三點建議,echo 程序中創建線程可以使用下面的示例代碼。
ExecutorService es = new ThreadPoolExecutor(50, 500,60L, TimeUnit.SECONDS,//注意要創建有界隊列new LinkedBlockingQueue<Runnable>(2000),//建議根據業務需求實現ThreadFactoryr->{return new Thread(r, "echo-"+ r.hashCode());},//建議根據業務需求實現RejectedExecutionHandlernew ThreadPoolExecutor.CallerRunsPolicy());(2)避免線程死鎖
使用線程池過程中,還要注意一種線程死鎖的場景。如果提交到相同線程池的任務不是相互獨立的,而是有依賴關系的,那么就有可能導致線程死鎖。實際工作中,我就親歷過這種線程死鎖的場景。具體現象是應用每運行一段時間偶爾就會處于無響應的狀態,監控數據看上去一切都正常,但是實際上已經不能正常工作了。
這個出問題的應用,相關的邏輯精簡之后,如下圖所示,該應用將一個大型的計算任務分成兩個階段,第一個階段的任務會等待第二階段的子任務完成。在這個應用里,每一個階段都使用了線程池,而且兩個階段使用的還是同一個線程池。
我們可以用下面的示例代碼來模擬該應用,如果你執行下面的這段代碼,會發現它永遠執行不到最后一行。執行過程中沒有任何異常,但是應用已經停止響應了。
//L1、L2階段共用的線程池 ExecutorService es = Executors.newFixedThreadPool(2); //L1階段的閉鎖 CountDownLatch l1=new CountDownLatch(2); for (int i=0; i<2; i++){System.out.println("L1");//執行L1階段任務es.execute(()->{//L2階段的閉鎖 CountDownLatch l2=new CountDownLatch(2);//執行L2階段子任務for (int j=0; j<2; j++){es.execute(()->{System.out.println("L2");l2.countDown();});}//等待L2階段任務執行完l2.await();l1.countDown();}); } //等著L1階段任務執行完 l1.await(); System.out.println("end");當應用出現類似問題時,首選的診斷方法是查看線程棧。下圖是上面示例代碼停止響應后的線程棧,你會發現線程池中的兩個線程全部都阻塞在 l2.await(); 這行代碼上了,也就是說,線程池里所有的線程都在等待 L2 階段的任務執行完,那 L2 階段的子任務什么時候能夠執行完呢?永遠都沒那一天了,為什么呢?因為線程池里的線程都阻塞了,沒有空閑的線程執行 L2 階段的任務了。
原因找到了,那如何解決就簡單了,最簡單粗暴的辦法就是將線程池的最大線程數調大,如果能夠確定任務的數量不是非常多的話,這個辦法也是可行的,否則這個辦法就行不通了。其實這種問題通用的解決方案是為不同的任務創建不同的線程池。對于上面的這個應用,L1 階段的任務和 L2 階段的任務如果各自都有自己的線程池,就不會出現這種問題了。
最后再次強調一下:提交到相同線程池中的任務一定是相互獨立的,否則就一定要慎重。
我們曾經說過,解決并發編程里的分工問題,最好的辦法是和現實世界做對比。對比現實世界構建編程領域的模型,能夠讓模型更容易理解。上一篇我們介紹的 Thread-Per-Message 模式,類似于現實世界里的委托他人辦理,而今天介紹的 Worker Thread 模式則類似于車間里工人的工作模式。如果你在設計階段,發現對業務模型建模之后,模型非常類似于車間的工作模式,那基本上就能確定可以在實現階段采用 Worker Thread 模式來實現。
Worker Thread 模式和 Thread-Per-Message 模式的區別有哪些呢?從現實世界的角度看,你委托代辦人做事,往往是和代辦人直接溝通的;對應到編程領域,其實現也是主線程直接創建了一個子線程,主子線程之間是可以直接通信的。而車間工人的工作方式則是完全圍繞任務展開的,一個具體的任務被哪個工人執行,預先是無法知道的;對應到編程領域,則是主線程提交任務到線程池,但主線程并不關心任務被哪個線程執行。
Worker Thread 模式能避免線程頻繁創建、銷毀的問題,而且能夠限制線程的最大數量。Java 語言里可以直接使用線程池來實現 Worker Thread 模式,線程池是一個非常基礎和優秀的工具類,甚至有些大廠的編碼規范都不允許用 new Thread() 來創建線程的,必須使用線程池。
不過使用線程池還是需要格外謹慎的,除了今天重點講到的如何正確創建線程池、如何避免線程死鎖問題,還需要注意前面我們曾經提到的 ThreadLocal 內存泄露問題。同時對于提交到線程池的任務,還要做好異常處理,避免異常的任務從眼前溜走,從業務的角度看,有時沒有發現異常的任務后果往往都很嚴重。
8、兩階段終止模式:如何優雅地終止線程?
從純技術的角度看,都是啟動多線程去執行一個異步任務。既啟動,那又該如何終止呢?今天咱們就從技術的角度聊聊如何優雅地終止線程,正所謂有始有終。
線程執行完或者出現異常就會進入終止狀態。這樣看,終止一個線程看上去很簡單啊!一個線程執行完自己的任務,自己進入終止狀態,這的確很簡單。不過我們今天談到的“優雅地終止線程”,不是自己終止自己,而是在一個線程 T1 中,終止線程 T2;這里所謂的“優雅”,指的是給 T2 一個機會料理后事,而不是被一劍封喉。
Java 語言的 Thread 類中曾經提供了一個 stop() 方法,用來終止線程,可是早已不建議使用了,原因是這個方法用的就是一劍封喉的做法,被終止的線程沒有機會料理后事。
既然不建議使用 stop() 方法,那在 Java 領域,我們又該如何優雅地終止線程呢?
(1)如何理解兩階段終止模式
前輩們經過認真對比分析,已經總結出了一套成熟的方案,叫做兩階段終止模式。顧名思義,就是將終止過程分成兩個階段,其中第一個階段主要是線程 T1 向線程 T2發送終止指令,而第二階段則是線程 T2響應終止指令。
那在 Java 語言里,終止指令是什么呢?這個要從 Java 線程的狀態轉換過程說起。Java 線程的狀態轉換圖,如下圖所示。
從這個圖里你會發現,Java 線程進入終止狀態的前提是線程進入 RUNNABLE 狀態,而實際上線程也可能處在休眠狀態,也就是說,我們要想終止一個線程,首先要把線程的狀態從休眠狀態轉換到 RUNNABLE 狀態。如何做到呢?這個要靠 Java Thread 類提供的 interrupt() 方法,它可以將休眠狀態的線程轉換到 RUNNABLE 狀態。
線程轉換到 RUNNABLE 狀態之后,我們如何再將其終止呢?RUNNABLE 狀態轉換到終止狀態,優雅的方式是讓 Java 線程自己執行完 run() 方法,所以一般我們采用的方法是設置一個標志位,然后線程會在合適的時機檢查這個標志位,如果發現符合終止條件,則自動退出 run() 方法。這個過程其實就是我們前面提到的第二階段:響應終止指令。
綜合上面這兩點,我們能總結出終止指令,其實包括兩方面內容:interrupt() 方法和線程終止的標志位。
理解了兩階段終止模式之后,下面我們看一個實際工作中的案例。
(2)用兩階段終止模式終止監控操作
實際工作中,有些監控系統需要動態地采集一些數據,一般都是監控系統發送采集指令給被監控系統的監控代理,監控代理接收到指令之后,從監控目標收集數據,然后回傳給監控系統,詳細過程如下圖所示。出于對性能的考慮(有些監控項對系統性能影響很大,所以不能一直持續監控),動態采集功能一般都會有終止操作。
下面的示例代碼是監控代理簡化之后的實現,start() 方法會啟動一個新的線程 rptThread 來執行監控數據采集和回傳的功能,stop() 方法需要優雅地終止線程 rptThread,那 stop() 相關功能該如何實現呢?
class Proxy {boolean started = false;//采集線程Thread rptThread;//啟動采集功能synchronized void start(){//不允許同時啟動多個采集線程if (started) {return;}started = true;rptThread = new Thread(()->{while (true) {//省略采集、回傳實現report();//每隔兩秒鐘采集、回傳一次數據try {Thread.sleep(2000);} catch (InterruptedException e) { }}//執行到此處說明線程馬上終止started = false;});rptThread.start();}//終止采集功能synchronized void stop(){//如何實現?} }按照兩階段終止模式,我們首先需要做的就是將線程 rptThread 狀態轉換到 RUNNABLE,做法很簡單,只需要在調用 rptThread.interrupt() 就可以了。線程 rptThread 的狀態轉換到 RUNNABLE 之后,如何優雅地終止呢?下面的示例代碼中,我們選擇的標志位是線程的中斷狀態:Thread.currentThread().isInterrupted() ,需要注意的是,我們在捕獲 Thread.sleep() 的中斷異常之后,通過 Thread.currentThread().interrupt() 重新設置了線程的中斷狀態,因為 JVM 的異常處理會清除線程的中斷狀態。
class Proxy {boolean started = false;//采集線程Thread rptThread;//啟動采集功能synchronized void start(){//不允許同時啟動多個采集線程if (started) {return;}started = true;rptThread = new Thread(()->{while (!Thread.currentThread().isInterrupted()){//省略采集、回傳實現report();//每隔兩秒鐘采集、回傳一次數據try {Thread.sleep(2000);} catch (InterruptedException e){//重新設置線程中斷狀態Thread.currentThread().interrupt();}}//執行到此處說明線程馬上終止started = false;});rptThread.start();}//終止采集功能synchronized void stop(){rptThread.interrupt();} }上面的示例代碼的確能夠解決當前的問題,但是建議你在實際工作中謹慎使用。原因在于我們很可能在線程的 run() 方法中調用第三方類庫提供的方法,而我們沒有辦法保證第三方類庫正確處理了線程的中斷異常,例如第三方類庫在捕獲到 Thread.sleep() 方法拋出的中斷異常后,沒有重新設置線程的中斷狀態,那么就會導致線程不能夠正常終止。所以強烈建議你設置自己的線程終止標志位,例如在下面的代碼中,使用 isTerminated 作為線程終止標志位,此時無論是否正確處理了線程的中斷異常,都不會影響線程優雅地終止。
class Proxy {//線程終止標志位volatile boolean terminated = false;boolean started = false;//采集線程Thread rptThread;//啟動采集功能synchronized void start(){//不允許同時啟動多個采集線程if (started) {return;}started = true;terminated = false;rptThread = new Thread(()->{while (!terminated){//省略采集、回傳實現report();//每隔兩秒鐘采集、回傳一次數據try {Thread.sleep(2000);} catch (InterruptedException e){//重新設置線程中斷狀態Thread.currentThread().interrupt();}}//執行到此處說明線程馬上終止started = false;});rptThread.start();}//終止采集功能synchronized void stop(){//設置中斷標志位terminated = true;//中斷線程rptThreadrptThread.interrupt();} }(3)如何優雅地終止線程池
Java 領域用的最多的還是線程池,而不是手動地創建線程。那我們該如何優雅地終止線程池呢?
線程池提供了兩個方法:shutdown()和shutdownNow()。這兩個方法有什么區別呢?要了解它們的區別,就先需要了解線程池的實現原理。
我們曾經講過,Java 線程池是生產者 - 消費者模式的一種實現,提交給線程池的任務,首先是進入一個阻塞隊列中,之后線程池中的線程從阻塞隊列中取出任務執行。
shutdown() 方法是一種很保守的關閉線程池的方法。線程池執行 shutdown() 后,就會拒絕接收新的任務,但是會等待線程池中正在執行的任務和已經進入阻塞隊列的任務都執行完之后才最終關閉線程池。
而 shutdownNow() 方法,相對就激進一些了,線程池執行 shutdownNow() 后,會拒絕接收新的任務,同時還會中斷線程池中正在執行的任務,已經進入阻塞隊列的任務也被剝奪了執行的機會,不過這些被剝奪執行機會的任務會作為 shutdownNow() 方法的返回值返回。因為 shutdownNow() 方法會中斷正在執行的線程,所以提交到線程池的任務,如果需要優雅地結束,就需要正確地處理線程中斷。
如果提交到線程池的任務不允許取消,那就不能使用 shutdownNow() 方法終止線程池。不過,如果提交到線程池的任務允許后續以補償的方式重新執行,也是可以使用 shutdownNow() 方法終止線程池的。《Java 并發編程實戰》這本書第 7 章《取消與關閉》的“shutdownNow 的局限性”一節中,提到一種將已提交但尚未開始執行的任務以及已經取消的正在執行的任務保存起來,以便后續重新執行的方案,你可以參考一下,方案很簡單,這里就不詳細介紹了。
其實分析完 shutdown() 和 shutdownNow() 方法你會發現,它們實質上使用的也是兩階段終止模式,只是終止指令的范圍不同而已,前者只影響阻塞隊列接收任務,后者范圍擴大到線程池中所有的任務。
兩階段終止模式是一種應用很廣泛的并發設計模式,在 Java 語言中使用兩階段終止模式來優雅地終止線程,需要注意兩個關鍵點:一個是僅檢查終止標志位是不夠的,因為線程的狀態可能處于休眠態;另一個是僅檢查線程的中斷狀態也是不夠的,因為我們依賴的第三方類庫很可能沒有正確處理中斷異常。
當你使用 Java 的線程池來管理線程的時候,需要依賴線程池提供的 shutdown() 和 shutdownNow() 方法來終止線程池。不過在使用時需要注意它們的應用場景,尤其是在使用 shutdownNow() 的時候,一定要謹慎。
9、生產者-消費者模式:用流水線思想提高效率
Worker Thread 模式類比的是工廠里車間工人的工作模式。但其實在現實世界,工廠里還有一種流水線的工作模式,類比到編程領域,就是生產者 - 消費者模式。
生產者 - 消費者模式在編程領域的應用也非常廣泛,前面我們曾經提到,Java 線程池本質上就是用生產者 - 消費者模式實現的,所以每當使用線程池的時候,其實就是在應用生產者 - 消費者模式。
當然,除了在線程池中的應用,為了提升性能,并發編程領域很多地方也都用到了生產者 - 消費者模式,例如 Log4j2 中異步 Appender 內部也用到了生產者 - 消費者模式。所以今天我們就來深入地聊聊生產者 - 消費者模式,看看它具體有哪些優點,以及如何提升系統的性能。
(1)生產者 - 消費者模式的優點
生產者 - 消費者模式的核心是一個任務隊列,生產者線程生產任務,并將任務添加到任務隊列中,而消費者線程從任務隊列中獲取任務并執行。下面是生產者 - 消費者模式的一個示意圖,你可以結合它來理解。
從架構設計的角度來看,生產者 - 消費者模式有一個很重要的優點,就是解耦。解耦對于大型系統的設計非常重要,而解耦的一個關鍵就是組件之間的依賴關系和通信方式必須受限。在生產者 - 消費者模式中,生產者和消費者沒有任何依賴關系,它們彼此之間的通信只能通過任務隊列,所以生產者 - 消費者模式是一個不錯的解耦方案。
除了架構設計上的優點之外,生產者 - 消費者模式還有一個重要的優點就是支持異步,并且能夠平衡生產者和消費者的速度差異。在生產者 - 消費者模式中,生產者線程只需要將任務添加到任務隊列而無需等待任務被消費者線程執行完,也就是說任務的生產和消費是異步的,這是與傳統的方法之間調用的本質區別,傳統的方法之間調用是同步的。
你或許會有這樣的疑問,異步化處理最簡單的方式就是創建一個新的線程去處理,那中間增加一個“任務隊列”究竟有什么用呢?我覺得主要還是用于平衡生產者和消費者的速度差異。我們假設生產者的速率很慢,而消費者的速率很高,比如是 1:3,如果生產者有 3 個線程,采用創建新的線程的方式,那么會創建 3 個子線程,而采用生產者 - 消費者模式,消費線程只需要 1 個就可以了。Java 語言里,Java 線程和操作系統線程是一一對應的,線程創建得太多,會增加上下文切換的成本,所以 Java 線程不是越多越好,適量即可。而生產者 - 消費者模式恰好能支持你用適量的線程。
(2)支持批量執行以提升性能
輕量級的線程,如果使用輕量級線程,就沒有必要平衡生產者和消費者的速度差異了,因為輕量級線程本身就是廉價的,那是否意味著生產者 - 消費者模式在性能優化方面就無用武之地了呢?當然不是,有一類并發場景應用生產者 - 消費者模式就有奇效,那就是批量執行任務。
例如,我們要在數據庫里 INSERT 1000 條數據,有兩種方案:第一種方案是用 1000 個線程并發執行,每個線程 INSERT 一條數據;第二種方案是用 1 個線程,執行一個批量的 SQL,一次性把 1000 條數據 INSERT 進去。這兩種方案,顯然是第二種方案效率更高,其實這樣的應用場景就是我們上面提到的批量執行場景。
一個監控系統動態采集的案例,其實最終回傳的監控數據還是要存入數據庫的(如下圖)。但被監控系統往往有很多,如果每一條回傳數據都直接 INSERT 到數據庫,那么這個方案就是上面提到的第一種方案:每個線程 INSERT 一條數據。很顯然,更好的方案是批量執行 SQL,那如何實現呢?這就要用到生產者 - 消費者模式了。
利用生產者 - 消費者模式實現批量執行 SQL 非常簡單:將原來直接 INSERT 數據到數據庫的線程作為生產者線程,生產者線程只需將數據添加到任務隊列,然后消費者線程負責將任務從任務隊列中批量取出并批量執行。
在下面的示例代碼中,我們創建了 5 個消費者線程負責批量執行 SQL,這 5 個消費者線程以 while(true){} 循環方式批量地獲取任務并批量地執行。需要注意的是,從任務隊列中獲取批量任務的方法 pollTasks() 中,首先是以阻塞方式獲取任務隊列中的一條任務,而后則是以非阻塞的方式獲取任務;之所以首先采用阻塞方式,是因為如果任務隊列中沒有任務,這樣的方式能夠避免無謂的循環。
//任務隊列 BlockingQueue<Task> bq=newLinkedBlockingQueue<>(2000); //啟動5個消費者線程 //執行批量任務 void start() {ExecutorService es=executors.newFixedThreadPool(5);for (int i=0; i<5; i++) {es.execute(()->{try {while (true) {//獲取批量任務List<Task> ts=pollTasks();//執行批量任務execTasks(ts);}} catch (Exception e) {e.printStackTrace();}});} } //從任務隊列中獲取批量任務 List<Task> pollTasks() throws InterruptedException{List<Task> ts=new LinkedList<>();//阻塞式獲取一條任務Task t = bq.take();while (t != null) {ts.add(t);//非阻塞式獲取一條任務t = bq.poll();}return ts; } //批量執行任務 execTasks(List<Task> ts) {//省略具體代碼無數 }(3)支持分階段提交以提升性能
利用生產者 - 消費者模式還可以輕松地支持一種分階段提交的應用場景。我們知道寫文件如果同步刷盤性能會很慢,所以對于不是很重要的數據,我們往往采用異步刷盤的方式。我曾經參與過一個項目,其中的日志組件是自己實現的,采用的就是異步刷盤方式,刷盤的時機是:
① ERROR 級別的日志需要立即刷盤;
② 數據積累到 500 條需要立即刷盤;
③ 存在未刷盤數據,且 5 秒鐘內未曾刷盤,需要立即刷盤。
這個日志組件的異步刷盤操作本質上其實就是一種分階段提交。下面我們具體看看用生產者 - 消費者模式如何實現。在下面的示例代碼中,可以通過調用 info()和error() 方法寫入日志,這兩個方法都是創建了一個日志任務 LogMsg,并添加到阻塞隊列中,調用 info()和error() 方法的線程是生產者;而真正將日志寫入文件的是消費者線程,在 Logger 這個類中,我們只創建了 1 個消費者線程,在這個消費者線程中,會根據刷盤規則執行刷盤操作,邏輯很簡單,這里就不贅述了。
class Logger {//任務隊列 final BlockingQueue<LogMsg> bq= new BlockingQueue<>();//flush批量 static final int batchSize=500;//只需要一個線程寫日志ExecutorService es = Executors.newFixedThreadPool(1);//啟動寫日志線程void start(){File file=File.createTempFile("foo", ".log");final FileWriter writer=new FileWriter(file);this.es.execute(()->{try {//未刷盤日志數量int curIdx = 0;long preFT=System.currentTimeMillis();while (true) {LogMsg log = bq.poll(5, TimeUnit.SECONDS);//寫日志if (log != null) {writer.write(log.toString());++curIdx;}//如果不存在未刷盤數據,則無需刷盤if (curIdx <= 0) {continue;}//根據規則刷盤if (log!=null && log.level==LEVEL.ERROR ||curIdx == batchSize ||System.currentTimeMillis()-preFT>5000){writer.flush();curIdx = 0;preFT=System.currentTimeMillis();}}}catch(Exception e){e.printStackTrace();} finally {try {writer.flush();writer.close();}catch(IOException e){e.printStackTrace();}}}); }//寫INFO級別日志void info(String msg) {bq.put(new LogMsg(LEVEL.INFO, msg));}//寫ERROR級別日志void error(String msg) {bq.put(new LogMsg(LEVEL.ERROR, msg));} } //日志級別 enum LEVEL {INFO, ERROR } class LogMsg {LEVEL level;String msg;//省略構造函數實現LogMsg(LEVEL lvl, String msg){}//省略toString()實現String toString(){} }Java 語言提供的線程池本身就是一種生產者 - 消費者模式的實現,但是線程池中的線程每次只能從任務隊列中消費一個任務來執行,對于大部分并發場景這種策略都沒有問題。但是有些場景還是需要自己來實現,例如需要批量執行以及分階段提交的場景。
生產者 - 消費者模式在分布式計算中的應用也非常廣泛。在分布式場景下,你可以借助分布式消息隊列(MQ)來實現生產者 - 消費者模式。MQ 一般都會支持兩種消息模型,一種是點對點模型,一種是發布訂閱模型。這兩種模型的區別在于,點對點模型里一個消息只會被一個消費者消費,和 Java 的線程池非常類似(Java 線程池的任務也只會被一個線程執行);而發布訂閱模型里一個消息會被多個消費者消費,本質上是一種消息的廣播,在多線程編程領域,你可以結合觀察者模式實現廣播功能。
10、避免共享的設計模式
Immutability 模式、Copy-on-Write 模式和線程本地存儲模式本質上都是為了避免共享,只是實現手段不同而已。這 3 種設計模式的實現都很簡單,但是實現過程中有些細節還是需要格外注意的。例如,使用 Immutability 模式需要注意對象屬性的不可變性,使用 Copy-on-Write 模式需要注意性能問題,使用線程本地存儲模式需要注意異步執行問題。所以,每篇文章最后我設置的課后思考題的目的就是提醒你注意這些細節。
Account 這個類是不是具備不可變性。這個類初看上去屬于不可變對象的中規中矩實現,而實質上這個實現是有問題的,原因在于 StringBuffer 不同于 String,StringBuffer 不具備不可變性,通過 getUser() 方法獲取 user 之后,是可以修改 user 的。一個簡單的解決方案是讓 getUser() 方法返回 String 對象。
public final class Account{private final StringBuffer user;public Account(String user){this.user = new StringBuffer(user);}//返回的StringBuffer并不具備不可變性public StringBuffer getUser(){return this.user;}public String toString(){return "user"+user;} }Java SDK 中為什么沒有提供 CopyOnWriteLinkedList。這是一個開放性的問題,沒有標準答案,但是性能問題一定是其中一個很重要的原因,畢竟完整地復制 LinkedList 性能開銷太大了。
在異步場景中,是否可以使用 Spring 的事務管理器。答案顯然是不能的,Spring 使用 ThreadLocal 來傳遞事務信息,因此這個事務信息是不能跨線程共享的。實際工作中有很多類庫都是用 ThreadLocal 傳遞上下文信息的,這種場景下如果有異步操作,一定要注意上下文信息是不能跨線程共享的。
11、多線程版本 IF 的設計模式
Guarded Suspension 模式和 Balking 模式都可以簡單地理解為“多線程版本的 if”,但它們的區別在于前者會等待 if 條件變為真,而后者則不需要等待。
Guarded Suspension 模式的經典實現是使用管程,很多初學者會簡單地用線程 sleep 的方式實現,比如用線程 sleep 方式實現的。但不推薦你使用這種方式,最重要的原因是性能,如果 sleep 的時間太長,會影響響應時間;sleep 的時間太短,會導致線程頻繁地被喚醒,消耗系統資源。
同時,示例代碼的實現也有問題:由于 obj 不是 volatile 變量,所以即便 obj 被設置了正確的值,執行 while(!p.test(obj)) 的線程也有可能看不到,從而導致更長時間的 sleep。
//獲取受保護對象 T get(Predicate<T> p) {try {//obj的可見性無法保證while(!p.test(obj)){TimeUnit.SECONDS.sleep(timeout);}}catch(InterruptedException e){throw new RuntimeException(e);}//返回非空的受保護對象return obj; } //事件通知方法 void onChanged(T obj) {this.obj = obj; }實現 Balking 模式最容易忽視的就是競態條件問題。比如,存在競態條件問題。因此,在多線程場景中使用 if 語句時,一定要多問自己一遍:是否存在競態條件。
class Test{volatile boolean inited = false;int count = 0;void init(){//存在競態條件if(inited){return;}//有可能多個線程執行到這里inited = true;//計算count的值count = calc();} }12、三種最簡單的分工模式
Thread-Per-Message 模式、Worker Thread 模式和生產者 - 消費者模式是三種最簡單實用的多線程分工方法。雖說簡單,但也還是有許多細節需要你多加小心和注意。
Thread-Per-Message 模式在實現的時候需要注意是否存在線程的頻繁創建、銷毀以及是否可能導致 OOM。關于如何快速解決 OOM 問題的。在高并發場景中,最簡單的辦法其實是限流。當然,限流方案也并不局限于解決 Thread-Per-Message 模式中的 OOM 問題。
Worker Thread 模式的實現,需要注意潛在的線程死鎖問題。示例代碼就存在線程死鎖。描述得很貼切和形象:“工廠里只有一個工人,他的工作就是同步地等待工廠里其他人給他提供東西,然而并沒有其他人,他將等到天荒地老,海枯石爛!”因此,共享線程池雖然能夠提供線程池的使用效率,但一定要保證一個前提,那就是:任務之間沒有依賴關系。
ExecutorService pool = Executors.newSingleThreadExecutor(); //提交主任務 pool.submit(() -> {try {//提交子任務并等待其完成,//會導致線程死鎖String qq=pool.submit(()->"QQ").get();System.out.println(qq);} catch (Exception e) {} });Java 線程池本身就是一種生產者 - 消費者模式的實現,所以大部分場景你都不需要自己實現,直接使用 Java 的線程池就可以了。但若能自己靈活地實現生產者 - 消費者模式會更好,比如可以實現批量執行和分階段提交,不過這過程中還需要注意如何優雅地終止線程。
如何優雅地終止線程?兩階段終止模式是一種通用的解決方案。但其實終止生產者 - 消費者服務還有一種更簡單的方案,叫做“毒丸”對象。“毒丸”對象有過詳細的介紹。簡單來講,“毒丸”對象是生產者生產的一條特殊任務,然后當消費者線程讀到“毒丸”對象時,會立即終止自身的執行。
下面是用“毒丸”對象終止寫日志線程的具體實現,整體的實現過程還是很簡單的:類 Logger 中聲明了一個“毒丸”對象 poisonPill ,當消費者線程從阻塞隊列 bq 中取出一條 LogMsg 后,先判斷是否是“毒丸”對象,如果是,則 break while 循環,從而終止自己的執行。
class Logger {//用于終止日志執行的“毒丸”final LogMsg poisonPill = new LogMsg(LEVEL.ERROR, "");//任務隊列 final BlockingQueue<LogMsg> bq= new BlockingQueue<>();//只需要一個線程寫日志ExecutorService es = Executors.newFixedThreadPool(1);//啟動寫日志線程void start(){File file=File.createTempFile("foo", ".log");final FileWriter writer=new FileWriter(file);this.es.execute(()->{try {while (true) {LogMsg log = bq.poll(5, TimeUnit.SECONDS);//如果是“毒丸”,終止執行 if(poisonPill.equals(logMsg)){break;} //省略執行邏輯}} catch(Exception e){} finally {try {writer.flush();writer.close();}catch(IOException e){}}}); }//終止寫日志線程public void stop() {//將“毒丸”對象加入阻塞隊列bq.add(poisonPill);es.shutdown();} }13、高性能限流器Guava RateLimiter
首先我們來看看 Guava RateLimiter 是如何解決高并發場景下的限流問題的。Guava 是 Google 開源的 Java 類庫,提供了一個工具類 RateLimiter。我們先來看看 RateLimiter 的使用,讓你對限流有個感官的印象。假設我們有一個線程池,它每秒只能處理兩個任務,如果提交的任務過快,可能導致系統不穩定,這個時候就需要用到限流。
在下面的示例代碼中,我們創建了一個流速為 2 個請求 / 秒的限流器,這里的流速該怎么理解呢?直觀地看,2 個請求 / 秒指的是每秒最多允許 2 個請求通過限流器,其實在 Guava 中,流速還有更深一層的意思:是一種勻速的概念,2 個請求 / 秒等價于 1 個請求 /500 毫秒。
在向線程池提交任務之前,調用 acquire() 方法就能起到限流的作用。通過示例代碼的執行結果,任務提交到線程池的時間間隔基本上穩定在 500 毫秒。
//限流器流速:2個請求/秒 RateLimiter limiter = RateLimiter.create(2.0); //執行任務的線程池 ExecutorService es = Executors.newFixedThreadPool(1); //記錄上一次執行時間 prev = System.nanoTime(); //測試執行20次 for (int i=0; i<20; i++){//限流器限流limiter.acquire();//提交任務異步執行es.execute(()->{long cur=System.nanoTime();//打印時間間隔:毫秒System.out.println((cur-prev)/1000_000);prev = cur;}); }輸出結果: ... 500 499 499 500 499(1)經典限流算法:令牌桶算法
Guava 的限流器使用上還是很簡單的,那它是如何實現的呢?Guava 采用的是令牌桶算法,其核心是要想通過限流器,必須拿到令牌。也就是說,只要我們能夠限制發放令牌的速率,那么就能控制流速了。令牌桶算法的詳細描述如下:
① 令牌以固定的速率添加到令牌桶中,假設限流的速率是 r/ 秒,則令牌每 1/r 秒會添加一個;
② 假設令牌桶的容量是 b ,如果令牌桶已滿,則新的令牌會被丟棄;
③ 請求能夠通過限流器的前提是令牌桶中有令牌。
這個算法中,限流的速率 r 還是比較容易理解的,但令牌桶的容量 b 該怎么理解呢?b 其實是 burst 的簡寫,意義是限流器允許的最大突發流量。比如 b=10,而且令牌桶中的令牌已滿,此時限流器允許 10 個請求同時通過限流器,當然只是突發流量而已,這 10 個請求會帶走 10 個令牌,所以后續的流量只能按照速率 r 通過限流器。
令牌桶這個算法,如何用 Java 實現呢?很可能你的直覺會告訴你生產者 - 消費者模式:一個生產者線程定時向阻塞隊列中添加令牌,而試圖通過限流器的線程則作為消費者線程,只有從阻塞隊列中獲取到令牌,才允許通過限流器。
這個算法看上去非常完美,而且實現起來非常簡單,如果并發量不大,這個實現并沒有什么問題。可實際情況卻是使用限流的場景大部分都是高并發場景,而且系統壓力已經臨近極限了,此時這個實現就有問題了。問題就出在定時器上,在高并發場景下,當系統壓力已經臨近極限的時候,定時器的精度誤差會非常大,同時定時器本身會創建調度線程,也會對系統的性能產生影響。
那還有什么好的實現方式呢?當然有,Guava 的實現就沒有使用定時器,下面我們就來看看它是如何實現的。
(2)Guava 如何實現令牌桶算法
Guava 實現令牌桶算法,用了一個很簡單的辦法,其關鍵是記錄并動態計算下一令牌發放的時間。下面我們以一個最簡單的場景來介紹該算法的執行過程。假設令牌桶的容量為 b=1,限流速率 r = 1 個請求 / 秒,如下圖所示,如果當前令牌桶中沒有令牌,下一個令牌的發放時間是在第 3 秒,而在第 2 秒的時候有一個線程 T1 請求令牌,此時該如何處理呢?
對于這個請求令牌的線程而言,很顯然需要等待 1 秒,因為 1 秒以后(第 3 秒)它就能拿到令牌了。此時需要注意的是,下一個令牌發放的時間也要增加 1 秒,為什么呢?因為第 3 秒發放的令牌已經被線程 T1 預占了。處理之后如下圖所示。
假設 T1 在預占了第 3 秒的令牌之后,馬上又有一個線程 T2 請求令牌,如下圖所示。
很顯然,由于下一個令牌產生的時間是第 4 秒,所以線程 T2 要等待兩秒的時間,才能獲取到令牌,同時由于 T2 預占了第 4 秒的令牌,所以下一令牌產生時間還要增加 1 秒,完全處理之后,如下圖所示。
上面線程 T1、T2 都是在下一令牌產生時間之前請求令牌,如果線程在下一令牌產生時間之后請求令牌會如何呢?假設在線程 T1 請求令牌之后的 5 秒,也就是第 7 秒,線程 T3 請求令牌,如下圖所示。
由于在第 5 秒已經產生了一個令牌,所以此時線程 T3 可以直接拿到令牌,而無需等待。在第 7 秒,實際上限流器能夠產生 3 個令牌,第 5、6、7 秒各產生一個令牌。由于我們假設令牌桶的容量是 1,所以第 6、7 秒產生的令牌就丟棄了,其實等價地你也可以認為是保留的第 7 秒的令牌,丟棄的第 5、6 秒的令牌,也就是說第 7 秒的令牌被線程 T3 占有了,于是下一令牌的的產生時間應該是第 8 秒,如下圖所示。
通過上面簡要地分析,你會發現,我們只需要記錄一個下一令牌產生的時間,并動態更新它,就能夠輕松完成限流功能。我們可以將上面的這個算法代碼化,示例代碼如下所示,依然假設令牌桶的容量是 1。關鍵是 reserve() 方法,這個方法會為請求令牌的線程預分配令牌,同時返回該線程能夠獲取令牌的時間。其實現邏輯就是上面提到的:如果線程請求令牌的時間在下一令牌產生時間之后,那么該線程立刻就能夠獲取令牌;反之,如果請求時間在下一令牌產生時間之前,那么該線程是在下一令牌產生的時間獲取令牌。由于此時下一令牌已經被該線程預占,所以下一令牌產生的時間需要加上 1 秒。
class SimpleLimiter {//下一令牌產生時間long next = System.nanoTime();//發放令牌間隔:納秒long interval = 1000_000_000;//預占令牌,返回能夠獲取令牌的時間synchronized long reserve(long now){//請求時間在下一令牌產生時間之后//重新計算下一令牌產生時間if (now > next){//將下一令牌產生時間重置為當前時間next = now;}//能夠獲取令牌的時間long at=next;//設置下一令牌產生時間next += interval;//返回線程需要等待的時間return Math.max(at, 0L);}//申請令牌void acquire() {//申請令牌時的時間long now = System.nanoTime();//預占令牌long at=reserve(now);long waitTime=max(at-now, 0);//按照條件等待if(waitTime > 0) {try {TimeUnit.NANOSECONDS.sleep(waitTime);}catch(InterruptedException e){e.printStackTrace();}}} }如果令牌桶的容量大于 1,又該如何處理呢?按照令牌桶算法,令牌要首先從令牌桶中出,所以我們需要按需計算令牌桶中的數量,當有線程請求令牌時,先從令牌桶中出。具體的代碼實現如下所示。我們增加了一個 resync() 方法,在這個方法中,如果線程請求令牌的時間在下一令牌產生時間之后,會重新計算令牌桶中的令牌數,新產生的令牌的計算公式是:(now-next)/interval,你可對照上面的示意圖來理解。reserve() 方法中,則增加了先從令牌桶中出令牌的邏輯,不過需要注意的是,如果令牌是從令牌桶中出的,那么 next 就無需增加一個 interval 了。
class SimpleLimiter {//當前令牌桶中的令牌數量long storedPermits = 0;//令牌桶的容量long maxPermits = 3;//下一令牌產生時間long next = System.nanoTime();//發放令牌間隔:納秒long interval = 1000_000_000;//請求時間在下一令牌產生時間之后,則// 1.重新計算令牌桶中的令牌數// 2.將下一個令牌發放時間重置為當前時間void resync(long now) {if (now > next) {//新產生的令牌數long newPermits=(now-next)/interval;//新令牌增加到令牌桶storedPermits=min(maxPermits, storedPermits + newPermits);//將下一個令牌發放時間重置為當前時間next = now;}}//預占令牌,返回能夠獲取令牌的時間synchronized long reserve(long now){resync(now);//能夠獲取令牌的時間long at = next;//令牌桶中能提供的令牌long fb=min(1, storedPermits);//令牌凈需求:首先減掉令牌桶中的令牌long nr = 1 - fb;//重新計算下一令牌產生時間next = next + nr*interval;//重新計算令牌桶中的令牌this.storedPermits -= fb;return at;}//申請令牌void acquire() {//申請令牌時的時間long now = System.nanoTime();//預占令牌long at=reserve(now);long waitTime=max(at-now, 0);//按照條件等待if(waitTime > 0) {try {TimeUnit.NANOSECONDS.sleep(waitTime);}catch(InterruptedException e){e.printStackTrace();}}} }經典的限流算法有兩個,一個是令牌桶算法(Token Bucket),另一個是漏桶算法(Leaky Bucket)。令牌桶算法是定時向令牌桶發送令牌,請求能夠從令牌桶中拿到令牌,然后才能通過限流器;而漏桶算法里,請求就像水一樣注入漏桶,漏桶會按照一定的速率自動將水漏掉,只有漏桶里還能注入水的時候,請求才能通過限流器。令牌桶算法和漏桶算法很像一個硬幣的正反面,所以你可以參考令牌桶算法的實現來實現漏桶算法。
上面我們介紹了 Guava 是如何實現令牌桶算法的,我們的示例代碼是對 Guava RateLimiter 的簡化,Guava RateLimiter 擴展了標準的令牌桶算法,比如還能支持預熱功能。對于按需加載的緩存來說,預熱后緩存能支持 5 萬 TPS 的并發,但是在預熱前 5 萬 TPS 的并發直接就把緩存擊垮了,所以如果需要給該緩存限流,限流器也需要支持預熱功能,在初始階段,限制的流速 r 很小,但是動態增長的。預熱功能的實現非常復雜,Guava 構建了一個積分函數來解決這個問題,如果你感興趣,可以繼續深入研究。
14、高性能網絡應用框架Netty
Netty 是一個高性能網絡應用框架,應用非常普遍,目前在 Java 領域里,Netty 基本上成為網絡程序的標配了。Netty 框架功能豐富,也非常復雜,今天我們主要分析 Netty 框架中的線程模型,而線程模型直接影響著網絡程序的性能。
在介紹 Netty 的線程模型之前,我們首先需要把問題搞清楚,了解網絡編程性能的瓶頸在哪里,然后再看 Netty 的線程模型是如何解決這個問題的。
(1)網絡編程性能的瓶頸
一個簡單的網絡程序 echo,采用的是阻塞式 I/O(BIO)。BIO 模型里,所有 read() 操作和 write() 操作都會阻塞當前線程的,如果客戶端已經和服務端建立了一個連接,而遲遲不發送數據,那么服務端的 read() 操作會一直阻塞,所以使用 BIO 模型,一般都會為每個 socket 分配一個獨立的線程,這樣就不會因為線程阻塞在一個 socket 上而影響對其他 socket 的讀寫。BIO 的線程模型如下圖所示,每一個 socket 都對應一個獨立的線程;為了避免頻繁創建、消耗線程,可以采用線程池,但是 socket 和線程之間的對應關系并不會變化。
BIO 這種線程模型適用于 socket 連接不是很多的場景;但是現在的互聯網場景,往往需要服務器能夠支撐十萬甚至百萬連接,而創建十萬甚至上百萬個線程顯然并不現實,所以 BIO 線程模型無法解決百萬連接的問題。如果仔細觀察,你會發現互聯網場景中,雖然連接多,但是每個連接上的請求并不頻繁,所以線程大部分時間都在等待 I/O 就緒。也就是說線程大部分時間都阻塞在那里,這完全是浪費,如果我們能夠解決這個問題,那就不需要這么多線程了。
順著這個思路,我們可以將線程模型優化為下圖這個樣子,可以用一個線程來處理多個連接,這樣線程的利用率就上來了,同時所需的線程數量也跟著降下來了。這個思路很好,可是使用 BIO 相關的 API 是無法實現的,這是為什么呢?因為 BIO 相關的 socket 讀寫操作都是阻塞式的,而一旦調用了阻塞式 API,在 I/O 就緒前,調用線程會一直阻塞,也就無法處理其他的 socket 連接了。
好在 Java 里還提供了非阻塞式(NIO)API,利用非阻塞式 API 就能夠實現一個線程處理多個連接了。那具體如何實現呢?現在普遍都是采用 Reactor 模式,包括 Netty 的實現。所以,要想理解 Netty 的實現,接下來我們就需要先了解一下 Reactor 模式。
(2)Reactor 模式
下面是 Reactor 模式的類結構圖,其中 Handle 指的是 I/O 句柄,在 Java 網絡編程里,它本質上就是一個網絡連接。Event Handler 很容易理解,就是一個事件處理器,其中 handle_event() 方法處理 I/O 事件,也就是每個 Event Handler 處理一個 I/O Handle;get_handle() 方法可以返回這個 I/O 的 Handle。Synchronous Event Demultiplexer 可以理解為操作系統提供的 I/O 多路復用 API,例如 POSIX 標準里的 select() 以及 Linux 里面的 epoll()。
Reactor 模式的核心自然是 Reactor 這個類,其中 register_handler() 和 remove_handler() 這兩個方法可以注冊和刪除一個事件處理器;handle_events() 方式是核心,也是 Reactor 模式的發動機,這個方法的核心邏輯如下:首先通過同步事件多路選擇器提供的 select() 方法監聽網絡事件,當有網絡事件就緒后,就遍歷事件處理器來處理該網絡事件。由于網絡事件是源源不斷的,所以在主程序中啟動 Reactor 模式,需要以 while(true){} 的方式調用 handle_events() 方法。
void Reactor::handle_events(){//通過同步事件多路選擇器提供的//select()方法監聽網絡事件select(handlers);//處理網絡事件for(h in handlers){h.handle_event();} } // 在主程序中啟動事件循環 while (true) {handle_events();(3)Netty 中的線程模型
Netty 的實現雖然參考了 Reactor 模式,但是并沒有完全照搬,Netty 中最核心的概念是事件循環(EventLoop),其實也就是 Reactor 模式中的 Reactor,負責監聽網絡事件并調用事件處理器進行處理。在 4.x 版本的 Netty 中,網絡連接和 EventLoop 是穩定的多對 1 關系,而 EventLoop 和 Java 線程是 1 對 1 關系,這里的穩定指的是關系一旦確定就不再發生變化。也就是說一個網絡連接只會對應唯一的一個 EventLoop,而一個 EventLoop 也只會對應到一個 Java 線程,所以一個網絡連接只會對應到一個 Java 線程。
一個網絡連接對應到一個 Java 線程上,有什么好處呢?最大的好處就是對于一個網絡連接的事件處理是單線程的,這樣就避免了各種并發問題。
Netty 中的線程模型可以參考下圖,這個圖和前面我們提到的理想的線程模型圖非常相似,核心目標都是用一個線程處理多個網絡連接。
Netty 中還有一個核心概念是 EventLoopGroup,顧名思義,一個 EventLoopGroup 由一組 EventLoop 組成。實際使用中,一般都會創建兩個 EventLoopGroup,一個稱為 bossGroup,一個稱為 workerGroup。為什么會有兩個 EventLoopGroup 呢?
這個和 socket 處理網絡請求的機制有關,socket 處理 TCP 網絡連接請求,是在一個獨立的 socket 中,每當有一個 TCP 連接成功建立,都會創建一個新的 socket,之后對 TCP 連接的讀寫都是由新創建處理的 socket 完成的。也就是說處理 TCP 連接請求和讀寫請求是通過兩個不同的 socket 完成的。上面我們在討論網絡請求的時候,為了簡化模型,只是討論了讀寫請求,而沒有討論連接請求。
在 Netty 中,bossGroup 就用來處理連接請求的,而 workerGroup 是用來處理讀寫請求的。bossGroup 處理完連接請求后,會將這個連接提交給 workerGroup 來處理, workerGroup 里面有多個 EventLoop,那新的連接會交給哪個 EventLoop 來處理呢?這就需要一個負載均衡算法,Netty 中目前使用的是輪詢算法。
下面我們用 Netty 重新實現以下 echo 程序的服務端,近距離感受一下 Netty。
(4)用 Netty 實現 Echo 程序服務端
下面的示例代碼基于 Netty 實現了 echo 程序服務端:首先創建了一個事件處理器(等同于 Reactor 模式中的事件處理器),然后創建了 bossGroup 和 workerGroup,再之后創建并初始化了 ServerBootstrap,代碼還是很簡單的,不過有兩個地方需要注意一下。
第一個,如果 NettybossGroup 只監聽一個端口,那 bossGroup 只需要 1 個 EventLoop 就可以了,多了純屬浪費。
第二個,默認情況下,Netty 會創建“2*CPU 核數”個 EventLoop,由于網絡連接與 EventLoop 有穩定的關系,所以事件處理器在處理網絡事件的時候是不能有阻塞操作的,否則很容易導致請求大面積超時。如果實在無法避免使用阻塞操作,那可以通過線程池來異步處理。
//事件處理器 final EchoServerHandler serverHandler = new EchoServerHandler(); //boss線程組 EventLoopGroup bossGroup = new NioEventLoopGroup(1); //worker線程組 EventLoopGroup workerGroup = new NioEventLoopGroup(); try {ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch){ch.pipeline().addLast(serverHandler);}});//bind服務端端口 ChannelFuture f = b.bind(9090).sync();f.channel().closeFuture().sync(); } finally {//終止工作線程組workerGroup.shutdownGracefully();//終止boss線程組bossGroup.shutdownGracefully(); }//socket連接處理器 class EchoServerHandler extends ChannelInboundHandlerAdapter {//處理讀事件 @Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg){ctx.write(msg);}//處理讀完成事件@Overridepublic void channelReadComplete(ChannelHandlerContext ctx){ctx.flush();}//處理異常事件@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();ctx.close();} }Netty 是一個款優秀的網絡編程框架,性能非常好,為了實現高性能的目標,Netty 做了很多優化,例如優化了 ByteBuffer、支持零拷貝等等,和并發編程相關的就是它的線程模型了。Netty 的線程模型設計得很精巧,每個網絡連接都關聯到了一個線程上,這樣做的好處是:對于一個網絡連接,讀寫操作都是單線程執行的,從而避免了并發程序的各種問題。
15、高性能隊列Disruptor
Java SDK 提供了 2 個有界隊列:ArrayBlockingQueue 和 LinkedBlockingQueue,它們都是基于 ReentrantLock 實現的,在高并發場景下,鎖的效率并不高,那有沒有更好的替代品呢?有,今天我們就介紹一種性能更高的有界隊列:Disruptor。
Disruptor 是一款高性能的有界內存隊列,目前應用非常廣泛,Log4j2、Spring Messaging、HBase、Storm 都用到了 Disruptor,那 Disruptor 的性能為什么這么高呢?Disruptor 項目團隊曾經寫過一篇論文,詳細解釋了其原因,可以總結為如下:
① 內存分配更加合理,使用 RingBuffer 數據結構,數組元素在初始化時一次性全部創建,提升緩存命中率;對象循環利用,避免頻繁 GC。
② 能夠避免偽共享,提升緩存利用率。
③ 采用無鎖算法,避免頻繁加鎖、解鎖的性能消耗。
④ 支持批量消費,消費者可以無鎖方式消費多個消息。
其中,前三點涉及到的知識比較多,所以今天咱們重點講解前三點,不過在詳細介紹這些知識之前,我們先來聊聊 Disruptor 如何使用,好讓你先對 Disruptor 有個感官的認識。
下面的代碼出自官方示例,我略做了一些修改,相較而言,Disruptor 的使用比 Java SDK 提供 BlockingQueue 要復雜一些,但是總體思路還是一致的,其大致情況如下:
在 Disruptor 中,生產者生產的對象(也就是消費者消費的對象)稱為 Event,使用 Disruptor 必須自定義 Event,例如示例代碼的自定義 Event 是 LongEvent;
構建 Disruptor 對象除了要指定隊列大小外,還需要傳入一個 EventFactory,示例代碼中傳入的是LongEvent::new;
消費 Disruptor 中的 Event 需要通過 handleEventsWith() 方法注冊一個事件處理器,發布 Event 則需要通過 publishEvent() 方法。
//自定義Event class LongEvent {private long value;public void set(long value) {this.value = value;} } //指定RingBuffer大小, //必須是2的N次方 int bufferSize = 1024;//構建Disruptor Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new,bufferSize,DaemonThreadFactory.INSTANCE);//注冊事件處理器 disruptor.handleEventsWith((event, sequence, endOfBatch) ->System.out.println("E: "+event));//啟動Disruptor disruptor.start();//獲取RingBuffer RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); //生產Event ByteBuffer bb = ByteBuffer.allocate(8); for (long l = 0; true; l++){bb.putLong(0, l);//生產者生產消息ringBuffer.publishEvent((event, sequence, buffer) -> event.set(buffer.getLong(0)), bb);Thread.sleep(1000); }(1)RingBuffer 如何提升性能
Java SDK 中 ArrayBlockingQueue 使用數組作為底層的數據存儲,而 Disruptor 是使用 RingBuffer 作為數據存儲。RingBuffer 本質上也是數組,所以僅僅將數據存儲從數組換成 RingBuffer 并不能提升性能,但是 Disruptor 在 RingBuffer 的基礎上還做了很多優化,其中一項優化就是和內存分配有關的。
在介紹這項優化之前,你需要先了解一下程序的局部性原理。簡單來講,程序的局部性原理指的是在一段時間內程序的執行會限定在一個局部范圍內。這里的“局部性”可以從兩個方面來理解,一個是時間局部性,另一個是空間局部性。時間局部性指的是程序中的某條指令一旦被執行,不久之后這條指令很可能再次被執行;如果某條數據被訪問,不久之后這條數據很可能再次被訪問。而空間局部性是指某塊內存一旦被訪問,不久之后這塊內存附近的內存也很可能被訪問。
CPU 的緩存就利用了程序的局部性原理:CPU 從內存中加載數據 X 時,會將數據 X 緩存在高速緩存 Cache 中,實際上 CPU 緩存 X 的同時,還緩存了 X 周圍的數據,因為根據程序具備局部性原理,X 周圍的數據也很有可能被訪問。從另外一個角度來看,如果程序能夠很好地體現出局部性原理,也就能更好地利用 CPU 的緩存,從而提升程序的性能。Disruptor 在設計 RingBuffer 的時候就充分考慮了這個問題,下面我們就對比著 ArrayBlockingQueue 來分析一下。
首先是 ArrayBlockingQueue。生產者線程向 ArrayBlockingQueue 增加一個元素,每次增加元素 E 之前,都需要創建一個對象 E,如下圖所示,ArrayBlockingQueue 內部有 6 個元素,這 6 個元素都是由生產者線程創建的,由于創建這些元素的時間基本上是離散的,所以這些元素的內存地址大概率也不是連續的。
下面我們再看看 Disruptor 是如何處理的。Disruptor 內部的 RingBuffer 也是用數組實現的,但是這個數組中的所有元素在初始化時是一次性全部創建的,所以這些元素的內存地址大概率是連續的,相關的代碼如下所示。
for (int i=0; i<bufferSize; i++){//entries[]就是RingBuffer內部的數組//eventFactory就是前面示例代碼中傳入的LongEvent::newentries[BUFFER_PAD + i] = eventFactory.newInstance(); }Disruptor 內部 RingBuffer 的結構可以簡化成下圖,那問題來了,數組中所有元素內存地址連續能提升性能嗎?能!為什么呢?因為消費者線程在消費的時候,是遵循空間局部性原理的,消費完第 1 個元素,很快就會消費第 2 個元素;當消費第 1 個元素 E1 的時候,CPU 會把內存中 E1 后面的數據也加載進 Cache,如果 E1 和 E2 在內存中的地址是連續的,那么 E2 也就會被加載進 Cache 中,然后當消費第 2 個元素的時候,由于 E2 已經在 Cache 中了,所以就不需要從內存中加載了,這樣就能大大提升性能。
除此之外,在 Disruptor 中,生產者線程通過 publishEvent() 發布 Event 的時候,并不是創建一個新的 Event,而是通過 event.set() 方法修改 Event, 也就是說 RingBuffer 創建的 Event 是可以循環利用的,這樣還能避免頻繁創建、刪除 Event 導致的頻繁 GC 問題。
(2)如何避免“偽共享”
高效利用 Cache,能夠大大提升性能,所以要努力構建能夠高效利用 Cache 的內存結構。而從另外一個角度看,努力避免不能高效利用 Cache 的內存結構也同樣重要。
有一種叫做“偽共享(False sharing)”的內存布局就會使 Cache 失效,那什么是“偽共享”呢?
偽共享和 CPU 內部的 Cache 有關,Cache 內部是按照緩存行(Cache Line)管理的,緩存行的大小通常是 64 個字節;CPU 從內存中加載數據 X,會同時加載 X 后面(64-size(X))個字節的數據。下面的示例代碼出自 Java SDK 的 ArrayBlockingQueue,其內部維護了 4 個成員變量,分別是隊列數組 items、出隊索引 takeIndex、入隊索引 putIndex 以及隊列中的元素總數 count。
/** 隊列數組 */ final Object[] items; /** 出隊索引 */ int takeIndex; /** 入隊索引 */ int putIndex; /** 隊列中元素總數 */ int count;當 CPU 從內存中加載 takeIndex 的時候,會同時將 putIndex 以及 count 都加載進 Cache。下圖是某個時刻 CPU 中 Cache 的狀況,為了簡化,緩存行中我們僅列出了 takeIndex 和 putIndex。
假設線程 A 運行在 CPU-1 上,執行入隊操作,入隊操作會修改 putIndex,而修改 putIndex 會導致其所在的所有核上的緩存行均失效;此時假設運行在 CPU-2 上的線程執行出隊操作,出隊操作需要讀取 takeIndex,由于 takeIndex 所在的緩存行已經失效,所以 CPU-2 必須從內存中重新讀取。入隊操作本不會修改 takeIndex,但是由于 takeIndex 和 putIndex 共享的是一個緩存行,就導致出隊操作不能很好地利用 Cache,這其實就是偽共享。簡單來講,偽共享指的是由于共享緩存行導致緩存無效的場景。
ArrayBlockingQueue 的入隊和出隊操作是用鎖來保證互斥的,所以入隊和出隊不會同時發生。如果允許入隊和出隊同時發生,那就會導致線程 A 和線程 B 爭用同一個緩存行,這樣也會導致性能問題。所以為了更好地利用緩存,我們必須避免偽共享,那如何避免呢?
方案很簡單,每個變量獨占一個緩存行、不共享緩存行就可以了,具體技術是緩存行填充。比如想讓 takeIndex 獨占一個緩存行,可以在 takeIndex 的前后各填充 56 個字節,這樣就一定能保證 takeIndex 獨占一個緩存行。下面的示例代碼出自 Disruptor,Sequence 對象中的 value 屬性就能避免偽共享,因為這個屬性前后都填充了 56 個字節。Disruptor 中很多對象,例如 RingBuffer、RingBuffer 內部的數組都用到了這種填充技術來避免偽共享。
//前:填充56字節 class LhsPadding{long p1, p2, p3, p4, p5, p6, p7; } class Value extends LhsPadding{volatile long value; } //后:填充56字節 class RhsPadding extends Value{long p9, p10, p11, p12, p13, p14, p15; } class Sequence extends RhsPadding{//省略實現 }(3)Disruptor 中的無鎖算法
ArrayBlockingQueue 是利用管程實現的,中規中矩,生產、消費操作都需要加鎖,實現起來簡單,但是性能并不十分理想。Disruptor 采用的是無鎖算法,很復雜,但是核心無非是生產和消費兩個操作。Disruptor 中最復雜的是入隊操作,所以我們重點來看看入隊操作是如何實現的。
對于入隊操作,最關鍵的要求是不能覆蓋沒有消費的元素;對于出隊操作,最關鍵的要求是不能讀取沒有寫入的元素,所以 Disruptor 中也一定會維護類似出隊索引和入隊索引這樣兩個關鍵變量。Disruptor 中的 RingBuffer 維護了入隊索引,但是并沒有維護出隊索引,這是因為在 Disruptor 中多個消費者可以同時消費,每個消費者都會有一個出隊索引,所以 RingBuffer 的出隊索引是所有消費者里面最小的那一個。
下面是 Disruptor 生產者入隊操作的核心代碼,看上去很復雜,其實邏輯很簡單:如果沒有足夠的空余位置,就出讓 CPU 使用權,然后重新計算;反之則用 CAS 設置入隊索引。
//生產者獲取n個寫入位置 do {//cursor類似于入隊索引,指的是上次生產到這里current = cursor.get();//目標是在生產n個next = current + n;//減掉一個循環long wrapPoint = next - bufferSize;//獲取上一次的最小消費位置long cachedGatingSequence = gatingSequenceCache.get();//沒有足夠的空余位置if (wrapPoint>cachedGatingSequence || cachedGatingSequence>current){//重新計算所有消費者里面的最小值位置long gatingSequence = Util.getMinimumSequence(gatingSequences, current);//仍然沒有足夠的空余位置,出讓CPU使用權,重新執行下一循環if (wrapPoint > gatingSequence){LockSupport.parkNanos(1);continue;}//從新設置上一次的最小消費位置gatingSequenceCache.set(gatingSequence);} else if (cursor.compareAndSet(current, next)){//獲取寫入位置成功,跳出循環break;} } while (true);Disruptor 在優化并發性能方面可謂是做到了極致,優化的思路大體是兩個方面,一個是利用無鎖算法避免鎖的爭用,另外一個則是將硬件(CPU)的性能發揮到極致。尤其是后者,在 Java 領域基本上屬于經典之作了。
發揮硬件的能力一般是 C 這種面向硬件的語言常干的事兒,C 語言領域經常通過調整內存布局優化內存占用,而 Java 領域則用的很少,原因在于 Java 可以智能地優化內存布局,內存布局對 Java 程序員的透明的。這種智能的優化大部分場景是很友好的,但是如果你想通過填充方式避免偽共享就必須繞過這種優化,關于這方面 Disruptor 提供了經典的實現,你可以參考。
由于偽共享問題如此重要,所以 Java 也開始重視它了,比如 Java 8 中,提供了避免偽共享的注解:@sun.misc.Contended,通過這個注解就能輕松避免偽共享(需要設置 JVM 參數 -XX:-RestrictContended)。不過避免偽共享是以犧牲內存為代價的,所以具體使用的時候還是需要仔細斟酌。
16、高性能數據庫連接池HiKariCP
只要和數據庫打交道,就免不了使用數據庫連接池。業界知名的數據庫連接池有不少,例如 c3p0、DBCP、Tomcat JDBC Connection Pool、Druid 等,不過最近最火的是 HiKariCP。
HiKariCP 號稱是業界跑得最快的數據庫連接池,這兩年發展得順風順水,尤其是 Springboot 2.0 將其作為默認數據庫連接池后,江湖一哥的地位已是毋庸置疑了。那它為什么那么快呢?今天咱們就重點聊聊這個話題。
(1)什么是數據庫連接池
在詳細分析 HiKariCP 高性能之前,我們有必要先簡單介紹一下什么是數據庫連接池。本質上,數據庫連接池和線程池一樣,都屬于池化資源,作用都是避免重量級資源的頻繁創建和銷毀,對于數據庫連接池來說,也就是避免數據庫連接頻繁創建和銷毀。如下圖所示,服務端會在運行期持有一定數量的數據庫連接,當需要執行 SQL 時,并不是直接創建一個數據庫連接,而是從連接池中獲取一個;當 SQL 執行完,也并不是將數據庫連接真的關掉,而是將其歸還到連接池中。
在實際工作中,我們都是使用各種持久化框架來完成數據庫的增刪改查,基本上不會直接和數據庫連接池打交道,為了能讓你更好地理解數據庫連接池的工作原理,下面的示例代碼并沒有使用任何框架,而是原生地使用 HiKariCP。執行數據庫操作基本上是一系列規范化的步驟:
① 通過數據源獲取一個數據庫連接;
② 創建 Statement;
③ 執行 SQL;
④ 通過 ResultSet 獲取 SQL 執行結果;
⑤ 釋放 ResultSet;
⑥ 釋放 Statement;
⑦ 釋放數據庫連接。
下面的示例代碼,通過 ds.getConnection() 獲取一個數據庫連接時,其實是向數據庫連接池申請一個數據庫連接,而不是創建一個新的數據庫連接。同樣,通過 conn.close() 釋放一個數據庫連接時,也不是直接將連接關閉,而是將連接歸還給數據庫連接池。
//數據庫連接池配置 HikariConfig config = new HikariConfig(); config.setMinimumIdle(1); config.setMaximumPoolSize(2); config.setConnectionTestQuery("SELECT 1"); config.setDataSourceClassName("org.h2.jdbcx.JdbcDataSource"); config.addDataSourceProperty("url", "jdbc:h2:mem:test"); // 創建數據源 DataSource ds = new HikariDataSource(config); Connection conn = null; Statement stmt = null; ResultSet rs = null; try {// 獲取數據庫連接conn = ds.getConnection();// 創建Statement stmt = conn.createStatement();// 執行SQLrs = stmt.executeQuery("select * from abc");// 獲取結果while (rs.next()) {int id = rs.getInt(1);......} } catch(Exception e) {e.printStackTrace(); } finally {//關閉ResultSetclose(rs);//關閉Statement close(stmt);//關閉Connectionclose(conn); } //關閉資源 void close(AutoCloseable rs) {if (rs != null) {try {rs.close();} catch (SQLException e) {e.printStackTrace();}} }HiKariCP 官方網站解釋了其性能之所以如此之高的秘密。微觀上 HiKariCP 程序編譯出的字節碼執行效率更高,站在字節碼的角度去優化 Java 代碼,HiKariCP 的作者對性能的執著可見一斑,不過遺憾的是他并沒有詳細解釋都做了哪些優化。而宏觀上主要是和兩個數據結構有關,一個是 FastList,另一個是 ConcurrentBag。下面我們來看看它們是如何提升 HiKariCP 的性能的。
(2)FastList 解決了哪些性能問題
按照規范步驟,執行完數據庫操作之后,需要依次關閉 ResultSet、Statement、Connection,但是總有粗心的同學只是關閉了 Connection,而忘了關閉 ResultSet 和 Statement。為了解決這種問題,最好的辦法是當關閉 Connection 時,能夠自動關閉 Statement。為了達到這個目標,Connection 就需要跟蹤創建的 Statement,最簡單的辦法就是將創建的 Statement 保存在數組 ArrayList 里,這樣當關閉 Connection 的時候,就可以依次將數組中的所有 Statement 關閉。
HiKariCP 覺得用 ArrayList 還是太慢,當通過 conn.createStatement() 創建一個 Statement 時,需要調用 ArrayList 的 add() 方法加入到 ArrayList 中,這個是沒有問題的;但是當通過 stmt.close() 關閉 Statement 的時候,需要調用 ArrayList 的 remove() 方法來將其從 ArrayList 中刪除,這里是有優化余地的。
假設一個 Connection 依次創建 6 個 Statement,分別是 S1、S2、S3、S4、S5、S6,按照正常的編碼習慣,關閉 Statement 的順序一般是逆序的,關閉的順序是:S6、S5、S4、S3、S2、S1,而 ArrayList 的 remove(Object o) 方法是順序遍歷查找,逆序刪除而順序查找,這樣的查找效率就太慢了。如何優化呢?很簡單,優化成逆序查找就可以了。
HiKariCP 中的 FastList 相對于 ArrayList 的一個優化點就是將 remove(Object element) 方法的查找順序變成了逆序查找。除此之外,FastList 還有另一個優化點,是 get(int index) 方法沒有對 index 參數進行越界檢查,HiKariCP 能保證不會越界,所以不用每次都進行越界檢查。
整體來看,FastList 的優化點還是很簡單的。下面我們再來聊聊 HiKariCP 中的另外一個數據結構 ConcurrentBag,看看它又是如何提升性能的。
(3)ConcurrentBag 解決了哪些性能問題
如果讓我們自己來實現一個數據庫連接池,最簡單的辦法就是用兩個阻塞隊列來實現,一個用于保存空閑數據庫連接的隊列 idle,另一個用于保存忙碌數據庫連接的隊列 busy;獲取連接時將空閑的數據庫連接從 idle 隊列移動到 busy 隊列,而關閉連接時將數據庫連接從 busy 移動到 idle。這種方案將并發問題委托給了阻塞隊列,實現簡單,但是性能并不是很理想。因為 Java SDK 中的阻塞隊列是用鎖實現的,而高并發場景下鎖的爭用對性能影響很大。
//忙碌隊列 BlockingQueue<Connection> busy; //空閑隊列 BlockingQueue<Connection> idle;HiKariCP 并沒有使用 Java SDK 中的阻塞隊列,而是自己實現了一個叫做 ConcurrentBag 的并發容器。ConcurrentBag 的設計最初源自 C#,它的一個核心設計是使用 ThreadLocal 避免部分并發問題,不過 HiKariCP 中的 ConcurrentBag 并沒有完全參考 C# 的實現,下面我們來看看它是如何實現的。
ConcurrentBag 中最關鍵的屬性有 4 個,分別是:用于存儲所有的數據庫連接的共享隊列 sharedList、線程本地存儲 threadList、等待數據庫連接的線程數 waiters 以及分配數據庫連接的工具 handoffQueue。其中,handoffQueue 用的是 Java SDK 提供的 SynchronousQueue,SynchronousQueue 主要用于線程之間傳遞數據。
//用于存儲所有的數據庫連接 CopyOnWriteArrayList<T> sharedList; //線程本地存儲中的數據庫連接 ThreadLocal<List<Object>> threadList; //等待數據庫連接的線程數 AtomicInteger waiters; //分配數據庫連接的工具 SynchronousQueue<T> handoffQueue;當線程池創建了一個數據庫連接時,通過調用 ConcurrentBag 的 add() 方法加入到 ConcurrentBag 中,下面是 add() 方法的具體實現,邏輯很簡單,就是將這個連接加入到共享隊列 sharedList 中,如果此時有線程在等待數據庫連接,那么就通過 handoffQueue 將這個連接分配給等待的線程。
//將空閑連接添加到隊列 void add(final T bagEntry){//加入共享隊列sharedList.add(bagEntry);//如果有等待連接的線程,//則通過handoffQueue直接分配給等待的線程while (waiters.get() > 0 && bagEntry.getState() == STATE_NOT_IN_USE && !handoffQueue.offer(bagEntry)) {yield();} }通過 ConcurrentBag 提供的 borrow() 方法,可以獲取一個空閑的數據庫連接,borrow() 的主要邏輯是:
① 首先查看線程本地存儲是否有空閑連接,如果有,則返回一個空閑的連接;
② 如果線程本地存儲中無空閑連接,則從共享隊列中獲取。
③ 如果共享隊列中也沒有空閑的連接,則請求線程需要等待。
需要注意的是,線程本地存儲中的連接是可以被其他線程竊取的,所以需要用 CAS 方法防止重復分配。在共享隊列中獲取空閑連接,也采用了 CAS 方法防止重復分配。
T borrow(long timeout, final TimeUnit timeUnit){// 先查看線程本地存儲是否有空閑連接final List<Object> list = threadList.get();for (int i = list.size() - 1; i >= 0; i--) {final Object entry = list.remove(i);final T bagEntry = weakThreadLocals ? ((WeakReference<T>) entry).get() : (T) entry;//線程本地存儲中的連接也可以被竊取,//所以需要用CAS方法防止重復分配if (bagEntry != null && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {return bagEntry;}}// 線程本地存儲中無空閑連接,則從共享隊列中獲取final int waiting = waiters.incrementAndGet();try {for (T bagEntry : sharedList) {//如果共享隊列中有空閑連接,則返回if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {return bagEntry;}}//共享隊列中沒有連接,則需要等待timeout = timeUnit.toNanos(timeout);do {final long start = currentTime();final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS);if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {return bagEntry;}//重新計算等待時間timeout -= elapsedNanos(start);} while (timeout > 10_000);//超時沒有獲取到連接,返回nullreturn null;} finally {waiters.decrementAndGet();} }釋放連接需要調用 ConcurrentBag 提供的 requite() 方法,該方法的邏輯很簡單,首先將數據庫連接狀態更改為 STATE_NOT_IN_USE,之后查看是否存在等待線程,如果有,則分配給等待線程;如果沒有,則將該數據庫連接保存到線程本地存儲里。
//釋放連接 void requite(final T bagEntry){//更新連接狀態bagEntry.setState(STATE_NOT_IN_USE);//如果有等待的線程,則直接分配給線程,無需進入任何隊列for (int i = 0; waiters.get() > 0; i++) {if (bagEntry.getState() != STATE_NOT_IN_USE || handoffQueue.offer(bagEntry)) {return;} else if ((i & 0xff) == 0xff) {parkNanos(MICROSECONDS.toNanos(10));} else {yield();}}//如果沒有等待的線程,則進入線程本地存儲final List<Object> threadLocalList = threadList.get();if (threadLocalList.size() < 50) {threadLocalList.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry);} }HiKariCP 中的 FastList 和 ConcurrentBag 這兩個數據結構使用得非常巧妙,雖然實現起來并不復雜,但是對于性能的提升非常明顯,根本原因在于這兩個數據結構適用于數據庫連接池這個特定的場景。FastList 適用于逆序刪除場景;而 ConcurrentBag 通過 ThreadLocal 做一次預分配,避免直接競爭共享資源,非常適合池化資源的分配。
在實際工作中,我們遇到的并發問題千差萬別,這時選擇合適的并發數據結構就非常重要了。當然能選對的前提是對特定場景的并發特性有深入的了解,只有了解到無謂的性能消耗在哪里,才能對癥下藥。
17、Actor模型:面向對象原生的并發模型
上學的時候,有門計算機專業課叫做面向對象編程,學這門課的時候有個問題困擾了我很久,按照面向對象編程的理論,對象之間通信需要依靠消息,而實際上,像 C++、Java 這些面向對象的語言,對象之間通信,依靠的是對象方法。對象方法和過程語言里的函數本質上沒有區別,有入參、有出參,思維方式很相似,使用起來都很簡單。那面向對象理論里的消息是否就等價于面向對象語言里的對象方法呢?很長一段時間里,我都以為對象方法是面向對象理論中消息的一種實現,直到接觸到 Actor 模型,才明白消息壓根不是這個實現法。
(1)Hello Actor 模型
Actor 模型本質上是一種計算模型,基本的計算單元稱為 Actor,換言之,在 Actor 模型中,所有的計算都是在 Actor 中執行的。在面向對象編程里面,一切都是對象;在 Actor 模型里,一切都是 Actor,并且 Actor 之間是完全隔離的,不會共享任何變量。
當看到“不共享任何變量”的時候,相信你一定會眼前一亮,并發問題的根源就在于共享變量,而 Actor 模型中 Actor 之間不共享變量,那用 Actor 模型解決并發問題,一定是相當順手。的確是這樣,所以很多人就把 Actor 模型定義為一種并發計算模型。其實 Actor 模型早在 1973 年就被提出來了,只是直到最近幾年才被廣泛關注,一個主要原因就在于它是解決并發問題的利器,而最近幾年隨著多核處理器的發展,并發問題被推到了風口浪尖上。
但是 Java 語言本身并不支持 Actor 模型,所以如果你想在 Java 語言里使用 Actor 模型,就需要借助第三方類庫,目前能完備地支持 Actor 模型而且比較成熟的類庫就是 Akka 了。在詳細介紹 Actor 模型之前,我們就先基于 Akka 寫一個 Hello World 程序,讓你對 Actor 模型先有個感官的印象。
在下面的示例代碼中,我們首先創建了一個 ActorSystem(Actor 不能脫離 ActorSystem 存在);之后創建了一個 HelloActor,Akka 中創建 Actor 并不是 new 一個對象出來,而是通過調用 system.actorOf() 方法創建的,該方法返回的是 ActorRef,而不是 HelloActor;最后通過調用 ActorRef 的 tell() 方法給 HelloActor 發送了一條消息 “Actor” 。
//該Actor當收到消息message后, //會打印Hello message static class HelloActor extends UntypedActor {@Overridepublic void onReceive(Object message) {System.out.println("Hello " + message);} }public static void main(String[] args) {//創建Actor系統ActorSystem system = ActorSystem.create("HelloSystem");//創建HelloActorActorRef helloActor = system.actorOf(Props.create(HelloActor.class));//發送消息給HelloActorhelloActor.tell("Actor", ActorRef.noSender()); }通過這個例子,你會發現 Actor 模型和面向對象編程契合度非常高,完全可以用 Actor 類比面向對象編程里面的對象,而且 Actor 之間的通信方式完美地遵守了消息機制,而不是通過對象方法來實現對象之間的通信。那 Actor 中的消息機制和面向對象語言里的對象方法有什么區別呢?
(2)消息和對象方法的區別
在沒有計算機的時代,異地的朋友往往是通過寫信來交流感情的,但信件發出去之后,也許會在寄送過程中弄丟了,也有可能寄到后,對方一直沒有時間寫回信……這個時候都可以讓郵局“背個鍋”,不過無論如何,也不過是重寫一封,生活繼續。
Actor 中的消息機制,就可以類比這現實世界里的寫信。Actor 內部有一個郵箱(Mailbox),接收到的消息都是先放到郵箱里,如果郵箱里有積壓的消息,那么新收到的消息就不會馬上得到處理,也正是因為 Actor 使用單線程處理消息,所以不會出現并發問題。你可以把 Actor 內部的工作模式想象成只有一個消費者線程的生產者 - 消費者模式。
所以,在 Actor 模型里,發送消息僅僅是把消息發出去而已,接收消息的 Actor 在接收到消息后,也不一定會立即處理,也就是說 Actor 中的消息機制完全是異步的。而調用對象方法,實際上是同步的,對象方法 return 之前,調用方會一直等待。
除此之外,調用對象方法,需要持有對象的引用,所有的對象必須在同一個進程中。而在 Actor 中發送消息,類似于現實中的寫信,只需要知道對方的地址就可以,發送消息和接收消息的 Actor 可以不在一個進程中,也可以不在同一臺機器上。因此,Actor 模型不但適用于并發計算,還適用于分布式計算。
(3)Actor 的規范化定義
通過上面的介紹,相信你應該已經對 Actor 有一個感官印象了,下面我們再來看看 Actor 規范化的定義是什么樣的。Actor 是一種基礎的計算單元,具體來講包括三部分能力,分別是:
① 處理能力,處理接收到的消息。
② 存儲能力,Actor 可以存儲自己的內部狀態,并且內部狀態在不同 Actor 之間是絕對隔離的。
③ 通信能力,Actor 可以和其他 Actor 之間通信。
當一個 Actor 接收的一條消息之后,這個 Actor 可以做以下三件事:
① 創建更多的 Actor;
② 發消息給其他 Actor;
③ 確定如何處理下一條消息。
其中前兩條還是很好理解的,就是最后一條,該如何去理解呢?前面我們說過 Actor 具備存儲能力,它有自己的內部狀態,所以你也可以把 Actor 看作一個狀態機,把 Actor 處理消息看作是觸發狀態機的狀態變化;而狀態機的變化往往要基于上一個狀態,觸發狀態機發生變化的時刻,上一個狀態必須是確定的,所以確定如何處理下一條消息,本質上不過是改變內部狀態。
在多線程里面,由于可能存在競態條件,所以根據當前狀態確定如何處理下一條消息還是有難度的,需要使用各種同步工具,但在 Actor 模型里,由于是單線程處理,所以就不存在競態條件問題了。
(4)用 Actor 實現累加器
支持并發的累加器可能是最簡單并且有代表性的并發問題了,可以基于互斥鎖方案實現,也可以基于原子類實現,但今天我們要嘗試用 Actor 來實現。
在下面的示例代碼中,CounterActor 內部持有累計值 counter,當 CounterActor 接收到一個數值型的消息 message 時,就將累計值 counter += message;但如果是其他類型的消息,則打印當前累計值 counter。在 main() 方法中,我們啟動了 4 個線程來執行累加操作。整個程序沒有鎖,也沒有 CAS,但是程序是線程安全的。
//累加器 static class CounterActor extends UntypedActor {private int counter = 0;@Overridepublic void onReceive(Object message){//如果接收到的消息是數字類型,執行累加操作,//否則打印counter的值if (message instanceof Number) {counter += ((Number) message).intValue();} else {System.out.println(counter);}} } public static void main(String[] args) throws InterruptedException {//創建Actor系統ActorSystem system = ActorSystem.create("HelloSystem");//4個線程生產消息ExecutorService es = Executors.newFixedThreadPool(4);//創建CounterActor ActorRef counterActor = system.actorOf(Props.create(CounterActor.class));//生產4*100000個消息 for (int i=0; i<4; i++) {es.execute(()->{for (int j=0; j<100000; j++) {counterActor.tell(1, ActorRef.noSender());}});}//關閉線程池es.shutdown();//等待CounterActor處理完所有消息Thread.sleep(1000);//打印結果counterActor.tell("", ActorRef.noSender());//關閉Actor系統system.shutdown(); }Actor 模型是一種非常簡單的計算模型,其中 Actor 是最基本的計算單元,Actor 之間是通過消息進行通信。Actor 與面向對象編程(OOP)中的對象匹配度非常高,在面向對象編程里,系統由類似于生物細胞那樣的對象構成,對象之間也是通過消息進行通信,所以在面向對象語言里使用 Actor 模型基本上不會有違和感。
在 Java 領域,除了可以使用 Akka 來支持 Actor 模型外,還可以使用 Vert.x,不過相對來說 Vert.x 更像是 Actor 模型的隱式實現,對應關系不像 Akka 那樣明顯,不過本質上也是一種 Actor 模型。
Actor 可以創建新的 Actor,這些 Actor 最終會呈現出一個樹狀結構,非常像現實世界里的組織結構,所以利用 Actor 模型來對程序進行建模,和現實世界的匹配度非常高。Actor 模型和現實世界一樣都是異步模型,理論上不保證消息百分百送達,也不保證消息送達的順序和發送的順序是一致的,甚至無法保證消息會被百分百處理。雖然實現 Actor 模型的廠商都在試圖解決這些問題,但遺憾的是解決得并不完美,所以使用 Actor 模型也是有成本的。
18、軟件事務內存:借鑒數據庫的并發經驗
很多同學反饋說,工作了挺長時間但是沒有機會接觸并發編程,實際上我們天天都在寫并發程序,只不過并發相關的問題都被類似 Tomcat 這樣的 Web 服務器以及 MySQL 這樣的數據庫解決了。尤其是數據庫,在解決并發問題方面,可謂成績斐然,它的事務機制非常簡單易用,能甩 Java 里面的鎖、原子類十條街。技術無邊界,很顯然要借鑒一下。
其實很多編程語言都有從數據庫的事務管理中獲得靈感,并且總結出了一個新的并發解決方案:軟件事務內存(Software Transactional Memory,簡稱 STM)。傳統的數據庫事務,支持 4 個特性:原子性(Atomicity)、一致性(Consistency)、隔離性(Isolation)和持久性(Durability),也就是大家常說的 ACID,STM 由于不涉及到持久化,所以只支持 ACI。
STM 的使用很簡單,下面我們以經典的轉賬操作為例,看看用 STM 該如何實現。
(1)用 STM 實現轉賬
并發轉賬的例子,示例代碼如下。簡單地使用 synchronized 將 transfer() 方法變成同步方法并不能解決并發問題,因為還存在死鎖問題。
class UnsafeAccount {//余額private long balance;//構造函數public UnsafeAccount(long balance) {this.balance = balance;}//轉賬void transfer(UnsafeAccount target, long amt){if (this.balance > amt) {this.balance -= amt;target.balance += amt;}} }該轉賬操作若使用數據庫事務就會非常簡單,如下面的示例代碼所示。如果所有 SQL 都正常執行,則通過 commit() 方法提交事務;如果 SQL 在執行過程中有異常,則通過 rollback() 方法回滾事務。數據庫保證在并發情況下不會有死鎖,而且還能保證前面我們說的原子性、一致性、隔離性和持久性,也就是 ACID。
Connection conn = null; try{//獲取數據庫連接conn = DriverManager.getConnection();//設置手動提交事務conn.setAutoCommit(false);//執行轉賬SQL......//提交事務conn.commit(); } catch (Exception e) {//出現異常回滾事務conn.rollback(); }那如果用 STM 又該如何實現呢?Java 語言并不支持 STM,不過可以借助第三方的類庫來支持,Multiverse就是個不錯的選擇。下面的示例代碼就是借助 Multiverse 實現了線程安全的轉賬操作,相比較上面線程不安全的 UnsafeAccount,其改動并不大,僅僅是將余額的類型從 long 變成了 TxnLong ,將轉賬的操作放到了 atomic(()->{}) 中。
class Account{//余額private TxnLong balance;//構造函數public Account(long balance){this.balance = StmUtils.newTxnLong(balance);}//轉賬public void transfer(Account to, int amt){//原子化操作atomic(()->{if (this.balance.get() > amt) {this.balance.decrement(amt);to.balance.increment(amt);}});} }一個關鍵的 atomic() 方法就把并發問題解決了,這個方案看上去比傳統的方案的確簡單了很多,那它是如何實現的呢?數據庫事務發展了幾十年了,目前被廣泛使用的是 MVCC(全稱是 Multi-Version Concurrency Control),也就是多版本并發控制。
MVCC 可以簡單地理解為數據庫事務在開啟的時候,會給數據庫打一個快照,以后所有的讀寫都是基于這個快照的。當提交事務的時候,如果所有讀寫過的數據在該事務執行期間沒有發生過變化,那么就可以提交;如果發生了變化,說明該事務和有其他事務讀寫的數據沖突了,這個時候是不可以提交的。
為了記錄數據是否發生了變化,可以給每條數據增加一個版本號,這樣每次成功修改數據都會增加版本號的值。MVCC 的工作原理和樂觀鎖非常相似。有不少 STM 的實現方案都是基于 MVCC 的,例如知名的 Clojure STM。
下面我們就用最簡單的代碼基于 MVCC 實現一個簡版的 STM,這樣你會對 STM 以及 MVCC 的工作原理有更深入的認識。
(1)自己實現 STM
我們首先要做的,就是讓 Java 中的對象有版本號,在下面的示例代碼中,VersionedRef 這個類的作用就是將對象 value 包裝成帶版本號的對象。按照 MVCC 理論,數據的每一次修改都對應著一個唯一的版本號,所以不存在僅僅改變 value 或者 version 的情況,用不變性模式就可以很好地解決這個問題,所以 VersionedRef 這個類被我們設計成了不可變的。
所有對數據的讀寫操作,一定是在一個事務里面,TxnRef 這個類負責完成事務內的讀寫操作,讀寫操作委托給了接口 Txn,Txn 代表的是讀寫操作所在的當前事務, 內部持有的 curRef 代表的是系統中的最新值。
//帶版本號的對象引用 public final class VersionedRef<T> {final T value;final long version;//構造方法public VersionedRef(T value, long version) {this.value = value;this.version = version;} } //支持事務的引用 public class TxnRef<T> {//當前數據,帶版本號volatile VersionedRef curRef;//構造方法public TxnRef(T value) {this.curRef = new VersionedRef(value, 0L);}//獲取當前事務中的數據public T getValue(Txn txn) {return txn.get(this);}//在當前事務中設置數據public void setValue(T value, Txn txn) {txn.set(this, value);} }STMTxn 是 Txn 最關鍵的一個實現類,事務內對于數據的讀寫,都是通過它來完成的。STMTxn 內部有兩個 Map:inTxnMap,用于保存當前事務中所有讀寫的數據的快照;writeMap,用于保存當前事務需要寫入的數據。每個事務都有一個唯一的事務 ID txnId,這個 txnId 是全局遞增的。
STMTxn 有三個核心方法,分別是讀數據的 get() 方法、寫數據的 set() 方法和提交事務的 commit() 方法。其中,get() 方法將要讀取數據作為快照放入 inTxnMap,同時保證每次讀取的數據都是一個版本。set() 方法會將要寫入的數據放入 writeMap,但如果寫入的數據沒被讀取過,也會將其放入 inTxnMap。
至于 commit() 方法,我們為了簡化實現,使用了互斥鎖,所以事務的提交是串行的。commit() 方法的實現很簡單,首先檢查 inTxnMap 中的數據是否發生過變化,如果沒有發生變化,那么就將 writeMap 中的數據寫入(這里的寫入其實就是 TxnRef 內部持有的 curRef);如果發生過變化,那么就不能將 writeMap 中的數據寫入了。
//事務接口 public interface Txn {<T> T get(TxnRef<T> ref);<T> void set(TxnRef<T> ref, T value); } //STM事務實現類 public final class STMTxn implements Txn {//事務ID生成器private static AtomicLong txnSeq = new AtomicLong(0);//當前事務所有的相關數據private Map<TxnRef, VersionedRef> inTxnMap = new HashMap<>();//當前事務所有需要修改的數據private Map<TxnRef, Object> writeMap = new HashMap<>();//當前事務IDprivate long txnId;//構造函數,自動生成當前事務IDSTMTxn() {txnId = txnSeq.incrementAndGet();}//獲取當前事務中的數據@Overridepublic <T> T get(TxnRef<T> ref) {//將需要讀取的數據,加入inTxnMapif (!inTxnMap.containsKey(ref)) {inTxnMap.put(ref, ref.curRef);}return (T) inTxnMap.get(ref).value;}//在當前事務中修改數據@Overridepublic <T> void set(TxnRef<T> ref, T value) {//將需要修改的數據,加入inTxnMapif (!inTxnMap.containsKey(ref)) {inTxnMap.put(ref, ref.curRef);}writeMap.put(ref, value);}//提交事務boolean commit() {synchronized (STM.commitLock) {//是否校驗通過boolean isValid = true;//校驗所有讀過的數據是否發生過變化for(Map.Entry<TxnRef, VersionedRef> entry : inTxnMap.entrySet()){VersionedRef curRef = entry.getKey().curRef;VersionedRef readRef = entry.getValue();//通過版本號來驗證數據是否發生過變化if (curRef.version != readRef.version) {isValid = false;break;}}//如果校驗通過,則所有更改生效if (isValid) {writeMap.forEach((k, v) -> {k.curRef = new VersionedRef(v, txnId);});}return isValid;} }下面我們來模擬實現 Multiverse 中的原子化操作 atomic()。atomic() 方法中使用了類似于 CAS 的操作,如果事務提交失敗,那么就重新創建一個新的事務,重新執行。
@FunctionalInterface public interface TxnRunnable {void run(Txn txn); } //STM public final class STM {//私有化構造方法private STM() {//提交數據需要用到的全局鎖 static final Object commitLock = new Object();//原子化提交方法public static void atomic(TxnRunnable action) {boolean committed = false;//如果沒有提交成功,則一直重試while (!committed) {//創建新的事務STMTxn txn = new STMTxn();//執行業務邏輯action.run(txn);//提交事務committed = txn.commit();}} }}就這樣,我們自己實現了 STM,并完成了線程安全的轉賬操作,使用方法和 Multiverse 差不多,這里就不贅述了,具體代碼如下面所示。
class Account {//余額private TxnRef<Integer> balance;//構造方法public Account(int balance) {this.balance = new TxnRef<Integer>(balance);}//轉賬操作public void transfer(Account target, int amt){STM.atomic((txn)->{Integer from = balance.getValue(txn);balance.setValue(from-amt, txn);Integer to = target.balance.getValue(txn);target.balance.setValue(to+amt, txn);});} }STM 借鑒的是數據庫的經驗,數據庫雖然復雜,但僅僅存儲數據,而編程語言除了有共享變量之外,還會執行各種 I/O 操作,很顯然 I/O 操作是很難支持回滾的。所以,STM 也不是萬能的。目前支持 STM 的編程語言主要是函數式語言,函數式語言里的數據天生具備不可變性,利用這種不可變性實現 STM 相對來說更簡單。
19、協程:更輕量級的線程
Java 語言里解決并發問題靠的是多線程,但線程是個重量級的對象,不能頻繁創建、銷毀,而且線程切換的成本也很高,為了解決這些問題,Java SDK 提供了線程池。然而用好線程池并不容易,Java 圍繞線程池提供了很多工具類,這些工具類學起來也不容易。那有沒有更好的解決方案呢?Java 語言里目前還沒有,但是其他語言里有,這個方案就是協程(Coroutine)。
我們可以把協程簡單地理解為一種輕量級的線程。從操作系統的角度來看,線程是在內核態中調度的,而協程是在用戶態調度的,所以相對于線程來說,協程切換的成本更低。協程雖然也有自己的棧,但是相比線程棧要小得多,典型的線程棧大小差不多有 1M,而協程棧的大小往往只有幾 K 或者幾十 K。所以,無論是從時間維度還是空間維度來看,協程都比線程輕量得多。
支持協程的語言還是挺多的,例如 Golang、Python、Lua、Kotlin 等都支持協程。下面我們就以 Golang 為代表,看看協程是如何在 Golang 中使用的。
(1)Golang 中的協程
在 Golang 中創建協程非常簡單,在下面的示例代碼中,要讓 hello() 方法在一個新的協程中執行,只需要go hello("World") 這一行代碼就搞定了。你可以對比著想想在 Java 里是如何“辛勤”地創建線程和線程池的吧,我的感覺一直都是:每次寫完 Golang 的代碼,就再也不想寫 Java 代碼了。
import ("fmt""time" ) func hello(msg string) {fmt.Println("Hello " + msg) } func main() {//在新的協程中執行hello方法go hello("World")fmt.Println("Run in main")//等待100毫秒讓協程執行結束time.Sleep(100 * time.Millisecond) }利用協程能夠很好地實現 Thread-Per-Message 模式。Thread-Per-Message 模式非常簡單,其實越是簡單的模式,功能上就越穩定,可理解性也越好。
下面的示例代碼是用 Golang 實現的 echo 程序的服務端,用的是 Thread-Per-Message 模式,為每個成功建立連接的 socket 分配一個協程,相比 Java 線程池的實現方案,Golang 中協程的方案更簡單。
import ("log""net" )func main() {//監聽本地9090端口socket, err := net.Listen("tcp", "127.0.0.1:9090")if err != nil {log.Panicln(err)}defer socket.Close()for {//處理連接請求 conn, err := socket.Accept()if err != nil {log.Panicln(err)}//處理已經成功建立連接的請求go handleRequest(conn)} } //處理已經成功建立連接的請求 func handleRequest(conn net.Conn) {defer conn.Close()for {buf := make([]byte, 1024)//讀取請求數據size, err := conn.Read(buf)if err != nil {return}//回寫相應數據 conn.Write(buf[:size])} }(2)利用協程實現同步
其實協程并不僅限于實現 Thread-Per-Message 模式,它還可以將異步模式轉換為同步模式。異步編程雖然近幾年取得了長足發展,但是異步的思維模式對于普通人來講畢竟是有難度的,只有線性的思維模式才是適合所有人的。而線性的思維模式反映到編程世界,就是同步。
在 Java 里使用多線程并發地處理 I/O,基本上用的都是異步非阻塞模型,這種模型的異步主要是靠注冊回調函數實現的,那能否都使用同步處理呢?顯然是不能的。因為同步意味著等待,而線程等待,本質上就是一種嚴重的浪費。不過對于協程來說,等待的成本就沒有那么高了,所以基于協程實現同步非阻塞是一個可行的方案。
OpenResty 里實現的 cosocket 就是一種同步非阻塞方案,借助 cosocket 我們可以用線性的思維模式來編寫非阻塞的程序。下面的示例代碼是用 cosocket 實現的 socket 程序的客戶端,建立連接、發送請求、讀取響應所有的操作都是同步的,由于 cosocket 本身是非阻塞的,所以這些操作雖然是同步的,但是并不會阻塞。
-- 創建socket local sock = ngx.socket.tcp() -- 設置socket超時時間 sock:settimeouts(connect_timeout, send_timeout, read_timeout) -- 連接到目標地址 local ok, err = sock:connect(host, port) if not ok then - -- 省略異常處理 end -- 發送請求 local bytes, err = sock:send(request_data) if not bytes then-- 省略異常處理 end -- 讀取響應 local line, err = sock:receive() if err then-- 省略異常處理 end -- 關閉socket sock:close() -- 處理讀取到的數據line handle(line)(3)結構化并發編程
Golang 中的 go 語句讓協程用起來太簡單了,但是這種簡單也蘊藏著風險。要深入了解這個風險是什么,就需要先了解一下 goto 語句的前世今生。
在我上學的時候,各種各樣的編程語言書籍中都會談到不建議使用 goto 語句,原因是 goto 語句會讓程序變得混亂,當時對于這個問題我也沒有多想,不建議用那就不用了。那為什么 goto 語句會讓程序變得混亂呢?混亂具體指的又是什么呢?多年之后,我才了解到所謂的混亂指的是代碼的書寫順序和執行順序不一致。代碼的書寫順序,代表的是我們的思維過程,如果思維的過程與代碼執行的順序不一致,那就會干擾我們對代碼的理解。我們的思維是線性的,傻傻地一條道兒跑到黑,而 goto 語句太靈活,隨時可以穿越時空,實在是太“混亂”了。
首先發現 goto 語句是“毒藥”的人是著名的計算機科學家艾茲格·迪科斯徹(Edsger Dijkstra),同時他還提出了結構化程序設計。在結構化程序設計中,可以使用三種基本控制結構來代替 goto,這三種基本的控制結構就是今天我們廣泛使用的順序結構、選擇結構和循環結構。
這三種基本的控制結構奠定了今天高級語言的基礎,如果仔細觀察這三種結構,你會發現它們的入口和出口只有一個,這意味它們是可組合的,而且組合起來一定是線性的,整體來看,代碼的書寫順序和執行順序也是一致的。
我們以前寫的并發程序,是否違背了結構化程序設計呢?這個問題以前并沒有被關注,但是最近兩年,隨著并發編程的快速發展,已經開始有人關注了,而且劍指 Golang 中的 go 語句,指其為“毒藥”,類比的是 goto 語句。詳情可以參考相關的文章。
Golang 中的 go 語句不過是快速創建協程的方法而已,這篇文章本質上并不僅僅在批判 Golang 中的 go 語句,而是在批判開啟新的線程(或者協程)異步執行這種粗糙的做法,違背了結構化程序設計,Java 語言其實也在其列。
當開啟一個新的線程時,程序會并行地出現兩個分支,主線程一個分支,子線程一個分支,這兩個分支很多情況下都是天各一方、永不相見。而結構化的程序,可以有分支,但是最終一定要匯聚,不能有多個出口,因為只有這樣它們組合起來才是線性的。
最近幾年支持協程的開發語言越來越多了,Java OpenSDK 中 Loom 項目的目標就是支持協程,相信不久的將來,Java 程序員也可以使用協程來解決并發問題了。
計算機里很多面向開發人員的技術,大多數都是在解決一個問題:易用性。協程作為一項并發編程技術,本質上也不過是解決并發工具的易用性問題而已。對于易用性,我覺得最重要的就是要適應我們的思維模式,在工作的前幾年,我并沒有怎么關注它,但是最近幾年思維模式已成為我重點關注的對象。因為思維模式對工作的很多方面都會產生影響,例如質量。
一個軟件產品是否能夠活下去,從質量的角度看,最核心的就是代碼寫得好。那什么樣的代碼是好代碼呢?我覺得,最根本的是可讀性好。可讀性好的代碼,意味著大家都可以上手,而且上手后不會大動干戈。那如何讓代碼的可讀性好呢?很簡單,換位思考,用大眾、普通的思維模式去寫代碼,而不是炫耀自己的各種設計能力。我覺得好的代碼,就像人民的藝術一樣,應該是為人民群眾服務的,只有根植于廣大群眾之中,才有生命力。
20、CSP模型:Golang的主力隊員
Golang 是一門號稱從語言層面支持并發的編程語言,支持并發是 Golang 一個非常重要的特性,Golang 支持協程,協程可以類比 Java 中的線程,解決并發問題的難點就在于線程(協程)之間的協作。
那 Golang 是如何解決協作問題的呢?
總的來說,Golang 提供了兩種不同的方案:一種方案支持協程之間以共享內存的方式通信,Golang 提供了管程和原子類來對協程進行同步控制,這個方案與 Java 語言類似;另一種方案支持協程之間以消息傳遞(Message-Passing)的方式通信,本質上是要避免共享,Golang 的這個方案是基于 CSP(Communicating Sequential Processes)模型實現的。Golang 比較推薦的方案是后者。
(1)什么是 CSP 模型
Actor 模型中 Actor 之間就是不能共享內存的,彼此之間通信只能依靠消息傳遞的方式。Golang 實現的 CSP 模型和 Actor 模型看上去非常相似,Golang 程序員中有句格言:“不要以共享內存方式通信,要以通信方式共享內存(Don’t communicate by sharing memory, share memory by communicating)。”雖然 Golang 中協程之間,也能夠以共享內存的方式通信,但是并不推薦;而推薦的以通信的方式共享內存,實際上指的就是協程之間以消息傳遞方式來通信。
下面我們先結合一個簡單的示例,看看 Golang 中協程之間是如何以消息傳遞的方式實現通信的。我們示例的目標是打印從 1 累加到 100 億的結果,如果使用單個協程來計算,大概需要 4 秒多的時間。單個協程,只能用到 CPU 中的一個核,為了提高計算性能,我們可以用多個協程來并行計算,這樣就能發揮多核的優勢了。
在下面的示例代碼中,我們用了 4 個子協程來并行執行,這 4 個子協程分別計算[1, 25 億]、(25 億, 50 億]、(50 億, 75 億]、(75 億, 100 億],最后再在主協程中匯總 4 個子協程的計算結果。主協程要匯總 4 個子協程的計算結果,勢必要和 4 個子協程之間通信,Golang 中協程之間通信推薦的是使用 channel,channel 你可以形象地理解為現實世界里的管道。另外,calc() 方法的返回值是一個只能接收數據的 channel ch,它創建的子協程會把計算結果發送到這個 ch 中,而主協程也會將這個計算結果通過 ch 讀取出來。
import ("fmt""time" )func main() {// 變量聲明var result, i uint64// 單個協程執行累加操作start := time.Now()for i = 1; i <= 10000000000; i++ {result += i}// 統計計算耗時elapsed := time.Since(start)fmt.Printf("執行消耗的時間為:", elapsed)fmt.Println(", result:", result)// 4個協程共同執行累加操作start = time.Now()ch1 := calc(1, 2500000000)ch2 := calc(2500000001, 5000000000)ch3 := calc(5000000001, 7500000000)ch4 := calc(7500000001, 10000000000)// 匯總4個協程的累加結果result = <-ch1 + <-ch2 + <-ch3 + <-ch4// 統計計算耗時elapsed = time.Since(start)fmt.Printf("執行消耗的時間為:", elapsed)fmt.Println(", result:", result) } // 在協程中異步執行累加操作,累加結果通過channel傳遞 func calc(from uint64, to uint64) <-chan uint64 {// channel用于協程間的通信ch := make(chan uint64)// 在協程中執行累加操作go func() {result := fromfor i := from + 1; i <= to; i++ {result += i}// 將結果寫入channelch <- result}()// 返回結果是用于通信的channelreturn ch }(2)CSP 模型與生產者 - 消費者模式
你可以簡單地把 Golang 實現的 CSP 模型類比為生產者 - 消費者模式,而 channel 可以類比為生產者 - 消費者模式中的阻塞隊列。不過,需要注意的是 Golang 中 channel 的容量可以是 0,容量為 0 的 channel 在 Golang 中被稱為無緩沖的 channel,容量大于 0 的則被稱為有緩沖的 channel。
無緩沖的 channel 類似于 Java 中提供的 SynchronousQueue,主要用途是在兩個協程之間做數據交換。比如上面累加器的示例代碼中,calc() 方法內部創建的 channel 就是無緩沖的 channel。
而創建一個有緩沖的 channel 也很簡單,在下面的示例代碼中,我們創建了一個容量為 4 的 channel,同時創建了 4 個協程作為生產者、4 個協程作為消費者。
// 創建一個容量為4的channel ch := make(chan int, 4) // 創建4個協程,作為生產者 for i := 0; i < 4; i++ {go func() {ch <- 7}() } // 創建4個協程,作為消費者 for i := 0; i < 4; i++ {go func() {o := <-chfmt.Println("received:", o)}() }Golang 中的 channel 是語言層面支持的,所以可以使用一個左向箭頭(<-)來完成向 channel 發送數據和讀取數據的任務,使用上還是比較簡單的。Golang 中的 channel 是支持雙向傳輸的,所謂雙向傳輸,指的是一個協程既可以通過它發送數據,也可以通過它接收數據。
不僅如此,Golang 中還可以將一個雙向的 channel 變成一個單向的 channel,在累加器的例子中,calc() 方法中創建了一個雙向 channel,但是返回的就是一個只能接收數據的單向 channel,所以主協程中只能通過它接收數據,而不能通過它發送數據,如果試圖通過它發送數據,編譯器會提示錯誤。對比之下,雙向變單向的功能,如果以 SDK 方式實現,還是很困難的。
(3)CSP 模型與 Actor 模型的區別
同樣是以消息傳遞的方式來避免共享,那 Golang 實現的 CSP 模型和 Actor 模型有什么區別呢?
第一個最明顯的區別就是:Actor 模型中沒有 channel。雖然 Actor 模型中的 mailbox 和 channel 非常像,看上去都像個 FIFO 隊列,但是區別還是很大的。Actor 模型中的 mailbox 對于程序員來說是“透明”的,mailbox 明確歸屬于一個特定的 Actor,是 Actor 模型中的內部機制;而且 Actor 之間是可以直接通信的,不需要通信中介。但 CSP 模型中的 channel 就不一樣了,它對于程序員來說是“可見”的,是通信的中介,傳遞的消息都是直接發送到 channel 中的。
第二個區別是:Actor 模型中發送消息是非阻塞的,而 CSP 模型中是阻塞的。Golang 實現的 CSP 模型,channel 是一個阻塞隊列,當阻塞隊列已滿的時候,向 channel 中發送數據,會導致發送消息的協程阻塞。
第三個區別則是關于消息送達的。 Actor 模型理論上不保證消息百分百送達,而在 Golang 實現的 CSP 模型中,是能保證消息百分百送達的。不過這種百分百送達也是有代價的,那就是有可能會導致死鎖。
比如,下面這段代碼就存在死鎖問題,在主協程中,我們創建了一個無緩沖的 channel ch,然后從 ch 中接收數據,此時主協程阻塞,main() 方法中的主協程阻塞,整個應用就阻塞了。這就是 Golang 中最簡單的一種死鎖。
func main() {// 創建一個無緩沖的channel ch := make(chan int)// 主協程會阻塞在此處,發生死鎖<- ch }Golang 中雖然也支持傳統的共享內存的協程間通信方式,但是推薦的還是使用 CSP 模型,以通信的方式共享內存。
Golang 中實現的 CSP 模型功能上還是很豐富的,例如支持 select 語句,select 語句類似于網絡編程里的多路復用函數 select(),只要有一個 channel 能夠發送成功或者接收到數據就可以跳出阻塞狀態。鑒于篇幅原因,我就點到這里,不詳細介紹那么多了。
CSP 模型是托尼·霍爾(Tony Hoare)在 1978 年提出的,不過這個模型這些年一直都在發展,其理論遠比 Golang 的實現復雜得多,如果你感興趣,可以參考霍爾寫的Communicating Sequential Processes這本電子書。另外,霍爾在并發領域還有一項重要成就,那就是提出了霍爾管程模型,這個你應該很熟悉了,Java 領域解決并發問題的理論基礎就是它。
Java 領域可以借助第三方的類庫JCSP來支持 CSP 模型,相比 Golang 的實現,JCSP 更接近理論模型,如果你感興趣,可以下載學習。不過需要注意的是,JCSP 并沒有經過廣泛的生產環境檢驗,所以并不建議你在生產環境中使用。
總結
- 上一篇: 【JZOJ 4623】搬运干草捆
- 下一篇: Ajax.NET Professiona