How to manage the results of Python threads?

Time: 2022-09-25 21:01:31

I am using this code:

def startThreads(arrayofkeywords):
    global i
    i = 0
    while len(arrayofkeywords):
        try:
            if i<maxThreads:
                keyword = arrayofkeywords.pop(0)
                i = i+1
                thread = doStuffWith(keyword)
                thread.start()
        except KeyboardInterrupt:
            sys.exit()
    thread.join()

For threading in Python, I have almost everything done, but I don't know how to manage the results of each thread. Each thread produces an array of strings as its result; how can I join all those arrays into one safely? Because if I try writing into a global array, two threads could be writing at the same time.

6 solutions

#1


13  

First, you actually need to save all those thread objects to call join() on them. As written, you're saving only the last one of them, and then only if there isn't an exception.

An easy way to do multithreaded programming is to give each thread all the data it needs to run, and then have it not write to anything outside that working set. If all threads follow that guideline, their writes will not interfere with each other. Then, once a thread has finished, have only the main thread aggregate the results into a global array. This is known as "fork/join parallelism."

If you subclass the Thread object, you can give it space to store that return value without interfering with other threads. Then you can do something like this:

class MyThread(threading.Thread):
    def __init__(self, ...):
        self.result = []
        ...

def main():
    # doStuffWith() returns a MyThread instance
    threads = [doStuffWith(k) for k in arrayofkeywords[:maxThreads]]
    # start() returns None, so keep the thread objects themselves
    # and start them afterwards
    for t in threads:
        t.start()
    for t in threads:
        t.join()
        ret = t.result
        # process return value here

Edit:

After looking around a bit, it seems like the above method isn't the preferred way to do threads in Python. The above is more of a Java-esque pattern for threads. Instead you could do something like:

from threading import Thread

def handler(outList):
    ...
    # Modify the existing list object in place (important!) so that
    # the caller's reference sees the appended results
    outList.append(1)
    ...

def doStuffWith(keyword):
    ...
    result = []
    thread = Thread(target=handler, args=(result,))
    return (thread, result)

def main():
    threads = [doStuffWith(k) for k in arrayofkeywords[:maxThreads]]
    for t in threads:
        t[0].start()
    for t in threads:
        t[0].join()
        ret = t[1]
        # process return value here

#2


14  

Use a Queue.Queue instance, which is intrinsically thread-safe. Each thread can .put its results to that global instance when it's done, and the main thread (when it knows all working threads are done, by .joining them for example as in @unholysampler's answer) can loop .getting each result from it, and use each result to .extend the "overall result" list, until the queue is emptied.

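A minimal sketch of that pattern (the module is named queue in Python 3; process_keyword() is a hypothetical stand-in for whatever per-keyword work produces the list of strings):

import queue        # named Queue in Python 2
import threading

result_q = queue.Queue()        # intrinsically thread-safe

def worker(keyword):
    # each worker only .puts its own list; no shared list is touched
    result_q.put(process_keyword(keyword))   # process_keyword() is hypothetical

threads = [threading.Thread(target=worker, args=(k,)) for k in arrayofkeywords]
for t in threads:
    t.start()
for t in threads:
    t.join()        # now all working threads are known to be done

overall = []        # the "overall result" list
while not result_q.empty():     # safe only because every worker was joined
    overall.extend(result_q.get())
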
Edit: there are other big problems with your code -- if the maximum number of threads is less than the number of keywords, it will never terminate (you're trying to start a thread per keyword -- never less -- but once you've already started the maximum number, you loop forever to no further purpose).

Consider instead using a threading pool, kind of like the one in this recipe, except that in lieu of queueing callables you'll queue the keywords -- since the callable you want to run in the thread is the same in each thread, just varying the argument. Of course that callable will be changed to peel something from the incoming-tasks queue (with .get) and .put the list of results to the outgoing-results queue when done.

To terminate the N threads you could, after all keywords, .put N "sentinels" (e.g. None, assuming no keyword can be None): a thread's callable will exit if the "keyword" it just pulled is None.

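A sketch of that specialized pool, under the same assumption of a hypothetical process_keyword(); N workers pull keywords from a task queue and stop on a None sentinel:

import queue
import threading

N = 4                          # number of worker threads
tasks = queue.Queue()
results = queue.Queue()

def worker():
    while True:
        keyword = tasks.get()
        if keyword is None:    # sentinel: no more work for this thread
            break
        results.put(process_keyword(keyword))

workers = [threading.Thread(target=worker) for _ in range(N)]
for w in workers:
    w.start()
for k in arrayofkeywords:
    tasks.put(k)
for _ in range(N):             # one sentinel per worker
    tasks.put(None)
for w in workers:
    w.join()

overall = []
while not results.empty():
    overall.extend(results.get())
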
More often than not, Queue.Queue offers the best way to organize threading (and multiprocessing!) architectures in Python, be they generic like in the recipe I pointed you to, or more specialized like I'm suggesting for your use case in the last two paragraphs.

#3


3  

You need to keep pointers to each thread you make. As is, your code only ensures the last created thread finishes. This does not imply that all the ones you started before it have also finished.

def startThreads(arrayofkeywords):
    global i
    i = 0
    threads = []
    while len(arrayofkeywords):
        try:
            if i < maxThreads:
                keyword = arrayofkeywords.pop(0)
                i = i + 1
                thread = doStuffWith(keyword)
                thread.start()
                threads.append(thread)
        except KeyboardInterrupt:
            sys.exit()
    for t in threads:
        t.join()
    # process results stored in each thread

This also solves the problem of write access because each thread will store its data locally. Then, after all of them are done, you can do the work to combine each thread's local data.

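For example, if each thread object stores its strings on a result attribute (as in answer #1; the attribute name is illustrative), the combining step might look like:

all_results = []
for t in threads:
    # each thread wrote only to its own list, so once the threads
    # have been joined no locking is needed here
    all_results.extend(t.result)
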
#4


1  

I know that this question is a little bit old, but the best way to do this is not to hurt yourself too much with the approaches proposed by the other answers :)

Please read the reference documentation on Pool. This way you will fork/join your work:

from multiprocessing import Pool   # multiprocessing.dummy.Pool is the thread-based equivalent

def doStuffWith(keyword):
    return keyword + ' processed in thread'

def startThreads(arrayofkeywords):
    pool = Pool(processes=maxThreads)
    result = pool.map(doStuffWith, arrayofkeywords)
    print(result)

#5


0  

Writing into a global array is fine if you use a semaphore to protect the critical section. You 'acquire' the lock when you want to append to the global array, then 'release' when you are done. This way, only one thread is ever appending to the array at a time.

Check out http://docs.python.org/library/threading.html and search for semaphore for more info.

sem = threading.Semaphore()
...
sem.acquire()
# do dangerous stuff
sem.release()
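
For example (global_results and process_keyword() are illustrative names, not from the original code):

import threading

sem = threading.Semaphore()
global_results = []

def worker(keyword):
    local = process_keyword(keyword)   # thread-local work needs no lock
    sem.acquire()
    try:
        global_results.extend(local)   # only one thread appends at a time
    finally:
        sem.release()                  # release even if extend raises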

#6


0  

Try the semaphore methods, like acquire() and release(): http://docs.python.org/library/threading.html
