并行开发-Task

时间:2023-03-08 22:27:41

Task

对于多线程,经常使用的是Thread。在了解Task之前,如果要使用多核的功能可能就会自己来开线程,然而这种线程模型在.net 4.0之后被一种称为基于“任务的编程模型”所冲击,这就是Task,Task会比Thread具有更小的性能开销,Task是架构在Thread之上的就是说Task最终还是会抛给线程去做,并且任务跟线程不是一对一的关系,比如开10个任务并不是说会开10个线程,这一点任务有点类似线程池,但是任务相比线程池有很小的开销和精确的控制。Task 类的表示单个操作不返回一个值,通常以异步方式执行,Task 对象是"基于任务的异步模式"首次引入.NET Framework 4 中。 因为由执行工作 Task 对象通常以异步方式执行在线程池线程上而不是以同步方式在主应用程序线程,可以使用Status属性,以及 IsCanceled, IsCompleted, 和 IsFaulted 属性,以确定任务的状态

创建任务的方式

使用实例化的ThreadFactory类

var t1 = new TaskFactory().StartNew(() => Console.WriteLine("TaskFactory().StartNew"));

使用Task的静态属性Task静态属性Factory

var t2 = Task.Factory.StartNew(() => Console.WriteLine("Task静态属性Factory"));

使用Task的构造函数(实例化Task对象时,任务不会立即执行,而是指定Created状态,通过Task.Start()方法启动)

var t3 = new Task(() => Console.WriteLine("使用Task的构造函数"));
t3.Start();

.Net4.5新增功能使用Task类的Run方法

var t4 = Task.Run(() => Console.WriteLine("Task.Run"));

注意:使用Task.Run/Task.Factory.StartNew/new TaskFactory().StartNew()方法运行的任务会立即开始工作,无需显式地调用这些任务的Start方法

同步任务

任务不一定要使用线程池中的线程,也可以使用其他线程,任务也可以同步进行,以相同的线程作为主调线程

并行开发-Task
static void Main(string[] args)
{
var t1 = new Task(() => TaskMethod("t1"));
t1.Start();
Console.WriteLine("主线程调用结束");
Console.ReadKey();
} public static void TaskMethod(string taskName)
{
Console.WriteLine("Task {0} 运行在线程id为{1}的线程上。是否是线程池中线程?:{2}",
taskName, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread);
}
并行开发-Task

并行开发-Task

var t1 = new Task(() => TaskMethod("t1"));
t1.RunSynchronously();
Console.WriteLine("主线程调用结束");
Console.ReadKey();

并行开发-Task

使用单独线程的任务

如果任务的代码应该长时间运行,就应该使用TaskCreationOptions.LongRuning告诉任务调度器创建一个新线程,而不是使用线程池中的线程,此时,线程可以不受线程池管理

 var t1 = new Task(TaskMethod, TaskCreationOptions.LongRunning);
t1.Start();

并行开发-Task

Task生命周期

Created 该任务已初始化,但尚未被计划
WaitingForActivation 该任务正在等待 .NET Framework 基础结构在内部将其激活并进行计划
WaitingToRun 该任务已被计划执行,但尚未开始执行
Running 该任务正在运行,但尚未完成
WaitingForChildrenToComplete 该任务已完成执行,正在隐式等待附加的子任务完成
RanToCompletion 已成功完成执行的任务
Canceled

该任务已通过对其自身的 CancellationToken 引发 OperationCanceledException 对取消进行了确认,此时该标记处于已发送信号状态;或者在该任务开始执行之前,已向该任务的
CancellationToken 发出了信号

Faulted 由于未处理异常的原因而完成的任务

Task任务控制

Task.Wait task1.Wait();就是等待任务执行完成,task的状态变为Completed
Task.WaitAll 待所有的任务都执行完成
Task.WaitAny 等待任何一个任务完成就继续向下执行
Task.ContinueWith 第一个Task完成后自动启动下一个Task,实现Task的延续CancellationTokenSource
CancellationTokenSource 通过cancellation的tokens来取消一个Task

获取任务返回值Task<TResult>

并行开发-Task
static void Main(string[] args)
{
Task<string> t1 = new Task<string>(() => TaskMethod("t1"));
t1.Start();
Console.WriteLine(t1.Result);
Console.ReadKey();
} public static string TaskMethod(string taskName)
{
var result = string.Format("Task {0} 运行在线程id为{1}的线程上。是否是线程池中线程?:{2}",
taskName, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread);
return result;
}
并行开发-Task

连续任务

在指定任务完成后调用另一个指定任务

并行开发-Task
static void Main(string[] args)
{
Task<string> t1 = new Task<string>(() => TaskMethod1("t1"));
Console.WriteLine("创建Task,状态为:{0}", t1.Status);
t1.Start();
Console.WriteLine("启动Task,状态为:{0}", t1.Status);
Console.WriteLine(t1.Result);
Console.WriteLine("创建Task,状态为:{0}", t1.Status);
Task t2 = t1.ContinueWith(TaskMethod2);
Console.ReadKey();
} public static string TaskMethod1(string taskName)
{
var result = string.Format("TaskID {0} 运行在线程id为{1}的线程上。是否是线程池中线程?:{2}",
taskName, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread);
return result;
} public static void TaskMethod2(Task t)
{
Console.WriteLine("Task {0} 运行在线程id为{1}的线程上。是否是线程池中线程?:{2}",
t.Id, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread);
}
}
并行开发-Task

并行开发-Task

使用TaskContinuationOptions枚举的值可以指定连续任务只有在起始任务成功或失败结束时启动

Task t2 = t1.ContinueWith(TaskMethod2, TaskContinuationOptions.NotOnCanceled); 

嵌套Task

在Task内部创建Task,如果父任务在子任务之前结束,父任务状态就显示为WaitingForChilderenToComplete,所有的子任务也结束时,父任务的状态就改为RanToCompletion,如果使用TaskContinuationOptions枚举值创建子任务时会有不同结果,取消父任务也会取消子任务

Task的取消

static void Main(string[] args)
{
CancellationTokenSource cts = new CancellationTokenSource();
cts.CancelAfter();
Task t1 = Task.Run(() =>
{
for (int i = ; i < ; i++)
{
if (cts.Token.IsCancellationRequested)
{
cts.Token.ThrowIfCancellationRequested();
}
else
{
Thread.Sleep();
Console.WriteLine("任务t1,共执行30次,当前第{0}次", i);
}
}
}, cts.Token);
try
{
t1.Wait();
}
catch (AggregateException e)
{
foreach (var item in e.InnerExceptions)
{
Console.WriteLine(item);
}
}
Console.ReadKey();
}

并行开发-Task

工作原理

  程序运行主线程创建->创建CancellationTokenSource对象->设置task在指定毫秒数后取消(这里是8000)->创建task并传入CancellationTokenSource对象生成的token,循环打印1~30,取消标记为true则抛出异常中止任务,false则正常输出,在输出前等待500毫秒(避免8000毫秒还没到任务就已经执行完成)->使用try/catch包裹t1.Wait()等待任务执行完成语句,并捕获处理异常.

这里任务在执行完15次的时候被取消.

 多个Task

static void Main(string[] args)
{
CancellationTokenSource cts = new CancellationTokenSource();
cts.CancelAfter();
Task[] tasks = new Task[];
tasks[] = Task.Run(() =>
{
for (int i = ; i < ; i++)
{
if (cts.Token.IsCancellationRequested)
{
cts.Token.ThrowIfCancellationRequested();
}
else
{
Thread.Sleep();
Console.WriteLine("任务t1,共执行30次,当前第{0}次", i);
}
}
}, cts.Token);
tasks[] = Task.Run(() =>
{
for (int i = ; i < ; i++)
{
if (cts.Token.IsCancellationRequested)
{
cts.Token.ThrowIfCancellationRequested();
}
else
{
Thread.Sleep();
Console.WriteLine("任务t2,共执行50次,当前第{0}次", i);
}
}
}, cts.Token);
try
{
Task.WaitAll(tasks);
}
catch (AggregateException e)
{
foreach (var item in e.InnerExceptions)
{
Console.WriteLine(item);
}
}
Console.ReadKey();
}

并行开发-Task

使用Task.WaitAll()方法传递一个Task数组,并对其中多个Task的异常进行捕获处理

CancellationTokenSource.Token.Register()

static void Main(string[] args)
{
CancellationTokenSource cts = new CancellationTokenSource();
var token = cts.Token;
cts.CancelAfter();
token.Register(Callback);
Task t1 = Task.Run(() =>
{
for (int i = ; i < ; i++)
{
if (token.IsCancellationRequested)
{
token.ThrowIfCancellationRequested();
}
else
{
Thread.Sleep();
Console.WriteLine("任务t1,共执行30次,当前第{0}次", i);
}
}
}, token);
try
{
t1.Wait();
}
catch (AggregateException e)
{
foreach (var item in e.InnerExceptions)
{
Console.WriteLine(item);
}
}
Console.ReadKey();
} static void Callback()
{
Console.WriteLine("Register登记的任务取消回调函数");
}

并行开发-Task

可使用 Register,向取消标记登记一个回调方法。应用程序调用 CancellationTokenSource 对象的 Cancel 方法时,这个回调就会运行。但是不能保证这个方法在什么时候执行,可 能在任务执行完自己的取消处理之前或之后,也可能在那个过程之中。