asyncio en Python: async/await, coroutines y el event loop explicados

El código síncrono ejecuta una tarea cada vez: mientras espera que el servidor de base de datos responda, el programa se queda parado. asyncio resuelve esto con un modelo de concurrencia basado en un bucle de eventos: una sola hebra puede gestionar cientos de operaciones I/O sin bloquearse, cediéndolas cuando están esperando y retomándolas cuando tienen datos.

Coroutines: la unidad básica de asyncio

Una coroutine es una función definida con async def. Llamarla no la ejecuta: devuelve un objeto coroutine que debe ser awaited. La palabra clave await pausa la coroutine actual y devuelve el control al bucle de eventos hasta que la operación termine.

import asyncio

async def saludar(nombre: str, delay: float) -> str:
    await asyncio.sleep(delay)  # pausa sin bloquear el hilo
    return f"Hola, {nombre}"

async def main():
    resultado = await saludar("mundo", 1.0)
    print(resultado)  # Hola, mundo

asyncio.run(main())  # punto de entrada: crea y cierra el event loop

El event loop: cómo funciona por dentro

El bucle de eventos es el motor de asyncio. Mantiene una cola de coroutines listas para ejecutarse. Cuando una coroutine hace await sobre una operación I/O, el bucle la suspende y ejecuta la siguiente tarea pendiente. Cuando la I/O termina, la coroutine vuelve a la cola.

import asyncio
import time

async def tarea(nombre: str, segundos: float):
    print(f"[{nombre}] empieza")
    await asyncio.sleep(segundos)
    print(f"[{nombre}] termina")

async def main():
    inicio = time.perf_counter()
    # Ejecutar secuencialmente (await uno tras otro)
    await tarea("A", 1.0)
    await tarea("B", 1.0)
    print(f"Secuencial: {time.perf_counter() - inicio:.1f}s")  # ~2.0s

asyncio.run(main())

Si las ejecutas en secuencia tardas 2 segundos. Lanzándolas en paralelo con gather() tardas 1.

asyncio.gather() para ejecución concurrente

import asyncio
import time

async def descargar(url: str, delay: float) -> str:
    await asyncio.sleep(delay)  # simula latencia de red
    return f"contenido de {url}"

async def main():
    inicio = time.perf_counter()
    resultados = await asyncio.gather(
        descargar("https://api.ejemplo.com/usuarios", 1.0),
        descargar("https://api.ejemplo.com/productos", 1.5),
        descargar("https://api.ejemplo.com/pedidos", 0.8),
    )
    print(f"Tiempo: {time.perf_counter() - inicio:.1f}s")  # ~1.5s
    for r in resultados:
        print(r)

asyncio.run(main())

create_task(): lanzar tareas sin esperar

asyncio.create_task() programa una coroutine para ejecutarse en el background sin necesidad de hacer await inmediatamente. Devuelve un objeto Task que puedes esperar más adelante.

import asyncio

async def proceso_lento(nombre: str) -> str:
    await asyncio.sleep(2)
    return f"resultado de {nombre}"

async def main():
    # Lanza la tarea sin esperarla
    tarea = asyncio.create_task(proceso_lento("job-1"))

    # Hace otras cosas mientras tanto
    print("Haciendo trabajo ligero...")
    await asyncio.sleep(0.5)
    print("Más trabajo ligero...")

    # Ahora sí espera el resultado
    resultado = await tarea
    print(resultado)

asyncio.run(main())

async/await con I/O real: aiohttp

# pip install aiohttp
import asyncio
import aiohttp

async def fetch(session: aiohttp.ClientSession, url: str) -> dict:
    async with session.get(url) as respuesta:
        return await respuesta.json()

async def main():
    urls = [
        "https://jsonplaceholder.typicode.com/todos/1",
        "https://jsonplaceholder.typicode.com/todos/2",
        "https://jsonplaceholder.typicode.com/todos/3",
    ]
    async with aiohttp.ClientSession() as session:
        tareas = [fetch(session, url) for url in urls]
        resultados = await asyncio.gather(*tareas)
    for r in resultados:
        print(r["title"])

asyncio.run(main())

asyncio vs threads vs multiprocessing

La elección depende del tipo de carga:

  • asyncio: ideal para I/O-bound con muchas conexiones simultáneas (APIs, bases de datos, scraping). Un solo hilo, sin overhead de creación de hebras ni GIL.
  • threading: útil cuando usas librerías síncronas que no tienen versión async. El GIL limita el paralelismo real en CPU-bound.
  • multiprocessing: para CPU-bound real (cálculo intensivo, procesamiento de imágenes). Evita el GIL usando procesos separados con memoria independiente.
import asyncio
from concurrent.futures import ProcessPoolExecutor

def calculo_pesado(n: int) -> int:
    """CPU-bound: no puede usar asyncio directamente."""
    return sum(i * i for i in range(n))

async def main():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        # Delega el trabajo CPU-bound a un proceso separado
        resultado = await loop.run_in_executor(pool, calculo_pesado, 10_000_000)
    print(resultado)

asyncio.run(main())

Errores comunes con asyncio

# MAL: olvidar await — la coroutine no se ejecuta
async def main():
    saludar("mundo", 1.0)  # RuntimeWarning: coroutine never awaited

# MAL: llamar a asyncio.run() dentro de una coroutine
async def main():
    asyncio.run(otra_coroutine())  # RuntimeError

# BIEN: usar await dentro de coroutines
async def main():
    await otra_coroutine()

# MAL: usar time.sleep() en vez de asyncio.sleep() — bloquea el event loop
async def tarea():
    import time
    time.sleep(1)  # bloquea todo el programa

# BIEN
async def tarea():
    await asyncio.sleep(1)

La regla de oro de asyncio: todo lo que bloquee el hilo (red, disco, espera) debe ser awaitable. Cualquier función síncrona de larga duración congela el bucle de eventos para todas las tareas concurrentes.

COMPARTE ESTE ARTÍCULO

COMPARTIR EN FACEBOOK
COMPARTIR EN TWITTER
COMPARTIR EN LINKEDIN
COMPARTIR EN WHATSAPP