Idempotency & deduplication: end-to-end strategy

In an at-least-once system, everything is duplicated eventually. This page covers how the engine, the workers, external APIs, and message systems coordinate so that the system is effectively exactly-once. Patterns plus Postgres queries.

Premise: always at-least-once

adrs/adr-007-at-least-once-idempotent-workers makes at-least-once the base contract. Reasons:
- Unreliable network: a lost ack ⇒ retry.
- Crashes: a worker dies after completing the API call but before the ack ⇒ retry.
- Timer expiration: a job is not completed in time ⇒ another worker takes it.

Any "exactly-once" engine without idempotent workers is fiction.

Layers where dedup matters

1. API ingress       (REST/gRPC) → client retries StartInstance
2. Command queue    → engine retries applying a command
3. Worker fetch     → job activated twice
4. External call    → worker calls Stripe twice
5. Message bus      → message published twice
6. Event sink       → audit log with duplicate events

Each layer needs its own strategy. There is no single global solution.

Layer 1: API ingress idempotency

Idempotency-Key header (Stripe pattern)

POST /api/v1/instances
Idempotency-Key: e8a3-4b2f-...

{ "processDefinitionId": "order-flow", "variables": {...} }

Server-side:

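CREATE TABLE api_idempotency (
    tenant_id TEXT NOT NULL,
    key TEXT NOT NULL,
    request_hash TEXT,             -- hash of the request body, used for conflict detection
    response_status INT,
    response_body JSONB,
    created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    PRIMARY KEY (tenant_id, key)   -- the key is scoped per tenant
);

-- Retention: 24h, enforced by deleting rows older than that.
-- The table is not range-partitioned on created_at: Postgres requires the primary key
-- of a partitioned table to include the partition column, and a key that includes
-- created_at would no longer enforce uniqueness of (tenant_id, key).
CREATE INDEX ON api_idempotency (tenant_id, created_at);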

Lookup flow:

func handleStartInstance(w http.ResponseWriter, r *http.Request) {
    ctx := r.Context()
    tenant := tenantFromRequest(r) // assumed helper: resolves the authenticated tenant

    idemKey := r.Header.Get("Idempotency-Key")
    if idemKey == "" {
        // Optional header: without it the server generates a key, which keeps the
        // code path uniform but cannot deduplicate a retry (the client never saw it).
        // Best practice: require the header for non-idempotent POSTs.
        idemKey = generateID()
    }

    // Try to fetch an existing response
    var existing IdempotencyRecord
    err := db.QueryRow(ctx,
        "SELECT response_status, response_body FROM api_idempotency WHERE key=$1 AND tenant_id=$2",
        idemKey, tenant).Scan(&existing.Status, &existing.Body)

    if err == nil {
        // Replay the stored response
        w.WriteHeader(existing.Status)
        w.Write(existing.Body)
        return
    }
    // Any error is treated as "not seen before" here; production code should
    // fall through only on a no-rows error and fail on anything else.

    // First time: execute, then store
    response := executeStartInstance(...)
    db.Exec(ctx,
        "INSERT INTO api_idempotency (key, tenant_id, response_status, response_body) VALUES ($1, $2, $3, $4) ON CONFLICT DO NOTHING",
        idemKey, tenant, response.Status, response.Body)
    w.WriteHeader(response.Status)
    w.Write(response.Body)
}

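Rules:
- The key is scoped to (tenant, key) to avoid collisions between tenants.
- TTL of 24h: a balance between realistic retry coverage and storage.
- Race condition: two concurrent requests with the same key must not both execute. The second should wait and then see the record written by the first. A plain SELECT followed by INSERT does not guarantee this, so the key has to be reserved (for example by inserting a pending row, or taking a per-key lock) before executing.

The Idempotency-Key must be unique per "logical request":
- The client chooses the key (UUID v4 is fine).
- Same key ⇒ same logical operation.
- A different payload with the same key → 409 Conflict.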

Conflict detection

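if err == nil { // a record already exists for this key
    // Verify that the request matches the original (hash of the body)
    if hash(reqBody) != existing.RequestHash {
        http.Error(w, "Idempotency-Key reused with a different payload", http.StatusConflict)
        return
    }
    // Same key, same payload: return the cached response
    w.WriteHeader(existing.Status)
    w.Write(existing.Body)
    return
}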
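The conflict check needs a stable hash of the request body and a rule for the three possible outcomes (execute, replay, conflict). A minimal in-memory sketch in Go follows. The names Store, Outcome, HashBody, Check, and Save are illustrative assumptions, and the map only stands in for the api_idempotency table; it is not the real storage code.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"sync"
)

// Outcome says what the handler should do with a request.
type Outcome int

const (
	Execute  Outcome = iota // first time this key is seen: run the operation
	Replay                  // same key, same payload: return the stored response
	Conflict                // same key, different payload: answer 409 Conflict
)

// record is what is remembered per (tenant, key).
type record struct {
	requestHash string
	response    []byte
}

// Store is a hypothetical in-memory stand-in for the api_idempotency table.
type Store struct {
	mu      sync.Mutex
	records map[string]record
}

func NewStore() *Store { return &Store{records: make(map[string]record)} }

// HashBody returns the hex-encoded SHA-256 of the raw request body.
func HashBody(body []byte) string {
	sum := sha256.Sum256(body)
	return hex.EncodeToString(sum[:])
}

// Check classifies a request and returns the stored response on Replay.
func (s *Store) Check(tenant, key string, body []byte) (Outcome, []byte) {
	s.mu.Lock()
	defer s.mu.Unlock()
	rec, ok := s.records[tenant+"\x00"+key]
	if !ok {
		return Execute, nil
	}
	if rec.requestHash != HashBody(body) {
		return Conflict, nil
	}
	return Replay, rec.response
}

// Save stores the response of the first execution; later saves for the same key are ignored.
func (s *Store) Save(tenant, key string, body, response []byte) {
	s.mu.Lock()
	defer s.mu.Unlock()
	k := tenant + "\x00" + key
	if _, exists := s.records[k]; !exists {
		s.records[k] = record{requestHash: HashBody(body), response: response}
	}
}
```

Hashing the raw bytes means two JSON bodies that differ only in key order or whitespace count as different payloads. Whether to canonicalize the body first is a design choice this sketch leaves open.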

Layer 2: Engine command queue dedup

The engine's internal queue can receive a command twice (e.g., a crash mid-write).

Strategy: every command has a deterministic ID derived from its source.

Command ID = hash(source, requestID, sequenceNum)

CREATE TABLE command_log (
    id BIGINT PRIMARY KEY,
    deterministic_key TEXT UNIQUE,  -- evita re-inserts
    type TEXT NOT NULL,
    payload JSONB NOT NULL,
    position BIGSERIAL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

-- Insert with ON CONFLICT DO NOTHING
INSERT INTO command_log (id, deterministic_key, type, payload)
VALUES ($1, $2, $3, $4)
ON CONFLICT (deterministic_key) DO NOTHING
RETURNING id;

If the INSERT returns no row, the command is a duplicate: do not process it. If it returns an id, process it.
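The formula hash(source, requestID, sequenceNum) can be sketched in Go as follows. The choice of SHA-256, the length-prefixed encoding, and the name CommandKey are assumptions made for illustration; the source only states that the key is a hash of those three inputs.

```go
package main

import (
	"crypto/sha256"
	"encoding/binary"
	"encoding/hex"
)

// CommandKey derives a stable key from the command's origin.
// String fields are length-prefixed so that ("ab", "c") and ("a", "bc")
// cannot collide through plain concatenation.
func CommandKey(source, requestID string, sequenceNum uint64) string {
	h := sha256.New()
	writeField := func(s string) {
		var n [8]byte
		binary.BigEndian.PutUint64(n[:], uint64(len(s)))
		h.Write(n[:])
		h.Write([]byte(s))
	}
	writeField(source)
	writeField(requestID)

	var seq [8]byte
	binary.BigEndian.PutUint64(seq[:], sequenceNum)
	h.Write(seq[:])

	return hex.EncodeToString(h.Sum(nil))
}
```

The resulting string would be the value written to deterministic_key, so a re-delivered command maps to the same row and the unique constraint rejects it.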

Replay determinism

See adrs/adr-019-replay-determinism-invariant: replaying the command log must produce the same state. This implies:
- Commands have deterministic IDs.
- Processing is a pure function of state + command.
- No "now()" timestamps inside processing; use the command's timestamp.

Layer 3: Job activation dedup

A worker activates a job, then crashes. Another worker activates the same job. Risk: double execution.

Strategy: job activation with a lease.

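-- Job state
CREATE TABLE jobs (
    key BIGINT PRIMARY KEY,
    type TEXT NOT NULL,
    state TEXT NOT NULL,  -- ACTIVATABLE, ACTIVATED, COMPLETED, FAILED
    priority INT NOT NULL DEFAULT 0,  -- used by the activation query's ORDER BY
    activated_by TEXT,
    activated_at TIMESTAMPTZ,
    deadline TIMESTAMPTZ,
    completed_at TIMESTAMPTZ,         -- set when the job is completed
    retries INT NOT NULL,
    variables JSONB
);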

Activate with SKIP LOCKED:

UPDATE jobs
SET state = 'ACTIVATED',
    activated_by = $worker_id,
    activated_at = now(),
    deadline = now() + interval '30 seconds'
WHERE key IN (
    SELECT key FROM jobs
    WHERE state = 'ACTIVATABLE' AND type = $type
    ORDER BY priority DESC, key ASC
    LIMIT $batch_size
    FOR UPDATE SKIP LOCKED
)
RETURNING *;

Worker A activates job 123 with a deadline of +30s. Worker A crashes. After 30s, the scheduler detects the expired deadline and returns the job to ACTIVATABLE. Worker B activates it.

The lease is exclusive while it is valid, so two workers never hold the same job at once. They can still execute it sequentially (A starts, B starts after A's lease expires), and a slow worker that is still running when its lease expires can overlap with its successor.

Implication: workers MUST be idempotent (a job can execute 2+ times over time).

Layer 4: Worker → external call dedup

When a worker calls Stripe, a DB, or a webhook, it must guarantee idempotency.

Provider supports idempotency keys (best)

result, err := stripe.Charge(ctx, &stripe.ChargeParams{
    Customer:       customerID,
    Amount:         amount,
    IdempotencyKey: fmt.Sprintf("wf-job-%d", job.Key),
})

job.Key is stable across retries of the same job. Stripe deduplicates server-side.

Provider without idempotency: insert with a unique key (ON CONFLICT DO NOTHING)

// Idempotent insert
_, err := db.Exec(ctx,
    `INSERT INTO orders (id, customer, amount, created_at)
     VALUES ($1, $2, $3, now())
     ON CONFLICT (id) DO NOTHING`,
    orderIDFromJob, customer, amount)

orderIDFromJob must be derived deterministically from the job, not from uuid.New() (which changes between retries).

orderID := deterministicID(processInstanceKey, "order")
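One possible shape for the deterministicID helper is shown below. The source does not define it, so the int64 key type, the SHA-256 derivation, and the output format are all assumptions made for illustration.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"strconv"
)

// deterministicID derives an identifier from the process instance key and a
// purpose label. The same inputs always give the same ID, so a retried job
// produces the same order ID and the unique constraint absorbs the duplicate.
func deterministicID(processInstanceKey int64, purpose string) string {
	sum := sha256.Sum256([]byte(strconv.FormatInt(processInstanceKey, 10) + ":" + purpose))
	// 16 bytes (32 hex characters) keep the ID short while remaining collision-resistant.
	return purpose + "-" + hex.EncodeToString(sum[:16])
}
```

The purpose label lets one process instance derive several independent IDs (for example "order" and "invoice") without them colliding.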

Idempotent state machine

Some operations are state transitions: they are idempotent if "set to state X" is safe to re-apply.

-- Idempotent
UPDATE orders SET status = 'shipped' WHERE id = $1;

-- NOT idempotent
UPDATE orders SET shipment_count = shipment_count + 1 WHERE id = $1;

Layer 5: Message bus dedup

The engine publishes to Kafka / NATS. A crash mid-publish can cause duplicates.

Kafka: transactional producer

producer.BeginTransaction()
producer.Send(topic, key, value)
producer.CommitTransaction()  // atomic with the engine's ack

If the transaction is aborted, the message is never visible to consumers that read with isolation.level=read_committed, so no duplicates reach them downstream.

NATS JetStream: msg-id

msg := nats.NewMsg(subject)
msg.Header.Set(nats.MsgIdHdr, fmt.Sprintf("wf-event-%d", eventID))
js.PublishMsg(msg)

JetStream deduplicates by msg-id (configurable window, default 2 min).

With the outbox pattern, the engine does not publish directly. It inserts into an outbox table in the same transaction as its business write:

BEGIN;
INSERT INTO process_instances (...) VALUES (...);
INSERT INTO outbox (event_type, payload, dedup_key) VALUES (...);
COMMIT;

Outbox shipper:

SELECT id, event_type, payload, dedup_key
FROM outbox
WHERE shipped = false
ORDER BY id
LIMIT 100
FOR UPDATE SKIP LOCKED;

Ship to the message bus with the dedup_key, then mark shipped=true. After a crash, the same dedup_key is shipped again and the broker deduplicates it.

UPDATE outbox SET shipped = true WHERE id = ANY($1);

The outbox guarantees:
- A message is published iff the DB write succeeded.
- A message is published at-least-once (the broker deduplicates).
- No "ghost messages" without a DB write.

Layer 6: Audit log dedup

Audit events have different requirements: append-only, nothing missing, nothing extra.

Strategy: use the deterministic event ID generated by the engine.

CREATE TABLE audit_log (
    event_id TEXT PRIMARY KEY,  -- e.g., "ev-{partition}-{position}"
    timestamp TIMESTAMPTZ NOT NULL,
    event_type TEXT NOT NULL,
    actor TEXT,
    resource TEXT,
    payload JSONB,
    -- ...
);

INSERT INTO audit_log (...) VALUES (...) ON CONFLICT (event_id) DO NOTHING;

The event ID is derived from the position in the command log → unique but stable.

General patterns

Pattern: insert with a unique key (ON CONFLICT DO NOTHING)

INSERT INTO target (id, ...) VALUES ($deterministic_id, ...)
ON CONFLICT (id) DO NOTHING
RETURNING id;

If the insert returns no row, the record already existed: skip the rest. If it returns an id, continue.

Pattern: Compare-And-Swap (CAS)

Idempotent updates:

UPDATE jobs
SET state = 'COMPLETED', completed_at = now()
WHERE key = $1 AND state = 'ACTIVATED' AND activated_by = $worker_id;

If it affects 0 rows, someone else completed the job or the lease expired. NOT an error, just skip.

Pattern: dedupe table TTL

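CREATE TABLE deduplication (
    key TEXT PRIMARY KEY,
    seen_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX ON deduplication (seen_at);

-- Trimming after the retention period ($1 = retention interval).
-- Range-partitioning on seen_at and dropping old partitions would need seen_at
-- in the primary key, which would no longer enforce uniqueness of key alone.
DELETE FROM deduplication WHERE seen_at < now() - $1::interval;

Workers check before executing: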

INSERT INTO deduplication (key) VALUES ($idempotency_key)
ON CONFLICT (key) DO NOTHING
RETURNING key;

If it returns no row: skip the work.

Pattern: outbox + idempotent consumer

Producer (engine):
  BEGIN;
  business_write;
  outbox_insert(dedup_key);
  COMMIT;

Shipper:
  publish(dedup_key);

Consumer:
  if already_processed(dedup_key): skip;
  else: process + mark processed;

This works if:
- Producer: outbox + DB write in the same transaction.
- Shipper: at-least-once delivery.
- Consumer: idempotent + dedup store.
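The interplay of an at-least-once shipper and an idempotent consumer can be illustrated with a small in-memory simulation. Everything here (Event, Outbox, Consumer, and the crashBeforeMark flag that simulates a shipper dying after publishing but before marking rows shipped) is a hypothetical sketch, not the engine's implementation.

```go
package main

// Event is one outbox row as seen by the consumer.
type Event struct {
	DedupKey string
	Payload  string
}

type outboxRow struct {
	event   Event
	shipped bool
}

// Outbox is an in-memory stand-in for the outbox table.
type Outbox struct{ rows []outboxRow }

// Add appends an unshipped event (in the real pattern this happens in the
// same transaction as the business write).
func (o *Outbox) Add(e Event) { o.rows = append(o.rows, outboxRow{event: e}) }

// Ship delivers every unshipped event and then marks them shipped.
// With crashBeforeMark set it returns after delivering, so the rows stay
// unshipped and will be delivered again on the next call.
func (o *Outbox) Ship(deliver func(Event), crashBeforeMark bool) {
	var published []int
	for i, r := range o.rows {
		if !r.shipped {
			deliver(r.event)
			published = append(published, i)
		}
	}
	if crashBeforeMark {
		return
	}
	for _, i := range published {
		o.rows[i].shipped = true
	}
}

// Consumer applies each dedup key at most once.
type Consumer struct {
	processed map[string]bool
	Applied   []string
}

func NewConsumer() *Consumer { return &Consumer{processed: make(map[string]bool)} }

// Handle processes the event unless its dedup key was already processed.
// It reports whether the event was applied.
func (c *Consumer) Handle(e Event) bool {
	if c.processed[e.DedupKey] {
		return false
	}
	c.Applied = append(c.Applied, e.Payload)
	c.processed[e.DedupKey] = true
	return true
}
```

In a real consumer, applying the event and recording its dedup key would need to happen atomically (for example in one database transaction); the sketch sidesteps that by running in a single goroutine.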

Tooling support

CLI helper

wf debug idempotency-check --tenant acme --hours 24
# Lists the keys seen and the count of duplicate hits

Metrics

wf_api_idempotency_hits_total{path}    # requests that reused a cached result
wf_api_idempotency_conflicts_total      # same key with a different payload
wf_worker_idempotency_skipped_total{type}  # work skipped (already done)
wf_outbox_dedup_drops_total{topic}      # broker dropped a duplicate
wf_db_command_dedup_total              # command log dedup

Alerts:
- rate(wf_api_idempotency_conflicts_total[5m]) > 0.01 → a client has a bug.
- rate(wf_worker_idempotency_skipped_total[5m]) > 0.05 → workers are retrying too much.

Tricky edge cases

1. Idempotency key reused incorrectly by the client

The client uses the same key for 2 different operations (e.g., 2 distinct orders). The server detects the payload mismatch → 409. The client must generate a new key.

2. Worker completes job after deadline expired

Worker A activates the job, runs slowly, and its deadline expires. Worker B activates the same job. Worker A finally finishes and calls complete. The engine must reject Worker A's complete (deadline expired, Worker B owns the job).

UPDATE jobs
SET state = 'COMPLETED'
WHERE key = $1 AND state = 'ACTIVATED' AND activated_by = $worker_id AND deadline > now();
-- If it affects 0 rows: too late.

Worker A receives the error "job already completed by another worker" and does not retry.

3. Process instance creation idempotency

The client retries startInstance. Without an Idempotency-Key, 2 duplicate instances are created.

Mitigation: if the business logic allows it, use a business-level uniqueness key (uniquenessKey in the request below):

POST /api/v1/instances
{ "processDefinitionId": "order-flow",
  "variables": { "orderId": "O-123" },
  "uniquenessKey": "orderId" }

Server: if an instance with orderId=O-123 exists in the ACTIVE state, return the existing one instead of creating a new one.

Implementation: a partial unique index on (process_def_id, uniqueness_key_value), restricted to rows WHERE state = 'ACTIVE'.

4. Message correlation duplicate

A duplicated message publish → matches the same process instance → the handler triggers twice.

Mitigation: messages carry a unique message_id. The engine deduplicates:

CREATE TABLE messages_received (
    message_id TEXT PRIMARY KEY,
    received_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

Before correlating, INSERT with conflict skip.

5. Variable update race

Workers A and B activate different jobs of the same instance. Both want to update total:
- A: total = 100 + 50 = 150
- B: total = 100 + 30 = 130

Without coordination: last-writer-wins, and one update is lost (if B writes last, A's 50 is gone).

Mitigation: the engine processes commands sequentially per partition (single-threaded actor). Both updates go to the queue and are sequenced, provided each is expressed as a delta against the current state rather than an absolute value computed by the worker: A first (total=150), then B (total=180).
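A single-threaded partition can be sketched in Go as one goroutine draining a command channel. AddToTotal and RunPartition are hypothetical names; the point is that commands carry deltas and are applied one at a time, so the order of arrival does not change the final total.

```go
package main

// AddToTotal is a command that carries a delta rather than an absolute value.
type AddToTotal struct{ Delta int }

// RunPartition applies commands one at a time, in queue order, from a single
// goroutine. It returns the final total once the command channel is closed.
func RunPartition(initial int, commands <-chan AddToTotal) int {
	total := initial
	for cmd := range commands {
		total += cmd.Delta
	}
	return total
}
```

If the workers sent absolute values instead (150 and 130), sequencing alone would not help: the second command would still overwrite the first.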

Still, workers should not update the same field concurrently. Model: each worker updates a field of its own.

Summary table

| Layer | Key | Storage | TTL | Failure mode if dedup fails |
|---|---|---|---|---|
| API ingress | Idempotency-Key | api_idempotency | 24h | Operation executed 2× |
| Engine command log | Deterministic ID | command_log | Permanent | Non-deterministic replay |
| Job activation | Lease (deadline) | jobs | Active job | Concurrent double execution |
| Worker → external | Provider key or job.Key | Provider-side | Provider-defined | Double call effect |
| Message bus | msg-id / dedup_key | Broker | 2-60 min | Double message processing |
| Audit log | Deterministic event ID | audit_log | Permanent | Audit duplicates |

Anti-patterns

❌ "Just retry, it'll work"

Without idempotency, a retry causes data corruption.

❌ Server-side idempotency key

Server generates the key.
Client retries without knowing the key (it arrived in the response to the first attempt, which was lost).
Server creates a new key.
Double operation.

The key must be client-generated.

❌ Idempotency with a timestamp

key := fmt.Sprintf("%s-%d", op, time.Now().Unix())  // BAD

A retry one second later → a different key → no dedup.

❌ Worker side effects without a transaction

// BAD
db.Insert(...)            // tx1: inserts row
externalAPI.Send(...)     // external call
db.Update(processed=true) // tx3: marks processed

If a crash happens between tx1 and tx3: the row is inserted and the external call is made, but nothing is marked as processed. Retry → double external call.

// GOOD: outbox
BEGIN;
  db.Insert(...);
  db.Outbox.Insert(externalCall);
COMMIT;
// Shipper picks up outbox, calls external, marks processed

Roadmap

  • M1: API idempotency-key, deterministic command IDs, lease-based job activation.
  • M2: Outbox pattern for the event sink.
  • M3: CLI tooling for debugging, metrics dashboards.
  • M4: Cross-region dedup considerations.

References