Command Sourcing en Zeebe¶
Resumen: Zeebe usa command sourcing como patron fundamental: los commands se escriben al log primero y se procesan despues, generando events de forma deterministica. A diferencia de event sourcing puro (donde solo los eventos se persisten), aqui tanto el command como los eventos derivados quedan en el log. El sistema de Intent/RecordType dispatchea commands a
TypedRecordProcessors especificos, y losEventAppliers reconstruyen el estado durante replay.
Command Sourcing vs Event Sourcing¶
Event Sourcing clasico¶
En event sourcing puro: 1. Se recibe un command (no se persiste). 2. Se valida contra el estado actual. 3. Se genera un event (se persiste al log). 4. Se aplica el event al estado.
El log contiene solo events. Los commands son efimeros.
Command Sourcing (patron de Zeebe)¶
En command sourcing: 1. Se recibe un command y se escribe al log de Raft. 2. El command se replica a followers via Raft. 3. Una vez committed, el leader procesa el command. 4. Se generan events (tambien escritos al log). 5. Se aplican los events al estado.
El log contiene tanto commands como events.
Por que command sourcing¶
La razon principal es la replicacion determinista:
- En event sourcing, el leader procesa el command, genera el event, y luego replica el event. Si el leader falla entre procesamiento y replicacion, el evento se pierde.
- En command sourcing, el command se replica antes de procesarse. Si el leader falla, el nuevo leader tiene el command en su log y puede re-procesarlo de forma determinista.
Esto simplifica el recovery: no hay ventana de vulnerabilidad entre procesamiento y replicacion.
Trade-offs¶
| Aspecto | Event Sourcing | Command Sourcing |
|---|---|---|
| Log contiene | Solo events | Commands + events |
| Replicacion | Despues de procesamiento | Antes de procesamiento |
| Recovery | Puede perder ultimo event | Siempre consistente |
| Tamaño del log | Menor | Mayor (commands + events) |
| Replay | Reproducir events | Reproducir events (commands se ignoran en replay) |
| Latencia | Menor (no espera replicacion antes de procesar) | Mayor (espera commit de Raft antes de procesar) |
Intent / RecordType system¶
Cada record en el log tiene un RecordType y un Intent que juntos definen la semantica:
RecordType¶
| Tipo | Descripcion | Quien lo escribe |
|---|---|---|
COMMAND |
Solicitud de accion. Representa intencion, no hecho. | Cliente (via gateway) o engine (commands internos) |
EVENT |
Hecho consumado. Resultado determinista de un COMMAND procesado. | Engine (StreamProcessor) |
COMMAND_REJECTION |
Rechazo de un COMMAND. Indica que el command no pudo procesarse (precondiciones fallidas). | Engine (StreamProcessor) |
Intent¶
El Intent describe la accion semantica del record. Es especifico al ValueType. Ejemplos:
Para ValueType.PROCESS_INSTANCE:
- CREATE (COMMAND) → CREATED (EVENT)
- CANCEL (COMMAND) → CANCELED (EVENT)
- ACTIVATE_ELEMENT (COMMAND) → ELEMENT_ACTIVATED (EVENT)
- COMPLETE_ELEMENT (COMMAND) → ELEMENT_COMPLETED (EVENT)
Para ValueType.JOB:
- CREATE → CREATED
- ACTIVATE → ACTIVATED
- COMPLETE → COMPLETED
- FAIL → FAILED
- TIME_OUT → TIMED_OUT
Convencion de naming: los intents de COMMANDs son verbos imperativos (CREATE, CANCEL, COMPLETE). Los intents de EVENTs son participios pasados (CREATED, CANCELED, COMPLETED). Esto refleja la distincion entre intencion y hecho.
Engine class y RecordProcessor bridge¶
La clase Engine actua como punto de entrada que conecta el stream processing con los processors individuales:
Engine {
processors: RecordProcessorMap
eventAppliers: Map<(Intent, version), TypedEventApplier>
process(TypedRecord, StreamWriter, ResponseWriter)
replay(TypedRecord)
}
Flujo de un command¶
StreamProcessorlee un COMMAND del log.- Llama a
Engine.process(record, streamWriter, responseWriter). EngineconsultaRecordProcessorMapcon(recordType, valueType, intent)para obtener el processor correcto.- El processor ejecuta logica de negocio:
- Lee estado de RocksDB.
- Valida precondiciones.
- Si valido: escribe EVENTs al
StreamWriter. - Si invalido: escribe COMMAND_REJECTION al
StreamWriter. - Opcionalmente: envia respuesta al cliente via
ResponseWriter. - Los EVENTs escritos se pasan a los
EventAppliers correspondientes para actualizar el estado.
RecordProcessorMap¶
Tabla de dispatching construida al startup:
RecordProcessorMap {
// Key: (recordType, valueType, intent)
// Value: TypedRecordProcessor
Map<Tuple3<RecordType, ValueType, Intent>, TypedRecordProcessor> processors
get(recordType, valueType, intent) → TypedRecordProcessor
}
Registro¶
Cada TypedRecordProcessor declara que combinaciones maneja:
class CreateProcessInstanceProcessor implements TypedRecordProcessor {
// Maneja: COMMAND / PROCESS_INSTANCE / CREATE
boolean accepts(RecordType type, ValueType value, Intent intent) {
return type == COMMAND
&& value == PROCESS_INSTANCE
&& intent == CREATE;
}
}
Al startup, el engine escanea todos los processors registrados, llama accepts() con todas las combinaciones posibles, y construye el mapa. Esto permite agregar nuevos processors sin modificar el engine — basta registrar la clase.
EventAppliers¶
Los EventAppliers son responsables de actualizar el estado en RocksDB cuando se aplica un EVENT:
Versionado¶
Los EventAppliers estan versionados: (Intent, version). Esto permite evolucionar la logica de aplicacion de eventos manteniendo backward compatibility:
EventApplier v1: (CREATED, 1) → escribe a ELEMENT_INSTANCE_KEY
EventApplier v2: (CREATED, 2) → escribe a ELEMENT_INSTANCE_KEY + nueva column family
Durante replay de un log historico, se usa el EventApplier de la version correspondiente al event.
Replay¶
Durante replay (startup o failover):
ReplayStateMachinelee solo EVENTs del log (ignora COMMANDs y COMMAND_REJECTIONs).- Para cada EVENT, llama
EventApplier.applyState(). - El
EventAppliermodifica RocksDB identicamente a como lo hizo el leader original.
Esto funciona porque:
- Los EVENTs contienen toda la informacion necesaria para actualizar el estado.
- Los EventAppliers son deterministas: dado el mismo EVENT, siempre producen el mismo cambio de estado.
- No se necesita re-ejecutar la logica de negocio del processor (que podria depender de estado externo no-determinista).
Process Banning¶
Cuando un RecordProcessor lanza un error inesperado (no un rechazo de negocio, sino un NPE, ClassCastException, etc.):
onProcessingError()del processor se invoca.- El processor puede retornar
ProcessingError.PROCESS_BANNED. - El engine agrega el
processDefinitionKeya una ban list en memoria. - Todos los commands futuros para instancias de ese proceso se rechazan inmediatamente con un COMMAND_REJECTION de tipo
BANNED. - Las instancias activas del proceso baneado generan incidentes que requieren intervencion manual.
Proposito¶
El banning previene que un proceso defectuoso (con una definicion BPMN que trigger un bug en el engine) consuma recursos indefinidamente: - Sin banning, el engine reintentaria el command infinitamente. - Con banning, el engine falla rapido y los operadores pueden investigar.
Persistencia¶
La ban list es en memoria, no en RocksDB. Se reconstruye durante replay si los mismos errores ocurren. Esto es intencional: un hotfix del engine que corrige el bug permite que el proceso se des-banee automaticamente en el siguiente restart.
Diferencia con Event Sourcing puro: resumen¶
| Aspecto | Event Sourcing puro | Command Sourcing (Zeebe) |
|---|---|---|
| Que se escribe al log primero | Event (despues de procesar) | Command (antes de procesar) |
| Replicacion | Replica events | Replica commands, luego genera events |
| Replay usa | Events | Events (commands se ignoran en replay) |
| Ventana de perdida | Entre procesamiento y replicacion | Ninguna (command se replica antes de procesar) |
| Complejidad | Menor | Mayor (dos tipos de records en el log) |
| Idempotencia | Events son idempotentes por definicion | Commands pueden generar efectos diferentes segun estado |
La clave del diseño: el log de Raft contiene el command como fuente de verdad para replicacion, pero el replay usa solo los events como fuente de verdad para reconstruir estado. Esto combina las ventajas de ambos patrones.
Implicaciones para implementacion simplificada¶
- Command sourcing es el patron correcto para un sistema distribuido con Raft: la replicacion antes del procesamiento elimina la ventana de vulnerabilidad.
- El sistema de Intent/RecordType es elegante y extensible. Adoptarlo tal cual.
- EventAppliers separados de processors: mantener esta separacion. Permite que replay sea simple y eficiente.
- RecordProcessorMap como tabla de dispatching: patron plugin-friendly. Facilita agregar nuevos tipos de commands sin modificar el core.
- Process banning: implementar desde el inicio. Sin el, un proceso defectuoso puede derribar el cluster.
- Versionado de EventAppliers: no esencial para MVP (asumir version 1 de todo), pero diseñar la interfaz para soportarlo en el futuro.