Python并发编程06 /阻塞、异步调用/同步调用、异步回调函数、线程queue、事件event、协程

时间:2023-01-12 08:02:26

Python并发编程06 /阻塞、异步调用/同步调用、异步回调函数、线程queue、事件event、协程

1. 阻塞

  • 进程运行的三个状态:运行,就绪,阻塞

  • 阻塞非阻塞是从执行任务的角度来看的:

    阻塞:程序运行时,遇到了IO,程序挂起,CPU被切走

    非阻塞:程序没有遇到IO,程序遇到IO但是通过某种手段,让CPU尽量的运行我的程序

2. 异步调用、同步调用

1. 概念

  • 提交任务的角度:

    异步:一次提交多个任务,然后就执行下一行代码

    同步:提交一个任务,自任务开始运行直到此任务结束(可能有IO),返回一个返回值之后,再提交下一个任务

2. 异步调用

  • 代码示例:

    from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
    import time
    import random
    import os def task(i):
    print(f'{os.getpid()}开始任务')
    time.sleep(random.randint(1,3))
    print(f'{os.getpid()}任务结束')
    return i if __name__ == '__main__':
    # 异步调用
    pool = ProcessPoolExecutor()
    for i in range(10):
    pool.submit(task,i)
    pool.shutdown(wait=True)
    print('===主') # shutdown: 让我的主进程等待进程池中所有的子进程都结束任务之后再执行. 有点类似于join.
    # shutdown: 在上一个进程池没有完成所有的任务之前,不允许添加新的任务.
    # 一个任务是通过一个函数实现的,任务完成了它的返回值就是函数的返回值.

3. 同步调用

  • 代码示例:

    from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
    import time
    import random
    import os def task(i):
    print(f'{os.getpid()}开始任务')
    time.sleep(random.randint(1,3))
    print(f'{os.getpid()}任务结束')
    return i
    if __name__ == '__main__': # 同步调用
    pool = ProcessPoolExecutor()
    for i in range(10):
    obj = pool.submit(task,i)
    print(f'任务结果:{obj.result()}')
    pool.shutdown(wait=True)
    print('===主') # obj是一个动态对象,返回的当前的对象的状态,有可能运行中,可能(就绪阻塞),还可能是结束了.
    # obj.result() 必须等到这个任务完成后,返回了结果之后,在执行下一个任务.

3. 异步调用+回调函数

  • 版本一:

    from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
    import requests def task(url):
    '''模拟的就是爬取多个源代码 一定有IO操作'''
    ret = requests.get(url)
    if ret.status_code == 200:
    return ret.text def parse(content):
    '''模拟对数据进行分析 一般没有IO'''
    return len(content) if __name__ == '__main__':
    # 开启线程池,并发并行的执行
    url_list = [
    'http://www.baidu.com',
    'http://www.taobao.com',
    'https://www.sina.com.cn',
    ]
    pool = ThreadPoolExecutor(4)
    obj_list = []
    for url in url_list:
    obj = pool.submit(task,url)
    obj_list.append(obj)
    pool.shutdown(wait=True)
    for res in obj_list:
    print(parse(res.result()))
    print('===主') # 1. 异步发出10个任务,并发的执行,但是统一的接收所有的任务的返回值.(效率低,不能实时的获取结果)
    # 2. 分析结果流程是串行,影响效率.
  • 版本二:针对版本一的缺点2,改进,让串行编程并发或者并行.

    from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
    import requests def task(url):
    '''模拟的就是爬取多个源代码 一定有IO操作'''
    ret = requests.get(url)
    if ret.status_code == 200:
    return parse(ret.text) def parse(content):
    '''模拟对数据进行分析 一般没有IO'''
    return len(content) if __name__ == '__main__':
    # 开启线程池,并发并行的执行
    url_list = [
    'http://www.baidu.com',
    'http://www.taobao.com',
    'https://www.sina.com.cn',
    ]
    pool = ThreadPoolExecutor(4)
    obj_list = []
    for url in url_list:
    obj = pool.submit(task, url)
    obj_list.append(obj)
    pool.shutdown(wait=True)
    for res in obj_list: # [obj1, obj2,obj3....obj10]
    print(res.result()) # 线程池设置4个线程, 异步发起10个任务,每个任务是通过网页获取源码+数据分析, 并发执行,
    # 耦合性增强了,并发执行任务,此任务最好是IO阻塞,才能发挥最大的效果,而parse操作一般是没有IO阻塞的
  • 版本三:异步调用 + 回调函数

    from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
    import requests def task(url):
    '''模拟的就是爬取多个源代码 一定有IO操作'''
    ret = requests.get(url)
    if ret.status_code == 200:
    return ret.text
    def parse(obj):
    '''模拟对数据进行分析 一般没有IO'''
    print(len(obj.result())) if __name__ == '__main__':
    # 开启线程池,并发并行的执行
    url_list = [
    'http://www.baidu.com',
    'http://www.taobao.com',
    'https://www.sina.com.cn',
    ]
    pool = ThreadPoolExecutor(4)
    for url in url_list:
    obj = pool.submit(task, url)
    obj.add_done_callback(parse) # 基于异步调用回收所有任务的结果要做到实时回收结果,并发执行任务每个任务只是处理IO阻塞的,不能增加新的功能.
    # 1.线程池设置4个线程, 异步发起10个任务,每个任务是通过网页获取源码, 并发执行
    # 2.当一个任务完成之后,将parse这个分析代码的任务交由剩余的空闲的线程去执行,这个线程继续去处理其他任务.
    # 3.如果进程池+回调: 回调函数由主进程去执行.
    # 如果线程池+回调: 回调函数由空闲的线程去执行.
  • 异步回调是一回事儿?

    1.异步站在发布任务的角度
    2.站在接收结果的角度: 回调函数 按顺序接收每个任务的结果,进行下一步处理 异步 + 回调:异步处理的IO类型;回调处理非IO

4. 线程queue

  • 应用:多线程抢占资源,只能让其串行,选择互斥锁或者队列

  • 队列:先进先出

    import queue
    q = queue.Queue(3)
    q.put(1)
    q.put(2)
    q.put(3) print(q.get())
    print(q.get())
    print(q.get()) # 输出结果:
    1
    2
    3 # print(q.get(block=False)) # 只要有阻塞就报错
    # print(q.get(timeout=2)) # 阻塞2s 还没有值直接报错
  • 堆栈:后进先出

    q = queue.LifoQueue(4)
    q.put(1)
    q.put(2)
    q.put(3) print(q.get())
    print(q.get())
    print(q.get()) # 输出结果:
    3
    2
    1 # 栈的应用场景:drf源码中节流的实现,用到了列表实现一个栈
  • 优先级队列

    q = queue.PriorityQueue(4)
    q.put((5, '王五'))
    q.put((-2,'张三'))
    q.put((0, '李四'))
    print(q.get())
    print(q.get())
    print(q.get()) # 输出结果:
    (-2, '张三')
    (0, '李四')
    (5, '王五') # 数值越小优先级越高,就越先输出

5. 事件event

  • 定义:开启两个线程,一个线程运行到中间的某个阶段,触发另一个线程执行,两个线程增加了耦合性

  • 代码示例:

    示例一:使用全局变量,实现事件event的效果

    from threading import Thread
    from threading import current_thread
    import time flag = False
    def check():
    print(f'{current_thread().name} 监测服务器是否开启...')
    time.sleep(3)
    global flag
    flag = True
    print('服务器已经开启...') def connect():
    while 1:
    print(f'{current_thread().name} 等待连接...')
    time.sleep(0.5)
    if flag:
    print(f'{current_thread().name} 连接成功...')
    break t1 = Thread(target=check,)
    t2 = Thread(target=connect,)
    t1.start()
    t2.start() # 如果程序中的其它线程需要通过判断某个线程的状态来确定自己的下一步操作

    示例二:使用事件event

    from threading import Thread
    from threading import current_thread
    from threading import Event
    import time event = Event()
    def check():
    print(f'{current_thread().name} 监测服务器是否开启...')
    time.sleep(3)
    print(event.is_set()) # 判断是否执行event.set()方法
    event.set() # 设置event事件
    print(event.is_set())
    print('服务器已经开启...') def connect(): print(f'{current_thread().name} 等待连接...')
    # event.wait() # 阻塞 直到 event.set() 方法之后
    event.wait(1) # 只阻塞1秒,1秒之后如果还没有进行set 直接进行下一步操作.
    print(f'{current_thread().name} 连接成功...') t1 = Thread(target=check,)
    t2 = Thread(target=connect,)
    t1.start()
    t2.start()

    示例三:一个线程监测服务器是否开始,另个一线程判断如果开始了,则显示连接成功,此线程只尝试连接3次,1s 一次,如果超过3次,还没有连接成功,则显示连接失败.

    from threading import Thread
    from threading import current_thread
    from threading import Event
    import time
    event = Event()
    def check():
    print(f'{current_thread().name}检测服务器是否开启')
    time.sleep(3)
    event.set()
    if t2.is_alive():
    print('服务器已经开启')
    def connect():
    count = 3
    while count:
    count -= 1
    print(f'{current_thread().name} 等待连接')
    event.wait(1)
    if event.is_set():
    print(f'{current_thread().name} 连接成功')
    break
    else:
    print(f'{current_thread().name} 连接失败')

6. 协程

  • 相关概念:

    协程:一个线程并发的处理任务
    串行:一个线程执行一个任务,执行完毕之后,执行下一个任务
    并行:多个CPU执行多个任务,4个CPU执行4个任务
    并发:一个CPU执行多个任务,看起来像是同时执行
    并发真正的核心/本质:切换并且保持状态
    多线程的并发:3个线程处理10个任务,如果线程1处理的这个任务,遇到阻塞,cpu被操作系统切换到另一个线程,
  • 应用示例:单个CPU:10个任务,让你给我并发的执行这10个任务

    方式一:开启多进程并发执行,操作系统+保持状态
    方式二:开启多线程并发执行,操作系统+保持状态
    方式三:开启协程并发的执行,自己的程序把控着CPU在多个任务之间来回的切换+保持状态
    方式三的详细解释:协程切换的速度非常快,蒙蔽操作系统的眼睛,让操作系统认为CPU一直在运行这个线程(协程)
  • 使用协程的原因:

    1. 开销小
    2. 运行速度快
    3. 协程会长期霸占CPU只执行我程序里边的所有任务
  • 协程的特点:

    1. 必须在只有一个单线程里实现并发
    2. 修改共享数据不需加锁
    3. 用户程序里自己保存多个控制流的上下文栈(保持状态)
    4. 附加:一个协程遇到IO操作自动切换到其它协程
  • 工作中应用协程:

    一般在工作中我们都是进程+线程+协程的方式来实现并发,以达到最好的并发效果,如果是4核的cpu,一般起5个进程,每个进程中20个线程(5倍cpu数量),每个线程可以起500个协程,大规模爬取页面的时候,等待网络延迟的时间的时候,我们就可以用协程去实现并发。 并发数量 = 5 * 20 * 500 = 50000个并发,这是一般一个4cpu的机器最大的并发数。nginx在负载均衡的时候最大承载量就是5w个
  • 非协程示例代码:

    示例一:只有切换+不能保持状态,遇到IO不会主动切换

    def func1():
    print('in func1') def func2():
    print('in func2')
    func1()
    print('end') func2()

    示例二:yield 切换+保持状态,遇到IO不会主动切换

    def gen():
    while 1:
    yield 1 def func():
    obj = gen()
    for i in range(10):
    next(obj)
    func()

    示例三:greenlet 切换 +保持状态,遇到IO不会主动切换

    from greenlet import greenlet
    import time
    def eat(name):
    print('%s eat 1' %name) # 第2步
    g2.switch('李四') # 第3步
    time.sleep(3)
    print('%s eat 2' %name) # 第6步
    g2.switch() # 第7步 def play(name):
    print('%s play 1' %name) # 第4步
    g1.switch() # 第5步
    print('%s play 2' %name) # 第8步 g1=greenlet(eat)
    g2=greenlet(play) g1.switch('张三') # 第1步 切换到eat任务 # 输出结果:
    张三 eat 1
    李四 play 1
    张三 eat 2
    李四 play 2
  • 协程示例代码:

    示例一:模拟的阻塞,不是真正的阻塞(遇到time.sleep()还是会阻塞)

    import gevent
    from threading import current_thread
    def eat(name):
    print('%s eat 1' %name)
    print(current_thread().name)
    gevent.sleep(2)
    print('%s eat 2' %name)
    def play(name):
    print('%s play 1' %name)
    print(current_thread().name)
    gevent.sleep(1)
    print('%s play 2' %name) g1 = gevent.spawn(eat,'egon')
    g2 = gevent.spawn(play,name='egon')
    print(f'主{current_thread().name}')
    g1.join()
    g2.join()

    示例二:真正的协程 / 最终版

    import gevent
    from gevent import monkey
    monkey.patch_all() # 打补丁: 将下面的所有的任务的阻塞都打上标记
    def eat(name):
    print('%s eat 1' %name)
    time.sleep(2)
    print('%s eat 2' %name)
    def play(name):
    print('%s play 1' %name)
    time.sleep(1)
    print('%s play 2' %name)
    g1 = gevent.spawn(eat,'张三')
    g2 = gevent.spawn(play,name='李四')
    # g1.join()
    # g2.join()
    gevent.joinall([g1,g2])

Python并发编程06 /阻塞、异步调用/同步调用、异步回调函数、线程queue、事件event、协程的更多相关文章

  1. python 并发编程 非阻塞IO模型

    非阻塞IO(non-blocking IO) Linux下,可以通过设置socket使其变为non-blocking.当对一个non-blocking socket执行读操作时,流程是这个样子: 从图 ...

  2. Python异步非阻塞IO多路复用Select/Poll/Epoll使用,线程,进程,协程

    1.使用select模拟socketserver伪并发处理客户端请求,代码如下: import socket import select sk = socket.socket() sk.bind((' ...

  3. 并发编程~~~多线程~~~线程queue, 事件event,

    一 线程queue 多线程抢占资源,只能让其串行. 互斥锁 队列 import queue q = queue.Queue() # 先进先出 q = queue.LifoQueue() # 先进后出 ...

  4. JUC 并发编程--06, 阻塞队列(7种), 阻塞等待 api的 代码验证

    这些队列的 api ,就是添加队列,出队列,检测对首元素, 由于 add()--remove(), offer()--poll(),太简单这里不做验证, 只验证后二组api: 阻塞等待( put()- ...

  5. python并发编程知识点总结

    1.到底什么是线程?什么是进程? Python自己没有这玩意,Python中调用的操作系统的线程和进程. 2.Python多线程情况下: 计算密集型操作:效率低,Python内置的一个全局解释器锁,锁 ...

  6. python 并发编程 io模型 目录

    python 并发编程 IO模型介绍 python 并发编程 socket 服务端 客户端 阻塞io行为 python 并发编程 阻塞IO模型 python 并发编程 非阻塞IO模型 python 并 ...

  7. Python并发编程系列之常用概念剖析:并行 串行 并发 同步 异步 阻塞 非阻塞 进程 线程 协程

    1 引言 并发.并行.串行.同步.异步.阻塞.非阻塞.进程.线程.协程是并发编程中的常见概念,相似却也有却不尽相同,令人头痛,这一篇博文中我们来区分一下这些概念. 2 并发与并行 在解释并发与并行之前 ...

  8. python 并发编程 同步调用和异步调用 回调函数

    提交任务的两张方式: 1.同步调用 2.异步调用 同步调用:提交完任务后,就在原地等待任务执行完后,拿到结果,再执行下一行代码 同步调用,导致程序串行执行 from concurrent.future ...

  9. Python并发编程-线程同步(线程安全)

    Python并发编程-线程同步(线程安全) 作者:尹正杰 版权声明:原创作品,谢绝转载!否则将追究法律责任. 线程同步,线程间协调,通过某种技术,让一个线程访问某些数据时,其它线程不能访问这些数据,直 ...

随机推荐

  1. 暴力枚举 + 24点 --- hnu : Cracking the Safe

    Cracking the Safe Time Limit: 1000ms, Special Time Limit:2500ms, Memory Limit:65536KB Total submit u ...

  2. iOS - 如何自动播放H5中的音频

    场景:iOS端设备,App页面跳转到H5产品介绍,背景音乐无法播放.(为什么不能自动播放,因该是iPhone人性化设定吧~) 加载H5用UIWebView空间: 代码: CGRect rect = s ...

  3. Hbase集群搭建及所有配置调优参数整理及API代码运行

    最近为了方便开发,在自己的虚拟机上搭建了三节点的Hadoop集群与Hbase集群,hadoop集群的搭建与zookeeper集群这里就不再详细说明,原来的笔记中记录过.这里将hbase配置参数进行相应 ...

  4. 转:LoadRunner负载测试之Windows常见性能计数器,分析服务器性能瓶颈

    发布于2012-10-8,来源:博客园 监测对象 System(系统) l %Total Processor Time 系统中所有处理器都处于繁忙状态的时间百分比,对于多处理器系统来说,该值可以反映所 ...

  5. initComponents()方法

    initComponents()是在使用GUI工具设计GUI界面时,NetBeans系统自动生成的方法. 其功能是在界面添加各个组件,并为它们注册监听器. 把initComponents()放在构造方 ...

  6. juce中的Singleton

    说明上其实很明白,支持多线程,防止重复创建,同时支持如果删除以后就不在创建,利用局部静态变量进行标记.挺通用,看来下次写个c11版本的 //============================== ...

  7. Postman教程——创建第一个集合

    系列文章首发平台为果冻想个人博客.果冻想,是一个原创技术文章分享网站.在这里果冻会分享他的技术心得,技术得失,技术人生.我在果冻想等待你,也希望你能和我分享你的技术得与失,期待. 什么是集合 集合是P ...

  8. P3200 [HNOI2009]有趣的数列--洛谷luogu

    ---恢复内容开始--- 题目描述 我们称一个长度为2n的数列是有趣的,当且仅当该数列满足以下三个条件: (1)它是从1到2n共2n个整数的一个排列{ai}: (2)所有的奇数项满足a1<a3& ...

  9. windows 内存管理的几种方式及其优缺点

    windows 内存管理方式主要分为:页式管理,段式管理,段页式管理. 页式管理的基本原理是将各进程的虚拟空间划分为若干个长度相等的页:页式管理把内存空间按照页的大小划分成片或者页面,然后把页式虚拟地 ...

  10. PHP Extension

    新手搞PHP ,之前用过 PERL, BASH: 所以开始用PHP 写程序上手比较快, 几天之后对PHP 的内部实现机制产生了兴趣,所以自己尝试着写写简单的PHP 扩展,以增加对PHP 的理解.   ...