Colas de trabajos en PHP: background jobs con Redis, tabla de BD o Beanstalkd

Algunas tareas son demasiado lentas para ejecutarlas durante una petición HTTP: enviar emails, procesar imágenes, generar PDFs, llamar a APIs externas lentas, sincronizar datos… Si las ejecutas en el mismo ciclo request-response, el usuario espera. La solución es desacoplarlas: la petición encola el trabajo y un worker en background lo procesa de forma asíncrona.

Opción 1: cola con tabla de base de datos

El enfoque más portable: una tabla jobs actúa como cola; un worker PHP hace polling y procesa los trabajos uno a uno.

<?php
// Estructura de la tabla
// CREATE TABLE jobs (
//   id         INT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
//   tipo       VARCHAR(100) NOT NULL,
//   payload    JSON NOT NULL,
//   intentos   TINYINT DEFAULT 0,
//   siguiente  DATETIME DEFAULT CURRENT_TIMESTAMP,
//   completado TINYINT DEFAULT 0,
//   error      TEXT NULL
// );

// Encolar un trabajo (desde el controlador web)
function encolar(PDO $pdo, string $tipo, array $payload): void {
    $stmt = $pdo->prepare(
        "INSERT INTO jobs (tipo, payload) VALUES (:tipo, :payload)"
    );
    $stmt->execute([
        ':tipo'    => $tipo,
        ':payload' => json_encode($payload),
    ]);
}

// Uso
encolar($pdo, 'enviar_email', [
    'destinatario' => '[email protected]',
    'asunto'       => 'Bienvenido',
    'plantilla'    => 'bienvenida',
]);

Worker básico para la cola BD

<?php
// worker.php — ejecutar con: php worker.php
require __DIR__ . '/vendor/autoload.php';

$pdo = new PDO(getenv('DB_DSN'), getenv('DB_USER'), getenv('DB_PASS'));
$pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);

echo "Worker iniciado. Procesando jobs...n";

while (true) {
    // Obtener y bloquear un job (SELECT ... FOR UPDATE SKIP LOCKED en MySQL 8)
    $pdo->beginTransaction();
    $stmt = $pdo->query(
        "SELECT * FROM jobs
         WHERE completado = 0
           AND siguiente <= NOW()
           AND intentos < 3
         ORDER BY siguiente
         LIMIT 1
         FOR UPDATE SKIP LOCKED"
    );
    $job = $stmt->fetch(PDO::FETCH_ASSOC);

    if (!$job) {
        $pdo->rollBack();
        sleep(2); // No hay jobs; esperar antes de volver a comprobar
        continue;
    }

    // Marcar como en proceso
    $pdo->prepare("UPDATE jobs SET intentos = intentos + 1 WHERE id = ?")->execute([$job['id']]);
    $pdo->commit();

    try {
        procesarJob($job['tipo'], json_decode($job['payload'], true));

        $pdo->prepare("UPDATE jobs SET completado = 1 WHERE id = ?")->execute([$job['id']]);
        echo "Job {$job['id']} ({$job['tipo']}) completadon";
    } catch (Exception $e) {
        // Programar reintento con backoff exponencial
        $siguiente = date('Y-m-d H:i:s', time() + (2 ** $job['intentos']) * 60);
        $pdo->prepare("UPDATE jobs SET siguiente = ?, error = ? WHERE id = ?")->execute([
            $siguiente, $e->getMessage(), $job['id']
        ]);
        echo "Job {$job['id']} fallido: {$e->getMessage()}n";
    }
}

function procesarJob(string $tipo, array $payload): void {
    match($tipo) {
        'enviar_email' => enviarEmail($payload),
        'procesar_imagen' => procesarImagen($payload),
        default => throw new RuntimeException("Tipo de job desconocido: $tipo"),
    };
}

Opción 2: cola con Redis (LPUSH/BRPOP)

Redis es más rápido y escalable que una tabla SQL para colas de alta frecuencia. BRPOP bloquea hasta que llega un elemento, sin polling:

<?php
// Encolar (desde el controlador web)
function encolarRedis(Redis $redis, string $cola, array $job): void {
    $redis->lPush($cola, json_encode($job));
}

encolarRedis($redis, 'jobs:email', [
    'tipo'         => 'enviar_email',
    'destinatario' => '[email protected]',
]);

// Worker Redis
$redis = new Redis();
$redis->connect(getenv('REDIS_HOST', '127.0.0.1'));

echo "Worker Redis escuchando...n";
while (true) {
    // BRPOP bloquea máx 5 segundos; devuelve [nombreCola, valor] o null
    $item = $redis->brPop(['jobs:email', 'jobs:imagenes'], 5);
    if ($item === null) continue;

    [, $payload] = $item;
    $job = json_decode($payload, true);

    try {
        procesarJob($job['tipo'], $job);
        echo "Job procesado: {$job['tipo']}n";
    } catch (Exception $e) {
        // Reencolar con contador de intentos o mover a dead-letter queue
        $job['intentos'] = ($job['intentos'] ?? 0) + 1;
        if ($job['intentos'] < 3) {
            $redis->lPush('jobs:email', json_encode($job));
        } else {
            $redis->lPush('jobs:dead', json_encode($job));
        }
    }
}

Backoff exponencial en reintentos

<?php
function calcularSiguienteIntento(int $intentos): DateTimeImmutable {
    // 1 min, 2 min, 4 min, 8 min, 16 min, máx 60 min
    $segundos = min(60 * 60, (2 ** $intentos) * 60);
    return new DateTimeImmutable("+{$segundos} seconds");
}

echo calcularSiguienteIntento(0)->format('H:i:s'); // +1 min
echo calcularSiguienteIntento(1)->format('H:i:s'); // +2 min
echo calcularSiguienteIntento(5)->format('H:i:s'); // +32 min

Varios workers en paralelo

Para procesar varios jobs simultáneamente lanza varios procesos del mismo worker. Con supervisord (o systemd) es sencillo:

# /etc/supervisor/conf.d/worker.conf
[program:php-worker]
command         = php /var/www/mi-app/worker.php
numprocs        = 4          ; 4 workers en paralelo
process_name    = %(program_name)s_%(process_num)02d
autostart       = true
autorestart     = true
redirect_stderr = true
stdout_logfile  = /var/log/worker.log

Con Redis y BRPOP, varios workers escuchan la misma cola y cada item es entregado a un solo worker (competing consumers). Con la tabla BD y FOR UPDATE SKIP LOCKED funciona igual: cada worker toma un job distinto sin colisiones.

COMPARTE ESTE ARTÍCULO

COMPARTIR EN FACEBOOK
COMPARTIR EN TWITTER
COMPARTIR EN LINKEDIN
COMPARTIR EN WHATSAPP