python全栈开发,Day42(Thread类的其他方法,同步锁,死锁与递归锁,信号量,事件,条件,定时器,队列,Python标准模块--concurrent.futures)

时间:2022-02-02 21:52:49

昨日内容回顾

线程

什么是线程?

线程是cpu调度的最小单位

进程是资源分配的最小单位

进程和线程是什么关系?

  线程是在进程中的一个执行单位

  多进程 本质上开启的这个进程里就有一个线程

  多线程 单纯的在当前进程中开启了多个线程

线程和进程的区别:

  线程的开启 销毁 任务切换的时间开销小

  在同一个进程中数据共享

  能实现并发,但不能脱离进程

  进程负责管理分配资源 线程负责执行代码

GIL锁 --  全局解释器锁

同一时刻只能有一个线程访问CPU -- 线程锁

Cpython会受到GIL影响

而pypy和jpython不会受到GIL影响

python程序效率下降的问题

高计算型 -- 多线程会导致程序的效率下降

高IO型的 -- 可以使用多线程,不会受到影响

多进程

分布式计算 -- celery(Python开发的分布式任务调度模块)

启动简单线程

from threading import Thread

def func():
print(123) Thread(target=func).start()  

执行输出:123

守护线程和守护进程的区别?

守护线程是等待主进程代码结束之后就结束

守护线程是等待主线程都结束之后才结束

主线程等待其他线程结束,才结束。

开启线程的第二种方法:使用类继承

from threading import Thread
import time class Sayhi(Thread):
def __init__(self, name):
super().__init__()
self.name = name def run(self):
time.sleep(2)
print('%s say hello' % self.name) if __name__ == '__main__':
t = Sayhi('egon')
t.start()
print('主线程')  

执行输出:

python全栈开发,Day42(Thread类的其他方法,同步锁,死锁与递归锁,信号量,事件,条件,定时器,队列,Python标准模块--concurrent.futures)

一、Thread类的其他方法

Thread实例对象的方法
# isAlive(): 返回线程是否活动的。
# getName(): 返回线程名。
# setName(): 设置线程名。 threading模块提供的一些方法:
# threading.currentThread(): 返回当前的线程变量。
# threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
# threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。

举例:

from threading import Thread

def func():
print(123) t = Thread(target=func)
t.start()
print(t.is_alive()) # 返回线程是否是活动的  

执行输出:

python全栈开发,Day42(Thread类的其他方法,同步锁,死锁与递归锁,信号量,事件,条件,定时器,队列,Python标准模块--concurrent.futures)

睡0.1秒

import time
from threading import Thread def func():
time.sleep(0.1)
print(123) t = Thread(target=func)
t.start()
print(t.is_alive())  

执行输出:

python全栈开发,Day42(Thread类的其他方法,同步锁,死锁与递归锁,信号量,事件,条件,定时器,队列,Python标准模块--concurrent.futures)

import time
from threading import Thread def func():
time.sleep(0.1)
print(123) t = Thread(target=func)
t.start()
print(t.is_alive()) # 返回线程是否活动的
print(t.getName()) # 返回线程名
t.setName('t1') # s设置线程名
print(t.getName())

python全栈开发,Day42(Thread类的其他方法,同步锁,死锁与递归锁,信号量,事件,条件,定时器,队列,Python标准模块--concurrent.futures)

import time
from threading import Thread, currentThread, enumerate, activeCount def func():
time.sleep(0.1)
# print(123) t = Thread(target=func)
t.start()
print(currentThread) # 返回当前的线程变量
print(enumerate()) # 返回一个包含长在运行的线程的list
print(activeCount()) # 返回正在运行的线程数量  

执行输出:

python全栈开发,Day42(Thread类的其他方法,同步锁,死锁与递归锁,信号量,事件,条件,定时器,队列,Python标准模块--concurrent.futures)

MainThread, started 9792表示主线程,Thread-1, started 8420表示子线程。它会打印出2个。所以activeCount的结果为2

二、同步锁

当多线程争夺锁时,允许第一个获得锁的线程进入临界区,并执行代码。所有之后到达的线程将被阻塞,直到第一个线程执行结束,退出临界区,并释放锁。  

多个线程抢占资源的情况:

import time
from threading import Thread def func():
global n
temp = n
time.sleep(1)
n = temp - 1 n = 100
t_lst = []
for i in range(100):
t = Thread(target=func)
t.start()
t_lst.append(t)
for t in t_lst: t.join()
print(n)  

执行输出:99

为啥呢?

明明减了100次,结果应该是0的。

为啥是99呢?难道是GIL的问题?但GIL是计算CPU哪一刻的锁

下面开始具体分析:

第一步,每个线程执行global n:temp = n此时,temp等于100

python全栈开发,Day42(Thread类的其他方法,同步锁,死锁与递归锁,信号量,事件,条件,定时器,队列,Python标准模块--concurrent.futures)

第二步,当线程涉及到CPU计算时,向CPU发送请求。但是受到GIL的限制

同一时刻,只能有一个线程计算。

python全栈开发,Day42(Thread类的其他方法,同步锁,死锁与递归锁,信号量,事件,条件,定时器,队列,Python标准模块--concurrent.futures)

CPU计算结果后,返回给线程。线程赋值,并修改全局变量n,此时n=99,线程结束

python全栈开发,Day42(Thread类的其他方法,同步锁,死锁与递归锁,信号量,事件,条件,定时器,队列,Python标准模块--concurrent.futures)

那么其他线程,也是做同样的操作

每个线程赋值n等于99,不管它已经是99了。

上面的现象,出现了数据不安全的情况

python全栈开发,Day42(Thread类的其他方法,同步锁,死锁与递归锁,信号量,事件,条件,定时器,队列,Python标准模块--concurrent.futures)

最后赋值了100次,都是n=99.所以最终结果是99

怎么解决呢?加锁

from threading import Thread, Lock

def func(lock):
global n
lock.acquire() # 加锁
temp = n
n = temp - 1
lock.release() # 解锁 n = 100
t_lst = []
lock = Lock() # 创建锁
for i in range(100):
t = Thread(target=func, args=(lock,))
t.start()
t_lst.append(t)
for t in t_lst: t.join() # 等待所有子线程结束
print(n)  

执行输出:0

如果把计算和赋值拆开,就会出现数据不安全的情况 

下面的写法,不用加锁,也可以得到0

from threading import Thread

def func():
global n
n -= 1 n = 100
for i in range(100):
t = Thread(target=func)
t.start()
print(n)  

执行输出:0

因为默认有一个GIL锁,所以每个线程都减等1,所以最终结果为0

三、死锁与递归锁

进程也有死锁与递归锁,在进程那里忘记说了,放到这里一切说了额

所谓死锁: 是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程,如下就是死锁

from threading import Lock
lock = Lock() # 在同一个线程中,能够被一个锁的多个acquire阻住,这种锁就叫互斥锁
lock.acquire()
lock.acquire()
lock.acquire()  

死锁,也叫互斥锁

科学家吃面的问题

要完成一件事情,需要两个必要因素

要想吃到面,需要:叉子,面

资源的互相抢占的问题 ---- 死锁

四个人围着一张桌子,桌子上放着一碗面,碗里有一个叉子

必须拿到叉子,才能吃面。

每个人每次只能吃一口面,吃完就放回去,让下一个人吃。

python全栈开发,Day42(Thread类的其他方法,同步锁,死锁与递归锁,信号量,事件,条件,定时器,队列,Python标准模块--concurrent.futures)

import time
from threading import Thread,Lock
def eat1(noodle_lock,fork_lock,name): # 吃面
noodle_lock.acquire() # 面条加锁
print('%s 抢到了面条' % name)
fork_lock.acquire() # 叉子加锁
print('%s 抢到了叉子' % name)
print('%s 正在吃面' % name)
fork_lock.release() # 叉子解锁
print('%s 归还了叉子' % name)
noodle_lock.release() # 面条解锁
print('%s 归还了面条' % name)
def eat2(noodle_lock,fork_lock,name): # 也是吃面,不同的是:先抢叉子,再抢面
fork_lock.acquire()
print('%s 抢到了叉子' % name)
time.sleep(0.5)
noodle_lock.acquire()
print('%s 抢到了面' % name)
print('%s 正在吃面' % name)
noodle_lock.release()
print('%s 归还了面' % name)
fork_lock.release()
print('%s归还了叉子' % name)
noodle_lock = Lock() # 面条锁
fork_lock = Lock() # 叉子锁
t1 = Thread(target=eat1,args=(noodle_lock,fork_lock,'nazha')).start()
t2 = Thread(target=eat2,args=(noodle_lock,fork_lock,'egon')).start()
t3 = Thread(target=eat1,args=(noodle_lock,fork_lock,'yuan')).start()
t4 = Thread(target=eat2,args=(noodle_lock,fork_lock,'alex')).start()  

执行输出:

python全栈开发,Day42(Thread类的其他方法,同步锁,死锁与递归锁,信号量,事件,条件,定时器,队列,Python标准模块--concurrent.futures)

发现程序卡住了,为啥呢?

egon和yuan分别抢到了叉子和面条,但是谁也不愿意给对方。互掐在...

那么就出现了死锁现象

只有一个锁,不会出现死锁。在多个锁的情况下,就会出现死锁。

如何解决这个问题呢?使用递归锁

递归锁

在python中为诶了支持在同一线程中多次请求同一资源,python提供了可重入锁Lock。

这个RLock内部维护者一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次acquire。直到一个线程所有的acquire都被release,其他的线程才能获得资源

简单例子:

from threading import Thread, RLock

rlock = RLock()  # 创建递归锁
rlock.acquire() # 加第一个锁
print('***')
rlock.acquire() # 加第二个锁
print('***')  

执行输出:

python全栈开发,Day42(Thread类的其他方法,同步锁,死锁与递归锁,信号量,事件,条件,定时器,队列,Python标准模块--concurrent.futures)

发现瞬间就输出了,程序没有卡顿,666

结论:

递归锁在同一个线程中对同一个锁多次acquire不会产生阻塞

看下面的例子:

from threading import Thread, RLock

def func(rlock, flag):
rlock.acquire() # 第一道锁
print(flag * 10)
rlock.acquire() # 第二道锁
print(flag * 10)
rlock.acquire() # 第三道锁
print(flag * 10)
rlock.acquire() # 第四道锁
print(flag * 10) rlock = RLock() # 创建递归锁
Thread(target=func, args=(rlock, '*')).start() # 传入递归锁和*
Thread(target=func, args=(rlock, '-')).start()  

执行输出:

python全栈开发,Day42(Thread类的其他方法,同步锁,死锁与递归锁,信号量,事件,条件,定时器,队列,Python标准模块--concurrent.futures)

发现程序开著了,---------没有输出

为什么?递归锁搞不定?

看下图

python全栈开发,Day42(Thread类的其他方法,同步锁,死锁与递归锁,信号量,事件,条件,定时器,队列,Python标准模块--concurrent.futures)

第一个线程过来,拿走了一串钥匙

每次acquire,就进入一个房间

最后在房间的最里面,没出来。

第二个线程,怎么进入房间呢?

需要第一个线程执行release操作,从嘴里米娜的房间周出来。

它每走出一个放进啊,需要release一次,将钥匙放到钥匙串上面,归还到门口。

由第二个线程拿走钥匙串才行。

它跟递归函数类似

python全栈开发,Day42(Thread类的其他方法,同步锁,死锁与递归锁,信号量,事件,条件,定时器,队列,Python标准模块--concurrent.futures)

怎么解决上面卡主的问题呢?

必须是4次release

from threading import Thread, RLock

def func(rlock, flag):
rlock.acquire() # 第一道锁
print(flag * 10)
rlock.acquire() # 第二道锁
print(flag * 10)
rlock.acquire() # 第三道锁
print(flag * 10)
rlock.acquire() # 第四道锁
print(flag * 10)
rlock.release() # 解锁
rlock.release()
rlock.release()
rlock.release() rlock = RLock() # 创建递归锁
Thread(target=func, args=(rlock, '*')).start()
Thread(target=func, args=(rlock, '-')).start()  

执行输出:

python全栈开发,Day42(Thread类的其他方法,同步锁,死锁与递归锁,信号量,事件,条件,定时器,队列,Python标准模块--concurrent.futures)

使用递归锁解决科学家吃面的问题

import time
from threading import Thread, RLock def eat1(noodle_lock, fork_lock, name): # 吃面
noodle_lock.acquire() # 拿到面条的整串钥匙
print('%s 抢到了面条' % name)
fork_lock.acquire() # 拿到叉子的整串钥匙
print('%s 抢到了叉子' % name)
print('%s 正在吃面' % name)
fork_lock.release() # 叉子解锁
print('%s 归还了叉子' % name)
noodle_lock.release() # 面条解锁
print('%s 归还了面条' % name) def eat2(noodle_lock, fork_lock, name): # 也是吃面,不同的是:先抢叉子,再抢面
fork_lock.acquire()
print('%s 抢到了叉子' % name)
time.sleep(0.5)
noodle_lock.acquire()
print('%s 抢到了面' % name)
print('%s 正在吃面' % name)
noodle_lock.release()
print('%s 归还了面' % name)
fork_lock.release()
print('%s 归还了叉子' % name) noodle_lock = fork_lock = RLock() # 面条锁和叉子锁,表示一串钥匙
t1 = Thread(target=eat1, args=(noodle_lock, fork_lock, 'nazha')).start()
t2 = Thread(target=eat2, args=(noodle_lock, fork_lock, 'egon')).start()
t3 = Thread(target=eat1, args=(noodle_lock, fork_lock, 'yuan')).start()
t4 = Thread(target=eat2, args=(noodle_lock, fork_lock, 'alex')).start()  

执行输出:

python全栈开发,Day42(Thread类的其他方法,同步锁,死锁与递归锁,信号量,事件,条件,定时器,队列,Python标准模块--concurrent.futures)

什么情况下,需要用到递归锁呢?

有超过一个资源需要锁的时候,使用递归锁

有2个屌丝,一个拿着外层钥匙,一个拿着里层钥匙,谁也不想给对方,就出现了死锁

python全栈开发,Day42(Thread类的其他方法,同步锁,死锁与递归锁,信号量,事件,条件,定时器,队列,Python标准模块--concurrent.futures)

四、信号量

同进程的一样

Semaphore管理一个内置的计数器,

每当调用acquire()时内置计数器-1;

调用release()时内置计数器+1;

计数器不能小于0;当计数器为0时,acquire将阻塞线程直到其它线程调用release()。

实例:(同时只有5个线程可以获得semaphore,即可以限制最大连接数为5):

import time
from threading import Thread, Semaphore def func(sem, i):
sem.acquire()
print(i)
time.sleep(1)
sem.release() sem = Semaphore(5)
for i in range(6):
Thread(target=func, args=(sem, i)).start()  

执行输出:

python全栈开发,Day42(Thread类的其他方法,同步锁,死锁与递归锁,信号量,事件,条件,定时器,队列,Python标准模块--concurrent.futures)

与进程池是完全不同的概念,进程池Pool(4),最大只能产生4个进程,而且从头到尾都只是这四个进程,不会产生新的,而信号量是产生一堆线程/进程

池与信号量

五、事件

同进程的一样

线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其 他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常棘手。为了解决这些问题,我们需要使用threading库中的Event对象。 对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在 初始情况下,Event对象中的信号标志被设置为假。如果有线程等待一个Event对象, 而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件, 继续执行

event.isSet():返回event的状态值;
event.wait():如果 event.isSet()==False将阻塞线程;
event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;
event.clear():恢复event的状态值为False。

python全栈开发,Day42(Thread类的其他方法,同步锁,死锁与递归锁,信号量,事件,条件,定时器,队列,Python标准模块--concurrent.futures)

例如,有多个工作线程尝试链接MySQL,我们想要在链接前确保MySQL服务正常才让那些工作线程去连接MySQL服务器,如果连接不成功,都会去尝试重新连接。那么我们就可以采用threading.Event机制来协调各个工作线程的连接操作

python全栈开发,Day42(Thread类的其他方法,同步锁,死锁与递归锁,信号量,事件,条件,定时器,队列,Python标准模块--concurrent.futures)

连接数据库

import time
import random
from threading import Event, Thread # 连接数据库
def connect_db(e):
count = 1 # 初始计数器
while count < 4:
print('尝试第%s次检测连接' % count)
e.wait(0.5) # 等待0.5,再去执行下面的代码
# 如果不传参数会一直等到事件为True为止
# 如果传参数,传一个事件参数,到时间后,
count += 1
if e.is_set(): # 判断状态是否为True
print('连接成功')
break
else:
print('连接失败') def check_conn(e):
"""检测数据库是否可以连接"""
time.sleep(random.randint(1, 2)) # 等待1-2秒
e.set() # 设置状态为True e = Event()
Thread(target=check_conn, args=(e,)).start()
Thread(target=connect_db, args=(e,)).start()  

执行输出:

python全栈开发,Day42(Thread类的其他方法,同步锁,死锁与递归锁,信号量,事件,条件,定时器,队列,Python标准模块--concurrent.futures)

结果是不固定的

什么时候,用到事件?

你要做一件事情 是有前提的
你就先去处理前提的问题 —— 前提处理好了 把状态设置成True
来控制即将要做的事情可以开始

条件

使得线程等待,只有满足某条件时,才释放n个线程

Python提供的Condition对象提供了对复杂线程同步问题的支持。Condition被称为条件变量,除了提供与Lock类似的acquire和release方法外,还提供了wait和notify方法。线程首先acquire一个条件变量,然后判断一些条件。如果条件不满足则wait;如果条件满足,进行一些处理改变条件后,通过notify方法通知其他线程,其他处于wait状态的线程接到通知后会重新判断条件。不断的重复这一过程,从而解决复杂的同步问题。

起10个线程

import time
from threading import Condition, Thread def func(i, con):
con.acquire() # 进入锁定池
con.wait() # 等待通知,前后必须要有acquire和release
print(i * '*')
con.release() # 解锁 con = Condition() # 条件变量
for i in range(10):
Thread(target=func, args=(i, con)).start()
while True:
n = int(input('>>>'))
con.acquire() # 加锁
con.notify(n) # 通知其它线程,其它处于wait状态的线程接到通知会重新判断条件,解除wait状态。前后必须要有acquire和release
con.release() # 解锁  

执行输出:

python全栈开发,Day42(Thread类的其他方法,同步锁,死锁与递归锁,信号量,事件,条件,定时器,队列,Python标准模块--concurrent.futures)

首先输入2,显示1个*和0个星号,也就是空。

再输入5,分别显示2~6个*号

为什么输入5的时候,出现6个*号?

因为线程池有10个,不等长度的*号

输入2,表示解除2个线程的wait状态,分别表示0,1。

那么5,表示5个。分别表示2,3,4,5,6

从结果上来看,发生数据混乱

 con.notify(n)表示按照要求开放线程

con.notify_all()表示一次性开放线程

总结:

semaphore允许同一时刻n个线程执行这段代码

event有一个内部的事件来控制wait的行为,控制的是所有的线程

condition有一个内部的条件来控制wait的行为,它可以逐个或者分批次的控制线程的走向

六、定时器

定时器,指定n秒后执行某个操作

import time
from threading import Timer def func():
print('*' * 10)
print('子线程', time.strftime("%Y-%m-%d %H:%M:%S")) t = Timer(5, func) # 要开始一个线程,等到5秒之后才开启并执行
t.start()
print('主进程', time.strftime("%Y-%m-%d %H:%M:%S"))  

执行输出:

python全栈开发,Day42(Thread类的其他方法,同步锁,死锁与递归锁,信号量,事件,条件,定时器,队列,Python标准模块--concurrent.futures)

虽然可以time.sleep(5)完成5秒的过程,但是它会阻塞主线程

如果用timer就不会,它是异步的。

七、线程队列

queue队列:使用import queue,用法与进程Queue一样

queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.

class queue.Queue(maxsize=0) #先进先出
import queue

q = queue.Queue()
q.put('first')
q.put('second')
q.put('third')
print(q.get())
print(q.get())
print(q.get())
"""
结果(先进先出):
first
second
third
"""

先进先出

class queue.LifoQueue(maxsize=0) #last in fisrt out

import queue

q = queue.LifoQueue()
q.put('first')
q.put('second')
q.put('third')
print(q.get())
print(q.get())
print(q.get())
"""
结果(后进先出):
third
second
first
"""

后进先出

class queue.PriorityQueue(maxsize=0) #存储数据时可设置优先级的队列

import queue

q = queue.PriorityQueue()
# put进入一个元组,元组的第一个元素是优先级(通常是数字,也可以是非数字之间的比较),数字越小优先级越高
q.put((20, 'a'))
q.put((10, 'b'))
q.put((30, 'c'))
print(q.get())
print(q.get())
print(q.get())
"""
结果(数字越小优先级越高,优先级高的优先出队):
(10,'b')
(20,'a')
(30,'c')
"""

优先级队列

Constructor for a priority queue. maxsize is an integer that sets the upperbound limit on the number of items that can be placed in the queue. Insertion will block once this size has been reached, until queue items are consumed. If maxsize is less than or equal to zero, the queue size is infinite.

The lowest valued entries are retrieved first (the lowest valued entry is the one returned by sorted(list(entries))[0]). A typical pattern for entries is a tuple in the form: (priority_number, data).

exception queue.Empty
Exception raised when non-blocking get() (or get_nowait()) is called on a Queue object which is empty. exception queue.Full
Exception raised when non-blocking put() (or put_nowait()) is called on a Queue object which is full. Queue.qsize()
Queue.empty() #return True if empty
Queue.full() # return True if full
Queue.put(item, block=True, timeout=None)
Put item into the queue. If optional args block is true and timeout is None (the default), block if necessary until a free slot is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Full exception if no free slot was available within that time. Otherwise (block is false), put an item on the queue if a free slot is immediately available, else raise the Full exception (timeout is ignored in that case). Queue.put_nowait(item)
Equivalent to put(item, False). Queue.get(block=True, timeout=None)
Remove and return an item from the queue. If optional args block is true and timeout is None (the default), block if necessary until an item is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Empty exception if no item was available within that time. Otherwise (block is false), return an item if one is immediately available, else raise the Empty exception (timeout is ignored in that case). Queue.get_nowait()
Equivalent to get(False). Two methods are offered to support tracking whether enqueued tasks have been fully processed by daemon consumer threads. Queue.task_done()
Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete. If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue). Raises a ValueError if called more times than there were items placed in the queue. Queue.join() block直到queue被消费完毕

更多方法说明

后进先出

import queue

q = queue.LifoQueue()
q.put('first')
q.put('second')
q.put('third')
print(q.get())  

执行输出:third

先进先出

import queue

q = queue.Queue()
q.put('first')
q.put('second')
q.put('third')
print(q.get())  

执行输出:first

优先级队列

import queue

q = queue.PriorityQueue()
q.put((1, 'a'))
q.put((2, 'z'))
q.put((3, 'b'))
print(q.get())  

执行输出:(1,'a')

数字越小,优先级越高

有一个场景,你冲了VIP,你就先买到票
如果有2个人,优先级一样呢?

import queue

q = queue.PriorityQueue()
q.put((2, 'z'))
q.put((2, 'a'))
print(q.get())  

执行输出:(2, 'a')

结果是根据ascii码的顺序来排序的

八、Python标准模块--concurrent.futures

https://docs.python.org/dev/library/concurrent.futures.html

#1 介绍
concurrent.futures模块提供了高度封装的异步调用接口
ThreadPoolExecutor:线程池,提供异步调用
ProcessPoolExecutor: 进程池,提供异步调用
Both implement the same interface, which is defined by the abstract Executor class. #2 基本方法
#submit(fn, *args, **kwargs)
异步提交任务 #map(func, *iterables, timeout=None, chunksize=1)
取代for循环submit的操作 #shutdown(wait=True)
相当于进程池的pool.close()+pool.join()操作
wait=True,等待池内所有任务执行完毕回收完资源后才继续
wait=False,立即返回,并不会等待池内的任务执行完毕
但不管wait参数为何值,整个程序都会等到所有任务执行完毕
submit和map必须在shutdown之前 #result(timeout=None)
取得结果 #add_done_callback(fn)
回调函数
#介绍
The ProcessPoolExecutor class is an Executor subclass that uses a pool of processes to execute calls asynchronously. ProcessPoolExecutor uses the multiprocessing module, which allows it to side-step the Global Interpreter Lock but also means that only picklable objects can be executed and returned. class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None)
An Executor subclass that executes calls asynchronously using a pool of at most max_workers processes. If max_workers is None or not given, it will default to the number of processors on the machine. If max_workers is lower or equal to 0, then a ValueError will be raised. #用法
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import os,time,random
def task(n):
print('%s is runing' %os.getpid())
time.sleep(random.randint(1,3))
return n**2 if __name__ == '__main__': executor=ProcessPoolExecutor(max_workers=3) futures=[]
for i in range(11):
future=executor.submit(task,i)
futures.append(future)
executor.shutdown(True)
print('+++>')
for future in futures:
print(future.result())

ProcessPoolExecutor

#介绍
ThreadPoolExecutor is an Executor subclass that uses a pool of threads to execute calls asynchronously.
class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='')
An Executor subclass that uses a pool of at most max_workers threads to execute calls asynchronously. Changed in version 3.5: If max_workers is None or not given, it will default to the number of processors on the machine, multiplied by 5, assuming that ThreadPoolExecutor is often used to overlap I/O instead of CPU work and the number of workers should be higher than the number of workers for ProcessPoolExecutor. New in version 3.6: The thread_name_prefix argument was added to allow users to control the threading.Thread names for worker threads created by the pool for easier debugging. #用法
与ProcessPoolExecutor相同

ThreadPoolExecutor

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

import os,time,random
def task(n):
print('%s is runing' %os.getpid())
time.sleep(random.randint(1,3))
return n**2 if __name__ == '__main__': executor=ThreadPoolExecutor(max_workers=3) # for i in range(11):
# future=executor.submit(task,i) executor.map(task,range(1,12)) #map取代了for+submit

map的用法

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
from multiprocessing import Pool
import requests
import json
import os def get_page(url):
print('<进程%s> get %s' %(os.getpid(),url))
respone=requests.get(url)
if respone.status_code == 200:
return {'url':url,'text':respone.text} def parse_page(res):
res=res.result()
print('<进程%s> parse %s' %(os.getpid(),res['url']))
parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text']))
with open('db.txt','a') as f:
f.write(parse_res) if __name__ == '__main__':
urls=[
'https://www.baidu.com',
'https://www.python.org',
'https://www.openstack.org',
'https://help.github.com/',
'http://www.sina.com.cn/'
] # p=Pool(3)
# for url in urls:
# p.apply_async(get_page,args=(url,),callback=pasrse_page)
# p.close()
# p.join() p=ProcessPoolExecutor(3)
for url in urls:
p.submit(get_page,url).add_done_callback(parse_page) #parse_page拿到的是一个future对象obj,需要用obj.result()拿到结果

回调函数

import time
from concurrent.futures import ThreadPoolExecutor def func(i):
print(i * '*')
time.sleep(1) thread_pool = ThreadPoolExecutor(5) # 创建一个最大可容纳5个任务的线程池
thread_pool.submit(func, 1) # 异步提交任务,往线程池里面加入一个任务  

执行输出:

*

添加10个任务

import time
from concurrent.futures import ThreadPoolExecutor def func(i):
print(i * '*')
time.sleep(1) thread_pool = ThreadPoolExecutor(5) # 创建一个最大可容纳5个任务的线程池
for i in range(6):
thread_pool.submit(func, i) # 异步提交任务 ,往线程池里面 加入一个任务
print('wahaha')

python全栈开发,Day42(Thread类的其他方法,同步锁,死锁与递归锁,信号量,事件,条件,定时器,队列,Python标准模块--concurrent.futures)

thread_pool.submit相当于之前线程的apply_async,表示异步

由于等待了1秒,所以有限输出wahah

如果想最后打印wahaha呢?

import time
from concurrent.futures import ThreadPoolExecutor def func(i):
print(i * '*')
time.sleep(1) thread_pool = ThreadPoolExecutor(5) # 创建一个最大可容纳5个任务的线程池
for i in range(6):
thread_pool.submit(func, i) # 异步提交任务,往线程池里面加入一个任务
thread_pool.shutdown() # 相当于进程池的pool.close()+pool.join()操作
print('wahah')

执行输出:

python全栈开发,Day42(Thread类的其他方法,同步锁,死锁与递归锁,信号量,事件,条件,定时器,队列,Python标准模块--concurrent.futures)

import time
from concurrent.futures import ThreadPoolExecutor def func(i):
print(i * '*')
time.sleep(1)
return i ** 2 thread_pool = ThreadPoolExecutor(5) # 创建一个最大可溶纳2个任务的线程池
ret_lst = []
for i in range(6):
ret = thread_pool.submit(func, i) # 异步提交任务,往线程池里面加入一个任务
ret_lst.append(ret)
thread_pool.shutdown() # 相当于进程池的pool.close()+pool.join()操作
for ret in ret_lst:
print(ret.result()) # 取得结果
print('wahaha')  

执行输出:

python全栈开发,Day42(Thread类的其他方法,同步锁,死锁与递归锁,信号量,事件,条件,定时器,队列,Python标准模块--concurrent.futures)

到目前为止和ThreadPool的区别

python全栈开发,Day42(Thread类的其他方法,同步锁,死锁与递归锁,信号量,事件,条件,定时器,队列,Python标准模块--concurrent.futures)

回调函数  

import time
from concurrent.futures import ThreadPoolExecutor def func(i):
print(i * '*')
time.sleep(1)
return i ** 2 def callback(arg): # 回调函数
print(arg.result() * '-') # 取得结果,并乘以- thread_pool = ThreadPoolExecutor(5) # 创建一个最大可容纳5个任务的线程池
for i in range(6):
ret = thread_pool.submit(func, i).add_done_callback(callback) # 异步提交任务,执行回调函数
thread_pool.shutdown() # 相当于进程池的pool.close()+pool.join()操作
print('wahaha')  

执行输出:

python全栈开发,Day42(Thread类的其他方法,同步锁,死锁与递归锁,信号量,事件,条件,定时器,队列,Python标准模块--concurrent.futures)

什么情况下,使用进程池/线程池?

当内存不需要共享,且高计算的时候用进程

当内存需要共享,且高IO的时候,用线程

当并发很大的时候

多进程:多个任务——进程池:cpu个数、cpu个数+1

多线程:多个任务——线程池:cpu个数*5

4核CPU,可以开4~5个进程,开20条线程/进程。最终可以开80~100个任务

最美的程序,不是单一使用。而是复合使用

比如开多个进程,每个进程,开多个线程。

当后面学到协程,并发(qps)能达到5万

锁的概念,面试,会经常问道,还有队列

真正工作过程中,锁会用到,其它的,很少用。

还有进程池/线程池也会用到

明日默写:

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor def func(i):
print(i * '*')
time.sleep(1)
return i ** 2 def callb(arg):
print(arg.result() * '-') if __name__ == '__main__':
thread_pool = ThreadPoolExecutor(5)
for i in range(10):
thread_pool.submit(func, i).add_done_callback(callb)
thread_pool.shutdown()
print('wahaha')

  

python全栈开发,Day42(Thread类的其他方法,同步锁,死锁与递归锁,信号量,事件,条件,定时器,队列,Python标准模块--concurrent.futures)的更多相关文章

  1. python 全栈开发,Day42&lpar;Thread类的其他方法&comma;同步锁&comma;死锁与递归锁&comma;信号量&comma;事件&comma;条件&comma;定时器&comma;队列&comma;Python标准模块--concurrent&period;futures&rpar;

    昨日内容回顾 线程什么是线程?线程是cpu调度的最小单位进程是资源分配的最小单位 进程和线程是什么关系? 线程是在进程中的 一个执行单位 多进程 本质上开启的这个进程里就有一个线程 多线程 单纯的在当 ...

  2. Thread类的其他方法&comma;同步锁&comma;死锁与递归锁&comma;信号量&comma;事件&comma;条件&comma;定时器&comma;队列&comma;Python标准模块--concurrent&period;futures

    参考博客: https://www.cnblogs.com/xiao987334176/p/9046028.html 线程简述 什么是线程?线程是cpu调度的最小单位进程是资源分配的最小单位 进程和线 ...

  3. python全栈开发,Day41&lpar;线程概念,线程的特点,进程和线程的关系,线程和python理论知识,线程的创建&rpar;

    昨日内容回顾 队列 队列:先进先出.数据进程安全 队列实现方式:管道+锁 生产者消费者模型:解决数据供需不平衡 管道 双向通信,数据进程不安全 EOFError: 管道是由操作系统进行引用计数的 必须 ...

  4. python全栈开发day31-操作系统介绍,异步、同步、阻塞、非阻塞,进程

    一.网络编程内容回顾 1.arp协议 #交换机 #广播.单播 2.ip协议 3.tcp和udp协议 tcp:可靠的,面向连接的,字节流传输,长连接 三次握手:一方发送请求,另一方确认请求同时发送请求, ...

  5. python 全栈开发,Day44&lpar;IO模型介绍&comma;阻塞IO&comma;非阻塞IO&comma;多路复用IO&comma;异步IO&comma;IO模型比较分析&comma;selectors模块&comma;垃圾回收机制&rpar;

    昨日内容回顾 协程实际上是一个线程,执行了多个任务,遇到IO就切换 切换,可以使用yield,greenlet 遇到IO gevent: 检测到IO,能够使用greenlet实现自动切换,规避了IO阻 ...

  6. python 全栈开发,Day51&lpar;常用内置对象&comma;函数&comma;伪数组 arguments&comma;关于DOM的事件操作&comma;DOM介绍&rpar;

    昨日内容回顾 1.三种引入方式 1.行内js <div onclick = 'add(3,4)'></div> //声明一个函数 function add(a,b){ } 2. ...

  7. Python全栈开发之路 【第十七篇】:jQuery的位置属性、事件及案例

    位置属性 <!DOCTYPE html> <html> <head> <meta charset="UTF-8"> <titl ...

  8. python全栈开发中级班全程笔记(第二模块、第四章)(常用模块导入)

    python全栈开发笔记第二模块 第四章 :常用模块(第二部分)     一.os 模块的 详解 1.os.getcwd()    :得到当前工作目录,即当前python解释器所在目录路径 impor ...

  9. Python全栈开发【面向对象进阶】

    Python全栈开发[面向对象进阶] 本节内容: isinstance(obj,cls)和issubclass(sub,super) 反射 __setattr__,__delattr__,__geta ...

随机推荐

  1. sql查询指定表外键约束

    //////////////////查询指定表外键约束select a.name as 约束名, object_name(b.parent_object_id) as 外键表, d.name as 外 ...

  2. SqlServer 错误1053:服务并未及时响应启动或控制请求

    sqlserver 的登录用户修改成域账户后,启动不了 解决方法: 计算器管理选择管理员组 将域账户加入到管理员组即可

  3. RabbitMQ 发布&sol;订阅

    我们会做一些改变,就是把一个消息发给多个消费者,这种模式称之为发布/订阅(类似观察者模式). 为了验证这种模式,我们准备构建一个简单的日志系统.这个系统包含两类程序,一类程序发动日志,另一类程序接收和 ...

  4. 由node-webkit想到

    本人做为.NET的死忠也有些许年头.微软这几年被谷歌苹果之流打的有点招架不住..NET的前景也难免堪忧.虽然我认为就强类型语言方面,C#绝对是最强者.但是新技术的发展确实是可怕的,看看苹果几年就把no ...

  5. Adding the Test API in The ASP&period;NET Web API Help Page

    1.通过NuGet引用Web API Test Client 引用玩该DLL会生成如下文件: 这里面就是我们的帮助文档界面 2.在项目属性中进行如下设置,勾选XMl文档文件,并设置路径 3.在项目的A ...

  6. 【CC2530入门教程-04】CC2530的定时&sol;计数器原理与应用

    第4课  CC2530的定时/计数器原理与应用 广东职业技术学院  欧浩源 一.定时/技术器的基本原理 定时/计数器,是一种能够对内部时钟信号或外部输入信号进行计数,当计数值达到设定要求时,向CPU提 ...

  7. Oracle 12&period;2报错ORA-15032、ORA-15410或ORA-15411解决

    现象:在Oracle 12.2.0.1 RAC环境,在其ASM实例中,如果添加不同大小或者不同数量的LUN到failgroup中,会报错: ORA-15032: not all alterations ...

  8. POJ - 1185 敌兵炮阵

    POJ - 3254 中文题.. 思路:这题可把我恶心坏了,我刚开始的思路其实是正确的... 首先我想开个dp[i][s1][s2]保存到 i行 为止当前行状态为s1,上一行状态为s2 的最大个数,然 ...

  9. 事务不起作用 Closing non transactional SqlSession

    In proxy mode (which is the default), only external method calls coming in through the proxy are int ...

  10. Verdi文档路径

    1.echo $VERDI_HOME 2.cd $VERDI_HOME/doc 3.okular VerdiTut.pdf& Verdi主要在以下方面使用 Verdi使用情形:        ...