
Async Patterns for Function Blocks

This guide explains how to handle blocking operations and achieve parallel execution in function blocks. It's designed for network engineers who may not be familiar with Python's async/await patterns.

Quick Start

from netmiko import ConnectHandler
from neops_worker_sdk.concurrency import run_in_thread, run_parallel

class MyFunctionBlock(FunctionBlock):
    @run_in_thread
    def fetch_config(self, ip: str) -> str:
        """Blocking operation - runs in a thread pool."""
        with ConnectHandler(host=ip, ...) as conn:
            return conn.send_command("show run")

    async def run(self, params, context):
        # Fetch from 3 devices IN PARALLEL
        configs = await run_parallel(
            self.fetch_config("10.0.0.1"),
            self.fetch_config("10.0.0.2"),
            self.fetch_config("10.0.0.3"),
        )
        # configs = ["config1", "config2", "config3"]

The concurrency Package

The SDK provides three main tools in neops_worker_sdk.concurrency:

Tool              Purpose
@run_in_thread    Wrap blocking functions for async execution
run_parallel()    Execute multiple operations concurrently
@schedule_task    Create fire-and-forget background tasks

@run_in_thread Decorator

Use this decorator when you have a function that performs blocking I/O (network connections, file operations, database queries, etc.).

Basic Usage

from netmiko import ConnectHandler
from neops_worker_sdk.concurrency import run_in_thread

class NetworkFunctionBlock(FunctionBlock):
    @run_in_thread
    def connect_to_device(self, ip: str, command: str) -> str:
        """
        This method uses netmiko, which blocks while waiting for the device.
        The @run_in_thread decorator runs it in a separate thread.
        """
        device = {
            "device_type": "cisco_ios",
            "host": ip,
            "username": "admin",
            "password": "secret",
        }
        with ConnectHandler(**device) as conn:
            return conn.send_command(command)

    async def run(self, params, context):
        # This now works in async code!
        output = await self.connect_to_device("10.0.0.1", "show version")
        return FunctionBlockResult(success=True, message="Done", data=...)

How It Works

  1. When you call a @run_in_thread decorated function, it returns an "awaitable"
  2. Use await to get the result
  3. While waiting, the event loop can do other things (like heartbeat pings)
  4. The actual blocking code runs in a separate thread
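
The four steps above can be sketched with the standard library alone. This is not the SDK's implementation; run_in_thread_sketch, slow_lookup and the heartbeat counter are hypothetical names used only to illustrate the idea, and the sketch assumes Python 3.9+ for asyncio.to_thread.

```python
import asyncio
import functools
import threading
import time


def run_in_thread_sketch(func):
    """Hypothetical stand-in for @run_in_thread, built on asyncio.to_thread."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Calling the wrapper does not run func yet. It returns an awaitable
        # that runs func in a worker thread once it is awaited.
        return asyncio.to_thread(func, *args, **kwargs)

    return wrapper


@run_in_thread_sketch
def slow_lookup(name: str) -> tuple[str, bool]:
    time.sleep(0.1)  # stands in for a blocking SSH or HTTP call
    on_worker_thread = threading.current_thread() is not threading.main_thread()
    return name.upper(), on_worker_thread


async def main() -> tuple[str, bool, int]:
    ticks = 0

    async def heartbeat() -> None:
        # Keeps counting while slow_lookup blocks its worker thread.
        nonlocal ticks
        while True:
            ticks += 1
            await asyncio.sleep(0.01)

    beat = asyncio.create_task(heartbeat())
    value, on_worker_thread = await slow_lookup("core-sw1")
    beat.cancel()
    return value, on_worker_thread, ticks
```

Running asyncio.run(main()) returns the upper-cased name, confirms the blocking call ran off the main thread, and shows that the heartbeat kept ticking while it was in progress.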

When to Use

Use @run_in_thread for:

  • netmiko - SSH connections to network devices
  • paramiko - Low-level SSH operations
  • requests - HTTP requests (a native async client such as aiohttp avoids the extra thread)
  • File I/O - Reading/writing large files
  • Database queries - Synchronous database drivers
  • subprocess - Running external commands (asyncio.create_subprocess_exec is the native async alternative)

run_parallel() Function

Use this to execute multiple operations at the same time and wait for all results.

Basic Usage

from netmiko import ConnectHandler
from neops_worker_sdk.concurrency import run_in_thread, run_parallel

class MultiDeviceFunctionBlock(FunctionBlock):
    @run_in_thread
    def get_device_info(self, ip: str) -> dict:
        """Get information from a single device."""
        with ConnectHandler(host=ip, ...) as conn:
            version = conn.send_command("show version")
            interfaces = conn.send_command("show interfaces")
        return {"version": version, "interfaces": interfaces}

    async def run(self, params, context):
        # Get info from ALL devices at the same time
        devices = ["10.0.0.1", "10.0.0.2", "10.0.0.3", "10.0.0.4"]

        results = await run_parallel(
            self.get_device_info(devices[0]),
            self.get_device_info(devices[1]),
            self.get_device_info(devices[2]),
            self.get_device_info(devices[3]),
        )

        # results[0] = info from 10.0.0.1
        # results[1] = info from 10.0.0.2
        # etc.

        return FunctionBlockResult(success=True, message="Done", data=...)

Dynamic Number of Operations

If you have a variable number of operations:

async def run(self, params, context):
    devices = context.devices  # Unknown number of devices

    # Create a list of operations
    operations = [
        self.get_device_info(device.ip)
        for device in devices
    ]

    # Run them all in parallel
    results = await run_parallel(*operations)

    # results is a list with one result per device

Key Properties

  • Order preserved: Results are in the same order as inputs
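  • Parallel execution: All operations are in flight at the same time, so the total time approaches that of the slowest operation rather than the sum of all of them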
  • Error handling: If any operation fails, the first exception is raised
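
These properties match the behaviour of asyncio.gather, which the sketch below uses as a stand-in. Whether run_parallel() is built on gather, or accepts an option like return_exceptions, is an assumption not confirmed here; fetch and the demo_* functions are hypothetical.

```python
import asyncio


async def fetch(name: str, delay: float, fail: bool = False) -> str:
    await asyncio.sleep(delay)  # stands in for a device round trip
    if fail:
        raise ConnectionError(f"{name} unreachable")
    return f"config of {name}"


async def demo_order() -> list:
    # The slowest operation is listed first, yet it is still results[0].
    return await asyncio.gather(
        fetch("r1", 0.05),
        fetch("r2", 0.01),
        fetch("r3", 0.03),
    )


async def demo_first_exception() -> str:
    # Without extra options, the first failure propagates to the caller.
    try:
        await asyncio.gather(fetch("r1", 0.05), fetch("r2", 0.01, fail=True))
    except ConnectionError as exc:
        return str(exc)
    return "no error"


async def demo_collect_errors() -> list:
    # With plain asyncio.gather, return_exceptions=True keeps per-operation
    # failures in the result list instead of raising.
    return await asyncio.gather(
        fetch("r1", 0.02),
        fetch("r2", 0.01, fail=True),
        return_exceptions=True,
    )
```

When one unreachable device should not discard the results from all the others, collecting exceptions per operation (or catching them inside the decorated method and returning an error value) is usually the more practical choice.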

@schedule_task Decorator

Use this for "fire-and-forget" operations that run in the background.

Basic Usage

from neops_worker_sdk.concurrency import schedule_task

class LoggingFunctionBlock(FunctionBlock):
    @schedule_task
    async def log_to_external_system(self, message: str) -> None:
        """Send log to external system without waiting."""
        await some_async_logging_call(message)

    async def run(self, params, context):
        # Start logging in background - doesn't wait
        log_task = self.log_to_external_system("Starting operation")

        # Continue with main work immediately
        result = await self.do_main_work()

        # Optionally wait for log to complete
        await log_task

        return result

Use Cases

  • Sending notifications
  • Background logging
  • Cache updates
  • Metrics collection

Blocking Detection

The SDK automatically detects when your code blocks the event loop for longer than a configurable threshold (0.5 seconds by default) and logs a warning that points to the exact location of the blocking code.

Example Warning

WARNING: Blocking operation detected in ShowCmd.run() (exceeded 0.5s threshold)
Blocking at:
  File "fb_get_show_cmd.py", line 78, in run
    with ConnectHandler(**device) as conn:
  File "fb_get_show_cmd.py", line 79, in run
    output = conn.send_command(params.cmd)
Tip: Use @run_in_thread decorator for parallel execution:

    from neops_worker_sdk.concurrency import run_in_thread, run_parallel

    @run_in_thread
    def blocking_operation(self, ...):
        # Your blocking code here

    async def run(self, params, context):
        # Run multiple operations in parallel:
        results = await run_parallel(
            self.blocking_operation(arg1),
            self.blocking_operation(arg2),
        )

Configuration

The detection threshold can be configured via environment variable:

# Threshold in seconds (default: 0.5, set to 0 to disable)
export BLOCKING_DETECTION_THRESHOLD=1.0
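
To build intuition for what "blocking" means here, the sketch below measures event-loop lag with a small monitor task and reads the threshold from the same environment variable. It is a simplified illustration and not the SDK's detector: it only notices a block after it has ended and cannot report the source location shown in the warning above. The names read_threshold and lag_monitor are hypothetical.

```python
import asyncio
import os
import time


def read_threshold(default: float = 0.5) -> float:
    """Read BLOCKING_DETECTION_THRESHOLD; 0 means detection is disabled."""
    return float(os.environ.get("BLOCKING_DETECTION_THRESHOLD", default))


async def lag_monitor(threshold: float, report: list, interval: float = 0.01) -> None:
    """Record wake-ups that arrive late, which means the loop was blocked."""
    if threshold <= 0:
        return  # detection disabled
    while True:
        start = time.monotonic()
        await asyncio.sleep(interval)
        lag = time.monotonic() - start - interval
        if lag > threshold:
            report.append(lag)


async def main(threshold: float) -> list:
    report: list = []
    monitor = asyncio.create_task(lag_monitor(threshold, report))
    await asyncio.sleep(0.02)  # let the monitor start
    time.sleep(0.2)            # blocking call on the event loop thread
    await asyncio.sleep(0.05)  # give the monitor a chance to notice
    monitor.cancel()
    return report
```

With a threshold of 0.1 the monitor records one late wake-up of roughly 0.2 seconds; moving the time.sleep call into a worker thread would leave the report empty.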

Common Patterns

Pattern 1: Sequential Device Access

When you need to access devices one at a time:

async def run(self, params, context):
    results = []
    for device in context.devices:
        # Each call waits for the previous one
        result = await self.connect_to_device(device.ip, params.command)
        results.append(result)
    return results

Pattern 2: Parallel Device Access

When you can access all devices simultaneously:

async def run(self, params, context):
    results = await run_parallel(*[
        self.connect_to_device(device.ip, params.command)
        for device in context.devices
    ])
    return results

Pattern 3: Batched Parallel Access

When you have many devices but want to limit concurrency:

async def run(self, params, context):
    batch_size = 10
    all_results = []

    for i in range(0, len(context.devices), batch_size):
        batch = context.devices[i:i + batch_size]
        batch_results = await run_parallel(*[
            self.connect_to_device(device.ip, params.command)
            for device in batch
        ])
        all_results.extend(batch_results)

    return all_results
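
One drawback of fixed batches is that each batch waits for its slowest device before the next batch starts. An alternative is a semaphore that keeps a constant number of operations in flight. The sketch below uses plain asyncio rather than run_parallel(); run_limited and fake_device_call are hypothetical names.

```python
import asyncio
import functools


async def run_limited(factories, limit: int) -> list:
    """Run zero-argument coroutine factories with at most `limit` in flight."""
    semaphore = asyncio.Semaphore(limit)

    async def guarded(factory):
        async with semaphore:
            # The operation is only created once a slot is free.
            return await factory()

    # gather preserves input order regardless of completion order.
    return await asyncio.gather(*(guarded(f) for f in factories))


async def demo(limit: int = 3, count: int = 10) -> tuple:
    in_flight = 0
    peak = 0

    async def fake_device_call(ip: str) -> str:
        nonlocal in_flight, peak
        in_flight += 1
        peak = max(peak, in_flight)
        await asyncio.sleep(0.01)  # stands in for the device round trip
        in_flight -= 1
        return f"output from {ip}"

    ips = [f"10.0.0.{i}" for i in range(1, count + 1)]
    factories = [functools.partial(fake_device_call, ip) for ip in ips]
    results = await run_limited(factories, limit)
    return results, peak
```

The helper takes factories (for example functools.partial(self.connect_to_device, device.ip, params.command)) instead of already-created awaitables, so that nothing starts before its slot is available.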

Pattern 4: Mixed Operations

Combining blocking I/O with async operations:

async def run(self, params, context):
    # These can all run in parallel
    results = await run_parallel(
        self.fetch_from_device(context.devices[0].ip),  # @run_in_thread
        self.fetch_from_api("http://api.example.com"),  # native async
        self.read_large_file("/path/to/config"),        # @run_in_thread
    )

    device_config, api_data, file_content = results
    return self.process_results(device_config, api_data, file_content)

Troubleshooting

"RuntimeError: no running event loop"

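This error is raised when code that needs an event loop, for example asyncio.create_task(), runs where no loop is active, typically in a synchronous function or at module level. Call async helpers from inside an async def function such as run(); in a standalone script or test, start the loop with asyncio.run().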
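
A minimal illustration using only the standard library; collect, call_from_sync_code and loop_is_running are hypothetical names.

```python
import asyncio


async def collect() -> str:
    # Inside a coroutine a loop is running, so task creation works.
    task = asyncio.create_task(asyncio.sleep(0.01, result="done"))
    return await task


def call_from_sync_code() -> str:
    """Entry point for scripts or tests: asyncio.run() starts the loop."""
    return asyncio.run(collect())


def loop_is_running() -> bool:
    """Return True only when called from code executing inside an event loop."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:  # raised with the message "no running event loop"
        return False
    return True
```

Calling loop_is_running() from ordinary synchronous code returns False, which is the situation in which the error appears.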

Operations Running Sequentially Instead of Parallel

Make sure you're using run_parallel():

# WRONG - runs sequentially
result1 = await self.operation1()
result2 = await self.operation2()

# CORRECT - runs in parallel
result1, result2 = await run_parallel(
    self.operation1(),
    self.operation2(),
)

Function Decorated with @run_in_thread Not Working

Make sure to use await:

# WRONG - returns an awaitable, not the result
result = self.blocking_operation()

# CORRECT - waits for the result
result = await self.blocking_operation()

Best Practices

  1. Use @run_in_thread for blocking libraries - netmiko, paramiko, requests, etc.
  2. Use run_parallel() for independent operations - Don't await each one separately
  3. Keep blocking operations short - Long-running operations should be broken up
  4. Handle exceptions - Wrap parallel operations in try/except when needed
  5. Consider timeouts - Use asyncio.wait_for() for operations that might hang
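
For the last point, a timeout around a threaded operation can be sketched as follows. The example uses asyncio.to_thread in place of @run_in_thread, which is an assumption made to keep it self-contained; slow_command and fetch_with_timeout are hypothetical.

```python
import asyncio
import time


def slow_command(seconds: float) -> str:
    time.sleep(seconds)  # stands in for a device that is slow to answer
    return "output"


async def fetch_with_timeout(seconds: float, timeout: float) -> str:
    try:
        return await asyncio.wait_for(
            asyncio.to_thread(slow_command, seconds), timeout
        )
    except asyncio.TimeoutError:
        # Only the await is abandoned. The worker thread keeps running until
        # slow_command returns, because threads cannot be cancelled from outside.
        return "timed out"
```

Because the thread itself is not interrupted, it is worth also setting the blocking library's own connection and read timeouts so that abandoned operations do not occupy worker threads indefinitely.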