
API Engine Serialization

The REST API receives concurrent requests, while the engine processes commands on a single thread (ADR-006). How do we bridge the two? Solution: persist each command to the log first (an atomic INSERT), and let the engine consume the log sequentially. The API does NOT wait for processing. It responds with a commandPosition, which the client uses for tracking. For the synchronous API (createWithAwaitingResult), waiters are registered in an in-memory map and notified on completion. This guarantees correctness without complex locking. It reuses Camunda's command-sourcing pattern, adapted to Postgres.

The problem

Per ADR-006, engine processing is single-threaded within a partition. No concurrency means no race conditions.

But the REST API is inherently concurrent (multi-threaded or async):

  • HTTP servers (Express, FastAPI, Gin) handle requests concurrently
  • 100 clients can send requests simultaneously
  • Multiple HTTP workers can run in the same process

Question: how do we serialize a multi-threaded API into a single-threaded engine without losing performance or correctness?

The key insight: the command log is the sync point

flowchart TD
    subgraph MT[Multi-threaded]
        R1[HTTP req 1]
        R2[HTTP req 2]
        R3[HTTP req 3]
    end
    DB[("Postgres command_log<br/>(BIGSERIAL position)<br/>Atomic INSERTs<br/>serialize naturally")]
    subgraph ST[Single-threaded]
        E["Engine loop<br/>(single thread)<br/>Reads positions sequentially"]
    end
    R1 --> DB
    R2 --> DB
    R3 --> DB
    DB --> E

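Postgres BIGSERIAL provides a natural ordering: any number of concurrent INSERTs are assigned unique, monotonically increasing positions. One caveat is that positions follow allocation order, not commit order. A transaction that draws position 41 can commit after one that draws 42, and rolled-back transactions leave gaps. As a result, a reader that advances its cursor past the highest visible position can skip a lower position that commits late. The engine's cursor logic has to account for this.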
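The following small in-process simulation makes the sync-point idea concrete. It does not use Postgres. `CommandLog` is a hypothetical stand-in for `command_log`, and its lock plays the role of the BIGSERIAL sequence. Many threads append concurrently (the multi-threaded API side), and a single consumer reads `position > cursor` in order (the engine side). In Postgres, a lower position can become visible after a higher one if its transaction commits later. Here, allocation and visibility happen together under the lock, so this sketch does not model that effect.

```python
import threading
from itertools import count


class CommandLog:
    """In-memory stand-in for command_log: append() assigns the next position.

    The lock plays the role of the Postgres sequence: every append, from any
    thread, gets a unique, increasing position.
    """

    def __init__(self):
        self._rows = []          # (position, payload) tuples, in position order
        self._next = count(1)
        self._lock = threading.Lock()

    def append(self, payload):
        with self._lock:
            position = next(self._next)
            self._rows.append((position, payload))
            return position

    def read_after(self, cursor, limit=100):
        """Like SELECT ... WHERE position > cursor ORDER BY position LIMIT limit."""
        with self._lock:
            return [row for row in self._rows if row[0] > cursor][:limit]


def run_demo(n_threads=20, per_thread=50):
    log = CommandLog()

    def api_worker(worker_id):
        for i in range(per_thread):
            log.append({"worker": worker_id, "seq": i})

    threads = [threading.Thread(target=api_worker, args=(w,)) for w in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    # Single consumer: processes strictly in position order, advancing a cursor.
    processed, cursor = [], 0
    while batch := log.read_after(cursor):
        for position, _payload in batch:
            processed.append(position)
            cursor = position
    return processed


if __name__ == "__main__":
    order = run_demo()
    print(len(order), order[:5])
```

However the threads interleave, the consumer should see every appended command exactly once, in ascending position order.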

Detailed flow

Step 1: API handler inserts command

import json
from fastapi import Response

async def create_process_instance(request):
    """Multi-threaded HTTP handler."""

    # Validate request (multi-threaded OK)
    tenant_id = extract_tenant(request)
    body = await request.json()

    # Rate limit (multi-threaded OK)
    if not await rate_limiter.check(tenant_id):
        return Response(status_code=429)

    # Authorization (multi-threaded OK); expected to raise on denial
    await authorize(request.user, "create_process_instance", tenant_id)

    # Insert command (concurrent handlers are safe: each INSERT is atomic per row)
    result = await db.fetch_one("""
        INSERT INTO command_log (timestamp, tenant_id, intent, payload)
        VALUES (NOW(), $1, 'PROCESS_INSTANCE_CREATION:CREATE', $2)
        RETURNING position, timestamp
    """, tenant_id, json.dumps(body))  # JSONB parameter passed as JSON text

    return {
        "commandPosition": result['position'],
        "timestamp": result['timestamp'].isoformat()  # datetime is not JSON-serializable
    }

The API returns IMMEDIATELY with the position and does not wait for processing. API throughput is therefore bounded by Postgres INSERT throughput (easily >10K/sec).

Step 2: Engine consumes log sequentially

import asyncio

class EngineLoop:
    def __init__(self):
        self.last_processed_position = 0
        self.running = False
        self.notification_event = asyncio.Event()  # set by the LISTEN callback (Step 3)

    async def start(self):
        """Single-threaded processing loop."""
        self.running = True
        # Recover last position from DB
        self.last_processed_position = await self._recover_position()

        while self.running:
            await self._process_next_batch()

    async def _process_next_batch(self):
        """Get next batch of commands, process serially."""
        commands = await db.fetch_all("""
            SELECT position, intent, payload, tenant_id
            FROM command_log
            WHERE position > $1
            ORDER BY position ASC
            LIMIT 100
        """, self.last_processed_position)

        if not commands:
            # No new commands: wait for a notification (or the poll timeout)
            await self._wait_for_signal()
            return

        for cmd in commands:
            # Apply the command and advance the marker in ONE transaction, so a
            # crash between the two cannot apply the same command twice on restart
            async with db.transaction():
                await self._process_one_command(cmd)
                await self._update_position_marker(cmd['position'])
            self.last_processed_position = cmd['position']

Step 3: Engine is signaled when new commands arrive

Polling every X ms is wasteful. Use Postgres LISTEN/NOTIFY instead:

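import asyncio
import asyncpg

# API inserts the command and notifies in the same transaction;
# Postgres delivers the NOTIFY only when the transaction commits
async def insert_command(tenant_id, intent, payload):
    async with db.transaction():
        row = await db.fetch_one("INSERT INTO command_log ... RETURNING position")
        # pg_notify() takes bind parameters, so tenant_id is never spliced into SQL
        await db.execute("SELECT pg_notify('new_command', $1)", tenant_id)
        return row

# Engine listens on one dedicated connection, registered once at startup
class EngineLoop:
    async def _start_listening(self, dsn):
        self.listen_conn = await asyncpg.connect(dsn)
        await self.listen_conn.add_listener('new_command', self._on_new_command)

    async def _wait_for_signal(self, timeout=1.0):
        """Wait for new command notification or timeout."""
        try:
            await asyncio.wait_for(self.notification_event.wait(), timeout)
        except asyncio.TimeoutError:
            pass  # periodic poll fallback (also covers a missed notification)
        self.notification_event.clear()

    def _on_new_command(self, conn, pid, channel, payload):
        self.notification_event.set()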

Result: the engine picks up commands ~1-10 ms after the INSERT commits, without busy polling.
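The wake-up logic in `_wait_for_signal` reduces to an event that a notification callback sets, plus a timeout as the polling fallback. The following self-contained sketch uses only `asyncio`. `Signal` is a hypothetical name, and `loop.call_later` simulates a NOTIFY arriving after 50 ms.

```python
import asyncio
import time


class Signal:
    """Wake-up flag set by a notification callback (e.g. a LISTEN handler)."""

    def __init__(self):
        self._event = asyncio.Event()

    def notify(self, *_args):
        # Accepts any callback signature; only the wake-up matters.
        self._event.set()

    async def wait(self, timeout):
        """Return True if woken by a notification, False if the poll timeout fired."""
        try:
            await asyncio.wait_for(self._event.wait(), timeout)
            woken = True
        except asyncio.TimeoutError:
            woken = False
        # Clear before the caller re-queries the log: anything committed before
        # this point is visible to that query, and later notifications set the
        # event again.
        self._event.clear()
        return woken


async def demo():
    signal = Signal()
    loop = asyncio.get_running_loop()
    loop.call_later(0.05, signal.notify)             # simulated NOTIFY after 50 ms
    start = time.monotonic()
    woken = await signal.wait(timeout=1.0)
    woke_after = time.monotonic() - start
    timed_out = not await signal.wait(timeout=0.05)  # nothing arrives: poll fallback
    return woken, woke_after, timed_out


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The first wait returns soon after the simulated notification instead of sleeping a full polling interval. The second wait shows the timeout path that keeps the loop polling if a notification is ever missed.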

Sync API: createWithAwaitingResult

Some use cases need a synchronous response (the client waits for completion):

POST /v2/process-instances?wait=true&timeout=30s

Implementation: in-memory waiter registry

import asyncio

class CompletionWaiterRegistry:
    """Map of processInstanceKey → asyncio.Future (in-memory, one process)."""
    def __init__(self):
        self.waiters = {}  # pi_key → Future

    async def wait_for_completion(self, pi_key, timeout=30):
        # get_running_loop() is the correct call from inside a coroutine
        future = asyncio.get_running_loop().create_future()
        self.waiters[pi_key] = future

        try:
            # Raises asyncio.TimeoutError if the instance does not finish in time
            return await asyncio.wait_for(future, timeout=timeout)
        finally:
            self.waiters.pop(pi_key, None)

    def notify_completed(self, pi_key, result):
        future = self.waiters.get(pi_key)
        if future is not None and not future.done():
            future.set_result(result)

# In API handler
async def create_with_awaiting_result(request):
    # Insert command (as before)
    result = await db.fetch_one("INSERT INTO command_log ... RETURNING position")

    # The engine assigns pi_key when it processes the command;
    # wait_for_pi_creation listens for that event
    pi_key = await wait_for_pi_creation(result['position'])

    # Wait for instance completion. Caveat: if the instance completes before
    # this waiter is registered, the notification is missed, so re-check the
    # state tables on timeout before reporting failure.
    final_state = await waiter_registry.wait_for_completion(pi_key, timeout=30)

    return final_state

# Engine emits notification when the instance completes; this must run in the
# same process (and event loop) as waiter_registry
async def on_process_completed(pi_key, final_state):
    waiter_registry.notify_completed(pi_key, final_state)
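The handler above has one gap. If the instance completes before `wait_for_completion` registers its future, the notification is lost and the client waits for the full timeout. The sketch below shows one possible fix, which is an assumption and not part of the original design: the registry also keeps a bounded cache of recent completions and checks it before registering a waiter. asyncio runs the check and the registration with no `await` between them, so a completion cannot slip in between the two steps.

```python
import asyncio
from collections import OrderedDict


class CompletionWaiterRegistry:
    """Waiters keyed by pi_key, plus a bounded cache of recent completions.

    The cache covers the race where an instance completes before the API
    handler has registered its waiter. One waiter per pi_key, as in the original.
    """

    def __init__(self, max_recent=10_000):
        self.waiters = {}              # pi_key -> Future
        self.recent = OrderedDict()    # pi_key -> result, oldest first
        self.max_recent = max_recent

    def notify_completed(self, pi_key, result):
        self.recent[pi_key] = result
        if len(self.recent) > self.max_recent:
            self.recent.popitem(last=False)   # evict the oldest completion
        future = self.waiters.get(pi_key)
        if future is not None and not future.done():
            future.set_result(result)

    async def wait_for_completion(self, pi_key, timeout=30):
        if pi_key in self.recent:             # completed before we started waiting
            return self.recent[pi_key]
        future = asyncio.get_running_loop().create_future()
        self.waiters[pi_key] = future
        try:
            return await asyncio.wait_for(future, timeout=timeout)
        finally:
            self.waiters.pop(pi_key, None)


async def demo():
    reg = CompletionWaiterRegistry()
    reg.notify_completed("pi-1", {"state": "COMPLETED"})   # completes "too early"
    early = await reg.wait_for_completion("pi-1", timeout=0.1)

    loop = asyncio.get_running_loop()
    loop.call_later(0.01, reg.notify_completed, "pi-2", {"state": "COMPLETED"})
    normal = await reg.wait_for_completion("pi-2", timeout=1)

    try:
        await reg.wait_for_completion("pi-3", timeout=0.01)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return early, normal, timed_out, len(reg.waiters)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```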

Cross-engine waiters (Phase 2)

With multiple engine instances, a waiter may live on engine A while the completion happens on engine B (the leader).

Solution: route sync requests to the leader.

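from fastapi import Request
from fastapi.responses import RedirectResponse

@app.post('/v2/process-instances')
async def create_instance(request: Request, wait: bool = False):
    if wait and not engine.is_leader:
        # Redirect to leader; 307 keeps the POST method and body, and the
        # query string (wait=true&timeout=...) must be carried over explicitly
        leader_url = engine.discover_leader_url()
        target = f"{leader_url}{request.url.path}?{request.url.query}"
        return RedirectResponse(target, status_code=307)

    # Normal handling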

Alternatively, use Postgres NOTIFY across engines:

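# When an instance completes (any engine). pg_notify() takes bind parameters,
# so quotes in the JSON cannot break the SQL; payloads must be < 8000 bytes.
await db.execute(
    "SELECT pg_notify($1, $2)",
    f"pi_completed_{pi_key}", json.dumps(final_state),
)

# Engine A listens on a connection it keeps open until the waiter resolves
# (a connection returned to the pool is reset, which drops its LISTENs),
# and closes it once the waiter completes or times out
listen_conn = await asyncpg.connect(dsn)
await listen_conn.add_listener(f'pi_completed_{pi_key}', on_pi_completed)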

With this approach, multi-engine waiters work without leader routing.
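Per-instance channels mean one LISTEN/UNLISTEN per waiting request. NOTIFY payloads must also be shorter than 8000 bytes by default, so large final states cannot always be sent inline. The sketch below shows a hypothetical variant. Every engine listens once on a shared `pi_completed` channel (sent with `SELECT pg_notify('pi_completed', $1)`). The payload carries the `pi_key`, plus the final state when it fits, and each engine dispatches only to its local waiters. The function names and the `load_state` callback are illustrative. In real code, `load_state` would query the state tables.

```python
import json

NOTIFY_MAX_PAYLOAD = 8000  # Postgres default: the payload must be shorter than 8000 bytes


def completion_payload(pi_key, final_state):
    """Build the payload for a shared 'pi_completed' channel.

    Includes the final state when it fits; otherwise sends only the key and
    lets the receiving engine read the state tables.
    """
    full = json.dumps({"pi_key": pi_key, "final_state": final_state})
    if len(full.encode("utf-8")) < NOTIFY_MAX_PAYLOAD:
        return full
    return json.dumps({"pi_key": pi_key})


def on_pi_completed(payload, waiter_registry, load_state):
    """Listener callback body: route the notification to a local waiter, if any.

    waiter_registry is anything with a `waiters` dict and `notify_completed()`.
    Returns True if a local waiter was notified.
    """
    msg = json.loads(payload)
    pi_key = msg["pi_key"]
    if pi_key not in waiter_registry.waiters:
        return False                   # nobody on this engine is waiting for it
    if "final_state" in msg:
        state = msg["final_state"]
    else:
        state = load_state(pi_key)     # payload was too large; read from state tables
    waiter_registry.notify_completed(pi_key, state)
    return True
```

Every engine receives every completion and filters locally. This trades a little extra traffic for a single long-lived LISTEN per engine.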

Idempotency: client retries

A client may retry a request after a timeout. Without idempotency, each retry creates a duplicate process instance.

Solution: optional idempotency keys:

POST /v2/process-instances
Idempotency-Key: abc-123-xyz

CREATE TABLE idempotency_keys (
    key TEXT NOT NULL,
    tenant_id TEXT NOT NULL,
    command_position BIGINT NOT NULL,
    response_body JSONB NOT NULL,
    created_at TIMESTAMPTZ DEFAULT NOW(),
    expires_at TIMESTAMPTZ DEFAULT NOW() + INTERVAL '24 hours',
    -- Keys are scoped per tenant, so two tenants may reuse the same key
    PRIMARY KEY (tenant_id, key)
);

CREATE INDEX idx_idempotency_expires ON idempotency_keys(expires_at);

API handler:

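import json

class KeyAlreadyClaimed(Exception):
    """Raised inside the transaction to roll back a duplicate attempt."""

async def lookup_response(tenant_id, idempotency_key):
    row = await db.fetch_one("""
        SELECT response_body FROM idempotency_keys
        WHERE tenant_id = $1 AND key = $2
    """, tenant_id, idempotency_key)
    return json.loads(row['response_body']) if row else None

async def create_instance(request, tenant_id, idempotency_key=None):
    if idempotency_key:
        existing = await lookup_response(tenant_id, idempotency_key)
        if existing:
            return existing

    try:
        # Command INSERT and key claim commit together, so two concurrent
        # retries cannot both create an instance
        async with db.transaction():
            result = await insert_command(...)
            response = {"commandPosition": result['position']}

            if idempotency_key:
                claimed = await db.fetch_one("""
                    INSERT INTO idempotency_keys (key, tenant_id, command_position, response_body)
                    VALUES ($1, $2, $3, $4)
                    ON CONFLICT (tenant_id, key) DO NOTHING
                    RETURNING key
                """, idempotency_key, tenant_id, result['position'], json.dumps(response))
                if claimed is None:
                    raise KeyAlreadyClaimed()  # rolls back our command INSERT
    except KeyAlreadyClaimed:
        # A concurrent retry won the race; return its stored response
        return await lookup_response(tenant_id, idempotency_key)

    return response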

Cleanup via cron:

DELETE FROM idempotency_keys WHERE expires_at < NOW();

Reading state: NO sync point needed

Reads (GET endpoints) do NOT go through command_log. They read directly from the state tables:

async def get_process_instance(pi_key):
    # Direct read - no queue
    instance = await db.fetch_one("""
        SELECT * FROM process_instances WHERE process_instance_key = $1
    """, pi_key)

    return instance

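Reads are trivially parallel because Postgres MVCC handles concurrent reads. The trade-off is read-your-writes: a GET issued right after a create may not find the instance yet, because the engine may not have processed the command.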
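The API returns before the engine runs, so a client that needs read-your-writes can compare its `commandPosition` with the engine's progress before reading. The sketch below assumes a single engine that records its cursor in an `engine_state` table (the layout used under Schema optimizations). `wait_until_processed` and its parameters are hypothetical, and `sqlite3` stands in for Postgres. The check is only as reliable as the engine's guarantee that it never skips a position.

```python
import sqlite3
import time


def wait_until_processed(conn, command_position, timeout=5.0, poll_interval=0.05,
                         instance_id="engine-1"):
    """Poll until the engine's cursor reaches command_position.

    Returns True once engine_state.last_processed_position >= command_position,
    False if the timeout expires first.
    """
    deadline = time.monotonic() + timeout
    while True:
        row = conn.execute(
            "SELECT last_processed_position FROM engine_state WHERE instance_id = ?",
            (instance_id,)).fetchone()
        if row is not None and row[0] >= command_position:
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll_interval)


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE engine_state (instance_id TEXT PRIMARY KEY, "
                 "last_processed_position INTEGER NOT NULL)")
    conn.execute("INSERT INTO engine_state VALUES ('engine-1', 10)")
    print(wait_until_processed(conn, 7))                # cursor is already past 7
    print(wait_until_processed(conn, 11, timeout=0.1))  # not processed yet
```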

Performance characteristics

| Operation | Path | Latency |
|---|---|---|
| Create instance | INSERT command_log | ~5-15 ms |
| Create with await | INSERT + wait for completion | ~50 ms-30 s |
| Get instance | SELECT state tables | ~1-5 ms |
| Complete job | INSERT command, return immediately | ~5-15 ms |
| Engine processing | LISTEN + SELECT + UPDATE | ~10-50 ms per command |

API throughput is limited by:

  • Postgres INSERT rate (~10K-50K/sec)
  • Per-tenant rate limiter
  • Connection pool size
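The connection-pool limit follows from Little's law: connections in use = request rate × time each INSERT holds a connection. The worked sketch below uses illustrative numbers in line with the ~5-15 ms INSERT latency quoted for creates. The function names are hypothetical.

```python
def required_connections(rate_per_sec, latency_ms):
    """Little's law: connections busy at once = arrival rate x time each INSERT holds one.

    Integer ceiling division keeps the arithmetic exact.
    """
    return -(-rate_per_sec * latency_ms // 1000)


def max_insert_rate(pool_size, latency_ms):
    """Inverse view: the most INSERTs/sec a pool can sustain at a given latency."""
    return pool_size * 1000 // latency_ms


if __name__ == "__main__":
    print(required_connections(10_000, 10))  # connections for 10K INSERT/sec at 10 ms
    print(max_insert_rate(20, 10))           # ceiling for a 20-connection pool at 10 ms
```

At 10,000 INSERT/sec and ~10 ms each, about 100 connections are busy at once. At the same latency, a 20-connection pool caps the API near 2,000 INSERT/sec, however fast Postgres itself could go.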

Engine throughput is limited by:

  • A single thread per partition (CPU-bound at ~1000-5000 cmd/sec)
  • Postgres write rate (state mutations)

Failure modes

API down, engine up

  • INSERTs fail → client gets 503
  • Engine continues processing queued commands
  • When the API recovers, normal operation resumes

Engine down, API up

  • INSERTs continue (the API doesn't know the engine is down)
  • The queue grows in command_log
  • When the engine recovers, it processes the backlog (catch-up speed depends on how much faster it processes than new commands arrive)
  • Sync API endpoints time out (no completions happen)
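New commands keep arriving during catch-up, so the backlog drains at the gap between the engine's processing rate and the arrival rate. The following is a worked sketch. The rates are illustrative values taken from the ~1000-5000 cmd/sec range given above, and the function names are hypothetical.

```python
def backlog_after_outage(outage_seconds, arrival_rate):
    """Commands queued in command_log while the engine was down."""
    return outage_seconds * arrival_rate


def catch_up_seconds(backlog, processing_rate, arrival_rate):
    """Time to drain a backlog while new commands keep arriving.

    The backlog shrinks at (processing_rate - arrival_rate) commands/sec;
    if arrivals match or exceed processing, it never drains.
    """
    net = processing_rate - arrival_rate
    if net <= 0:
        return float("inf")
    return backlog / net


if __name__ == "__main__":
    backlog = backlog_after_outage(600, 1000)       # 10-minute outage at 1,000 cmd/sec
    print(backlog, catch_up_seconds(backlog, 5000, 1000))
```

For example, a 10-minute outage at 1,000 commands/sec queues 600,000 commands. An engine that processes 5,000 cmd/sec while 1,000 cmd/sec keep arriving drains them in 600,000 / 4,000 = 150 s.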

Network partition between API and engine

  • Each side may assume the other is down (they interact only through Postgres)
  • The API keeps accepting commands (correct: they queue safely in command_log)
  • The engine resumes processing when the network recovers

Postgres down

  • Both the API and the engine block
  • No commands are accepted and none are processed
  • When Postgres recovers, everything resumes
  • Patroni failover takes ~30s (ADR-020)

Comparison with Camunda

Camunda's gateway uses bidirectional gRPC streaming:

  • The client streams a job request; the server streams jobs back
  • Backpressure via HTTP/2 flow control
  • StabilizingAIMD adjusts the rate

The MVP uses REST + Postgres:

  • One-shot requests
  • Backpressure via rate limiting + 429/503 (see concepts/backpressure-rest-strategy)
  • Simple but less elegant

Trade-off: the MVP loses an estimated ~5-10% throughput but is much simpler. Postgres-based synchronization is easier to reason about and debug.

Phase 2+: Multiple engine instances

With multiple engines (active-active per ADR-020):

Approach A: leader processes commands

  • Only the leader runs the processing loop
  • Followers serve reads and insert commands
  • Single command stream → single processor

Approach B: partitioned processing

  • Each engine owns a subset of tenants
  • Each engine processes its own tenants' commands
  • A smart router sends new commands to the correct engine

Recommendation: Approach A for Phase 2 (simpler); Approach B for Phase 3+, once sharding is needed.
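Approach B needs a deterministic tenant-to-engine mapping that every router computes identically. The document does not prescribe one. One option is rendezvous (highest-random-weight) hashing, sketched below with hypothetical engine IDs. Its useful property here is that when an engine leaves, only the tenants it owned move, and every other tenant keeps its owner.

```python
import hashlib


def _score(tenant_id, engine_id):
    # hashlib is stable across processes; Python's built-in hash() of str is not.
    digest = hashlib.sha256(f"{tenant_id}:{engine_id}".encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big")


def owner_engine(tenant_id, engine_ids):
    """Rendezvous hashing: the engine with the highest score owns the tenant."""
    if not engine_ids:
        raise ValueError("no engines available")
    return max(engine_ids, key=lambda engine_id: _score(tenant_id, engine_id))


if __name__ == "__main__":
    engines = ["engine-a", "engine-b", "engine-c"]
    for tenant in ["tenant-1", "tenant-2", "tenant-3"]:
        print(tenant, owner_engine(tenant, engines))
```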

Schema optimizations

-- Engine reads commands ordered by position
CREATE INDEX idx_command_position_unprocessed
ON command_log (position)
WHERE processed_at IS NULL;

-- After processing, mark
UPDATE command_log SET processed_at = NOW() WHERE position = $1;

-- Eventually cleanup processed commands per retention policy

Or use cursor-based reading:

-- Engine tracks last position separately (not in command_log)
CREATE TABLE engine_state (
    instance_id TEXT PRIMARY KEY,
    last_processed_position BIGINT NOT NULL,
    updated_at TIMESTAMPTZ DEFAULT NOW()
);

-- Engine, after applying a command (or a whole batch in one transaction):
UPDATE engine_state SET last_processed_position = $1, updated_at = NOW() WHERE instance_id = $2;

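Less write amplification: command_log stays append-only (no per-command UPDATE of log rows or of the partial index), and only the small engine_state row is rewritten, once per command or once per batch.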