ADR-019: Replay determinism como invariante testeable¶
- Status: Accepted
- Date: 2026-05-14
- Tags: core, testing, correctness
Context and Problem Statement¶
Si el engine usa command sourcing (ADR-005) y single-threaded processing (ADR-006), el state debería ser reconstructible desde el log. ¿Hacemos esto un test invariant explícito, o asumimos que la implementation correcta lo logre implicitly?
Decision Drivers¶
- Camunda tiene
ContinuouslyReplayTestque valida esto continuously - Bugs de no-determinism son catastróficos (failover causes state divergence)
- Failure modes silenciosos sin test explícito
- Property-based testing es la herramienta correcta
Considered Options¶
- Hacer invariante explícita + test continuo (como Camunda)
- Asumir invariante implicit (no test)
- Test manual ocasional
- Solo test en CI nightly
Decision Outcome¶
Chosen option: Invariante explícita + test continuo + property-based porque: - Single most important correctness property del engine - Sin test = bug catastrófico silent - Property-based test detecta edge cases - Inversión one-time, beneficio permanente
Positive Consequences¶
- Catastrophic bugs caught en CI
- Refactoring safe (any change que break determinism falla test)
- Onboarding seguro (new devs ven invariante claramente)
- Foundation para failover correcto
Negative Consequences¶
- Test infrastructure investment (~2 semanas)
- Test puede ser slow (replay overhead)
- Random failures requires good reproducibility
Implementación¶
Property: state reconstructible from log¶
def test_replay_determinism():
"""
For ANY valid sequence of commands C1...Cn:
state_after_processing(C1...Cn) == state_after_replay(log_of(processing(C1...Cn)))
"""
log = SharedLog()
# Two engines sharing the log
processing_engine = Engine(log, mode='PROCESSING')
replay_engine = Engine(log, mode='REPLAY')
# Generate random commands
commands = generate_random_commands(count=100)
for cmd in commands:
processing_engine.process(cmd)
# Wait for replay to catch up
wait_until(lambda: replay_engine.position == log.last_position)
# Compare states
p_state = processing_engine.collect_state()
r_state = replay_engine.collect_state()
assert p_state == r_state, "Replay state diverged from processing state"
Excepciones documentadas¶
Algunas tables son intencionalmente non-replay-deterministic:
EXCLUDED_FROM_REPLAY_CHECK = {
'transient_cache', # in-memory caches
'migrations_state', # populated at startup, no events
}
def compare_states(p_state, r_state):
for table in p_state:
if table in EXCLUDED_FROM_REPLAY_CHECK:
continue
assert p_state[table] == r_state[table], f"Table {table} diverged"
Camunda excluye DEFAULT y MIGRATIONS_STATE. MVP debe documentar similar list.
Property-based testing¶
Beyond simple test, use property-based:
@given(
bpmn_model=valid_bpmn_models(max_elements=10, max_depth=4),
execution_path=valid_execution_paths()
)
def test_replay_invariant(bpmn_model, execution_path):
log = SharedLog()
p_engine = Engine(log, mode='PROCESSING')
r_engine = Engine(log, mode='REPLAY')
p_engine.deploy(bpmn_model)
pid = p_engine.create_instance(bpmn_model.process_id)
for step in execution_path.steps:
execute_step(p_engine, step, pid)
wait_until(lambda: r_engine.position == log.last_position)
assert p_engine.state == r_engine.state
Para 100 procesos × 30 paths = 3000 test cases. Catches edge cases que test-by-example perdería.
Reglas de implementación para preservar determinism¶
Source of non-determinism debe externalizarse¶
# BAD: no determinístico
def process_create_instance(cmd):
pi_key = uuid.uuid4() # ← random
now = datetime.utcnow() # ← time
instance = ProcessInstance(pi_key, now)
# GOOD: inyectable
def process_create_instance(cmd, key_generator, clock):
pi_key = key_generator.next_key() # monotonic
now = clock.now() # inyectable
instance = ProcessInstance(pi_key, now)
KeyGenerator monotonic¶
-- Postgres BIGSERIAL is monotonic
CREATE TABLE process_instances (
process_instance_key BIGSERIAL PRIMARY KEY,
...
);
Clock inyectable¶
class Clock:
def now(self) -> datetime: ...
class SystemClock(Clock):
def now(self) -> datetime:
return datetime.utcnow()
class TestClock(Clock):
def __init__(self, start):
self.current = start
def now(self):
return self.current
def advance(self, delta):
self.current += delta
Tests usan TestClock — controlled time = controllable replay.
Random NUNCA acceptable¶
Engine NUNCA llama random() directamente. Si necesita randomness:
1. Externaliza (seed inyectable)
2. Persistir el random value en el event (replay reads from event)
Failure modes que esto previene¶
Sin replay determinism test:
-
Failover corruption: new leader rebuilds state from log, ends up with different state than old leader. Followers verán state divergente.
-
Snapshot inconsistency: snapshot taken at position X, restored elsewhere → state divergent from what was at position X.
-
Time-based bugs: code que llama
datetime.now()directamente da resultados diferentes en replay. -
Race conditions revealed: si algún día algo concurrent slips in, replay test catches it.
Costo¶
- Test infrastructure: ~2 semanas inicial setup
- CI time per run: +5-10 min para replay tests
- Maintenance: minimal (test rarely changes)
ROI: catches catastrophic bugs antes de producción. Cheap insurance.
Links¶
- concepts/replay-determinism — Detalle técnico
- concepts/property-based-testing — Pattern
- concepts/test-infrastructure — Test framework
- adrs/adr-005-stream-processing-command-sourcing — Foundation
- adrs/adr-006-single-threaded-per-partition — Habilitador