Best Practices
Patterns and guidelines for reliable, maintainable device connections in production.
Use the connect() Context Manager
Always prefer the context manager over manual get_connection() / close() calls. It guarantees cleanup even when exceptions occur.
def collect_version(device: Device) -> dict[str, str | None]:
    with DeviceInfoProxy.connect(device, "ssh", "scrapli") as proxy:
        return proxy.get_version()
If you must manage the lifecycle manually, wrap the usage in try / finally:
proxy = MyProxy.get_connection(device, "ssh", "scrapli")
try:
    result = proxy.get_version()
finally:
    proxy.close()
Use get_raw_connection() as an Escape Hatch
When no capability covers your specific need, access the underlying library client through the plugin:
def send_custom_command(device: Device, command: str) -> str:
    with DeviceInfoProxy.connect(device, "ssh", "scrapli") as proxy:
        raw = proxy.plugin.get_raw_connection()
        if raw is None:
            raise RuntimeError("Connection is not alive.")
        response = raw.send_command(command)
        return str(response.result)
This is useful for ad-hoc send_command() or send_config() calls. If you find yourself using raw connections repeatedly for the same operation, consider creating a capability interface instead.
Handle NotImplementedForThisPlatform Gracefully
In multi-vendor environments, not every plugin implements every capability. Wrap calls that may be unsupported:
def safe_get_version(device: Device) -> dict[str, str | None] | None:
    with DeviceInfoProxy.connect(device, "ssh", fallback_to_default=True) as proxy:
        try:
            return proxy.get_version()
        except NotImplementedForThisPlatform:
            return None
The exception includes a context attribute with proxy_class, method_name, interface_name, platform, plugin_class, and device_id for structured error handling.
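The fields listed above can be flattened into a plain dictionary for structured logging. The following is a minimal sketch, not the SDK's own API: `summarize_unsupported` and `FakeUnsupported` are hypothetical names, and it assumes `context` is either a mapping or an object exposing those fields as attributes.

```python
from types import SimpleNamespace
from typing import Any, Optional

# Field names as listed in the paragraph above.
CONTEXT_FIELDS = (
    "proxy_class",
    "method_name",
    "interface_name",
    "platform",
    "plugin_class",
    "device_id",
)


def summarize_unsupported(exc: Exception) -> dict[str, Optional[str]]:
    """Flatten exc.context into a dict of strings (None for missing fields)."""
    ctx: Any = getattr(exc, "context", None)
    summary: dict[str, Optional[str]] = {}
    for name in CONTEXT_FIELDS:
        # Assumption: context is a mapping or an attribute-style object.
        value = ctx.get(name) if isinstance(ctx, dict) else getattr(ctx, name, None)
        summary[name] = None if value is None else str(value)
    return summary


class FakeUnsupported(Exception):
    """Stand-in for demonstration; real code catches NotImplementedForThisPlatform."""

    def __init__(self, context: Any) -> None:
        super().__init__("not implemented")
        self.context = context


err = FakeUnsupported(SimpleNamespace(method_name="get_version", platform="frr", device_id=42))
print(summarize_unsupported(err))
```

In a function block, the same helper would be called from the `except NotImplementedForThisPlatform as exc:` branch and its result passed to your logger.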
Avoid Multiple Simultaneous Connections
Do not open more than one connection to the same device at the same time. Most network devices have limited VTY lines or session limits. Open a single connection, perform all operations, then close it before connecting again.
with MyProxy.connect(device, "ssh", "scrapli") as proxy:
    version = proxy.get_version()
    config = proxy.get_running_config()
Do Not Hold Connections Across Long Operations
Close connections before CPU-intensive processing or external API calls. Network devices may time out idle sessions, and holding connections unnecessarily blocks other automation.
with MyProxy.connect(device, "ssh", "scrapli") as proxy:
    raw_config = proxy.get_running_config()
# The connection is already closed here; process the data offline.
processed = expensive_parsing(raw_config)
Test Plugins with Registry Utilities
Use clear_registry() and get_registry_snapshot() to isolate plugin tests:
from neops_worker_sdk.connection.registry import (  # noqa: E402
    clear_registry,
    get_registry_snapshot,
)
def test_my_plugin_registers():
    import sys
    # Remove cached module so re-import triggers registration
    sys.modules.pop("frr_plugin", None)
    clear_registry()
    # pyrefly: ignore [missing-import]
    from frr_plugin import FRRNetmikoPlugin
    snapshot = get_registry_snapshot()
    assert "frr" in snapshot["plugins"]
    assert snapshot["platform_defaults"]["frr"] is FRRNetmikoPlugin
clear_registry() removes all registered plugins, so each test starts from a clean state. Always call it in test setup to prevent cross-test contamination.
Credentials and Secrets
Credentials are stored on Device objects in the neops CMS and arrive in
your function block via WorkflowContext. The SDK reads device.username
and device.password directly when creating connections.
- Never hardcode credentials in function blocks or examples.
- For external secrets (HashiCorp Vault, AWS Secrets Manager), resolve them
  in the acquire() phase and set them on the device object before connecting.
- SSH key authentication depends on the underlying library (scrapli supports
  it natively via auth_private_key; use get_raw_connection() to configure it).
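The external-secrets step above can be sketched as follows. This is an illustration under stated assumptions: `apply_external_credentials`, `DeviceStub`, the `fetch_secret` callable, and the secret path are all hypothetical; only the `username` and `password` fields come from the text. In practice `fetch_secret` would wrap your Vault or Secrets Manager client.

```python
from dataclasses import dataclass
from typing import Callable, Mapping, Optional


@dataclass
class DeviceStub:
    """Stand-in for the SDK's Device, with only the fields used here."""

    username: Optional[str] = None
    password: Optional[str] = None


def apply_external_credentials(
    device: DeviceStub,
    secret_path: str,
    fetch_secret: Callable[[str], Mapping[str, str]],
) -> None:
    """Look up a secret and copy its credentials onto the device object."""
    secret = fetch_secret(secret_path)  # hypothetical lookup, e.g. a Vault read
    device.username = secret["username"]
    device.password = secret["password"]
```

Calling this from the acquire() phase keeps the secret lookup out of the code that opens the connection.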
Enable Mode / Privilege Escalation
Many Cisco and Arista devices require enable mode for privileged commands.
Handle this through your connection plugin's raw connection:
Netmiko: set secret on the connection (or pass it when constructing the connection), then call enable():
raw = proxy.plugin.get_raw_connection()
raw.secret = device.password # or a dedicated enable secret
raw.enable()
output = raw.send_command("show running-config")
Scrapli: set auth_secondary on the driver (or pass it in the driver options), then acquire the privileged level:
raw = proxy.plugin.get_raw_connection()
raw.auth_secondary = device.password
raw.acquire_priv("privilege_exec")
Tip
If you frequently need enable mode, wrap the escalation in your connection plugin so callers don't deal with privilege details.
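One way to wrap the escalation is a small helper that a plugin could call before running privileged commands. This is a minimal sketch for a Netmiko raw connection: `ensure_enable_mode` is a hypothetical name, and where a plugin would invoke it is an assumption. `check_enable_mode()` and `enable()` are Netmiko connection methods.

```python
from typing import Any


def ensure_enable_mode(raw: Any, enable_secret: str) -> None:
    """Enter enable mode on a Netmiko connection unless already enabled."""
    if raw.check_enable_mode():
        return  # already privileged; nothing to do
    raw.secret = enable_secret
    raw.enable()
```

Because the helper checks the current mode first, it is safe to call more than once on the same connection.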
SSH Key Authentication
For environments that use key-based authentication instead of passwords:
Scrapli: supports auth_private_key natively. Set it in your plugin's
__init__ or pass it through connection options:
from scrapli import AsyncScrapli
conn = AsyncScrapli(
    platform="cisco_iosxe",  # example value; the factory requires a platform
    host=device.primary_ip,
    auth_username=device.username,
    auth_private_key="/path/to/key",
    auth_strict_key=False,
    transport="asyncssh",  # async transport, so use AsyncScrapli rather than Scrapli
)
Netmiko: pass key_file in the connection dictionary:
from netmiko import ConnectHandler
conn = ConnectHandler(
    host=device.primary_ip,
    username=device.username,
    key_file="/path/to/key",
    device_type="linux",
)
In production, store key paths in environment variables or resolve them from
a secrets manager in the acquire() phase.
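Resolving the key path from the environment might look like the sketch below. The variable name `NEOPS_SSH_KEY_PATH` and the helper `resolve_key_path` are hypothetical; the SDK does not define them.

```python
import os
from pathlib import Path


def resolve_key_path(env_var: str = "NEOPS_SSH_KEY_PATH") -> str:
    """Return the SSH key path from an environment variable, failing early if unusable."""
    value = os.environ.get(env_var)
    if not value:
        raise RuntimeError(f"Environment variable {env_var} is not set.")
    path = Path(value).expanduser()
    if not path.is_file():
        raise FileNotFoundError(f"SSH key not found: {path}")
    return str(path)
```

The returned string can then be passed as auth_private_key (scrapli) or key_file (netmiko) in place of the literal "/path/to/key".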
Retry and Timeout Patterns
The SDK does not retry connections automatically. For transient failures (unreachable hosts, auth timeouts), use a retry decorator:
from tenacity import retry, stop_after_attempt, wait_exponential
@retry(stop=stop_after_attempt(3), wait=wait_exponential(min=2, max=10))
def collect_with_retry(proxy):
    return proxy.get_version()
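If adding a dependency is not an option, the same idea can be sketched with the standard library. This is not tenacity's implementation: `call_with_retry` is a hypothetical helper, and the exception types it retries on are an assumption you should adjust to whatever your connection library raises for transient failures.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_retry(
    operation: Callable[[], T],
    attempts: int = 3,
    base_delay: float = 2.0,
    max_delay: float = 10.0,
    retry_on: tuple[type[BaseException], ...] = (ConnectionError, TimeoutError),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call operation, retrying on the given exceptions with exponential backoff."""
    for attempt in range(1, attempts + 1):
        try:
            return operation()
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts; surface the last error
            # Delay doubles each time, capped at max_delay.
            sleep(min(base_delay * 2 ** (attempt - 1), max_delay))
    raise ValueError("attempts must be at least 1")
```

Passing a callable that opens the connection and collects the data in one go means each retry starts from a fresh session instead of reusing a possibly broken one.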
Timeouts are configured at the library level (e.g., scrapli's
timeout_transport, netmiko's timeout). Pass them via the
BaseConnection in a custom plugin, or set them on the raw connection.
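As a rough illustration of the library-level options, the sketch below collects timeout keyword arguments per library. The keyword names are the ones scrapli drivers and netmiko's ConnectHandler accept; the values, the `timeout_options` helper, and how a plugin forwards these arguments to the driver are assumptions.

```python
# Timeout keyword arguments in seconds; the values are examples only.
SCRAPLI_TIMEOUTS = {
    "timeout_socket": 10,     # establishing the socket
    "timeout_transport": 30,  # transport-level reads and writes
    "timeout_ops": 60,        # individual operations such as send_command
}
NETMIKO_TIMEOUTS = {
    "conn_timeout": 10,  # establishing the TCP connection
    "timeout": 60,       # general connection timeout
}


def timeout_options(library: str) -> dict[str, float]:
    """Return a copy of the timeout kwargs for the named library."""
    options = {"scrapli": SCRAPLI_TIMEOUTS, "netmiko": NETMIKO_TIMEOUTS}
    if library not in options:
        raise ValueError(f"Unknown library: {library}")
    return dict(options[library])
```

A custom plugin could merge `timeout_options("scrapli")` into the keyword arguments it passes when constructing the driver.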
Connection Limits and Concurrency
There is no connection pooling. Each connect() call creates a new session.
- Most network devices have limited VTY/SSH sessions (typically 5–16).
- Two function blocks connecting to the same device in parallel will open separate sessions. The SDK does not serialize access.
- For large-batch operations (hundreds of devices), use a semaphore to limit concurrent connections:
import asyncio
sem = asyncio.Semaphore(20)  # at most 20 devices in flight at once
async def collect_one(device):
    async with sem:
        # _collect is your own coroutine that connects and gathers data
        return await _collect(device)
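A fuller sketch of the same pattern, runnable end to end, is shown below. It assumes the per-device work is a blocking function (for example one that uses the synchronous connect() context manager), so it is pushed to a worker thread with `asyncio.to_thread`. `collect_all` and its `collect` parameter are hypothetical names.

```python
import asyncio
from typing import Any, Callable, Iterable


async def collect_all(
    devices: Iterable[Any],
    collect: Callable[[Any], Any],
    limit: int = 20,
) -> list[Any]:
    """Run collect(device) for every device, at most `limit` at a time."""
    sem = asyncio.Semaphore(limit)

    async def bounded(device: Any) -> Any:
        async with sem:
            # collect is blocking, so run it in a worker thread.
            return await asyncio.to_thread(collect, device)

    # return_exceptions=True keeps one failing device from cancelling the batch;
    # failures appear in the result list as exception objects.
    return await asyncio.gather(*(bounded(d) for d in devices), return_exceptions=True)
```

Results come back in the same order as the input devices, so they can be zipped with the device list and the exception entries handled separately.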
Summary
| Practice | Why |
|---|---|
| connect() context manager | Guarantees cleanup on success and failure |
| get_raw_connection() for one-offs | Avoids creating a capability for a single use case |
| Catch NotImplementedForThisPlatform | Graceful degradation in multi-vendor environments |
| One connection per device at a time | Respects device session limits |
| Short-lived connections | Avoids idle timeouts and resource exhaustion |
| Enable mode in plugin | Keeps privilege escalation out of function block logic |
| SSH key auth via library options | Avoids password-based auth in hardened environments |
| clear_registry() in tests | Prevents test pollution |