# Backpressure REST Strategy
Camunda uses gRPC streaming plus StabilizingAIMD for backpressure. The MVP uses REST, which has no native streaming, so it needs an explicit strategy. The solution combines several pieces:

- per-tenant rate limiting with a token bucket
- queue-depth monitoring of the processing loop
- HTTP 429 responses with a Retry-After header
- whitelisted critical commands (cancel, complete)

Targets: 100 TPS sustained without OOM, and automatic recovery under overload.
## The problem
A plain REST API has no built-in backpressure: the client sends a request and the server responds. When the server is saturated, it has three bad options:

1. Accept everything → eventually OOM
2. Reject everything → clients see errors
3. Slow everyone down → cascading latency increase
Camunda avoids this with gRPC streaming plus StabilizingAIMD (see concepts/backpressure):

- Native HTTP/2 flow control
- The client streams job requests and the server controls the flow
- AIMD adjusts the rate dynamically based on success/failure
A REST-based MVP needs an explicit strategy instead.
## Strategy: defensive layers
```mermaid
flowchart TD
    A[Client HTTP request] --> L1["Layer 1: Per-tenant rate limit<br/>(token bucket)<br/>429 Too Many Requests if exceeded"]
    L1 -->|within rate limit| L2["Layer 2: Global queue depth check<br/>503 Service Unavailable if overloaded"]
    L2 -->|queue space available| L3["Layer 3: Command queue (bounded)<br/>Blocks briefly under temporary contention"]
    L3 --> E["Engine processes command<br/>(single-threaded)"]
```
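The layer order can be expressed as a pure function. This is a minimal sketch that assumes the Layer 1 and Layer 2 checks are injected as callables. `admit`, `tenant_allows` and `queue_has_space` are hypothetical names, not MVP code.

The sketch shows one thing: the per-tenant check runs first. A single noisy tenant is therefore rejected with 429 before it can use up global queue capacity.

```python
from typing import Callable, Optional


def admit(
    tenant_id: str,
    tenant_allows: Callable[[str], bool],  # Layer 1 hook (hypothetical)
    queue_has_space: Callable[[], bool],   # Layer 2 hook (hypothetical)
) -> Optional[int]:
    """Return the rejection status, or None if the request may proceed to Layer 3."""
    if not tenant_allows(tenant_id):
        return 429  # this tenant exceeded its own limit
    if not queue_has_space():
        return 503  # the engine as a whole is overloaded
    return None     # hand off to the bounded command queue


# Tenant "acme" is within its limit, but the engine queue is full
print(admit("acme", lambda t: True, lambda: False))  # 503
```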
## Layer 1: Per-tenant rate limiting
### Token bucket implementation
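```python
import asyncio
import time


class TokenBucket:
    """Refills at `rate_per_sec` tokens per second and holds at most `burst` tokens."""

    def __init__(self, rate_per_sec: float, burst: int):
        self.rate = rate_per_sec
        self.burst = burst
        self.tokens = float(burst)           # start full so a new tenant can burst
        self.last_refill = time.monotonic()  # monotonic: unaffected by wall-clock jumps
        self.lock = asyncio.Lock()

    async def consume(self, n: int = 1) -> bool:
        async with self.lock:
            now = time.monotonic()
            elapsed = now - self.last_refill
            # Refill in proportion to elapsed time, capped at the burst size
            self.tokens = min(self.burst, self.tokens + elapsed * self.rate)
            self.last_refill = now
            if self.tokens >= n:
                self.tokens -= n
                return True
            return False


class TenantRateLimiter:
    """One TokenBucket per tenant, created lazily on the tenant's first request."""

    def __init__(self, default_rate: float = 100, default_burst: int = 200):
        self.buckets: dict[str, TokenBucket] = {}  # tenant_id -> TokenBucket
        self.default_rate = default_rate
        self.default_burst = default_burst

    async def check(self, tenant_id: str) -> bool:
        if tenant_id not in self.buckets:
            self.buckets[tenant_id] = TokenBucket(
                self.default_rate, self.default_burst
            )
        return await self.buckets[tenant_id].consume(1)
```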
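Burst and refill behavior can be tested without real waiting by injecting a clock. This is a test-oriented variant: it assumes a synchronous bucket and a hypothetical `FakeClock`, and neither exists in the MVP code. It applies the same refill rule as `TokenBucket` above.

```python
class FakeClock:
    """Manually advanced clock, so refill can be tested without sleeping."""

    def __init__(self) -> None:
        self.now = 0.0

    def __call__(self) -> float:
        return self.now


class SyncTokenBucket:
    """Same refill rule as TokenBucket, without the asyncio lock."""

    def __init__(self, rate_per_sec: float, burst: int, clock) -> None:
        self.rate = rate_per_sec
        self.burst = burst
        self.tokens = float(burst)
        self.clock = clock
        self.last_refill = clock()

    def consume(self, n: int = 1) -> bool:
        now = self.clock()
        self.tokens = min(self.burst, self.tokens + (now - self.last_refill) * self.rate)
        self.last_refill = now
        if self.tokens >= n:
            self.tokens -= n
            return True
        return False


clock = FakeClock()
bucket = SyncTokenBucket(rate_per_sec=100, burst=200, clock=clock)

# A burst of 200 requests at t=0 drains the bucket; the 201st is rejected
burst_results = [bucket.consume() for _ in range(201)]
print(burst_results.count(True))  # 200

# After 0.5 s, 0.5 s x 100 tokens/s = 50 tokens have refilled
clock.now = 0.5
refill_results = [bucket.consume() for _ in range(60)]
print(refill_results.count(True))  # 50
```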
### Per-tenant config (Postgres)
```sql
CREATE TABLE tenant_rate_limits (
    tenant_id           TEXT PRIMARY KEY,
    requests_per_second INT NOT NULL DEFAULT 100,
    burst_size          INT NOT NULL DEFAULT 200,
    updated_at          TIMESTAMPTZ DEFAULT NOW()
);

-- Free tier
INSERT INTO tenant_rate_limits (tenant_id, requests_per_second, burst_size)
SELECT tenant_id, 50, 100 FROM tenants WHERE tier = 'free'
ON CONFLICT (tenant_id) DO NOTHING;  -- safe to re-run

-- Premium tier
INSERT INTO tenant_rate_limits (tenant_id, requests_per_second, burst_size)
SELECT tenant_id, 1000, 2000 FROM tenants WHERE tier = 'premium'
ON CONFLICT (tenant_id) DO NOTHING;
```
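`TenantRateLimiter` above creates every bucket with the defaults, so the per-tenant rows must be consulted when a bucket is first created. This is a minimal sketch, assuming the `tenant_rate_limits` rows have already been loaded into a dict; how and when they are loaded is left open. `RateLimit`, `resolve_limit` and the sample tenant IDs are hypothetical.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class RateLimit:
    requests_per_second: int
    burst_size: int


# Column defaults of tenant_rate_limits, used when a tenant has no row
DEFAULT_LIMIT = RateLimit(requests_per_second=100, burst_size=200)


def resolve_limit(tenant_id: str, rows: dict[str, RateLimit]) -> RateLimit:
    """Pick the configured limit for a tenant, falling back to the table defaults."""
    return rows.get(tenant_id, DEFAULT_LIMIT)


# Hypothetical rows, shaped like the result of
# SELECT tenant_id, requests_per_second, burst_size FROM tenant_rate_limits
rows = {
    "tenant-free": RateLimit(50, 100),
    "tenant-premium": RateLimit(1000, 2000),
}

print(resolve_limit("tenant-premium", rows))  # RateLimit(requests_per_second=1000, burst_size=2000)
print(resolve_limit("tenant-new", rows))      # RateLimit(requests_per_second=100, burst_size=200)
```

Inside `check`, the missing-bucket branch would call `resolve_limit` and pass `requests_per_second` and `burst_size` to `TokenBucket`. Buckets that already exist do not pick up later row updates unless they are rebuilt.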
### Middleware
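```python
import time

from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse

app = FastAPI()
rate_limiter = TenantRateLimiter()


@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
    tenant_id = extract_tenant_from_auth(request)  # auth helper, not shown
    if not await rate_limiter.check(tenant_id):
        return JSONResponse(
            status_code=429,
            content={"error": "Rate limit exceeded"},
            headers={
                "Retry-After": "1",
                # Unix timestamp (wall clock) after which the client may retry
                "X-RateLimit-Reset": str(int(time.time()) + 1),
            },
        )
    return await call_next(request)
```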
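The middleware always answers `Retry-After: 1`. A bucket could instead report how long it will take for enough tokens to refill: the token deficit divided by the refill rate, rounded up because Retry-After takes whole seconds. This is a minimal sketch. `seconds_until_available` is a hypothetical helper, and it assumes read access to the bucket's `tokens` and `rate` right after a refill.

```python
import math


def seconds_until_available(tokens: float, rate_per_sec: float, n: int = 1) -> int:
    """Whole seconds until `n` tokens are available, for the Retry-After header."""
    deficit = max(0.0, n - tokens)
    # At least 1, so a rejected client never receives "Retry-After: 0"
    return max(1, math.ceil(deficit / rate_per_sec))


print(seconds_until_available(tokens=0.2, rate_per_sec=100))  # 1
print(seconds_until_available(tokens=0.0, rate_per_sec=0.5))  # 2
```

At the 50-1000 req/s limits in the table above, a single missing token refills in well under a second, so this still yields 1. It only differs from the constant for limits below 1 req/s or for multi-token requests.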
## Layer 2: Global queue depth monitoring
The processing loop has an internal queue. If that queue grows too large, the engine is saturated.
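```python
from fastapi.responses import JSONResponse


class EngineMonitor:
    def __init__(self, max_queue_depth: int = 10_000):
        self.max_queue_depth = max_queue_depth
        # Must be updated by the processing loop on every enqueue/dequeue
        self.current_depth = 0

    async def can_accept_command(self) -> bool:
        return self.current_depth < self.max_queue_depth

    async def queue_depth_percentage(self) -> float:
        # Returns a ratio in [0, 1], not a 0-100 percentage
        return self.current_depth / self.max_queue_depth


engine_monitor = EngineMonitor()


# Starlette runs the most recently registered middleware first, so register this
# one BEFORE rate_limit_middleware to keep the per-tenant check (Layer 1) outermost.
@app.middleware("http")
async def queue_depth_middleware(request, call_next):
    if request.url.path.startswith("/v2/"):
        if not await engine_monitor.can_accept_command():
            return JSONResponse(
                status_code=503,
                content={"error": "Engine overloaded, retry shortly"},
                headers={"Retry-After": "5"},
            )
    return await call_next(request)
```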
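In the monitor above, `current_depth` is a plain counter that something else has to keep in sync. If the processing loop's queue is an `asyncio.Queue`, the depth can instead be read live with `qsize()`. This is a minimal sketch under that assumption, and `QueueBackedMonitor` is a hypothetical name.

```python
import asyncio


class QueueBackedMonitor:
    """Layer 2 check that reads the live depth of the engine's command queue."""

    def __init__(self, queue: asyncio.Queue, max_queue_depth: int) -> None:
        self.queue = queue
        self.max_queue_depth = max_queue_depth

    @property
    def current_depth(self) -> int:
        return self.queue.qsize()

    async def can_accept_command(self) -> bool:
        return self.current_depth < self.max_queue_depth

    async def queue_depth_percentage(self) -> float:
        # Ratio in [0, 1], matching the 0.8 alert threshold
        return self.current_depth / self.max_queue_depth


async def demo() -> tuple[bool, float]:
    queue: asyncio.Queue = asyncio.Queue()
    monitor = QueueBackedMonitor(queue, max_queue_depth=4)
    for i in range(4):
        queue.put_nowait(i)
    return await monitor.can_accept_command(), await monitor.queue_depth_percentage()


print(asyncio.run(demo()))  # (False, 1.0)
```

The property keeps the attribute name used by the metrics reporter, so `engine_monitor.current_depth` works unchanged.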
### Metrics for visibility
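```python
import asyncio

from opentelemetry import metrics

meter = metrics.get_meter("engine")
queue_depth_gauge = meter.create_gauge("engine.queue.depth")
# Holds a ratio in [0, 1] despite the ".percent" suffix; the alert threshold is 0.8
queue_depth_pct_gauge = meter.create_gauge("engine.queue.depth.percent")


async def report_queue_metrics():
    """Publish the queue depth once per second; run as a background task."""
    while True:
        queue_depth_gauge.set(engine_monitor.current_depth)
        queue_depth_pct_gauge.set(await engine_monitor.queue_depth_percentage())
        await asyncio.sleep(1)
```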
Alert rule (Prometheus):

```yaml
- alert: EngineQueueHigh
  expr: engine_queue_depth_percent > 0.8   # the metric is a 0-1 ratio
  for: 1m
  annotations:
    summary: "Engine queue at {{ $value | humanizePercentage }} of capacity"
```
## Layer 3: Bounded command queue
The processing loop uses a bounded queue. When the queue is full, the REST handler waits briefly for space (graceful slowdown). If no slot frees up within the timeout, it returns 503.
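```python
import asyncio

from fastapi.responses import JSONResponse


class EngineSaturatedError(Exception):
    """Raised when the command queue stays full for longer than the enqueue timeout."""


class BoundedCommandQueue:
    def __init__(self, max_size: int = 10_000):
        self.queue: asyncio.Queue = asyncio.Queue(maxsize=max_size)

    async def enqueue(self, command, timeout: float = 5.0) -> None:
        try:
            # put() blocks while the queue is full; wait_for caps that wait
            await asyncio.wait_for(self.queue.put(command), timeout=timeout)
        except asyncio.TimeoutError:
            raise EngineSaturatedError("Queue full, request timed out") from None

    async def dequeue(self):
        return await self.queue.get()


# In the REST handler
async def handle_create_instance(request):
    cmd = build_command(request)  # project helper, not shown
    try:
        await engine.command_queue.enqueue(cmd, timeout=5.0)
    except EngineSaturatedError:
        return JSONResponse(
            status_code=503,
            content={"error": "Engine saturated, retry shortly"},
            headers={"Retry-After": "5"},
        )
    # Does not wait for processing: the key is returned as soon as the command is queued
    return {"processInstanceKey": cmd.id}
```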
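Both the timeout path and the graceful-slowdown path can be checked in isolation. This is a minimal runnable sketch: it assumes tiny queues and short timeouts so the demo finishes quickly, and it redeclares a trimmed `BoundedCommandQueue` so it runs on its own.

```python
import asyncio


class EngineSaturatedError(Exception):
    pass


class BoundedCommandQueue:
    def __init__(self, max_size: int) -> None:
        self.queue: asyncio.Queue = asyncio.Queue(maxsize=max_size)

    async def enqueue(self, command, timeout: float) -> None:
        try:
            await asyncio.wait_for(self.queue.put(command), timeout=timeout)
        except asyncio.TimeoutError:
            raise EngineSaturatedError("Queue full, request timed out") from None


async def demo_saturated() -> list[str]:
    """Nobody consumes, so the third command times out."""
    q = BoundedCommandQueue(max_size=2)
    outcomes = []
    for cmd in ("a", "b", "c"):
        try:
            await q.enqueue(cmd, timeout=0.05)
            outcomes.append("queued")
        except EngineSaturatedError:
            outcomes.append("503")  # what the handler would return
    return outcomes


async def demo_drain() -> str:
    """A consumer frees a slot after 10 ms, so the blocked put completes in time."""
    q = BoundedCommandQueue(max_size=1)
    await q.enqueue("a", timeout=0.05)
    asyncio.get_running_loop().call_later(0.01, q.queue.get_nowait)
    await q.enqueue("b", timeout=0.5)
    return "queued"


print(asyncio.run(demo_saturated()))  # ['queued', 'queued', '503']
print(asyncio.run(demo_drain()))      # queued
```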
## Whitelisted critical commands
Camunda whitelists cancel, complete, and throwError: they are never blocked by backpressure. This avoids deadlocks in which the system cannot make progress.

The MVP equivalent:
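```python
# Commands that must never be throttled: they free resources or unblock progress
WHITELISTED_INTENTS = frozenset({
    "JOB:COMPLETE",
    "JOB:FAIL",
    "JOB:THROW_ERROR",
    "PROCESS_INSTANCE:CANCEL",
    "INCIDENT:RESOLVE",
})


# Replaces the Layer 1 middleware (app, rate_limiter, JSONResponse as defined there)
@app.middleware("http")
async def rate_limit_middleware(request, call_next):
    intent = extract_intent_from_request(request)  # project helper, not shown

    # Whitelist bypass. The Layer 2 queue-depth check needs the same bypass,
    # otherwise a 503 can still block these commands.
    if intent in WHITELISTED_INTENTS:
        return await call_next(request)

    # Normal per-tenant rate limit
    tenant_id = extract_tenant_from_auth(request)
    if not await rate_limiter.check(tenant_id):
        return JSONResponse(
            status_code=429,
            content={"error": "Rate limit exceeded"},
            headers={"Retry-After": "1"},
        )
    return await call_next(request)
```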
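The middleware relies on `extract_intent_from_request`, which is not shown. One way to implement it is to match the HTTP method and path against a table of route patterns. This is a minimal sketch: the paths are hypothetical placeholders, not confirmed MVP routes, and only the whitelisted intents are mapped.

```python
import re
from typing import Optional

# Hypothetical route patterns -> intents; adjust to the real MVP routes
ROUTE_INTENTS = [
    ("POST", re.compile(r"^/v2/jobs/[^/]+/completion$"), "JOB:COMPLETE"),
    ("POST", re.compile(r"^/v2/jobs/[^/]+/failure$"), "JOB:FAIL"),
    ("POST", re.compile(r"^/v2/jobs/[^/]+/error$"), "JOB:THROW_ERROR"),
    ("POST", re.compile(r"^/v2/process-instances/[^/]+/cancellation$"), "PROCESS_INSTANCE:CANCEL"),
    ("POST", re.compile(r"^/v2/incidents/[^/]+/resolution$"), "INCIDENT:RESOLVE"),
]


def intent_for(method: str, path: str) -> Optional[str]:
    """Return the intent for a request, or None if no pattern matches."""
    for route_method, pattern, intent in ROUTE_INTENTS:
        if method == route_method and pattern.match(path):
            return intent
    return None


print(intent_for("POST", "/v2/jobs/123/completion"))  # JOB:COMPLETE
print(intent_for("POST", "/v2/process-instances"))    # None
```

In a Starlette/FastAPI middleware, this would be called as `intent_for(request.method, request.url.path)`. Unknown routes return `None` and fall through to the normal rate limit, so the whitelist fails closed.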
Rationale: suppose the system is overloaded with CreateInstance commands. It cannot recover unless it lets through the cancels and completes that free resources.
## HTTP 429 vs 503: when to use each
| Status | Meaning | Retry-After |
|---|---|---|
| 429 Too Many Requests | The client exceeded its own rate limit | Yes (1-60 s) |
| 503 Service Unavailable | The server is overloaded (affects all clients) | Yes (5-30 s) |
Clients can handle the two statuses differently:

- 429 → "I'm sending too fast, slow down"
- 503 → "Server problem, retry shortly with backoff"
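The two statuses can map to different client waits. This is a minimal sketch under two assumptions:

- For 429, the client honors Retry-After as given.
- For 503, the client waits the larger of Retry-After and an exponential backoff with full jitter (base 1 s, capped at 30 s).

`retry_delay` and these constants are illustrative choices, not MVP values.

```python
import random
from typing import Callable


def retry_delay(
    status: int,
    retry_after: float,
    attempt: int,
    base: float = 1.0,
    cap: float = 30.0,
    rand: Callable[[float, float], float] = random.uniform,
) -> float:
    """Seconds to wait before retrying a 429 or 503 response."""
    if status == 429:
        # Client-specific limit: the server knows when tokens refill, so trust it
        return retry_after
    if status == 503:
        # Shared overload: exponential backoff with full jitter,
        # but never earlier than the server asked
        backoff = min(cap, base * 2 ** attempt)
        return max(retry_after, rand(0.0, backoff))
    raise ValueError(f"status {status} is not retryable")


# With the random draw pinned to its upper bound, attempt 3 waits min(30, 8) = 8 s
print(retry_delay(503, retry_after=5, attempt=3, rand=lambda lo, hi: hi))  # 8.0
```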
## Adaptive backpressure (Phase 2)
Camunda's StabilizingAIMD adjusts the limit dynamically. The MVP can do something simpler:
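```python
import statistics
from collections import deque


class AdaptiveRateLimiter:
    def __init__(self, base_rate: float = 100, window: int = 100):
        self.base_rate = base_rate
        self.current_rate = base_rate
        # Sliding window of recent latencies; deque drops the oldest in O(1)
        self.recent_latencies: deque = deque(maxlen=window)

    def record_latency(self, latency_ms: float) -> None:
        self.recent_latencies.append(latency_ms)
        if len(self.recent_latencies) < 2:
            return  # too few samples for a meaningful percentile
        p99 = statistics.quantiles(self.recent_latencies, n=100)[98]
        if p99 > 1000:
            # p99 above 1 s: cut the rate by 10%, never below 10 req/s
            self.current_rate = max(10, self.current_rate * 0.9)
        elif p99 < 200:
            # p99 below 200 ms: raise the rate by 5%, capped at the base rate
            self.current_rate = min(self.base_rate, self.current_rate * 1.05)
        # current_rate still has to be pushed into the token buckets to take effect
```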
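The two multipliers are asymmetric: decreases are 10% and increases only 5%. The number of adjustment steps under the rule above can be counted directly. This assumes each recorded sample triggers exactly one adjustment and the p99 condition holds throughout.

```python
def steps_to_floor(rate: float, floor: float = 10.0) -> int:
    """Consecutive slow samples (p99 > 1 s) until the rate reaches its floor."""
    steps = 0
    while rate > floor:
        rate = max(floor, rate * 0.9)
        steps += 1
    return steps


def steps_to_base(rate: float, base: float = 100.0) -> int:
    """Consecutive fast samples (p99 < 200 ms) until the rate is back at base."""
    steps = 0
    while rate < base:
        rate = min(base, rate * 1.05)
        steps += 1
    return steps


print(steps_to_floor(100.0))  # 22
print(steps_to_base(10.0))    # 48
```

Recovering from the 10 req/s floor therefore takes more than twice as many fast samples as the backoff took slow ones.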
A lazier alternative: skip the adaptive limiter and review the static limits periodically based on metrics.
## Client-side backoff
Retry pattern to document for the SDK:
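```javascript
// Minimal sleep helper (not part of the SDK API)
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

async function createInstanceWithRetry(client, params, maxRetries = 5) {
  for (let attempt = 0; attempt < maxRetries; attempt++) {
    try {
      return await client.processInstances.create(params);
    } catch (err) {
      if (err.statusCode === 429 || err.statusCode === 503) {
        // Retry-After in seconds; fall back to 1 s if it is absent or non-numeric
        const retryAfter = Number.parseInt(err.headers?.['retry-after'], 10) || 1;
        // Up to 50% random jitter so clients do not retry in lockstep
        const jitter = Math.random() * 0.5;
        await sleep(retryAfter * 1000 * (1 + jitter));
        continue;
      }
      throw err;
    }
  }
  throw new Error('Max retries exceeded');
}
```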
This is built into the SDK by default, so users do not need to think about it.
## Capacity planning
For a target of 100 TPS sustained, with a 5x safety margin:
- Peak target: 500 TPS
- Per-tenant rate: 100 req/s (most tenants)
- Burst: 200 (absorbs spikes)
- Global queue: 10K commands buffered (~20 s of headroom at the 500 TPS peak)
- Workers: 10-20 (concurrent processing within the engine)
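The ~20 s figure is the queue capacity divided by the rate at which the queue fills. A small helper makes that assumption explicit. With the engine draining nothing, a 500 TPS peak fills 10K slots in 20 s; if the engine still drains some load, the headroom grows. The 100 TPS drain rate in the second call is an illustrative assumption.

```python
def queue_headroom_seconds(capacity: int, arrival_tps: float, service_tps: float = 0.0) -> float:
    """Seconds until a bounded queue fills when arrivals outpace service."""
    excess = arrival_tps - service_tps
    if excess <= 0:
        return float("inf")  # the queue never fills
    return capacity / excess


print(queue_headroom_seconds(10_000, 500))                   # 20.0
print(queue_headroom_seconds(10_000, 500, service_tps=100))  # 25.0
```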
If load grows beyond this:

- Scale per-tenant limits (dynamic, Phase 2)
- Add engine instances (Phase 2, active-active per ADR-003)
- Eventually partition by tenant (Phase 3)
## Testing
Load test scenarios:
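```javascript
// k6 options: both scenarios call the script's default function (not shown).
// Scenarios run in parallel unless startTime offsets them.
export const options = {
  scenarios: {
    rate_limit_test: {
      executor: 'constant-arrival-rate',
      rate: 150, // exceeds the 100 req/s per-tenant limit
      timeUnit: '1s',
      duration: '1m',
      preAllocatedVUs: 50,
    },
    overload_test: {
      // Spread requests across several tenants, or Layer 1 answers 429 before the engine saturates
      executor: 'ramping-arrival-rate',
      startTime: '1m', // begin after rate_limit_test finishes
      startRate: 100,
      timeUnit: '1s',
      preAllocatedVUs: 100, // assumed sizing; required for arrival-rate executors
      maxVUs: 1000,         // assumed headroom for slow responses under overload
      stages: [
        { duration: '1m', target: 500 }, // ramp up
        { duration: '5m', target: 500 }, // sustain (should saturate)
        { duration: '1m', target: 100 }, // ramp down (should recover)
      ],
    },
  },
};
```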
Validate:

- Rate limiting returns 429 correctly
- Overload returns 503 without crashing
- The system recovers after load drops
- p99 latency does not degrade sharply during overload (rate limiting should protect it)
- No memory leak (the bounded queue caps memory)
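Before running k6, the expected 429 share for `rate_limit_test` can be estimated offline. This is a minimal simulation under these assumptions:

- one tenant at the default limits (100 req/s, burst 200)
- requests evenly spaced at 150 req/s for 60 s
- a simulated clock instead of real time

The burst absorbs roughly the first 4 s, because the bucket drains at a net 50 tokens/s from 200. After that, only about 100 of every 150 requests pass.

```python
def simulate(rate_limit: float, burst: int, arrival_rps: int, duration_s: int) -> tuple[int, int]:
    """Return (accepted, rejected) for evenly spaced requests against a token bucket."""
    tokens = float(burst)
    last = 0.0
    accepted = rejected = 0
    for i in range(arrival_rps * duration_s):
        now = i / arrival_rps
        tokens = min(burst, tokens + (now - last) * rate_limit)
        last = now
        if tokens >= 1:
            tokens -= 1
            accepted += 1
        else:
            rejected += 1  # the API would answer 429
    return accepted, rejected


accepted, rejected = simulate(rate_limit=100, burst=200, arrival_rps=150, duration_s=60)
print(accepted, rejected)
```

About 6,200 of the 9,000 requests should pass (the 200-token burst plus 60 s of refill at 100 tokens/s). That leaves roughly 2,800 rejected with 429. A measured share far from that is worth investigating.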
## Links
- concepts/backpressure: Camunda's approach (full)
- adrs/adr-002-postgresql-as-state-store: Foundation
- concepts/api-engine-serialization: How the queue is processed
- Token bucket algorithm