Una vez que dominas async/await básico, el siguiente paso es coordinar múltiples coroutines entre sí: lanzarlas en grupo, limitar cuántas corren a la vez, comunicarlas mediante colas y proteger recursos compartidos. Aquí entran gather(), TaskGroup, Queue y Semaphore.
asyncio.gather() el caballo de batalla
gather() lanza varias coroutines en paralelo y espera a que todas terminen. El orden de los resultados coincide con el orden de las coroutines pasadas, independientemente de cuál acabe antes.
import asyncio
async def obtener_precio(producto: str) -> float:
await asyncio.sleep(0.5) # simula llamada a API
precios = {"manzana": 1.20, "pera": 0.90, "uva": 2.50}
return precios.get(producto, 0.0)
async def main():
productos = ["manzana", "pera", "uva"]
precios = await asyncio.gather(
*[obtener_precio(p) for p in productos]
)
for producto, precio in zip(productos, precios):
print(f"{producto}: {precio:.2f} ")
asyncio.run(main())
# manzana: 1.20 pera: 0.90 uva: 2.50 todo en ~0.5s
Por defecto, si una coroutine lanza una excepción, gather() propaga el primer error y cancela las demás. Con return_exceptions=True las excepciones se devuelven como valores normales en la lista de resultados.
resultados = await asyncio.gather(
tarea_buena(),
tarea_que_falla(),
return_exceptions=True
)
for r in resultados:
if isinstance(r, Exception):
print(f"Error: {r}")
else:
print(f"OK: {r}")
TaskGroup (Python 3.11+) gestión estructurada
asyncio.TaskGroup es la alternativa moderna a gather(). Si cualquier tarea falla, cancela el resto del grupo automáticamente y propaga el error de forma limpia.
import asyncio
async def scrape(url: str) -> str:
await asyncio.sleep(0.3)
if "error" in url:
raise ValueError(f"No se puede acceder a {url}")
return f"datos de {url}"
async def main():
resultados = []
async with asyncio.TaskGroup() as tg:
tareas = [
tg.create_task(scrape("https://api.com/a")),
tg.create_task(scrape("https://api.com/b")),
tg.create_task(scrape("https://api.com/c")),
]
# Aquí todas las tareas han terminado
for t in tareas:
resultados.append(t.result())
print(resultados)
asyncio.run(main())
asyncio.Queue comunicación entre productores y consumidores
asyncio.Queue es una cola FIFO segura para coroutines. El patrón clásico es tener uno o más productores que añaden trabajo y varios consumidores que lo procesan en paralelo.
import asyncio
async def productor(cola: asyncio.Queue, items: list):
for item in items:
await cola.put(item)
print(f"Producido: {item}")
await asyncio.sleep(0.1)
# Señal de fin para cada consumidor
await cola.put(None)
await cola.put(None)
async def consumidor(nombre: str, cola: asyncio.Queue):
while True:
item = await cola.get()
if item is None:
break
await asyncio.sleep(0.2) # simula procesamiento
print(f"[{nombre}] procesado: {item}")
cola.task_done()
async def main():
cola: asyncio.Queue = asyncio.Queue(maxsize=5)
urls = [f"url_{i}" for i in range(6)]
await asyncio.gather(
productor(cola, urls),
consumidor("worker-1", cola),
consumidor("worker-2", cola),
)
asyncio.run(main())
asyncio.Semaphore limitar la concurrencia
Cuando haces scraping o llamas a una API externa, lanzar 1000 requests simultáneos puede agotar conexiones o violar rate limits. Semaphore limita el número de operaciones concurrentes.
import asyncio
import aiohttp
MAX_CONCURRENTES = 5
async def fetch(session: aiohttp.ClientSession, sem: asyncio.Semaphore, url: str) -> str:
async with sem: # solo MAX_CONCURRENTES a la vez
async with session.get(url) as resp:
return await resp.text()
async def main():
urls = [f"https://jsonplaceholder.typicode.com/todos/{i}" for i in range(1, 21)]
sem = asyncio.Semaphore(MAX_CONCURRENTES)
async with aiohttp.ClientSession() as session:
tareas = [fetch(session, sem, url) for url in urls]
resultados = await asyncio.gather(*tareas)
print(f"Descargados: {len(resultados)} recursos")
asyncio.run(main())
asyncio.Lock y asyncio.Event
import asyncio
# Lock: protege un recurso compartido
async def actualizar_contador(lock: asyncio.Lock, contador: list):
async with lock:
valor = contador[0]
await asyncio.sleep(0) # cede el control
contador[0] = valor + 1
# Event: señaliza entre coroutines
async def esperar_inicio(evento: asyncio.Event, nombre: str):
await evento.wait()
print(f"{nombre} arranca")
async def main():
# Lock
lock = asyncio.Lock()
contador = [0]
await asyncio.gather(*[actualizar_contador(lock, contador) for _ in range(100)])
print(f"Contador: {contador[0]}") # 100, sin race condition
# Event
evento = asyncio.Event()
tareas = [asyncio.create_task(esperar_inicio(evento, f"tarea-{i}")) for i in range(3)]
await asyncio.sleep(0.5)
evento.set() # desbloquea todas las tareas a la vez
await asyncio.gather(*tareas)
asyncio.run(main())
El antipatrón más común: bloquear el event loop
import asyncio
import time
import httpx # librería síncrona
# MAL: llama a una función síncrona bloqueante dentro de una coroutine
async def descargar_mal(url: str) -> str:
respuesta = httpx.get(url) # bloquea el event loop entero
return respuesta.text
# BIEN: usa run_in_executor para delegar código síncrono a un thread pool
async def descargar_bien(url: str) -> str:
loop = asyncio.get_running_loop()
respuesta = await loop.run_in_executor(None, httpx.get, url)
return respuesta.text
# MEJOR: usa una librería async nativa (aiohttp, httpx con async)
import httpx as httpx_async
async def descargar_mejor(url: str) -> str:
async with httpx_async.AsyncClient() as client:
respuesta = await client.get(url)
return respuesta.text
La regla práctica para elegir la herramienta adecuada: usa gather() cuando tienes un número fijo de coroutines y necesitas todos los resultados; TaskGroup cuando quieres cancelación automática ante el primer fallo; Queue para el patrón productor-consumidor con trabajo dinámico; y Semaphore siempre que llames a recursos externos con límites de tasa.
