w3resource

Python Thread-Safe Priority Queue Implementation

Write a Python program that implements a thread-safe priority queue.

The queue lets multiple threads add and remove items safely, and it always hands out the highest-priority item first. Here, as in Python's queue.PriorityQueue, a lower number means a higher priority. A lock serializes access to the queue so that concurrent calls cannot corrupt its state or race with one another, which makes the class suitable for multi-threaded applications that need prioritized task management.
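
To make the ordering rule concrete, here is a minimal sketch that uses the standard library's queue.PriorityQueue directly. It assumes the convention that a smaller number means a higher priority; the task names are made up for illustration.

```python
import queue

pq = queue.PriorityQueue()

# Entries are (priority, item) tuples; the smallest priority is retrieved first.
pq.put((2, "write report"))   # hypothetical task names
pq.put((0, "fix outage"))
pq.put((1, "review patch"))

order = []
while not pq.empty():
    priority, item = pq.get()
    order.append(item)

print(order)  # ['fix outage', 'review patch', 'write report']
```

Insertion order does not matter: the items come back sorted by their priority number.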

Sample Solution:

Python Code:

# Import the queue module to use PriorityQueue
import queue
# Import the threading module to use threads and locks
import threading

# Define a class for a thread-safe priority queue
class ThreadSafePriorityQueue:
    def __init__(self):
        # Initialize a PriorityQueue object to hold the items
        self._queue = queue.PriorityQueue()
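        # queue.PriorityQueue already locks each individual call; this extra lock
        # makes the "check empty, then get" sequence in get() atomic across threads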
        self._lock = threading.Lock()

    # Method to put an item into the priority queue
    def put(self, item, priority):
        # Acquire the lock to ensure thread safety
        with self._lock:
            # Put the item into the queue with its priority
            self._queue.put((priority, item))

    # Method to get an item from the priority queue
    def get(self):
        # Acquire the lock to ensure thread safety
        with self._lock:
            # Check if the queue is not empty
            if not self._queue.empty():
                # Get the item with the highest priority (lowest priority number)
                priority, item = self._queue.get()
                # Return the item
                return item
            else:
                # Return None if the queue is empty
                return None

# Example usage to demonstrate the thread-safe priority queue
if __name__ == "__main__":
    # Define a producer function to add items to the queue
    def producer(q):
        # Add 5 items to the queue with their priority as their value
        for i in range(5):
            q.put(i, i)

    # Define a consumer function to get items from the queue
    def consumer(q):
        # Continuously get items from the queue
        while True:
            # Get an item from the queue
            item = q.get()
            # None is the stop sentinel, but get() also returns None when the queue is
            # empty at that moment, so this loop can end before all items are produced
            if item is None:
                break
            # Print the consumed item
            print("Consumed:", item)

    # Create an instance of the thread-safe priority queue
    q = ThreadSafePriorityQueue()
    # Create a producer thread to add items to the queue
    producer_thread = threading.Thread(target=producer, args=(q,))
    # Create a consumer thread to get items from the queue
    consumer_thread = threading.Thread(target=consumer, args=(q,))

    # Start the producer thread
    producer_thread.start()
    # Start the consumer thread
    consumer_thread.start()

    # Wait for the producer thread to finish
    producer_thread.join()
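    # Add a sentinel value to the queue to signal the consumer to stop.
    # Its priority must be comparable with the integer priorities already queued
    # (a priority of None raises TypeError), so use infinity, which also sorts last
    q.put(None, float("inf"))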
    # Wait for the consumer thread to finish
    consumer_thread.join()

Output:

Consumed: 0
Consumed: 1
Consumed: 2
Consumed: 3
Consumed: 4
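
The run above uses a single producer, so it exercises little contention. As a rough sketch of a heavier check, the block below starts several producers that insert priorities out of order, waits for all of them, and only then drains the queue so the result can be compared with a sorted list. The class is repeated in compact form so the block runs on its own; the helper names (producer, drain) and the thread and item counts are illustrative assumptions.

```python
import queue
import threading


class ThreadSafePriorityQueue:
    """Compact copy of the class from the sample solution."""

    def __init__(self):
        self._queue = queue.PriorityQueue()
        self._lock = threading.Lock()

    def put(self, item, priority):
        with self._lock:
            self._queue.put((priority, item))

    def get(self):
        with self._lock:
            if not self._queue.empty():
                priority, item = self._queue.get()
                return item
            return None


def producer(q, offset, total, workers):
    # Insert in descending order to show that arrival order does not matter.
    for value in reversed(range(offset, total, workers)):
        q.put(value, value)


def drain(q):
    # No None items are inserted here, so None can only mean "queue is empty".
    items = []
    while True:
        item = q.get()
        if item is None:
            break
        items.append(item)
    return items


q = ThreadSafePriorityQueue()
workers, total = 4, 20
threads = [
    threading.Thread(target=producer, args=(q, k, total, workers))
    for k in range(workers)
]
for t in threads:
    t.start()
for t in threads:
    t.join()  # drain only after every producer has finished

result = drain(q)
print(result == list(range(total)))  # True
```

Draining after the joins avoids the ambiguity between "empty for now" and "finished", which is why the None check is safe in this sketch.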

Explanation:

  • Imports:
    • queue: Provides the 'PriorityQueue' class used to hold the items.
    • threading: Provides threads and locks for synchronization.
  • ThreadSafePriorityQueue Class:
    • Initialization (__init__ method):
      • Creates an instance of 'PriorityQueue' to hold the items.
      • Initializes a lock (_lock). 'PriorityQueue' already protects each individual call, so the lock's main job is to make the "check empty, then get" sequence in the get method atomic.
    • put Method:
      • Takes an 'item' and a 'priority'.
      • Uses the lock to ensure exclusive access to the queue.
      • Puts the item into the queue as a (priority, item) tuple.
    • get Method:
      • Uses the lock to ensure exclusive access to the queue.
      • Checks whether the queue is empty.
      • If it is not empty, removes and returns the item with the highest priority (lowest priority number).
      • If it is empty, returns None immediately instead of waiting for an item.
  • Example usage:
    • Producer Function (producer):
      • Adds 5 items to the queue, each with a priority equal to its value.
    • Consumer Function (consumer):
      • Repeatedly retrieves items from the queue.
      • Prints each consumed item.
      • Stops when it retrieves 'None'. This is the sentinel value, but it is also what get returns if the queue happens to be empty at that moment.
    • Main execution:
      • Creates an instance of 'ThreadSafePriorityQueue'.
      • Creates and starts a producer thread that adds items to the queue.
      • Creates and starts a consumer thread that consumes items from the queue.
      • Waits for the producer thread to finish.
      • Adds a sentinel value ('None') to the queue to signal the consumer thread to stop.
      • Waits for the consumer thread to finish.
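
One consequence of the design explained above is that get returns None both when the queue is momentarily empty and when the sentinel arrives, so a consumer that starts before the producer can stop early. A possible alternative, sketched below under stated assumptions, is a get that blocks until an item is available. The class name BlockingPriorityQueue, the STOP sentinel, and the insertion counter used to break priority ties are hypothetical additions, not part of the original solution, and the sketch swaps queue.PriorityQueue for heapq plus threading.Condition.

```python
import heapq
import itertools
import threading


class BlockingPriorityQueue:
    """Hypothetical variant: get() waits for an item instead of returning None."""

    def __init__(self):
        self._heap = []
        self._cond = threading.Condition()
        # Monotonic counter: orders equal priorities first-in, first-out and
        # keeps the items themselves from ever being compared.
        self._counter = itertools.count()

    def put(self, item, priority):
        with self._cond:
            heapq.heappush(self._heap, (priority, next(self._counter), item))
            self._cond.notify()  # wake one waiting consumer, if any

    def get(self):
        with self._cond:
            # wait() releases the lock while sleeping, so producers can still put.
            while not self._heap:
                self._cond.wait()
            priority, _, item = heapq.heappop(self._heap)
            return item


STOP = object()  # hypothetical sentinel, distinct from every real item


def consumer(q, out):
    while True:
        item = q.get()
        if item is STOP:
            break
        out.append(item)


q = BlockingPriorityQueue()
consumed = []
worker = threading.Thread(target=consumer, args=(q, consumed))
worker.start()  # starting first is fine: get() blocks on the empty queue

for name in ["a", "b", "c"]:
    # Equal priorities and non-comparable dict items: the counter decides.
    q.put({"name": name}, priority=1)
q.put(STOP, priority=float("inf"))  # sorts after every real item
worker.join()

names = [d["name"] for d in consumed]
print(names)  # ['a', 'b', 'c']
```

Because the sentinel is a unique object rather than None, items that are themselves None could be queued without ending the consumer, and the infinite priority ensures the sentinel is delivered only after all real items.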
