w3resource

Rust Program: Implement Producer-Consumer Pattern with Threads & Channels


Write a Rust program that implements a producer-consumer pattern using threads and channels.

Sample Solution:

Rust Code:

use std::sync::{Arc, Mutex};
use std::thread;

// Define a shared queue structure with thread-safe operations
struct SharedQueue {
    data: Arc<Mutex<Vec<i32>>>, // Shared data protected by a mutex
}

impl SharedQueue {
    // Create a new instance of SharedQueue
    fn new() -> Self {
        SharedQueue {
            data: Arc::new(Mutex::new(Vec::new())), // Initialize an empty vector
        }
    }

    // Method to push a value into the shared queue
    fn push(&self, value: i32) {
        // Lock the mutex to access the shared data
        let mut data = self.data.lock().unwrap();
        // Push the value into the vector
        data.push(value);
    }

    // Method to pop a value from the shared queue
    fn pop(&self) -> Option<i32> {
        // Lock the mutex to access the shared data
        let mut data = self.data.lock().unwrap();
        // Remove and return the last (most recently pushed) value, or None if the vector is empty
        data.pop()
    }
}

fn main() {
    // Create a shared queue instance
    let shared_queue = Arc::new(SharedQueue::new());

    // Clone the Arc for the producer thread
    let producer_queue = Arc::clone(&shared_queue);
    // Clone the Arc for the consumer thread
    let consumer_queue = Arc::clone(&shared_queue);

    // Spawn a producer thread
    let producer_handle = thread::spawn(move || {
        // Produce some data and push it into the shared queue
        for i in 0..5 {
            // Call the push method of SharedQueue to push the value
            producer_queue.push(i);
            println!("Producer sent: {}", i);
            // Introduce a short delay for demonstration purposes
            thread::sleep(std::time::Duration::from_millis(100));
        }
    });

    // Spawn a consumer thread
    let consumer_handle = thread::spawn(move || {
        // Consume data from the shared queue
        loop {
            // Call the pop method of SharedQueue to pop a value
            if let Some(value) = consumer_queue.pop() {
                println!("Consumer received: {}", value);
            } else {
                // Stop the first time the queue is found empty, even if the producer is still running
                break;
            }
            // Introduce a short delay for demonstration purposes
            thread::sleep(std::time::Duration::from_millis(200));
        }
    });

    // Wait for both threads to finish
    producer_handle.join().unwrap();
    consumer_handle.join().unwrap();
} 

Output:

Standard Output
Producer sent: 0
Consumer received: 0
Producer sent: 1
Consumer received: 1
Producer sent: 2
Producer sent: 3
Consumer received: 3
Producer sent: 4
Consumer received: 4
Consumer received: 2
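
The exact interleaving depends on thread timing, so it can differ between runs. The value 2 arrives last because Vec::pop() removes the most recently pushed element (last in, first out). The following is a minimal sketch of that behaviour, set beside a first-in, first-out alternative built on std::collections::VecDeque. The helper names drain_lifo and drain_fifo are hypothetical and are not part of the solution above.

```rust
use std::collections::VecDeque;

// Hypothetical helper: drain a Vec with pop(), which returns the newest value first (LIFO).
fn drain_lifo(mut items: Vec<i32>) -> Vec<i32> {
    let mut out = Vec::new();
    while let Some(value) = items.pop() {
        out.push(value);
    }
    out
}

// Hypothetical helper: drain a VecDeque with pop_front(), which returns the oldest value first (FIFO).
fn drain_fifo(items: Vec<i32>) -> Vec<i32> {
    let mut queue: VecDeque<i32> = items.into_iter().collect();
    let mut out = Vec::new();
    while let Some(value) = queue.pop_front() {
        out.push(value);
    }
    out
}

fn main() {
    println!("LIFO: {:?}", drain_lifo(vec![0, 1, 2])); // prints "LIFO: [2, 1, 0]"
    println!("FIFO: {:?}", drain_fifo(vec![0, 1, 2])); // prints "FIFO: [0, 1, 2]"
}
```

If strict arrival order matters, one option is to store the data in a VecDeque and use push_back() and pop_front() in place of the vector's push() and pop().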

Explanation:

In the exercise above,

  • We import "Arc" and "Mutex" from std::sync for thread-safe reference counting and mutual exclusion, and "thread" for spawning and joining threads.
  • The "SharedQueue" struct is defined to represent a shared queue with thread-safe operations. It contains a single field 'data', which is an 'Arc' wrapped around a 'Mutex' containing a vector of 'i32'.
  • Inside the "impl" block for "SharedQueue", we define methods "new()", "push()", and "pop()".
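    • "new()" creates a new instance of "SharedQueue" with an empty vector.
    • "push()" locks the mutex to get exclusive access, then appends a value to the end of the shared vector.
    • "pop()" locks the mutex, then removes and returns the most recently pushed value (the last element of the vector), or returns None if the vector is empty. This last-in, first-out order is why 2 is received last in the output above.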
  • In the main function:
    • We create an instance of "SharedQueue" wrapped in an "Arc" to share among threads.
    • We clone the "Arc" once for the producer thread and once for the consumer thread so that both share ownership of the same queue.
    • Spawn two threads using thread::spawn. One is the producer thread, and the other is the consumer thread.
    • The producer thread produces data (values from 0 to 4) and pushes them into the shared queue.
    • The consumer thread pops values in a loop and exits the first time it finds the queue empty, so it can stop early if it ever runs ahead of the producer.
    • Both threads introduce short delays for demonstration purposes using thread::sleep.
    • Finally, we wait for both threads to finish execution using "join()".
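
The exercise statement mentions channels, while the solution above shares a mutex-protected vector. As a point of comparison, here is a minimal sketch of the same producer-consumer flow built on the standard library's std::sync::mpsc channel. The function name run_pipeline is hypothetical, and the delays from the original are left out to keep the sketch short. The consumer blocks until a value arrives and stops only once the sender has been dropped, so it does not depend on the queue happening to be non-empty.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical helper: run one producer and one consumer over a channel and
// return the values in the order the consumer received them.
fn run_pipeline(count: i32) -> Vec<i32> {
    // tx is the sending half, rx is the receiving half
    let (tx, rx) = mpsc::channel::<i32>();

    // Producer: send the values 0..count, then drop tx when the closure ends
    let producer = thread::spawn(move || {
        for i in 0..count {
            tx.send(i).expect("receiver was dropped");
            println!("Producer sent: {}", i);
        }
    });

    // Consumer: iterating over rx blocks until a value arrives and
    // ends once every sender has been dropped
    let consumer = thread::spawn(move || {
        let mut received = Vec::new();
        for value in rx {
            println!("Consumer received: {}", value);
            received.push(value);
        }
        received
    });

    producer.join().unwrap();
    consumer.join().unwrap()
}

fn main() {
    let received = run_pipeline(5);
    println!("All values: {:?}", received); // prints "All values: [0, 1, 2, 3, 4]"
}
```

With a single producer, the channel delivers values in the order they were sent, so the consumer sees 0 through 4 in sequence even though the interleaving of the "sent" and "received" lines can vary between runs.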
