Process
创建子进程执行指定的函数
>>> from multiprocessing import Process,current_process
>>>
>>> def test(*args,**kwargs):
... p = current_process()
... print p.name,p.pid
... print args
... print kwargs
...
>>>
>>> p = Process(target=test,args=(1,2),kwargs={"a":"hello"},name="TEST")
>>> p.start();p.join();
TEST 24796
(1, 2)
{'a': 'hello'}
帮助文档:
class Process(__builtin__.object)
| Process objects represent activity that is run in a separate process
|
| The class is analagous to `threading.Thread`
|
| Methods defined here:
|
| __init__(self, group=None, target=None, name=None, args=(), kwargs={})
|
| __repr__(self)
|
| is_alive(self)
| Return whether process is alive
|
| join(self, timeout=None)
| Wait until child process terminates
|
| run(self)
| Method to be run in sub-process; can be overridden in sub-class
|
| start(self)
| Start child process
|
| terminate(self)
| Terminate process; sends SIGTERM signal or uses TerminateProcess()
[root@typhoeus79 20131104]# more myprocess.py
#!/usr/bin/env python26
#-*- coding:utf-8 -*-
import os
from multiprocessing import Process,current_process class MyProcess(Process):
def __init__(self):
print "init:",os.getpid()//还是父进程
super(MyProcess,self).__init__() def run(self):
print "run:",os.getpid()//子进程 if __name__ == '__main__':
print "parent:",os.getpid()
p = MyProcess()
p.start()
p.join() [root@typhoeus79 20131104]# ./myprocess.py
parent: 17213
init: 17213
run: 17216
子进程不会调用退出函数,而且只有后台(daemon)进程才可捕获主进程退出信号,默认处理自然是终止子进程。另外,后台进程不能创建新的子进程,这将导致僵尸出现。
[root@typhoeus79 20131104]# more myprocess2.py
#!/usr/bin/env python26
#-*- coding:utf-8 -*- import os
from time import sleep
from signal import signal,SIGTERM
from multiprocessing import Process def test():
def handler(signum,frame):
print "chid exit.",os.getpid()
exit(0) signal(SIGTERM,handler)
print "child start:",os.getpid() while True:
print "sleeping..."
sleep(1) if __name__ == "__main__":
p = Process(target = test)
p.daemon = True //必须明确指定,说明该子进程是个后台进程,且必须在start()前设置,否则子进程会一直打印sleeping...
p.start() sleep(2)//给点时间让子进程进入"状态"
print "parent exit." [root@typhoeus79 20131104]# ./myprocess2.py
child start: 22402
sleeping...
sleeping...
parent exit.
chid exit. 22402
调用terminate()会立即强制终止子进程(不会执行任何清理操作)。有关状态还有:is_alive()、pid、exitcode
Pool
进程池。用多个可重复使用的后台daemon进程执行函数,默认数量和CPU核相等。
[root@typhoeus79 20131104]# more process_pool.py
#!/usr/bin/env python26
#-*- coding:utf-8 -*-
from multiprocessing import Pool def test(*args,**kwargs):
print args
print kwargs
return 123 if __name__ == "__main__":
pool = Pool()
print pool.apply(test,range(3),dict(a=1,b=2)) pool.terminate()
pool.join()
[root@typhoeus79 20131104]# ./process_pool.py
(0, 1, 2)
{'a': 1, 'b': 2}
123
调用join()等待所有工作进程结束前,必须确保用close()或terminate()关闭进程池。close()阻止提交新任务,通知工作进程在完成全部任务后结束。该方法立即返回,不会阻塞等待。
使用异步模型时,callback是可选的。
[root@typhoeus79 20131104]# more callback.py
#!/usr/bin/env python26
#-*- coding:utf8 -*- from multiprocessing import Pool
from time import sleep def test(*args,**kwargs):
print "in testing"
print "sleeping..."
sleep(2) print "test returning..."
return 123 def callback(ret): print "callbacking..."
sleep(2)
print "return:",ret if __name__ == "__main__":
pool = Pool()
pool.apply_async(test,callback=callback) print "pooling..."
print ar = pool.apply_async(test)//apply_async返回AsyncResult实例 print
print ar.get() //get([timeout])、wait()、successful()等方法可获知任务执行状态和结果 pool.close()
pool.join()
[root@typhoeus79 20131104]# ./callback.py
pooling... in testing
sleeping... in testing
sleeping...
test returning...
test returning...
callbacking...
return: 123
123
get()第一次没有获取到,后第二次获取。
map()和imap()用于批量执行,分别返回列表和迭代器结果。
[root@typhoeus79 20131104]# more process_map.py
#!/usr/bin/env python26
#-*- coding:utf-8 -*- from multiprocessing import Pool,current_process def test(x):
print current_process().pid, x//获取当前进程的pid,是current_process()的属性
return x + 100 def test2(s):
print current_process().pid, s if __name__ == "__main__":
p = Pool(3) print p.map(test,xrange(5))
p.map(test2,"abc")
[root@typhoeus79 20131104]# ./process_map.py
5402 0
5403 1
5402 3
5402 4
5404 2
[100, 101, 102, 103, 104]
5402 a
5402 b
5402 c
从上面可以看到只有三个进程号
参数chunksize指定数据分块大小,如果待处理数据量很大,建议调高该参数。
if __name__ == "__main__":
p = Pool(5) print p.map(test,xrange(10),chunksize=2)
p.map(test2,"abc")
输出结果:
6796 0
6796 1
6797 2
6797 3
6798 4
6798 5
6797 8
6799 6
6797 9
6799 7
[100, 101, 102, 103, 104, 105, 106, 107, 108, 109]
6796 b
6800 a
6798 c
Queue
Queue是最常用的数据交换方法。参数maxsize限制队列中的数据项数量,这会影响get/put等阻塞操作。默认值无限制。
通常直接使用JoinableQueue,其内部使用Semaphore进行协调。在执行put()、task_done()时调整信号量计数器。当task_done()发现计数值等于0,立即通知join()解决阻塞。
[root@typhoeus79 20131104]# more test_queue.py
#!/usr/bin/env python26
#-*- coding:utf-8 -*- from Queue import Empty
from multiprocessing import Process,current_process,JoinableQueue def test(q):
pid = current_process().pid while True:
try:
d = q.get(timeout=2) #阻塞+超时。照顾生产着以及生产情形 print pid,d
q.task_done()
except Empty:
print pid,"empty!"
break if __name__ == "__main__":
q = JoinableQueue(maxsize=1000) map(q.put,range(5)) #未超出队列容量限制,不会阻塞
print "put over!" for i in range(3): #创建多个consumer
Process(target=test,args=(q,)).start() q.join() #等待任务完成
print "task done"
[root@typhoeus79 20131104]# ./test_queue.py
put over!
16768 0
16768 1
16768 2
16768 3
16768 4
task done
16770 empty!
16769 empty!
16768 empty!