concurrent.futures es la forma de alto nivel que ofrece la biblioteca estándar de Python para ejecutar tareas en paralelo. A diferencia de threading y multiprocessing directamente, ofrece una API uniforme basada en futuros: envías trabajo, obtienes un objeto Future que representa el resultado pendiente, y decides cuándo recogerlo. La elección entre ThreadPoolExecutor y ProcessPoolExecutor determina si la concurrencia es real (procesos) o cooperativa (hilos).
ThreadPoolExecutor: paralelismo para I/O-bound
import concurrent.futures
import urllib.request
import time
URLS = [
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1",
]
def descargar(url: str) -> int:
with urllib.request.urlopen(url, timeout=10) as resp:
return len(resp.read())
inicio = time.perf_counter()
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
resultados = list(executor.map(descargar, URLS))
print(f"Tiempo: {time.perf_counter() - inicio:.1f}s") # ~1.1s en vez de ~4s
print(resultados)
ProcessPoolExecutor: paralelismo para CPU-bound
import concurrent.futures
import math
import time
def es_primo(n: int) -> bool:
if n < 2:
return False
if n == 2:
return True
if n % 2 == 0:
return False
return all(n % i != 0 for i in range(3, int(math.isqrt(n)) + 1, 2))
numeros = list(range(100_000, 100_100))
inicio = time.perf_counter()
with concurrent.futures.ProcessPoolExecutor() as executor:
resultados = list(executor.map(es_primo, numeros))
print(f"Tiempo: {time.perf_counter() - inicio:.2f}s")
primos = [n for n, primo in zip(numeros, resultados) if primo]
print(f"Primos encontrados: {primos[:5]}...")
submit(): control individual sobre cada tarea
map() bloquea hasta que todos los resultados estén listos. submit() envía una tarea y devuelve un Future inmediatamente, permitiendo hacer otras cosas mientras tanto:
import concurrent.futures
import time
def tarea_lenta(n: int) -> str:
time.sleep(n)
return f"tarea {n} lista"
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
futuros = {
executor.submit(tarea_lenta, n): n
for n in [3, 1, 2]
}
for futuro in futuros:
try:
resultado = futuro.result(timeout=5)
print(resultado)
except concurrent.futures.TimeoutError:
print(f"Tarea {futuros[futuro]} tardó demasiado")
except Exception as e:
print(f"Error en tarea {futuros[futuro]}: {e}")
as_completed(): procesar en orden de llegada
import concurrent.futures
import time
import random
def procesar(item: int) -> dict:
time.sleep(random.uniform(0.1, 1.0))
return {"item": item, "resultado": item ** 2}
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
futuros = {executor.submit(procesar, i): i for i in range(10)}
for futuro in concurrent.futures.as_completed(futuros):
original = futuros[futuro]
try:
dato = futuro.result()
print(f" item={dato['item']} ? {dato['resultado']}")
except Exception as exc:
print(f" item={original} falló: {exc}")
as_completed() devuelve un iterador que produce futuros en el orden en que terminan, no en el orden en que se enviaron. Perfecto cuando quieres procesar resultados parciales sin esperar a los más lentos.
wait(): esperar subconjuntos de futuros
import concurrent.futures
import time
def tarea(n: int) -> int:
time.sleep(n * 0.5)
return n * 10
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
futuros = [executor.submit(tarea, i) for i in range(1, 6)]
# Espera hasta que el primero termine
listo, pendiente = concurrent.futures.wait(
futuros,
return_when=concurrent.futures.FIRST_COMPLETED
)
print(f"Primer resultado: {next(iter(listo)).result()}")
print(f"Pendientes: {len(pendiente)}")
# Espera al resto
concurrent.futures.wait(pendiente)
resultados = [f.result() for f in futuros]
print(f"Todos: {resultados}")
Cancelar tareas y manejar excepciones
import concurrent.futures
def dividir(a: float, b: float) -> float:
if b == 0:
raise ZeroDivisionError("División por cero")
return a / b
pares = [(10, 2), (5, 0), (8, 4), (1, 0), (9, 3)]
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
futuros = [executor.submit(dividir, a, b) for a, b in pares]
for i, futuro in enumerate(futuros):
try:
print(f" {pares[i]} ? {futuro.result()}")
except ZeroDivisionError as e:
print(f" {pares[i]} ? Error: {e}")
¿ThreadPoolExecutor, ProcessPoolExecutor o asyncio?
La elección depende del tipo de trabajo:
- I/O-bound con llamadas de bloqueo (requests, psycopg2, sqlite3):
ThreadPoolExecutor. Los hilos se bloquean en I/O pero liberan el GIL, y el coste de crear procesos no se justifica. - CPU-bound (criptografía, compresión, NumPy en bucles Python):
ProcessPoolExecutor. Rompe el GIL al usar procesos separados. - I/O-bound con muchas conexiones simultáneas:
asynciocon bibliotecas async-native (httpx,asyncpg). Más eficiente en memoria que cientos de hilos.
concurrent.futures también se integra con asyncio mediante loop.run_in_executor(), permitiendo ejecutar código de bloqueo en un pool de hilos o procesos desde una coroutine.
