Simulating a producer-consumer scenario with asyncio queues in Python
8. Async Producer-Consumer with Queue
Write a Python program that uses asyncio queues to simulate a producer-consumer scenario with multiple producers and a single consumer.
Sample Solution:
Code:
import asyncio
import random


async def producer(queue, id):
    # Each producer puts three labelled items on the queue.
    for i in range(3):
        item = f"Item: {id}-{i}"
        await queue.put(item)
        print(f"Producer {id} produced-> {item}")
        # Pause for a random interval so the producers interleave.
        await asyncio.sleep(random.uniform(0.1, 0.5))


async def consumer(queue):
    while True:
        item = await queue.get()
        if item is None:
            # Sentinel received: stop consuming.
            break
        print(f"Consumer consumed {item}")
        # Mark the item as processed so queue.join() can return.
        queue.task_done()


async def main():
    queue = asyncio.Queue()
    producers = [asyncio.create_task(producer(queue, i)) for i in range(3)]
    consumer_task = asyncio.create_task(consumer(queue))
    await asyncio.gather(*producers)  # Wait for all producers to finish
    await queue.join()  # Wait until every queued item has been processed
    await queue.put(None)  # Signal the consumer to stop
    await consumer_task


# Run the event loop
asyncio.run(main())
Output:
Producer 0 produced-> Item: 0-0
Producer 1 produced-> Item: 1-0
Producer 2 produced-> Item: 2-0
Consumer consumed Item: 0-0
Consumer consumed Item: 1-0
Consumer consumed Item: 2-0
Producer 1 produced-> Item: 1-1
Consumer consumed Item: 1-1
Producer 0 produced-> Item: 0-1
Consumer consumed Item: 0-1
Producer 2 produced-> Item: 2-1
Consumer consumed Item: 2-1
Producer 1 produced-> Item: 1-2
Consumer consumed Item: 1-2
Producer 0 produced-> Item: 0-2
Consumer consumed Item: 0-2
Producer 2 produced-> Item: 2-2
Consumer consumed Item: 2-2
Explanation:
In the above exercise -
- The "producer()" coroutine simulates a producer that adds items to the queue. Each producer produces 3 items with an associated ID.
- The "consumer()" coroutine simulates a single consumer that consumes items from the queue. It runs in an infinite loop, consuming items until a None value is encountered, indicating it should stop.
- The "main()" coroutine creates the queue, then starts three producer tasks and one consumer task using asyncio.create_task().
- The producers add items to the queue asynchronously, and the "consumer()" coroutine takes items from the queue as they become available.
- After all producers have finished, the program awaits queue.join(), which blocks until the consumer has called queue.task_done() for every item that was put on the queue.
- To signal the consumer to stop, the program puts None on the queue and then waits for the consumer task to complete.
- asyncio.run(main()) starts the event loop and runs "main()" until it completes.
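When we run this program, the producers add items to the queue and the consumer consumes them. Because each producer sleeps for a random interval between items, the exact interleaving of the output varies from run to run.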
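The shutdown sequence described above (gather the producers, await queue.join(), then send None) can also be checked without printing. The following is a minimal sketch, not part of the original solution: the names run_pipeline, consumed, count, num_producers and items_each are hypothetical, the random sleeps are replaced by asyncio.sleep(0) to keep the run short, and the consumer collects items in a list so the processing order can be inspected afterwards.

```python
import asyncio


async def producer(queue, producer_id, count):
    # Put `count` labelled items on the queue, yielding control after each put.
    for i in range(count):
        await queue.put(f"{producer_id}-{i}")
        await asyncio.sleep(0)


async def consumer(queue, consumed):
    # Collect items in arrival order until the None sentinel is received.
    while True:
        item = await queue.get()
        if item is None:
            queue.task_done()  # account for the sentinel as well
            break
        consumed.append(item)
        queue.task_done()


async def run_pipeline(num_producers=3, items_each=3):
    # Hypothetical helper: runs the whole scenario and returns the consumed items.
    queue = asyncio.Queue()
    consumed = []
    producers = [
        asyncio.create_task(producer(queue, p, items_each))
        for p in range(num_producers)
    ]
    consumer_task = asyncio.create_task(consumer(queue, consumed))
    await asyncio.gather(*producers)  # all producers are done
    await queue.join()                # every queued item has been processed
    await queue.put(None)             # tell the consumer to stop
    await consumer_task
    return consumed


if __name__ == "__main__":
    items = asyncio.run(run_pipeline())
    print(f"{len(items)} items consumed")
```

Returning the list instead of printing makes the order of consumption easy to compare against the order of production, and it shows that queue.join() only returns once every item has been matched by a task_done() call.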
For more Practice: Solve these Related Problems:
- Write a Python program to simulate a producer-consumer scenario using asyncio.Queue, where multiple producers add items and one consumer processes them.
- Write a Python script to create a producer coroutine that puts random numbers into an asyncio.Queue and a consumer coroutine that retrieves and prints these numbers.
- Write a Python function to implement an async producer-consumer model using multiple producers and a single consumer, then display the order in which items are processed.
- Write a Python program to use asyncio queues to manage a producer-consumer system, where producers add items at random intervals and the consumer prints each item along with a timestamp.
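As a possible starting point for the last problem, here is a rough sketch rather than a reference solution. It reuses the None-sentinel shutdown from the sample solution; the names timestamped_run and log, the shortened sleep intervals, and the choice of datetime.now() for the timestamp are all assumptions made for illustration.

```python
import asyncio
import random
from datetime import datetime


async def producer(queue, producer_id, count=2):
    # Add items at random intervals (shortened here to keep the run quick).
    for i in range(count):
        await asyncio.sleep(random.uniform(0.01, 0.05))
        await queue.put(f"Item: {producer_id}-{i}")


async def consumer(queue, log):
    # Print each item with the wall-clock time at which it was consumed.
    while True:
        item = await queue.get()
        if item is None:
            break
        stamp = datetime.now()
        log.append((stamp, item))
        print(f"[{stamp:%H:%M:%S.%f}] Consumer consumed {item}")
        queue.task_done()


async def timestamped_run(num_producers=2):
    # Hypothetical helper: returns the list of (timestamp, item) pairs.
    queue = asyncio.Queue()
    log = []
    consumer_task = asyncio.create_task(consumer(queue, log))
    await asyncio.gather(*(producer(queue, p) for p in range(num_producers)))
    await queue.join()
    await queue.put(None)  # Signal the consumer to stop
    await consumer_task
    return log


if __name__ == "__main__":
    asyncio.run(timestamped_run())
```

The order of the printed lines depends on the random sleep intervals, so it will differ between runs.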