多进程的数据共享

时间:2022-10-11 16:43:46

老师的博客:http://www.cnblogs.com/Eva-J/articles/8253549.html#_label14

Pipe

pipe是管道但是不是很推荐使用,因为有着不安全的危险,queue就相当于pipe加上lock,比较安全,但是的注意他们的close的时间,详见python3中的day38的笔记

下面是老师的代码

from multiprocessing import Lock,Pipe,Process
def producer(con,pro,name,food):
    con.close()
    for i in range(100):
        f = '%s生产%s%s'%(name,food,i)
        print(f)
        pro.send(f)
    pro.send(None)
    pro.send(None)
    pro.send(None)
    pro.close()

def consumer(con,pro,name,lock):
    pro.close()
    while True:
            lock.acquire()
            food = con.recv()
            lock.release()
            if food is None:
                con.close()
                break
            print('%s吃了%s' % (name, food))
if __name__ == '__main__':
    con,pro = Pipe()
    lock= Lock()
    p = Process(target=producer,args=(con,pro,'egon','泔水'))
    c1 = Process(target=consumer, args=(con, pro, 'alex',lock))
    c2 = Process(target=consumer, args=(con, pro, 'bossjin',lock))
    c3 = Process(target=consumer, args=(con, pro, 'wusir',lock))
    c1.start()
    c2.start()
    c3.start()
    p.start()
    con.close()
    pro.close()
# pipe 数据不安全性
# IPC
# 加锁来控制操作管道的行为 来避免进程之间争抢数据造成的数据不安全现象

# 队列 进程之间数据安全的
# 管道 + 锁

 

manager

老师的代码

from multiprocessing import Manager,Process,Lock
def main(dic,lock):
    dic['count'] -= 1

if __name__ == '__main__':
    m = Manager()
    l = Lock()
    dic=m.dict({'count':100})
    p_lst = []
    for i in range(50):
        p = Process(target=main,args=(dic,l))
        p.start()
        p_lst.append(p)
    for i in p_lst: i.join()
    print('主进程',dic)

 

运行几次后,你会发现得到的结果不一样,所以也存在数据不安全的现象,所以一般推荐使用queue比较安全,queue,pipe都是可是实现数据通信的。

数据池pool

参数介绍:

Pool([numprocess [,initializer [, initargs]]]):创建进程池

1 numprocess:要创建的进程数,如果省略,将默认使用cpu_count()的值

2 initializer:是每个工作进程启动时要执行的可调用对象,默认为None

3 initargs:是要传给initializer的参数组

1 p.apply(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。
2 '''需要强调的是:此操作并不会在所有池工作进程中并执行func函数。如果要通过不同参数并发地执行func函数,
必须从不同线程调用p.apply()函数或者使用p.apply_async()
''' 3 4 p.apply_async(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。 5 '''此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,
将理解传递给callback。callback禁止执行任何阻塞操作,否则将接收其他异步操作中的结果。
''' 6 7 p.close():关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成 8 9 P.jion():等待所有工作进程退出。此方法只能在close()或teminate()之后调用

 

1 方法apply_async()和map_async()的返回值是AsyncResul的实例obj。实例具有以下方法
2 obj.get():返回结果,如果有必要则等待结果到达。timeout是可选的。如果在指定时间内还没有到达,将引发一场。如果远程操作中引发了异常,它将在调用此方法时再次被引发。
3 obj.ready():如果调用完成,返回True
4 obj.successful():如果调用完成且没有引发异常,返回True,如果在结果就绪之前调用此方法,引发异常
5 obj.wait([timeout]):等待结果变为可用。
6 obj.terminate():立即终止所有工作进程,同时不执行任何清理或结束任何挂起工作。如果p被垃圾回收,将自动调用此函数

 

进程池的效率要高许多,详见老师的博客

多进程的数据共享多进程的数据共享
import time
from multiprocessing import Pool,Process
def func(n):
    n=n
    for i in range(10):
        n+=i
    print(n)
if __name__ == '__main__':
    start = time.time()
    pool = Pool(5)               # 5个进程
    pool.map(func,range(100))    # 100个任务
    t1 = time.time() - start

    start = time.time()
    p_lst = []
    for i in range(100):#100个任务
        p = Process(target=func,args=(i,))
        p_lst.append(p)
        p.start()
    for p in p_lst :p.join()
    t2 = time.time() - start
    print(t1,t2)
测试代码

 

进程池的三种调用方式:

多进程的数据共享多进程的数据共享
def fun(name):
    for i in range(10):

        print('my name is %s'%name,getpid())
if __name__=='__main__':
    pool=Pool(3)
    pool.map(fun,(1,2,3,4,5,6,7,8,9,10))
    print('end')
    '''你可以发现进程号最多最多只有3个
    ,你可以看end的输出始终一直在最后面)
    map(函数名,加上一个可迭代的对象)'''
第一种

 

多进程的数据共享多进程的数据共享
import os
import time
from multiprocessing import Pool
def func(n):
    print('start func%s'%n,os.getpid())
    time.sleep(1)
    print('end func%s' % n,os.getpid())

if __name__ == '__main__':
    p = Pool(5)
    for i in range(10):
        p.apply_async(func,args=(i,))
    p.close()  # 结束进程池接收任务
    p.join()   # 感知进程池中的任务执行结束
'''注意此种调用方法时真真的异步,如果你不加入jion的的话你连打印的值就不可能显示出来
这个方法时一个一个的调,只是进程在那几个进程池中而已,而且传参的方式也不一样,前面一个是位置参数,后面一个是
第二个是默认参数须要=,具体自己看源码'''
第二种

 

多进程的数据共享多进程的数据共享
import os
import time
from multiprocessing import Pool
def func(n):
    print('start func%s'%n,os.getpid())
    time.sleep(1)
    print('end func%s' % n,os.getpid())

if __name__ == '__main__':
    p = Pool(5)
    for i in range(10):
        p.apply(func,args=(i,))
    p.close()  # 结束进程池接收任务,及不能再想进池中加入显得代码,就是不能调用p了
    p.apply(func,args=(10000,))
'''注意此种调用方法是同步的
这个方法时一个一个的调,只是进程在那几个进程池中而已,而且传参的方式也不一样,前面一个是位置参数,后面一个是
第二个是默认参数须要=,具体自己看源码
其实apply_async的形式是一样的
不用加join'''
第三种
多进程的数据共享多进程的数据共享
import time
from multiprocessing import Pool
def func(name):
    print('你传递的参数是%s'%name)
    return '你的返回值'
if __name__=='__main__':
    p=Pool(2)

    a=p.apply(func,args=('alex',))
    b=p.apply_async(func, args=('jin',))
    c = p.map(func, ('alex', 1))
    time.sleep(1)
    print(a)
    print(b)
    print(c,type(c))
    # print(c)
'''输出:你传递的参数是alex
你传递的参数是jin
你传递的参数是alex
你传递的参数是1
你的返回值
<multiprocessing.pool.ApplyResult object at 0x0000000002BE8198>
['你的返回值', '你的返回值'] <class 'list'>
说明了apply是有返回值的
而apply_async这是一种方法
map由于其特殊性,返回值所有函数返回值组成的list'''
返回值

 

总结一下:

1.三种放方法的调用方式不同,map的调用时的参数是位置参数,必须传,第一个是函数名字,而且第二个是可迭代的对象,调用的是一群函数,返回值是list。

2.apply是第一个是位置参数,第二个是默认参数了,调用的是一个函数,而且有返回值

3.apply_asnyc与applyl一样的调用,也只能调用一个函数

4.比较:apply与map是同步的。不需要添加join

  而apply_async如果主进程的时间太快则不会打印其内容的

5.p.close就是不能再往里里面添加新的进程了,pool里面的代码运行完毕后就结束了

更正一下:在apply_async是能拿到返回值的,但是需要.get()来调用的

map也是异步的,当时自动的添加了join,阻塞了,所以显示觉得是同步的。

另外,如果想要apply-async到达阻塞的效果,有两种放法,一种是通过get获取返回值,另外一种是先close在join也能达到前面的效果,但是再也不能往池子里加入代码了

回调函数

from multiprocessing import Pool
from time import sleep
def func(name):
    return name
def func2(age):
    print('my name is %s'%age)
    return 'i am this func2’s retrun'
if __name__=='__main__':
    p=Pool(5)
    a= p.apply_async(func,args=('alex',),callback=func2)
    # b = p.apply(func, args=('alex',), callback=func2)
    p.close()
    p.join()
    print(a)
    # print(b)
'''输出结果;my name is alex
<multiprocessing.pool.ApplyResult object at 0x0000000002BE35F8>
只有apply_aspnc才有回调函数,而且就是一样接受不到返回值,只是方法
原理是前面的函数的返回值带入callback函数的参数'''
多进程的数据共享多进程的数据共享
from multiprocessing import Pool
def func1(n):
    return n+1

def func2(m):
    print(m)

if __name__ == '__main__':
    p = Pool(5)
    for i in  range(10,20):
        a=p.apply_async(func1,args=(i,),callback=func2)
        a.get()
#看一下这个是通过,get来达到依次执行的效果的
View Code

 

 下面是老师的总结,可以看一下

# 管道
# 数据的共享 Manager dict list
# 进程池
    # cpu个数+1
    # ret = map(func,iterable)
        # 异步 自带close和join
        # 所有结果的[]
    # apply
        # 同步的:只有当func执行完之后,才会继续向下执行其他代码
        # ret = apply(func,args=())
        # 返回值就是func的return
    # apply_async
        # 异步的:当func被注册进入一个进程之后,程序就继续向下执行
        # apply_async(func,args=())
        # 返回值 : apply_async返回的对象obj
        #          为了用户能从中获取func的返回值obj.get()
        # get会阻塞直到对应的func执行完毕拿到结果
        # 使用apply_async给进程池分配任务,
        # 需要先close后join来保持多进程和主进程代码的同步性