API Engine Serialization
The REST API receives concurrent requests; the engine processes commands single-threaded (ADR-006). How do we bridge the two? Solution: persist each command to the log first (an atomic INSERT), and let the engine process the log sequentially. The API does NOT wait for processing: it responds with a commandPosition, which the client uses for tracking. For the synchronous API (createWithAwaitingResult), waiters are registered in an in-memory map and notified on completion. This guarantees correctness without complex locking. It reuses Camunda's command-sourcing pattern, adapted to Postgres.
The problem
Per ADR-006, engine processing is single-threaded within a partition. No concurrency means no race conditions.
But a REST API is inherently concurrent:
- HTTP servers (Express, FastAPI, Gin) handle requests concurrently
- 100 clients can send requests simultaneously
- Multiple HTTP workers can run in the same process
Question: how do we serialize from a multi-threaded API into a single-threaded engine without losing performance or correctness?
The key insight: the command log is the sync point
```mermaid
flowchart TD
    subgraph MT[Multi-threaded]
        R1[HTTP req 1]
        R2[HTTP req 2]
        R3[HTTP req 3]
    end
    DB[("Postgres command_log<br/>(BIGSERIAL position)<br/>Atomic INSERTs<br/>serialize naturally")]
    subgraph ST[Single-threaded]
        E["Engine loop<br/>(single thread)<br/>Reads positions sequentially"]
    end
    R1 --> DB
    R2 --> DB
    R3 --> DB
    DB --> E
```
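Postgres BIGSERIAL provides a natural ordering: any number of concurrent INSERTs are assigned monotonically increasing positions. Caveat: a position is allocated when the INSERT runs, not when it commits, so a transaction holding position 41 can commit after one holding 42, and rolled-back INSERTs leave gaps. A consumer that only tracks the highest position it has seen must account for this.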
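A toy in-memory model of this sync point, offered as a sketch rather than the real mechanism: InMemoryCommandLog is a hypothetical stand-in for command_log in which a lock plays the role of the sequence. Eight threads append concurrently and a single consumer drains the log strictly by position, mirroring the engine's WHERE position > $1 ORDER BY position LIMIT 100 query. Because allocation and visibility happen under one lock here, the model does not reproduce a real sequence, where a value is allocated at INSERT time but the row only becomes visible at commit.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class InMemoryCommandLog:
    """Hypothetical stand-in for command_log: a lock-protected append plays
    the role of a BIGSERIAL INSERT, handing each command the next position."""

    def __init__(self):
        self._lock = threading.Lock()
        self._next_position = 1
        self.rows = []  # (position, command), kept in position order

    def append(self, command):
        # Allocation and "commit" happen under one lock, so positions are
        # unique, gap-free and visible in order (unlike a real sequence).
        with self._lock:
            position = self._next_position
            self._next_position += 1
            self.rows.append((position, command))
            return position

    def read_after(self, last_position, limit=100):
        # Mirrors: WHERE position > $1 ORDER BY position ASC LIMIT 100
        with self._lock:
            return [row for row in self.rows if row[0] > last_position][:limit]


log = InMemoryCommandLog()

# Many concurrent "HTTP handlers" append at once
with ThreadPoolExecutor(max_workers=8) as pool:
    positions = list(pool.map(log.append, [f"cmd-{i}" for i in range(500)]))

# One single-threaded "engine" drains the log in position order
last, processed = 0, []
while batch := log.read_after(last):
    for position, _command in batch:
        processed.append(position)
        last = position
```

Every append gets a distinct position regardless of which thread ran first, and the consumer sees them as one ordered stream.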
Detailed flow
Step 1: the API handler inserts the command
```python
import json

from fastapi import Response


async def create_process_instance(request):
    """Concurrent HTTP handler."""
    # Validate request (safe to run concurrently)
    tenant_id = extract_tenant(request)
    body = await request.json()
    # Rate limit (safe to run concurrently)
    if not await rate_limiter.check(tenant_id):
        return Response(status_code=429)
    # Authorization (safe to run concurrently); assumed to raise on denial
    await authorize(request.user, "create_process_instance", tenant_id)
    # Insert command (concurrent, but atomic per row)
    result = await db.fetch_one("""
        INSERT INTO command_log (timestamp, tenant_id, intent, payload)
        VALUES (NOW(), $1, 'PROCESS_INSTANCE_CREATION:CREATE', $2)
        RETURNING position, timestamp
    """, tenant_id, json.dumps(body))  # JSONB payload passed as JSON text
    return {
        "commandPosition": result['position'],
        "timestamp": result['timestamp'].isoformat(),
    }
```
The API returns IMMEDIATELY with the position; it does not wait for processing. API throughput is therefore bounded by Postgres INSERT throughput (easily >10K/sec).
Step 2: the engine consumes the log sequentially
```python
class EngineLoop:
    def __init__(self):
        self.last_processed_position = 0
        self.running = False

    async def start(self):
        """Single-threaded processing loop."""
        self.running = True
        # Recover last position from DB
        self.last_processed_position = await self._recover_position()
        while self.running:
            await self._process_next_batch()

    async def _process_next_batch(self):
        """Get next batch of commands, process serially."""
        commands = await db.fetch_all("""
            SELECT position, intent, payload, tenant_id
            FROM command_log
            WHERE position > $1
            ORDER BY position ASC
            LIMIT 100
        """, self.last_processed_position)
        if not commands:
            # No new commands: wait for a notification (Step 3)
            await self._wait_for_signal()
            return
        for cmd in commands:
            await self._process_one_command(cmd)
            self.last_processed_position = cmd['position']
        # Persisted once per batch: unless state writes and this marker commit
        # in one transaction, a crash replays up to one batch, so command
        # handling must tolerate reprocessing
        await self._update_position_marker()
```
Step 3: signal the engine when there are new commands
Polling every X ms is wasteful. Use Postgres LISTEN/NOTIFY instead:
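```python
import asyncio

import asyncpg


# API inserts command + notifies
async def insert_command(tenant_id, *command_fields):  # other fields elided, see Step 1
    async with db.transaction():
        row = await db.fetch_one("INSERT INTO command_log ... RETURNING position")
        # NOTIFY is delivered only when the transaction commits. pg_notify()
        # takes bind parameters, so tenant_id is never spliced into SQL text.
        await db.execute("SELECT pg_notify('new_command', $1)", tenant_id)
    return row


# Engine listens (continuation of EngineLoop from Step 2)
class EngineLoop:
    POLL_FALLBACK_SECONDS = 1.0  # assumed safety-net poll interval

    async def _start_listening(self):
        """Call once from start(), before the loop. LISTEN needs a dedicated,
        long-lived connection: a listener registered on a pooled connection
        does not reliably survive the connection being released."""
        self.notification_event = asyncio.Event()
        self.listen_conn = await asyncpg.connect(DSN)  # DSN: assumed config value
        await self.listen_conn.add_listener('new_command', self._on_new_command)

    async def _wait_for_signal(self):
        """Wait for new command notification or timeout."""
        try:
            await asyncio.wait_for(self.notification_event.wait(),
                                   timeout=self.POLL_FALLBACK_SECONDS)
        except asyncio.TimeoutError:
            pass  # periodic poll fallback (also covers notifications missed between SELECT and wait)
        finally:
            self.notification_event.clear()

    def _on_new_command(self, conn, pid, channel, payload):
        # asyncpg listener callback arguments: (connection, pid, channel, payload)
        self.notification_event.set()
```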
Result: the engine picks up commands ~1-10ms after the INSERT commits, with no busy polling.
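The wake-up logic can be exercised without a database. In this sketch, Waker is a hypothetical asyncio analogue of LISTEN/NOTIFY: producers call notify() (standing in for pg_notify), and the single consumer waits on an asyncio.Event with a timeout, so a missed notification costs at most one poll interval rather than a stalled engine.

```python
import asyncio


class Waker:
    """Hypothetical analogue of LISTEN/NOTIFY: producers set an event, the
    single consumer waits on it with a timeout so it still re-polls."""

    def __init__(self, poll_fallback=0.5):
        self.event = asyncio.Event()
        self.poll_fallback = poll_fallback

    def notify(self):  # plays the role of pg_notify('new_command', ...)
        self.event.set()

    async def wait(self):
        try:
            await asyncio.wait_for(self.event.wait(), timeout=self.poll_fallback)
            return "notified"
        except asyncio.TimeoutError:
            return "timeout"  # fallback: caller re-polls anyway
        finally:
            self.event.clear()


async def demo():
    queue, waker, seen = [], Waker(poll_fallback=0.2), []

    async def consumer():  # single-threaded "engine"
        while len(seen) < 3:
            while queue:  # drain everything currently visible
                seen.append(queue.pop(0))
            if len(seen) < 3:
                await waker.wait()  # sleep until notified or timed out

    async def producer():  # "API" inserting commands
        for i in range(3):
            await asyncio.sleep(0.01)
            queue.append(i)
            waker.notify()

    await asyncio.gather(consumer(), producer())
    idle = await waker.wait()  # nothing pending: falls back to the timeout
    return seen, idle


seen, idle = asyncio.run(demo())
```

The consumer never spins: it either wakes on a notification or after the fallback interval.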
Sync API: createWithAwaitingResult
Some use cases need a synchronous response (the client waits for completion):
Implementation: in-memory waiter registry
```python
import asyncio


class CompletionWaiterRegistry:
    """Map of processInstanceKey → asyncio.Future."""

    def __init__(self):
        self.waiters = {}  # pi_key → Future

    def register(self, pi_key):
        future = asyncio.get_running_loop().create_future()
        self.waiters[pi_key] = future
        return future

    async def wait(self, pi_key, future, timeout=30):
        try:
            # Raises asyncio.TimeoutError after `timeout` seconds
            return await asyncio.wait_for(future, timeout=timeout)
        finally:
            self.waiters.pop(pi_key, None)

    def notify_completed(self, pi_key, result):
        future = self.waiters.get(pi_key)
        if future is not None and not future.done():
            future.set_result(result)


# In API handler
async def create_with_awaiting_result(request):
    # Insert command (as before)
    result = await db.fetch_one("INSERT INTO command_log ... RETURNING position")
    # Engine will assign pi_key when processing
    # Wait for it via separate notification
    pi_key = await wait_for_pi_creation(result['position'])  # listens for event
    # Register BEFORE checking state: a fast instance may already have
    # completed, and a completion with no registered waiter is dropped
    future = waiter_registry.register(pi_key)
    final_state = await load_final_state_if_completed(pi_key)  # hypothetical helper
    if final_state is not None:
        waiter_registry.waiters.pop(pi_key, None)
        return final_state
    # Wait for instance completion
    return await waiter_registry.wait(pi_key, future, timeout=30)


# Engine emits notification when instance completes
async def on_process_completed(pi_key, final_state):
    waiter_registry.notify_completed(pi_key, final_state)
```
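A self-contained, runnable version of the waiter registry, assuming everything runs on one asyncio event loop in the same process as the engine. fake_engine is a hypothetical stand-in for on_process_completed. The demo also shows the main hazard of this design: a completion that arrives before its waiter is registered is silently dropped, and the late waiter can only time out.

```python
import asyncio


class CompletionWaiterRegistry:
    """Maps pi_key -> asyncio.Future; one waiter per process instance."""

    def __init__(self):
        self.waiters = {}

    async def wait_for_completion(self, pi_key, timeout=30):
        future = asyncio.get_running_loop().create_future()
        self.waiters[pi_key] = future
        try:
            return await asyncio.wait_for(future, timeout=timeout)
        finally:
            self.waiters.pop(pi_key, None)  # never leak a waiter

    def notify_completed(self, pi_key, result):
        future = self.waiters.get(pi_key)
        if future is not None and not future.done():
            future.set_result(result)


async def demo():
    registry = CompletionWaiterRegistry()

    async def fake_engine():
        # Hypothetical engine: completes instance 42 shortly after start
        await asyncio.sleep(0.01)
        registry.notify_completed(42, {"state": "COMPLETED"})

    engine_task = asyncio.create_task(fake_engine())
    result = await registry.wait_for_completion(42, timeout=1)
    await engine_task

    # Race: instance 7 completes BEFORE anyone waits for it, so the
    # notification finds no waiter and is dropped
    registry.notify_completed(7, {"state": "COMPLETED"})
    try:
        await registry.wait_for_completion(7, timeout=0.05)
        late_waiter_timed_out = False
    except asyncio.TimeoutError:
        late_waiter_timed_out = True
    return result, late_waiter_timed_out, dict(registry.waiters)


result, late_waiter_timed_out, leftover = asyncio.run(demo())
```

The finally clause guarantees that timed-out waiters do not accumulate in the map.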
Cross-engine waiters (Phase 2)
With multiple engine instances, the waiter may live on engine A while the completion happens on engine B (the leader).
Solution: route sync requests to the leader.
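```python
from fastapi import FastAPI, Request
from fastapi.responses import RedirectResponse

app = FastAPI()


@app.post('/v2/process-instances')
async def create_instance(request: Request, wait: bool = False):
    if wait and not engine.is_leader:
        # Redirect to leader: 307 keeps the method and body; keep the query
        # string too, or the leader would not see wait=true
        leader_url = engine.discover_leader_url()
        return RedirectResponse(f"{leader_url}{request.url.path}?{request.url.query}",
                                status_code=307)
    # Normal handling
```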
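Alternatively, use Postgres NOTIFY across engines:
```python
import json

# When instance completes (any engine). pg_notify() binds channel and payload;
# by default the payload must be shorter than 8000 bytes.
await db.execute("SELECT pg_notify($1, $2)", f"pi_completed_{pi_key}", json.dumps(final_state))

# Engine A listens: register before the instance can complete, and keep the
# connection checked out until the result arrives
async with db.acquire() as conn:
    await conn.add_listener(f'pi_completed_{pi_key}', on_pi_completed)
```
Multi-engine waiters then work without leader routing.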
Idempotency: client retries
A client may retry a request after a timeout. Without idempotency, duplicate process instances get created.
Solution: optional idempotency keys:
```sql
CREATE TABLE idempotency_keys (
    key TEXT NOT NULL,
    tenant_id TEXT NOT NULL,
    command_position BIGINT NOT NULL,
    response_body JSONB NOT NULL,
    created_at TIMESTAMPTZ DEFAULT NOW(),
    expires_at TIMESTAMPTZ DEFAULT NOW() + INTERVAL '24 hours',
    -- Keys are scoped per tenant, so two tenants may reuse the same key
    PRIMARY KEY (tenant_id, key)
);
CREATE INDEX idx_idempotency_expires ON idempotency_keys(expires_at);
```
API handler:
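```python
import json


async def create_instance(request, tenant_id, idempotency_key=None):
    # One transaction: the command row and the key row commit (or roll back) together
    async with db.transaction():
        if idempotency_key:
            existing = await db.fetch_one("""
                SELECT response_body FROM idempotency_keys
                WHERE key = $1 AND tenant_id = $2
            """, idempotency_key, tenant_id)
            if existing:
                return json.loads(existing['response_body'])
        # Normal processing
        result = await insert_command(...)
        response = {...}
        if idempotency_key:
            # A concurrent retry that missed the SELECT fails here on the
            # idempotency_keys primary key, rolling back its duplicate command;
            # the client can retry and receive the stored response
            await db.execute("""
                INSERT INTO idempotency_keys (key, tenant_id, command_position, response_body)
                VALUES ($1, $2, $3, $4)
            """, idempotency_key, tenant_id, result['position'], json.dumps(response))
    return response
```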
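Cleanup via cron: periodically delete rows whose expires_at has passed (the idx_idempotency_expires index keeps this scan cheap).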
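The idempotency flow can be exercised end to end with the standard-library sqlite3 module as a stand-in for Postgres. This is an assumption for illustration only: SQLite's AUTOINCREMENT replaces BIGSERIAL, and REAL seconds replace TIMESTAMPTZ. The key table is scoped per tenant, and cleanup_expired is a hypothetical name for what the cron job would run.

```python
import json
import sqlite3

db = sqlite3.connect(":memory:")
db.executescript("""
CREATE TABLE command_log (
    position INTEGER PRIMARY KEY AUTOINCREMENT,
    tenant_id TEXT NOT NULL,
    payload TEXT NOT NULL
);
CREATE TABLE idempotency_keys (
    key TEXT NOT NULL,
    tenant_id TEXT NOT NULL,
    command_position INTEGER NOT NULL,
    response_body TEXT NOT NULL,
    expires_at REAL NOT NULL,
    PRIMARY KEY (tenant_id, key)
);
""")

TTL_SECONDS = 24 * 3600  # mirrors the 24-hour default in the schema above


def create_instance(tenant_id, payload, idempotency_key=None, now=0.0):
    with db:  # one transaction: command row and key row commit together
        if idempotency_key:
            row = db.execute(
                "SELECT response_body FROM idempotency_keys WHERE key = ? AND tenant_id = ?",
                (idempotency_key, tenant_id)).fetchone()
            if row:
                return json.loads(row[0])  # retry: replay the stored response
        cur = db.execute("INSERT INTO command_log (tenant_id, payload) VALUES (?, ?)",
                         (tenant_id, json.dumps(payload)))
        response = {"commandPosition": cur.lastrowid}
        if idempotency_key:
            db.execute(
                "INSERT INTO idempotency_keys VALUES (?, ?, ?, ?, ?)",
                (idempotency_key, tenant_id, cur.lastrowid,
                 json.dumps(response), now + TTL_SECONDS))
        return response


def cleanup_expired(now):
    """What the cron job would run: drop keys whose expiry has passed."""
    with db:
        return db.execute("DELETE FROM idempotency_keys WHERE expires_at <= ?",
                          (now,)).rowcount


first = create_instance("t1", {"process": "order"}, idempotency_key="abc", now=0)
retry = create_instance("t1", {"process": "order"}, idempotency_key="abc", now=5)
other = create_instance("t2", {"process": "order"}, idempotency_key="abc", now=5)
commands = db.execute("SELECT COUNT(*) FROM command_log").fetchone()[0]
removed = cleanup_expired(now=TTL_SECONDS + 10)
```

The retry replays the first response without writing a second command, while another tenant reusing the same key string gets its own command.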
Reading state: no sync point needed
Reads (GET endpoints) do NOT go through command_log. They read directly from the state tables:
```python
async def get_process_instance(pi_key):
    # Direct read - no queue
    instance = await db.fetch_one("""
        SELECT * FROM process_instances WHERE process_instance_key = $1
    """, pi_key)
    return instance
```
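Reads are trivially parallel: Postgres MVCC handles concurrent reads. The trade-off is read-your-writes: a GET issued right after a create may not see the instance until the engine has processed the command.
Performance characteristics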
| Operation | Path | Latency |
|---|---|---|
| Create instance | INSERT command_log | ~5-15ms |
| Create with await | INSERT + wait for completion | ~50ms-30s |
| Get instance | SELECT state tables | ~1-5ms |
| Complete job | INSERT command_log, return immediately | ~5-15ms |
| Engine processing | LISTEN + SELECT + UPDATE | ~10-50ms per command |
API throughput is limited by:
- Postgres INSERT rate (~10K-50K/sec)
- The per-tenant rate limiter
- Connection pool size
Engine throughput is limited by:
- A single thread per partition (CPU-bound at ~1000-5000 cmd/sec)
- Postgres write rate (state mutations)
Failure modes
API down, engine up
- INSERTs fail → the client gets a 503
- The engine keeps processing already-queued commands
- When the API recovers, normal operation resumes
Engine down, API up
- INSERTs continue (the API doesn't know the engine is down)
- The queue grows in command_log
- When the engine recovers, it processes the backlog (catch-up can be fast)
- Sync API endpoints time out (no completions are happening)
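How fast is the catch-up? A back-of-the-envelope formula: with a backlog of B commands, an engine rate of μ cmd/s and a continuing arrival rate of λ cmd/s, the backlog drains in B / (μ - λ) seconds, and never drains if μ ≤ λ. The numbers below are illustrative assumptions (the engine rate sits inside the ~1000-5000 cmd/sec range quoted above), not measurements.

```python
def catchup_seconds(backlog, process_rate, arrival_rate):
    """Seconds to drain `backlog` commands while new ones keep arriving.

    Returns None when the engine is not faster than the arrival rate,
    i.e. the backlog never shrinks.
    """
    spare = process_rate - arrival_rate
    if spare <= 0:
        return None
    return backlog / spare


# Assumed scenario: engine down for 10 minutes while the API keeps
# accepting 500 cmd/s, then the engine recovers at 2000 cmd/s
backlog = 500 * 600  # 300,000 queued commands
t = catchup_seconds(backlog, process_rate=2000, arrival_rate=500)
# 300,000 / (2,000 - 500) = 200 seconds
```

Catch-up is fast only when the engine has real headroom over the live arrival rate; near saturation, the same backlog can take arbitrarily long.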
Network partition between API and engine
- Each side thinks the other is down
- The API keeps accepting commands (correct: commands queue safely)
- The engine resumes processing when the network returns
Postgres down
- Both the API and the engine block
- No commands accepted, no commands processed
- When Postgres recovers, everything resumes
- Patroni failover (~30s, ADR-020)
Comparison with Camunda
Camunda's gateway uses bidirectional gRPC streaming:
- The client streams job requests; the server streams jobs back
- Backpressure via HTTP/2 flow control
- StabilizingAIMD adjusts the rate
The MVP, via REST + Postgres:
- One-shot requests
- Backpressure via rate limiting + 429/503 (see concepts/backpressure-rest-strategy)
- Simple, but less elegant
Trade-off: the MVP gives up an estimated ~5-10% of throughput in exchange for much greater simplicity. Postgres-based synchronization is easier to reason about and to debug.
Phase 2+: Multiple engine instances
With multiple engines (active-active per ADR-020):
Approach A: leader processes commands
- Only the leader runs the processing loop
- Followers serve reads + insert commands
- Single command stream → single processor
Approach B: partitioned processing
- Engines own subsets of tenants
- Each engine processes its own tenants' commands
- A smart router routes new commands to the correct engine
Recommendation: Approach A for Phase 2 (simpler); Approach B for Phase 3+, once sharding is needed.
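For Approach B, the smart router needs a deterministic tenant-to-engine mapping that every API node computes identically. A minimal sketch, assuming a fixed list of engine names: engine_for_tenant is a hypothetical helper that hashes the tenant id with SHA-256 (Python's built-in hash() is randomized per process, so it cannot be shared across API nodes).

```python
import hashlib


def engine_for_tenant(tenant_id, engines):
    """Hypothetical router: map a tenant to the engine that owns it.

    Uses a stable hash so every API node makes the same choice.
    """
    digest = hashlib.sha256(tenant_id.encode("utf-8")).digest()
    return engines[int.from_bytes(digest[:8], "big") % len(engines)]


engines = ["engine-a", "engine-b", "engine-c"]
owner = engine_for_tenant("tenant-42", engines)
```

Plain modulo hashing reassigns most tenants whenever the engine count changes; consistent hashing, or an explicit tenant-to-engine table in Postgres, would limit that reshuffling.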
Schema optimizations
```sql
-- Engine reads commands ordered by position
CREATE INDEX idx_command_position_unprocessed
    ON command_log (position)
    WHERE processed_at IS NULL;

-- After processing, mark
UPDATE command_log SET processed_at = NOW() WHERE position = $1;

-- Eventually cleanup processed commands per retention policy
```
Alternatively, use cursor-based reading:
```sql
-- Engine tracks last position separately (not in command_log)
CREATE TABLE engine_state (
    instance_id TEXT PRIMARY KEY,
    last_processed_position BIGINT NOT NULL,
    updated_at TIMESTAMPTZ DEFAULT NOW()
);

-- Engine pseudo-code ($1 = new position, $2 = this engine's instance_id):
UPDATE engine_state
SET last_processed_position = $1, updated_at = NOW()
WHERE instance_id = $2;
```
Less write amplification: command_log rows are never UPDATEd per command; only one small engine_state row changes (once per batch in the Step 2 loop).
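The cursor-based variant can be sketched with sqlite3 standing in for Postgres (an assumption for illustration). process_next_batch is a hypothetical helper: it reads up to 100 commands after the stored cursor, hands each to a handler, and writes engine_state once per batch, so 250 commands cost 3 cursor writes instead of 250 per-row UPDATEs.

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.executescript("""
CREATE TABLE command_log (
    position INTEGER PRIMARY KEY AUTOINCREMENT,
    intent TEXT NOT NULL
);
CREATE TABLE engine_state (
    instance_id TEXT PRIMARY KEY,
    last_processed_position INTEGER NOT NULL
);
INSERT INTO engine_state VALUES ('engine-1', 0);
""")
db.executemany("INSERT INTO command_log (intent) VALUES (?)",
               [("PROCESS_INSTANCE_CREATION:CREATE",)] * 250)
db.commit()


def process_next_batch(instance_id, handle, batch_size=100):
    """Read the batch after the stored cursor, handle each command, then
    persist the cursor once for the whole batch."""
    (last,) = db.execute(
        "SELECT last_processed_position FROM engine_state WHERE instance_id = ?",
        (instance_id,)).fetchone()
    rows = db.execute(
        "SELECT position, intent FROM command_log WHERE position > ? "
        "ORDER BY position LIMIT ?", (last, batch_size)).fetchall()
    # In a real engine, handle()'s state writes would share this transaction
    # with the cursor update, so both commit or neither does
    with db:
        for position, intent in rows:
            handle(position, intent)
        if rows:
            db.execute(
                "UPDATE engine_state SET last_processed_position = ? WHERE instance_id = ?",
                (rows[-1][0], instance_id))
    return len(rows)


handled = []
cursor_writes = 0
while process_next_batch("engine-1", lambda position, intent: handled.append(position)):
    cursor_writes += 1
```

Batch size trades write amplification against replay cost: larger batches mean fewer cursor writes but more commands to reprocess after a crash, if state writes do not share the cursor's transaction.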
Links
- adrs/adr-006-single-threaded-per-partition — Foundation
- concepts/journal-and-stream-platform — Stream processing pattern
- concepts/backpressure-rest-strategy — Complementary
- adrs/adr-005-stream-processing-command-sourcing — Command sourcing
- adrs/adr-020-patroni-postgres-ha — HA implications