ConcurrentHashMap--自用,非教学
結論先行,細節在下面
jdk1.7是如何解決并發問題的以及完整流程
一.首先new一個concurrentHashMap
調用默認構造方法
二.初始化
初始化initialCapacity(默認是16,指一個segment內Entry的數量),loadFactor(默 認0.75f,負載因子),初始化concurrentLevel(默認是16,segment數量)。
1.校驗傳入的參數是否符合規定
2.計算concurrentLevel、segementMask(掩碼)和segementShift(移位數)
3.計算每個segment中的Entry數組大小,默認且最小為2
4.此時你得到了一個segment對象,調用UNSAFE.putOrderedObject方法,利用CAS將 此segment對象放在segment數組下標為0的位置,其余15個位置為null
三.初始化完開始使用。先put一個鍵值對進去
1.判斷value是否為空,為空直接報錯
2.計算hash值。int j = (hash >>> segmentShift) & segmentMask
先用segementShift將32位的hash右移28位,剩4位,再與segmentMask(二進制碼,具體數值為1111)進行與運算,得到j,此時segment[j]還是null,不像segment[0]已經初始化,那么調用ensureSegment(j)初始化segment[j]
3.上來第一步先 tryLock() ? null : scanAndLockForPut(key, hash, value);
如果tryLock失敗,也就是沒拿到獨占鎖,將調用scanAndLockForPut方法,這個方法大概是循環嘗試tryLock(),嘗試次數到一定后,將調用lock()進行阻塞,直到拿到鎖
4.獲取鎖成功后,hash計算entry下標,int index = (tab.length - 1) & hash
5.遍歷鏈表,有數據就覆蓋,沒數據就頭插
6.判斷是否需要擴容
7.釋放鎖
四.擴容
1.定義threshold = (int)(newCapacity * loadFactor),只要threshold小于map中實際存入的元素大小,就開始擴容;entry數組一次擴容成原來的兩倍
2.用rehash方法,計算新的掩碼segmentMask,然后遍歷原數組,老套路,將原數組位置 i 處的鏈表拆分到 新數組位置 i 和 i+oldCap 兩個位置(原理HashMap那里說過)
3.最后插入新節點
五.get方法
第一次計算hash定位segment,第二次hash定位entry,然后返回。
六.并發問題的解決
注意到,get沒有加鎖,put和remove都加上了獨占鎖,需要考慮的問題就是 get 的時候在同一個 segment 中發生了 put 或 remove 操作,會發生什么
1.對于put
第一個問題是:初始化segment是用CAS將segment對象放入segment數組index為0的位置的;
第二個問題是:put進entry是頭插,如果此時get操作已經遍歷到鏈表中間,無影響。但是還需要保證put之后get要找的到剛被插入的頭節點,這個依賴于 setEntryAt 方法中使用的 UNSAFE.putOrderedObject;
第三個問題是:擴容也有并發。擴容是新創建了數組,然后進行遷移數據,最后面將 newTable 設置給屬性 table,get操作會在舊table上進行,不影響,如果put先行,擴容后行,那么 put 操作的可見性保證就是 table 使用了 volatile 關鍵字。
jdk1.8是如何解決并發問題的以及完整流程
一.首先new一個concurrentHashMap
調用默認構造方法,需要注意的是,1.8摒棄了segment這個概念,引入了紅黑樹這個數據結構,加鎖則采用CAS和synchronized實現
二.構造函數內部操作。
維護一個sizeCtl = (1.5 * initialCapacity + 1) 再向上取最近的2的倍數。比如initialCapacity = 10,則sizeCtl = 16。sizeCtl的使用場景很多。
構造函數只是計算值而已,初始化操作延遲到真正操作數據的時候。
三.put過程分析
1.key或value==null直接拋錯誤。
2.hash = spread(key.hashCode()),得到hash值,定義binCount記錄鏈表長度。
3. if 數組為空,初始化數組(這里才真正初始化數組);如果已經初始化,找出該hash值對應的數組下標,得到第一個節點
else if 該位置尚未有任何節點,利用CAS將新節點放入。put邏輯基本結束。
else if hash == MOVED,說明在擴容,轉而幫助其數據遷移。
else 此時節點存在,也不為空。
在這個 else 下,又有兩個判斷:
如果hash >= 0,說明是鏈表
如果節點f instanceof TreeBin,說明是紅黑樹
對應不同的插入邏輯
4.進行完以上判斷,開始進入判斷是否將鏈表轉化成紅黑樹的階段
if(binCount >= TREEIFY_THRESHOLD) 也就是第三步的第二小步定義的binCount記錄著本鏈表的長度,大于等于8就轉紅黑樹
四.真正對數組的初始化
initTable方法
初始化一個合適大小的數組,然后會設置 sizeCtl。
初始化方法中的并發問題是通過對 sizeCtl 進行一個 CAS 操作來控制的
U.compareAndSwapInt(this, SIZECTL, sc, -1),將sizeCtl改成-1,代表搶到鎖
接下來就是各種賦初值,比如數組長度什么的。
五.鏈表轉紅黑樹
treeifyBin方法
treeifyBin 不一定就會進行紅黑樹轉換,也可能是僅僅做數組擴容
如果數組長度小于 64 的時候,其實也就是 32 或者 16 或者更小的時候,會進行數組擴容,而不是轉化為紅黑樹。
如果需要轉化,那么用synchronized加鎖,將鏈表變成紅黑樹,然后返回頭結點,設置 到數組相應的位置上。
六.擴容機制
tryPresize方法
這個方法的核心在于對 sizeCtl 值的操作,首先將其設置為一個負數,然后執行 transfer(tab, null),再下一個循環將 sizeCtl 加 1,并執行 transfer(tab, nt),之后可能是繼續 sizeCtl 加 1,并執行 transfer(tab, nt)
所以,可能的操作就是執行 1 次 transfer(tab, null) + 多次 transfer(tab, nt),這里怎么結束循環的需要看完 transfer 源碼才清楚
總的來說,肯定是得把老數組的東西拷貝到新數組里面,然后引用指向新數組,這樣就行了,怎么拷貝呢?用transfer方法
原理太復雜,大概意思就是將一個大數組分割成很多個小部分,可以令每個線程負責轉移一部分數據,轉移數據的時候,會鎖頭節點或者根節點,轉移后一個位置,就會在那個位置放置一個特殊的節點,該節點hash值為-1,表示該位置已經轉移
七.get 過程分析
計算hash,利用hash定位。
如果為null,返回null;
如果剛好是需要的,那就返回;
如果hash < 0,說明正擴容,用find方法找;
如果上面都不滿足,說明是鏈表,直接往后遍歷即可。
八.并發問題的解決
1.初始化時:在initTable方法內可以看到,通過CAS判斷當前是否有其他線程在初始化,如果有,那么當前線程會被阻塞,一直CAS自旋等到數組初始化成功。
2.擴容時:將數組分割成若干份,允許多個線程一起擴容,一起轉移數據,每個線程在負責自己那一part的數據轉移時,會對頭結點加鎖。
3.插入時:位置為空時,CAS插入;不為空時,對頭結點加鎖,再插入。
上面是總結,速度過一遍;下面是細節,仔細看一遍
正式緒論
JDK1.7之前的ConcurrentHashMap使用分段鎖機制實現,JDK1.8則使用數組+鏈表+紅黑樹數據結構和CAS原子操作實現ConcurrentHashMap;本文將分別介紹這兩種方式的實現方案及其區別。
請帶著這些問題學習。
為什么HashTable慢
Hashtable之所以效率低下主要是因為其實現使用了synchronized關鍵字對put等操作進行加鎖,而synchronized關鍵字加鎖是對整個對象進行加鎖,也就是說在進行put等修改Hash表的操作時,鎖住了整個Hash表,從而使得其表現的效率低下。
JDK1.7版本
在JDK1.5~1.7版本,Java使用了分段鎖機制實現ConcurrentHashMap. 簡而言之,ConcurrentHashMap在對象中保存了一個Segment數組,即將整個Hash表劃分為多個分段;而每個Segment元素,即每個分段則類似于一個Hashtable;這樣,在執行put操作時首先根據hash算法定位到元素屬于哪個Segment,然后對該Segment加鎖即可。因此,ConcurrentHashMap在多線程并發編程中可是實現多線程put操作。接下來分析JDK1.7版本中ConcurrentHashMap的實現原理。
segment
整個 ConcurrentHashMap 由一個個 Segment 組成,Segment 代表”部分“或”一段“的意思,所以很多地方都會將其描述為分段鎖。注意,行文中,我很多地方用了“槽”來代表一個 segment。
簡單理解就是,ConcurrentHashMap 是一個 Segment 數組,Segment 通過繼承 ReentrantLock 來進行加鎖,所以每次需要加鎖的操作鎖住的是一個 segment,這樣只要保證每個 Segment 是線程安全的,也就實現了全局的線程安全。
concurrentLevel:是一個int數值,命名為并發數,默認是16。也就是說,一個map中有16個segment,于是map支持16個線程并發寫,只要他們分別操作這16個segment。
可以人為在初始化時設置成其他值,一旦指定,不可擴容。
segment的內部
在使用之前先初始化map,調用上圖的方法,initialCapacity是初始容量,loadFactor是負載因子,concurrentLevel是并發數,也是segment的數量。
如果調用無參構造方法,那么我將得到:
segmentMask要等于數組長度減一,比如16 - 1 = 15,二進制碼是1111,可以更好地保證散列的均勻性;
segmentShift是移位數,由于hash是32位的,它設為28的話,可以使hash無符號右移28位,剩下4個高位數,而這四位再和1111(也就是segmentMask)做一次與運算就可以轉換為segment數組的下標,因為4位二進制數可以表示數字0~15,segment數組下標也是從0到15。
這里主要是為了計算出segment的下標,也就是該存到哪個segment下。
之后會進入segment內部獲取鎖,然后正式插入數據。
PUT方法的細節
初始化槽: ensureSegment(int k)方法
ConcurrentHashMap 初始化的時候會初始化第一個槽 segment[0],對于其他槽來說,在插入第一個值的時候進行初始化。
這里需要考慮并發,因為很可能會有多個線程同時進來初始化同一個槽 segment[k],不過只要有一個成功了就可以。
初始化第一個槽的原因
拿segment[0]這個最先被初始化且被操作的當做榜樣,利用[0]去初始化[k]。
總的來說,ensureSegment(int k) 比較簡單,對于并發操作使用 CAS 進行控制。
獲取寫入鎖方法scanAndLockForPut(K key, int hash, V value)
在往某個 segment 中 put 的時候,首先會調用 node = tryLock() ? null : scanAndLockForPut(key, hash, value),也就是說先進行一次 tryLock() 快速獲取該 segment 的獨占鎖,如果失敗,那么進入到 scanAndLockForPut 這個方法來獲取鎖。
這個方法有兩個出口,一個是 tryLock() 成功了,循環終止,另一個就是重試次數超過了 MAX_SCAN_RETRIES,進到 lock() 方法,此方法會阻塞等待,直到成功拿到獨占鎖。 這個方法就是看似復雜,但是其實就是做了一件事,那就是獲取該 segment 的獨占鎖,如果需要的話順便實例化了一下 node。
擴容:rehash
擴容是 segment 數組某個位置內部的數組 HashEntry<K,V>[] 進行擴容,擴容后,容量為原來的 2 倍。
注意到,在put方法里,會判斷該值插入后是否會導致超出閾值,超了就先擴容再插。
get方法
計算 hash 值,找到 segment 數組中的具體位置,或我們前面用的“槽”
槽中也是一個數組,根據 hash 找到數組中具體的位置
到這里是鏈表了,順著鏈表進行查找即可
并發問題分析
JDK1.8版本
寫在前面
在JDK1.7之前,ConcurrentHashMap是通過分段鎖機制來實現的,所以其最大并發度受Segment的個數限制。因此,在JDK1.8中,ConcurrentHashMap的實現原理摒棄了這種設計,而是選擇了與HashMap類似的數組+鏈表+紅黑樹的方式實現,而加鎖則采用CAS和synchronized實現。
數據結構
構造函數
// 這構造函數里,什么都不干 public ConcurrentHashMap() { } public ConcurrentHashMap(int initialCapacity) {if (initialCapacity < 0)throw new IllegalArgumentException();int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?MAXIMUM_CAPACITY :tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));this.sizeCtl = cap; }sizeCtl = 【 (1.5 * initialCapacity + 1),然后向上取最近的 2 的 n 次方】。如 initialCapacity 為 10,那么得到 sizeCtl 為 16,如果 initialCapacity 為 11,得到 sizeCtl 為 32。
PUT方法
著作權歸https://pdai.tech所有。 鏈接:https://www.pdai.tech/md/java/thread/java-thread-x-juc-collection-ConcurrentHashMap.htmlpublic V put(K key, V value) {return putVal(key, value, false); } final V putVal(K key, V value, boolean onlyIfAbsent) {if (key == null || value == null) throw new NullPointerException();// 得到 hash 值int hash = spread(key.hashCode());// 用于記錄相應鏈表的長度int binCount = 0;for (Node<K,V>[] tab = table;;) {Node<K,V> f; int n, i, fh;// 如果數組"空",進行數組初始化if (tab == null || (n = tab.length) == 0)// 初始化數組,后面會詳細介紹tab = initTable();// 找該 hash 值對應的數組下標,得到第一個節點 felse if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {// 如果數組該位置為空,// 用一次 CAS 操作將這個新值放入其中即可,這個 put 操作差不多就結束了,可以拉到最后面了// 如果 CAS 失敗,那就是有并發操作,進到下一個循環就好了if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))break; // no lock when adding to empty bin}// hash 居然可以等于 MOVED,這個需要到后面才能看明白,不過從名字上也能猜到,肯定是因為在擴容else if ((fh = f.hash) == MOVED)// 幫助數據遷移,這個等到看完數據遷移部分的介紹后,再理解這個就很簡單了tab = helpTransfer(tab, f);else { // 到這里就是說,f 是該位置的頭節點,而且不為空V oldVal = null;// 獲取數組該位置的頭節點的監視器鎖synchronized (f) {if (tabAt(tab, i) == f) {if (fh >= 0) { // 頭節點的 hash 值大于 0,說明是鏈表// 用于累加,記錄鏈表的長度binCount = 1;// 遍歷鏈表for (Node<K,V> e = f;; ++binCount) {K ek;// 如果發現了"相等"的 key,判斷是否要進行值覆蓋,然后也就可以 break 了if (e.hash == hash &&((ek = e.key) == key ||(ek != null && key.equals(ek)))) {oldVal = e.val;if (!onlyIfAbsent)e.val = value;break;}// 到了鏈表的最末端,將這個新值放到鏈表的最后面Node<K,V> pred = e;if ((e = e.next) == null) {pred.next = new Node<K,V>(hash, key,value, null);break;}}}else if (f instanceof TreeBin) { // 紅黑樹Node<K,V> p;binCount = 2;// 調用紅黑樹的插值方法插入新節點if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,value)) != null) {oldVal = p.val;if (!onlyIfAbsent)p.val = value;}}}}if (binCount != 0) {// 判斷是否要將鏈表轉換為紅黑樹,臨界值和 HashMap 一樣,也是 8if (binCount >= TREEIFY_THRESHOLD)// 這個方法和 HashMap 中稍微有一點點不同,那就是它不是一定會進行紅黑樹轉換,// 如果當前數組的長度小于 64,那么會選擇進行數組擴容,而不是轉換為紅黑樹// 具體源碼我們就不看了,擴容部分后面說treeifyBin(tab, i);if (oldVal != null)return oldVal;break;}}}// addCount(1L, binCount);return null; }初始化數組: initTable
這個比較簡單,主要就是初始化一個合適大小的數組,然后會設置 sizeCtl。
初始化方法中的并發問題是通過對 sizeCtl 進行一個 CAS 操作來控制的。
private final Node<K,V>[] initTable() {Node<K,V>[] tab; int sc;while ((tab = table) == null || tab.length == 0) {// 初始化的"功勞"被其他線程"搶去"了if ((sc = sizeCtl) < 0)Thread.yield(); // lost initialization race; just spin// CAS 一下,將 sizeCtl 設置為 -1,代表搶到了鎖else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {try {if ((tab = table) == null || tab.length == 0) {// DEFAULT_CAPACITY 默認初始容量是 16int n = (sc > 0) ? sc : DEFAULT_CAPACITY;// 初始化數組,長度為 16 或初始化時提供的長度Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];// 將這個數組賦值給 table,table 是 volatile 的table = tab = nt;// 如果 n 為 16 的話,那么這里 sc = 12// 其實就是 0.75 * nsc = n - (n >>> 2);}} finally {// 設置 sizeCtl 為 sc,我們就當是 12 吧sizeCtl = sc;}break;}}return tab; }數組轉紅黑樹
前面我們在 put 源碼分析也說過,treeifyBin 不一定就會進行紅黑樹轉換,也可能是僅僅做數組擴容。我們還是進行源碼分析吧。
private final void treeifyBin(Node<K,V>[] tab, int index) {Node<K,V> b; int n, sc;if (tab != null) {// MIN_TREEIFY_CAPACITY 為 64// 所以,如果數組長度小于 64 的時候,其實也就是 32 或者 16 或者更小的時候,會進行數組擴容if ((n = tab.length) < MIN_TREEIFY_CAPACITY)// 后面我們再詳細分析這個方法tryPresize(n << 1);// b 是頭節點else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {// 加鎖synchronized (b) {if (tabAt(tab, index) == b) {// 下面就是遍歷鏈表,建立一顆紅黑樹TreeNode<K,V> hd = null, tl = null;for (Node<K,V> e = b; e != null; e = e.next) {TreeNode<K,V> p =new TreeNode<K,V>(e.hash, e.key, e.val,null, null);if ((p.prev = tl) == null)hd = p;elsetl.next = p;tl = p;}// 將紅黑樹設置到數組相應位置中setTabAt(tab, index, new TreeBin<K,V>(hd));}}}} }擴容: tryPresize
如果說 Java8 ConcurrentHashMap 的源碼不簡單,那么說的就是擴容操作和遷移操作。 這個方法要完完全全看懂還需要看之后的 transfer 方法,讀者應該提前知道這點。 這里的擴容也是做翻倍擴容的,擴容后數組容量為原來的 2 倍。
著作權歸https://pdai.tech所有。 鏈接:https://www.pdai.tech/md/java/thread/java-thread-x-juc-collection-ConcurrentHashMap.html// 首先要說明的是,方法參數 size 傳進來的時候就已經翻了倍了 private final void tryPresize(int size) {// c: size 的 1.5 倍,再加 1,再往上取最近的 2 的 n 次方。int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :tableSizeFor(size + (size >>> 1) + 1);int sc;while ((sc = sizeCtl) >= 0) {Node<K,V>[] tab = table; int n;// 這個 if 分支和之前說的初始化數組的代碼基本上是一樣的,在這里,我們可以不用管這塊代碼if (tab == null || (n = tab.length) == 0) {n = (sc > c) ? sc : c;if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {try {if (table == tab) {@SuppressWarnings("unchecked")Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];table = nt;sc = n - (n >>> 2); // 0.75 * n}} finally {sizeCtl = sc;}}}else if (c <= sc || n >= MAXIMUM_CAPACITY)break;else if (tab == table) {// 我沒看懂 rs 的真正含義是什么,不過也關系不大int rs = resizeStamp(n);if (sc < 0) {Node<K,V>[] nt;if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||transferIndex <= 0)break;// 2. 用 CAS 將 sizeCtl 加 1,然后執行 transfer 方法// 此時 nextTab 不為 nullif (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))transfer(tab, nt);}// 1. 將 sizeCtl 設置為 (rs << RESIZE_STAMP_SHIFT) + 2)// 我是沒看懂這個值真正的意義是什么? 不過可以計算出來的是,結果是一個比較大的負數// 調用 transfer 方法,此時 nextTab 參數為 nullelse if (U.compareAndSwapInt(this, SIZECTL, sc,(rs << RESIZE_STAMP_SHIFT) + 2))transfer(tab, null);}} }這個方法的核心在于 sizeCtl 值的操作,首先將其設置為一個負數,然后執行 transfer(tab, null),再下一個循環將 sizeCtl 加 1,并執行 transfer(tab, nt),之后可能是繼續 sizeCtl 加 1,并執行 transfer(tab, nt)。 所以,可能的操作就是執行 1 次 transfer(tab, null) + 多次 transfer(tab, nt),這里怎么結束循環的需要看完 transfer 源碼才清楚。
transfer數據遷移方法
太麻煩了
get方法
get 方法從來都是最簡單的,這里也不例外:
計算 hash 值 根據 hash 值找到數組對應位置: (n - 1) & h
根據該位置處結點性質進行相應查找
如果該位置為 null,那么直接返回 null 就可以了
如果該位置處的節點剛好就是我們需要的,返回該節點的值即可
如果該位置節點的 hash 值小于 0,說明正在擴容,或者是紅黑樹,后面我們再介紹 find 方法
如果以上 3 條都不滿足,那就是鏈表,進行遍歷比對即可
兩個版本的區別
參考資料
https://www.pdai.tech/md/java/thread/java-thread-x-juc-collection-ConcurrentHashMap.html
總結
以上是生活随笔為你收集整理的ConcurrentHashMap--自用,非教学的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 电脑没进入界面怎么截图(电脑截图无法显示
- 下一篇: 局域网物理机怎么访问虚拟机