C#和ConcurrentDictionary中的多线程:以下用法是否正确?

时间:2022-09-01 21:02:07

I have such a scenario at hand (using C#): I need to use a parallel "foreach" on a list of objects: Each object in this list is working like a data source, which is generating series of binary vector patterns (like "0010100110"). As each vector pattern is generated, I need to update the occurrence count of the current vector pattern on a shared ConcurrentDictionary. This ConcurrentDictionary acts like a histogram of specific binary patterns among ALL data sources. In a pseudo-code it should work like this:

我有这样的场景(使用C#):我需要在对象列表中使用并行“foreach”:此列表中的每个对象都像数据源一样工作,它生成一系列二进制矢量模式(如“ 0010100110" )。在生成每个矢量模式时,我需要更新共享ConcurrentDictionary上当前矢量模式的出现次数。此ConcurrentDictionary充当所有数据源中特定二进制模式的直方图。在伪代码中,它应该像这样工作:

ConcurrentDictionary<BinaryPattern,int> concDict = new ConcurrentDictionary<BinaryPattern,int>();
Parallel.Foreach(var dataSource in listOfDataSources)
{
     for(int i=0;i<dataSource.OperationCount;i++)
     {
          BinaryPattern pattern = dataSource.GeneratePattern(i);

          //Add the pattern to concDict if it does not exist, 
          //or increment the current value of it, in a thread-safe fashion among all
          //dataSource objects in parallel steps.
     }
}

I have read about TryAdd() and TryUpdate() methods of ConcurrentDictionary class in the documentation but I am not sure that I have clearly understood them. TryAdd() obtains an access to the Dictionary for the current thread and looks for the existence of a specific key, a binary pattern in this case, and then if it does not exist, it creates its entry, sets its value to 1 as it is the first occurence of this pattern. TryUpdate() gains acces to the dictionary for the current thread, looks whether the entry with the specified key has its current value equal to a "known" value, if it is so, updates it. By the way, TryGetValue() checks whether a key exits in the dictionary and returns the current value, if it does.

我在文档中已经阅读了ConcurrentDictionary类的TryAdd()和TryUpdate()方法,但我不确定我是否清楚地理解它们。 TryAdd()获取对当前线程的Dictionary的访问权,并查找特定键的存在,在这种情况下是二进制模式,然后如果它不存在,则创建其条目,将其值设置为1,因为它是这种模式的第一次出现。 TryUpdate()获得对当前线程的字典的访问,查看具有指定键的条目是否具有等于“已知”值的当前值,如果是,则更新它。顺便说一句,TryGetValue()检查一个键是否退出字典并返回当前值(如果有)。

Now I think of the following usage and wonder if it is a correct implementation of a thread-safe population of the ConcurrentDictionary:

现在我想到以下用法并想知道它是否是ConcurrentDictionary的线程安全填充的正确实现:

ConcurrentDictionary<BinaryPattern,int> concDict = new ConcurrentDictionary<BinaryPattern,int>();
Parallel.Foreach(var dataSource in listOfDataSources)
{
     for(int i=0;i<dataSource.OperationCount;i++)
     {
          BinaryPattern pattern = dataSource.GeneratePattern(i);

          while(true)
          {
             //Look whether the pattern is in dictionary currently,
             //if it is, get its current value.
             int currOccurenceOfPattern;
             bool isPatternInDict = concDict.TryGetValue(pattern,out currOccurenceOfPattern);

             //Not in dict, try to add.
             if(!isPatternInDict)
             {
                  //If the pattern is not added in the meanwhile, add it to the dict.
                  //If added, then exit from the while loop.
                  //If not added, then skip this step and try updating again.
                  if(TryAdd(pattern,1))
                        break;
             }
             //The pattern is already in the dictionary. 
             //Try to increment its current occurrence value instead.
             else
             {
                  //If the pattern's occurence value is not incremented by another thread
                  //in the meanwhile, update it. If this succeeds, then exit from the loop.
                  //If TryUpdate fails, then we see that the value has been updated
                  //by another thread in the meanwhile, we need to try our chances in the next
                  //step of the while loop.                   
                  int newValue = currOccurenceOfPattern + 1;
                  if(TryUpdate(pattern,newValue,currOccurenceOfPattern))
                       break;
             }

          }
     }
}

I tried to firmly summarize my logic in the above code snippet in the comments. From what I gather from the documentation, a thread-safe updating scheme can be coded in this fashion, given the atomic "TryXXX()" methods of the ConcurrentDictionary. Is this a correct approach to the problem? How can this be improved or corrected if it is not?

我试着在评论中的上面的代码片段中总结我的逻辑。根据我从文档中收集的内容,可以使用ConcurrentDictionary的原子“TryXXX()”方法以这种方式编码线程安全的更新方案。这是解决问题的正确方法吗?如果不是,如何改进或纠正?

3 个解决方案

#1


3  

You can use AddOrUpdate method that encapsulates either add or update logic as single thread-safe operation:

您可以使用AddOrUpdate方法封装添加或更新逻辑作为单线程安全操作:

ConcurrentDictionary<BinaryPattern,int> concDict = new ConcurrentDictionary<BinaryPattern,int>();
Parallel.Foreach(listOfDataSources, dataSource =>
{
    for(int i=0;i<dataSource.OperationCount;i++)
    {
        BinaryPattern pattern = dataSource.GeneratePattern(i);

        concDict.AddOrUpdate(
            pattern,
            _ => 1, // if pattern doesn't exist - add with value "1"
            (_, previous) => previous + 1 // if pattern exists - increment existing value
        );
    }
});

Please note that AddOrUpdateoperation is not atomic, not sure if it's your requirement but if you need to know the exact iteration when a value was added to the dictionary you can keep your code (or extract it to kind of extension method)

请注意,AddOrUpdateoperation不是原子的,不确定它是否是您的要求,但如果您需要知道在将值添加到字典时的确切迭代,您可以保留您的代码(或将其提取为一种扩展方法)

You might also want to go through this article

您可能还想阅读本文

#2


0  

I don't know what BinaryPattern is here, but I would probably address this in a different way. Instead of copying value types around, inserting things into dictionaries, etc.. like this, I would probably be more inclined if performance was critical to simply place your instance counter in BinaryPattern. Then use InterlockedIncrement() to increment the counter whenever the pattern was found.

我不知道BinaryPattern在这里是什么,但我可能会以不同的方式解决这个问题。而不是复制值类型,插入字典等等。像这样,如果性能对于简单地将实例计数器放在BinaryPattern中,我可能更倾向于。然后使用InterlockedIncrement()在找到模式时递增计数器。

Unless there is a reason to separate the count from the pattern, in which case the ConccurentDictionary is probably a good choice.

除非有理由将计数与模式分开,否则ConccurentDictionary可能是一个不错的选择。

#3


0  

First, the question is a little confusing because it's not clear what you mean by Parallel.Foreach. I would naively expect this to be System.Threading.Tasks.Parallel.ForEach(), but that's not usable with the syntax you show here.

首先,这个问题有点令人困惑,因为并不清楚Parallel.Foreach的含义。我天真地期望这是System.Threading.Tasks.Parallel.ForEach(),但是这不适用于你在这里显示的语法。

That said, assuming you actually mean something like Parallel.ForEach(listOfDataSources, dataSource => { ... } )

也就是说,假设你实际上是指Parallel.ForEach(listOfDataSources,dataSource => {...})......

Personally, unless you have some specific need to show intermediate results, I would not bother with ConcurrentDictionary here. Instead, I would let each concurrent operation generate its own dictionary of counts, and then merge the results at the end. Something like this:

就个人而言,除非你有一些特定的需要显示中间结果,否则我不打扰这里的ConcurrentDictionary。相反,我会让每个并发操作生成自己的计数字典,然后在最后合并结果。像这样的东西:

var results = listOfDataSources.Select(dataSource =>
    Tuple.Create(dataSource, new Dictionary<BinaryPattern, int>())).ToList();

Parallel.ForEach(results, result =>
{
    for(int i = 0; i < result.Item1.OperationCount; i++)
    {
        BinaryPattern pattern = result.Item1.GeneratePattern(i);
        int count;

        result.Item2.TryGetValue(pattern, out count);
        result.Item2[pattern] = count + 1;
    }
});

var finalResult = new Dictionary<BinaryPattern, int>();

foreach (result in results)
{
    foreach (var kvp in result.Item2)
    {
        int count;

        finalResult.TryGetValue(kvp.Key, out count);
        finalResult[kvp.Key] = count + kvp.Value;
    }
}

This approach would avoid contention between the worker threads (at least where the counts are concerned), potentially improving efficiency. The final aggregation operation should be very fast and can easily be handled in the single, original thread.

这种方法可以避免工作线程之间的争用(至少在涉及计数的地方),可能会提高效率。最终的聚合操作应该非常快,并且可以在单个原始线程中轻松处理。

#1


3  

You can use AddOrUpdate method that encapsulates either add or update logic as single thread-safe operation:

您可以使用AddOrUpdate方法封装添加或更新逻辑作为单线程安全操作:

ConcurrentDictionary<BinaryPattern,int> concDict = new ConcurrentDictionary<BinaryPattern,int>();
Parallel.Foreach(listOfDataSources, dataSource =>
{
    for(int i=0;i<dataSource.OperationCount;i++)
    {
        BinaryPattern pattern = dataSource.GeneratePattern(i);

        concDict.AddOrUpdate(
            pattern,
            _ => 1, // if pattern doesn't exist - add with value "1"
            (_, previous) => previous + 1 // if pattern exists - increment existing value
        );
    }
});

Please note that AddOrUpdateoperation is not atomic, not sure if it's your requirement but if you need to know the exact iteration when a value was added to the dictionary you can keep your code (or extract it to kind of extension method)

请注意,AddOrUpdateoperation不是原子的,不确定它是否是您的要求,但如果您需要知道在将值添加到字典时的确切迭代,您可以保留您的代码(或将其提取为一种扩展方法)

You might also want to go through this article

您可能还想阅读本文

#2


0  

I don't know what BinaryPattern is here, but I would probably address this in a different way. Instead of copying value types around, inserting things into dictionaries, etc.. like this, I would probably be more inclined if performance was critical to simply place your instance counter in BinaryPattern. Then use InterlockedIncrement() to increment the counter whenever the pattern was found.

我不知道BinaryPattern在这里是什么,但我可能会以不同的方式解决这个问题。而不是复制值类型,插入字典等等。像这样,如果性能对于简单地将实例计数器放在BinaryPattern中,我可能更倾向于。然后使用InterlockedIncrement()在找到模式时递增计数器。

Unless there is a reason to separate the count from the pattern, in which case the ConccurentDictionary is probably a good choice.

除非有理由将计数与模式分开,否则ConccurentDictionary可能是一个不错的选择。

#3


0  

First, the question is a little confusing because it's not clear what you mean by Parallel.Foreach. I would naively expect this to be System.Threading.Tasks.Parallel.ForEach(), but that's not usable with the syntax you show here.

首先,这个问题有点令人困惑,因为并不清楚Parallel.Foreach的含义。我天真地期望这是System.Threading.Tasks.Parallel.ForEach(),但是这不适用于你在这里显示的语法。

That said, assuming you actually mean something like Parallel.ForEach(listOfDataSources, dataSource => { ... } )

也就是说,假设你实际上是指Parallel.ForEach(listOfDataSources,dataSource => {...})......

Personally, unless you have some specific need to show intermediate results, I would not bother with ConcurrentDictionary here. Instead, I would let each concurrent operation generate its own dictionary of counts, and then merge the results at the end. Something like this:

就个人而言,除非你有一些特定的需要显示中间结果,否则我不打扰这里的ConcurrentDictionary。相反,我会让每个并发操作生成自己的计数字典,然后在最后合并结果。像这样的东西:

var results = listOfDataSources.Select(dataSource =>
    Tuple.Create(dataSource, new Dictionary<BinaryPattern, int>())).ToList();

Parallel.ForEach(results, result =>
{
    for(int i = 0; i < result.Item1.OperationCount; i++)
    {
        BinaryPattern pattern = result.Item1.GeneratePattern(i);
        int count;

        result.Item2.TryGetValue(pattern, out count);
        result.Item2[pattern] = count + 1;
    }
});

var finalResult = new Dictionary<BinaryPattern, int>();

foreach (result in results)
{
    foreach (var kvp in result.Item2)
    {
        int count;

        finalResult.TryGetValue(kvp.Key, out count);
        finalResult[kvp.Key] = count + kvp.Value;
    }
}

This approach would avoid contention between the worker threads (at least where the counts are concerned), potentially improving efficiency. The final aggregation operation should be very fast and can easily be handled in the single, original thread.

这种方法可以避免工作线程之间的争用(至少在涉及计数的地方),可能会提高效率。最终的聚合操作应该非常快,并且可以在单个原始线程中轻松处理。