日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程语言 > java >内容正文

java

Java中ConcurrentHashMap底层实现原理(JDK1.8)源码分析2

發(fā)布時間:2024/1/18 java 35 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Java中ConcurrentHashMap底层实现原理(JDK1.8)源码分析2 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

https://blog.csdn.net/programmer_at/article/details/79715177
https://blog.csdn.net/qq_41737716/category_8894817.html

空船理論

這個故事是出自莊子的《山木》中的“方舟濟(jì)河”,這是一篇極富道家哲學(xué)的寓言,一個人在乘船渡河的時候,前面一只船正要撞過來。
這個人喊了好幾聲沒有人回應(yīng),于是破口大罵前面開船的人不長眼。
結(jié)果撞上來的竟是一只空船,于是剛才怒氣沖沖的人,一下子怒火就消失得無影無蹤了。

其實(shí)你會發(fā)現(xiàn),生氣與不生氣,取決撞來的船上有沒有人!
一個人“看不慣”的東西、人和事越多,這個人的境界也就越低,格局也就越小。
一個人如果總是以自我為中心,太把自己當(dāng)回事,那么就很容易與別人起沖突。

前言

我們知道,在日常開發(fā)中使用的HashMap是線程不安全的,而線程安全類HashTable只是簡單的在方法上加鎖實(shí)現(xiàn)線程安全,效率低下,所以在線程安全的環(huán)境下我們通常會使用ConcurrentHashMap,但是又為何需要學(xué)習(xí)ConcurrentHashMap?用不就完事了?我認(rèn)為學(xué)習(xí)其源碼有兩個好處:

Amdahl定律

此節(jié)定律描述均來自《Java并發(fā)編程實(shí)戰(zhàn)》一書

假設(shè)F是必須被串行執(zhí)行的部分,N代表處理器數(shù)量,Speedup代表加速比,可以簡單理解為CPU使用率

此公式告訴我們,當(dāng)N趨近無限大,加速比最大趨近于1/F,假設(shè)我們的程序有50%的部分需要串行執(zhí)行,就算處理器數(shù)量無限多,最高的加速比只能是2(20%的使用率),如果程序中僅有10%的部分需要串行執(zhí)行,最高的加速比可以達(dá)到9.2(92%的使用率),但我們的程序或多或少都一定會有串行執(zhí)行的部分,所以F不可能為0,所以,就算有無限多的CPU,加速比也不可能達(dá)到10(100%的使用率),下面給一張圖來表示串行執(zhí)行部分占比不同對利用率的影響:

由此我們可以看出,程序中的可伸縮性(提升外部資源即可提升并發(fā)性能的比率)是由程序中串行執(zhí)行部分所影響的,而常見的串行執(zhí)行有鎖競爭(上下文切換消耗、等待、串行)等等,這給了我們一個啟發(fā),可以通過減少鎖競爭來優(yōu)化并發(fā)性能,而ConcurrentHashMap則使用了鎖分段(減小鎖范圍)、CAS(樂觀鎖,減小上下文切換開銷,無阻塞)等等技術(shù),下面來具體看看吧。

初始化數(shù)據(jù)結(jié)構(gòu)時的線程安全

HashMap的底層數(shù)據(jù)結(jié)構(gòu)這里簡單帶過一下,不做過多贅述:

大致是以一個Node對象數(shù)組來存放數(shù)據(jù),Hash沖突時會形成Node鏈表,在鏈表長度超過8,Node數(shù)組超過64時會將鏈表結(jié)構(gòu)轉(zhuǎn)換為紅黑樹,Node對象:

static class Node<K,V> implements Map.Entry<K,V> {final int hash;final K key;volatile V val;volatile Node<K,V> next;... }

值得注意的是,value和next指針使用了volatile來保證其可見性

在JDK1.8中,初始化ConcurrentHashMap的時候這個Node[]數(shù)組是還未初始化的,會等到第一次put方法調(diào)用時才初始化:

final V putVal(K key, V value, boolean onlyIfAbsent) {if (key == null || value == null) throw new NullPointerException();int hash = spread(key.hashCode());int binCount = 0;for (Node<K,V>[] tab = table;;) {Node<K,V> f; int n, i, fh;//判斷Node數(shù)組為空if (tab == null || (n = tab.length) == 0)//初始化Node數(shù)組tab = initTable();... }

此時是會有并發(fā)問題的,如果多個線程同時調(diào)用initTable初始化Node數(shù)組怎么辦?看看大師是如何處理的:

private final Node<K,V>[] initTable() {Node<K,V>[] tab; int sc;//每次循環(huán)都獲取最新的Node數(shù)組引用while ((tab = table) == null || tab.length == 0) {//sizeCtl是一個標(biāo)記位,若為-1也就是小于0,代表有線程在進(jìn)行初始化工作了if ((sc = sizeCtl) < 0)//讓出CPU時間片Thread.yield(); // lost initialization race; just spin//CAS操作,將本實(shí)例的sizeCtl變量設(shè)置為-1else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {//如果CAS操作成功了,代表本線程將負(fù)責(zé)初始化工作try {//再檢查一遍數(shù)組是否為空if ((tab = table) == null || tab.length == 0) {//在初始化Map時,sizeCtl代表數(shù)組大小,默認(rèn)16//所以此時n默認(rèn)為16int n = (sc > 0) ? sc : DEFAULT_CAPACITY;@SuppressWarnings("unchecked")//Node數(shù)組Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];//將其賦值給table變量table = tab = nt;//通過位運(yùn)算,n減去n二進(jìn)制右移2位,相當(dāng)于乘以0.75//例如16經(jīng)過運(yùn)算為12,與乘0.75一樣,只不過位運(yùn)算更快sc = n - (n >>> 2);}} finally {//將計(jì)算后的sc(12)直接賦值給sizeCtl,表示達(dá)到12長度就擴(kuò)容//由于這里只會有一個線程在執(zhí)行,直接賦值即可,沒有線程安全問題//只需要保證可見性sizeCtl = sc;}break;}}return tab; }

table變量使用了volatile來保證每次獲取到的都是最新寫入的值

transient volatile Node<K,V>[] table;

總結(jié)

就算有多個線程同時進(jìn)行put操作,在初始化數(shù)組時使用了樂觀鎖CAS操作來決定到底是哪個線程有資格進(jìn)行初始化,其他線程均只能等待。

用到的并發(fā)技巧:

  • volatile變量(sizeCtl):它是一個標(biāo)記位,用來告訴其他線程這個坑位有沒有人在,其線程間的可見性由volatile保證。
  • CAS操作:CAS操作保證了設(shè)置sizeCtl標(biāo)記位的原子性,保證了只有一個線程能設(shè)置成功。

put操作的線程安全

final V putVal(K key, V value, boolean onlyIfAbsent) {if (key == null || value == null) throw new NullPointerException();//對key的hashCode進(jìn)行散列int hash = spread(key.hashCode());int binCount = 0;//一個無限循環(huán),直到put操作完成后退出循環(huán)for (Node<K,V>[] tab = table;;) {Node<K,V> f; int n, i, fh;//當(dāng)Node數(shù)組為空時進(jìn)行初始化if (tab == null || (n = tab.length) == 0)tab = initTable();//Unsafe類volatile的方式取出hashCode散列后通過與運(yùn)算得出的Node數(shù)組下標(biāo)值對應(yīng)的Node對象//此時的Node對象若為空,則代表還未有線程對此Node進(jìn)行插入操作else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {//直接CAS方式插入數(shù)據(jù)if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))//插入成功,退出循環(huán)break; // no lock when adding to empty bin}//查看是否在擴(kuò)容,先不看,擴(kuò)容再介紹else if ((fh = f.hash) == MOVED)//幫助擴(kuò)容tab = helpTransfer(tab, f);else {V oldVal = null;//對Node對象進(jìn)行加鎖synchronized (f) {//二次確認(rèn)此Node對象還是原來的那一個if (tabAt(tab, i) == f) {if (fh >= 0) {binCount = 1;//無限循環(huán),直到完成putfor (Node<K,V> e = f;; ++binCount) {K ek;//和HashMap一樣,先比較hash,再比較equalsif (e.hash == hash &&((ek = e.key) == key ||(ek != null && key.equals(ek)))) {oldVal = e.val;if (!onlyIfAbsent)e.val = value;break;}Node<K,V> pred = e;if ((e = e.next) == null) {//和鏈表頭Node節(jié)點(diǎn)不沖突,就將其初始化為新Node作為上一個Node節(jié)點(diǎn)的next//形成鏈表結(jié)構(gòu)pred.next = new Node<K,V>(hash, key,value, null);break;}}}... }

值得關(guān)注的是tabAt(tab, i)方法,其使用Unsafe類volatile的操作volatile式地查看值,保證每次獲取到的值都是最新的:

static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE); }

雖然上面的table變量加了volatile,但也只能保證其引用的可見性,并不能確保其數(shù)組中的對象是否是最新的,所以需要Unsafe類volatile式地拿到最新的Node

總結(jié)


由于其減小了鎖的粒度,若Hash完美不沖突的情況下,可同時支持n個線程同時put操作,n為Node數(shù)組大小,在默認(rèn)大小16下,可以支持最大同時16個線程無競爭同時操作且線程安全。當(dāng)hash沖突嚴(yán)重時,Node鏈表越來越長,將導(dǎo)致嚴(yán)重的鎖競爭,此時會進(jìn)行擴(kuò)容,將Node進(jìn)行再散列,下面會介紹擴(kuò)容的線程安全性??偨Y(jié)一下用到的并發(fā)技巧:

  • 減小鎖粒度:將Node鏈表的頭節(jié)點(diǎn)作為鎖,若在默認(rèn)大小16情況下,將有16把鎖,大大減小了鎖競爭(上下文切換),就像開頭所說,將串行的部分最大化縮小,在理想情況下線程的put操作都為并行操作。同時直接鎖住頭節(jié)點(diǎn),保證了線程安全。
  • Unsafe的getObjectVolatile方法:此方法確保獲取到的值為最新
  • 擴(kuò)容操作的線程安全

    在擴(kuò)容時,ConcurrentHashMap支持多線程并發(fā)擴(kuò)容,在擴(kuò)容過程中同時支持get查數(shù)據(jù),若有線程put數(shù)據(jù),還會幫助一起擴(kuò)容,這種無阻塞算法,將并行最大化的設(shè)計(jì),堪稱一絕。

    先來看看擴(kuò)容代碼實(shí)現(xiàn):

    private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {int n = tab.length, stride;//根據(jù)機(jī)器CPU核心數(shù)來計(jì)算,一條線程負(fù)責(zé)Node數(shù)組中多長的遷移量if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)//本線程分到的遷移量//假設(shè)為16(默認(rèn)也為16)stride = MIN_TRANSFER_STRIDE; // subdivide range//nextTab若為空代表線程是第一個進(jìn)行遷移的//初始化遷移后的新Node數(shù)組if (nextTab == null) { // initiatingtry {@SuppressWarnings("unchecked")//這里n為舊數(shù)組長度,左移一位相當(dāng)于乘以2//例如原數(shù)組長度16,新數(shù)組長度則為32Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];nextTab = nt;} catch (Throwable ex) { // try to cope with OOMEsizeCtl = Integer.MAX_VALUE;return;}//設(shè)置nextTable變量為新數(shù)組nextTable = nextTab;//假設(shè)為16transferIndex = n;}//假設(shè)為32int nextn = nextTab.length;//標(biāo)示Node對象,此對象的hash變量為-1//在get或者put時若遇到此Node,則可以知道當(dāng)前Node正在遷移//傳入nextTab對象ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);boolean advance = true;boolean finishing = false; // to ensure sweep before committing nextTabfor (int i = 0, bound = 0;;) {Node<K,V> f; int fh;while (advance) {int nextIndex, nextBound;//i為當(dāng)前正在處理的Node數(shù)組下標(biāo),每次處理一個Node節(jié)點(diǎn)就會自減1if (--i >= bound || finishing)advance = false;//假設(shè)nextIndex=16else if ((nextIndex = transferIndex) <= 0) {i = -1;advance = false;}//由以上假設(shè),nextBound就為0//且將nextIndex設(shè)置為0else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex,nextBound = (nextIndex > stride ?nextIndex - stride : 0))) {//bound=0bound = nextBound;//i=16-1=15i = nextIndex - 1;advance = false;}}if (i < 0 || i >= n || i + n >= nextn) {int sc;if (finishing) {nextTable = null;table = nextTab;sizeCtl = (n << 1) - (n >>> 1);return;}if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)return;finishing = advance = true;i = n; // recheck before commit}}//此時i=15,取出Node數(shù)組下標(biāo)為15的那個Node,若為空則不需要遷移//直接設(shè)置占位標(biāo)示,代表此Node已處理完成else if ((f = tabAt(tab, i)) == null)advance = casTabAt(tab, i, null, fwd);//檢測此Node的hash是否為MOVED,MOVED是一個常量-1,也就是上面說的占位Node的hash//如果是占位Node,證明此節(jié)點(diǎn)已經(jīng)處理過了,跳過i=15的處理,繼續(xù)循環(huán)else if ((fh = f.hash) == MOVED)advance = true; // already processedelse {//鎖住這個Nodesynchronized (f) {//確認(rèn)Node是原先的Nodeif (tabAt(tab, i) == f) {//ln為lowNode,低位Node,hn為highNode,高位Node//這兩個概念下面以圖來說明Node<K,V> ln, hn;if (fh >= 0) {//此時fh與原來Node數(shù)組長度進(jìn)行與運(yùn)算//如果高X位為0,此時runBit=0//如果高X位為1,此時runBit=1int runBit = fh & n;Node<K,V> lastRun = f;for (Node<K,V> p = f.next; p != null; p = p.next) {//這里的Node,都是同一Node鏈表中的Node對象int b = p.hash & n;if (b != runBit) {runBit = b;lastRun = p;}}//正如上面所說,runBit=0,表示此Node為低位Nodeif (runBit == 0) {ln = lastRun;hn = null;}else {//Node為高位Nodehn = lastRun;ln = null;}for (Node<K,V> p = f; p != lastRun; p = p.next) {int ph = p.hash; K pk = p.key; V pv = p.val;//若hash和n與運(yùn)算為0,證明為低位Node,原理同上if ((ph & n) == 0)ln = new Node<K,V>(ph, pk, pv, ln);//這里將高位Node與地位Node都各自組成了兩個鏈表elsehn = new Node<K,V>(ph, pk, pv, hn);}//將低位Node設(shè)置到新Node數(shù)組中,下標(biāo)為原來的位置setTabAt(nextTab, i, ln);//將高位Node設(shè)置到新Node數(shù)組中,下標(biāo)為原來的位置加上原Node數(shù)組長度setTabAt(nextTab, i + n, hn);//將此Node設(shè)置為占位Node,代表處理完成setTabAt(tab, i, fwd);//繼續(xù)循環(huán)advance = true;}....}}}} }

    這里說一下遷移時為什么要分一個ln(低位Node)、hn(高位Node),首先說一個現(xiàn)象:

    我們知道,在put值的時候,首先會計(jì)算hash值,再散列到指定的Node數(shù)組下標(biāo)中:

    //根據(jù)key的hashCode再散列 int hash = spread(key.hashCode()); //使用(n - 1) & hash 運(yùn)算,定位Node數(shù)組中下標(biāo)值 (f = tabAt(tab, i = (n - 1) & hash);

    其中n為Node數(shù)組長度,這里假設(shè)為16。

    假設(shè)有一個key進(jìn)來,它的散列之后的hash=9,那么它的下標(biāo)值是多少呢?

    (16 - 1)和 9 進(jìn)行與運(yùn)算 -> 0000 1111 和 0000 1001 結(jié)果還是 0000 1001 = 9

    假設(shè)Node數(shù)組需要擴(kuò)容,我們知道,擴(kuò)容是將數(shù)組長度增加兩倍,也就是32,那么下標(biāo)值會是多少呢?

    (32 - 1)和 9 進(jìn)行與運(yùn)算 -> 0001 1111 和 0000 1001 結(jié)果還是9

    此時,我們把散列之后的hash換成20,那么會有怎樣的變化呢?

    (16 - 1)和 20 進(jìn)行與運(yùn)算 -> 0000 1111 和 0001 0100 結(jié)果是 0000 0100 = 4 (32 - 1)和 20 進(jìn)行與運(yùn)算 -> 0001 1111 和 0001 0100 結(jié)果是 0001 0100 = 20

    此時細(xì)心的讀者應(yīng)該可以發(fā)現(xiàn),如果hash在高X位為1,(X為數(shù)組長度的二進(jìn)制-1的最高位),則擴(kuò)容時是需要變換在Node數(shù)組中的索引值的,不然就hash不到,丟失數(shù)據(jù),所以這里在遷移的時候?qū)⒏遆位為1的Node分類為hn,將高X位為0的Node分類為ln。

    回到代碼中:

    for (Node<K,V> p = f; p != lastRun; p = p.next) {int ph = p.hash; K pk = p.key; V pv = p.val;if ((ph & n) == 0)ln = new Node<K,V>(ph, pk, pv, ln);elsehn = new Node<K,V>(ph, pk, pv, hn); }

    這個操作將高低位組成了兩條鏈表結(jié)構(gòu),由下圖所示:

    然后將其CAS操作放入新的Node數(shù)組中:

    setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn);

    其中,低位鏈表放入原下標(biāo)處,而高位鏈表則需要加上原Node數(shù)組長度,其中為什么不多贅述,上面已經(jīng)舉例說明了,這樣就可以保證高位Node在遷移到新Node數(shù)組中依然可以使用hash算法散列到對應(yīng)下標(biāo)的數(shù)組中去了。

    最后將原Node數(shù)組中對應(yīng)下標(biāo)Node對象設(shè)置為fwd標(biāo)記Node,表示該節(jié)點(diǎn)遷移完成,到這里,一個節(jié)點(diǎn)的遷移就完成了,將進(jìn)行下一個節(jié)點(diǎn)的遷移,也就是i-1=14下標(biāo)的Node節(jié)點(diǎn)。

    擴(kuò)容時的get操作

    假設(shè)Node下標(biāo)為16的Node節(jié)點(diǎn)正在遷移,突然有一個線程進(jìn)來調(diào)用get方法,正好key又散列到下標(biāo)為16的節(jié)點(diǎn),此時怎么辦?

    public V get(Object key) {Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;int h = spread(key.hashCode());if ((tab = table) != null && (n = tab.length) > 0 &&(e = tabAt(tab, (n - 1) & h)) != null) {if ((eh = e.hash) == h) {if ((ek = e.key) == key || (ek != null && key.equals(ek)))return e.val;}//假如Node節(jié)點(diǎn)的hash值小于0//則有可能是fwd節(jié)點(diǎn)else if (eh < 0)//調(diào)用節(jié)點(diǎn)對象的find方法查找值return (p = e.find(h, key)) != null ? p.val : null;while ((e = e.next) != null) {if (e.hash == h &&((ek = e.key) == key || (ek != null && key.equals(ek))))return e.val;}}return null; }

    重點(diǎn)看有注釋的那兩行,在get操作的源碼中,會判斷Node中的hash是否小于0,是否還記得我們的占位Node,其hash為MOVED,為常量值-1,所以此時判斷線程正在遷移,委托給fwd占位Node去查找值:

    //內(nèi)部類 ForwardingNode中 Node<K,V> find(int h, Object k) {// loop to avoid arbitrarily deep recursion on forwarding nodes// 這里的查找,是去新Node數(shù)組中查找的// 下面的查找過程與HashMap查找無異,不多贅述outer: for (Node<K,V>[] tab = nextTable;;) {Node<K,V> e; int n;if (k == null || tab == null || (n = tab.length) == 0 ||(e = tabAt(tab, (n - 1) & h)) == null)return null;for (;;) {int eh; K ek;if ((eh = e.hash) == h &&((ek = e.key) == k || (ek != null && k.equals(ek))))return e;if (eh < 0) {if (e instanceof ForwardingNode) {tab = ((ForwardingNode<K,V>)e).nextTable;continue outer;}elsereturn e.find(h, k);}if ((e = e.next) == null)return null;}} }

    到這里應(yīng)該可以恍然大悟了,之所以占位Node需要保存新Node數(shù)組的引用也是因?yàn)檫@個,它可以支持在遷移的過程中照樣不阻塞地查找值,可謂是精妙絕倫的設(shè)計(jì)。

    多線程協(xié)助擴(kuò)容

    在put操作時,假設(shè)正在遷移,正好有一個線程進(jìn)來,想要put值到遷移的Node上,怎么辦?

    final V putVal(K key, V value, boolean onlyIfAbsent) {if (key == null || value == null) throw new NullPointerException();int hash = spread(key.hashCode());int binCount = 0;for (Node<K,V>[] tab = table;;) {Node<K,V> f; int n, i, fh;if (tab == null || (n = tab.length) == 0)tab = initTable();else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))break; // no lock when adding to empty bin}//若此時發(fā)現(xiàn)了占位Node,證明此時HashMap正在遷移else if ((fh = f.hash) == MOVED)//進(jìn)行協(xié)助遷移tab = helpTransfer(tab, f);... } final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {Node<K,V>[] nextTab; int sc;if (tab != null && (f instanceof ForwardingNode) &&(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {int rs = resizeStamp(tab.length);while (nextTab == nextTable && table == tab &&(sc = sizeCtl) < 0) {if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||sc == rs + MAX_RESIZERS || transferIndex <= 0)break;//sizeCtl加一,標(biāo)示多一個線程進(jìn)來協(xié)助擴(kuò)容if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {//擴(kuò)容transfer(tab, nextTab);break;}}return nextTab;}return table; }

    此方法涉及大量復(fù)雜的位運(yùn)算,這里不多贅述,只是簡單的說幾句,此時sizeCtl變量用來標(biāo)示HashMap正在擴(kuò)容,當(dāng)其準(zhǔn)備擴(kuò)容時,會將sizeCtl設(shè)置為一個負(fù)數(shù),(例如數(shù)組長度為16時)其二進(jìn)制表示為:

    1000 0000 0001 1011 0000 0000 0000 0010

    無符號位為1,表示負(fù)數(shù)。其中高16位代表數(shù)組長度的一個位算法標(biāo)示(有點(diǎn)像epoch的作用,表示當(dāng)前遷移朝代為數(shù)組長度X),低16位表示有幾個線程正在做遷移,剛開始為2,接下來自增1,線程遷移完會進(jìn)行減1操作,也就是如果低十六位為2,代表有一個線程正在遷移,如果為3,代表2個線程正在遷移以此類推…

    只要數(shù)組長度足夠長,就可以同時容納足夠多的線程來一起擴(kuò)容,最大化并行任務(wù),提高性能。

    在什么情況下會進(jìn)行擴(kuò)容操作?

    • 在put值時,發(fā)現(xiàn)Node為占位Node(fwd)時,會協(xié)助擴(kuò)容
    • 在新增節(jié)點(diǎn)后,檢測到鏈表長度大于8時

    treeifyBin方法會將鏈表轉(zhuǎn)換為紅黑樹,增加查找效率,但在這之前,會檢查數(shù)組長度,若小于64,則會優(yōu)先做擴(kuò)容操作:

    private final void treeifyBin(Node<K,V>[] tab, int index) {Node<K,V> b; int n, sc;if (tab != null) {//MIN_TREEIFY_CAPACITY=64//若數(shù)組長度小于64,則先擴(kuò)容if ((n = tab.length) < MIN_TREEIFY_CAPACITY)//擴(kuò)容tryPresize(n << 1);else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {synchronized (b) {//...轉(zhuǎn)換為紅黑樹的操作}}} }

    在每次新增節(jié)點(diǎn)之后,都會調(diào)用addCount方法,檢測Node數(shù)組大小是否達(dá)到閾值:

    final V putVal(K key, V value, boolean onlyIfAbsent) {...//在下面一節(jié)會講到,此方法統(tǒng)計(jì)容器元素?cái)?shù)量addCount(1L, binCount);return null; } private final void addCount(long x, int check) {CounterCell[] as; long b, s;if ((as = counterCells) != null ||!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {//統(tǒng)計(jì)元素個數(shù)的操作...}if (check >= 0) {Node<K,V>[] tab, nt; int n, sc;//元素個數(shù)達(dá)到閾值,進(jìn)行擴(kuò)容while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&(n = tab.length) < MAXIMUM_CAPACITY) {int rs = resizeStamp(n);//發(fā)現(xiàn)sizeCtl為負(fù)數(shù),證明有線程正在遷移if (sc < 0) {if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||transferIndex <= 0)break;if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))transfer(tab, nt);}//不為負(fù)數(shù),則為第一個遷移的線程else if (U.compareAndSwapInt(this, SIZECTL, sc,(rs << RESIZE_STAMP_SHIFT) + 2))transfer(tab, null);s = sumCount();}} }

    總結(jié)

    ConcurrentHashMap運(yùn)用各類CAS操作,將擴(kuò)容操作的并發(fā)性能實(shí)現(xiàn)最大化,在擴(kuò)容過程中,就算有線程調(diào)用get查詢方法,也可以安全的查詢數(shù)據(jù),若有線程進(jìn)行put操作,還會協(xié)助擴(kuò)容,利用sizeCtl標(biāo)記位和各種volatile變量進(jìn)行CAS操作達(dá)到多線程之間的通信、協(xié)助,在遷移過程中只鎖一個Node節(jié)點(diǎn),即保證了線程安全,又提高了并發(fā)性能。

    統(tǒng)計(jì)容器大小的線程安全

    ConcurrentHashMap在每次put操作之后都會調(diào)用addCount方法,此方法用于統(tǒng)計(jì)容器大小且檢測容器大小是否達(dá)到閾值,若達(dá)到閾值需要進(jìn)行擴(kuò)容操作,這在上面也是有提到的。這一節(jié)重點(diǎn)討論容器大小的統(tǒng)計(jì)是如何做到線程安全且并發(fā)性能不低的。

    大部分的單機(jī)數(shù)據(jù)查詢優(yōu)化方案都會降低并發(fā)性能,就像緩存的存儲,在多線程環(huán)境下將有并發(fā)問題,所以會產(chǎn)生并行或者一系列并發(fā)沖突鎖競爭的問題,降低了并發(fā)性能。類似的,熱點(diǎn)數(shù)據(jù)也有這樣的問題,在多線程并發(fā)的過程中,熱點(diǎn)數(shù)據(jù)(頻繁被訪問的變量)是在每一個線程中幾乎或多或少都會訪問到的數(shù)據(jù),這將增加程序中的串行部分,回憶一下開頭所描述的,程序中的串行部分將影響并發(fā)的可伸縮性,使并發(fā)性能下降,這通常會成為并發(fā)程序性能的瓶頸。而在ConcurrentHashMap中,如何快速的統(tǒng)計(jì)容器大小更是一個很重要的議題,因?yàn)槿萜鲀?nèi)部需要依靠容器大小來考慮是否需要擴(kuò)容,而在客戶端而言需要調(diào)用此方法來知道容器有多少個元素,如果處理不好這種熱點(diǎn)數(shù)據(jù),并發(fā)性能將因?yàn)檫@個短板整體性能下降。

    試想一下,如果是你,你會如何設(shè)計(jì)這種熱點(diǎn)數(shù)據(jù)?是加鎖,還是進(jìn)行CAS操作?進(jìn)入ConcurrentHashMap中,看看大師是如何巧妙的運(yùn)用了并發(fā)技巧,提高熱點(diǎn)數(shù)據(jù)的并發(fā)性能。

    先用圖的方式來看看大致的實(shí)現(xiàn)思路:

    @sun.misc.Contended static final class CounterCell {volatile long value;CounterCell(long x) { value = x; } }

    這是一個粗略的實(shí)現(xiàn),在設(shè)計(jì)中,使用了分而治之的思想,將每一個計(jì)數(shù)都分散到各個countCell對象里面(下面稱之為桶),使競爭最小化,又使用了CAS操作,就算有競爭,也可以對失敗了的線程進(jìn)行其他的處理。樂觀鎖的實(shí)現(xiàn)方式與悲觀鎖不同之處就在于樂觀鎖可以對競爭失敗了的線程進(jìn)行其他策略的處理,而悲觀鎖只能等待鎖釋放,所以這里使用CAS操作對競爭失敗的線程做了其他處理,很巧妙的運(yùn)用了CAS樂觀鎖。

    下面看看具體的代碼實(shí)現(xiàn)吧:

    //計(jì)數(shù),并檢查長度是否達(dá)到閾值 private final void addCount(long x, int check) {//計(jì)數(shù)桶CounterCell[] as; long b, s;//如果counterCells不為null,則代表已經(jīng)初始化了,直接進(jìn)入if語句塊//若競爭不嚴(yán)重,counterCells有可能還未初始化,為null,先嘗試CAS操作遞增baseCount值if ((as = counterCells) != null ||!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {//進(jìn)入此語句塊有兩種可能//1.counterCells被初始化完成了,不為null//2.CAS操作遞增baseCount值失敗了,說明有競爭CounterCell a; long v; int m;//標(biāo)志是否存在競爭boolean uncontended = true;//1.先判斷計(jì)數(shù)桶是否還沒初始化,則as=null,進(jìn)入語句塊//2.判斷計(jì)數(shù)桶長度是否為空或,若是進(jìn)入語句塊//3.這里做了一個線程變量隨機(jī)數(shù),與上桶大小-1,若桶的這個位置為空,進(jìn)入語句塊//4.到這里說明桶已經(jīng)初始化了,且隨機(jī)的這個位置不為空,嘗試CAS操作使桶加1,失敗進(jìn)入語句塊if (as == null || (m = as.length - 1) < 0 ||(a = as[ThreadLocalRandom.getProbe() & m]) == null ||!(uncontended =U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {fullAddCount(x, uncontended);return;}if (check <= 1)return;//統(tǒng)計(jì)容器大小s = sumCount();}... }

    假設(shè)當(dāng)前線程為第一個put的線程

    先假設(shè)當(dāng)前Map還未被put數(shù)據(jù),則addCount一定沒有被調(diào)用過,當(dāng)前線程第一個調(diào)用addCount方法,則此時countCell一定沒有被初始化,為null,則進(jìn)行如下判斷:

    if ((as = counterCells) != null ||!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x))

    這里的if判斷一定會走第二個判斷,先CAS增加變量baseCount的值:

    private transient volatile long baseCount;

    這個值有什么用呢?我們看看統(tǒng)計(jì)容器大小的方法sumCount:

    final long sumCount() {//獲取計(jì)數(shù)桶CounterCell[] as = counterCells; CounterCell a;//獲取baseCount,賦值給sum總數(shù)long sum = baseCount;//若計(jì)數(shù)桶不為空,統(tǒng)計(jì)計(jì)數(shù)桶內(nèi)的值if (as != null) {for (int i = 0; i < as.length; ++i) {//遍歷計(jì)數(shù)桶,將value值相加if ((a = as[i]) != null)sum += a.value;}}return sum; }

    這個方法的大體思路與我們開頭那張圖差不多,容器的大小其實(shí)是分為兩部分,開頭只說了計(jì)數(shù)桶的那部分,其實(shí)還有一個baseCount,在線程沒有競爭的情況下的統(tǒng)計(jì)值,換句話說,在增加容量的時候其實(shí)是先去做CAS遞增baseCount的。

    由此可見,統(tǒng)計(jì)容器大小其實(shí)是用了兩種思路:

  • CAS方式直接遞增:在線程競爭不大的時候,直接使用CAS操作遞增baseCount值即可,這里說的競爭不大指的是CAS操作不會失敗的情況
  • 分而治之桶計(jì)數(shù):若出現(xiàn)了CAS操作失敗的情況,則證明此時有線程競爭了,計(jì)數(shù)方式從CAS方式轉(zhuǎn)變?yōu)榉侄沃耐坝?jì)數(shù)方式
  • 出現(xiàn)了線程競爭導(dǎo)致CAS失敗

    此時出現(xiàn)了競爭,則不會再用CAS方式來計(jì)數(shù)了,直接使用桶方式,從上面的addCount方法可以看出來,此時的countCell是為空的,最終一定會進(jìn)入fullAddCount方法來進(jìn)行初始化桶:

    private final void fullAddCount(long x, boolean wasUncontended) {int h;if ((h = ThreadLocalRandom.getProbe()) == 0) {ThreadLocalRandom.localInit(); // force initializationh = ThreadLocalRandom.getProbe();wasUncontended = true;}boolean collide = false; // True if last slot nonemptyfor (;;) {CounterCell[] as; CounterCell a; int n; long v;...//如果計(jì)數(shù)桶!=null,證明已經(jīng)初始化,此時不走此語句塊if ((as = counterCells) != null && (n = as.length) > 0) {...}//進(jìn)入此語句塊進(jìn)行計(jì)數(shù)桶的初始化//CAS設(shè)置cellsBusy=1,表示現(xiàn)在計(jì)數(shù)桶Busy中...else if (cellsBusy == 0 && counterCells == as &&U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {//若有線程同時初始化計(jì)數(shù)桶,由于CAS操作只有一個線程進(jìn)入這里boolean init = false;try { // Initialize table//再次確認(rèn)計(jì)數(shù)桶為空if (counterCells == as) {//初始化一個長度為2的計(jì)數(shù)桶CounterCell[] rs = new CounterCell[2];//h為一個隨機(jī)數(shù),與上1則代表結(jié)果為0、1中隨機(jī)的一個//也就是在0、1下標(biāo)中隨便選一個計(jì)數(shù)桶,x=1,放入1的值代表增加1個容量rs[h & 1] = new CounterCell(x);//將初始化好的計(jì)數(shù)桶賦值給ConcurrentHashMapcounterCells = rs;init = true;}} finally {//最后將busy標(biāo)識設(shè)置為0,表示不busy了cellsBusy = 0;}if (init)break;}//若有線程同時來初始化計(jì)數(shù)桶,則沒有搶到busy資格的線程就先來CAS遞增baseCountelse if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))break; // Fall back on using base}}

    到這里就完成了計(jì)數(shù)桶的初始化工作,在之后的計(jì)數(shù)都將會使用計(jì)數(shù)桶方式來統(tǒng)計(jì)總數(shù)

    計(jì)數(shù)桶擴(kuò)容

    從上面的分析中我們知道,計(jì)數(shù)桶初始化之后長度為2,在競爭大的時候肯定是不夠用的,所以一定有計(jì)數(shù)桶的擴(kuò)容操作,所以現(xiàn)在就有兩個問題了:

  • 什么條件下會進(jìn)行計(jì)數(shù)桶的擴(kuò)容?
  • 擴(kuò)容操作是怎么樣的?
  • 假設(shè)此時是用計(jì)數(shù)桶方式進(jìn)行計(jì)數(shù):

    private final void addCount(long x, int check) {CounterCell[] as; long b, s;if ((as = counterCells) != null ||!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {CounterCell a; long v; int m;boolean uncontended = true;//此時顯然會在計(jì)數(shù)桶數(shù)組中隨機(jī)選一個計(jì)數(shù)桶//然后使用CAS操作將此計(jì)數(shù)桶中的value+1if (as == null || (m = as.length - 1) < 0 ||(a = as[ThreadLocalRandom.getProbe() & m]) == null ||!(uncontended =U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {//若CAS操作失敗,證明有競爭,進(jìn)入fullAddCount方法fullAddCount(x, uncontended);return;}if (check <= 1)return;s = sumCount();}... }

    進(jìn)入fullAddCount方法:

    private final void fullAddCount(long x, boolean wasUncontended) {int h;if ((h = ThreadLocalRandom.getProbe()) == 0) {ThreadLocalRandom.localInit(); // force initializationh = ThreadLocalRandom.getProbe();wasUncontended = true;}boolean collide = false; // True if last slot nonemptyfor (;;) {CounterCell[] as; CounterCell a; int n; long v;//計(jì)數(shù)桶初始化好了,一定是走這個if語句塊if ((as = counterCells) != null && (n = as.length) > 0) {//從計(jì)數(shù)桶數(shù)組隨機(jī)選一個計(jì)數(shù)桶,若為null表示該桶位還沒線程遞增過if ((a = as[(n - 1) & h]) == null) {//查看計(jì)數(shù)桶busy狀態(tài)是否被標(biāo)識if (cellsBusy == 0) { // Try to attach new Cell//若不busy,直接new一個計(jì)數(shù)桶CounterCell r = new CounterCell(x); // Optimistic create//CAS操作,標(biāo)示計(jì)數(shù)桶busy中if (cellsBusy == 0 &&U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {boolean created = false;try { // Recheck under lockCounterCell[] rs; int m, j;//在鎖下再檢查一次計(jì)數(shù)桶為nullif ((rs = counterCells) != null &&(m = rs.length) > 0 &&rs[j = (m - 1) & h] == null) {//將剛剛創(chuàng)建的計(jì)數(shù)桶賦值給對應(yīng)位置rs[j] = r;created = true;}} finally {//標(biāo)示不busy了cellsBusy = 0;}if (created)break;continue; // Slot is now non-empty}}collide = false;}else if (!wasUncontended) // CAS already known to failwasUncontended = true; // Continue after rehash//走到這里代表計(jì)數(shù)桶不為null,嘗試遞增計(jì)數(shù)桶else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))break;else if (counterCells != as || n >= NCPU)collide = false; // At max size or stale//若CAS操作失敗了,到了這里,會先進(jìn)入一次,然后再走一次剛剛的for循環(huán)//若是第二次for循環(huán),collide=true,則不會走進(jìn)去else if (!collide)collide = true;//計(jì)數(shù)桶擴(kuò)容,一個線程若走了兩次for循環(huán),也就是進(jìn)行了多次CAS操作遞增計(jì)數(shù)桶失敗了//則進(jìn)行計(jì)數(shù)桶擴(kuò)容,CAS標(biāo)示計(jì)數(shù)桶busy中else if (cellsBusy == 0 &&U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {try {//確認(rèn)計(jì)數(shù)桶還是同一個if (counterCells == as) {// Expand table unless stale//將長度擴(kuò)大到2倍CounterCell[] rs = new CounterCell[n << 1];//遍歷舊計(jì)數(shù)桶,將引用直接搬過來for (int i = 0; i < n; ++i)rs[i] = as[i];//賦值counterCells = rs;}} finally {//取消busy狀態(tài)cellsBusy = 0;}collide = false;continue; // Retry with expanded table}h = ThreadLocalRandom.advanceProbe(h);}else if (cellsBusy == 0 && counterCells == as &&U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {//初始化計(jì)數(shù)桶...}else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))break; // Fall back on using base} }

    看到這里,想必已經(jīng)可以解決上面兩個問題了:

  • 什么條件下會進(jìn)行計(jì)數(shù)桶的擴(kuò)容?
    答:在CAS操作遞增計(jì)數(shù)桶失敗了3次之后,會進(jìn)行擴(kuò)容計(jì)數(shù)桶操作,注意此時同時進(jìn)行了兩次隨機(jī)定位計(jì)數(shù)桶來進(jìn)行CAS遞增的,所以此時可以保證大概率是因?yàn)橛?jì)數(shù)桶不夠用了,才會進(jìn)行計(jì)數(shù)桶擴(kuò)容
  • 擴(kuò)容操作是怎么樣的?
    答:計(jì)數(shù)桶長度增加到兩倍長度,數(shù)據(jù)直接遍歷遷移過來,由于計(jì)數(shù)桶不像HashMap數(shù)據(jù)結(jié)構(gòu)那么復(fù)雜,有hash算法的影響,加上計(jì)數(shù)桶只是存放一個long類型的計(jì)數(shù)值而已,所以直接賦值引用即可。
  • 總結(jié)

    個人感覺,統(tǒng)計(jì)容器大小的線程安全與擴(kuò)容ConcurrentHashMap這兩個算得上ConcurrentHashMap中最聰明的兩個并發(fā)設(shè)計(jì)了,閱讀此源碼的我看到這兩個操作的設(shè)計(jì),都忍不住拍手叫絕,我想,這或許也是一個看源碼的樂趣吧,站在巨人的肩膀看巨人的思想。

    總結(jié)一下計(jì)數(shù)中用到的并發(fā)技巧:

  • 利用CAS遞增baseCount值來感知是否存在線程競爭,若競爭不大直接CAS遞增baseCount值即可,性能與直接baseCount++差別不大
  • 若存在線程競爭,則初始化計(jì)數(shù)桶,若此時初始化計(jì)數(shù)桶的過程中也存在競爭,多個線程同時初始化計(jì)數(shù)桶,則沒有搶到初始化資格的線程直接嘗試CAS遞增baseCount值的方式完成計(jì)數(shù),最大化利用了線程的并行。此時使用計(jì)數(shù)桶計(jì)數(shù),分而治之的方式來計(jì)數(shù),此時兩個計(jì)數(shù)桶最大可提供兩個線程同時計(jì)數(shù),同時使用CAS操作來感知線程競爭,若兩個桶情況下CAS操作還是頻繁失敗(失敗3次),則直接擴(kuò)容計(jì)數(shù)桶,變?yōu)?個計(jì)數(shù)桶,支持最大同時4個線程并發(fā)計(jì)數(shù),以此類推…同時使用位運(yùn)算和隨機(jī)數(shù)的方式"負(fù)載均衡"一樣的將線程計(jì)數(shù)請求接近均勻的落在各個計(jì)數(shù)桶中。
  • get操作的線程安全

    對于get操作,其實(shí)沒有線程安全的問題,只有可見性的問題,只需要確保get的數(shù)據(jù)是線程之間可見的即可:

    public V get(Object key) {Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;int h = spread(key.hashCode());//此過程與HashMap的get操作無異,不多贅述if ((tab = table) != null && (n = tab.length) > 0 &&(e = tabAt(tab, (n - 1) & h)) != null) {if ((eh = e.hash) == h) {if ((ek = e.key) == key || (ek != null && key.equals(ek)))return e.val;}//當(dāng)hash<0,有可能是在遷移,使用fwd占位Node去查找新table中的數(shù)據(jù)else if (eh < 0)return (p = e.find(h, key)) != null ? p.val : null;while ((e = e.next) != null) {if (e.hash == h &&((ek = e.key) == key || (ek != null && key.equals(ek))))return e.val;}}return null; }

    在get操作中除了增加了遷移的判斷以外,基本與HashMap的get操作無異,這里不多贅述,值得一提的是這里使用了tabAt方法Unsafe類volatile的方式去獲取Node數(shù)組中的Node,保證獲得到的Node是最新的

    static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE); }

    JDK1.7與1.8的不同實(shí)現(xiàn)

    JDK1.7的ConcurrentHashMap底層數(shù)據(jù)結(jié)構(gòu):

    其中1.7的實(shí)現(xiàn)也同樣采用了分段鎖的技術(shù),只不過多個一個segment,一個segment里對應(yīng)一個小HashMap,其中segment繼承了ReentrantLock,充當(dāng)了鎖的角色,一把鎖鎖一個小HashMap(相當(dāng)于多個Node),從1.8的實(shí)現(xiàn)來看, 鎖的粒度從多個Node級別又減小到一個Node級別,再度減小鎖競爭,減小程序同步的部分。

    總結(jié)

    不得不說,大師將CAS操作運(yùn)用的淋漓盡致,相信理解了以上源碼的讀者也可以學(xué)習(xí)到大師所運(yùn)用的并發(fā)技巧,不僅僅是在ConcurrentHashMap中,其實(shí)在大部分JUC的源碼中很多并發(fā)思想很值得我們?nèi)ラ喿x、學(xué)習(xí)。

    源碼分析

    構(gòu)造函數(shù)

    public ConcurrentHashMap() {}public ConcurrentHashMap(int initialCapacity) {if (initialCapacity < 0)throw new IllegalArgumentException();int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?MAXIMUM_CAPACITY :tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));this.sizeCtl = cap;}public ConcurrentHashMap(Map<? extends K, ? extends V> m) {this.sizeCtl = DEFAULT_CAPACITY;putAll(m);}public ConcurrentHashMap(int initialCapacity, float loadFactor) {this(initialCapacity, loadFactor, 1);}public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) {if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)throw new IllegalArgumentException();if (initialCapacity < concurrencyLevel) // Use at least as many binsinitialCapacity = concurrencyLevel; // as estimated threadslong size = (long)(1.0 + (long)initialCapacity / loadFactor);int cap = (size >= (long)MAXIMUM_CAPACITY) ?MAXIMUM_CAPACITY : tableSizeFor((int)size);this.sizeCtl = cap;}

    靜態(tài)變量

    private static final int MAXIMUM_CAPACITY = 1 << 30;private static final int DEFAULT_CAPACITY = 16;static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;private static final int DEFAULT_CONCURRENCY_LEVEL = 16;private static final float LOAD_FACTOR = 0.75f;static final int TREEIFY_THRESHOLD = 8;static final int UNTREEIFY_THRESHOLD = 6;static final int MIN_TREEIFY_CAPACITY = 64;private static final int MIN_TRANSFER_STRIDE = 16;private static int RESIZE_STAMP_BITS = 16;private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;static final int MOVED = -1; // hash for forwarding nodesstatic final int TREEBIN = -2; // hash for roots of treesstatic final int RESERVED = -3; // hash for transient reservations

    成員變量

    /*** The array of bins. Lazily initialized upon first insertion.* Size is always a power of two. Accessed directly by iterators.*/transient volatile Node<K,V>[] table;/*** The next table to use; non-null only while resizing.*/private transient volatile Node<K,V>[] nextTable;/*** Base counter value, used mainly when there is no contention,* but also as a fallback during table initialization* races. Updated via CAS.*/private transient volatile long baseCount;/*** Table initialization and resizing control. When negative, the* table is being initialized or resized: -1 for initialization,* else -(1 + the number of active resizing threads). Otherwise,* when table is null, holds the initial table size to use upon* creation, or 0 for default. After initialization, holds the* next element count value upon which to resize the table.*/private transient volatile int sizeCtl;/*** The next table index (plus one) to split while resizing.*/private transient volatile int transferIndex;/*** Spinlock (locked via CAS) used when resizing and/or creating CounterCells.*/private transient volatile int cellsBusy;/*** Table of counter cells. When non-null, size is a power of 2.*/private transient volatile CounterCell[] counterCells;// viewsprivate transient KeySetView<K,V> keySet;private transient ValuesView<K,V> values;private transient EntrySetView<K,V> entrySet;

    方法

    public V put(K key, V value) {return putVal(key, value, false);}/** Implementation for put and putIfAbsent */final V putVal(K key, V value, boolean onlyIfAbsent) {if (key == null || value == null)throw new NullPointerException();int hash = spread(key.hashCode());int binCount = 0;for (Node<K, V>[] tab = table;;) {Node<K, V> f;int n, i, fh;if (tab == null || (n = tab.length) == 0)tab = initTable();else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {if (casTabAt(tab, i, null, new Node<K, V>(hash, key, value, null)))break; // no lock when adding to empty bin} else if ((fh = f.hash) == MOVED)tab = helpTransfer(tab, f);else {V oldVal = null;synchronized (f) {if (tabAt(tab, i) == f) {if (fh >= 0) {binCount = 1;for (Node<K, V> e = f;; ++binCount) {K ek;if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) {oldVal = e.val;if (!onlyIfAbsent)e.val = value;break;}Node<K, V> pred = e;if ((e = e.next) == null) {pred.next = new Node<K, V>(hash, key, value, null);break;}}} else if (f instanceof TreeBin) {Node<K, V> p;binCount = 2;if ((p = ((TreeBin<K, V>) f).putTreeVal(hash, key, value)) != null) {oldVal = p.val;if (!onlyIfAbsent)p.val = value;}}}}if (binCount != 0) {if (binCount >= TREEIFY_THRESHOLD)treeifyBin(tab, i);if (oldVal != null)return oldVal;break;}}}addCount(1L, binCount);return null;} public V get(Object key) {Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;int h = spread(key.hashCode());if ((tab = table) != null && (n = tab.length) > 0 &&(e = tabAt(tab, (n - 1) & h)) != null) {if ((eh = e.hash) == h) {if ((ek = e.key) == key || (ek != null && key.equals(ek)))return e.val;}else if (eh < 0)return (p = e.find(h, key)) != null ? p.val : null;while ((e = e.next) != null) {if (e.hash == h &&((ek = e.key) == key || (ek != null && key.equals(ek))))return e.val;}}return null;}

    https://www.cnblogs.com/banjinbaijiu/p/9147434.html

    總結(jié)

    以上是生活随笔為你收集整理的Java中ConcurrentHashMap底层实现原理(JDK1.8)源码分析2的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。