网络编程——进程池

时间:2022-12-15 16:24:58

管道

(1)

from multiprocessing import Process, Pipe
def func(con2):
    msg = con2.recv()
    print('>>>>>>',msg)

if __name__ == '__main__':
    con1,con2 = Pipe()
    p = Process(target=func,args=(con2,))
    p.start()
    con1.send(' 小鬼 !!!')

(2)

def func (con1, con2):
    try :
        msg = con2.recv() # 如果主进程没有发送消息,recv()就会阻塞住
        con2.send(' 哈哈哈 ')
        print('>>>>>>', msg)
        # 如果管道一端关闭了 , 那么另外一端在接收消息的时候会报错
        msg2 = con2.recv()
    except EOFError:
        print(' 对方管道已关闭 ')
        con2.close()
if __name__ == '__main__':
    con1, con2 = Pipe()
    p = Process(target=func, args=(con1, con2,))
    p.start()
    con1.send(' 小鬼 !!!')
    con1.close()
    # try:
        # mag = con1.recv() # OSError: handle is closed
        # print('>>>>>>>>>',mag)
    # except OSError:
        # print(' 该管道已经关闭 ')
# 父进程中 con1 发消息 , 子进程必须是由 con2 接收
# 父进程中 con2 发消息 , 子进程必须是由 con1 接收
# 父进程中 con1 收消息 , 子进程必须是由 con2 发送
# 父进程中 con2 发消息 , 子进程必须是由 con1 发送

 

数据共享

(1)共享数据

from multiprocessing import Process, Manager
def func (num):
    num[' 姓名 '] = ' 吕三儿 ' # 资源共享 , 在子进程中修改数据 , 父进程中得到的也是修改过后的
if __name__ == '__main__':
    m = Manager()
    num = m.dict({' 姓名 ': ' 刘二 '}) # 可以是列表 , 字典 , 同步锁 , 递归锁 , 命名空间 ,     事件 , 信号量 , 队列等等
    p = Process(target=func, args=(num,))
    p.start()
    p.join()
    print(' 父进程中的 num 的值 ', num)
    

 

(2)数据共享是不安全的,需要加锁 

from multiprocessing import Process, Manager, Lock
def func (dic, l):
    # l.acquire()
    # dic['count'] -= 1
    # l.release()
    with l: # 自动加锁 , 等同于上面的代码
    dic['count'] -= 1
if __name__ == '__main__':
    l = Lock()
    m = Manager()
    lst = []
    dic = m.dict({'count': 100})
    for i in range(20):
        p = Process(target=func, args=(dic, l))
        p.start()
        lst.append(p)
    for e in lst:
        e.join()
    print(' 主进程 ', dic)

 

三.进程池(重点)

(1)map(func,可迭代对象) 

from multiprocessing import Pool
import time
def func (n):
    time.sleep(1)
    print(n)
if __name__ == '__main__':
    p = Pool(4)
    p.map(func, range(100))

 

(2)进程池和多进程的效率对比 

from multiprocessing import Pool, Process
import time


def func (n):
    for i in range(5):
    n = n + i


if __name__ == '__main__':
    s1 = time.time()
    p = Pool(4)
    p.map(func, range(100))
    print(' 进程池 ', time.time() - s1)
    s2 = time.time()
    lst = []
        
    for i in range(100):
        p = Process(target=func, args=(i,))
        p.start()
        lst.append(p)
    for e in lst:
        e.join()
    print(' 多进程 ', time.time() - s2
    

(3)同步执行

from multiprocessing import Pool
import time

def func (i):
    time.sleep(0.5)
    return i * i

if __name__ == '__main__':
    p = Pool(4)
    for i in range(100):
        res = p.apply(func, args=(i,)) # 同步执行 , 第一个是要执行的函数 , 第二个是可迭代对象型的参数 , 给任务函数传的参数
        print(res) # 会等待你的任务返回结果

(4)异步执行

from multiprocessing import Pool
import time

def func (i):
    time.sleep(0.5)
    return i * i

if __name__ == '__main__':
    p = Pool(4)
    lst = []
    for i in range(100):
        res = p.apply_async(func, args=(i,)) 
# 异步执行 , 第一个是要执行的函数 , 第二个是可迭代对象型的参数 , 给任务函数传的参数 lst.append(res)
# res 拿到的是对象 , 需要通过 get 方法拿到对象的值 , 而直接用 get 方法的话 ,get 只能一个一个拿去数据,所以会出现阻塞的状态for e in lst: # 将对象添加到列表里面 , 通过遍历列表 , 可以进行拿值操作 print(e.get())

(5)异步中的具体注意事项(closs和join)

from multiprocessing import Pool
import time

def func (i):
    time.sleep(0.5)
    print(i * i)

if __name__ == '__main__':
    p = Pool(4)
    for i in range(100):
        res = p.apply_async(func, args=(i,)) 
# 异步执行 , 第一个是要执行的函数 , 第二个是可迭代对象型的参数 , 给任务函数传的参数 p.close() # 不是关闭进程池 , 而是不允许再有其他任务进来 p.join() # 感知进程池中任务的方法 , 等待进程池中的所有进程执行完毕 print(' 主进程结束 ')

(6)异步中的回调函数的使用 

from multiprocessing import Pool
import time, os

def func1 (i):
    print('func1 的 pid', os.getpid())
    time.sleep(0.5)
    return i * i
def func2 (n):
    print('func2 的 pid', os.getpid()) # 回调函数是由主进程调用的 , 不是子进程
    print('func2', n)
if __name__ == '__main__':
    print(' 主进程的 pid', os.getpid())
    p = Pool(4)
    for i in range(100):
        res = p.apply_async(func1, args=(i,), # 异步执行 , 第一个是要执行的函数 , 第二个是可迭代对象型的参数 , 给任务函
数传的参数 ,
        callback=func2) # 第三个是回调函数 , 负责接收子进程的运行结果 , 可以为其再进行操作
    p.close() # 不是关闭进程池 , 而是不允许再有其他任务进来
    p.join() # 感知进程池中任务的方法 , 等待进程池中的所有进程执行完毕
    print(' 主进程结束 ')

(7)异步中的close,join和get方法的运用 

from multiprocessing import Pool
import time

def func (i):
    time.sleep(0.5)
    print(i)
    return i * i

if __name__ == '__main__':
    p = Pool(4)
    lst = []
    for i in range(10):
        res = p.apply_async(func, args=(i,)) # 异步执行 , 第一个是要执行的函数 , 第二个是可迭代对象型的参数 , 给任务函数
传的参数
        lst.append(res) # res 拿到的是对象 , 需要通过 get 方法拿到对象的值 , 而直接用 get 方法的话 ,get 只能一个一个拿去数
据 , 所以会出现阻塞的状态
        p.close() # 不是关闭进程池 , 而是不允许再有其他任务进来
        p.join() # 感知进程池中任务的方法 , 等待进程池中的所有进程执行完毕
        for e in lst: # 将对象添加到列表里面 , 通过遍历列表 , 可以进行拿值操作 , 当前面加了 close 和
join 的时候 , 进程池里的所
有任务都已经执行完毕 , 所以结果就直接全部打印出来
            print(e.get())

四,初始线程

 

1.什么是线程?

  指的是一条流水线的工作过程,关键的一句话:一个进程内最少自带一个线程,其实进程根本不能执
行,进程不是执行单位,是资源的单位,分配资源的单位
  线程才是执行单位
  进程:做手机屏幕的工作过程,刚才讲的
  我们的py文件在执行的时候,如果你站在资源单位的角度来看,我们称为一个主进程,如果站在代码
执行的角度来看,它叫做主线程,只是一种形象的说法,其实整个代码的执行过程成为线程,也就是干
这个活儿的本身称为线程,但是我们后面学习的时候,我们就称为线程去执行某个任务,其实那某个任
务的执行过程称为一个线程,一条流水线的执行过程为线程

 

2.进程vs线程

  (1) 同一个进程内的多个线程是共享该进程的资源的,不同进程内的线程资源肯定是隔离的
  (2) 创建线程的开销比创建进程的开销要小的多

3.并发三个任务:

  启动三个进程:因为每个进程中有一个线程,但是我一个进程中开启三个线程就够了
  同一个程序中的三个任务需要执行,你是用三个进程好,还是三个线程好?
  例子:
    pycharm 三个任务:键盘输入屏幕输出自动保存到硬盘
    如果三个任务是同步的话,你键盘输入的时候,屏幕看不到
    咱们的pycharm是不是一边输入你边看啊,就是将串行变为了三个并发的任务
    解决方案:三个进程或者三个线程,哪个方案可行。如果是三个进程,进程的资源是不是隔离的并
且开销大,最致命的就是资源隔离,但是用户输入的数据还要给另外一个进程发送过去,进程之间能直
接给数据吗?你是不是copy一份给他或者通信啊,但是数据是同一份,我们有必要搞多个进程吗,线程
是不是共享资源的,我们是不是可以使用多线程来搞,你线程1输入的数据,线程2能不能看到,你以后
的场景还是应用多线程多,而且起线程我们说是不是很快啊,占用资源也小,还能共享同一个进程的资
源,不需要将数据来回的copy!

 

4.创建线程的两种方式

(1)基本创建,导入模块

from threading import Thread
import time

def func (n):
    time.sleep(1)
    print(n)

if __name__ == '__main__':
    t = Thread(target=func, args=(1,))
    t.start()
    t.join()
    print('主线程')

(2)继承Thread类

from threading import Thread
import time

class MyThread(Thread):
    def __init__(self, name):
        super().__init__()
        self.name = name
    def run (self):
        time.sleep(1)
        print('%s子线程' % self.name)

if __name__ == '__main__':
    t = MyThread('刘二')
    t.start()
    t.join()
    print('主线程结束')

 

进程池

 

  一个池子,里边有固定数量的进程,这些进程一直处于待命状态,一旦有任务来,马上就有进程去处理。

  因为在实际业务中,任务量是有多有少的,如果任务量特别的多,不可能要开对应那么多的进程数

  开启那么多进程首先需要消耗大量的时间让操作系统来为你管理它,其次还需要小号大量时间让cpu帮你调度它。

  进程池还会帮程序员去管理池中的进程。

  from multiprocessing impor Pool

  p = Pool(os.cpu_count()+1)   ps:一般进程池开自己内核数(os.cpu_count()+1)个

 

 

  进程池有三个方法:

    map(func.iterable)

    func:进程池中的进程执行的任务函数

    iterable: 可迭代对象,是把可迭代对象中的每个元素一次传给任务函数当参数

 

    apply(func.args = ()): 同步的效率,也就是说池中的进程一个一个的去执行任务

    func:进程池中的进程执行的任务函数

    args:可迭代对象型的参数,是传给任务函数的参数

    同步处理任务时,不需要close和join

    同步处理任务时,进程池中的所有进程是普通进程(主进程需要等待其执行结果)

 

    apply_async(func,args= (), callback = None) :异步的效率,也就是说池中的进程一次性都去执行任务

    func: 进程池中的进程执行的任务函数

    args: 可迭代对象型的参数,是传给任务函数的参数

    callback: 回调函数,就是说每当进程池中有进程处理完任务了,返回的结果可以交回调函数,由回调函数进行进一步处理,回调函数只有异步才有,同步是没有的

    异步处理任务时,进程池中的所有进程是守护进程(主今晨代码执行完毕守护进程就结束)

    异步处理任务是,必须加上close和join

 

  回调函数的使用:

    进程的任务函数的返回值,被当成回调函数的形参接受,以此进行进一步的处理操作

    回调函数是由主进程调用的,而不是子进程,子进程只负责把结果传递给回调函数