日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > C# >内容正文

C#

[一起读源码]走进C#并发队列ConcurrentQueue的内部世界 — .NET Core篇

發(fā)布時間:2023/12/4 C# 33 豆豆
生活随笔 收集整理的這篇文章主要介紹了 [一起读源码]走进C#并发队列ConcurrentQueue的内部世界 — .NET Core篇 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

在上一篇《走進C#并發(fā)隊列ConcurrentQueue的內部世界》中解析了Framework下的ConcurrentQueue實現原理,經過拋磚引玉,得到了一眾大佬的指點,找到了.NET Core版本下的ConcurrentQueue源碼,位于以下地址:

  • https://github.com/dotnet/runtime/blob/master/src/libraries/System.Private.CoreLib/src/System/Collections/Concurrent/ConcurrentQueue.cs

  • https://github.com/dotnet/runtime/blob/master/src/libraries/System.Private.CoreLib/src/System/Collections/Concurrent/ConcurrentQueueSegment.cs

我大致看了一下,雖然兩者的實現有不少相似的地方,不過在細節(jié)上新增了許多有意思的東西,還是覺得要單獨拉出來說一下。畫外音:誰叫我上篇立了flag,現在跪著也要寫完。。????

必須要吐糟的是,代碼中ConcurrentQueue類明明是包含在System.Collections.Concurrent命名空間下,但是源碼結構中的文件卻放在System.Private.CoreLib目錄中,這是鬧哪出~

存儲結構

從上面給出的源碼地址可以猜測出整個結構依然是Segment+Queue的組合,通過一個Segment鏈表實現了Queue結構,但實際上內部又加了新的設計。拋去Queue先不看的話,Segment本身就是一個實現了多生產者多消費者的線程安全集合,甚至可以直接拿它當一個固定容量的線程安全隊列使用,這點與之前Framework中差別很大。如果結合Queue整體來看,Segment不再是固定容量,而是可以由Queue來控制每個Segment的容量大小(最小是32,上限是1024 * 1024)。

在Framework中,隊列會給每個Segment分配一個索引,雖然這個索引是long類型的,但理論上說隊列容量還是存在上限。在Core中就不一樣了,它取消了這個索引,真正實現了一個無邊界(unbounded)隊列。

我猜測的原因是,在Framework中由于每個Segment是固定大小的,維護一個索引可以很方便的計算隊列里的元素數量,但是Core中的Segment大小不是固定的,使用索引并不能加快計算速度,使得這個索引不再有意義,這也意味著計算元素數量變得非常復雜。

一張圖看清它的真實面目,這里繼續(xù)沿用上一篇的結構圖稍作修改:

從圖中可以看到,整體結構上基本一致,核心改動就是Segment中增加了Slot(槽)的概念,這是真正存儲數據的地方,同時有一個序列號與之對應。

從代碼來看一下Segment的核心定義:

internal sealed class ConcurrentQueueSegment<T> {//存放數據的容器internal readonly Slot[] _slots;//這個mask用來計算槽點,可以防止查找越界internal readonly int _slotsMask;//首尾位置指針internal PaddedHeadAndTail _headAndTail;//觀察保留標記,表示當前段在出隊時能否刪除數據internal bool _preservedForObservation;//標記當前段是否被鎖住internal bool _frozenForEnqueues;//下一段的指針internal ConcurrentQueueSegment<T>? _nextSegment; }

其中_preservedForObservation和_frozenForEnqueues會比較難理解,后面再詳細介紹。

再看一下隊列的核心定義:

public class ConcurrentQueue<T> : IProducerConsumerCollection<T>, IReadOnlyCollection<T> {//每一段的初始化長度,也是最小長度private const int InitialSegmentLength = 32;//每一段的最大長度private const int MaxSegmentLength = 1024 * 1024;//操作多個段時的鎖對象private readonly object _crossSegmentLock;//尾段指針private volatile ConcurrentQueueSegment<T> _tail;//首段指針private volatile ConcurrentQueueSegment<T> _head; }

常規(guī)操作

還是按上一篇的套路為主線循序漸進。

創(chuàng)建實例

ConcurrentQueue依然提供了2個構造函數,分別可以創(chuàng)建一個空隊列和指定數據集的隊列。

/// <summary> /// Initializes a new instance of the <see cref="ConcurrentQueue{T}"/> class. /// </summary> public ConcurrentQueue() {_crossSegmentLock = new object();_tail = _head = new ConcurrentQueueSegment<T>(InitialSegmentLength); }

還是熟悉的操作,創(chuàng)建了一個長度是32的Segment并把隊列的首尾指針都指向它,同時創(chuàng)建了鎖對象實例,僅此而已。
進一步看看Segment是怎么創(chuàng)建的:

internal ConcurrentQueueSegment(int boundedLength) {//這里驗證了長度不能小于2并且必須是2的N次冪Debug.Assert(boundedLength >= 2, $"Must be >= 2, got {boundedLength}");Debug.Assert((boundedLength & (boundedLength - 1)) == 0, $"Must be a power of 2, got {boundedLength}");_slots = new Slot[boundedLength];//這個mask的作用就是用來計算數組索引的防止越界,可以用`& _slotsMask`取代`% _slots.Length`_slotsMask = boundedLength - 1;//設置初始序列號for (int i = 0; i < _slots.Length; i++){_slots[i].SequenceNumber = i;} }internal struct Slot {[AllowNull, MaybeNull] public T Item;public int SequenceNumber; }

再看看怎么用集合初始化隊列,這個過程稍微麻煩點,但是很有意思:

public ConcurrentQueue(IEnumerable<T> collection) {if (collection == null){ThrowHelper.ThrowArgumentNullException(ExceptionArgument.collection);}_crossSegmentLock = new object();//計算得到第一段的長度int length = InitialSegmentLength;if (collection is ICollection<T> c){int count = c.Count;if (count > length){length = Math.Min(ConcurrentQueueSegment<T>.RoundUpToPowerOf2(count), MaxSegmentLength);}}//根據前面計算出來的長度創(chuàng)建一個Segment,再把數據依次入隊_tail = _head = new ConcurrentQueueSegment<T>(length);foreach (T item in collection){Enqueue(item);} }

可以看到,第一段的大小是根據初始集合的大小確定的,如果集合大小count大于32就對count進行向上取2的N次冪(RoundUpToPowerOf2)得到實際大小(但是不能超過最大值),否則就按默認值32來初始化。

向上取2的N次冪到底是啥意思??例如count是5,那得到的結果就是8(2×2×2);如果count是9,那結果就是16(2×2×2×2);如果剛好count是8那結果就是8(2×2×2),具體算法是通過位運算實現的很有意思。至于為什么一定要是2的N次冪,中間的玄機我也沒搞明白。。

順藤摸瓜,再看看進隊操作如何實現。

元素進隊

/// <summary>在隊尾追加一個元素</summary> public void Enqueue(T item) {// 先嘗試在尾段插入一個元素if (!_tail.TryEnqueue(item)){// 如果插入失敗,就意味著尾段已經填滿,需要往后擴容EnqueueSlow(item);} }private void EnqueueSlow(T item) {while (true){ConcurrentQueueSegment<T> tail = _tail;// 先嘗試再隊尾插入元素,如果擴容完成了就會成功if (tail.TryEnqueue(item)){return;}// 獲得一把鎖,避免多個線程同時進行擴容lock (_crossSegmentLock){//檢查是否擴容過了if (tail == _tail){// 尾段凍結tail.EnsureFrozenForEnqueues();// 計算下一段的長度int nextSize = tail._preservedForObservation ? InitialSegmentLength : Math.Min(tail.Capacity * 2, MaxSegmentLength);var newTail = new ConcurrentQueueSegment<T>(nextSize);// 改變隊尾指向tail._nextSegment = newTail;// 指針交換_tail = newTail;}}} }

從以上流程可以看到,擴容的主動權不再由Segment去控制,而是交給了隊列。正因為如此,所以在跨段操作時要先加鎖,在Framework版本中是在原子操作獲得指針后進行的擴容所以不會有這個問題,后面的出隊操作也是一樣的道理。擴容過程中有兩個細節(jié)需要重點關注,那就是SegmentFrozen和下一段的長度計算。
從前面Segment的定義中我們看到它維護了一個_frozenForEnqueues標記字段,表示當前段是否被凍結鎖定,在被鎖住的情況下會讓其他入隊操作失敗,看一下實現過程:

// must only be called while queue's segment lock is held internal void EnsureFrozenForEnqueues() {// flag used to ensure we don't increase the Tail more than once if frozen more than onceif (!_frozenForEnqueues){_frozenForEnqueues = true;Interlocked.Add(ref _headAndTail.Tail, FreezeOffset);} }

首先判斷當前凍結狀態(tài),然后把它設置為true,再使用原子操作把尾指針增加了2倍段長的偏移量,這個尾指針才是真正限制當前段不可新增元素的關鍵點,后面講段的元素追加再關聯起來詳細介紹。而為什么要指定2倍段長這么一個特殊值呢,目的是為了把尾指針和mask做運算后落在同一個slot上,也就是說雖然兩個指針位置不一樣但是都指向的是同一個槽。

再說說下一段長度的計算問題,它主要是受_preservedForObservation這個字段影響,正常情況下一段的長度是尾段的2倍,但如果尾段正好被標記為觀察保留(類似于上一篇的截取快照),那么下一段的長度依然是初始值32,原作者認為入隊操作不是很頻繁,這樣做主要是為了避免浪費空間。

接著是重頭戲,看一下如何給段追加元素:

public bool TryEnqueue(T item) {Slot[] slots = _slots;// 如果發(fā)生競爭就自旋等待SpinWait spinner = default;while (true){// 獲取當前段的尾指針int currentTail = Volatile.Read(ref _headAndTail.Tail);// 計算槽點int slotsIndex = currentTail & _slotsMask;// 讀取對應槽的序列號int sequenceNumber = Volatile.Read(ref slots[slotsIndex].SequenceNumber);// 判斷槽點序列號和指針是否匹配int diff = sequenceNumber - currentTail;if (diff == 0){// 通過原子操作比較交換,保證了只有一個入隊者獲得可用空間if (Interlocked.CompareExchange(ref _headAndTail.Tail, currentTail + 1, currentTail) == currentTail){// 把數據存入對應的槽點,以及更新序列號slots[slotsIndex].Item = item;Volatile.Write(ref slots[slotsIndex].SequenceNumber, currentTail + 1);return true;}}else if (diff < 0){// 序列號小于指針就說明該段已經裝滿了,直接返回falsereturn false;}// 這次競爭失敗了,只好等下去spinner.SpinOnce(sleep1Threshold: -1);} }

整個流程的核心就是借助槽點序列號和尾指針的匹配情況判斷是否有可用空間,因為在初始化的時候序列號是從0遞增,正常情況下尾指針和序列號肯定是匹配的,只有在整個段被裝滿時尾指針才會大于序列號,因為前面的凍結操作會給尾指針追加2倍段長的偏移量。要重點提出的是,只有在數據被寫入并且序列號更新完成后才表示整個位置的元素有效,才能有出隊的機會,在Framework是通過維護一個狀態(tài)位來實現這個功能。整個設計很有意思,要慢慢品。

這里我們可以總結一下序列號的核心作用:假設一個槽點N,對應序列號是Q,它能允許入隊的必要條件之一就是N==Q,由于入隊操作把位置N的序列號修改成N+1,那么可以猜測出在出隊時的必要條件之一就是滿足Q==N+1。

代碼中的CompareExchange在上一篇中有介紹,這里不再重復。另外關于Volatile相關的稍微提一下,它的核心作用是避免內存與CPU之間的高速緩存帶來的數據不一致問題,告訴編譯器直接讀寫原始數據,有興趣的可以找資料了解,限于篇幅不過多介紹。

元素出隊

可以猜測到,入隊的時候要根據容量大小進行擴容,那么與之對應的,出隊的時候就需要對它進行壓縮,也就是丟棄沒有數據的段。

/// <summary>從隊首移除一個元素</summary> public bool TryDequeue([MaybeNullWhen(false)] out T result) =>_head.TryDequeue(out result) ||TryDequeueSlow(out result);private bool TryDequeueSlow([MaybeNullWhen(false)] out T item) {// 不斷循環(huán)嘗試出隊,直到成功或失敗為止while (true){ConcurrentQueueSegment<T> head = _head;// 嘗試從隊首移除,如果成功就直接返回了if (head.TryDequeue(out item)){return true;}// 如果首段為空并且沒有下一段了,則說明整個隊列都沒有數據了,返回失敗if (head._nextSegment == null){item = default!;return false;}// 既然下一段不為空,那就再次確認本段是否還能出隊成功,否則就要把它給移除了,等待下次循環(huán)從下一段出隊if (head.TryDequeue(out item)){return true;}// 首段指針要往后移動,表示當前首段已丟棄,跨段操作要先加鎖lock (_crossSegmentLock){if (head == _head){_head = head._nextSegment;}}} }

整體流程基本和入隊一樣,外層通過一個死循環(huán)不斷嘗試操作,直到出隊成功或者隊列為空返回失敗為止。釋放空間的操作也從Segment轉移到隊列上,所以要加鎖保證線程安全。這一步我在代碼注釋中寫的很詳細就不多解釋了,再看一下核心操作Segment是如何移除元素的:

public bool TryDequeue([MaybeNullWhen(false)] out T item) {Slot[] slots = _slots;// 遇到競爭時自旋等待SpinWait spinner = default;while (true){// 獲取頭指針地址int currentHead = Volatile.Read(ref _headAndTail.Head);// 計算槽點int slotsIndex = currentHead & _slotsMask;// 獲取槽點對應的序列號int sequenceNumber = Volatile.Read(ref slots[slotsIndex].SequenceNumber);// 比較序列號是否和期望值一樣,為什么要加1的原因前面入隊時說過int diff = sequenceNumber - (currentHead + 1);if (diff == 0){// 通過原子操作比較交換得到可以出隊的槽點,并把頭指針往后移動一位if (Interlocked.CompareExchange(ref _headAndTail.Head, currentHead + 1, currentHead) == currentHead){// 取出數據item = slots[slotsIndex].Item!;// 此時如果該段沒有被標記觀察保護,要把這個槽點的數據清空if (!Volatile.Read(ref _preservedForObservation)){slots[slotsIndex].Item = default;Volatile.Write(ref slots[slotsIndex].SequenceNumber, currentHead + slots.Length);}return true;}}else if (diff < 0){// 這種情況說明該段已經沒有有效數據了,直接返回失敗。bool frozen = _frozenForEnqueues;int currentTail = Volatile.Read(ref _headAndTail.Tail);if (currentTail - currentHead <= 0 || (frozen && (currentTail - FreezeOffset - currentHead <= 0))){item = default!;return false;}}// 競爭失敗進入下一輪等待spinner.SpinOnce(sleep1Threshold: -1);} }

流程和追加元素類似,大部分都寫在備注里面了,這里只額外提一下為空的情況。Segment為空只有一種情況,那就是頭尾指針落在了同一個槽點,但這是會出現兩種可能性:

  • 第一種是都落在了非最后一個槽點,意味著該段沒有被裝滿,拿首尾指針相減即可判斷。

  • 第二種是都落在了最后一個槽點,意味著該段已經被裝滿了,如果此時正在進行擴容(frozen),那么必須要在尾指針的基礎上減去FreezeOffset再去和頭指針判斷,原因前面有說過;

是不是感覺環(huán)環(huán)相扣、相輔相成、如膠似漆、balabala.....????

統計元素數量

前面也預告過,因為隊列不再維護段索引,這樣會導致計算元素數量變得非常復雜,復雜到我都不想說這一部分了????。簡單描述一下就跳過了:核心思路就是一段一段來遍歷,然后計算出每段的大小最后把結果累加,如果涉及多個段還得加鎖,具體到段內部就要根據首尾指針計算槽點得出實際數量等等等等,代碼很長就不貼出來了。

這里也嚴重提醒一句,非必要情況下不要調用Count不要調用Count不要調用Count。

接下來重點說一下隊列的IsEmpty。由于Segment不再維護IsEmpty信息,所以實現方式就有點曲線救國了,通過嘗試能否從隊首位置獲取一個元素來判斷是否隊列為空,也就是常說的TryPeek操作,但細節(jié)上稍有不同。

/// <summary> /// 判斷隊列是否為空,千萬不要使用Count==0來判斷,也不要直接TryPeek /// </summary> public bool IsEmpty => !TryPeek(out _, resultUsed: false);private bool TryPeek([MaybeNullWhen(false)] out T result, bool resultUsed) {ConcurrentQueueSegment<T> s = _head;while (true){ConcurrentQueueSegment<T>? next = Volatile.Read(ref s._nextSegment);// 從首段中獲取頭部元素,成功的話直接返回true,獲取失敗就意味著首段為空了if (s.TryPeek(out result, resultUsed)){return true;}// 如果下一段不為空那就再嘗試從下一段重新獲取if (next != null){s = next;}//如果下一段為空就說明整個隊列為空,跳出循環(huán)直接返回false了else if (Volatile.Read(ref s._nextSegment) == null){break;}}result = default!;return false; }

上面的代碼可以看到有一個特殊的參數resultUsed,它具體會有什么影響呢,那就得看看Segment是如何peek的:

public bool TryPeek([MaybeNullWhen(false)] out T result, bool resultUsed) {// 實際上隊列的TryPeek是一個觀察保護操作,這時resultUsed會標記成true,如果是IsEmpty操作的話就為false,因為并不關心這個元素是否被釋放了if (resultUsed){_preservedForObservation = true;Interlocked.MemoryBarrier();}Slot[] slots = _slots;SpinWait spinner = default;while (true){int currentHead = Volatile.Read(ref _headAndTail.Head);int slotsIndex = currentHead & _slotsMask;int sequenceNumber = Volatile.Read(ref slots[slotsIndex].SequenceNumber);int diff = sequenceNumber - (currentHead + 1);if (diff == 0){result = resultUsed ? slots[slotsIndex].Item! : default!;return true;}else if (diff < 0){bool frozen = _frozenForEnqueues;int currentTail = Volatile.Read(ref _headAndTail.Tail);if (currentTail - currentHead <= 0 || (frozen && (currentTail - FreezeOffset - currentHead <= 0))){result = default!;return false;}}spinner.SpinOnce(sleep1Threshold: -1);} }

除了最開始的resultUsed判斷,其他的基本和出隊的邏輯一致,前面說的很詳細,這里不多介紹了。

枚舉轉換數據

前面反復的提到觀察保護,這究竟是個啥意思??為什么要有這個操作??

其實看過上一篇文章的話就比較好理解一點,這里稍微回顧一下方便對比。在Framework中會有截取快照的操作,也就是類似ToArray\ToList\GetEnumerator這種要做數據迭代,它是通過原子操作維護一個m_numSnapshotTakers字段來實現對數據的保護,目的是為了告訴其他出隊的線程我正在遍歷數據,你們執(zhí)行出隊的時候不要把數據給刪了我要用的。在Core中也是為了實現同樣的功能才引入了觀察保護的概念,換了一種實現方式而已。

那么就以ToArray為例是怎么和其他操作交互的:

public T[] ToArray() {// 這一步可以理解為保護現場SnapForObservation(out ConcurrentQueueSegment<T> head, out int headHead, out ConcurrentQueueSegment<T> tail, out int tailTail);// 計算隊列長度,這也是要返回的數組大小long count = GetCount(head, headHead, tail, tailTail);T[] arr = new T[count];// 開始迭代數據塞到目標數組中using (IEnumerator<T> e = Enumerate(head, headHead, tail, tailTail)){int i = 0;while (e.MoveNext()){arr[i++] = e.Current;}Debug.Assert(count == i);}return arr; }

上面的代碼中,有一次獲取隊列長度的操作,還有一次獲取迭代數據的操作,這兩步邏輯比較相似都是對整個隊列進行遍歷,所以做一次數據轉換的開銷非常非常大,使用的時候一定要謹慎。別的不多說,重點介紹一下如何實現保護現場的過程:

private void SnapForObservation(out ConcurrentQueueSegment<T> head, out int headHead, out ConcurrentQueueSegment<T> tail, out int tailTail) {// 要保護現場肯定要先來一把鎖lock (_crossSegmentLock){head = _head;tail = _tail;// 一段一段進行遍歷for (ConcurrentQueueSegment<T> s = head; ; s = s._nextSegment!){// 把每一段的觀察保護標記設置成trues._preservedForObservation = true;// 遍歷到最后一段了就結束if (s == tail) break;}// 尾段凍結,這樣就不能新增元素tail.EnsureFrozenForEnqueues();// 返回兩個指針地址用來對每一個元素進行遍歷headHead = Volatile.Read(ref head._headAndTail.Head);tailTail = Volatile.Read(ref tail._headAndTail.Tail);} }

可以看到上來就是一把鎖,如果此時正在進行擴容或者收容的操作會直接阻塞掉,運氣好沒有阻塞的話你也不能有新元素入隊了,因為尾段已經凍結鎖死只能自旋等待,而出隊也不能釋放空間了。原話是:

At this point, any dequeues from any segment won't overwrite the value, and none of the existing segments can have new items enqueued.

有人就要問,這里把尾段鎖死那等ToArray()完成后豈不是也不能有新元素入隊了?不用擔心,前面入隊邏輯提到過如果該段被鎖住隊列會新創(chuàng)建一個段然后再嘗試入隊,這樣就能成功了。但是問題又來了,假如前面的段還有很多空位,那豈不是有浪費空間的嫌疑?我們知道沒有觀察保護的時候每段會以2倍長度遞增,這樣的話空間浪費率還是挺高的。帶著疑問提了個Issue問一下:
https://github.com/dotnet/runtime/issues/35094

到這里就基本把.NET Core ConcurrentQueue說完了。

總結

對比Framework下的并發(fā)隊列,Core里面的改動還是不小的,盡管保留了SpinWait和Interlocked相關操作,但是也加入了lock,邏輯上也復雜了很多,我一步步分析和寫文章搞了好幾天。

至于性能對比,我找到一個官方給出的測試結果,有興趣的可以看看:

https://github.com/dotnet/runtime/issues/27458#issuecomment-423964046

最后強行打個廣告,基于.NET Core平臺的開源分布式任務調度系統ScheduleMaster有興趣的star支持一下,2.0版本即將上線:

  • https://github.com/hey-hoho/ScheduleMasterCore

  • https://gitee.com/hey-hoho/ScheduleMasterCore(只從github同步)

總結

以上是生活随笔為你收集整理的[一起读源码]走进C#并发队列ConcurrentQueue的内部世界 — .NET Core篇的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。