Celery en Python: tareas asíncronas, Redis como broker, Beat scheduler y monitorización

Celery es la biblioteca de colas de tareas más popular del ecosistema Python. Permite ejecutar trabajo de forma asíncrona (enviar emails, generar PDFs, procesar imágenes) sin bloquear la respuesta HTTP, y también programar tareas periódicas. Este tutorial cubre el flujo completo con Redis como broker: instalación, definición de tareas, envío, reintentos, flujos de trabajo y monitorización.

Instalación

# pip install celery[redis]
# pip install redis
# También necesitas Redis corriendo: docker run -d -p 6379:6379 redis:7

Configuración básica

# tareas.py
from celery import Celery

app = Celery(
    'miapp',
    broker='redis://localhost:6379/0',    # donde se almacenan las tareas
    backend='redis://localhost:6379/1',   # donde se guardan los resultados
)

app.conf.update(
    task_serializer='json',
    result_serializer='json',
    accept_content=['json'],
    timezone='Europe/Madrid',
    enable_utc=True,
    task_track_started=True,
    result_expires=3600,   # los resultados se borran tras 1 hora
)

Definir tareas con @app.task

# tareas.py (continuación)
import time
import logging

logger = logging.getLogger(__name__)


@app.task
def sumar(a: int, b: int) -> int:
    return a + b


@app.task
def procesar_imagen(ruta: str, ancho: int, alto: int) -> str:
    """Redimensiona una imagen (simulado)."""
    logger.info(f"Procesando {ruta} ? {ancho}x{alto}")
    time.sleep(2)   # simula trabajo real
    return f"{ruta}_thumb_{ancho}x{alto}.jpg"


@app.task(name='tareas.enviar_email')
def enviar_email(destinatario: str, asunto: str, cuerpo: str) -> bool:
    """Envía un email (simulado)."""
    logger.info(f"Enviando email a {destinatario}: {asunto}")
    time.sleep(0.5)
    return True

Lanzar el worker

# En la terminal:
# celery -A tareas worker --loglevel=info --concurrency=4

delay() y apply_async(): encolar tareas

from tareas import sumar, procesar_imagen, enviar_email

# delay() es azúcar sobre apply_async()
resultado = sumar.delay(3, 4)
print(resultado.id)       # UUID de la tarea
print(resultado.state)    # PENDING, STARTED, SUCCESS, FAILURE

# Esperar el resultado (bloqueante)
valor = resultado.get(timeout=10)
print(valor)   # 7

# apply_async() permite opciones adicionales
procesar_imagen.apply_async(
    args=['/uploads/foto.jpg', 800, 600],
    countdown=30,        # ejecutar en 30 segundos
    expires=3600         # caducar si no se ejecuta en 1 hora
)

# Encolar en una cola específica
enviar_email.apply_async(
    args=['[email protected]', 'Bienvenido', 'Hola...'],
    queue='emails'
)

Reintentos automáticos

import httpx
from tareas import app

@app.task(
    bind=True,
    max_retries=5,
    default_retry_delay=60,     # segundos entre intentos
    autoretry_for=(httpx.RequestError,),
    retry_backoff=True,         # espera exponencial
    retry_backoff_max=600       # máximo 10 minutos
)
def llamar_api_externa(self, url: str) -> dict:
    try:
        with httpx.Client(timeout=10) as cliente:
            resp = cliente.get(url)
            resp.raise_for_status()
            return resp.json()
    except httpx.HTTPStatusError as exc:
        if exc.response.status_code == 429:   # rate limit
            raise self.retry(countdown=120, exc=exc)
        raise

Chains y groups: flujos de trabajo

from celery import chain, group, chord
from tareas import procesar_imagen, enviar_email, sumar

# Chain: ejecutar en secuencia (la salida de una es la entrada de la siguiente)
flujo = chain(
    procesar_imagen.s('/uploads/foto.jpg', 800, 600),
    enviar_email.s('[email protected]', 'Imagen procesada')
)
resultado = flujo.delay()

# Group: ejecutar en paralelo
grupo = group(
    sumar.s(1, 2),
    sumar.s(3, 4),
    sumar.s(5, 6)
)
resultado = grupo.delay()
print(resultado.get())   # [3, 7, 11]

# Chord: grupo en paralelo + callback cuando todos terminan
from celery import chord

def suma_total(resultados: list) -> int:
    return sum(resultados)

acorde = chord(
    group(sumar.s(i, i) for i in range(5)),
    sumar.s(0)   # suma todos los resultados con 0 adicional
)
# resultado final cuando todos los subtareas terminen

Celery Beat: tareas periódicas

# celeryconfig.py o dentro de app.conf
from celery.schedules import crontab

app.conf.beat_schedule = {
    'limpiar-sesiones-caducadas': {
        'task': 'tareas.limpiar_sesiones',
        'schedule': crontab(hour=3, minute=0),   # a las 3:00 cada día
    },
    'informe-ventas-semanal': {
        'task': 'tareas.generar_informe_ventas',
        'schedule': crontab(hour=8, minute=0, day_of_week='monday'),
    },
    'ping-salud-cada-minuto': {
        'task': 'tareas.ping_salud',
        'schedule': 60.0,   # cada 60 segundos
        'args': ('production',),
    },
}

# Lanzar el scheduler Beat:
# celery -A tareas beat --loglevel=info

Monitorizar con Flower

# pip install flower
# celery -A tareas flower --port=5555
# Abre http://localhost:5555 en el navegador:
# - Dashboard de workers activos y su estado
# - Cola de tareas pendientes, en ejecución y completadas
# - Detalles de cada tarea: argumentos, resultado, tiempo
# - API REST para consultar el estado programáticamente

Buenas prácticas

  • Las tareas deben ser idempotentes: ejecutarlas dos veces no debe causar daño, porque Celery puede reencolarlas en caso de fallo del worker.
  • Pasa IDs, no objetos: en lugar de enviar_email.delay(objeto_usuario), pasa enviar_email.delay(usuario_id) y carga el objeto dentro de la tarea.
  • Usa colas separadas para tareas de distinta prioridad o tipo (emails, imagenes, informes).
  • Configura task_acks_late=True y worker_prefetch_multiplier=1 para tareas largas para evitar que un worker muerto pierda tareas.

COMPARTE ESTE ARTÍCULO

COMPARTIR EN FACEBOOK
COMPARTIR EN TWITTER
COMPARTIR EN LINKEDIN
COMPARTIR EN WHATSAPP