Job Queue Fairness

Workers pull jobs that belong to N tenants and M process types. Without fair scheduling, a dominant tenant starves the others. MVP strategy: weighted fair queueing per tenant + priority levels respected per job + per-job-type concurrency limits + claim-based activation with randomization. Implemented via Postgres SELECT FOR UPDATE SKIP LOCKED. The design prevents starvation, respects priority, and scales to thousands of workers.

The problem

Workers request jobs via:

POST /v2/jobs/activate
{
  "type": "send-email",
  "maxJobs": 10,
  "timeout": 30000
}

The engine must respond with up to N active jobs. But:

  1. Multiple tenants: tenant A has 10000 jobs, tenant B has 5. How should capacity be divided?
  2. Multiple job types: workers of type X request jobs. High-priority jobs must go first.
  3. Worker poaching: two workers request simultaneously. Do both receive the same job?
  4. Starvation: if tenant A dominates, tenant B never receives jobs.
  5. Long-running jobs: they block workers from picking up quick jobs.

Camunda uses gRPC streaming + complex scheduling. An MVP on REST needs a simpler but effective strategy.

Strategy: Weighted Fair Queueing + SKIP LOCKED

Schema

CREATE TABLE jobs (
    job_key BIGSERIAL PRIMARY KEY,
    process_instance_key BIGINT NOT NULL,
    element_instance_key BIGINT NOT NULL,
    tenant_id TEXT NOT NULL,

    -- Job config
    job_type TEXT NOT NULL,
    retries INT NOT NULL DEFAULT 3,
    priority INT NOT NULL DEFAULT 50,           -- 0-100, higher first

    -- Variables
    variables JSONB,
    custom_headers JSONB,

    -- State
    state TEXT NOT NULL,                         -- CREATED | ACTIVATED | COMPLETED | FAILED
    activation_count INT NOT NULL DEFAULT 0,

    -- Activation tracking
    activated_at TIMESTAMPTZ,
    activated_by TEXT,                           -- worker id
    timeout_at TIMESTAMPTZ,                      -- when activation expires

    -- Errors
    error_message TEXT,
    error_code TEXT,

    created_at TIMESTAMPTZ DEFAULT NOW() NOT NULL,
    completed_at TIMESTAMPTZ
);

-- Critical indexes for activation queries
CREATE INDEX idx_jobs_activatable 
ON jobs (job_type, priority DESC, created_at ASC) 
WHERE state = 'CREATED' AND timeout_at IS NULL;

CREATE INDEX idx_jobs_timeout
ON jobs (timeout_at)
WHERE state = 'ACTIVATED';

CREATE INDEX idx_jobs_tenant_type ON jobs(tenant_id, job_type);

Algorithm: weighted fair activation

Step 1: Per-tenant weights

CREATE TABLE tenant_job_weights (
    tenant_id TEXT PRIMARY KEY,
    weight INT NOT NULL DEFAULT 1,        -- relative weight
    max_concurrent_jobs INT DEFAULT 1000  -- cap on activated jobs
);

-- Examples
INSERT INTO tenant_job_weights VALUES
    ('free-tier-a', 1, 50),       -- low weight, low cap
    ('premium-acme', 10, 5000),   -- 10x weight, high cap
    ('enterprise-bank', 50, 50000); -- big customer, lots of capacity

Step 2: Activation query

-- Activate N jobs of given type for worker
WITH ranked_jobs AS (
    SELECT 
        j.job_key,
        j.tenant_id,
        j.priority,
        -- Per-tenant ranking weighted
        ROW_NUMBER() OVER (
            PARTITION BY j.tenant_id 
            ORDER BY j.priority DESC, j.created_at ASC
        ) AS tenant_rank,
        -- Tenant weight influences global ordering
        tw.weight,
        -- Random tiebreaker to prevent thundering herd
        random() AS jitter
    FROM jobs j
    JOIN tenant_job_weights tw ON tw.tenant_id = j.tenant_id
    WHERE j.state = 'CREATED'
      AND j.job_type = $1                 -- e.g., 'send-email'
      AND j.timeout_at IS NULL
      -- Honor per-tenant concurrency cap
      AND (
          SELECT COUNT(*) FROM jobs 
          WHERE tenant_id = j.tenant_id 
            AND state = 'ACTIVATED'
      ) < tw.max_concurrent_jobs
)
SELECT job_key 
FROM ranked_jobs
ORDER BY 
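    -- Round-robin across tenants: each tenant's best job before anyone's second
    tenant_rank ASC,
    -- Between tenants at the same rank: higher weight first.
    -- jitter (0..1) only breaks ties between tenants of equal weight.
    -- Use the CTE's output column: the tw alias is not visible out here.
    -weight + jitter,
    priority DESC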
LIMIT $2;  -- maxJobs from worker request

Complex, but it handles multiple constraints simultaneously.
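To make the ordering easier to reason about, here is a minimal in-memory sketch of what the query's ORDER BY does. It is an illustration, not the engine's code: the Job dataclass, pick_jobs, and the integer created_at tick are hypothetical names, and the random jitter is left out so the result is deterministic.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass(frozen=True)
class Job:
    job_key: int
    tenant_id: str
    priority: int
    created_at: int  # increasing tick, stands in for a timestamp


def pick_jobs(jobs, weights, max_jobs):
    """Return up to max_jobs job keys in the order the query would."""
    by_tenant = defaultdict(list)
    for job in jobs:
        by_tenant[job.tenant_id].append(job)

    ranked = []
    for tenant_id, tenant_jobs in by_tenant.items():
        # ROW_NUMBER() OVER (PARTITION BY tenant_id
        #                    ORDER BY priority DESC, created_at ASC)
        tenant_jobs.sort(key=lambda j: (-j.priority, j.created_at))
        for rank, job in enumerate(tenant_jobs, start=1):
            # ORDER BY tenant_rank ASC, -weight, priority DESC
            ranked.append((rank, -weights[tenant_id], -job.priority, job.job_key))

    ranked.sort()
    return [job_key for *_, job_key in ranked[:max_jobs]]


# Tenant "a" has eight jobs and 10x weight; tenant "b" has two jobs.
jobs = [Job(k, "a", 50, k) for k in range(1, 9)]
jobs += [Job(100, "b", 50, 100), Job(101, "b", 50, 101)]
picked = pick_jobs(jobs, {"a": 10, "b": 1}, max_jobs=4)
```

Because tenant_rank sorts first, the small tenant's jobs are interleaved with the large tenant's instead of queueing behind them; weight only decides who goes first within the same rank.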

Step 3: Atomic activation via SKIP LOCKED

-- The actual activation: lock + update + return
WITH selected AS (
    -- ranked_jobs is the CTE from Step 2. A locking clause does not reach
    -- into a CTE, so join back to jobs and lock the base rows here.
    SELECT j.job_key
    FROM jobs j
    JOIN ranked_jobs r ON r.job_key = j.job_key
    WHERE j.state = 'CREATED'           -- re-checked on the locked row version
    ORDER BY ...
    LIMIT $2
    FOR UPDATE OF j SKIP LOCKED         -- KEY: skip rows locked by other workers
)
UPDATE jobs
SET
    state = 'ACTIVATED',
    activated_at = NOW(),
    timeout_at = NOW() + ($3 || ' milliseconds')::INTERVAL,
    activated_by = $4,
    activation_count = activation_count + 1
WHERE job_key IN (SELECT job_key FROM selected)
RETURNING *;

SKIP LOCKED is critical:

- Two workers query simultaneously
- Both target the same rows
- One locks the rows; the second sees "locked" and skips them
- No deadlock, no contention
- Each worker gets a disjoint set of jobs

Why SKIP LOCKED matters

Without SKIP LOCKED, two scenarios:

Without FOR UPDATE (race condition)

Worker A: SELECT jobs ... LIMIT 10 → finds jobs 1-10
Worker B: SELECT jobs ... LIMIT 10 → finds jobs 1-10 (same)
Worker A: UPDATE jobs SET state='ACTIVATED' WHERE key IN (1..10)
Worker B: UPDATE jobs SET state='ACTIVATED' WHERE key IN (1..10)
  → Both workers think they own jobs 1-10
  → Duplicate processing (depends on engine handling)

With FOR UPDATE (blocking)

Worker A: SELECT ... FOR UPDATE → locks jobs 1-10
Worker B: SELECT ... FOR UPDATE → BLOCKS waiting on lock
Worker A: completes activation, releases lock
Worker B: now gets jobs 11-20 (assuming Worker A took 1-10)
  → Correct but serialized, slow

With FOR UPDATE SKIP LOCKED (best)

Worker A: SELECT ... FOR UPDATE SKIP LOCKED → locks jobs 1-10
Worker B: SELECT ... FOR UPDATE SKIP LOCKED → skips 1-10, locks jobs 11-20
  → Both workers proceed in parallel
  → No blocking, no duplicates

This is the standard pattern for queue systems on relational DBs. Used by Postgres-backed queues such as pg-boss, Graphile Worker, and Oban.
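The difference between blocking and skipping can be imitated in a few lines of plain Python. In this sketch a threading.Lock per row stands in for the Postgres row lock, and a non-blocking acquire plays the role of SKIP LOCKED. Row and claim_skip_locked are hypothetical names used only for illustration.

```python
import threading


class Row:
    def __init__(self, job_key):
        self.job_key = job_key
        self.lock = threading.Lock()  # stands in for the row-level lock


def claim_skip_locked(rows, limit):
    """Scan rows in order, lock up to `limit` free ones, skip locked ones."""
    claimed = []
    for row in rows:
        if len(claimed) == limit:
            break
        # Non-blocking acquire: a locked row is skipped instead of waited on.
        if row.lock.acquire(blocking=False):
            claimed.append(row)
    return claimed


rows = [Row(k) for k in range(1, 31)]
worker_a = claim_skip_locked(rows, 10)  # A's "transaction" is still open
worker_b = claim_skip_locked(rows, 10)  # B runs before A releases anything
keys_a = [r.job_key for r in worker_a]
keys_b = [r.job_key for r in worker_b]
```

Worker B never waits: it walks past the ten rows A holds and claims the next ten, which is the behavior the third scenario above describes.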

Priority handling

BPMN supports priority in job definitions:

<bpmn:serviceTask id="urgent-call">
  <bpmn:extensionElements>
    <zeebe:taskDefinition type="phone-call" />
    <zeebe:priority level="90" />
  </bpmn:extensionElements>
</bpmn:serviceTask>

The engine stores priority in the jobs table. The activation query orders each tenant's jobs by priority DESC first.

Within same priority: FIFO (created_at ASC).

Concurrency limits per job type

Sometimes concurrent execution of a job type must be limited:

- "send-email" can run 100 concurrently (SMTP rate limit)
- "generate-pdf" can run 10 concurrently (CPU-heavy)

CREATE TABLE job_type_concurrency (
    job_type TEXT PRIMARY KEY,
    max_concurrent INT NOT NULL
);

INSERT INTO job_type_concurrency VALUES
    ('send-email', 100),
    ('generate-pdf', 10),
    ('call-slow-api', 5);

Activation query checks:

WHERE ... AND (
    SELECT COUNT(*) FROM jobs 
    WHERE job_type = j.job_type 
      AND state = 'ACTIVATED'
) < (SELECT max_concurrent FROM job_type_concurrency WHERE job_type = j.job_type)
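Note that the predicate above is all-or-nothing: if 99 of 100 slots are in use, every candidate row still passes and a request with maxJobs = 10 could overshoot the limit. One possible way to tighten this, sketched here as an assumption rather than the engine's actual behavior, is to clamp the LIMIT to the remaining headroom before running the activation query. activation_budget is a hypothetical helper name.

```python
def activation_budget(max_jobs, active_count, max_concurrent):
    """How many jobs of one type may still be activated for this request."""
    headroom = max(0, max_concurrent - active_count)
    return min(max_jobs, headroom)


# 'send-email' allows 100 concurrent jobs in the example table above.
partial = activation_budget(max_jobs=10, active_count=95, max_concurrent=100)
full = activation_budget(max_jobs=10, active_count=0, max_concurrent=100)
none = activation_budget(max_jobs=10, active_count=100, max_concurrent=100)
```

The count and the activation still run as separate steps, so under concurrent requests this remains a soft cap unless both happen inside one serialized transaction.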

Preventing starvation

Risk: a dominant tenant A always gets its jobs in first → tenant B never gets activated.

Multiple safeguards:

1. Per-tenant max_concurrent_jobs

Hard cap on activated jobs per tenant. Forces tenant A to wait until completions occur.

2. Weighted random selection

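-- Order by weight-scaled random key (smallest first)
ORDER BY -ln(1 - random()) / tenant_weight

When each tenant contributes one candidate row (as with tenant_rank in the activation query), this key picks a tenant first with probability proportional to its weight. Even if tenant A has 10x the weight, tenant B still goes first about 1 time in 11. A small additive jitter such as -tenant_weight + random() * 0.1 cannot do this: it only reorders tenants whose weights are equal.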
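Whether a random sort key actually lets the lighter tenant go first depends on how the randomness is combined with the weight. The simulation below is a sketch with hypothetical helper names: it compares an additive key of the form -weight + random() * 0.1 with a weight-scaled exponential key, -ln(1 - u) / weight, for two tenants with weights 10 and 1.

```python
import math
import random


def scaled_first_pick(weights, rng):
    """Tenant that sorts first under the key -ln(1 - u) / weight."""
    # -ln(1 - u) / weight is an exponential variate with rate = weight;
    # the smallest key wins with probability weight / sum(weights).
    keys = {t: -math.log(1.0 - rng.random()) / w for t, w in weights.items()}
    return min(keys, key=keys.get)


def additive_first_pick(weights, rng, scale=0.1):
    """Tenant that sorts first under the key -weight + random() * scale."""
    keys = {t: -w + rng.random() * scale for t, w in weights.items()}
    return min(keys, key=keys.get)


rng = random.Random(42)  # fixed seed so the run is repeatable
weights = {"a": 10, "b": 1}
trials = 10_000
scaled_b = sum(scaled_first_pick(weights, rng) == "b" for _ in range(trials)) / trials
additive_b = sum(additive_first_pick(weights, rng) == "b" for _ in range(trials)) / trials
```

With the additive key, tenant "b" never sorts first because a jitter below 0.1 cannot bridge a weight gap of 9. With the scaled key, "b" wins roughly 1 / 11 of the draws, which is the proportional behavior weighted fair queueing is after.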

3. Anti-starvation timer

Job created > N minutes ago → priority boost:

SELECT job_key,
       priority + LEAST(20, EXTRACT(EPOCH FROM (NOW() - created_at)) / 60) AS effective_priority
FROM jobs
WHERE state = 'CREATED'
ORDER BY effective_priority DESC

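The boost grows by one point per minute and caps at +20 after 20 minutes of waiting. Because of the cap, an aged job overtakes newer jobs up to 20 priority points above it, but a steady stream of jobs more than 20 points higher can still keep it waiting.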
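The same formula in Python makes the cap easy to check by hand. This is a direct transcription of the SQL expression; effective_priority and MAX_BOOST are illustrative names.

```python
from datetime import datetime, timedelta, timezone

MAX_BOOST = 20  # matches LEAST(20, ...) in the query


def effective_priority(priority, created_at, now):
    """Priority plus one point per minute waited, capped at MAX_BOOST."""
    waited_minutes = (now - created_at).total_seconds() / 60
    return priority + min(MAX_BOOST, waited_minutes)


now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
fresh = effective_priority(50, now, now)
aged = effective_priority(50, now - timedelta(minutes=5), now)
capped = effective_priority(50, now - timedelta(hours=2), now)
```

A job created just now keeps its base priority, five minutes of waiting adds five points, and two hours of waiting adds no more than the 20-point cap.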

4. Round-robin tie-breaker

Track last-activated tenant. Next activation prefers different tenant.

CREATE TABLE worker_state (
    instance_id TEXT PRIMARY KEY,
    last_activated_tenant TEXT,
    last_activation_time TIMESTAMPTZ
);

-- Bias query
ORDER BY 
    CASE WHEN tenant_id = (SELECT last_activated_tenant ...) THEN 1 ELSE 0 END,
    ...

Job timeout handling

When the activation timeout expires without completion:

-- Background sweeper (every 10s)
UPDATE jobs
SET 
    state = 'CREATED',           -- back to activatable
    activated_at = NULL,
    timeout_at = NULL,
    activated_by = NULL
WHERE state = 'ACTIVATED'
  AND timeout_at < NOW();

The job becomes activatable again. The original worker (if still alive) may still try to complete it; the engine rejects that via the state check and, once the job has been re-activated, via the activation_count mismatch:

-- Worker tries to complete
UPDATE jobs
SET state = 'COMPLETED', completed_at = NOW()
WHERE job_key = $1 
  AND activation_count = $2     -- worker remembers its count
  AND state = 'ACTIVATED'
RETURNING *;

-- If returns 0 rows: stale completion, ignore

This prevents the duplicate-processing race (per F13 mitigation): a completion from a timed-out activation cannot overwrite the job's current activation.
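The interplay of sweeper, re-activation, and guarded completion can be traced with a small in-memory model. This sketch mirrors the three SQL statements for a single row; JobRow, activate, sweep, and complete are hypothetical names, and time is a plain integer.

```python
class JobRow:
    def __init__(self, job_key):
        self.job_key = job_key
        self.state = "CREATED"
        self.activation_count = 0
        self.timeout_at = None


def activate(job, now, timeout):
    """Mirror the activation UPDATE; returns the count the worker remembers."""
    job.state = "ACTIVATED"
    job.activation_count += 1
    job.timeout_at = now + timeout
    return job.activation_count


def sweep(job, now):
    """Mirror the background sweeper for a single row."""
    if job.state == "ACTIVATED" and job.timeout_at < now:
        job.state = "CREATED"
        job.timeout_at = None


def complete(job, remembered_count):
    """Mirror the guarded completion UPDATE; True means one row was updated."""
    if job.state == "ACTIVATED" and job.activation_count == remembered_count:
        job.state = "COMPLETED"
        return True
    return False


job = JobRow(1)
count_a = activate(job, now=0, timeout=30)   # worker A takes the job
sweep(job, now=31)                           # A's activation expires
count_b = activate(job, now=31, timeout=30)  # worker B re-activates it
stale = complete(job, count_a)               # A reports late: rejected
fresh = complete(job, count_b)               # B reports on time: accepted
```

Worker A's late completion carries count 1 while the row is at count 2, so it matches zero rows; only worker B's completion goes through.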

Worker activation flow

Worker startup:
  1. Connect to engine
  2. Subscribe to job_type 'send-email'

Worker loop:
  3. POST /v2/jobs/activate { type: 'send-email', maxJobs: 10, timeout: 30000 }
  4. Engine runs activation query
  5. Engine returns N jobs (could be 0 if none)
  6. Worker processes each job
  7. For each: POST /v2/jobs/{key}/complete or /fail
  8. If no jobs came back, worker waits a short time (the server already long-polls)
  9. Back to step 3
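The loop in steps 3 to 9 could look roughly like the following on the worker side. This is a sketch under assumptions: the client object with activate, complete, and fail methods is hypothetical (it would wrap the REST endpoints above), and FakeClient exists only so the loop can be exercised without a server.

```python
def run_worker(client, handler, job_type, max_rounds):
    """Run the activate/process/report loop for a bounded number of rounds."""
    completed, failed = [], []
    for _ in range(max_rounds):
        jobs = client.activate(job_type, max_jobs=10, timeout_ms=30000)
        for job in jobs:
            try:
                handler(job)
            except Exception as exc:
                client.fail(job["jobKey"], str(exc))
                failed.append(job["jobKey"])
            else:
                client.complete(job["jobKey"])
                completed.append(job["jobKey"])
    return completed, failed


class FakeClient:
    """In-memory stand-in for the REST API, used only to exercise the loop."""

    def __init__(self, job_keys):
        self.pending = list(job_keys)

    def activate(self, job_type, max_jobs, timeout_ms):
        batch, self.pending = self.pending[:max_jobs], self.pending[max_jobs:]
        return [{"jobKey": key, "type": job_type} for key in batch]

    def complete(self, job_key):
        pass

    def fail(self, job_key, message):
        pass


def handler(job):
    if job["jobKey"] == 3:
        raise ValueError("simulated handler error")


completed, failed = run_worker(
    FakeClient(range(1, 6)), handler, "send-email", max_rounds=2
)
```

A real worker would loop indefinitely and add backoff on transport errors; the bounded max_rounds is there only to keep the example finite.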

Long polling

If no jobs are available, the server holds the request for N seconds (waiting for new jobs):

import asyncio
import time

async def activate_jobs(request):
    job_type = request['type']
    max_jobs = request['maxJobs']
    timeout = request.get('longPollTimeout', 30)  # seconds

    # Monotonic clock: unaffected by wall-clock adjustments during the wait
    deadline = time.monotonic() + timeout
    while (remaining := deadline - time.monotonic()) > 0:
        jobs = await activate_jobs_atomic(job_type, max_jobs)
        if jobs:
            return jobs

        # Wait for notification or short timeout, never past the deadline
        try:
            await wait_for_new_job_signal(job_type, timeout=min(2, remaining))
        except asyncio.TimeoutError:  # what asyncio.wait_for raises before 3.11
            pass

    return []  # no jobs, worker can retry

Notification via Postgres LISTEN/NOTIFY:

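# When new job created. pg_notify() takes the channel as a string, so job
# types with hyphens (e.g. 'send-email') need no identifier quoting.
await db.execute("SELECT pg_notify($1, '')", f'new_job_{job_type}')

# Listener side. Assuming an asyncpg-style pool: keep this connection open
# for as long as the listener is needed, because releasing it back to the
# pool drops the subscription.
conn = await db.acquire()
await conn.add_listener(f'new_job_{job_type}', on_new_job)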

Eliminates polling overhead. Workers idle until work appears.
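The long-polling handler relies on a wait_for_new_job_signal helper that is not shown. One way it might be backed, assuming a single engine process, is an asyncio.Event per job type that the NOTIFY callback sets. JobSignals and its methods are hypothetical names for this sketch.

```python
import asyncio
from collections import defaultdict


class JobSignals:
    """In-process wake-up signals, one asyncio.Event per job type."""

    def __init__(self):
        self._events = defaultdict(asyncio.Event)

    def notify(self, job_type):
        # Intended to be called from the NOTIFY listener callback.
        self._events[job_type].set()

    async def wait(self, job_type, timeout):
        """Return True if signalled within `timeout` seconds, else False."""
        event = self._events[job_type]
        try:
            await asyncio.wait_for(event.wait(), timeout)
        except asyncio.TimeoutError:
            return False
        event.clear()  # re-arm for the next wait
        return True


async def demo():
    signals = JobSignals()
    before = await signals.wait("send-email", timeout=0.01)  # nothing pending
    loop = asyncio.get_running_loop()
    loop.call_later(0.01, signals.notify, "send-email")      # job arrives
    after = await signals.wait("send-email", timeout=1.0)
    return before, after


before, after = asyncio.run(demo())
```

This variant returns a boolean instead of raising on timeout; adapting it to the raising style used in the handler above is a one-line change.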

Streaming (Phase 2+)

Camunda's gRPC streaming activates jobs as they appear, without polling. MVP REST can approximate via SSE:

GET /v2/jobs/stream?type=send-email&maxJobs=10
Accept: text/event-stream

# Server streams events as jobs become available
event: job
data: {"jobKey": 123, "variables": {...}}

event: job
data: {"jobKey": 124, "variables": {...}}

The server pushes jobs as they become available. The worker completes each one via a separate POST. This eliminates long-poll round trips.
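The wire format in the example above is simple enough to produce and consume by hand. The following sketch shows one way to serialize and parse such frames; format_sse and parse_sse are hypothetical helpers, and the parser only handles the single-line data fields used here, not the full Server-Sent Events grammar.

```python
import json


def format_sse(event, data):
    """Serialize one frame: event line, data line, blank line."""
    payload = json.dumps(data, separators=(",", ":"))
    return f"event: {event}\ndata: {payload}\n\n"


def parse_sse(stream_text):
    """Parse frames produced by format_sse back into (event, data) tuples."""
    events = []
    for frame in stream_text.split("\n\n"):
        if not frame.strip():
            continue
        fields = dict(line.split(": ", 1) for line in frame.split("\n"))
        events.append((fields["event"], json.loads(fields["data"])))
    return events


stream = format_sse("job", {"jobKey": 123}) + format_sse("job", {"jobKey": 124})
jobs = parse_sse(stream)
```

Compact JSON keeps each payload on a single data line, which is what lets the parser split on newlines safely.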

Metrics

jobs_activated_counter = meter.create_counter("jobs.activated")
jobs_activated_counter.add(1, {"job_type": job_type, "tenant": tenant})

jobs_completed_counter = meter.create_counter("jobs.completed")
jobs_completed_counter.add(1, {"job_type": job_type, "tenant": tenant, "outcome": "success"})

jobs_failed_counter = meter.create_counter("jobs.failed")

jobs_active_gauge = meter.create_gauge("jobs.active")
jobs_pending_gauge = meter.create_gauge("jobs.pending")

job_activation_latency = meter.create_histogram("jobs.activation.latency")
job_processing_duration = meter.create_histogram("jobs.processing.duration")

Broken down per tenant and per job type. Can be visualized in Grafana.

Performance characteristics

For target 1000 jobs/sec activation:

| Query | Latency | Notes |
|---|---|---|
| INSERT INTO jobs | ~1ms | When job created |
| Activation query (above) | ~5-10ms | With proper indexes |
| Completion UPDATE | ~1ms | Primary key lookup |
| Timeout sweep | ~10-50ms | Background, every 10s |

At these latencies Postgres should handle 1000 jobs/sec comfortably. The bottleneck is likely the workers themselves, not the DB.

For higher throughput (Phase 3+):

- Partition jobs table by tenant (Citus)
- Move activation logic into a stored procedure for atomicity
- Job index on tenant for partition pruning

Comparison vs Camunda

| Aspect | Camunda gRPC stream | MVP Postgres + SKIP LOCKED |
|---|---|---|
| Activation latency | < 1ms (streaming) | ~5-10ms (REST + DB) |
| Worker setup | gRPC connection | HTTP client (simpler) |
| Backpressure | HTTP/2 flow control | Long polling + concurrent limits |
| Fairness | Custom scheduler | Weighted query |
| Throughput | 7000+ TPS (Intuit) | ~1000 TPS Phase 0, scale Phase 2+ |
| Implementation complexity | High | Low (SQL only) |

Trade-off: marginal latency hit, massive simplicity.

Testing fairness

async def test_no_starvation():
    """Tenant B gets jobs even if tenant A dominant."""
    # Create 1000 jobs for tenant A
    for _ in range(1000):
        await create_job(tenant='a', job_type='send-email')

    # Create 10 jobs for tenant B
    for _ in range(10):
        await create_job(tenant='b', job_type='send-email')

    # Workers activate over time
    activations = []
    for _ in range(20):
        jobs = await activate_jobs({'type': 'send-email', 'maxJobs': 10})
        activations.extend(jobs)

    # Tenant B's jobs should have been activated within 20 rounds
    tenant_b_jobs = [j for j in activations if j.tenant == 'b']
    assert len(tenant_b_jobs) >= 5, "Tenant B starved!"