日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > C# >内容正文

C#

C#并行编程(4):基于任务的并行

發布時間:2023/12/4 C# 34 豆豆
生活随笔 收集整理的這篇文章主要介紹了 C#并行编程(4):基于任务的并行 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

C#中的任務Task

在C#編程中,實現并行可以直接使用線程,但使用起來很繁瑣;也可以使用線程池,線程池很大程度上簡化了線程的使用,但是也有著一些局限,比如我們不知道作業什么時候完成,也取不到作業的返回值;解決線程池局限性的方案是使用任務。本文將總結C#中Task的使用。

類似于線程池工作項對異步操作的封裝,任務是對異步操作的另一種形式的封裝,這種封裝抽象層次更高,讓我們能夠對異步操作進行更多的控制。

任務啟動后,通過任務調度器TaskScheduler來調度。.NET中提供兩種任務調度器,一種是線程池任務調度器,也是默認調度器,它會將任務派發給線程池工作者線程;另一種是上下文同步任務調度器,它會將任務派發給當前上下文線程,例如GUI線程。此外,我們也能自定義任務調度器,例如可以將異步IO任務派發給線程池IO線程。

Task的使用方法

隱式使用

Parallel靜態類除了提供并行循環的各種重載,還提供了一個方法Parallel.Invoke。這個方法可以創建并執行一個或多個異步任務,使用方法如下:




private static void DoWork(int workId = 0)
{
Console.WriteLine($"{DateTime.Now}=> Thread[{Thread.CurrentThread.ManagedThreadId}] started work[{workId}].");
Thread.Sleep(3000);
Console.WriteLine($"{DateTime.Now}=> Thread[{Thread.CurrentThread.ManagedThreadId}] done work[{workId}].");
}




public static void ImplicitUsingOfTask()
{
Parallel.Invoke(()=>DoWork(1),()=>DoWork(2),() => DoWork(3));
}

上例的運行結果如下:

2019/3/27?20:40:18=>?Thread[9]?started?work[1].
2019/3/27?20:40:18=>?Thread[12]?started?work[3].
2019/3/27?20:40:18=>?Thread[10]?started?work[2].
2019/3/27?20:40:21=>?Thread[9]?done?work[1].
2019/3/27?20:40:21=>?Thread[12]?done?work[3].
2019/3/27?20:40:21=>?Thread[10]?done?work[2].

對于簡單的多任務并行,使用上述的方式很方便,但是這種方式與線程池一樣,我們不能控制任務的執行或者獲取任務返回值。

顯式使用

相對于使用Parallel.Invoke執行并行操作,更常用的是使用Task和Task<T>提供的方法進行異步和并行處理。下面是任務最基本的使用:

Task.Run(() =>
{

});
Task.Factory.StartNew(() =>
{

});

任務的常用操作

獲取任務的返回值

具有返回值的任務使用Task<T>,T可根據我們的需求指定,下面是獲取任務返回值的方法。

Task<int> task = Task<int>.Factory.StartNew(() =>
{
Thread.Sleep(3000);
return DateTime.Now.Day;
});
int day = task.Result;

需要說明的是,獲取任務的結果會阻塞當前線程。

等待任務完成

有時候,我們需要等待一些任務全部完成后才能執行后續操作,有時候只要多個任務中的一個完成了,就可以執行后續操作。Task提供了Wait、WaitAll和WaitAny等方法滿足我們的需求。下面的例子展示了各種等待方法的使用。




public static void TaskWait()
{
Stopwatch watch = new Stopwatch();

#region?場景1:等待一個任務完成
Task task = Task.Run(() => DoWorkOfTask(1000));
Console.WriteLine("start?wait.?work?duration:?1000");
watch.Start();
task.Wait();
watch.Stop();
Console.WriteLine($"end?wait.?time:?{watch.ElapsedMilliseconds}");
#endregion

#region?場景2:等待多個任務完成
Task[] tasks = new Task[3]
{
Task.Run(() => DoWorkOfTask(1000)),
Task.Run(() => DoWorkOfTask(2000)),
Task.Run(() => DoWorkOfTask(3000)),
};

Console.WriteLine("start?wait?all.?work?duration:?min?1000?max?3000.");
watch.Restart();
Task.WaitAll(tasks);
watch.Stop();
Console.WriteLine($"end?wait.?time:?{watch.ElapsedMilliseconds}");
#endregion

#region?場景3:等待某個任務完成
tasks = new Task[3]
{
Task.Run(() => DoWorkOfTask(1000)),
Task.Run(() => DoWorkOfTask(2000)),
Task.Run(() => DoWorkOfTask(3000)),
};
Console.WriteLine("start?wait?any.?work?duration:?min?1000?max?3000.");
watch.Restart();
Task.WaitAny(tasks);
watch.Stop();
Console.WriteLine($"end?wait.?time:?{watch.ElapsedMilliseconds}");
#endregion
}





private static void DoWorkOfTask(int workDuration)
{
Console.WriteLine($"{DateTime.Now}=> Thread[{Thread.CurrentThread.ManagedThreadId}] started task[{Task.CurrentId}].");
Thread.Sleep(workDuration);
Console.WriteLine($"{DateTime.Now}=> Thread[{Thread.CurrentThread.ManagedThreadId}] completed task[{Task.CurrentId}].");
}

使用Wait、WaitAll和WaitAny方法時,我們可以設置超時時間或者傳入取消Token,以控制等待時間。但這些方法返回布爾值,只能表明是否等待成功;假如我們需要知道所等待的任務返回值,則可以使用WhenAll或WhenAny方法,這兩個方法不能控制等待時間,但會返回一個完成的任務。如下例:

Task<int>[] tasks = new Task<int>[3]
{
Task<int>.Factory.StartNew(() =>
{
Console.WriteLine($"task #{Task.CurrentId} run");
Thread.Sleep(100);
Console.WriteLine($"task #{Task.CurrentId} done");
return 100;
}),
Task<int>.Factory.StartNew(() =>
{
Console.WriteLine($"task #{Task.CurrentId} run");
Thread.Sleep(500);
Console.WriteLine($"task #{Task.CurrentId} done");
return 1000;
}),
Task<int>.Factory.StartNew(() =>
{
Console.WriteLine($"task #{Task.CurrentId} run");
Thread.Sleep(1000);
Console.WriteLine($"task #{Task.CurrentId} done");
return 10000;
}),
};




Task<int> task = Task.WhenAny(tasks).Result;
Console.WriteLine($"task #{task.Id}. result {task.Result}");

Task.WhenAll?和Task.WhenAny在等待結束時,都會創建一個完成狀態的任務,WhenAll將等待的所有已完成任務的結果放入創建任務的結果中,WhenAny則將等待的已完成任務放到創建任務的結果中。

任務延續

有時候,我們需要在一個任務完成時開始另一個任務。對于這種需求,我們可以使用Task的ContinueWith等方法來處理。

Task task = Task.Run(() => DoWorkOfTask(3000));
task.ContinueWith(t => DoWorkOfTask(1000));

運行結果:

2019/3/27?21:25:09=>?Thread[10]?started?task[1].

2019/3/27?21:25:12=>?Thread[10]?completed?task[1].

2019/3/27?21:25:12=>?Thread[11]?started?task[2].

2019/3/27?21:25:13=>?Thread[11]?completed?task[2].


我們還可以通過TaskContinuationOptions指定延續任務的執行條件,如任務取消時或者任務出現異常時才執行,等。

子任務的使用

有時候,我們要在一個任務里面創建一些其他任務,并且還要在任務里面等待創建的任務完成,此時我們可以使用子任務。

Task parent = Task.Factory.StartNew(() =>
{
Console.WriteLine($"parent task #{Task.CurrentId} run.");
for (int i = 0; i < 10; i++)
{
Task.Factory.StartNew(() =>
{
Console.WriteLine($"child task #{Task.CurrentId} run.");
Thread.Sleep(1000);
Console.WriteLine($"child task #{Task.CurrentId} done.");
}, TaskCreationOptions.AttachedToParent);
}
});
parent.Wait();
Console.WriteLine($"parent task #{parent.Id} done.");

在一個任務中創建的新任務,默認情況下與父級任務是分離的,各自的運行不受影響,除非在創建任務時顯式附加到父級任務中。例如,上例中如果不指定TaskCreationOptions.AttachedToParent,parent.Wait()就不會持續到所有子任務都執行完成。

任務的取消

我們在啟動任務時,傳入取消令牌CancellationToken,當收到取消請求時,拋出取消異常并在等待任務完成時捕獲異常TaskCanceledException。我們通過這種方式控制任務的取消。




public static void TaskCancle()
{
Console.WriteLine("Press any key to begin. Press 'c' to cancel. ");
Console.ReadKey(true);
Console.WriteLine();

CancellationTokenSource tokenSource = new CancellationTokenSource();
ConcurrentBag<Task> tasks = new ConcurrentBag<Task>();

Task task1 = Task.Factory.StartNew(() => DoWorkOfTask(5000, tokenSource.Token), tokenSource.Token);
tasks.Add(task1);


Task task2 = Task.Factory.StartNew(() =>
{
for (int i = 0; i < 10; i++)
{
int duration = 1000 * i;
tasks.Add(Task.Factory.StartNew(()=>DoWorkOfTask(duration, tokenSource.Token), tokenSource.Token));
}
DoWorkOfTask(5000,tokenSource.Token);
}, tokenSource.Token);
tasks.Add(task2);

char ch = Console.ReadKey().KeyChar;
if (ch == 'c' || ch == 'C')
{
tokenSource.Cancel();
Console.WriteLine($"{DateTime.Now}=> Task cancellation requested.");
}

try
{
Task.WaitAll(tasks.ToArray());
}
catch (AggregateException ae)
{
foreach (Exception ex in ae.InnerExceptions)
{
TaskCanceledException tce = ex as TaskCanceledException;
string cancelledTask = tce == null???string.Empty?:?$"Task #{tce.Task.Id}";
Console.WriteLine($"Exception:?{ex.GetType().Name}. {cancelledTask}");
}
}
finally
{
tokenSource.Dispose();
}

Console.WriteLine();

foreach (Task task in tasks)
{
Console.WriteLine($"Task:?#{task.Id} now is {task.Status}");
}
}

private static void DoWorkOfTask(int workDuration, CancellationToken cancleToken)
{
if (cancleToken.IsCancellationRequested)
{
Console.WriteLine($"{DateTime.Now}=> Task #{Task.CurrentId} was cancelled before it got started.");
cancleToken.ThrowIfCancellationRequested();
}

Console.WriteLine($"{DateTime.Now}=> Thread[{Thread.CurrentThread.ManagedThreadId}] started task #{Task.CurrentId}.");
Thread.Sleep(workDuration);

if (cancleToken.IsCancellationRequested)
{
Console.WriteLine($"{DateTime.Now}=> Task #{Task.CurrentId} was cancelled.");
cancleToken.ThrowIfCancellationRequested();
}
Console.WriteLine($"{DateTime.Now}=> Thread[{Thread.CurrentThread.ManagedThreadId}] completed task #{Task.CurrentId}.");
}

任務的異常處理

上面提到通過取消令牌拋出TaskCanceledException的方式控制任務的取消,實際上,Task會把自身執行過程中的所有異常都包裝到一個AggregateException中,并傳回調用線程。我們在主線程中通過捕獲AggregateException來進行異常處理。

簡單的處理方式

我們可以在任務的調用線程捕獲并遍歷AggregateException的內部異常,或者使用AggregateException提供的Handle方法進行處理,如下:

Task task = Task.Run(() =>
{
throw new Exception($"Task #{Task.CurrentId} thrown an exception");
});
try
{
task.Wait();
}
catch (AggregateException ae)
{

foreach (Exception ex in ae.InnerExceptions)
{
Console.WriteLine($"foreach:?{ex.Message}");
}


ae.Handle(ex=>
{
Console.WriteLine($"handle:?{ex.Message}");
return true ;
});
}

使用延續任務處理任務的異常

有時候,我們可以給任務附加一個任務異常時才會執行的延續任務,并在延續任務中進行異常處理。

Task.Run(() => { throw new Exception($"Task #{Task.CurrentId} thrown an exception"); })
.ContinueWith(t =>
{
Console.WriteLine($"{t.Exception?.InnerException?.Message}");
}, TaskContinuationOptions.OnlyOnFaulted);

嵌套任務的異常處理

下面是一個3層嵌套的任務。

Task parent = Task.Factory.StartNew(() =>
{
for (int i = 0; i < 10; i++)
{
Task.Factory.StartNew(() =>
{
for (int j = 0; j < 10; j++)
{
Task.Factory.StartNew(() =>
{
throw new Exception($"Task #{Task.CurrentId} thrown an exception. ");
});
}

throw new Exception($"Task #{Task.CurrentId} thrown an exception. ");
});
}

throw new Exception($"Task #{Task.CurrentId} thrown an exception. ");
});
try
{
parent.Wait();
}
catch (AggregateException ae)
{
ae.Flatten().Handle(ex =>
{
Console.WriteLine(ex.Message);
return true;
});
}

運行上面的代碼只會得到一行輸出:

Task #1 thrown an exception.

看起來有點奇怪,為什么只捕獲到一個異常呢?其實也是在情理之中的:任務默認只會把自身異常傳遞到它自己的調用線程,子任務是在父任務中調用的,其異常只會傳遞到父任務的執行線程,所以我們在父任務的調用線程,也就是我們的主線程中是捕獲不到子任務的異常的。

取消上面代碼的兩處/*, TaskCreationOptions.AttachedToParent*/,就會捕獲到所有異常。

任務調度器

.NET提供的任務調度器

任務是由TaskScheduler調度的,啟動任務時,默認使用線程池任務調度器,任務將會被派發到線程池工作線程。線程池的調度前面已經總結過,這里不再展開。.NET提供的另一種任務調度器是同步上下文調度器,用TaskScheduler.FromCurrentSynchronizationContext()獲取,這個調度器會把任務派發給當前的上下文線程,常用在GUI應用程序中。

例如,我們在一個窗體中新建一個ListBox,新建幾個任務向其中添加項,代碼如下:

this.lbxMsg.Items.Add($"{DateTime.Now:O}=>Current thread is thread #{Thread.CurrentThread.ManagedThreadId} .");
for (int i = 0; i < 10; i++)
{
new Task(() =>
{
for (int j = 0; j < 3; j++)
{
this.lbxMsg.Items.Add($"{DateTime.Now:O}=> Task #{Task.CurrentId} add an item with thread #{Thread.CurrentThread.ManagedThreadId}.");
}

}).Start(TaskScheduler.FromCurrentSynchronizationContext());
}

運行上面的代碼可以發現創建的任務都是由界面線程執行的。這里如果使用默認的任務調度器將產生"線程間操作無效"的異常。

實際使用時,可以給一個異步任務添加延續任務,來處理異步任務的結果或者異常等。如下:

Task.Run(() =>
{
Thread.Sleep(3000);
return 1000;
}).ContinueWith(t =>
{
this.lbxMsg.Items.Add(t.Result);
}, TaskScheduler.FromCurrentSynchronizationContext());

自定義任務調度器

除了使用.NET提供的調度器外,我們能夠繼承類TaskScheduler來實現自己的任務調度器。這里不再展開,需要了解的可以參考Samples?for?Parallel?Programming?with?the?.NET?Framework。

原文地址:https://www.cnblogs.com/chenbaoshun/p/10621819.html

.NET社區新聞,深度好文,歡迎訪問公眾號文章匯總?http://www.csharpkit.com?

總結

以上是生活随笔為你收集整理的C#并行编程(4):基于任务的并行的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。