w3resource

Asyncio Concurrent Task Scheduler Implementation in Python

Write a Python program that builds a concurrent task scheduler using asyncio.

The problem involves building a concurrent task scheduler with Python's asyncio library, which runs multiple tasks concurrently on a single thread. The scheduler should manage and execute tasks without blocking the event loop, so that one task can make progress while another is waiting. The solution should cover task creation, scheduling, and execution, using asyncio's event loop and coroutines to handle tasks concurrently. This approach is particularly useful for I/O-bound operations, where tasks spend a significant amount of time waiting for external resources.

Sample Solution:

Python Code:

# Import the asyncio library for asynchronous programming
import asyncio

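# Import nest_asyncio (third-party: pip install nest_asyncio) to allow nested event loops.
# Only needed where an event loop is already running, e.g. in a Jupyter Notebook.
import nest_asyncio

# Patch asyncio so that asyncio.run() works inside an already-running event loop
nest_asyncio.apply()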

# Define the TaskScheduler class
class TaskScheduler:
    def __init__(self):
        # Initialize an empty list to store tasks
        self.tasks = []

    def schedule(self, coro, *args):
        """Schedule a coroutine with given arguments."""
        # Call the coroutine function and store the resulting coroutine object;
        # it does not start running until it is awaited in run()
        self.tasks.append(coro(*args))

    async def run(self):
        """Run all scheduled tasks concurrently."""
        # Use asyncio.gather to run all tasks concurrently
        await asyncio.gather(*self.tasks)

# Define an example coroutine that simulates a task
async def example_task(name, duration):
    """An example coroutine that simulates a task."""
    # Print a message when the task starts
    print(f"Task {name} started, will take {duration} seconds.")
    # Simulate a delay using asyncio.sleep
    await asyncio.sleep(duration)
    # Print a message when the task finishes
    print(f"Task {name} finished.")

# Define the main coroutine for example usage
async def main():
    # Create an instance of TaskScheduler
    scheduler = TaskScheduler()

    # Schedule example tasks with different durations
    scheduler.schedule(example_task, "A", 2)
    scheduler.schedule(example_task, "B", 3)
    scheduler.schedule(example_task, "C", 1)

    # Run all scheduled tasks concurrently
    await scheduler.run()

# Run the main function using asyncio's event loop
asyncio.run(main())

Output:

Task A started, will take 2 seconds.
Task B started, will take 3 seconds.
Task C started, will take 1 seconds.
Task C finished.
Task A finished.
Task B finished.
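
The finish messages appear in order of sleep duration rather than scheduling order, and the whole run takes roughly as long as the longest task (about 3 seconds) instead of the 6-second sum of all three. The following is a minimal sketch for checking that timing behaviour. It uses scaled-down durations, and the helper names nap, run_sequentially, run_concurrently, and compare are illustrative and not part of the original solution.

```python
import asyncio
import time

async def nap(duration):
    """Stand-in for an I/O-bound task: just waits for `duration` seconds."""
    await asyncio.sleep(duration)
    return duration

async def run_sequentially(durations):
    """Await each task one after another; elapsed time is about sum(durations)."""
    start = time.perf_counter()
    for d in durations:
        await nap(d)
    return time.perf_counter() - start

async def run_concurrently(durations):
    """Await all tasks together; elapsed time is about max(durations)."""
    start = time.perf_counter()
    await asyncio.gather(*(nap(d) for d in durations))
    return time.perf_counter() - start

async def compare():
    durations = [0.2, 0.3, 0.1]  # scaled-down versions of 2, 3, and 1 seconds
    sequential = await run_sequentially(durations)
    concurrent = await run_concurrently(durations)
    return sequential, concurrent

if __name__ == "__main__":
    sequential, concurrent = asyncio.run(compare())
    print(f"sequential: {sequential:.2f}s, concurrent: {concurrent:.2f}s")
```

The exact figures vary from run to run, but the concurrent time should stay close to the longest single duration.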

Explanation:

  • Imports:
      • asyncio: A standard-library module for writing concurrent code using the async/await syntax.
      • nest_asyncio: A third-party library that allows nested use of asyncio event loops, which is useful in interactive environments such as Jupyter Notebooks or certain IDEs.
  • Applying nest_asyncio:
      • nest_asyncio.apply(): Patches asyncio so that the event loop can be re-entered, which prevents the "RuntimeError" raised when asyncio.run() is called in an environment that already has a running event loop. In a plain Python script, this call and the nest_asyncio import can be omitted.
  • TaskScheduler Class:
      • Initialization (__init__): Creates an empty list "self.tasks" to store scheduled tasks.
      • Scheduling Tasks (schedule): Calls the coroutine function (async function) with its arguments and appends the resulting coroutine object to the "self.tasks" list. The coroutine does not start running until it is awaited in run().
      • Running Tasks (run): Uses "asyncio.gather" to run all coroutines stored in "self.tasks" concurrently and waits until all of them have finished.
  • Example Task Coroutine (example_task):
      • A sample async function that simulates a task by printing a start message, sleeping for a specified duration using "asyncio.sleep", and then printing a finish message.
  • Main Coroutine (main):
      • Creates an instance of 'TaskScheduler'.
      • Schedules three tasks (example_task) with different durations.
      • Runs all scheduled tasks concurrently by calling scheduler.run().
  • Running the main function:
      • asyncio.run(main()): Starts the asyncio event loop and runs the 'main' coroutine until it completes.
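
The TaskScheduler above discards return values, lets the first exception propagate out of run(), and starts every task at once. One possible extension is sketched below. It is not part of the original solution: the class name ResultScheduler, its max_concurrent parameter, and the helper coroutines square_after and fail_after are hypothetical names chosen for illustration. The sketch stores the coroutine function and its arguments rather than a coroutine object, caps concurrency with asyncio.Semaphore, and passes return_exceptions=True to asyncio.gather so that failures are returned alongside results.

```python
import asyncio

class ResultScheduler:
    """Hypothetical variant of TaskScheduler that returns results and caps concurrency."""

    def __init__(self, max_concurrent=2):
        self._pending = []  # (coroutine function, args) pairs, not yet called
        self._max_concurrent = max_concurrent

    def schedule(self, coro_func, *args):
        # Store the function and its arguments; the coroutine object is created in run()
        self._pending.append((coro_func, args))

    async def _guarded(self, semaphore, coro_func, args):
        # At most max_concurrent coroutines hold the semaphore at any time
        async with semaphore:
            return await coro_func(*args)

    async def run(self):
        semaphore = asyncio.Semaphore(self._max_concurrent)
        # Take the pending list and reset it so that run() can be called again later
        pending, self._pending = self._pending, []
        # return_exceptions=True places raised exceptions in the result list
        # instead of propagating the first one to the caller
        return await asyncio.gather(
            *(self._guarded(semaphore, func, args) for func, args in pending),
            return_exceptions=True,
        )

async def square_after(delay, value):
    """Example task that returns a value after a short wait."""
    await asyncio.sleep(delay)
    return value * value

async def fail_after(delay):
    """Example task that raises after a short wait."""
    await asyncio.sleep(delay)
    raise ValueError("simulated failure")

async def demo():
    scheduler = ResultScheduler(max_concurrent=2)
    scheduler.schedule(square_after, 0.02, 3)
    scheduler.schedule(fail_after, 0.01)
    scheduler.schedule(square_after, 0.01, 4)
    return await scheduler.run()

if __name__ == "__main__":
    for result in asyncio.run(demo()):
        print(repr(result))
```

asyncio.gather returns results in the order the awaitables were passed in, so the list lines up with the scheduling order regardless of which task finishes first. Deferring the coroutine call to run() also avoids the "coroutine ... was never awaited" warning that un-awaited coroutine objects produce if run() is never called.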
