C#并发编程

时间:2021-10-02 09:21:58

并发编程,一直是小白变成(●—●)的一个坎。平时也用到过不少并发编程操作,在这里进行一下记录。

多线程并不是唯一

并发:同时做多件事情。

多线程:并发的一种形式,采用多线程来执行程序。

并行处理:把正在执行的大量的任务分割成小块,分配给多个同时运行的线程。并发编程是多线程的一种。

响应式编程:并发的一种形式,一种声明式编程模式,程序在该模式中对事件作出响应。

异步编程:并发的一种形式,采用future模式或回调机制,以避免产生不必要的线程。

Net4.5中,添加关键字:async和await,用来简化异步。  

async 加在方法声明上,目的是使方法内的await关键字生效。如果有返回值Task<T>,没有返回值Task。这些Task类型相当于future。await关键字执行异步等待。

异步编程Async

async方法在开始时以同步方式执行。在async方法内部,await关键字对它的参数执行一个异步等待。

首先检查操作是否完成,完成继续运行,否则暂停async方法,返回未完成的task。等操作完成,async方法就回复运行。

 async Task DoSomeThingAsync()
{
int val = ;
var task = Task.Delay();
val *= ;
await task;
await Task.Delay();
Console.WriteLine(val);
}

故此,await Task.Delay 与 var task = Task.Delay。执行方式是不一样的,只有添加await,才会暂停整个方法。

当await暂停时,就可以捕捉上下文context。一般来说运行UI线程就采用UI上下文,线程池就采用线程池上下文。避免上下文获取错误,可以使用ConfigureAwait方法。

await Task.Delay().ConfigureAwait(false);

最佳做法,核心代码库一直使用ConfigureAwait,外界代码只在需要时才恢复上下文。

异步与多线程的概念

平时可能会遇见需要爬网页数据,这个时候如果数据很庞大,那么单线程会造成等待。如果这个时候使用Thread开一个线程,经管会避免UI线程卡顿,但是依然会很慢。这就是典型的多线程错误使用情况。

那么针对此情景,需要进行如何处理呢?首先要理解一下,网页数据的操作,IO操作。

IO操作:IO流实际上是一种无结构的字节序或字符序列,进行插入和移除就是IO操作。均是以IO流为基础,来进行的操作。

DMA:Direct Memory Access,是IO操作的模式。直接内存访问,是一种不经过CPU的数据交互模式。不损耗CPU资源。

异步区别:异步就是充分利用DMA功能来释放CPU压力。

CLR提供了很多关于异步操作的接口,针对上面这种情况。可以写成如下代码。

代码目的:异步获取网页源码

static void Main(string[] args)
{
//创建新浪的http请求
HttpWebRequest request = WebRequest.CreateHttp("http://www.google.com.hk");
//进行异步请求
request.BeginGetResponse(AsyncCallbackImpl, request);
for (int i = ; i < ; i++)
{
Console.WriteLine(i);
Thread.Sleep();
}
Console.Read();
}
//异步请求执行的委托
public static void AsyncCallbackImpl(IAsyncResult ar)
{
WebRequest request = ar.AsyncState as WebRequest;
//获取返回值
var response = request.EndGetResponse(ar);
//获取返回IO流
var stream = response.GetResponseStream();
//读取IO流内容,并输出
using (StreamReader reader = new StreamReader(stream))
{
var content = reader.ReadLine();
Console.WriteLine(content);
}
Console.WriteLine("CompletedSynchronously:" + ar.CompletedSynchronously + ",IsCompleted:" + ar.IsCompleted);
}

主要就是调用 BeginGetResponse 异步获取方法,来进行操作。异步委托执行的时候,终止请求。

在进行异步调用的时候,会自动交给线程池进行管理。当开始进行IO操作的时候,会将工作还给线程池,这就相当于整个工作不会占用CPU资源了。

所以如果有人问你,异步和多线程有什么区别。你就可以告诉他

异步采用IO的DMA模式,不会消耗CPU资源。计算密集型工作,采用多线程。IO密集型工作,采用异步机制。


线程同步

线程同步,指的是多线程在某个对象上执行等待,直到该对象被解除锁定。CLR中值类型是不能被锁定的,即不能在一个值类型对象上执行等待。

引用类型在等待机制上,又分为两类:锁定信号同步。锁定使用lock和Monitor类型,信号同步使用EventWaitHandle、Semaphore、Mutex。

EventWaitHandle(AutoResetEvent,ManualResetEvent):维护一个由内核产生的布尔类型对象,如果其值为flase,那么在它上面等待的线程就阻塞。可以调用类型Set方法设置true,解除阻塞。

Semaphore:维护一个由内核产生的整型变量,如果值为0,则它上面的线程会进行阻塞。如果大于0则解除阻塞。同步每解除一个,其值就减一。

EventWaitHandle与Semaphore均为单AppDomain内的线程同步功能。

Mutex:提供了跨AppDomain阻塞和解除阻塞线程的能力。

AutoResetEvent使用

进行信号同步的实例测试。

static void Main(string[] args)
{
//创建信号同步
AutoResetEvent autoResetEvent = new AutoResetEvent(false);
//使用线程池,在3秒后进行解除阻塞
ThreadPool.QueueUserWorkItem(p => { Thread.Sleep(); autoResetEvent.Set(); }, null);
Console.WriteLine("模拟信号阻塞");
autoResetEvent.WaitOne();
Console.WriteLine("解除阻塞");
Console.Read();
}

ManualResetEvent使用

AutoResetEvent 和 ManualResetEvent区别是:前者在发送信号完毕后,会自动将自己的阻塞状态设为false,后者需要手动设定。

如果需要同步多个信号,使用AutoResetEvent会在收到信号后将阻塞状态设为false。也就是发送阻塞信号是一次性的,收到以后就依然自动设为阻塞状态。

static void Main(string[] args)
{
//创建信号同步
ManualResetEvent autoResetEvent = new ManualResetEvent(false);
//使用线程池,在3秒后进行解除阻塞
ThreadPool.QueueUserWorkItem(p => { Thread.Sleep(); autoResetEvent.Set(); }, null);
//模拟A
ThreadPool.QueueUserWorkItem(p => { Console.WriteLine("模拟信号阻塞A"); autoResetEvent.WaitOne(); Console.WriteLine("解除阻塞A"); });
//模拟B
ThreadPool.QueueUserWorkItem(p => { Console.WriteLine("模拟信号阻塞B"); autoResetEvent.WaitOne(); Console.WriteLine("解除阻塞B"); }); Console.Read();
}

如果此时将 ManualResetEvent换成AutoResetEvent,那么就会有一个线程收不到解除阻塞的信息了。

等待时间

WaitOne(),可以设置等待的时间。如果在时间段内收到信号,则返回true,否则返回false。

那么我们利用这个等待时间,就可以做一个模拟心跳的功能。

我们等待三秒心跳,如果三秒内可以收到信号,则代表连通状态。否则代表断开状态。

static void Main(string[] args)
{
AutoResetEvent autoResetEvent = new AutoResetEvent(false);
ThreadPool.QueueUserWorkItem(p => Heartbeat(autoResetEvent), null);
ThreadPool.QueueUserWorkItem(p => Operation(autoResetEvent), null); Console.Read();
} //操作跳两次,断开
private static void Operation(AutoResetEvent autoResetEvent)
{
Thread.Sleep();
for (int i = ; i < ; i++)
{
autoResetEvent.Set();
Thread.Sleep();
}
} //模拟心跳
private static void Heartbeat(AutoResetEvent autoResetEvent)
{
while (true)
{
//进行三秒等待,模拟心跳
bool result = autoResetEvent.WaitOne();
if (result)
Console.WriteLine("连通状态");
else
Console.WriteLine("断开状态");
}
}

线程安全

如果多线程同时操作一个集合,那么如果是List集合,因为是非线程安全的,所以会导出占用冲突。

static void Main(string[] args)
{
List<int> list = new List<int>();
ConcurrentBag<int> list2 = new ConcurrentBag<int>();
var task1 = Task.Factory.StartNew(async () =>
{
for (int i = ; i < ; i++)
{
list.Add(i);
list2.Add(i);
await Task.Factory.StartNew(() => Task.Delay(), TaskCreationOptions.AttachedToParent);
}
Console.WriteLine("Task1");
});
var task2 = new Task(async () =>
{
for (int i = ; i < ; i++)
{
list.Add(i);
list2.Add(i);
await Task.Factory.StartNew(() => Task.Delay(), TaskCreationOptions.AttachedToParent);
}
Console.WriteLine("Task2");
});
task2.Start();
Task.WaitAll(new Task[] { task1, task2 });
Console.WriteLine(list.Count); //
Console.WriteLine(list2.Count); //
Console.Read();
}

类型安全的集合有如下几个

ConcurrentDictionary<TKey,TValue>:线程安全的键值对

ConcurrentBag<T>:线程安全的无序集合

ConcurrentQueue<T>:线程安全的先进先出集合

ConcurrentStack<T>:线程安全的后进先出集合

锁定

线程同步的另一种方法,就是使用线程锁。线程锁的原理就是锁住一个资源,让多线程程序在此刻变成只有一个线程访问该资源。让多线程临时变成单线程。

待锁定的资源应该注意以下几点:

1.同步对象需要在多线程中可见同一个对象。(在调用根方法定义,如多个方法均需要锁定,则定义静态变量)

2.非静态方法中,静态变量不应作为同步对象。(类型的静态方法应当保证线程安全,非静态方法不需实现线程安全。原因是如果实例存在多个,在遇到锁的时候都会产生同步)

3.值类型对象不能作为同步对象。(值类型在传递另一个线程时,会创建一个副本)

4.避免字符串。(CLR会将字符串临时存储在内存里,如果相同的话,系统会产生阻塞)

5.避免降低可见性。

多线程对同一个集合进行删除和遍历操作,使用lock可变成单线程,避免修改后遍历异常。

static IList<int> list = new List<int> { , , , ,  };
static object syncObj = new object();
static void Main(string[] args)
{
AutoResetEvent autoResetEvent = new AutoResetEvent(false);
//两个线程一个循环遍历,另一个删除最后一位
ThreadPool.QueueUserWorkItem(p =>
{
//使用信号量,控制先执行循环,在进行删除
autoResetEvent.WaitOne();
lock (syncObj)
{
foreach (int item in list)
{
Console.WriteLine(item);
Thread.Sleep();
}
}
});
ThreadPool.QueueUserWorkItem(p =>
{
autoResetEvent.Set();
Thread.Sleep();
lock (syncObj)
{
list.RemoveAt(list.Count - );
}
});
Console.Read();
}

前台线程与后台线程

CLR中线程分为前台线程与后台线程,每个线程都有一个IsBackground属性。前台线程不退出,应用程序就会一直存在。而后台线程没有限制,应用程序退出,后台也会一并退出。

线程池中所有线程默认都是后台线程,只有在非常关键的操作中,才会需要使用前台线程。

线程是不会立即启动的,线程之间的调度占有一定时间和空间开销。

线程优先级

线程中有5个优先级:Highest、AboveNomal、Normal、BelowNormal、Lowest

系统中如果有一个线程优先级较高,则会优先运行该线程。优先级越高的线程,就会占用越多的CPU执行时间。

Thread.Priority属性,可以设置线程优先级。默认优先级为:Normal。

取消线程

如果要停止线程,那么线程自身就需要给调用者开发这样的接口。若检查到Cancled线程才会负责退出。

FLC中提供的取消模式为:CancellationTokenSource。IsCancellationRequested是取消工作的表示,可以调用Cancel()进行取消操作。Register是取消操作后的方法。

 static void Main(string[] args)
{
//定义取消操作
CancellationTokenSource cts = new CancellationTokenSource();
ThreadPool.QueueUserWorkItem(p=> {
while (true) //循环中如果判断取消,则跳出
{
if (cts.Token.IsCancellationRequested)
break;
Console.WriteLine("线程正在执行中");
Thread.Sleep();
}
});
//线程三秒后取消
Thread.Sleep();
cts.Cancel();
cts.Token.Register(() =>
{
Console.WriteLine("线程已取消");
});
Console.Read();
}

超时取消

CancellationTokenSource构造函数可以传递时间段,作为超时取消操作。或者使用方法CancelAfter,进行超时取消。

ThreadPool代替Thread

线程的开销是很大的。分为空间开销与时间开销。

空间开销:

1.线程内核对象:上下文信息。

2.线程环境块:异常处理链

3.用户模式栈:线程栈用于保存方法的参数,变量,返回值。

4.内核模式栈:将函数参数从用户模式赋值到内核模式。

时间开销:

1.创建时会初始化上面空间。

2.CLR会调用所有DLLMain方法,传递链接标志。

3.线程上下文切换。

线程池技术,可以替开发人员管理线程。何时创建或销毁由CLR来决定。在创建线程的时候,应优先考虑线程池技术。

BackgroundWorker

BackgroundWorker在内部使用线程池的技术。可以给UI线程提供交互能力。例如:报告进度、回调任务、取消任务、暂停任务等。

BackgroundWorker的交互均为事件形式,所以需要事先配置好对应事件。

private static BackgroundWorker worker = new BackgroundWorker();
static void Main(string[] args)
{
//设置可以报告进度
worker.WorkerReportsProgress = true;
//绑定worker事件
worker.DoWork += Worker_DoWork;
worker.ProgressChanged += Worker_ProgressChanged;
worker.RunWorkerAsync();
Console.Read();
}
//交互
private static void Worker_ProgressChanged(object sender, ProgressChangedEventArgs e)
{
Console.WriteLine(e.ProgressPercentage);
}
//开始执行
private static void Worker_DoWork(object sender, DoWorkEventArgs e)
{
for (int i = ; i < ; i++)
{
//每1秒同步一次
worker.ReportProgress(i);
Thread.Sleep();
}
}

Task

ThreadPool在使用上存在很多不便。例如:没有交互性操作,不支持先后顺序。

如果要实现这种功能,则需要使用Task。Task在线程池上进行过优化,并提供了更多的API。在FCL4.0中,Task已经优于传统方式了。

static void Main(string[] args)
{
Task s = new Task(() =>
{
Console.WriteLine("开始执行");
Thread.Sleep();
});
s.Start();
s.ContinueWith(p =>
{
Console.WriteLine("结束执行。状态为:");
Console.WriteLine("是否被取消:{0},是否完成:{1},是否异常:{2}",p.IsCanceled,p.IsCompleted,p.IsFaulted);
});
Console.Read();
}

Task的完成通知和取消

Task的完成通知,也是通过新起任务来进行的。下面添加异常取消、获取返回值

static void Main(string[] args)
{
CancellationTokenSource cts = new CancellationTokenSource();
Task<int> t = new Task<int>(() => AddCancle(cts.Token), cts.Token);
t.Start();
t.ContinueWith(TaskEndedByCatch);
Console.ReadKey();
cts.Cancel();
Console.ReadKey();
} private static void TaskEndedByCatch(Task<int> obj)
{
Console.WriteLine("任务完成");
Console.WriteLine("是否取消:{0},是否完成:{1},是否异常:{2}", obj.IsCanceled, obj.IsCompleted, obj.IsFaulted);
try
{
Console.WriteLine(obj.Result);
}
catch (OperationCanceledException) { }
} private static int AddCancle(CancellationToken token)
{
Console.WriteLine("任务开始");
int result = ;
while (true)
{
token.ThrowIfCancellationRequested();
result++;
Thread.Sleep();
}
return result;
}

Task报告进度

Task报告进度,需要传入IProgress<T>类型,采用这个类型的事件来进行报告。

static void Main(string[] args)
{
var progre = new Progress<int>();
progre.ProgressChanged += (sender,e)=> { Console.WriteLine(e); };
Progress(progre);
Console.ReadKey();
} static void Progress(IProgress<int> progress)
{
for (int i = ; i < ; i++)
{
progress.Report(i);
}
}

Task.Factory

也可以通过Task.Factory的静态工厂来创建Task并运行。他的创建和new Task是一样的。

Task t1 = new Task(()=> Console.WriteLine("t1")); t1.Start();
Task.Factory.StartNew(() => Console.WriteLine("t2"));

Task.Status

task.Status可以输出Task的当前状态。

简略的声明周期如下:

Created:默认初始化任务,

WaitingToRun:等待任务调度器分配线程给任务执行

RanToCompletion:任务执行完毕

Task工厂

如果需要同时操作多个Task,也可以做Task工厂

static void Main(string[] args)
{
TaskFactory taskFactory = new TaskFactory();
Task[] tasks = new Task[] {
taskFactory.StartNew(()=>Console.WriteLine("")),
taskFactory.StartNew(()=>Console.WriteLine("")),
taskFactory.StartNew(()=>Console.WriteLine(""))
};
taskFactory.ContinueWhenAll(tasks, p => { Console.WriteLine("完成"); }, CancellationToken.None);
Console.ReadKey();
}

Task.Wait

task.Wait() 等待任务执行完成

Task.WaitAll

Task.WaitAll(t1,t2) 等待所有任务完成

Task.WaitAny

等待任何一个任务完成就继续执行该

Task.ContinueWith

第一个Taks完成后自动启动下一个Task,实现Task的延续操作,可以写成链式操作

Task.Delay

可以暂停一段时间。我们可以用这个来完成一个简单的超时功能。

使用Task.WhenAny,让一个返回成功则成功。那么当整个页面的获取时间,超过1秒,则返回失败。

 static async Task<string> DownloadStringWithTimeout(string url)
{
using (var client = new HttpClient())
{
var downloadTask = client.GetStringAsync(url);
var timeoutTask = Task.Delay();
var completedTask = await Task.WhenAny(downloadTask, timeoutTask);
if (completedTask == timeoutTask)
return null;
return await downloadTask;
}
}

Task不等待其他任务

正在await一批任务,希望在每个任务完成时对它做一些处理。另外希望在任务已完成就立即进行处理,不需要等待其他任务。

 static async Task Method()
{
Task<int> taskA = DelayAndReturnAsync();
Task<int> taskB = DelayAndReturnAsync();
Task<int> taskC = DelayAndReturnAsync();
var tasks = new[] { taskA, taskB, taskC };
var processingTasks = tasks.Select(async s=> {
var result = await s;
Console.WriteLine(result);
}).ToArray();
await Task.WhenAll(processingTasks);
} static async Task<int> DelayAndReturnAsync(int val)
{
await Task.Delay(TimeSpan.FromSeconds(val));
return val;
}

Task嵌套

Task中可以嵌套另一个Taks,Task嵌套分为两种:关联嵌套、非关联嵌套,就是内层与外层有没有关系。

非关联嵌套(与外层无关)代码如下

static void Main(string[] args)
{
var pTask = Task.Factory.StartNew(() =>
{
var cTask = Task.Factory.StartNew(() =>
{
Task.Delay();
Console.WriteLine("(3)");
});
Console.WriteLine("(1)");
});
pTask.Wait();
Console.WriteLine("(2)"); Console.Read();
}

pTask运行完后不会等待内层的cTask,直接向下先走输出 1。这种嵌套模式,相当于创建了多个Task。

嵌套一起会方便管理查找,并且可以中途添加多个。

关联嵌套(与外层有关)代码如下

static void Main(string[] args)
{
var pTask = Task.Factory.StartNew(() =>
{
var cTask = Task.Factory.StartNew(() =>
{
Task.Delay();
Console.WriteLine("(2)");
}, TaskCreationOptions.AttachedToParent); new Task(() => Console.WriteLine("(3)"), TaskCreationOptions.AttachedToParent).Start(); Console.WriteLine("(1)");
});
pTask.Wait();
Console.WriteLine("(4)"); Console.Read();
}

在创建Task的时候,如果TaskCreationOptions.AttachedToParent存在,就会建立关联。

pTask会等待cTask执行完成,这样就不用写Task.WaitAll了,外层会等到内层所有执行完成后,再往下走。

Task异常信息

static void Main(string[] args)
{
try
{
var pTask = Task.Factory.StartNew(() =>
{
Task.Factory.StartNew(() => throw new Exception("cTask1"), TaskCreationOptions.AttachedToParent);
new Task(() => throw new Exception("cTask2"), TaskCreationOptions.AttachedToParent).Start();
Console.WriteLine("(1)");
});
pTask.Wait();
}
catch (AggregateException ex)
{
foreach (Exception inner in ex.InnerExceptions)
{
Console.WriteLine(inner.Message);
}
}
Console.WriteLine("(2)");
Console.Read();
}

死锁

如果多个任务执行时,其中一个任务一直不返回结果,就形成了死锁。此时可以给Task.WaitAll()一个最大等待时间,超过就直接返回。

static void Main(string[] args)
{
try
{
var task1 = Task.Factory.StartNew(async () =>
{
await Task.Delay();
Console.WriteLine("Task1");
});
var task2 = new Task(() =>
{
Console.WriteLine("Task2");
});
task2.Start();
Console.WriteLine("(1)");
Task.WaitAll(new Task[] { task1, task2 }, );
Console.WriteLine("完成");
}
catch (AggregateException ex)
{
foreach (Exception inner in ex.InnerExceptions)
{
Console.WriteLine(inner.Message);
}
}
Console.WriteLine("(2)");
Console.Read();
}

并行编程

如果程序中有大量计算任务,并且能够分割成互相独立的任务快,那就应该使用并行编程。并行编程可以临时提高CPU使用率,提高吞吐量。

只要任务块是相互独立的,并行性就能做到最大化。

Parallel简化同步并行操作

Parallel可以简化同步状态下的Task操作。一共有3个方法。Parallel并不是顺序输出,使用时需考虑清楚。

static void Main(string[] args)
{
int[] nums = new int[] { , , , };
Parallel.For(, nums.Length, p => Console.WriteLine(nums[p])); Parallel.ForEach(nums, p => Console.WriteLine(p)); Parallel.Invoke(() =>
{
Console.WriteLine("");
}, () =>
{
Console.WriteLine("");
}, () =>
{
Console.WriteLine("");
});
Console.Read();
}

取消并行操作

Parallel可以传入一个协作型取消,操作取消时会引发OperationCanceledException异常。

static void Main(string[] args)
{
ICollection<string> list = new List<string>();
for (int i = ; i < ; i++)
{
list.Add(i.ToString());
}
CancellationTokenSource source = new CancellationTokenSource();
var task = new Task(() => { source.Cancel(); });
ThreadPool.QueueUserWorkItem(p => { Thread.Sleep(); task.Start(); });
Rotate(list, source.Token);
Console.ReadKey();
} static void Rotate(IEnumerable<string> list, CancellationToken token)
{
try
{
Parallel.ForEach(list,
new ParallelOptions { CancellationToken = token },
(p, s) =>
{
Thread.Sleep();
Console.WriteLine(p.ToString());
});
}
catch (OperationCanceledException ex)
{
Console.WriteLine("已取消");
}
}

退出循环和异常处理

Parallel如果想要中途停止,在并行的委托函数中提供了一个ParallelLoopState,并提供了break和stop方法。

调用break方法,执行完表达式后在进行退出。而stop方法,会在访问时立刻退出。

Parallel.For(, , (i, state) =>
{
if (i > ) {
state.Break(); // 1,2,3,4,0
state.Stop(); // 1,2,3
return;
}
Console.Write(i);
});

AggregateException可以捕获并行的一组异常

static void Main(string[] args)
{
try
{
Parallel.Invoke(() =>
{
throw new Exception("Invoke1");
}, () =>
{
throw new Exception("Invoke2");
});
}
catch (AggregateException ex)
{
foreach (var item in ex.InnerExceptions)
{
Console.WriteLine(item.Message);
}
} Console.ReadKey();
}

PLINQ

ParallelEnumerable所提供的扩展方法会让LINQ支持并行计算,也就是所谓的PLINQ。LINQ是单线程的,PLINQ是多线程的。

static void Main(string[] args)
{
int[] nums = new int[] { , , , , , , , , , };
var list = from p in nums.AsParallel() select p;
foreach (var item in list)
{
Console.WriteLine(item);
}
Console.Read();
}
static void Main(string[] args)
{
int[] nums = new int[] { , , , , , , , , , };
foreach (var item in nums.AsParallel().Select(p => p))
{
Console.WriteLine(item);
}
Console.ReadKey();
}

Lambda表达式,需要用Select再次进行投影。