
Backpressure REST Strategy

Camunda uses gRPC streaming plus StabilizingAIMD for backpressure. The MVP uses REST without native streaming, so it needs an explicit strategy. Solution: per-tenant rate limiting with a token bucket, queue depth monitoring of the processing loop, HTTP 429 with a Retry-After header, and whitelisted critical commands (cancel, complete). Targets: 100 TPS sustained without OOM, and automatic recovery under overload.

The problem

A REST API has no inherent backpressure:
- The client sends a request → the server responds
- If the server is saturated, what should it do?

Three bad options:
1. Accept everything → eventually OOM
2. Reject everything → clients see errors
3. Slow everyone down → cascading latency increase

Camunda avoids this with gRPC streaming plus StabilizingAIMD (see concepts/backpressure):
- Native HTTP/2 flow control
- The client streams job requests; the server controls the flow
- AIMD adjusts the rate dynamically based on success/failure

The MVP on REST needs an explicit strategy.

Strategy: defensive layers

flowchart TD
    A[Client HTTP request] --> L1["Layer 1: Per-tenant rate limit<br/>(token bucket)<br/>429 Too Many Requests if exceeded"]
    L1 -->|within rate limit| L2["Layer 2: Global queue depth check<br/>503 Service Unavailable if overloaded"]
    L2 -->|queue space available| L3["Layer 3: Command queue (bounded)<br/>Blocks gracefully under temporary contention"]
    L3 --> E["Engine processes command<br/>(single-threaded)"]

Layer 1: Per-tenant rate limiting

Token bucket implementation

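import asyncio
import time

class TokenBucket:
    def __init__(self, rate_per_sec, burst):
        self.rate = rate_per_sec      # tokens added per second
        self.burst = burst            # maximum tokens the bucket can hold
        self.tokens = burst           # start full so an initial burst is allowed
        # Monotonic clock: unaffected by wall-clock adjustments (NTP, DST)
        self.last_refill = time.monotonic()
        self.lock = asyncio.Lock()

    async def consume(self, n=1):
        async with self.lock:
            # Refill lazily, based on time elapsed since the last call
            now = time.monotonic()
            elapsed = now - self.last_refill
            self.tokens = min(self.burst, self.tokens + elapsed * self.rate)
            self.last_refill = now

            if self.tokens >= n:
                self.tokens -= n
                return True
            return False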

# Per-tenant bucket
class TenantRateLimiter:
    def __init__(self, default_rate=100, default_burst=200):
        self.buckets = {}  # tenant_id → TokenBucket
        self.default_rate = default_rate
        self.default_burst = default_burst

    async def check(self, tenant_id):
        if tenant_id not in self.buckets:
            self.buckets[tenant_id] = TokenBucket(
                self.default_rate, self.default_burst
            )
        return await self.buckets[tenant_id].consume(1)
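
To see how `rate_per_sec` and `burst` interact, here is a minimal synchronous sketch with the same refill arithmetic as `TokenBucket`. The `SyncTokenBucket` and `FakeClock` names and the injectable `clock` parameter are assumptions added for illustration so that time can be advanced by hand; they are not part of the MVP code.

```python
class SyncTokenBucket:
    """Same refill arithmetic as TokenBucket, without the asyncio lock."""

    def __init__(self, rate_per_sec, burst, clock):
        self.rate = rate_per_sec
        self.burst = burst
        self.tokens = float(burst)
        self.clock = clock            # hypothetical: callable returning seconds
        self.last_refill = clock()

    def consume(self, n=1):
        now = self.clock()
        elapsed = now - self.last_refill
        self.tokens = min(self.burst, self.tokens + elapsed * self.rate)
        self.last_refill = now
        if self.tokens >= n:
            self.tokens -= n
            return True
        return False


class FakeClock:
    """Manually advanced clock so the example is deterministic."""

    def __init__(self):
        self.now = 0.0

    def __call__(self):
        return self.now

    def advance(self, seconds):
        self.now += seconds


clock = FakeClock()
bucket = SyncTokenBucket(rate_per_sec=100, burst=200, clock=clock)

# 250 back-to-back requests: the first 200 drain the burst, the rest are rejected.
accepted_in_burst = sum(bucket.consume() for _ in range(250))

# Half a second later, 0.5 s * 100 tokens/s = 50 tokens have been refilled.
clock.advance(0.5)
accepted_after_wait = sum(bucket.consume() for _ in range(250))
```

The burst size bounds how many requests can pass instantly, while the rate bounds the long-run average.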

Per-tenant config (Postgres)

CREATE TABLE tenant_rate_limits (
    tenant_id TEXT PRIMARY KEY,
    requests_per_second INT NOT NULL DEFAULT 100,
    burst_size INT NOT NULL DEFAULT 200,
    updated_at TIMESTAMPTZ DEFAULT NOW()
);

-- Defaults (free tier)
INSERT INTO tenant_rate_limits (tenant_id, requests_per_second, burst_size)
SELECT tenant_id, 50, 100 FROM tenants WHERE tier = 'free';

-- Premium
INSERT INTO tenant_rate_limits (tenant_id, requests_per_second, burst_size)
SELECT tenant_id, 1000, 2000 FROM tenants WHERE tier = 'premium';

Middleware

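# Assumes a FastAPI/Starlette app; `rate_limiter` is a shared TenantRateLimiter instance
import time
from fastapi.responses import JSONResponse

@app.middleware("http")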
async def rate_limit_middleware(request, call_next):
    tenant_id = extract_tenant_from_auth(request)

    if not await rate_limiter.check(tenant_id):
        return JSONResponse(
            status_code=429,
            content={"error": "Rate limit exceeded"},
            headers={
                "Retry-After": "1",
                "X-RateLimit-Reset": str(int(time.time()) + 1)
            }
        )

    return await call_next(request)
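
The middleware hard-codes `Retry-After: 1`. For tenants with low rates, one second may not be enough to refill a token. A possible refinement, sketched here as an assumption rather than the MVP's behavior, derives the header from the token deficit. The helper name `retry_after_seconds` is hypothetical.

```python
import math


def retry_after_seconds(tokens, rate_per_sec, needed=1):
    """Whole seconds until `needed` tokens are available in the bucket."""
    deficit = needed - tokens
    if deficit <= 0:
        return 0  # enough tokens already, no wait required
    # Round up: Retry-After carries an integer number of seconds
    return math.ceil(deficit / rate_per_sec)


fast_tenant = retry_after_seconds(tokens=0.0, rate_per_sec=100)
slow_tenant = retry_after_seconds(tokens=0.0, rate_per_sec=0.25)
```

With a rate of 100 tokens/s the answer is always 1 second, which matches the hard-coded value; the calculation only matters for tenants limited to less than one request per second.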

Layer 2: Global queue depth monitoring

The processing loop has an internal queue. If it grows too large, the engine is saturated.

class EngineMonitor:
    def __init__(self, max_queue_depth=10000):
        self.max_queue_depth = max_queue_depth
        self.current_depth = 0

    async def can_accept_command(self) -> bool:
        return self.current_depth < self.max_queue_depth

    async def queue_depth_percentage(self) -> float:
        # Despite the name, this returns a fraction in [0.0, 1.0], not 0-100
        return self.current_depth / self.max_queue_depth

# Middleware
@app.middleware("http")
async def queue_depth_middleware(request, call_next):
    if request.url.path.startswith("/v2/"):
        if not await engine_monitor.can_accept_command():
            return JSONResponse(
                status_code=503,
                content={"error": "Engine overloaded, retry shortly"},
                headers={"Retry-After": "5"}
            )
    return await call_next(request)

Metrics for visibility

queue_depth_gauge = meter.create_gauge("engine.queue.depth")
queue_depth_pct_gauge = meter.create_gauge("engine.queue.depth.percent")

# Update periodically
async def report_queue_metrics():
    while True:
        queue_depth_gauge.set(engine_monitor.current_depth)
        queue_depth_pct_gauge.set(await engine_monitor.queue_depth_percentage())
        await asyncio.sleep(1)

Alert config:

- alert: EngineQueueHigh
  expr: engine_queue_depth_percent > 0.8
  for: 1m
  annotations:
    summary: "Engine queue at {{ $value | humanizePercentage }} capacity"

Layer 3: Bounded command queue

The processing loop uses a bounded queue: if it is full, the REST handler waits (graceful slowdown) and eventually returns 503.

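import asyncio

class EngineSaturatedError(Exception):
    """Raised when a command cannot be enqueued before the timeout."""

class BoundedCommandQueue: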
    def __init__(self, max_size=10000):
        self.queue = asyncio.Queue(maxsize=max_size)

    async def enqueue(self, command, timeout=5.0):
        try:
            await asyncio.wait_for(
                self.queue.put(command),
                timeout=timeout
            )
        except asyncio.TimeoutError:
            raise EngineSaturatedError("Queue full, request timeout")

    async def dequeue(self):
        return await self.queue.get()

# In handler
async def handle_create_instance(request):
    cmd = build_command(request)
    try:
        await engine.command_queue.enqueue(cmd, timeout=5.0)
    except EngineSaturatedError:
        # Assuming Starlette/FastAPI, where the keyword is `status_code`
        return Response(status_code=503, headers={"Retry-After": "5"})

    # Return the key as soon as the command is enqueued; processing completes asynchronously
    return {"processInstanceKey": cmd.id}
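
The snippets above do not show how `EngineMonitor.current_depth` gets updated. One option, sketched here as an assumption, is to read the depth straight from the bounded `asyncio.Queue` so there is no separate counter to keep in sync. The class name `QueueBackedMonitor` is hypothetical.

```python
import asyncio


class QueueBackedMonitor:
    """Reports depth directly from the queue (requires maxsize > 0)."""

    def __init__(self, queue: asyncio.Queue):
        self.queue = queue

    def can_accept_command(self) -> bool:
        return not self.queue.full()

    def depth_fraction(self) -> float:
        # 0.0 = empty, 1.0 = full
        return self.queue.qsize() / self.queue.maxsize


async def demo():
    queue = asyncio.Queue(maxsize=4)
    monitor = QueueBackedMonitor(queue)
    for i in range(3):
        await queue.put(i)
    before = (monitor.depth_fraction(), monitor.can_accept_command())
    await queue.put(3)  # queue is now full
    after = (monitor.depth_fraction(), monitor.can_accept_command())
    return before, after


before, after = asyncio.run(demo())
```

This keeps Layer 2 and Layer 3 consistent by construction, at the cost of coupling the monitor to the queue implementation.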

Whitelisted critical commands

Camunda whitelist: cancel, complete, and throwError are NEVER blocked (this avoids deadlocks where the system cannot make progress).

MVP equivalent:

WHITELISTED_INTENTS = {
    'JOB:COMPLETE',
    'JOB:FAIL',
    'JOB:THROW_ERROR',
    'PROCESS_INSTANCE:CANCEL',
    'INCIDENT:RESOLVE',
}

async def rate_limit_middleware(request, call_next):
    tenant_id = extract_tenant_from_auth(request)
    intent = extract_intent_from_request(request)

    # Whitelist bypass
    if intent in WHITELISTED_INTENTS:
        return await call_next(request)

    # Normal rate limit
    if not await rate_limiter.check(tenant_id):
        return JSONResponse(
            status_code=429,
            content={"error": "Rate limit exceeded"},
            headers={"Retry-After": "1"},
        )

    return await call_next(request)

Reason: if the system is overloaded with CreateInstance commands, it cannot recover unless it still accepts the cancels and completes that free resources.
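
`extract_intent_from_request` is not defined above. A minimal sketch of how it could map a request to an intent follows. The route patterns are assumptions for illustration and are not the MVP's confirmed API surface; the function here takes the method and path directly instead of a request object.

```python
import re

WHITELISTED_INTENTS = {
    "JOB:COMPLETE",
    "JOB:FAIL",
    "JOB:THROW_ERROR",
    "PROCESS_INSTANCE:CANCEL",
    "INCIDENT:RESOLVE",
}

# Hypothetical route table: (HTTP method, path pattern, intent)
INTENT_ROUTES = [
    ("POST", re.compile(r"^/v2/jobs/[^/]+/completion$"), "JOB:COMPLETE"),
    ("POST", re.compile(r"^/v2/jobs/[^/]+/failure$"), "JOB:FAIL"),
    ("POST", re.compile(r"^/v2/jobs/[^/]+/error$"), "JOB:THROW_ERROR"),
    ("POST", re.compile(r"^/v2/process-instances/[^/]+/cancellation$"),
     "PROCESS_INSTANCE:CANCEL"),
    ("POST", re.compile(r"^/v2/incidents/[^/]+/resolution$"), "INCIDENT:RESOLVE"),
    ("POST", re.compile(r"^/v2/process-instances$"), "PROCESS_INSTANCE:CREATE"),
]


def extract_intent(method, path):
    """Return the intent for a request, or None if no route matches."""
    for route_method, pattern, intent in INTENT_ROUTES:
        if method == route_method and pattern.match(path):
            return intent
    return None


def bypasses_rate_limit(method, path):
    return extract_intent(method, path) in WHITELISTED_INTENTS
```

Unknown routes return `None`, which is not in the whitelist, so they stay subject to the normal rate limit.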

HTTP 429 vs 503: when to use each

Status | Meaning | Retry-After
429 Too Many Requests | The client exceeded ITS OWN rate limit | Yes (1-60s)
503 Service Unavailable | Server overloaded (affects all clients) | Yes (5-30s)

The client can treat them differently:
- 429 → "I'm sending too fast, slow down"
- 503 → "Server problem, retry shortly with backoff"

Adaptive backpressure (Phase 2)

Camunda's StabilizingAIMD adjusts the limit dynamically. The MVP can do something simpler:

import statistics
from collections import deque

class AdaptiveRateLimiter:
    def __init__(self, base_rate=100):
        self.base_rate = base_rate
        self.current_rate = base_rate
        # Sliding window of the last 100 latencies (ms); old samples drop off automatically
        self.recent_latencies = deque(maxlen=100)

    def record_latency(self, latency_ms):
        self.recent_latencies.append(latency_ms)
        if len(self.recent_latencies) < 2:
            return  # statistics.quantiles needs at least two samples

        p99 = statistics.quantiles(self.recent_latencies, n=100)[98]
        # If p99 > 1s, reduce rate
        if p99 > 1000:
            self.current_rate = max(10, self.current_rate * 0.9)
        # If p99 < 200ms, increase (never above the base rate)
        elif p99 < 200:
            self.current_rate = min(self.base_rate, self.current_rate * 1.05)

Lazier alternative: skip automatic adjustment and review the limits periodically based on metrics.
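
For comparison, the limiter above scales the rate multiplicatively in both directions. A textbook AIMD controller increases additively and decreases multiplicatively. The sketch below is generic AIMD, not Camunda's StabilizingAIMD (whose internals are not described here); the class name and the `increase` and `decrease` defaults are assumptions.

```python
class AimdLimit:
    """Additive-increase / multiplicative-decrease on a rate limit."""

    def __init__(self, initial, floor, ceiling, increase=5.0, decrease=0.5):
        self.limit = float(initial)
        self.floor = floor          # never drop below this rate
        self.ceiling = ceiling      # never exceed this rate
        self.increase = increase    # added per healthy observation
        self.decrease = decrease    # multiplier applied on overload

    def on_success(self):
        # Probe for capacity slowly
        self.limit = min(self.ceiling, self.limit + self.increase)

    def on_overload(self):
        # Back off quickly
        self.limit = max(self.floor, self.limit * self.decrease)


limiter = AimdLimit(initial=100, floor=10, ceiling=100)
limiter.on_overload()          # 100 -> 50
limiter.on_overload()          # 50 -> 25
for _ in range(3):
    limiter.on_success()       # 25 -> 30 -> 35 -> 40
```

The asymmetry is the point: the limit drops fast when the engine is struggling and climbs back slowly, which damps oscillation.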

Client-side backoff

Document this for the SDK:

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

async function createInstanceWithRetry(client, params, maxRetries = 5) {
  for (let attempt = 0; attempt < maxRetries; attempt++) {
    try {
      return await client.processInstances.create(params);
    } catch (err) {
      if (err.statusCode === 429 || err.statusCode === 503) {
        // Retry-After in seconds; fall back to 1s if missing or not numeric
        const retryAfter = parseInt(err.headers?.['retry-after'], 10) || 1;
        const jitter = Math.random() * 0.5;
        await sleep(retryAfter * 1000 * (1 + jitter));
        continue;
      }
      throw err;
    }
  }
  throw new Error('Max retries exceeded');
}

This is built into the SDK by default, so users do not need to think about it.
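
The delay policy itself is language-independent. The Python sketch below keeps the 0-50% jitter from the JavaScript example and adds one thing that example does not have: an exponential fallback when the server sends no Retry-After. The function name and the `base`, `cap`, and `rng` parameters are assumptions for illustration.

```python
import random


def retry_delay(attempt, retry_after=None, base=1.0, cap=30.0, rng=random.random):
    """Seconds to wait before retry number `attempt` (0-based)."""
    if retry_after is not None:
        # Honour the server's hint when it is present
        floor = retry_after
    else:
        # Otherwise back off exponentially: base, 2*base, 4*base, ... up to cap
        floor = min(cap, base * 2 ** attempt)
    # Up to 50% jitter so that clients do not retry in lockstep
    return floor * (1 + rng() * 0.5)


with_hint = retry_delay(0, retry_after=5, rng=lambda: 0.0)
without_hint = retry_delay(3, rng=lambda: 0.0)
capped = retry_delay(10, rng=lambda: 1.0)
```

Passing `rng` explicitly makes the function deterministic in tests; production callers would leave the default.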

Capacity planning

For a target of 100 TPS sustained, with a 5x safety margin:

Peak target: 500 TPS
Per-tenant rate: 100 req/s (most tenants)
Burst: 200 (handles spikes)
Global queue: 10K commands buffered (~20s headroom)
Workers: 10-20 (concurrent processing within engine)
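
The headroom figure follows from simple arithmetic on the numbers above. The helper `seconds_to_fill` is a hypothetical name, and the 400 TPS drain rate in the last line is an illustrative assumption, not a measured value.

```python
peak_tps = 500
queue_capacity = 10_000
tenant_rate = 100

# Worst case: the engine drains nothing while requests arrive at peak rate
headroom_s = queue_capacity / peak_tps

# Number of tenants sending at their full limit needed to reach the peak
tenants_to_peak = peak_tps / tenant_rate


def seconds_to_fill(capacity, arrival_tps, drain_tps):
    """Time for an empty queue to fill when arrivals outpace the engine."""
    net = arrival_tps - drain_tps
    return float("inf") if net <= 0 else capacity / net


# If the engine still drains 400 TPS under a 500 TPS load, the queue lasts longer
partial_stall_s = seconds_to_fill(queue_capacity, 500, 400)
```

The 20-second figure is therefore the floor; any drain rate above zero extends it.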

If load grows beyond this:
- Scale per-tenant limits (Phase 2, dynamic)
- Add engine instances (Phase 2, active-active per ADR-003)
- Eventually partition by tenant (Phase 3)

Testing

Load test scenarios:

# k6 scenario
scenarios:
  rate_limit_test:
    executor: constant-arrival-rate
    rate: 150       # exceeds 100/s tenant limit
    timeUnit: 1s
    duration: 1m
    preAllocatedVUs: 50

  overload_test:
    executor: ramping-arrival-rate
    startRate: 100
    timeUnit: 1s
    preAllocatedVUs: 200   # required by arrival-rate executors; value is an assumption
    stages:
      - { duration: 1m, target: 500 }    # ramp up
      - { duration: 5m, target: 500 }    # sustain (should saturate)
      - { duration: 1m, target: 100 }    # ramp down (should recover)

Validate:
- The rate limit returns 429 properly
- Overload returns 503 and does not crash the engine
- After load drops, the system recovers
- p99 latency does not degrade sharply during overload (rate limiting helps)
- No memory leak (the bounded queue works)
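
To judge whether `rate_limit_test` "returns 429 properly", it helps to know roughly how many rejections to expect. The sketch below assumes a single tenant with the default limit (100 req/s, burst 200) and a bucket that starts full; the helper name is hypothetical.

```python
def expected_admitted(send_rate, duration_s, limit_rate, burst):
    """Upper bound on requests a token bucket admits from a steady stream."""
    sent = send_rate * duration_s
    # The bucket can hand out its initial burst plus everything it refills
    admitted = min(sent, burst + limit_rate * duration_s)
    return sent, admitted


# rate_limit_test: 150 req/s for 1 minute against a 100 req/s limit, burst 200
sent, admitted = expected_admitted(150, 60, 100, 200)
rejected = sent - admitted
rejected_share = rejected / sent
```

Under these assumptions roughly 31% of requests should come back as 429; a much lower share suggests the limiter is not engaging, and a much higher one suggests the test tenant has a lower configured limit.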