Handling task cancellation with asyncio.CancelledError in Python
6. Async Task Cancellation
Write a Python program to create a coroutine that simulates a time-consuming task and use asyncio.CancelledError to handle task cancellation.
Sample Solution:
Code:
import asyncio
import random


async def time_consuming_task():
    print('Time-consuming task started...')
    try:
        for i in range(1, 6):
            # Each step simulates work by sleeping 1 to 5 seconds.
            await asyncio.sleep(random.randint(1, 5))
            print(f'Step {i} completed')
    except asyncio.CancelledError:
        print('Time consuming task was cancelled')
        # Re-raise so the task is actually marked as cancelled.
        raise


async def main():
    task = asyncio.create_task(time_consuming_task())
    # Let the task run for 1 to 3 seconds before requesting cancellation.
    await asyncio.sleep(random.randint(1, 3))
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print('Main coroutine caught task cancellation!')


asyncio.run(main())
Output:
Time-consuming task started...
Time consuming task was cancelled
Main coroutine caught task cancellation!
Explanation:
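The code above defines a coroutine, `time_consuming_task`, that simulates work by sleeping for a random 1 to 5 seconds in each of five steps. When the task is cancelled, asyncio.CancelledError is raised inside the coroutine at the await where it is suspended. The try/except block catches it, prints a message, and re-raises it so that the task is actually marked as cancelled.
The main() coroutine creates the task, waits a random 1 to 3 seconds, and calls task.cancel() to request cancellation. It then awaits the task inside its own try/except, where the re-raised asyncio.CancelledError surfaces. Because both delays are random, a run may also print one or more 'Step N completed' lines before the cancellation message.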
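To illustrate why the except block re-raises, here is a minimal sketch that compares a coroutine that re-raises asyncio.CancelledError with one that swallows it. The names `worker`, `cancel_and_report`, and `demo` are hypothetical helpers introduced for this illustration, and the short fixed delays are an assumption chosen to keep the run quick and repeatable.

```python
import asyncio


async def worker(reraise: bool) -> str:
    """Wait on a long sleep; on cancellation, either re-raise or swallow."""
    try:
        await asyncio.sleep(10)  # stand-in for long-running work
    except asyncio.CancelledError:
        if reraise:
            raise  # propagate so the task ends in the cancelled state
        return "swallowed"  # suppressing the error lets the task finish normally
    return "finished"


async def cancel_and_report(reraise: bool) -> tuple[bool, str]:
    task = asyncio.create_task(worker(reraise))
    await asyncio.sleep(0.01)  # give the worker time to reach its await
    task.cancel()
    try:
        result = await task
    except asyncio.CancelledError:
        result = "cancelled"
    # task.cancelled() is True only if CancelledError left the coroutine.
    return task.cancelled(), result


def demo() -> dict[bool, tuple[bool, str]]:
    return {flag: asyncio.run(cancel_and_report(flag)) for flag in (True, False)}


if __name__ == "__main__":
    for flag, (was_cancelled, result) in demo().items():
        print(f"reraise={flag}: cancelled={was_cancelled}, result={result}")
```

In this sketch, the swallowing variant completes with a normal return value and task.cancelled() reports False, which is why the sample solution re-raises after printing its message.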
For more practice, solve these related problems:
- Write a Python program that creates a coroutine simulating a long-running task, then cancels it after a fixed delay, handling asyncio.CancelledError appropriately.
- Write a Python script to implement a time-consuming async function that can be cancelled by a timer, and print a message when cancellation occurs.
- Write a Python function that runs a coroutine, cancels it midway, and uses exception handling to print a custom cancellation message.
- Write a Python program to start an async task, wait briefly with asyncio.sleep, cancel it, and then confirm that the task was cancelled.
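As a starting point for the timer-driven variant in the list above, the following sketch shows one possible approach, assuming the event loop's call_later() method is used as the timer. The names `long_job` and `run_with_timer`, the step length, and the 0.12-second delay are all hypothetical choices for illustration, not part of the original exercise.

```python
import asyncio


async def long_job(log: list[str]) -> None:
    """Record each completed step; record and re-raise on cancellation."""
    try:
        for step in range(1, 6):
            await asyncio.sleep(0.05)  # short stand-in for real work
            log.append(f"step {step}")
    except asyncio.CancelledError:
        log.append("cancelled")
        raise


async def run_with_timer(delay: float) -> tuple[bool, list[str]]:
    log: list[str] = []
    task = asyncio.create_task(long_job(log))
    # Schedule task.cancel() to run on the current loop after `delay` seconds.
    asyncio.get_running_loop().call_later(delay, task.cancel)
    try:
        await task
    except asyncio.CancelledError:
        pass  # expected once the timer fires
    # task.cancelled() confirms that the task ended by cancellation.
    return task.cancelled(), log


if __name__ == "__main__":
    was_cancelled, log = asyncio.run(run_with_timer(0.12))
    print(f"cancelled={was_cancelled}, log={log}")
```

Which steps appear in the log before the cancellation entry depends on timing, so the sketch checks task.cancelled() rather than relying on a fixed number of completed steps.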