Colas internas en Quartup con los Patrones Outbox + Polling

Diseño propuesto por Deepseek 6/8/2025:


Arquitectura Final: Outbox + Polling + Exponential Backoff

1. Estructura de la Base de Datos

CREATE TABLE outbox (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    message_type VARCHAR(50) NOT NULL,  -- Ej: "enviar_email", "procesar_pedido"
    payload JSONB NOT NULL,             -- Datos del mensaje (ej: {"to": "user@example.com"})
    intentos INT DEFAULT 0,
    estado VARCHAR(20) DEFAULT 'pending', -- 'pending', 'processed', 'failed'
    created_at TIMESTAMPTZ DEFAULT NOW(),
    next_retry_at TIMESTAMPTZ,          -- Para reintentos espaciados
    respuesta JSONB NULL                -- Opcional: guardar respuesta del consumidor
);

-- Índices para mejorar el polling:
CREATE INDEX idx_outbox_pending ON outbox (estado, next_retry_at) WHERE estado = 'pending';

2. Flujo de la Aplicación (Publicación Síncrona)

function publicarMensajeConOutbox(PDO $db, string $tipo, array $payload, int $timeout = 10): array {
    $db->beginTransaction();

    try {
        // 1. Guardar el mensaje en la outbox (dentro de la transacción principal)
        $outboxId = uniqid();
        $stmt = $db->prepare("
            INSERT INTO outbox (id, message_type, payload, next_retry_at)
            VALUES (?, ?, ?, NOW())
        ");
        $stmt->execute([$outboxId, $tipo, json_encode($payload)]);

        // 2. Commit (el mensaje ya es consistente en la DB)
        $db->commit();

        // 3. Opcional: Esperar respuesta (polling)
        if ($timeout > 0) {
            return esperarRespuesta($db, $outboxId, $timeout);
        }

        return ['status' => 'enqueued', 'outbox_id' => $outboxId];
    } catch (Exception $e) {
        $db->rollBack();
        throw new RuntimeException("Error al publicar mensaje: " . $e->getMessage());
    }
}

function esperarRespuesta(PDO $db, string $outboxId, int $timeout): array {
    $startTime = time();
    $pollInterval = 0.5; // 500ms entre checks

    while (time() - $startTime < $timeout) {
        $stmt = $db->prepare("SELECT estado, respuesta FROM outbox WHERE id = ?");
        $stmt->execute([$outboxId]);
        $row = $stmt->fetch(PDO::FETCH_ASSOC);

        if ($row['estado'] === 'processed') {
            return json_decode($row['respuesta'], true);
        } elseif ($row['estado'] === 'failed') {
            throw new RuntimeException("El mensaje falló al procesarse");
        }

        usleep($pollInterval * 1_000_000); // Esperar antes de reintentar
    }

    throw new RuntimeException("Timeout esperando respuesta");
}

3. Worker de Procesamiento (Mejorado)

function workerOutbox(PDO $db, QueueExterno $queue) {
    while (true) {
        // 1. Obtener mensajes pendientes (con SKIP LOCKED para multi-worker)
        $stmt = $db->query("
            SELECT * FROM outbox
            WHERE estado = 'pending' AND next_retry_at <= NOW()
            ORDER BY created_at
            LIMIT 10
            FOR UPDATE SKIP LOCKED
        ");
        $mensajes = $stmt->fetchAll(PDO::FETCH_ASSOC);

        if (empty($mensajes)) {
            sleep(1); // Evitar CPU al 100%
            continue;
        }

        // 2. Procesar cada mensaje
        foreach ($mensajes as $msg) {
            try {
                // Enviar al gestor de colas externo (ej: RabbitMQ)
                $respuesta = $queue->publish($msg['message_type'], $msg['payload']);

                // Actualizar outbox (éxito)
                $db->prepare("
                    UPDATE outbox
                    SET estado = 'processed',
                        respuesta = ?,
                        intentos = intentos + 1
                    WHERE id = ?
                ")->execute([json_encode($respuesta), $msg['id']]);

            } catch (Exception $e) {
                // Calcular próximo reintento (exponential backoff)
                $intentos = $msg['intentos'] + 1;
                $nextRetry = date('Y-m-d H:i:s', time() + min(5 * pow(2, $intentos), 300)); // Máx 5 min

                $db->prepare("
                    UPDATE outbox
                    SET intentos = ?,
                        next_retry_at = ?,
                        estado = ?
                    WHERE id = ?
                ")->execute([
                    $intentos,
                    $nextRetry,
                    ($intentos >= 3) ? 'failed' : 'pending',
                    $msg['id']
                ]);
            }
        }
    }
}

Mejoras Clave Implementadas

  1. UUIDs únicos para evitar colisiones.
  2. Índices optimizados para queries de polling.
  3. SKIP LOCKED para soportar múltiples workers.
  4. Exponential Backoff con límite máximo (ej: 5 min).
  5. Respuestas almacenadas en la outbox (para síncrono).
  6. Timeouts configurables en operaciones síncronas.

¿Cómo Ejecutarlo?

  1. Publicador (API/CLI):

    $db = new PDO('pgsql:host=localhost;dbname=tu_db', 'user', 'pass');
    $response = publicarMensajeConOutbox(
        $db,
        'enviar_email',
        ['to' => 'user@example.com'],
        10 // Timeout para esperar respuesta
    );
  2. Worker (Supervisor/Systemd):

    # Ejecutar como demonio (con reinicio automático)
    supervisorctl start outbox-worker

    Configuración de Supervisor:

    [program:outbox-worker]
    command=php /ruta/al/worker.php
    autorestart=true

Escalabilidad Avanzada


Conclusión

Este diseño te da:

Consistencia transaccional (Outbox).

Tolerancia a fallos (reintentos inteligentes).

Soporte síncrono (cuando necesario).

Escalabilidad (multi-worker).