Engine Architecture
The workflow engine is a NestJS application built around an event-driven core. Database changes trigger events that are dispatched to handlers through a registry pattern. This architecture enables clean separation of lifecycle phases, testability, and recovery from restarts.
Module Structure
src/
├── engine/ # Core engine: event handling, lifecycle, CMS client
│ ├── core-engine/ # Event handler registry, lifecycle executors
│ │ ├── event-handler/ # Base handler, registry, event types
│ │ └── lifecycle/ # Workflow and step executors
│ ├── cms-client/ # CMS GraphQL client (entity acquisition, locking)
│ └── event-aggregator/ # RxJS-based event pub/sub
├── resources/ # REST API modules
│ ├── blackboard/ # Job polling and result pushing
│ ├── function-blocks/ # FB registration and resolution
│ ├── workers/ # Worker registration and health
│ ├── workflow-definition/ # Workflow CRUD
│ ├── workflow-execution/ # Execution lifecycle
│ ├── health/ # Health checks
│ ├── schema/ # Schema endpoints
│ └── version/ # Version and compatibility
├── model/ # Domain models
│ ├── entities/ # MikroORM entity classes
│ └── execution/ # Execution types, status enums, schemas
├── persistence/ # Repositories and DB-specific code
├── validator/ # JSON Schema validation, compatibility checks
├── logging/ # Logger module, HTTP interceptor
└── utils/ # JMESPath, semver, typing utilities
Event-Driven Core
The engine processes workflow executions through an event-driven pipeline:
graph LR
DB["DB Flush<br/>(MikroORM)"] --> Sub["DB Subscribers"]
Sub --> EA["Event<br/>Aggregator"]
EA --> CE["Core Engine"]
CE --> Reg["Handler<br/>Registry"]
Reg --> H["Handler"]
H --> DB
DB Subscribers
MikroORM entity subscribers detect changes to workflow executions and job results on flush. They create events:
- WorkflowExecutionUpdatedEvent -- When a workflow execution status changes
- JobResultUpdatedEvent -- When a job result is created or updated
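To make the mapping from flushed changes to events concrete, here is a minimal sketch. It does not use the MikroORM types: `ChangeSetLike`, `eventsFromFlush`, and the event fields (`executionId`, `jobResultId`, `change`) are hypothetical names chosen for illustration, and the real subscribers may inspect change sets differently.

```typescript
// Hypothetical event shapes; the real classes may carry different fields.
class WorkflowExecutionUpdatedEvent {
  readonly executionId: string;
  readonly status: string;
  constructor(executionId: string, status: string) {
    this.executionId = executionId;
    this.status = status;
  }
}

class JobResultUpdatedEvent {
  readonly jobResultId: string;
  readonly change: "create" | "update";
  constructor(jobResultId: string, change: "create" | "update") {
    this.jobResultId = jobResultId;
    this.change = change;
  }
}

type EngineEvent = WorkflowExecutionUpdatedEvent | JobResultUpdatedEvent;

// Simplified stand-in for one ORM change set (an assumption, not the MikroORM type).
interface ChangeSetLike {
  entityName: "WorkflowExecution" | "JobResult";
  type: "create" | "update";
  id: string;
  changedFields: { status?: string };
}

// Translate the change sets of a single flush into engine events.
function eventsFromFlush(changeSets: ChangeSetLike[]): EngineEvent[] {
  const events: EngineEvent[] = [];
  for (const cs of changeSets) {
    if (cs.entityName === "WorkflowExecution") {
      // Only a status change on an execution produces an event.
      const status = cs.changedFields.status;
      if (status !== undefined) {
        events.push(new WorkflowExecutionUpdatedEvent(cs.id, status));
      }
    } else {
      // Any create or update of a job result produces an event.
      events.push(new JobResultUpdatedEvent(cs.id, cs.type));
    }
  }
  return events;
}
```

In this sketch an execution update that leaves the status untouched produces no event, which matches the "status changes" wording above.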
Event Aggregator
The EventAggregatorService is an RxJS Subject-based pub/sub system. It receives events from DB subscribers and forwards them to the core engine. It also handles event buffering and deduplication.
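The text does not say how buffering and deduplication work, so the following is only one plausible reading, sketched without the RxJS dependency. `TinySubject` is a hand-rolled stand-in for an RxJS Subject, and `EventAggregatorSketch`, the `key` field, and the "buffer until the first subscriber, keep the latest event per key" policy are all assumptions.

```typescript
type Listener<T> = (event: T) => void;

// Hand-rolled stand-in for an RxJS Subject: multicasts each value to all listeners.
class TinySubject<T> {
  private listeners: Listener<T>[] = [];

  subscribe(listener: Listener<T>): () => void {
    this.listeners.push(listener);
    // Return an unsubscribe function.
    return () => {
      this.listeners = this.listeners.filter((l) => l !== listener);
    };
  }

  next(event: T): void {
    // Iterate over a copy so listeners may unsubscribe while being notified.
    for (const listener of this.listeners.slice()) listener(event);
  }

  hasListeners(): boolean {
    return this.listeners.length > 0;
  }
}

// Hypothetical: events carry a key used for deduplication.
interface KeyedEvent {
  key: string;
}

class EventAggregatorSketch<T extends KeyedEvent> {
  private readonly subject = new TinySubject<T>();
  private readonly buffer = new Map<string, T>();

  publish(event: T): void {
    if (!this.subject.hasListeners()) {
      // Buffer while nobody listens; a newer event replaces an older one with the same key.
      this.buffer.set(event.key, event);
      return;
    }
    this.subject.next(event);
  }

  subscribe(listener: Listener<T>): () => void {
    const unsubscribe = this.subject.subscribe(listener);
    // Drain whatever was buffered before this subscriber arrived.
    const pending = Array.from(this.buffer.values());
    this.buffer.clear();
    for (const event of pending) this.subject.next(event);
    return unsubscribe;
  }
}
```

Keeping only the latest event per key is a deliberate simplification: for a status-driven engine, replaying an outdated status would trigger a handler whose conditions no longer hold.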
Core Engine
CoreEngineService subscribes to the event aggregator and dispatches events to the handler registry. The handleOrDispatchEvent() method:
- Looks up the appropriate handler via the registry
- Runs the handler within a database transaction
- Flushes the entity manager (which may trigger new events, continuing the cycle)
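The three steps above can be sketched as follows. This is a synchronous toy model, not the actual method: `FakeEntityManager` stands in for the ORM entity manager (whose real calls are asynchronous), the registry is reduced to a plain `Map` keyed by a hypothetical `type` string, and flushing inside the transaction before commit is an assumption the text does not confirm.

```typescript
interface EngineEvent {
  type: string; // hypothetical discriminator; the real engine dispatches on event classes
}

interface Handler {
  handle(event: EngineEvent): void;
}

// Toy stand-in for the ORM entity manager; it only records what happened.
class FakeEntityManager {
  readonly log: string[] = [];

  transactional(work: () => void): void {
    this.log.push("begin");
    try {
      work();
      this.log.push("commit");
    } catch (err) {
      this.log.push("rollback");
      throw err;
    }
  }

  flush(): void {
    // In the real engine a flush may emit new events and continue the cycle.
    this.log.push("flush");
  }
}

// Returns true when a handler was found and ran, false when no handler matched.
function handleOrDispatchEvent(
  event: EngineEvent,
  registry: Map<string, Handler>,
  em: FakeEntityManager,
): boolean {
  // 1. Look up the handler.
  const handler = registry.get(event.type);
  if (handler === undefined) return false;
  // 2. Run it inside a transaction, 3. then flush pending changes.
  em.transactional(() => {
    handler.handle(event);
    em.flush();
  });
  return true;
}
```

If the handler throws, the sketch records a rollback and rethrows, so no flush happens and no follow-up events are produced for the failed attempt.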
Handler Registry
The EventHandlerRegistry maps event types to handler instances. It selects handlers based on:
- Event class -- WorkflowExecutionUpdatedEvent or JobResultUpdatedEvent
- Event conditions -- The handler's eventConditions() (workflow status, job status, etc.)
All handlers are registered during initEngine() in CoreEngineService.
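A registry that selects by event class first and by conditions second could look roughly like this. The return type of `eventConditions()` is not given in the text; modelling it as a map of field names to required values is an assumption, as are `RegisteredHandler`, `EventHandlerRegistrySketch`, and the event fields.

```typescript
// Hypothetical event classes with just enough fields to match on.
class WorkflowExecutionUpdatedEvent {
  readonly status: string;
  constructor(status: string) {
    this.status = status;
  }
}

class JobResultUpdatedEvent {
  readonly jobStatus: string;
  constructor(jobStatus: string) {
    this.jobStatus = jobStatus;
  }
}

interface RegisteredHandler {
  name: string;
  // The event class this handler accepts.
  eventClass: new (...args: any[]) => object;
  // Assumed shape: field name -> value the event must carry.
  eventConditions(): Record<string, string>;
}

class EventHandlerRegistrySketch {
  private readonly handlers: RegisteredHandler[] = [];

  register(handler: RegisteredHandler): void {
    this.handlers.push(handler);
  }

  // Returns the first handler whose class and conditions both match.
  resolve(event: object): RegisteredHandler | undefined {
    const fields = event as Record<string, unknown>;
    return this.handlers.find((handler) => {
      if (!(event instanceof handler.eventClass)) return false;
      const conditions = handler.eventConditions();
      return Object.keys(conditions).every((key) => fields[key] === conditions[key]);
    });
  }
}
```

Registering one handler per status keeps each handler small, at the cost of requiring that conditions do not overlap; in this sketch the first registered match wins.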
Workflow Execution Handlers
Each state is served by a dedicated handler, except RUNNING, which is advanced by the job result handlers:
| State | Handler | Responsibility |
|---|---|---|
| NEW | NewWorkflowExecutionHandler | Resolve FBs, validate definition → VALID |
| VALID | ValidWorkflowExecutionHandler | Check preconditions → READY |
| READY | ReadyWorkflowExecutionHandler | Create ACQUIRE jobs → RESOURCE_DISCOVERY |
| RESOURCE_DISCOVERY | ResourceDiscoveryWorkflowExecutionHandler | Wait for acquire results |
| (acquire job results) | JobResultAcquireResultHandler | Process acquire results → RESOURCES_DISCOVERED |
| RESOURCES_DISCOVERED | ResourceDiscoveredWorkflowExecutionHandler | Prepare scheduling → SCHEDULED |
| SCHEDULED | ScheduledWorkflowExecutionHandler | Send lock request → LOCKING |
| LOCKING | LockingWorkflowExecutionHandler | Process lock response → LOCKED |
| LOCKED | LockedWorkflowExecutionHandler | Start execution → RUNNING |
| RUNNING | (via job results) | Process step results, advance workflow |
| ERROR | ErrorWorkflowExecutionHandler | Classify failure, initiate rollback |
| ROLLBACK | RollbackWorkflowExecutionHandler | Execute rollback jobs |
| COMPLETED | CompletedWorkflowExecutionHandler | Apply DB updates, release locks → COMPLETED_ACK |
| FAILED_SAFE | FailedSafeWorkflowExecutionHandler | Release locks → FAILED_SAFE_ACK |
| FAILED_UNSAFE | FailedUnsafeWorkflowExecutionHandler | Release locks → FAILED_UNSAFE_ACK |
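The transitions named in the table can be written down as a lookup, which makes the happy path easy to trace. The sketch below encodes only the arrows the table states explicitly; the targets of RUNNING, ERROR, and ROLLBACK are not given, so they are left out. `NEXT_STATUS` and `pathFrom` are hypothetical names.

```typescript
// Only transitions that the table states explicitly.
const NEXT_STATUS: Record<string, string> = {
  NEW: "VALID",
  VALID: "READY",
  READY: "RESOURCE_DISCOVERY",
  RESOURCE_DISCOVERY: "RESOURCES_DISCOVERED", // via JobResultAcquireResultHandler
  RESOURCES_DISCOVERED: "SCHEDULED",
  SCHEDULED: "LOCKING",
  LOCKING: "LOCKED",
  LOCKED: "RUNNING",
  COMPLETED: "COMPLETED_ACK",
  FAILED_SAFE: "FAILED_SAFE_ACK",
  FAILED_UNSAFE: "FAILED_UNSAFE_ACK",
};

// Follow the table from a starting status until no further transition is listed.
function pathFrom(start: string): string[] {
  const path: string[] = [start];
  let next: string | undefined = NEXT_STATUS[start];
  while (next !== undefined) {
    path.push(next);
    next = NEXT_STATUS[next];
  }
  return path;
}
```

For example, `pathFrom("NEW")` walks the setup phase up to RUNNING, and `pathFrom("COMPLETED")` yields the two-element acknowledgement path.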
Job Result Handlers
| Scope | Handler | Responsibility |
|---|---|---|
| Acquire | JobResultAcquireResultHandler | Process acquire results → RESOURCES_DISCOVERED |
| Running | JobResultRunningResultHandler | Process execution results, advance steps |
| Rollback | JobResultRollbackResultHandler | Process rollback results |
| Success ACK | JobResultSuccessAcknowledgedHandler | Post-success cleanup |
| Failed ACK | JobResultFailedAcknowledgedHandler | Post-failure cleanup |
Startup Recovery
The InitEventService runs on application startup. It loads all non-terminal workflow executions and pending job results from the database and re-publishes their events to the aggregator. This ensures that executions interrupted by an engine restart are resumed correctly.
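The replay step can be sketched as a pure function over already loaded rows. The set of terminal statuses is not listed in the text; treating the three `*_ACK` statuses as terminal is an assumption, and `ExecutionRow`, `JobResultRow`, `ReplayedEvent`, and `replayOnStartup` are hypothetical names.

```typescript
interface ExecutionRow {
  id: string;
  status: string;
}

interface JobResultRow {
  id: string;
  pending: boolean; // hypothetical flag for "not yet processed"
}

interface ReplayedEvent {
  kind: "WorkflowExecutionUpdatedEvent" | "JobResultUpdatedEvent";
  id: string;
}

// Assumption: the acknowledged statuses are the terminal ones.
const TERMINAL_STATUSES = ["COMPLETED_ACK", "FAILED_SAFE_ACK", "FAILED_UNSAFE_ACK"];

// Re-publish an event for every execution and job result that still needs work.
// Returns the number of events published.
function replayOnStartup(
  executions: ExecutionRow[],
  jobResults: JobResultRow[],
  publish: (event: ReplayedEvent) => void,
): number {
  let published = 0;
  for (const execution of executions) {
    if (TERMINAL_STATUSES.indexOf(execution.status) !== -1) continue; // nothing left to do
    publish({ kind: "WorkflowExecutionUpdatedEvent", id: execution.id });
    published++;
  }
  for (const jobResult of jobResults) {
    if (!jobResult.pending) continue;
    publish({ kind: "JobResultUpdatedEvent", id: jobResult.id });
    published++;
  }
  return published;
}
```

Because handlers are selected by the current status of the entity, replaying an event for an interrupted execution simply re-enters the pipeline at the state it was last persisted in.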
Key NestJS Modules
| Module | Purpose | Key dependencies |
|---|---|---|
| AppModule | Root module wiring all engine capabilities. | Imports all modules below. |
| CoreEngineModule | Orchestrates lifecycle and event handling. | EventAggregatorModule, CmsClientModule |
| EventAggregatorModule | Event pub/sub and buffering. | Used by engine, subscribers, and init. |
| DbEventSubscriberModule | MikroORM subscribers that emit engine events. | EventAggregatorModule |
| InitEventModule | Startup recovery and event replay. | EventAggregatorModule |
| BlackboardModule | Job polling and result endpoints. | REST resources layer. |
| FunctionBlocksModule | Function block registration and resolution. | REST resources layer. |
| WorkflowDefinitionModule | Workflow CRUD and validation. | REST resources layer. |
| WorkflowExecutionModule | Execution lifecycle endpoints. | REST resources layer. |
| WorkersModule | Worker registration and health tracking. | REST resources layer. |
| JobCleanupModule | Cleanup of stale jobs and timeouts. | Scheduler/maintenance. |
| CmsClientModule | CMS GraphQL client for entities and locks. | Consumed by engine. |