如何在Python中循环多线程操作

时间:2021-07-17 23:15:56

Say I have a very large list and I'm performing an operation like so:

假设我有一个非常大的列表,我正在执行这样的操作:

for item in items:
    try:
        api.my_operation(item)
    except:
        print 'error with item'

My issue is two fold:

我的问题有两个方面:

  • There are a lot of items
  • 有很多项目
  • api.my_operation takes forever to return
  • api.my_operation需要永远回归

I'd like to use multi-threading to spin up a bunch of api.my_operations at once so I can process maybe 5 or 10 or even 100 items at once.

我想使用多线程一次启动一堆api.my_operations,这样我就可以同时处理5个或10个甚至100个项目。

If my_operation() returns an exception (because maybe I already processed that item) - that's OK. It won't break anything. The loop can continue to the next item.

如果my_operation()返回一个异常(因为我可能已经处理过该项) - 那没关系。它不会破坏任何东西。循环可以继续到下一个项目。

Note: this is for Python 2.7.3

注意:这适用于Python 2.7.3

3 个解决方案

#1


72  

First, in Python, if your code is CPU-bound, multithreading won't help, because only one thread can hold the Global Interpreter Lock, and therefore run Python code, at a time. So, you need to use processes, not threads.

首先,在Python中,如果您的代码受CPU限制,多线程将无济于事,因为只有一个线程可以保存全局解释器锁,因此一次运行Python代码。所以,你需要使用进程,而不是线程。

This is not true if your operation "takes forever to return" because it's IO-bound—that is, waiting on the network or disk copies or the like. I'll come back to that later.

如果您的操作“需要永远返回”,这是不正确的,因为它是IO绑定的 - 即等待网络或磁盘副本等。我稍后再说。


Next, the way to process 5 or 10 or 100 items at once is to create a pool of 5 or 10 or 100 workers, and put the items into a queue that the workers service. Fortunately, the stdlib multiprocessing and concurrent.futures libraries both wraps up most of the details for you.

接下来,一次处理5个或10个或100个项目的方法是创建一个5或10或100个工作池,并将这些项目放入工作人员服务的队列中。幸运的是,stdlib多处理和concurrent.futures库都包含了大部分细节。

The former is more powerful and flexible for traditional programming; the latter is simpler if you need to compose future-waiting; for trivial cases, it really doesn't matter which you choose. (In this case, the most obvious implementation with each takes 3 lines with futures, 4 lines with multiprocessing.)

前者对传统节目更强大,更灵活;如果你需要组成未来等待,后者会更简单;对于琐碎的案例,你选择哪个并不重要。 (在这种情况下,每个最明显的实现需要3行与期货,4行与多处理。)

If you're using 2.6-2.7 or 3.0-3.1, futures isn't built in, but you can install it from PyPI (pip install futures).

如果您使用的是2.6-2.7或3.0-3.1,则不会内置期货,但您可以从PyPI(pip install期货)安装期货。


Finally, it's usually a lot simpler to parallelize things if you can turn the entire loop iteration into a function call (something you could, e.g., pass to map), so let's do that first:

最后,如果您可以将整个循环迭代转换为函数调用(例如,传递给映射),那么并行化事情通常要简单得多,所以让我们先做:

def try_my_operation(item):
    try:
        api.my_operation(item)
    except:
        print('error with item')

Putting it all together:

把它们放在一起:

executor = concurrent.futures.ProcessPoolExecutor(10)
futures = [executor.submit(try_my_operation, item) for item in items]
concurrent.futures.wait(futures)

If you have lots of relatively small jobs, the overhead of multiprocessing might swamp the gains. The way to solve that is to batch up the work into larger jobs. For example (using grouper from the itertools recipes, which you can copy and paste into your code, or get from the more-itertools project on PyPI):

如果你有很多相对较小的工作,多处理的开销可能会淹没收益。解决这个问题的方法是将工作批量化为更大的工作。例如(使用itertools配方中的分组器,您可以将其复制并粘贴到代码中,或者从PyPI上的more-itertools项目获取):

def try_multiple_operations(items):
    for item in items:
        try:
            api.my_operation(item)
        except:
            print('error with item')

executor = concurrent.futures.ProcessPoolExecutor(10)
futures = [executor.submit(try_multiple_operations, group) 
           for group in grouper(5, items)]
concurrent.futures.wait(futures)

Finally, what if your code is IO bound? Then threads are just as good as processes, and with less overhead (and fewer limitations, but those limitations usually won't affect you in cases like this). Sometimes that "less overhead" is enough to mean you don't need batching with threads, but you do with processes, which is a nice win.

最后,如果您的代码是IO绑定怎么办?然后线程和进程一样好,并且开销更少(限制更少,但在这种情况下,这些限制通常不会影响您)。有时“减少开销”足以表示您不需要使用线程进行批处理,但是您需要使用进程,这是一个不错的胜利。

So, how do you use threads instead of processes? Just change ProcessPoolExecutor to ThreadPoolExecutor.

那么,你如何使用线程而不是进程呢?只需将ProcessPoolExecutor更改为ThreadPoolExecutor即可。

If you're not sure whether your code is CPU-bound or IO-bound, just try it both ways.

如果您不确定您的代码是受CPU限制还是IO绑定,请尝试两种方式。


Can I do this for multiple functions in my python script? For example, if I had another for loop elsewhere in the code that I wanted to parallelize. Is it possible to do two multi threaded functions in the same script?

我可以在我的python脚本中为多个函数执行此操作吗?例如,如果我想要并行化的代码中的其他地方有另一个for循环。是否可以在同一个脚本中执行两个多线程函数?

Yes. In fact, there are two different ways to do it.

是。实际上,有两种不同的方法可以做到这一点。

First, you can share the same (thread or process) executor and use it from multiple places with no problem. The whole point of tasks and futures is that they're self-contained; you don't care where they run, just that you queue them up and eventually get the answer back.

首先,您可以共享相同的(线程或进程)执行程序,并在多个位置使用它,没有任何问题。任务和未来的全部意义在于它们是独立的;你不关心他们在哪里跑,只是你把他们排队并最终得到答案。

Alternatively, you can have two executors in the same program with no problem. This has a performance cost—if you're using both executors at the same time, you'll end up trying to run (for example) 16 busy threads on 8 cores, which means there's going to be some context switching. But sometimes it's worth doing because, say, the two executors are rarely busy at the same time, and it makes your code a lot simpler. Or maybe one executor is running very large tasks that can take a while to complete, and the other is running very small tasks that need to complete as quickly as possible, because responsiveness is more important than throughput for part of your program.

或者,您可以在同一程序中有两个执行程序,没有问题。这有一个性能成本 - 如果你同时使用两个执行程序,你最终会尝试在8个内核上运行(例如)16个忙线程,这意味着会有一些上下文切换。但有时它值得做,因为比方说,两个执行器很少同时忙,它使你的代码更简单。或者,一个执行程序正在运行可能需要一段时间才能完成的非常大的任务,另一个执行程序正在运行需要尽快完成的非常小的任务,因为响应性比部分程序的吞吐量更重要。

If you don't know which is appropriate for your program, usually it's the first.

如果你不知道哪个适合你的程序,通常它是第一个。

#2


19  

Edit 2018-02-06: revision based on this comment

编辑2018-02-06:基于此评论的修订

Edit: forgot to mention that this works on Python 2.7.x

编辑:忘了提到这适用于Python 2.7.x.

There's multiprocesing.pool, and the following sample illustrates how to use one of them:

有multiprocesing.pool,以下示例说明了如何使用其中一个:

from multiprocessing.pool import ThreadPool as Pool
# from multiprocessing import Pool

pool_size = 5  # your "parallelness"

# define worker function before a Pool is instantiated
def worker(item):
    try:
        api.my_operation(item)
    except:
        print('error with item')

pool = Pool(pool_size)

for item in items:
    pool.apply_async(worker, (item,))

pool.close()
pool.join()

Now if you indeed identify that your process is CPU bound as @abarnert mentioned, change ThreadPool to the process pool implementation (commented under ThreadPool import). You can find more details here: http://docs.python.org/2/library/multiprocessing.html#using-a-pool-of-workers

现在,如果确实确定您的进程受到@abarnert提到的CPU绑定,请将ThreadPool更改为进程池实现(在ThreadPool导入下注释)。您可以在此处找到更多详细信息:http://docs.python.org/2/library/multiprocessing.html#using-a-pool-of-workers

#3


7  

You can split the processing into a specified number of threads using an approach like this:

您可以使用以下方法将处理拆分为指定数量的线程:

import threading                                                                

def process(items, start, end):                                                 
    for item in items[start:end]:                                               
        try:                                                                    
            api.my_operation(item)                                              
        except Exception:                                                       
            print('error with item')                                            


def split_processing(items, num_splits=4):                                      
    split_size = len(items) // num_splits                                       
    threads = []                                                                
    for i in range(num_splits):                                                 
        # determine the indices of the list this thread will handle             
        start = i * split_size                                                  
        # special case on the last chunk to account for uneven splits           
        end = None if i+1 == num_splits else (i+1) * split_size                 
        # create the thread                                                     
        threads.append(                                                         
            threading.Thread(target=process, args=(items, start, end)))         
        threads[-1].start() # start the thread we just created                  

    # wait for all threads to finish                                            
    for t in threads:                                                           
        t.join()                                                                



split_processing(items)

#1


72  

First, in Python, if your code is CPU-bound, multithreading won't help, because only one thread can hold the Global Interpreter Lock, and therefore run Python code, at a time. So, you need to use processes, not threads.

首先,在Python中,如果您的代码受CPU限制,多线程将无济于事,因为只有一个线程可以保存全局解释器锁,因此一次运行Python代码。所以,你需要使用进程,而不是线程。

This is not true if your operation "takes forever to return" because it's IO-bound—that is, waiting on the network or disk copies or the like. I'll come back to that later.

如果您的操作“需要永远返回”,这是不正确的,因为它是IO绑定的 - 即等待网络或磁盘副本等。我稍后再说。


Next, the way to process 5 or 10 or 100 items at once is to create a pool of 5 or 10 or 100 workers, and put the items into a queue that the workers service. Fortunately, the stdlib multiprocessing and concurrent.futures libraries both wraps up most of the details for you.

接下来,一次处理5个或10个或100个项目的方法是创建一个5或10或100个工作池,并将这些项目放入工作人员服务的队列中。幸运的是,stdlib多处理和concurrent.futures库都包含了大部分细节。

The former is more powerful and flexible for traditional programming; the latter is simpler if you need to compose future-waiting; for trivial cases, it really doesn't matter which you choose. (In this case, the most obvious implementation with each takes 3 lines with futures, 4 lines with multiprocessing.)

前者对传统节目更强大,更灵活;如果你需要组成未来等待,后者会更简单;对于琐碎的案例,你选择哪个并不重要。 (在这种情况下,每个最明显的实现需要3行与期货,4行与多处理。)

If you're using 2.6-2.7 or 3.0-3.1, futures isn't built in, but you can install it from PyPI (pip install futures).

如果您使用的是2.6-2.7或3.0-3.1,则不会内置期货,但您可以从PyPI(pip install期货)安装期货。


Finally, it's usually a lot simpler to parallelize things if you can turn the entire loop iteration into a function call (something you could, e.g., pass to map), so let's do that first:

最后,如果您可以将整个循环迭代转换为函数调用(例如,传递给映射),那么并行化事情通常要简单得多,所以让我们先做:

def try_my_operation(item):
    try:
        api.my_operation(item)
    except:
        print('error with item')

Putting it all together:

把它们放在一起:

executor = concurrent.futures.ProcessPoolExecutor(10)
futures = [executor.submit(try_my_operation, item) for item in items]
concurrent.futures.wait(futures)

If you have lots of relatively small jobs, the overhead of multiprocessing might swamp the gains. The way to solve that is to batch up the work into larger jobs. For example (using grouper from the itertools recipes, which you can copy and paste into your code, or get from the more-itertools project on PyPI):

如果你有很多相对较小的工作,多处理的开销可能会淹没收益。解决这个问题的方法是将工作批量化为更大的工作。例如(使用itertools配方中的分组器,您可以将其复制并粘贴到代码中,或者从PyPI上的more-itertools项目获取):

def try_multiple_operations(items):
    for item in items:
        try:
            api.my_operation(item)
        except:
            print('error with item')

executor = concurrent.futures.ProcessPoolExecutor(10)
futures = [executor.submit(try_multiple_operations, group) 
           for group in grouper(5, items)]
concurrent.futures.wait(futures)

Finally, what if your code is IO bound? Then threads are just as good as processes, and with less overhead (and fewer limitations, but those limitations usually won't affect you in cases like this). Sometimes that "less overhead" is enough to mean you don't need batching with threads, but you do with processes, which is a nice win.

最后,如果您的代码是IO绑定怎么办?然后线程和进程一样好,并且开销更少(限制更少,但在这种情况下,这些限制通常不会影响您)。有时“减少开销”足以表示您不需要使用线程进行批处理,但是您需要使用进程,这是一个不错的胜利。

So, how do you use threads instead of processes? Just change ProcessPoolExecutor to ThreadPoolExecutor.

那么,你如何使用线程而不是进程呢?只需将ProcessPoolExecutor更改为ThreadPoolExecutor即可。

If you're not sure whether your code is CPU-bound or IO-bound, just try it both ways.

如果您不确定您的代码是受CPU限制还是IO绑定,请尝试两种方式。


Can I do this for multiple functions in my python script? For example, if I had another for loop elsewhere in the code that I wanted to parallelize. Is it possible to do two multi threaded functions in the same script?

我可以在我的python脚本中为多个函数执行此操作吗?例如,如果我想要并行化的代码中的其他地方有另一个for循环。是否可以在同一个脚本中执行两个多线程函数?

Yes. In fact, there are two different ways to do it.

是。实际上,有两种不同的方法可以做到这一点。

First, you can share the same (thread or process) executor and use it from multiple places with no problem. The whole point of tasks and futures is that they're self-contained; you don't care where they run, just that you queue them up and eventually get the answer back.

首先,您可以共享相同的(线程或进程)执行程序,并在多个位置使用它,没有任何问题。任务和未来的全部意义在于它们是独立的;你不关心他们在哪里跑,只是你把他们排队并最终得到答案。

Alternatively, you can have two executors in the same program with no problem. This has a performance cost—if you're using both executors at the same time, you'll end up trying to run (for example) 16 busy threads on 8 cores, which means there's going to be some context switching. But sometimes it's worth doing because, say, the two executors are rarely busy at the same time, and it makes your code a lot simpler. Or maybe one executor is running very large tasks that can take a while to complete, and the other is running very small tasks that need to complete as quickly as possible, because responsiveness is more important than throughput for part of your program.

或者,您可以在同一程序中有两个执行程序,没有问题。这有一个性能成本 - 如果你同时使用两个执行程序,你最终会尝试在8个内核上运行(例如)16个忙线程,这意味着会有一些上下文切换。但有时它值得做,因为比方说,两个执行器很少同时忙,它使你的代码更简单。或者,一个执行程序正在运行可能需要一段时间才能完成的非常大的任务,另一个执行程序正在运行需要尽快完成的非常小的任务,因为响应性比部分程序的吞吐量更重要。

If you don't know which is appropriate for your program, usually it's the first.

如果你不知道哪个适合你的程序,通常它是第一个。

#2


19  

Edit 2018-02-06: revision based on this comment

编辑2018-02-06:基于此评论的修订

Edit: forgot to mention that this works on Python 2.7.x

编辑:忘了提到这适用于Python 2.7.x.

There's multiprocesing.pool, and the following sample illustrates how to use one of them:

有multiprocesing.pool,以下示例说明了如何使用其中一个:

from multiprocessing.pool import ThreadPool as Pool
# from multiprocessing import Pool

pool_size = 5  # your "parallelness"

# define worker function before a Pool is instantiated
def worker(item):
    try:
        api.my_operation(item)
    except:
        print('error with item')

pool = Pool(pool_size)

for item in items:
    pool.apply_async(worker, (item,))

pool.close()
pool.join()

Now if you indeed identify that your process is CPU bound as @abarnert mentioned, change ThreadPool to the process pool implementation (commented under ThreadPool import). You can find more details here: http://docs.python.org/2/library/multiprocessing.html#using-a-pool-of-workers

现在,如果确实确定您的进程受到@abarnert提到的CPU绑定,请将ThreadPool更改为进程池实现(在ThreadPool导入下注释)。您可以在此处找到更多详细信息:http://docs.python.org/2/library/multiprocessing.html#using-a-pool-of-workers

#3


7  

You can split the processing into a specified number of threads using an approach like this:

您可以使用以下方法将处理拆分为指定数量的线程:

import threading                                                                

def process(items, start, end):                                                 
    for item in items[start:end]:                                               
        try:                                                                    
            api.my_operation(item)                                              
        except Exception:                                                       
            print('error with item')                                            


def split_processing(items, num_splits=4):                                      
    split_size = len(items) // num_splits                                       
    threads = []                                                                
    for i in range(num_splits):                                                 
        # determine the indices of the list this thread will handle             
        start = i * split_size                                                  
        # special case on the last chunk to account for uneven splits           
        end = None if i+1 == num_splits else (i+1) * split_size                 
        # create the thread                                                     
        threads.append(                                                         
            threading.Thread(target=process, args=(items, start, end)))         
        threads[-1].start() # start the thread we just created                  

    # wait for all threads to finish                                            
    for t in threads:                                                           
        t.join()                                                                



split_processing(items)