multiprocessing en Python: Process, Pool, Queue, Pipe y shared memory para paralelismo real

El GIL (Global Interpreter Lock) de CPython impide que dos hilos ejecuten bytecode Python al mismo tiempo, lo que hace que threading sea ineficaz para tareas que consumen CPU. El módulo multiprocessing resuelve esto lanzando procesos del sistema operativo independientes, cada uno con su propio intérprete y su propio GIL, lo que permite aprovechar todos los núcleos disponibles en la máquina.

Process: lanzar procesos individuales

import multiprocessing as mp
import os

def tarea(nombre: str):
    print(f"[{nombre}] PID={os.getpid()}")
    resultado = sum(i * i for i in range(1_000_000))
    print(f"[{nombre}] resultado={resultado}")


if __name__ == '__main__':
    procesos = [mp.Process(target=tarea, args=(f"P{i}",)) for i in range(4)]
    for p in procesos:
        p.start()
    for p in procesos:
        p.join()
    print("Todos los procesos terminaron")

El bloque if __name__ == '__main__' es imprescindible en Windows y macOS (modo spawn), donde el módulo principal se reimporta en cada proceso hijo. En Linux (modo fork) es buena práctica igualmente.

Queue: comunicación entre procesos

import multiprocessing as mp
import time

def productor(cola: mp.Queue, n: int):
    for i in range(n):
        cola.put(i * i)
        time.sleep(0.01)
    cola.put(None)   # señal de fin


def consumidor(cola: mp.Queue):
    while True:
        item = cola.get()
        if item is None:
            break
        print(f"Consumidor recibió: {item}")


if __name__ == '__main__':
    cola = mp.Queue()
    p_prod = mp.Process(target=productor, args=(cola, 5))
    p_cons = mp.Process(target=consumidor, args=(cola,))
    p_prod.start()
    p_cons.start()
    p_prod.join()
    p_cons.join()

Pipe: canal bidireccional

import multiprocessing as mp

def trabajador(conn):
    peticion = conn.recv()
    conn.send(peticion.upper())
    conn.close()


if __name__ == '__main__':
    padre, hijo = mp.Pipe()
    p = mp.Process(target=trabajador, args=(hijo,))
    p.start()
    padre.send("hola mundo")
    respuesta = padre.recv()
    p.join()
    print(respuesta)   # HOLA MUNDO

Pool: paralelismo con map y starmap

Pool gestiona un conjunto de procesos trabajadores y distribuye el trabajo automáticamente:

import multiprocessing as mp
import time

def factorizar(n: int) -> list[int]:
    """Factorización trivial para demostrar CPU-bound."""
    factores = []
    d = 2
    while d * d <= n:
        while n % d == 0:
            factores.append(d)
            n //= d
        d += 1
    if n > 1:
        factores.append(n)
    return factores


if __name__ == '__main__':
    numeros = [2**31 - 1, 2**29 - 1, 999_999_937, 1_000_000_007, 982_451_653]

    inicio = time.perf_counter()
    with mp.Pool(processes=mp.cpu_count()) as pool:
        resultados = pool.map(factorizar, numeros)
    print(f"Pool.map: {time.perf_counter() - inicio:.2f}s")

    for n, f in zip(numeros, resultados):
        print(f"  {n} ? {f}")
# starmap: cuando cada elemento es una tupla de argumentos
def potencia(base: int, exp: int) -> int:
    return base ** exp


if __name__ == '__main__':
    pares = [(2, 10), (3, 7), (5, 5), (7, 4)]
    with mp.Pool() as pool:
        resultados = pool.starmap(potencia, pares)
    print(resultados)  # [1024, 2187, 3125, 2401]

Pool.imap_unordered: resultados en cuanto llegan

import multiprocessing as mp
import time, random

def procesar(item: int) -> int:
    time.sleep(random.uniform(0.1, 0.5))
    return item * 2


if __name__ == '__main__':
    with mp.Pool(4) as pool:
        for resultado in pool.imap_unordered(procesar, range(10)):
            print(f"Resultado listo: {resultado}")

Value y Array: memoria compartida tipada

import multiprocessing as mp

def incrementar(contador, lock, n: int):
    for _ in range(n):
        with lock:
            contador.value += 1


if __name__ == '__main__':
    contador = mp.Value('i', 0)   # 'i' = int con signo
    lock = mp.Lock()

    procesos = [
        mp.Process(target=incrementar, args=(contador, lock, 100_000))
        for _ in range(4)
    ]
    for p in procesos: p.start()
    for p in procesos: p.join()
    print(f"Contador final: {contador.value}")   # 400000

shared_memory: bloques de memoria compartida

from multiprocessing import shared_memory, Process
import numpy as np

def multiplicar_por_dos(nombre_shm: str, forma: tuple, dtype):
    shm = shared_memory.SharedMemory(name=nombre_shm)
    arr = np.ndarray(forma, dtype=dtype, buffer=shm.buf)
    arr *= 2
    shm.close()


if __name__ == '__main__':
    datos = np.array([1, 2, 3, 4, 5], dtype=np.int64)
    shm = shared_memory.SharedMemory(create=True, size=datos.nbytes)
    arr_compartido = np.ndarray(datos.shape, dtype=datos.dtype, buffer=shm.buf)
    arr_compartido[:] = datos[:]

    p = Process(target=multiplicar_por_dos, args=(shm.name, datos.shape, datos.dtype))
    p.start()
    p.join()

    print(arr_compartido)   # [2 4 6 8 10]
    shm.close()
    shm.unlink()

Cuándo usar multiprocessing vs threading

Usa multiprocessing para tareas CPU-bound: compresión, criptografía, procesamiento de imágenes, cálculo numérico. Usa threading o asyncio para tareas I/O-bound: llamadas HTTP, consultas a bases de datos, lectura de ficheros. El coste de crear y comunicar procesos es mayor que el de los hilos, así que solo merece la pena cuando el trabajo por proceso es suficientemente grande como para amortizar ese coste.

COMPARTE ESTE ARTÍCULO

COMPARTIR EN FACEBOOK
COMPARTIR EN TWITTER
COMPARTIR EN LINKEDIN
COMPARTIR EN WHATSAPP