Process Definition Cache

Re-parsing the BPMN XML on every request is prohibitively expensive (~10 ms). Camunda caches ExecutableProcess in its process_cache column family. The MVP needs an equivalent: an in-memory LRU cache per engine instance, with invalidation on deploy via Postgres LISTEN/NOTIFY. The trade-off is memory versus latency. Recommendation: an LRU with at most 1000 entries, a footprint of ~100 MB, and a hit rate above 99% in steady state. With multiple engine instances (Phase 2+), each instance keeps its own cache and receives invalidation messages.

The problem

Every time the engine processes a BPMN command, it needs to:

1. Look up the process definition (BPMN XML)
2. Parse the XML into an ExecutableProcess (in-memory object tree)
3. Find the element by ID within the tree
4. Execute the logic

Steps 1-3 are repeated on every command. Without a cache:

flowchart TD
    A["Command arrives<br/>(e.g., COMPLETE_ELEMENT)"] --> B["SELECT bpmn_xml FROM process_definitions<br/>WHERE key = ?<br/>~1ms DB roundtrip"]
    B --> C["Parse XML → BpmnModelInstance<br/>~5-15ms (XML parsing is expensive)"]
    C --> D["Transform → ExecutableProcess<br/>(5-step pipeline per ADR-005)<br/>~2-5ms"]
    D --> E["Find element by ID in tree<br/>~0.1ms"]
    E --> F["Execute logic<br/>~0.1-5ms"]
    F --> G["Total: ~8-25ms per command<br/>of which 7-20ms is parsing overhead"]

At 100 TPS, this adds 700-2000 ms of CPU time per second spent on re-parsing (40-80% of the budget). Unacceptable.
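The arithmetic behind the CPU figure can be checked in a few lines. This is a back-of-the-envelope sketch that uses only the 7-20 ms parsing overhead from the diagram and the 100 TPS load; the function name is illustrative.

```python
def parsing_cpu_ms_per_second(tps: int, overhead_ms: float) -> float:
    """CPU milliseconds spent re-parsing per wall-clock second."""
    return tps * overhead_ms

low = parsing_cpu_ms_per_second(100, 7)    # lower bound from the diagram
high = parsing_cpu_ms_per_second(100, 20)  # upper bound from the diagram

# 1000 ms of CPU per second corresponds to one fully busy core.
print(f"{low:.0f}-{high:.0f} ms CPU/s = {low / 1000:.1f}-{high / 1000:.1f} cores")
```

In other words, re-parsing alone would keep between roughly 0.7 and 2 cores busy at this load.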

Solution: in-memory LRU cache

Cache structure

import asyncio
from typing import Optional

from cachetools import LRUCache

class ProcessDefinitionCache:
    def __init__(self, max_entries=1000):
        # Cache key: (tenant_id, process_definition_key)
        self.cache: LRUCache = LRUCache(maxsize=max_entries)
        self.lock = asyncio.Lock()

        # Metrics
        self.hits = 0
        self.misses = 0

    async def get(self, tenant_id: str, key: int) -> Optional[ExecutableProcess]:
        cache_key = (tenant_id, key)

        if cache_key in self.cache:
            self.hits += 1
            return self.cache[cache_key]

        self.misses += 1

        async with self.lock:
            # Double-check after acquiring lock
            if cache_key in self.cache:
                return self.cache[cache_key]

            # Load from DB and parse
            executable = await self._load_and_parse(tenant_id, key)
            if executable:
                self.cache[cache_key] = executable

            return executable

    async def _load_and_parse(self, tenant_id: str, key: int):
        row = await db.fetch_one(
            "SELECT bpmn_xml FROM process_definitions "
            "WHERE tenant_id = $1 AND process_definition_key = $2",
            tenant_id, key
        )
        if not row:
            return None

        return parse_bpmn(row['bpmn_xml'])

    def invalidate(self, tenant_id: str, key: int):
        cache_key = (tenant_id, key)
        self.cache.pop(cache_key, None)

    def invalidate_tenant(self, tenant_id: str):
        # Used when tenant deleted
        keys_to_remove = [k for k in self.cache if k[0] == tenant_id]
        for k in keys_to_remove:
            del self.cache[k]
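The double-checked locking in get() matters because many commands for the same definition can miss at the same moment. The following is a minimal, self-contained sketch of that pattern. TinyLRU is a standard-library stand-in for cachetools.LRUCache and fake_load_and_parse is a hypothetical replacement for the DB fetch plus BPMN parse, so neither is the engine's actual code.

```python
import asyncio
from collections import OrderedDict

class TinyLRU:
    """Stdlib stand-in for cachetools.LRUCache, for illustration only."""
    def __init__(self, maxsize: int):
        self.maxsize = maxsize
        self.data = OrderedDict()

    def get(self, key):
        if key in self.data:
            self.data.move_to_end(key)  # mark as most recently used
            return self.data[key]
        return None

    def put(self, key, value):
        self.data[key] = value
        self.data.move_to_end(key)
        if len(self.data) > self.maxsize:
            self.data.popitem(last=False)  # evict least recently used

parse_calls = 0

async def fake_load_and_parse(key):
    """Hypothetical stand-in for the DB fetch plus BPMN parse."""
    global parse_calls
    parse_calls += 1
    await asyncio.sleep(0.01)  # simulate I/O and parse latency
    return f"executable-{key}"

async def get(cache, lock, key):
    value = cache.get(key)
    if value is not None:
        return value
    async with lock:
        value = cache.get(key)  # double-check: another task may have loaded it
        if value is None:
            value = await fake_load_and_parse(key)
            cache.put(key, value)
        return value

async def demo():
    cache, lock = TinyLRU(maxsize=2), asyncio.Lock()
    # Ten concurrent lookups for the same (tenant, key) pair.
    return await asyncio.gather(*(get(cache, lock, ("t1", 42)) for _ in range(10)))

results = asyncio.run(demo())
print(parse_calls, set(results))
```

All ten tasks miss on the first check, but only the first one to take the lock parses; the rest find the entry on the second check. Note that a single lock also serializes misses for unrelated keys, which is acceptable when misses are rare.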

Cache sizing

| Param | Value | Reasoning |
|---|---|---|
| max_entries | 1000 | 1000 distinct (tenant, process) pairs is generous |
| Avg entry size | ~100 KB | Parsed BPMN tree with elements + behaviors |
| Total memory | ~100 MB | Acceptable for an engine process |
| LRU policy | Standard | Evict least recently used |
If the workload has more than 1000 distinct (tenant, process) combinations in use at the same time, increase max_entries. If it has far fewer, decrease it to save memory.

Hit rate analysis

Typical workload:

- Most users have few active process definitions (~5-20)
- Most traffic concentrates on those processes
- Old or deprecated processes are rarely looked up

Expected hit rate: >99% in steady state.
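The expected hit rate can be sanity-checked by replaying a synthetic workload through an LRU. This sketch assumes a hypothetical workload of 20 hot definitions requested round-robin; real traffic is skewed rather than uniform, so treat the numbers as illustrative only.

```python
from collections import OrderedDict

def simulate_hit_rate(requests, capacity):
    """Replay a sequence of cache keys through an LRU and return the hit rate."""
    lru = OrderedDict()
    hits = 0
    total = 0
    for key in requests:
        total += 1
        if key in lru:
            hits += 1
            lru.move_to_end(key)  # mark as most recently used
        else:
            lru[key] = True
            if len(lru) > capacity:
                lru.popitem(last=False)  # evict least recently used
    return hits / total if total else 0.0

# Hypothetical workload: 20 hot definitions, requested round-robin.
hot = [("tenant-a", k) for k in range(20)]
workload = [hot[i % len(hot)] for i in range(10_000)]

rate = simulate_hit_rate(workload, capacity=1000)
thrash = simulate_hit_rate(workload, capacity=10)
print(f"capacity=1000: {rate:.2%}, capacity=10: {thrash:.2%}")
```

With capacity well above the working set, only the first 20 lookups miss. With capacity below the working set, a cyclic access pattern makes LRU evict every entry just before it is needed again, which is the "problematic eviction pattern" case to watch for.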

Invalidation: when to evict

Three triggers lead to cache invalidation:

1. New deployment of the same bpmnProcessId

When a new version is deployed, the old version stays in the cache for the process instances that started on it, but new instances must use the new version.

async def on_deployment_event(event):
    bpmn_process_id = event['bpmnProcessId']

    # Do NOT invalidate the old version's key
    # (running instances still need it)

    # Just refresh "latest version" mapping
    latest_version_cache.invalidate(event['tenantId'], bpmn_process_id)

Key point: process_definition_key is immutable (each deployment creates a new key). The old key keeps its cache entry alive while instances are still running on it.

2. Process definition deleted

An admin deletes a process:

async def on_process_definition_deleted(event):
    cache.invalidate(event['tenantId'], event['processDefinitionKey'])

3. Cross-engine invalidation (Phase 2+)

When there are multiple engine instances, each one keeps its own cache. Cross-engine invalidation goes through Postgres LISTEN/NOTIFY:

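-- The engine that performs the deployment notifies:
NOTIFY process_definition_changed, '{"tenant": "...", "key": 12345}';

import asyncio
import json

# Each engine instance listens:
async def listen_for_invalidations():
    # Keep this connection checked out for the engine's lifetime: releasing it
    # back to the pool would drop the listener (assumes an asyncpg-style pool).
    async with db.acquire() as conn:
        await conn.add_listener('process_definition_changed', on_invalidation)
        await asyncio.Event().wait()  # park here until the task is cancelled

async def on_invalidation(conn, pid, channel, payload):
    data = json.loads(payload)
    cache.invalidate(data['tenant'], data['key'])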

Zero additional infrastructure (no Redis pub/sub, no Kafka).
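Since the NOTIFY payload is a plain string, both sides need to agree on its shape. The sketch below shows one way to encode and decode it; the helper names are hypothetical. Postgres documents that, in the default configuration, the payload must be shorter than 8000 bytes, and that the pg_notify(channel, payload) function can be used when the payload is not a literal.

```python
import json

CHANNEL = "process_definition_changed"
MAX_PAYLOAD_BYTES = 8000  # default-config limit: payload must be shorter than this

def encode_invalidation(tenant_id: str, key: int) -> str:
    """Build the JSON payload the emitting engine would send."""
    payload = json.dumps({"tenant": tenant_id, "key": key}, separators=(",", ":"))
    if len(payload.encode("utf-8")) >= MAX_PAYLOAD_BYTES:
        raise ValueError("payload too large for NOTIFY")
    return payload

def decode_invalidation(payload: str) -> tuple[str, int]:
    """Parse the payload on the listening side into (tenant_id, key)."""
    data = json.loads(payload)
    return data["tenant"], int(data["key"])

payload = encode_invalidation("acme", 12345)
# The emitting engine would then run: SELECT pg_notify($1, $2) with (CHANNEL, payload)
print(payload)
print(decode_invalidation(payload))
```

Keeping the payload to identifiers only (never the XML itself) stays far below the size limit and lets each engine reload from Postgres on its next miss.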

Latest version resolution cache

A separate cache answers "give me the latest version of bpmn_process_id":

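from cachetools import TTLCache

class LatestVersionCache:
    def __init__(self, ttl=60):
        self.cache = TTLCache(maxsize=10000, ttl=ttl)

    def invalidate(self, tenant_id, bpmn_process_id):
        # Called on deployment so new instances resolve the new version at once
        self.cache.pop((tenant_id, bpmn_process_id), None)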

    async def get_latest(self, tenant_id, bpmn_process_id):
        cache_key = (tenant_id, bpmn_process_id)
        if cache_key in self.cache:
            return self.cache[cache_key]

        row = await db.fetch_one("""
            SELECT process_definition_key
            FROM process_definitions
            WHERE tenant_id = $1 AND bpmn_process_id = $2
            ORDER BY version DESC
            LIMIT 1
        """, tenant_id, bpmn_process_id)

        if row:
            self.cache[cache_key] = row['process_definition_key']
            return row['process_definition_key']
        return None

The short TTL (60 s) accepts a small lag in exchange for simplicity. When a deployment notification arrives via LISTEN/NOTIFY, invalidation is immediate.
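The staleness window that the TTL bounds can be illustrated with a tiny TTL cache driven by a fake clock. TinyTTLCache is an illustrative stand-in written for this sketch, not cachetools.TTLCache, and the keys and values are made up.

```python
class TinyTTLCache:
    """Illustrative TTL cache with an injectable clock."""
    def __init__(self, ttl, clock):
        self.ttl = ttl
        self.clock = clock
        self.data = {}  # key -> (value, expires_at)

    def get(self, key):
        entry = self.data.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if self.clock() >= expires_at:
            del self.data[key]  # expired: caller must reload from the DB
            return None
        return value

    def put(self, key, value):
        self.data[key] = (value, self.clock() + self.ttl)

    def invalidate(self, key):
        self.data.pop(key, None)

fake_time = {"now": 0.0}
cache = TinyTTLCache(ttl=60, clock=lambda: fake_time["now"])
name = ("acme", "order-process")
cache.put(name, 101)  # latest version resolves to definition key 101

fake_time["now"] = 30.0
stale = cache.get(name)    # still 101, even if a newer version was deployed at t=10

fake_time["now"] = 61.0
expired = cache.get(name)  # None: the TTL elapsed, so the next lookup hits the DB

cache.put(name, 102)
cache.invalidate(name)     # what a deployment notification would trigger
after_invalidate = cache.get(name)
print(stale, expired, after_invalidate)
```

Without a notification, a new deployment becomes visible after at most one TTL; with one, it is visible on the next lookup.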

Element lookup within a process

Once the ExecutableProcess is in the cache, looking up an element by ID must be O(1):

class ExecutableProcess:
    def __init__(self, bpmn_process_id, version, elements):
        self.bpmn_process_id = bpmn_process_id
        self.version = version
        self.elements_by_id = {e.id: e for e in elements}  # dict O(1)
        self.start_events = [e for e in elements if e.element_type == 'START_EVENT']

    def get_element(self, element_id: str) -> "ExecutableElement | None":
        # Returns None when the ID is not part of this process
        return self.elements_by_id.get(element_id)

Pre-building the dict at parse time means each lookup costs a single hash lookup, with no scan of the tree.
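A small runnable sketch of the indexed lookup follows. Element and IndexedProcess are simplified, hypothetical stand-ins for ExecutableElement and ExecutableProcess; the duplicate-ID check is an added assumption, since a plain dict comprehension would silently keep only the last element with a given ID.

```python
from dataclasses import dataclass
from typing import Optional

@dataclass(frozen=True)
class Element:
    """Hypothetical minimal element; the real one also carries behavior config."""
    id: str
    element_type: str

class IndexedProcess:
    def __init__(self, elements):
        # Built once at parse time; reject duplicate IDs instead of overwriting.
        self.elements_by_id = {}
        for e in elements:
            if e.id in self.elements_by_id:
                raise ValueError(f"duplicate element id: {e.id}")
            self.elements_by_id[e.id] = e

    def get_element(self, element_id: str) -> Optional[Element]:
        return self.elements_by_id.get(element_id)

process = IndexedProcess([
    Element("start", "START_EVENT"),
    Element("review", "USER_TASK"),
    Element("end", "END_EVENT"),
])
print(process.get_element("review"))
print(process.get_element("missing"))
```

Callers should handle the None case explicitly, since a command may reference an element ID that does not exist in the cached version.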

Memory considerations

Per ExecutableProcess in cache:

ExecutableProcess:
    bpmn_process_id: str (~32 bytes)
    version: int (8 bytes)
    elements_by_id: dict
        N elements × (
            ExecutableElement:
                id: str (~32 bytes)
                element_type: str (~32 bytes)
                ... behavior config (~1KB avg)
        )

For a typical process with 20 elements: ~25KB
For a complex process with 100 elements: ~125KB
Average: ~100KB

A cache of 1000 entries = ~100MB. Manageable.

If a process has multiple versions cached (because there are active instances of each one), each version is a separate entry. 5 versions × 100KB = 500KB for that bpmn_process_id.
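The sizing figures above follow from simple multiplication. The sketch below reproduces them; the 1.25 KB per-element constant is an assumption derived from the text's own numbers (25 KB for 20 elements), not a measured value.

```python
PER_ELEMENT_KB = 1.25  # assumption implied by the figures above: 25 KB / 20 elements

def entry_kb(num_elements: int) -> float:
    """Approximate size of one cached ExecutableProcess."""
    return num_elements * PER_ELEMENT_KB

def cache_mb(entries: int, avg_entry_kb: float) -> float:
    """Approximate total cache footprint, in decimal megabytes."""
    return entries * avg_entry_kb / 1000

typical = entry_kb(20)
complex_ = entry_kb(100)
total = cache_mb(1000, 100)
per_process_kb = 5 * 100  # five cached versions of one bpmn_process_id

print(typical, complex_, total, per_process_kb)
```

Measuring real parsed objects in the running engine would be the way to confirm or replace these estimates before tuning max_entries.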

Profiling: validate hit rate

Metrics to export (OTel):

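from opentelemetry import metrics

meter = metrics.get_meter("process_cache")

cache_hit_counter = meter.create_counter("process_cache.hits")
cache_miss_counter = meter.create_counter("process_cache.misses")
# create_gauge (synchronous gauge) requires a recent opentelemetry-api release
cache_size_gauge = meter.create_gauge("process_cache.size")

# Update on each lookup
cache_hit_counter.add(1, {"tenant": tenant_id})
cache_size_gauge.set(len(cache.cache))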

The Grafana dashboard should show the hit rate as a percentage. If it is below 90% in steady state, check:

- Is the cache size sufficient?
- Is the eviction pattern problematic?
- Are new deployments too frequent?
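The hit rate itself is derived from the two counters. A minimal sketch of that calculation and of the 90% check follows; the function names are illustrative, and treating "no traffic yet" as not needing review is an assumption of this sketch.

```python
def hit_rate(hits: int, misses: int) -> float:
    """Fraction of lookups served from the cache."""
    total = hits + misses
    return hits / total if total else 0.0

def needs_review(hits: int, misses: int, threshold: float = 0.90) -> bool:
    """True when the steady-state hit rate falls below the threshold."""
    if hits + misses == 0:
        return False  # no traffic yet, nothing to judge
    return hit_rate(hits, misses) < threshold

print(hit_rate(990, 10), needs_review(990, 10))
print(hit_rate(850, 150), needs_review(850, 150))
```

Counters collected right after a restart include the cold-start misses, so the check is only meaningful over a window that excludes warm-up.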

Cold start

When the engine starts, the cache is empty. The first requests are slower because each one has to parse.

Two strategies:

Lazy (default)

The cache fills naturally as requests arrive. After ~1 min, the hit rate is high.

Pre-warm (optional)

For latency-sensitive deployments:

async def warm_cache():
    """Pre-load active process definitions."""
    active_pds = await db.fetch_all("""
        SELECT process_definition_key, tenant_id
        FROM process_instances
        WHERE state IN ('ACTIVE', 'INCIDENT')
        GROUP BY process_definition_key, tenant_id
        ORDER BY MAX(started_at) DESC  -- DISTINCT + ORDER BY started_at is rejected by Postgres
        LIMIT 1000
    """)

    for row in active_pds:
        await cache.get(row['tenant_id'], row['process_definition_key'])

# At startup
await warm_cache()

Alternative: shared cache (Redis)

With multiple engines, one option is a shared cache:

flowchart LR
    E1[Engine 1] --> R["Redis<br/>(shared cache)"]
    E2[Engine 2] --> R
    E3[Engine 3] --> R
    R --> P[("Postgres<br/>(definitions)")]

Trade-offs:

- Pro: single cache instance, lower aggregate memory
- Pro: atomic invalidation via Redis DEL
- Con: network roundtrip per cache hit (~1ms)
- Con: serialization overhead (ExecutableProcess → bytes)
- Con: additional dependency

MVP recommendation: stick with the per-engine cache. Local memory is faster than a Redis network roundtrip, and coordination via LISTEN/NOTIFY is sufficient.

Optimized schema and queries

-- Process definition table
CREATE TABLE process_definitions (
    process_definition_key BIGSERIAL PRIMARY KEY,
    bpmn_process_id TEXT NOT NULL,
    version INT NOT NULL,
    tenant_id TEXT NOT NULL,
    bpmn_xml TEXT NOT NULL,            -- raw XML
    parsed_metadata JSONB,             -- pre-extracted metadata
    deployment_key BIGINT NOT NULL,
    deployed_at TIMESTAMPTZ DEFAULT NOW(),
    UNIQUE (bpmn_process_id, version, tenant_id)
);

-- Fast lookup by key
-- (covered by PRIMARY KEY index)

-- Latest version lookup
CREATE INDEX idx_pd_latest_version 
ON process_definitions (tenant_id, bpmn_process_id, version DESC);

-- Used in: 
-- SELECT ... ORDER BY version DESC LIMIT 1

BPMN XML storage

TEXT, BYTEA, or external storage?

| Option | Pros | Cons |
|---|---|---|
| TEXT column | Simple, queryable | Storage overhead per row |
| BYTEA (compressed) | ~70% smaller | Decompression overhead |
| External (S3) | Zero DB size impact | Network roundtrip, complexity |

Average BPMN XML: ~10-50KB. For 10K process definitions: ~100MB-500MB. Manageable in Postgres.

Recommendation: TEXT column. Postgres TOAST compresses values automatically once a row exceeds roughly 2KB.
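The size reduction claimed for the compressed BYTEA option can be sanity-checked with the standard library. This sketch compresses a synthetic, highly repetitive BPMN-like document with zlib, so the ratio it prints is optimistic compared with real files. TOAST uses its own compression method rather than zlib, so this only illustrates how well XML compresses in general.

```python
import zlib

# Synthetic BPMN-like XML; real files are less repetitive than this.
tasks = "".join(
    f'<bpmn:serviceTask id="task_{i}" name="Step {i}">'
    f"<bpmn:incoming>flow_{i}</bpmn:incoming></bpmn:serviceTask>"
    for i in range(200)
)
xml = f'<bpmn:definitions><bpmn:process id="demo">{tasks}</bpmn:process></bpmn:definitions>'

raw = xml.encode("utf-8")
compressed = zlib.compress(raw, level=6)
saving = 1 - len(compressed) / len(raw)

# Round-trip check: decompression must return the original document.
restored = zlib.decompress(compressed).decode("utf-8")

print(f"{len(raw)} bytes -> {len(compressed)} bytes ({saving:.0%} smaller)")
```

Because the definition is read once per cache miss, the decompression cost would be paid rarely either way; the simpler TEXT column remains the lower-effort choice.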