Saltar a contenido

Worker reliability patterns

Patterns operacionales para que un worker en producción no caiga ni cause incidents en cascada. Cubre retry / backoff / circuit breaker / DLQ / poison messages / graceful degradation.

El problema: workers como single point of failure

Un job worker mal escrito puede: - Retry-bomb un downstream que ya está caído. - Consumir todos los jobs de la cola y luego morir. - Encolar instancias en estado inválido si crashea mid-completion. - Loop infinito sobre un poison message (variable malformada). - DDoS al engine pidiendo jobs sin esperar.

Cada uno tiene un pattern conocido. El SDK provee primitivas; el worker autor las combina.

Pattern 1: Retry con exponential backoff + jitter

Decisión clave: ¿retry en el worker o en el engine?

Tipo de falla Retry en
Network blip al backend externo (HTTP 503) Worker (rápido, 100-500ms)
Backend down después de N retries Engine (job fail, retry policy del BPMN)
Lógica de negocio falla (BPMN error) Ninguno: boundary error event
DB del backend con deadlock Worker (rápido, sin consumir retry del job)

Implementación

func chargePayment(ctx context.Context, job *wfclient.Job) (map[string]any, error) {
    var result *stripe.Charge
    err := retry.Do(
        func() error {
            var err error
            result, err = stripe.Charge(ctx, ...)
            return err
        },
        retry.Attempts(3),
        retry.Delay(100*time.Millisecond),
        retry.MaxDelay(2*time.Second),
        retry.DelayType(retry.BackOffDelay),  // exponential
        retry.RetryIf(func(err error) bool {
            return isTransient(err)  // 5xx, network, timeout
        }),
        retry.Context(ctx),
    )

    if err != nil {
        if isBPMNError(err) {
            return nil, wfclient.BPMNError(bpmnCode(err), err.Error())
        }
        // Transitorio agotado → engine decide qué hacer (retry policy)
        return nil, fmt.Errorf("charge failed after 3 worker retries: %w", err)
    }
    return map[string]any{"chargeId": result.ID}, nil
}

Jitter

Sin jitter: si 1000 workers fallan al mismo segundo, todos reintenten al segundo siguiente = thundering herd.

retry.Delay(100*time.Millisecond)
retry.MaxJitter(50*time.Millisecond)  // ±50ms aleatorio

Implementación standard: delay * (1 + random(-jitter_factor, +jitter_factor)).

Reglas

  • Max 3-5 retries worker-side. Más es problema del engine (BPMN retry policy).
  • Max delay 5-10s worker-side. Más bloquea el job slot.
  • Idempotency key obligatoria: el job se reintenta a nivel engine también; sin idempotency, doble charge.
  • Respect context cancellation: si el contexto se cancela (timeout / shutdown), salir del retry loop.

Pattern 2: Circuit breaker

Si el backend externo está completamente down, NO sigas pidiéndole. Falla rápido.

Estados

stateDiagram-v2
    [*] --> CLOSED
    CLOSED: CLOSED (normal)
    OPEN: OPEN (rechaza todo)
    HALF_OPEN: HALF-OPEN (deja pasar 1 request)
    CLOSED --> OPEN: N failures
    OPEN --> HALF_OPEN: después de timeout
    HALF_OPEN --> CLOSED: success
    HALF_OPEN --> OPEN: failure

Implementación (sony/gobreaker)

import "github.com/sony/gobreaker"

var stripeBreaker = gobreaker.NewCircuitBreaker(gobreaker.Settings{
    Name:        "stripe-api",
    MaxRequests: 1,            // half-open
    Interval:    60 * time.Second,
    Timeout:     30 * time.Second,
    ReadyToTrip: func(counts gobreaker.Counts) bool {
        return counts.ConsecutiveFailures > 5
    },
    OnStateChange: func(name string, from, to gobreaker.State) {
        slog.Warn("circuit breaker state change",
            "breaker", name, "from", from, "to", to)
        // Métrica: circuit_breaker_state{name="stripe-api"}
    },
})

func chargePayment(ctx context.Context, job *wfclient.Job) (map[string]any, error) {
    result, err := stripeBreaker.Execute(func() (interface{}, error) {
        return stripe.Charge(ctx, ...)
    })

    if err == gobreaker.ErrOpenState {
        // Circuit breaker abierto → fail rápido al engine sin consumir retry del job
        return nil, wfclient.FailWithRetry("circuit open, will retry", time.Now().Add(30*time.Second))
    }
    if err != nil {
        return nil, err
    }
    return map[string]any{"chargeId": result.(*stripe.Charge).ID}, nil
}

Trade-off: FailWithRetry vs fail normal

  • fail normal: consume retry del job. Después de N retries → incident.
  • FailWithRetry(retryAt=...): NO consume retry. Engine re-encola el job en el tiempo dado.

FailWithRetry es ideal cuando el problema no es del proceso sino del entorno (downstream down). El job no es "malo"; sólo necesita esperar.

Pattern 3: Bulkhead / Resource isolation

Problema: un worker handler que llama a Stripe Y a la DB. Si Stripe está lento, las goroutines del worker quedan ocupadas. La DB pasa a estar bien pero el worker no puede tomar jobs nuevos.

Solución: limitar concurrency por dependencia.

var stripeLimiter = semaphore.NewWeighted(10)  // max 10 in-flight a Stripe

func chargePayment(ctx context.Context, job *wfclient.Job) (map[string]any, error) {
    if err := stripeLimiter.Acquire(ctx, 1); err != nil {
        return nil, wfclient.FailWithRetry("stripe-limiter full", time.Now().Add(5*time.Second))
    }
    defer stripeLimiter.Release(1)

    return stripe.Charge(ctx, ...)
}

Bulkheads más sofisticados: thread pool aislado por downstream (más relevante en Java).

Pattern 4: Timeouts en cascada

Cada call externo necesita timeout. Si no hay → goroutine leak, eventual OOM.

func chargePayment(ctx context.Context, job *wfclient.Job) (map[string]any, error) {
    // Worker timeout total: 30s (configurado en WorkerOptions)
    // Reservar 5s buffer para complete del engine
    ctxWithTimeout, cancel := context.WithTimeout(ctx, 25*time.Second)
    defer cancel()

    // Cada dependencia con su sub-timeout
    stripeCtx, _ := context.WithTimeout(ctxWithTimeout, 10*time.Second)
    result, err := stripe.Charge(stripeCtx, ...)
    if err != nil {
        return nil, err
    }

    dbCtx, _ := context.WithTimeout(ctxWithTimeout, 5*time.Second)
    if err := db.SaveTransaction(dbCtx, result); err != nil {
        // Stripe ya cobró → compensation o reconciliation job
        return nil, fmt.Errorf("db save failed after stripe charge: %w", err)
    }
    return ..., nil
}

Regla: cada call externo debe tener timeout. Sin timeout es bug.

Pattern 5: Idempotency

Crítico: el engine reintenta jobs (al-least-once). Si tu handler no es idempotent, vas a tener doble effect.

Estrategias por tipo de dependencia

Backend Idempotency strategy
Stripe / Payment provider Idempotency-Key header (provider-side)
Postgres (insert) ON CONFLICT DO NOTHING con unique key
Email (SMTP) Message-ID único con tracking en DB
Webhook (POST) El receiver debe ser idempotent (you can't control)
External API custom UUID en request body, deduplicate server-side

Idempotency key estable

El job.Key es ideal: único por job, persiste a través de retries.

idempotencyKey := fmt.Sprintf("wf-job-%d", job.Key)
stripe.Charge(ctx, customer, amount, stripe.WithIdempotencyKey(idempotencyKey))

Cuidado: si el job se reintenta después de un partial success (stripe cobró, DB no se grabó), el segundo intento NO cobra de nuevo (mismo idempotency key) pero SÍ graba en DB.

Patrón "log first, act later"

CREATE TABLE worker_idempotency (
    job_key BIGINT PRIMARY KEY,
    completed_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    result JSONB
);
func handler(ctx context.Context, job *wfclient.Job) (map[string]any, error) {
    // Check cache
    var result map[string]any
    err := db.QueryRow(ctx,
        "SELECT result FROM worker_idempotency WHERE job_key=$1",
        job.Key).Scan(&result)
    if err == nil {
        return result, nil  // ya ejecutado
    }
    if !errors.Is(err, sql.ErrNoRows) {
        return nil, err
    }

    // Execute
    result, err = doActualWork(ctx, job)
    if err != nil {
        return nil, err
    }

    // Cache result
    _, err = db.Exec(ctx,
        "INSERT INTO worker_idempotency (job_key, result) VALUES ($1, $2)",
        job.Key, result)
    if err != nil {
        slog.Warn("idempotency cache insert failed", "err", err)
        // Continúa: result válido aunque cache falle
    }
    return result, nil
}

TTL via partition pruning (los job_keys vienen ordenados por tiempo).

Pattern 6: Poison message handling

Poison message: un job que SIEMPRE falla, no por bug del backend sino por data malformada (variable corrupta, JSON inválido, etc.).

Detección

func handler(ctx context.Context, job *wfclient.Job) (map[string]any, error) {
    amount, ok := job.Variables["amount"].(float64)
    if !ok {
        // Datos malformados → NO retry; incident inmediato
        return nil, wfclient.IncidentError(fmt.Sprintf("amount variable invalid: %v", job.Variables["amount"]))
    }
    if amount < 0 {
        return nil, wfclient.IncidentError("amount negative; data corruption")
    }
    // ... resto del handler
}

IncidentError NO consume retry. Crea incident inmediato para review humana.

DLQ (Dead Letter Queue) opcional

Si querés un sink centralizado de poison messages para análisis batch:

CREATE TABLE poison_jobs (
    job_key BIGINT PRIMARY KEY,
    job_type TEXT NOT NULL,
    process_instance_key BIGINT,
    error_message TEXT,
    variables JSONB,
    received_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
if errors.Is(err, ErrPoisonMessage) {
    db.Exec(ctx,
        "INSERT INTO poison_jobs (...) VALUES (...)",
        job.Key, job.Type, ...)
    return nil, wfclient.IncidentError("poison message; logged for analysis")
}

Workers especializados pueden hacer mining sobre poison_jobs (e.g., generar issues en GitHub).

Pattern 7: Graceful degradation

Si una funcionalidad opcional falla, no fallar la transacción principal.

func handler(ctx context.Context, job *wfclient.Job) (map[string]any, error) {
    // Crítico
    chargeResult, err := stripe.Charge(ctx, ...)
    if err != nil {
        return nil, err
    }

    // No crítico: registrar en analytics
    if err := analytics.Track(ctx, chargeResult.ID); err != nil {
        slog.Warn("analytics tracking failed", "err", err)
        // No fail; continúa
    }

    // No crítico: enviar notificación
    if err := notifications.Push(ctx, ...); err != nil {
        slog.Warn("notification failed", "err", err)
    }

    return map[string]any{"chargeId": chargeResult.ID}, nil
}

Riesgo: pérdida silenciosa de funcionalidad. Mitigación: métrica + alerta sobre fallas de funcionalidad "opcional".

wf_worker_degraded_total{worker, feature}

Pattern 8: Job streaming + back-pressure

El SDK pide jobs en bulk (default 32). Si el handler es lento, encola jobs locales que pueden expirar.

opts := wfclient.WorkerOptions{
    Type:              "slow-handler",
    Concurrency:       4,
    MaxJobsActivated:  4,  // = concurrency, no encolar más
    Timeout:           60 * time.Second,
}

Regla: MaxJobsActivated <= Concurrency * 2 para no acumular jobs en RAM que el engine cree estar en progreso.

Anti-patterns comunes

❌ Retry sin idempotency

// MAL
stripe.Charge(ctx, ...)  // sin idempotency key, retry causa doble charge

❌ Retry sobre todo error

// MAL
for i := 0; i < 100; i++ {
    if err := doWork(); err == nil { return nil }
    time.Sleep(1 * time.Second)
}
// 100 reintentos sobre un bug del programa = pérdida de tiempo + alertas

❌ Ignorar context cancellation

// MAL
func handler(ctx context.Context, job *Job) (map[string]any, error) {
    time.Sleep(60 * time.Second)  // NO respeta ctx
    return ..., nil
}
// BIEN
select {
case <-time.After(60 * time.Second):
case <-ctx.Done():
    return nil, ctx.Err()
}

❌ Worker que no completa o falla — job timeout

Si un worker activa un job y nunca completa/falla (crash, network), el engine lo libera al deadline (definido por timeout del worker). Otro worker lo tomará.

Anti-pattern: configurar timeout muy alto (1 hora). Si el worker muere, el job queda atascado 1 hora.

Mejor: timeout proporcional al tiempo realista (p99 del handler × 2).

❌ Activar variables no necesarias

// MAL: trae 50KB de variables aunque el handler solo use 2 campos
WorkerOptions{Type: "...", FetchVariables: nil}

// BIEN
WorkerOptions{Type: "...", FetchVariables: []string{"orderId", "amount"}}

Métricas operacionales clave

wf_worker_jobs_handled_total{type, outcome="success|fail|bpmn_error|incident|poison"}
wf_worker_job_duration_seconds{type}  # histogram
wf_worker_retries_total{type, dependency}
wf_worker_circuit_breaker_state{breaker}  # 0=closed, 1=half-open, 2=open
wf_worker_backpressure_events_total{type, reason}
wf_worker_panics_total{type}
wf_worker_idempotency_cache_hits_total{type}
wf_worker_degraded_total{worker, feature}

Alertas recomendadas:

  • rate(wf_worker_panics_total[5m]) > 0 → bug, page on-call.
  • rate(wf_worker_jobs_handled_total{outcome="poison"}[1h]) > 10 → data quality issue.
  • wf_worker_circuit_breaker_state == 2 for 5m → downstream prolongado.
  • p95 wf_worker_job_duration_seconds > target → degradación.

Checklist pre-producción para un worker

  • Idempotent (idempotency key strategy clara).
  • Context cancellation respeta en todos los callables.
  • Timeout configurado por dependencia + total.
  • Retry policy con jitter, max attempts ≤ 5, max delay ≤ 10s.
  • Circuit breaker para dependencies externas críticas.
  • BPMN errors mapeados explícitamente.
  • Métricas + structured logs + traces (otel).
  • Healthcheck endpoint (/healthz, /readyz).
  • Graceful shutdown probado (SIGTERM → drain).
  • Variables fetched limitadas a las necesarias.
  • Concurrency dimensionada vs downstream capacity.
  • Test de carga con throughput proyectado.
  • Runbook documentado (qué hacer si circuit breaker abierto).

Referencias