Python - 协程

简介

  • 协程:微线程,底层是通过是通过生成器(generator)完成
  • 使用场景:耗时操作,如:网络请求、网络下载(爬虫)、IO(文件读写)、阻塞
  • 目的:高效利用 CPU
  • 特点:与线程相比,协程的执行效率极高,因为只有一个线程,也不存在同时写变量的冲突,在协程中共享资源不加锁,只需要判断状态

原生实现(generator)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import time


def task1():
for i in range(3):
print(f'A + {i}')
yield
time.sleep(1)


def task2():
for i in range(3):
print(f'B + {i}')
yield
time.sleep(2)


if __name__ == '__main__':
g1 = task1()
g2 = task2()

while True:
try:
next(g1)
next(g2)
except:
break

简介

  • greenlets 是用于进程内顺序并发编程的轻量级协程。
  • greenlet 可以单独使用,但它们经常与 gevent 等框架一起使用,以提供更高级别的抽象和异步 I/O。

安装

1
pip install greenlet

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# 使用 greenlet 完成协程任务
# https://pypi.org/project/greenlet/
# https://greenlet.readthedocs.io/en/latest/
import time

from greenlet import greenlet


def A(): # 任务A
for i in range(5):
print(f'A + {i}')
# 遇到阻塞,切换为 任务B
gb.switch()
time.sleep(0.5)


def B(): # 任务B
for i in range(5):
print(f'B + {i}')
# 遇到阻塞,切换为 任务C
gc.switch()
time.sleep(0.5)


def C(): # 任务C
for i in range(5):
print(f'C + {i}')
# 遇到阻塞,切换为 任务A
ga.switch()
time.sleep(0.5)


if __name__ == '__main__':
# 创建 greenlet 对象
ga = greenlet(A)
gb = greenlet(B)
gc = greenlet(C)
# 调用任务A
ga.switch()

简介

安装

1
pip install gevent

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
"""
greenlet 已经实现了协程,但是这个人工切换,是不是觉得太麻烦了,不要着急。
Python 还有一个比 greenlet 更强大的并且能够自动切换任务的模块“gevent”。
其原理是当一个 greenlet 遇到 IO(指的是 input output 输入输出,比如:网络、文件操作等)操作时,比如访问网络,就自动切换到其他 greenlet,等到IO完成,在适当的时候切换回来继续执行。

由于 IO 操作非常耗时,经常使程序处于等待状态,有了 gevent 我们自动切换协程,也就保证总有 greenlet 在运行,而不是等待 IO。

https://pypi.org/project/gevent/
http://www.gevent.org/
"""
import time

import gevent
from gevent import monkey

# 猴子补丁,自动将 time.sleep(0.5) 切换为 gevent.sleep(0.5)
monkey.patch_all()


def A(): # 任务A
for i in range(5):
print(f'A + {i}')
time.sleep(0.5)


def B(): # 任务B
for i in range(5):
print(f'B + {i}')
time.sleep(0.5)


def C(): # 任务C
for i in range(5):
print(f'C + {i}')
time.sleep(0.5)


if __name__ == '__main__':
# 创建 gevent 对象
ga = gevent.spawn(A)
gb = gevent.spawn(B)
gc = gevent.spawn(C)

ga.join()
gb.join()
gc.join()

案列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# 案例
import gevent
from gevent import monkey
import urllib.request

monkey.patch_all()


def download(url):
response = urllib.request.urlopen(url)
content = response.read()
print(f'下载了 {url} 的数据,长度:{len(content)}')


if __name__ == '__main__':
urls = [
'https://www.163.com/',
'https://im.qq.com/index',
'https://www.baidu.com/'
]
g1 = gevent.spawn(download, urls[0])
g2 = gevent.spawn(download, urls[1])
g3 = gevent.spawn(download, urls[2])

# 类似 g1.join()
gevent.joinall([g1, g2, g3])

1
2
3
下载了 https://www.163.com/ 的数据,长度:611738
下载了 https://www.baidu.com/ 的数据,长度:227
下载了 https://im.qq.com/index 的数据,长度:15990

运行结果

简介

  • asyncio 是用来编写 并发 代码的库,使用 async/await 语法。
  • asyncio 被用作多个提供高性能 Python 异步框架的基础,包括网络和网站服务,数据库连接库,分布式任务队列等等。
  • asyncio 往往是构建 IO 密集型和高层级 结构化 网络代码的最佳选择。
  • asyncio 是 Python 3.4 版本引入的标准库,直接内置了对异步 IO 的支持。
  • 为了简化并更好地标识异步 IO,从 Python 3.5 开始引入了新的语法 asyncawait ,可以让 coroutine 的代码更简洁易读。

概念

  • 事件循环:可以理解为一个死循环,去检测并执行某些代码。
  • 协程函数:定义形式为 async def 的函数。
  • 协程对象:调用 协程函数 所返回的对象。

注意

  • 使用协程函数创建协程对象时,协程函数内部的代码不会执行,如果想要运行协程内部的的代码,必须要将协程对象交给 事件循环 来处理。

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import asyncio


# 创建“协程函数”
async def func():
print('协程函数')


# 创建“协程对象”
result = func()

# 创建“事件循环”对象
# loop = asyncio.get_event_loop()
# 执行“事件循环”对象
# loop.run_until_complete(result)

# Python 3.7版本中引入的语法
asyncio.run(result)

async/await

  • 简介

    • 协程 通过 async/await 语法进行声明,是编写 asyncio 应用的推荐方式。
    • 如果一个对象可以在 await 语句中使用,那么它就是 可等待 对象。
    • await + 可等待对象(协程对象、Future、Task 对象 -> IO 等待)
  • 示例 1

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    import asyncio


    async def func():
    print('hello')
    await asyncio.sleep(2)
    print('world')


    asyncio.run(func())

  • 示例 2

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    import asyncio


    async def others():
    print('start')
    await asyncio.sleep(2)
    print('end')
    return '返回值'


    async def func():
    print('执行协程函数内部代码')
    # 遇到 IO 操作挂起当前协程(任务),等 IO 操作完成之后再继续往下执行。
    # 当前协程挂起时,事件循环可以去执行其他协程(任务)。
    response = await others()
    print(f'IO请求结束,结果为:{response}')


    asyncio.run(func())

  • 示例 3

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    import asyncio


    async def others():
    print('start')
    await asyncio.sleep(2)
    print('end')
    return '返回值'


    async def func():
    print('执行协程函数内部代码')
    # 遇到 IO 操作挂起当前协程(任务),等 IO 操作完成之后再继续往下执行。
    # 当前协程挂起时,事件循环可以去执行其他协程(任务)。
    response1 = await others()
    print(f'response1请求结束,结果为:{response1}')

    response2 = await others()
    print(f'response2 请求结束,结果为:{response2}')


    asyncio.run(func())

    await 就是等待对象的值得到结果之后再继续向下执行。

Task 对象

  • 简介

    • Tasks 被用来 “并行的” 调度协程。
    • 当一个协程通过 asyncio.create_task() 等函数被封装为一个 Task,该协程会被自动调度执行,也可用低层级的 loop.create_task()ensure_future() 函数。不建议手动实例化 Task 对象。
    • 注意:此函数 在 Python 3.7 中被加入。在 Python 3.7 之前,可以改用低层级的 asyncio.ensure_future() 函数。
  • 示例 1

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    import asyncio


    async def func():
    print(1)
    await asyncio.sleep(1)
    print(2)
    return '返回值'


    async def main():
    print('main函数开始执行')
    # 创建Task对象,将当前执行的func函数添加到事件循环
    task1 = asyncio.create_task(func())
    task2 = asyncio.create_task(func())
    print('main函数执行结束')

    # 当执行某些协程遇到IO操作时,会自动切换执行其他任务
    # 此处的await是等待相对应的协程全部执行完毕并获取结果
    ret1 = await task1
    ret2 = await task2
    print(ret1, ret2)


    asyncio.run(main())

  • 示例 2

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    import asyncio


    async def func():
    print(1)
    await asyncio.sleep(1)
    print(2)
    return '返回值'


    async def main():
    print('main函数开始执行')
    # 创建Task对象,将当前执行的func函数添加到事件循环
    task_list = [
    asyncio.create_task(func(), name='n1'),
    asyncio.create_task(func(), name='n2')
    ]
    print('main函数执行结束')

    # 简单等待
    # https://docs.python.org/zh-cn/3/library/asyncio-task.html#waiting-primitives
    done, pending = await asyncio.wait(task_list, timeout=None)
    print(done)
    print(pending)


    asyncio.run(main())

  • 示例 3

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    import asyncio


    async def func():
    print(1)
    await asyncio.sleep(1)
    print(2)
    return '返回值'


    task_list = [
    func(),
    func()
    ]

    done, pending = asyncio.run(asyncio.wait(task_list))
    print(done)
    print(pending)

asyncio.Future 对象

  • 简介

    • Future 是一种特殊的 低层级 可等待对象,表示一个异步操作的 最终结果
    • Task 继承自 Future,Task 对象内部 await 结果的处理基于 Future 对象。
    • 通常情况下 没有必要 在应用层级的代码中创建 Future 对象。
  • 示例 1

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    import asyncio


    async def main():
    # 获取当前事件循环
    loop = asyncio.get_running_loop()
    # 创建一个任务(Future对象),这个任务什么都不干
    fut = loop.create_future()
    # 等待任务最终结果(future对象),没有结果则会一直等下去
    await fut


    asyncio.run(main())

  • 示例 2

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    import asyncio


    async def set_after(fut):
    await asyncio.sleep(2)
    fut.set_result('666')


    async def main():
    # 获取当前事件循环
    loop = asyncio.get_running_loop()
    # 创建一个任务(Future对象),没绑定任何行为,则这个任务永远不知道什么时候结束
    fut = loop.create_future()
    # 创建一个任务(Task对象),绑定set_after函数,函数内部在2s之后,会给fut赋值
    # 即手动设置future任务的最终结果,那么fut就可以结束了
    await loop.create_task(set_after(fut))
    # 等待future对象获取最终结果,否则一直等下去
    data = await fut
    print(data)


    asyncio.run(main())

concurrent.futures 对象

  • 简介

  • 示例 1

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    import time
    from concurrent.futures import Future
    from concurrent.futures.thread import ThreadPoolExecutor
    from concurrent.futures.process import ProcessPoolExecutor


    def func(value):
    time.sleep(1)
    print(value)
    return value


    # 创建线程池
    pool = ThreadPoolExecutor(max_workers=5)
    # 创建进程池
    # pool = ProcessPoolExecutor(max_workers=5)

    for i in range(10):
    fut = pool.submit(func, i)
    print(fut)

    以后写代码可能会存在交叉使用,例如:crm 项目 80% 都是基于协程异步编程 + MySQL(不支持)【线程、进程做异步编程】

  • 示例 2:交叉使用

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    import time
    import asyncio
    import concurrent.futures


    def func1():
    # 某个耗时操作
    time.sleep(2)
    return '天秀'


    async def main():
    loop = asyncio.get_running_loop()

    # 1. Run in the default loop's executor (默认ThreadPoolExecutor )
    # 第一步:内部会先调用 ThreadPoolExecutor 的 submit 方法去线程池中申请一个线程去执行func1函数,并返回一个 concurrent.futures.Future 对象
    # 第二步:调用 asyncio.wrap_future 将 concurrent.futures.Future 对象包装为 asyncio.Future 对象。
    # 因为 concurrent.futures.Future 对象不支持 await 语法,所以需要包装为 asyncio.Future 对象才能使用。
    fut = loop.run_in_executor(None, func1)
    result = await fut
    print('default thread pool', result)

    # 2. Run in a custom thread pool:
    # with concurrent.futures.ThreadPoolExecutor() as pool:
    # result = await loop.run_in_executor(pool, func1)
    # print('custom thread pool', result)

    # 3. Run in a custom process poo1:
    # with concurrent.futures.ProcessPoolExecutor() as pool:
    # result = await loop.run_in_executor(pool, func1)
    # print('custom process poo1', result)

  • 案列:asyncio + 不支持异步的模块

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    import asyncio
    import requests


    async def download_image(url):
    # 发送网络请求,下载图片(遇到网络下载图片的IO请求,自动切换到其它任务)
    print(f'开始下载:{url}')

    # 创建“事件循环”对象
    loop = asyncio.get_event_loop()
    # requests 模块不支持异步操作,所以就使用线程池来配合实现了
    future = loop.run_in_executor(None, requests.get, url)

    response = await future
    print('下载完成')
    # 图片保存到本地
    file_name = url.rsplit('/')[-1]
    with open(file_name, 'wb') as file_object:
    file_object.write(response.content)


    if __name__ == '__main__':
    url_list = [
    'https://img.lianzhixiu.com/uploads/allimg/202007/9999/rn27394522a3.jpg',
    'https://img.lianzhixiu.com/uploads/allimg/202102/9999/rnd6d6f2b5b7.jpg',
    'https://img.lianzhixiu.com/uploads/allimg/202106/9999/rn034f9e359f.jpg'
    ]

    tasks = [download_image(url) for url in url_list]
    # 创建“事件循环”对象
    loop = asyncio.get_event_loop()
    # 执行“事件循环”对象
    loop.run_until_complete(asyncio.wait(tasks))

异步迭代器

  • 简介

  • 示例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    import asyncio


    class Reader(object):
    """自定义异步迭代器(同时也是异步可迭代对象)"""

    def __init__(self):
    self.count = 0

    async def readline(self):
    # await asyncio.sleep(1)
    self.count += 1
    if self.count == 100:
    return None
    return self.count

    def __aiter__(self):
    return self

    async def __anext__(self):
    val = await self.readline()
    if val == None:
    raise StopAsyncIteration
    return val


    async def func():
    obj = Reader()
    async for item in obj:
    print(item)


    asyncio.run(func())

异步上下文管理器

  • 简介

    • 此种对象通过定义 __aenter__()__aexit__() 方法来对 async with 语句中的环境进行控制。
    • 异步上下文管理器上下文管理器 的一种,它能够在其 __aenter____aexit__ 方法中暂停执行。
    • 异步上下文管理器可在 async with 语句中使用。
  • 示例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    import asyncio


    class AsyncContextManager:
    def __init__(self):
    self.conn = conn

    async def do_something(self):
    # 异步操作数据库
    return 666

    async def __aenter__(self):
    # 异步链接数据库
    self.conn = await asyncio.sleep(1)
    return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
    # 异步关闭数据库
    await asyncio.sleep(1)


    async def func():
    async with AsyncContextManager() as f:
    result = f.do_something()
    print(result)


    asyncio.run(func())

简介

  • uvloop 是内置 asyncio 事件循环的快速、直接的替代品。uvloop 在 Cython 中实现,并在后台使用 libuv。
  • uvloop 和 asyncio 与 Python 3.5 中的 async/await 的强大功能相结合,使得在 Python 中编写高性能网络代码比以往任何时候都更加容易。
  • uvloop 使异步速度更快。事实上,它至少比 nodejs、gevent 以及任何其他 Python 异步框架快 2 倍。基于 uvloop 的 asyncio 的性能接近 Go 程序。
  • 注意:uvloop 目前不支持 Windows。
  • uvicorn 的内部使用的就是 uvloop。

安装

1
pip install uvloop

使用

  • 要让 asyncio 使用 uvloop 提供的事件循环,需要安装 uvloop 事件循环策略

    1
    2
    3
    4
    5
    6
    7
    8
    9
    import asyncio
    import uvloop

    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

    # 编写asyncio的代码,于之前写的代码一样
    # 内部的事件循环会自动变成uvloop

    asyncio.run(func())
  • 或者,你也可以手动创建一个循环实例,使用:

    1
    2
    3
    4
    5
    import asyncio
    import uvloop

    loop = uvloop.new_event_loop()
    asyncio.set_event_loop(loop)

实战案例

异步 Redis

  • 简介

    在使用 Python 代码操作 Redis 时,连接、操作、断开都是网络 IO。

  • 安装

    1
    pip install aioredis
  • 示例 1

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    import asyncio

    import aioredis


    async def execute(uri, password, db):
    print(f'开始执行:{uri}')
    # 网络IO操作:创建Redis链接
    redis = await aioredis.from_url(uri, password=password, db=db, decode_responses=True)

    # 网络IO操作:
    # 在Redis中设置哈希值car,内部再设置三个键值对,即:
    # redis = { car: { key1: 1, key2: 2, key3: 3 } }
    await redis.hset('car', mapping={'key1': 1, 'key2': 2, 'key3': 3})

    # 网络IO操作:去Redis中获取值
    result = await redis.hgetall('car')
    print(result)

    # 网络IO操作:关闭Redis连接
    await redis.close()
    print(f'执行结束:{uri}')


    asyncio.run(execute('redis://host:6379', 'password', 1))

  • 示例 2

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    import asyncio

    import aioredis


    async def execute(uri, password, db):
    print(f'开始执行:{uri}')

    # 网络IO操作:先去连接redis://host1:6379,遇到IO则自动切换任务,去连接redis://host2:6379
    redis = await aioredis.from_url(uri, password=password, db=db, decode_responses=True)

    # 网络IO操作:遇到IO自动切换任务
    await redis.hset('car', mapping={'key1': 1, 'key2': 2, 'key3': 3})

    # 网络IO操作:遇到IO自动切换任务
    result = await redis.hgetall('car')
    print(result)

    # 网络IO操作:遇到IO自动切换任务
    await redis.close()
    print(f'执行结束:{uri}')


    task_list = [
    execute('redis://host1:6379', 'password1', 1),
    execute('redis://host2:6379', 'password2', 2)
    ]

    asyncio.run(asyncio.wait(task_list))

  • 注意

    默认情况下, aioredis 的大多数命令将返回 bytes (字节)。使用 decode_responses=True 命令可以自动将返回的结果转为字符串

异步 MySQL

  • 简介

    • aiomysql 是用于从 asyncio (PEP-3156/tulip) 框架访问 MySQL 数据库的 “驱动程序” 。它依赖并重用 PyMySQL 的大部分部分。aiomysql 试图成为很棒的 aiopg 库并保留相同的 api、外观和感觉。
    • aiomysql 内部是 PyMySQL 的拷贝,底层 io 调用切换到 async,基本上是 yield fromasyncio.coroutine 添加在适当的位置))。从 aiopg 移植的 Sqlalchemy 支持。
  • 安装

    1
    pip install aiomysql
  • 示例 1

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    import asyncio

    import aiomysql


    async def execute():
    # 网络IO操作:连接MySQL
    conn = await aiomysql.connect(
    host='host',
    port=3306,
    user='user',
    password='password',
    db='mysql'
    )

    # 网络IO操作:创建cursor
    cur = await conn.cursor()

    # 网络IO操作:执行SQL
    await cur.execute('select Host, User from user')

    # 网络IO操作:获取SQL结果
    result = await cur.fetchall()
    print(result)

    # 网络IO操作:关闭连接
    await cur.close()
    conn.close()


    asyncio.run(execute())

  • 示例 2

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    import asyncio

    import aiomysql


    async def execute(host, password):
    print(f'开始执行:{host}')
    # 网络IO操作:先去连接host1,遇到IO则自动切换任务,去连接host2
    conn = await aiomysql.connect(
    host=host,
    port=3306,
    user='root',
    password=password,
    db='mysql'
    )

    # 网络IO操作:遇到IO会自动切换任务
    cur = await conn.cursor()

    # 网络IO操作:遇到IO会自动切换任务
    await cur.execute('select Host, User from user')

    # 网络IO操作:遇到IO会自动切换任务
    result = await cur.fetchall()
    print(result)

    # 网络IO操作:遇到IO会自动切换任务
    await cur.close()
    conn.close()
    print(f'执行结束:{host}')


    task_list = [
    execute('host1', 'password'),
    execute('host2', 'password')
    ]

    asyncio.run(asyncio.wait(task_list))

异步爬虫

  • 简介

  • 安装

    1
    pip install aiohttp
  • 示例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    import asyncio

    import aiohttp


    async def fetch(session, url):
    print(f'发送请求:{url}')
    async with session.get(url, verify_ssl=False) as response:
    text = await response.text()
    print(f'得到结果:{url},长度:{len(text)}')
    return text


    async def main():
    async with aiohttp.ClientSession() as session:
    url_list = [
    'https://www.python.org/',
    'https://www.baidu.com/',
    'https://gitee.com/'
    ]
    tasks = [asyncio.create_task(fetch(session, url)) for url in url_list]
    done, pending = await asyncio.wait(tasks)
    print(done)
    print(pending)


    if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())