asyncio avanzado: gather, TaskGroup, Queue y sincronización en Python async

Una vez que dominas async/await básico, el siguiente paso es coordinar múltiples coroutines entre sí: lanzarlas en grupo, limitar cuántas corren a la vez, comunicarlas mediante colas y proteger recursos compartidos. Aquí entran gather(), TaskGroup, Queue y Semaphore.

asyncio.gather() — el caballo de batalla

gather() lanza varias coroutines en paralelo y espera a que todas terminen. El orden de los resultados coincide con el orden de las coroutines pasadas, independientemente de cuál acabe antes.

import asyncio

async def obtener_precio(producto: str) -> float:
    await asyncio.sleep(0.5)  # simula llamada a API
    precios = {"manzana": 1.20, "pera": 0.90, "uva": 2.50}
    return precios.get(producto, 0.0)

async def main():
    productos = ["manzana", "pera", "uva"]
    precios = await asyncio.gather(
        *[obtener_precio(p) for p in productos]
    )
    for producto, precio in zip(productos, precios):
        print(f"{producto}: {precio:.2f} €")

asyncio.run(main())
# manzana: 1.20 €  pera: 0.90 €  uva: 2.50 €  — todo en ~0.5s

Por defecto, si una coroutine lanza una excepción, gather() propaga el primer error y cancela las demás. Con return_exceptions=True las excepciones se devuelven como valores normales en la lista de resultados.

resultados = await asyncio.gather(
    tarea_buena(),
    tarea_que_falla(),
    return_exceptions=True
)
for r in resultados:
    if isinstance(r, Exception):
        print(f"Error: {r}")
    else:
        print(f"OK: {r}")

TaskGroup (Python 3.11+) — gestión estructurada

asyncio.TaskGroup es la alternativa moderna a gather(). Si cualquier tarea falla, cancela el resto del grupo automáticamente y propaga el error de forma limpia.

import asyncio

async def scrape(url: str) -> str:
    await asyncio.sleep(0.3)
    if "error" in url:
        raise ValueError(f"No se puede acceder a {url}")
    return f"datos de {url}"

async def main():
    resultados = []
    async with asyncio.TaskGroup() as tg:
        tareas = [
            tg.create_task(scrape("https://api.com/a")),
            tg.create_task(scrape("https://api.com/b")),
            tg.create_task(scrape("https://api.com/c")),
        ]
    # Aquí todas las tareas han terminado
    for t in tareas:
        resultados.append(t.result())
    print(resultados)

asyncio.run(main())

asyncio.Queue — comunicación entre productores y consumidores

asyncio.Queue es una cola FIFO segura para coroutines. El patrón clásico es tener uno o más productores que añaden trabajo y varios consumidores que lo procesan en paralelo.

import asyncio

async def productor(cola: asyncio.Queue, items: list):
    for item in items:
        await cola.put(item)
        print(f"Producido: {item}")
        await asyncio.sleep(0.1)
    # Señal de fin para cada consumidor
    await cola.put(None)
    await cola.put(None)

async def consumidor(nombre: str, cola: asyncio.Queue):
    while True:
        item = await cola.get()
        if item is None:
            break
        await asyncio.sleep(0.2)  # simula procesamiento
        print(f"[{nombre}] procesado: {item}")
        cola.task_done()

async def main():
    cola: asyncio.Queue = asyncio.Queue(maxsize=5)
    urls = [f"url_{i}" for i in range(6)]
    await asyncio.gather(
        productor(cola, urls),
        consumidor("worker-1", cola),
        consumidor("worker-2", cola),
    )

asyncio.run(main())

asyncio.Semaphore — limitar la concurrencia

Cuando haces scraping o llamas a una API externa, lanzar 1000 requests simultáneos puede agotar conexiones o violar rate limits. Semaphore limita el número de operaciones concurrentes.

import asyncio
import aiohttp

MAX_CONCURRENTES = 5

async def fetch(session: aiohttp.ClientSession, sem: asyncio.Semaphore, url: str) -> str:
    async with sem:  # solo MAX_CONCURRENTES a la vez
        async with session.get(url) as resp:
            return await resp.text()

async def main():
    urls = [f"https://jsonplaceholder.typicode.com/todos/{i}" for i in range(1, 21)]
    sem = asyncio.Semaphore(MAX_CONCURRENTES)
    async with aiohttp.ClientSession() as session:
        tareas = [fetch(session, sem, url) for url in urls]
        resultados = await asyncio.gather(*tareas)
    print(f"Descargados: {len(resultados)} recursos")

asyncio.run(main())

asyncio.Lock y asyncio.Event

import asyncio

# Lock: protege un recurso compartido
async def actualizar_contador(lock: asyncio.Lock, contador: list):
    async with lock:
        valor = contador[0]
        await asyncio.sleep(0)  # cede el control
        contador[0] = valor + 1

# Event: señaliza entre coroutines
async def esperar_inicio(evento: asyncio.Event, nombre: str):
    await evento.wait()
    print(f"{nombre} arranca")

async def main():
    # Lock
    lock = asyncio.Lock()
    contador = [0]
    await asyncio.gather(*[actualizar_contador(lock, contador) for _ in range(100)])
    print(f"Contador: {contador[0]}")  # 100, sin race condition

    # Event
    evento = asyncio.Event()
    tareas = [asyncio.create_task(esperar_inicio(evento, f"tarea-{i}")) for i in range(3)]
    await asyncio.sleep(0.5)
    evento.set()  # desbloquea todas las tareas a la vez
    await asyncio.gather(*tareas)

asyncio.run(main())

El antipatrón más común: bloquear el event loop

import asyncio
import time
import httpx  # librería síncrona

# MAL: llama a una función síncrona bloqueante dentro de una coroutine
async def descargar_mal(url: str) -> str:
    respuesta = httpx.get(url)  # bloquea el event loop entero
    return respuesta.text

# BIEN: usa run_in_executor para delegar código síncrono a un thread pool
async def descargar_bien(url: str) -> str:
    loop = asyncio.get_running_loop()
    respuesta = await loop.run_in_executor(None, httpx.get, url)
    return respuesta.text

# MEJOR: usa una librería async nativa (aiohttp, httpx con async)
import httpx as httpx_async
async def descargar_mejor(url: str) -> str:
    async with httpx_async.AsyncClient() as client:
        respuesta = await client.get(url)
        return respuesta.text

La regla práctica para elegir la herramienta adecuada: usa gather() cuando tienes un número fijo de coroutines y necesitas todos los resultados; TaskGroup cuando quieres cancelación automática ante el primer fallo; Queue para el patrón productor-consumidor con trabajo dinámico; y Semaphore siempre que llames a recursos externos con límites de tasa.

COMPARTE ESTE ARTÍCULO

COMPARTIR EN FACEBOOK
COMPARTIR EN TWITTER
COMPARTIR EN LINKEDIN
COMPARTIR EN WHATSAPP