# Timer recovery with Postgres

Camunda recovers timers from RocksDB snapshots. The MVP has no snapshots, so it uses Postgres as the source of truth: a `timers` table with an index on `due_date`, a startup scan that loads pending timers into memory, and a scheduler loop with ~100 ms granularity. For failure recovery, the `timers` table is authoritative and the in-memory queue is derived from it. Trade-off vs. Camunda: similar latency (~100 ms), simpler recovery (no snapshot transfer), and worse theoretical scalability, which is still sufficient for the MVP.
## The problem

Timers are a fundamental BPMN feature:

- Boundary timer event: "if the user task is not completed within 2 days, escalate"
- Intermediate timer event: "wait 1 hour, then continue"
- Timer start event: "start the process every Monday at 8 am"

The engine must:

1. Register the timer when it is activated (with a `due_date`)
2. Fire the timer when `due_date` passes
3. Survive restarts (timers must not be lost)

Camunda uses RocksDB + snapshots:

- Timers live in a RocksDB column family
- Snapshots include the RocksDB state
- Recovery: load the snapshot + replay the log → state is reconstructed

The MVP does not use snapshots, so how does it recover?
## Solution: the `timers` table as the source of truth

### Schema
```sql
CREATE TABLE timers (
    timer_key            BIGSERIAL PRIMARY KEY,
    process_instance_key BIGINT NOT NULL,
    element_instance_key BIGINT NOT NULL,
    tenant_id            TEXT NOT NULL,

    -- Timer config
    timer_type           TEXT NOT NULL,  -- BOUNDARY | INTERMEDIATE | START_EVENT
    target_element_id    TEXT NOT NULL,
    due_date             TIMESTAMPTZ NOT NULL,

    -- Repeating timer (cycle)
    repetitions          INT,     -- NULL = one-shot, N = repeat N times, -1 = infinite
    interval_ms          BIGINT,  -- For cycle timers

    -- State
    state                TEXT NOT NULL DEFAULT 'PENDING',  -- PENDING | TRIGGERED | CANCELED
    created_at           TIMESTAMPTZ DEFAULT NOW(),
    triggered_at         TIMESTAMPTZ
);

-- At most one PENDING timer per element instance. A plain table-level
-- UNIQUE (process_instance_key, element_instance_key) would reject the new row
-- that every occurrence of a repeating timer inserts.
CREATE UNIQUE INDEX uq_timers_pending_element
    ON timers (process_instance_key, element_instance_key)
    WHERE state = 'PENDING';

-- CRITICAL INDEX: scan pending timers ordered by due_date
CREATE INDEX idx_timers_pending_due
    ON timers (due_date)
    WHERE state = 'PENDING';

-- Cleanup helper
CREATE INDEX idx_timers_triggered_old
    ON timers (triggered_at)
    WHERE state = 'TRIGGERED';
```
Key decision: the table is the source of truth; the in-memory schedule is derived from it.
## Timer lifecycle

### 1. Activation (engine processing)

When an element with a timer is activated:
```python
from datetime import datetime, timezone

async def on_timer_activated(element_instance, timer_def):
    # 1. Calculate due_date (timezone-aware, to match TIMESTAMPTZ)
    now = datetime.now(timezone.utc)
    if timer_def.duration:
        due_date = now + parse_duration(timer_def.duration)  # ISO 8601
    elif timer_def.date:
        due_date = parse_date(timer_def.date)
    elif timer_def.cycle:
        due_date = next_occurrence(timer_def.cycle, now)
    else:
        raise ValueError("timer definition needs a duration, date, or cycle")

    # 2. Persist atomically with the element's state mutation (same transaction)
    timer_key = await db.fetch_val("""
        INSERT INTO timers (
            process_instance_key, element_instance_key,
            tenant_id, timer_type, target_element_id,
            due_date, repetitions, interval_ms
        ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
        RETURNING timer_key
    """, ...)

    # 3. Notify the in-memory scheduler (optimization; the row is authoritative)
    await timer_scheduler.add_pending(timer_key, due_date)
```
Atomicity comes from running the INSERT in the same Postgres transaction as the element's state mutation.
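`parse_duration` is not defined in this note. A minimal sketch, assuming only the calendar-free subset of ISO 8601 durations (weeks, days, hours, minutes, seconds; years and months are rejected because their length depends on the calendar), could look like this. `compute_due_date` is a hypothetical helper, not part of the engine.

```python
import re
from datetime import datetime, timedelta, timezone
from typing import Optional

# Hypothetical parser for durations such as PT1H, P2D, P1DT2H30M, PT0.5S.
_DURATION_RE = re.compile(
    r"^P(?:(?P<weeks>\d+)W)?(?:(?P<days>\d+)D)?"
    r"(?:T(?:(?P<hours>\d+)H)?(?:(?P<minutes>\d+)M)?(?:(?P<seconds>\d+(?:\.\d+)?)S)?)?$"
)

def parse_duration(text: str) -> timedelta:
    match = _DURATION_RE.match(text)
    # Reject empty forms like "P", "PT" or a dangling "T" such as "P1DT".
    if not match or text in ("P", "PT") or text.endswith("T"):
        raise ValueError(f"unsupported ISO 8601 duration: {text!r}")
    parts = {name: float(value) for name, value in match.groupdict().items() if value}
    return timedelta(**parts)

def compute_due_date(duration: str, now: Optional[datetime] = None) -> datetime:
    # Use UTC-aware datetimes so comparisons with TIMESTAMPTZ values are valid.
    now = now or datetime.now(timezone.utc)
    return now + parse_duration(duration)
```

For example, `compute_due_date("P2D")` yields a due date two days from now. Cycle expressions such as `R/PT1H` would need a separate parser for the `R` prefix.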
### 2. In-memory scheduling (optimization)

To avoid scanning the DB every 100 ms, maintain an in-memory schedule:
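```python
import asyncio
import heapq
from datetime import datetime, timezone

class TimerScheduler:
    def __init__(self):
        # Min-heap ordered by due_date
        self.pending = []  # [(due_date, timer_key), ...]
        self.lock = asyncio.Lock()
        # Set by add_pending() so a sleeping check_loop re-evaluates its wait
        self.wakeup = asyncio.Event()
        self.running = True

    async def add_pending(self, timer_key, due_date):
        async with self.lock:
            heapq.heappush(self.pending, (due_date, timer_key))
        self.wakeup.set()  # the new timer may be due before the current head

    async def check_loop(self):
        while self.running:
            # Clear first, so a wakeup that arrives while firing is not lost
            self.wakeup.clear()
            now = datetime.now(timezone.utc)  # aware, comparable with TIMESTAMPTZ values
            async with self.lock:
                # Pop all timers with due_date <= now
                to_fire = []
                while self.pending and self.pending[0][0] <= now:
                    _, timer_key = heapq.heappop(self.pending)
                    to_fire.append(timer_key)
                next_due = self.pending[0][0] if self.pending else None

            # Fire outside the lock
            for timer_key in to_fire:
                await self._fire_timer(timer_key)

            # Sleep until the next due timer (100 ms floor), capped at 60 s
            # (refresh from DB, not shown); add_pending() cuts the sleep short
            if next_due is not None:
                wait = min(max(0.1, (next_due - now).total_seconds()), 60)
            else:
                wait = 60
            try:
                await asyncio.wait_for(self.wakeup.wait(), timeout=wait)
            except asyncio.TimeoutError:
                pass

    async def _fire_timer(self, timer_key):
        # Emit a TIMER:TRIGGER command; the processor does the DB work
        await engine.emit_command(
            intent='TIMER:TRIGGER',
            payload={'timer_key': timer_key}
        )
```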
This is the same optimization as Camunda's DueDateTimerCheckScheduler: demand-driven (check at the next due date) instead of polling at a fixed rate.
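The subtle part of a demand-driven scheduler is waking up early when a new timer arrives that is due before the one the loop is sleeping on. A minimal self-contained sketch, using a hypothetical `MiniTimerScheduler` with an injected `fire` callback in place of `engine.emit_command`, shows one way to do it with `asyncio.Event`. No lock is needed here because nothing awaits between reading and mutating the heap.

```python
import asyncio
import heapq
from datetime import datetime, timedelta, timezone
from typing import Awaitable, Callable

class MiniTimerScheduler:
    """Sleeps until the earliest due date, or until add_pending()/stop() wakes it."""

    def __init__(self, fire: Callable[[int], Awaitable[None]], max_wait: float = 60.0):
        self._fire = fire
        self._max_wait = max_wait
        self._heap = []  # (due_date, timer_key) pairs
        self._wakeup = asyncio.Event()
        self._running = True

    def add_pending(self, timer_key: int, due_date: datetime) -> None:
        heapq.heappush(self._heap, (due_date, timer_key))
        self._wakeup.set()  # the new timer may be earlier than the current head

    def stop(self) -> None:
        self._running = False
        self._wakeup.set()

    async def run(self) -> None:
        while self._running:
            self._wakeup.clear()  # clear first, so a set() during firing is not lost
            now = datetime.now(timezone.utc)
            due = []
            while self._heap and self._heap[0][0] <= now:
                due.append(heapq.heappop(self._heap)[1])
            for timer_key in due:
                await self._fire(timer_key)
            if self._heap:
                wait = (self._heap[0][0] - datetime.now(timezone.utc)).total_seconds()
            else:
                wait = self._max_wait
            wait = min(max(wait, 0.0), self._max_wait)
            try:
                await asyncio.wait_for(self._wakeup.wait(), timeout=wait)
            except asyncio.TimeoutError:
                pass  # the head timer is due (or max_wait elapsed)

async def demo() -> list:
    fired = []

    async def fire(timer_key: int) -> None:
        fired.append(timer_key)

    scheduler = MiniTimerScheduler(fire, max_wait=1.0)
    task = asyncio.create_task(scheduler.run())
    start = datetime.now(timezone.utc)
    scheduler.add_pending(2, start + timedelta(milliseconds=200))
    await asyncio.sleep(0.05)
    # Added later but due earlier: the event interrupts the 200 ms sleep.
    scheduler.add_pending(1, start + timedelta(milliseconds=100))
    await asyncio.sleep(0.4)
    scheduler.stop()
    await task
    return fired

fired_order = asyncio.run(demo())
```

Timer 1 fires at ~100 ms even though the loop had gone to sleep waiting for timer 2.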
### 3. Firing the timer

```python
class TimerTriggerProcessor:
    async def process(self, command):
        timer_key = command.payload['timer_key']

        # Atomic: lock the row, check it is still pending, mark it triggered
        async with db.transaction():
            timer = await db.fetch_one(
                "SELECT * FROM timers WHERE timer_key = $1 FOR UPDATE",
                timer_key
            )
            if not timer or timer['state'] != 'PENDING':
                return  # Already triggered or canceled (idempotent)

            # Mark triggered
            await db.execute(
                "UPDATE timers SET state = 'TRIGGERED', triggered_at = NOW() WHERE timer_key = $1",
                timer_key
            )

            # Emit event
            await emit_event('TIMER:TRIGGERED', timer)

            # Activate the target element (causes downstream effects)
            await activate_target_element(timer['process_instance_key'], timer['target_element_id'])

            # Repeating timer: schedule the next occurrence unless this was a
            # one-shot (NULL) or the last counted repetition (1); -1 repeats forever
            reps = timer['repetitions']
            if reps is not None and (reps == -1 or reps > 1):
                await self._schedule_next_occurrence(timer)
```
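The row lock plus the `PENDING` check is what makes a duplicate `TIMER:TRIGGER` harmless. A single conditional `UPDATE ... WHERE state = 'PENDING'` is an equivalent compare-and-set variant (a swapped-in technique, not the processor's code). This sketch uses the standard-library `sqlite3` module as a stand-in for Postgres so it runs anywhere; `try_trigger` is a hypothetical helper.

```python
import sqlite3

def try_trigger(conn: sqlite3.Connection, timer_key: int) -> bool:
    """Move a timer PENDING -> TRIGGERED; True only for the call that wins."""
    with conn:  # one transaction: commits on success, rolls back on error
        cur = conn.execute(
            "UPDATE timers SET state = 'TRIGGERED' "
            "WHERE timer_key = ? AND state = 'PENDING'",
            (timer_key,),
        )
    return cur.rowcount == 1  # 0 rows: already triggered, canceled, or missing

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE timers (timer_key INTEGER PRIMARY KEY, state TEXT NOT NULL)")
conn.executemany("INSERT INTO timers VALUES (?, ?)", [(1, "PENDING"), (2, "CANCELED")])

first = try_trigger(conn, 1)
duplicate = try_trigger(conn, 1)  # a second TIMER:TRIGGER for the same timer
canceled = try_trigger(conn, 2)   # canceled before it fired
```

Only the winning call should go on to emit `TIMER:TRIGGERED` and activate the target element.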
### 4. Repeating timers

Recurring timers (e.g., `R/PT1H` = every hour):

```python
from datetime import timedelta

async def _schedule_next_occurrence(self, timer):
    # Anchor on the previous due_date (not NOW()) so delays do not accumulate drift
    next_due = timer['due_date'] + timedelta(milliseconds=timer['interval_ms'])
    # Counted repetitions decrement; -1 (infinite) stays -1
    new_repetitions = timer['repetitions'] - 1 if timer['repetitions'] > 0 else -1

    new_timer_key = await db.fetch_val("""
        INSERT INTO timers (
            process_instance_key, element_instance_key,
            tenant_id, timer_type, target_element_id,
            due_date, repetitions, interval_ms
        ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
        RETURNING timer_key
    """, ..., next_due, new_repetitions, timer['interval_ms'])

    # Add to the in-memory schedule
    await timer_scheduler.add_pending(new_timer_key, next_due)
```
Each occurrence is a new row, so the historical record is preserved naturally.
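A pure-function sketch of the `repetitions` convention from the schema makes the stop condition easy to check: NULL = one-shot, N = N occurrences in total (matching the `> 1` check before scheduling the next one), -1 = unbounded. `occurrences` is a hypothetical helper, and `limit` exists only so the unbounded case terminates.

```python
from datetime import datetime, timedelta, timezone
from typing import Iterator, Optional

def occurrences(first_due: datetime, interval: timedelta, repetitions: Optional[int],
                limit: int = 10) -> Iterator[datetime]:
    """Yield due dates under the schema's repetitions convention.

    None -> one-shot, N > 0 -> N occurrences in total, -1 -> unbounded
    (cut off after `limit` values so the example terminates).
    """
    due, remaining = first_due, repetitions
    for _ in range(limit):
        yield due
        # Stop after a one-shot timer or after the last counted occurrence.
        if remaining is None or (remaining != -1 and remaining <= 1):
            return
        if remaining != -1:
            remaining -= 1
        # Anchor on the previous due date rather than "now" to avoid drift.
        due = due + interval

start = datetime(2024, 1, 1, 8, 0, tzinfo=timezone.utc)
hourly = list(occurrences(start, timedelta(hours=1), repetitions=3))
```

With `repetitions=3` this yields 08:00, 09:00 and 10:00, one row per occurrence in the `timers` table.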
## Recovery at startup

After an engine restart the in-memory schedule is empty. Recover it from Postgres:

```python
class EngineStartup:
    async def initialize_timers(self):
        """Load all pending timers from the DB into the in-memory scheduler."""
        # Load all pending timers (in chunks, in case there are many)
        async for row in db.fetch_chunks("""
            SELECT timer_key, due_date
            FROM timers
            WHERE state = 'PENDING'
            ORDER BY due_date ASC
        """, chunk_size=1000):
            await timer_scheduler.add_pending(row['timer_key'], row['due_date'])

        # Fire any overdue timers immediately. They are also in the heap, so
        # check_loop may emit a second TIMER:TRIGGER for them; the processor's
        # PENDING check discards the duplicate.
        await timer_scheduler.process_overdue()
```
Startup time: 1M pending timers × ~1 µs per heap insert ≈ 1 second, plus the time to stream the rows from Postgres. Acceptable.

With more than 10M timers, consider lazy loading: load only the nearest 1,000 at startup and scan for the rest as time progresses.
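`db.fetch_chunks` is not defined in this note. One way to chunk the startup scan without `OFFSET` is keyset pagination on `(due_date, timer_key)`: each chunk resumes strictly after the last row of the previous one. This sketch uses `sqlite3` as a stand-in for Postgres and ISO 8601 strings for timestamps (both assumptions for self-containment; in Postgres the resume condition can be written as the row comparison `(due_date, timer_key) > ($1, $2)`). `load_pending` is a hypothetical helper.

```python
import sqlite3
from typing import List, Optional, Tuple

Entry = Tuple[str, int]  # (due_date as ISO 8601 text, timer_key)

def load_pending(conn: sqlite3.Connection, chunk_size: int = 1000) -> List[Entry]:
    """Stream PENDING timers in (due_date, timer_key) order, one chunk at a time."""
    heap: List[Entry] = []
    last: Optional[Entry] = None  # last row of the previous chunk (the "keyset")
    while True:
        if last is None:
            rows = conn.execute(
                "SELECT due_date, timer_key FROM timers WHERE state = 'PENDING' "
                "ORDER BY due_date, timer_key LIMIT ?",
                (chunk_size,),
            ).fetchall()
        else:
            # Resume strictly after the last row seen; no OFFSET rescans.
            rows = conn.execute(
                "SELECT due_date, timer_key FROM timers WHERE state = 'PENDING' "
                "AND (due_date > ? OR (due_date = ? AND timer_key > ?)) "
                "ORDER BY due_date, timer_key LIMIT ?",
                (last[0], last[0], last[1], chunk_size),
            ).fetchall()
        if not rows:
            return heap
        # Rows arrive sorted, and a sorted list is already a valid heapq min-heap.
        heap.extend(rows)
        last = rows[-1]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE timers (timer_key INTEGER PRIMARY KEY, due_date TEXT, state TEXT)")
conn.executemany(
    "INSERT INTO timers VALUES (?, ?, ?)",
    [
        (1, "2024-01-01T10:00:00Z", "PENDING"),
        (2, "2024-01-01T09:00:00Z", "PENDING"),
        (3, "2024-01-01T09:00:00Z", "PENDING"),
        (4, "2024-01-01T08:00:00Z", "TRIGGERED"),
        (5, "2024-01-01T11:00:00Z", "PENDING"),
    ],
)
heap = load_pending(conn, chunk_size=2)
```

The same loop, stopped after the first chunk, is a starting point for the lazy-load variant.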
### Overdue handling

If the engine was down for hours or days, some timers' `due_date` has already passed:

```python
async def process_overdue(self):
    """Fire all timers whose due_date has passed."""
    overdue = await db.fetch_all("""
        SELECT timer_key FROM timers
        WHERE state = 'PENDING' AND due_date <= NOW()
        ORDER BY due_date ASC
    """)
    for row in overdue:
        # Fire as a command (goes through normal processing)
        await engine.emit_command(
            intent='TIMER:TRIGGER',
            payload={'timer_key': row['timer_key']}
        )
```
The backfill runs as fast as the engine can process it, which can overwhelm the engine when many timers are overdue. Rate-limit it:

```python
# Process in batches, yielding between batches
for start in range(0, len(overdue), 100):
    for row in overdue[start:start + 100]:
        await emit_command(...)  # same TIMER:TRIGGER command as above
    await asyncio.sleep(0.1)  # yield 100 ms between batches
```
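A self-contained sketch of the batched backfill, with the command emitter injected as a hypothetical `emit` coroutine (standing in for `engine.emit_command(intent='TIMER:TRIGGER', ...)`) so the batching logic can be exercised on its own. It also skips the pause after the final batch.

```python
import asyncio
from typing import Awaitable, Callable, List, Sequence

async def backfill_overdue(
    timer_keys: Sequence[int],
    emit: Callable[[int], Awaitable[None]],
    batch_size: int = 100,
    pause_s: float = 0.1,
) -> int:
    """Emit one trigger per overdue timer key, pausing between batches. Returns the batch count."""
    batches = 0
    for start in range(0, len(timer_keys), batch_size):
        for timer_key in timer_keys[start:start + batch_size]:
            await emit(timer_key)
        batches += 1
        if start + batch_size < len(timer_keys):
            # Give the engine time to process regular commands between batches.
            await asyncio.sleep(pause_s)
    return batches

emitted: List[int] = []

async def record(timer_key: int) -> None:
    # Stand-in for engine.emit_command(intent='TIMER:TRIGGER', ...)
    emitted.append(timer_key)

batches = asyncio.run(backfill_overdue(list(range(250)), record, batch_size=100, pause_s=0.0))
```

A fixed pause is the simplest throttle; a bounded command queue would apply backpressure based on actual engine load instead.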
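## Canceling timers

When a user cancels a process instance, cancel its timers:

```python
async def cancel_timers_for_instance(process_instance_key):
    # Mark canceled in the DB (the authoritative step)
    await db.execute("""
        UPDATE timers
        SET state = 'CANCELED'
        WHERE process_instance_key = $1
          AND state = 'PENDING'
    """, process_instance_key)

    # Remove from the in-memory schedule
    timer_scheduler.remove_for_instance(process_instance_key)
```

Removing entries from a heap is O(N) (find them, then re-heapify), but N is usually small. Note that `remove_for_instance` requires the scheduler to know which timer keys belong to which instance, while the heap stores only `(due_date, timer_key)`. Alternative: skip the removal and check lazily at fire time; `TimerTriggerProcessor` already ignores timers whose state is not `PENDING`.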
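A common middle ground between O(N) removal and fully lazy checking is a tombstone set: cancel in O(1), and drop the entry when it reaches the top of the heap (the "mark as removed" pattern from the Python `heapq` documentation). A sketch with a hypothetical `LazyCancelHeap`; the canceled keys could come from `UPDATE ... RETURNING timer_key`, which Postgres supports. The DB state check remains the real guard, and canceled entries still occupy heap memory until they come due.

```python
import heapq
from datetime import datetime
from typing import List, Set, Tuple

class LazyCancelHeap:
    """Min-heap of (due_date, timer_key) where cancel() is O(1) via tombstones."""

    def __init__(self) -> None:
        self._heap: List[Tuple[datetime, int]] = []
        self._canceled: Set[int] = set()

    def push(self, due_date: datetime, timer_key: int) -> None:
        heapq.heappush(self._heap, (due_date, timer_key))

    def cancel(self, timer_key: int) -> None:
        # No search or re-heapify: the entry is dropped when it reaches the top.
        self._canceled.add(timer_key)

    def pop_due(self, now: datetime) -> List[int]:
        """Pop every entry due at or before `now`, skipping canceled ones."""
        fired = []
        while self._heap and self._heap[0][0] <= now:
            _, timer_key = heapq.heappop(self._heap)
            if timer_key in self._canceled:
                self._canceled.discard(timer_key)  # tombstone no longer needed
                continue
            fired.append(timer_key)
        return fired

h = LazyCancelHeap()
h.push(datetime(2024, 1, 1, 9), 1)
h.push(datetime(2024, 1, 1, 10), 2)
h.push(datetime(2024, 1, 1, 11), 3)
h.cancel(2)  # e.g. a key returned by UPDATE ... SET state = 'CANCELED' RETURNING timer_key
fired = h.pop_due(datetime(2024, 1, 1, 12))
```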
## Cleaning up old timers

Triggered and canceled timers accumulate. Retention policy:

```sql
-- Cleanup job (cron).
-- Canceled rows never set triggered_at, so fall back to created_at for them.
DELETE FROM timers
WHERE state IN ('TRIGGERED', 'CANCELED')
  AND COALESCE(triggered_at, created_at) < NOW() - INTERVAL '30 days';
```

30 days of retention for forensics. Adjust to your compliance needs.
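On a large table, one big `DELETE` can hold locks and produce a burst of WAL for a long time. Deleting in small batches, each in its own transaction, is a common mitigation. Postgres `DELETE` has no `LIMIT`, so the batch is selected in a subquery. A sketch with `sqlite3` as a stand-in for Postgres (the cutoff is passed as a parameter instead of `NOW() - INTERVAL '30 days'`); `purge_finished` is a hypothetical helper.

```python
import sqlite3

def purge_finished(conn: sqlite3.Connection, cutoff: str, batch_size: int = 1000) -> int:
    """Delete TRIGGERED/CANCELED rows older than `cutoff`, one short transaction per batch."""
    total = 0
    while True:
        with conn:  # commit each batch so locks are held only briefly
            cur = conn.execute(
                "DELETE FROM timers WHERE timer_key IN ("
                "  SELECT timer_key FROM timers"
                "  WHERE state IN ('TRIGGERED', 'CANCELED')"
                "  AND COALESCE(triggered_at, created_at) < ?"
                "  LIMIT ?)",
                (cutoff, batch_size),
            )
        total += cur.rowcount
        if cur.rowcount < batch_size:
            return total

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE timers (timer_key INTEGER PRIMARY KEY, state TEXT,"
    " created_at TEXT, triggered_at TEXT)"
)
conn.executemany(
    "INSERT INTO timers VALUES (?, ?, ?, ?)",
    [
        (1, "TRIGGERED", "2024-01-01", "2024-01-02"),
        (2, "CANCELED", "2024-01-01", None),           # canceled: no triggered_at
        (3, "TRIGGERED", "2024-03-01", "2024-03-02"),  # too recent: kept
        (4, "PENDING", "2024-01-01", None),            # still pending: kept
        (5, "TRIGGERED", "2024-01-03", "2024-01-05"),
    ],
)
deleted = purge_finished(conn, cutoff="2024-02-01", batch_size=2)
remaining = [row[0] for row in conn.execute("SELECT timer_key FROM timers ORDER BY timer_key")]
```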
## Failure modes

| Failure | Recovery |
|---|---|
| Engine restart | Reload pending timers from the DB at startup |
| DB connection lost | Retry with backoff; the in-memory schedule keeps emitting `TIMER:TRIGGER` commands, which complete once the DB is reachable again |
| In-memory schedule corrupted | The next periodic refresh rebuilds it from the DB |
| Clock skew | DB time is authoritative (use `NOW()`, not the local clock) |
| Timer with a past `due_date` | Process as overdue; fire immediately |
| Duplicate fire (race) | `SELECT ... FOR UPDATE` plus the `PENDING` check makes the trigger idempotent |
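"Retry with backoff" is not specified further. A generic sketch of capped exponential backoff with full jitter follows; `with_backoff` is a hypothetical helper, and treating `ConnectionError` as the retryable error is an assumption (the right exception types depend on the DB driver).

```python
import asyncio
import random
from typing import Awaitable, Callable, Tuple, Type, TypeVar

T = TypeVar("T")

async def with_backoff(
    op: Callable[[], Awaitable[T]],
    *,
    attempts: int = 5,
    base_s: float = 0.1,
    cap_s: float = 5.0,
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError,),
) -> T:
    """Run `op`, retrying retryable errors with capped exponential backoff and full jitter."""
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    for attempt in range(attempts):
        try:
            return await op()
        except retry_on:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            delay = min(cap_s, base_s * 2 ** attempt)
            await asyncio.sleep(random.uniform(0, delay))  # jitter spreads out reconnects
    raise AssertionError("unreachable")

calls = []

async def flaky_query() -> str:
    # Hypothetical DB call that fails twice, then succeeds
    calls.append("attempt")
    if len(calls) < 3:
        raise ConnectionError("connection lost")
    return "ok"

result = asyncio.run(with_backoff(flaky_query, base_s=0.01))
```

Jitter matters when several engines lose the DB at once: without it they would all reconnect in lockstep.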
## Performance considerations

### Read load

The in-memory schedule reduces DB reads to:

- Startup: 1 full scan
- Periodic refresh: 1 query per minute (to verify no timers were missed)
- Per fire: 1 `SELECT ... FOR UPDATE` + 1 `UPDATE`

At 1,000 timer fires/sec that is ~2,000 DB statements/sec, which Postgres handles comfortably.
### Write load

Every activation = 1 INSERT. Every fire = 1 UPDATE. Recurring timers add 1 INSERT per occurrence.

A bottleneck here is unlikely.
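### Memory

In-memory heap: ~32 bytes per entry assumes a compact layout (a 64-bit timestamp plus a 64-bit key, plus overhead), so 1M timers ≈ 32 MB. With the Python `(datetime, int)` tuples used by the scheduler, each entry costs several times more, so 1M timers needs well over 100 MB. Still acceptable.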
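A quick way to sanity-check the per-entry figure for the Python scheduler is to add up `sys.getsizeof` for one heap entry. This is a rough estimate, assuming a 64-bit CPython build: it ignores allocator overhead and counts the shared `timezone.utc` object as free. `heap_entry_bytes` is a hypothetical helper.

```python
import sys
from datetime import datetime, timezone

def heap_entry_bytes() -> int:
    """Rough CPython cost of one (due_date, timer_key) heap entry, excluding allocator overhead."""
    due = datetime(2024, 1, 1, tzinfo=timezone.utc)  # tzinfo object is shared, not counted
    key = 2 ** 40                                     # a BIGSERIAL-sized key
    entry = (due, key)
    list_slot = 8                                     # pointer in the heap list (64-bit build)
    return sys.getsizeof(entry) + sys.getsizeof(due) + sys.getsizeof(key) + list_slot

per_entry = heap_entry_bytes()
print(f"~{per_entry} bytes/entry, ~{per_entry * 1_000_000 / 1e6:.0f} MB for 1M timers")
```

If memory ever matters, storing epoch milliseconds as plain ints instead of `datetime` objects shrinks each entry noticeably.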
## Comparison vs. Camunda
| Aspect | Camunda RocksDB + snapshots | MVP Postgres |
|---|---|---|
| Recovery time (restart) | ~seconds (snapshot load) | ~1s per 1M timers |
| Latency (fire) | ~10-50ms | ~50-100ms (DB roundtrip) |
| Storage overhead | Compact (RocksDB) | Larger (Postgres rows) |
| Operational complexity | Higher (snapshot tooling) | Lower (just SQL) |
| Cross-engine coordination | Via Raft | Via Postgres advisory lock |
Trade-off: a marginal latency cost for dramatically simpler operations. The right call for the MVP.
## Phase 2+: multi-engine timer ownership

When multiple engine instances run (Phase 2), exactly one of them should fire each timer.

### Option A: leader-only timer processing

The leader (per ADR-020) processes timers; followers stay idle for timers. Simple.

### Option B: sharded timer ownership

Each engine instance owns a subset of the timers:
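```sql
-- Each engine claims a chunk of its shard.
-- Requires claimed_by / claimed_until columns, which the Phase 1 schema lacks.
-- Postgres UPDATE has no LIMIT, so the rows are selected in a CTE first.
-- $1 = instance_id, $2 = num_engines, $3 = shard_index
WITH claimable AS (
    SELECT timer_key
    FROM timers
    WHERE state = 'PENDING'
      AND due_date <= NOW() + INTERVAL '1 minute'
      AND (claimed_by IS NULL OR claimed_until < NOW())
      AND timer_key % $2 = $3
    ORDER BY due_date
    LIMIT 100
    FOR UPDATE SKIP LOCKED
)
UPDATE timers t
SET claimed_by = $1, claimed_until = NOW() + INTERVAL '5 minutes'
FROM claimable c
WHERE t.timer_key = c.timer_key
RETURNING t.*;
```

Each engine processes its claimed timers and renews its claims periodically. If an engine dies, its claims expire and the timers become claimable again; because of the `timer_key % num_engines` filter, only the instance that takes over that `shard_index` will pick them up, so shard assignment must be rebalanced when membership changes.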
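An in-memory model of the claim rules (shard filter, one-minute lookahead, expired-lease takeover) makes the failover behavior concrete. It is a sketch, not the SQL itself: `TimerRow` and `claim` are hypothetical, and all rows are assumed to be `PENDING`.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import List, Optional

@dataclass
class TimerRow:
    timer_key: int
    due_date: datetime
    claimed_by: Optional[str] = None
    claimed_until: Optional[datetime] = None

def claim(rows: List[TimerRow], instance_id: str, shard_index: int, num_engines: int,
          now: datetime, lease: timedelta = timedelta(minutes=5),
          lookahead: timedelta = timedelta(minutes=1), limit: int = 100) -> List[int]:
    """Model of the Option B claim: shard filter, lookahead, expired-lease takeover."""
    claimed = []
    for row in sorted(rows, key=lambda r: r.due_date):
        if len(claimed) >= limit:
            break
        if row.timer_key % num_engines != shard_index:
            continue  # another shard's timer
        if row.due_date > now + lookahead:
            continue  # not due soon enough to claim
        if row.claimed_by is not None and row.claimed_until >= now:
            continue  # someone still holds a live lease
        row.claimed_by, row.claimed_until = instance_id, now + lease
        claimed.append(row.timer_key)
    return claimed

now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
rows = [TimerRow(key, now) for key in range(1, 5)]
first = claim(rows, "engine-a", shard_index=0, num_engines=2, now=now)
# engine-a dies without renewing; a replacement serving shard 0 must wait for the lease.
too_early = claim(rows, "engine-c", 0, 2, now + timedelta(minutes=1))
taken_over = claim(rows, "engine-c", 0, 2, now + timedelta(minutes=6))
```

Timers 1 and 3 (shard 1) are never touched by shard-0 instances, which is why a dead engine's shard needs a new owner rather than just an expired lease.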
MVP recommendation: Option A (leader-only). It is sufficient for Phases 1-2.

## Links
- concepts/timer-system — Camunda's approach
- adrs/adr-019-replay-determinism-invariant — Timer fires must be replayable
- concepts/api-engine-serialization — How timer commands enter the processing queue
- ISO 8601 duration