How to get the return value from a thread in Python?

Time: 2021-08-06 01:16:23

How to get the value 'foo' which is returned from the thread?

from threading import Thread

def foo(bar):
    print 'hello {}'.format(bar)
    return 'foo'

thread = Thread(target=foo, args=('world!',))
thread.start()
ret = thread.join()
print ret

The one obvious way to do it, shown above, returns None.

18 solutions

#1


173  

FWIW, the multiprocessing module has a nice interface for this using the Pool class. And if you want to stick with threads rather than processes, you can just use the multiprocessing.pool.ThreadPool class as a drop-in replacement.

def foo(bar, baz):
  print 'hello {0}'.format(bar)
  return 'foo' + baz

from multiprocessing.pool import ThreadPool
pool = ThreadPool(processes=1)

async_result = pool.apply_async(foo, ('world', 'foo')) # tuple of args for foo

# do some other stuff in the main process

return_val = async_result.get()  # get the return value from your function.
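
On Python 3 the same pattern still works; here is a minimal sketch of it with print() as a function and the pool used as a context manager so it is cleaned up automatically:

from multiprocessing.pool import ThreadPool

def foo(bar, baz):
    print('hello {0}'.format(bar))
    return 'foo' + baz

# in Python 3 the pool is a context manager, so it is terminated on exit
with ThreadPool(processes=1) as pool:
    async_result = pool.apply_async(foo, ('world', 'foo'))
    # do some other stuff in the main thread
    return_val = async_result.get()  # blocks until foo finishes
    print(return_val)                # prints 'foofoo'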

#2


125  

One way I've seen is to pass a mutable object, such as a list or a dictionary, to the thread's constructor, along with an index or other identifier of some sort. The thread can then store its results in its dedicated slot in that object. For example:

def foo(bar, result, index):
    print 'hello {0}'.format(bar)
    result[index] = "foo"

from threading import Thread

threads = [None] * 10
results = [None] * 10

for i in range(len(threads)):
    threads[i] = Thread(target=foo, args=('world!', results, i))
    threads[i].start()

# do some other stuff

for i in range(len(threads)):
    threads[i].join()

print " ".join(results)  # what sound does a metasyntactic locomotive make?

If you really want join() to return the return value of the called function, you can do this with a Thread subclass like the following:

from threading import Thread

def foo(bar):
    print 'hello {0}'.format(bar)
    return "foo"

class ThreadWithReturnValue(Thread):
    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs={}, Verbose=None):
        Thread.__init__(self, group, target, name, args, kwargs, Verbose)
        self._return = None
    def run(self):
        if self._Thread__target is not None:
            self._return = self._Thread__target(*self._Thread__args,
                                                **self._Thread__kwargs)
    def join(self):
        Thread.join(self)
        return self._return

twrv = ThreadWithReturnValue(target=foo, args=('world!',))

twrv.start()
print twrv.join()   # prints foo

That gets a little hairy because of the name mangling, and it accesses "private" data structures that are specific to the Thread implementation... but it works.

#3


43  

Jake's answer is good, but if you don't want to use a thread pool (you don't know how many threads you'll need and want to create them as needed), then a good way to transmit information between threads is the built-in Queue.Queue class, as it offers thread safety.

I created the following decorator to make it act in a similar fashion to the threadpool:

def threaded(f, daemon=False):
    import threading
    import Queue

    def wrapped_f(q, *args, **kwargs):
        '''this function calls the decorated function and puts the 
        result in a queue'''
        ret = f(*args, **kwargs)
        q.put(ret)

    def wrap(*args, **kwargs):
        '''this is the function returned from the decorator. It fires off
        wrapped_f in a new thread and returns the thread object with
        the result queue attached'''

        q = Queue.Queue()

        t = threading.Thread(target=wrapped_f, args=(q,)+args, kwargs=kwargs)
        t.daemon = daemon
        t.start()
        t.result_queue = q        
        return t

    return wrap

Then you just use it as:

@threaded
def long_task(x):
    import time
    x = x + 5
    time.sleep(5)
    return x

# does not block, returns Thread object
y = long_task(10)
print y

# this blocks, waiting for the result
result = y.result_queue.get()
print result

The decorated function creates a new thread each time it's called and returns a Thread object that contains the queue that will receive the result.

#4


14  

Another solution that doesn't require changing your existing code:

import Queue
from threading import Thread

def foo(bar):
    print 'hello {0}'.format(bar)
    return 'foo'

que = Queue.Queue()

t = Thread(target=lambda q, arg1: q.put(foo(arg1)), args=(que, 'world!'))
t.start()
t.join()
result = que.get()
print result

It can be also easily adjusted to a multi-threaded environment:

import Queue
from threading import Thread

def foo(bar):
    print 'hello {0}'.format(bar)
    return 'foo'

que = Queue.Queue()
threads_list = list()

t = Thread(target=lambda q, arg1: q.put(foo(arg1)), args=(que, 'world!'))
t.start()
threads_list.append(t)

# Add more threads here
...
threads_list.append(t2)
...
threads_list.append(t3)
...

# Join all the threads
for t in threads_list:
    t.join()

# Check thread's return value
while not que.empty():
    result = que.get()
    print result

#5


12  

I stole kindall's answer and cleaned it up just a little bit.

The key part is adding *args and **kwargs to join() in order to handle the timeout

from threading import Thread

class threadWithReturn(Thread):
    def __init__(self, *args, **kwargs):
        super(threadWithReturn, self).__init__(*args, **kwargs)

        self._return = None

    def run(self):
        if self._Thread__target is not None:
            self._return = self._Thread__target(*self._Thread__args, **self._Thread__kwargs)

    def join(self, *args, **kwargs):
        super(threadWithReturn, self).join(*args, **kwargs)

        return self._return
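
A minimal usage sketch (slow_foo is just an illustrative function; the class above relies on Python 2 name mangling, so this is written for Python 2). If the timeout expires before the target finishes, join() returns None even though the thread may still be running:

import time

def slow_foo(bar):
    time.sleep(2)
    return 'foo ' + bar

t = threadWithReturn(target=slow_foo, args=('world!',))
t.start()
print t.join(timeout=0.5)  # None: the timeout expired before slow_foo returned
print t.join()             # blocks until slow_foo finishes, then prints 'foo world!'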

#6


11  

Parris / kindall's join/return answer ported to Python 3:

from threading import Thread

def foo(bar):
    print('hello {0}'.format(bar))
    return "foo"

class ThreadWithReturnValue(Thread):
    def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None):
        Thread.__init__(self, group, target, name, args, kwargs, daemon=daemon)

        self._return = None

    def run(self):
        if self._target is not None:
            self._return = self._target(*self._args, **self._kwargs)

    def join(self):
        Thread.join(self)
        return self._return


twrv = ThreadWithReturnValue(target=foo, args=('world!',))

twrv.start()
print(twrv.join())   # prints foo

Note, the Thread class is implemented differently in Python 3.

#7


6  

My solution to the problem is to wrap the function and thread in a class. It does not require using pools, queues, or C-style variable passing. It is also non-blocking: you check the status instead. See the example of how to use it at the end of the code.

import threading

class ThreadWorker():
    '''
    The basic idea is given a function create an object.
    The object can then run the function in a thread.
    It provides a wrapper to start it,check its status,and get data out the function.
    '''
    def __init__(self,func):
        self.thread = None
        self.data = None
        self.func = self.save_data(func)

    def save_data(self,func):
        '''modify function to save its returned data'''
        def new_func(*args, **kwargs):
            self.data=func(*args, **kwargs)

        return new_func

    def start(self,params):
        self.data = None
        if self.thread is not None:
            if self.thread.isAlive():
                return 'running' #could raise exception here

        #unless thread exists and is alive start or restart it
        self.thread = threading.Thread(target=self.func,args=params)
        self.thread.start()
        return 'started'

    def status(self):
        if self.thread is None:
            return 'not_started'
        else:
            if self.thread.isAlive():
                return 'running'
            else:
                return 'finished'

    def get_results(self):
        if self.thread is None:
            return 'not_started' #could return exception
        else:
            if self.thread.isAlive():
                return 'running'
            else:
                return self.data

def add(x,y):
    return x +y

add_worker = ThreadWorker(add)
print add_worker.start((1,2,))
print add_worker.status()
print add_worker.get_results()

#8


3  

Using Queue:

import threading, queue

def calc_square(num, out_queue1):
  l = []
  for x in num:
    l.append(x*x)
  out_queue1.put(l)


arr = [1,2,3,4,5,6,7,8,9,10]
out_queue1=queue.Queue()
t1=threading.Thread(target=calc_square, args=(arr,out_queue1))
t1.start()
t1.join()
print (out_queue1.get())

#9


2  

You can use Pool as a pool of worker processes as below:

from multiprocessing import Pool


def f1(x, y):
    return x*y


if __name__ == '__main__':
    with Pool(processes=10) as pool:
        result = pool.apply(f1, (2, 3))
        print(result)

#10


1  

join() always returns None, so I think you should subclass Thread to handle return codes and the like.

#11


1  

You can define a mutable object above the scope of the threaded function, and add the result to that. (I also modified the code to be Python 3 compatible.)

returns = {}
def foo(bar):
    print('hello {0}'.format(bar))
    returns[bar] = 'foo'

from threading import Thread
t = Thread(target=foo, args=('world!',))
t.start()
t.join()
print(returns)

This returns {'world!': 'foo'}

If you use the function input as the key to your results dict, every unique input is guaranteed to get an entry in the results.

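For example, a minimal sketch that runs several threads and keys each result by its input (using a slight variant of the foo above):

from threading import Thread

returns = {}

def foo(bar):
    returns[bar] = 'foo ' + bar

threads = [Thread(target=foo, args=(name,)) for name in ('alpha', 'beta', 'gamma')]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(returns)  # all three keys are present; insertion order may vary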

#12


1  

Taking into consideration @iman's comment on @JakeBiesinger's answer, I have reworked it to run a variable number of threads:

from multiprocessing.pool import ThreadPool

def foo(bar, baz):
    print 'hello {0}'.format(bar)
    return 'foo' + baz

numOfThreads = 3 
results = []

pool = ThreadPool(numOfThreads)

for i in range(0, numOfThreads):
    results.append(pool.apply_async(foo, ('world', 'foo')))  # tuple of args for foo

# do some other stuff in the main process
# ...
# ...

results = [r.get() for r in results]
print results

pool.close()
pool.join()

Cheers,

Guy.

#13


0  

Define your target to
1) take an argument q
2) replace any statements return foo with q.put(foo); return

so a function

def func(a):
    ans = a * a
    return ans

would become

def func(a, q):
    ans = a * a
    q.put(ans)
    return

and then you would proceed as such

from Queue import Queue
from threading import Thread

ans_q = Queue()
arg_tups = [(i, ans_q) for i in xrange(10)]

threads = [Thread(target=func, args=arg_tup) for arg_tup in arg_tups]
_ = [t.start() for t in threads]
_ = [t.join() for t in threads]
results = [ans_q.get() for _ in xrange(len(threads))]

And you can use function decorators/wrappers so that you can use your existing functions as the target without modifying them, while still following this basic scheme; see the sketch below.

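For example, a minimal sketch of such a wrapper (the returning_target name is just illustrative) that forwards the return value of an unmodified function into the queue:

from Queue import Queue
from threading import Thread

def returning_target(func, q):
    '''wrap an unmodified function so that its return value lands in q'''
    def wrapped(*args, **kwargs):
        q.put(func(*args, **kwargs))
    return wrapped

def func(a):  # the original, unmodified function
    ans = a * a
    return ans

ans_q = Queue()
threads = [Thread(target=returning_target(func, ans_q), args=(i,)) for i in xrange(10)]
_ = [t.start() for t in threads]
_ = [t.join() for t in threads]
results = [ans_q.get() for _ in xrange(len(threads))]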

#14


0  

I'm using this wrapper, which comfortably turns any function into one that runs in a Thread, taking care of its return value or exception. It doesn't add Queue overhead.

import sys
import threading

def threading_func(f):
    """Decorator for running a function in a thread and handling its return
    value or exception"""
    def start(*args, **kw):
        def run():
            try:
                th.ret = f(*args, **kw)
            except:
                th.exc = sys.exc_info()
        def get(timeout=None):
            th.join(timeout)
            if th.exc:
                raise th.exc[0], th.exc[1], th.exc[2] # py2
                ##raise th.exc[1] #py3                
            return th.ret
        th = threading.Thread(None, run)
        th.exc = None
        th.get = get
        th.start()
        return th
    return start

Usage Examples

def f(x):
    return 2.5 * x
th = threading_func(f)(4)
print("still running?:", th.is_alive())
print("result:", th.get(timeout=1.0))

@threading_func
def th_mul(a, b):
    return a * b
th = th_mul("text", 2.5)

try:
    print(th.get())
except TypeError:
    print("exception thrown ok.")

Notes on threading module

Comfortable return value & exception handling of a threaded function is a frequent "Pythonic" need and should indeed already be offered by the threading module - possibly directly in the standard Thread class. ThreadPool has way too much overhead for simple tasks - 3 managing threads, lots of bureaucracy. Unfortunately Thread's layout was copied from Java originally - which you see e.g. from the still useless 1st (!) constructor parameter group.

#15


0  

One usual solution is to wrap your function foo in a small wrapper like

result = queue.Queue()

def task_wrapper(*args):
    result.put(target(*args))

Then the whole code may look like this:

import queue
import threading

# target, args_list and max_num are assumed to be defined elsewhere
result = queue.Queue()

def task_wrapper(*args):
    result.put(target(*args))

threads = [threading.Thread(target=task_wrapper, args=args) for args in args_list]

for t in threads:
    t.start()
    while(True):
        if(len(threading.enumerate()) < max_num):
            break
for t in threads:
    t.join()
return result

Note

One important issue is that the return values may be unordered. (In fact, the return values do not have to be saved to a queue; you can choose any thread-safe data structure.)

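If the order matters, a minimal workaround sketch (with a stand-in target function for illustration) is to tag each return value with the index of its arguments and sort afterwards:

import queue
import threading

result = queue.Queue()

def target(x):  # stand-in for the real target function
    return x * x

def task_wrapper(index, *args):
    result.put((index, target(*args)))  # tag the return value with its position

args_list = [(1,), (2,), (3,)]
threads = [threading.Thread(target=task_wrapper, args=(i,) + args)
           for i, args in enumerate(args_list)]
for t in threads:
    t.start()
for t in threads:
    t.join()
pairs = [result.get() for _ in range(result.qsize())]
ordered = [value for _, value in sorted(pairs)]
print(ordered)  # [1, 4, 9]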

#16


0  

Why not just use a global variable?

import threading


class myThread(threading.Thread):
    def __init__(self, ind, lock):
        threading.Thread.__init__(self)
        self.ind = ind
        self.lock = lock

    def run(self):
        global results
        with self.lock:
            results.append(self.ind)



results = []
lock = threading.Lock()
threads = [myThread(x, lock) for x in range(1, 4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(results)

#17


0  

A very simple way to get this done, for dummies like me:

import queue
import threading

# creating queue instance
q = queue.Queue()

# creating threading class
class AnyThread():
    def __init__ (self):
        threading.Thread.__init__(self)

    def run(self):
        # in this class and function we will put our test target function
        test()

t = AnyThread()

# having our test target function
def test():
    # do something in this function:
    result = 3 + 2
    # and put result to a queue instance
    q.put(result)

for i in range(3):  # calling our threading function 3 times (just for example)
    t.run()
    output = q.get() # here we get output from queue instance
    print(output)

>>> 5
>>> 5
>>> 5

The main thing here is the queue module. We create a queue.Queue() instance and use it inside our function. We feed it the result, which we later retrieve outside the thread.

Please see one more example with arguments passed to our test function:

import queue
import threading

# creating queue instance
q = queue.Queue()

# creating threading class
class AnyThread():
    def __init__ (self):
        threading.Thread.__init__(self)

    def run(self, a, b):
        # in this class and function we will put our execution test function
        test(a, b)

t = AnyThread()

# having our test target function
def test(a, b):
    # do something in this function:
    result = a + b
    # and put result to a queue instance
    q.put(result)

for i in range(3):  # calling our threading function 3 times (just for example)
    t.run(3+i, 2+i)
    output = q.get() # here we get output from queue instance
    print(output)

>>> 5
>>> 7
>>> 9

#18


0  

As mentioned, a multiprocessing pool is much slower than basic threading. Using queues as proposed in some answers here is a very effective alternative. I have used it with dictionaries in order to be able to run a lot of small threads and recover multiple answers by combining them with dictionaries:

#!/usr/bin/env python3

import threading
# use Queue for python2
import queue
import random

LETTERS = 'abcdefghijklmnopqrstuvwxyz'
LETTERS = [ x for x in LETTERS ]

NUMBERS = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

def randoms(k, q):
    result = dict()
    result['letter'] = random.choice(LETTERS)
    result['number'] = random.choice(NUMBERS)
    q.put({k: result})

threads = list()
q = queue.Queue()
results = dict()

for name in ('alpha', 'oscar', 'yankee',):
    threads.append( threading.Thread(target=randoms, args=(name, q)) )
    threads[-1].start()
_ = [ t.join() for t in threads ]
while not q.empty():
    results.update(q.get())

print(results)
