Worker reliability patterns¶
Patterns operacionales para que un worker en producción no caiga ni cause incidents en cascada. Cubre retry / backoff / circuit breaker / DLQ / poison messages / graceful degradation.
El problema: workers como single point of failure¶
Un job worker mal escrito puede: - Retry-bomb un downstream que ya está caído. - Consumir todos los jobs de la cola y luego morir. - Encolar instancias en estado inválido si crashea mid-completion. - Loop infinito sobre un poison message (variable malformada). - DDoS al engine pidiendo jobs sin esperar.
Cada uno tiene un pattern conocido. El SDK provee primitivas; el worker autor las combina.
Pattern 1: Retry con exponential backoff + jitter¶
Decisión clave: ¿retry en el worker o en el engine?
| Tipo de falla | Retry en |
|---|---|
| Network blip al backend externo (HTTP 503) | Worker (rápido, 100-500ms) |
| Backend down después de N retries | Engine (job fail, retry policy del BPMN) |
| Lógica de negocio falla (BPMN error) | Ninguno: boundary error event |
| DB del backend con deadlock | Worker (rápido, sin consumir retry del job) |
Implementación¶
func chargePayment(ctx context.Context, job *wfclient.Job) (map[string]any, error) {
var result *stripe.Charge
err := retry.Do(
func() error {
var err error
result, err = stripe.Charge(ctx, ...)
return err
},
retry.Attempts(3),
retry.Delay(100*time.Millisecond),
retry.MaxDelay(2*time.Second),
retry.DelayType(retry.BackOffDelay), // exponential
retry.RetryIf(func(err error) bool {
return isTransient(err) // 5xx, network, timeout
}),
retry.Context(ctx),
)
if err != nil {
if isBPMNError(err) {
return nil, wfclient.BPMNError(bpmnCode(err), err.Error())
}
// Transitorio agotado → engine decide qué hacer (retry policy)
return nil, fmt.Errorf("charge failed after 3 worker retries: %w", err)
}
return map[string]any{"chargeId": result.ID}, nil
}
Jitter¶
Sin jitter: si 1000 workers fallan al mismo segundo, todos reintenten al segundo siguiente = thundering herd.
Implementación standard: delay * (1 + random(-jitter_factor, +jitter_factor)).
Reglas¶
- Max 3-5 retries worker-side. Más es problema del engine (BPMN retry policy).
- Max delay 5-10s worker-side. Más bloquea el job slot.
- Idempotency key obligatoria: el job se reintenta a nivel engine también; sin idempotency, doble charge.
- Respect context cancellation: si el contexto se cancela (timeout / shutdown), salir del retry loop.
Pattern 2: Circuit breaker¶
Si el backend externo está completamente down, NO sigas pidiéndole. Falla rápido.
Estados¶
stateDiagram-v2
[*] --> CLOSED
CLOSED: CLOSED (normal)
OPEN: OPEN (rechaza todo)
HALF_OPEN: HALF-OPEN (deja pasar 1 request)
CLOSED --> OPEN: N failures
OPEN --> HALF_OPEN: después de timeout
HALF_OPEN --> CLOSED: success
HALF_OPEN --> OPEN: failure
Implementación (sony/gobreaker)¶
import "github.com/sony/gobreaker"
var stripeBreaker = gobreaker.NewCircuitBreaker(gobreaker.Settings{
Name: "stripe-api",
MaxRequests: 1, // half-open
Interval: 60 * time.Second,
Timeout: 30 * time.Second,
ReadyToTrip: func(counts gobreaker.Counts) bool {
return counts.ConsecutiveFailures > 5
},
OnStateChange: func(name string, from, to gobreaker.State) {
slog.Warn("circuit breaker state change",
"breaker", name, "from", from, "to", to)
// Métrica: circuit_breaker_state{name="stripe-api"}
},
})
func chargePayment(ctx context.Context, job *wfclient.Job) (map[string]any, error) {
result, err := stripeBreaker.Execute(func() (interface{}, error) {
return stripe.Charge(ctx, ...)
})
if err == gobreaker.ErrOpenState {
// Circuit breaker abierto → fail rápido al engine sin consumir retry del job
return nil, wfclient.FailWithRetry("circuit open, will retry", time.Now().Add(30*time.Second))
}
if err != nil {
return nil, err
}
return map[string]any{"chargeId": result.(*stripe.Charge).ID}, nil
}
Trade-off: FailWithRetry vs fail normal¶
fail normal: consume retry del job. Después de N retries → incident.FailWithRetry(retryAt=...): NO consume retry. Engine re-encola el job en el tiempo dado.
FailWithRetry es ideal cuando el problema no es del proceso sino del entorno (downstream down). El job no es "malo"; sólo necesita esperar.
Pattern 3: Bulkhead / Resource isolation¶
Problema: un worker handler que llama a Stripe Y a la DB. Si Stripe está lento, las goroutines del worker quedan ocupadas. La DB pasa a estar bien pero el worker no puede tomar jobs nuevos.
Solución: limitar concurrency por dependencia.
var stripeLimiter = semaphore.NewWeighted(10) // max 10 in-flight a Stripe
func chargePayment(ctx context.Context, job *wfclient.Job) (map[string]any, error) {
if err := stripeLimiter.Acquire(ctx, 1); err != nil {
return nil, wfclient.FailWithRetry("stripe-limiter full", time.Now().Add(5*time.Second))
}
defer stripeLimiter.Release(1)
return stripe.Charge(ctx, ...)
}
Bulkheads más sofisticados: thread pool aislado por downstream (más relevante en Java).
Pattern 4: Timeouts en cascada¶
Cada call externo necesita timeout. Si no hay → goroutine leak, eventual OOM.
func chargePayment(ctx context.Context, job *wfclient.Job) (map[string]any, error) {
// Worker timeout total: 30s (configurado en WorkerOptions)
// Reservar 5s buffer para complete del engine
ctxWithTimeout, cancel := context.WithTimeout(ctx, 25*time.Second)
defer cancel()
// Cada dependencia con su sub-timeout
stripeCtx, _ := context.WithTimeout(ctxWithTimeout, 10*time.Second)
result, err := stripe.Charge(stripeCtx, ...)
if err != nil {
return nil, err
}
dbCtx, _ := context.WithTimeout(ctxWithTimeout, 5*time.Second)
if err := db.SaveTransaction(dbCtx, result); err != nil {
// Stripe ya cobró → compensation o reconciliation job
return nil, fmt.Errorf("db save failed after stripe charge: %w", err)
}
return ..., nil
}
Regla: cada call externo debe tener timeout. Sin timeout es bug.
Pattern 5: Idempotency¶
Crítico: el engine reintenta jobs (al-least-once). Si tu handler no es idempotent, vas a tener doble effect.
Estrategias por tipo de dependencia¶
| Backend | Idempotency strategy |
|---|---|
| Stripe / Payment provider | Idempotency-Key header (provider-side) |
| Postgres (insert) | ON CONFLICT DO NOTHING con unique key |
| Email (SMTP) | Message-ID único con tracking en DB |
| Webhook (POST) | El receiver debe ser idempotent (you can't control) |
| External API custom | UUID en request body, deduplicate server-side |
Idempotency key estable¶
El job.Key es ideal: único por job, persiste a través de retries.
idempotencyKey := fmt.Sprintf("wf-job-%d", job.Key)
stripe.Charge(ctx, customer, amount, stripe.WithIdempotencyKey(idempotencyKey))
Cuidado: si el job se reintenta después de un partial success (stripe cobró, DB no se grabó), el segundo intento NO cobra de nuevo (mismo idempotency key) pero SÍ graba en DB.
Patrón "log first, act later"¶
CREATE TABLE worker_idempotency (
job_key BIGINT PRIMARY KEY,
completed_at TIMESTAMPTZ NOT NULL DEFAULT now(),
result JSONB
);
func handler(ctx context.Context, job *wfclient.Job) (map[string]any, error) {
// Check cache
var result map[string]any
err := db.QueryRow(ctx,
"SELECT result FROM worker_idempotency WHERE job_key=$1",
job.Key).Scan(&result)
if err == nil {
return result, nil // ya ejecutado
}
if !errors.Is(err, sql.ErrNoRows) {
return nil, err
}
// Execute
result, err = doActualWork(ctx, job)
if err != nil {
return nil, err
}
// Cache result
_, err = db.Exec(ctx,
"INSERT INTO worker_idempotency (job_key, result) VALUES ($1, $2)",
job.Key, result)
if err != nil {
slog.Warn("idempotency cache insert failed", "err", err)
// Continúa: result válido aunque cache falle
}
return result, nil
}
TTL via partition pruning (los job_keys vienen ordenados por tiempo).
Pattern 6: Poison message handling¶
Poison message: un job que SIEMPRE falla, no por bug del backend sino por data malformada (variable corrupta, JSON inválido, etc.).
Detección¶
func handler(ctx context.Context, job *wfclient.Job) (map[string]any, error) {
amount, ok := job.Variables["amount"].(float64)
if !ok {
// Datos malformados → NO retry; incident inmediato
return nil, wfclient.IncidentError(fmt.Sprintf("amount variable invalid: %v", job.Variables["amount"]))
}
if amount < 0 {
return nil, wfclient.IncidentError("amount negative; data corruption")
}
// ... resto del handler
}
IncidentError NO consume retry. Crea incident inmediato para review humana.
DLQ (Dead Letter Queue) opcional¶
Si querés un sink centralizado de poison messages para análisis batch:
CREATE TABLE poison_jobs (
job_key BIGINT PRIMARY KEY,
job_type TEXT NOT NULL,
process_instance_key BIGINT,
error_message TEXT,
variables JSONB,
received_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
if errors.Is(err, ErrPoisonMessage) {
db.Exec(ctx,
"INSERT INTO poison_jobs (...) VALUES (...)",
job.Key, job.Type, ...)
return nil, wfclient.IncidentError("poison message; logged for analysis")
}
Workers especializados pueden hacer mining sobre poison_jobs (e.g., generar issues en GitHub).
Pattern 7: Graceful degradation¶
Si una funcionalidad opcional falla, no fallar la transacción principal.
func handler(ctx context.Context, job *wfclient.Job) (map[string]any, error) {
// Crítico
chargeResult, err := stripe.Charge(ctx, ...)
if err != nil {
return nil, err
}
// No crítico: registrar en analytics
if err := analytics.Track(ctx, chargeResult.ID); err != nil {
slog.Warn("analytics tracking failed", "err", err)
// No fail; continúa
}
// No crítico: enviar notificación
if err := notifications.Push(ctx, ...); err != nil {
slog.Warn("notification failed", "err", err)
}
return map[string]any{"chargeId": chargeResult.ID}, nil
}
Riesgo: pérdida silenciosa de funcionalidad. Mitigación: métrica + alerta sobre fallas de funcionalidad "opcional".
Pattern 8: Job streaming + back-pressure¶
El SDK pide jobs en bulk (default 32). Si el handler es lento, encola jobs locales que pueden expirar.
opts := wfclient.WorkerOptions{
Type: "slow-handler",
Concurrency: 4,
MaxJobsActivated: 4, // = concurrency, no encolar más
Timeout: 60 * time.Second,
}
Regla: MaxJobsActivated <= Concurrency * 2 para no acumular jobs en RAM que el engine cree estar en progreso.
Anti-patterns comunes¶
❌ Retry sin idempotency¶
❌ Retry sobre todo error¶
// MAL
for i := 0; i < 100; i++ {
if err := doWork(); err == nil { return nil }
time.Sleep(1 * time.Second)
}
// 100 reintentos sobre un bug del programa = pérdida de tiempo + alertas
❌ Ignorar context cancellation¶
// MAL
func handler(ctx context.Context, job *Job) (map[string]any, error) {
time.Sleep(60 * time.Second) // NO respeta ctx
return ..., nil
}
❌ Worker que no completa o falla — job timeout¶
Si un worker activa un job y nunca completa/falla (crash, network), el engine lo libera al deadline (definido por timeout del worker). Otro worker lo tomará.
Anti-pattern: configurar timeout muy alto (1 hora). Si el worker muere, el job queda atascado 1 hora.
Mejor: timeout proporcional al tiempo realista (p99 del handler × 2).
❌ Activar variables no necesarias¶
// MAL: trae 50KB de variables aunque el handler solo use 2 campos
WorkerOptions{Type: "...", FetchVariables: nil}
// BIEN
WorkerOptions{Type: "...", FetchVariables: []string{"orderId", "amount"}}
Métricas operacionales clave¶
wf_worker_jobs_handled_total{type, outcome="success|fail|bpmn_error|incident|poison"}
wf_worker_job_duration_seconds{type} # histogram
wf_worker_retries_total{type, dependency}
wf_worker_circuit_breaker_state{breaker} # 0=closed, 1=half-open, 2=open
wf_worker_backpressure_events_total{type, reason}
wf_worker_panics_total{type}
wf_worker_idempotency_cache_hits_total{type}
wf_worker_degraded_total{worker, feature}
Alertas recomendadas:
rate(wf_worker_panics_total[5m]) > 0→ bug, page on-call.rate(wf_worker_jobs_handled_total{outcome="poison"}[1h]) > 10→ data quality issue.wf_worker_circuit_breaker_state == 2 for 5m→ downstream prolongado.- p95
wf_worker_job_duration_seconds> target → degradación.
Checklist pre-producción para un worker¶
- Idempotent (idempotency key strategy clara).
- Context cancellation respeta en todos los callables.
- Timeout configurado por dependencia + total.
- Retry policy con jitter, max attempts ≤ 5, max delay ≤ 10s.
- Circuit breaker para dependencies externas críticas.
- BPMN errors mapeados explícitamente.
- Métricas + structured logs + traces (otel).
- Healthcheck endpoint (
/healthz,/readyz). - Graceful shutdown probado (SIGTERM → drain).
- Variables fetched limitadas a las necesarias.
- Concurrency dimensionada vs downstream capacity.
- Test de carga con throughput proyectado.
- Runbook documentado (qué hacer si circuit breaker abierto).
Referencias¶
- analysis/worker-sdk-go-design — API del SDK
- adrs/adr-007-at-least-once-idempotent-workers — contrato base
- concepts/retry-backoff — engine-side retry
- concepts/incident-management — incidents flow
- analysis/error-handling-patterns — patterns end-to-end
- Hystrix patterns (Netflix) — circuit breaker, bulkhead, timeout
- Release It! (Michael Nygard) — stability patterns