Process Definition Cache
Re-parsing the BPMN XML on every request is prohibitively expensive (~10 ms). Camunda caches ExecutableProcess in its process_cache column family. The MVP needs an equivalent: an in-memory LRU cache per engine instance, with invalidation on deploy via Postgres LISTEN/NOTIFY. The trade-off is memory versus latency. Recommendation: an LRU with at most 1000 entries, a footprint of ~100 MB, and a hit rate above 99% in steady state. With multiple engine instances (Phase 2+), each instance keeps its own cache and receives invalidation messages.
The problem
Each time the engine processes a BPMN command, it needs to:
1. Look up the process definition (BPMN XML)
2. Parse the XML → ExecutableProcess (in-memory object tree)
3. Find the element by ID within the tree
4. Execute the logic
Steps 1-3 are repeated for every command. Without a cache:
flowchart TD
A["Command arrives<br/>(e.g., COMPLETE_ELEMENT)"] --> B["SELECT bpmn_xml FROM process_definitions<br/>WHERE key = ?<br/>~1ms DB roundtrip"]
B --> C["Parse XML → BpmnModelInstance<br/>~5-15ms (XML parsing is expensive)"]
C --> D["Transform → ExecutableProcess<br/>(5-step pipeline per ADR-005)<br/>~2-5ms"]
D --> E["Find element by ID in tree<br/>~0.1ms"]
E --> F["Execute logic<br/>~0.1-5ms"]
F --> G["Total: ~8-25ms per command<br/>of which 7-20ms is parsing overhead"]
At 100 TPS, this adds 700-2000 ms of CPU time per second spent on re-parsing (0.7 to 2 cores' worth), or 40-80% of the budget. Unacceptable.
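
The arithmetic behind that figure can be checked with a short sketch. It uses only the per-command parsing overhead (7-20 ms) and the 100 TPS rate quoted above; the function name is illustrative, not part of the engine.

```python
def parsing_cpu_ms_per_second(tps: int, overhead_ms: int) -> int:
    """CPU milliseconds spent re-parsing during each second of wall-clock time."""
    return tps * overhead_ms


low = parsing_cpu_ms_per_second(100, 7)    # best case from the flowchart
high = parsing_cpu_ms_per_second(100, 20)  # worst case from the flowchart

print(low, high)                # 700 2000
print(low / 1000, high / 1000)  # 0.7 2.0 (cores kept busy by parsing alone)
```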
Solution: in-memory LRU cache
Cache structure
import asyncio
from typing import Optional

from cachetools import LRUCache

# `db`, `parse_bpmn` and `ExecutableProcess` are provided elsewhere in the engine.


class ProcessDefinitionCache:
    def __init__(self, max_entries=1000):
        # Cache key: (tenant_id, process_definition_key)
        self.cache: LRUCache = LRUCache(maxsize=max_entries)
        # Serializes loads so concurrent misses parse a definition only once
        self.lock = asyncio.Lock()
        # Metrics
        self.hits = 0
        self.misses = 0

    async def get(self, tenant_id: str, key: int) -> Optional["ExecutableProcess"]:
        cache_key = (tenant_id, key)
        # Fast path: a hit needs no lock
        if cache_key in self.cache:
            self.hits += 1
            return self.cache[cache_key]
        self.misses += 1
        async with self.lock:
            # Double-check after acquiring the lock: another task may have loaded it
            if cache_key in self.cache:
                return self.cache[cache_key]
            # Load from DB and parse
            executable = await self._load_and_parse(tenant_id, key)
            if executable:
                self.cache[cache_key] = executable
            return executable

    async def _load_and_parse(self, tenant_id: str, key: int):
        row = await db.fetch_one(
            "SELECT bpmn_xml FROM process_definitions "
            "WHERE tenant_id = $1 AND process_definition_key = $2",
            tenant_id, key
        )
        if not row:
            return None
        return parse_bpmn(row['bpmn_xml'])

    def invalidate(self, tenant_id: str, key: int):
        cache_key = (tenant_id, key)
        self.cache.pop(cache_key, None)

    def invalidate_tenant(self, tenant_id: str):
        # Used when a tenant is deleted
        keys_to_remove = [k for k in self.cache if k[0] == tenant_id]
        for k in keys_to_remove:
            del self.cache[k]
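
The double-checked lookup can be exercised in isolation. The following is a minimal sketch, not the engine's implementation: `TinyLRU` is a stdlib stand-in for `cachetools.LRUCache`, and `DemoCache` and `slow_parse` are hypothetical names introduced only for this demo. It shows that ten concurrent requests for the same uncached key trigger a single load.

```python
import asyncio
from collections import OrderedDict


class TinyLRU:
    """Stdlib stand-in for cachetools.LRUCache, for this demo only."""

    def __init__(self, maxsize: int):
        self.maxsize = maxsize
        self.data: OrderedDict = OrderedDict()

    def get(self, key):
        if key in self.data:
            self.data.move_to_end(key)  # mark as most recently used
            return self.data[key]
        return None

    def put(self, key, value):
        self.data[key] = value
        self.data.move_to_end(key)
        if len(self.data) > self.maxsize:
            self.data.popitem(last=False)  # evict the least recently used entry


class DemoCache:
    def __init__(self, loader, max_entries: int = 2):
        self.lru = TinyLRU(max_entries)
        self.lock = asyncio.Lock()
        self.loader = loader  # hypothetical async callable: key -> parsed object

    async def get(self, key):
        hit = self.lru.get(key)
        if hit is not None:
            return hit
        async with self.lock:
            hit = self.lru.get(key)  # double-check after waiting for the lock
            if hit is not None:
                return hit
            value = await self.loader(key)
            self.lru.put(key, value)
            return value


async def demo():
    calls = []

    async def slow_parse(key):
        calls.append(key)
        await asyncio.sleep(0.01)  # simulate the parse cost
        return f"parsed-{key}"

    cache = DemoCache(slow_parse)
    results = await asyncio.gather(*(cache.get("A") for _ in range(10)))
    return calls, results


calls, results = asyncio.run(demo())
print(len(calls))  # 1: ten concurrent requests triggered a single parse
```

One consequence of the single lock is that misses for different keys also wait on each other. That is acceptable when misses are rare, which is the steady-state assumption of this page.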
Cache sizing
| Param | Value | Reasoning |
|---|---|---|
| max_entries | 1000 | 1000 distinct (tenant, process) entries is generous |
| Avg entry size | ~100 KB | Parsed BPMN tree with elements + behaviors |
| Total memory | ~100 MB | Acceptable for an engine process |
| LRU policy | Standard | Evict least recently used |
If the workload has more than 1000 distinct (tenant, process) combinations in use at the same time, increase max_entries. If it has far fewer, decrease it to save memory.
Hit rate analysis
Typical workload:
- Most users have only a few active process definitions (~5-20)
- Most traffic concentrates on those processes
- Old or deprecated processes are rarely looked up
Expected hit rate: >99% in steady state.
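
To see what a given hit rate means per command, the expected overhead can be computed directly. This sketch assumes that only misses pay the 7-20 ms parsing overhead from the flowchart (the ~1 ms DB read is left out); the function name is illustrative.

```python
def expected_parse_overhead_ms(hit_rate: float, miss_cost_ms: float) -> float:
    """Average per-command overhead when only misses pay the parse cost."""
    return (1.0 - hit_rate) * miss_cost_ms


for rate in (0.90, 0.99):
    low = expected_parse_overhead_ms(rate, 7)
    high = expected_parse_overhead_ms(rate, 20)
    print(f"hit rate {rate:.0%}: {low:.2f}-{high:.2f} ms per command")

# hit rate 90%: 0.70-2.00 ms per command
# hit rate 99%: 0.07-0.20 ms per command
```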
Invalidation: when to evict
Three triggers evict entries from the cache:
1. New deployment of the same bpmnProcessId
When a new version is deployed, the old version stays in the cache for the process instances that started on it, but new instances must use the new version.
async def on_deployment_event(event):
    bpmn_process_id = event['bpmnProcessId']
    # Do NOT invalidate the old version's key
    # (running instances still need it)
    # Just refresh the "latest version" mapping
    latest_version_cache.invalidate(event['tenantId'], bpmn_process_id)
Key point: process_definition_key is immutable (each deployment gets a new key). A deployment never invalidates the old key, so its cache entry stays alive while instances are still running on it (subject to normal LRU eviction).
2. Process definition deleted
An admin deletes a process:
async def on_process_definition_deleted(event):
    cache.invalidate(event['tenantId'], event['processDefinitionKey'])
3. Cross-engine invalidation (Phase 2+)
With multiple engine instances, each one keeps its own cache. Cross-engine invalidation goes through Postgres LISTEN/NOTIFY:
-- The engine that performs the deployment notifies:
NOTIFY process_definition_changed, '{"tenant": "...", "key": 12345}';
# Each engine instance listens (assuming an asyncpg-style pool behind `db`):
import asyncio
import json

async def listen_for_invalidations():
    async with db.acquire() as conn:
        await conn.add_listener('process_definition_changed', on_invalidation)
        # Hold the connection open: releasing it back to the pool drops the listener
        await asyncio.Event().wait()

async def on_invalidation(conn, pid, channel, payload):
    data = json.loads(payload)
    cache.invalidate(data['tenant'], data['key'])
Zero additional infrastructure (no Redis pub/sub, no Kafka).
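
Because the NOTIFY payload is free-form text, the listener is safer if it tolerates malformed messages. The sketch below is one possible shape, assuming the JSON payload format shown above; `parse_invalidation` is a hypothetical helper and a plain dict stands in for the real cache.

```python
import json
from typing import Optional, Tuple


def parse_invalidation(payload: str) -> Optional[Tuple[str, int]]:
    """Return (tenant_id, key) from a NOTIFY payload, or None if it is malformed."""
    try:
        data = json.loads(payload)
        return str(data["tenant"]), int(data["key"])
    except (ValueError, KeyError, TypeError):
        # json.JSONDecodeError is a ValueError; a non-dict payload raises TypeError
        return None


# A plain dict stands in for the process definition cache in this demo.
cache = {("acme", 12345): "parsed-process"}

for payload in ('{"tenant": "acme", "key": 12345}', "not json"):
    target = parse_invalidation(payload)
    if target is not None:
        cache.pop(target, None)

print(cache)  # {}
```

Ignoring a bad payload is a design choice for the demo; a real listener would likely also log it.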
Latest version resolution cache
A separate cache answers "give me the latest version of bpmn_process_id":
from cachetools import TTLCache


class LatestVersionCache:
    def __init__(self, ttl=60):
        self.cache = TTLCache(maxsize=10000, ttl=ttl)

    async def get_latest(self, tenant_id, bpmn_process_id):
        cache_key = (tenant_id, bpmn_process_id)
        if cache_key in self.cache:
            return self.cache[cache_key]
        row = await db.fetch_one("""
            SELECT process_definition_key
            FROM process_definitions
            WHERE tenant_id = $1 AND bpmn_process_id = $2
            ORDER BY version DESC
            LIMIT 1
        """, tenant_id, bpmn_process_id)
        if row:
            self.cache[cache_key] = row['process_definition_key']
            return row['process_definition_key']
        return None

    def invalidate(self, tenant_id, bpmn_process_id):
        # Called from on_deployment_event so the next lookup hits the DB
        self.cache.pop((tenant_id, bpmn_process_id), None)
The short TTL (60 s) bounds staleness if a notification is missed, trading a small lag for simplicity. In the normal case, invalidation is immediate via LISTEN/NOTIFY.
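
The interaction between TTL expiry and explicit invalidation on a deployment event can be illustrated without any dependencies. In this sketch `TinyTTLCache` is a stdlib stand-in for `cachetools.TTLCache`, written only for the demo, and the clock is injected so the example runs deterministically.

```python
import time


class TinyTTLCache:
    """Stdlib stand-in for cachetools.TTLCache, for this demo only."""

    def __init__(self, ttl: float, clock=time.monotonic):
        self.ttl = ttl
        self.clock = clock
        self.data = {}  # key -> (value, expires_at)

    def get(self, key):
        entry = self.data.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if self.clock() >= expires_at:
            del self.data[key]  # expired: the caller falls back to the database
            return None
        return value

    def put(self, key, value):
        self.data[key] = (value, self.clock() + self.ttl)

    def invalidate(self, key):
        self.data.pop(key, None)


now = [0.0]  # fake clock, advanced by hand
cache = TinyTTLCache(ttl=60, clock=lambda: now[0])
key = ("acme", "order-process")

cache.put(key, 101)
now[0] = 59.0
print(cache.get(key))  # 101, still within the TTL
now[0] = 60.0
print(cache.get(key))  # None, expired

cache.put(key, 102)
cache.invalidate(key)  # what a deployment notification would trigger
print(cache.get(key))  # None, explicitly invalidated
```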
Element lookup within a process
Once the ExecutableProcess is in the cache, looking up an element by ID must be O(1):
from typing import Optional


class ExecutableProcess:
    def __init__(self, bpmn_process_id, version, elements):
        self.bpmn_process_id = bpmn_process_id
        self.version = version
        self.elements_by_id = {e.id: e for e in elements}  # dict O(1)
        self.start_events = [e for e in elements if e.element_type == 'START_EVENT']

    def get_element(self, element_id: str) -> Optional["ExecutableElement"]:
        # Returns None when the ID is unknown
        return self.elements_by_id.get(element_id)
Building the dict once at parse time means each lookup is a single hash lookup, with no tree scan.
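
A small usage sketch makes the lookup behavior concrete. The `Element` and `Process` classes below are simplified stand-ins for ExecutableElement and ExecutableProcess, and the element type names other than START_EVENT are assumptions.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class Element:
    id: str
    element_type: str


class Process:
    def __init__(self, elements: List[Element]):
        # Both indexes are built once, when the process is parsed
        self.elements_by_id = {e.id: e for e in elements}
        self.start_events = [e for e in elements if e.element_type == "START_EVENT"]

    def get_element(self, element_id: str) -> Optional[Element]:
        return self.elements_by_id.get(element_id)


process = Process([
    Element("start", "START_EVENT"),
    Element("approve", "USER_TASK"),  # hypothetical type name
    Element("end", "END_EVENT"),      # hypothetical type name
])

print(process.get_element("approve").element_type)  # USER_TASK
print(process.get_element("missing"))               # None
print([e.id for e in process.start_events])         # ['start']
```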
Memory considerations
Per ExecutableProcess in cache:
ExecutableProcess:
    bpmn_process_id: str (~32 bytes)
    version: int (8 bytes)
    elements_by_id: dict
        N elements × (
            ExecutableElement:
                id: str (~32 bytes)
                element_type: str (~32 bytes)
                ... behavior config (~1KB avg)
        )
For a typical process with 20 elements: ~25 KB
For a complex process with 100 elements: ~125 KB
Sizing average: ~100 KB (conservative, closer to the complex case)
A cache of 1000 entries = ~100 MB. Manageable.
If a process has multiple versions cached (because each has active instances), each version is a separate entry. 5 versions × 100 KB = 500 KB for that bpmn_process_id.
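
The per-entry figures can be reproduced with a rough estimator. The field sizes come from the breakdown above; the 15% container overhead is an assumption chosen so the result lines up with the quoted ~25 KB and ~125 KB, not a measured value.

```python
ID_BYTES = 32
TYPE_BYTES = 32
BEHAVIOR_BYTES = 1024      # "~1KB avg" behavior config
CONTAINER_OVERHEAD = 1.15  # assumption: dict and object overhead


def estimate_entry_bytes(n_elements: int) -> int:
    """Rough size of one cached process with n_elements elements."""
    per_element = ID_BYTES + TYPE_BYTES + BEHAVIOR_BYTES
    return round(n_elements * per_element * CONTAINER_OVERHEAD)


print(estimate_entry_bytes(20))   # 25024, about 25 KB
print(estimate_entry_bytes(100))  # 125120, about 125 KB
print(1000 * 100_000 / 1e6)       # 100.0 MB for 1000 entries at ~100 KB each
```

Actual Python object sizes vary by interpreter version and by how behaviors are represented, so profiling a real ExecutableProcess is the reliable way to confirm these numbers.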
Profiling: validate hit rate
Metrics to export (OTel):
from opentelemetry import metrics

meter = metrics.get_meter("process_cache")
cache_hit_counter = meter.create_counter("process_cache.hits")
cache_miss_counter = meter.create_counter("process_cache.misses")
# create_gauge requires a recent opentelemetry-api release
cache_size_gauge = meter.create_gauge("process_cache.size")
# Update
cache_hit_counter.add(1, {"tenant": tenant_id})
cache_size_gauge.set(len(cache.cache))
The Grafana dashboard should show the hit rate as a percentage. If it falls below 90% in steady state, check:
- Is the cache size sufficient?
- Is the eviction pattern problematic?
- Are new deployments too frequent?
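
The dashboard percentage is derived from the two counters. A minimal sketch of that calculation and of the 90% check follows; both function names are illustrative.

```python
def hit_rate(hits: int, misses: int) -> float:
    """Fraction of lookups served from the cache; 0.0 before any traffic."""
    total = hits + misses
    return hits / total if total else 0.0


def needs_review(hits: int, misses: int, threshold: float = 0.90) -> bool:
    """True when the steady-state hit rate is below the review threshold."""
    return hit_rate(hits, misses) < threshold


print(hit_rate(9950, 50))      # 0.995
print(needs_review(9950, 50))  # False
print(needs_review(850, 150))  # True (85% hit rate)
```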
Cold start
When the engine starts, the cache is empty. The first requests are slower because they pay the parse cost.
Two strategies:
Lazy (default)
The cache fills naturally as requests arrive. After ~1 min, the hit rate is high.
Pre-warm (optional)
For latency-sensitive deployments:
async def warm_cache():
    """Pre-load active process definitions."""
    # GROUP BY instead of SELECT DISTINCT: Postgres rejects ORDER BY on a
    # column that is not in a DISTINCT select list.
    active_pds = await db.fetch_all("""
        SELECT process_definition_key, tenant_id
        FROM process_instances
        WHERE state IN ('ACTIVE', 'INCIDENT')
        GROUP BY process_definition_key, tenant_id
        ORDER BY MAX(started_at) DESC
        LIMIT 1000
    """)
    for row in active_pds:
        await cache.get(row['tenant_id'], row['process_definition_key'])

# At startup
await warm_cache()
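
A pre-warm pass should probably not abort engine startup because one definition fails to load. The sketch below shows one failure-tolerant variant; `warm` and `fake_get` are hypothetical names, and `fake_get` stands in for the cache's `get` method.

```python
import asyncio


async def warm(cache_get, rows):
    """Load each row through cache_get; count successes and failures."""
    loaded, failed = 0, 0
    for row in rows:
        try:
            result = await cache_get(row["tenant_id"], row["process_definition_key"])
        except Exception:
            failed += 1  # a broken definition should not block startup
            continue
        if result is None:
            failed += 1  # definition no longer exists
        else:
            loaded += 1
    return loaded, failed


async def fake_get(tenant_id, key):
    # Stand-in for the cache's get(): key 2 simulates an unparseable definition
    if key == 2:
        raise ValueError("unparseable BPMN")
    return f"parsed-{tenant_id}-{key}"


rows = [{"tenant_id": "acme", "process_definition_key": k} for k in (1, 2, 3)]
print(asyncio.run(warm(fake_get, rows)))  # (2, 1)
```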
Alternative: shared cache (Redis)
With multiple engines, one option is a shared cache:
flowchart LR
E1[Engine 1] --> R["Redis<br/>(shared cache)"]
E2[Engine 2] --> R
E3[Engine 3] --> R
R --> P[("Postgres<br/>(definitions)")]
Trade-offs:
- Pro: single cache instance, lower aggregate memory
- Pro: atomic invalidation via a Redis DEL
- Con: network roundtrip per cache hit (~1 ms)
- Con: serialization overhead (ExecutableProcess → bytes)
- Con: an additional dependency
MVP recommendation: stick with the per-engine cache. Local memory is faster than a Redis network roundtrip, and coordination via LISTEN/NOTIFY is sufficient.
Optimized schema and queries
-- Process definition table
CREATE TABLE process_definitions (
    process_definition_key BIGSERIAL PRIMARY KEY,
    bpmn_process_id TEXT NOT NULL,
    version INT NOT NULL,
    tenant_id TEXT NOT NULL,
    bpmn_xml TEXT NOT NULL,      -- raw XML
    parsed_metadata JSONB,       -- pre-extracted metadata
    deployment_key BIGINT NOT NULL,
    deployed_at TIMESTAMPTZ DEFAULT NOW(),
    UNIQUE (bpmn_process_id, version, tenant_id)
);
-- Fast lookup by key
-- (covered by PRIMARY KEY index)
-- Latest version lookup
CREATE INDEX idx_pd_latest_version
    ON process_definitions (tenant_id, bpmn_process_id, version DESC);
-- Used in:
-- SELECT ... ORDER BY version DESC LIMIT 1
BPMN XML storage
TEXT, BYTEA, or external storage?
| Option | Pros | Cons |
|---|---|---|
| TEXT column | Simple, queryable | Storage overhead per row |
| BYTEA (compressed) | ~70% smaller | Decompression overhead |
| External (S3) | Zero DB size impact | Network roundtrip, complexity |
Average BPMN XML size: ~10-50 KB. For 10K process definitions: ~100-500 MB. Manageable in Postgres.
Recommendation: TEXT column. Postgres TOAST compresses large values automatically once a row exceeds about 2 KB.
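
For a feel of what the BYTEA (compressed) option involves, the roundtrip can be tried with zlib from the standard library. The XML below is synthetic and highly repetitive, so its ratio will be better than for a real diagram; treat it as an illustration of the mechanism, not as evidence for the ~70% figure.

```python
import zlib

# Synthetic BPMN-like document (assumption: real diagrams are less repetitive)
tasks = "".join(
    f'<bpmn:serviceTask id="task_{i}" name="Task {i}"/>' for i in range(200)
)
raw = f"<bpmn:definitions>{tasks}</bpmn:definitions>".encode("utf-8")

compressed = zlib.compress(raw, level=6)
restored = zlib.decompress(compressed)

print(len(raw), len(compressed))  # original vs compressed size in bytes
print(restored == raw)            # True: the roundtrip is lossless
```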
Links
- concepts/bpmn-execution-model: How BPMN executes
- concepts/deployment-pipeline: When the cache is invalidated
- concepts/journal-and-stream-platform: Engine processing
- adrs/adr-002-postgresql-as-state-store: Postgres LISTEN/NOTIFY
- LRU cache theory