
Worker Lifecycle

The neops worker is a long-running process that discovers function blocks, registers them with the workflow engine, and continuously polls for jobs.


Startup Sequence

```mermaid
sequenceDiagram
    participant W as Worker
    participant FS as File System
    participant WE as Workflow Engine

    W->>W: Load environment (.env)
    W->>W: Check client/backend version compatibility
    W->>FS: Scan DIR_FUNCTION_BLOCKS directories
    FS-->>W: Discovered function block modules
    W->>W: Import modules (triggers @register_function_block)
    W->>WE: Register worker (name, IP)
    WE-->>W: worker_id
    W->>WE: Register function blocks (schemas, metadata)
    W->>W: Start heartbeat task
    W->>W: Start job polling task
```

1. Discovery

The worker scans directories listed in DIR_FUNCTION_BLOCKS (comma-separated paths). For each directory, it recursively imports all Python modules. Import triggers the @register_function_block decorator, which adds the class and its registration metadata to the global registry.
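The decorator-driven registry can be sketched as follows. This is an illustrative reconstruction, not the actual neops implementation: the registry layout, decorator signature, and `discover()` helper are assumptions.

```python
import importlib
import pkgutil

# Global registry populated as a side effect of importing modules.
# The layout here is illustrative; the real neops registry may differ.
FUNCTION_BLOCK_REGISTRY = {}

def register_function_block(name, version="1.0"):
    """Class decorator that records a function block at import time."""
    def decorator(cls):
        FUNCTION_BLOCK_REGISTRY[name] = {"cls": cls, "version": version}
        return cls
    return decorator

def discover(directories):
    """Recursively import every module under the given paths so that
    any @register_function_block decorators run as a side effect."""
    for _, module_name, _ in pkgutil.walk_packages(path=directories):
        importlib.import_module(module_name)
```

Because registration happens at import time, a function block becomes visible to the worker simply by living in a scanned directory; no explicit registration call is needed in the block's own code.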

2. Registration

The worker registers itself with the workflow engine via the blackboard API, receiving a worker_id. It then sends all discovered function block registrations — including parameter/result JSON schemas, version information, and purity/idempotency flags.
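The registration message for each block might be assembled like this. The field names are assumptions based on the metadata listed above; the real blackboard API may use a different shape.

```python
def build_registration_payloads(worker_id, registry):
    """Build one registration record per discovered function block.
    Field names are illustrative -- the real blackboard API may differ."""
    return [
        {
            "worker_id": worker_id,
            "name": name,
            "version": entry.get("version", "1.0"),
            "parameter_schema": entry.get("parameter_schema", {}),
            "result_schema": entry.get("result_schema", {}),
            "pure": entry.get("pure", False),
            "idempotent": entry.get("idempotent", False),
        }
        for name, entry in registry.items()
    ]
```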

3. Concurrent Tasks

Two async tasks run concurrently on the main event loop:

| Task | Interval | Purpose |
| --- | --- | --- |
| Heartbeat | HEARTBEAT_INTERVAL (default 20s) | Keeps the worker marked as online in the workflow engine |
| Job polling | POLL_INTERVAL (default 10s) | Fetches pending jobs from the blackboard |
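The two-loops-on-one-event-loop pattern can be sketched like this. The function names and the `runtime` parameter are illustrative (the real worker stops on a shutdown signal, not a timer):

```python
import asyncio

async def periodic(interval, action, stop):
    """Run `action` every `interval` seconds until `stop` is set.
    Sleeping via wait_for keeps shutdown responsive mid-interval."""
    while not stop.is_set():
        await action()
        try:
            await asyncio.wait_for(stop.wait(), timeout=interval)
        except asyncio.TimeoutError:
            pass  # interval elapsed without a shutdown request

async def run_tasks(send_heartbeat, poll_jobs,
                    heartbeat_interval=20, poll_interval=10, runtime=None):
    """Run both loops concurrently on the same event loop.
    `runtime` stops the worker after N seconds (a test hook only)."""
    stop = asyncio.Event()
    tasks = [
        asyncio.create_task(periodic(heartbeat_interval, send_heartbeat, stop)),
        asyncio.create_task(periodic(poll_interval, poll_jobs, stop)),
    ]
    if runtime is not None:
        await asyncio.sleep(runtime)
        stop.set()
    await asyncio.gather(*tasks)
```

Because both loops are plain coroutines awaiting sleeps, neither blocks the other: heartbeats keep flowing while polling waits for its next tick.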

Job Execution

When the polling task receives jobs:

  1. Match job.function_block_id against the local registry.
  2. Submit the job to a ThreadPoolExecutor with max_workers=1.
  3. Inside the worker thread, call asyncio.run() to execute the function block (this gives each job its own event loop).
  4. Start blocking detection monitoring on the main loop.
  5. Collect the result and report it back to the blackboard.
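The executor-plus-own-event-loop pattern from steps 2 and 3 can be sketched as follows. The job shape, registry layout, and `run()` method are illustrative assumptions:

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

# One worker thread => jobs run strictly one at a time.
executor = ThreadPoolExecutor(max_workers=1)

def _run_job(coro_factory):
    """Executed inside the worker thread: asyncio.run() gives the job
    its own event loop, isolated from the worker's main loop."""
    return asyncio.run(coro_factory())

async def execute_job(job, registry):
    """Match the job against the local registry and run it off the main
    loop. Names (job['function_block_id'], run()) are illustrative."""
    block = registry[job["function_block_id"]]
    loop = asyncio.get_running_loop()
    # run_in_executor keeps the main loop free for heartbeats and
    # monitoring while the job runs in the single worker thread.
    return await loop.run_in_executor(executor, _run_job, block.run)
```

This design lets the job block or await freely without starving the heartbeat loop, since the job's event loop lives in the executor thread.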

Sequential Processing

Jobs are processed one at a time (max_workers=1). This avoids:

  • Connection exhaustion on shared devices.
  • Race conditions in context data.
  • Unpredictable resource usage.

If the workflow engine pushes multiple jobs, they are queued and executed in order.

Job Types

| Type | Method called | Purpose |
| --- | --- | --- |
| ACQUIRE | acquire() | Lock resources and expand context |
| EXECUTE | execute_function_block() → run() | Main function block logic (wrapped with error boundaries) |
| ROLLBACK | rollback() | Undo changes (default returns failure; override to implement) |
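A minimal dispatch sketch following the table above; the base class and return shapes are illustrative, only the method names come from the table:

```python
class FunctionBlock:
    """Illustrative base class; method names mirror the table above."""

    def acquire(self, context):
        # Lock resources and expand the context for the coming run.
        return {"status": "ok", "context": context}

    def run(self, context):
        raise NotImplementedError

    def rollback(self, context):
        # Default rollback reports failure so the engine knows nothing
        # was undone; subclasses override this to implement real undo.
        return {"status": "failed", "reason": "rollback not implemented"}

def dispatch(job_type, block, context):
    """Route a job to the method the table associates with its type."""
    handlers = {
        "ACQUIRE": block.acquire,
        "EXECUTE": block.run,  # the worker wraps this with error boundaries
        "ROLLBACK": block.rollback,
    }
    return handlers[job_type](context)
```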

Graceful Shutdown

The worker handles SIGINT and SIGTERM signals:

  1. Set the shutdown event (stops polling for new jobs).
  2. Wait for the current job to finish (the worker blocks until complete).
  3. Cancel the heartbeat task.
  4. Deregister the worker from the workflow engine.
  5. Exit with code 0.
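The five steps above can be sketched as two helpers; the function names and argument shapes are illustrative, not the actual worker API:

```python
import asyncio
import signal

def install_signal_handlers(loop, shutdown_event):
    """Step 1: SIGINT/SIGTERM just set the shutdown event, which stops
    the polling loop from picking up new jobs."""
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, shutdown_event.set)

async def shutdown_sequence(shutdown_event, current_job, heartbeat_task, deregister):
    """Steps 2-5 of the shutdown path described above."""
    await shutdown_event.wait()
    if current_job is not None:
        await current_job      # step 2: wait (indefinitely) for the running job
    heartbeat_task.cancel()    # step 3: stop heartbeats
    await deregister()         # step 4: remove the worker from the engine
    return 0                   # step 5: process exit code
```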

Warning

SHUTDOWN_TIMEOUT is logged but not currently enforced as a hard limit. The worker waits indefinitely for the running job to complete. If a job hangs, the process must be killed externally. The workflow engine will detect the missed heartbeat and re-queue the job.


Worker Expiry

If the worker fails to send heartbeats (e.g., network partition), the workflow engine marks it as expired.

  • The next heartbeat attempt receives a 404, causing WorkerExpiredError.
  • The worker logs the error and shuts down (exit code 1).
  • Jobs that were in-flight are re-queued by the engine.
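The expiry path can be sketched like this. The 404-to-exception translation follows the bullets above; `post_heartbeat` returning an HTTP status code and the `max_beats` test hook are assumptions:

```python
import asyncio

class WorkerExpiredError(Exception):
    """The engine no longer knows this worker (heartbeat returned 404)."""

async def send_heartbeat(post_heartbeat):
    """Send one heartbeat; translate a 404 into WorkerExpiredError."""
    status = await post_heartbeat()
    if status == 404:
        raise WorkerExpiredError("worker no longer registered")
    return status

async def heartbeat_loop(post_heartbeat, interval, max_beats=None):
    """Beat until expiry. `max_beats` bounds the loop for testing only."""
    beats = 0
    try:
        while max_beats is None or beats < max_beats:
            await send_heartbeat(post_heartbeat)
            beats += 1
            await asyncio.sleep(interval)
    except WorkerExpiredError:
        # The worker logs the error and shuts down with exit code 1;
        # it does not try to re-register itself.
        return 1
    return 0
```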

The worker does not attempt to re-register automatically. Use your orchestrator's restart policy (Docker restart: unless-stopped, Kubernetes restartPolicy: Always) to bring it back.