.Net多线程编程—System.Threading.Tasks.Parallel

时间:2022-09-11 03:32:33

System.Threading.Tasks.Parallel类提供了Parallel.Invoke,Parallel.For,Parallel.ForEach这三个静态方法。

1 Parallel.Invoke

尽可能并行执行所提供的每个操作,除非用户取消了操作。

方法:

1)public static void Invoke(params Action[] actions);

2)public static void Invoke(ParallelOptions parallelOptions,

params Action[] actions);

参数:

parallelOptions:一个对象,用于配置此操作的行为。

Actions:要执行的操作数组

异常:

对方法1:

System.ArgumentNullException: actions 参数为 null。

System.AggregateException:当 actions 数组中的任何操作引发异常时引发的异常。

System.ArgumentException:actions数组包含 null 个元素。

对方法2除上述异常外还包括:

System.OperationCanceledException:parallelOptions 设置了System.Threading.CancellationToken。

System.ObjectDisposedException:在 parallelOptions 中与 System.Threading.CancellationToken 关联的System.Threading.CancellationTokenSource已被释放。

说明:

1)Invoke方法只有在actions全部执行完才会返回,即使在执行过程中出现异常也会完成。

2)不能保证actions中的所有操作同时执行。比如actions大小为4,但硬件线程数为2,那么同时运行的操作数最多为2。

3)actions中的操作并行的运行且与顺序无关,若编写与运行顺序有关的并发代码,应选择其他方法。

4)如果使用Invoke加载多个操作,多个操作运行时间迥异,总的运行时间以消耗时间最长操作为基准,这会导致很多逻辑内核长时间处于空闲状态。

5)受限的并行可扩展性,这源于Invoke所调用的委托数目是固定的。

2 Parallel.For

可能会并行运行迭代,可以监视和操作循环的状态。Parallel.For有多个重载的方法,下面列举部分方法。

方法:

1)public static ParallelLoopResult For(int fromInclusive, int toExclusive, Action<int> body);

2)public static ParallelLoopResult For(int fromInclusive, int toExclusive, Action<int, ParallelLoopState> body);

3)public static ParallelLoopResult For(int fromInclusive, int toExclusive, ParallelOptions parallelOptions, Action<int, ParallelLoopState> body);

4)public static ParallelLoopResult For<TLocal>(int fromInclusive, int toExclusive, ParallelOptions parallelOptions, Func<TLocal> localInit, Func<int, ParallelLoopState, TLocal, TLocal> body, Action<TLocal> localFinally);

参数:

fromInclusive:开始索引(含)。

toExclusive:结束索引(不含)。

body:将被每个迭代调用一次的委托。

parallelOptions:一个对象,用于配置此操作的行为。

localInit:一个委托,用于返回每个任务的本地数据的初始状态。

localFinally:一个委托,用于对每个任务的本地状态执行一个最终操作。

返回结果:

ParallelLoopResult :包含有关已完成的循环部分的信息。

异常:

System.ArgumentNullException:body 参数为 null,或 localInit 参数为 null,或 localFinally 参数为 null,或 parallelOptions 参数为 null。 System.AggregateException:包含在所有线程上引发的全部单个异常的异常。

对于方法3)和4)除包含以上异常外还包括:

System.OperationCanceledException:在 parallelOptions 设置了参数 System.Threading.CancellationToken。

System.ObjectDisposedException:在 parallelOptions 中与 System.Threading.CancellationToken 关联的 System.Threading.CancellationTokenSource已被释放。

说明:

1)不支持浮点和步进。

2)无法保证迭代的执行顺序。

3)如果fromInclusive大于或等于toExclusive,方法立即返回而不会执行任何迭代。

4)对于body参数中含有的ParallelLoopState实例,其作用为提早中断并行循环。

5)只有在迭代全部完成以后才会返回结果,否则循环将一直阻塞。

3 Parallel.ForEach

方法

1)public static ParallelLoopResult ForEach(IEnumerable<TSource> source, Action<TSource> body);

2)public static ParallelLoopResult ForEach<TSource>(IEnumerable<TSource> source, ParallelOptions parallelOptions, Action<TSource, ParallelLoopState> body);

3)public static ParallelLoopResult ForEach<TSource>(Partitioner<TSource> source, Action<TSource> body);

参数:

source:数据源

body:将被每个迭代调用一次的委托。

parallelOptions:一个对象,用于配置此操作的行为。

返回结果:

ParallelLoopResult :包含有关已完成的循环部分的信息。

异常:

System.ArgumentNullException:source 参数为 null。-或- 方body 参数为 null。

System.AggregateException:包含了所有线程上引发的全部单个异常。

对于方法2)还包括:

System.OperationCanceledException:在 parallelOptions 设置了参数 System.Threading.CancellationToken。

System.ObjectDisposedException:在 parallelOptions 中与 System.Threading.CancellationToken 关联的 System.Threading.CancellationTokenSource已被释放。

对于3)包括的异常为:

System.ArgumentNullException:source 参数为 null。-或- 方body 参数为 null。

System.InvalidOperationException:source 分区程序中的 System.Collections.Concurrent.Partitioner<TSource>.SupportsDynamicPartitions 属性返回 false。或 在 source 分区程序中的任何方法返回 null 时引发异常。或在source 分区程序中的 System.Collections.Concurrent.Partitioner<TSource>.GetPartitions(System.Int32)方法不返回正确数目的分区。

说明:

1)对于body参数中含有的ParallelLoopState实例,其作用为提早中断并行循环。

2)Parallel.ForEach方法不保证执行顺序,它不像foreach循环那样总是顺序执行。

3)对于方法3)中的source,它的类型是Partitioner<TSource>。可以使用Partitioner.Create方法创建分区,该方法的几个重整方法为:

l public static OrderablePartitioner<Tuple<int, int>> Create(int fromInclusive, int toExclusive);

l public static OrderablePartitioner<Tuple<int, int>> Create(int fromInclusive, int toExclusive, int rangeSize);

fromInclusive为范围下限(含),toExclusive为范围下限(不含),rangeSize为每个子范围的大小。

使用Partitioner创建的子范围大小默认大约是计算机内核的三倍,而当使用rangeSize指定范围大小时,那么子范围大小为指定值。

4)只有在迭代全部完成以后才会返回结果,否则循环将一直阻塞。

4 ParallelOptions

定义:

存储选项,用于配置 System.Threading.Tasks.Parallel 类的方法。

ParallelOptions属性

1)public CancellationToken CancellationToken { get; set; }

获取或设置传播有关应取消操作的通知。

2)public int MaxDegreeOfParallelism { get; set; }

获取或设置此 ParallelOptions 实例所允许的最大并行度。

3)public TaskScheduler TaskScheduler { get; set; }

获取或设置与此 System.Threading.Tasks.ParallelOptions 实例关联的 System.Threading.Tasks.TaskScheduler

说明:

1)通过设置CancellationToken来取消并行循环,当前正在运行的迭代会执行完,然后抛出System.OperationCanceledException类型的异常。

2)TPL的方法总是会试图利用所有可用内核以达到最好的效果,但是很可能.NET Framework内部使用的启发式算法所得到的注入和使用的线程数比实际需要的多(通常都会高于硬件线程数,这样会更好地支持CPU和I/O混合型的工作负载)。

通常将最大并行度设置为小于等于逻辑内核数。如果设置为等于逻辑内核数,那么要确保不会影响其他程序的执行。设置为小于逻辑内核数是为了有空闲内核来处理其他紧急的任务。

用途:

1)从循环外部取消并行循环

2)指定并行度

3)指定自定义任务调度程序

5 ParallelLoopState

定义:

可使并行循环迭代与其他迭代交互。 此类的实例由 Parallel 类提供给每个循环;不能在用户代码中创建实例。

方法:

1)Break()方法:通知并行循环在执行完当前迭代之后尽快停止执行,可确保低索引步骤完成。且可确保正在执行的迭代继续运行直到完成。

2)Stop()方法:通知并行循环尽快停止执行。对于尚未运行的迭代不能会尝试执行低索引迭代。不保证所有已运行的迭代都执行完。

用途:提早退出并行循环。

说明:

1)不能同时在同一个并行循环中同时使用Break和Stop。

2)Stop比Break更常用。break语句用在并行循环中的效果和用在串行循环中不同。Break用在并行循环中,委托的主体方法在每次迭代的时候被调用,退出委托的主体方法对并行循环的执行没有影响。Stop停止循环比Break快。

6 ParallelLoopResult结构

定义:

并行循环运行结果的信息。

属性:

1)public bool IsCompleted { get; }

如果该循环已运行完成(该循环的所有迭代均已执行,并且该循环没有收到提前结束的请求),则为 true;否则为 false。

2)public long? LowestBreakIteration { get; }

返回一个表示从中调用 Break 语句的最低迭代的整数

用途:判断当并行循环结束时,是否因调用了break方法或stop方法而提前退出并行循环,或所有迭代均已执行。

判断依据:

条件

IsCompleted

运行完成

!IsCompleted &&

LowestBreakIteration==null

使用了Stop语句而提前终止

!IsCompleted &&

LowestBreakIteration!=null

使用了Break语句而提前终止

7 捕获并行循环中的异常

原则:

1)异常优先于从循环外部取消和使用Break()方法或Stop()方法提前退出并行循环。

2)并行循环体抛出一个未处理的异常,并行循环就不能再开始新的迭代。

3)默认情况下当某次迭代抛出一个未处理异常,那么正在执行的迭代如果没抛出异常,正在执行的迭代会执行完。当所有迭代都执行完(有可能其他的迭代在执行的过程中也抛出异常),并行循环将在调用它的线程中抛出异常。

并行循环运行的过程中,可能有多个迭代抛出异常,所以一般使用AggregateException来捕获异常。AggregateException继承自Exception。为了防止仅使用AggregateException未能捕获某些异常,使用AggregateException的同时还要使用Exception。

8 使用模式

8.1 Parallel.Invoke

 public static void DemonstrateInvoke()
{
//使用Lambda
Parallel.Invoke(
() =>
{
//具体操作1
},
() =>
{
//具体操作2
}); //不使用lambda
Parallel.Invoke(Operation1, Operation2);
} private static void Operation1()
{
//具体操作1
} private static void Operation2()
{
//具体操作2
}

8.2 Parallel.For

 串行循环:
int toExclusive = ...;
for(int i =;i<=toExclusive;i++){}; 对应的并行循环:
Parallel.For(, toExclusive+, (i) =>
{
//具体操作
});

8.3 Parallel.ForEach

 一般用法
IEnumerable<string> coll = ...;
Parallel.ForEach(coll,(str)=>
{
//具体操作
}); 基于分区的模式
优化分区数,使其最接近系统逻辑内核数:
子分区范围 = 对“待处理集合大小/系统逻辑内核数”取整+。
int logicalCores =...;
IEnumerable<string> collP = ...;
int fromInclusive = ...;
int toExclusiv = ...;
int rangeSize = (int)((toExclusiv-fromInclusive )/logicalCores) +;
Parallel.ForEach(Partitioner.Create(fromInclusive, toExclusiv, rangeSize), range =>
{
for (int i = range.Item1; i < range.Item2; i++)
{
//使用集合:collection[i]
}
});

8.4 从循环外部取消并行循环

注意:不使用IsCancellationRequested或ThrowIfCancellationRequested()的情况下无法捕获类型为AggregateException的异常。

1)对于Parallel.For

使用IsCancellationRequested属性

 public static void CancelFromExternal()
{
CancellationTokenSource cts = new CancellationTokenSource();
//其他操作... //异步执行Operation方法
Task.Factory.StartNew(()=>{Operation(cts);});
//异步执行condition的计算过程
Task.Factory.StartNew(()=>{
bool condition = ...;
if (condition) cts.Cancel();
} //其他操作...
} private static void Operation(CancellationTokenSource cts)
{
CancellationToken ct = cts.Token;
ParallelOptions op = new ParallelOptions { CancellationToken = ct };
int toExclusive = ...;
Parallel.For(, toExclusive, op, (i) =>
{ //其他操作... //return只对当前子线程有效
if (ct.IsCancellationRequested)
{ return; } //其他操作...
});
}

使用ThrowIfCancellationRequested()方法抛出异常

将上面的并行循环部分替换为下面的代码:

 Parallel.For(, toExclusive, op, (i) =>
{ //其他操作... ct.ThrowIfCancellationRequested(); //其他操作...
});

不使用IsCancellationRequested和ThrowIfCancellationRequested()方法

将Operation方法中的涉及到IsCancellationRequested和ThrowIfCancellationRequested()方法的代码去掉

2)对于Parallel.ForEach

使用IsCancellationRequested属性

 public static void CancelFromExternal()
{
//同1)中CancelFromExternal方法
} private static void Operation(CancellationTokenSource cts)
{
CancellationToken ct = cts.Token;
ParallelOptions op = new ParallelOptions { CancellationToken = ct };
IEnumerable<string> coll = new List<string> { "str1", "str2" };
Parallel.ForEach(coll, op,(str, loopState) =>
{
//其他操作... //return只对当前子线程有效
if (ct.IsCancellationRequested)
{ return; } //其他操作...
});
}

使用ThrowIfCancellationRequested()方法抛出异常

将Operation方法中的:

if (ct.IsCancellationRequested)

{ return; }

替换为:

ct.ThrowIfCancellationRequested();

不使用IsCancellationRequested和ThrowIfCancellationRequested()方法

将Operation方法中的涉及到IsCancellationRequested和ThrowIfCancellationRequested()方法的代码去掉

8.5 指定并行度

 int maxDegreeOfParallelism = Environment.ProcessorCount - ;
ParallelOptions op = new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism };
IEnumerable<string> coll = new List<string> { };
Parallel.ForEach(coll, op ,(str) =>
{
//具体操作
});

8.6 提早退出并行循环

1)对于Parallel.For

 int toExclusive = ;
Parallel.For(, toExclusive, (i, loopState) =>
{
//其他操作...
//计算condition
bool condition = ...;
if (condition)
{
loopState.Break();//或使用loopState.Stop
return;
} //其他操作
});

2)对于Parallel.ForEach

 IEnumerable<string> coll = new List<string> {"str1","str2" };
Parallel.ForEach(coll, (str, loopState) =>
{
//其他操作... //计算condition
bool condition = ...;
if (condition)
{
loopState.Break();//或使用loopState.Stop
return;
} //其他操作 });

9 异常处理模式

基本形式

在确保使用AggregateException 能够捕捉到所有的异常时,可以省去catch(Exception e)的部分。

 try
{
//Do something
}
catch(AggregateException e)
{
Foreach(Exception ex in e.InnerExceptions)
{
//Do something
}
}
catch(Exception e)
{
//Do something
}

为上述并行循环使用模式添加异常处理机制

一种方式是把并行循环放入try块中,另一种方式是在每次迭代的过程中捕获异常。

-----------------------------------------------------------------------------------------

转载与引用请注明出处。

时间仓促,水平有限,如有不当之处,欢迎指正。

.Net多线程编程—System.Threading.Tasks.Parallel的更多相关文章

  1. 转载 Net多线程编程—System&period;Threading&period;Tasks&period;Parallel

    .Net多线程编程—System.Threading.Tasks.Parallel   System.Threading.Tasks.Parallel类提供了Parallel.Invoke,Paral ...

  2. 一、并行编程 - 数据并行 System&period;Threading&period;Tasks&period;Parallel 类

    一.并行概念 1.并行编程 在.NET 4中的并行编程是依赖Task Parallel Library(后面简称为TPL) 实现的.在TPL中,最基本的执行单元是task(中文可以理解为"任 ...

  3. System&period;Threading&period;Tasks

    前言: 我们之前介绍了两种构建多线程软件的编程技术(使用异步委托或通过System.Threading的成员).这两个可以在任何版本的.NET平台工作. 关于System.Threading 的介绍 ...

  4. System&period;Threading&period;Tasks并发和异步代码使用

    main.cs System.Threading.Tasks.Parallel.For(0, 10, i =>            {                TestLock test ...

  5. 5天玩转C&num;并行和多线程编程 —— 第一天 认识Parallel

    5天玩转C#并行和多线程编程系列文章目录 5天玩转C#并行和多线程编程 —— 第一天 认识Parallel 5天玩转C#并行和多线程编程 —— 第二天 并行集合和PLinq 5天玩转C#并行和多线程编 ...

  6. 5天玩转C&num;并行和多线程编程 —— 第一天 认识Parallel 转载 https&colon;&sol;&sol;www&period;cnblogs&period;com&sol;yunfeifei&sol;p&sol;3993401&period;html

    5天玩转C#并行和多线程编程系列文章目录 5天玩转C#并行和多线程编程 —— 第一天 认识Parallel 5天玩转C#并行和多线程编程 —— 第二天 并行集合和PLinq 5天玩转C#并行和多线程编 ...

  7. C&num;并行和多线程编程&lowbar;&lpar;1&rpar;认识Parallel

    Parallel: 英 [ˈpærəlel]    美 [ˈpærəˌlɛl] ,并联的,并行的. 随着多核时代的到来,并行开发越来越展示出它的强大威力!使用并行程序,充分的利用系统资源,提高程序的性 ...

  8. System&period;Threading&period;Tasks&period;Task 引起的 IIS 应用池崩溃

    接口服务运行一段时间后,IIS应用池就会突然挂掉,事件查看日志,会有事件日志Event ID为5011的错误 为应用程序池“PokeIn”提供服务的进程在与 Windows Process Activ ...

  9. System&period;Threading&period;Tasks&period;Task引起的IIS应用程序池崩溃

    问题现象 IIS应用程序池崩溃(Crash)的特征如下: 1. 从客户端看,浏览器一直处于连接状态,Web服务器无响应. 2. 从服务器端看(Windows Server 2008 + IIS 7.0 ...

随机推荐

  1. MVC中使用Action全局过滤器出现:网页无法正常运作 将您重定向的次数过多。解决办法

    前言当我们访问某个网站的时候需要检测用户是否已经登录(通过Session是否为null),我们知道在WebForm中可以定义一个BasePage类让他继承System.Web.UI.Page,重写它的 ...

  2. &lbrack;转&rsqb;Java反射之如何判断类或变量、方法的修饰符&lpar;Modifier解析&rpar;

    Java针对类.成员变量.方法,有很多修饰符,例如public.private.static.final.synchronized.abstract等,这些修饰符用来控制访问权限或其他特性. 本文就用 ...

  3. 每天一个linux命令(36):diff 命令

    diff 命令是 linux上非常重要的工具,用于比较文件的内容,特别是比较两个版本不同的文件以找到改动的地方.diff在命令行中打印每一个行的改动.最新版本的diff还支持二进制文件.diff程序的 ...

  4. C&plus;&plus;12!配对

    题目内容:找出输入数据中所有两两相乘的积为12!的对数. 输入描述:输入数据中含有一些整数n(1<=n<232). 输出描述:输出所有两两相乘的积为12!的对数. 题目分析:对于输入的每个 ...

  5. 李洪强iOS开发之OC&lbrack;015&rsqb;&num;pragma mark的使用

    // //  main.m //  14 - #pragma mark的使用 // //  Created by vic fan on 16/7/10. //  Copyright © 2016年 李 ...

  6. (一)《Java编程思想》学习——按位运算符、移位运算符

    (第三章) (一)按位运算符 按位逻辑运算符有: “与”(AND)        & 1&1=1;1&0=0;0&0=0 “或”(OR) | 1|1=1;1|0=1;0 ...

  7. 通过css控制超链接不显示下划线

    “页面属性”——“链接”——“下划线样式”——“始终无下划线” <style type="text/css"> a:link { text-decoration: no ...

  8. CapsNet胶囊网络(理解)

    0 - 背景 Geoffrey Hinton是深度学习的开创者之一,反向传播等神经网络经典算法发明人,他在去年年底和他的团队发表了两篇论文,介绍了一种全新的神经网络,这种网络基于一种称为胶囊(caps ...

  9. Jade教程

    Jade 是一个高性能的模板引擎,它深受 Haml 影响,它是用 JavaScript 实现的,并且可以供 Node 使用. 如何在jade模板上加业务逻辑 if res.length==5 h1= ...

  10. OpenCV 自定义任意区域形状及计算平均值 方差

    opencv中有矩形的Rect函数.圆形的circl函数等,那么任意形状怎么取呢?方法1:点乘,将其形状与图像进行点乘,求其形状对应的图像形状:方法2:用findContours函数得对应的形状区域, ...