Python从使用线程到使用async/await的深入讲解

时间:2022-09-10 10:55:00

前言

为了简化并更好地标识异步IO,从Python 3.5开始引入了新的语法asyncawait,可以让coroutine的代码更简洁易读。

请注意,async和await是针对coroutine的新语法,要使用新的语法,只需要做两步简单的替换:

  • 把@asyncio.rotoutine替换为async;
  • 把yield from替换为await。

async/await 是一种异步变成方法,还有两种你可能听过,

     1. 回调

     2. Promise

(写过 JavaScript 的肯定很熟悉了)

异步意味着任务不会阻塞,比如,如果我要下载一个比较忙的网络资源,我的程序不需要一直等待下载完成,它可以在等待下载时继续做其他事情。这与并行执行多个操作不同。以下伪代码比较容易理解:

?
1
2
3
4
5
# 慢方法
page = get_page_sync('some_page')
 
# 会阻塞整个程序的运行
print(page)

有两种方法可以改善上述的情况

(一)首先,让我们试试使用线程。通过使用线程,我们可以将 get_page_sync 调用放到单独的线程去执行,这样主线程 就可以继续执行其他操作。

?
1
2
3
4
5
6
7
8
9
10
11
# 将慢方法放到单独的线程执行
t = threading.thread(
 target = get_page_sync('some_page',args=('some_page',))
)
t.run()
 
# 在线程运行时执行其他操作
do_something_else()
 
# 等待线程完执行成
t.join()

线程有几个优缺点,主要的缺点是:

     1. 必须在改变共享数据前锁定共享数据

     2. 只能通过传递给主线程消息来处理线程内的异常

(二)现在我们试试第二种中的 async/await,Python3.5 开始支持的 async/await 方式,与第一种(线程)之间的主要区别在于,后者是操作系统内核执行上下文切换,而前者中我们自己控制。(上下文切换即,当多个线程正在运行时,内核可能停止当前进程,使其进入休眠状态,并选择不同的线程继续执行。这被称作抢占式多任务处理【Preemption】)

当我们自己控制时,它被称作非抢占式或合作型多任务式,因为是我们自己处理上下文切换,所以我们需要一个调度程序,也叫做『事件循环』。此事件循环只循环遍历等待中的调度,并运行它的所有事件。每当我们产生操作时,当前任务会被添加到队列中,且第一个任务(优先级而非顺序)从队列中弹出并开始执行。例如,可以通过以下方式更改上述伪代码:

?
1
2
3
async def print_page():
 page = await get_page_sync('some_page')
 print(page)

当我们触发上面的语句时,get_page_async 方法将非阻塞的获取 some_page 还有 yield 句柄,这意味着我们的 print_page 函数将控制时间循环 ,并且时间循环可以继续执行其他曹组,知道我们得到返回的响应。

我们先将我们的线程代码改造成这种语法。我们将使用 asyncio(Python 自带的时间循环库),并使用 aiohttp 包来执行异步 http 请求。

我们将会创建一个名为 main 函数,它将成为我们异步代码的入口。然后我们创建一个时间循环和一个「未来对象」。这个未来对象是对异步函数的抽象,它存储了一些基本的属性,比如它当前的状态(就像 Promise 一样) 。然后我们将告诉我们的时间循环继续运行,知道这个「未来」完成。

?
1
2
3
loop = asyncio.get_event_loop()
future = asyncio.ensure_future(main())
loop.run_until_complete(future)

在我们的 main 方法中,我们将创建另一个未来任务列表,每个任务负责从某网站下载不同的桐乡。我们这样做是因为每次下载都会发起网络请求,在网络请求时,我们可以运行另一端代码。创建任务列表后,我们可以通过调用等待整个列表执行完成 asyncio.gather ,这就是它的实现:

?
1
2
3
4
5
6
7
async def main():
 tasks = []
 async with aiohttp.ClientSession() as session:
  for img in img_list:
   task = asyncio.ensure_future(download_img(img, session))
   task.append(task)
 await asyncio.gather(*tasks)

(这段代码来的有点猛了)

最后一个我们要改的方法就是 download_img 了,我们仅仅需要替换 requests.get 调用为异步:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
i = 1
async def download_img(img, session):
 global i, bar
 
 # 获取文件后缀
 file_ext = get_extention(img.link)
 # 拼接文件名
 file_name = img.id + file_ext
 
 resp = await session.get(img.link)
 with open(file_name, 'wb') as f:
  async for chunk in resp.content.iter_chunked(1024):
   f.write(chunk)
 
 bar.update(i)
 i += 1

要注意的一点是在更新 i 的时候不需要先锁住它,这是因为我们前面说过,没有代码是同时执行的,所以永远不可能出现竞态条件。

因为没有锁或者线程的开销,异步版本可能还会比多线程版本快一些。

这是完整代码:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
#! /usr/bin/env python
 
import os
import re
import sys
 
import aiohttp
import asyncio
import async_timeout
 
import progressbar
 
from imgurpython import ImgurClient
 
regex = re.compile(r'\.(\w+)$')
def get_extension(link):
 ext = regex.search(link).group()
 
 return ext
 
i = 1
async def download_img(img, session):
 global i, bar
 
 # get the file extension
 file_ext = get_extension(img.link)
 # create unique name by combining file id with its extension
 file_name = img.id + file_ext
 
 resp = await session.get(img.link)
 with open(file_name, 'wb') as f:
  async for chunk in resp.content.iter_chunked(1024):
   f.write(chunk)
 
 bar.update(i)
 i += 1
 
try:
 album_id = sys.argv[1]
except IndexError:
 raise Exception('Please specify an album id')
 
client_id = os.getenv('IMGUR_CLIENT_ID')
client_secret = os.getenv('IMGUR_CLIENT_SECRET')
client = ImgurClient(client_id, client_secret)
 
img_lst = client.get_album_images(album_id)
bar = progressbar.ProgressBar(max_value=len(img_lst))
 
async def main():
 tasks = []
 async with aiohttp.ClientSession() as session:
  for img in img_lst:
   task = asyncio.ensure_future(download_img(img, session))
   tasks.append(task)
 
  await asyncio.gather(*tasks)
 
loop = asyncio.get_event_loop()
future = asyncio.ensure_future(main())
loop.run_until_complete(future)

原文:https://medium.com/@exqu17/python-bits-moving-from-threads-to-async-await-741ec5124cdc

作者:https://medium.com/@exqu17?source=post_header_lockup

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对服务器之家的支持。

原文链接:https://zhuanlan.zhihu.com/p/44661314