Saltar a contenido

RocksDB State Store en Zeebe

Resumen: Zeebe usa RocksDB como state store por particion, accedido a traves de ZeebeDb, una capa de abstraccion type-safe con column families genericas. El esquema tiene 142+ column families organizadas en scopes GLOBAL y PARTITION_LOCAL. Las keys usan tipos compuestos (DbCompositeKey, DbTenantAwareKey) para lookups eficientes. Los snapshots se crean como RocksDB checkpoints y se transmiten en chunks para replicacion.


ZeebeDb: capa de abstraccion

ZeebeDb<ColumnFamilyType> es el wrapper principal sobre RocksDB que provee acceso type-safe:

ZeebeDb<ZbColumnFamilies> {
    createColumnFamily(ZbColumnFamilies, TransactionContext, KeyType, ValueType)
        → ColumnFamily<KeyType, ValueType>

    createContext() → TransactionContext
    createSnapshot(path) → void
}

Generics y type-safety

El parametro ColumnFamilyType es un enum (ZbColumnFamilies) que enumera todas las column families posibles. Esto hace que: - Los errores de nombres de column families se detectan en compile time, no en runtime. - Cada column family tiene tipos de key y value definidos en su creacion. - No es posible accidentalmente escribir un tipo incorrecto de key/value en una column family.


ColumnFamily

La interfaz principal para interactuar con datos. Opera como un key-value store tipado:

Operaciones

Operacion Semantica
insert(key, value) Inserta. Falla si el key ya existe. Previene overwrites accidentales.
update(key, value) Actualiza. Falla si el key no existe. Previene creacion accidental.
upsert(key, value) Inserta o actualiza. No falla nunca (excepto errores de I/O).
get(key) Retorna el valor asociado al key, o null si no existe.
deleteExisting(key) Elimina. Falla si el key no existe.
delete(key) Elimina si existe, no-op si no.
forEach(consumer) Itera sobre todos los pares key-value en la column family.
whileEqualPrefix(prefix, consumer) Itera sobre todos los entries cuyo key comparte el prefijo dado. Operacion critica para range queries eficientes.
isEmpty() Retorna true si la column family no tiene entries.
count() Retorna el numero de entries.

Semantica de insert vs update vs upsert

La distincion insert/update es una invariante de safety: si el engine intenta insertar un recurso que ya existe, hay un bug. Fallar ruidosamente (exception) es preferible a corromper datos silenciosamente. upsert se usa solo cuando la logica del engine explicitamente permite ambos casos (e.g., actualizar variables que pueden o no existir).


Key types

Zeebe define tipos de key especificos para RocksDB. Todos implementan una interfaz comun que sabe serializar/deserializar a bytes:

Tipos simples

Tipo Descripcion Tamaño
DbLong Entero de 64 bits. Usado para keys basados en IDs (process instance key, job key). 8 bytes
DbInt Entero de 32 bits. Usado para enums serializados o contadores pequeños. 4 bytes
DbString String UTF-8 con prefijo de longitud. Usado para nombres de procesos, variables. Variable
DbBytes Bytes arbitrarios con prefijo de longitud. Usado para payloads. Variable
DbNil Key vacio (0 bytes). Usado para column families que son singletons (e.g., el generador de keys). 0 bytes

Tipos compuestos

Tipo Descripcion Uso principal
DbCompositeKey<F, S> Concatena dos keys en uno. El primer key (F) actua como prefijo para range queries con whileEqualPrefix. Buscar todos los jobs de una instancia: CompositeKey<processInstanceKey, jobKey>
DbForeignKey<T> Wrapper semantico que indica que el key referencia a otra column family. No cambia la serializacion, pero documenta la relacion. Foreign keys entre column families
DbTenantAwareKey<K> Agrega el tenant ID como prefijo al key subyacente. Permite multi-tenancy: whileEqualPrefix(tenantId) retorna solo datos del tenant. Todas las column families tenant-aware

DbCompositeKey en detalle

DbCompositeKey es el mecanismo fundamental para indices secundarios en RocksDB:

// Definicion
DbCompositeKey<DbLong, DbLong> processInstanceJobKey

// Serialized layout:
|-- process instance key (8 bytes) --|-- job key (8 bytes) --|

// Range query: todos los jobs de la instancia 12345
processInstanceJobKey.first().set(12345)
columnFamily.whileEqualPrefix(processInstanceJobKey.first(), (key, value) -> {
    // itera sobre todos los jobs cuya instancia es 12345
})

Esto simula indices secundarios que RocksDB no soporta nativamente. Al poner el "indice" como prefijo del key, el range scan de RocksDB (que es sorted by key) retorna exactamente los entries deseados.


ZbColumnFamilies: 142+ column families

El enum ZbColumnFamilies define todas las column families. Cada una tiene un scope:

Scopes

  • GLOBAL: datos compartidos entre todas las particiones de un broker. Raro; usado para metadata del broker.
  • PARTITION_LOCAL: datos especificos de una particion. La gran mayoria.

Column families clave

Estado de procesos

Column Family Key Value Descripcion
PROCESS_CACHE process definition key BPMN serializado Cache de definiciones de proceso. Evita re-parsear BPMN en cada instancia.
ELEMENT_INSTANCE_KEY element instance key ElementInstanceRecord Estado de cada elemento BPMN activo (task, gateway, subprocess, etc.).
ELEMENT_INSTANCE_PARENT_KEY CompositeKey DbNil Indice parent→children para navegacion jerarquica.

Jobs

Column Family Key Value Descripcion
JOBS job key JobRecord Datos del job (type, retries, deadline, variables).
JOB_STATES job key JobState enum Estado actual: ACTIVATABLE, ACTIVATED, FAILED, ERROR_THROWN, TIMED_OUT, COMPLETED, CANCELED.
JOB_DEADLINES CompositeKey DbNil Indice por deadline para detectar timeouts. Key compuesto permite range scan de "todos los jobs con deadline < now".
JOB_ACTIVATABLE CompositeKey DbNil Indice de jobs listos para activar, agrupados por type. El job worker query usa whileEqualPrefix(type).

Mensajes

Column Family Key Value Descripcion
MESSAGES message key MessageRecord Datos del mensaje (name, correlationKey, TTL, variables).
MESSAGE_CORR_KEY CompositeKey message key Indice para message correlation: buscar por nombre+correlationKey.

Timers

Column Family Key Value Descripcion
TIMERS timer key TimerRecord Datos del timer (dueDate, repetitions, elementId).
TIMER_DUE_DATES CompositeKey DbNil Indice por dueDate para evaluacion eficiente.

Incidentes

Column Family Key Value Descripcion
INCIDENTS incident key IncidentRecord Datos del incidente (type, processInstanceKey, jobKey, errorMessage).

Variables

Column Family Key Value Descripcion
VARIABLES CompositeKey VariableValue Variables de proceso, scoped a nivel de elemento.

Key generator

Column Family Key Value Descripcion
KEY DbNil DbLong Singleton: proximo key local disponible. Incrementado atomicamente en cada asignacion.

Exporter

Column Family Key Value Descripcion
EXPORTER exporter ID ExporterPosition Posicion del ultimo record exportado por cada exporter. Para resume despues de restart.

Snapshots

Creacion

Zeebe crea snapshots periodicos del estado RocksDB usando la funcionalidad nativa de RocksDB checkpoint:

rocksDb.checkpoint(snapshotPath)

Un checkpoint es una copia point-in-time del estado completo de RocksDB. Es barata de crear porque usa hard links a los SST files existentes (copy-on-write semantics). Solo se copian los archivos WAL y MANIFEST.

Transmision via SnapshotChunk

Cuando un follower necesita recibir un snapshot (e.g., esta muy atrasado y no puede hacer catch-up via log replication), el leader transmite el snapshot en chunks:

SnapshotChunk {
    snapshotId: String       // identificador del snapshot
    chunkName: String        // nombre del archivo dentro del snapshot
    totalCount: int          // total de chunks
    checksum: long           // CRC para integridad
    data: byte[]             // contenido del chunk
    snapshotChecksum: long   // checksum del snapshot completo
}

El follower: 1. Recibe todos los chunks. 2. Verifica checksums. 3. Ensambla el snapshot en disco. 4. Reemplaza su RocksDB con el snapshot recibido. 5. Reanuda log replication desde la posicion del snapshot.


LogCompactor

Despues de crear un snapshot exitoso, el LogCompactor elimina entries del log de Raft que ya estan incluidos en el snapshot:

LogCompactor {
    compact(snapshotPosition) {
        // Eliminar todos los log entries con position <= snapshotPosition
        raftLog.deleteRange(0, snapshotPosition)
    }
}

Esto previene que el log crezca indefinidamente. Sin compactacion, el log eventualmente llenaria el disco.

Balance

  • Snapshots muy frecuentes: mas I/O de disco pero logs mas cortos.
  • Snapshots muy infrecuentes: menos I/O pero logs largos y replay mas lento en recovery.
  • Valor tipico: snapshot cada ~5 minutos o cada N records procesados.

TransactionContext

TransactionContext wraps una transaccion de RocksDB para garantizar atomicidad de las escrituras del engine:

TransactionContext {
    getCurrentTransaction() → Transaction
    runInTransaction(Runnable) → void
}

Dentro de una iteracion del ProcessingStateMachine (ver concepts/stream-processing): 1. Se abre una transaccion RocksDB. 2. Todos los EventAppliers escriben al estado dentro de esta transaccion. 3. Si todo sale bien, se commitea atomicamente. 4. Si algo falla, se hace rollback: el estado queda intacto.

Esto garantiza que el estado de RocksDB es siempre consistente: o todos los cambios de un comando se aplican, o ninguno.


Implicaciones para implementacion simplificada

  1. RocksDB es una buena eleccion para el state store: embebido, rapido, maduro, con transacciones y snapshots nativos. Alternativas: LMDB, SQLite (mas simple pero sin column families nativas).
  2. La capa de abstraccion type-safe es importante: previene errores sutiles con column families. Implementar algo similar (generics o wrappers tipados).
  3. DbCompositeKey para indices: este patron es esencial. Sin el, las queries comunes (todos los jobs de un tipo, todas las variables de un scope) requeririan full scans.
  4. 142 column families es excesivo para un MVP: empezar con ~20 column families cubriendo procesos, jobs, mensajes, timers, variables, incidentes, y el key generator.
  5. Snapshots via checkpoint: usar la funcionalidad nativa de RocksDB. No implementar snapshots custom.
  6. TransactionContext: implementar siempre. La atomicidad de escrituras es un invariante de consistencia no-negociable.
  7. Multi-tenancy via DbTenantAwareKey: es una funcionalidad avanzada. Para MVP, no implementar multi-tenancy y simplificar todas las keys.