Implementing multithreading in C# (code review)

Time: 2022-09-01 21:02:25

Greetings.

I'm trying to implement some multithreaded code in an application. The purpose of this code is to validate items that the database gives it. Validation can take quite a while (a few hundred ms to a few seconds), so this process needs to be forked off into its own thread for each item.

The database may give it 20 or 30 items a second in the beginning, but that begins to decline rapidly, eventually reaching about 65K items over 24 hours, at which point the application exits.

I'd appreciate it if anyone more knowledgeable could take a peek at my code and see if there are any obvious problems. No one I work with knows multithreading, so I'm really on my own on this one.

Here's the code. It's kinda long but should be pretty clear. Let me know if you have any feedback or advice. Thanks!

public class ItemValidationService
{
    /// <summary>
    /// The object to lock on in this class, for multithreading purposes.
    /// </summary>
    private static object locker = new object();

    /// <summary>Items that have been validated.</summary>
    private HashSet<int> validatedItems;

    /// <summary>Items that are currently being validated.</summary>
    private HashSet<int> validatingItems;

    /// <summary>Remove an item from the index if its links are bad.</summary>
    /// <param name="id">The ID of the item.</param>
    public void ValidateItem(int id)
    {
        lock (locker)
        {
            if
            (
                !this.validatedItems.Contains(id) &&
                !this.validatingItems.Contains(id)
            ){
                ThreadPool.QueueUserWorkItem(sender =>
                {
                    this.Validate(id);
                });
            }
        }

    } // method

    private void Validate(int itemId)
    {
        lock (locker)
        {
            this.validatingItems.Add(itemId);
        }

        // *********************************************
        // Time-consuming routine to validate an item...
        // *********************************************

        lock (locker)
        {
            this.validatingItems.Remove(itemId);
            this.validatedItems.Add(itemId);
        }

    } // method

} // class

7 answers

#1


4  

The thread pool is a convenient choice if you have lightweight, sporadic processing that isn't time-sensitive. However, I recall reading on MSDN that it's not appropriate for large-scale processing of this nature.

I used it for something quite similar to this and regret it. I took a worker-thread approach in subsequent apps and am much happier with the level of control I have.

My favorite pattern in the worker-thread model is to create a master thread that holds a queue of task items, then fork a bunch of workers that pop items off that queue to process. I use a blocking queue so that when there are no items to process, the workers just block until something is pushed onto the queue. In this model, the master thread produces work items from some source (db, etc.) and the worker threads consume them.
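
The producer/consumer pattern described above could be sketched roughly as follows. This is only an illustration, assuming .NET 4 or later so that BlockingCollection<T> is available; the class name ValidationWorkerPool and its members are hypothetical, and the validate delegate stands in for the real validation routine.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Hypothetical sketch: a fixed set of worker threads draining a blocking queue.
public sealed class ValidationWorkerPool
{
    private readonly BlockingCollection<int> queue = new BlockingCollection<int>();
    private readonly List<Thread> workers = new List<Thread>();

    public ValidationWorkerPool(int workerCount, Action<int> validate)
    {
        for (int i = 0; i < workerCount; i++)
        {
            var worker = new Thread(() =>
            {
                // Blocks while the queue is empty. The loop ends once
                // CompleteAdding() has been called and the queue is drained.
                foreach (int id in this.queue.GetConsumingEnumerable())
                {
                    validate(id);
                }
            });
            worker.IsBackground = true;
            worker.Start();
            this.workers.Add(worker);
        }
    }

    // Called by the producer (the "master" thread) for each item from the database.
    public void Enqueue(int id)
    {
        this.queue.Add(id);
    }

    // Signals that no more items will arrive, then waits for the workers to finish.
    public void Shutdown()
    {
        this.queue.CompleteAdding();
        foreach (Thread worker in this.workers)
        {
            worker.Join();
        }
    }
}
```

The producer calls Enqueue for each item and Shutdown once the source is exhausted. Note that an exception escaping validate would end that worker thread (and normally the process), so real code would want error handling inside the loop.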

#2


2  

I second the idea of using a blocking queue and worker threads. Here is a blocking queue implementation that I've used in the past with good results: http://www.codeproject.com/KB/recipes/boundedblockingqueue.aspx

What's involved in your validation logic? If it's mainly CPU-bound, then I would create no more than one worker thread per processor/core on the box. This will tell you the number of processors: Environment.ProcessorCount

If your validation involves I/O, such as file or database access, then you could use a few more threads than the number of processors.
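
As a rough illustration of that sizing rule, here is a small helper. The names WorkerSizing, SuggestWorkerCount and ioMultiplier are hypothetical, and the default multiplier of 2 is just a placeholder for "a few more threads"; the right value would have to be found by measuring.

```csharp
using System;

// Hypothetical helper for picking a worker-thread count.
public static class WorkerSizing
{
    // CPU-bound work: one worker per core.
    // I/O-bound work: cores times an assumed multiplier (tune by measurement).
    public static int SuggestWorkerCount(bool ioBound, int ioMultiplier = 2)
    {
        int cores = Environment.ProcessorCount;
        return ioBound ? cores * Math.Max(1, ioMultiplier) : cores;
    }
}
```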

#3


1  

Be careful: QueueUserWorkItem might fail.

#4


1  

There is a possible logic error in the code posted with the question, depending on where the item id in ValidateItem(int id) comes from. Why? Because although you correctly lock your validatingItems and validatedItems sets before queuing a work item, you do not add the item to validatingItems until the new thread spins up. That leaves a time gap in which another thread can call ValidateItem(id) with the same id and queue it a second time (unless this is running on a single main thread).

I would add the item to validatingItems just before queuing it, inside the lock.

Edit: also, QueueUserWorkItem() returns a bool, so you should use the return value to make sure the item was queued and THEN add it to validatingItems.
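
A minimal sketch of the first suggestion: the id is claimed in validatingItems inside the same lock that performs the check, so a second call with the same id is rejected immediately. The class is renamed ClaimFirstValidationService here to mark it as an illustration, and the bool return value, the instance-level lock and the field initializers are assumptions that are not in the question's code.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical variant of the question's class: check and claim in one lock.
public class ClaimFirstValidationService
{
    private readonly object locker = new object();
    private readonly HashSet<int> validatedItems = new HashSet<int>();
    private readonly HashSet<int> validatingItems = new HashSet<int>();

    // Returns true if this call claimed and queued the item, false if it was skipped.
    public bool ValidateItem(int id)
    {
        lock (this.locker)
        {
            if (this.validatedItems.Contains(id))
            {
                return false;
            }

            // HashSet<int>.Add returns false when the id is already present,
            // so the check and the claim happen atomically under the lock.
            if (!this.validatingItems.Add(id))
            {
                return false;
            }
        }

        ThreadPool.QueueUserWorkItem(_ => this.Validate(id));
        return true;
    }

    private void Validate(int itemId)
    {
        // The time-consuming validation routine would run here.

        lock (this.locker)
        {
            this.validatingItems.Remove(itemId);
            this.validatedItems.Add(itemId);
        }
    }
}
```

Whichever state the item is in when the second call arrives (still validating or already validated), the second call returns false.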

#5


1  

ThreadPool may not be optimal for jamming so much at once into it. You may want to research the upper limits of its capabilities and/or roll your own.

Also, there is a race condition that exists in your code, if you expect no duplicate validations. The call to

this.validatingItems.Add(itemId);

needs to happen in the main thread (ValidateItem), not in the thread pool thread (Validate method). This call should occur a line before the queueing of the work item to the pool.

A worse bug is not checking the return value of QueueUserWorkItem. Queuing can fail, and why it doesn't throw an exception is a mystery to us all. If it returns false, you need to remove the item that was added to validatingItems and handle the error (probably by throwing an exception).
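
One way to sketch that rollback is shown below. The class name RollbackValidationService, the injected validate delegate and the IsValidated method are hypothetical additions for illustration. As far as I know, current .NET documentation says QueueUserWorkItem throws (for example NotSupportedException) rather than returning false when it cannot queue, so this sketch rolls back in both cases.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical sketch: claim the id first, undo the claim if queuing fails.
public class RollbackValidationService
{
    private readonly object locker = new object();
    private readonly HashSet<int> validatedItems = new HashSet<int>();
    private readonly HashSet<int> validatingItems = new HashSet<int>();
    private readonly Action<int> validate; // stands in for the real validation routine

    public RollbackValidationService(Action<int> validate)
    {
        this.validate = validate;
    }

    public void ValidateItem(int id)
    {
        lock (this.locker)
        {
            if (this.validatedItems.Contains(id) || !this.validatingItems.Add(id))
            {
                return; // already validated or already in flight
            }
        }

        bool queued = false;
        try
        {
            queued = ThreadPool.QueueUserWorkItem(_ => this.Run(id));
        }
        finally
        {
            // Runs for a false return value and for an exception alike.
            if (!queued)
            {
                lock (this.locker)
                {
                    this.validatingItems.Remove(id);
                }
            }
        }

        if (!queued)
        {
            throw new InvalidOperationException("Could not queue item " + id);
        }
    }

    public bool IsValidated(int id)
    {
        lock (this.locker)
        {
            return this.validatedItems.Contains(id);
        }
    }

    private void Run(int itemId)
    {
        this.validate(itemId);

        lock (this.locker)
        {
            this.validatingItems.Remove(itemId);
            this.validatedItems.Add(itemId);
        }
    }
}
```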

#6


0  

I would be concerned about performance here. You indicated that the database may deliver 20-30 items per second and that an item can take up to a few seconds to validate. That could mean quite a large number of threads: at 20-30 items per second and roughly 3 seconds per item, the worst case is 60-90 threads running at once! I think you need to reconsider the design here. Michael mentioned a nice pattern. Using a queue really helps keep things under control and organized. A semaphore could also be used to cap the number of threads created: you set a maximum number of threads, but under lighter loads you would not have to create the maximum if fewer get the job done. In other words, your own pool size could be dynamic, with a cap.
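
The semaphore idea might look something like this sketch, which assumes .NET 4 or later for SemaphoreSlim. ThrottledDispatcher, Dispatch and WaitForAll are hypothetical names, and WaitForAll assumes a single producer that has stopped dispatching before it is called.

```csharp
using System;
using System.Threading;

// Hypothetical helper: caps how many work items run on the thread pool at once.
public sealed class ThrottledDispatcher
{
    private readonly int maxConcurrent;
    private readonly SemaphoreSlim slots;

    public ThrottledDispatcher(int maxConcurrent)
    {
        this.maxConcurrent = maxConcurrent;
        this.slots = new SemaphoreSlim(maxConcurrent, maxConcurrent);
    }

    // Blocks the caller while maxConcurrent items are already in flight.
    public void Dispatch(Action work)
    {
        this.slots.Wait();
        bool queued = false;
        try
        {
            queued = ThreadPool.QueueUserWorkItem(_ =>
            {
                try { work(); }
                finally { this.slots.Release(); } // free the slot when done
            });
        }
        finally
        {
            // Give the slot back if the item never reached the pool.
            if (!queued) this.slots.Release();
        }
    }

    // Waits for in-flight items by taking every slot, then hands them all back.
    public void WaitForAll()
    {
        for (int i = 0; i < this.maxConcurrent; i++)
        {
            this.slots.Wait();
        }
        this.slots.Release(this.maxConcurrent);
    }
}
```

Under light load only as many pool threads are busy as there are items; under heavy load the producer is held back at the cap instead of piling up 60-90 concurrent validations.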

When using the thread pool, I also find it harder to monitor the threads while they perform the work. So, unless it's fire-and-forget, I favor more controlled execution. I know you mentioned that your app exits after all 65K items are completed. How are you monitoring your threads to determine whether they have completed their work, i.e. that all queued workers are done? Are you monitoring the status of all items in the HashSets? I think that by queuing your items and having your own worker threads consume from that queue, you can gain more control, albeit at the cost of more overhead for signaling between threads to indicate when all items have been queued so that the workers can exit.
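
If the thread pool is kept, one possible way to know when all queued work is done is a CountdownEvent (available from .NET 4). The sketch below is an assumption about how that could be wired up, not something from the question; CompletionTracker, Queue and FinishAndWait are hypothetical names.

```csharp
using System;
using System.Threading;

// Hypothetical sketch: counts outstanding work items so the app knows when to exit.
public sealed class CompletionTracker : IDisposable
{
    // Starts at 1: the producer holds one count until it stops queuing.
    private readonly CountdownEvent pending = new CountdownEvent(1);

    public void Queue(Action work)
    {
        this.pending.AddCount(); // one more outstanding item
        bool queued = false;
        try
        {
            queued = ThreadPool.QueueUserWorkItem(_ =>
            {
                try { work(); }
                finally { this.pending.Signal(); } // item finished
            });
        }
        finally
        {
            // Undo the count if the item never reached the pool.
            if (!queued) this.pending.Signal();
        }
    }

    // Call once after the last item has been queued; returns when all items are done.
    public void FinishAndWait()
    {
        this.pending.Signal(); // release the producer's own count
        this.pending.Wait();
    }

    public void Dispose()
    {
        this.pending.Dispose();
    }
}
```

The initial count of 1 keeps the event from reaching zero while the producer is still queuing, even if every item queued so far has already finished.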

#7


0  

You could also try using the CCR - Concurrency and Coordination Runtime. It's buried inside Microsoft Robotics Studio, but provides an excellent API for doing this sort of thing.

You'd just need to create a "Port" (essentially a queue), hook up a receiver (method that gets called when something is posted to it), and then post work items to it. The CCR handles the queue and the worker thread to run it on.

Here's a video on Channel9 about the CCR.

It's very high-performance and is even being used for non-robotics stuff (Myspace.com uses it behind the scenes for their content-delivery network).
