Bozo Kopic - Articles

2023-05-23 16:45:00 UTC
2023-06-01 20:20:00 UTC

Beware of asyncio.wait_for

Python's asyncio library provides an event loop implementation with a coroutine-based interface. Using this library greatly simplifies structuring applications that execute tasks concurrently. Nevertheless, this class of problems requires a deep understanding of the underlying concepts, even when they are wrapped in a user-friendly interface. A lack of understanding of the implementation and interface can lead to hard-to-detect bugs.

This article examines the behavior of the asyncio.wait_for implementation and identifies some unexpected edge cases. An understanding of basic asyncio concepts, such as coroutines, tasks and futures, is assumed (see Coroutines and Tasks).

Introduction

wait_for is one of the basic asyncio utility functions. It enables cancellation of a task or future based on elapsed time. It accepts a single awaitable object and a timeout. If the provided awaitable object is a coroutine, a new task is created and the coroutine's execution is scheduled.

By accepting any kind of coroutine, wait_for can be used as a generic timeout utility. Individual coroutine implementations do not have to provide timeout arguments or implement additional timeout logic. Responsibility for the timeout is delegated to the code calling the coroutine, which decides when the coroutine should be cancelled. Because of this inversion of responsibility, an execution timeout can be applied even to coroutines that were not originally written with timeouts in mind.
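
As a minimal sketch of this inversion of responsibility, the snippet below applies a timeout to a coroutine that has no timeout parameter of its own. The coroutine fetch_value and the chosen durations are hypothetical and exist only for illustration.

```python
import asyncio


async def fetch_value() -> int:
    # Hypothetical slow operation; note that it takes no timeout argument.
    await asyncio.sleep(1)
    return 42


async def main() -> str:
    try:
        # The caller, not fetch_value, decides how long to wait.
        value = await asyncio.wait_for(fetch_value(), timeout=0.05)
        return f'value {value}'

    except asyncio.TimeoutError:
        return 'timeout'


if __name__ == '__main__':
    print(asyncio.run(main()))
```

Because the timeout (0.05 seconds) is much shorter than the simulated work (1 second), main returns 'timeout'.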

asyncio provides a mechanism for task cancellation based on exception propagation. This generic mechanism enables cancellation of any task, as long as all executing coroutines propagate asyncio.CancelledError. If any coroutine fails to propagate this exception, task cancellation fails, often resulting in unwanted behavior.
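
The difference between propagating and swallowing CancelledError can be sketched as follows. The coroutines well_behaved and swallows_cancellation, and the helper cancel_and_report, are illustrative names and not part of asyncio.

```python
import asyncio


async def well_behaved() -> str:
    try:
        await asyncio.sleep(10)
    finally:
        # Cleanup may run here; CancelledError keeps propagating afterwards.
        pass
    return 'finished'


async def swallows_cancellation() -> str:
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        # Problem: the exception is caught and never re-raised.
        pass
    return 'finished despite cancel()'


async def cancel_and_report(coro) -> str:
    task = asyncio.create_task(coro)
    await asyncio.sleep(0)  # let the task reach its first await
    task.cancel()
    try:
        return await task
    except asyncio.CancelledError:
        return 'cancelled'


async def main() -> tuple:
    first = await cancel_and_report(well_behaved())
    second = await cancel_and_report(swallows_cancellation())
    return first, second


if __name__ == '__main__':
    print(asyncio.run(main()))
```

The first task ends up cancelled, while the second one completes normally with a result even though cancel was called on it.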

Because wait_for is a basic function widely used by other coroutines, it is reasonable to expect that it always propagates CancelledError and therefore supports correct cancellation. Nevertheless, this is not always the case. The following examples explore conditions under which wait_for stops cancellation propagation.

Simple producer/consumer

To help identify these edge cases, we will use a simple producer/consumer model in which synchronization between producer and consumer is based on asyncio.Queue.

The producer is modeled with a coroutine which adds new entries to the queue at regular intervals:

import asyncio
import itertools


async def produce(queue: asyncio.Queue):
    try:
        # Enqueue 1, 2, 3, ... at one-second intervals until cancelled.
        for i in itertools.count(1):
            queue.put_nowait(i)
            await asyncio.sleep(1)

    finally:
        print('closing produce')

The consumer is modeled with a coroutine which waits for new entries. Once an entry is available in the queue, the consumer prints it to standard output and continues waiting for new entries indefinitely:

async def consume(queue: asyncio.Queue):
    try:
        while True:
            result = await queue.get()
            print(result)

    finally:
        print('closing consume')

Additional "work" is represented by a coroutine which sleeps for the provided delay:

async def other_work(delay: float):
    await asyncio.sleep(delay)

The producer and consumer are run as new tasks which are cancelled after the additional work is done:

import contextlib


async def main():
    queue = asyncio.Queue()

    producer = asyncio.create_task(produce(queue))
    consumer = asyncio.create_task(consume(queue))

    await other_work(2.5)

    # Request cancellation of both tasks, then wait for them to finish.
    producer.cancel()
    consumer.cancel()

    with contextlib.suppress(asyncio.CancelledError):
        await producer

    with contextlib.suppress(asyncio.CancelledError):
        await consumer


asyncio.run(main())

Running this code, we can expect the following output:

1
2
3
closing produce
closing consume

Example 1 source code

Consumer with wait_for

To introduce wait_for, we can replace consume from the previous example with:

async def consume(queue: asyncio.Queue):
    try:
        while True:
            try:
                result = await asyncio.wait_for(queue.get(), timeout=0.5)
                print(result)

            except asyncio.TimeoutError:
                print('timeout')

    finally:
        print('closing consume')

The new implementation of consume waits for queued entries with the provided timeout. If a timeout occurs, timeout is printed to standard output and the loop starts again from the beginning.

Running this example will result in:

1
timeout
2
timeout
3
closing produce
closing consume

Example 2 source code

wait_for ignoring cancellation

In the previous example, if we change other_work's delay to 0:

await other_work(0)

an unexpected result occurs:

closing produce
1
timeout
timeout
timeout
timeout
...

Execution of this example never finishes because the consumer is not successfully cancelled. Because wait_for is the only coroutine awaited in consume, we can assume that wait_for did not propagate CancelledError.

Example 3 source code

Focusing on consumer

To focus only on the consumer, we can skip the producer's task creation:

async def main():
    queue = asyncio.Queue()

    consumer = asyncio.create_task(consume(queue))

    await other_work(0)

    consumer.cancel()

    with contextlib.suppress(asyncio.CancelledError):
        await consumer

Just by removing the producer, the consumer task is successfully cancelled:

closing consume

Example 4 source code

Identifying the edge case

Because the producer and consumer interact only through the queue, we can expect that the queue's state plays a significant role in the unwanted behavior. To test this hypothesis, a non-empty queue is provided to consume instead of an empty one:

queue = asyncio.Queue()
queue.put_nowait(1)

This change is sufficient to reintroduce the unwanted behavior:

1
timeout
timeout
timeout
timeout
...

This example demonstrates that the behavior of wait_for depends on the behavior of the provided awaitable, which can even result in stopping CancelledError propagation. To trigger this, we used asyncio.sleep(0) (through other_work(0)) as a way to schedule task cancellation precisely relative to task creation. The same sequence of create_task and cancel calls can easily occur in real-world scenarios. Because of this, great care must be taken when wait_for is used, taking into account the behavior of the provided awaitable and the possible timing of cancellation of the task executing wait_for.

Example 5 source code
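
The role of asyncio.sleep(0) in timing the cancellation can be illustrated with a small sketch that does not involve wait_for at all. The names child and run are hypothetical. Yielding once to the event loop lets the newly created task execute up to its first await before cancel is called; without the yield, the task is cancelled before its coroutine body starts.

```python
import asyncio
import contextlib


async def child(log: list):
    log.append('child started')
    try:
        await asyncio.sleep(10)
    finally:
        log.append('child closing')


async def run(yield_first: bool) -> list:
    log = []
    task = asyncio.create_task(child(log))

    if yield_first:
        # One pass through the event loop: child runs up to its first await.
        await asyncio.sleep(0)

    task.cancel()
    log.append('cancel requested')

    with contextlib.suppress(asyncio.CancelledError):
        await task

    return log


if __name__ == '__main__':
    print(asyncio.run(run(yield_first=True)))
    print(asyncio.run(run(yield_first=False)))
```

With the yield, the log reads 'child started', 'cancel requested', 'child closing'. Without it, only 'cancel requested' is recorded, because the body of child never runs.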

When to expect unsuccessful cancellation

As the previous examples demonstrated, slight modifications of delay/timeout parameters can produce significant functional changes. These parameters are often provided as part of end-user defined configuration, which makes their values even more volatile.

The order of actions, which at first sight should not have a significant impact, can also cause wait_for cancellation to fail.

Taking these causes into account, it is hard to describe a single universal case in which unsuccessful cancellation should be expected. That said, the most significant behavior affecting cancellation propagation is:

When the task running wait_for is cancelled at the "same time" as the awaitable's result becomes available, wait_for can return the awaitable's result instead of raising CancelledError.

In this case, the concept of "same time" is somewhat vague because concurrent tasks are executed sequentially. Actions such as task cancellation or a task finishing with a result usually do not take effect instantaneously (that is, at the moment cancel is called). These actions can result in callbacks being scheduled on the event loop, which delegates their execution to future loop iterations.

To demonstrate this edge case, the following example is provided:

async def do_work(future: asyncio.Future):
    await asyncio.sleep(1)
    return 42


async def main():
    loop = asyncio.get_running_loop()
    future = loop.create_future()

    work_task = asyncio.create_task(do_work(future))
    wait_task = asyncio.create_task(asyncio.wait_for(work_task, timeout=2))

    # Sleep for the same duration as do_work so that cancellation
    # coincides with the result becoming available.
    await asyncio.sleep(1)

    print('work task done', work_task.done())
    print('wait task done', wait_task.done())
    wait_task.cancel()

    try:
        result = await wait_task
        print(result)

    except asyncio.CancelledError:
        print('cancelled')


asyncio.run(main())

Running this example results in:

work task done False
wait task done False
42

Here we can see that both do_work and the main task sleep for 1 second, after which wait_task is cancelled. Although the cancel method is called, awaiting wait_task yields do_work's result instead of raising CancelledError.

It is also important to notice that neither work_task nor wait_task is done at the time cancel is called. This tells us that we cannot reason about the success of cancel based on the current state of these two tasks.

Example 6 source code
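
That a task's state immediately after cancel says little about the final outcome can be seen even without wait_for. The sketch below, using the hypothetical coroutine sleeper, records the task's state right after the cancel call and again after the event loop has processed the request.

```python
import asyncio
import contextlib


async def sleeper():
    await asyncio.sleep(10)


async def main() -> tuple:
    task = asyncio.create_task(sleeper())
    await asyncio.sleep(0)  # let the task reach its first await

    accepted = task.cancel()              # the request is accepted...
    state_right_after = task.cancelled()  # ...but not yet carried out

    with contextlib.suppress(asyncio.CancelledError):
        await task

    return accepted, state_right_after, task.cancelled()


if __name__ == '__main__':
    print(asyncio.run(main()))
```

The returned tuple is (True, False, True): cancel only registers the request, and the task becomes cancelled in a later loop iteration.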

Alternative implementation

To mitigate problems that can occur when using asyncio.wait_for, hat-aio implements hat.aio.wait_for, which can be used as a drop-in replacement for asyncio.wait_for. In addition to propagating CancelledError, this implementation provides hat.aio.CancelledWithResultError. CancelledWithResultError extends CancelledError with an additional result/exception, which holds the awaitable's result when the result becomes available at the same time as wait_for is cancelled. Because this exception is also a CancelledError, all existing code catching CancelledError continues to work. In cases where obtaining the result is necessary even when CancelledError is raised (e.g. the result is associated with a resource which requires explicit cleanup), CancelledWithResultError can be used.

In the previous example, if we replace asyncio.wait_for with hat.aio.wait_for:

wait_task = asyncio.create_task(hat.aio.wait_for(work_task, timeout=2))

the result is:

work task done False
wait task done False
cancelled

If obtaining the result is required, CancelledError can be replaced with CancelledWithResultError:

except hat.aio.CancelledWithResultError as e:
    print('cancelled with result', e.result)

which results in:

work task done False
wait task done False
cancelled with result 42

Example 7 source code
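
The reason existing handlers keep working can be sketched with a stand-in exception class. CancelledWithResult below is a hypothetical illustration of the subclassing idea only; it is not hat-aio's implementation, and both handler functions are made up for this example.

```python
import asyncio


class CancelledWithResult(asyncio.CancelledError):
    """Hypothetical stand-in: a CancelledError that carries a result."""

    def __init__(self, result):
        super().__init__()
        self.result = result


def legacy_handler(exc: BaseException) -> str:
    # Code written before the subclass existed: catches CancelledError only.
    try:
        raise exc
    except asyncio.CancelledError:
        return 'cancelled'


def result_aware_handler(exc: BaseException) -> str:
    # Newer code: inspects the result when one is attached.
    try:
        raise exc
    except CancelledWithResult as e:
        return f'cancelled with result {e.result}'
    except asyncio.CancelledError:
        return 'cancelled'


if __name__ == '__main__':
    print(legacy_handler(CancelledWithResult(42)))
    print(result_aware_handler(CancelledWithResult(42)))
    print(result_aware_handler(asyncio.CancelledError()))
```

The legacy handler treats the subclass as an ordinary cancellation, while the result-aware handler can retrieve the attached value, for example to release a resource associated with it.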