El código síncrono ejecuta una tarea cada vez: mientras espera que el servidor de base de datos responda, el programa se queda parado. asyncio resuelve esto con un modelo de concurrencia basado en un bucle de eventos: una sola hebra puede gestionar cientos de operaciones I/O sin bloquearse, cediéndolas cuando están esperando y retomándolas cuando tienen datos.
Coroutines: la unidad básica de asyncio
Una coroutine es una función definida con async def. Llamarla no la ejecuta: devuelve un objeto coroutine que debe ser awaited. La palabra clave await pausa la coroutine actual y devuelve el control al bucle de eventos hasta que la operación termine.
import asyncio
async def saludar(nombre: str, delay: float) -> str:
await asyncio.sleep(delay) # pausa sin bloquear el hilo
return f"Hola, {nombre}"
async def main():
resultado = await saludar("mundo", 1.0)
print(resultado) # Hola, mundo
asyncio.run(main()) # punto de entrada: crea y cierra el event loop
El event loop: cómo funciona por dentro
El bucle de eventos es el motor de asyncio. Mantiene una cola de coroutines listas para ejecutarse. Cuando una coroutine hace await sobre una operación I/O, el bucle la suspende y ejecuta la siguiente tarea pendiente. Cuando la I/O termina, la coroutine vuelve a la cola.
import asyncio
import time
async def tarea(nombre: str, segundos: float):
print(f"[{nombre}] empieza")
await asyncio.sleep(segundos)
print(f"[{nombre}] termina")
async def main():
inicio = time.perf_counter()
# Ejecutar secuencialmente (await uno tras otro)
await tarea("A", 1.0)
await tarea("B", 1.0)
print(f"Secuencial: {time.perf_counter() - inicio:.1f}s") # ~2.0s
asyncio.run(main())
Si las ejecutas en secuencia tardas 2 segundos. Lanzándolas en paralelo con gather() tardas 1.
asyncio.gather() para ejecución concurrente
import asyncio
import time
async def descargar(url: str, delay: float) -> str:
await asyncio.sleep(delay) # simula latencia de red
return f"contenido de {url}"
async def main():
inicio = time.perf_counter()
resultados = await asyncio.gather(
descargar("https://api.ejemplo.com/usuarios", 1.0),
descargar("https://api.ejemplo.com/productos", 1.5),
descargar("https://api.ejemplo.com/pedidos", 0.8),
)
print(f"Tiempo: {time.perf_counter() - inicio:.1f}s") # ~1.5s
for r in resultados:
print(r)
asyncio.run(main())
create_task(): lanzar tareas sin esperar
asyncio.create_task() programa una coroutine para ejecutarse en el background sin necesidad de hacer await inmediatamente. Devuelve un objeto Task que puedes esperar más adelante.
import asyncio
async def proceso_lento(nombre: str) -> str:
await asyncio.sleep(2)
return f"resultado de {nombre}"
async def main():
# Lanza la tarea sin esperarla
tarea = asyncio.create_task(proceso_lento("job-1"))
# Hace otras cosas mientras tanto
print("Haciendo trabajo ligero...")
await asyncio.sleep(0.5)
print("Más trabajo ligero...")
# Ahora sí espera el resultado
resultado = await tarea
print(resultado)
asyncio.run(main())
async/await con I/O real: aiohttp
# pip install aiohttp
import asyncio
import aiohttp
async def fetch(session: aiohttp.ClientSession, url: str) -> dict:
async with session.get(url) as respuesta:
return await respuesta.json()
async def main():
urls = [
"https://jsonplaceholder.typicode.com/todos/1",
"https://jsonplaceholder.typicode.com/todos/2",
"https://jsonplaceholder.typicode.com/todos/3",
]
async with aiohttp.ClientSession() as session:
tareas = [fetch(session, url) for url in urls]
resultados = await asyncio.gather(*tareas)
for r in resultados:
print(r["title"])
asyncio.run(main())
asyncio vs threads vs multiprocessing
La elección depende del tipo de carga:
- asyncio: ideal para I/O-bound con muchas conexiones simultáneas (APIs, bases de datos, scraping). Un solo hilo, sin overhead de creación de hebras ni GIL.
- threading: útil cuando usas librerías síncronas que no tienen versión async. El GIL limita el paralelismo real en CPU-bound.
- multiprocessing: para CPU-bound real (cálculo intensivo, procesamiento de imágenes). Evita el GIL usando procesos separados con memoria independiente.
import asyncio
from concurrent.futures import ProcessPoolExecutor
def calculo_pesado(n: int) -> int:
"""CPU-bound: no puede usar asyncio directamente."""
return sum(i * i for i in range(n))
async def main():
loop = asyncio.get_running_loop()
with ProcessPoolExecutor() as pool:
# Delega el trabajo CPU-bound a un proceso separado
resultado = await loop.run_in_executor(pool, calculo_pesado, 10_000_000)
print(resultado)
asyncio.run(main())
Errores comunes con asyncio
# MAL: olvidar await la coroutine no se ejecuta
async def main():
saludar("mundo", 1.0) # RuntimeWarning: coroutine never awaited
# MAL: llamar a asyncio.run() dentro de una coroutine
async def main():
asyncio.run(otra_coroutine()) # RuntimeError
# BIEN: usar await dentro de coroutines
async def main():
await otra_coroutine()
# MAL: usar time.sleep() en vez de asyncio.sleep() bloquea el event loop
async def tarea():
import time
time.sleep(1) # bloquea todo el programa
# BIEN
async def tarea():
await asyncio.sleep(1)
La regla de oro de asyncio: todo lo que bloquee el hilo (red, disco, espera) debe ser awaitable. Cualquier función síncrona de larga duración congela el bucle de eventos para todas las tareas concurrentes.
