w3resource

Simulating a producer-consumer scenario with asyncio queues in Python


8. Async Producer-Consumer with Queue

Write a Python program that uses asyncio queues to simulate a producer-consumer scenario with multiple producers and a single consumer.

Sample Solution:

Code:

import asyncio
import random
async def producer(queue, id):
    for i in range(3):
        item = f"Item: {id}-{i}"
        await queue.put(item)
        print(f"Producer {id} produced-> {item}")
        await asyncio.sleep(random.uniform(0.1, 0.5))
async def consumer(queue):
    while True:
        item = await queue.get()
        if item is None:
            break
        print(f"Consumer consumed {item}")
        queue.task_done()
async def main():
    queue = asyncio.Queue()
    producers = [asyncio.create_task(producer(queue, i)) for i in range(3)]
    consumer_task = asyncio.create_task(consumer(queue))
    await asyncio.gather(*producers)
    await queue.join()
    await queue.put(None)  # Signal the consumer to stop
    await consumer_task
# Run the event loop
asyncio.run(main())

Output:

Producer 0 produced-> Item: 0-0
Producer 1 produced-> Item: 1-0
Producer 2 produced-> Item: 2-0
Consumer consumed Item: 0-0
Consumer consumed Item: 1-0
Consumer consumed Item: 2-0
Producer 1 produced-> Item: 1-1
Consumer consumed Item: 1-1
Producer 0 produced-> Item: 0-1
Consumer consumed Item: 0-1
Producer 2 produced-> Item: 2-1
Consumer consumed Item: 2-1
Producer 1 produced-> Item: 1-2
Consumer consumed Item: 1-2
Producer 0 produced-> Item: 0-2
Consumer consumed Item: 0-2
Producer 2 produced-> Item: 2-2
Consumer consumed Item: 2-2

Explanation:

In the above exercise -

  • The "producer()" coroutine simulates a producer that adds items to the queue. Each producer produces 3 items with an associated ID.
  • The "consumer()" coroutine simulates a single consumer that consumes items from the queue. It runs in an infinite loop, consuming items until a None value is encountered, indicating it should stop.
  • The "main()" coroutine sets up the event loop and the queue. It creates three producer tasks and one consumer task using asyncio.create_task().
  • The 'producers' produce items and add them to the queue asynchronously. The "consumer()" consumes items from the queue as they become available.
  • After all 'producers' have finished producing, the program uses await queue.join() to wait until all items in the queue are processed.
  • To signal the consumer to stop, the program sends 'None' to the queue, and then waits for the consumer to complete its task.
  • The event loop is started using asyncio.run(main()).

When we run this program, the producers adding items to the queue and the 'consumer' consuming them.

Flowchart:

Flowchart: Simulating a producer-consumer scenario with asyncio queues in Python.

For more Practice: Solve these Related Problems:

  • Write a Python program to simulate a producer-consumer scenario using asyncio.Queue, where multiple producers add items and one consumer processes them.
  • Write a Python script to create a producer coroutine that puts random numbers into an asyncio.Queue and a consumer coroutine that retrieves and prints these numbers.
  • Write a Python function to implement an async producer-consumer model using multiple producers and a single consumer, then display the order in which items are processed.
  • Write a Python program to use asyncio queues to manage a producer-consumer system, where producers add items at random intervals and the consumer prints each item along with a timestamp.

Python Code Editor :

Previous: Handling task cancellation with asyncio.CancelledError in Python.

What is the difficulty level of this exercise?

Test your Programming skills with w3resource's quiz.



Follow us on Facebook and Twitter for latest update.