.NET 6 新特性 Parallel ForEachAsync
.NET 6 新特性 Parallel ForEachAsync
Intro
在 .NET 6 中有一個 API Parallel.ForEachAsync 在官方的博客中一直被忽略,但是我覺得這個 API 非常的實用,類似于同步版本的 Parallel.ForEach,可以比較高效地控制多個異步任務的并行度。之前的版本中我們會使用信號量來控制異步任務的并發度,使用這個 API 之后就可以大大簡化我們的代碼,詳細可以看下面的示例代碼。
為什么需要這個 API
API definition
在使用同步任務并行執行的時候, 我們可以使用 Parallel.ForEach 來比較方便的控制多個任務的并行度,以便更好的利用系統資源,比如任務中如果有對受限的系統資源進行訪問的時候,此時最好就能夠控制并行度, 避免系統資源爭用,效率反而不高。
Parallel.ForEachAsync 相關的 API 定義如下:
public?static?System.Threading.Tasks.Task?ForEachAsync<TSource>(System.Collections.Generic.IEnumerable<TSource>?source,?System.Func<TSource,?CancellationToken,?ValueTask>?body);public?static?System.Threading.Tasks.Task?ForEachAsync<TSource>(System.Collections.Generic.IEnumerable<TSource>?source,?CancellationToken?cancellationToken,?System.Func<TSource,?CancellationToken,?ValueTask>?body);public?static?System.Threading.Tasks.Task?ForEachAsync<TSource>(System.Collections.Generic.IEnumerable<TSource>?source,?System.Threading.Tasks.ParallelOptions?parallelOptions,?System.Func<TSource,?CancellationToken,?ValueTask>?body);public?static?System.Threading.Tasks.Task?ForEachAsync<TSource>(System.Collections.Generic.IAsyncEnumerable<TSource>?source,?System.Func<TSource,?CancellationToken,?ValueTask>?body);public?static?System.Threading.Tasks.Task?ForEachAsync<TSource>(System.Collections.Generic.IAsyncEnumerable<TSource>?source,?CancellationToken?cancellationToken,?System.Func<TSource,?CancellationToken,?ValueTask>?body);public?static?System.Threading.Tasks.Task?ForEachAsync<TSource>(System.Collections.Generic.IAsyncEnumerable<TSource>?source,?System.Threading.Tasks.ParallelOptions?parallelOptions,?System.Func<TSource,?CancellationToken,?ValueTask>?body);通過 ParallelOptions 我們可以限制最大并行度以及自定義 TaskScheduler 和取消令牌
public?class?ParallelOptions {private?TaskScheduler?_scheduler;private?int?_maxDegreeOfParallelism;private?CancellationToken?_cancellationToken;public?ParallelOptions(){this._scheduler?=?TaskScheduler.Default;this._maxDegreeOfParallelism?=?-1;this._cancellationToken?=?CancellationToken.None;}public?TaskScheduler??TaskScheduler{get?=>?this._scheduler;set?=>?this._scheduler?=?value;}public?int?MaxDegreeOfParallelism{get?=>?this._maxDegreeOfParallelism;set?=>?this._maxDegreeOfParallelism?=?value?!=?0?&&?value?>=?-1???value?:?throw?new?ArgumentOutOfRangeException(nameof?(MaxDegreeOfParallelism));}public?CancellationToken?CancellationToken{get?=>?this._cancellationToken;set?=>?this._cancellationToken?=?value;} }Sample
來看一個實際的示例吧,多個任務并行執行,通常我們會使用 Task.WhenAll 來并行多個 Task 的執行,但是 Task.WhenAll 不能限制并發度,通常我們是會在異步 task 上封裝一層,使用信號量來限制并發,示例如下:
using?var?semaphore?=?new?SemaphoreSlim(10,?10); await?Task.WhenAll(Enumerable.Range(1,?100).Select(async?_?=> {try{await?semaphore.WaitAsync();await?Task.Delay(1000);}finally{semaphore.Release();} }));使用 Parallel.ForEachAsync 之后,我們就可以大大簡化我們的代碼:
await?Parallel.ForEachAsync(Enumerable.Range(1,?100),?new?ParallelOptions() {MaxDegreeOfParallelism?=?10 },?async?(_,?_)?=>?await?Task.Delay(1000));這樣是不是簡單了很多。
再來看一個所有情況的對比,來看一下是不是符合我們的預期:
using?System; using?System.Diagnostics; using?System.Linq; using?System.Threading; using?System.Threading.Tasks; using?static?System.Console;var?watch?=?Stopwatch.StartNew(); await?Task.WhenAll(Enumerable.Range(1,?100).Select(_?=>?Task.Delay(1000))); watch.Stop(); WriteLine(watch.ElapsedMilliseconds);watch.Restart(); using?var?semaphore?=?new?SemaphoreSlim(10,?10); await?Task.WhenAll(Enumerable.Range(1,?100).Select(async?_?=> {try{await?semaphore.WaitAsync();await?Task.Delay(1000);}finally{semaphore.Release();} })); watch.Stop(); WriteLine(watch.ElapsedMilliseconds);WriteLine($"{nameof(Environment.ProcessorCount)}:?{Environment.ProcessorCount}");watch.Restart(); await?Parallel.ForEachAsync(Enumerable.Range(1,?100),?async?(_,?_)?=>?await?Task.Delay(1000)); watch.Stop(); WriteLine(watch.ElapsedMilliseconds);watch.Restart(); await?Parallel.ForEachAsync(Enumerable.Range(1,?100),?new?ParallelOptions() {MaxDegreeOfParallelism?=?10 },?async?(_,?_)?=>?await?Task.Delay(1000)); watch.Stop(); WriteLine(watch.ElapsedMilliseconds);watch.Restart(); await?Parallel.ForEachAsync(Enumerable.Range(1,?100),?new?ParallelOptions() {MaxDegreeOfParallelism?=?100 },?async?(_,?_)?=>?await?Task.Delay(1000)); watch.Stop(); WriteLine(watch.ElapsedMilliseconds);watch.Restart(); await?Parallel.ForEachAsync(Enumerable.Range(1,?100),?new?ParallelOptions() {MaxDegreeOfParallelism?=?int.MaxValue },?async?(_,?_)?=>?await?Task.Delay(1000)); watch.Stop(); WriteLine(watch.ElapsedMilliseconds);可以先想一下,每種方式執行需要的耗時大概是多久,之后再嘗試運行一下看一下結果
輸出結果如下:
outputMore
執行結果是不是符合你的預期呢?
默認情況下,Parallel.ForEachAsync 的最大并行度是當前機器的 CPU 數量,也就是 Environment.ProcessorCount,如果要不限制可以指定最大并行度為 int.MaxValue
References
https://github.com/WeihanLi/SamplesInPractice/tree/master/net6sample/ParallelSample
https://github.com/dotnet/runtime/pull/46943
https://github.com/dotnet/runtime/blob/911640b3a891f92ff66e9c82ce65f71d203f11a2/src/libraries/System.Threading.Tasks.Parallel/ref/System.Threading.Tasks.Parallel.cs#L39-L44
總結
以上是生活随笔為你收集整理的.NET 6 新特性 Parallel ForEachAsync的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 哼!看你能坚持多久
- 下一篇: asp.net ajax控件工具集 Au