
Timer Recovery with Postgres

Camunda recovers timers from RocksDB snapshots. The MVP has no snapshots and uses Postgres as the source of truth: a timers table indexed on due_date, a scan at startup that loads pending timers into memory, and a scheduler that checks for due timers at 100ms granularity. Failure recovery: the timers table is authoritative; the in-memory queue is derived from it. Trade-off vs Camunda: latency in the same range (~100ms at worst, versus ~10-50ms), simpler recovery (no snapshot transfer), and worse theoretical scalability, but enough for the MVP.

The problem

Timers are a fundamental BPMN feature:
- Boundary timer event: "if the user task is not completed within 2 days, escalate"
- Intermediate timer event: "wait 1 hour, then continue"
- Timer start event: "start the process every Monday at 8am"

The engine must:
1. Register the timer when it is activated (with its due_date)
2. Fire the timer when its due_date passes
3. Survive restarts (timers are never lost)

Camunda uses RocksDB + snapshots:
- Timers live in a RocksDB column family
- Snapshots include the RocksDB state
- Recovery: load the snapshot + replay the log → state reconstructed

The MVP does not use snapshots, so how does it recover?

Solution: the timers table as source of truth

Schema

CREATE TABLE timers (
    timer_key BIGSERIAL PRIMARY KEY,
    process_instance_key BIGINT NOT NULL,
    element_instance_key BIGINT NOT NULL,
    tenant_id TEXT NOT NULL,

    -- Timer config
    timer_type TEXT NOT NULL,            -- BOUNDARY | INTERMEDIATE | START_EVENT
    target_element_id TEXT NOT NULL,
    due_date TIMESTAMPTZ NOT NULL,

    -- Repeating timer (cycle)
    repetitions INT,                     -- NULL = one-shot, N = repeat N times, -1 = infinite
    interval_ms BIGINT,                  -- For cycle timers

    -- State
    state TEXT NOT NULL DEFAULT 'PENDING',  -- PENDING | TRIGGERED | CANCELED
    created_at TIMESTAMPTZ DEFAULT NOW(),
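    triggered_at TIMESTAMPTZ
);

-- Uniqueness only among PENDING rows: a repeating timer inserts one row per
-- occurrence for the same element instance, and one activity can carry
-- several boundary timers (different target_element_id)
CREATE UNIQUE INDEX uq_timers_pending_element
ON timers (element_instance_key, target_element_id)
WHERE state = 'PENDING';

-- CRITICAL INDEX: scan pending timers ordered by due_date
CREATE INDEX idx_timers_pending_due
ON timers (due_date)
WHERE state = 'PENDING';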

-- Cleanup helper
CREATE INDEX idx_timers_triggered_old 
ON timers (triggered_at) 
WHERE state = 'TRIGGERED';

Key decision: the table is the source of truth; the in-memory schedule is derived from it.

Timer lifecycle

1. Activation (engine processing)

When an element with a timer is activated:

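from datetime import datetime, timezone

async def on_timer_activated(element_instance, timer_def):
    now = datetime.now(timezone.utc)

    # 1. Calculate due_date
    if timer_def.duration:
        due_date = now + parse_duration(timer_def.duration)  # ISO 8601 duration, e.g. PT1H
    elif timer_def.date:
        due_date = parse_date(timer_def.date)                 # ISO 8601 date-time
    elif timer_def.cycle:
        due_date = next_occurrence(timer_def.cycle, now)      # ISO 8601 cycle, e.g. R/PT1H
    else:
        raise ValueError("timer needs a duration, date or cycle")

    # 2. Persist atomically with the element's state mutation
    #    (the element's own state writes run in this same transaction)
    async with db.transaction():
        row = await db.fetch_one("""
            INSERT INTO timers (
                process_instance_key, element_instance_key,
                tenant_id, timer_type, target_element_id,
                due_date, repetitions, interval_ms
            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
            RETURNING timer_key
        """, ...)
        timer_key = row['timer_key']

    # 3. Notify the in-memory scheduler (optimization), only after commit
    await timer_scheduler.add_pending(timer_key, due_date)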

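Atomicity comes from the Postgres transaction: the timer row commits or rolls back together with the element's state change. The in-memory scheduler is notified only after the commit, so it never holds a timer that is missing from the table.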
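`parse_duration` is referenced but not shown. A minimal sketch, assuming only the week/day/time components of ISO 8601 durations are needed (`PT1H`, `P2DT30M`, `P1W`); years and months depend on the calendar, so this sketch rejects them instead of approximating. The regex and the error behavior are illustrative choices, not the engine's actual parser.

```python
import re
from datetime import datetime, timedelta, timezone

# Subset of ISO 8601 durations: weeks, days, and a time part (H/M/S). No years or months.
_DURATION_RE = re.compile(
    r"^P(?:(?P<weeks>\d+)W)?(?:(?P<days>\d+)D)?"
    r"(?:T(?:(?P<hours>\d+)H)?(?:(?P<minutes>\d+)M)?(?:(?P<seconds>\d+(?:\.\d+)?)S)?)?$"
)

def parse_duration(text: str) -> timedelta:
    """Parse a day/time ISO 8601 duration such as 'PT1H' or 'P2DT30M'."""
    m = _DURATION_RE.match(text)
    # Reject unsupported parts (Y, calendar months) and empty forms like 'P', 'PT', 'P1DT'
    if not m or text == "P" or text.endswith("T"):
        raise ValueError(f"unsupported duration: {text!r}")
    # Keep only the components that were present; names match timedelta's keyword args
    parts = {k: float(v) for k, v in m.groupdict().items() if v is not None}
    return timedelta(**parts)

now = datetime(2024, 1, 1, 8, 0, tzinfo=timezone.utc)
due_date = now + parse_duration("PT1H")  # 2024-01-01 09:00:00+00:00
```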

2. In-memory scheduling (optimization)

To avoid scanning the DB every 100ms, the engine maintains an in-memory schedule:

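import asyncio
import heapq
from datetime import datetime, timezone

class TimerScheduler:
    def __init__(self):
        # Min-heap ordered by due_date
        self.pending = []  # [(due_date, timer_key), ...]
        self.lock = asyncio.Lock()
        # Set whenever a timer is added, so a long sleep can be cut short
        self.wakeup = asyncio.Event()
        self.running = True

    async def add_pending(self, timer_key, due_date):
        async with self.lock:
            heapq.heappush(self.pending, (due_date, timer_key))
        self.wakeup.set()  # new timer may be due before the loop's current wake-up

    async def check_loop(self):
        while self.running:
            # Clear before inspecting the heap so no add_pending() is missed
            self.wakeup.clear()
            # Timezone-aware, comparable with TIMESTAMPTZ values from the DB
            now = datetime.now(timezone.utc)

            async with self.lock:
                # Pop all timers with due_date <= now
                to_fire = []
                while self.pending and self.pending[0][0] <= now:
                    _, timer_key = heapq.heappop(self.pending)
                    to_fire.append(timer_key)

                next_due = self.pending[0][0] if self.pending else None

            # Fire outside lock
            for timer_key in to_fire:
                await self._fire_timer(timer_key)

            # Sleep until the next timer: at least 100ms, at most 60s
            # (the cap bounds the gap between periodic DB refreshes, not shown)
            if next_due is not None:
                wait = max(0.1, (next_due - now).total_seconds())
                wait = min(wait, 60)
            else:
                wait = 60

            try:
                await asyncio.wait_for(self.wakeup.wait(), timeout=wait)
            except asyncio.TimeoutError:
                pass  # timed out: the next timer is due (or 60s elapsed)

    async def _fire_timer(self, timer_key):
        # Emit TIMER:TRIGGER command; the processor re-checks state in the DB
        await engine.emit_command(
            intent='TIMER:TRIGGER',
            payload={'timer_key': timer_key}
        )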

This is the same pattern as the optimization in Camunda's DueDateTimerCheckScheduler: demand-driven (sleep until the next due date instead of polling on a fixed interval).
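The wake-up behavior can be exercised on its own. A minimal, self-contained sketch, assuming fired timers are appended to a list instead of emitting TIMER:TRIGGER, and using `loop.time()` seconds instead of TIMESTAMPTZ values; `MiniScheduler` is a hypothetical name. No lock is needed here because every heap operation runs on the event loop without an intervening `await`.

```python
import asyncio
import heapq

class MiniScheduler:
    """Demand-driven loop: sleep until the next due time, wake early when a timer is added."""

    def __init__(self, min_wait=0.01, max_wait=60.0):
        self.pending = []              # min-heap of (due_time, timer_key), loop.time() seconds
        self.wakeup = asyncio.Event()
        self.fired = []                # stand-in for emitting TIMER:TRIGGER
        self.min_wait = min_wait
        self.max_wait = max_wait
        self.running = True

    def add_pending(self, timer_key, due_time):
        heapq.heappush(self.pending, (due_time, timer_key))
        self.wakeup.set()              # may be earlier than what the loop is sleeping for

    async def check_loop(self):
        loop = asyncio.get_running_loop()
        while self.running:
            self.wakeup.clear()        # clear before inspecting, so no add is lost
            now = loop.time()
            while self.pending and self.pending[0][0] <= now:
                _, key = heapq.heappop(self.pending)
                self.fired.append(key)
            if self.pending:
                wait = min(max(self.min_wait, self.pending[0][0] - now), self.max_wait)
            else:
                wait = self.max_wait
            try:
                await asyncio.wait_for(self.wakeup.wait(), timeout=wait)
            except asyncio.TimeoutError:
                pass                   # next timer is (about to be) due

async def demo():
    sched = MiniScheduler()
    task = asyncio.create_task(sched.check_loop())
    loop = asyncio.get_running_loop()
    sched.add_pending("late", loop.time() + 0.4)
    await asyncio.sleep(0.05)
    # Added later but due sooner: the event interrupts the loop's 0.4s sleep
    sched.add_pending("early", loop.time() + 0.05)
    await asyncio.sleep(0.6)
    sched.running = False
    sched.wakeup.set()
    await task
    return sched.fired

fired = asyncio.run(demo())  # ['early', 'late']
```

Clearing the event at the top of each iteration, before the heap is inspected, is what prevents a lost wake-up: an `add_pending()` that lands after the inspection sets the event again, and the next wait returns immediately.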

3. Firing the timer

class TimerTriggerProcessor:
    async def process(self, command):
        timer_key = command.payload['timer_key']
        next_timer = None

        # Atomic: load timer, check still pending, mark triggered, activate the
        # target and insert the next occurrence, all in one transaction
        async with db.transaction():
            timer = await db.fetch_one(
                "SELECT * FROM timers WHERE timer_key = $1 FOR UPDATE",
                timer_key
            )

            if not timer or timer['state'] != 'PENDING':
                return  # Already triggered or canceled (idempotent)

            # Mark triggered
            await db.execute(
                "UPDATE timers SET state = 'TRIGGERED', triggered_at = NOW() WHERE timer_key = $1",
                timer_key
            )

            # Emit event
            await emit_event('TIMER:TRIGGERED', timer)

            # Activate target element (downstream effects) in the same transaction,
            # so a crash cannot leave a TRIGGERED timer whose target never ran
            await activate_target_element(timer['process_instance_key'], timer['target_element_id'])

            # Repeating: NULL = one-shot, -1 = infinite, N > 1 = occurrences left
            reps = timer['repetitions']
            if reps is not None and (reps == -1 or reps > 1):
                next_timer = await self._schedule_next_occurrence(timer)

        # After commit: hand the next occurrence to the in-memory scheduler
        if next_timer is not None:
            await timer_scheduler.add_pending(next_timer['timer_key'], next_timer['due_date'])

4. Repeating timers

Recurring timers (e.g., R/PT1H = every hour):

from datetime import timedelta

async def _schedule_next_occurrence(self, timer):
    # Next due date follows the previous due_date, not the time of firing
    next_due = timer['due_date'] + timedelta(milliseconds=timer['interval_ms'])
    # -1 (infinite) stays -1; otherwise one fewer occurrence remains
    new_repetitions = timer['repetitions'] - 1 if timer['repetitions'] > 0 else -1

    # Runs inside the caller's transaction; returns the new row for the scheduler
    return await db.fetch_one("""
        INSERT INTO timers (
            process_instance_key, element_instance_key,
            tenant_id, timer_type, target_element_id,
            due_date, repetitions, interval_ms
        ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
        RETURNING timer_key, due_date
    """, ..., next_due, new_repetitions, timer['interval_ms'])

Each occurrence is a new row, so the history of past firings is preserved naturally.
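The repetition countdown can be checked without a database. A minimal sketch, assuming the schema's encoding (None = one-shot, -1 = infinite, N = N occurrences in total) and that each due date is the previous one plus `interval_ms`; `occurrences` is a hypothetical helper, and `islice` only truncates the infinite case for the example.

```python
from datetime import datetime, timedelta, timezone
from itertools import islice

def occurrences(first_due, repetitions, interval_ms):
    """Yield (due_date, repetitions) for each row the trigger processor would create."""
    due, reps = first_due, repetitions
    while True:
        yield due, reps
        # Same rule as the trigger processor: only -1 or a count above 1 schedules another row
        if reps is None or not (reps == -1 or reps > 1):
            return
        due = due + timedelta(milliseconds=interval_ms)
        reps = reps - 1 if reps > 0 else -1

start = datetime(2024, 1, 1, 8, 0, tzinfo=timezone.utc)
r3 = list(occurrences(start, 3, 3_600_000))                      # R3/PT1H: reps 3, 2, 1
infinite = list(islice(occurrences(start, -1, 3_600_000), 5))    # R/PT1H: first five rows
```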

Recovery at startup

After an engine restart the in-memory schedule is empty, so it is rebuilt from Postgres:

class EngineStartup:
    async def initialize_timers(self):
        """Load pending timers from the DB into the in-memory scheduler."""
        # Load future pending timers (in chunks if many). Overdue ones are left
        # to process_overdue() so their backfill can be rate-limited.
        async for row in db.fetch_chunks("""
            SELECT timer_key, due_date
            FROM timers
            WHERE state = 'PENDING' AND due_date > NOW()
            ORDER BY due_date ASC
        """, chunk_size=1000):
            await timer_scheduler.add_pending(row['timer_key'], row['due_date'])

        # Fire any overdue timers immediately
        await timer_scheduler.process_overdue()

Startup time: 1M pending timers × ~1 μs per heap insertion ≈ 1 second of CPU, plus the time to stream the rows from Postgres. Acceptable.
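Pushing rows one at a time costs O(n log n); collecting them and calling `heapq.heapify` is O(n). A minimal sketch of the rebuild, using an in-memory SQLite database as a stand-in for Postgres (ISO 8601 strings instead of TIMESTAMPTZ, `?` instead of `$1`), assuming the heap can be replaced wholesale before `check_loop` starts and that overdue timers are left to the backfill path; `rebuild_heap` is hypothetical.

```python
import heapq
import sqlite3

def rebuild_heap(conn, now_iso):
    """Rebuild the in-memory schedule from the authoritative table in O(n)."""
    rows = conn.execute(
        "SELECT due_date, timer_key FROM timers WHERE state = 'PENDING' AND due_date > ?",
        (now_iso,),
    ).fetchall()
    heap = list(rows)     # (due_date, timer_key) tuples
    heapq.heapify(heap)   # linear time, vs n * O(log n) for repeated heappush
    return heap

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE timers (timer_key INTEGER PRIMARY KEY, due_date TEXT, state TEXT)")
conn.executemany(
    "INSERT INTO timers (timer_key, due_date, state) VALUES (?, ?, ?)",
    [
        (1, "2024-01-01T10:00:00Z", "PENDING"),
        (2, "2024-01-01T09:00:00Z", "PENDING"),
        (3, "2024-01-01T08:30:00Z", "TRIGGERED"),  # already fired: not reloaded
        (4, "2024-01-01T07:00:00Z", "PENDING"),    # overdue: left to the backfill path
    ],
)
heap = rebuild_heap(conn, "2024-01-01T08:00:00Z")
```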

With more than 10M timers, consider lazy loading: load only the nearest 1000 at startup and scan for the rest as time progresses.

"Overdue" handling

If the engine was down for hours or days, some timers' due_date has already passed:

async def process_overdue(self):
    """Fire all timers whose due_date has passed."""
    overdue = await db.fetch_all("""
        SELECT timer_key FROM timers 
        WHERE state = 'PENDING' AND due_date <= NOW()
        ORDER BY due_date ASC
    """)

    for row in overdue:
        # Fire as command (goes through normal processing)
        await engine.emit_command(
            intent='TIMER:TRIGGER',
            payload={'timer_key': row['timer_key']}
        )

Backfill runs as fast as the engine can process commands, which could overwhelm it if many timers are overdue. Rate-limit it:

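# Process in batches, yielding between them
BATCH_SIZE = 100
for start in range(0, len(overdue), BATCH_SIZE):
    for row in overdue[start:start + BATCH_SIZE]:
        await engine.emit_command(
            intent='TIMER:TRIGGER',
            payload={'timer_key': row['timer_key']}
        )
    await asyncio.sleep(0.1)  # yield 100ms between batches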
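The batching loop fits in a small helper that can be tested in isolation. A minimal sketch, assuming `emit` is any coroutine function that submits one TIMER:TRIGGER command (a stand-in for `engine.emit_command`) and that a fixed pause between batches is an acceptable throttle; `backfill_overdue` is hypothetical. It also skips the pause after the last batch.

```python
import asyncio

async def backfill_overdue(timer_keys, emit, batch_size=100, pause=0.1):
    """Emit TRIGGER commands for overdue timers, oldest first, pausing between batches."""
    batches = 0
    for start in range(0, len(timer_keys), batch_size):
        for key in timer_keys[start:start + batch_size]:
            await emit(key)
        batches += 1
        if start + batch_size < len(timer_keys):
            await asyncio.sleep(pause)  # yield to regular processing between batches
    return batches

emitted = []

async def fake_emit(key):
    emitted.append(key)  # stand-in for submitting a TIMER:TRIGGER command

batches = asyncio.run(backfill_overdue(list(range(250)), fake_emit, batch_size=100, pause=0))
```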

Timer cancellation

When a user cancels a process instance, its timers are canceled too:

async def cancel_timers_for_instance(process_instance_key):
    # Mark canceled in DB
    await db.execute("""
        UPDATE timers 
        SET state = 'CANCELED'
        WHERE process_instance_key = $1 
          AND state = 'PENDING'
    """, process_instance_key)

    # Remove from in-memory schedule
    timer_scheduler.remove_for_instance(process_instance_key)

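In-memory removal is O(N) with a heap (find the entries, then re-heapify), but N is usually small. It also needs an instance → timer_key mapping, since heap entries only hold (due_date, timer_key). Alternative: lazy check on fire; leave the entries in the heap and let TimerTriggerProcessor skip them, since it already returns early when state != 'PENDING'.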
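The lazy alternative is a heap with lazy deletion: cancel records a tombstone in O(1), and stale entries are dropped when they reach the top. A minimal sketch, assuming timer keys are unique (each repeating occurrence gets a new key); `LazyTimerHeap` is hypothetical, and in the engine the authoritative guard remains the `state != 'PENDING'` check in TimerTriggerProcessor.

```python
import heapq

class LazyTimerHeap:
    """Min-heap of (due, timer_key) where cancel is O(1) and stale entries are skipped on pop."""

    def __init__(self):
        self._heap = []
        self._canceled = set()

    def push(self, due, timer_key):
        heapq.heappush(self._heap, (due, timer_key))

    def cancel(self, timer_key):
        self._canceled.add(timer_key)   # no heap search, no re-heapify

    def pop_due(self, now):
        """Pop every non-canceled entry with due <= now."""
        fired = []
        while self._heap and self._heap[0][0] <= now:
            _, key = heapq.heappop(self._heap)
            if key in self._canceled:
                self._canceled.discard(key)  # entry is gone; drop the tombstone too
                continue
            fired.append(key)
        return fired

h = LazyTimerHeap()
for due, key in [(10, "a"), (20, "b"), (30, "c")]:
    h.push(due, key)
h.cancel("b")
due_now = h.pop_due(25)   # ['a']
```

Canceled entries keep their memory until their due date passes, and a tombstone for a key that was never in the heap is never cleared; both are acceptable when cancellations are rare relative to fires.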

Cleanup of old timers

Triggered and canceled timers accumulate, so they need a retention policy:

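-- Cleanup job (cron)
-- Canceled rows never get triggered_at, so their age is taken from created_at
DELETE FROM timers
WHERE (state = 'TRIGGERED' AND triggered_at < NOW() - INTERVAL '30 days')
   OR (state = 'CANCELED'  AND created_at   < NOW() - INTERVAL '30 days');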

30-day retention for forensics. Adjust to compliance needs.

Failure modes

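| Failure | Recovery |
|---|---|
| Engine restart | Reload from DB at startup |
| DB connection lost | Retry with backoff; the in-memory schedule keeps deciding when timers are due, but fires cannot commit until the connection returns |
| In-memory schedule corrupted | Next periodic check rebuilds it from the DB |
| Clock skew | DB time is authoritative for stored timestamps and the overdue query (use NOW(), not the local clock); the in-memory loop compares against the local clock, so engine hosts should stay NTP-synced |
| Timer with past due_date | Process as overdue, fire immediately |
| Duplicate fire (race) | SELECT ... FOR UPDATE plus the PENDING check in one transaction makes the second TRIGGER a no-op |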
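For the duplicate-fire case, a single conditional UPDATE is an alternative to SELECT ... FOR UPDATE: the affected-row count decides which caller fires. A minimal sketch with SQLite standing in for Postgres; in Postgres under READ COMMITTED, a concurrent second UPDATE waits for the first to commit, re-evaluates `state = 'PENDING'` on the new row version, and matches nothing. `try_mark_triggered` is hypothetical.

```python
import sqlite3

def try_mark_triggered(conn, timer_key):
    """Return True only for the caller that moved the timer from PENDING to TRIGGERED."""
    cur = conn.execute(
        "UPDATE timers SET state = 'TRIGGERED', triggered_at = CURRENT_TIMESTAMP "
        "WHERE timer_key = ? AND state = 'PENDING'",
        (timer_key,),
    )
    conn.commit()
    return cur.rowcount == 1

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE timers (timer_key INTEGER PRIMARY KEY, state TEXT, triggered_at TEXT)")
conn.execute("INSERT INTO timers (timer_key, state) VALUES (1, 'PENDING'), (2, 'CANCELED')")

first = try_mark_triggered(conn, 1)     # True: this caller fires the timer
second = try_mark_triggered(conn, 1)    # False: duplicate TRIGGER is a no-op
canceled = try_mark_triggered(conn, 2)  # False: canceled timers never fire
```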

Performance considerations

Read load

The in-memory schedule reduces DB reads to:
- Startup: 1 full scan
- Periodic refresh: 1 query every minute (verifies no timers were missed)
- Per fire: 1 SELECT FOR UPDATE + 1 UPDATE

At 1000 timers/sec firing, that is 2000 DB ops/sec (in 1000 transactions/sec), well within what a single Postgres instance handles for simple indexed operations.

Write load

Every activation = 1 INSERT. Every fire = 1 UPDATE. Recurring timers add 1 INSERT per occurrence.

At these rates, writes are unlikely to be the bottleneck.

Memory

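In-memory heap: ~32 bytes per entry if stored compactly (two 64-bit values plus overhead), so 1M timers ≈ 32 MB. The Python TimerScheduler stores (datetime, int) tuples, which cost well over 100 bytes each in CPython, so expect roughly 100-150 MB per 1M timers. Still acceptable.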

Comparison vs Camunda

| Aspect | Camunda (RocksDB + snapshots) | MVP (Postgres) |
|---|---|---|
| Recovery time (restart) | ~seconds (snapshot load) | ~1s per 1M timers |
| Latency (fire) | ~10-50ms | ~50-100ms (DB roundtrip) |
| Storage overhead | Compact (RocksDB) | Larger (Postgres rows) |
| Operational complexity | Higher (snapshot tooling) | Lower (just SQL) |
| Cross-engine coordination | Via Raft | Via Postgres advisory lock |

Trade-off: a modest latency cost for much simpler operations. The right choice for the MVP.

Phase 2+: Multi-engine timer ownership

With multiple engine instances (Phase 2), only ONE of them should fire each timer:

Option A: Leader-only timer processing

The leader (per ADR-020) processes timers; followers do no timer work. Simple.

Option B: Sharded timer ownership

Each engine instance owns a subset of the timers:

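-- Each engine claims a chunk (Phase 2 columns: claimed_by TEXT, claimed_until TIMESTAMPTZ).
-- Postgres UPDATE has no LIMIT, so the batch is selected in a CTE.
WITH batch AS (
    SELECT timer_key FROM timers
    WHERE state = 'PENDING'
      AND due_date <= NOW() + INTERVAL '1 minute'
      AND (claimed_by IS NULL OR claimed_until < NOW())
      AND timer_key % $2 = $3          -- $2 = num_engines, $3 = shard_index
    ORDER BY due_date
    LIMIT 100
    FOR UPDATE SKIP LOCKED
)
UPDATE timers t
SET claimed_by = $1, claimed_until = NOW() + INTERVAL '5 minutes'  -- $1 = instance_id
FROM batch
WHERE t.timer_key = batch.timer_key
RETURNING t.*;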

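Each engine processes its claimed timers and renews the claim periodically. If an engine dies, its claims expire and become claimable again; with the modulo filter, though, another engine only picks them up once shard assignments (num_engines, shard_index) are recomputed for the surviving instances.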
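The claim-and-expire cycle can be simulated by passing the clock in explicitly. A minimal sketch with SQLite standing in for Postgres, assuming the Phase 2 `claimed_by`/`claimed_until` columns and integer seconds instead of TIMESTAMPTZ; it leaves out sharding and `FOR UPDATE SKIP LOCKED` (SQLite has no row locks), so it only shows how an expired lease becomes claimable. `claim_batch` is hypothetical.

```python
import sqlite3

def claim_batch(conn, instance_id, now, lease=300, horizon=60, limit=100):
    """Claim up to `limit` pending timers due within `horizon` s whose lease is free or expired."""
    conn.execute(
        """
        UPDATE timers
        SET claimed_by = ?, claimed_until = ?
        WHERE timer_key IN (
            SELECT timer_key FROM timers
            WHERE state = 'PENDING'
              AND due_date <= ?
              AND (claimed_by IS NULL OR claimed_until < ?)
            ORDER BY due_date
            LIMIT ?
        )
        """,
        (instance_id, now + lease, now + horizon, now, limit),
    )
    conn.commit()
    # Report what this call claimed (same owner and same lease expiry)
    rows = conn.execute(
        "SELECT timer_key FROM timers WHERE claimed_by = ? AND claimed_until = ? ORDER BY timer_key",
        (instance_id, now + lease),
    ).fetchall()
    return [key for (key,) in rows]

conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE timers (
    timer_key INTEGER PRIMARY KEY, due_date INTEGER, state TEXT,
    claimed_by TEXT, claimed_until INTEGER)""")
conn.executemany("INSERT INTO timers (timer_key, due_date, state) VALUES (?, ?, 'PENDING')",
                 [(1, 30), (2, 45), (3, 500)])

a = claim_batch(conn, "engine-a", now=0)            # [1, 2]; timer 3 is beyond the horizon
b = claim_batch(conn, "engine-b", now=100)          # []: engine-a's lease runs until 300
b_later = claim_batch(conn, "engine-b", now=301)    # [1, 2]: engine-a's lease expired
```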

MVP recommendation: Option A (leader-only). Sufficient for Phases 1-2.