Saltar a contenido

Journal And Stream Platform

Camunda divide su engine en 4 capas: BPMN engine → stream-platform → logstreams → journal. La capa journal provee append-only log con segments file-based y doble sequencing (Raft index + application asqn). Stream-platform provee RecordProcessor interface con separación replay/process, ProcessingResultBuilder declarativo, y post-commit tasks. Para MVP basado en Postgres: capas se reemplazan, pero los patterns son directamente aplicables.

Arquitectura por capas

flowchart TD
    subgraph Engine["Engine BPMN (zeebe/engine/)"]
        E1[24 element processors]
        E2[12 behaviors]
        E3[State management BPMN-specific]
    end
    subgraph Stream["Stream Platform (zeebe/stream-platform/)"]
        S1[StreamProcessor — actor]
        S2[RecordProcessor interface]
        S3[ProcessingResultBuilder]
        S4[Replay machinery]
    end
    subgraph Logs["Log Streams (zeebe/logstreams/)"]
        L1[LogStream abstraction]
        L2[Append + read interface]
    end
    subgraph Journal["Journal (zeebe/journal/)"]
        J1[Append-only log]
        J2[Segments file-based]
        J3[Truncation / compaction]
    end
    FS[Filesystem]

    Engine --> Stream
    Stream --> Logs
    Logs --> Journal
    Journal --> FS

Cada capa es independientemente testeable y reusable. Una organización podría construir un workflow engine diferente sobre el mismo journal + stream-platform.

Journal — append-only log persistente

Interface

public interface Journal extends AutoCloseable {
  JournalRecord append(long asqn, BufferWriter recordDataWriter);
  JournalRecord append(BufferWriter recordDataWriter);
  void deleteAfter(long indexExclusive);   // truncate / rollback
  boolean deleteUntil(long indexExclusive); // compaction
  void reset(long nextIndex);
  long getLastIndex();
  long getFirstIndex();
  void flush() throws FlushException;
  JournalReader openReader();
}

Doble sequencing

Cada JournalRecord tiene DOS números de secuencia:

Sequence Significado Características
index Raft-level monotonic Sin gaps, controlled por Raft
asqn Application Sequence Number Positive, monotonic en app records, gaps OK

Por qué dos: Raft inserta records de control (leader change, configuration) que no son records de aplicación. Si tuvieran que compartir secuencia, control records causarían gaps en la secuencia de app. La separación permite: - Raft maneja index sin restricción - App ve secuencia consistente vía asqn

Segments

Journal está dividido en archivos físicos (segments): - Cada segment tiene primer index y tamaño máximo - Al llenarse, se crea nuevo segment - Compaction borra segments enteros (no bytes individuales)

Ventajas: - I/O bulk (entire segments) - GC simple (delete files vs reorganize) - Backup simple (copy files)

Async persistence

void flush() throws FlushException;

Por default, appends son buffered. flush() garantiza durability. Raft típicamente requiere flush antes de ACK (durability guarantee a quorum).

Stream Platform — framework reusable

RecordProcessor interface

public interface RecordProcessor {
  void init(RecordProcessorContext context);
  boolean accepts(ValueType valueType);
  void replay(TypedRecord record);
  ProcessingResult process(TypedRecord record, ProcessingResultBuilder result);
}

Cuatro responsabilidades separadas:

Método Cuándo se llama Qué hace
init Startup Setup state, dependencies
accepts Per record Filter por ValueType
replay REPLAY mode Reconstruir state desde events
process PROCESSING mode Procesar command, producir result

Separación replay vs process

Crítico para replay determinism:

  • replay(event): rebuild state desde events ya emitidos. NO emite nuevos records.
  • process(command, builder): procesa command, emite events vía builder.

Ambos pueden escribir al state store, pero solo process puede emitir records nuevos.

Esto garantiza que replay reconstruye exactamente el mismo state que processing produjo originalmente.

ProcessingResultBuilder — declarative

public interface ProcessingResultBuilder {
  ProcessingResultBuilder appendRecord(...);
  ProcessingResultBuilder withResponse(...);
  ProcessingResultBuilder appendPostCommitTask(PostCommitTask task);
  ProcessingResult build();
}

El processor NO escribe directamente. Declara intent: 1. "Apend este record al log" 2. "Envía esta response al cliente" 3. "Ejecuta este task después del commit"

La plataforma: 1. Acumula declarations 2. Inicia transaction (state DB + log) 3. Escribe todo atomicamente 4. Commit 5. Ejecuta post-commit tasks

Garantiza atomicity entre state mutations y log writes.

Post-commit tasks — side effect isolation

public interface PostCommitTask {
  void execute() throws Exception;
}

Cosas que NO deben ser atomic con el log write: - Send HTTP response al cliente - Trigger notifications - Async operations

Si fallan, NO corrompen state (ya fue committed). Si crashea el broker antes de ejecutarlas, alguien debe re-ejecutar (no-op semantics requeridas).

EventFilter — plug point

public interface EventFilter {
  boolean apply(TypedRecord record);
}

Permite filtrar records antes de procesamiento. Usado para: - Banned process instances (skip) - Tenant isolation - Authorization pre-checks

Plug point sin contaminar el processor core.

InterPartitionCommandSender — plug point

public interface InterPartitionCommandSender {
  void sendCommand(int receiverPartitionId, ...);
}

Stream-platform NO asume cómo se hace cross-partition communication. Alguien provee la implementación (Raft transport en producción, in-memory para tests).

Esto permite testing del engine sin red real.

Mapeo al MVP basado en Postgres

Reemplazo de journal

CREATE TABLE journal (
    index BIGSERIAL PRIMARY KEY,        -- Raft-level (Postgres SERIAL)
    asqn BIGINT,                         -- Application sequence (nullable for control records)
    record_type TEXT NOT NULL,           -- 'CONTROL' | 'APP'
    checksum BIGINT NOT NULL,            -- CRC32C
    serialized_record BYTEA NOT NULL,
    created_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE INDEX idx_journal_asqn ON journal(asqn) WHERE asqn IS NOT NULL;
CREATE INDEX idx_journal_created ON journal(created_at);

Operaciones equivalentes:

Journal operation SQL equivalent
append(asqn, data) INSERT INTO journal(asqn, ...) VALUES (...)
deleteAfter(index) DELETE FROM journal WHERE index > $index
deleteUntil(index) DELETE FROM journal WHERE index < $index
flush() Implicit (Postgres WAL)
getLastIndex() SELECT MAX(index) FROM journal
getFirstIndex() SELECT MIN(index) FROM journal
openReader() SELECT * FROM journal WHERE index > $start ORDER BY index

Postgres provee gratis: - Durability (WAL) - Replication (streaming) - Backup (pg_basebackup) - Compaction (vacuum)

Reemplazo de stream-platform

// MVP stream platform equivalent (TypeScript pseudo-code)

interface RecordProcessor {
  init(ctx: RecordProcessorContext): void;
  accepts(valueType: ValueType): boolean;
  replay(record: TypedRecord): void;
  process(record: TypedRecord, result: ProcessingResultBuilder): ProcessingResult;
}

interface ProcessingResultBuilder {
  appendRecord(record: any): this;
  withResponse(response: any): this;
  appendPostCommitTask(task: () => Promise<void>): this;
  build(): ProcessingResult;
}

class StreamProcessor {
  async process(command: TypedRecord) {
    const builder = new ProcessingResultBuilder();
    const processor = this.routingTable.get(command.valueType);

    if (!processor || !processor.accepts(command.valueType)) {
      return;
    }

    const result = processor.process(command, builder);

    // Atomic: write state + log in one transaction
    await this.db.transaction(async (tx) => {
      for (const record of result.records) {
        await tx.insert('journal', record);
      }
      for (const stateChange of result.stateChanges) {
        await tx.execute(stateChange);
      }
    });

    // Side effects after commit
    for (const task of result.postCommitTasks) {
      try {
        await task();
      } catch (e) {
        // log but don't fail
      }
    }
  }
}

Patterns a mantener

  1. Doble sequencing: id (BIGSERIAL) + asqn (nullable BIGINT)
  2. RecordProcessor interface: init/accepts/replay/process
  3. ProcessingResultBuilder: declarative results
  4. Post-commit tasks: side effect isolation
  5. EventFilter / KeyValidator: plug points

Patterns NO necesarios

  • File-based segments (Postgres maneja files internamente)
  • Custom truncation logic (SQL DELETE suficiente)
  • Async flushing manual (WAL más sofisticado)
  • Custom segment GC (autovacuum)

Conclusión

Estos módulos foundation son excelente decoupling que permite: - Cada capa testeable independientemente - Reusable para construir engines diferentes (NO BPMN) - Patterns establecidos para state + log consistency

El MVP reemplaza la implementación (journal → Postgres tables, stream-platform → DB transaction layer) pero mantiene los patterns. Los patterns son el valor — la implementación es solo el detalle.

Vale la pena replicar el design por capas independientes desde el inicio, aunque la implementación inicial use Postgres. Permite swap futuro si se necesita performance custom.