Saltar a contenido

Message Correlation en Zeebe

Resumen: La message correlation en Zeebe usa el correlation key para determinar la partición destino, garantiza deduplicación por messageId (combinación de name + correlationKey + messageId), y soporta matching contra intermediate catch events y start events. La comunicación cross-partition se maneja via SubscriptionCommandSender con retry cada 30 segundos, y los mensajes tienen TTL con expiración automática vía MessageTimeToLiveCheckScheduler.


Modelo de correlación

Correlation key y partición

El correlation key es la pieza central del sistema de mensajería de Zeebe. Determina:

  1. A qué partición se dirige el mensaje: Zeebe aplica una función hash sobre el correlation key para determinar la partición owner del mensaje. Esto garantiza que todos los mensajes con el mismo correlation key llegan a la misma partición.
  2. Con qué process instance se correlaciona: dentro de la partición, el correlation key se usa para encontrar subscriptions activas que coincidan.
PublishMessage(name="payment-received", correlationKey="order-123", ...)
    → hash("order-123") % numPartitions → partición 2
    → En partición 2: buscar subscriptions con name="payment-received" 
      AND correlationKey="order-123"

Por qué el correlation key determina la partición

Esta decisión de diseño tiene implicaciones profundas:

  • Localidad: la correlación es una operación local a la partición. No se necesita coordinación entre particiones para correlacionar un mensaje.
  • Consistencia: al estar en la misma partición, la correlación y la activación del element instance son una operación atómica en el log replicado.
  • Trade-off: si un process instance está en la partición 1 pero el correlation key mapea a la partición 2, se requiere comunicación cross-partition (ver sección abajo).

Deduplicación de mensajes

messageId: la clave de dedup

Zeebe garantiza at-least-once delivery de mensajes, pero proporciona deduplicación basada en el messageId:

Identidad del mensaje = name + correlationKey + messageId
  • name: tipo de mensaje (por ejemplo, "payment-received").
  • correlationKey: valor de correlación (por ejemplo, "order-123").
  • messageId: identificador único proporcionado por el publisher (por ejemplo, UUID).

Si se publica un mensaje con la misma combinación de name + correlationKey + messageId y el TTL del mensaje original no ha expirado, Zeebe lo rechaza como duplicado.

Ventana de deduplicación

La ventana de deduplicación está vinculada al TTL del mensaje:

  • Mientras el mensaje no haya expirado, su messageId se mantiene en el state store.
  • Después de la expiración, el messageId se purga y un mensaje con el mismo ID sería aceptado como nuevo.
  • Esto significa que el messageId proporciona exactly-once semantics dentro de la ventana de TTL.

Sin messageId

Si el publisher no proporciona un messageId, no se realiza deduplicación. El mensaje se acepta siempre. Esto es útil para escenarios donde la duplicación es manejada por la lógica de negocio.


Matching: intermediate catch events y start events

Intermediate catch events

Para process instances en ejecución que están esperando un mensaje (message intermediate catch event o message boundary event):

  1. Cuando un process instance llega a un message catch event, se crea una message subscription:
    MessageSubscription {
        processInstanceKey: 12345,
        elementInstanceKey: 67890,
        messageName: "payment-received",
        correlationKey: "order-123",  // evaluada desde variables del proceso
        partitionId: 2  // calculada desde el hash del correlationKey
    }
    
  2. Cuando llega un mensaje, se busca una subscription con matching name y correlationKey.
  3. Si hay match, el mensaje se correlaciona y el catch event se completa.
  4. Si no hay match, el mensaje se almacena temporalmente (buffered) esperando que una subscription aparezca antes de que expire el TTL.

Start events

Los message start events funcionan diferente:

  1. No hay process instance previa — el mensaje crea una nueva instancia.
  2. El matching se hace contra message start event subscriptions que se crean al desplegar un proceso con message start events.
  3. Un deployment genera subscriptions en todas las particiones (ya que no se sabe a cuál llegará el mensaje).
  4. Solo el latest version del proceso tiene subscriptions activas para start events.

Orden de matching

Cuando llega un mensaje, el matching sigue este orden:

  1. Primero intermediate catch events: si hay una subscription activa de un process instance en ejecución, el mensaje se correlaciona ahí.
  2. Después start events: solo si no hubo match con catch events, se verifica si hay un message start event subscription.
  3. Buffering: si no hay match en ninguno, el mensaje se almacena esperando una subscription futura.

Cross-partition communication

El problema

El correlation key determina la partición del mensaje, pero el process instance puede estar en cualquier partición. Cuando un process instance en la partición 1 espera un mensaje cuyo correlation key mapea a la partición 2:

Partición 1: process instance 12345, esperando "payment-received" con key "order-123"
Partición 2: mensaje "payment-received" con correlationKey "order-123" llega aquí

SubscriptionCommandSender

El SubscriptionCommandSender resuelve este problema:

  1. Cuando se crea una message subscription en la partición 1, el engine determina que el correlation key "order-123" pertenece a la partición 2.
  2. Se envía un comando OPEN_MESSAGE_SUBSCRIPTION a la partición 2 vía el SubscriptionCommandSender.
  3. La partición 2 registra la subscription y puede hacer matching local.
  4. Cuando un mensaje llega y hace match, la partición 2 envía un comando CORRELATE_MESSAGE_SUBSCRIPTION de vuelta a la partición 1.
  5. La partición 1 completa el catch event del process instance.

Retry cada 30 segundos

La comunicación cross-partition no está garantizada:

  • Los comandos inter-partition pueden perderse si una partición está temporalmente no disponible.
  • El SubscriptionCommandSender implementa un retry periódico cada 30 segundos.
  • En cada retry, se re-envían los comandos pendientes de subscriptions que no han sido confirmadas.
  • Esto garantiza eventual consistency: eventualmente todas las subscriptions se registran en la partición correcta.

Implicaciones de rendimiento

  • Latencia adicional: la correlación cross-partition agrega latencia de red (al menos un round-trip entre particiones).
  • Retry window: en el peor caso, un mensaje puede tardar hasta 30 segundos en correlacionarse si el comando de subscription se perdió y necesita retry.
  • Tráfico inter-partition: en sistemas con muchos message catch events y particiones, el tráfico de subscription commands puede ser significativo.

TTL y expiración de mensajes

MessageTimeToLiveCheckScheduler

Cada mensaje publicado tiene un Time-To-Live (TTL):

  • Si no se especifica, se usa un default configurable (típicamente 5 minutos).
  • El TTL determina cuánto tiempo el mensaje permanece buffered esperando una subscription.

El MessageTimeToLiveCheckScheduler es responsable de purgar mensajes expirados:

  1. Mantiene un schedule basado en los TTL de los mensajes almacenados.
  2. Periódicamente verifica qué mensajes han expirado.
  3. Los mensajes expirados se eliminan del state store.
  4. Los messageId de mensajes expirados también se purgan, cerrando la ventana de deduplicación.

Flujo de vida de un mensaje

1. PublishMessage(name, correlationKey, messageId, timeToLive=300000)  // 5 min TTL
2. Mensaje llega a la partición determinada por hash(correlationKey)
3. Intento de matching:
   a. Match encontrado → correlacionar y marcar como consumed
   b. No match → almacenar en buffer
4. Si buffered:
   a. Subscription aparece antes de TTL → correlacionar
   b. TTL expira → MessageTimeToLiveCheckScheduler purga el mensaje
5. Dedup: durante el TTL, mensajes con mismo (name, correlationKey, messageId) son rechazados
6. Post-expiración: el messageId se purga → nuevos mensajes con mismo ID son aceptados

Estado en RocksDB

Mensaje almacenado

Campo Descripción
messageKey Identificador único del mensaje
name Tipo de mensaje
correlationKey Valor de correlación
messageId ID para deduplicación (puede ser vacío)
variables Payload del mensaje (serializado como MessagePack)
timeToLive TTL en milisegundos
deadline Timestamp absoluto de expiración (publishTime + TTL)
state PUBLISHED, EXPIRED

Message subscription

Campo Descripción
processInstanceKey Process instance que espera el mensaje
elementInstanceKey Element instance específico (catch event)
messageName Nombre del mensaje esperado
correlationKey Valor de correlación (evaluado de variables del proceso)
state OPENING, OPENED, CORRELATING, CORRELATED, CLOSING, CLOSED

Consideraciones para un MVP

  • Esencial: la correlación basada en correlation key con matching local a la partición es el diseño correcto para escalabilidad.
  • Esencial: la deduplicación por messageId previene procesamiento duplicado en sistemas distribuidos.
  • Esencial: el matching contra catch events y start events cubre los dos casos de uso principales de mensajería en BPMN.
  • Simplificable para MVP: la comunicación cross-partition se puede diferir si el MVP usa una sola partición. El diseño de subscription commands se puede agregar cuando se implemente multi-partition.
  • Simplificable: el buffering con TTL puede empezar con un TTL fijo y sin dedup, añadiendo messageId dedup después.
  • Importante: el retry de 30 segundos para cross-partition es un valor pragmático; un MVP podría usar un intervalo más corto a costa de más tráfico.