Writing Base Plugins (New Library)

This page explains how to add support for a new third‑party connection library by implementing:

  • a BaseConnection wrapper (connect/disconnect/is_alive)
  • a ConnectionPlugin base for that library

Then platform plugins can inherit from your base and add capabilities.


What a “ConnectionPlugin base” is in this SDK

The codebase uses a two-layer design:

  • BaseConnection: wraps the raw client object (e.g. Netmiko connection, Scrapli driver, httpx client).
  • ConnectionPlugin: owns the active connection and exposes capability methods.

Base plugins live under:

  • neops_worker_sdk/connection/plugins/base/

Examples to use as reference:

  • neops_worker_sdk/connection/plugins/base/netmiko_plugin.py
  • neops_worker_sdk/connection/plugins/base/scrapli_plugin.py

Key rules:

  • Validate device fields early (ip, username, password, …).
  • Import the third‑party dependency lazily inside connect() so a missing dependency only fails when that connection is actually used. A module-level import is fine only if you are sure the dependency is always installed in your environment.
  • Store the raw connection in self._connection only after a successful connect. (BaseConnection.connection reads from this.)
  • Raise SDK exceptions for consistent handling:
      • ConnectionValidationError for missing fields
      • ConnectionCreationError for import/connect failures
  • Make disconnect() defensive: log warnings, don’t raise.
  • Make is_alive defensive: wrap errors and return False on unexpected exceptions.

from __future__ import annotations

from typing import TYPE_CHECKING, Any

from neops_worker_sdk.connection.exceptions import ConnectionCreationError, ConnectionValidationError
from neops_worker_sdk.connection.types import BaseConnection
from neops_worker_sdk.logger.logger import Logger

logger = Logger()

if TYPE_CHECKING:
    from some_lib import Client as SomeLibType  # type-only import


class SomeLibConnection(BaseConnection["SomeLibType"]):
    def connect(self, **kwargs: Any) -> None:
        # 1) Lazy import
        try:
            from some_lib import Client
        except ImportError as exc:
            raise ConnectionCreationError(
                "some_lib is not installed. Add it to your dependencies.",
                device_id=str(self.device.id) if self.device.id else None,
                cause=exc,
            ) from exc

        # 2) Validate device fields
        if not self.device.ip:
            raise ConnectionValidationError(
                "Device has no IP address, cannot connect.",
                device_id=str(self.device.id) if self.device.id else None,
                missing_fields=["ip"],
            )
        if not self.device.username:
            raise ConnectionValidationError(
                "Device has no username, cannot connect.",
                device_id=str(self.device.id) if self.device.id else None,
                missing_fields=["username"],
            )
        if not self.device.password:
            raise ConnectionValidationError(
                "Device has no password, cannot connect.",
                device_id=str(self.device.id) if self.device.id else None,
                missing_fields=["password"],
            )

        # 3) Create and connect raw client
        try:
            client = Client(host=self.device.ip, username=self.device.username, password=self.device.password, **kwargs)
            client.connect()
            self._connection = client
            logger.info("Connected to device %s via some_lib", self.device.id)
        except Exception as exc:
            raise ConnectionCreationError(
                "Failed to establish some_lib connection.",
                device_id=str(self.device.id) if self.device.id else None,
                cause=exc,
            ) from exc

    def disconnect(self) -> None:
        try:
            if self._connection:
                self._connection.close()
                self._connection = None
                logger.info("Disconnected some_lib connection for device %s", self.device.id)
            else:
                logger.warning("No some_lib connection to close for device %s", self.device.id)
        except Exception as exc:
            logger.warning("Error closing some_lib connection for device %s: %s", self.device.id, exc)

    @property
    def is_alive(self) -> bool:
        try:
            return bool(self._connection and self._connection.is_alive())
        except Exception as exc:
            logger.warning("Error checking some_lib connection status for device %s: %s", self.device.id, exc)
            return False

ConnectionPlugin base for the new library

Once you have the wrapper, create a plugin base that:

  • sets connection_type and connection_library
  • implements _create_connection() to return your BaseConnection

from __future__ import annotations

from typing import TYPE_CHECKING

from neops_worker_sdk.connection.types import ConnectionPlugin

if TYPE_CHECKING:
    from some_lib import Client as SomeLibType  # type-only import


class SomeLibConnectionPluginBase(ConnectionPlugin[SomeLibConnection, "SomeLibType"]):
    connection_type = "ssh"
    connection_library = "some_lib"

    def _create_connection(self) -> SomeLibConnection:
        # SomeLibConnection is the BaseConnection wrapper defined above
        return SomeLibConnection(self.device)

Platform plugins then inherit from this base and add capabilities; see 35-platform-plugins-and-defaults.md for platform plugin examples.

For designing and composing the capabilities your plugins implement, see 33-writing-capabilities-and-proxies.md.


Common pitfalls

  1. Importing third-party libs at module import time

If the dependency isn’t installed, a module-level import raises ImportError as soon as the plugin module is loaded, even if that library is never used. Prefer lazy imports inside connect(), unless you are sure the dependency is installed in your environment.

  2. Raising generic ValueError

Use ConnectionValidationError / ConnectionCreationError so callers can handle failures consistently.

  3. Disconnect that raises

Cleanup should be robust. Log warnings/errors but don’t raise new exceptions from disconnect().

  4. is_alive that throws

Treat is_alive like a health check: return False on errors and log once.
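
The last two pitfalls can be checked without a real device. The sketch below is illustrative only: FlakyClient and DemoConnection are hypothetical stand-ins (not SDK classes), and the standard logging module substitutes for the SDK's Logger(). It mirrors the disconnect() and is_alive logic from the wrapper example and feeds it a client that fails on every call.

```python
import logging

logger = logging.getLogger("some_lib_demo")  # stand-in for the SDK's Logger()


class FlakyClient:
    """Hypothetical raw client whose close() and is_alive() both fail."""

    def close(self) -> None:
        raise OSError("socket already closed")

    def is_alive(self) -> bool:
        raise TimeoutError("keepalive timed out")


class DemoConnection:
    """Stand-in wrapper; copies only disconnect() and is_alive from the example."""

    def __init__(self, client: FlakyClient) -> None:
        self._connection = client

    def disconnect(self) -> None:
        try:
            if self._connection:
                self._connection.close()
                self._connection = None
        except Exception as exc:
            # Cleanup errors are logged, never propagated to the caller.
            logger.warning("Error closing connection: %s", exc)

    @property
    def is_alive(self) -> bool:
        try:
            return bool(self._connection and self._connection.is_alive())
        except Exception as exc:
            # A failing health check counts as "not alive".
            logger.warning("Error checking connection status: %s", exc)
            return False


conn = DemoConnection(FlakyClient())
alive = conn.is_alive  # the TimeoutError is logged and False is returned
conn.disconnect()      # the OSError is logged and the call returns normally
```

A check like this fits well in a unit test for a new wrapper, because the failure paths are the ones least likely to be exercised against a healthy lab device.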


Logging guidelines (using Logger()):

  • logger.info(): user-visible lifecycle events (connected/disconnected, executing a high-level action)
  • logger.warning(): recoverable issues (parsing fallbacks, cleanup errors, transient health check failures)
  • logger.error(): unexpected failures you cannot recover from cleanly
  • logger.debug(): internal state transitions and noisy details
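
As a rough illustration of how the four levels map onto one operation, the sketch below uses the standard logging module as a stand-in for Logger() (this assumes the SDK logger accepts the same %-style arguments shown in the wrapper example). The helper refresh_uptime and its "uptime: <seconds>" output format are hypothetical.

```python
import logging
from typing import Callable, Optional

logging.basicConfig(level=logging.DEBUG, format="%(levelname)s %(message)s")
logger = logging.getLogger("some_lib_plugin")  # stand-in for the SDK's Logger()


def refresh_uptime(device_id: str, fetch: Callable[[], str]) -> Optional[int]:
    """Hypothetical helper: fetch 'uptime: <seconds>' and return the seconds."""
    # info: a high-level action the user should be able to see
    logger.info("Refreshing uptime for device %s", device_id)
    try:
        raw = fetch()
    except Exception as exc:
        # error: nothing sensible can be returned, so log and re-raise
        logger.error("Fetching uptime failed for device %s: %s", device_id, exc)
        raise
    # debug: noisy detail that is only useful when troubleshooting
    logger.debug("Raw uptime output for device %s: %r", device_id, raw)
    try:
        return int(raw.split(":", 1)[1])
    except (IndexError, ValueError):
        # warning: recoverable, the caller can carry on without the value
        logger.warning("Could not parse uptime for device %s, returning None", device_id)
        return None


uptime = refresh_uptime("r1", lambda: "uptime: 42")
missing = refresh_uptime("r1", lambda: "n/a")
```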

Exception context guidelines:

  • always include device_id when available
  • include cause when wrapping third‑party exceptions (preserves the original error)
  • use missing_fields on ConnectionValidationError so callers can fix data quickly
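
To show why this context matters to callers, here is a minimal sketch of code that consumes it. The two exception classes are local stand-ins so the example runs on its own; the keyword arguments match the ones used on this page, but exposing them as attributes with the same names is an assumption about the SDK classes. describe_failure is a hypothetical helper.

```python
from typing import List, Optional


class ConnectionValidationError(Exception):
    """Stand-in for the SDK exception; attribute names are assumptions."""

    def __init__(self, message: str, device_id: Optional[str] = None,
                 missing_fields: Optional[List[str]] = None) -> None:
        super().__init__(message)
        self.device_id = device_id
        self.missing_fields = missing_fields or []


class ConnectionCreationError(Exception):
    """Stand-in for the SDK exception; attribute names are assumptions."""

    def __init__(self, message: str, device_id: Optional[str] = None,
                 cause: Optional[BaseException] = None) -> None:
        super().__init__(message)
        self.device_id = device_id
        self.cause = cause


def describe_failure(exc: Exception) -> str:
    """Hypothetical helper: turn exception context into an actionable message."""
    if isinstance(exc, ConnectionValidationError):
        return f"device {exc.device_id}: fix inventory fields {exc.missing_fields}"
    if isinstance(exc, ConnectionCreationError):
        return f"device {exc.device_id}: connect failed ({exc.cause!r})"
    return f"unexpected error: {exc!r}"


validation_msg = describe_failure(
    ConnectionValidationError(
        "Device has no IP address, cannot connect.",
        device_id="42",
        missing_fields=["ip"],
    )
)
creation_msg = describe_failure(
    ConnectionCreationError(
        "Failed to establish some_lib connection.",
        device_id="42",
        cause=TimeoutError("timed out"),
    )
)
```

With device_id, cause, and missing_fields populated, the caller can tell a data problem (fix the inventory) apart from a transport problem (retry or escalate) without parsing message strings.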