Python中asyncio库的基本使用

鳄鱼君

发表文章数:642

Vieu四代商业主题

高扩展、安全、稳定、响应式布局多功能模板。

¥69 现在购买
首页 » Python » Python中asyncio库的基本使用

asyncio是一个使用async / await语法编写并发代码的库。asyncio用作多个Python异步框架的基础,这些框架提供高性能的网络和Web服务器,数据库连接库,分布式任务队列等。asyncio通常非常适合IO绑定和高级 结构化网络代码。

asyncio提供了一组高级 API:

同时运行Python协同程序并完全控制它们的执行;
执行网络IO和IPC ;
控制子过程 ;
通过队列分配任务;
同步并发代码;

此外,还有一些用于库和框架开发人员的低级 API :

创建和管理事件循环,提供异步API networking,运行subprocesses,处理等;OS signals
使用传输实现有效的协议 ;
使用async / await语法桥接基于回调的库和代码。

Conroutines

使用async / await语法声明的协同程序是编写asyncio应用程序的首选方法。例如,以下代码片段(需要Python 3.7+)打印“hello”,等待1秒,然后打印“world”:

import asyncio
 
async def main():
     print('hello')
     await asyncio.sleep(1)
     print('world')
 
asyncio.run(main())
 

上面代码等同于下面(不推荐使用基于生成器的协同程序的支持,并计划在Python 3.10中删除。)

import asyncio
 
@asyncio.coroutine
def main():
     print('hello')
     yield  from asyncio.sleep(1)
     print('world')
 
asyncio.run(main())
 

asyncio实际等同于下面的工作(参数为An asyncio.Future, a coroutine or an awaitable is required)

import asyncio
 
@asyncio.coroutine
def main():
     print('hello')
     yield  from asyncio.sleep(1)
     print('world')
 
# asyncio.run(main())
loop = asyncio.events.new_event_loop()
asyncio.events.set_event_loop(loop)
loop.run_until_complete(main())
 

asyncio.run功能介绍

实际运行协程asyncio提供了三种主要机制:

1、The asyncio.run()函数来运行顶层入口点“main()”函数(见上面的例子)

2、Awaiting on a coroutine 以下代码片段将在等待1秒后打印“hello”,然后在等待另外 2秒后打印“world” :

import asyncio
import time
 
async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)
 
async def main():
    print(f"started at {time.strftime('%X')}")
 
    await say_after(1, 'hello')
    await say_after(2, 'world')
 
    print(f"finished at {time.strftime('%X')}")
 
asyncio.run(main())
# started at 11:54:48
# hello
# world
# finished at 11:54:51
 

3、asyncio.create_task()与asyncio同时运行协同程序的功能Tasks;让我们修改上面的例子并同时运行两个say_after协同程序 :

import asyncio
import time
 
async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(f"{what} at {time.strftime('%X')}")
 
async def main():
    task1 = asyncio.create_task(
        say_after(1, 'hello'))
    task2 = asyncio.create_task(
        say_after(2, 'world'))
    print(f"started at {time.strftime('%X')}")
    # Wait until both tasks are completed (should take
    # around 2 seconds.)
 
    await task1
    await task2
    print(f"finished at {time.strftime('%X')}")
 
asyncio.run(main())
 
# started at 14:27:22
# hello at 14:27:23
# world at 14:27:24
# finished at 14:27:24
 

稍微改变一下形式,可以理解的更多

import asyncio
import time
 
async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(f"{what} at {time.strftime('%X')}")
 
async def main():
    task1 = asyncio.create_task(
        say_after(1, 'hello'))
    task2 = asyncio.create_task(
        say_after(2, 'world'))
    print(f"started at {time.strftime('%X')}")
    # Wait until both tasks are completed (should take
    # around 2 seconds.)
    await asyncio.sleep(3)
 
    # await task1
    # await task2
    print(f"finished at {time.strftime('%X')}")
 
asyncio.run(main())
 
# started at 14:29:41
# hello at 14:29:42
# world at 14:29:43
# finished at 14:29:44
 

  
Awaitables

我们说如果一个对象可以在表达式中使用,那么它就是一个等待对象await。许多asyncio API旨在接受等待。

有三种主要类型的等待对象:coroutines, Tasks, and Futures.

Coroutines

Python coroutines are awaitables and therefore can be awaited from other coroutines:

import asyncio
 
async def nested():
    return 42
 
async def main():
    # Nothing happens if we just call "nested()".
    # A coroutine object is created but not awaited,
    # so it *won't run at all*.
    nested()
 
    # Let's do it differently now and await it:
    print(await nested())  # will print "42".
 
asyncio.run(main())
 
# 42
 

重要

在本文档中,术语“coroutine”可用于两个密切相关的概念:

一个协程功能:一个功能;async def
一个协程对象:通过调用协同程序函数返回的对象 。

Tasks

任务用于调度协同程序并发。

当一个协程被包装到一个Task中时,会像asyncio.create_task()一样 conroutine自动安排很快运行:

import asyncio
 
async def nested():
    return 42
 
async def main():
    # Schedule nested() to run soon concurrently
    # with "main()".
    task = asyncio.create_task(nested())
 
    # "task" can now be used to cancel "nested()", or
    # can simply be awaited to wait until it is complete:
    await task
 
asyncio.run(main())
 

Futures

A Future是一个特殊的低级别等待对象,它表示异步操作的最终结果。

当等待 Future对象时,它意味着协程将等到Future在其他地方解析。

需要asyncio中的未来对象以允许基于回调的代码与async / await一起使用。

通常,不需要在应用程序级代码中创建Future对象。

可以等待有时通过库和一些asyncio API公开的未来对象:

async def main():
    await function_that_returns_a_future_object()
 
    # this is also valid:
    await asyncio.gather(
        function_that_returns_a_future_object(),
        some_python_coroutine()
    )
 

返回Future对象的低级函数的一个很好的例子是loop.run_in_executor()。

Asyncio方法

1、运行asyncio程序

asyncio.run(coro,*,debug = False )

  此函数运行传递的协同程序,负责管理asyncio事件循环并最终确定异步生成器。

  当另一个asyncio事件循环在同一个线程中运行时,无法调用此函数。

  如果是debugTrue,则事件循环将以调试模式运行。

  此函数始终创建一个新的事件循环并在结束时将其关闭。它应该用作asyncio程序的主要入口点,理想情况下应该只调用一次。

  版本3.7中的新功能:重要:此功能已临时添加到Python 3.7中的asyncio中。

2、创建任务

asyncio.create_task(coro)

  将coro coroutine包装成a Task 并安排执行。返回Task对象。

  任务在返回的循环中执行,如果当前线程中没有运行循环get_running_loop(), RuntimeError则引发该任务。

  Python 3.7中添加了此功能。在Python 3.7之前,asyncio.ensure_future()可以使用低级函数:

async def coro():
    ...
 
# In Python 3.7+
task = asyncio.create_task(coro())
...
 
# This works in all Python versions but is less readable
task = asyncio.ensure_future(coro())

 
 

3、sleeping

coroutine asyncio.sleep(delay, result=None, *, loop=None)

  阻止 delay seconds.。

  如果提供了result ,则在协程完成时将其返回给调用者。

  leep() 始终挂起当前任务,允许其他任务运行。

  该loop 参数已被弃用,并定于去除在Python 3.10。

  协程示例每秒显示当前日期5秒:

import asyncio
import datetime
 
async def display_date():
    loop = asyncio.get_running_loop()
    end_time = loop.time() + 5.0
    while True:
        print(datetime.datetime.now())
        if (loop.time() + 1.0) >= end_time:
            break
        await asyncio.sleep(1)
 
asyncio.run(display_date())

 

  

4、同时运行任务

awaitable asyncio.gather(*aws, loop=None, return_exceptions=False)

  同时在aws 序列中运行 awaitable objects

  如果在aws中任何awaitable 是协程,它将自动安排为任务

  如果所有等待成功完成,则结果是返回值的汇总列表。结果值的顺序对应于aws中的等待顺序

  如果return_exceptions是False(默认),则第一个引发的异常会立即传播到等待的任务gather()。

  如果return_exceptions是True,异常的处理方式一样成功的结果,并在结果列表汇总。

  如果gather()被取消,所有提交的awaitables(尚未完成)也被取消。

  如果aws序列中的Task or Future被取消,则将其视为已引发CancelledError- 在这种情况下不会取消gather() 呼叫。这是为了防止取消一个提交的Tasks/Futures 以导致其他任务/期货被取消。

import asyncio
 
async def factorial(name, number):
    f = 1
    for i in range(2, number + 1):
        print(f"Task {name}: Compute factorial({i})...")
        await asyncio.sleep(1)
        f *= i
    print(f"Task {name}: factorial({number}) = {f}")
 
async def main():
    # Schedule three calls *concurrently*:
    await asyncio.gather(
        factorial("A", 2),
        factorial("B", 3),
        factorial("C", 4),
    )
 
asyncio.run(main())
 
# Expected output:
#
#     Task A: Compute factorial(2)...
#     Task B: Compute factorial(2)...
#     Task C: Compute factorial(2)...
#     Task A: factorial(2) = 2
#     Task B: Compute factorial(3)...
#     Task C: Compute factorial(3)...
#     Task B: factorial(3) = 6
#     Task C: Compute factorial(4)...
#     Task C: factorial(4) = 24
 

获取返回结果,异常情况

import asyncio
 
async def factorial(name, number):
 
    print(name)
    if name == 'A':
        return name
    elif name == 'B':
        raise SyntaxError(name)
    await asyncio.sleep(number)
 
 
async def main():
    # Schedule three calls *concurrently*:
    result = await asyncio.gather(
        factorial("A", 2),
        factorial("B", 3),
        factorial("C", 4),
        return_exceptions=True
    )
    print(result)
 
asyncio.run(main())
 
# A
# B
# C
# ['A', SyntaxError('B'), None]
 

版本3.7中已更改:如果取消了聚集本身,则无论return_exceptions如何,都会传播取消。

5、Shielding From Cancellation

awaitable asyncio.shield(aw, *, loop=None)

  Protect an awaitable object from being cancelled.

  If aw is a coroutine it is automatically scheduled as a Task.

  The statement:

res = await shield(something())
 

  等同于

res = await something()
 

  除非取消包含它的协程,否则something()不会取消运行的任务。从观点来看something(),取消没有发生。虽然它的来电者仍然被取消,所以“等待”表达仍然提出了一个CancelledError。

  如果something()通过其他方式取消(即从内部取消)也会取消shield()。

  如果希望完全忽略取消(不推荐),则该shield()函数应与try / except子句结合使用,如下所示:

try:
    res = await shield(something())
except CancelledError:
    res = None
 

6、超时

coroutine asyncio.wait_for(aw, timeout, *, loop=None)

  Wait for the aw awaitable to complete with a timeout.

  If aw is a coroutine it is automatically scheduled as a Task.

  timeout可以是None或等待的float或int秒数。如果超时是None,将等到完成

  如果发生超时,它将取消任务并加注 asyncio.TimeoutError。

  要避免该任务cancellation,请将其包装shield()。

  该函数将一直等到将来实际取消,因此总等待时间可能会超过超时。

  如果等待被取消,则未来的aw也会被取消。

  该循环参数已被弃用,并定于去除在Python 3.10。

async def eternity():
# Sleep for one hour
await asyncio.sleep(3600)
print(‘yay!’)

async def main():
# Wait for at most 1 second
try:
await asyncio.wait_for(eternity(), timeout=1.0)
except asyncio.TimeoutError:
print(‘timeout!’)

asyncio.run(main())

# Expected output:
#
# timeout!

改变在3.7版本:当AW被取消,由于超时,wait_for等待AW被取消。以前,它asyncio.TimeoutError立即提出 。

7、超时原语

coroutine asyncio.wait(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED)

  同时运行aws中的等待对象 并阻塞,直到return_when指定的条件为止。

  如果在aws中任何等待的是协程,它将自动安排为任务。wait()直接传递协同程序对象 已被弃用,因为它会导致 混乱的行为。

  返回两组任务/期货:。(done, pending)

  用法:
1

done, pending = await asyncio.wait(aws)

  该循环参数已被弃用,并定于去除在Python 3.10。

  timeout(浮点数或整数),如果指定,可用于控制返回前等待的最大秒数。

  请注意,此功能不会引发asyncio.TimeoutError。超时发生时未完成的期货或任务仅在第二组中返回。

  return_when表示此函数何时返回。它必须是以下常量之一:
不变 描述
FIRST_COMPLETED 当任何未来完成或取消时,该函数将返回。
FIRST_EXCEPTION 当任何未来通过引发异常完成时,函数将返回。如果没有未来引发异常则等同于 ALL_COMPLETED。
ALL_COMPLETED 所有期货结束或取消时,该功能将返回。

  不像wait_for(),wait()当发生超时不会取消期货。   

  注意 wait()将协同程序自动调度为任务,然后在 集合中返回隐式创建的任务对象。因此,以下代码将无法按预期方式工作:(done, pending)

async def foo():
    return 42
 
coro = foo()
done, pending = await asyncio.wait({coro})
 
if coro in done:
    # This branch will never be run!

  以下是如何修复上述代码段:

async def foo():
return 42

task = asyncio.create_task(foo())
done, pending = await asyncio.wait({task})

if task in done:
# Everything will work as expected now.

  wait()不推荐直接传递协程对象。

8、 Scheduling From Other Threads

asyncio.run_coroutine_threadsafe(coro, loop)

  将协程提交给给定的事件循环。线程安全的。

  返回a concurrent.futures.Future以等待另一个OS线程的结果。

  此函数旨在从与运行事件循环的OS线程不同的OS线程调用。例:

# Create a coroutine
coro = asyncio.sleep(1, result=3)

# Submit the coroutine to a given loop
future = asyncio.run_coroutine_threadsafe(coro, loop)

# Wait for the result with an optional timeout argument
assert future.result(timeout) == 3

  如果在协程中引发异常,则会通知返回的Future。它还可以用于取消事件循环中的任务:

try:
result = future.result(timeout)
except asyncio.TimeoutError:
print(‘The coroutine took too long, cancelling the task…’)
future.cancel()
except Exception as exc:
print(f’The coroutine raised an exception: {exc!r}’)
else:
print(f’The coroutine returned: {result!r}’)

  请参阅 文档的并发和多线程部分。

  与其他asyncio函数不同,此函数需要 显式传递循环参数。

  3.5.1版中的新功能。

9、自省

asyncio.current_task(loop = None )

  返回当前正在运行的Task实例,或者None没有正在运行的任务。

  如果loop是None get_running_loop()用来获取loop。

  版本3.7中的新功能。

asyncio.all_tasks(loop = None )

  返回Task循环运行的一组尚未完成的对象。

  如果loop是None,get_running_loop()则用于获取当前循环。

  版本3.7中的新功能。

任务对象

class asyncio.Task(coro,*,loop = None )

A Future-like object that runs a Python coroutine. Not thread-safe.

任务用于在事件循环中运行协同程序。如果一个协程在Future上等待,则Task暂停执行协程并等待Future的完成。当Future 完成后,包装协程的执行将恢复。

事件循环使用协作调度:事件循环一次运行一个任务。当一个Task等待完成Future时,事件循环运行其他任务,回调或执行IO操作。

使用高级asyncio.create_task()功能创建任务,或低级别loop.create_task()或 ensure_future()功能。不鼓励手动实例化任务。

要取消正在运行的任务,请使用该cancel()方法。调用它将导致Task将CancelledError异常抛出到包装的协同程序中。如果在取消期间协程正在等待Future对象,则Future对象将被取消。

cancelled()可用于检查任务是否被取消。True如果包装的协程没有抑制CancelledError异常并且实际上被取消,则该方法返回。

asyncio.Task继承自Future其所有API,除了Future.set_result()和 Future.set_exception()。

任务支持该contextvars模块。创建任务时,它会复制当前上下文,然后在复制的上下文中运行其协程。

版本3.7中已更改:添加了对contextvars模块的支持。

cancel()

  请求取消任务。

  这会安排CancelledError在事件循环的下一个循环中将异常抛入包装的协程。

  协程则有机会通过抑制异常与清理,甚至拒绝请求try… … … … … … 块。因此,不同于,不保证任务将被取消,尽管完全抑制取消并不常见,并且积极地不鼓励。exceptCancelledErrorfinallyFuture.cancel()Task.cancel()

  以下示例说明了协同程序如何拦截取消请求:

async def cancel_me():
print(‘cancel_me(): before sleep’)

try:
# Wait for 1 hour
await asyncio.sleep(3600)
except asyncio.CancelledError:
print(‘cancel_me(): cancel sleep’)
raise
finally:
print(‘cancel_me(): after sleep’)

async def main():
# Create a “cancel_me” Task
task = asyncio.create_task(cancel_me())

# Wait for 1 second
await asyncio.sleep(1)

task.cancel()
try:
await task
except asyncio.CancelledError:
print(“main(): cancel_me is cancelled now”)

asyncio.run(main())

# Expected output:
#
# cancel_me(): before sleep
# cancel_me(): cancel sleep
# cancel_me(): after sleep
# main(): cancel_me is cancelled now

cancelled()
  True如果任务被取消,则返回。
  请求取消时取消任务, cancel()并且包装的协同程序将CancelledError异常传播 到其中。

done()
  True如果任务完成则返回。
  一个任务完成时,包裹协程要么返回的值,引发异常,或者任务被取消。

result()
  返回任务的结果。
  如果任务完成,则返回包装协程的结果(或者如果协程引发异常,则重新引发该异常。)
  如果已取消任务,则此方法会引发CancelledError异常。
  如果Task的结果尚不可用,则此方法会引发InvalidStateError异常。

exception()
  返回Task的例外。
  如果包装的协同程序引发异常,则返回异常。如果包装的协程正常返回,则此方法返回None。
  如果已取消任务,则此方法会引发 CancelledError异常。
  如果尚未完成任务,则此方法会引发 InvalidStateError异常。

add_done_callback(回调,*,上下文=无)
  添加要在任务完成时运行的回调。
  此方法仅应在基于低级回调的代码中使用。 
  有关Future.add_done_callback() 详细信息,请参阅文档。

remove_done_callback(回调)
  从回调列表中删除回调。
  此方法仅应在基于低级回调的代码中使用。
  有关Future.remove_done_callback() 详细信息,请参阅文档。

get_stack(*,limit = None )
  返回此任务的堆栈帧列表。
  如果未完成包装的协同程序,则会返回挂起它的堆栈。如果协程已成功完成或被取消,则返回一个空列表。如果协程被异常终止,则返回回溯帧列表。
  帧始终从最旧到最新排序。
  对于挂起的协程,只返回一个堆栈帧。
  可选的limit参数设置要返回的最大帧数; 默认情况下,返回所有可用的帧。返回列表的排序取决于是返回堆栈还是返回:返回堆栈的最新帧,但返回最旧的回溯帧。(这与回溯模块的行为相匹配。)

print_stack(*,limit = None,file = None )
  打印此任务的堆栈或回溯。
  这会为检索到的帧生成类似于回溯模块的输出get_stack()。
  该极限参数传递给get_stack()直接。
  的文件参数是其中输出被写入的I / O流; 默认输出写入sys.stderr。

classmethod all_tasks(loop = None )
  返回一组事件循环的所有任务。
  默认情况下,返回当前事件循环的所有任务。如果是loopNone,则该get_event_loop()函数用于获取当前循环。
  不推荐使用此方法,将在Python 3.9中删除。请改用此asyncio.all_tasks()功能。

classmethod current_task(loop = None )
  返回当前正在运行的任务或None。
  如果是loopNone,则该get_event_loop()函数用于获取当前循环。
  不推荐使用此方法,将在Python 3.9中删除。请改用此asyncio.current_task()功能。

其他

1、async for 运用

import asyncio

class AsyncIter:
def __init__(self, items):
self.items = items

async def __aiter__(self):
for item in self.items:
await asyncio.sleep(1)
yield item

async def print_iter(things):
async for item in things:
print(item)

if __name__ == ‘__main__’:
loop = asyncio.get_event_loop()
things = AsyncIter([1, 2, 3])
loop.run_until_complete(print_iter(things))
loop.close()

未经允许不得转载:作者:鳄鱼君, 转载或复制请以 超链接形式 并注明出处 鳄鱼君
原文地址:《Python中asyncio库的基本使用》 发布于2020-03-05

分享到:
赞(0) 赏杯咖啡

评论 抢沙发

8 + 8 =


文章对你有帮助可赏作者一杯咖啡

支付宝扫一扫打赏

微信扫一扫打赏

Vieu4.6主题
专业打造轻量级个人企业风格博客主题!专注于前端开发,全站响应式布局自适应模板。
切换注册

登录

忘记密码 ?

您也可以使用第三方帐号快捷登录

Q Q 登 录
微 博 登 录
切换登录

注册