Si alguna vez has escrito un script que hace cien peticiones HTTP y tarda un minuto porque las lanza una detrás de otra, asyncio es lo que necesitas. La librería permite ejecutar muchas operaciones de I/O a la vez dentro de un único thread, sin el coste de lanzar cientos de threads del sistema operativo.
El truco está en el event loop: mientras una coroutine espera la respuesta de un servidor, el loop pasa a ejecutar otra. No hay paralelismo real de CPU, pero cuando el tiempo lo pasa esperando (HTTP, base de datos, lectura de disco), el resultado práctico es el mismo.
Una aclaración que vale la pena hacer: asyncio no resuelve problemas de CPU. Si tienes un cálculo pesado, un modelo de ML o procesamiento de imágenes, necesitas multiprocessing o el modo free-threaded de Python 3.13. asyncio es para I/O-bound: peticiones de red, consultas a BBDD, lectura de ficheros lentos.
Los dos conceptos que lo sostienen todo:
async def: define una coroutine, es decir, una función que puede suspenderse en medio de su ejecución.await: suspende la coroutine actual y devuelve el control al event loop hasta que la operación termina.
async def obtener_datos(url):
async with httpx.AsyncClient() as client:
respuesta = await client.get(url)
return respuesta.json()
asyncio.run() y el event loop
La forma correcta de arrancar una aplicación async en 2026 es asyncio.run(). Inicia el event loop, ejecuta la coroutine que le pasas y cierra el loop al terminar.
import asyncio
async def main():
print("Hola desde asyncio")
asyncio.run(main())
Lo que no deberías hacer es el patrón antiguo:
# Evitar en código nuevo
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Este patrón está deprecado en algunos contextos desde Python 3.10. Si ves código así en un proyecto heredado, es señal de que fue escrito antes de Python 3.7. Sustitúyelo por asyncio.run() y listo.
TaskGroup: structured concurrency desde Python 3.11
asyncio.TaskGroup es la forma moderna de lanzar varias tareas en paralelo y esperar a que todas terminen. Apareció en Python 3.11 y resuelve un problema real: con gather() era fácil perder excepciones o dejar tareas huérfanas si algo fallaba.
import asyncio
async def tarea(nombre, segundos):
await asyncio.sleep(segundos)
print(f"{nombre} terminada")
return nombre
async def main():
async with asyncio.TaskGroup() as tg:
t1 = tg.create_task(tarea("A", 1))
t2 = tg.create_task(tarea("B", 2))
t3 = tg.create_task(tarea("C", 0.5))
# Aquí ya han terminado las tres
print(t1.result(), t2.result(), t3.result())
asyncio.run(main())
El bloque async with no termina hasta que todas las tareas dentro han completado. Si una lanza una excepción, TaskGroup cancela las demás y vuelve a lanzar la excepción. No se pierde nada en silencio, que es exactamente lo que pasaba antes si no eras cuidadoso con gather().
Para proyectos que todavía usan Python 3.10 o anterior, asyncio.gather() sigue siendo la alternativa. Pero si controlas la versión de Python, TaskGroup es más claro y más seguro.
asyncio.timeout(): cancelar operaciones lentas
Otro añadido de Python 3.11. Antes, para limitar el tiempo de una operación tenías que usar asyncio.wait_for(coro, timeout=5.0), que funciona bien pero mezcla la lógica de la operación con la del timeout. El nuevo context manager es más limpio:
import asyncio
async def operacion_lenta():
await asyncio.sleep(10)
return "resultado"
async def main():
try:
async with asyncio.timeout(3.0):
resultado = await operacion_lenta()
except TimeoutError:
print("La operación tardó demasiado")
asyncio.run(main())
Si la operación no termina en 3 segundos, se cancela y se lanza TimeoutError. Puedes capturarlo como ves arriba o dejarlo subir para que lo gestione un nivel superior.
Se puede combinar con TaskGroup sin problema:
async with asyncio.timeout(10.0):
async with asyncio.TaskGroup() as tg:
tg.create_task(descarga_a())
tg.create_task(descarga_b())
Si el grupo completo tarda más de 10 segundos, se cancela todo.
asyncio.to_thread(): integrar código bloqueante
El punto débil de asyncio es que cualquier llamada síncrona que tarde tiempo bloquea el event loop entero. Si llamas a requests.get(url) directamente dentro de una coroutine, todas las demás tareas se quedan paradas mientras espera la respuesta.
asyncio.to_thread() resuelve esto mandando la función bloqueante a un thread pool, sin bloquear el event loop:
import asyncio
import requests
async def descargar(url):
# requests es síncrono, pero to_thread evita que bloquee el loop
respuesta = await asyncio.to_thread(requests.get, url)
return respuesta.json()
async def main():
urls = [
"https://api.ejemplo.com/datos/1",
"https://api.ejemplo.com/datos/2",
"https://api.ejemplo.com/datos/3",
]
async with asyncio.TaskGroup() as tg:
tareas = [tg.create_task(descargar(url)) for url in urls]
asyncio.run(main())
Para ficheros, lo equivalente es la librería aiofiles, que tiene una API async nativa y es más eficiente que to_thread con archivos grandes:
import aiofiles
async def leer_fichero(ruta):
async with aiofiles.open(ruta, 'r') as f:
contenido = await f.read()
return contenido
asyncio.gather(): varias coroutines a la vez
gather() es la forma clásica de ejecutar varias coroutines concurrentemente y recoger sus resultados:
import asyncio
async def fetch(n):
await asyncio.sleep(n)
return n * 2
async def main():
resultados = await asyncio.gather(
fetch(1),
fetch(2),
fetch(3),
)
print(resultados) # [2, 4, 6]
asyncio.run(main())
Por defecto, si una coroutine lanza una excepción, gather() la propaga y cancela las demás. Si prefieres que las tareas independientes sigan aunque alguna falle, usa return_exceptions=True:
resultados = await asyncio.gather(
fetch(1),
fetch_que_puede_fallar(),
fetch(3),
return_exceptions=True,
)
# resultados[1] será una instancia de Exception, no se propaga
for r in resultados:
if isinstance(r, Exception):
print(f"Error: {r}")
else:
print(f"OK: {r}")
Para Python 3.11+, TaskGroup suele ser preferible porque el manejo de excepciones es más predecible. Pero gather() sigue siendo útil cuando necesitas return_exceptions=True o trabajas con versiones antiguas.
asyncio.Queue y el patrón productor-consumidor
Cuando tienes un productor (que genera trabajo) y varios consumidores (que lo procesan), asyncio.Queue es la herramienta adecuada. Controla el flujo sin locks ni condiciones de carrera.
import asyncio
async def productor(cola, urls):
for url in urls:
await cola.put(url)
# Señal de que no hay más trabajo
await cola.put(None)
async def consumidor(id, cola):
while True:
url = await cola.get()
if url is None:
# Devolver la señal para los demás consumidores
await cola.put(None)
break
print(f"Consumidor {id} procesando {url}")
await asyncio.sleep(0.5) # Simula trabajo
async def main():
cola = asyncio.Queue(maxsize=10)
urls = [f"https://api.ejemplo.com/{i}" for i in range(20)]
async with asyncio.TaskGroup() as tg:
tg.create_task(productor(cola, urls))
for i in range(3): # 3 consumidores en paralelo
tg.create_task(consumidor(i, cola))
asyncio.run(main())
await cola.put(item) espera si la cola está llena. await cola.get() espera si está vacía. El parámetro maxsize controla la presión entre productor y consumidores: si el productor va muy por delante, se frena solo.
Limitar concurrencia con asyncio.Semaphore
Cuando no necesitas una cola pero sí quieres limitar cuántas operaciones se ejecutan a la vez, asyncio.Semaphore es más sencillo:
import asyncio
import httpx
async def descargar_con_limite(client, url, sem):
async with sem:
respuesta = await client.get(url)
return respuesta.text
async def main():
urls = [f"https://api.ejemplo.com/{i}" for i in range(100)]
sem = asyncio.Semaphore(10) # Máximo 10 peticiones simultáneas
async with httpx.AsyncClient() as client:
tareas = [descargar_con_limite(client, url, sem) for url in urls]
resultados = await asyncio.gather(*tareas)
asyncio.run(main())
Sin el semáforo, las 100 peticiones se lanzarían al mismo tiempo. Con él, nunca hay más de 10 activas. Útil para no saturar una API externa o la propia red.
HTTP async: aiohttp y httpx
Para hacer peticiones HTTP asíncronas tienes dos opciones principales.
aiohttp
El cliente más maduro del ecosistema async de Python. API algo verbosa pero muy completa:
import asyncio
import aiohttp
async def obtener(session, url):
async with session.get(url) as respuesta:
return await respuesta.json()
async def main():
async with aiohttp.ClientSession() as session:
datos = await asyncio.gather(
obtener(session, "https://api.ejemplo.com/a"),
obtener(session, "https://api.ejemplo.com/b"),
)
print(datos)
asyncio.run(main())
httpx
Más moderno, con API similar a requests pero con soporte async. Si ya conoces requests, httpx tiene una curva de aprendizaje casi plana:
import asyncio
import httpx
async def main():
async with httpx.AsyncClient() as client:
r1, r2 = await asyncio.gather(
client.get("https://api.ejemplo.com/a"),
client.get("https://api.ejemplo.com/b"),
)
print(r1.json(), r2.json())
asyncio.run(main())
La ventaja de httpx es que puedes usar el mismo cliente en modo síncrono (httpx.Client) y async (httpx.AsyncClient), lo que facilita la migración gradual de código existente. Para scraping o llamadas masivas a APIs, con asyncio + httpx puedes gestionar 100 peticiones en el tiempo que una sola tardaría con requests en modo síncrono.
Un par de cosas que conviene saber antes de lanzarte
Primero: no todo tiene que ser async. Si tu script hace tres peticiones y ya está, la complejidad añadida de asyncio no vale la pena. La concurrencia async tiene sentido cuando tienes docenas o cientos de operaciones de I/O concurrentes.
Segundo: depurar código async es más complicado que código síncrono. Los stack traces pueden ser confusos y los errores de cancelación no siempre son obvios. Activa el modo debug de asyncio durante el desarrollo:
asyncio.run(main(), debug=True)
Esto activa advertencias para coroutines no esperadas, tasks lentas y otros problemas comunes que en modo normal pasan en silencio.
Si quieres ver asyncio aplicado a algo más concreto, mira cómo se usa en asyncio en práctica: agentes ReAct multi-modelo con Python o cómo afecta al análisis de detectar memory leaks en código Python async.
Imagen: Pexels / Tiger Lily
