# Neops Worker SDK - Architecture & Standards
This document describes the architectural decisions, coding standards, and patterns used throughout the neops-worker-sdk-py project.
## Asyncio Architecture

### Design Decision: Hybrid Sync/Async Model

The worker uses a hybrid sync/async architecture where:

- The **main event loop** handles:
  - API communication (polling, heartbeat, result pushing)
  - Graceful shutdown coordination via `asyncio.Event`
  - Concurrent task management via `asyncio.gather()`
- **Function blocks** execute synchronously in a `ThreadPoolExecutor`:
  - Allows blocking I/O operations (e.g., netmiko, paramiko, file I/O)
  - Jobs are processed sequentially (`max_workers=1`) for ordering guarantees
  - Each job creates its own event loop via `asyncio.run()` for async function block methods
### Why This Design?
Function blocks are designed to interact with network devices and external systems that often use blocking libraries (netmiko, paramiko, requests, etc.). Running these in a thread pool:
- Prevents blocking the main event loop (heartbeat stays alive)
- Allows function block authors to use any library without async restrictions
- Maintains sequential job processing for predictable behavior
### Execution Flow

```
main_async()
│
├──► asyncio.gather()
│     ├──► heartbeat_task()       [async - main loop]
│     └──► job_polling_task()     [async - main loop]
│           │
│           └──► loop.run_in_executor()
│                 │
│                 └──► _process_job_sync()   [sync - thread pool]
│                       │
│                       └──► asyncio.run(fb.execute_function_block())
```
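A condensed, illustrative sketch of this flow in code (names such as `poll_for_job` and the `job` attributes are placeholders for the worker's actual internals, not the SDK's API):

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

pool = ThreadPoolExecutor(max_workers=1)  # sequential job processing

def _process_job_sync(job):
    # Runs in the worker thread; asyncio.run() gives the async function
    # block method its own event loop, isolated from the main loop.
    return asyncio.run(job.function_block.execute_function_block())

async def job_polling_task(shutdown_event: asyncio.Event):
    loop = asyncio.get_running_loop()
    while not shutdown_event.is_set():
        job = await poll_for_job()  # placeholder for the blackboard polling call
        if job is not None:
            # Offload to the thread pool; heartbeat keeps running meanwhile
            await loop.run_in_executor(pool, _process_job_sync, job)
```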
### Async Patterns Used

| Pattern | Location | Purpose |
|---|---|---|
| `asyncio.gather()` | `main_async()` | Run heartbeat + polling concurrently |
| `asyncio.wait_for()` | `heartbeat_task`, `job_polling_task` | Interruptible sleep with shutdown check |
| `asyncio.Event` | `GracefulShutdownHandler` | Cross-task shutdown coordination |
| `asyncio.create_subprocess_exec()` | `python_eval.py` | Non-blocking subprocess execution |
| `loop.run_in_executor()` | `job_polling_task` | Offload blocking work to thread pool |
| `loop.add_signal_handler()` | `GracefulShutdownHandler` | Async-safe signal handling |
## API Client

The `neops_workflow_engine_client` library provides async methods (coroutines). All API calls are awaited in the main event loop:

- `WorkersApi.workers_controller_register()` - async
- `WorkersApi.workers_controller_ping()` - async
- `BlackboardApi.blackboard_controller_poll_job()` - async
- `BlackboardApi.blackboard_controller_push_job_result()` - async
- `FunctionBlockApi.function_blocks_controller_register()` - async
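For example, a hedged sketch of awaiting one of these calls from the main loop (constructor and method arguments are illustrative assumptions; the exact signatures come from the generated client):

```python
# Illustrative only: `configuration` and `worker_id` are assumptions,
# not the generated client's exact signature.
workers_api = WorkersApi(ApiClient(configuration))
response = await workers_api.workers_controller_ping(worker_id)
```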
### Lazy API Client Initialization

**Important:** API clients require an event loop to exist when instantiated (due to aiohttp's connector). To avoid "no running event loop" errors during module import:
```python
# BAD - creates API client at import time
class Registry:
    def __init__(self):
        self.api = FunctionBlockApi(ApiClient(...))  # ❌ Requires event loop!

# GOOD - lazy initialization
class Registry:
    def __init__(self):
        self._function_block_api: FunctionBlockApi | None = None

    @property
    def _api(self) -> FunctionBlockApi:
        if self._function_block_api is None:
            self._function_block_api = FunctionBlockApi(ApiClient(...))  # ✓ Created when needed
        return self._function_block_api
```
This pattern is used in `Registry` to allow function block registration at module import time without requiring an event loop.
## Function Blocks

### Execution Model

Function blocks have `async def` methods but execute in a thread pool:
```python
from netmiko import ConnectHandler  # blocking SSH library - fine in a thread pool

class MyFunctionBlock(FunctionBlock[MyParams, MyResult]):
    async def run(self, params: MyParams, context: WorkflowContext) -> FunctionBlockResult[MyResult]:
        # Can use blocking I/O here - runs in thread pool.
        # `device` holds connection details (host, credentials, ...) defined elsewhere.
        with ConnectHandler(**device) as conn:
            output = conn.send_command(params.cmd)
        return FunctionBlockResult(success=True, message="Done", data=MyResult(output=output))
```
**Important:** While function blocks are defined as async, they are called via `asyncio.run()` in a dedicated thread. This means:

- You CAN use blocking I/O (netmiko, paramiko, requests, etc.)
- You CAN use `await` for async operations (subprocess, aiohttp, etc.)
- You CANNOT share state with the main event loop directly
### Best Practices for Function Blocks

- Prefer async for I/O when available (e.g., `asyncio.create_subprocess_exec`)
- Blocking I/O is acceptable - the thread pool isolates it
- Handle timeouts explicitly - use `asyncio.wait_for()` for async operations (see the sketch below)
- Log meaningful messages with appropriate log levels
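A minimal sketch of explicit timeout handling; `PingParams`, `PingResult`, and `params.host` are hypothetical names used for illustration:

```python
import asyncio

class PingBlock(FunctionBlock[PingParams, PingResult]):
    async def run(self, params, context):
        # params.host is a hypothetical field used for illustration
        proc = await asyncio.create_subprocess_exec(
            "ping", "-c", "3", params.host,
            stdout=asyncio.subprocess.PIPE,
        )
        try:
            # Bound the async operation with an explicit timeout
            stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=30)
        except asyncio.TimeoutError:
            proc.kill()
            return FunctionBlockResult(success=False, message="ping timed out")
        return FunctionBlockResult(success=True, message="Done", data=PingResult(output=stdout.decode()))
```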
### Parallel Execution in Function Blocks

For parallel operations within a function block, use the `concurrency` package:
```python
from neops_worker_sdk.concurrency import run_in_thread, run_parallel

class MyFunctionBlock(FunctionBlock):
    @run_in_thread
    def fetch_config(self, ip: str) -> str:
        """Blocking operation wrapped for async execution."""
        with ConnectHandler(host=ip, ...) as conn:
            return conn.send_command("show run")

    async def run(self, params, context):
        # Execute operations IN PARALLEL
        configs = await run_parallel(
            self.fetch_config("10.0.0.1"),
            self.fetch_config("10.0.0.2"),
            self.fetch_config("10.0.0.3"),
        )
        return FunctionBlockResult(success=True, ...)
```
See `async-patterns.md` for detailed documentation.
## Concurrency Package

The `neops_worker_sdk.concurrency` package provides tools for handling blocking operations and parallel execution in function blocks.
### Package Contents

| Module | Export | Purpose |
|---|---|---|
| `run_in_thread.py` | `@run_in_thread` | Wrap sync functions to run in thread pool |
| `run_parallel.py` | `run_parallel()` | Execute multiple operations concurrently |
| `tasks.py` | `@schedule_task` | Create fire-and-forget background tasks |
| `detection.py` | `BlockingDetector` | Detect and warn about blocking operations |
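`@schedule_task` has no dedicated section below, so here is a hedged usage sketch based only on the table's description; the decorator's exact semantics and signature are defined by `tasks.py`:

```python
from neops_worker_sdk.concurrency import schedule_task

# Assumption: decorating an async function makes calls to it run as
# fire-and-forget background tasks instead of being awaited inline.
@schedule_task
async def emit_progress(message: str) -> None:
    ...
```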
### `@run_in_thread` Decorator

Wraps a synchronous function to run in a thread pool executor:
```python
import time

@run_in_thread
def blocking_operation(ip: str) -> str:
    # This blocking code runs in a separate thread
    time.sleep(1)
    return f"Result from {ip}"

# Usage in async code:
result = await blocking_operation("10.0.0.1")
```
### `run_parallel()` Function

Executes multiple awaitables concurrently (a wrapper around `asyncio.gather`):
```python
results = await run_parallel(
    operation1(),
    operation2(),
    operation3(),
)
# results = [result1, result2, result3]
```
### Blocking Detection

The `BlockingDetector` monitors job execution from the main event loop and warns when operations exceed a configurable threshold. This helps developers identify code that could benefit from `@run_in_thread`. The warning threshold is configured via an environment variable.
Warning output includes:
- The function block name
- Stack trace showing the exact blocking location
- Usage example for `@run_in_thread` and `run_parallel()`
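A simplified sketch of the general loop-lag technique such a detector can use (illustrative only; not the SDK's actual `BlockingDetector` implementation):

```python
import asyncio
import time

async def lag_monitor(threshold: float = 1.0, interval: float = 0.1) -> None:
    # Generic event-loop lag watchdog: if a short sleep resumes much
    # later than requested, something blocked the loop in between.
    while True:
        start = time.monotonic()
        await asyncio.sleep(interval)
        lag = time.monotonic() - start - interval
        if lag > threshold:
            print(f"Event loop was blocked for ~{lag:.2f}s")
```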
## Graceful Shutdown

### Signal Handling

The `GracefulShutdownHandler` attaches to `SIGINT` and `SIGTERM`:
```python
for sig in (signal.SIGINT, signal.SIGTERM):
    loop.add_signal_handler(sig, self._trigger_shutdown, sig)
```
### Shutdown Coordination

All long-running tasks check `shutdown_event.is_set()` and use interruptible waits:
```python
try:
    await asyncio.wait_for(shutdown_event.wait(), timeout=Config.poll_interval)
    break  # Shutdown requested
except asyncio.TimeoutError:
    pass  # Normal timeout, continue loop
```
This ensures:
- Tasks exit promptly on shutdown signal
- No busy-waiting during idle periods
- Clean resource cleanup in `finally` blocks
## Logging Standards

### Logger Configuration

Each module should have its own module-level logger:
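```python
import logging

# Standard idiom: logger named after the module
logger = logging.getLogger(__name__)
```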
### Log Levels

| Level | Use Case |
|---|---|
| `DEBUG` | Detailed diagnostic info (scanning dirs, creating objects) |
| `INFO` | Normal operational messages (startup, job counts, completion) |
| `WARNING` | Recoverable issues (heartbeat failure, extra parameters) |
| `ERROR` | Serious problems (registration failed, worker expired) |
### Format Guidelines

- Include context: `f"Processing {total} job(s)"`
- Use structured data where helpful: `f"Worker registered (version={version}, name={name})"`
- Error messages should explain what happened and its impact
## Testing Patterns

### Pytest Configuration

Test discovery is configured in `pytest.ini`:
```ini
testpaths =
    tests       # SDK unit/integration tests
    examples    # Function block examples with embedded tests
    io          # Production function blocks (io/neops/...)
norecursedirs =
    docs
    make_scripts
    neops_worker_sdk   # SDK source (not tests)
    .venv
    .git
    __pycache__
    *.egg-info
python_files = *.py    # Collect all Python files in testpaths
```
### Test Markers

| Marker | Applied By | Purpose |
|---|---|---|
| `function_block` | `@fb_test_case`, `@fb_test_case_with_lab` | Slow integration tests (may be skipped in fast pipelines) |
| `sdk` | Manual | SDK internal tests (currently unused) |
| `testing` | Manual | Tests for the testing toolkit itself (currently unused) |
Run specific marker groups:

```bash
pytest -m function_block        # Only function block tests
pytest -m "not function_block"  # Skip slow tests
```
### Fixture Organization

Fixtures are organized hierarchically:

```
conftest.py        # Root: shared fixtures (remote_lab fixtures)
├── tests/
│   └── (inherits from root)
└── examples/
    └── (inherits from root)
```
Remote lab fixtures (`simple_frr`, `simple_iol`) are defined in the root `conftest.py` so they're accessible from both `tests/` and `examples/`.
### Async Tests

Use `@pytest.mark.asyncio` for async test functions:
```python
import pytest

@pytest.mark.asyncio
async def test_echo_returns_same_text() -> None:
    # `block` is the function block instance under test
    result = await block.execute_function_block(params=..., context=...)
    assert result.success
```
### Mocking Async Methods

Use `AsyncMock` for async API methods:

```python
from unittest.mock import AsyncMock

worker_registration._workers_api.workers_controller_register = AsyncMock(return_value=mock_response)
```
### Function Block Test Cases

Use the `@fb_test_case` or `@fb_test_case_with_lab` decorators to create tests directly on function block classes. These decorators automatically:

- Create a pytest test function
- Apply `@pytest.mark.asyncio` and `@pytest.mark.function_block`
- Handle fixture injection (for the `_with_lab` variant)
```python
@fb_test_case_with_lab(
    "Ping IOL device",
    RandomPingParameters(ping_amount=2),
    remote_lab_fixture="simple_iol",
)
class Ping(FunctionBlock[...]):
    ...
```
For tests without remote lab dependencies:

```python
@fb_test_case(
    "Echo test",
    EchoParameters(text="hello"),
    context=create_workflow_context(devices=[...]),
)
class Echo(FunctionBlock[...]):
    ...
```
## Code Style

### Type Annotations

- Use modern Python 3.10+ syntax: `list[str]`, not `List[str]`
- Use `|` for unions: `str | None`, not `Optional[str]`
- Generic types should be explicit: `FunctionBlock[ParamsT, ResultDataT]`
### Pydantic Models

- Use `model_config = ConfigDict(...)` for model configuration (see the sketch below)
- Parameter models: `extra="ignore"` (allow extra fields with a warning)
- Result models: `extra="forbid"` (strict schema compliance)
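A minimal illustration of these conventions, with hypothetical model names:

```python
from pydantic import BaseModel, ConfigDict

class MyParams(BaseModel):
    # Parameter model: tolerate (and warn about) unexpected fields
    model_config = ConfigDict(extra="ignore")
    hostname: str

class MyResult(BaseModel):
    # Result model: enforce strict schema compliance
    model_config = ConfigDict(extra="forbid")
    output: str
```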
### Error Handling

- Catch specific exceptions when possible
- Re-raise with context when appropriate (see the sketch below)
- Use custom exceptions for domain errors (e.g., `WorkerExpiredError`)
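A hedged sketch of the re-raise pattern; `ApiException` stands in for whatever error type the generated client actually raises:

```python
try:
    response = await workers_api.workers_controller_ping(worker_id)
except ApiException as exc:  # assumption: the client's low-level error type
    # Re-raise as a domain error, preserving the original cause
    raise WorkerExpiredError("Heartbeat rejected; worker registration expired") from exc
```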
## Future Considerations

### Potential Async Improvements

When function blocks become fully async in the future:

- Enable concurrent job processing with `asyncio.gather()` or `asyncio.TaskGroup`
### Concurrent Job Processing

The current design processes jobs sequentially. For concurrent processing:
```python
# Future: process multiple jobs concurrently
tasks = [process_job(job) for job in jobs]
results = await asyncio.gather(*tasks, return_exceptions=True)
```
This requires:

- Thread-safe logging
- Independent resource management per job
- Careful error isolation