Job Queue Fairness
Jobs from N tenants and M process types are distributed across workers. Without fair scheduling, a dominant tenant starves the others. MVP strategy: weighted fair queueing per tenant + priority levels respected per job + per-job-type concurrency limits + claim-based activation with randomization. Implemented via Postgres SELECT FOR UPDATE SKIP LOCKED. The design aims to prevent starvation, respect priority, and scale to thousands of workers.
The problem¶
Workers request jobs through an activation request (POST /v2/jobs/activate, described under Worker activation flow).
The engine must respond with up to N activated jobs. But:
- Multiple tenants: tenant A has 10000 jobs, tenant B has 5. How should capacity be divided?
- Multiple job types: workers of type X request jobs, and high-priority jobs must go first.
- Worker poaching: two workers request jobs simultaneously. Do both receive the same job?
- Starvation: if tenant A dominates, tenant B never receives jobs.
- Long-running jobs: they block workers that could be handling quick jobs.
Camunda uses gRPC streaming plus complex scheduling. An MVP on REST needs a simpler but still effective strategy.
Strategy: Weighted Fair Queueing + SKIP LOCKED¶
Schema¶
CREATE TABLE jobs (
job_key BIGSERIAL PRIMARY KEY,
process_instance_key BIGINT NOT NULL,
element_instance_key BIGINT NOT NULL,
tenant_id TEXT NOT NULL,
-- Job config
job_type TEXT NOT NULL,
retries INT NOT NULL DEFAULT 3,
priority INT NOT NULL DEFAULT 50, -- 0-100, higher first
-- Variables
variables JSONB,
custom_headers JSONB,
-- State
state TEXT NOT NULL, -- CREATED | ACTIVATED | COMPLETED | FAILED
activation_count INT NOT NULL DEFAULT 0,
-- Activation tracking
activated_at TIMESTAMPTZ,
activated_by TEXT, -- worker id
timeout_at TIMESTAMPTZ, -- when activation expires
-- Errors
error_message TEXT,
error_code TEXT,
created_at TIMESTAMPTZ DEFAULT NOW() NOT NULL,
completed_at TIMESTAMPTZ
);
-- Critical indexes for activation queries
CREATE INDEX idx_jobs_activatable
ON jobs (job_type, priority DESC, created_at ASC)
WHERE state = 'CREATED' AND timeout_at IS NULL;
CREATE INDEX idx_jobs_timeout
ON jobs (timeout_at)
WHERE state = 'ACTIVATED';
CREATE INDEX idx_jobs_tenant_type ON jobs(tenant_id, job_type);
Algorithm: weighted fair activation¶
Step 1: Per-tenant weights¶
CREATE TABLE tenant_job_weights (
tenant_id TEXT PRIMARY KEY,
weight INT NOT NULL DEFAULT 1, -- relative weight
max_concurrent_jobs INT DEFAULT 1000 -- cap on activated jobs
);
-- Examples
INSERT INTO tenant_job_weights VALUES
('free-tier-a', 1, 50), -- low weight, low cap
('premium-acme', 10, 5000), -- 10x weight, high cap
('enterprise-bank', 50, 50000); -- big customer, lots of capacity
Step 2: Activation query¶
-- Activate N jobs of given type for worker
WITH ranked_jobs AS (
SELECT
j.job_key,
j.tenant_id,
j.priority,
-- Per-tenant ranking weighted
ROW_NUMBER() OVER (
PARTITION BY j.tenant_id
ORDER BY j.priority DESC, j.created_at ASC
) AS tenant_rank,
-- Tenant weight influences global ordering
tw.weight,
-- Random tiebreaker to prevent thundering herd
random() AS jitter
FROM jobs j
JOIN tenant_job_weights tw ON tw.tenant_id = j.tenant_id
WHERE j.state = 'CREATED'
AND j.job_type = $1 -- e.g., 'send-email'
AND j.timeout_at IS NULL
-- Honor per-tenant concurrency cap
AND (
SELECT COUNT(*) FROM jobs
WHERE tenant_id = j.tenant_id
AND state = 'ACTIVATED'
) < tw.max_concurrent_jobs
)
SELECT job_key
FROM ranked_jobs
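ORDER BY
-- Within tenant: priority order
tenant_rank ASC,
-- Between tenants: higher weight first. The CTE exposes the column as weight
-- (the tw alias is not visible here). jitter is in [0, 1) and weights are
-- integers, so jitter only breaks ties between tenants of equal weight.
-weight + jitter,
priority DESC
LIMIT $2; -- maxJobs from worker request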
Complex, but it handles multiple constraints simultaneously.
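The ordering is easier to reason about outside SQL. The following is a minimal in-memory sketch of one way to make the interleaving proportional to tenant weight: sort each job by `tenant_rank / weight`, which behaves like a virtual finish time in weighted fair queueing. It is an illustrative variant, not the exact ordering of the query above; `Job` and `fair_order` are hypothetical names, and `created_at` is modelled as a plain integer tick.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Job:
    job_key: int
    tenant_id: str
    priority: int
    created_at: int  # increasing tick, stands in for a timestamp


def fair_order(jobs, weights, limit):
    """Return up to `limit` jobs, interleaving tenants in proportion to weight."""
    by_tenant = {}
    for job in jobs:
        by_tenant.setdefault(job.tenant_id, []).append(job)

    keyed = []
    for tenant_id, tenant_jobs in by_tenant.items():
        # Within a tenant: higher priority first, then FIFO.
        tenant_jobs.sort(key=lambda j: (-j.priority, j.created_at))
        weight = weights.get(tenant_id, 1)
        for rank, job in enumerate(tenant_jobs, start=1):
            # rank / weight: a tenant with weight 10 advances ten ranks
            # in the time a weight-1 tenant advances one.
            keyed.append((rank / weight, job.job_key, job))

    # job_key is a deterministic tiebreaker for equal virtual times.
    keyed.sort(key=lambda item: (item[0], item[1]))
    return [job for _, _, job in keyed[:limit]]
```

With weights `{"a": 10, "b": 1}`, tenant a receives about ten activations for each one tenant b receives, and tenant b's next job is never delayed indefinitely by tenant a's backlog.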
Step 3: Atomic activation via SKIP LOCKED¶
-- The actual activation: lock + update + return
WITH selected AS (
-- ranked_jobs is the CTE from Step 2. A locking clause does not reach into
-- a WITH query, so join back to jobs and lock those rows explicitly.
SELECT j.job_key
FROM ranked_jobs r
JOIN jobs j ON j.job_key = r.job_key
ORDER BY ...
LIMIT $2
FOR UPDATE OF j SKIP LOCKED -- KEY: skip rows locked by other workers
)
UPDATE jobs
SET
state = 'ACTIVATED',
activated_at = NOW(),
timeout_at = NOW() + ($3 || ' milliseconds')::INTERVAL,
activated_by = $4,
activation_count = activation_count + 1
WHERE job_key IN (SELECT job_key FROM selected)
RETURNING *;
SKIP LOCKED is critical:
- Two workers query simultaneously
- Both target the same rows
- One locks the rows; the second sees them as locked and skips them
- No deadlock, no waiting on each other's locks
- Each worker gets a disjoint set of jobs
Why SKIP LOCKED matters¶
Without SKIP LOCKED, two scenarios:
Without FOR UPDATE (race condition)¶
Worker A: SELECT jobs ... LIMIT 10 → finds jobs 1-10
Worker B: SELECT jobs ... LIMIT 10 → finds jobs 1-10 (same)
Worker A: UPDATE jobs SET state='ACTIVATED' WHERE key IN (1..10)
Worker B: UPDATE jobs SET state='ACTIVATED' WHERE key IN (1..10)
→ Both workers think they own jobs 1-10
→ Duplicate processing (depends on engine handling)
With FOR UPDATE (blocking)¶
Worker A: SELECT ... FOR UPDATE → locks jobs 1-10
Worker B: SELECT ... FOR UPDATE → BLOCKS waiting on lock
Worker A: completes activation, releases lock
Worker B: now gets jobs 11-20 (assuming Worker A took 1-10)
→ Correct but serialized, slow
With FOR UPDATE SKIP LOCKED (best)¶
Worker A: SELECT ... FOR UPDATE SKIP LOCKED → locks jobs 1-10
Worker B: SELECT ... FOR UPDATE SKIP LOCKED → skips 1-10, locks jobs 11-20
→ Both workers proceed in parallel
→ No blocking, no duplicates
This is a standard pattern for queue systems on relational DBs. Postgres-backed job libraries such as pg-boss, Oban, and Solid Queue use it.
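The third scenario can be imitated in memory. In this rough sketch a non-blocking `threading.Lock.acquire` stands in for a row lock taken with SKIP LOCKED; `JobTable` and `claim_skip_locked` are hypothetical names, and the model ignores transactions, ordering, and everything else Postgres does.

```python
import threading


class JobTable:
    """In-memory stand-in for the jobs table: one lock per row."""

    def __init__(self, job_keys):
        self._row_locks = {key: threading.Lock() for key in job_keys}

    def claim_skip_locked(self, limit):
        """Lock up to `limit` rows, skipping any row that is already locked."""
        claimed = []
        for key, lock in self._row_locks.items():
            if len(claimed) == limit:
                break
            # Non-blocking acquire: a held lock is skipped, not waited on.
            if lock.acquire(blocking=False):
                claimed.append(key)
        return claimed

    def release(self, keys):
        """Release row locks, as a transaction commit would."""
        for key in keys:
            self._row_locks[key].release()


table = JobTable(range(1, 31))
worker_a = table.claim_skip_locked(10)  # locks jobs 1-10
worker_b = table.claim_skip_locked(10)  # skips 1-10, locks jobs 11-20
```

Neither call waits for the other, and the two result lists never overlap.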
Priority handling¶
BPMN supports priority in job definitions:
<bpmn:serviceTask id="urgent-call">
<bpmn:extensionElements>
<zeebe:taskDefinition type="phone-call" />
<zeebe:priority level="90" />
</bpmn:extensionElements>
</bpmn:serviceTask>
The engine stores priority in the jobs table. Within each tenant, the activation query orders by priority DESC first.
Within the same priority: FIFO (created_at ASC).
Concurrency limits per job type¶
Sometimes the concurrent execution of a job type needs a limit:
- "send-email" can run 100 concurrently (SMTP rate limit)
- "generate-pdf" can run 10 concurrently (CPU-heavy)
CREATE TABLE job_type_concurrency (
job_type TEXT PRIMARY KEY,
max_concurrent INT NOT NULL
);
INSERT INTO job_type_concurrency VALUES
('send-email', 100),
('generate-pdf', 10),
('call-slow-api', 5);
Activation query checks:
WHERE ... AND (
SELECT COUNT(*) FROM jobs
WHERE job_type = j.job_type
AND state = 'ACTIVATED'
) < (SELECT max_concurrent FROM job_type_concurrency WHERE job_type = j.job_type)
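The SQL check only tests whether the type is currently below its limit, so a single batch can still overshoot it: with 95 active jobs and a limit of 100, a request for 10 passes the check for every row. One way to close that gap, sketched here under the assumption that the engine computes the batch size before running the activation query, is to clamp the requested count. `activation_budget` is a hypothetical helper.

```python
def activation_budget(max_jobs, active_for_type, max_concurrent):
    """How many jobs one activation request may claim for a job type."""
    # Slots still free for this job type; never negative.
    remaining = max(0, max_concurrent - active_for_type)
    # The worker never gets more than it asked for.
    return min(max_jobs, remaining)
```

The result would be passed as the LIMIT of the activation query in place of the raw maxJobs value.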
Preventing starvation¶
Risk: a dominant tenant A always enqueues its jobs first → tenant B never gets activated.
Multiple safeguards:
1. Per-tenant max_concurrent_jobs¶
Hard cap on activated jobs per tenant. Forces tenant A to wait until completions occur.
2. Weighted random selection¶
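Even if tenant A has 10x the weight, tenant B still gets picked: the activation query sorts by tenant_rank first, so every tenant's top job is considered before any tenant's second job. Note that with integer weights and jitter in [0, 1), the jitter only randomizes the order among tenants of equal weight.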
3. Anti-starvation timer¶
Job created > N minutes ago → priority boost:
SELECT job_key,
priority + LEAST(20, EXTRACT(EPOCH FROM (NOW() - created_at)) / 60) AS effective_priority
FROM jobs
WHERE state = 'CREATED'
ORDER BY effective_priority DESC
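The boost grows by 1 per minute of waiting and is capped at +20, which is reached after 20 minutes. Because of the cap, the boost only lifts a job past others within 20 priority points of it; a steady stream of much higher-priority jobs can still keep it waiting, which is why the other safeguards are also needed.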
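The same formula in Python makes the cap easy to check. This is a direct transcription of the SQL expression above; the function name and the `max_boost` parameter are introduced here for illustration.

```python
def effective_priority(priority, waited_seconds, max_boost=20):
    """Mirror of: priority + LEAST(20, EXTRACT(EPOCH FROM age) / 60)."""
    # One point per minute waited, capped at max_boost.
    return priority + min(max_boost, waited_seconds / 60)
```

A priority-50 job that has waited 10 minutes competes as priority 60. A priority-10 job never rises above 30, however long it waits.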
4. Round-robin tie-breaker¶
Track last-activated tenant. Next activation prefers different tenant.
CREATE TABLE worker_state (
instance_id TEXT PRIMARY KEY,
last_activated_tenant TEXT,
last_activation_time TIMESTAMPTZ
);
-- Bias query
ORDER BY
CASE WHEN tenant_id = (SELECT last_activated_tenant ...) THEN 1 ELSE 0 END,
...
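The CASE expression amounts to a stable sort on one boolean. A minimal sketch, assuming candidates are already in their normal activation order and are represented as dictionaries with a `tenant_id` key (both assumptions made for illustration):

```python
def prefer_other_tenant(candidates, last_activated_tenant):
    """Move jobs of the last-activated tenant behind everyone else's."""
    # False sorts before True, and sorted() is stable, so the existing
    # order is preserved inside each of the two groups.
    return sorted(
        candidates,
        key=lambda job: job["tenant_id"] == last_activated_tenant,
    )
```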
Job timeout handling¶
When the activation timeout expires without completion:
-- Background sweeper (every 10s)
UPDATE jobs
SET
state = 'CREATED', -- back to activatable
activated_at = NULL,
timeout_at = NULL,
activated_by = NULL
WHERE state = 'ACTIVATED'
AND timeout_at < NOW();
Job becomes activatable again. Original worker (if still alive) may try to complete — engine rejects via activation_count mismatch:
-- Worker tries to complete
UPDATE jobs
SET state = 'COMPLETED', completed_at = NOW()
WHERE job_key = $1
AND activation_count = $2 -- worker remembers its count
AND state = 'ACTIVATED'
RETURNING *;
-- If returns 0 rows: stale completion, ignore
Prevents duplicate processing race (per F13 mitigation).
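The interplay of sweeper, re-activation, and stale completion can be traced with a small in-memory model. This is a sketch of the fencing idea only; `JobRecord` and its methods are hypothetical, and time is modelled as plain numbers.

```python
class JobRecord:
    """One job row with the fields the fencing check relies on."""

    def __init__(self):
        self.state = "CREATED"
        self.activation_count = 0
        self.timeout_at = None

    def activate(self, now, timeout):
        """Activate the job and return the count the worker must remember."""
        if self.state != "CREATED":
            return None
        self.state = "ACTIVATED"
        self.activation_count += 1
        self.timeout_at = now + timeout
        return self.activation_count

    def sweep(self, now):
        """Background sweeper: expired activations become activatable again."""
        if self.state == "ACTIVATED" and self.timeout_at < now:
            self.state = "CREATED"
            self.timeout_at = None

    def complete(self, remembered_count):
        """Accept completion only from the current activation."""
        if self.state == "ACTIVATED" and self.activation_count == remembered_count:
            self.state = "COMPLETED"
            return True
        return False  # stale completion: zero rows updated


job = JobRecord()
first = job.activate(now=0, timeout=30)    # worker 1 activates
job.sweep(now=31)                          # worker 1 timed out
second = job.activate(now=31, timeout=30)  # worker 2 activates
stale_accepted = job.complete(first)       # worker 1 reports late
fresh_accepted = job.complete(second)      # worker 2 reports
```

Worker 1's late completion is rejected because its remembered count no longer matches, while worker 2's is accepted.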
Worker activation flow¶
Worker startup:
1. Connect to engine
2. Subscribe to job_type 'send-email'
Worker loop:
3. POST /v2/jobs/activate { type: 'send-email', maxJobs: 10, timeout: 30000 }
4. Engine runs activation query
5. Engine returns N jobs (could be 0 if none)
6. Worker processes each job
7. For each: POST /v2/jobs/{key}/complete or /fail
8. Worker waits short time (long polling)
9. Back to step 3
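Steps 3 to 9 can be sketched as a loop with the HTTP calls injected as plain callables, which keeps the example free of any particular HTTP client. The callables `activate`, `handle`, `complete`, and `fail` are assumptions of this sketch; in a real worker they would wrap the POST requests listed above.

```python
def run_worker(activate, handle, complete, fail, rounds):
    """Run `rounds` iterations of the activate -> process -> report loop."""
    request = {"type": "send-email", "maxJobs": 10, "timeout": 30000}
    for _ in range(rounds):
        # Steps 3-5: ask the engine for jobs; the list may be empty.
        jobs = activate(request)
        for job in jobs:
            try:
                # Step 6: process the job.
                handle(job)
            except Exception as exc:
                # Step 7 (failure path): report the error to the engine.
                fail(job["jobKey"], str(exc))
            else:
                # Step 7 (success path): report completion.
                complete(job["jobKey"])
```

A production loop would run until shutdown instead of a fixed number of rounds.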
Long polling¶
If no jobs are available, the server holds the request for up to N seconds (waiting for new jobs):
import asyncio
import time

async def activate_jobs(request):
    job_type = request['type']
    max_jobs = request['maxJobs']
    timeout = request.get('longPollTimeout', 30)  # seconds
    # Monotonic clock: the deadline is unaffected by wall-clock adjustments
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        # Try to claim jobs; return as soon as anything is activated
        jobs = await activate_jobs_atomic(job_type, max_jobs)
        if jobs:
            return jobs
        # Wait for notification or short timeout
        try:
            await wait_for_new_job_signal(job_type, timeout=2)
        except (TimeoutError, asyncio.TimeoutError):
            pass
    return []  # no jobs, worker can retry
Notification via Postgres LISTEN/NOTIFY:
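# When new job created (pg_notify takes the channel name as a string, so a
# name containing '-' such as new_job_send-email needs no identifier quoting)
await db.execute("SELECT pg_notify($1, '')", f"new_job_{job_type}")
# Workers listening (the listener lives only while this connection is held)
async with db.acquire() as conn:
    await conn.add_listener(f'new_job_{job_type}', on_new_job)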
Eliminates polling overhead. Workers idle until work appears.
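The long-poll handler needs some in-process way to wait for a signal with a timeout. One possible shape, sketched here with `asyncio.Event`, is a small registry of per-type events that a NOTIFY callback can set. `JobSignals` is a hypothetical helper, not part of any library, and it returns a boolean on timeout where the handler above expects an exception.

```python
import asyncio


class JobSignals:
    """Per-job-type wake-up signals for long-polling handlers."""

    def __init__(self):
        self._events = {}

    def _event(self, job_type):
        return self._events.setdefault(job_type, asyncio.Event())

    def notify(self, job_type):
        """Call from the NOTIFY callback when a job of this type is created."""
        self._event(job_type).set()

    async def wait(self, job_type, timeout):
        """Return True if signalled within `timeout` seconds, else False."""
        event = self._event(job_type)
        try:
            await asyncio.wait_for(event.wait(), timeout)
            return True
        except asyncio.TimeoutError:
            return False
        finally:
            # Reset so the next wait blocks until a new notification.
            event.clear()


async def demo():
    signals = JobSignals()
    timed_out = await signals.wait("send-email", timeout=0.01)
    signals.notify("send-email")
    woke_up = await signals.wait("send-email", timeout=0.01)
    return timed_out, woke_up
```

Clearing the event after each wait means one notification wakes one waiter per job type. A handler serving many concurrent long polls would need a condition variable or one event per request.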
Streaming (Phase 2+)¶
Camunda's gRPC streaming activates jobs as they appear, without polling. MVP REST can approximate via SSE:
GET /v2/jobs/stream?type=send-email&maxJobs=10
Accept: text/event-stream
# Server streams events as jobs become available
event: job
data: {"jobKey": 123, "variables": {...}}
event: job
data: {"jobKey": 124, "variables": {...}}
The server pushes jobs as they become available. Workers complete them via a separate POST. This eliminates long-poll round trips.
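On the server side, each pushed job is one Server-Sent Event: an `event:` line, a `data:` line, and a blank line as terminator. A minimal formatter, assuming the payload is JSON-serializable (`format_sse` is a hypothetical name):

```python
import json


def format_sse(event, payload):
    """Serialize one Server-Sent Event frame."""
    # The trailing blank line marks the end of the event.
    return f"event: {event}\ndata: {json.dumps(payload)}\n\n"
```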
Metrics¶
jobs_activated_counter = meter.create_counter("jobs.activated")
jobs_activated_counter.add(1, {"job_type": type, "tenant": tenant})
jobs_completed_counter = meter.create_counter("jobs.completed")
jobs_completed_counter.add(1, {"job_type": type, "tenant": tenant, "outcome": "success"})
jobs_failed_counter = meter.create_counter("jobs.failed")
jobs_active_gauge = meter.create_gauge("jobs.active")
jobs_pending_gauge = meter.create_gauge("jobs.pending")
job_activation_latency = meter.create_histogram("jobs.activation.latency")
job_processing_duration = meter.create_histogram("jobs.processing.duration")
Recorded per tenant and per job type. Can be visualized in Grafana.
Performance characteristics¶
For target 1000 jobs/sec activation:
| Query | Latency | Notes |
|---|---|---|
| INSERT INTO jobs | ~1ms | When job created |
| Activation query (above) | ~5-10ms | With proper indexes |
| Completion UPDATE | ~1ms | Primary key lookup |
| Timeout sweep | ~10-50ms every 10s | Background |
At these latencies Postgres should handle 1000 jobs/sec comfortably. The bottleneck is likely the workers themselves, not the DB.
For higher throughput (Phase 3+):
- Partition the jobs table by tenant (Citus)
- Move the activation logic into a stored procedure for atomicity
- Index jobs on tenant for partition pruning
Comparison vs Camunda¶
| Aspect | Camunda gRPC stream | MVP Postgres + SKIP LOCKED |
|---|---|---|
| Activation latency | < 1ms (streaming) | ~5-10ms (REST + DB) |
| Worker setup | gRPC connection | HTTP client (simpler) |
| Backpressure | HTTP/2 flow control | Long polling + concurrent limits |
| Fairness | Custom scheduler | Weighted query |
| Throughput | 7000+ TPS (Intuit) | ~1000 TPS Phase 0, scale Phase 2+ |
| Implementation complexity | High | Low (SQL only) |
Trade-off: marginal latency hit, massive simplicity.
Testing fairness¶
async def test_no_starvation():
    """Tenant B gets jobs even if tenant A dominant."""
    # Create 1000 jobs for tenant A
    for _ in range(1000):
        await create_job(tenant='a', job_type='send-email')
    # Create 10 jobs for tenant B
    for _ in range(10):
        await create_job(tenant='b', job_type='send-email')
    # Workers activate over time
    activations = []
    for _ in range(20):
        jobs = await activate_jobs('send-email', maxJobs=10)
        activations.extend(jobs)
    # Tenant B's jobs should have been activated within 20 rounds
    tenant_b_jobs = [j for j in activations if j.tenant == 'b']
    assert len(tenant_b_jobs) >= 5, "Tenant B starved!"
Links¶
- concepts/job-worker-pattern: Pattern
- concepts/api-engine-serialization: Sync point with commands
- adrs/adr-007-at-least-once-idempotent-workers: Worker requirements
- analysis/failure-mode-analysis: F13-F15 worker failures
- PostgreSQL SKIP LOCKED docs