python多进程

时间:2023-11-25 14:06:50

一、多进程池

from multiprocessing import Pool
import time
pool = Pool(processes=3)
result=[];lr=range(t);ans=['']*t
for ti in range(t):
result.append(pool.apply_async(solve, (n,a,b)))
## pool.close()
while lr:
for i in lr:
if result[i].ready():
ans[i]="%s"%result[i].get()
print ans[i]
lr.remove(i)
time.sleep(.1)
## pool.join()
for ai in ans:
fo.write(ai+'\n')
    para=[]
for ti in range(t):
para.append((n,a,b))
pool = Pool(processes=3)
result=pool.map_async(msolve,para)
res=result.get()
for ti in range(t):
ans="%s"%(res[ti])
print ans
fo.write(ans+'\n') def msolve(p):
r=solve(*p)
return r
from multiprocessing import Pool
#from multiprocessing.dummy import Pool
p=Pool(4)
result=map(solve,para)
p.close()
p.join()

二、concurrent.future

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, Executor
pool = ProcessPoolExecutor(max_workers=2)
#pool = ThreadPoolExecutor(max_workers=2)
results = list(pool.map(func, [para1,para2,...])) import concurrent.futures
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
futures = [executor.submit(func, item) for item in number_list]
for future in concurrent.futures.as_completed(futures):
print(future.result())

Python 3.2+
concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED):wait等待fs里面所有的Future实例(由不同的Executors实例创建的)完成。返回两个命名元祖,第一个元祖名为done,存放完成的futures对象,第二个元祖名为not_done,存放未完成的futures。return_when参数必须是concurrent.futures里面定义的常量:FIRST_COMPLETED,FIRST_EXCEPTION,ALL_COMPLETED

concurrent.futures.as_completed(fs, timeout=None):返回一个迭代器,yield那些完成的futures对象。fs里面有重复的也只可能返回一次。任何futures在调用as_completed()调用之前完成首先被yield。

Future对象:
Future()封装了可调用对象的异步执行。Future实例可以被Executor.submit()方法创建。
cancel():尝试去取消调用。如果调用当前正在执行,不能被取消。这个方法将返回False,否则调用将会被取消,方法将返回True
cancelled():如果调用被成功取消返回True
running():如果当前正在被执行不能被取消返回True
done():如果调用被成功取消或者完成running返回True
result(Timeout = None):拿到调用返回的结果。如果没有执行完毕就会去等待
exception(timeout=None):捕获程序执行过程中的异常
add_done_callback(fn):将fn绑定到future对象上。当future对象被取消或完成运行时,fn函数将会被调用
以下的方法是在unitest中
set_running_or_notify_cancel()
set_result(result)
set_exception(exception)

Executor对象:
提交任务方式一:submit(fn, *args, **kwargs):调度函数fn(*args **kwargs)返回一个Future对象代表调用的执行。
提交任务方式二:map(func, *iterables, timeout=None, chunksize=1):和map(func, *iterables)相似。但是该map方法的执行是异步的。多个func的调用可以同时执行。当Executor对象是 ProcessPoolExecutor,才可以使用chunksize,将iterable对象切成块,将其作为分开的任务提交给pool,默认为1。对于很大的iterables,设置较大chunksize可以提高性能。
shutdown(wait=True):给executor发信号,使其释放资源,当futures完成执行时。使用with语句,就可以避免必须调用本函数

Exception类
exception concurrent.futures.CancelledError
exception concurrent.futures.TimeoutError
exception concurrent.futures.process.BrokenProcessPool