El GIL (Global Interpreter Lock) de CPython impide que dos hilos ejecuten bytecode Python al mismo tiempo, lo que hace que threading sea ineficaz para tareas que consumen CPU. El módulo multiprocessing resuelve esto lanzando procesos del sistema operativo independientes, cada uno con su propio intérprete y su propio GIL, lo que permite aprovechar todos los núcleos disponibles en la máquina.
Process: lanzar procesos individuales
import multiprocessing as mp
import os
def tarea(nombre: str):
print(f"[{nombre}] PID={os.getpid()}")
resultado = sum(i * i for i in range(1_000_000))
print(f"[{nombre}] resultado={resultado}")
if __name__ == '__main__':
procesos = [mp.Process(target=tarea, args=(f"P{i}",)) for i in range(4)]
for p in procesos:
p.start()
for p in procesos:
p.join()
print("Todos los procesos terminaron")
El bloque if __name__ == '__main__' es imprescindible en Windows y macOS (modo spawn), donde el módulo principal se reimporta en cada proceso hijo. En Linux (modo fork) es buena práctica igualmente.
Queue: comunicación entre procesos
import multiprocessing as mp
import time
def productor(cola: mp.Queue, n: int):
for i in range(n):
cola.put(i * i)
time.sleep(0.01)
cola.put(None) # señal de fin
def consumidor(cola: mp.Queue):
while True:
item = cola.get()
if item is None:
break
print(f"Consumidor recibió: {item}")
if __name__ == '__main__':
cola = mp.Queue()
p_prod = mp.Process(target=productor, args=(cola, 5))
p_cons = mp.Process(target=consumidor, args=(cola,))
p_prod.start()
p_cons.start()
p_prod.join()
p_cons.join()
Pipe: canal bidireccional
import multiprocessing as mp
def trabajador(conn):
peticion = conn.recv()
conn.send(peticion.upper())
conn.close()
if __name__ == '__main__':
padre, hijo = mp.Pipe()
p = mp.Process(target=trabajador, args=(hijo,))
p.start()
padre.send("hola mundo")
respuesta = padre.recv()
p.join()
print(respuesta) # HOLA MUNDO
Pool: paralelismo con map y starmap
Pool gestiona un conjunto de procesos trabajadores y distribuye el trabajo automáticamente:
import multiprocessing as mp
import time
def factorizar(n: int) -> list[int]:
"""Factorización trivial para demostrar CPU-bound."""
factores = []
d = 2
while d * d <= n:
while n % d == 0:
factores.append(d)
n //= d
d += 1
if n > 1:
factores.append(n)
return factores
if __name__ == '__main__':
numeros = [2**31 - 1, 2**29 - 1, 999_999_937, 1_000_000_007, 982_451_653]
inicio = time.perf_counter()
with mp.Pool(processes=mp.cpu_count()) as pool:
resultados = pool.map(factorizar, numeros)
print(f"Pool.map: {time.perf_counter() - inicio:.2f}s")
for n, f in zip(numeros, resultados):
print(f" {n} ? {f}")
# starmap: cuando cada elemento es una tupla de argumentos
def potencia(base: int, exp: int) -> int:
return base ** exp
if __name__ == '__main__':
pares = [(2, 10), (3, 7), (5, 5), (7, 4)]
with mp.Pool() as pool:
resultados = pool.starmap(potencia, pares)
print(resultados) # [1024, 2187, 3125, 2401]
Pool.imap_unordered: resultados en cuanto llegan
import multiprocessing as mp
import time, random
def procesar(item: int) -> int:
time.sleep(random.uniform(0.1, 0.5))
return item * 2
if __name__ == '__main__':
with mp.Pool(4) as pool:
for resultado in pool.imap_unordered(procesar, range(10)):
print(f"Resultado listo: {resultado}")
Value y Array: memoria compartida tipada
import multiprocessing as mp
def incrementar(contador, lock, n: int):
for _ in range(n):
with lock:
contador.value += 1
if __name__ == '__main__':
contador = mp.Value('i', 0) # 'i' = int con signo
lock = mp.Lock()
procesos = [
mp.Process(target=incrementar, args=(contador, lock, 100_000))
for _ in range(4)
]
for p in procesos: p.start()
for p in procesos: p.join()
print(f"Contador final: {contador.value}") # 400000
shared_memory: bloques de memoria compartida
from multiprocessing import shared_memory, Process
import numpy as np
def multiplicar_por_dos(nombre_shm: str, forma: tuple, dtype):
shm = shared_memory.SharedMemory(name=nombre_shm)
arr = np.ndarray(forma, dtype=dtype, buffer=shm.buf)
arr *= 2
shm.close()
if __name__ == '__main__':
datos = np.array([1, 2, 3, 4, 5], dtype=np.int64)
shm = shared_memory.SharedMemory(create=True, size=datos.nbytes)
arr_compartido = np.ndarray(datos.shape, dtype=datos.dtype, buffer=shm.buf)
arr_compartido[:] = datos[:]
p = Process(target=multiplicar_por_dos, args=(shm.name, datos.shape, datos.dtype))
p.start()
p.join()
print(arr_compartido) # [2 4 6 8 10]
shm.close()
shm.unlink()
Cuándo usar multiprocessing vs threading
Usa multiprocessing para tareas CPU-bound: compresión, criptografía, procesamiento de imágenes, cálculo numérico. Usa threading o asyncio para tareas I/O-bound: llamadas HTTP, consultas a bases de datos, lectura de ficheros. El coste de crear y comunicar procesos es mayor que el de los hilos, así que solo merece la pena cuando el trabajo por proceso es suficientemente grande como para amortizar ese coste.
