《python》join、守护进程、锁/信号量/事件、进程队列

时间:2023-03-09 16:21:10
《python》join、守护进程、锁/信号量/事件、进程队列

一、multiprocess.process模块

1、join方法

  阻塞主进程,等待子进程执行完毕再放开阻塞

import time
import random
from multiprocessing import Process # 单个子进程
def func(index):
time.sleep(random.randint(1, 3))
print('发送完毕') if __name__ == '__main__':
p = Process(target=func, args=(1,))
p.start()
p.join() # 阻塞 直到p进程执行完毕就结束阻塞
print('邮件已经发送完毕')
'''
发送完毕
邮件已经发送完毕
'''

# 多个子进程
def func(index):
time.sleep(random.randint(1,3))
print('第%s个邮件已经发送完毕' % index) if __name__ == '__main__':
p_lst = []
for i in range(10):
p = Process(target=func, args=(i,))
p.start()
p_lst.append(p)
for p in p_lst:
p.join() # 等待每个子进程执行完毕
print('10个邮件已经发送完毕')
'''
第2个邮件已经发送完毕
第4个邮件已经发送完毕
第7个邮件已经发送完毕
第5个邮件已经发送完毕
第1个邮件已经发送完毕
第9个邮件已经发送完毕
第0个邮件已经发送完毕
第3个邮件已经发送完毕
第8个邮件已经发送完毕
第6个邮件已经发送完毕
10个邮件已经发送完毕
'''

2、第二种开启进程的方法

  以继承Process类的形式开启进程的方式

import os
from multiprocessing import Process class MyProcess(Process):
def run(self):
print('子进程: ', os.getpid(), os.getppid()) if __name__ == '__main__':
p = MyProcess()
p.start() # 开启一个子进程,让这个子进程执行run方法
print('主进程:', os.getpid())
'''
主进程: 6852
子进程: 6644 6852
''' # 给子进程传参数 class MyProcess(Process):
def __init__(self, arg):
super().__init__()
self.arg = arg
def run(self):
time.sleep(1)
print('子进程: ', os.getpid(), os.getppid(), self.arg) if __name__ == '__main__':
# 开启单个子进程
p = MyProcess('参数')
p.start() # 开启一个子进程,让这个子进程执行run方法
p.join()
print('主进程:', os.getpid())
'''
子进程: 6552 7784 参数
主进程: 7784
''' if __name__ == '__main__':
# 开启多个子进程
for i in range(10):
p = MyProcess('参数%s' % i)
p.start() # 开启一个子进程,让这个子进程执行run方法
print('主进程:', os.getpid())
'''
主进程: 7340
子进程: 6540 7340 参数0
子进程: 6512 7340 参数3
子进程: 7648 7340 参数1
子进程: 7460 7340 参数2
子进程: 8048 7340 参数4
子进程: 5108 7340 参数8
子进程: 7868 7340 参数7
子进程: 7892 7340 参数6
子进程: 4172 7340 参数9
子进程: 7224 7340 参数5
'''

3、守护进程

  主要功能:每隔一段时间就向一台机器汇报自己的状态(程序的报活)

  特点:会随着主进程的结束而结束。

import time
from multiprocessing import Process def func():
print('子进程 start')
time.sleep(3)
print('子进程 end') if __name__ == '__main__':
p = Process(target=func)
p.daemon = True # 设置p为一个守护进程,必须在start之前完成
p.start()
time.sleep(2)
print('主进程')
'''
子进程 start
主进程
'''
# 主进程会等待子进程完全结束才结束
# 守护进程会随着主进程的代码执行完毕而结束

def func1():
count = 1
while 1:
print(count * '*')
time.sleep(0.5)
count += 1 def func2():
print('func2 start')
time.sleep(5)
print('func2 end') if __name__ == '__main__':
p1 = Process(target=func1)
p1.daemon = True
p1.start() # p1是守护进程
p2 = Process(target=func2)
p2.start()
time.sleep(3)
print('主进程')
'''
func2 start
*
**
***
****
*****
******
主进程
func2 end
'''
# 如果主进程代码已经执行完毕,但是子进程还没执行完,守护进程都不会继续执行
# 守护进程会随着主进程的代码执行完毕而结束
# 主进程会等待子进程结束,守护进程只等待主进程代码结束就结束了

二、进程同步(multiprocess.Lock、multiprocess.Semaphore、multiprocess.Event)

1、锁---multiprocess.Lock

  加锁降低了程序的效率,让原来能够同时执行的代码变成顺序执行了,异步变同步的过程

  好处:保证了数据的安全

import time
import json
from multiprocessing import Process, Lock # 当多个进程使用同一份数据资源的时候,就会引发数据安全或顺序混乱问题。 # 多进程抢占输出资源
def search(person):
with open('ticket') as f:
dic = json.load(f)
time.sleep(0.2) # 模拟网络延迟
print('%s查询余票:' % person, dic['count']) def get_ticket(person):
with open('ticket') as f:
dic = json.load(f)
time.sleep(0.2) # 模拟网络延迟
if dic['count'] > 0:
print('%s买到票了' % person)
dic['count'] -= 1 # 买到票,数量减1
time.sleep(0.2) # 模拟网络延迟
with open('ticket', 'w') as f:
json.dump(dic, f) # 把剩余票数写回文件
else:
print('%s没买到票' % person) def ticket(person):
search(person) # 查票
get_ticket(person) # 抢票 if __name__ == '__main__':
for i in range(10):
p = Process(target=ticket, args=('person%s' % i,))
p.start()
'''
person0查询余票: 5
person4查询余票: 5
person3查询余票: 5
person0买到票了
person8查询余票: 5
person2查询余票: 5
person7查询余票: 5
person4买到票了
person1查询余票: 5
person5查询余票: 5
person3买到票了
person9查询余票: 5
person6查询余票: 5
person8买到票了
person2买到票了
person7买到票了
person1买到票了
person9买到票了
person6买到票了
person5买到票了
''' # 使用锁维护执行顺序
def search(person):
with open('ticket') as f:
dic = json.load(f)
time.sleep(0.2) # 模拟网络延迟
print('%s查询余票:' % person, dic['count']) def get_ticket(person):
with open('ticket') as f:
dic = json.load(f)
time.sleep(0.2) # 模拟网络延迟
if dic['count'] > 0:
print('%s买到票了' % person)
dic['count'] -= 1 # 买到票,数量减1
time.sleep(0.2) # 模拟网络延迟
with open('ticket', 'w') as f:
json.dump(dic, f) # 把剩余票数写回文件
else:
print('%s没买到票' % person) def ticket(person, lock):
search(person)
lock.acquire() # 加锁
get_ticket(person)
lock.release() # 解锁 if __name__ == '__main__':
lock = Lock()
for i in range(10):
p = Process(target=ticket, args=('person%s' % i, lock))
p.start()
'''
person0查询余票: 5
person1查询余票: 5
person2查询余票: 5
person0买到票了
person5查询余票: 5
person6查询余票: 5
person3查询余票: 5
person9查询余票: 5
person4查询余票: 5
person8查询余票: 5
person7查询余票: 5
person1买到票了
person2买到票了
person5买到票了
person6买到票了
person3没买到票
person9没买到票
person4没买到票
person8没买到票
person7没买到票
'''
# 为了保证数据的安全
# 在异步的情况下,多个进程有可能同时修改同一份资源
# 就给这个修改的过程加上锁
import time
from multiprocessing import Process, Lock # 加了锁就把异步变成同步了
def func(num, lock):
time.sleep(1)
print('异步执行', num) # 异步会同时开始 lock.acquire()
time.sleep(0.5)
print('同步执行', num) # 同步要一个结束才开始下一个
lock.release() if __name__ == '__main__':
lock = Lock()
for i in range(10):
p = Process(target=func, args=(i, lock))
p.start()
from multiprocessing import Process, Lock

# 互斥锁
lock = Lock()
lock.acquire()
print('')
lock.acquire()
print('')
# 只打印456,123不打印

2、信号量---multiprocess.Semaphore

  信号量的实现机制:计数器 + 锁 实现的

import time
import random
from multiprocessing import Process, Semaphore def ktv(person, sem):
sem.acquire()
print('\033[32m%s走进ktv\033[0m' % person)
time.sleep(random.randint(1,5))
print('\033[31m%s走出ktv\033[0m' % person)
sem.release() if __name__ == '__main__':
sem = Semaphore(4) # 限定4个
for i in range(10):
p = Process(target=ktv, args=('person%s' % i, sem))
p.start()
'''
person2走进ktv
person1走进ktv
person3走进ktv
person0走进ktv
person1走出ktv
person6走进ktv
person6走出ktv
person7走进ktv
person2走出ktv
person5走进ktv
person3走出ktv
person9走进ktv
person0走出ktv
person8走进ktv
person7走出ktv
person4走进ktv
person8走出ktv
person9走出ktv
person5走出ktv
person4走出ktv
'''

3、事件---multiprocess.Event  

  阻塞事件:wait()方法

    wait 是否阻塞是看 event 对象内部的一个属性

  控制这个属性的值

    set()  将这个属性的值改成True

    clear() 将这个属性的值改成False

    is_set()  判断当前的属性是否为True

import time
import random
from multiprocessing import Process, Event def traffic_light(e):
print('\033[31m红灯亮\033[0m')
while 1:
if e.is_set():
time.sleep(2)
print('\033[31m红灯亮\033[0m')
e.clear()
else:
time.sleep(2)
print('\033[32m绿灯亮\033[0m')
e.set() def car(e, i):
if not e.is_set():
print('car %s 在等待' % i)
e.wait()
print('car %s 通过了' % i) if __name__ == '__main__':
e = Event()
p = Process(target=traffic_light, args=(e,))
p.daemon = True
p.start()
p_lst = []
for i in range(20):
time.sleep(random.randrange(0, 3, 2))
p = Process(target=car, args=(e, i))
p.start()
p_lst.append(p)
for p in p_lst:
p.join()
'''
红灯亮
car 0 在等待
car 1 在等待
绿灯亮
car 1 通过了
car 0 通过了
car 2 通过了
car 3 通过了
car 4 通过了
红灯亮
car 7 在等待
car 6 在等待
car 5 在等待
car 8 在等待
绿灯亮
car 6 通过了
car 7 通过了
car 8 通过了
car 5 通过了
红灯亮
car 9 在等待
绿灯亮
car 9 通过了
car 11 通过了
car 10 通过了
红灯亮
car 12 在等待
car 13 在等待
绿灯亮
car 13 通过了
car 12 通过了
car 15 通过了
car 14 通过了
红灯亮
car 17 在等待
car 16 在等待
car 18 在等待
绿灯亮
car 16 通过了
car 17 通过了
car 18 通过了
car 19 通过了
'''

相关文章