Async Patterns for Function Blocks
This guide explains how to handle blocking operations and achieve parallel execution in function blocks. It's designed for network engineers who may not be familiar with Python's async/await patterns.
Quick Start
from neops_worker_sdk.concurrency import run_in_thread, run_parallel
class MyFunctionBlock(FunctionBlock):
@run_in_thread
def fetch_config(self, ip: str) -> str:
"""Blocking operation - runs in a thread pool."""
with ConnectHandler(host=ip, ...) as conn:
return conn.send_command("show run")
async def run(self, params, context):
# Fetch from 3 devices IN PARALLEL
configs = await run_parallel(
self.fetch_config("10.0.0.1"),
self.fetch_config("10.0.0.2"),
self.fetch_config("10.0.0.3"),
)
# configs = ["config1", "config2", "config3"]
The concurrency Package
The SDK provides three main tools in neops_worker_sdk.concurrency:
| Tool | Purpose |
|---|---|
@run_in_thread |
Wrap blocking functions for async execution |
run_parallel() |
Execute multiple operations concurrently |
@schedule_task |
Create fire-and-forget background tasks |
@run_in_thread Decorator
Use this decorator when you have a function that performs blocking I/O (network connections, file operations, database queries, etc.).
Basic Usage
from neops_worker_sdk.concurrency import run_in_thread
class NetworkFunctionBlock(FunctionBlock):
@run_in_thread
def connect_to_device(self, ip: str, command: str) -> str:
"""
This method uses netmiko, which blocks while waiting for the device.
The @run_in_thread decorator runs it in a separate thread.
"""
device = {
"device_type": "cisco_ios",
"host": ip,
"username": "admin",
"password": "secret",
}
with ConnectHandler(**device) as conn:
return conn.send_command(command)
async def run(self, params, context):
# This now works in async code!
output = await self.connect_to_device("10.0.0.1", "show version")
return FunctionBlockResult(success=True, message="Done", data=...)
How It Works
- When you call a
@run_in_threaddecorated function, it returns an "awaitable" - Use
awaitto get the result - While waiting, the event loop can do other things (like heartbeat pings)
- The actual blocking code runs in a separate thread
When to Use
Use @run_in_thread for:
- netmiko - SSH connections to network devices
- paramiko - Low-level SSH operations
- requests - HTTP requests (though
aiohttpis better) - File I/O - Reading/writing large files
- Database queries - Synchronous database drivers
- subprocess - Running external commands (though
asyncio.create_subprocess_execis better)
run_parallel() Function
Use this to execute multiple operations at the same time and wait for all results.
Basic Usage
from neops_worker_sdk.concurrency import run_in_thread, run_parallel
class MultiDeviceFunctionBlock(FunctionBlock):
@run_in_thread
def get_device_info(self, ip: str) -> dict:
"""Get information from a single device."""
with ConnectHandler(host=ip, ...) as conn:
version = conn.send_command("show version")
interfaces = conn.send_command("show interfaces")
return {"version": version, "interfaces": interfaces}
async def run(self, params, context):
# Get info from ALL devices at the same time
devices = ["10.0.0.1", "10.0.0.2", "10.0.0.3", "10.0.0.4"]
results = await run_parallel(
self.get_device_info(devices[0]),
self.get_device_info(devices[1]),
self.get_device_info(devices[2]),
self.get_device_info(devices[3]),
)
# results[0] = info from 10.0.0.1
# results[1] = info from 10.0.0.2
# etc.
return FunctionBlockResult(success=True, message="Done", data=...)
Dynamic Number of Operations
If you have a variable number of operations:
async def run(self, params, context):
devices = context.devices # Unknown number of devices
# Create a list of operations
operations = [
self.get_device_info(device.ip)
for device in devices
]
# Run them all in parallel
results = await run_parallel(*operations)
# results is a list with one result per device
Key Properties
- Order preserved: Results are in the same order as inputs
- Parallel execution: All operations run at the same time
- Error handling: If any operation fails, the first exception is raised
@schedule_task Decorator
Use this for "fire-and-forget" operations that run in the background.
Basic Usage
from neops_worker_sdk.concurrency import schedule_task
class LoggingFunctionBlock(FunctionBlock):
@schedule_task
async def log_to_external_system(self, message: str) -> None:
"""Send log to external system without waiting."""
await some_async_logging_call(message)
async def run(self, params, context):
# Start logging in background - doesn't wait
log_task = self.log_to_external_system("Starting operation")
# Continue with main work immediately
result = await self.do_main_work()
# Optionally wait for log to complete
await log_task
return result
Use Cases
- Sending notifications
- Background logging
- Cache updates
- Metrics collection
Blocking Detection
The SDK automatically detects when your code is blocking for too long and shows a helpful warning with the exact location of the blocking code.
Example Warning
WARNING: Blocking operation detected in ShowCmd.run() (exceeded 0.5s threshold)
Blocking at:
File "fb_get_show_cmd.py", line 78, in run
with ConnectHandler(**device) as conn:
File "fb_get_show_cmd.py", line 79, in run
output = conn.send_command(params.cmd)
Tip: Use @run_in_thread decorator for parallel execution:
from neops_worker_sdk.concurrency import run_in_thread, run_parallel
@run_in_thread
def blocking_operation(self, ...):
# Your blocking code here
async def run(self, params, context):
# Run multiple operations in parallel:
results = await run_parallel(
self.blocking_operation(arg1),
self.blocking_operation(arg2),
)
Configuration
The detection threshold can be configured via environment variable:
Common Patterns
Pattern 1: Sequential Device Access
When you need to access devices one at a time:
async def run(self, params, context):
results = []
for device in context.devices:
# Each call waits for the previous one
result = await self.connect_to_device(device.ip, params.command)
results.append(result)
return results
Pattern 2: Parallel Device Access
When you can access all devices simultaneously:
async def run(self, params, context):
results = await run_parallel(*[
self.connect_to_device(device.ip, params.command)
for device in context.devices
])
return results
Pattern 3: Batched Parallel Access
When you have many devices but want to limit concurrency:
async def run(self, params, context):
batch_size = 10
all_results = []
for i in range(0, len(context.devices), batch_size):
batch = context.devices[i:i + batch_size]
batch_results = await run_parallel(*[
self.connect_to_device(device.ip, params.command)
for device in batch
])
all_results.extend(batch_results)
return all_results
Pattern 4: Mixed Operations
Combining blocking I/O with async operations:
async def run(self, params, context):
# These can all run in parallel
results = await run_parallel(
self.fetch_from_device(context.devices[0].ip), # @run_in_thread
self.fetch_from_api("http://api.example.com"), # native async
self.read_large_file("/path/to/config"), # @run_in_thread
)
device_config, api_data, file_content = results
return self.process_results(device_config, api_data, file_content)
Troubleshooting
"RuntimeError: no running event loop"
This happens when you try to use async features outside of an async context.
Make sure you're calling await inside an async def function.
Operations Running Sequentially Instead of Parallel
Make sure you're using run_parallel():
# WRONG - runs sequentially
result1 = await self.operation1()
result2 = await self.operation2()
# CORRECT - runs in parallel
result1, result2 = await run_parallel(
self.operation1(),
self.operation2(),
)
Function Decorated with @run_in_thread Not Working
Make sure to use await:
# WRONG - returns a coroutine, not the result
result = self.blocking_operation()
# CORRECT - waits for the result
result = await self.blocking_operation()
Best Practices
- Use
@run_in_threadfor blocking libraries - netmiko, paramiko, requests, etc. - Use
run_parallel()for independent operations - Don't await each one separately - Keep blocking operations short - Long-running operations should be broken up
- Handle exceptions - Wrap parallel operations in try/except when needed
- Consider timeouts - Use
asyncio.wait_for()for operations that might hang