Skip to content

Anatomy of a Function Block

Every function block follows the same three-layer structure: typed parameters flow in, a run() method processes them, and typed results flow out. The platform handles discovery, scheduling, error handling, and persistence — you focus on the automation logic.


Lifecycle

A function block goes through six stages from deployment to database update:

flowchart LR
    A[Registration] --> B[Discovery]
    B --> C[Acquire]
    C --> D[Execute]
    D --> E[Result]
    E --> F[DB Updates]
  1. Registration -- The @register_function_block decorator attaches metadata (name, version, schemas) to the class.
  2. Discovery -- On startup, the worker recursively imports all Python modules in the configured directory and collects decorated classes.
  3. Acquire -- The workflow engine dispatches an ACQUIRE job. The worker calls acquire() to declare resource needs; the engine locks entities and resolves context.
  4. Execute -- The engine dispatches a separate EXECUTE job. The worker calls execute_function_block(), which validates parameters and invokes run() with a WorkflowContext holding entity state.
  5. Result -- run() returns a typed FunctionBlockResult with a success flag, message, and structured data.
  6. DB Updates -- The platform diffs entity state before and after execution, generating create/patch/delete operations automatically.

Parameters

FunctionBlockParams extends Pydantic's BaseModel with extra="ignore". Unknown fields from the workflow engine are dropped (the SDK logs a warning listing them via parse_parameters_from_dto()), so schema extensions never break existing function blocks.

A minimal parameter model declares its fields with type annotations:

class EchoParameters(FunctionBlockParams):
    message: str

Pydantic validates automatically -- if the engine sends {"text": 42}, Pydantic coerces it to "42". Send {} without a required field and validation fails with a clear error before run() is ever called.

Complex parameters: nested models, defaults, and validators

Real-world function blocks often need richer input structures:

class ThresholdConfig(FunctionBlockParams):
    """Nested model for configurable thresholds."""

    warning: float = 80.0
    critical: float = 95.0

    @field_validator("critical")
    @classmethod
    def critical_above_warning(cls, v: float, info: ValidationInfo) -> float:
        warning = info.data.get("warning", 80.0)
        if v <= warning:
            raise ValueError("critical threshold must exceed warning")
        return v


class ComplianceCheckParams(FunctionBlockParams):
    policy_name: str
    interfaces_to_check: list[str] | None = None
    cpu_thresholds: ThresholdConfig = ThresholdConfig()
    memory_thresholds: ThresholdConfig = ThresholdConfig()
    skip_unreachable: bool = True

Key patterns:

  • Nested models -- compose FunctionBlockParams subclasses for structured input.
  • Optional fields -- list[str] | None = None for parameters that may be omitted.
  • Defaults -- assign values directly (skip_unreachable: bool = True).
  • Validators -- @field_validator for cross-field or business-rule checks.

JSON schemas auto-generated from these models drive the parameter form in the neops UI.


Results

FunctionBlockResultData extends BaseModel with extra="forbid". Unlike parameters, results enforce strict output validation -- any undeclared field raises a ValidationError. Downstream workflow steps and the UI can trust the schema completely.

class EchoResult(FunctionBlockResultData):
    echoed_message: str

Wrap your result data in a FunctionBlockResult:

FunctionBlockResult(
    message="Version retrieved successfully.",
    success=True,
    data=ShowVersionResult(version_info=version_info),
)
Field Type Purpose
success bool Whether the block completed without errors
message str Human-readable summary for the UI and logs
data ResultDataT \| None Typed result payload, or None on failure

Registration

Every function block declares its identity through a Registration dataclass passed to the @register_function_block decorator:

@register_function_block(
    Registration(
        name="echo",
        description="Return the provided text without modification.",
        package="fb.examples.neops.io",
        version=(1, 0, 0),
        run_on="global",
        fb_type="execute",
        param_cls=EchoParameters,
        result_cls=EchoResult,
        is_pure=True,
        is_idempotent=True,
    )
)
Field Default Purpose
name -- Unique name within the package
package -- Reverse-DNS namespace (e.g. fb.acme.com)
version (1, 0, 0) Semantic version tuple (major, minor, patch)
description -- Short description shown in the UI
run_on -- Entity scope: "device", "interface", "group", or "global"
fb_type -- Category: "configure", "facts", "check", "execute", "none"
param_cls -- The FunctionBlockParams subclass for this block
result_cls -- The FunctionBlockResultData subclass for this block
is_pure False True if the block has no side effects
is_idempotent False True if repeated execution produces the same result
markdown_helptext "No helptext provided" Markdown documentation shown in the UI
deprecated False Marks the block as deprecated

The platform identifies each block as {package}/{name}:{major}.{minor}.{patch}. For the example above, that resolves to fb.examples.neops.io/echo:1.0.0.

JSON schemas for param_cls and result_cls are auto-generated at registration time. The workflow engine uses these for input validation in both the UI and the API.

See Registration (Engine) for how the engine stores, resolves, and validates registrations.


Lifecycle Methods

The FunctionBlock base class defines three lifecycle methods:

Method Required Purpose
run(params, context) Yes Core execution logic
acquire(params) Yes Declare resource needs before execution
rollback(params, context, result) No Undo changes on failure

run() receives validated parameters and a WorkflowContext holding entity data. This is where your automation logic lives. Return a FunctionBlockResult with your typed output.

acquire() is dispatched as a separate job before run(). It declares which entities the block needs. The workflow engine locks resources and resolves additional context based on your response. For blocks that need no specific resources, return a simple success.

rollback() is intended for undoing changes on failure. The default implementation returns FunctionBlockRollbackResult(success=False, message="Rollback not implemented"). Override it when your block modifies external systems that need cleanup — it receives the failed result so you can inspect what went wrong.

Full FunctionBlock base class
class FunctionBlock(Generic[ParamsT, ResultDataT], ABC):
    @final
    def __init__(self, logger: BaseLogger | None = None) -> None:
        self.logger: BaseLogger = logger or Logger()

    @abstractmethod
    async def run(self, params: ParamsT, context: WorkflowContext) -> FunctionBlockResult[ResultDataT]:
        """
        Run the function block with the given context and return a result.
        """

    @abstractmethod
    async def acquire(self, params: ParamsT) -> FunctionBlockAcquireResult:
        """
        Acquire resources or perform setup before running the function block.
        This method should be implemented by subclasses to define how to acquire necessary resources.
        """

    async def rollback(
        self,
        params: ParamsT,
        context: WorkflowContext,
        result_from_failed: FunctionBlockResult[ResultDataT],
    ) -> FunctionBlockRollbackResult:
        """
        Rollback the function block in case of failure.
        This method should be implemented by subclasses to define how to rollback the changes made by the function block.
        """
        return FunctionBlockRollbackResult(success=False, message="Rollback not implemented")

    @final
    async def execute_function_block(
        self,
        params: ParamsT,
        context: WorkflowContext,
        *,
        propagate_exceptions: bool = False,
    ) -> FunctionBlockResult[ResultDataT]:
        """
        Execute the function block and return its result.

        If ``propagate_exceptions`` is True any exception raised by ``run`` will be
        re-raised so that tooling (e.g. pytest) can show a full stack-trace that
        points to the actual error location.
        This is essential for debugging and testing.
        In production code the default (``False``) keeps the previous behaviour and converts exceptions into a
        failed ``FunctionBlockResult``.
        """
        if propagate_exceptions:
            # Let exceptions bubble up for better debuggability
            return await self.run(params, context)

        try:
            return await self.run(params, context)
        except Exception as e:
            return FunctionBlockResult(
                message=f"Error executing function block: {e!s}",
                success=False,
                data=None,
            )

    @final
    def parse_parameters_from_dto(self, dto: dict[str, Any], param_cls: type[ParamsT]) -> ParamsT:
        """
        Parse and validate parameters from a DTO dictionary.

        Uses Pydantic's model_validate for automatic type coercion and validation.
        Logs a warning if extra properties are present in the input.
        """
        # pyrefly doesn't understand Pydantic's model_fields descriptor
        expected_fields = set(param_cls.model_fields.keys())  # type: ignore[no-matching-overload]
        provided_fields = set(dto.keys())
        extra_fields = provided_fields - expected_fields
        if extra_fields:
            expected_fields_sorted = sorted(field for field in expected_fields if field is not None)
            self.logger.warning(
                f"Extra properties in parameters for {param_cls.__name__}: {sorted(extra_fields)}. "
                f"Expected: {expected_fields_sorted}"
            )
        return param_cls.model_validate(dto)

execute_function_block()

You never call run() directly. The @final method execute_function_block() wraps every execution:

  • Production (default) -- catches all exceptions from run() and converts them into a failed FunctionBlockResult. A single failing block never crashes the worker process.
  • Testing (propagate_exceptions=True) -- lets exceptions propagate with full stack traces so pytest points you to the exact error location.
result = await block.execute_function_block(
    params=MyParams(name="test"),
    context=context,
    propagate_exceptions=True,
)

Tip

Always use execute_function_block() in tests rather than calling run() directly. With propagate_exceptions=True you get the same execution boundary as production but with full traceability.