w3resource

Python Multiprocessing: A Comprehensive Guide to Parallel Execution

Introduction to Python Multiprocessing

Python's 'multiprocessing' module lets you create processes that run concurrently and, on a multi-core machine, in parallel. This is especially useful for CPU-bound tasks: each process has its own Python interpreter and memory space, so the Global Interpreter Lock (GIL) of one process does not hold back the others.

Multiprocessing involves running multiple processes simultaneously. Unlike threads, processes do not share memory space, so data has to be passed between them explicitly, for example through queues, shared values, or a manager. That isolation adds start-up and communication overhead, which is why multiprocessing pays off mainly for CPU-bound tasks that require intensive computation.
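
To make the "no shared memory" point concrete, here is a minimal sketch. The names 'items', 'append_item' and 'child' are made up for this illustration. The child process appends to a module-level list and reports what it sees through a queue, while the parent's copy of the list stays untouched.

```python
import multiprocessing

# Module-level state: every process works on its own copy of this list
items = []

def append_item(value):
    """Append to the module-level list and return a copy of its contents."""
    items.append(value)
    return list(items)

def child(queue):
    # Runs in the child process: change the child's copy and report it back
    queue.put(append_item("from child"))

if __name__ == "__main__":
    queue = multiprocessing.Queue()
    process = multiprocessing.Process(target=child, args=(queue,))
    process.start()
    child_view = queue.get()  # Read the queue before joining the process
    process.join()

    print(f"Child saw:  {child_view}")
    print(f"Parent has: {items}")  # The child's change never reached the parent
```

The parent only learns about the child's list because the child sent a copy through the queue. The module-level list in the parent is still empty after the child exits.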

Creating and Starting a Process:

The most basic way to use the multiprocessing module is by creating a Process object and starting it.

Example 1: Creating and Starting a Process

This example demonstrates how to create and start a simple process that executes a function in parallel with the main program.

Code:

import multiprocessing
import time

# Define a function that will run in a separate process
def print_numbers():
    for i in range(5):
        print(f"Process printing number: {i}")
        time.sleep(1)  # Simulate a time-consuming task

# The __main__ guard is required where child processes are started with
# "spawn" (the default on Windows and macOS), because the child re-imports this module
if __name__ == "__main__":
    # Create a process that runs the print_numbers function
    process = multiprocessing.Process(target=print_numbers)

    # Start the process
    process.start()

    # Main process continues to run independently
    print("Main process is running concurrently.")

    # Wait for the process to finish
    process.join()

Output:

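Main process is running concurrently.
Process printing number: 0
Process printing number: 1
Process printing number: 2
Process printing number: 3
Process printing number: 4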

Explanation:

A Process object is created by passing the target function (print_numbers). Calling start() launches a separate process that runs the function concurrently with the main process. Because starting a process takes a moment, the main process's message usually appears first, but the order is not guaranteed. The join() method blocks the main process until the child process has finished.
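
A Process object also exposes its state, which helps when you want to see what start() and join() actually change. The sketch below uses the 'name' argument, 'is_alive()' and 'exitcode' from the 'Process' class. The helper 'describe' and the function 'short_task' are invented for this illustration.

```python
import multiprocessing
import time

def short_task(seconds):
    """Hypothetical stand-in for real work: sleep, then return normally."""
    time.sleep(seconds)

def describe(process):
    """Summarise the state of a Process object as a small dictionary."""
    return {
        "name": process.name,
        "alive": process.is_alive(),
        "exitcode": process.exitcode,
    }

if __name__ == "__main__":
    process = multiprocessing.Process(target=short_task, args=(0.5,), name="worker-1")
    print(describe(process))  # Not started yet: alive is False, exitcode is None

    process.start()
    print(describe(process))  # Running: alive is True, exitcode is still None

    process.join()
    print(describe(process))  # Finished: alive is False, exitcode is 0
```

An exit code of 0 means the target function returned normally. A non-zero value after join() indicates that the child raised an exception or was terminated.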

Passing Arguments to Processes:

We can pass arguments to the target function of a process using the args parameter.

Example 2: Passing Arguments to a Process

This example demonstrates how to pass arguments to a function running in a separate process.

Code:

import multiprocessing

# Define a function that takes an argument
def greet(name):
    print(f"Hello, {name}!")

# Guard the process creation so the script also works with the "spawn" start method
if __name__ == "__main__":
    # Create a process that runs the greet function with an argument
    process = multiprocessing.Process(target=greet, args=("Avalon",))

    # Start the process
    process.start()

    # Wait for the process to complete
    process.join()

Output:

Hello, Avalon!

Explanation:

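The 'args' parameter of the 'Process' class passes positional arguments to the target function. It takes a sequence, usually a tuple, and a one-element tuple needs the trailing comma, as in ("Avalon",), because ("Avalon") on its own is just a string. In this example, the string "Avalon" is passed to the 'greet' function, which is executed by the process.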
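
Keyword arguments can be passed in the same way through the 'kwargs' parameter of 'Process'. The following is a small sketch. The 'greeting' and 'punctuation' parameters and the helper 'format_greeting' are assumptions added for illustration, not part of the example above.

```python
import multiprocessing

def format_greeting(name, greeting="Hello", punctuation="!"):
    """Build the greeting text (kept separate so it is easy to check)."""
    return f"{greeting}, {name}{punctuation}"

def greet(name, greeting="Hello", punctuation="!"):
    print(format_greeting(name, greeting, punctuation))

if __name__ == "__main__":
    process = multiprocessing.Process(
        target=greet,
        args=("Avalon",),                                    # positional arguments
        kwargs={"greeting": "Welcome", "punctuation": "."},  # keyword arguments
    )
    process.start()
    process.join()  # The child prints: Welcome, Avalon.
```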

Using a Pool of Workers:

A 'Pool' allows you to manage multiple worker processes and distribute tasks among them, which simplifies running parallel tasks.

Example 3: Using a Pool of Workers

This example shows how to use a pool of worker processes to execute a function in parallel across multiple inputs.

Code:

from multiprocessing import Pool

# Define a function that squares a number
def square(number):
    return number * number

# Guard the pool creation so worker processes can safely re-import this module
if __name__ == "__main__":
    # Create a list of numbers to process
    numbers = [1, 2, 3, 4, 5]

    # Create a pool of workers
    with Pool(processes=3) as pool:
        # Map the square function to the list of numbers
        results = pool.map(square, numbers)

    # Print the results
    print(f"Squared numbers: {results}")

Output:

Squared numbers: [1, 4, 9, 16, 25]

Explanation:

A 'Pool' object is created with 3 worker processes. The 'map()' method splits the list of numbers into chunks, hands them to the workers, and applies the 'square()' function to each number. It blocks until every result is ready and returns the results in the same order as the input. The 'with' statement shuts the pool down when the block exits.
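
'map()' passes exactly one argument to the function. When each task needs several arguments, 'Pool.starmap()' unpacks each tuple of the input into positional arguments. The sketch below shows this. The function 'power' and the input pairs are made up for the illustration.

```python
from multiprocessing import Pool

def power(base, exponent):
    """Raise base to the given exponent."""
    return base ** exponent

if __name__ == "__main__":
    pairs = [(2, 3), (3, 2), (10, 0), (5, 1)]

    with Pool(processes=2) as pool:
        # Each tuple is unpacked, so the workers call power(2, 3), power(3, 2), ...
        results = pool.starmap(power, pairs)

    print(f"Powers: {results}")  # Powers: [8, 9, 1, 5]
```

Like 'map()', 'starmap()' blocks until all tasks are done and keeps the results in input order.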

Sharing Data between Processes with Queues:

Inter-process communication can be achieved using queues, which allow processes to send and receive data safely.

Example 4: Using Queues for Inter-Process Communication

This example demonstrates how to use a queue for communication between producer and consumer processes.

Code:

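import multiprocessing

# Sentinel value that tells the consumer there is nothing more to read
SENTINEL = None

# Define a function that puts data into a queue
def producer(queue):
    for i in range(5):
        queue.put(i)
        print(f"Produced: {i}")
    queue.put(SENTINEL)  # Signal the end of the data

# Define a function that gets data from a queue
def consumer(queue):
    # queue.empty() is not reliable across processes, so read until the sentinel arrives
    while True:
        item = queue.get()
        if item is SENTINEL:
            break
        print(f"Consumed: {item}")

if __name__ == "__main__":
    # Create a queue
    queue = multiprocessing.Queue()

    # Create producer and consumer processes
    producer_process = multiprocessing.Process(target=producer, args=(queue,))
    consumer_process = multiprocessing.Process(target=consumer, args=(queue,))

    # Run the producer to completion first so the output order is predictable.
    # This is safe only because the queued data is tiny; with large amounts of
    # data, start the consumer before joining the producer.
    producer_process.start()
    producer_process.join()

    consumer_process.start()
    consumer_process.join()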

Output:

Produced: 0
Produced: 1
Produced: 2
Produced: 3
Produced: 4
Consumed: 0
Consumed: 1
Consumed: 2
Consumed: 3
Consumed: 4

Explanation:

A 'Queue' object is created to facilitate communication between the 'producer' and 'consumer' processes. The producer process puts items into the queue, while the consumer process retrieves them. Queues are safe to use from several processes and threads at once. Every item is pickled on 'put()' and unpickled on 'get()', so the consumer receives a copy of the data rather than the original object.
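
In practice the producer and the consumers usually run at the same time, and results travel back through a second queue. The following is a minimal sketch of that pattern, not the only way to structure it. The names 'worker', 'run_workers' and the 'STOP' sentinel are assumptions made for this illustration.

```python
import multiprocessing

STOP = None  # Sentinel (an assumption of this sketch): one is sent per worker

def worker(tasks, results):
    """Read numbers from tasks until STOP arrives; put (n, n * n) on results."""
    while True:
        number = tasks.get()  # Blocks until an item is available
        if number is STOP:
            break
        results.put((number, number * number))

def run_workers(numbers, worker_count=2):
    tasks = multiprocessing.Queue()
    results = multiprocessing.Queue()
    workers = [
        multiprocessing.Process(target=worker, args=(tasks, results))
        for _ in range(worker_count)
    ]
    for process in workers:
        process.start()

    for number in numbers:
        tasks.put(number)
    for _ in workers:
        tasks.put(STOP)  # One sentinel per worker so every loop terminates

    # Collect one result per input before joining the workers
    collected = [results.get() for _ in numbers]
    for process in workers:
        process.join()
    return sorted(collected)  # Arrival order varies, so sort for a stable view

if __name__ == "__main__":
    print(run_workers([1, 2, 3, 4, 5]))
    # [(1, 1), (2, 4), (3, 9), (4, 16), (5, 25)]
```

Reading the results before calling join() matters: a process that has put data on a queue may not exit until that data has been consumed.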

Using Locks to Prevent Race Conditions:

Locks can be used to prevent race conditions when multiple processes need to access a shared resource.

Example 5: Using Locks for Process Synchronization

This example shows how to use locks to prevent race conditions when multiple processes modify a shared resource.

Code:

import multiprocessing

# Define a function that increments the shared counter
def increment_counter(lock, counter):
    for _ in range(1000):
        with lock:
            counter.value += 1  # Modify shared resource with lock

# Guard the setup so child processes do not repeat it when they import this module
if __name__ == "__main__":
    # Shared resource
    counter = multiprocessing.Value('i', 0)  # Integer shared between processes

    # Create a lock
    lock = multiprocessing.Lock()

    # Create multiple processes that run the increment_counter function
    processes = [multiprocessing.Process(target=increment_counter, args=(lock, counter)) for _ in range(5)]

    # Start all processes
    for process in processes:
        process.start()

    # Wait for all processes to complete
    for process in processes:
        process.join()

    # Print the final value of the counter
    print(f"Final counter value: {counter.value}")

Output:

Final counter value: 5000

Explanation:

A 'Value' object is used to share an integer ('counter') between processes. The statement 'counter.value += 1' is a read followed by a write, so two processes running it at the same time could lose an update. A 'Lock' ensures that only one process can modify the counter at a time: the 'with lock:' statement acquires the lock before the counter is modified and releases it afterward, so all 5 × 1000 increments are counted.
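
A 'Value' created with the default settings already carries its own lock, available through 'get_lock()', so a separate 'Lock' object is optional. The sketch below is a variation on the counter idea using that built-in lock. The function name 'add_many' and the process count are arbitrary choices for this illustration.

```python
import multiprocessing

def add_many(counter, amount):
    """Increment the shared counter `amount` times under its built-in lock."""
    for _ in range(amount):
        with counter.get_lock():  # Lock that belongs to this Value
            counter.value += 1

if __name__ == "__main__":
    counter = multiprocessing.Value("i", 0)

    processes = [
        multiprocessing.Process(target=add_many, args=(counter, 1000))
        for _ in range(4)
    ]
    for process in processes:
        process.start()
    for process in processes:
        process.join()

    print(f"Final counter value: {counter.value}")  # Final counter value: 4000
```

Keeping the lock attached to the value means there is one object fewer to pass around, and it is harder to guard the counter with the wrong lock by accident.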

Using Process Pools for Asynchronous Task Execution:

'Pool' can also be used to run tasks asynchronously, which allows you to manage and monitor tasks more effectively.

Example 6: Asynchronous Execution with Process Pools

This example demonstrates how to use 'apply_async()' with a process pool to execute tasks asynchronously.

Code:

from multiprocessing import Pool
import time

# Define a function that simulates a time-consuming task
def slow_task(name):
    print(f"Starting task: {name}")
    time.sleep(2)  # Simulate delay
    return f"{name} completed"

if __name__ == "__main__":
    # Create a pool of workers
    with Pool(processes=3) as pool:
        # Submit tasks asynchronously; apply_async returns immediately
        results = [pool.apply_async(slow_task, args=(f"Task {i}",)) for i in range(5)]

        # Retrieve results in submission order
        for result in results:
            print(result.get())  # Block until this task has completed, then print its result

Output (the exact interleaving varies between runs):

Starting task: Task 0
Starting task: Task 1
Starting task: Task 2
Task 0 completed
Starting task: Task 3
Starting task: Task 4
Task 1 completed
Task 2 completed
Task 3 completed
Task 4 completed

Explanation:

The 'apply_async()' method submits a task to the process pool and returns an 'AsyncResult' object immediately, without waiting for the task to run. With 3 workers, at most three tasks execute at once; the remaining tasks start as workers become free. Calling 'get()' on a result blocks until that task has completed, so the loop prints the results in submission order. Between submitting the tasks and calling 'get()', the main program is free to do other work.
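
If you would rather handle each result as soon as its task finishes, regardless of submission order, 'Pool.imap_unordered()' is one option. The sketch below assumes a made-up function 'timed_task' whose only job is to sleep for the given number of seconds.

```python
from multiprocessing import Pool
import time

def timed_task(delay):
    """Hypothetical task: sleep for `delay` seconds and return the delay."""
    time.sleep(delay)
    return delay

if __name__ == "__main__":
    delays = [0.6, 0.1, 0.3]

    with Pool(processes=3) as pool:
        # Results are yielded as tasks finish, not in input order
        for finished in pool.imap_unordered(timed_task, delays):
            print(f"Finished the task that slept {finished}s")
```

With one worker per task, the shortest sleep typically finishes first, so the lines usually appear in order of increasing delay. The trade-off is that you no longer know which input a result belongs to unless the function returns that information itself.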

Sharing State between Processes with Manager:

'Manager' provides a way to create shared objects that can be used across multiple processes, such as lists and dictionaries.

Example 7: Sharing State with Manager

This example shows how to use 'Manager' to create shared objects, allowing multiple processes to interact with a common state.

Code:

from multiprocessing import Manager, Process

# Define a function that adds items to a shared list
def add_to_list(shared_list):
    for i in range(5):
        shared_list.append(i)
        print(f"Added {i} to list")

# Guard the setup so child processes do not start managers of their own
if __name__ == "__main__":
    # Create a manager object
    with Manager() as manager:
        # Create a shared list
        shared_list = manager.list()

        # Create and start multiple processes
        processes = [Process(target=add_to_list, args=(shared_list,)) for _ in range(3)]
        for process in processes:
            process.start()

        # Wait for all processes to complete
        for process in processes:
            process.join()

        # Print the shared list
        print(f"Shared list: {list(shared_list)}")

Output:

Added 0 to list
Added 0 to list
Added 1 to list
Added 2 to list
Added 3 to list
Added 4 to list
Added 1 to list
Added 2 to list
Added 3 to list
Added 4 to list
Added 0 to list
Added 1 to list
Added 2 to list
Added 3 to list
Added 4 to list
Shared list: [0, 0, 1, 2, 3, 4, 1, 2, 3, 4, 0, 1, 2, 3, 4]

Explanation:

'Manager' provides shared objects, such as lists, that can be accessed and modified by multiple processes. The objects live in a separate server process, and each worker talks to them through a proxy. In this example, three processes each append five items to the shared list, so the final list always holds 15 items, but the order in which they appear depends on how the processes are scheduled and varies between runs.
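
A single 'append()' on a managed list is one call to the manager, but an update that reads a value and then writes it back consists of two calls and can interleave with other processes. The following sketch shows one way to guard such an update with a lock obtained from the same manager. The word-counting task and the names 'count_words' and 'batches' are invented for the illustration.

```python
import multiprocessing

def count_words(words, totals, lock):
    """Add each word in `words` to the shared `totals` mapping."""
    for word in words:
        with lock:  # Read and write back as one protected step
            totals[word] = totals.get(word, 0) + 1

if __name__ == "__main__":
    with multiprocessing.Manager() as manager:
        totals = manager.dict()  # Shared dictionary
        lock = manager.Lock()    # Lock that can be passed to other processes

        batches = [["a", "b", "a"], ["b", "c"], ["a"]]
        processes = [
            multiprocessing.Process(target=count_words, args=(batch, totals, lock))
            for batch in batches
        ]
        for process in processes:
            process.start()
        for process in processes:
            process.join()

        print(dict(sorted(totals.items())))  # {'a': 3, 'b': 2, 'c': 1}
```

Every access to a managed object is a round trip to the manager process, so this approach is convenient for small amounts of shared state rather than for heavy data traffic.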


