Python中syncio和aiohttp

时间:2023-03-09 16:50:50
Python中syncio和aiohttp

CPython 解释器本身就不是线程安全的,因此有全局解释器锁(GIL),一次只允许使用一个线程执行 Python 字节码。因此,一个 Python 进程通常不能同时使用多个 CPU 核心。然而,标准库中所有执行阻塞型 I/O 操作的函数,在等待操作系统返回结果时都会释放GIL。这意味着在 Python 语言这个层次上可以使用多线程,而 I/O 密集型 Python 程序能从中受益:一个 Python 线程等待网络响应时,阻塞型 I/O 函数会释放 GIL,再运行一个线程。asyncio这个包使用事件循环驱动的协程实现并发。 asyncio 大量使用 yield from 表达式,因此与Python 旧版不兼容。
asyncio 包使用的“协程”是较严格的定义。适合asyncio API 的协程在定义体中必须使用 yield from,而不能使用 yield。此外,适合 asyncio 的协程要由调用方驱动,并由调用方通过 yield from 调用;

先看2个例子:

import threading
import asyncio @asyncio.coroutine
def hello():
print('Start Hello', threading.currentThread())
yield from asyncio.sleep()
print('End Hello', threading.currentThread()) @asyncio.coroutine
def world():
print('Start World', threading.currentThread())
yield from asyncio.sleep()
print('End World', threading.currentThread()) # 获取EventLoop:
loop = asyncio.get_event_loop()
tasks = [hello(), world()]
# 执行coroutine
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

@asyncio.coroutine把生成器函数标记为协程类型。
asyncio.sleep(3) 创建一个3秒后完成的协程。
loop.run_until_complete(future),运行直到future完成;如果参数是 coroutine object,则需要使用 ensure_future()函数包装。
loop.close() 关闭事件循环

import asyncio

@asyncio.coroutine
def worker(text):
""" 协程运行的函数 :param text::return: """
i =
while True:
print(text, i)
try:
yield from asyncio.sleep(.)
except asyncio.CancelledError:
break
i += @asyncio.coroutine
def client(text, io_used):
work_fu = asyncio.ensure_future(worker(text))
# 假装等待I/O一段时间
yield from asyncio.sleep(io_used)
# 结束运行协程
work_fu.cancel()
return 'done' loop = asyncio.get_event_loop()
tasks = [client('xiaozhe', ), client('zzz', )]
result = loop.run_until_complete(asyncio.wait(tasks))
loop.close()
print('Answer:', result)

asyncio.ensure_future(coro_or_future, *, loop=None):计划安排一个 coroutine object的执行,返回一个 asyncio.Task object。
worker_fu.cancel(): 取消一个协程的执行,抛出CancelledError异常。
asyncio.wait():协程的参数是一个由期物或协程构成的可迭代对象; wait 会分别把各个协程包装进一个 Task 对象。

async和await是针对coroutine的新语法,要使用新的语法,只需要做两步简单的替换。
1. 把@asyncio.coroutine替换为async
2. 把yield from替换为await

@asyncio.coroutine
def hello():
print("Hello world!")
r = yield from asyncio.sleep()
print("Hello again!")

等价于

async def hello():
print("Hello world!")
r = await asyncio.sleep()
print("Hello again!")

asyncio可以实现单线程并发IO操作。如果仅用在客户端,发挥的威力不大。如果把asyncio用在服务器端,例如Web服务器,由于HTTP连接就是IO操作,因此可以用单线程+coroutine实现多用户的高并发支持。

asyncio实现了TCP、UDP、SSL等协议,aiohttp则是基于asyncio实现的HTTP框架

客户端:

import aiohttp
import asyncio
import async_timeout async def fetch(session, url):
async with async_timeout.timeout():
async with session.get(url) as response:
return await response.text() async def main():
async with aiohttp.ClientSession() as session:
html = await fetch(session, 'http://python.org')
print(html) loop = asyncio.get_event_loop()
loop.run_until_complete(main())

服务端:

from aiohttp import web

async def handle(request):
name = request.match_info.get('name', "Anonymous")
text = "Hello, " + name
return web.Response(text=text) app = web.Application()
app.router.add_get('/', handle)
app.router.add_get('/{name}', handle) web.run_app(app)

运行结果:

Python中syncio和aiohttp

爬取当当畅销书的图书信息的代码如下:

'''异步方式爬取当当畅销书的图书信息'''
import os
import time
import aiohttp
import asyncio
import pandas as pd
from bs4 import BeautifulSoup # table表格用于储存书本信息
table = [] # 获取网页(文本信息)
async def fetch(session, url):
async with session.get(url) as response:
return await response.text(encoding='gb18030') # 解析网页
async def parser(html):
# 利用BeautifulSoup将获取到的文本解析成HTML
soup = BeautifulSoup(html, 'lxml')
# 获取网页中的畅销书信息
book_list = soup.find('ul', class_='bang_list clearfix bang_list_mode')('li')
for book in book_list:
info = book.find_all('div')
# 获取每本畅销书的排名,名称,评论数,作者,出版社
rank = info[].text[:-]
name = info[].text
comments = info[].text.split('条')[]
author = info[].text
date_and_publisher= info[].text.split()
publisher = date_and_publisher[] if len(date_and_publisher)>= else ''
# 将每本畅销书的上述信息加入到table中
table.append([rank, name, comments, author, publisher]) # 处理网页
async def download(url):
async with aiohttp.ClientSession() as session:
html = await fetch(session, url)
await parser(html) # 全部网页
urls = ['http://bang.dangdang.com/books/bestsellers/01.00.00.00.00.00-recent7-0-0-1-%d'%i for i in range(,)]
# 统计该爬虫的消耗时间
print('#' * )
t1 = time.time() # 开始时间 # 利用asyncio模块进行异步IO处理
loop = asyncio.get_event_loop()
tasks = [asyncio.ensure_future(download(url)) for url in urls]
tasks = asyncio.gather(*tasks)
loop.run_until_complete(tasks) # 将table转化为pandas中的DataFrame并保存为CSV格式的文件
df = pd.DataFrame(table, columns=['rank', 'name', 'comments', 'author', 'publisher'])
df.to_csv('dangdang.csv', index=False) t2 = time.time() # 结束时间
print('使用aiohttp,总共耗时:%s' % (t2 - t1))
print('#' * )