Celery es la biblioteca de colas de tareas más popular del ecosistema Python. Permite ejecutar trabajo de forma asíncrona (enviar emails, generar PDFs, procesar imágenes) sin bloquear la respuesta HTTP, y también programar tareas periódicas. Este tutorial cubre el flujo completo con Redis como broker: instalación, definición de tareas, envío, reintentos, flujos de trabajo y monitorización.
Instalación
# pip install celery[redis] # pip install redis # También necesitas Redis corriendo: docker run -d -p 6379:6379 redis:7
Configuración básica
# tareas.py
from celery import Celery
app = Celery(
'miapp',
broker='redis://localhost:6379/0', # donde se almacenan las tareas
backend='redis://localhost:6379/1', # donde se guardan los resultados
)
app.conf.update(
task_serializer='json',
result_serializer='json',
accept_content=['json'],
timezone='Europe/Madrid',
enable_utc=True,
task_track_started=True,
result_expires=3600, # los resultados se borran tras 1 hora
)
Definir tareas con @app.task
# tareas.py (continuación)
import time
import logging
logger = logging.getLogger(__name__)
@app.task
def sumar(a: int, b: int) -> int:
return a + b
@app.task
def procesar_imagen(ruta: str, ancho: int, alto: int) -> str:
"""Redimensiona una imagen (simulado)."""
logger.info(f"Procesando {ruta} ? {ancho}x{alto}")
time.sleep(2) # simula trabajo real
return f"{ruta}_thumb_{ancho}x{alto}.jpg"
@app.task(name='tareas.enviar_email')
def enviar_email(destinatario: str, asunto: str, cuerpo: str) -> bool:
"""Envía un email (simulado)."""
logger.info(f"Enviando email a {destinatario}: {asunto}")
time.sleep(0.5)
return True
Lanzar el worker
# En la terminal: # celery -A tareas worker --loglevel=info --concurrency=4
delay() y apply_async(): encolar tareas
from tareas import sumar, procesar_imagen, enviar_email
# delay() es azúcar sobre apply_async()
resultado = sumar.delay(3, 4)
print(resultado.id) # UUID de la tarea
print(resultado.state) # PENDING, STARTED, SUCCESS, FAILURE
# Esperar el resultado (bloqueante)
valor = resultado.get(timeout=10)
print(valor) # 7
# apply_async() permite opciones adicionales
procesar_imagen.apply_async(
args=['/uploads/foto.jpg', 800, 600],
countdown=30, # ejecutar en 30 segundos
expires=3600 # caducar si no se ejecuta en 1 hora
)
# Encolar en una cola específica
enviar_email.apply_async(
args=['[email protected]', 'Bienvenido', 'Hola...'],
queue='emails'
)
Reintentos automáticos
import httpx
from tareas import app
@app.task(
bind=True,
max_retries=5,
default_retry_delay=60, # segundos entre intentos
autoretry_for=(httpx.RequestError,),
retry_backoff=True, # espera exponencial
retry_backoff_max=600 # máximo 10 minutos
)
def llamar_api_externa(self, url: str) -> dict:
try:
with httpx.Client(timeout=10) as cliente:
resp = cliente.get(url)
resp.raise_for_status()
return resp.json()
except httpx.HTTPStatusError as exc:
if exc.response.status_code == 429: # rate limit
raise self.retry(countdown=120, exc=exc)
raise
Chains y groups: flujos de trabajo
from celery import chain, group, chord
from tareas import procesar_imagen, enviar_email, sumar
# Chain: ejecutar en secuencia (la salida de una es la entrada de la siguiente)
flujo = chain(
procesar_imagen.s('/uploads/foto.jpg', 800, 600),
enviar_email.s('[email protected]', 'Imagen procesada')
)
resultado = flujo.delay()
# Group: ejecutar en paralelo
grupo = group(
sumar.s(1, 2),
sumar.s(3, 4),
sumar.s(5, 6)
)
resultado = grupo.delay()
print(resultado.get()) # [3, 7, 11]
# Chord: grupo en paralelo + callback cuando todos terminan
from celery import chord
def suma_total(resultados: list) -> int:
return sum(resultados)
acorde = chord(
group(sumar.s(i, i) for i in range(5)),
sumar.s(0) # suma todos los resultados con 0 adicional
)
# resultado final cuando todos los subtareas terminen
Celery Beat: tareas periódicas
# celeryconfig.py o dentro de app.conf
from celery.schedules import crontab
app.conf.beat_schedule = {
'limpiar-sesiones-caducadas': {
'task': 'tareas.limpiar_sesiones',
'schedule': crontab(hour=3, minute=0), # a las 3:00 cada día
},
'informe-ventas-semanal': {
'task': 'tareas.generar_informe_ventas',
'schedule': crontab(hour=8, minute=0, day_of_week='monday'),
},
'ping-salud-cada-minuto': {
'task': 'tareas.ping_salud',
'schedule': 60.0, # cada 60 segundos
'args': ('production',),
},
}
# Lanzar el scheduler Beat:
# celery -A tareas beat --loglevel=info
Monitorizar con Flower
# pip install flower # celery -A tareas flower --port=5555 # Abre http://localhost:5555 en el navegador: # - Dashboard de workers activos y su estado # - Cola de tareas pendientes, en ejecución y completadas # - Detalles de cada tarea: argumentos, resultado, tiempo # - API REST para consultar el estado programáticamente
Buenas prácticas
- Las tareas deben ser idempotentes: ejecutarlas dos veces no debe causar daño, porque Celery puede reencolarlas en caso de fallo del worker.
- Pasa IDs, no objetos: en lugar de
enviar_email.delay(objeto_usuario), pasaenviar_email.delay(usuario_id)y carga el objeto dentro de la tarea. - Usa colas separadas para tareas de distinta prioridad o tipo (
emails,imagenes,informes). - Configura
task_acks_late=Trueyworker_prefetch_multiplier=1para tareas largas para evitar que un worker muerto pierda tareas.
