终结python协程----从yield到actor模型的实现

时间:2023-11-24 15:45:20

把应用程序的代码分为多个代码块,正常情况代码自上而下顺序执行。如果代码块A运行过程中,能够切换执行代码块B,又能够从代码块B再切换回去继续执行代码块A,这就实现了协程

我们知道线程的调度(线程上下文切换)是由操作系统决定的,当一个线程启动后,什么时候占用CPU、什么时候让出CPU,程序员都无法干涉。假设现在启动4个线程,CPU线程时间片为 5 毫秒,也就是说,每个线程每隔5ms就让出CPU,让其他线程抢占CPU。可想而知,等4个线程运行结束,要进行多少次切换?

如果我们能够自行调度自己写的程序,让一些代码块遇到IO操作时,切换去执行另外一些需要CPU操作的代码块,是不是节约了很多无畏的上下文切换呢?是的,协程就是针对这一情况而生的。我们把写好的一个应用程序分为很多个代码块,如下图所示:

终结python协程----从yield到actor模型的实现

把应用程序的代码分为多个代码块,正常情况代码自上而下顺序执行。如果代码块A运行过程中,能够切换执行代码块B,又能够从代码块B再切换回去继续执行代码块A,这就实现了协程(通常是遇到IO操作时切换才有意义)。示意图如下:

终结python协程----从yield到actor模型的实现

所以,关于协程可以总结以下两点:

(1)线程的调度是由操作系统负责,协程调度是程序自行负责。

(2)与线程相比,协程减少了无畏的操作系统切换。

实际上当遇到IO操作时做切换才更有意义,(因为IO操作不用占用CPU),如果没遇到IO操作,按照时间片切换,无意义。

python中的yield 关键字用来实现生成器,但是生成器在一定的程度上与协程其实也是差不多。我们来看个例子:

def sayHello(n):
while n > 0:
print("hello~", n)
yield n
n -= 1
print('say hello') if __name__ == "__main__":
sayHello(5) # 测试1
# next(sayHello(5)) # 测试2 # 测试3
# for i in sayHello(5):
# pass

挨个测试,你会发现第一个测试是不能通过的,什么都不会输出,这就是我们的生成器特性了,一旦函数内部有yield关键字,此函数就是生成器,只有调用next 或是 for之类的能够迭代的才能够使得生成器执行。那么这与我们的协程有什么关系呢?请看代码:

from collections import deque

def sayHello(n):
while n > 0:
print("hello~", n)
yield n
n -= 1
print('say hello') def sayHi(n):
x = 0
while x < n:
print('hi~', x)
yield
x += 1
print("say hi") # 使用yield语句,实现简单任务调度器
class TaskScheduler(object):
def __init__(self):
self._task_queue = deque() def new_task(self, task):
'''
向调度队列添加新的任务
'''
self._task_queue.append(task) def run(self):
'''
不断运行,直到队列中没有任务
'''
while self._task_queue:
task = self._task_queue.popleft()
try:
next(task)
self._task_queue.append(task)
except StopIteration:
# 生成器结束
pass if __name__ == "__main__":
sched = TaskScheduler()
sched.new_task(sayHello(10))
sched.new_task(sayHi(15))
sched.run()

代码运行下,你就发现了,这就是我们对协程的定义了。接下来我们说下actor模型。actor模式是一种最古老的也是最简单的并行和分布式计算解决方案。下面我们通过yield来实现:

from collections import deque

class ActorScheduler:
def __init__(self):
self._actors = {}
self._msg_queue = deque() def new_actor(self, name, actor):
self._msg_queue.append((actor, None))
self._actors[name] = actor def send(self, name, msg):
actor = self._actors.get(name)
if actor:
self._msg_queue.append((actor, msg)) def run(self):
while self._msg_queue:
# print("队列:", self._msg_queue)
actor, msg = self._msg_queue.popleft()
# print("actor", actor)
# print("msg", msg)
try:
actor.send(msg)
except StopIteration:
pass if __name__ == '__main__':
def say_hello():
while True:
msg = yield
print("say hello", msg) def say_hi():
while True:
msg = yield
print("say hi", msg) def counter(sched):
while True:
n = yield
print("counter:", n)
if n == 0:
break
sched.send('say_hello', n)
sched.send('say_hi', n)
sched.send('counter', n-1) sched = ActorScheduler()
# 创建初始化 actors
sched.new_actor('say_hello', say_hello())
sched.new_actor('say_hi', say_hi())
sched.new_actor('counter', counter(sched)) sched.send('counter', 10)
sched.run()

(1) ActorScheduler 负责事件循环
(2) counter() 负责控制终止
(3) say_hello() / say_hi() 相当于切换的协程,当程序运行到这些函数内部的yield处,就开始切换。

所以,当执行时,我们能够看到say_hello() / say_hi()不断交替切换执行,直到counter满足终止条件之后,协程终止。看懂上例可能需要花费一些时间。实际上我们已经实现了一个“操作系统”的最小核心部分。 生成器函数(含有yield的函数)就是认为,而yield语句是任务挂起的信号。 调度器循环检查任务列表直到没有任务要执行为止。

而这就是廖雪峰的python官网教程里面的协程代码的最好解释,这也是之前一直在思考的问题,请看代码:

def consumer():
r = ''
while True:
n = yield r
if not n:
return
print('[CONSUMER] Consuming %s...' % n)
r = '200 OK' def produce(c):
c.send(None)
n = 0
while n < 5:
n = n + 1
print('[PRODUCER] Producing %s...' % n)
r = c.send(n)
print('[PRODUCER] Consumer return: %s' % r)
c.close() c = consumer()
produce(c)

我之前一直纳闷send()函数是如何激活生成器的,原来是实现了actor模型的协程!

相关链接:再议Python协程——从yield到asyncio