Journal And Stream Platform
Camunda divide su engine en 4 capas: BPMN engine → stream-platform → logstreams → journal. La capa journal provee append-only log con segments file-based y doble sequencing (Raft index + application asqn). Stream-platform provee RecordProcessor interface con separación replay/process, ProcessingResultBuilder declarativo, y post-commit tasks. Para MVP basado en Postgres: capas se reemplazan, pero los patterns son directamente aplicables.
Arquitectura por capas¶
flowchart TD
subgraph Engine["Engine BPMN (zeebe/engine/)"]
E1[24 element processors]
E2[12 behaviors]
E3[State management BPMN-specific]
end
subgraph Stream["Stream Platform (zeebe/stream-platform/)"]
S1[StreamProcessor — actor]
S2[RecordProcessor interface]
S3[ProcessingResultBuilder]
S4[Replay machinery]
end
subgraph Logs["Log Streams (zeebe/logstreams/)"]
L1[LogStream abstraction]
L2[Append + read interface]
end
subgraph Journal["Journal (zeebe/journal/)"]
J1[Append-only log]
J2[Segments file-based]
J3[Truncation / compaction]
end
FS[Filesystem]
Engine --> Stream
Stream --> Logs
Logs --> Journal
Journal --> FS
Cada capa es independientemente testeable y reusable. Una organización podría construir un workflow engine diferente sobre el mismo journal + stream-platform.
Journal — append-only log persistente¶
Interface¶
public interface Journal extends AutoCloseable {
JournalRecord append(long asqn, BufferWriter recordDataWriter);
JournalRecord append(BufferWriter recordDataWriter);
void deleteAfter(long indexExclusive); // truncate / rollback
boolean deleteUntil(long indexExclusive); // compaction
void reset(long nextIndex);
long getLastIndex();
long getFirstIndex();
void flush() throws FlushException;
JournalReader openReader();
}
Doble sequencing¶
Cada JournalRecord tiene DOS números de secuencia:
| Sequence | Significado | Características |
|---|---|---|
index |
Raft-level monotonic | Sin gaps, controlled por Raft |
asqn |
Application Sequence Number | Positive, monotonic en app records, gaps OK |
Por qué dos: Raft inserta records de control (leader change, configuration) que no son records de aplicación. Si tuvieran que compartir secuencia, control records causarían gaps en la secuencia de app. La separación permite:
- Raft maneja index sin restricción
- App ve secuencia consistente vía asqn
Segments¶
Journal está dividido en archivos físicos (segments): - Cada segment tiene primer index y tamaño máximo - Al llenarse, se crea nuevo segment - Compaction borra segments enteros (no bytes individuales)
Ventajas: - I/O bulk (entire segments) - GC simple (delete files vs reorganize) - Backup simple (copy files)
Async persistence¶
Por default, appends son buffered. flush() garantiza durability. Raft típicamente requiere flush antes de ACK (durability guarantee a quorum).
Stream Platform — framework reusable¶
RecordProcessor interface¶
public interface RecordProcessor {
void init(RecordProcessorContext context);
boolean accepts(ValueType valueType);
void replay(TypedRecord record);
ProcessingResult process(TypedRecord record, ProcessingResultBuilder result);
}
Cuatro responsabilidades separadas:
| Método | Cuándo se llama | Qué hace |
|---|---|---|
init |
Startup | Setup state, dependencies |
accepts |
Per record | Filter por ValueType |
replay |
REPLAY mode | Reconstruir state desde events |
process |
PROCESSING mode | Procesar command, producir result |
Separación replay vs process¶
Crítico para replay determinism:
replay(event): rebuild state desde events ya emitidos. NO emite nuevos records.process(command, builder): procesa command, emite events vía builder.
Ambos pueden escribir al state store, pero solo process puede emitir records nuevos.
Esto garantiza que replay reconstruye exactamente el mismo state que processing produjo originalmente.
ProcessingResultBuilder — declarative¶
public interface ProcessingResultBuilder {
ProcessingResultBuilder appendRecord(...);
ProcessingResultBuilder withResponse(...);
ProcessingResultBuilder appendPostCommitTask(PostCommitTask task);
ProcessingResult build();
}
El processor NO escribe directamente. Declara intent: 1. "Apend este record al log" 2. "Envía esta response al cliente" 3. "Ejecuta este task después del commit"
La plataforma: 1. Acumula declarations 2. Inicia transaction (state DB + log) 3. Escribe todo atomicamente 4. Commit 5. Ejecuta post-commit tasks
Garantiza atomicity entre state mutations y log writes.
Post-commit tasks — side effect isolation¶
Cosas que NO deben ser atomic con el log write: - Send HTTP response al cliente - Trigger notifications - Async operations
Si fallan, NO corrompen state (ya fue committed). Si crashea el broker antes de ejecutarlas, alguien debe re-ejecutar (no-op semantics requeridas).
EventFilter — plug point¶
Permite filtrar records antes de procesamiento. Usado para: - Banned process instances (skip) - Tenant isolation - Authorization pre-checks
Plug point sin contaminar el processor core.
InterPartitionCommandSender — plug point¶
Stream-platform NO asume cómo se hace cross-partition communication. Alguien provee la implementación (Raft transport en producción, in-memory para tests).
Esto permite testing del engine sin red real.
Mapeo al MVP basado en Postgres¶
Reemplazo de journal¶
CREATE TABLE journal (
index BIGSERIAL PRIMARY KEY, -- Raft-level (Postgres SERIAL)
asqn BIGINT, -- Application sequence (nullable for control records)
record_type TEXT NOT NULL, -- 'CONTROL' | 'APP'
checksum BIGINT NOT NULL, -- CRC32C
serialized_record BYTEA NOT NULL,
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_journal_asqn ON journal(asqn) WHERE asqn IS NOT NULL;
CREATE INDEX idx_journal_created ON journal(created_at);
Operaciones equivalentes:
| Journal operation | SQL equivalent |
|---|---|
append(asqn, data) |
INSERT INTO journal(asqn, ...) VALUES (...) |
deleteAfter(index) |
DELETE FROM journal WHERE index > $index |
deleteUntil(index) |
DELETE FROM journal WHERE index < $index |
flush() |
Implicit (Postgres WAL) |
getLastIndex() |
SELECT MAX(index) FROM journal |
getFirstIndex() |
SELECT MIN(index) FROM journal |
openReader() |
SELECT * FROM journal WHERE index > $start ORDER BY index |
Postgres provee gratis: - Durability (WAL) - Replication (streaming) - Backup (pg_basebackup) - Compaction (vacuum)
Reemplazo de stream-platform¶
// MVP stream platform equivalent (TypeScript pseudo-code)
interface RecordProcessor {
init(ctx: RecordProcessorContext): void;
accepts(valueType: ValueType): boolean;
replay(record: TypedRecord): void;
process(record: TypedRecord, result: ProcessingResultBuilder): ProcessingResult;
}
interface ProcessingResultBuilder {
appendRecord(record: any): this;
withResponse(response: any): this;
appendPostCommitTask(task: () => Promise<void>): this;
build(): ProcessingResult;
}
class StreamProcessor {
async process(command: TypedRecord) {
const builder = new ProcessingResultBuilder();
const processor = this.routingTable.get(command.valueType);
if (!processor || !processor.accepts(command.valueType)) {
return;
}
const result = processor.process(command, builder);
// Atomic: write state + log in one transaction
await this.db.transaction(async (tx) => {
for (const record of result.records) {
await tx.insert('journal', record);
}
for (const stateChange of result.stateChanges) {
await tx.execute(stateChange);
}
});
// Side effects after commit
for (const task of result.postCommitTasks) {
try {
await task();
} catch (e) {
// log but don't fail
}
}
}
}
Patterns a mantener¶
- Doble sequencing:
id(BIGSERIAL) +asqn(nullable BIGINT) - RecordProcessor interface: init/accepts/replay/process
- ProcessingResultBuilder: declarative results
- Post-commit tasks: side effect isolation
- EventFilter / KeyValidator: plug points
Patterns NO necesarios¶
- File-based segments (Postgres maneja files internamente)
- Custom truncation logic (SQL DELETE suficiente)
- Async flushing manual (WAL más sofisticado)
- Custom segment GC (autovacuum)
Conclusión¶
Estos módulos foundation son excelente decoupling que permite: - Cada capa testeable independientemente - Reusable para construir engines diferentes (NO BPMN) - Patterns establecidos para state + log consistency
El MVP reemplaza la implementación (journal → Postgres tables, stream-platform → DB transaction layer) pero mantiene los patterns. Los patterns son el valor — la implementación es solo el detalle.
Vale la pena replicar el design por capas independientes desde el inicio, aunque la implementación inicial use Postgres. Permite swap futuro si se necesita performance custom.