ConsurrentDictionary并发字典知多少?
在上一篇文章你真的了解字典嗎?一文中我介紹了Hash Function和字典的工作的基本原理.
有網(wǎng)友在文章底部評論,說我的Remove和Add方法沒有考慮線程安全問題.
https://docs.microsoft.com/en-us/dotnet/api/system.collections.generic.dictionary-2?redirectedfrom=MSDN&view=netframework-4.7.2
查閱相關(guān)資料后,發(fā)現(xiàn)字典.net中Dictionary本身時不支持線程安全的,如果要想使用支持線程安全的字典,那么我們就要使用ConcurrentDictionary了.
在研究ConcurrentDictionary的源碼后,我覺得在ConcurrentDictionary的線程安全的解決思路很有意思,其對線程安全的處理對對我們項目中的其他高并發(fā)場景也有一定的參考價值,在這里再次分享我的一些學習心得和體會,希望對大家有所幫助.
ConcurrentDictionary是Dictionary的線程安全版本,位于System.Collections.Concurrent的命名空間下,該命名空間下除了有ConcurrentDictionary,還有以下Class都是我們常用的那些類庫的線程安全版本.
BlockingCollection:為實現(xiàn)?IProducerConsumerCollection?的線程安全集合提供阻塞和限制功能。
ConcurrentBag:表示對象的線程安全的無序集合.
ConcurrentQueue:表示線程安全的先進先出 (FIFO) 集合。
如果讀過我上一篇文章你真的了解字典嗎?的小伙伴,對這個ConcurrentDictionary的工作原理應(yīng)該也不難理解,它是簡簡單單地在讀寫方法加個lock嗎?
Dictionary
如下圖所示,在字典中,數(shù)組entries用來存儲數(shù)據(jù),buckets作為橋梁,每次通過hash function獲取了key的哈希值后,對這個哈希值進行取余,即hashResult%bucketsLength=bucketIndex,余數(shù)作為buckets的index,而buckets的value就是這個key對應(yīng)的entry所在entries中的索引,所以最終我們就可以通過這個索引在entries中拿到我們想要的數(shù)據(jù),整個過程不需要對所有數(shù)據(jù)進行遍歷,的時間復雜度為1.
ConcurrentDictionary
ConcurrentDictionary的數(shù)據(jù)存儲類似,只是buckets有個更多的職責,它除了有dictionary中的buckets的橋梁的作用外,負責了數(shù)據(jù)存儲.
key的哈希值與buckets的length取余后hashResult%bucketsLength=bucketIndex,余數(shù)作為buckets的索引就能找到我們要的數(shù)據(jù)所存儲的塊,當出現(xiàn)兩個key指向同一個塊時,即上圖中的John Smith和Sandra Dee他同時指向152怎么辦呢?存儲節(jié)點Node具有Next屬性執(zhí)行下個Node,上圖中,node 152的Next為154,即我們從152開始找Sandra Dee,發(fā)現(xiàn)不是我們想要的,再到154找,即可取到所需數(shù)據(jù).
由于官方原版的源碼較為復雜,理解起來有所難度,我對官方源碼做了一些精簡,下文將圍繞這個精簡版的ConcurrentDictionary展開敘述.
https://github.com/liuzhenyulive/DictionaryMini
數(shù)據(jù)結(jié)構(gòu)
Node
ConcurrentDictionary中的每個數(shù)據(jù)存儲在一個Node中,它除了存儲value信息,還存儲key信息,以及key對應(yīng)的hashcode
private class Node{
internal TKey m_key;
internal TValue m_value;
internal volatile Node m_next;
internal int m_hashcode;
internal Node(TKey key, TValue value, int hashcode, Node next)
{
m_key = key;
m_value = value;
m_next = next;
m_hashcode = hashcode;
}
}
Table
而整個ConcurrentDictionary的數(shù)據(jù)存儲在這樣的一個Table中,其中m_buckets的Index負責映射key,m_locks是線程鎖,下文中會有詳細介紹,m_countPerLock存儲每個lock鎖負責的node數(shù)量.
private class Tables
{
internal readonly Node[] m_buckets;
internal readonly object[] m_locks;
internal volatile int[] m_countPerLock;
internal readonly IEqualityComparer<TKey> m_comparer;
internal Tables(Node[] buckets, object[] locks, int[] countPerlock, IEqualityComparer<TKey> comparer)
{
m_buckets = buckets;
m_locks = locks;
m_countPerLock = countPerlock;
m_comparer = comparer;
}
}
ConcurrentDictionary會在構(gòu)造函數(shù)中創(chuàng)建Table,這里我對原有的構(gòu)造函數(shù)進行了簡化,通過默認值進行創(chuàng)建,其中DefaultConcurrencyLevel默認并發(fā)級別為當前計算機處理器的線程數(shù).
public ConcurrentDictionaryMini() : this(DefaultConcurrencyLevel, DEFAULT_CAPACITY, true,
EqualityComparer<TKey>.Default)
{
}
internal ConcurrentDictionaryMini(int concurrencyLevel, int capacity, bool growLockArray, IEqualityComparer<TKey> comparer)
{
if (concurrencyLevel < 1)
{
throw new Exception("concurrencyLevel 必須為正數(shù)");
}
if (capacity < 0)
{
throw new Exception("capacity 不能為負數(shù).");
}
if (capacity < concurrencyLevel)
{
capacity = concurrencyLevel;
}
object[] locks = new object[concurrencyLevel];
for (int i = 0; i < locks.Length; i++)
{
locks[i] = new object();
}
int[] countPerLock = new int[locks.Length];
Node[] buckets = new Node[capacity];
m_tables = new Tables(buckets, locks, countPerLock, comparer);
m_growLockArray = growLockArray;
m_budget = buckets.Length / locks.Length;
}
方法
ConcurrentDictionary中較為基礎(chǔ)重點的方法分別位Add,Get,Remove,Grow Table方法,其他方法基本上是建立在這四個方法的基礎(chǔ)上進行的擴充.
Add
向Table中添加元素有以下亮點值得我們關(guān)注.
開始操作前會聲明一個tables變量來存儲操作開始前的m_tables,在正式開始操作后(進入lock)的時候,會檢查tables在準備工作階段是否別的線程改變,如果改變了,則重新開始準備工作并從新開始.
通過GetBucketAndLockNo方法獲取bucket索引以及l(fā)ock索引,其內(nèi)部就是取余操作.
int hashcode, out int bucketNo, out int lockNo, int bucketCount, int lockCount)
{
bucketNo = (hashcode & 0x7fffffff) % bucketCount;
lockNo = bucketNo % lockCount;
}
對數(shù)據(jù)進行操作前會從m_locks取出第lockNo個對象最為lock,操作完成后釋放該lock.多個lock一定程度上減少了阻塞的可能性.
在對數(shù)據(jù)進行更新時,如果該Value的Type為允許原子性寫入的,則直接更新該Value,否則創(chuàng)建一個新的node進行覆蓋.
private static bool IsValueWriteAtomic()
{
Type valueType = typeof(TValue);
if (valueType.IsClass)
{
return true;
}
switch (Type.GetTypeCode(valueType))
{
case TypeCode.Boolean:
case TypeCode.Byte:
case TypeCode.Char:
case TypeCode.Int16:
case TypeCode.Int32:
case TypeCode.SByte:
case TypeCode.Single:
case TypeCode.UInt16:
case TypeCode.UInt32:
return true;
case TypeCode.Int64:
case TypeCode.Double:
case TypeCode.UInt64:
return IntPtr.Size == 8;
default:
return false;
}
}
該方法依據(jù)CLI規(guī)范進行編寫,簡單來說,32位的計算機,對32字節(jié)以下的數(shù)據(jù)類型寫入時可以一次寫入的而不需要移動內(nèi)存指針,64位計算機對64位以下的數(shù)據(jù)可一次性寫入,不需要移動內(nèi)存指針.保證了寫入的安全.
詳見12.6.6?http://www.ecma-international.org/publications/files/ECMA-ST/ECMA-335.pdf
private bool TryAddInternal(TKey key, TValue value, bool updateIfExists, bool acquireLock, out TValue resultingValue)
{
while (true)
{
int bucketNo, lockNo;
int hashcode;
Tables tables = m_tables;
IEqualityComparer<TKey> comparer = tables.m_comparer;
hashcode = comparer.GetHashCode(key);
GetBucketAndLockNo(hashcode, out bucketNo, out lockNo, tables.m_buckets.Length, tables.m_locks.Length);
bool resizeDesired = false;
bool lockTaken = false;
try
{
if (acquireLock)
Monitor.Enter(tables.m_locks[lockNo], ref lockTaken);
if (tables != m_tables)
continue;
Node prev = null;
for (Node node = tables.m_buckets[bucketNo]; node != null; node = node.m_next)
{
if (comparer.Equals(node.m_key, key))
{
if (updateIfExists)
{
if (s_isValueWriteAtomic)
{
node.m_value = value;
}
else
{
Node newNode = new Node(node.m_key, value, hashcode, node.m_next);
if (prev == null)
{
tables.m_buckets[bucketNo] = newNode;
}
else
{
prev.m_next = newNode;
}
}
resultingValue = value;
}
else
{
resultingValue = node.m_value;
}
return false;
}
prev = node;
}
Volatile.Write(ref tables.m_buckets[bucketNo], new Node(key, value, hashcode, tables.m_buckets[bucketNo]));
checked
{
tables.m_countPerLock[lockNo]++;
}
if (tables.m_countPerLock[lockNo] > m_budget)
{
resizeDesired = true;
}
}
finally
{
if (lockTaken)
Monitor.Exit(tables.m_locks[lockNo]);
}
if (resizeDesired)
{
GrowTable(tables, tables.m_comparer, false, m_keyRehashCount);
}
resultingValue = value;
return true;
}
}
Get
從Table中獲取元素的的流程與前文介紹ConcurrentDictionary工作原理時一致,但有以下亮點值得關(guān)注.
讀取bucket[i]在Volatile.Read()方法中進行,該方法會自動對讀取出來的數(shù)據(jù)加鎖,避免在讀取的過程中,數(shù)據(jù)被其他線程remove了.
Volatile讀取指定字段時,在讀取的內(nèi)存中插入一個內(nèi)存屏障,阻止處理器重新排序內(nèi)存操作,如果在代碼中此方法之后出現(xiàn)讀取或?qū)懭?#xff0c;則處理器無法在此方法之前移動它。
public bool TryGetValue(TKey key, out TValue value)
{
if (key == null) throw new ArgumentNullException("key");
Tables tables = m_tables;
IEqualityComparer<TKey> comparer = tables.m_comparer;
GetBucketAndLockNo(comparer.GetHashCode(key), out var bucketNo, out _, tables.m_buckets.Length, tables.m_locks.Length);
Node n = Volatile.Read(ref tables.m_buckets[bucketNo]);
while (n != null)
{
if (comparer.Equals(n.m_key, key))
{
value = n.m_value;
return true;
}
n = n.m_next;
}
value = default(TValue);
return false;
}
Remove
Remove方法實現(xiàn)其實也并不復雜,類似我們鏈表操作中移除某個Node.移除節(jié)點的同時,還要對前后節(jié)點進行鏈接,相信一塊小伙伴們肯定很好理解.
private bool TryRemoveInternal(TKey key, out TValue value, bool matchValue, TValue oldValue){
while (true)
{
Tables tables = m_tables;
IEqualityComparer<TKey> comparer = tables.m_comparer;
int bucketNo, lockNo;
GetBucketAndLockNo(comparer.GetHashCode(key), out bucketNo, out lockNo, tables.m_buckets.Length, tables.m_locks.Length);
lock (tables.m_locks[lockNo])
{
if (tables != m_tables)
continue;
Node prev = null;
for (Node curr = tables.m_buckets[bucketNo]; curr != null; curr = curr.m_next)
{
if (comparer.Equals(curr.m_key, key))
{
if (matchValue)
{
bool valuesMatch = EqualityComparer<TValue>.Default.Equals(oldValue, curr.m_value);
if (!valuesMatch)
{
value = default(TValue);
return false;
}
}
if (prev == null)
Volatile.Write(ref tables.m_buckets[bucketNo], curr.m_next);
else
{
prev.m_next = curr.m_next;
}
value = curr.m_value;
tables.m_countPerLock[lockNo]--;
return true;
}
prev = curr;
}
}
value = default(TValue);
return false;
}
}
Grow table
當table中任何一個m_countPerLock的數(shù)量超過了設(shè)定的閾值后,會觸發(fā)此操作對Table進行擴容.
private void GrowTable(Tables tables, IEqualityComparer<TKey> newComparer, bool regenerateHashKeys,int rehashCount)
{
int locksAcquired = 0;
try
{
AcquireLocks(0, 1, ref locksAcquired);
if (regenerateHashKeys && rehashCount == m_keyRehashCount)
{
tables = m_tables;
}
else
{
if (tables != m_tables)
return;
long approxCount = 0;
for (int i = 0; i < tables.m_countPerLock.Length; i++)
{
approxCount += tables.m_countPerLock[i];
}
if (approxCount < tables.m_buckets.Length / 4)
{
m_budget = 2 * m_budget;
if (m_budget < 0)
{
m_budget = int.MaxValue;
}
return;
}
}
int newLength = 0;
bool maximizeTableSize = false;
try
{
checked
{
newLength = tables.m_buckets.Length * 2 + 1;
while (newLength % 3 == 0 || newLength % 5 == 0 || newLength % 7 == 0)
{
newLength += 2;
}
}
}
catch (OverflowException)
{
maximizeTableSize = true;
}
if (maximizeTableSize)
{
newLength = int.MaxValue;
m_budget = int.MaxValue;
}
AcquireLocks(1, tables.m_locks.Length, ref locksAcquired);
object[] newLocks = tables.m_locks;
if (m_growLockArray && tables.m_locks.Length < MAX_LOCK_NUMBER)
{
newLocks = new object[tables.m_locks.Length * 2];
Array.Copy(tables.m_locks, newLocks, tables.m_locks.Length);
for (int i = tables.m_locks.Length; i < newLocks.Length; i++)
{
newLocks[i] = new object();
}
}
Node[] newBuckets = new Node[newLength];
int[] newCountPerLock = new int[newLocks.Length];
for (int i = 0; i < tables.m_buckets.Length; i++)
{
Node current = tables.m_buckets[i];
while (current != null)
{
Node next = current.m_next;
int newBucketNo, newLockNo;
int nodeHashCode = current.m_hashcode;
if (regenerateHashKeys)
{
nodeHashCode = newComparer.GetHashCode(current.m_key);
}
GetBucketAndLockNo(nodeHashCode, out newBucketNo, out newLockNo, newBuckets.Length,
newLocks.Length);
newBuckets[newBucketNo] = new Node(current.m_key, current.m_value, nodeHashCode,
newBuckets[newBucketNo]);
checked
{
newCountPerLock[newLockNo]++;
}
current = next;
}
}
if (regenerateHashKeys)
{
unchecked
{
m_keyRehashCount++;
}
}
m_budget = Math.Max(1, newBuckets.Length / newLocks.Length);
m_tables = new Tables(newBuckets, newLocks, newCountPerLock, newComparer);
}
finally
{
ReleaseLocks(0, locksAcquired);
}
}
lock[]:在以往的線程安全上,我們對數(shù)據(jù)的保護往往是對數(shù)據(jù)的修改寫入等地方加上lock,這個lock經(jīng)常上整個上下文中唯一的,這樣的設(shè)計下就可能會出現(xiàn)多個線程,寫入的根本不是一塊數(shù)據(jù),卻要等待前一個線程寫入完成下一個線程才能繼續(xù)操作.在ConcurrentDictionary中,通過哈希算法,從數(shù)組lock[]中找出key的準確lock,如果不同的key,使用的不是同一個lock,那么這多個線程的寫入時互不影響的.
寫入要考慮線程安全,讀取呢?不可否認,在大部分場景下,讀取不必去考慮線程安全,但是在我們這樣的鏈式讀取中,需要自上而下地查找,是不是有種可能在查找個過程中,鏈路被修改了呢?所以ConcurrentDictionary中使用Volatile.Read來讀取出數(shù)據(jù),該方法從指定字段讀取對象引用,在需要它的系統(tǒng)上,插入一個內(nèi)存屏障,阻止處理器重新排序內(nèi)存操作,如果在代碼中此方法之后出現(xiàn)讀取或?qū)懭?#xff0c;則處理器無法在此方法之前移動它。
在ConcurrentDictionary的更新方法中,對數(shù)據(jù)進行更新時,會判斷該數(shù)據(jù)是否可以原子寫入,如果時可以原子寫入的,那么就直接更新數(shù)據(jù),如果不是,那么會創(chuàng)建一個新的node覆蓋原有node,起初看到這里時候,我百思不得其解,不知道這么操作的目的,后面在jeo duffy的博客中Thread-safety, torn reads, and the like中找到了答案,這樣操作時為了防止torn reads(撕裂讀取),什么叫撕裂讀取呢?通俗地說,就是有的數(shù)據(jù)類型寫入時,要分多次寫入,寫一次,移動一次指針,那么就有可能寫了一半,這個結(jié)果被另外一個線程讀取走了.比如說我把?劉振宇三個字改成周杰倫的過程中,我先改把劉改成周了,正在我準備去把振改成杰的時候,另外一個線程過來讀取結(jié)果了,讀到的數(shù)據(jù)是周振宇,這顯然是不對的.所以對這種,更安全的做法是先把周杰倫三個字寫好在一張紙條上,然后直接替換掉劉振宇.更多信息在CLI規(guī)范12.6.6有詳細介紹.
checked和unckecked關(guān)鍵字.非常量的運算(non-constant)運算在編譯階段和運行時下不會做溢出檢查,如下這樣的代碼時不會拋出異常的,算錯了也不會報錯。
int i2 = 2147483647 + ten;
但是我們知道,int的最大值是2147483647,如果我們將上面這樣的代碼嵌套在checked就會做溢出檢查了.
checked{
int ten = 10;
int i2 = 2147483647 + ten;
}
相反,對于常量,編譯時是會做溢出檢查的,下面這樣的代碼在編譯時就會報錯的,如果我們使用unckeck標簽進行標記,則在編譯階段不會做移除檢查.
int a = int.MaxValue * 2;那么問題來了,我們當然知道checked很有用,那么uncheck呢?如果我們只是需要那么一個數(shù)而已,至于溢出不溢出的關(guān)系不大,比如說生成一個對象的HashCode,比如說根據(jù)一個算法計算出一個相對隨機數(shù),這都是不需要準確結(jié)果的,ConcurrentDictionary中對于m_keyRehashCount++這個運算就使用了unchecked,就是因為m_keyRehashCount是用來生成哈希值的,我們并不關(guān)心它有沒有溢出.
volatile關(guān)鍵字,表示一個字段可能是由在同一時間執(zhí)行多個線程進行修改。出于性能原因,編譯器\運行時系統(tǒng)甚至硬件可以重新排列對存儲器位置的讀取和寫入。聲明的字段volatile不受這些優(yōu)化的約束。添加volatile修飾符可確保所有線程都能按照執(zhí)行順序由任何其他線程執(zhí)行的易失性寫入,易失性寫入是一件瘋狂的事情的事情:普通玩家慎用.
本博客鎖涉及的代碼都保存在github中,Take it easy to enjoy it!
https://github.com/liuzhenyulive/DictionaryMini/blob/master/DictionaryMini/DictionaryMini/ConcurrentDictionaryMini.cs
原文地址:https://www.cnblogs.com/CoderAyu/p/10549409.html
.NET社區(qū)新聞,深度好文,歡迎訪問公眾號文章匯總 http://www.csharpkit.com
總結(jié)
以上是生活随笔為你收集整理的ConsurrentDictionary并发字典知多少?的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: C# .net 中 Timeout 的处
- 下一篇: Docker最全教程之Go实战,墙裂推荐