w3resource

Setting timeouts for asynchronous operations in Python using asyncio.wait_for()


7. Async Timeout with wait_for

Write a Python program that implements a timeout for an asynchronous operation using asyncio.wait_for().

Sample Solution:

Code:

import asyncio


async def time_consuming_task(duration):
    # Simulate a long-running operation without blocking the event loop.
    print(f'Starting long operation for {duration} seconds...')
    await asyncio.sleep(duration)
    return f'Long operation completed in {duration} seconds'


async def main():
    timeout = 3  # seconds to wait before giving up
    try:
        # wait_for() raises asyncio.TimeoutError if the task takes too long.
        result = await asyncio.wait_for(time_consuming_task(8), timeout)
        print(result)
    except asyncio.TimeoutError:
        print(f'Timeout occurred after waiting for {timeout} seconds')


asyncio.run(main())

Output:

Starting long operation for 8 seconds...
Timeout occurred after waiting for 3 seconds

Explanation:

In the above exercise:

  • asyncio.wait_for() takes an awaitable (here, a coroutine) and a timeout in seconds.
  • It waits for the awaitable to complete; if the timeout is exceeded, it cancels the task and raises asyncio.TimeoutError.
  • The program awaits the wait_for() call, so main() is suspended until the task finishes or the timeout expires.
  • The try/except block catches the TimeoutError. Here the task needs 8 seconds but the timeout is 3 seconds, so the except block runs and prints the timeout message.
  • If the coroutine completes within the timeout, wait_for() returns its result as normal.
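
One detail worth seeing in action: when the timeout expires, asyncio.wait_for() also cancels the wrapped task before raising asyncio.TimeoutError. The following is a minimal sketch of that behaviour, not part of the original solution; the names slow_task, run_with_timeout and log are hypothetical and exist only for this illustration, and the durations are shortened so it runs quickly.

```python
import asyncio


async def slow_task(duration, log):
    """Hypothetical helper: sleep, then record how the task ended."""
    try:
        await asyncio.sleep(duration)
        log.append("finished")
        return "done"
    except asyncio.CancelledError:
        # wait_for() cancels the task when the timeout expires.
        log.append("cancelled")
        raise  # re-raise so the cancellation propagates


async def run_with_timeout(duration, timeout):
    """Hypothetical helper: return (result, log) for one attempt."""
    log = []
    try:
        result = await asyncio.wait_for(slow_task(duration, log), timeout)
    except asyncio.TimeoutError:
        result = None
    return result, log


if __name__ == "__main__":
    # Task is slower than the timeout: it is cancelled.
    print(asyncio.run(run_with_timeout(0.5, 0.1)))
    # Task is faster than the timeout: it finishes normally.
    print(asyncio.run(run_with_timeout(0.1, 0.5)))
```

Re-raising CancelledError inside slow_task matters: swallowing it would hide the cancellation from the event loop.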

Flowchart:

Flowchart: Setting timeouts for asynchronous operations in Python using asyncio.wait_for().

For more Practice: Solve these Related Problems:

  • Write a Python program to implement an asynchronous function that simulates a long-running task and enforces a timeout using asyncio.wait_for.
  • Write a Python script to use asyncio.wait_for to set a timeout on a coroutine that fetches data, printing an error message if the operation times out.
  • Write a Python function that wraps a coroutine in asyncio.wait_for with a specified timeout and handles asyncio.TimeoutError by returning a default value.
  • Write a Python program to test an async operation with a timeout using asyncio.wait_for, then print whether the operation completed or timed out.
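
As a starting point for the third problem above, here is one possible sketch of a wrapper that returns a default value on timeout. It is an illustration under assumptions rather than a reference solution: with_default and fetch_data are hypothetical names, and fetch_data merely simulates a fetch with asyncio.sleep().

```python
import asyncio


async def with_default(awaitable, timeout, default=None):
    """Hypothetical wrapper: await `awaitable`, or return `default` on timeout."""
    try:
        return await asyncio.wait_for(awaitable, timeout)
    except asyncio.TimeoutError:
        return default


async def fetch_data(delay):
    """Hypothetical stand-in for a real fetch; only sleeps, then returns data."""
    await asyncio.sleep(delay)
    return {"status": "ok"}


async def main():
    # Completes in time, so the real result is returned.
    print(await with_default(fetch_data(0.1), timeout=0.5, default="fallback"))
    # Too slow, so the default is returned instead.
    print(await with_default(fetch_data(0.5), timeout=0.1, default="fallback"))


if __name__ == "__main__":
    asyncio.run(main())
```

Returning a default keeps the caller free of try/except blocks, at the cost of hiding whether a timeout actually happened; a caller that needs to know can pass a unique sentinel object as the default.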
