Stream Processing en Zeebe¶
Resumen: Zeebe procesa comandos mediante un
StreamProcessorsingle-threaded por particion que implementa una maquina de estados deterministica. Los records (COMMAND, EVENT, COMMAND_REJECTION) se escriben a un log append-only; solo los COMMANDs se procesan activamente, mientras que los EVENTs se usan para replay del estado. El formato de serializacion es SBE (Simple Binary Encoding) para maximo rendimiento.
StreamProcessor y modelo de actor¶
StreamProcessor extiende Actor, lo que garantiza single-threaded execution per partition. Cada particion tiene exactamente un StreamProcessor en el leader. Este modelo elimina la necesidad de locks o sincronizacion: todo acceso al estado de la particion ocurre en un unico thread.
El Actor framework de Zeebe es un scheduler cooperativo basado en tareas. El StreamProcessor registra tareas que leen del log, procesan records y escriben resultados. Al ser cooperativo, cada tarea debe ceder el control voluntariamente (no hay preemption).
Implicaciones para implementacion: - No se necesitan estructuras de datos concurrentes dentro de una particion. - El throughput esta limitado por la velocidad de un solo core. - La escalabilidad horizontal se logra con mas particiones, no mas threads por particion. - Aproximadamente ~2 threads por particion en total (stream processing + I/O).
ProcessingStateMachine¶
El ciclo de procesamiento principal es una maquina de estados con las siguientes transiciones:
tryToReadNextRecord
→ processCommand
→ writeRecords
→ updateState
→ executeSideEffects
→ tryToReadNextRecord (loop)
Fases detalladas¶
-
tryToReadNextRecord: Lee el siguiente record del log (Raft log). Filtra por tipo: solo records conrecordType == COMMANDse procesan. LosEVENTyCOMMAND_REJECTIONse saltan durante procesamiento normal (son resultado de procesamiento anterior). -
processCommand: Invoca elRecordProcessorcorrespondiente al tipo de comando. El processor lee el estado actual desde RocksDB, valida precondiciones, y genera cero o mas records de salida (eventos o rechazos). -
writeRecords: Los records generados se escriben al log de Raft de forma atomica (batch write). Esto es critico: o todos los records de un comando se escriben, o ninguno. -
updateState: Una vez confirmada la escritura, se actualiza el estado en RocksDB. LosEventAppliers correspondientes modifican las column families. Esta fase es deterministica: dado el mismo evento, siempre produce el mismo cambio de estado. -
executeSideEffects: Efectos secundarios no-deterministas (enviar respuesta gRPC al cliente, notificar job workers, disparar timers). Esta fase puede fallar sin afectar la consistencia: si falla, el side effect se puede reintentar o el cliente detecta timeout y re-envia.
ReplayStateMachine¶
Usado durante startup o failover para reconstruir el estado en memoria y en RocksDB desde el ultimo snapshot.
Flujo:
1. Cargar el ultimo snapshot de RocksDB (checkpoint).
2. Leer todos los records del log posteriores al snapshot.
3. Para cada record: solo replay EVENT records — los COMMANDs se ignoran durante replay porque los EVENTs ya contienen el resultado determinista del procesamiento.
4. Cada EVENT se pasa al EventApplier correspondiente que actualiza el estado en RocksDB.
Diferencia clave con el procesamiento normal: - Normal: lee COMMANDs, genera EVENTs, escribe al log, actualiza estado. - Replay: lee EVENTs, aplica directamente al estado. No genera nada nuevo.
Esto garantiza que el estado reconstruido es identico al que tenia el leader original, porque los EVENTs son deterministas.
Record format¶
Cada record en el log tiene la siguiente estructura:
| Campo | Descripcion |
|---|---|
position |
Posicion absoluta en el log (monotonicamente creciente) |
sourceRecordPosition |
Posicion del record que origino este (e.g., el COMMAND que genero un EVENT) |
key |
Identificador unico del recurso. Codifica el partition ID en los 13 bits superiores y un key local de 51 bits en los inferiores. Ver concepts/partitioning |
intent |
Intencion semantica del record (e.g., CREATE, COMPLETE, ACTIVATE, CANCEL, EXPIRE) |
recordType |
Tipo: COMMAND, EVENT, COMMAND_REJECTION |
valueType |
Tipo del payload: PROCESS_INSTANCE, JOB, MESSAGE, TIMER, VARIABLE, INCIDENT, DEPLOYMENT, etc. |
value |
Payload serializado con el contenido especifico del tipo |
Relacion entre campos¶
recordType+valueType+intentforman la clave de dispatching: determinan queTypedRecordProcessormaneja el record.sourceRecordPositionpermite trazar la cadena causal: un EVENT siempre apunta al COMMAND que lo origino.keypermite lookup directo del recurso en RocksDB sin ambiguedad de particion.
SBE (Simple Binary Encoding)¶
Zeebe usa SBE como wire format para serializar records, no Protocol Buffers ni JSON. SBE es un formato binario de baja latencia desarrollado originalmente para trading de alta frecuencia (FIX/SBE standard).
Caracteristicas relevantes:
- Zero-copy reads: los campos se leen directamente del buffer sin deserializacion.
- Fixed-size fields: acceso O(1) por offset.
- Variable-length fields: al final del mensaje (strings, bytes).
- Schema-driven: los schemas .xml generan codigo Java en compile-time.
- Sin overhead de reflection: todo es typed y compilado.
Trade-offs: - Mucho mas rapido que Protobuf para encode/decode. - Menos flexible: cambiar schemas requiere cuidado con backward compatibility. - Sin soporte nativo para tipos complejos anidados (se maneja con grupos repetidos).
Para una implementacion simplificada, Protobuf o FlatBuffers son alternativas viables si no se necesita el rendimiento extremo de SBE.
RecordProcessor interface¶
La interfaz RecordProcessor define el contrato que cada procesador de tipos debe implementar:
RecordProcessor {
accepts(valueType, intent, recordType) → boolean
replay(TypedRecord) → void
process(TypedRecord, ResponseWriter, StreamWriter) → void
onProcessingError(Throwable, TypedRecord, ErrorResponseWriter) → ProcessingError
}
accepts(): Declara que combinaciones de (valueType, intent, recordType) este processor maneja. ElRecordProcessorMapusa esto para construir la tabla de dispatching.replay(): Llamado durante replay. Actualiza estado sin generar nuevos records.process(): Logica principal. Lee estado, valida, escribe EVENTs o COMMAND_REJECTIONs alStreamWriter, y envia respuesta al cliente viaResponseWriter.onProcessingError(): Manejo de errores inesperados. Decide si reintentar, rechazar, o banear la process instance.
RecordProcessorMap¶
Tabla de dispatching que mapea (recordType, valueType, intent) → TypedRecordProcessor. Se construye al inicio escaneando todos los processors registrados y sus declaraciones accepts().
Cuando llega un record, el StreamProcessor consulta esta tabla para obtener el processor correcto. Si no hay match, el record se ignora (lo cual es un invariante que no deberia ocurrir para COMMANDs validos).
Batch processing¶
Para mejorar throughput, Zeebe procesa multiples commands en un batch antes de hacer flush al log de Raft.
Parametros clave:
- maxCommandsInBatch: numero maximo de comandos procesados antes de forzar un write al log. Controla el trade-off entre latencia (batches pequeños) y throughput (batches grandes).
Retry con batches mas pequeños¶
Si un batch falla (e.g., por un error en un comando del medio), Zeebe puede reintentar con un batch mas pequeño para aislar el comando problematico: 1. Reducir el tamaño del batch a la mitad. 2. Reintentar. 3. Si sigue fallando, reducir a 1 (procesamiento individual). 4. Si el comando individual falla, aplicar la retry strategy correspondiente.
Retry strategies¶
Zeebe implementa tres estrategias de retry para errores durante procesamiento:
AbortableRetryStrategy¶
Usada cuando el error es no-recuperable (e.g., violacion de invariante, bug en el engine). Aborta el procesamiento del comando y genera un COMMAND_REJECTION con el motivo del error. El proceso puede ser baneado para evitar reintentos infinitos.
RecoverableRetryStrategy¶
Usada para errores transitorios (e.g., RocksDB busy, write temporalmente fallido). Reintenta el procesamiento despues de un backoff. Si el maximo de reintentos se supera, escala a AbortableRetryStrategy.
Interaccion con Process Banning¶
Cuando un RecordProcessor lanza un error inesperado durante process(), el callback onProcessingError() puede devolver ProcessingError.PROCESS_BANNED. Esto marca la definicion de proceso como baneada: ninguna instancia nueva se creara y las existentes generaran incidentes. Esto previene que un proceso defectuoso consuma recursos indefinidamente.
Implicaciones para implementacion simplificada¶
Para construir un engine simplificado inspirado en este modelo:
- Mantener single-thread por particion: es la garantia fundamental de consistencia. No intentar paralelizar el procesamiento dentro de una particion.
- Separar procesamiento de replay: usar EVENTs para replay, COMMANDs para procesamiento. Esto permite reconstruir estado sin re-ejecutar logica de negocio.
- SBE es opcional: Protobuf o incluso JSON son suficientes para un MVP. SBE solo se justifica en escenarios de ultra-baja latencia.
- Batch processing es una optimizacion: empezar con procesamiento record-by-record y añadir batching cuando el throughput lo requiera.
- El modelo de side effects es critico: separar la logica pura (determinista) de los efectos secundarios (no-deterministas) permite recovery limpio.
Ver tambien: concepts/command-sourcing para el patron fundamental, concepts/raft-consensus para como se replica el log, concepts/rocksdb-state-store para el almacenamiento de estado.