1、IProducerConsumerCollection (线程安全接口)
此接口的所有实现必须都启用此接口的所有成员,若要从多个线程同时使用。
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic; namespace ConsoleApp1
{
public class SafeStack<T> : IProducerConsumerCollection<T>
{
// Used for enforcing thread-safety
private object m_lockObject = new object(); // We'll use a regular old Stack for our core operations
private Stack<T> m_sequentialStack = null; //
// Constructors
//
public SafeStack()
{
m_sequentialStack = new Stack<T>();
} public SafeStack(IEnumerable<T> collection)
{
m_sequentialStack = new Stack<T>(collection);
} //
// Safe Push/Pop support
//
public void Push(T item)
{
lock (m_lockObject) m_sequentialStack.Push(item);
} public bool TryPop(out T item)
{
bool rval = true;
lock (m_lockObject)
{
if (m_sequentialStack.Count == ) { item = default(T); rval = false; }
else item = m_sequentialStack.Pop();
}
return rval;
} //
// IProducerConsumerCollection(T) support
//
public bool TryTake(out T item)
{
return TryPop(out item);
} public bool TryAdd(T item)
{
Push(item);
return true; // Push doesn't fail
} public T[] ToArray()
{
T[] rval = null;
lock (m_lockObject) rval = m_sequentialStack.ToArray();
return rval;
} public void CopyTo(T[] array, int index)
{
lock (m_lockObject) m_sequentialStack.CopyTo(array, index);
} //
// Support for IEnumerable(T)
//
public IEnumerator<T> GetEnumerator()
{
// The performance here will be unfortunate for large stacks,
// but thread-safety is effectively implemented.
Stack<T> stackCopy = null;
lock (m_lockObject) stackCopy = new Stack<T>(m_sequentialStack);
return stackCopy.GetEnumerator();
} //
// Support for IEnumerable
//
IEnumerator IEnumerable.GetEnumerator()
{
return ((IEnumerable<T>)this).GetEnumerator();
} //
// Support for ICollection
//
public bool IsSynchronized
{
get { return true; }
} public object SyncRoot
{
get { return m_lockObject; }
} public int Count
{
get { return m_sequentialStack.Count; }
} public void CopyTo(Array array, int index)
{
lock (m_lockObject) ((ICollection)m_sequentialStack).CopyTo(array, index);
}
}
}
SafeStack
using System;
using System.Collections.Concurrent; namespace ConsoleApp1
{
class Program
{
static void Main()
{
TestSafeStack(); // Keep the console window open in debug mode.
Console.WriteLine("Press any key to exit.");
Console.ReadKey();
} // Test our implementation of IProducerConsumerCollection(T)
// Demonstrates:
// IPCC(T).TryAdd()
// IPCC(T).TryTake()
// IPCC(T).CopyTo()
static void TestSafeStack()
{
SafeStack<int> stack = new SafeStack<int>();
IProducerConsumerCollection<int> ipcc = (IProducerConsumerCollection<int>)stack; // Test Push()/TryAdd()
stack.Push(); Console.WriteLine("Pushed 10");
ipcc.TryAdd(); Console.WriteLine("IPCC.TryAdded 20");
stack.Push(); Console.WriteLine("Pushed 15"); int[] testArray = new int[]; // Try CopyTo() within boundaries
try
{
ipcc.CopyTo(testArray, );
Console.WriteLine("CopyTo() within boundaries worked, as expected");
}
catch (Exception e)
{
Console.WriteLine("CopyTo() within boundaries unexpectedly threw an exception: {0}", e.Message);
} // Try CopyTo() that overflows
try
{
ipcc.CopyTo(testArray, );
Console.WriteLine("CopyTo() with index overflow worked, and it SHOULD NOT HAVE");
}
catch (Exception e)
{
Console.WriteLine("CopyTo() with index overflow threw an exception, as expected: {0}", e.Message);
} // Test enumeration
Console.Write("Enumeration (should be three items): ");
foreach (int item in stack)
Console.Write("{0} ", item);
Console.WriteLine(""); // Test TryPop()
int popped = ;
if (stack.TryPop(out popped))
{
Console.WriteLine("Successfully popped {0}", popped);
}
else Console.WriteLine("FAILED to pop!!"); // Test Count
Console.WriteLine("stack count is {0}, should be 2", stack.Count); // Test TryTake()
if (ipcc.TryTake(out popped))
{
Console.WriteLine("Successfully IPCC-TryTaked {0}", popped);
}
else Console.WriteLine("FAILED to IPCC.TryTake!!");
}
}
}
Program
2、ConcurrentStack类:安全堆栈
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks; namespace ConsoleApp1
{
class Program
{
static void Main(string[] args)
{
Task t = RunProgram();
t.Wait();
} static async Task RunProgram()
{
var taskStack = new ConcurrentStack<CustomTask>();
var cts = new CancellationTokenSource(); var taskSource = Task.Run(() => TaskProducer(taskStack)); Task[] processors = new Task[];
for (int i = ; i <= ; i++)
{
string processorId = i.ToString();
processors[i - ] = Task.Run(
() => TaskProcessor(taskStack, "Processor " + processorId, cts.Token));
} await taskSource;
cts.CancelAfter(TimeSpan.FromSeconds()); await Task.WhenAll(processors);
} static async Task TaskProducer(ConcurrentStack<CustomTask> stack)
{
for (int i = ; i <= ; i++)
{
await Task.Delay();
var workItem = new CustomTask { Id = i };
stack.Push(workItem);
Console.WriteLine("Task {0} has been posted", workItem.Id);
}
} static async Task TaskProcessor(
ConcurrentStack<CustomTask> stack, string name, CancellationToken token)
{
await GetRandomDelay();
do
{
CustomTask workItem;
bool popSuccesful = stack.TryPop(out workItem);
if (popSuccesful)
{
Console.WriteLine("Task {0} has been processed by {1}", workItem.Id, name);
} await GetRandomDelay();
}
while (!token.IsCancellationRequested);
} static Task GetRandomDelay()
{
int delay = new Random(DateTime.Now.Millisecond).Next(, );
return Task.Delay(delay);
} class CustomTask
{
public int Id { get; set; }
}
}
}
Program
3、ConcurrentQueue类:安全队列
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks; namespace ConsoleApp1
{
class Program
{
static void Main(string[] args)
{
Task t = RunProgram();
t.Wait();
} static async Task RunProgram()
{
var taskQueue = new ConcurrentQueue<CustomTask>();
var cts = new CancellationTokenSource(); var taskSource = Task.Run(() => TaskProducer(taskQueue)); Task[] processors = new Task[];
for (int i = ; i <= ; i++)
{
string processorId = i.ToString();
processors[i - ] = Task.Run(
() => TaskProcessor(taskQueue, "Processor " + processorId, cts.Token));
} await taskSource;
cts.CancelAfter(TimeSpan.FromSeconds()); await Task.WhenAll(processors);
} static async Task TaskProducer(ConcurrentQueue<CustomTask> queue)
{
for (int i = ; i <= ; i++)
{
await Task.Delay();
var workItem = new CustomTask { Id = i };
queue.Enqueue(workItem);
Console.WriteLine("插入Task {0} has been posted ThreadID={1}", workItem.Id, Thread.CurrentThread.ManagedThreadId);
}
} static async Task TaskProcessor(
ConcurrentQueue<CustomTask> queue, string name, CancellationToken token)
{
CustomTask workItem;
bool dequeueSuccesful = false; await GetRandomDelay();
do
{
dequeueSuccesful = queue.TryDequeue(out workItem);
if (dequeueSuccesful)
{
Console.WriteLine("读取Task {0} has been processed by {1} ThreadID={2}",
workItem.Id, name, Thread.CurrentThread.ManagedThreadId);
} await GetRandomDelay();
}
while (!token.IsCancellationRequested);
} static Task GetRandomDelay()
{
int delay = new Random(DateTime.Now.Millisecond).Next(, );
return Task.Delay(delay);
} class CustomTask
{
public int Id { get; set; }
}
}
}
Program
4、ConcurrentDictionary类
ConcurrentDictionary类写操作比使用锁的通常字典(Dictionary)要慢的多,而读操作则要快些。因此对字典要大量的线程安全的读操作,ConcurrentDictionary类是最好的选择
ConcurrentDictionary类的实现使用了细粒度锁(fine-grained locking)技术,这在多线程写入方面比使用锁的通常的字典(也被称为粗粒度锁)
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics; namespace ConsoleApp1
{
class Program
{
static void Main(string[] args)
{
var concurrentDictionary = new ConcurrentDictionary<int, string>();
var dictionary = new Dictionary<int, string>(); var sw = new Stopwatch(); sw.Start();
for (int i = ; i < ; i++)
{
lock (dictionary)
{
dictionary[i] = Item;
}
}
sw.Stop();
Console.WriteLine("Writing to dictionary with a lock: {0}", sw.Elapsed); sw.Restart();
for (int i = ; i < ; i++)
{
concurrentDictionary[i] = Item;
}
sw.Stop();
Console.WriteLine("Writing to a concurrent dictionary: {0}", sw.Elapsed); sw.Restart();
for (int i = ; i < ; i++)
{
lock (dictionary)
{
CurrentItem = dictionary[i];
}
}
sw.Stop();
Console.WriteLine("Reading from dictionary with a lock: {0}", sw.Elapsed); sw.Restart();
for (int i = ; i < ; i++)
{
CurrentItem = concurrentDictionary[i];
}
sw.Stop();
Console.WriteLine("Reading from a concurrent dictionary: {0}", sw.Elapsed);
} const string Item = "Dictionary item";
public static string CurrentItem;
}
}
Program
5、ConcurrentBag类
namespace ConsoleApp1
{
class CrawlingTask
{
public string UrlToCrawl { get; set; } public string ProducerName { get; set; }
}
}
CrawlingTask
using System.Collections.Generic; namespace ConsoleApp1
{
static class Module
{
public static Dictionary<string, string[]> _contentEmulation = new Dictionary<string, string[]>(); public static void CreateLinks()
{
_contentEmulation["http://microsoft.com/"] = new[] { "http://microsoft.com/a.html", "http://microsoft.com/b.html" };
_contentEmulation["http://microsoft.com/a.html"] = new[] { "http://microsoft.com/c.html", "http://microsoft.com/d.html" };
_contentEmulation["http://microsoft.com/b.html"] = new[] { "http://microsoft.com/e.html" }; _contentEmulation["http://google.com/"] = new[] { "http://google.com/a.html", "http://google.com/b.html" };
_contentEmulation["http://google.com/a.html"] = new[] { "http://google.com/c.html", "http://google.com/d.html" };
_contentEmulation["http://google.com/b.html"] = new[] { "http://google.com/e.html", "http://google.com/f.html" };
_contentEmulation["http://google.com/c.html"] = new[] { "http://google.com/h.html", "http://google.com/i.html" }; _contentEmulation["http://facebook.com/"] = new[] { "http://facebook.com/a.html", "http://facebook.com/b.html" };
_contentEmulation["http://facebook.com/a.html"] = new[] { "http://facebook.com/c.html", "http://facebook.com/d.html" };
_contentEmulation["http://facebook.com/b.html"] = new[] { "http://facebook.com/e.html" }; _contentEmulation["http://twitter.com/"] = new[] { "http://twitter.com/a.html", "http://twitter.com/b.html" };
_contentEmulation["http://twitter.com/a.html"] = new[] { "http://twitter.com/c.html", "http://twitter.com/d.html" };
_contentEmulation["http://twitter.com/b.html"] = new[] { "http://twitter.com/e.html" };
_contentEmulation["http://twitter.com/c.html"] = new[] { "http://twitter.com/f.html", "http://twitter.com/g.html" };
_contentEmulation["http://twitter.com/d.html"] = new[] { "http://twitter.com/h.html" };
_contentEmulation["http://twitter.com/e.html"] = new[] { "http://twitter.com/i.html" };
}
}
}
Module
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks; namespace ConsoleApp1
{
class Program
{
static void Main(string[] args)
{
Module.CreateLinks();
Task t = RunProgram();
t.Wait();
} static async Task RunProgram()
{
var bag = new ConcurrentBag<CrawlingTask>(); string[] urls = new[] { "http://microsoft.com/", "http://google.com/", "http://facebook.com/", "http://twitter.com/" }; var crawlers = new Task[];
for (int i = ; i <= ; i++)
{
string crawlerName = "Crawler " + i.ToString();
bag.Add(new CrawlingTask { UrlToCrawl = urls[i - ], ProducerName = "root" });
crawlers[i - ] = Task.Run(() => Crawl(bag, crawlerName));
} await Task.WhenAll(crawlers);
} static async Task Crawl(ConcurrentBag<CrawlingTask> bag, string crawlerName)
{
CrawlingTask task;
//尝试从bag中取出对象
while (bag.TryTake(out task))
{
IEnumerable<string> urls = await GetLinksFromContent(task);
if (urls != null)
{
foreach (var url in urls)
{
var t = new CrawlingTask
{
UrlToCrawl = url,
ProducerName = crawlerName
};
//将子集插入到bag中
bag.Add(t);
}
}
Console.WriteLine("Indexing url {0} posted by {1} is completed by {2}!",
task.UrlToCrawl, task.ProducerName, crawlerName);
}
} static async Task<IEnumerable<string>> GetLinksFromContent(CrawlingTask task)
{
await GetRandomDelay(); if (Module._contentEmulation.ContainsKey(task.UrlToCrawl)) return Module._contentEmulation[task.UrlToCrawl]; return null;
} static Task GetRandomDelay()
{
int delay = new Random(DateTime.Now.Millisecond).Next(, );
return Task.Delay(delay);
} }
}
Program
6、BlockingCollection类
BlockingCollection类: 我们能够改变任务存储在阻塞集合中的方式。默认情况下它使用的是ConcurrentQueue容器,但是我们能够使用任何实现了IProducerConsumerCollection泛型接口的集合。
namespace ConsoleApp1
{
class CustomTask
{
public int Id { get; set; }
}
}
CustomTask
using System;
using System.Threading.Tasks; namespace ConsoleApp1
{
static class Module
{
public static Task GetRandomDelay()
{
int delay = new Random(DateTime.Now.Millisecond).Next(, );
return Task.Delay(delay);
}
}
}
Module
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks; namespace ConsoleApp1
{
class Program
{
static void Main(string[] args)
{
Console.WriteLine("Using a Queue inside of BlockingCollection");
Console.WriteLine();
Task t = RunProgram();
t.Wait(); //Console.WriteLine();
//Console.WriteLine("Using a Stack inside of BlockingCollection");
//Console.WriteLine();
//Task t = RunProgram(new ConcurrentStack<CustomTask>());
//t.Wait();
} static async Task RunProgram(IProducerConsumerCollection<CustomTask> collection = null)
{
var taskCollection = new BlockingCollection<CustomTask>();
if (collection != null)
taskCollection = new BlockingCollection<CustomTask>(collection);
//初始化collection中的数据
var taskSource = Task.Run(() => TaskProducer(taskCollection)); Task[] processors = new Task[];
for (int i = ; i <= ; i++)
{
string processorId = "Processor " + i;
processors[i - ] = Task.Run(
() => TaskProcessor(taskCollection, processorId));
} await taskSource; await Task.WhenAll(processors);
}
/// <summary>
/// 初始化collection中的数据
/// </summary>
/// <param name="collection"></param>
/// <returns></returns>
static async Task TaskProducer(BlockingCollection<CustomTask> collection)
{
for (int i = ; i <= ; i++)
{
await Task.Delay();
var workItem = new CustomTask { Id = i };
collection.Add(workItem);
Console.WriteLine("Task {0} has been posted", workItem.Id);
}
collection.CompleteAdding();
}
/// <summary>
/// 打印collection中的数据
/// </summary>
/// <param name="collection"></param>
/// <param name="name"></param>
/// <returns></returns>
static async Task TaskProcessor(
BlockingCollection<CustomTask> collection, string name)
{
await Module.GetRandomDelay();
foreach (CustomTask item in collection.GetConsumingEnumerable())
{
Console.WriteLine("Task {0} has been processed by {1}", item.Id, name);
await Module.GetRandomDelay();
}
}
}
}
Program
7、使用ThreadStatic特性
ThreadStatic特性是最简单的TLS使用,且只支持静态字段,只需要在字段上标记这个特性就可以了
using System;
using System.Threading; namespace ConsoleApp1
{
class Program
{
//TLS中的str变量
//可以看到,str静态字段在两个线程中都是独立存储的,互相不会被修改。
[ThreadStatic]
static string str = "hehe"; static void Main(string[] args)
{
//另一个线程只会修改自己TLS中的hehe
Thread th = new Thread(() => { str = "Mgen"; Display(); });
th.Start();
th.Join();
Display();
}
static void Display()
{
Console.WriteLine("{0} {1}", Thread.CurrentThread.ManagedThreadId, str);
} }
}
Program
8、使用命名的LocalDataStoreSlot类型
显然ThreadStatic特性只支持静态字段太受限制了。.NET线程类型中的LocalDataStoreSlot提供更好的TLS支持。我们先来看看命名的LocalDataStoreSlot类型,可以通过Thread.AllocateNamedDataSlot来分配一个命名的空间,通过Thread.FreeNamedDataSlot来销毁一个命名的空间。空间数据的获取和设置则通过Thread类型的GetData方法和SetData方法。
using System;
using System.Threading; namespace ConsoleApp1
{
class Program
{
static void Main(string[] args)
{
//创建Slot
LocalDataStoreSlot slot = Thread.AllocateNamedDataSlot("slot"); //设置TLS中的值
Thread.SetData(slot, "hehe"); //修改TLS的线程
Thread th = new Thread(() =>
{
Thread.SetData(slot, "Mgen");
Display();
}); th.Start();
th.Join();
Display(); //清除Slot
Thread.FreeNamedDataSlot("slot");
} //显示TLS中Slot值
static void Display()
{
LocalDataStoreSlot dataslot = Thread.GetNamedDataSlot("slot");
Console.WriteLine("{0} {1}", Thread.CurrentThread.ManagedThreadId, Thread.GetData(dataslot));
} }
}
Program
9、使用未命名的LocalDataStoreSlot类型
线程同样支持未命名的LocalDataStoreSlot,未命名的LocalDataStoreSlot不需要手动清除,分配则需要Thread.AllocateDataSlot方法。注意由于未命名的LocalDataStoreSlot没有名称,因此无法使用Thread.GetNamedDataSlot方法,只能在多个线程中引用同一个LocalDataStoreSlot才可以对TLS空间进行操作,将上面的命名的LocalDataStoreSlot代码改成未命名的LocalDataStoreSlot执行
using System;
using System.Threading; namespace ConsoleApp1
{
class Program
{
//静态LocalDataStoreSlot变量
static LocalDataStoreSlot slot; static void Main(string[] args)
{
//创建Slot
slot = Thread.AllocateDataSlot(); //设置TLS中的值
Thread.SetData(slot, "hehe"); //修改TLS的线程
Thread th = new Thread(() =>
{
Thread.SetData(slot, "Mgen");
Display(); }); th.Start();
th.Join();
Display();
} //显示TLS中Slot值
static void Display()
{
Console.WriteLine("{0} {1}", Thread.CurrentThread.ManagedThreadId, Thread.GetData(slot));
} }
}
Program
10、使用.NET 4.0的ThreadLocal<T>类型
.NET 4.0在线程方面加入了很多东西,其中就包括ThreadLocal<T>类型,他的出现更大的简化了TLS的操作。ThreadLocal<T>类型和Lazy<T>惊人相似,构造函数参数是Func<T>用来创建对象(当然也可以理解成对象的默认值),然后用Value属性来得到或者设置这个对象。
ThreadLocal的操作或多或少有点像上面的未命名的LocalDataStoreSlot,但ThreadLocal感觉更简洁更好理解。
using System;
using System.Threading; namespace ConsoleApp1
{
class Program
{
static ThreadLocal<string> local; static void Main(string[] args)
{
//创建ThreadLocal并提供默认值
local = new ThreadLocal<string>(() => "hehe"); //修改TLS的线程
Thread th = new Thread(() =>
{ local.Value = "Mgen";
Display();
}); th.Start();
th.Join();
Display();
} //显示TLS中数据值
static void Display()
{
Console.WriteLine("{0} {1}", Thread.CurrentThread.ManagedThreadId, local.Value);
} }
}
Program