Colas internas en Quartup con los Patrones Outbox + Polling
Diseño propuesto por Deepseek 6/8/2025:
Arquitectura Final: Outbox + Polling + Exponential Backoff
1. Estructura de la Base de Datos
CREATE TABLE outbox (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
message_type VARCHAR(50) NOT NULL, -- Ej: "enviar_email", "procesar_pedido"
payload JSONB NOT NULL, -- Datos del mensaje (ej: {"to": "user@example.com"})
intentos INT DEFAULT 0,
estado VARCHAR(20) DEFAULT 'pending', -- 'pending', 'processed', 'failed'
created_at TIMESTAMPTZ DEFAULT NOW(),
next_retry_at TIMESTAMPTZ, -- Para reintentos espaciados
respuesta JSONB NULL -- Opcional: guardar respuesta del consumidor
);
-- Índices para mejorar el polling:
CREATE INDEX idx_outbox_pending ON outbox (estado, next_retry_at) WHERE estado = 'pending';
2. Flujo de la Aplicación (Publicación Síncrona)
function publicarMensajeConOutbox(PDO $db, string $tipo, array $payload, int $timeout = 10): array {
$db->beginTransaction();
try {
// 1. Guardar el mensaje en la outbox (dentro de la transacción principal)
$outboxId = uniqid();
$stmt = $db->prepare("
INSERT INTO outbox (id, message_type, payload, next_retry_at)
VALUES (?, ?, ?, NOW())
");
$stmt->execute([$outboxId, $tipo, json_encode($payload)]);
// 2. Commit (el mensaje ya es consistente en la DB)
$db->commit();
// 3. Opcional: Esperar respuesta (polling)
if ($timeout > 0) {
return esperarRespuesta($db, $outboxId, $timeout);
}
return ['status' => 'enqueued', 'outbox_id' => $outboxId];
} catch (Exception $e) {
$db->rollBack();
throw new RuntimeException("Error al publicar mensaje: " . $e->getMessage());
}
}
function esperarRespuesta(PDO $db, string $outboxId, int $timeout): array {
$startTime = time();
$pollInterval = 0.5; // 500ms entre checks
while (time() - $startTime < $timeout) {
$stmt = $db->prepare("SELECT estado, respuesta FROM outbox WHERE id = ?");
$stmt->execute([$outboxId]);
$row = $stmt->fetch(PDO::FETCH_ASSOC);
if ($row['estado'] === 'processed') {
return json_decode($row['respuesta'], true);
} elseif ($row['estado'] === 'failed') {
throw new RuntimeException("El mensaje falló al procesarse");
}
usleep($pollInterval * 1_000_000); // Esperar antes de reintentar
}
throw new RuntimeException("Timeout esperando respuesta");
}
3. Worker de Procesamiento (Mejorado)
function workerOutbox(PDO $db, QueueExterno $queue) {
while (true) {
// 1. Obtener mensajes pendientes (con SKIP LOCKED para multi-worker)
$stmt = $db->query("
SELECT * FROM outbox
WHERE estado = 'pending' AND next_retry_at <= NOW()
ORDER BY created_at
LIMIT 10
FOR UPDATE SKIP LOCKED
");
$mensajes = $stmt->fetchAll(PDO::FETCH_ASSOC);
if (empty($mensajes)) {
sleep(1); // Evitar CPU al 100%
continue;
}
// 2. Procesar cada mensaje
foreach ($mensajes as $msg) {
try {
// Enviar al gestor de colas externo (ej: RabbitMQ)
$respuesta = $queue->publish($msg['message_type'], $msg['payload']);
// Actualizar outbox (éxito)
$db->prepare("
UPDATE outbox
SET estado = 'processed',
respuesta = ?,
intentos = intentos + 1
WHERE id = ?
")->execute([json_encode($respuesta), $msg['id']]);
} catch (Exception $e) {
// Calcular próximo reintento (exponential backoff)
$intentos = $msg['intentos'] + 1;
$nextRetry = date('Y-m-d H:i:s', time() + min(5 * pow(2, $intentos), 300)); // Máx 5 min
$db->prepare("
UPDATE outbox
SET intentos = ?,
next_retry_at = ?,
estado = ?
WHERE id = ?
")->execute([
$intentos,
$nextRetry,
($intentos >= 3) ? 'failed' : 'pending',
$msg['id']
]);
}
}
}
}
Mejoras Clave Implementadas
- UUIDs únicos para evitar colisiones.
- Índices optimizados para queries de polling.
- SKIP LOCKED para soportar múltiples workers.
- Exponential Backoff con límite máximo (ej: 5 min).
- Respuestas almacenadas en la outbox (para síncrono).
- Timeouts configurables en operaciones síncronas.
¿Cómo Ejecutarlo?
-
Publicador (API/CLI):
$db = new PDO('pgsql:host=localhost;dbname=tu_db', 'user', 'pass'); $response = publicarMensajeConOutbox( $db, 'enviar_email', ['to' => 'user@example.com'], 10 // Timeout para esperar respuesta ); -
Worker (Supervisor/Systemd):
# Ejecutar como demonio (con reinicio automático) supervisorctl start outbox-workerConfiguración de Supervisor:
[program:outbox-worker] command=php /ruta/al/worker.php autorestart=true
Escalabilidad Avanzada
- Multi-Worker: Lanza varias instancias del worker (cada una procesará mensajes distintos gracias a
SKIP LOCKED). - Priorización: Usa
message_typepara priorizar ciertos mensajes. - Dead Letter Queue: Mueve mensajes fallidos a otra tabla para análisis.
Conclusión
Este diseño te da:
✅ Consistencia transaccional (Outbox).
✅ Tolerancia a fallos (reintentos inteligentes).
✅ Soporte síncrono (cuando necesario).
✅ Escalabilidad (multi-worker).