【Python】【异步IO】

时间:2020-12-23 05:11:30
# 【【异步IO】】

# 【协程】

'''
协程,又称微线程,纤程。英文名Coroutine。 协程的概念很早就提出来了,但直到最近几年才在某些语言(如Lua)中得到广泛应用。 子程序,或者称为函数,在所有语言中都是层级调用,比如A调用B,B在执行过程中又调用了C,C执行完毕返回,B执行完毕返回,最后是A执行完毕。 所以子程序调用是通过栈实现的,一个线程就是执行一个子程序。 子程序调用总是一个入口,一次返回,调用顺序是明确的。而协程的调用和子程序不同。 协程看上去也是子程序,但执行过程中,在子程序内部可中断,然后转而执行别的子程序,在适当的时候再返回来接着执行。 注意,在一个子程序中中断,去执行其他子程序,不是函数调用,有点类似CPU的中断。比如子程序A、B: def A():
print('1')
print('2')
print('3') def B():
print('x')
print('y')
print('z')
假设由协程执行,在执行A的过程中,可以随时中断,去执行B,B也可能在执行过程中中断再去执行A,结果可能是: 1
2
x
y
3
z
但是在A中是没有调用B的,所以协程的调用比函数调用理解起来要难一些。 看起来A、B的执行有点像多线程,但协程的特点在于是一个线程执行,那和多线程比,协程有何优势? 最大的优势就是协程极高的执行效率。因为子程序切换不是线程切换,而是由程序自身控制,因此,没有线程切换的开销,和多线程比,线程数量越多,协程的性能优势就越明显。 第二大优势就是不需要多线程的锁机制,因为只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态就好了,所以执行效率比多线程高很多。 因为协程是一个线程执行,那怎么利用多核CPU呢?最简单的方法是多进程+协程,既充分利用多核,又充分发挥协程的高效率,可获得极高的性能。 Python对协程的支持是通过generator实现的。 在generator中,我们不但可以通过for循环来迭代,还可以不断调用next()函数获取由yield语句返回的下一个值。 但是Python的yield不但可以返回一个值,它还可以接收调用者发出的参数。 '''
# 例子
'''
传统的生产者-消费者模型是一个线程写消息,一个线程取消息,通过锁机制控制队列和等待,但一不小心就可能死锁。
如果改用协程,生产者生产消息后,直接通过yield跳转到消费者开始执行,待消费者执行完毕后,切换回生产者继续生产,效率极高:
'''
def consumer():
r = ''
while True:
n = yield r
if not n:
return
print ('[Consumer] consuming %s...' % n)
r = '200 ok'
def produce(c):
c.send(None)
n = 0
while n < 5:
n = n + 1
print ('[Producer] Producing %s...' % n)
r = c.send(n)
print ('[Producer] Consumer return %s...' % r)
c.close()
c = consumer()
produce(c)
'''
[PRODUCER] Producing 1...
[CONSUMER] Consuming 1...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 2...
[CONSUMER] Consuming 2...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 3...
[CONSUMER] Consuming 3...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 4...
[CONSUMER] Consuming 4...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 5...
[CONSUMER] Consuming 5...
[PRODUCER] Consumer return: 200 OK
''' '''
注意到consumer函数是一个generator,把一个consumer传入produce后: 首先调用c.send(None)启动生成器; 然后,一旦生产了东西,通过c.send(n)切换到consumer执行; consumer通过yield拿到消息,处理,又通过yield把结果传回; produce拿到consumer处理的结果,继续生产下一条消息; produce决定不生产了,通过c.close()关闭consumer,整个过程结束。 整个流程无锁,由一个线程执行,produce和consumer协作完成任务,所以称为“协程”,而非线程的抢占式多任务。 最后套用Donald Knuth的一句话总结协程的特点: “子程序就是协程的一种特例。”
''' #【asyncio】

#  例子

import asyncio

@asyncio.coroutine

def wget(host):

print ('wget %s....' % host)

connect = asyncio.open_connection(host,80)

reader,writer = yield from connect

# header = 'GET / HTTP/1.0\r\nHost: %s\r\n\r\n' % host

header = 'GET / HTTP/1.1\r\nHost: %s\r\nConnection: close\r\n\r\n' % host

writer.write(header.encode('utf-8'))

yield from writer.drain()

while True:

line = yield from reader.readline()

if line == b'\r\n':

break

print ('%s header > %s ' % (host,line.decode('utf-8').rstrip()))

writer.close()

loop = asyncio.get_event_loop()

tasks = [wget(host) for host in ['www.sina.com.cn','www.sohu.com','www.163.com']]

loop.run_until_complete(asyncio.wait(tasks))

loop.close()

''' wget www.163.com...

wget www.sohu.com...

wget www.sina.com.cn...

www.sohu.com header > HTTP/1.1 200 OK

www.sohu.com header > Content-Type: text/html;charset=UTF-8

www.sohu.com header > Connection: close

www.sohu.com header > Server: nginx

www.sohu.com header > Date: Tue, 31 Jul 2018 07:44:25 GMT www.sohu.com header > Cache-Control: max-age=60 www.sohu.com header > X-From-Sohu: X-SRC-Cached www.sohu.com header > Content-Encoding: gzip www.sohu.com header > FSS-Cache: HIT from 10983758.13343064.18921842 www.sohu.com header > FSS-Proxy: Powered by 9541944.10459458.17480006 www.sina.com.cn header > HTTP/1.1 302 Moved Temporarily www.sina.com.cn header > Server: nginx www.sina.com.cn header > Date: Tue, 31 Jul 2018 07:45:18 GMT www.sina.com.cn header > Content-Type: text/html www.sina.com.cn header > Content-Length: 154 www.sina.com.cn header > Connection: close www.sina.com.cn header > Location: https://www.sina.com.cn/ www.sina.com.cn header > X-Via-CDN: f=edge,s=ctc.nanjing.ha2ts4.118.nb.sinaedge.com,c=180.168.212.46; www.sina.com.cn header > X-Via-Edge: 15330231186532ed4a8b47c5e66ca7c92596e www.163.com header > HTTP/1.1 200 OK www.163.com header > Expires: Tue, 31 Jul 2018 07:46:38 GMT www.163.com header > Date: Tue, 31 Jul 2018 07:45:18 GMT www.163.com header > Server: nginx www.163.com header > Content-Type: text/html; charset=GBK www.163.com header > Vary: Accept-Encoding,User-Agent,Accept www.163.com header > Cache-Control: max-age=80 www.163.com header > X-Via: 1.1 PSzjwzdx11jb78:2 (Cdn Cache Server V2.0), 1.1 zhoudianxin177:0 (Cdn Cache Server V2.0) www.163.com header > Connection: close

'''

'''

【解析】

asyncio.open_connection接受host参数和port参数以及一些可选的关键字参数.返回一个reader和一个writer,redaer is a StreamReader instance; the writer is a StreamWriter instance. writer.write就和socket.send差不多… drain的官方解释: drain() gives the opportunity for the loop to schedule the write operation and flush the buffer. It should especially be used when a possibly large amount of data is written to the transport, and the coroutine does not yield-from between calls to write(). 在事件循环中刷新缓冲区,特别是在数据量很大的情况下,保证数据完整性

'''

# 【aiohttp】
'''
asyncio可以实现单线程并发IO操作。如果仅在客户端,发挥的威力不大。如果用在服务器端,例如Web服务器,由于HTTP连接就是IO操作,因此可以用单线程
+coroutine实现多用户的高并发支持。
asyncio实现了TCP、UDP、SSL等协议,aiohttp是基于asyncio实现的HTTP框架。
'''
import  asyncio
from aiohttp import web async def index(request):
await asyncio.sleep(0.5)
return web.Response(body=b'<h1>Index</h1>',content_type='text/html') async def hello(request):
await asyncio.sleep(0.5)
text = '<h1>hello,%s!</h1>' % request.match_info['name']
return web.Response(body=text.encode('utf-8'),content_type='text/html') async def init(loop):
app = web.Application(loop=loop)
app.router.add_route('GET','/',index)
app.router.add_route('GET','/hello/{name}',hello)
srv = await loop.create_server(app.make_handler(),'127.0.0.1',8002)
print ('Server started at http://127.0.0.1:8002.....')
return srv loop = asyncio.get_event_loop()
loop.run_until_complete(init(loop))
loop.run_forever()