Saltar a contenido

Command Sourcing en Zeebe

Resumen: Zeebe usa command sourcing como patron fundamental: los commands se escriben al log primero y se procesan despues, generando events de forma deterministica. A diferencia de event sourcing puro (donde solo los eventos se persisten), aqui tanto el command como los eventos derivados quedan en el log. El sistema de Intent/RecordType dispatchea commands a TypedRecordProcessors especificos, y los EventAppliers reconstruyen el estado durante replay.


Command Sourcing vs Event Sourcing

Event Sourcing clasico

En event sourcing puro: 1. Se recibe un command (no se persiste). 2. Se valida contra el estado actual. 3. Se genera un event (se persiste al log). 4. Se aplica el event al estado.

El log contiene solo events. Los commands son efimeros.

Command Sourcing (patron de Zeebe)

En command sourcing: 1. Se recibe un command y se escribe al log de Raft. 2. El command se replica a followers via Raft. 3. Una vez committed, el leader procesa el command. 4. Se generan events (tambien escritos al log). 5. Se aplican los events al estado.

El log contiene tanto commands como events.

Por que command sourcing

La razon principal es la replicacion determinista:

  • En event sourcing, el leader procesa el command, genera el event, y luego replica el event. Si el leader falla entre procesamiento y replicacion, el evento se pierde.
  • En command sourcing, el command se replica antes de procesarse. Si el leader falla, el nuevo leader tiene el command en su log y puede re-procesarlo de forma determinista.

Esto simplifica el recovery: no hay ventana de vulnerabilidad entre procesamiento y replicacion.

Trade-offs

Aspecto Event Sourcing Command Sourcing
Log contiene Solo events Commands + events
Replicacion Despues de procesamiento Antes de procesamiento
Recovery Puede perder ultimo event Siempre consistente
Tamaño del log Menor Mayor (commands + events)
Replay Reproducir events Reproducir events (commands se ignoran en replay)
Latencia Menor (no espera replicacion antes de procesar) Mayor (espera commit de Raft antes de procesar)

Intent / RecordType system

Cada record en el log tiene un RecordType y un Intent que juntos definen la semantica:

RecordType

Tipo Descripcion Quien lo escribe
COMMAND Solicitud de accion. Representa intencion, no hecho. Cliente (via gateway) o engine (commands internos)
EVENT Hecho consumado. Resultado determinista de un COMMAND procesado. Engine (StreamProcessor)
COMMAND_REJECTION Rechazo de un COMMAND. Indica que el command no pudo procesarse (precondiciones fallidas). Engine (StreamProcessor)

Intent

El Intent describe la accion semantica del record. Es especifico al ValueType. Ejemplos:

Para ValueType.PROCESS_INSTANCE: - CREATE (COMMAND) → CREATED (EVENT) - CANCEL (COMMAND) → CANCELED (EVENT) - ACTIVATE_ELEMENT (COMMAND) → ELEMENT_ACTIVATED (EVENT) - COMPLETE_ELEMENT (COMMAND) → ELEMENT_COMPLETED (EVENT)

Para ValueType.JOB: - CREATECREATED - ACTIVATEACTIVATED - COMPLETECOMPLETED - FAILFAILED - TIME_OUTTIMED_OUT

Convencion de naming: los intents de COMMANDs son verbos imperativos (CREATE, CANCEL, COMPLETE). Los intents de EVENTs son participios pasados (CREATED, CANCELED, COMPLETED). Esto refleja la distincion entre intencion y hecho.


Engine class y RecordProcessor bridge

La clase Engine actua como punto de entrada que conecta el stream processing con los processors individuales:

Engine {
    processors: RecordProcessorMap
    eventAppliers: Map<(Intent, version), TypedEventApplier>

    process(TypedRecord, StreamWriter, ResponseWriter)
    replay(TypedRecord)
}

Flujo de un command

  1. StreamProcessor lee un COMMAND del log.
  2. Llama a Engine.process(record, streamWriter, responseWriter).
  3. Engine consulta RecordProcessorMap con (recordType, valueType, intent) para obtener el processor correcto.
  4. El processor ejecuta logica de negocio:
  5. Lee estado de RocksDB.
  6. Valida precondiciones.
  7. Si valido: escribe EVENTs al StreamWriter.
  8. Si invalido: escribe COMMAND_REJECTION al StreamWriter.
  9. Opcionalmente: envia respuesta al cliente via ResponseWriter.
  10. Los EVENTs escritos se pasan a los EventAppliers correspondientes para actualizar el estado.

RecordProcessorMap

Tabla de dispatching construida al startup:

RecordProcessorMap {
    // Key: (recordType, valueType, intent)
    // Value: TypedRecordProcessor

    Map<Tuple3<RecordType, ValueType, Intent>, TypedRecordProcessor> processors

    get(recordType, valueType, intent) → TypedRecordProcessor
}

Registro

Cada TypedRecordProcessor declara que combinaciones maneja:

class CreateProcessInstanceProcessor implements TypedRecordProcessor {
    // Maneja: COMMAND / PROCESS_INSTANCE / CREATE
    boolean accepts(RecordType type, ValueType value, Intent intent) {
        return type == COMMAND 
            && value == PROCESS_INSTANCE 
            && intent == CREATE;
    }
}

Al startup, el engine escanea todos los processors registrados, llama accepts() con todas las combinaciones posibles, y construye el mapa. Esto permite agregar nuevos processors sin modificar el engine — basta registrar la clase.


EventAppliers

Los EventAppliers son responsables de actualizar el estado en RocksDB cuando se aplica un EVENT:

EventApplier {
    // Key: (Intent, version)
    applyState(TypedRecord<EventValue>) → void
}

Versionado

Los EventAppliers estan versionados: (Intent, version). Esto permite evolucionar la logica de aplicacion de eventos manteniendo backward compatibility:

EventApplier v1: (CREATED, 1) → escribe a ELEMENT_INSTANCE_KEY
EventApplier v2: (CREATED, 2) → escribe a ELEMENT_INSTANCE_KEY + nueva column family

Durante replay de un log historico, se usa el EventApplier de la version correspondiente al event.

Replay

Durante replay (startup o failover):

  1. ReplayStateMachine lee solo EVENTs del log (ignora COMMANDs y COMMAND_REJECTIONs).
  2. Para cada EVENT, llama EventApplier.applyState().
  3. El EventApplier modifica RocksDB identicamente a como lo hizo el leader original.

Esto funciona porque: - Los EVENTs contienen toda la informacion necesaria para actualizar el estado. - Los EventAppliers son deterministas: dado el mismo EVENT, siempre producen el mismo cambio de estado. - No se necesita re-ejecutar la logica de negocio del processor (que podria depender de estado externo no-determinista).


Process Banning

Cuando un RecordProcessor lanza un error inesperado (no un rechazo de negocio, sino un NPE, ClassCastException, etc.):

  1. onProcessingError() del processor se invoca.
  2. El processor puede retornar ProcessingError.PROCESS_BANNED.
  3. El engine agrega el processDefinitionKey a una ban list en memoria.
  4. Todos los commands futuros para instancias de ese proceso se rechazan inmediatamente con un COMMAND_REJECTION de tipo BANNED.
  5. Las instancias activas del proceso baneado generan incidentes que requieren intervencion manual.

Proposito

El banning previene que un proceso defectuoso (con una definicion BPMN que trigger un bug en el engine) consuma recursos indefinidamente: - Sin banning, el engine reintentaria el command infinitamente. - Con banning, el engine falla rapido y los operadores pueden investigar.

Persistencia

La ban list es en memoria, no en RocksDB. Se reconstruye durante replay si los mismos errores ocurren. Esto es intencional: un hotfix del engine que corrige el bug permite que el proceso se des-banee automaticamente en el siguiente restart.


Diferencia con Event Sourcing puro: resumen

Aspecto Event Sourcing puro Command Sourcing (Zeebe)
Que se escribe al log primero Event (despues de procesar) Command (antes de procesar)
Replicacion Replica events Replica commands, luego genera events
Replay usa Events Events (commands se ignoran en replay)
Ventana de perdida Entre procesamiento y replicacion Ninguna (command se replica antes de procesar)
Complejidad Menor Mayor (dos tipos de records en el log)
Idempotencia Events son idempotentes por definicion Commands pueden generar efectos diferentes segun estado

La clave del diseño: el log de Raft contiene el command como fuente de verdad para replicacion, pero el replay usa solo los events como fuente de verdad para reconstruir estado. Esto combina las ventajas de ambos patrones.


Implicaciones para implementacion simplificada

  1. Command sourcing es el patron correcto para un sistema distribuido con Raft: la replicacion antes del procesamiento elimina la ventana de vulnerabilidad.
  2. El sistema de Intent/RecordType es elegante y extensible. Adoptarlo tal cual.
  3. EventAppliers separados de processors: mantener esta separacion. Permite que replay sea simple y eficiente.
  4. RecordProcessorMap como tabla de dispatching: patron plugin-friendly. Facilita agregar nuevos tipos de commands sin modificar el core.
  5. Process banning: implementar desde el inicio. Sin el, un proceso defectuoso puede derribar el cluster.
  6. Versionado de EventAppliers: no esencial para MVP (asumir version 1 de todo), pero diseñar la interfaz para soportarlo en el futuro.