Python-协程
简介
- 协程:微线程,底层是通过是通过生成器(generator)完成
- 使用场景:耗时操作,如:网络请求、网络下载(爬虫)、IO(文件读写)、阻塞
- 目的:高效利用CPU
- 特点:与线程相比,协程的执行效率极高,因为只有一个线程,也不存在同时写变量的冲突,在协程中共享资源不加锁,只需要判断状态
原生实现(generator)
1 | import time |
简介
- greenlets 是用于进程内顺序并发编程的轻量级协程。
- greenlet 可以单独使用,但它们经常与gevent等框架一起使用,以提供更高级别的抽象和异步 I/O。
安装
1 | pip install greenlet |
示例
1 | # 使用 greenlet 完成协程任务 |
简介
- gevent是一个基于协程的Python网络库,它使用greenlet在libev或libuv事件循环之上提供一个高级的同步API。
- 主要特点:
- 基于libev或libuv的快速事件循环。
- 基于greenlets的轻量级执行单元。
- API重用 Python 标准库中的概念(例如,事件和 队列)。
- 支持 SSL 的协作套接字
- 通过线程池、dnspython 或 c-ares 执行的协作 DNS 查询。
- 猴子修补实用程序使第三方模块变得合作
- TCP/UDP/HTTP 服务器
- 子进程支持(通过gevent.subprocess)
- 线程池
安装
1 | pip install gevent |
示例
1 | """ |
案列
1 | # 案例 |
1 | 下载了 https://www.163.com/ 的数据,长度:611738 |
运行结果
简介
- asyncio 是用来编写 并发 代码的库,使用 async/await 语法。
- asyncio 被用作多个提供高性能 Python 异步框架的基础,包括网络和网站服务,数据库连接库,分布式任务队列等等。
- asyncio 往往是构建 IO 密集型和高层级 结构化 网络代码的最佳选择。
asyncio
是Python 3.4版本引入的标准库,直接内置了对异步IO的支持。- 为了简化并更好地标识异步IO,从Python 3.5开始引入了新的语法
async
和await
,可以让coroutine的代码更简洁易读。
概念
- 事件循环:可以理解为一个死循环,去检测并执行某些代码。
- 协程函数:定义形式为
async def
的函数。 - 协程对象:调用 协程函数 所返回的对象。
注意
- 使用协程函数创建协程对象时,协程函数内部的代码不会执行,如果想要运行协程内部的的代码,必须要将协程对象交给 事件循环 来处理。
示例
1 | import asyncio |
async/await
-
简介
-
示例1
1
2
3
4
5
6
7
8
9
10
11import asyncio
async def func():
print('hello')
await asyncio.sleep(2)
print('world')
asyncio.run(func()) -
示例2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20import asyncio
async def others():
print('start')
await asyncio.sleep(2)
print('end')
return '返回值'
async def func():
print('执行协程函数内部代码')
# 遇到 IO 操作挂起当前协程(任务),等 IO 操作完成之后再继续往下执行。
# 当前协程挂起时,事件循环可以去执行其他协程(任务)。
response = await others()
print(f'IO请求结束,结果为:{response}')
asyncio.run(func()) -
示例3
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23import asyncio
async def others():
print('start')
await asyncio.sleep(2)
print('end')
return '返回值'
async def func():
print('执行协程函数内部代码')
# 遇到 IO 操作挂起当前协程(任务),等 IO 操作完成之后再继续往下执行。
# 当前协程挂起时,事件循环可以去执行其他协程(任务)。
response1 = await others()
print(f'response1请求结束,结果为:{response1}')
response2 = await others()
print(f'response2 请求结束,结果为:{response2}')
asyncio.run(func())await 就是等待对象的值得到结果之后再继续向下执行。
Task 对象
-
简介
- Tasks被用来“并行的”调度协程。
- 当一个协程通过
asyncio.create_task()
等函数被封装为一个 Task,该协程会被自动调度执行,也可用低层级的loop.create_task()
或ensure_future()
函数。不建议手动实例化 Task 对象。 - 注意:此函数 在 Python 3.7 中被加入。在 Python 3.7 之前,可以改用低层级的
asyncio.ensure_future()
函数。
-
示例1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26import asyncio
async def func():
print(1)
await asyncio.sleep(1)
print(2)
return '返回值'
async def main():
print('main函数开始执行')
# 创建Task对象,将当前执行的func函数添加到事件循环
task1 = asyncio.create_task(func())
task2 = asyncio.create_task(func())
print('main函数执行结束')
# 当执行某些协程遇到IO操作时,会自动切换执行其他任务
# 此处的await是等待相对应的协程全部执行完毕并获取结果
ret1 = await task1
ret2 = await task2
print(ret1, ret2)
asyncio.run(main()) -
示例2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28import asyncio
async def func():
print(1)
await asyncio.sleep(1)
print(2)
return '返回值'
async def main():
print('main函数开始执行')
# 创建Task对象,将当前执行的func函数添加到事件循环
task_list = [
asyncio.create_task(func(), name='n1'),
asyncio.create_task(func(), name='n2')
]
print('main函数执行结束')
# 简单等待
# https://docs.python.org/zh-cn/3/library/asyncio-task.html#waiting-primitives
done, pending = await asyncio.wait(task_list, timeout=None)
print(done)
print(pending)
asyncio.run(main()) -
示例3
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19import asyncio
async def func():
print(1)
await asyncio.sleep(1)
print(2)
return '返回值'
task_list = [
func(),
func()
]
done, pending = asyncio.run(asyncio.wait(task_list))
print(done)
print(pending)
asyncio.Future 对象
-
简介
Future
是一种特殊的 低层级 可等待对象,表示一个异步操作的 最终结果。- Task 继承自 Future,Task 对象内部 await 结果的处理基于 Future 对象。
- 通常情况下 没有必要 在应用层级的代码中创建 Future 对象。
-
示例1
1
2
3
4
5
6
7
8
9
10
11
12
13
14import asyncio
async def main():
# 获取当前事件循环
loop = asyncio.get_running_loop()
# 创建一个任务(Future对象),这个任务什么都不干
fut = loop.create_future()
# 等待任务最终结果(future对象),没有结果则会一直等下去
await fut
asyncio.run(main()) -
示例2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23import asyncio
async def set_after(fut):
await asyncio.sleep(2)
fut.set_result('666')
async def main():
# 获取当前事件循环
loop = asyncio.get_running_loop()
# 创建一个任务(Future对象),没绑定任何行为,则这个任务永远不知道什么时候结束
fut = loop.create_future()
# 创建一个任务(Task对象),绑定set_after函数,函数内部在2s之后,会给fut赋值
# 即手动设置future任务的最终结果,那么fut就可以结束了
await loop.create_task(set_after(fut))
# 等待future对象获取最终结果,否则一直等下去
data = await fut
print(data)
asyncio.run(main())
concurrent.futures 对象
-
简介
concurrent.futures
模块提供异步执行可调用对象高层接口。- 异步执行可以由
ThreadPoolExecutor
使用线程或由ProcessPoolExecutor
使用单独的进程来实现。 两者都是实现抽像类Executor
定义的接口。 - 使用线程池、进程池实现异步操作时用到的对象。
-
示例1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21import time
from concurrent.futures import Future
from concurrent.futures.thread import ThreadPoolExecutor
from concurrent.futures.process import ProcessPoolExecutor
def func(value):
time.sleep(1)
print(value)
return value
# 创建线程池
pool = ThreadPoolExecutor(max_workers=5)
# 创建进程池
# pool = ProcessPoolExecutor(max_workers=5)
for i in range(10):
fut = pool.submit(func, i)
print(fut)以后写代码可能会存在交叉使用,例如:crm项目80%都是基于协程异步编程 + MySQL(不支持)【线程、进程做异步编程】
-
示例2:交叉使用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32import time
import asyncio
import concurrent.futures
def func1():
# 某个耗时操作
time.sleep(2)
return '天秀'
async def main():
loop = asyncio.get_running_loop()
# 1. Run in the default loop's executor (默认ThreadPoolExecutor )
# 第一步:内部会先调用 ThreadPoolExecutor 的 submit 方法去线程池中申请一个线程去执行func1函数,并返回一个 concurrent.futures.Future 对象
# 第二步:调用 asyncio.wrap_future 将 concurrent.futures.Future 对象包装为 asyncio.Future 对象。
# 因为 concurrent.futures.Future 对象不支持 await 语法,所以需要包装为 asyncio.Future 对象才能使用。
fut = loop.run_in_executor(None, func1)
result = await fut
print('default thread pool', result)
# 2. Run in a custom thread pool:
# with concurrent.futures.ThreadPoolExecutor() as pool:
# result = await loop.run_in_executor(pool, func1)
# print('custom thread pool', result)
# 3. Run in a custom process poo1:
# with concurrent.futures.ProcessPoolExecutor() as pool:
# result = await loop.run_in_executor(pool, func1)
# print('custom process poo1', result) -
案列:asyncio + 不支持异步的模块
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34import asyncio
import requests
async def download_image(url):
# 发送网络请求,下载图片(遇到网络下载图片的IO请求,自动切换到其它任务)
print(f'开始下载:{url}')
# 创建“事件循环”对象
loop = asyncio.get_event_loop()
# requests 模块不支持异步操作,所以就使用线程池来配合实现了
future = loop.run_in_executor(None, requests.get, url)
response = await future
print('下载完成')
# 图片保存到本地
file_name = url.rsplit('/')[-1]
with open(file_name, 'wb') as file_object:
file_object.write(response.content)
if __name__ == '__main__':
url_list = [
'https://img.lianzhixiu.com/uploads/allimg/202007/9999/rn27394522a3.jpg',
'https://img.lianzhixiu.com/uploads/allimg/202102/9999/rnd6d6f2b5b7.jpg',
'https://img.lianzhixiu.com/uploads/allimg/202106/9999/rn034f9e359f.jpg'
]
tasks = [download_image(url) for url in url_list]
# 创建“事件循环”对象
loop = asyncio.get_event_loop()
# 执行“事件循环”对象
loop.run_until_complete(asyncio.wait(tasks))
异步迭代器
-
简介
- 什么是异步迭代器?
实现了__aiter__()
和__anext__()
方法的对象。__anext__
必须返回一个 awaitable 对象。async for
会处理异步迭代器的__anext__()
方法所返回的可等待对象,直到其引发一个StopAsyncIteration
异常。 - 什么是异步可迭代对象?
可在async for
语句中被使用的对象。必须通过它的__aiter__()
方法返回一个 asynchronous iterator。
- 什么是异步迭代器?
-
示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34import asyncio
class Reader(object):
"""自定义异步迭代器(同时也是异步可迭代对象)"""
def __init__(self):
self.count = 0
async def readline(self):
# await asyncio.sleep(1)
self.count += 1
if self.count == 100:
return None
return self.count
def __aiter__(self):
return self
async def __anext__(self):
val = await self.readline()
if val == None:
raise StopAsyncIteration
return val
async def func():
obj = Reader()
async for item in obj:
print(item)
asyncio.run(func())
异步上下文管理器
-
简介
- 此种对象通过定义
__aenter__()
和__aexit__()
方法来对async with
语句中的环境进行控制。 - 异步上下文管理器 是 上下文管理器 的一种,它能够在其
__aenter__
和__aexit__
方法中暂停执行。 - 异步上下文管理器可在
async with
语句中使用。
- 此种对象通过定义
-
示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29import asyncio
class AsyncContextManager:
def __init__(self):
self.conn = conn
async def do_something(self):
# 异步操作数据库
return 666
async def __aenter__(self):
# 异步链接数据库
self.conn = await asyncio.sleep(1)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
# 异步关闭数据库
await asyncio.sleep(1)
async def func():
async with AsyncContextManager() as f:
result = f.do_something()
print(result)
asyncio.run(func())
简介
- uvloop 是内置 asyncio 事件循环的快速、直接的替代品。uvloop 在 Cython 中实现,并在后台使用 libuv。
- uvloop 和 asyncio 与 Python 3.5 中的 async/await 的强大功能相结合,使得在 Python 中编写高性能网络代码比以往任何时候都更加容易。
- uvloop 使异步速度更快。事实上,它至少比 nodejs、gevent 以及任何其他 Python 异步框架快 2 倍。基于 uvloop 的 asyncio 的性能接近 Go 程序。
- 注意:uvloop目前不支持Windows。
- uvicorn 的内部使用的就是 uvloop。
安装
1 | pip install uvloop |
使用
-
要让 asyncio 使用 uvloop 提供的事件循环,需要安装 uvloop 事件循环策略
1
2
3
4
5
6
7
8
9import asyncio
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
# 编写asyncio的代码,于之前写的代码一样
# 内部的事件循环会自动变成uvloop
asyncio.run(func()) -
或者,你也可以手动创建一个循环实例,使用:
1
2
3
4
5import asyncio
import uvloop
loop = uvloop.new_event_loop()
asyncio.set_event_loop(loop)
实战案例
异步Redis
-
简介
在使用 Python 代码操作 Redis 时,连接、操作、断开都是网络IO。
-
安装
1
pip install aioredis
-
示例1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26import asyncio
import aioredis
async def execute(uri, password, db):
print(f'开始执行:{uri}')
# 网络IO操作:创建Redis链接
redis = await aioredis.from_url(uri, password=password, db=db, decode_responses=True)
# 网络IO操作:
# 在Redis中设置哈希值car,内部再设置三个键值对,即:
# redis = { car: { key1: 1, key2: 2, key3: 3 } }
await redis.hset('car', mapping={'key1': 1, 'key2': 2, 'key3': 3})
# 网络IO操作:去Redis中获取值
result = await redis.hgetall('car')
print(result)
# 网络IO操作:关闭Redis连接
await redis.close()
print(f'执行结束:{uri}')
asyncio.run(execute('redis://host:6379', 'password', 1)) -
示例2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30import asyncio
import aioredis
async def execute(uri, password, db):
print(f'开始执行:{uri}')
# 网络IO操作:先去连接redis://host1:6379,遇到IO则自动切换任务,去连接redis://host2:6379
redis = await aioredis.from_url(uri, password=password, db=db, decode_responses=True)
# 网络IO操作:遇到IO自动切换任务
await redis.hset('car', mapping={'key1': 1, 'key2': 2, 'key3': 3})
# 网络IO操作:遇到IO自动切换任务
result = await redis.hgetall('car')
print(result)
# 网络IO操作:遇到IO自动切换任务
await redis.close()
print(f'执行结束:{uri}')
task_list = [
execute('redis://host1:6379', 'password1', 1),
execute('redis://host2:6379', 'password2', 2)
]
asyncio.run(asyncio.wait(task_list)) -
注意
默认情况下,
aioredis
的大多数命令将返回bytes
(字节)。使用decode_responses=True
命令可以自动将返回的结果转为字符串。
异步MySQL
-
简介
-
安装
1
pip install aiomysql
-
示例1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32import asyncio
import aiomysql
async def execute():
# 网络IO操作:连接MySQL
conn = await aiomysql.connect(
host='host',
port=3306,
user='user',
password='password',
db='mysql'
)
# 网络IO操作:创建cursor
cur = await conn.cursor()
# 网络IO操作:执行SQL
await cur.execute('select Host, User from user')
# 网络IO操作:获取SQL结果
result = await cur.fetchall()
print(result)
# 网络IO操作:关闭连接
await cur.close()
conn.close()
asyncio.run(execute()) -
示例2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39import asyncio
import aiomysql
async def execute(host, password):
print(f'开始执行:{host}')
# 网络IO操作:先去连接host1,遇到IO则自动切换任务,去连接host2
conn = await aiomysql.connect(
host=host,
port=3306,
user='root',
password=password,
db='mysql'
)
# 网络IO操作:遇到IO会自动切换任务
cur = await conn.cursor()
# 网络IO操作:遇到IO会自动切换任务
await cur.execute('select Host, User from user')
# 网络IO操作:遇到IO会自动切换任务
result = await cur.fetchall()
print(result)
# 网络IO操作:遇到IO会自动切换任务
await cur.close()
conn.close()
print(f'执行结束:{host}')
task_list = [
execute('host1', 'password'),
execute('host2', 'password')
]
asyncio.run(asyncio.wait(task_list))
异步爬虫
-
简介
- 异步HTTP客户端/服务器用于 asyncio 和 Python。
- 支持 Client 和 HTTP Server。
- 支持开箱即用的服务器 WebSocket和 客户端 WebSocket ,无需回调地狱。
- Web-server 有 Middlewares、Signals 和可插入的路由。
-
安装
1
pip install aiohttp
-
示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30import asyncio
import aiohttp
async def fetch(session, url):
print(f'发送请求:{url}')
async with session.get(url, verify_ssl=False) as response:
text = await response.text()
print(f'得到结果:{url},长度:{len(text)}')
return text
async def main():
async with aiohttp.ClientSession() as session:
url_list = [
'https://www.python.org/',
'https://www.baidu.com/',
'https://gitee.com/'
]
tasks = [asyncio.create_task(fetch(session, url)) for url in url_list]
done, pending = await asyncio.wait(tasks)
print(done)
print(pending)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())