Neops Worker SDK - Architecture & Standards

This document describes the architectural decisions, coding standards, and patterns used throughout the neops-worker-sdk-py project.

Asyncio Architecture

Design Decision: Hybrid Sync/Async Model

The worker uses a hybrid sync/async architecture where:

  1. Main event loop handles:
     • API communication (polling, heartbeat, result pushing)
     • Graceful shutdown coordination via asyncio.Event
     • Concurrent task management via asyncio.gather()

  2. Function blocks execute synchronously in a ThreadPoolExecutor:
     • Allows blocking I/O operations (e.g., netmiko, paramiko, file I/O)
     • Jobs are processed sequentially (max_workers=1) for ordering guarantees
     • Each job creates its own event loop via asyncio.run() for async function block methods

Why This Design?

Function blocks are designed to interact with network devices and external systems that often use blocking libraries (netmiko, paramiko, requests, etc.). Running these in a thread pool:

  • Prevents blocking the main event loop (heartbeat stays alive)
  • Allows function block authors to use any library without async restrictions
  • Maintains sequential job processing for predictable behavior

Execution Flow

main_async()
    ├──► asyncio.gather()
    │       ├──► heartbeat_task()      [async - main loop]
    │       └──► job_polling_task()    [async - main loop]
    │               │
    │               └──► loop.run_in_executor()
    │                       │
    │                       └──► _process_job_sync()  [sync - thread pool]
    │                               │
    │                               └──► asyncio.run(fb.execute_function_block())
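The nesting above can be sketched with stdlib primitives alone. The names here (fake_function_block, process_job_sync, main_async) are illustrative stand-ins for the SDK internals, not its actual API:

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


async def fake_function_block() -> str:
    # Stands in for fb.execute_function_block(); may itself await async I/O.
    await asyncio.sleep(0)
    return "result"


def process_job_sync() -> str:
    # Runs in the worker thread; asyncio.run() creates a fresh event loop
    # here, isolated from the main loop driving heartbeat/polling.
    return asyncio.run(fake_function_block())


async def main_async() -> str:
    loop = asyncio.get_running_loop()
    # max_workers=1 preserves sequential job ordering.
    with ThreadPoolExecutor(max_workers=1) as pool:
        return await loop.run_in_executor(pool, process_job_sync)


result = asyncio.run(main_async())
```

The key point: the inner asyncio.run() is legal because it executes in a thread that has no running loop of its own.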

Async Patterns Used

| Pattern | Location | Purpose |
| --- | --- | --- |
| asyncio.gather() | main_async() | Run heartbeat + polling concurrently |
| asyncio.wait_for() | heartbeat_task, job_polling_task | Interruptible sleep with shutdown check |
| asyncio.Event | GracefulShutdownHandler | Cross-task shutdown coordination |
| asyncio.create_subprocess_exec() | python_eval.py | Non-blocking subprocess execution |
| loop.run_in_executor() | job_polling_task | Offload blocking work to thread pool |
| loop.add_signal_handler() | GracefulShutdownHandler | Async-safe signal handling |
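The first three patterns compose into the worker's main-loop skeleton. This is a self-contained sketch (worker_loop and the timings are illustrative, not SDK code):

```python
import asyncio


async def worker_loop(name: str, shutdown: asyncio.Event, log: list[str]) -> None:
    """Stand-in for heartbeat_task / job_polling_task."""
    while not shutdown.is_set():
        log.append(name)  # do one unit of work (ping, poll, ...)
        try:
            # Interruptible sleep: returns early the moment shutdown is set.
            await asyncio.wait_for(shutdown.wait(), timeout=0.01)
        except asyncio.TimeoutError:
            pass  # normal tick, keep looping


async def main_async() -> list[str]:
    shutdown = asyncio.Event()
    log: list[str] = []
    # Trigger shutdown after ~50 ms, as a stand-in for SIGTERM.
    asyncio.get_running_loop().call_later(0.05, shutdown.set)
    await asyncio.gather(
        worker_loop("heartbeat", shutdown, log),
        worker_loop("poll", shutdown, log),
    )
    return log


log = asyncio.run(main_async())
```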

API Client

The neops_workflow_engine_client library provides async methods (coroutines). All API calls are awaited in the main event loop:

  • WorkersApi.workers_controller_register() - async
  • WorkersApi.workers_controller_ping() - async
  • BlackboardApi.blackboard_controller_poll_job() - async
  • BlackboardApi.blackboard_controller_push_job_result() - async
  • FunctionBlockApi.function_blocks_controller_register() - async

Lazy API Client Initialization

Important: API clients require an event loop to exist when instantiated (due to aiohttp's connector). To avoid "no running event loop" errors during module import:

# BAD - creates API client at import time
class Registry:
    def __init__(self):
        self.api = FunctionBlockApi(ApiClient(...))  # ❌ Requires event loop!

# GOOD - lazy initialization
class Registry:
    def __init__(self):
        self._function_block_api: FunctionBlockApi | None = None

    @property
    def _api(self) -> FunctionBlockApi:
        if self._function_block_api is None:
            self._function_block_api = FunctionBlockApi(ApiClient(...))  # ✓ Created when needed
        return self._function_block_api

This pattern is used in Registry to allow function block registration at module import time without requiring an event loop.


Function Blocks

Execution Model

Function blocks have async def methods but execute in a thread pool:

class MyFunctionBlock(FunctionBlock[MyParams, MyResult]):
    async def run(self, params: MyParams, context: WorkflowContext) -> FunctionBlockResult[MyResult]:
        # Can use blocking I/O here - runs in thread pool
        with ConnectHandler(**device) as conn:
            output = conn.send_command(params.cmd)
        return FunctionBlockResult(success=True, message="Done", data=MyResult(output=output))

Important: While function blocks are defined as async, they are called via asyncio.run() in a dedicated thread. This means:

  • You CAN use blocking I/O (netmiko, paramiko, requests, etc.)
  • You CAN use await for async operations (subprocess, aiohttp, etc.)
  • You CANNOT share state with the main event loop directly

Best Practices for Function Blocks

  1. Prefer async for I/O when available (e.g., asyncio.create_subprocess_exec)
  2. Blocking I/O is acceptable - the thread pool isolates it
  3. Handle timeouts explicitly - use asyncio.wait_for() for async operations
  4. Log meaningful messages with appropriate log levels

Parallel Execution in Function Blocks

For parallel operations within a function block, use the concurrency package:

from neops_worker_sdk.concurrency import run_in_thread, run_parallel

class MyFunctionBlock(FunctionBlock):
    @run_in_thread
    def fetch_config(self, ip: str) -> str:
        """Blocking operation wrapped for async execution."""
        with ConnectHandler(host=ip, ...) as conn:
            return conn.send_command("show run")

    async def run(self, params, context):
        # Execute operations IN PARALLEL
        configs = await run_parallel(
            self.fetch_config("10.0.0.1"),
            self.fetch_config("10.0.0.2"),
            self.fetch_config("10.0.0.3"),
        )
        return FunctionBlockResult(success=True, ...)

See async-patterns.md for detailed documentation.


Concurrency Package

The neops_worker_sdk.concurrency package provides tools for handling blocking operations and parallel execution in function blocks.

Package Contents

| Module | Export | Purpose |
| --- | --- | --- |
| run_in_thread.py | @run_in_thread | Wrap sync functions to run in thread pool |
| run_parallel.py | run_parallel() | Execute multiple operations concurrently |
| tasks.py | @schedule_task | Create fire-and-forget background tasks |
| detection.py | BlockingDetector | Detect and warn about blocking operations |

@run_in_thread Decorator

Wraps a synchronous function to run in a thread pool executor:

@run_in_thread
def blocking_operation(ip: str) -> str:
    # This blocking code runs in a separate thread
    time.sleep(1)
    return f"Result from {ip}"

# Usage in async code:
result = await blocking_operation("10.0.0.1")
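One possible stdlib-only implementation of such a decorator (a sketch, not the SDK's actual source) is a thin wrapper over asyncio.to_thread:

```python
import asyncio
import functools
from collections.abc import Callable
from typing import Any


def run_in_thread(func: Callable[..., Any]) -> Callable[..., Any]:
    """Wrap a sync function so awaiting it runs it in the default executor."""
    @functools.wraps(func)
    async def wrapper(*args: Any, **kwargs: Any) -> Any:
        # asyncio.to_thread submits the call to the loop's thread pool
        # and suspends the caller until the thread finishes.
        return await asyncio.to_thread(func, *args, **kwargs)
    return wrapper


@run_in_thread
def blocking_operation(ip: str) -> str:
    return f"Result from {ip}"


result = asyncio.run(blocking_operation("10.0.0.1"))
```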

run_parallel() Function

Executes multiple awaitables concurrently (wrapper around asyncio.gather):

results = await run_parallel(
    operation1(),
    operation2(),
    operation3(),
)
# results = [result1, result2, result3]
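Since run_parallel() is described as a gather wrapper, an equivalent sketch (illustrative, not the SDK source; op is a toy coroutine) looks like this:

```python
import asyncio
from collections.abc import Awaitable
from typing import Any


async def run_parallel(*awaitables: Awaitable[Any]) -> list[Any]:
    # Thin wrapper over asyncio.gather; results come back in call order.
    return list(await asyncio.gather(*awaitables))


async def op(n: int) -> int:
    await asyncio.sleep(0)
    return n * 2


results = asyncio.run(run_parallel(op(1), op(2), op(3)))
```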

Blocking Detection

The BlockingDetector monitors job execution from the main event loop and warns when operations exceed a configurable threshold. This helps developers identify code that could benefit from @run_in_thread.

Configuration via environment variable:

export BLOCKING_DETECTION_THRESHOLD=0.5  # seconds, 0 to disable

Warning output includes:

  • The function block name
  • Stack trace showing the exact blocking location
  • Usage example for @run_in_thread and run_parallel()
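One way such a detector can be built (an assumed mechanism, not necessarily how BlockingDetector works internally) is to sample event-loop lag: schedule short sleeps and flag any wake-up that arrives much later than requested:

```python
import asyncio
import time

lag_warnings: list[float] = []


async def lag_monitor(threshold: float, interval: float = 0.01) -> None:
    """Record a warning whenever the loop is blocked longer than `threshold`."""
    loop = asyncio.get_running_loop()
    while True:
        start = loop.time()
        await asyncio.sleep(interval)
        lag = (loop.time() - start) - interval  # extra delay = loop was blocked
        if lag > threshold:
            lag_warnings.append(lag)


async def main() -> None:
    monitor = asyncio.create_task(lag_monitor(threshold=0.05))
    await asyncio.sleep(0.02)   # let the monitor start ticking
    time.sleep(0.2)             # deliberately block the event loop
    await asyncio.sleep(0.02)   # give the monitor a chance to observe the lag
    monitor.cancel()


asyncio.run(main())
```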


Graceful Shutdown

Signal Handling

The GracefulShutdownHandler attaches to SIGINT and SIGTERM:

for sig in (signal.SIGINT, signal.SIGTERM):
    loop.add_signal_handler(sig, self._trigger_shutdown, sig)

Shutdown Coordination

All long-running tasks check shutdown_event.is_set() and use interruptible waits:

try:
    await asyncio.wait_for(shutdown_event.wait(), timeout=Config.poll_interval)
    break  # Shutdown requested
except asyncio.TimeoutError:
    pass  # Normal timeout, continue loop

This ensures:

  • Tasks exit promptly on shutdown signal
  • No busy-waiting during idle periods
  • Clean resource cleanup in finally blocks


Logging Standards

Logger Configuration

Each module should have its own module-level logger:

from neops_worker_sdk.logger.logger import Logger

logger = Logger()

Log Levels

| Level | Use Case |
| --- | --- |
| DEBUG | Detailed diagnostic info (scanning dirs, creating objects) |
| INFO | Normal operational messages (startup, job counts, completion) |
| WARNING | Recoverable issues (heartbeat failure, extra parameters) |
| ERROR | Serious problems (registration failed, worker expired) |

Format Guidelines

  • Include context: f"Processing {total} job(s)"
  • Use structured data where helpful: f"Worker registered (version={version}, name={name})"
  • Error messages should explain what happened and impact

Testing Patterns

Pytest Configuration

Test discovery is configured in pytest.ini:

testpaths =
    tests      # SDK unit/integration tests
    examples   # Function block examples with embedded tests
    io         # Production function blocks (io/neops/...)

norecursedirs =
    docs
    make_scripts
    neops_worker_sdk  # SDK source (not tests)
    .venv
    .git
    __pycache__
    *.egg-info

python_files = *.py  # Collect all Python files in testpaths

Test Markers

| Marker | Applied By | Purpose |
| --- | --- | --- |
| function_block | @fb_test_case, @fb_test_case_with_lab | Slow integration tests (may be skipped in fast pipelines) |
| sdk | Manual | SDK internal tests (currently unused) |
| testing | Manual | Tests for the testing toolkit itself (currently unused) |

Run specific marker groups:

pytest -m function_block      # Only function block tests
pytest -m "not function_block" # Skip slow tests

Fixture Organization

Fixtures are organized hierarchically:

conftest.py              # Root: shared fixtures (remote_lab fixtures)
├── tests/
│   └── (inherits from root)
└── examples/
    └── (inherits from root)

Remote lab fixtures (simple_frr, simple_iol) are defined in the root conftest.py so they're accessible from both tests/ and examples/.

Async Tests

Use @pytest.mark.asyncio for async test functions:

@pytest.mark.asyncio
async def test_echo_returns_same_text() -> None:
    result = await block.execute_function_block(params=..., context=...)
    assert result.success

Mocking Async Methods

Use AsyncMock for async API methods:

worker_registration._workers_api.workers_controller_register = AsyncMock(return_value=mock_response)
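A self-contained version of this pattern (WorkerRegistration here is an illustrative stand-in for the SDK class, not its real definition):

```python
import asyncio
from unittest.mock import AsyncMock


class WorkerRegistration:
    """Illustrative stand-in for the class under test."""

    def __init__(self, api) -> None:
        self._workers_api = api

    async def register(self) -> str:
        # In the real SDK this awaits the generated async API client.
        return await self._workers_api.workers_controller_register()


registration = WorkerRegistration(api=AsyncMock())
registration._workers_api.workers_controller_register = AsyncMock(
    return_value="worker-123"
)

result = asyncio.run(registration.register())
```

AsyncMock returns an awaitable, so the production `await` works unchanged, and call assertions (await_count, assert_awaited_once) remain available.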

Function Block Test Cases

Use the @fb_test_case or @fb_test_case_with_lab decorators to create tests directly on function block classes. These decorators automatically:

  • Create a pytest test function
  • Apply @pytest.mark.asyncio and @pytest.mark.function_block
  • Handle fixture injection (for _with_lab variant)

@fb_test_case_with_lab(
    "Ping IOL device",
    RandomPingParameters(ping_amount=2),
    remote_lab_fixture="simple_iol",
)
class Ping(FunctionBlock[...]):
    ...

For tests without remote lab dependencies:

@fb_test_case(
    "Echo test",
    EchoParameters(text="hello"),
    context=create_workflow_context(devices=[...]),
)
class Echo(FunctionBlock[...]):
    ...

Code Style

Type Annotations

  • Use modern Python 3.10+ syntax: list[str] not List[str]
  • Use | for unions: str | None not Optional[str]
  • Generic types should be explicit: FunctionBlock[ParamsT, ResultDataT]

Pydantic Models

  • Use model_config = ConfigDict(...) for model configuration
  • Parameter models: extra="ignore" (allow extra fields with warning)
  • Result models: extra="forbid" (strict schema compliance)
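The two extra-field policies side by side, assuming Pydantic v2 (MyParams and MyResult are illustrative models, not from the SDK):

```python
from pydantic import BaseModel, ConfigDict, ValidationError


class MyParams(BaseModel):
    # Parameter models tolerate unknown fields from the engine.
    model_config = ConfigDict(extra="ignore")
    cmd: str


class MyResult(BaseModel):
    # Result models must match the declared schema exactly.
    model_config = ConfigDict(extra="forbid")
    output: str


# Extra field is silently dropped on the parameter model...
params = MyParams(cmd="show version", unexpected="dropped")

# ...but rejected on the result model.
try:
    MyResult(output="ok", unexpected="rejected")
    forbid_raised = False
except ValidationError:
    forbid_raised = True
```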

Error Handling

  • Catch specific exceptions when possible
  • Re-raise with context when appropriate
  • Use custom exceptions for domain errors (e.g., WorkerExpiredError)

Future Considerations

Potential Async Improvements

When function blocks become fully async in the future:

  1. Enable concurrent job processing with asyncio.gather() or asyncio.TaskGroup

Concurrent Job Processing

Current design processes jobs sequentially. For concurrent processing:

# Future: process multiple jobs concurrently
tasks = [process_job(job) for job in jobs]
results = await asyncio.gather(*tasks, return_exceptions=True)
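A runnable sketch of the error-isolation aspect (process_job is a toy stand-in): with return_exceptions=True, one failing job surfaces as an exception object in the results list instead of cancelling its siblings.

```python
import asyncio


async def process_job(job: int) -> int:
    if job == 2:
        raise RuntimeError("job 2 failed")
    return job * 10


async def main() -> list:
    tasks = [process_job(job) for job in (1, 2, 3)]
    # return_exceptions=True isolates failures: exceptions come back
    # as values, and the other jobs still complete normally.
    return await asyncio.gather(*tasks, return_exceptions=True)


results = asyncio.run(main())
```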

This requires:

  • Thread-safe logging
  • Independent resource management per job
  • Careful error isolation