RocksDB State Store en Zeebe¶
Resumen: Zeebe usa RocksDB como state store por particion, accedido a traves de
ZeebeDb, una capa de abstraccion type-safe con column families genericas. El esquema tiene 142+ column families organizadas en scopes GLOBAL y PARTITION_LOCAL. Las keys usan tipos compuestos (DbCompositeKey,DbTenantAwareKey) para lookups eficientes. Los snapshots se crean como RocksDB checkpoints y se transmiten en chunks para replicacion.
ZeebeDb: capa de abstraccion¶
ZeebeDb<ColumnFamilyType> es el wrapper principal sobre RocksDB que provee acceso type-safe:
ZeebeDb<ZbColumnFamilies> {
createColumnFamily(ZbColumnFamilies, TransactionContext, KeyType, ValueType)
→ ColumnFamily<KeyType, ValueType>
createContext() → TransactionContext
createSnapshot(path) → void
}
Generics y type-safety¶
El parametro ColumnFamilyType es un enum (ZbColumnFamilies) que enumera todas las column families posibles. Esto hace que:
- Los errores de nombres de column families se detectan en compile time, no en runtime.
- Cada column family tiene tipos de key y value definidos en su creacion.
- No es posible accidentalmente escribir un tipo incorrecto de key/value en una column family.
ColumnFamily¶
La interfaz principal para interactuar con datos. Opera como un key-value store tipado:
Operaciones¶
| Operacion | Semantica |
|---|---|
insert(key, value) |
Inserta. Falla si el key ya existe. Previene overwrites accidentales. |
update(key, value) |
Actualiza. Falla si el key no existe. Previene creacion accidental. |
upsert(key, value) |
Inserta o actualiza. No falla nunca (excepto errores de I/O). |
get(key) |
Retorna el valor asociado al key, o null si no existe. |
deleteExisting(key) |
Elimina. Falla si el key no existe. |
delete(key) |
Elimina si existe, no-op si no. |
forEach(consumer) |
Itera sobre todos los pares key-value en la column family. |
whileEqualPrefix(prefix, consumer) |
Itera sobre todos los entries cuyo key comparte el prefijo dado. Operacion critica para range queries eficientes. |
isEmpty() |
Retorna true si la column family no tiene entries. |
count() |
Retorna el numero de entries. |
Semantica de insert vs update vs upsert¶
La distincion insert/update es una invariante de safety: si el engine intenta insertar un recurso que ya existe, hay un bug. Fallar ruidosamente (exception) es preferible a corromper datos silenciosamente. upsert se usa solo cuando la logica del engine explicitamente permite ambos casos (e.g., actualizar variables que pueden o no existir).
Key types¶
Zeebe define tipos de key especificos para RocksDB. Todos implementan una interfaz comun que sabe serializar/deserializar a bytes:
Tipos simples¶
| Tipo | Descripcion | Tamaño |
|---|---|---|
DbLong |
Entero de 64 bits. Usado para keys basados en IDs (process instance key, job key). | 8 bytes |
DbInt |
Entero de 32 bits. Usado para enums serializados o contadores pequeños. | 4 bytes |
DbString |
String UTF-8 con prefijo de longitud. Usado para nombres de procesos, variables. | Variable |
DbBytes |
Bytes arbitrarios con prefijo de longitud. Usado para payloads. | Variable |
DbNil |
Key vacio (0 bytes). Usado para column families que son singletons (e.g., el generador de keys). | 0 bytes |
Tipos compuestos¶
| Tipo | Descripcion | Uso principal |
|---|---|---|
DbCompositeKey<F, S> |
Concatena dos keys en uno. El primer key (F) actua como prefijo para range queries con whileEqualPrefix. |
Buscar todos los jobs de una instancia: CompositeKey<processInstanceKey, jobKey> |
DbForeignKey<T> |
Wrapper semantico que indica que el key referencia a otra column family. No cambia la serializacion, pero documenta la relacion. | Foreign keys entre column families |
DbTenantAwareKey<K> |
Agrega el tenant ID como prefijo al key subyacente. Permite multi-tenancy: whileEqualPrefix(tenantId) retorna solo datos del tenant. |
Todas las column families tenant-aware |
DbCompositeKey en detalle¶
DbCompositeKey es el mecanismo fundamental para indices secundarios en RocksDB:
// Definicion
DbCompositeKey<DbLong, DbLong> processInstanceJobKey
// Serialized layout:
|-- process instance key (8 bytes) --|-- job key (8 bytes) --|
// Range query: todos los jobs de la instancia 12345
processInstanceJobKey.first().set(12345)
columnFamily.whileEqualPrefix(processInstanceJobKey.first(), (key, value) -> {
// itera sobre todos los jobs cuya instancia es 12345
})
Esto simula indices secundarios que RocksDB no soporta nativamente. Al poner el "indice" como prefijo del key, el range scan de RocksDB (que es sorted by key) retorna exactamente los entries deseados.
ZbColumnFamilies: 142+ column families¶
El enum ZbColumnFamilies define todas las column families. Cada una tiene un scope:
Scopes¶
GLOBAL: datos compartidos entre todas las particiones de un broker. Raro; usado para metadata del broker.PARTITION_LOCAL: datos especificos de una particion. La gran mayoria.
Column families clave¶
Estado de procesos¶
| Column Family | Key | Value | Descripcion |
|---|---|---|---|
PROCESS_CACHE |
process definition key | BPMN serializado | Cache de definiciones de proceso. Evita re-parsear BPMN en cada instancia. |
ELEMENT_INSTANCE_KEY |
element instance key | ElementInstanceRecord | Estado de cada elemento BPMN activo (task, gateway, subprocess, etc.). |
ELEMENT_INSTANCE_PARENT_KEY |
CompositeKey |
DbNil | Indice parent→children para navegacion jerarquica. |
Jobs¶
| Column Family | Key | Value | Descripcion |
|---|---|---|---|
JOBS |
job key | JobRecord | Datos del job (type, retries, deadline, variables). |
JOB_STATES |
job key | JobState enum | Estado actual: ACTIVATABLE, ACTIVATED, FAILED, ERROR_THROWN, TIMED_OUT, COMPLETED, CANCELED. |
JOB_DEADLINES |
CompositeKey |
DbNil | Indice por deadline para detectar timeouts. Key compuesto permite range scan de "todos los jobs con deadline < now". |
JOB_ACTIVATABLE |
CompositeKey |
DbNil | Indice de jobs listos para activar, agrupados por type. El job worker query usa whileEqualPrefix(type). |
Mensajes¶
| Column Family | Key | Value | Descripcion |
|---|---|---|---|
MESSAGES |
message key | MessageRecord | Datos del mensaje (name, correlationKey, TTL, variables). |
MESSAGE_CORR_KEY |
CompositeKey |
message key | Indice para message correlation: buscar por nombre+correlationKey. |
Timers¶
| Column Family | Key | Value | Descripcion |
|---|---|---|---|
TIMERS |
timer key | TimerRecord | Datos del timer (dueDate, repetitions, elementId). |
TIMER_DUE_DATES |
CompositeKey |
DbNil | Indice por dueDate para evaluacion eficiente. |
Incidentes¶
| Column Family | Key | Value | Descripcion |
|---|---|---|---|
INCIDENTS |
incident key | IncidentRecord | Datos del incidente (type, processInstanceKey, jobKey, errorMessage). |
Variables¶
| Column Family | Key | Value | Descripcion |
|---|---|---|---|
VARIABLES |
CompositeKey |
VariableValue | Variables de proceso, scoped a nivel de elemento. |
Key generator¶
| Column Family | Key | Value | Descripcion |
|---|---|---|---|
KEY |
DbNil | DbLong | Singleton: proximo key local disponible. Incrementado atomicamente en cada asignacion. |
Exporter¶
| Column Family | Key | Value | Descripcion |
|---|---|---|---|
EXPORTER |
exporter ID | ExporterPosition | Posicion del ultimo record exportado por cada exporter. Para resume despues de restart. |
Snapshots¶
Creacion¶
Zeebe crea snapshots periodicos del estado RocksDB usando la funcionalidad nativa de RocksDB checkpoint:
Un checkpoint es una copia point-in-time del estado completo de RocksDB. Es barata de crear porque usa hard links a los SST files existentes (copy-on-write semantics). Solo se copian los archivos WAL y MANIFEST.
Transmision via SnapshotChunk¶
Cuando un follower necesita recibir un snapshot (e.g., esta muy atrasado y no puede hacer catch-up via log replication), el leader transmite el snapshot en chunks:
SnapshotChunk {
snapshotId: String // identificador del snapshot
chunkName: String // nombre del archivo dentro del snapshot
totalCount: int // total de chunks
checksum: long // CRC para integridad
data: byte[] // contenido del chunk
snapshotChecksum: long // checksum del snapshot completo
}
El follower: 1. Recibe todos los chunks. 2. Verifica checksums. 3. Ensambla el snapshot en disco. 4. Reemplaza su RocksDB con el snapshot recibido. 5. Reanuda log replication desde la posicion del snapshot.
LogCompactor¶
Despues de crear un snapshot exitoso, el LogCompactor elimina entries del log de Raft que ya estan incluidos en el snapshot:
LogCompactor {
compact(snapshotPosition) {
// Eliminar todos los log entries con position <= snapshotPosition
raftLog.deleteRange(0, snapshotPosition)
}
}
Esto previene que el log crezca indefinidamente. Sin compactacion, el log eventualmente llenaria el disco.
Balance¶
- Snapshots muy frecuentes: mas I/O de disco pero logs mas cortos.
- Snapshots muy infrecuentes: menos I/O pero logs largos y replay mas lento en recovery.
- Valor tipico: snapshot cada ~5 minutos o cada N records procesados.
TransactionContext¶
TransactionContext wraps una transaccion de RocksDB para garantizar atomicidad de las escrituras del engine:
Dentro de una iteracion del ProcessingStateMachine (ver concepts/stream-processing):
1. Se abre una transaccion RocksDB.
2. Todos los EventAppliers escriben al estado dentro de esta transaccion.
3. Si todo sale bien, se commitea atomicamente.
4. Si algo falla, se hace rollback: el estado queda intacto.
Esto garantiza que el estado de RocksDB es siempre consistente: o todos los cambios de un comando se aplican, o ninguno.
Implicaciones para implementacion simplificada¶
- RocksDB es una buena eleccion para el state store: embebido, rapido, maduro, con transacciones y snapshots nativos. Alternativas: LMDB, SQLite (mas simple pero sin column families nativas).
- La capa de abstraccion type-safe es importante: previene errores sutiles con column families. Implementar algo similar (generics o wrappers tipados).
- DbCompositeKey para indices: este patron es esencial. Sin el, las queries comunes (todos los jobs de un tipo, todas las variables de un scope) requeririan full scans.
- 142 column families es excesivo para un MVP: empezar con ~20 column families cubriendo procesos, jobs, mensajes, timers, variables, incidentes, y el key generator.
- Snapshots via checkpoint: usar la funcionalidad nativa de RocksDB. No implementar snapshots custom.
- TransactionContext: implementar siempre. La atomicidad de escrituras es un invariante de consistencia no-negociable.
- Multi-tenancy via DbTenantAwareKey: es una funcionalidad avanzada. Para MVP, no implementar multi-tenancy y simplificar todas las keys.