【PY模块】协程 asyncio 实现异步操作

什么是协程

协程不是计算机提供的,是人为创造的(多线程、多进程就是计算机提供)

协程(Coroutine),可以认为是微线程,是一种用户态内的上下文 切换 技术,简单理解就是,遇到 IO 耗时操作时,切换其他代码块继续执行的技术。

Py 协程实现的方式

  • greenlet,早期的第三方模块
  • yield 关键字
  • asyncio装饰器(@asyncio.coroutine)【py3.4】
  • async、await 关键字【py3.5】【推荐】

异步编程

事件循环

可以理解成一个死循环,去检查并执行某些代码。

# 伪代码
任务列表=[ 任务1,任务2,任务3 ]

while True:

    # 将 '可执行' 和 '已完成' 的任务返回
    可执行的任务列表,已完成的任务列表=去任务列表中检查所有的任务 

    for 就绪任务 in 可执行的任务列表:

        '执行'已就绪的任务

    for 已完成的任务 in 已完成的任务列表:

        在任务列表中'移除' 已完成的任务

    如果 任务列表 中的任务'都已完成',则'终止'循环
import asyncio

# 生成或获取一个事件循环
loop = asyncio.get_event_loop()
# 把 任务 放入事件循环
loop.run_until_complete(task_list)

快速上手

协程函数:定义函数时async def 函数名(不是普通的函数了)

import asyncio

async def fun():
    pass

result = fun()

# 事件对象 (Py 3.7以前)【两种写法都有应用场景】
# loop = asyncio.get_event_loop()
# loop.run_until_complete(result)

# 事件对象
asyncio.run(result) # Py 3.7 以后支持

await 关键字

await 后面只可以跟(协程对象Task对象Future对象

也可以理解为 等待 耗时操作,在这个等待的 时间可以去 执行 其他任务

import asyncio
from loguru import logger

async def fun():
    logger.info(f'开始')
    await asyncio.sleep(2)
    logger.info(f'结束')

# 两个协程对象任务
tasks = [
    fun(),
    fun(),
]
asyncio.run(asyncio.wait(tasks))

常规执行两次 fun() 需要四秒,使用协程只需要 两秒

mark

Task 对象

理解:可以向事件循环添加任务的对象。

Task 用于并发调度协程,使用asyncio.create_task(协程对象,...)的方式创建 Task 对象,这样就可以加入事件循环 等待调度

还能使用 更低一级的 loop.create_task() 或者 ensure_future() 创建,不建议手动实例化 Task 函数。

import asyncio
from loguru import logger

async def fun():
    logger.info(f'开始')
    await asyncio.sleep(2)
    logger.info(f'结束')

# 执行两次 `fun()` 用时 两秒
async def main():
    task1 = asyncio.create_task(fun())
    task2 = asyncio.create_task(fun())
    await task1
    await task2

asyncio.run(main())

mark

Task 的返回值name

import asyncio
from loguru import logger

async def io():
    logger.debug('io 执行中...')
    await asyncio.sleep(2)
    logger.debug('io 操作完成...')

async def fun():
    await io()
    return True

async def main():
    tasks = [
        asyncio.create_task(fun(), name='0'),
        asyncio.create_task(fun(), name='1'),
    ]
    done, padding = await asyncio.wait(tasks, timeout=None)
    logger.info(f'done = {done}')
    logger.info(f'padding = {padding}')

if __name__ == '__main__':
    asyncio.run(main())

mark

Future 可等待对象

  • Task 继承 Future ,Task对象内部 await 结果的处理基于 Future 对象而来。

  • 使用 loop.create_future() 来创建 Future 对象。

  • Future 的特性:await future 等待 future 结果,future 没有结果则一直等待

import asyncio
from loguru import logger

async def fun(fut):
    # 设置 fut 值
    fut.set_result('xxx')

async def main():
    # 获取当前时间循环 下面的 run
    loop = asyncio.get_event_loop()
    # 创建 future 对象
    fut = loop.create_future()
    # 创建 Task 对象,通过 'fun()' 给 fut 赋值
    await asyncio.create_task(fun(fut))  # 注释掉 fut 一直等待

    # 等待 fut 结果,fut 没有结果则一直等待
    data = await fut
    logger.info(data)

asyncio.run(main())

异步迭代器

异步迭代器:实现了__aiter__()__anext__()方法的对象,必须返回一个awaitable对象。async for支持处理异步迭代器的__anext__()方法返回可等待对象,直到引发一个stopAsyncIteration异常

异步可迭代对象:可在async for语句中被使用的对象,必须通过它的__aiter__()方法返回一个asynchronous_iterator(异步迭代器)

import asyncio

class Reader(object):
    def __init__(self):
        self.count = 0

    # 返回自己
    def __aiter__(self):
        return self

    # 迭代
    async def __anext__(self):
        self.count += 1
        if self.count == 5:
            raise StopAsyncIteration  # 迭代完成
        return self.count

async def main():
    reader = Reader()
    async for item in reader:
        print(item)

if __name__ == '__main__':
    asyncio.run(main())

异步上下文管理器

上下文管理器:with open 操作 ,实现了 \_\_enter__()\_\_exit__()

异步上下文管理器:通过定义__aenter__()__aexit__()方法来对async with语句中的环境进行控制的对象。

import asyncio
from loguru import logger

class AsyncContextManager(object):

    async def do(self):
        logger.debug(f'操作数据库')

    async def __aenter__(self):
        logger.debug(f'连接数据库')
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        logger.debug(f'关闭数据库')

async def main():
    async with AsyncContextManager() as acm:
        await acm.do()

if __name__ == '__main__':
    asyncio.run(main())
发表评论 / Comment

用心评论~