# Worker Lifecycle
The neops worker is a long-running process that discovers function blocks, registers them with the workflow engine, and continuously polls for jobs.
## Startup Sequence
```mermaid
sequenceDiagram
    participant W as Worker
    participant FS as File System
    participant WE as Workflow Engine
    W->>W: Load environment (.env)
    W->>W: Check client/backend version compatibility
    W->>FS: Scan DIR_FUNCTION_BLOCKS directories
    FS-->>W: Discovered function block modules
    W->>W: Import modules (triggers @register_function_block)
    W->>WE: Register worker (name, IP)
    WE-->>W: worker_id
    W->>WE: Register function blocks (schemas, metadata)
    W->>W: Start heartbeat task
    W->>W: Start job polling task
```
### 1. Discovery
The worker scans the directories listed in `DIR_FUNCTION_BLOCKS` (comma-separated
paths). For each directory, it recursively imports all Python modules. Importing
a module triggers the `@register_function_block` decorator, which adds the class
and its registration metadata to the global registry.
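The discovery step can be sketched with the standard library alone. This is a minimal illustration, not the actual neops implementation; the function name and return value are assumptions.

```python
import importlib
import pkgutil
import sys


def discover_function_blocks(dir_function_blocks: str) -> list[str]:
    """Recursively import every module under the comma-separated paths.

    Importing a module executes its decorators, so each
    @register_function_block call populates the global registry
    as a side effect of the import.
    """
    imported = []
    for directory in (p.strip() for p in dir_function_blocks.split(",")):
        if directory not in sys.path:
            sys.path.insert(0, directory)  # make modules importable by name
        for _finder, name, _ispkg in pkgutil.walk_packages([directory]):
            importlib.import_module(name)  # runs decorators at import time
            imported.append(name)
    return imported
```

Because registration is an import side effect, a module that is never imported is invisible to the worker — which is why the scan must cover every configured directory.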
### 2. Registration
The worker registers itself with the workflow engine via the blackboard API,
receiving a `worker_id`. It then sends all discovered function block
registrations, including parameter/result JSON schemas, version information,
and purity/idempotency flags.
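A registration payload might look roughly like the following. The field names here are illustrative, not the actual blackboard API schema:

```python
# Hypothetical function block registration; every key name is an
# assumption based on the metadata the docs say is sent (schemas,
# version, purity/idempotency flags).
registration = {
    "name": "configure_interface",
    "version": "1.2.0",
    "parameter_schema": {
        "type": "object",
        "properties": {"interface": {"type": "string"}},
        "required": ["interface"],
    },
    "result_schema": {"type": "object"},
    "pure": False,       # touches device state, so not pure
    "idempotent": True,  # safe for the engine to retry
}
```

The purity and idempotency flags matter to the engine: they determine whether a job can be retried or re-queued without operator intervention.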
### 3. Concurrent Tasks
Two async tasks run concurrently on the main event loop:
| Task | Interval | Purpose |
|---|---|---|
| Heartbeat | `HEARTBEAT_INTERVAL` (default 20s) | Keeps the worker marked as online in the workflow engine |
| Job polling | `POLL_INTERVAL` (default 10s) | Fetches pending jobs from the blackboard |
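The two-task structure can be sketched as follows. This is a simplified model, not neops' actual code; the `periodic` helper and callback names are assumptions.

```python
import asyncio

HEARTBEAT_INTERVAL = 20  # seconds, default from the table above
POLL_INTERVAL = 10       # seconds, default from the table above


async def periodic(interval: float, action, stop: asyncio.Event) -> None:
    """Run `action`, then sleep `interval` seconds, until `stop` is set."""
    while not stop.is_set():
        await action()
        try:
            # Wake early if shutdown is requested mid-interval.
            await asyncio.wait_for(stop.wait(), timeout=interval)
        except asyncio.TimeoutError:
            pass  # interval elapsed; run the action again


async def run_worker(send_heartbeat, poll_jobs, stop: asyncio.Event) -> None:
    # Both tasks share the main event loop and run concurrently.
    await asyncio.gather(
        periodic(HEARTBEAT_INTERVAL, send_heartbeat, stop),
        periodic(POLL_INTERVAL, poll_jobs, stop),
    )
```

Keeping both loops on the main event loop means neither blocks the other, as long as the actions themselves are non-blocking; blocking work belongs in the job executor thread described below.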
## Job Execution
When the polling task receives jobs:
- Match `job.function_block_id` against the local registry.
- Submit the job to a `ThreadPoolExecutor` with `max_workers=1`.
- Inside the worker thread, call `asyncio.run()` to execute the function block (this gives each job its own event loop).
- Start blocking detection monitoring on the main loop.
- Collect the result and report it back to the blackboard.
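The thread-plus-fresh-event-loop pattern from the steps above can be sketched like this; the function names are illustrative, not neops' actual API:

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

# max_workers=1 serializes job execution: one function block at a time.
executor = ThreadPoolExecutor(max_workers=1)


def run_job(function_block):
    """Runs inside the worker thread.

    asyncio.run() creates (and tears down) a dedicated event loop for
    this job, isolated from the worker's main loop.
    """
    return asyncio.run(function_block())


async def handle_job(function_block):
    loop = asyncio.get_running_loop()
    # Offload to the pool so the main loop stays free for the
    # heartbeat task and blocking-detection monitoring.
    return await loop.run_in_executor(executor, run_job, function_block)
```

The key property: the main event loop never awaits the function block directly, so a slow or blocking job cannot starve the heartbeat.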
### Sequential Processing
Jobs are processed one at a time (`max_workers=1`). This avoids:
- Connection exhaustion on shared devices.
- Race conditions in context data.
- Unpredictable resource usage.
If the workflow engine pushes multiple jobs, they are queued and executed in order.
## Job Types
| Type | Method called | Purpose |
|---|---|---|
| ACQUIRE | `acquire()` | Lock resources and expand context |
| EXECUTE | `execute_function_block()` → `run()` | Main function block logic (wrapped with error boundaries) |
| ROLLBACK | `rollback()` | Undo changes (default returns failure; override to implement) |
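The dispatch implied by the table can be sketched as a simple switch on the job type. The method names come from the table above, but the `job` and `block` objects here are illustrative stand-ins, not neops' actual classes:

```python
def dispatch(job, block):
    """Route a job to the matching function block method by type."""
    if job.type == "ACQUIRE":
        return block.acquire()
    if job.type == "EXECUTE":
        # In the real worker this path wraps run() with error boundaries.
        return block.execute_function_block()
    if job.type == "ROLLBACK":
        # Default rollback() reports failure unless overridden.
        return block.rollback()
    raise ValueError(f"unknown job type: {job.type}")
```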
## Graceful Shutdown
The worker handles `SIGINT` and `SIGTERM` signals:
- Set the shutdown event (stops polling for new jobs).
- Wait for the current job to finish (the worker blocks until complete).
- Cancel the heartbeat task.
- Deregister the worker from the workflow engine.
- Exit with code 0.
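The signal-handling half of this sequence can be sketched as below. Note that the handler only sets an event; the cleanup runs in the main coroutine. This is a simplified model (Unix-only, via `loop.add_signal_handler`), not neops' actual shutdown code:

```python
import asyncio
import signal


async def serve(shutdown: asyncio.Event, current_job) -> None:
    loop = asyncio.get_running_loop()
    # Signal handlers must stay tiny: just flip the shutdown event.
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, shutdown.set)

    await shutdown.wait()   # step 1: stop polling for new jobs
    if current_job is not None:
        await current_job   # step 2: wait (indefinitely) for the running job
    # Steps 3-5 (cancel heartbeat, deregister, exit 0) omitted in this sketch.
```

Setting an event rather than cancelling tasks directly is what makes the shutdown "graceful": the in-flight job is never interrupted, which is also why a hung job blocks shutdown entirely, as the warning below notes.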
> **Warning:** `SHUTDOWN_TIMEOUT` is logged but not currently enforced as a
> hard limit. The worker waits indefinitely for the running job to complete.
> If a job hangs, the process must be killed externally. The workflow engine
> will detect the missed heartbeat and re-queue the job.
## Worker Expiry
If the worker fails to send heartbeats (e.g., network partition), the workflow engine marks it as expired.
- The next heartbeat attempt receives a `404`, raising `WorkerExpiredError`.
- The worker logs the error and shuts down (exit code 1).
- Jobs that were in-flight are re-queued by the engine.
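The 404-to-shutdown path can be sketched as follows. The `WorkerExpiredError` name comes from the text above, but the HTTP client and function names here are illustrative assumptions:

```python
class WorkerExpiredError(Exception):
    """Raised when the engine no longer recognizes this worker."""


def send_heartbeat(client, worker_id: str) -> None:
    # `client` is a hypothetical HTTP client with a post() method.
    response = client.post(f"/workers/{worker_id}/heartbeat")
    if response.status_code == 404:
        # The engine marked this worker as expired and forgot its id.
        raise WorkerExpiredError(worker_id)


def heartbeat_once(client, worker_id: str) -> int:
    """Return the process exit code implied by one heartbeat attempt."""
    try:
        send_heartbeat(client, worker_id)
        return 0  # still registered; keep running
    except WorkerExpiredError:
        # Log and shut down; an external restart policy brings us back.
        return 1
```

Exiting with a nonzero code instead of re-registering in-process keeps recovery logic out of the worker and delegates it to the orchestrator, as the paragraph below describes.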
The worker does not attempt to re-register automatically. Use your
orchestrator's restart policy (Docker `restart: unless-stopped`, Kubernetes
`restartPolicy: Always`) to bring it back.