concurrent.futures en Python: ThreadPoolExecutor, ProcessPoolExecutor y as_completed

concurrent.futures es la forma de alto nivel que ofrece la biblioteca estándar de Python para ejecutar tareas en paralelo. A diferencia de threading y multiprocessing directamente, ofrece una API uniforme basada en futuros: envías trabajo, obtienes un objeto Future que representa el resultado pendiente, y decides cuándo recogerlo. La elección entre ThreadPoolExecutor y ProcessPoolExecutor determina si la concurrencia es real (procesos) o cooperativa (hilos).

ThreadPoolExecutor: paralelismo para I/O-bound

import concurrent.futures
import urllib.request
import time

URLS = [
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/1",
]

def descargar(url: str) -> int:
    with urllib.request.urlopen(url, timeout=10) as resp:
        return len(resp.read())


inicio = time.perf_counter()
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    resultados = list(executor.map(descargar, URLS))
print(f"Tiempo: {time.perf_counter() - inicio:.1f}s")   # ~1.1s en vez de ~4s
print(resultados)

ProcessPoolExecutor: paralelismo para CPU-bound

import concurrent.futures
import math
import time

def es_primo(n: int) -> bool:
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False
    return all(n % i != 0 for i in range(3, int(math.isqrt(n)) + 1, 2))


numeros = list(range(100_000, 100_100))

inicio = time.perf_counter()
with concurrent.futures.ProcessPoolExecutor() as executor:
    resultados = list(executor.map(es_primo, numeros))
print(f"Tiempo: {time.perf_counter() - inicio:.2f}s")

primos = [n for n, primo in zip(numeros, resultados) if primo]
print(f"Primos encontrados: {primos[:5]}...")

submit(): control individual sobre cada tarea

map() bloquea hasta que todos los resultados estén listos. submit() envía una tarea y devuelve un Future inmediatamente, permitiendo hacer otras cosas mientras tanto:

import concurrent.futures
import time

def tarea_lenta(n: int) -> str:
    time.sleep(n)
    return f"tarea {n} lista"


with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    futuros = {
        executor.submit(tarea_lenta, n): n
        for n in [3, 1, 2]
    }

    for futuro in futuros:
        try:
            resultado = futuro.result(timeout=5)
            print(resultado)
        except concurrent.futures.TimeoutError:
            print(f"Tarea {futuros[futuro]} tardó demasiado")
        except Exception as e:
            print(f"Error en tarea {futuros[futuro]}: {e}")

as_completed(): procesar en orden de llegada

import concurrent.futures
import time
import random

def procesar(item: int) -> dict:
    time.sleep(random.uniform(0.1, 1.0))
    return {"item": item, "resultado": item ** 2}


with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    futuros = {executor.submit(procesar, i): i for i in range(10)}

    for futuro in concurrent.futures.as_completed(futuros):
        original = futuros[futuro]
        try:
            dato = futuro.result()
            print(f"  item={dato['item']} ? {dato['resultado']}")
        except Exception as exc:
            print(f"  item={original} falló: {exc}")

as_completed() devuelve un iterador que produce futuros en el orden en que terminan, no en el orden en que se enviaron. Perfecto cuando quieres procesar resultados parciales sin esperar a los más lentos.

wait(): esperar subconjuntos de futuros

import concurrent.futures
import time

def tarea(n: int) -> int:
    time.sleep(n * 0.5)
    return n * 10


with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    futuros = [executor.submit(tarea, i) for i in range(1, 6)]

    # Espera hasta que el primero termine
    listo, pendiente = concurrent.futures.wait(
        futuros,
        return_when=concurrent.futures.FIRST_COMPLETED
    )
    print(f"Primer resultado: {next(iter(listo)).result()}")
    print(f"Pendientes: {len(pendiente)}")

    # Espera al resto
    concurrent.futures.wait(pendiente)
    resultados = [f.result() for f in futuros]
    print(f"Todos: {resultados}")

Cancelar tareas y manejar excepciones

import concurrent.futures

def dividir(a: float, b: float) -> float:
    if b == 0:
        raise ZeroDivisionError("División por cero")
    return a / b


pares = [(10, 2), (5, 0), (8, 4), (1, 0), (9, 3)]

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    futuros = [executor.submit(dividir, a, b) for a, b in pares]

for i, futuro in enumerate(futuros):
    try:
        print(f"  {pares[i]} ? {futuro.result()}")
    except ZeroDivisionError as e:
        print(f"  {pares[i]} ? Error: {e}")

¿ThreadPoolExecutor, ProcessPoolExecutor o asyncio?

La elección depende del tipo de trabajo:

  • I/O-bound con llamadas de bloqueo (requests, psycopg2, sqlite3): ThreadPoolExecutor. Los hilos se bloquean en I/O pero liberan el GIL, y el coste de crear procesos no se justifica.
  • CPU-bound (criptografía, compresión, NumPy en bucles Python): ProcessPoolExecutor. Rompe el GIL al usar procesos separados.
  • I/O-bound con muchas conexiones simultáneas: asyncio con bibliotecas async-native (httpx, asyncpg). Más eficiente en memoria que cientos de hilos.

concurrent.futures también se integra con asyncio mediante loop.run_in_executor(), permitiendo ejecutar código de bloqueo en un pool de hilos o procesos desde una coroutine.

COMPARTE ESTE ARTÍCULO

COMPARTIR EN FACEBOOK
COMPARTIR EN TWITTER
COMPARTIR EN LINKEDIN
COMPARTIR EN WHATSAPP