阅读ConcurrentHashMap源码的一些记录
在多線程的環境下,HashMap不能滿足我們的要求,而Hahstable效率太低,一般會推薦大家用ConcurrentHashMap,ConcurrentHashMap使用分段鎖,能夠讓鎖的粒度更小,能夠很大程度上提高并發的效率。本文為基于JDK1.6的源碼探索。
一、構造方法以及一些屬性
1、一些屬性
// 默認的初始容量 static final int DEFAULT_INITIAL_CAPACITY = 16; // 默認加載因子 static final float DEFAULT_LOAD_FACTOR = 0.75f; // 默認并發量 static final int DEFAULT_CONCURRENCY_LEVEL = 16; // 最大容量 static final int MAXIMUM_CAPACITY = 1 << 30; // 區段的最大量 static final int MAX_SEGMENTS = 1 << 16;2、無參構造方法
無參構造方法,容量默認(16),加載因子默認(0.75f),并發量默認(16)
public ConcurrentHashMap() {this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL); }3、指定容量的構造方法
加載因子取默認值(0.75f),并發量也取默認值(16)
public ConcurrentHashMap(int initialCapacity) {this(initialCapacity, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL); }4、指定初始容量和加載因子的構造方法
并發量默認值(16)
public ConcurrentHashMap(int initialCapacity, float loadFactor) {this(initialCapacity, loadFactor, DEFAULT_CONCURRENCY_LEVEL); }5、指定初始容量、加載因子和并發量的構造方法
public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) {// 校驗入參的合法性if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)throw new IllegalArgumentException();// 如果入參并發量大于最大并發量,則取最大并發量if (concurrencyLevel > MAX_SEGMENTS)concurrencyLevel = MAX_SEGMENTS;int sshift = 0;int ssize = 1;// 經過位移運算確保了ssize是2的指數倍while (ssize < concurrencyLevel) {++sshift;ssize <<= 1;}segmentShift = 32 - sshift;segmentMask = ssize - 1;// 初始化segments的長度this.segments = Segment.newArray(ssize);// 如果入參初始容量大于最大容量,則容量取最大容量if (initialCapacity > MAXIMUM_CAPACITY)initialCapacity = MAXIMUM_CAPACITY;int c = initialCapacity / ssize;if (c * ssize < initialCapacity)++c;int cap = 1;while (cap < c)cap <<= 1;// 初始化setmentsfor (int i = 0; i < this.segments.length; ++i)this.segments[i] = new Segment<K,V>(cap, loadFactor); }6、依據給定的集合構建
加載因子為默認值(0.75f),并發量為默認值(16),然后通過循環,單個的把數據加入到ConcurrentHashMap中
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {this(Math.max((int) (m.size() / DEFAULT_LOAD_FACTOR) + 1,DEFAULT_INITIAL_CAPACITY),DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);putAll(m); }public void putAll(Map<? extends K, ? extends V> m) {for (Map.Entry<? extends K, ? extends V> e : m.entrySet())put(e.getKey(), e.getValue()); }二、增
1、put
注意:key和value都不能為空
如果已經存在對應的key,則新的值會覆蓋舊的值
下面來看一下HashEntry的結構,注意key,hash以及next都被final修飾了,這就注定HashEntry只能用頭插法,value被volatile修飾,保證了其修改的可見性。
static final class HashEntry<K,V> {final K key;final int hash;volatile V value;final HashEntry<K,V> next;HashEntry(K key, int hash, HashEntry<K,V> next, V value) {this.key = key;this.hash = hash;this.next = next;this.value = value;}@SuppressWarnings("unchecked")static final <K,V> HashEntry<K,V>[] newArray(int i) {return new HashEntry[i];} }擴容方法
void rehash() {HashEntry<K,V>[] oldTable = table;int oldCapacity = oldTable.length;if (oldCapacity >= MAXIMUM_CAPACITY)// 如果已經是最大容量了,則無法再擴容return;// 生成新的數組,容量為原數組的2倍HashEntry<K,V>[] newTable = HashEntry.newArray(oldCapacity<<1);// 重新計算擴容的臨界值threshold = (int)(newTable.length * loadFactor);int sizeMask = newTable.length - 1;// 通過循環,把舊數組的值寫入新的數組for (int i = 0; i < oldCapacity ; i++) {HashEntry<K,V> e = oldTable[i];if (e != null) {HashEntry<K,V> next = e.next;int idx = e.hash & sizeMask;// 重新計算索引if (next == null)newTable[idx] = e;else {HashEntry<K,V> lastRun = e;int lastIdx = idx;// 注意這個循環①for (HashEntry<K,V> last = next; last != null; last = last.next) {int k = last.hash & sizeMask;// 重新計算索引if (k != lastIdx) {lastIdx = k;lastRun = last;}}newTable[lastIdx] = lastRun;for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {int k = p.hash & sizeMask;// 重新計算索引HashEntry<K,V> n = newTable[k];newTable[k] = new HashEntry<K,V>(p.key, p.hash, n, p.value);}}}}table = newTable; }注意①處的那個循環的作用。比如原來有個鏈e1->e2-e3->e4,然后循環發現e3、e4經過擴容以后還是落在了同一個桶中,則會把e3、e4作為一個整體加入都鏈表中,然后其他落在這個桶中的節點再采用頭節點插入,這樣e3、e4兩個節點就不需要重新生成了。
2、putIfAbsent
此方法主要的作用就是,如果ConcurrentHashMap中已經存在指定的key,則返回key對應的value,但是并不替換掉這個value,如果不存在,則把key-value放進ConcurrentHashMap中
public V putIfAbsent(K key, V value) {if (value == null)throw new NullPointerException();int hash = hash(key.hashCode());// 此處put方法的第四個參數傳的是truereturn segmentFor(hash).put(key, hash, value, true); }3、putAll
把一個集合放到ConCurrentHashMap中,就是循環調用put方法。
public void putAll(Map<? extends K, ? extends V> m) {for (Map.Entry<? extends K, ? extends V> e : m.entrySet())put(e.getKey(), e.getValue()); }三、刪
1、指定key刪除
public V remove(Object key) {// 獲取key的hashint hash = hash(key.hashCode());return segmentFor(hash).remove(key, hash, null); }Segment的remove方法
V remove(Object key, int hash, Object value) {lock();// 加鎖try {int c = count - 1;HashEntry<K,V>[] tab = table;// 計算key所在的桶的下標int index = hash & (tab.length - 1);HashEntry<K,V> first = tab[index];HashEntry<K,V> e = first;// 循環匹配keywhile (e != null && (e.hash != hash || !key.equals(e.key)))e = e.next;V oldValue = null;if (e != null) {// 如果有匹配成功V v = e.value;if (value == null || value.equals(v)) {// value為null,或者根據key匹配到的HashEntry的value跟傳入的value也匹配oldValue = v;++modCount;HashEntry<K,V> newFirst = e.next;// 移除e,因為HashEntry的next被final修飾了,所以,e前面的數據要重新生成并加入for (HashEntry<K,V> p = first; p != e; p = p.next)newFirst = new HashEntry<K,V>(p.key, p.hash, newFirst, p.value);tab[index] = newFirst;count = c; // write-volatile}}return oldValue;} finally {unlock();// 釋放鎖} }2、指定key和value移除
key和value要都匹配才能移除
public boolean remove(Object key, Object value) {int hash = hash(key.hashCode());if (value == null)return false;return segmentFor(hash).remove(key, hash, value) != null; }3、移除所有元素
循環清除每個Segment中的數據
public void clear() {for (int i = 0; i < segments.length; ++i)segments[i].clear(); } void clear() {if (count != 0) {lock();// 加鎖try {HashEntry<K,V>[] tab = table;// 循環清除table中的數據for (int i = 0; i < tab.length ; i++)tab[i] = null;++modCount;count = 0; // write-volatile} finally {unlock();//釋放鎖}} }四、改
1、根據key來修改
public V replace(K key, V value) {// 注意,key和value都不能為nullif (value == null)throw new NullPointerException();int hash = hash(key.hashCode());return segmentFor(hash).replace(key, hash, value); }Segment的replace方法
V replace(K key, int hash, V newValue) {lock();//加鎖try {HashEntry<K,V> e = getFirst(hash);// 循環匹配while (e != null && (e.hash != hash || !key.equals(e.key)))e = e.next;V oldValue = null;if (e != null) {//匹配成功,則把舊的值替換為新的值,并把舊的值返回oldValue = e.value;e.value = newValue;}return oldValue;} finally {unlock();//釋放鎖} }HashEntry<K,V> getFirst(int hash) {HashEntry<K,V>[] tab = table;return tab[hash & (tab.length - 1)]; }2、根據給定的key以及value去匹配
如果替換成功返回true,否則返回false
Segment的replace方法
boolean replace(K key, int hash, V oldValue, V newValue) {lock();// 加鎖try {HashEntry<K,V> e = getFirst(hash);// 先根據key來匹配,匹配成功以后看起對應的value是否和傳進來的oldValue匹配,如果匹配,則進行修改,并返回true,否則返回falsewhile (e != null && (e.hash != hash || !key.equals(e.key)))e = e.next;boolean replaced = false;if (e != null && oldValue.equals(e.value)) {replaced = true;e.value = newValue;}return replaced;} finally {unlock();// 釋放鎖} }五、查
get方法沒有加鎖,那是如何保證其數據的可靠性的呢?
public V get(Object key) {int hash = hash(key.hashCode());return segmentFor(hash).get(key, hash); }Segment的get方法
V get(Object key, int hash) {if (count != 0) { // ①HashEntry<K,V> e = getFirst(hash);while (e != null) {if (e.hash == hash && key.equals(e.key)) {// ③V v = e.value;if (v != null)// ②return v;return readValueUnderLock(e); // recheck}e = e.next;}}return null; } V readValueUnderLock(HashEntry<K,V> e) {lock();try {return e.value;} finally {unlock();} }我們看到并沒有加鎖,那它是怎么保證數據的一致性的呢?首先看①這個地方,首先判斷count是否為0,我們看一下count在Segment中的定義
transient volatile int count;用volatile修飾,保證了其可見性,也就是說如果此Segment的count為0,就不用判斷了,肯定是不存在指定的key的。如果不為0,然后通過hash去獲取相應的鏈表,然后循環鏈表去匹配,如果匹配成功,就取其value,我們注意到②這個位置有個判斷,就是判斷value是否為null,如果為null的話就表示是其他線程正在創建這個HashEntry(肯定不是放進去的null值,因為在put的時候如果放進去的是null值,會直接拋出異常),只是這個HashEntry還沒創建完成,這個時候就在加鎖去取其value,只是此時鎖被創建HashEntry的線程持有,此時get的這個線程就需要去等待創建HashEntry的線程釋放鎖,鎖被釋放的時候,HashEntry就已經被創建成功,此時get線程就能夠獲取到其對應的value了。還有就是在獲取值的時候,如果其他線程修改這個key對應的值,我們獲取的也是最新的值,因為HashEntry的value被volatile修飾,保證了其可見性。但是有這么一種情況,并不能保證獲取的值是最新值,比如有一個鏈表e1->e2->e3,然后你要獲取的值正好是e1中的value,當get線程執行到③位置的時候,就是已經獲取到HashEntry,然后另一個線程獲取到執行權限,把e1給刪了,此時get還是能夠獲取到值的,此時獲取到的值就已經不是最新的值了。
六、其他一些方法
1、是否包含指定的key
public boolean containsKey(Object key) {int hash = hash(key.hashCode());return segmentFor(hash).containsKey(key, hash); }Segment的containsKey方法
boolean containsKey(Object key, int hash) {if (count != 0) { // read-volatileHashEntry<K,V> e = getFirst(hash);while (e != null) {if (e.hash == hash && key.equals(e.key))return true;e = e.next;}}return false; }因為沒有加鎖,所以也不能保證獲取到的結果是最新的
2、是否包含指定的value
因為不能根據value定位,所以要循環全部的segments去匹配,效率低。大體的思路是先循環去匹配,如果匹配成功就返回true,否則就根據modCount去判斷在此期間集合是否有變化,如果沒有變化就直接返回false,如果有變化,就會再次循環segments去匹配,這種操作會執行兩次,如果兩次匹配的過程中集合都在變化,就沒有辦法了,只能去鎖全部的Segment,然后去匹配了,鎖全部的Segment這種操作開銷是非常大的,所以在鎖全部的Segment之前,會假設在匹配的過程中集合不會變化(樂觀鎖),進行兩次嘗試性的判斷。如果前兩次有成功,就不用鎖定全部的Segment了,相應的會有效率的提升。
public boolean containsValue(Object value) {if (value == null)throw new NullPointerException();final Segment<K,V>[] segments = this.segments;int[] mc = new int[segments.length];// 記錄每個segment中的modCount// RETRIES_BEFORE_LOCK的值是2,而且不可改for (int k = 0; k < RETRIES_BEFORE_LOCK; ++k) {int sum = 0;int mcsum = 0;// 記錄每個segment中modCount相加的和// 循環去匹配for (int i = 0; i < segments.length; ++i) {int c = segments[i].count;mcsum += mc[i] = segments[i].modCount;if (segments[i].containsValue(value))// 如果匹配到就直接返回truereturn true;}boolean cleanSweep = true;if (mcsum != 0) {// mcsum如果是0,就表明是個空集合// 這個循環主要是看在循環匹配的過程中,集合是否發生了增刪改之類的操作,如果沒有就表明集合中不存在對應的value,否則就會再次去循環匹配for (int i = 0; i < segments.length; ++i) {int c = segments[i].count;if (mc[i] != segments[i].modCount) {cleanSweep = false;break;}}}if (cleanSweep)return false;}// 就是如果兩次匹配都沒成功,而且是集合一直在改變,則鎖所有的Segment,然后再進行匹配,這個代價是非常大的。for (int i = 0; i < segments.length; ++i)segments[i].lock();boolean found = false;try {for (int i = 0; i < segments.length; ++i) {if (segments[i].containsValue(value)) {found = true;break;}}} finally {for (int i = 0; i < segments.length; ++i)segments[i].unlock();}return found;}Segment的containsValue方法
這個方法大致和get的邏輯是一樣的,在此不再說明
3、獲取長度
這個和containsValue方法實現的思路,就是先樂觀的去匹配,如果不成功再上鎖
4、是否為空集合
首先去循環所有的segment,如果有一個segment的count不為0,就直接返回false,否在就再次循環看在此期間集合有沒有發生變化,如果發生了變化也直接返回false。
public boolean isEmpty() {final Segment<K,V>[] segments = this.segments;int[] mc = new int[segments.length];int mcsum = 0;for (int i = 0; i < segments.length; ++i) {if (segments[i].count != 0)return false;elsemcsum += mc[i] = segments[i].modCount;}if (mcsum != 0) {for (int i = 0; i < segments.length; ++i) {if (segments[i].count != 0 ||mc[i] != segments[i].modCount)return false;}}return true; }總結
以上是生活随笔為你收集整理的阅读ConcurrentHashMap源码的一些记录的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 维基百科全书挺好的网站
- 下一篇: 本地消息表(异步确保)