Multithreading With C# Cookbook---Chapter4---使用任务并行库

时间:2022-08-16 21:01:22

概念

任务并行库(Task Parallel Library,TPL),在.NET Framework 4.0引入,4.5有所改进。我们可以把线程池认为是一个抽象层,其隐藏使用线程的细节;TPL可被认为是线程池上的又一个抽象层,其隐藏了与线程池交互的底层代码,并提供更方便的API。

更多内容

TPL核心概念是任务。一个任务代表一个异步操作,该操作可以通过多种方式运行,可以使用或不使用独立线程运行。TPL向用户隐藏任务的实现细节从而创建一个抽象层,我们无须知道任务实际是如何执行的。但如果不正确使用,将会导致诡异的错误。

一个任务可以通过多种方式与其他任务组合,其用于组合任务的便利的API也是一个与其他模式(APM,EAP)的一个关键优势。

处理任务中的异常:一个任务可能有多个任务组成,多个任务又有自己的子任务,通过AggregateException来捕获底层任务的所有异常,并允许单独处理这些异常。

准备工作

使用.NET Framework 4.5以上版本。

使用using

Multithreading With C# Cookbook---Chapter4---使用任务并行库Multithreading With C# Cookbook---Chapter4---使用任务并行库
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using static System.Threading.Thread;
using System.Threading;
View Code

创建任务

Task.Run创建任务,再使用Start开启任务;Task.Factory.StartNew创建任务后会立即启动。当创建任务时标记任务为长时间运行TaskCreationOptions.LongRunning,将不会使用线程池线程,而是另开线程。

Multithreading With C# Cookbook---Chapter4---使用任务并行库Multithreading With C# Cookbook---Chapter4---使用任务并行库
    class Program
{
static void Main(string[] args)
{
Task t1
= new Task(() => TaskMethod("Task1"));
Task t2
= new Task(() => TaskMethod("Task2"));
t1.Start();
t2.Start();
Task.Run(()
=> TaskMethod("Task3"));
Task.Factory.StartNew(()
=> TaskMethod("Task4"));
Task.Factory.StartNew(()
=> TaskMethod("Task5"), TaskCreationOptions.LongRunning);//标记该任务为长时间运行任务,将不会使用线程池线程

Console.ReadLine();
}

static void TaskMethod(string name)
{
Console.WriteLine($
"Task {name} is running on a thread id: {CurrentThread.ManagedThreadId}.Is thread pool thread: {CurrentThread.IsThreadPoolThread}");
}
}
View Code

使用任务执行基本操作

创建Task<int>任务,使用任务对象.Result属性来获取该任务返回值,在在任务返回值之前,主线程会阻塞等待完成。使用Task.IsCompleted来轮询任务状态,通过Task.RunAsynchornously()方法,来指定任务运行在主线程。

Multithreading With C# Cookbook---Chapter4---使用任务并行库Multithreading With C# Cookbook---Chapter4---使用任务并行库
class Program
{
static void Main(string[] args)
{
Console.WriteLine(
"Task1 result:"+ TaskMethod("Task1"));

Task
<int> task = CreatTask("Task2");
task.Start();
Console.WriteLine(
"Task2 result:" + task.Result);

task
= CreatTask("Task3");
task.RunSynchronously();
//运行在主线程
Console.WriteLine("Task3 result:" + task.Result);

task
= CreatTask("Task4");
Console.WriteLine(task.Status) ;
task.Start();
while (!task.IsCompleted)
{
Console.WriteLine(task.Status);
Thread.Sleep(TimeSpan.FromSeconds(
1));
}
Console.WriteLine(task.Status);
Console.WriteLine(
"Task4 result:" + task.Result);

Console.ReadLine();
}

static Task<int> CreatTask(string name)
{
return new Task<int>(()=>TaskMethod(name));
}

static int TaskMethod(string name)
{
Console.WriteLine($
"Task {name} is running on a thread id: {CurrentThread.ManagedThreadId}.Is thread pool thread: {CurrentThread.IsThreadPoolThread}");
Thread.Sleep(TimeSpan.FromSeconds(
2));
return 42;
}
}
View Code

组合任务

创建一个任务,再给他创建一个后续任务(Task.ContinueWith),先执行该任务,完成后才能执行后续任务。TaskContinuationOptions.OnlyOnRanToCompletion属性指定该延续任务只能在前面任务完成后才能执行;TaskContinuationOptions.ExecuteSynchronously属性指定该延续任务会工作在创建此延续任务的线程上执行。

创建任务时指定TaskCreationOptions.AttachedToParent属性,指定该任务为父子任务,只有当所有子任务完成,父任务才算完成。

Multithreading With C# Cookbook---Chapter4---使用任务并行库Multithreading With C# Cookbook---Chapter4---使用任务并行库
class Program
{
static void Main(string[] args)
{
var t1 = new Task<int>(() => TaskMethod("task1", 3));
var t2 = new Task<int>(() => TaskMethod("task2", 2));

t1.ContinueWith(t
=> Console.WriteLine($"The first answer is {t.Result}. Thread id :{CurrentThread.ManagedThreadId},is thread pool thread:{CurrentThread.IsThreadPoolThread}"), TaskContinuationOptions.OnlyOnRanToCompletion);//该属性指定该延续任务只能在前面任务完成后才能执行

t1.Start();
t2.Start();

Thread.Sleep(TimeSpan.FromSeconds(
4));

Task continuation
= t2.ContinueWith(t => Console.WriteLine($"The first answer is {t.Result}. Thread id :{CurrentThread.ManagedThreadId},is thread pool thread:{CurrentThread.IsThreadPoolThread}"), TaskContinuationOptions.OnlyOnRanToCompletion | TaskContinuationOptions.ExecuteSynchronously);//ExecuteSynchronously属性指定该延续任务会工作在创建此延续任务的线程上执行

continuation.GetAwaiter().OnCompleted(()
=> Console.WriteLine($"Continuation Task Completed! Thread id is: {CurrentThread.ManagedThreadId}. Is thread pool thread: {CurrentThread.IsThreadPoolThread}"));

Thread.Sleep(TimeSpan.FromSeconds(
2));
Console.WriteLine();

t1
= new Task<int>(() =>
{
var innerTask = Task.Factory.StartNew(() => TaskMethod("second Task", 5), TaskCreationOptions.AttachedToParent);
innerTask.ContinueWith(t
=> TaskMethod("Third Task", 2), TaskContinuationOptions.AttachedToParent);
return TaskMethod("First Task", 2);
});

t1.Start();

while (!t1.IsCompleted)
{
Console.WriteLine(t1.Status);
Thread.Sleep(TimeSpan.FromSeconds(
0.5));
}
Console.WriteLine(t1.Status);

Console.ReadLine();



Console.ReadLine();
}

static int TaskMethod(string name, int seconds)
{
Console.WriteLine($
"Task {name} is running on a thread id: {CurrentThread.ManagedThreadId}.Is thread pool thread: {CurrentThread.IsThreadPoolThread}");
Thread.Sleep(TimeSpan.FromSeconds(seconds));
return seconds * 42;
}
}
View Code

APM(异步编程)模式转任务

 将APM API转换为任务。代码示例Task.Factory.FromAsync三种重载方法。

Multithreading With C# Cookbook---Chapter4---使用任务并行库Multithreading With C# Cookbook---Chapter4---使用任务并行库
class Program
{
static void Main(string[] args)
{
int threadId;
AsynchronouTask a
= Test;
IncompatibleAsynchronousTask c
= Test;

Console.WriteLine(
"Option1");
Task
<string> task1 = Task<string>.Factory.FromAsync(a.BeginInvoke("Async 1",callBack,"async1 state"),a.EndInvoke);
task1.ContinueWith(t
=> Console.WriteLine($"Callback is finished. now running a continuation!result:{t.Result}"));
while (!task1.IsCompleted)
{
//Console.WriteLine(task1.Status);
}
Console.WriteLine(task1.Status);

Console.WriteLine(
"----------------------------");
Console.WriteLine();

Console.WriteLine(
"Option2");
Task
<string> task2 = Task<string>.Factory.FromAsync(a.BeginInvoke,a.EndInvoke,"Async 2","async2 state");
task2.ContinueWith(t
=> Console.WriteLine($"Callback is finished. now running a continuation!result:{t.Result}"));
while (!task2.IsCompleted)
{
//Console.WriteLine(task2.Status);
}
Console.WriteLine(task2.Status);

Console.WriteLine(
"----------------------------");
Console.WriteLine();

Console.WriteLine(
"Option3");
IAsyncResult ar
= c.BeginInvoke(out threadId,callBack,"async3 state");
Task
<string> task3 = Task<string>.Factory.FromAsync(ar,_=>c.EndInvoke(out threadId,ar));
task2.ContinueWith(t
=> Console.WriteLine($"Callback is finished. now running a continuation!result:{t.Result}"));
while (!task3.IsCompleted)
{
//Console.WriteLine(task3.Status);
}
Console.WriteLine(task3.Status);

Console.ReadLine();
}

delegate string AsynchronouTask(string threadName);

delegate string IncompatibleAsynchronousTask(out int threadId);

static void callBack(IAsyncResult ar)
{
Console.WriteLine(
"Starting callback……");
Console.WriteLine(
"State paseed to a callback:"+ar.AsyncState);
Console.WriteLine(
"Is thread pool thread:"+CurrentThread.IsThreadPoolThread);
}

static string Test(string threadName)
{
Console.WriteLine(
"TN: Starting Test……");
Console.WriteLine(
"Is thread pool thread:"+CurrentThread.IsThreadPoolThread);
Sleep(
2000);
CurrentThread.Name
= threadName;
return "Thread name is:" + CurrentThread.Name;
}

static string Test(out int threadId)
{
Console.WriteLine(
"TI: Starting Test……");
Console.WriteLine(
"Is thread pool thread:" + CurrentThread.ManagedThreadId);
Sleep(
2000);
threadId
= CurrentThread.ManagedThreadId;
return "Thread Id is:" + threadId;
}
}
View Code

EAP(基于事件的异步操作)模式转任务

Multithreading With C# Cookbook---Chapter4---使用任务并行库Multithreading With C# Cookbook---Chapter4---使用任务并行库
class Program
{
static void Main(string[] args)
{
var tcs = new TaskCompletionSource<int>();

var worker = new BackgroundWorker();
worker.DoWork
+= (sender, evenArgs) =>
{
evenArgs.Result
= TaskMethod("BackgroundWorker",3);
};
worker.RunWorkerCompleted
+= (sender, evenArgs) =>
{
if (evenArgs.Error != null)
{
tcs.SetException(evenArgs.Error);
}
else if (evenArgs.Cancelled)
{
tcs.SetCanceled();
}
else
{
tcs.SetResult((
int)evenArgs.Result);
}
};
worker.RunWorkerAsync();
int result = tcs.Task.Result;
Console.WriteLine(result);

Console.ReadLine();
}

static int TaskMethod(string name,int seconds)
{
Console.WriteLine($
"Task {name} on a thread id:{CurrentThread.ManagedThreadId},is thread pool thread:{CurrentThread.IsThreadPoolThread}");
Sleep(TimeSpan.FromSeconds(seconds));
return seconds * 42;
}
}
View Code

实现取消选项

取消基于任务的异步操作。

Multithreading With C# Cookbook---Chapter4---使用任务并行库Multithreading With C# Cookbook---Chapter4---使用任务并行库
class Program
{
static void Main(string[] args)
{
var cts = new CancellationTokenSource();
var longTask = new Task<int>(() =>
TaskMethod(
"Task1", 10, cts.Token), cts.Token);
Console.WriteLine(longTask.Status);
cts.Cancel();
//longTask.Start();//异常
Console.WriteLine(longTask.Status);
Console.WriteLine(
"First task hai been cancelled before execution.");

cts
= new CancellationTokenSource();
longTask
= new Task<int>(() =>
TaskMethod(
"Task2", 10, cts.Token), cts.Token);
longTask.Start();
for (int i = 0; i < 5; i++)
{
Thread.Sleep(TimeSpan.FromSeconds(
0.5));
Console.WriteLine(longTask.Status);
}
cts.Cancel();
Console.WriteLine(
"Task2 is canceled.");
for (int i = 0; i < 5; i++)
{
Thread.Sleep(TimeSpan.FromSeconds(
0.5));
Console.WriteLine(longTask.Status);
}
Console.WriteLine($
"A task has been completed with result {longTask.Result}");
Console.ReadLine();
}

static int TaskMethod(string name, int seconds, CancellationToken token)
{
Console.WriteLine($
"Task {name} is running on a thread id: {CurrentThread.ManagedThreadId}.Is thread pool thread: {CurrentThread.IsThreadPoolThread}");

for (int i = 0; i < seconds; i++)
{
Thread.Sleep(TimeSpan.FromSeconds(
1));
if (token.IsCancellationRequested)
return -1;
}

return seconds * 42;
}
}
View Code

并行运行任务

借助Task.WhenAll方法,创建第三个方法,该方法在所有任务完成之后执行,并返回一个结果集;使用Task.WhenAny方法,任务列表中每完成一个任务,就从列表删除并等待其他任务完成。可以使用一个任务来计时,超时取消其他所有任务,来模拟超时管理。

Multithreading With C# Cookbook---Chapter4---使用任务并行库Multithreading With C# Cookbook---Chapter4---使用任务并行库
class Program
{
static void Main(string[] args)
{
var t1 = new Task<int>(() => TaskMethod("Task1", 2));
var t2 = new Task<int>(() => TaskMethod("Task2", 3));
var whenAllTask = Task.WhenAll(t1, t2);

whenAllTask.ContinueWith(t
=> Console.WriteLine($"The first task answer is {t.Result[0]}. The second task answer is {t.Result[1]}"), TaskContinuationOptions.OnlyOnRanToCompletion);
t1.Start();
t2.Start();

Thread.Sleep(TimeSpan.FromSeconds(
5));

var tasks = new List<Task<int>>();
for (int i = 0; i < 4; i++)
{
int counter = i;
var task = new Task<int>(() => TaskMethod("Tasks:Task " + counter, counter));
tasks.Add(task);
task.Start();
}

while (tasks.Count > 0)
{
var completedTask = Task.WhenAny(tasks).Result;
tasks.Remove(completedTask);
Console.WriteLine($
"A task has been completed with result {completedTask.Result}");
}

Console.ReadLine();
}

static int TaskMethod(string name, int seconds)
{
Console.WriteLine($
"Task {name} is running on a thread id: {CurrentThread.ManagedThreadId}.Is thread pool thread: {CurrentThread.IsThreadPoolThread}");
Thread.Sleep(TimeSpan.FromSeconds(seconds));
return seconds * 42;
}
}
View Code

UI访问线程

第一个按钮Sync点击后整个界面卡死,一段时间后返回错误“调用线程无法访问此对象。另一个线程拥有该对象。”,因为我们不允许从创建UI的线程之外的线程访问UI;第二个按钮Async点击后,界面没有卡死,但依然会报错;第三个按钮AsyncOk点击后,可以成功返回结果,并且等待过程中,界面依然可以相应其他事件。

WPF XAML:

Multithreading With C# Cookbook---Chapter4---使用任务并行库Multithreading With C# Cookbook---Chapter4---使用任务并行库
<Window x:Class="TaskSchedulerTest.MainWindow"
xmlns
="http://schemas.microsoft.com/winfx/2006/xaml/presentation"
xmlns:x
="http://schemas.microsoft.com/winfx/2006/xaml"
xmlns:d
="http://schemas.microsoft.com/expression/blend/2008"
xmlns:mc
="http://schemas.openxmlformats.org/markup-compatibility/2006"
xmlns:local
="clr-namespace:TaskSchedulerTest"
mc:Ignorable
="d"
Title
="MainWindow" Height="350" Width="525">
<Grid>
<TextBlock Name="ContentTextBlock" HorizontalAlignment="Left" Margin="44,134,0,0" VerticalAlignment="Top"
Width
="425" Height="40"/>
<Button Name="btnSync" Content="Sync" HorizontalAlignment="Left" Margin="45,190,0,0" VerticalAlignment="Top" Width="75" Click="btnSync_Click"/>
<Button Name="btnAsync" Content="Async" HorizontalAlignment="Left" Margin="165,190,0,0" VerticalAlignment="Top" Width="75" Click="btnAsync_Click"/>
<Button Name="btnAsyncOk" Content="AsyncOk" HorizontalAlignment="Left" Margin="285,190,0,0" VerticalAlignment="Top" Width="75" Click="btnAsyncOk_Click"/>
</Grid>
</Window>
View Code

后台程序:

Multithreading With C# Cookbook---Chapter4---使用任务并行库Multithreading With C# Cookbook---Chapter4---使用任务并行库
public partial class MainWindow : Window
{
public MainWindow()
{
InitializeComponent();
}

private void btnSync_Click(object sender, RoutedEventArgs e)
{
ContentTextBlock.Text
= string.Empty;
try
{
string result = TaskMethod().Result;
ContentTextBlock.Text
= result;
}
catch (Exception ex)
{

ContentTextBlock.Text
= ex.InnerException.Message;
}
}

private void btnAsync_Click(object sender, RoutedEventArgs e)
{
ContentTextBlock.Text
= string.Empty;
Mouse.OverrideCursor
= Cursors.Wait;
Task
<string> task = TaskMethod();
task.ContinueWith(t
=>
{
ContentTextBlock.Text
= t.Exception.InnerException.Message;
Mouse.OverrideCursor
= null;
}, CancellationToken.None,
TaskContinuationOptions.OnlyOnFaulted,TaskScheduler.FromCurrentSynchronizationContext());

}

private void btnAsyncOk_Click(object sender, RoutedEventArgs e)
{
ContentTextBlock.Text
= string.Empty;
Mouse.OverrideCursor
= Cursors.Wait;
Task
<string> task = TaskMethod(TaskScheduler.FromCurrentSynchronizationContext());

task.ContinueWith(t
=>Mouse.OverrideCursor=null,CancellationToken.None,TaskContinuationOptions.None,TaskScheduler.FromCurrentSynchronizationContext());
}

private Task<string> TaskMethod()
{
return TaskMethod(TaskScheduler.Default);
}
private Task<string> TaskMethod(TaskScheduler scheduler)
{
Task delay
= Task.Delay(TimeSpan.FromSeconds(5));

return delay.ContinueWith(d=>
{
string str = $"Task is running on thread id :{CurrentThread.ManagedThreadId}. Is thread pool: {CurrentThread.IsThreadPoolThread}";
ContentTextBlock.Text
= str;
return str;
},scheduler);
}
}
View Code

 

注:本文是在阅读《C#多线程编程实战》后所写,部分内容引用该书内容,这是一本不错的书,感谢!