# Async and Concurrency

## When do I need this?
If your function blocks work correctly and you have not seen blocking detector
warnings, you can skip this page. Come back when you need to run commands on
multiple devices simultaneously or when the blocking detector warns about
slow operations. The key takeaway: wrap blocking network calls (netmiko,
scrapli) with `@run_in_thread`, and use `run_parallel()` to fan out across
devices.
The neops Worker SDK uses a hybrid sync/async execution model that lets you write blocking code (netmiko, paramiko, file I/O) in your function blocks without interfering with the worker's internal operations. This page explains the model and the concurrency utilities the SDK provides.
## The Execution Model
The worker runs two things simultaneously:
| Component | Runs in | Purpose |
|---|---|---|
| Main event loop | Async (asyncio) | Heartbeat, job polling, API communication |
| Function blocks | Thread pool (`ThreadPoolExecutor`) | Your automation logic |
When a job arrives, the worker executes your function block in a dedicated thread with its own event loop (via `asyncio.run()`). This means:

- Blocking calls in `run()` will not stall the heartbeat or job polling.
- Each job is processed sequentially (`max_workers=1`) to avoid resource contention on a single worker.
- Inside a job, you can still use async/await and run multiple operations in parallel.
```mermaid
graph LR
    A[Main Event Loop] -->|poll job| B[Thread Pool]
    A -->|heartbeat| C[Workflow Engine API]
    B -->|asyncio.run| D[Your Function Block]
    D -->|"@run_in_thread"| E[Blocking I/O threads]
```
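The two-layer model can be sketched in plain asyncio. The names below (`function_block`, `execute_job`, the heartbeat counter) are hypothetical stand-ins, not the worker's actual internals: a main event loop keeps servicing heartbeats while one job at a time runs in a single-worker thread pool, each job getting its own event loop via `asyncio.run()`.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

heartbeats = 0

async def function_block() -> str:
    # stands in for your async run(); real blocks may await @run_in_thread helpers
    await asyncio.sleep(0.05)
    return "job done"

def execute_job() -> str:
    # each job gets a fresh event loop inside the worker thread
    return asyncio.run(function_block())

async def main() -> str:
    global heartbeats
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=1) as pool:  # one job at a time
        job = loop.run_in_executor(pool, execute_job)
        while not job.done():
            heartbeats += 1  # the main loop keeps ticking while the job runs
            await asyncio.sleep(0.01)
        return await job

result = asyncio.run(main())
```

Even though `execute_job` blocks its thread for the whole job, the heartbeat counter keeps incrementing, which is exactly why blocking code in a function block does not stall the worker.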
## `@run_in_thread` -- Offloading Blocking I/O
Most network libraries (netmiko, paramiko, requests) perform blocking I/O. Wrap these operations with `@run_in_thread` to run them in a thread pool, returning an awaitable you can await from your async `run()` method.

The `config_backup` example demonstrates this pattern — `_fetch_config` uses netmiko (blocking) and is decorated with `@run_in_thread`:
```python
class ConfigBackup(FunctionBlock[ConfigBackupParams, ConfigBackupResult]):
    @run_in_thread
    def _fetch_config(self, host: str, username: str, password: str) -> str:
        from netmiko import ConnectHandler

        device = {
            "device_type": "cisco_ios",
            "host": host,
            "username": username,
            "password": password,
        }
        with ConnectHandler(**device) as conn:
            return str(conn.send_command("show running-config"))

    async def run(
        self, params: ConfigBackupParams, context: WorkflowContext
    ) -> FunctionBlockResult[ConfigBackupResult]:
        del params
        device = context.device
        if device is None:
            return FunctionBlockResult(message="No device in context.", success=False, data=None)
        if not device.ip or not device.username:
            return FunctionBlockResult(
                message=f"Device {device.hostname} missing IP or credentials.", success=False, data=None
            )
        config = await self._fetch_config(
            host=device.ip,
            username=device.username,
            password=device.password or "",
        )
        self.logger.info(f"Backed up {len(config.splitlines())} lines from {device.hostname}")
        return FunctionBlockResult(
            message="Configuration backed up successfully.",
            success=True,
            data=ConfigBackupResult(config_lines=len(config.splitlines())),
        )

    async def acquire(self, params: ConfigBackupParams) -> FunctionBlockAcquireResult:
        del params
        return FunctionBlockAcquireResult(message="No resources required.", success=True, acquires=None)
```
Key points:
- The decorated method becomes an awaitable — call it with `await`.
- Execution starts immediately when you call the method (not when you `await`). This is different from coroutines, which are lazy. It means in `run_parallel(self.fetch(a), self.fetch(b))`, both threads are already running before `run_parallel` itself is called.
- The underlying function runs in a separate thread, so blocking is safe.
- Type hints are preserved; the return type becomes `Awaitable[R]`.
- Under the hood, `@run_in_thread` calls `loop.run_in_executor(None, fn)`, which uses the default thread pool executor (limited to `min(32, os.cpu_count() + 4)` threads).
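To make the mechanism concrete, here is a minimal stand-in for such a decorator. This is an illustration of the behavior described above, not the SDK's actual source; `slow_double` is a made-up example function.

```python
import asyncio
import functools
import time
from collections.abc import Awaitable, Callable
from typing import TypeVar

R = TypeVar("R")

def run_in_thread(fn: Callable[..., R]) -> Callable[..., Awaitable[R]]:
    """Illustrative sketch: submit fn to the default executor immediately
    and hand back the resulting future for the caller to await."""
    @functools.wraps(fn)
    def wrapper(*args, **kwargs) -> Awaitable[R]:
        loop = asyncio.get_running_loop()
        # submission happens here, so the thread is running before any await
        return loop.run_in_executor(None, functools.partial(fn, *args, **kwargs))
    return wrapper

@run_in_thread
def slow_double(x: int) -> int:
    time.sleep(0.05)  # stand-in for blocking network I/O
    return x * 2

async def main() -> int:
    pending = slow_double(21)  # work is already running in a thread here
    return await pending

result = asyncio.run(main())
```

The eager submission is what makes calling several decorated methods before gathering them genuinely parallel: each call has already dispatched its thread by the time anything is awaited.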
## `run_parallel()` -- Concurrent Execution
When you need to run the same (or different) operations across multiple targets concurrently, use `run_parallel()`. It accepts any number of awaitables and returns results in the same order.

The `parallel_collection` example collects command output from all devices in a group simultaneously. Note that this block uses `run_on="group"`, giving it access to `context.devices` — a list of all devices in the group:
```python
class ParallelCollect(FunctionBlock[ParallelCollectParams, ParallelCollectResult]):
    @run_in_thread
    def _run_command(self, host: str, username: str, password: str, command: str) -> str:
        from netmiko import ConnectHandler

        device = {"device_type": "cisco_ios", "host": host, "username": username, "password": password}
        with ConnectHandler(**device) as conn:
            return str(conn.send_command(command))

    async def run(
        self, params: ParallelCollectParams, context: WorkflowContext
    ) -> FunctionBlockResult[ParallelCollectResult]:
        if not context.devices:
            return FunctionBlockResult(message="No devices in context.", success=False, data=None)
        reachable = [(d, d.ip, d.username) for d in context.devices if d.ip and d.username]
        skipped = len(context.devices) - len(reachable)
        if skipped:
            self.logger.warning(f"Skipping {skipped} device(s) missing IP or credentials")
        results = await run_parallel(
            *[self._run_command(ip, username, d.password or "", params.command) for d, ip, username in reachable]
        )
        outputs = {}
        for (device, _, _), output in zip(reachable, results, strict=True):
            outputs[device.hostname or "unknown"] = output
        self.logger.info(f"Collected output from {len(outputs)} devices in parallel")
        return FunctionBlockResult(
            message=f"Collected from {len(outputs)} devices.",
            success=True,
            data=ParallelCollectResult(outputs=outputs),
        )

    async def acquire(self, params: ParallelCollectParams) -> FunctionBlockAcquireResult:
        del params
        return FunctionBlockAcquireResult(message="No resources required.", success=True, acquires=None)
```
The combination of `@run_in_thread` and `run_parallel()` is the standard pattern for fan-out operations: decorate a blocking helper, then pass multiple calls into `run_parallel()`.

`run_parallel()` is a thin wrapper around `asyncio.gather()`. If any operation fails, the first exception is raised; other operations may still complete in the background.
**Error propagation**

If one of the parallel operations raises an exception, `run_parallel()` raises it immediately. The remaining operations continue running, but their results are discarded.
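A minimal stand-in for `run_parallel()` (assuming only the gather-wrapper behavior described above; `fake_fetch` and `fake_failure` are made-up helpers) shows both the order guarantee and the first-exception behavior:

```python
import asyncio
from collections.abc import Awaitable
from typing import TypeVar

T = TypeVar("T")

async def run_parallel(*aws: Awaitable[T]) -> list[T]:
    # stand-in: results come back in argument order, like asyncio.gather
    return list(await asyncio.gather(*aws))

async def fake_fetch(name: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return f"{name}: ok"

async def fake_failure() -> str:
    await asyncio.sleep(0.01)
    raise RuntimeError("device unreachable")

async def demo() -> tuple[list[str], str]:
    # results match argument order, not completion order: r1 finishes last
    ordered = await run_parallel(fake_fetch("r1", 0.05), fake_fetch("r2", 0.01))
    try:
        await run_parallel(fake_fetch("r3", 0.05), fake_failure())
        error = "no error"
    except RuntimeError as exc:
        error = str(exc)  # the first exception propagates immediately
    return ordered, error

ordered, error = asyncio.run(demo())
```

Note that in the failing case the still-running `fake_fetch("r3", ...)` task is simply abandoned; its result is never observed, mirroring the error-propagation note above.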
## `@schedule_task` -- Fire-and-Forget
For async operations you want to start without waiting for them, use `@schedule_task`. It wraps an async function so that calling it immediately schedules an `asyncio.Task` and returns the `Task` object.
```python
from neops_worker_sdk.concurrency import schedule_task

@schedule_task
async def log_event(self, event: str) -> None:
    await some_async_api.post(event)
```
The returned `Task` can be awaited later or used for telemetry, non-critical
notifications, or background cleanup.
**Tasks are cancelled at the job boundary**

Each function block job runs in its own event loop via `asyncio.run()`.
When `run()` returns, `asyncio.run()` cancels all pending tasks as part
of its cleanup. Any `@schedule_task` task that hasn't completed by then
receives `CancelledError`. If the task has side effects (API calls, writes),
those may be interrupted. Always await tasks with critical side effects
before returning from `run()`.
**Tip**

`@schedule_task` only works in an async context with a running event loop.
Within a function block's `run()` method, this is always the case.
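A sketch of such a decorator, illustrative rather than the SDK's actual implementation, including the "await before returning" pattern the warning above recommends (`events` and `log_event` here are made-up names):

```python
import asyncio
import functools
from collections.abc import Callable, Coroutine
from typing import Any

def schedule_task(fn: Callable[..., Coroutine[Any, Any, Any]]) -> Callable[..., asyncio.Task]:
    """Illustrative sketch: each call is immediately scheduled as a Task
    on the running loop; the Task object is returned to the caller."""
    @functools.wraps(fn)
    def wrapper(*args, **kwargs) -> asyncio.Task:
        return asyncio.get_running_loop().create_task(fn(*args, **kwargs))
    return wrapper

events: list[str] = []

@schedule_task
async def log_event(event: str) -> None:
    await asyncio.sleep(0)  # stand-in for a non-critical API call
    events.append(event)

async def main() -> None:
    task = log_event("backup-complete")  # already scheduled, not yet awaited
    # awaiting before returning guarantees the side effect lands;
    # otherwise loop shutdown would cancel the pending task
    await task

asyncio.run(main())
```

Dropping the `await task` line turns the call into true fire-and-forget, with the cancellation risk at the job boundary described above.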
## Blocking Detection
The SDK includes a `BlockingDetector` that monitors your function block's execution thread from the main event loop. When a blocking operation exceeds the configured threshold, it captures a stack trace and logs a warning pointing to the exact location.
Example output:

```
WARNING | Blocking operation detected in config_backup (exceeded 0.5s threshold)
Blocking at:
  File "my_function_block.py", line 42, in _fetch_config
    return conn.send_command("show running-config")
Tip: Use @run_in_thread decorator to wrap blocking operations
```
### Configuration
The threshold is controlled by the `BLOCKING_DETECTION_THRESHOLD` environment variable:

| Value | Behavior |
|---|---|
| `0.5` (default) | Warn when any operation blocks for more than 500 ms |
| `2.0` | More lenient; useful for slow connections |
| `0` | Disable blocking detection entirely |
The detector filters out SDK internals and async I/O waits (e.g., `selectors.py`) from stack traces, showing only your code. The same threshold is used by the testing framework when running function blocks via `FunctionBlockTestCase`.
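The sampling idea behind this kind of detector can be sketched with CPython's `sys._current_frames()`, which maps thread IDs to their currently executing frames. This is a deliberately simplified, hypothetical monitor (the SDK's `BlockingDetector` is more sophisticated, and `sys._current_frames` is a CPython implementation detail):

```python
import sys
import threading
import time
import traceback

def watch_for_blocking(thread: threading.Thread, threshold: float,
                       report: list[str]) -> threading.Thread:
    """Hypothetical monitor: after the threshold elapses, if the watched
    thread is still busy, snapshot the frame it is executing."""
    def monitor() -> None:
        time.sleep(threshold)
        if thread.is_alive():
            frame = sys._current_frames().get(thread.ident)
            if frame is not None:
                # keep only the innermost entry: the user code that blocks
                report.append(traceback.format_stack(frame)[-1])

    watcher = threading.Thread(target=monitor, daemon=True)
    watcher.start()
    return watcher

def blocking_work() -> None:
    time.sleep(0.5)  # simulated blocking call, e.g. conn.send_command(...)

report: list[str] = []
job = threading.Thread(target=blocking_work)
job.start()
watcher = watch_for_blocking(job, threshold=0.1, report=report)
job.join()
watcher.join()
```

The captured entry points at `blocking_work`, the analogue of the `_fetch_config` line shown in the example warning above.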
## Deep Dive: Why `max_workers=1`?
The worker processes jobs one at a time per worker instance. This is deliberate:
- Resource safety: Network devices have limited concurrent session capacity. Running multiple jobs in parallel from the same worker risks SSH session limits and race conditions.
- Predictable memory usage: Each job can load significant context data. Sequential processing keeps memory bounded.
- Horizontal scaling: Need more throughput? Run more worker instances. The workflow engine distributes jobs across all registered workers automatically.
Inside a single job, `@run_in_thread` and `run_parallel()` still enable concurrency — for
example, collecting data from 50 devices simultaneously within one group-level function block.

In short: sequential between jobs, concurrent within a job.
The execution flow for a single job:

```
Main event loop                        Job thread
───────────────                        ──────────
poll_job() ──────────────────────────> asyncio.run(execute_fb())
heartbeat() (continues running)          ├─ run(params, context)
blocking_detector.monitor()              │   ├─ await self._fetch(dev1) ──> thread
                                         │   ├─ await self._fetch(dev2) ──> thread
                                         │   └─ return result
                                         └─ compute_db_updates()
<──────────────────────────────────────  result returned
push_result()
```
## Summary
| Utility | Import | Purpose |
|---|---|---|
| `@run_in_thread` | `neops_worker_sdk.concurrency` | Run a blocking function in a thread, return an awaitable |
| `run_parallel()` | `neops_worker_sdk.concurrency` | Execute multiple awaitables concurrently |
| `@schedule_task` | `neops_worker_sdk.concurrency` | Fire-and-forget an async function as a task |
| `BlockingDetector` | Automatic | Warns when blocking operations exceed the configured threshold |