Python multiprocessing Module

The multiprocessing module in Python allows you to create processes, which are independent units of execution that can run in parallel on multiple cores. This module is particularly useful for CPU-bound tasks, as it can bypass the Global Interpreter Lock (GIL) that limits multi-threading performance in Python.

Table of Contents

  1. Introduction
  2. Key Classes and Functions
    • Process
    • Pool
    • Queue
    • Pipe
    • Lock
    • Event
    • Value
    • Array
    • Manager
  3. Examples
    • Creating and Starting Processes
    • Using Pools
    • Interprocess Communication with Queues
    • Interprocess Communication with Pipes
    • Synchronizing Processes with Locks
    • Sharing State Between Processes
  4. Real-World Use Case
  5. Conclusion
  6. References

Introduction

The multiprocessing module in Python provides a way to create and manage separate processes, enabling concurrent execution and taking advantage of multiple CPU cores. It supports process-based parallelism, interprocess communication, and synchronization primitives.

Key Classes and Functions

Process

Represents an individual process.

import multiprocessing

def worker():
    """Print a message from inside the child process."""
    print("Worker process")

if __name__ == '__main__':
    # Spawn one child, then block until it has exited.
    child = multiprocessing.Process(target=worker)
    child.start()
    child.join()

Output:

Worker process

Pool

Manages a pool of worker processes to which jobs can be submitted.

import multiprocessing

def worker(x):
    """Return the square of x."""
    return x * x

if __name__ == '__main__':
    inputs = [1, 2, 3, 4]
    # The context manager shuts the pool down automatically on exit.
    with multiprocessing.Pool(processes=4) as pool:
        squares = pool.map(worker, inputs)
        print(squares)

Output:

[1, 4, 9, 16]

Queue

A FIFO queue for interprocess communication.

import multiprocessing

def worker(queue):
    """Put a greeting onto the shared queue."""
    queue.put('Hello from worker')

if __name__ == '__main__':
    channel = multiprocessing.Queue()
    producer = multiprocessing.Process(target=worker, args=(channel,))
    producer.start()
    # get() blocks until the producer has delivered its message.
    print(channel.get())
    producer.join()

Output:

Hello from worker

Pipe

A two-way communication channel between two processes.

import multiprocessing

def worker(conn):
    """Send a greeting down the connection, then close our end."""
    conn.send('Hello from worker')
    conn.close()

if __name__ == '__main__':
    recv_end, send_end = multiprocessing.Pipe()
    sender = multiprocessing.Process(target=worker, args=(send_end,))
    sender.start()
    # recv() blocks until the child has sent something.
    print(recv_end.recv())
    sender.join()

Output:

Hello from worker

Lock

A synchronization primitive that can be used to control access to a shared resource.

import multiprocessing

def worker(lock, counter):
    """Increment the shared counter while holding the lock."""
    with lock:
        counter.value += 1

if __name__ == '__main__':
    lock = multiprocessing.Lock()
    counter = multiprocessing.Value('i', 0)
    procs = []
    for _ in range(5):
        proc = multiprocessing.Process(target=worker, args=(lock, counter))
        procs.append(proc)
        proc.start()
    for proc in procs:
        proc.join()
    print(f'Counter: {counter.value}')

Output:

Counter: 5

Event

A synchronization primitive that can be used to signal events between processes.

import multiprocessing

def worker(event):
    """Block until the event is set, logging before and after."""
    print("Worker waiting for event")
    event.wait()
    print("Worker received event")

if __name__ == '__main__':
    event = multiprocessing.Event()
    waiter = multiprocessing.Process(target=worker, args=(event,))
    waiter.start()
    # set() may run before the worker reaches wait(); in that case
    # wait() simply returns immediately.
    print("Main process setting event")
    event.set()
    waiter.join()

Output:

Worker waiting for event
Main process setting event
Worker received event

Note: the exact interleaving of the two processes' output can vary from run to run, but "Worker received event" always appears after the event has been set, since wait() only returns once set() has been called.

Value

A shared object to store a single value.

import multiprocessing

def worker(counter):
    """Safely increment the shared integer by one."""
    # A Value carries its own lock; get_lock() guards the
    # read-modify-write of counter.value.
    with counter.get_lock():
        counter.value += 1

if __name__ == '__main__':
    counter = multiprocessing.Value('i', 0)
    procs = []
    for _ in range(5):
        procs.append(multiprocessing.Process(target=worker, args=(counter,)))
    for proc in procs:
        proc.start()
    for proc in procs:
        proc.join()
    print(f'Counter: {counter.value}')

Output:

Counter: 5

Array

A shared array that can be accessed by multiple processes.

import multiprocessing

def worker(arr):
    """Increment every element of the shared array by one.

    The whole loop runs under the array's lock: ``arr[i] += 1`` is a
    read-modify-write, and without the lock two processes can read the
    same old value and lose an increment, making the final result
    nondeterministic.
    """
    with arr.get_lock():
        for i in range(len(arr)):
            arr[i] += 1

if __name__ == '__main__':
    arr = multiprocessing.Array('i', range(5))
    processes = [multiprocessing.Process(target=worker, args=(arr,)) for _ in range(5)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    print(f'Array: {arr[:]}')

Output:

Array: [5, 6, 7, 8, 9]

Manager

A manager object that controls a server process, which holds Python objects and allows other processes to manipulate them using proxies.

import multiprocessing

def worker(d, l):
    """Mutate the managed dict and list through their proxies."""
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.reverse()

if __name__ == '__main__':
    # The Manager runs a server process; d and l are proxies to
    # objects living in that process.
    with multiprocessing.Manager() as manager:
        shared_dict = manager.dict()
        shared_list = manager.list(range(10))

        child = multiprocessing.Process(target=worker, args=(shared_dict, shared_list))
        child.start()
        child.join()

        print(shared_dict)
        print(shared_list)

Output:

{1: '1', '2': 2, 0.25: None}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

Examples

Creating and Starting Processes

import multiprocessing

def worker(num):
    """Print which worker this is."""
    print(f'Worker: {num}')

if __name__ == '__main__':
    procs = [multiprocessing.Process(target=worker, args=(i,)) for i in range(5)]
    for proc in procs:
        proc.start()

    for proc in procs:
        proc.join()

Output:

Worker: 0
Worker: 1
Worker: 2
Worker: 3
Worker: 4

Using Pools

import multiprocessing

def square(x):
    """Return x squared."""
    return x * x

if __name__ == '__main__':
    numbers = [1, 2, 3, 4, 5]
    with multiprocessing.Pool(processes=4) as pool:
        print(pool.map(square, numbers))

Output:

[1, 4, 9, 16, 25]

Interprocess Communication with Queues

import multiprocessing

def worker(queue):
    """Deliver a greeting via the shared queue."""
    queue.put('Hello from worker')

if __name__ == '__main__':
    msg_queue = multiprocessing.Queue()
    child = multiprocessing.Process(target=worker, args=(msg_queue,))
    child.start()
    # Blocks until the child has put its message.
    print(msg_queue.get())
    child.join()

Output:

Hello from worker

Interprocess Communication with Pipes

import multiprocessing

def worker(conn):
    """Write a greeting to the pipe and close our end."""
    conn.send('Hello from worker')
    conn.close()

if __name__ == '__main__':
    reader, writer = multiprocessing.Pipe()
    child = multiprocessing.Process(target=worker, args=(writer,))
    child.start()
    print(reader.recv())
    child.join()

Output:

Hello from worker

Synchronizing Processes with Locks

import multiprocessing

def worker(lock, counter):
    """Bump the shared counter under the lock."""
    with lock:
        counter.value += 1

if __name__ == '__main__':
    guard = multiprocessing.Lock()
    total = multiprocessing.Value('i', 0)
    children = [multiprocessing.Process(target=worker, args=(guard, total))
                for _ in range(5)]
    for child in children:
        child.start()
    for child in children:
        child.join()
    print(f'Counter: {total.value}')

Output:

Counter: 5

Sharing State Between Processes

import multiprocessing

def worker(counter, array):
    """Increment the shared counter and every array element by one."""
    # Value's own lock makes the counter increment safe.
    with counter.get_lock():
        counter.value += 1
    # array[i] += 1 is a read-modify-write; hold the array's lock for
    # the whole loop so concurrent workers cannot lose increments
    # (without it the final array contents are nondeterministic).
    with array.get_lock():
        for i in range(len(array)):
            array[i] += 1

if __name__ == '__main__':
    counter = multiprocessing.Value('i', 0)
    array = multiprocessing.Array('i', range(5))
    processes = [multiprocessing.Process(target=worker, args=(counter, array)) for _ in range(5)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    print(f'Counter: {counter.value}')
    print(f'Array: {array[:]}')

Output:

Counter: 5
Array: [5, 6, 7, 8, 9]

Real-World Use Case

Parallel Web Scraping

import multiprocessing
import requests

urls = [
    'http://example.com',
    'http://example.org',
    'http://example.net',
]

def fetch(url):
    """Fetch url and return a (url, HTTP status code) pair."""
    resp = requests.get(url)
    return url, resp.status_code

if __name__ == '__main__':
    # One pool worker per URL; map preserves input order.
    with multiprocessing.Pool(processes=3) as pool:
        for url, status in pool.map(fetch, urls):
            print(f'Fetched {url} with status {status}')

Output:

Fetched http://example.com with status 200
Fetched http://example.org with status 200
Fetched http://example.net with status 200

Conclusion

The multiprocessing module in Python provides a powerful way to create and manage processes, enabling parallel execution of code. It includes various classes and functions for process-based parallelism, interprocess communication, and synchronization, making it suitable for CPU-bound tasks and taking advantage of multiple CPU cores.

References

Leave a Comment

Your email address will not be published. Required fields are marked *

Scroll to Top