并发编程多进程

时间:2022-09-14 16:39:12

1.守护进程

守护进程:表示一个进程b守护另一个进程a,当被守护的进程a结束后,那么b也跟着结束了。

如何实现:在子进程开始之前,将daemon的值设置为True。

应用场景:子进程是帮助主进程完成任务的,如果主进程结束了,并且没必要去使用子进程执行的结果,就可以将子进程设置为守护进程。例如:在运行qq的过程中,开启一个下载任务的子进程,下载任务没有完成,qq就终止掉,下载任务也跟着终止掉。

主进程创建守护进程:

  其一:守护进程会在主进程代码执行结束后就终止

  其二:守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children

注意:进程之间是互相独立的,主进程代码运行结束,守护进程随即终止。

import time
from multiprocessing import Process

def task():
    print("妃子的一生")
    time.sleep(5)
    print("妃子凉了")

if __name__ == '__main__':
    fz = Process(target=task)
    fz.daemon = True  # 将子进程作为主进程的守护进程  要注意  必须在开启子进程之前 设置!
    fz.start()


    print("皇帝登基了")
    time.sleep(2)
    print("当了十年皇帝..")
    print("皇帝驾崩")

2.互斥锁

当多个进程共享一个数据时,共享数据就意味着数据的竞争,这样就可能发生数据错乱的情况。我们可以使用join来让这些进程穿行,但是会导致无法并发,降低效率,并且进程执行的顺序被固定死了。我们就可以使用互斥锁,来解决这种问题。

互斥锁简单的来说就是互相排斥的锁,将需要共享的数据进行加锁,其他进程在访问数据时,就必须等待当前进程使用完毕,才能进行访问。其本质就是一个bool类型的数据,在执行代码前,会先判断该值。

注:在使用互斥锁时,必须保证互斥锁是同一个。多次执行acquire会把数据锁死。

使用:在main下面创建Lock的对象(lock = Lock()),将这个对象作为参数传给子进程,在子进程中,在需要加锁的代码上方加上lock.acquire() (上锁),在代码下方加上lock.release()(解锁)。

RLock表示可重用锁,其特点是可以多次执行acquire。RLock在执行多次acqurie时和普通Lock没有任何区别,如果在多进程中使用RLock,并且一个进程a执行多次acquire,其他进程b要想获得这个锁,需要进程a把锁解开,就是说锁了几次,就要解开几次。

并发编程多进程并发编程多进程
from multiprocessing import Process,Lock       
import time,random                             
def task1(lock):                               
    lock.acquire()                             
    print('子进程1活着..')                          
    time.sleep(random.randint(1, 3))           
    print('子进程1正常死去。。。')                       
    lock.release()                             
                                               
                                               
def task2(lock):                               
    lock.acquire()                             
    print('子进程2活着..')                          
    time.sleep(random.randint(1, 3))           
    print('子进程2正常死去。。。')                       
    lock.release()                             
                                               
                                               
def task3(lock):                               
    lock.acquire()                             
    print('子进程3活着..')                          
    time.sleep(random.randint(1, 3))           
    print('子进程3正常死去。。。')                       
    lock.release()                             
                                               
                                               
if __name__ == '__main__':                     
    lock = Lock()                              
    p1 = Process(target=task1, args=(lock,))   
    p1.start()                                 
    p2 = Process(target=task2, args=(lock,))   
    p2.start()                                 
    p3 = Process(target=task3, args=(lock,))   
    p3.start()                                 
互斥锁
并发编程多进程并发编程多进程
# def task3(lock):
#     pass
# # 锁的实现原理 伪代码
# # l = False
# # def task3(lock):
# #     global l
# #     if l == False:
# #         l = True
# #         print("3my name is:常威")
# #         time.sleep(random.randint(1, 2))
# #         print("3my age is:68")
# #         time.sleep(random.randint(1, 2))
# #         print("3my sex is:femal")
# #     l = False
#
# if __name__ == '__main__':
#     lock = Lock()
#
#     p1 = Process(target=task1,args=(lock,))
#     p1.start()
#     # p1.join()
#
#     p2 = Process(target=task2,args=(lock,))
#     p2.start()
#     # p2.join()
#
#     p3 = Process(target=task3,args=(lock,))
#     p3.start()
#     # p3.join()
互斥锁实现原理
并发编程多进程并发编程多进程
# lock = RLock()
# lock.acquire()
# lock.acquire()
#
# print("哈哈")
# lock.release()

import time
def task(i,lock):
    lock.acquire()
    lock.acquire()
    print(i)
    time.sleep(3)
    lock.release()
    lock.release()
#第一个过来 睡一秒  第二个过来了 睡一秒   第一个打印1  第二个打印2

if __name__ == '__main__':
    lock = RLock()
    p1 = Process(target=task,args=(1,lock))
    p1.start()

    p2 = Process(target=task, args=(2,lock))
    p2.start()
RLock
并发编程多进程并发编程多进程
import json
from multiprocessing import Process,Lock
import time
import random

# 查看剩余票数
def check_ticket(usr):
    time.sleep(random.randint(1,3))
    with open("ticket.json","r",encoding="utf-8") as f:
        dic = json.load(f)
        print("%s查看 剩余票数:%s" % (usr,dic["count"]))

def buy_ticket(usr):
    with open("ticket.json","r",encoding="utf-8") as f:
        dic = json.load(f)
        if dic["count"] > 0:
            time.sleep(random.randint(1,3))
            dic["count"] -= 1
            with open("ticket.json", "w", encoding="utf-8") as f2:
                json.dump(dic,f2)
                print("%s 购票成功!" % usr)


def task(usr,lock):

    check_ticket(usr)

    lock.acquire()
    buy_ticket(usr)
    lock.release()

if __name__ == '__main__':
    lock = Lock()

    for i in range(10):
        p = Process(target=task,args=("用户%s" % i,lock))
        p.start()
        #p.join() # 只有第一个整个必须完毕 别人才能买 这是不公平的
互斥锁模拟抢票

死锁:指的是锁无法打开,导致程序卡死,一把锁是不会锁死的,只有存在多个锁的状态下,才会锁死,就是一个子进程a抢了一把锁,另一个子进程同时抢了另一把锁,就出现死锁状况。正常在程序开发时,是不要创建多把锁的。

并发编程多进程并发编程多进程
from multiprocessing import Process,Lock
import time
def task1(l1,l2,i):
    l1.acquire()
    print("盘子被%s抢走了" % i)
    time.sleep(1)


    l2.acquire()
    print("筷子被%s抢走了" % i)
    print("吃饭..")
    l1.release()
    l2.release()


    pass

def task2(l1,l2,i):

    l2.acquire()
    print("筷子被%s抢走了" % i)

    l1.acquire()
    print("盘子被%s抢走了" % i)

    print("吃饭..")
    l1.release()
    l2.release()


if __name__ == '__main__':
    l1 = Lock()
    l2 = Lock()
    Process(target=task1,args=(l1,l2,1)).start()
    Process(target=task2,args=(l1,l2,2)).start()
死锁

3.IPC

IPC指的是进程间通讯。由于进程之间内存是相互独立的,所以我们需要一种方案能够使得进程之间可以相互传递数据,有三种方式:

1.使用共享文件,多个进程同时读写同一个文件,其特点:I/O速度慢,传输数据大小不受限制。

2.管道:他是基于内存的,速度比较快,但是它是单向的,用起来比较麻烦。

3.申请共享内存空间,多个进程可以共享这个内存区域,其特点:速度快,传输数据量不能太大。

并发编程多进程并发编程多进程
from multiprocessing import Manager,Process,Lock
def work(d):
    # with lock:
        d['count']-=1

if __name__ == '__main__':

    with Manager() as m:
        dic=m.dict({'count':100}) #创建一个共享的字典
        p_l=[]
        for i in range(100):
            p=Process(target=work,args=(dic,))
            p_l.append(p)
            p.start()

        for p in p_l:
            p.join()
        print(dic)
IPC管道

4.队列

队列不只用于进程间通讯,也是一种常见的数据容器,先进先出。还可以保证数据不会错乱,即使在多进程下,由于其put和get默认都是阻塞的。

并发编程多进程并发编程多进程
from multiprocessing import Queue

# q = Queue(1)  # 创建一个队列 最多可以存一个数据
#
# q.put("张三")
# print(q.get())
#
# q.put("李四") # put默认会阻塞 当容器中已经装满了
#
# print(q.get())
# print(q.get()) # get默认会阻塞 当容器中已经没有数据了
#
# print("over")


q = Queue(1)  # 创建一个队列 最多可以存一个数据
#
q.put("张三")
# q.put("李四",False) # 第二个参数 设置为False表示不会阻塞 无论容器是满了 都会强行塞 如果满了就抛异常

print(q.get())
print(q.get(timeout=3)) # timeout 仅用于阻塞时

# q.put("李四") # put默认会阻塞 当容器中已经装满了
#
# print(q.get())
# print(q.get()) # get默认会阻塞 当容器中已经没有数据了
#
# print("over")
队列

5.生产者消费者模型

   生产者消费者模型

在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。

    为什么要使用生产者和消费者模式

在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。

    什么是生产者消费者模式

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

基于队列实现生产者消费者模型。

并发编程多进程并发编程多进程
import random
from multiprocessing import Process,Queue
import time
# 爬数据
def get_data(q):

    for num in range(5):
        print("正在爬取第%s个数据" % num)
        time.sleep(random.randint(1,2))
        print("第%s个数据 爬取完成" % num)
        # 把数据装到队列中
        q.put("第%s个数据" % num)


def parse_data(q):
    for num in range(5):
        # 取出数据
        data = q.get()
        print("正在解析%s" % data)
        time.sleep(random.randint(1, 2))
        print("%s 解析完成" % data)

if __name__ == '__main__':
    # 共享数据容器
    q = Queue(5)
    #生产者进程
    produce =  Process(target=get_data,args=(q,))
    produce.start()
    #消费者进程
    customer = Process(target=parse_data,args=(q,))
    customer.start()
生产者消费者模型
并发编程多进程并发编程多进程
from multiprocessing import Process, JoinableQueue
import time
import random


def eat(name, q):
    while True:
        res = q.get()
        time.sleep(random.randint(1, 3))
        print('%s 吃了 %s' % (name, res))
        q.task_done()


def make(name, q):
    for i in range(1, 6):
        time.sleep(random.randint(1, 3))
        print('%s 生产了第%s个 热狗' % (name, i))
        q.put('%s的第%s个热狗' % (name, i))


if __name__ == '__main__':
    q = JoinableQueue()  # 生成队列
    # 生产者1
    p1 = Process(target=make, args=('1号店', q))
    p1.start()
    # 生产者2
    p2 = Process(target=make, args=('2号店', q))
    p2.start()

    # 消费者
    c = Process(target=eat, args=('王思聪', q))
    c.daemon = True  # 将消费者设置为守护进程,主进程结束后结束。
    c.start()

    # 保证生产者的数据全部产生
    p1.join()
    p2.join()

    # 保证队列中的数据都被处理了
    q.join()  # 生产的数据量与q.task_done所记录的处理的数据量进行对比,如果一致则说明处理完毕,主进程结束

# 1号店 生产了第1个 热狗
# 2号店 生产了第1个 热狗
# 2号店 生产了第2个 热狗
# 王思聪 吃了 1号店的第1个热狗
# 1号店 生产了第2个 热狗
# 王思聪 吃了 2号店的第1个热狗
# 2号店 生产了第3个 热狗
# 1号店 生产了第3个 热狗
# 王思聪 吃了 2号店的第2个热狗
# 王思聪 吃了 1号店的第2个热狗
# 2号店 生产了第4个 热狗
# 1号店 生产了第4个 热狗
# 王思聪 吃了 2号店的第3个热狗
# 2号店 生产了第5个 热狗
# 1号店 生产了第5个 热狗
# 王思聪 吃了 1号店的第3个热狗
# 王思聪 吃了 2号店的第4个热狗
# 王思聪 吃了 1号店的第4个热狗
# 王思聪 吃了 2号店的第5个热狗
# 王思聪 吃了 1号店的第5个热狗
生产者与消费者模型+守护进程的使用