Webhook delivery — reliability, retries, security

How to deliver engine events to external systems reliably via webhooks: outbox pattern, retry policy, signatures, IP allowlist, replay protection. Covers M2-M3.

Premise

The workflow engine emits important events:
- process.instance.completed
- incident.created
- job.failed
- user.task.assigned

Customers want to react to these via HTTP callbacks. Webhook delivery must be:
- At-least-once: no events are lost.
- Ordered per resource (optional, off by default): events for instance X arrive in the correct order.
- Secure: HMAC signature, replay protection.
- Resilient: customer endpoints fail, and we must not DoS them.

Architecture

flowchart TD
    ES[Engine event sources]
    OB[(Outbox<br/>Postgres table)]
    WD[Webhook dispatcher<br/>Go worker]
    RP[Retry policy]
    Sig[Signature]
    RL[Per-endpoint rate limit]
    DLQ[(DLQ for failed deliveries)]
    CE[Customer endpoint]
    VS[Verify signature]
    IP[Idempotent processing]
    Ret[Return 2xx within 30s]

    ES --> OB --> WD
    WD --> RP
    WD --> Sig
    WD --> RL
    WD --> DLQ
    WD --> CE
    CE --> VS
    CE --> IP
    CE --> Ret

Outbox pattern

Events are written to the outbox table in the same transaction as the business write.

CREATE TABLE webhook_outbox (
    id BIGSERIAL,
    tenant_id TEXT NOT NULL,
    event_type TEXT NOT NULL,
    resource_type TEXT NOT NULL,
    resource_id TEXT NOT NULL,
    payload JSONB NOT NULL,
    event_timestamp TIMESTAMPTZ NOT NULL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    -- Delivery tracking
    delivered_at TIMESTAMPTZ,
    delivery_attempts INT NOT NULL DEFAULT 0,
    next_retry_at TIMESTAMPTZ,
    last_error TEXT,
    state TEXT NOT NULL DEFAULT 'pending',  -- pending, in_flight, retrying, delivered, failed, dlq
    -- On a partitioned table the primary key must include the partition key
    PRIMARY KEY (id, created_at)
) PARTITION BY RANGE (created_at);

CREATE INDEX ON webhook_outbox (state, next_retry_at)
    WHERE state IN ('pending', 'retrying');
CREATE INDEX ON webhook_outbox (tenant_id, resource_id, event_timestamp);

Transactional write:

BEGIN;
-- Business write
INSERT INTO process_instances (...) VALUES (...);

-- Outbox write
INSERT INTO webhook_outbox (
    tenant_id, event_type, resource_type, resource_id,
    payload, event_timestamp
) VALUES (
    'acme', 'process.instance.completed', 'instance', '22518...',
    '{"key": "22518...", "completedAt": "..."}',
    now()
);
COMMIT;

Outbox dispatcher (Go worker) reads + delivers + marks state.

Subscription model

The customer registers a subscription:

POST /api/v1/webhooks
{
  "url": "https://customer.example.com/wf-events",
  "events": ["process.instance.completed", "incident.created"],
  "filters": {
    "processDefinitionId": "order-flow"
  },
  "secret": "auto-generated",
  "active": true
}

HTTP 201
{
  "id": "wh_abc123",
  "url": "...",
  "secret": "whsec_xxxxx",  -- shown ONLY once
  "createdAt": "..."
}

Subscription schema:

CREATE TABLE webhook_subscriptions (
    id TEXT PRIMARY KEY,
    tenant_id TEXT NOT NULL,
    url TEXT NOT NULL,
    events TEXT[] NOT NULL,  -- ["process.instance.completed", ...]
    filters JSONB,
    secret_hash TEXT NOT NULL,  -- HMAC-SHA256 of secret (note: signing needs the raw secret, so it must also be retrievable, e.g. stored encrypted)
    active BOOLEAN NOT NULL DEFAULT true,
    consecutive_failures INT NOT NULL DEFAULT 0,
    disabled_at TIMESTAMPTZ,
    disabled_reason TEXT,
    created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

CREATE INDEX ON webhook_subscriptions (tenant_id) WHERE active = true;

Outbox dispatcher fan-out: 1 event → N subscriptions matched.
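
The fan-out step can be sketched as a pure matching function. This is a minimal illustration, not the engine's implementation: the `Event` and `Subscription` structs, the `Attrs` field, and the rule that every filter must equal the corresponding event attribute are all assumptions made for the example.

```go
package main

import "fmt"

// Event and Subscription are simplified, hypothetical stand-ins for rows in
// webhook_outbox and webhook_subscriptions.
type Event struct {
	TenantID string
	Type     string
	Attrs    map[string]string // e.g. {"processDefinitionId": "order-flow"}
}

type Subscription struct {
	ID       string
	TenantID string
	Events   []string
	Filters  map[string]string
	Active   bool
}

// matches reports whether sub should receive ev: same tenant, active,
// subscribed to the event type, and every filter equal to the event attribute.
func matches(sub Subscription, ev Event) bool {
	if !sub.Active || sub.TenantID != ev.TenantID {
		return false
	}
	subscribed := false
	for _, t := range sub.Events {
		if t == ev.Type {
			subscribed = true
			break
		}
	}
	if !subscribed {
		return false
	}
	for key, want := range sub.Filters {
		if ev.Attrs[key] != want {
			return false
		}
	}
	return true
}

// fanOut returns the subscriptions that match ev, preserving input order.
func fanOut(subs []Subscription, ev Event) []Subscription {
	var out []Subscription
	for _, s := range subs {
		if matches(s, ev) {
			out = append(out, s)
		}
	}
	return out
}

func main() {
	subs := []Subscription{
		{ID: "wh_abc123", TenantID: "acme", Active: true,
			Events:  []string{"process.instance.completed", "incident.created"},
			Filters: map[string]string{"processDefinitionId": "order-flow"}},
		{ID: "wh_other", TenantID: "acme", Active: true,
			Events: []string{"job.failed"}},
	}
	ev := Event{TenantID: "acme", Type: "process.instance.completed",
		Attrs: map[string]string{"processDefinitionId": "order-flow"}}
	for _, s := range fanOut(subs, ev) {
		fmt.Println(s.ID) // wh_abc123
	}
}
```

Each matched subscription then needs its own delivery state, since one subscription may succeed while another keeps retrying.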

Retry policy

Exponential backoff with jitter, up to a maximum number of attempts.

Attempt 1: immediate
Attempt 2: 30 seconds
Attempt 3: 2 minutes
Attempt 4: 10 minutes
Attempt 5: 1 hour
Attempt 6: 6 hours
Attempt 7: 24 hours
(max 7 attempts, ~31 hours total)

→ If all attempts fail: move to DLQ

Implementation:

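// nextRetryAt returns when the next attempt should run, given the number of
// attempts already made. A zero time.Time means retries are exhausted (DLQ).
func nextRetryAt(attempts int) time.Time {
    delays := []time.Duration{
        0,
        30 * time.Second,
        2 * time.Minute,
        10 * time.Minute,
        1 * time.Hour,
        6 * time.Hour,
        24 * time.Hour,
    }
    if attempts >= len(delays) {
        return time.Time{}  // DLQ
    }
    delay := delays[attempts]
    if delay == 0 {
        return time.Now()  // no jitter: rand.Int63n panics when n <= 0
    }
    // ±10% jitter: uniform in [0.9*delay, 1.1*delay)
    jitter := time.Duration(rand.Int63n(int64(delay / 5)))
    return time.Now().Add(delay + jitter - delay/10)
}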

Dispatcher worker

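func (d *Dispatcher) Run(ctx context.Context) {
    for {
        select {
        case <-ctx.Done(): return
        default:
        }

        events, err := d.fetchPending(ctx, 100)
        if err != nil {
            log.Printf("fetchPending: %v", err)
        }
        if err != nil || len(events) == 0 {
            // Back off briefly, but wake up immediately on shutdown.
            select {
            case <-ctx.Done(): return
            case <-time.After(1 * time.Second):
            }
            continue
        }

        for _, ev := range events {
            ev := ev  // capture per iteration (required before Go 1.22)
            d.workerPool.Submit(func() {
                d.deliver(ctx, ev)
            })
        }
    }
}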

func (d *Dispatcher) fetchPending(ctx context.Context, limit int) ([]Event, error) {
    rows, err := db.QueryContext(ctx, `
        UPDATE webhook_outbox
        SET state = 'in_flight', delivery_attempts = delivery_attempts + 1
        WHERE id IN (
            SELECT id FROM webhook_outbox
            WHERE state IN ('pending', 'retrying')
                AND (next_retry_at IS NULL OR next_retry_at <= now())
            ORDER BY id
            LIMIT $1
            FOR UPDATE SKIP LOCKED
        )
        RETURNING *
    `, limit)
    // ...
}

func (d *Dispatcher) deliver(ctx context.Context, ev Event) {
    subs := d.subscriptionsFor(ev)
    for _, sub := range subs {
        result := d.sendOne(ctx, ev, sub)
        d.recordResult(ev, sub, result)
    }
}

FOR UPDATE SKIP LOCKED lets multiple dispatcher instances run in parallel without picking up the same rows.

Per-endpoint rate limit

Do not DoS the customer:

// Token bucket per subscription
limiter := rate.NewLimiter(rate.Limit(10), 20)  // 10 RPS, burst 20

if err := limiter.Wait(ctx); err != nil {
    return
}

If the customer's endpoint can handle only 1 req/sec, sending faster will overwhelm their system.

Default: 10 RPS per subscription. Configurable.

Per-resource ordering (opcional)

Default: events may arrive out of order (concurrent dispatchers). To get ordering, the subscription must set a flag:

ALTER TABLE webhook_subscriptions ADD COLUMN ordered_per_resource BOOLEAN DEFAULT false;

If ordered_per_resource = true:
- Events for the same (subscription_id, resource_id) are processed sequentially.
- Implementation: a lock per resource or a queue per resource.

Trade-off: ordering reduces throughput. Default off.

Security

HMAC signature

Each request is signed with HMAC-SHA256:

POST /wf-events HTTP/1.1
Host: customer.example.com
Content-Type: application/json
Wf-Signature: t=1716748800,v1=abc123...
Wf-Event-Id: evt_xyz789
Wf-Event-Type: process.instance.completed
Wf-Webhook-Id: wh_abc123

{
  "id": "evt_xyz789",
  "type": "process.instance.completed",
  "timestamp": "2026-05-14T10:00:00Z",
  "data": { ... }
}

Signature generation:

timestamp := time.Now().Unix()
signed := fmt.Sprintf("%d.%s", timestamp, payload)
mac := hmac.New(sha256.New, []byte(subscriptionSecret))
mac.Write([]byte(signed))
signature := hex.EncodeToString(mac.Sum(nil))
header := fmt.Sprintf("t=%d,v1=%s", timestamp, signature)

Verification (customer-side):

// Parse header: t=..., v1=...
parts := parseSignatureHeader(header)
timestamp, err := strconv.ParseInt(parts["t"], 10, 64)
if err != nil {
    return errors.New("malformed signature timestamp")
}
expected := parts["v1"]

// Reconstruct the signed string exactly as the sender built it
signed := fmt.Sprintf("%d.%s", timestamp, body)
mac := hmac.New(sha256.New, []byte(secret))
mac.Write([]byte(signed))
actual := hex.EncodeToString(mac.Sum(nil))

// Constant-time comparison
if !hmac.Equal([]byte(expected), []byte(actual)) {
    return errors.New("invalid signature")
}

// Replay protection
if time.Since(time.Unix(timestamp, 0)) > 5*time.Minute {
    return errors.New("signature expired")
}

IP allowlist

Customer can configure: "only accept webhooks from these IPs".

Our published egress IP ranges:

trust.example.com/security/webhook-source-ips
{
  "ranges": ["52.10.0.0/24", "52.20.0.0/24"],
  "lastUpdated": "..."
}

The customer allowlists these ranges in their firewall, which reduces the attack surface.

TLS only

The customer endpoint must be HTTPS. Reject HTTP at subscription creation.

if !strings.HasPrefix(url, "https://") {
    return errors.New("only HTTPS endpoints supported")
}

Localhost / private IP

Reject:

host := parsedURL.Hostname()  // Host may include a port; Hostname strips it
ip := net.ParseIP(host)
if ip != nil && (ip.IsPrivate() || ip.IsLoopback()) {
    return errors.New("private/loopback addresses not allowed")
}
// DNS resolution check too
ips, err := net.LookupIP(host)
if err != nil {
    return fmt.Errorf("cannot resolve %q: %w", host, err)
}
for _, addr := range ips {
    if addr.IsPrivate() || addr.IsLoopback() {
        return errors.New("resolved to private/loopback IP")
    }
}

This prevents SSRF attacks (a customer creating a webhook that points at internal infrastructure).

Customer error handling

Response codes

| Status | Action |
|---|---|
| 2xx (200, 201, 204) | Success; mark delivered |
| 410 Gone | Subscription dead; auto-disable |
| 429 Too Many Requests | Retry, honoring the Retry-After header |
| 4xx (other) | Retry up to max attempts |
| 5xx | Retry with exponential backoff |
| Timeout (30s+) | Treated as 5xx |
| TLS error | Retry; if persistent, disable |

Auto-disable on persistent failures

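if subscription.consecutive_failures > 100 {
    // Scope the update to this subscription; without WHERE it would disable all of them.
    db.Exec(`UPDATE webhook_subscriptions SET active = false, disabled_at = now(), disabled_reason = 'too_many_failures' WHERE id = $1`, subscription.id)
    sendEmailToOwner(...)
}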

The customer receives an email and must re-enable the subscription manually.

DLQ — Dead Letter Queue

Events that exhausted their retry attempts:

-- DLQ table (same schema as outbox)
CREATE TABLE webhook_dlq (
    id BIGINT,
    ...
);

-- On final failure
INSERT INTO webhook_dlq SELECT * FROM webhook_outbox WHERE id = $1;
UPDATE webhook_outbox SET state = 'dlq' WHERE id = $1;

UI / CLI for replaying from the DLQ:

wf webhook dlq list
wf webhook dlq replay <event-id>
wf webhook dlq replay-all --since 24h --subscription wh_abc

Customer-side idempotency

POST /wf-events HTTP/1.1
Wf-Event-Id: evt_xyz789

{...}

Wf-Event-Id is unique per event. The customer stores the IDs it has processed, which makes handling idempotent.

Atomic processing is recommended:

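// Customer side
func handleWebhook(w http.ResponseWriter, req *http.Request) {
    eventID := req.Header.Get("Wf-Event-Id")

    // Try insert (deduplicates)
    res, err := db.Exec(`INSERT INTO processed_events (id) VALUES ($1) ON CONFLICT DO NOTHING`, eventID)
    if err != nil {
        w.WriteHeader(http.StatusInternalServerError)
        return
    }
    if n, _ := res.RowsAffected(); n == 0 {
        // Duplicate: already processed, acknowledge so the engine stops retrying
        w.WriteHeader(http.StatusOK)
        return
    }

    // Process (ideally in the same transaction as the insert, so a failure
    // here rolls back the dedup row and the retry is not skipped)
    ...

    w.WriteHeader(http.StatusOK)
}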

Event delivery semantics

At-least-once

Engine guarantee: if an event is in the outbox, it will eventually be delivered (or moved to the DLQ once retries are exhausted).

Customer obligation: any received event must be processed idempotently.

Ordering

Default: no ordering across resources.

Per-resource ordering is optional (slower).

Latency

Target:
- P50 < 5s (event created → customer receives).
- P99 < 30s.
- Max: ~31h (cumulative delay up to the last retry).

Note: rate-limited subscriptions or slow customers inflate P99.

Observability

Metrics (engine-side)

wf_webhook_delivery_attempts_total{subscription_id, outcome="success|retry|fail|dlq"}
wf_webhook_delivery_duration_seconds{subscription_id}
wf_webhook_outbox_lag_seconds  # current age of oldest pending
wf_webhook_outbox_size  # pending events
wf_webhook_subscription_disabled_total{reason}
wf_webhook_dlq_size{subscription_id}

Alerts:
- wf_webhook_outbox_lag_seconds > 300 → dispatcher lagging.
- wf_webhook_dlq_size > 100 → many events undeliverable.

Customer-facing dashboard

The customer sees in the UI:
- Recent deliveries (success/fail).
- Latency per endpoint.
- DLQ count.
- Subscription state.

Useful for diagnosing "why didn't my webhook arrive?"

Customer testing

The customer can test delivery without running a real process:

wf webhook test <subscription-id> --event-type process.instance.completed

The engine sends a mock event to the endpoint. Useful for verifying the setup.

Customer-side idempotency (verification)

Encourage customers to verify that their handler is idempotent:

// Test idempotency
event := mockEvent()
process(event)  // first time
process(event)  // second time -- should be no-op
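
A runnable version of that check, using an in-memory set of seen event IDs in place of the `processed_events` table. The `Processor` type and its `Effects` counter are hypothetical, there only to make the side effect observable.

```go
package main

import (
	"fmt"
	"sync"
)

// Processor applies a side effect at most once per event ID.
type Processor struct {
	mu      sync.Mutex
	seen    map[string]bool
	Effects int // number of times the side effect actually ran
}

func NewProcessor() *Processor {
	return &Processor{seen: make(map[string]bool)}
}

// Process returns true if the event was handled now, false if it was a
// duplicate and therefore skipped.
func (p *Processor) Process(eventID string) bool {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.seen[eventID] {
		return false
	}
	p.seen[eventID] = true
	p.Effects++ // stand-in for the real business logic
	return true
}

func main() {
	p := NewProcessor()
	fmt.Println(p.Process("evt_xyz789")) // true: first delivery
	fmt.Println(p.Process("evt_xyz789")) // false: redelivery is a no-op
	fmt.Println(p.Effects)               // 1
}
```

An in-memory set is lost on restart, so a production handler would keep the seen IDs in durable storage.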

Comparison with alternatives

| Approach | Pros | Cons |
|---|---|---|
| Webhooks | Universal, simple | Pushes effort onto the customer |
| SSE (Server-Sent Events) | Built-in retry, simple | Browser-oriented, less robust for backend |
| Message bus (Kafka) | Robust, scalable, dedup | Customer needs Kafka infra |
| Polling | No customer endpoint needed | High overhead, latency |

Webhooks are the de facto standard for "engine → external system" integration.

For customers operating at scale: an optional Kafka topic instead of webhooks.

Implementation phases

  • M2: Outbox table, basic dispatcher, HMAC sig, retry policy.
  • M3: Per-endpoint rate limit, DLQ, customer testing API.
  • M4: Optional per-resource ordering, Kafka alternative.

Anti-patterns

❌ Synchronous webhook delivery

The engine waits for the customer's response before continuing the process → a slow customer stalls the engine.

// BAD
if err := customerWebhook(event); err != nil {
    // Process blocks here
}
// GOOD
outbox.Insert(event)  // async, returns immediately
// Dispatcher worker delivers out of band

❌ No signature

The customer cannot verify authenticity → spoofing attacks.

❌ No retry policy

A single 5xx from the customer → the event is lost forever.

❌ Webhook URL in a process variable

<bpmn:serviceTask>
  <zeebe:taskHeaders>
    <zeebe:header key="webhookUrl" value="https://anybody.com/callback"/>
  </zeebe:taskHeaders>
</bpmn:serviceTask>

Anyone can configure the URL → SSRF risk. Subscriptions must be API-managed, with validation.

❌ No rate limit

Customer's endpoint dies, engine retries forever, hammers customer's recovery.

References