ADR-025: Audit logging mandatory
- Status: Accepted
- Date: 2026-05-14
- Tags: security, compliance, audit
Context and Problem Statement
Users can deny actions ("I didn't approve that loan"). Workers can deny completions. Admin accounts can be compromised. How do we trace every action back to a specific actor?
Decision Drivers
- Compliance (SOC2, HIPAA, SOX) requires audit trails
- Non-repudiation requirement (T3 threats)
- Forensic investigation needs
- Insurance / legal requirements
Considered Options
- Mandatory audit log for security-relevant operations
- Log everything (overkill, massive storage)
- Log only failures
- No audit log (use existing logs)
Decision Outcome
Chosen: a comprehensive audit log for security-relevant operations, stored in an append-only table with integrity protection.
Loggable actions (mandatory)
- Authentication: login success/failure
- Authorization: permission grants, revocations
- Resource modifications: deploy, deploy delete, instance create/cancel
- User management: user create/delete, role changes
- Configuration changes: rate limits, settings
- API key lifecycle: create, rotate, delete
- Process variable updates (audit who changed what)
- Incident resolutions
- Manual interventions
NOT logged (volume reasons)
- Routine reads (process instance views, list queries)
- Engine internal command processing
- Health checks
- OpenTelemetry trace spans (separate observability)
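The split between the two lists above can be expressed as a small predicate that decides whether an action needs an audit entry. The sketch below is illustrative only: apart from `process_instance.cancel` and `auth.login`, which appear elsewhere in this ADR, every action name and prefix is a hypothetical placeholder, and `must_audit` is not an existing function.

```python
# Hypothetical action-name prefixes for the mandatory categories.
AUDITED_PREFIXES = (
    "auth.",              # login success/failure
    "authz.",             # permission grants, revocations
    "deployment.",        # deploy, deploy delete
    "process_instance.",  # create/cancel, variable updates
    "user.",              # user create/delete, role changes
    "config.",            # rate limits, settings
    "api_key.",           # create, rotate, delete
    "incident.",          # incident resolutions
)

# Hypothetical names for routine reads and health checks, excluded for volume.
EXCLUDED_ACTIONS = {
    "process_instance.view",
    "process_instance.list",
    "health.check",
}


def must_audit(action: str) -> bool:
    """Return True when the action belongs to a mandatory audit category."""
    if action in EXCLUDED_ACTIONS:
        return False
    # str.startswith accepts a tuple and matches any of its prefixes.
    return action.startswith(AUDITED_PREFIXES)
```

Checking exclusions first keeps routine reads out of the log even when they share a prefix with an audited resource type.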
Positive Consequences
- Strong non-repudiation
- Compliance baseline met
- Forensic capability available
- Audit trail tamper-evident
Negative Consequences
- Storage overhead (~5-10% of total DB)
- Performance impact per logged action (~5ms per INSERT)
- Retention requirements (7 years SOX)
Schema
CREATE TABLE audit_log (
    audit_id BIGSERIAL,
    timestamp TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    tenant_id TEXT NOT NULL,
    -- Actor identification
    user_id TEXT, -- from JWT sub
    api_key_id UUID, -- if API key auth
    source_ip INET NOT NULL,
    user_agent TEXT,
    session_id TEXT,
    -- Action
    action TEXT NOT NULL, -- e.g., 'process_instance.cancel'
    resource_type TEXT NOT NULL,
    resource_id TEXT,
    -- Outcome
    success BOOLEAN NOT NULL,
    status_code INT,
    error_code TEXT,
    error_message TEXT,
    -- Context
    details JSONB NOT NULL, -- before/after, request params
    -- Integrity
    integrity_hmac TEXT NOT NULL, -- HMAC-SHA256 of canonicalized row
    -- On a partitioned table the primary key must include the partition column
    PRIMARY KEY (audit_id, timestamp)
) PARTITION BY RANGE (timestamp); -- native partitioning, required by partman.create_parent below
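-- Append-only enforcement for the application role (no UPDATE/DELETE).
-- The table owner and superusers keep full rights; the HMAC chain covers that gap.
REVOKE UPDATE, DELETE, TRUNCATE ON audit_log FROM PUBLIC;
GRANT INSERT, SELECT ON audit_log TO engine_user;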
-- Indexes for common queries
CREATE INDEX idx_audit_tenant_time ON audit_log(tenant_id, timestamp DESC);
CREATE INDEX idx_audit_user_time ON audit_log(user_id, timestamp DESC) WHERE user_id IS NOT NULL;
CREATE INDEX idx_audit_action ON audit_log(action, timestamp DESC);
CREATE INDEX idx_audit_resource ON audit_log(resource_type, resource_id);
CREATE INDEX idx_audit_failures ON audit_log(timestamp DESC) WHERE success = FALSE;
-- Retention: time-based partitioning (per the concepts/command-log-compaction pattern)
-- 7-year retention for compliance
SELECT partman.create_parent(
    p_parent_table => 'public.audit_log',
    p_control => 'timestamp',
    p_type => 'native',
    p_interval => '1 month'
);
Integrity protection
HMAC chain protects against tampering:
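import hashlib
import hmac
import json

def compute_audit_hmac(row, prev_hmac, secret):
    # Sorted keys make the serialization deterministic for a given row.
    canonical = json.dumps({
        'audit_id': row['audit_id'],
        'timestamp': row['timestamp'].isoformat(),
        'tenant_id': row['tenant_id'],
        'action': row['action'],
        # ... canonicalized fields
        'prev_hmac': prev_hmac  # None for the first row in the chain
    }, sort_keys=True)
    return hmac.new(secret.encode(), canonical.encode(), hashlib.sha256).hexdigest()

async def append_audit(action, context):
    # Serialize writers so two appends cannot read the same previous HMAC and
    # fork the chain. Assumes db exposes transaction(); 25 is an arbitrary lock key.
    async with db.transaction():
        await db.execute("SELECT pg_advisory_xact_lock(25)")
        prev = await db.fetch_val("""
            SELECT integrity_hmac FROM audit_log
            ORDER BY audit_id DESC LIMIT 1
        """)
        # build_row must set 'timestamp' (UTC) itself: the HMAC covers it,
        # so the column DEFAULT cannot supply it after the fact.
        row = build_row(action, context)
        # Reserve audit_id before hashing, since the HMAC covers it too.
        row['audit_id'] = await db.fetch_val(
            "SELECT nextval(pg_get_serial_sequence('audit_log', 'audit_id'))")
        row['integrity_hmac'] = compute_audit_hmac(row, prev, AUDIT_SECRET)
        await db.execute("INSERT INTO audit_log ...", row)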
Periodic verification (cron):
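import hmac

async def verify_audit_chain():
    # Walk the chain oldest-first; each HMAC depends on the one before it.
    rows = await db.fetch_all("SELECT * FROM audit_log ORDER BY audit_id ASC")
    prev_hmac = None
    for row in rows:
        expected = compute_audit_hmac(row, prev_hmac, AUDIT_SECRET)
        # Constant-time comparison avoids leaking how many characters matched.
        if not hmac.compare_digest(row['integrity_hmac'], expected):
            alert(f"AUDIT TAMPERED at audit_id={row['audit_id']}")
            return False
        prev_hmac = row['integrity_hmac']
    return True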
A broken chain is treated as a security incident.
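To make the chaining behaviour concrete, here is a minimal in-memory sketch that builds a three-row chain, tampers with the middle row, and locates the break. It is an illustration under assumptions, not the engine's code: the list stands in for the `audit_log` table, and `SECRET`, `row_hmac`, `append`, and `first_broken` are hypothetical names. The action `process_instance.view` is also only a placeholder.

```python
import hashlib
import hmac
import json
from datetime import datetime, timezone
from typing import Optional

SECRET = b"demo-secret"  # assumption: a real deployment loads this from a secret store


def row_hmac(row: dict, prev_hmac: Optional[str]) -> str:
    """HMAC-SHA256 over the row's fields plus the previous row's HMAC."""
    payload = {k: v for k, v in row.items() if k != "integrity_hmac"}
    payload["prev_hmac"] = prev_hmac
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return hmac.new(SECRET, canonical.encode(), hashlib.sha256).hexdigest()


def append(log: list, tenant_id: str, action: str) -> None:
    """Append a row whose HMAC is chained to the current tail of the log."""
    prev = log[-1]["integrity_hmac"] if log else None
    row = {
        "audit_id": len(log) + 1,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "tenant_id": tenant_id,
        "action": action,
    }
    row["integrity_hmac"] = row_hmac(row, prev)
    log.append(row)


def first_broken(log: list) -> Optional[int]:
    """Return the audit_id of the first row that fails verification, else None."""
    prev = None
    for row in log:
        if not hmac.compare_digest(row["integrity_hmac"], row_hmac(row, prev)):
            return row["audit_id"]
        prev = row["integrity_hmac"]
    return None


log: list = []
for name in ("auth.login", "process_instance.cancel", "auth.login"):
    append(log, "acme", name)

assert first_broken(log) is None
log[1]["action"] = "process_instance.view"  # simulate tampering with row 2
assert first_broken(log) == 2
```

Because each HMAC covers its predecessor, editing or removing any row other than the last breaks verification from that point on. Removing rows from the tail is not detected by the chain alone, so a verifier would also need an externally stored record of the latest HMAC or row count.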
Code helper
@audit("process_instance.cancel")
async def cancel_instance(request, pi_key):
    # Auto-logs:
    # - Actor (from request.user)
    # - Action: process_instance.cancel
    # - Resource: process_instances/{pi_key}
    # - IP, UA, session
    # - Success/failure based on response
    result = await actually_cancel(pi_key)
    return result
The decorator wraps the handler and logs automatically, so handlers contain no manual audit code.
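One way such a decorator could be structured is sketched below. This is an assumption about the shape of the helper, not its actual implementation: the request is modelled as a plain dict, `AUDIT_SINK` is a hypothetical in-memory stand-in for the `audit_log` table, and the resource type is derived naively from the action name.

```python
import asyncio
import functools

AUDIT_SINK: list = []  # hypothetical stand-in for the audit_log table


def audit(action: str):
    """Decorator factory: record one audit entry per call, success or failure."""
    def decorator(handler):
        @functools.wraps(handler)
        async def wrapper(request, resource_id, *args, **kwargs):
            entry = {
                "action": action,
                "resource_type": action.split(".")[0],
                "resource_id": str(resource_id),
                "user_id": request.get("user"),
                "source_ip": request.get("ip"),
            }
            try:
                result = await handler(request, resource_id, *args, **kwargs)
            except Exception as exc:
                entry.update(success=False, error_message=str(exc))
                AUDIT_SINK.append(entry)
                raise  # the caller still sees the original failure
            entry["success"] = True
            AUDIT_SINK.append(entry)
            return result
        return wrapper
    return decorator


@audit("process_instance.cancel")
async def cancel_instance(request, pi_key):
    # Toy handler: fails for one sentinel key to exercise the failure path.
    if pi_key == "missing":
        raise LookupError("no such process instance")
    return {"cancelled": pi_key}


request = {"user": "alice", "ip": "203.0.113.7"}
asyncio.run(cancel_instance(request, "12345"))
try:
    asyncio.run(cancel_instance(request, "missing"))
except LookupError:
    pass
```

Writing the entry on both the success and the exception path, then re-raising, means failed attempts are audited without changing what the caller observes.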
Querying audit log
-- "Who canceled process instance 12345?"
SELECT user_id, timestamp, source_ip, details
FROM audit_log
WHERE action = 'process_instance.cancel'
AND resource_id = '12345'
AND tenant_id = 'acme';
-- "All admin actions in last 24 hours"
SELECT *
FROM audit_log al
JOIN user_tenants ut ON ut.user_id = al.user_id AND ut.tenant_id = al.tenant_id
WHERE ut.role = 'admin'
AND al.timestamp > NOW() - INTERVAL '24 hours'
ORDER BY al.timestamp DESC;
-- "Failed login attempts pattern"
SELECT user_id, source_ip, COUNT(*) as failures
FROM audit_log
WHERE action = 'auth.login' AND success = FALSE
AND timestamp > NOW() - INTERVAL '1 hour'
GROUP BY user_id, source_ip
HAVING COUNT(*) > 5;
Export for compliance
mvp-cli audit export --tenant=acme --from=2025-01-01 --to=2025-12-31 \
--format=json > audit-2025.json
JSON includes integrity_hmac for offline verification.
Links
- analysis/security-threat-model — T3.1, T3.2 threats
- adrs/adr-013-simple-rbac-three-roles — RBAC audited
- adrs/adr-014-oidc-single-idp — Auth audited
- SOC2 requirements