ADR-025: Audit logging mandatory

  • Status: Accepted
  • Date: 2026-05-14
  • Tags: security, compliance, audit

Context and Problem Statement

Users can deny actions ("I didn't approve that loan"). Workers can deny completions. Admins can be compromised. How do we trace every action back to a specific, attributable actor?

Decision Drivers

  • Compliance (SOC2, HIPAA, SOX) requires audit trails
  • Non-repudiation requirement (T3 threats)
  • Forensic investigation needs
  • Insurance / legal requirements

Considered Options

  1. Mandatory audit log for security-relevant ops
  2. Log everything (overkill, massive storage)
  3. Log only failures
  4. No audit log (use existing logs)

Decision Outcome

Chosen: option 1, a comprehensive audit log for security-relevant operations, stored in an append-only table with integrity protection.

Loggable actions (mandatory)

  • Authentication: login success/failure
  • Authorization: permission grants, revocations
  • Resource modifications: deploy, deploy delete, instance create/cancel
  • User management: user create/delete, role changes
  • Configuration changes: rate limits, settings
  • API key lifecycle: create, rotate, delete
  • Process variable updates (audit who changed what)
  • Incident resolutions
  • Manual interventions

NOT logged (volume reasons)

  • Routine reads (process instance views, list queries)
  • Engine internal command processing
  • Health checks
  • OpenTelemetry trace spans (separate observability)
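
The two lists above amount to a policy that can be checked in one place. The following is a minimal sketch, assuming dotted action names of the form `resource.verb`. Only `process_instance.cancel` and `auth.login` appear in this ADR; every other prefix and verb here is a hypothetical placeholder, as is the `is_auditable` helper itself.

```python
# Hypothetical action prefixes. Only 'process_instance.cancel' and 'auth.login'
# are named in this ADR; the rest are illustrative assumptions.
MANDATORY_PREFIXES = (
    "auth.",              # login success/failure
    "permission.",        # grants, revocations
    "deployment.",        # deploy, deploy delete
    "process_instance.",  # create/cancel, variable updates
    "user.",              # create/delete, role changes
    "config.",            # rate limits, settings
    "api_key.",           # create, rotate, delete
    "incident.",          # resolutions
    "manual.",            # manual interventions
)

# Routine reads are excluded for volume reasons, even on audited resources.
READ_ONLY_VERBS = {"get", "list", "view"}


def is_auditable(action: str) -> bool:
    """Return True if an action such as 'process_instance.cancel' must be audited."""
    verb = action.rsplit(".", 1)[-1]
    if verb in READ_ONLY_VERBS:
        return False
    return action.startswith(MANDATORY_PREFIXES)
```

With this sketch, `is_auditable("process_instance.cancel")` is true, while `is_auditable("process_instance.view")` and `is_auditable("health.check")` are false.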

Positive Consequences

  • Strong non-repudiation
  • Compliance baseline met
  • Forensic capability available
  • Audit trail tamper-evident

Negative Consequences

  • Storage overhead (~5-10% of total DB)
  • Performance impact per logged action (~5ms per INSERT)
  • Retention requirements (7 years SOX)

Schema

CREATE TABLE audit_log (
    audit_id BIGSERIAL PRIMARY KEY,
    timestamp TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    tenant_id TEXT NOT NULL,

    -- Actor identification
    user_id TEXT,                       -- from JWT sub
    api_key_id UUID,                    -- if API key auth
    source_ip INET NOT NULL,
    user_agent TEXT,
    session_id TEXT,

    -- Action
    action TEXT NOT NULL,               -- e.g., 'process_instance.cancel'
    resource_type TEXT NOT NULL,
    resource_id TEXT,

    -- Outcome
    success BOOLEAN NOT NULL,
    status_code INT,
    error_code TEXT,
    error_message TEXT,

    -- Context
    details JSONB NOT NULL,             -- before/after, request params

    -- Integrity
    integrity_hmac TEXT NOT NULL        -- HMAC-SHA256 of canonicalized row
);

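-- Append-only enforcement (no UPDATE/DELETE) for application roles.
-- The table owner and superusers are not restricted by these statements,
-- which is why the HMAC chain described below is still needed.
REVOKE UPDATE, DELETE, TRUNCATE ON audit_log FROM PUBLIC;
GRANT INSERT, SELECT ON audit_log TO engine_user;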

-- Indexes for common queries
CREATE INDEX idx_audit_tenant_time ON audit_log(tenant_id, timestamp DESC);
CREATE INDEX idx_audit_user_time ON audit_log(user_id, timestamp DESC) WHERE user_id IS NOT NULL;
CREATE INDEX idx_audit_action ON audit_log(action, timestamp DESC);
CREATE INDEX idx_audit_resource ON audit_log(resource_type, resource_id);
CREATE INDEX idx_audit_failures ON audit_log(timestamp DESC) WHERE success = FALSE;

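-- Retention: time-based partitioning, following the concepts/command-log-compaction
-- pattern (../../concepts/command-log-compaction.md)
-- 7-year retention for compliance
-- Note: native partitioning requires audit_log to be declared with
-- PARTITION BY RANGE (timestamp), and PostgreSQL then requires the primary key
-- to include the partition column, e.g. PRIMARY KEY (audit_id, timestamp).
SELECT partman.create_parent(
    p_parent_table => 'public.audit_log',
    p_control => 'timestamp',
    p_type => 'native',
    p_interval => '1 month'
);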

Integrity protection

HMAC chain protects against tampering:

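import hashlib
import hmac
import json
from datetime import timezone

def compute_audit_hmac(row, prev_hmac, secret):
    # Serialize deterministically: sorted keys and a UTC timestamp, so the same
    # row yields the same bytes at write time and at verification time.
    canonical = json.dumps({
        'audit_id': row['audit_id'],
        'timestamp': row['timestamp'].astimezone(timezone.utc).isoformat(),
        'tenant_id': row['tenant_id'],
        'action': row['action'],
        # ... canonicalized fields
        'prev_hmac': prev_hmac  # None for the first row in the chain
    }, sort_keys=True)
    return hmac.new(secret.encode(), canonical.encode(), hashlib.sha256).hexdigest()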

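async def append_audit(action, context):
    # Serialize writers: without a lock, two concurrent appends could read the
    # same previous HMAC and fork the chain. The lock key is an arbitrary
    # constant; db.transaction() is assumed to exist on the same client.
    async with db.transaction():
        await db.execute("SELECT pg_advisory_xact_lock(25)")
        prev = await db.fetch_val("""
            SELECT integrity_hmac FROM audit_log
            ORDER BY audit_id DESC LIMIT 1
        """)

        # build_row must set audit_id and timestamp explicitly (for example via
        # nextval on the sequence): both are covered by the HMAC, so they cannot
        # be left to column defaults filled in after hashing.
        row = build_row(action, context)
        row['integrity_hmac'] = compute_audit_hmac(row, prev, AUDIT_SECRET)

        await db.execute("INSERT INTO audit_log ...", row)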

Periodic verification (cron):

async def verify_audit_chain():
    # Walk the whole table in insertion order and recompute each HMAC.
    rows = await db.fetch_all("SELECT * FROM audit_log ORDER BY audit_id ASC")
    prev_hmac = None  # the first row chains from None
    for row in rows:
        expected = compute_audit_hmac(row, prev_hmac, AUDIT_SECRET)
        if not hmac.compare_digest(row['integrity_hmac'], expected):
            alert(f"AUDIT TAMPERED at audit_id={row['audit_id']}")
            return False
        prev_hmac = row['integrity_hmac']
    return True

If the chain is broken, treat it as a security incident.
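
To make the tamper-evidence property concrete, here is a minimal in-memory sketch of the same chaining idea, with no database involved. The names `chain_hmac`, `build_chain`, `first_broken_id` and the `SECRET` value are illustrative assumptions, not part of the engine. It shows that editing a row or deleting a row from the middle breaks verification at a specific `audit_id`.

```python
import hashlib
import hmac
import json

SECRET = b"demo-secret"  # assumption: real deployments load this from a secret store


def chain_hmac(entry: dict, prev_hmac, secret: bytes) -> str:
    """HMAC-SHA256 over the entry's fields plus the previous row's HMAC."""
    canonical = json.dumps({**entry, "prev_hmac": prev_hmac}, sort_keys=True)
    return hmac.new(secret, canonical.encode(), hashlib.sha256).hexdigest()


def build_chain(entries: list) -> list:
    """Attach an integrity_hmac to each entry, chaining from the previous one."""
    rows, prev = [], None
    for entry in entries:
        row = dict(entry)
        row["integrity_hmac"] = chain_hmac(entry, prev, SECRET)
        rows.append(row)
        prev = row["integrity_hmac"]
    return rows


def first_broken_id(rows: list):
    """Return the audit_id of the first row that fails verification, or None."""
    prev = None
    for row in rows:
        entry = {k: v for k, v in row.items() if k != "integrity_hmac"}
        expected = chain_hmac(entry, prev, SECRET)
        if not hmac.compare_digest(expected, row["integrity_hmac"]):
            return row["audit_id"]
        prev = row["integrity_hmac"]
    return None


log = build_chain([
    {"audit_id": 1, "tenant_id": "acme", "action": "auth.login"},
    {"audit_id": 2, "tenant_id": "acme", "action": "process_instance.cancel"},
    {"audit_id": 3, "tenant_id": "acme", "action": "api_key.rotate"},
])
intact = first_broken_id(log)              # None: chain verifies

tampered = [dict(r) for r in log]
tampered[1]["action"] = "process_instance.view"  # edit a row in place
edited = first_broken_id(tampered)         # 2: the edited row no longer matches

deleted = first_broken_id([log[0], log[2]])  # 3: its prev_hmac link is gone
```

One limitation worth keeping in mind: a chain of this kind does not by itself detect rows removed from the end of the log, because the remaining prefix still verifies. Catching that would need the latest HMAC to be recorded somewhere outside the table.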

Code helper

@audit("process_instance.cancel")
async def cancel_instance(request, pi_key):
    # Auto-logs:
    # - Actor (from request.user)
    # - Action: process_instance.cancel
    # - Resource: process_instances/{pi_key}
    # - IP, UA, session
    # - Success/failure based on response

    result = await actually_cancel(pi_key)
    return result

The decorator wraps the handler and logs automatically, so handlers contain no manual audit code.
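
The ADR does not show the decorator itself. The following is a minimal sketch of how such an `audit` decorator could be written, under stated assumptions: the request object exposes `user` and `ip` attributes, the resource type is taken from the action prefix, and `append_audit` is replaced by an in-memory stand-in (`AUDIT_SINK`) so the example runs on its own.

```python
import asyncio
import functools
from types import SimpleNamespace

AUDIT_SINK = []  # stand-in for the audit_log table


async def append_audit(action, context):
    # Assumption: the real helper computes the HMAC and INSERTs the row.
    AUDIT_SINK.append({"action": action, **context})


def audit(action):
    """Decorator factory: record one audit entry per call, success or failure."""
    resource_type = action.split(".", 1)[0]

    def decorator(handler):
        @functools.wraps(handler)
        async def wrapper(request, resource_id, *args, **kwargs):
            context = {
                "user_id": request.user,   # assumed attribute names
                "source_ip": request.ip,
                "resource_type": resource_type,
                "resource_id": str(resource_id),
            }
            try:
                result = await handler(request, resource_id, *args, **kwargs)
            except Exception as exc:
                # Failures are audited too, then re-raised unchanged.
                await append_audit(action, {**context, "success": False,
                                            "error_message": str(exc)})
                raise
            await append_audit(action, {**context, "success": True})
            return result
        return wrapper
    return decorator


@audit("process_instance.cancel")
async def cancel_instance(request, pi_key):
    if pi_key < 0:
        raise ValueError("unknown process instance")
    return "cancelled"


request = SimpleNamespace(user="alice", ip="203.0.113.7")
asyncio.run(cancel_instance(request, 12345))
```

Logging in both the success and the exception path matters here: denied or failed attempts are often the entries a forensic investigation needs most.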

Querying audit log

-- "Who canceled process instance 12345?"
SELECT user_id, timestamp, source_ip, details
FROM audit_log
WHERE action = 'process_instance.cancel'
  AND resource_id = '12345'
  AND tenant_id = 'acme';

-- "All admin actions in last 24 hours"
SELECT *
FROM audit_log al
JOIN user_tenants ut ON ut.user_id = al.user_id AND ut.tenant_id = al.tenant_id
WHERE ut.role = 'admin'
  AND al.timestamp > NOW() - INTERVAL '24 hours'
ORDER BY al.timestamp DESC;

-- "Failed login attempts pattern"
SELECT user_id, source_ip, COUNT(*) as failures
FROM audit_log
WHERE action = 'auth.login' AND success = FALSE
  AND timestamp > NOW() - INTERVAL '1 hour'
GROUP BY user_id, source_ip
HAVING COUNT(*) > 5;

Export for compliance

mvp-cli audit export --tenant=acme --from=2025-01-01 --to=2025-12-31 \
                    --format=json > audit-2025.json

The exported JSON includes integrity_hmac for each row, so the chain can be verified offline.