
Engine Architecture

The workflow engine is a NestJS application built around an event-driven core. Database changes trigger events, which a registry dispatches to the matching handler. This design keeps lifecycle phases cleanly separated, makes each handler easier to test, and lets interrupted executions recover after an engine restart.

Module Structure

```text
src/
├── engine/                   # Core engine: event handling, lifecycle, CMS client
│   ├── core-engine/          # Event handler registry, lifecycle executors
│   │   ├── event-handler/    # Base handler, registry, event types
│   │   └── lifecycle/        # Workflow and step executors
│   ├── cms-client/           # CMS GraphQL client (entity acquisition, locking)
│   └── event-aggregator/     # RxJS-based event pub/sub
├── resources/                # REST API modules
│   ├── blackboard/           # Job polling and result pushing
│   ├── function-blocks/      # FB registration and resolution
│   ├── workers/              # Worker registration and health
│   ├── workflow-definition/  # Workflow CRUD
│   ├── workflow-execution/   # Execution lifecycle
│   ├── health/               # Health checks
│   ├── schema/               # Schema endpoints
│   └── version/              # Version and compatibility
├── model/                    # Domain models
│   ├── entities/             # MikroORM entity classes
│   └── execution/            # Execution types, status enums, schemas
├── persistence/              # Repositories and DB-specific code
├── validator/                # JSON Schema validation, compatibility checks
├── logging/                  # Logger module, HTTP interceptor
└── utils/                    # JMESPath, semver, typing utilities
```

Event-Driven Core

The engine processes workflow executions through an event-driven pipeline:

```mermaid
graph LR
    DB["DB Flush<br/>(MikroORM)"] --> Sub["DB Subscribers"]
    Sub --> EA["Event<br/>Aggregator"]
    EA --> CE["Core Engine"]
    CE --> Reg["Handler<br/>Registry"]
    Reg --> H["Handler"]
    H --> DB
```

DB Subscribers

MikroORM entity subscribers detect changes to workflow executions and job results when the entity manager flushes, and emit one of two events:

  • WorkflowExecutionUpdatedEvent -- When a workflow execution status changes
  • JobResultUpdatedEvent -- When a job result is created or updated
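A minimal sketch of what a subscriber does on flush, written as a pure function so it runs without MikroORM. The `ChangeSetLike` shape, the entity names `WorkflowExecution` and `JobResult`, the `eventsFromChangeSets()` helper and the event fields are all assumptions for illustration; a real subscriber would read MikroORM's own change sets inside a flush hook such as `afterFlush`.

```typescript
// Hypothetical, simplified change-set shape (assumption, not MikroORM's ChangeSet type).
type ChangeType = "create" | "update" | "delete";

interface ChangeSetLike {
  entityName: string; // assumed entity names: "WorkflowExecution" or "JobResult"
  type: ChangeType;
  id: string;
  payload: Record<string, unknown>; // fields written in this flush
}

// Event classes named after the source; their fields are hypothetical.
class WorkflowExecutionUpdatedEvent {
  readonly executionId: string;
  readonly status: string;
  constructor(executionId: string, status: string) {
    this.executionId = executionId;
    this.status = status;
  }
}

class JobResultUpdatedEvent {
  readonly jobResultId: string;
  readonly created: boolean;
  constructor(jobResultId: string, created: boolean) {
    this.jobResultId = jobResultId;
    this.created = created;
  }
}

type EngineEvent = WorkflowExecutionUpdatedEvent | JobResultUpdatedEvent;

// Translate the change sets of one flush into engine events.
function eventsFromChangeSets(changeSets: ChangeSetLike[]): EngineEvent[] {
  const events: EngineEvent[] = [];
  for (const cs of changeSets) {
    if (cs.entityName === "WorkflowExecution" && cs.type !== "delete" && "status" in cs.payload) {
      // Only a write to the status field produces a workflow event.
      events.push(new WorkflowExecutionUpdatedEvent(cs.id, String(cs.payload["status"])));
    } else if (cs.entityName === "JobResult" && cs.type !== "delete") {
      // Both creation and update of a job result produce an event.
      events.push(new JobResultUpdatedEvent(cs.id, cs.type === "create"));
    }
  }
  return events;
}
```

Changes that do not touch a workflow's status (for example a metadata update) produce no event in this sketch, which keeps the handler pipeline from running on irrelevant flushes.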

Event Aggregator

The EventAggregatorService is an RxJS Subject-based pub/sub system. It receives events from DB subscribers and forwards them to the core engine. It also handles event buffering and deduplication.
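The buffering and deduplication behaviour can be illustrated with a dependency-free stand-in. The real service uses an RxJS `Subject`; here a plain listener array takes its place, and the `EventAggregatorSketch` class, its `drain()` method and the deduplication key (event type, entity id, status) are hypothetical choices rather than the service's documented policy.

```typescript
// Assumed event shape for this sketch.
interface EngineEvent {
  type: string; // event class name, e.g. "WorkflowExecutionUpdatedEvent"
  entityId: string; // workflow execution or job result id (assumed field)
  status: string; // status carried by the event (assumed field)
}

type Listener = (event: EngineEvent) => void;

// Hand-rolled stand-in for an RxJS Subject with a buffer in front of it.
class EventAggregatorSketch {
  private listeners: Listener[] = [];
  private buffer: EngineEvent[] = [];

  // Register a listener; the returned function removes it again.
  subscribe(listener: Listener): () => void {
    this.listeners.push(listener);
    return () => {
      this.listeners = this.listeners.filter((l) => l !== listener);
    };
  }

  // Buffer the event instead of emitting it immediately.
  publish(event: EngineEvent): void {
    this.buffer.push(event);
  }

  // Emit each buffered event once; later duplicates with the same key are dropped.
  drain(): number {
    const seen = new Set<string>();
    const pending = this.buffer;
    this.buffer = [];
    let emitted = 0;
    for (const event of pending) {
      const key = `${event.type}:${event.entityId}:${event.status}`;
      if (seen.has(key)) continue;
      seen.add(key);
      for (const listener of this.listeners) listener(event);
      emitted++;
    }
    return emitted;
  }
}
```

Deduplicating on the status as well as the entity id means two genuine transitions of the same execution in one batch are both delivered, while a repeated notification for the same state is not.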

Core Engine

CoreEngineService subscribes to the event aggregator and dispatches events to the handler registry. The handleOrDispatchEvent() method:

  1. Looks up the appropriate handler via the registry
  2. Runs the handler within a database transaction
  3. Flushes the entity manager (which may trigger new events, continuing the cycle)
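The three steps of `handleOrDispatchEvent()` could look roughly like this. `EntityManagerLike`, `RegistryLike` and `HandlerLike` are hypothetical interfaces trimmed to what the steps need (MikroORM's `EntityManager` does provide `transactional()` and `flush()`), and running the flush inside the transaction callback, rather than after commit, is an assumption.

```typescript
// Hypothetical minimal interfaces; not the engine's real types.
interface EventLike {
  type: string;
}

interface EntityManagerLike {
  transactional<T>(work: (em: EntityManagerLike) => Promise<T>): Promise<T>;
  flush(): Promise<void>;
}

interface HandlerLike {
  handle(event: EventLike, em: EntityManagerLike): Promise<void>;
}

interface RegistryLike {
  find(event: EventLike): HandlerLike | undefined;
}

// Returns true if a handler ran, false if no handler matched the event.
async function handleOrDispatchEvent(
  event: EventLike,
  registry: RegistryLike,
  em: EntityManagerLike,
): Promise<boolean> {
  // 1. Look up the handler for this event.
  const handler = registry.find(event);
  if (!handler) return false;

  // 2. Run the handler inside a transaction.
  await em.transactional(async (txEm) => {
    await handler.handle(event, txEm);
    // 3. Flush; in the engine, DB subscribers react to this flush and may publish new events.
    await txEm.flush();
  });
  return true;
}
```

When no handler matches, the sketch simply reports `false`; what the real method does in that case is not described here.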

Handler Registry

The EventHandlerRegistry maps event types to handler instances. It selects handlers based on:

  1. Event class -- WorkflowExecutionUpdatedEvent or JobResultUpdatedEvent
  2. Event conditions -- The handler's eventConditions() (workflow status, job status, etc.)

All handlers are registered during initEngine() in CoreEngineService.
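Selection by event class and then by `eventConditions()` can be sketched as follows. The `EventHandler` interface, the `statuses` condition shape, the `resolve()` method and the first-match policy are assumptions; the two handler classes are stand-ins named after handlers listed under Workflow Execution Handlers.

```typescript
// Hypothetical event classes with just enough state for condition matching.
class WorkflowExecutionUpdatedEvent {
  readonly status: string;
  constructor(status: string) {
    this.status = status;
  }
}

class JobResultUpdatedEvent {
  readonly jobStatus: string;
  constructor(jobStatus: string) {
    this.jobStatus = jobStatus;
  }
}

type EngineEvent = WorkflowExecutionUpdatedEvent | JobResultUpdatedEvent;
type EventClass = typeof WorkflowExecutionUpdatedEvent | typeof JobResultUpdatedEvent;

// Assumed condition shape: the workflow or job statuses a handler accepts.
interface EventConditions {
  statuses: string[];
}

interface EventHandler {
  readonly eventClass: EventClass;
  eventConditions(): EventConditions;
}

class NewWorkflowExecutionHandler implements EventHandler {
  readonly eventClass = WorkflowExecutionUpdatedEvent;
  eventConditions(): EventConditions {
    return { statuses: ["NEW"] };
  }
}

class ReadyWorkflowExecutionHandler implements EventHandler {
  readonly eventClass = WorkflowExecutionUpdatedEvent;
  eventConditions(): EventConditions {
    return { statuses: ["READY"] };
  }
}

class EventHandlerRegistry {
  private handlers: EventHandler[] = [];

  register(handler: EventHandler): void {
    this.handlers.push(handler);
  }

  // Filter by event class, then by eventConditions(); the first match wins (assumed policy).
  resolve(event: EngineEvent): EventHandler | undefined {
    const status =
      event instanceof WorkflowExecutionUpdatedEvent ? event.status : event.jobStatus;
    return this.handlers.find(
      (h) => event instanceof h.eventClass && h.eventConditions().statuses.includes(status),
    );
  }
}
```

Checking the class first means a job result whose status happens to share a name with a workflow status can never reach a workflow handler.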

Workflow Execution Handlers

Each workflow execution state has a dedicated handler, except RUNNING, which is driven by job results:

| State | Handler | Responsibility |
|---|---|---|
| NEW | NewWorkflowExecutionHandler | Resolve FBs, validate definition → VALID |
| VALID | ValidWorkflowExecutionHandler | Check preconditions → READY |
| READY | ReadyWorkflowExecutionHandler | Create ACQUIRE jobs → RESOURCE_DISCOVERY |
| RESOURCE_DISCOVERY | ResourceDiscoveryWorkflowExecutionHandler | Wait for acquire results |
| (acquire job results) | JobResultAcquireResultHandler | Process acquire results → RESOURCES_DISCOVERED |
| RESOURCES_DISCOVERED | ResourceDiscoveredWorkflowExecutionHandler | Prepare scheduling → SCHEDULED |
| SCHEDULED | ScheduledWorkflowExecutionHandler | Send lock request → LOCKING |
| LOCKING | LockingWorkflowExecutionHandler | Process lock response → LOCKED |
| LOCKED | LockedWorkflowExecutionHandler | Start execution → RUNNING |
| RUNNING | (via job results) | Process step results, advance workflow |
| ERROR | ErrorWorkflowExecutionHandler | Classify failure, initiate rollback |
| ROLLBACK | RollbackWorkflowExecutionHandler | Execute rollback jobs |
| COMPLETED | CompletedWorkflowExecutionHandler | Apply DB updates, release locks → COMPLETED_ACK |
| FAILED_SAFE | FailedSafeWorkflowExecutionHandler | Release locks → FAILED_SAFE_ACK |
| FAILED_UNSAFE | FailedUnsafeWorkflowExecutionHandler | Release locks → FAILED_UNSAFE_ACK |
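The explicit "→ STATE" transitions in the table can be captured as data, which makes the intended happy path easy to check in a test. This is an illustrative encoding, not the engine's own state model; `TRANSITIONS` and `happyPath()` are hypothetical names, and states whose next state the table does not name (RUNNING, ERROR, ROLLBACK) are deliberately left out rather than guessed.

```typescript
type WorkflowState =
  | "NEW" | "VALID" | "READY" | "RESOURCE_DISCOVERY" | "RESOURCES_DISCOVERED"
  | "SCHEDULED" | "LOCKING" | "LOCKED" | "RUNNING" | "ERROR" | "ROLLBACK"
  | "COMPLETED" | "COMPLETED_ACK" | "FAILED_SAFE" | "FAILED_SAFE_ACK"
  | "FAILED_UNSAFE" | "FAILED_UNSAFE_ACK";

// Only the transitions the table states explicitly.
const TRANSITIONS: Partial<Record<WorkflowState, WorkflowState>> = {
  NEW: "VALID",
  VALID: "READY",
  READY: "RESOURCE_DISCOVERY",
  RESOURCE_DISCOVERY: "RESOURCES_DISCOVERED", // reached via JobResultAcquireResultHandler
  RESOURCES_DISCOVERED: "SCHEDULED",
  SCHEDULED: "LOCKING",
  LOCKING: "LOCKED",
  LOCKED: "RUNNING",
  COMPLETED: "COMPLETED_ACK",
  FAILED_SAFE: "FAILED_SAFE_ACK",
  FAILED_UNSAFE: "FAILED_UNSAFE_ACK",
};

// Follow listed transitions from a start state until none is listed.
function happyPath(start: WorkflowState): WorkflowState[] {
  const path: WorkflowState[] = [start];
  let next = TRANSITIONS[start];
  while (next !== undefined) {
    path.push(next);
    next = TRANSITIONS[next];
  }
  return path;
}
```

Starting from NEW, the walk stops at RUNNING, because from there progress depends on job results rather than on a single workflow handler.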

Job Result Handlers

| Scope | Handler | Responsibility |
|---|---|---|
| Acquire | JobResultAcquireResultHandler | Process acquire results → RESOURCES_DISCOVERED |
| Running | JobResultRunningResultHandler | Process execution results, advance steps |
| Rollback | JobResultRollbackResultHandler | Process rollback results |
| Success ACK | JobResultSuccessAcknowledgedHandler | Post-success cleanup |
| Failed ACK | JobResultFailedAcknowledgedHandler | Post-failure cleanup |
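One way to keep the scope-to-handler mapping exhaustive in TypeScript is a `Record` keyed by a union type, so adding a scope without a handler fails to compile. The `JobResultScope` values, the `JOB_RESULT_HANDLERS` table and the `handlerNameFor()` helper are hypothetical; only the handler names come from the table.

```typescript
// Hypothetical scope values mirroring the "Scope" column; the real enum may differ.
type JobResultScope = "ACQUIRE" | "RUNNING" | "ROLLBACK" | "SUCCESS_ACK" | "FAILED_ACK";

// A Record over the full union: omitting any scope is a compile-time error.
const JOB_RESULT_HANDLERS: Record<JobResultScope, string> = {
  ACQUIRE: "JobResultAcquireResultHandler",
  RUNNING: "JobResultRunningResultHandler",
  ROLLBACK: "JobResultRollbackResultHandler",
  SUCCESS_ACK: "JobResultSuccessAcknowledgedHandler",
  FAILED_ACK: "JobResultFailedAcknowledgedHandler",
};

function handlerNameFor(scope: JobResultScope): string {
  return JOB_RESULT_HANDLERS[scope];
}
```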

Startup Recovery

The InitEventService runs on application startup. It loads all non-terminal workflow executions and pending job results from the database and re-publishes their events to the aggregator. This ensures that executions interrupted by an engine restart are resumed correctly.
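Startup recovery amounts to filtering and re-publishing. In this sketch the terminal states are assumed to be the three `*_ACK` states, a job result is assumed to be pending while a hypothetical `processed` flag is false, and `replayOnStartup()` with its loader and publish callbacks is invented for illustration; the ordering of replayed events here is arbitrary.

```typescript
// Assumption: the *_ACK states are terminal.
const TERMINAL_STATES = new Set(["COMPLETED_ACK", "FAILED_SAFE_ACK", "FAILED_UNSAFE_ACK"]);

// Hypothetical row shapes loaded from the database.
interface WorkflowExecutionRow {
  id: string;
  status: string;
}

interface JobResultRow {
  id: string;
  processed: boolean; // hypothetical "already handled" flag
}

type ReplayEvent =
  | { type: "WorkflowExecutionUpdatedEvent"; executionId: string; status: string }
  | { type: "JobResultUpdatedEvent"; jobResultId: string };

// Build one event per non-terminal execution and per pending job result.
function buildReplayEvents(executions: WorkflowExecutionRow[], jobResults: JobResultRow[]): ReplayEvent[] {
  const events: ReplayEvent[] = [];
  for (const wf of executions) {
    if (!TERMINAL_STATES.has(wf.status)) {
      events.push({ type: "WorkflowExecutionUpdatedEvent", executionId: wf.id, status: wf.status });
    }
  }
  for (const jr of jobResults) {
    if (!jr.processed) events.push({ type: "JobResultUpdatedEvent", jobResultId: jr.id });
  }
  return events;
}

// Load rows, then hand each event to the aggregator's publish function; returns the count.
function replayOnStartup(
  loadExecutions: () => WorkflowExecutionRow[],
  loadJobResults: () => JobResultRow[],
  publish: (event: ReplayEvent) => void,
): number {
  const events = buildReplayEvents(loadExecutions(), loadJobResults());
  events.forEach((event) => publish(event));
  return events.length;
}
```

Replaying is only safe if handlers tolerate receiving an event for a state they may already have partly processed; the sketch assumes that property and does not enforce it.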

Key NestJS Modules

| Module | Purpose | Key dependencies / notes |
|---|---|---|
| AppModule | Root module wiring all engine capabilities. | Imports all modules below. |
| CoreEngineModule | Orchestrates lifecycle and event handling. | EventAggregatorModule, CmsClientModule |
| EventAggregatorModule | Event pub/sub and buffering. | Used by engine, subscribers, and init. |
| DbEventSubscriberModule | MikroORM subscribers that emit engine events. | EventAggregatorModule |
| InitEventModule | Startup recovery and event replay. | EventAggregatorModule |
| BlackboardModule | Job polling and result endpoints. | REST resources layer. |
| FunctionBlocksModule | Function block registration and resolution. | REST resources layer. |
| WorkflowDefinitionModule | Workflow CRUD and validation. | REST resources layer. |
| WorkflowExecutionModule | Execution lifecycle endpoints. | REST resources layer. |
| WorkersModule | Worker registration and health tracking. | REST resources layer. |
| JobCleanupModule | Cleanup of stale jobs and timeouts. | Scheduler/maintenance. |
| CmsClientModule | CMS GraphQL client for entities and locks. | Consumed by engine. |