Saltar a contenido

Worker SDK Go — diseño de referencia

SDK Go minimalista para implementar workers (job handlers). Diseño orientado a DX < 5 min de hello-world, idempotencia explícita, observability built-in, y zero magic. Referencia para implementación M2.

Filosofía

  1. Convención > config: defaults sensatos, override sólo cuando necesites.
  2. Zero magic: no annotations runtime, no reflection AST, no DI container. Funciones simples.
  3. Idempotencia explícita: el SDK no la implementa por ti, pero hace fácil hacer lo correcto.
  4. Observability first: traces, metrics y structured logs out-of-the-box vía OpenTelemetry.
  5. Compilable, no interpretado: errores de tipo en build time, no en runtime.

Anti-patrón a evitar: el SDK de Zeebe Java tiene mucho ceremonial (builders, fluents, annotations Spring). Aspiramos a una API que se sienta como http.HandleFunc.

API surface (target M2)

Cliente

package wfclient

// New crea un cliente conectado al engine.
// La URL puede ser http://localhost:8080 (dev) o https://wf.example.com (prod).
func New(url string, opts ...Option) (*Client, error)

// Options
func WithAuth(provider TokenProvider) Option
func WithTLSConfig(cfg *tls.Config) Option
func WithLogger(l *slog.Logger) Option
func WithTracerProvider(tp trace.TracerProvider) Option
func WithMeterProvider(mp metric.MeterProvider) Option
func WithRetryPolicy(p RetryPolicy) Option
func WithTimeout(d time.Duration) Option

Job handler — la primitiva fundamental

package wfclient

// Job es lo que recibe el handler. Inmutable.
type Job struct {
    Key           int64             // identificador único e idempotente
    Type          string            // p.ej. "send-email"
    ProcessInstance int64
    ElementID     string            // BPMN element ID (service task)
    Variables     map[string]any    // input variables (subset visible al worker)
    Headers       map[string]string // task headers definidos en el modelo
    Retries       int32             // intentos restantes
    Deadline      time.Time         // job expira si no se completa antes
}

// JobHandler es la signatura de un worker.
// Devolver nil = completar exitosamente.
// Devolver error envuelto en BPMNError(code, message) = BPMN error boundary.
// Devolver cualquier otro error = job fail (consume retry).
// Devolver IncidentError = crea incident sin consumir retries.
type JobHandler func(ctx context.Context, job *Job) (output map[string]any, err error)

// Helpers de error
func BPMNError(code, message string) error  // dispara error boundary event
func IncidentError(message string) error    // crea incident (operator intervención)
func FailWithRetry(message string, retryAt time.Time) error  // retry custom

// Registrar un worker
type WorkerOptions struct {
    Type             string        // BPMN task type (obligatorio)
    Concurrency      int           // jobs concurrentes (default: GOMAXPROCS)
    Timeout          time.Duration // tiempo máx por job (default: 5min)
    PollInterval     time.Duration // back-off cuando no hay jobs (default: 1s)
    MaxJobsActivated int           // jobs por poll (default: 32)
    FetchVariables   []string      // sólo trae estas variables (default: todas)
    StreamEnabled    bool          // long-polling streaming (default: true)
    Tags             []string      // metadata para observability
}

func (c *Client) RegisterWorker(opts WorkerOptions, h JobHandler) error

Ejemplo mínimo: hello-world

package main

import (
    "context"
    "log/slog"

    "github.com/example/wf-sdk-go/wfclient"
)

func main() {
    client, err := wfclient.New("http://localhost:8080")
    if err != nil {
        slog.Error("connect failed", "err", err)
        return
    }
    defer client.Close()

    client.RegisterWorker(wfclient.WorkerOptions{
        Type: "send-email",
    }, func(ctx context.Context, job *wfclient.Job) (map[string]any, error) {
        to := job.Variables["recipient"].(string)
        subject := job.Variables["subject"].(string)

        if err := sendEmail(ctx, to, subject); err != nil {
            return nil, err  // job fail; consume retry
        }

        return map[string]any{
            "sentAt": time.Now().Format(time.RFC3339),
        }, nil
    })

    client.Run(context.Background())  // bloquea hasta SIGTERM
}

5 min de hello-world: cumplido si tienes Go y un engine corriendo.

Ejemplo intermedio: idempotencia + BPMN error

client.RegisterWorker(wfclient.WorkerOptions{
    Type:        "charge-payment",
    Concurrency: 8,
    Timeout:     30 * time.Second,
}, func(ctx context.Context, job *wfclient.Job) (map[string]any, error) {
    // Idempotency key: el job.Key es único y estable
    idempotencyKey := fmt.Sprintf("wf-%d", job.Key)
    amount := job.Variables["amount"].(float64)
    customer := job.Variables["customerId"].(string)

    result, err := stripe.Charge(ctx, customer, amount, idempotencyKey)
    if err != nil {
        if errors.Is(err, stripe.ErrInsufficientFunds) {
            // BPMN error -> dispara boundary event "insufficient-funds"
            return nil, wfclient.BPMNError("insufficient-funds", err.Error())
        }
        if errors.Is(err, stripe.ErrFraudSuspected) {
            // Incident: requiere review humana
            return nil, wfclient.IncidentError("manual fraud review required")
        }
        // Network / 5xx: retry normal
        return nil, fmt.Errorf("charge failed: %w", err)
    }

    return map[string]any{
        "transactionId": result.ID,
        "chargedAt":     result.CreatedAt,
    }, nil
})

Streaming vs polling

// Streaming (default): server push vía HTTP/2 long-polling.
// El SDK abre N conexiones (= Concurrency) y mantiene jobs fluyendo.
StreamEnabled: true,
PollInterval:  0,  // ignorado

// Polling tradicional (fallback para proxies/lb que no soporten long-poll)
StreamEnabled: false,
PollInterval:  1 * time.Second,

Diferencia operativa: streaming consume 1 conexión por worker en uso (típico < 100 por proceso). Polling consume 1 conexión cada PollInterval. Para latencia < 50ms job-pickup, streaming es obligatorio.

Reliability built-in

Retry con jitter

El SDK envuelve JobHandler con retry sólo para fallas transitorias del SDK (conexión perdida al engine, 5xx en complete). El retry de la lógica de negocio lo controla el engine (basado en retries del BPMN model).

WithRetryPolicy(RetryPolicy{
    InitialBackoff: 100 * time.Millisecond,
    MaxBackoff:     30 * time.Second,
    Multiplier:     2.0,
    Jitter:         0.2,  // ±20%
})

Backpressure handling

Si el engine devuelve 429 Too Many Requests o 503 Service Unavailable: - Respeta Retry-After header. - Aplica exponential backoff con jitter. - Emite metric wf_sdk_throttled_total{reason}. - Log warn (no error — esto es normal).

Graceful shutdown

func (c *Client) Run(ctx context.Context) error {
    // 1. Recibe SIGTERM
    // 2. Detiene poll de nuevos jobs
    // 3. Espera jobs activos hasta MIN(shutdown_grace, max_job_timeout)
    // 4. Si timeout: fail jobs activos con "shutdown" reason
    // 5. Cierra conexiones
}

SIGTERM → ready=false (k8s) debe propagarse antes que SIGKILL, típico 30s grace period.

Poison message handling

Si un worker panics, el SDK: 1. Captura el panic con recover(). 2. Reporta el job como failed con stack trace. 3. Continúa procesando otros jobs (no propaga el panic). 4. Emite metric wf_sdk_panics_total{type}.

Para evitar loop de retry infinito con bug del worker: el BPMN model define retries: 3. Después del 3er fail, el engine crea un incident automáticamente (operator decide qué hacer).

Observability

Tracing (OpenTelemetry)

Cada JobHandler se envuelve automáticamente:

span name: wf.worker.<task-type>
attributes:
  wf.job.key
  wf.job.type
  wf.process.instance
  wf.element.id
  wf.retries.remaining

El traceparent del job se propaga desde el engine, manteniendo continuidad con el trace del proceso.

Metrics (Prometheus / OTLP)

wf_sdk_jobs_handled_total{type, outcome="success|fail|bpmn_error|incident"}
wf_sdk_job_duration_seconds{type} (histogram)
wf_sdk_active_jobs{type} (gauge)
wf_sdk_poll_duration_seconds{type} (histogram)
wf_sdk_throttled_total{reason}
wf_sdk_panics_total{type}

Structured logs (slog)

slog.InfoContext(ctx, "job completed",
    "job_key", job.Key,
    "job_type", job.Type,
    "duration_ms", elapsed.Milliseconds(),
    "trace_id", trace.SpanContextFromContext(ctx).TraceID(),
)

El SDK no decide formato (JSON vs text vs OTLP); inyecta vía WithLogger.

Process Operations API

Para casos donde el worker necesita interactuar con el engine más allá de complete/fail:

// Iniciar proceso (raro desde worker, pero útil)
type ProcessOps interface {
    Start(ctx context.Context, processID string, vars map[string]any) (*Instance, error)
    CancelInstance(ctx context.Context, instanceKey int64, reason string) error
    PublishMessage(ctx context.Context, name string, correlationKey string, vars map[string]any) error
    SetVariables(ctx context.Context, instanceKey int64, vars map[string]any, local bool) error
    ResolveIncident(ctx context.Context, incidentKey int64) error
}

ops := client.Processes()
ops.PublishMessage(ctx, "payment.received", "order-123", map[string]any{"amount": 99.99})

Testing utilities

El SDK incluye wfclienttest package para tests de workers:

package myworker_test

import (
    "testing"
    "github.com/example/wf-sdk-go/wfclienttest"
)

func TestSendEmailWorker(t *testing.T) {
    job := wfclienttest.NewJob().
        WithVariables(map[string]any{
            "recipient": "test@example.com",
            "subject":   "hello",
        }).
        Build()

    output, err := sendEmailHandler(context.Background(), job)
    require.NoError(t, err)
    require.Contains(t, output, "sentAt")
}

// Mock client para integration tests
func TestWorkflowIntegration(t *testing.T) {
    mock := wfclienttest.NewMockClient()
    mock.OnJob("send-email").Return(map[string]any{"sent": true}, nil)
    mock.OnJob("charge-payment").Return(nil, wfclient.BPMNError("insufficient-funds", ""))

    instance, _ := mock.Processes().Start(ctx, "order-flow", vars)
    mock.WaitFor(instance.Key, wfclienttest.Completed, 30*time.Second)
}

Comparativa con Zeebe Java SDK

Concepto Zeebe Java Nuestro SDK Go
Cliente ZeebeClient.newClientBuilder().gatewayAddress(...).build() wfclient.New("http://...")
Worker registration Fluent builder con 8+ métodos Struct WorkerOptions
Annotations @JobWorker(type = "...") Spring Ninguna
Threading Spring scheduler / Akka Goroutines, Concurrency field
Error handling Throw + custom exception types error return; helpers tipados
Idempotency Documentado, no built-in Documentado, job.Key estable
Observability Micrometer; manual OpenTelemetry built-in
Testing EmbeddedZeebeExtension (pesado) wfclienttest (in-memory mock)
Build size 80+ MB con deps (Netty, gRPC) ~10 MB binario incluyendo runtime
Startup time 3-8s (JVM + Spring) < 100ms

Versionado y compatibilidad

  • Semver estricto: MAJOR.MINOR.PATCH.
  • Wire protocol: el engine soporta últimas 2 MAJOR del SDK.
  • Breaking changes: solo en MAJOR; documentadas en CHANGELOG con migration path.
  • Module path bump: github.com/example/wf-sdk-go/v2 para v2.0.0.

Lo que NO incluye el SDK (deliberadamente)

  • DI / IoC container: usa tu propio (Wire, fx, manualmente). Demasiada opinión.
  • Database wrappers: si tu worker accede a Postgres/Mongo, usa el driver. SDK no opina.
  • HTTP server: si expones métricas/health, lo levantás vos. SDK provee http.Handler para /metrics.
  • Saga orchestration: BPMN ya es eso. Usar el engine, no envolverlo.
  • Schema validation: usá github.com/go-playground/validator/v10 o similar.

Roadmap

  • M2.1: API estable, RegisterWorker, Processes Ops, OTel built-in.
  • M2.2: Streaming bidireccional (HTTP/2), graceful shutdown.
  • M2.3: Testing utilities completas.
  • M3.0: Multi-tenant aware (header X-Tenant-ID propagado).
  • M4.0: Connection pooling para multi-cluster (escenarios geo-distributed).

Referencias cruzadas