Anatomy of a Function Block
Every function block follows the same three-layer structure: typed parameters flow in, a run() method processes them, and typed results flow out. The platform handles discovery, scheduling, error handling, and persistence — you focus on the automation logic.
Lifecycle
A function block goes through six stages from deployment to database update:
```mermaid
flowchart LR
    A[Registration] --> B[Discovery]
    B --> C[Acquire]
    C --> D[Execute]
    D --> E[Result]
    E --> F[DB Updates]
```
- Registration -- The `@register_function_block` decorator attaches metadata (name, version, schemas) to the class.
- Discovery -- On startup, the worker recursively imports all Python modules in the configured directory and collects decorated classes.
- Acquire -- The workflow engine dispatches an `ACQUIRE` job. The worker calls `acquire()` to declare resource needs; the engine locks entities and resolves context.
- Execute -- The engine dispatches a separate `EXECUTE` job. The worker calls `execute_function_block()`, which validates parameters and invokes `run()` with a `WorkflowContext` holding entity state.
- Result -- `run()` returns a typed `FunctionBlockResult` with a success flag, message, and structured data.
- DB Updates -- The platform diffs entity state before and after execution, generating create/patch/delete operations automatically.
Parameters
FunctionBlockParams extends Pydantic's BaseModel with extra="ignore". Unknown fields from the workflow engine are dropped (the SDK logs a warning listing them via parse_parameters_from_dto()), so schema extensions never break existing function blocks.
A minimal parameter model declares its fields with type annotations:
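For illustration, a minimal sketch using a plain-Pydantic stand-in for the base class (the real `FunctionBlockParams` ships with the SDK; `EchoParameters` mirrors the echo example used later in this page):

```python
from pydantic import BaseModel, ConfigDict


class FunctionBlockParams(BaseModel):
    """Stand-in for the SDK base class (assumption: extra inputs are ignored)."""
    model_config = ConfigDict(extra="ignore")


class EchoParameters(FunctionBlockParams):
    text: str


# Unknown fields from the engine are dropped rather than raising an error
params = EchoParameters.model_validate({"text": "hello", "unexpected": 1})
```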
Pydantic validates automatically -- if the engine sends {"text": 42}, Pydantic coerces it to "42". Send {} without a required field and validation fails with a clear error before run() is ever called.
Complex parameters: nested models, defaults, and validators
Real-world function blocks often need richer input structures:
```python
from pydantic import ValidationInfo, field_validator


class ThresholdConfig(FunctionBlockParams):
    """Nested model for configurable thresholds."""

    warning: float = 80.0
    critical: float = 95.0

    @field_validator("critical")
    @classmethod
    def critical_above_warning(cls, v: float, info: ValidationInfo) -> float:
        warning = info.data.get("warning", 80.0)
        if v <= warning:
            raise ValueError("critical threshold must exceed warning")
        return v


class ComplianceCheckParams(FunctionBlockParams):
    policy_name: str
    interfaces_to_check: list[str] | None = None
    cpu_thresholds: ThresholdConfig = ThresholdConfig()
    memory_thresholds: ThresholdConfig = ThresholdConfig()
    skip_unreachable: bool = True
```
Key patterns:
- Nested models -- compose `FunctionBlockParams` subclasses for structured input.
- Optional fields -- `list[str] | None = None` for parameters that may be omitted.
- Defaults -- assign values directly (`skip_unreachable: bool = True`).
- Validators -- `@field_validator` for cross-field or business-rule checks.
JSON schemas auto-generated from these models drive the parameter form in the neops UI.
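For example, the schema for a model like `ComplianceCheckParams` can be produced directly. A sketch with plain Pydantic (stand-in base class; the SDK presumably calls the same `model_json_schema()` hook):

```python
from pydantic import BaseModel, ConfigDict


class FunctionBlockParams(BaseModel):
    """Stand-in for the SDK base class (assumption)."""
    model_config = ConfigDict(extra="ignore")


class ThresholdConfig(FunctionBlockParams):
    warning: float = 80.0
    critical: float = 95.0


class ComplianceCheckParams(FunctionBlockParams):
    policy_name: str
    cpu_thresholds: ThresholdConfig = ThresholdConfig()


# Nested models land under $defs; fields without defaults are marked required
schema = ComplianceCheckParams.model_json_schema()
```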
Results
FunctionBlockResultData extends BaseModel with extra="forbid". Unlike parameters, results enforce strict output validation -- any undeclared field raises a ValidationError. Downstream workflow steps and the UI can trust the schema completely.
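A sketch of that strictness with plain Pydantic (stand-in base class; `ShowVersionResult` and its `version_info: str` field are illustrative):

```python
from pydantic import BaseModel, ConfigDict, ValidationError


class FunctionBlockResultData(BaseModel):
    """Stand-in for the SDK base class (assumption: extra='forbid')."""
    model_config = ConfigDict(extra="forbid")


class ShowVersionResult(FunctionBlockResultData):
    version_info: str


ok = ShowVersionResult(version_info="IOS 15.2")

try:
    # An undeclared field is rejected instead of being silently dropped
    ShowVersionResult.model_validate({"version_info": "IOS 15.2", "typo": 1})
    strict = False
except ValidationError:
    strict = True
```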
Wrap your result data in a FunctionBlockResult:
```python
FunctionBlockResult(
    message="Version retrieved successfully.",
    success=True,
    data=ShowVersionResult(version_info=version_info),
)
```
| Field | Type | Purpose |
|---|---|---|
| `success` | `bool` | Whether the block completed without errors |
| `message` | `str` | Human-readable summary for the UI and logs |
| `data` | `ResultDataT \| None` | Typed result payload, or `None` on failure |
Registration
Every function block declares its identity through a Registration dataclass passed to the @register_function_block decorator:
```python
@register_function_block(
    Registration(
        name="echo",
        description="Return the provided text without modification.",
        package="fb.examples.neops.io",
        version=(1, 0, 0),
        run_on="global",
        fb_type="execute",
        param_cls=EchoParameters,
        result_cls=EchoResult,
        is_pure=True,
        is_idempotent=True,
    )
)
```
| Field | Default | Purpose |
|---|---|---|
| `name` | -- | Unique name within the package |
| `package` | -- | Reverse-DNS namespace (e.g. `fb.acme.com`) |
| `version` | `(1, 0, 0)` | Semantic version tuple (major, minor, patch) |
| `description` | -- | Short description shown in the UI |
| `run_on` | -- | Entity scope: `"device"`, `"interface"`, `"group"`, or `"global"` |
| `fb_type` | -- | Category: `"configure"`, `"facts"`, `"check"`, `"execute"`, `"none"` |
| `param_cls` | -- | The `FunctionBlockParams` subclass for this block |
| `result_cls` | -- | The `FunctionBlockResultData` subclass for this block |
| `is_pure` | `False` | `True` if the block has no side effects |
| `is_idempotent` | `False` | `True` if repeated execution produces the same result |
| `markdown_helptext` | `"No helptext provided"` | Markdown documentation shown in the UI |
| `deprecated` | `False` | Marks the block as deprecated |
The platform identifies each block as {package}/{name}:{major}.{minor}.{patch}. For the example above, that resolves to fb.examples.neops.io/echo:1.0.0.
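The identifier can be assembled directly from the registration fields; a quick sketch:

```python
# Values from the echo registration example above
package, name = "fb.examples.neops.io", "echo"
version = (1, 0, 0)

# {package}/{name}:{major}.{minor}.{patch}
identifier = f"{package}/{name}:{version[0]}.{version[1]}.{version[2]}"
print(identifier)  # fb.examples.neops.io/echo:1.0.0
```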
JSON schemas for param_cls and result_cls are auto-generated at registration time. The workflow engine uses these for input validation in both the UI and the API.
See Registration (Engine) for how the engine stores, resolves, and validates registrations.
Lifecycle Methods
The FunctionBlock base class defines three lifecycle methods:
| Method | Required | Purpose |
|---|---|---|
| `run(params, context)` | Yes | Core execution logic |
| `acquire(params)` | Yes | Declare resource needs before execution |
| `rollback(params, context, result)` | No | Undo changes on failure |
run() receives validated parameters and a WorkflowContext holding entity data. This is where your automation logic lives. Return a FunctionBlockResult with your typed output.
acquire() is dispatched as a separate job before run(). It declares which entities the block needs. The workflow engine locks resources and resolves additional context based on your response. For blocks that need no specific resources, return a simple success.
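For a block with no resource needs, `acquire()` can simply report success. A runnable sketch with a stand-in result type (the real `FunctionBlockAcquireResult` signature may differ):

```python
import asyncio
from dataclasses import dataclass


@dataclass
class FunctionBlockAcquireResult:
    """Stand-in for the SDK type (assumption: the real class also carries resource claims)."""
    success: bool
    message: str = ""


class EchoBlock:
    async def acquire(self, params) -> FunctionBlockAcquireResult:
        # Nothing to lock: declare success without claiming any entities
        return FunctionBlockAcquireResult(success=True, message="No resources required")


result = asyncio.run(EchoBlock().acquire(params=None))
```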
rollback() is intended for undoing changes on failure. The default implementation returns FunctionBlockRollbackResult(success=False, message="Rollback not implemented"). Override it when your block modifies external systems that need cleanup — it receives the failed result so you can inspect what went wrong.
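An override might look like this (stand-in types; the block name and cleanup logic are hypothetical):

```python
import asyncio
from dataclasses import dataclass


@dataclass
class FunctionBlockRollbackResult:
    """Stand-in for the SDK type (assumption)."""
    success: bool
    message: str = ""


class ConfigPushBlock:
    """Illustrative block that would restore a saved config when run() fails."""

    async def rollback(self, params, context, result_from_failed) -> FunctionBlockRollbackResult:
        # Inspect the failed result to decide what to undo
        reason = getattr(result_from_failed, "message", "unknown error")
        # ... restore the previous device configuration here (hypothetical) ...
        return FunctionBlockRollbackResult(
            success=True,
            message=f"Restored previous config after failure: {reason}",
        )


rb = asyncio.run(ConfigPushBlock().rollback(None, None, None))
```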
Full FunctionBlock base class
```python
class FunctionBlock(Generic[ParamsT, ResultDataT], ABC):
    @final
    def __init__(self, logger: BaseLogger | None = None) -> None:
        self.logger: BaseLogger = logger or Logger()

    @abstractmethod
    async def run(self, params: ParamsT, context: WorkflowContext) -> FunctionBlockResult[ResultDataT]:
        """
        Run the function block with the given context and return a result.
        """

    @abstractmethod
    async def acquire(self, params: ParamsT) -> FunctionBlockAcquireResult:
        """
        Acquire resources or perform setup before running the function block.
        This method should be implemented by subclasses to define how to acquire necessary resources.
        """

    async def rollback(
        self,
        params: ParamsT,
        context: WorkflowContext,
        result_from_failed: FunctionBlockResult[ResultDataT],
    ) -> FunctionBlockRollbackResult:
        """
        Rollback the function block in case of failure.
        This method should be implemented by subclasses to define how to rollback the changes made by the function block.
        """
        return FunctionBlockRollbackResult(success=False, message="Rollback not implemented")

    @final
    async def execute_function_block(
        self,
        params: ParamsT,
        context: WorkflowContext,
        *,
        propagate_exceptions: bool = False,
    ) -> FunctionBlockResult[ResultDataT]:
        """
        Execute the function block and return its result.

        If ``propagate_exceptions`` is True any exception raised by ``run`` will be
        re-raised so that tooling (e.g. pytest) can show a full stack-trace that
        points to the actual error location. This is essential for debugging and testing.

        In production code the default (``False``) keeps the previous behaviour and
        converts exceptions into a failed ``FunctionBlockResult``.
        """
        if propagate_exceptions:
            # Let exceptions bubble up for better debuggability
            return await self.run(params, context)
        try:
            return await self.run(params, context)
        except Exception as e:
            return FunctionBlockResult(
                message=f"Error executing function block: {e!s}",
                success=False,
                data=None,
            )

    @final
    def parse_parameters_from_dto(self, dto: dict[str, Any], param_cls: type[ParamsT]) -> ParamsT:
        """
        Parse and validate parameters from a DTO dictionary.
        Uses Pydantic's model_validate for automatic type coercion and validation.
        Logs a warning if extra properties are present in the input.
        """
        # pyrefly doesn't understand Pydantic's model_fields descriptor
        expected_fields = set(param_cls.model_fields.keys())  # type: ignore[no-matching-overload]
        provided_fields = set(dto.keys())
        extra_fields = provided_fields - expected_fields
        if extra_fields:
            expected_fields_sorted = sorted(field for field in expected_fields if field is not None)
            self.logger.warning(
                f"Extra properties in parameters for {param_cls.__name__}: {sorted(extra_fields)}. "
                f"Expected: {expected_fields_sorted}"
            )
        return param_cls.model_validate(dto)
```
execute_function_block()
You never call run() directly. The @final method execute_function_block() wraps every execution:
- Production (default) -- catches all exceptions from `run()` and converts them into a failed `FunctionBlockResult`. A single failing block never crashes the worker process.
- Testing (`propagate_exceptions=True`) -- lets exceptions propagate with full stack traces so pytest points you to the exact error location.
```python
result = await block.execute_function_block(
    params=MyParams(name="test"),
    context=context,
    propagate_exceptions=True,
)
```
Tip
Always use execute_function_block() in tests rather than calling run() directly. With propagate_exceptions=True you get the same execution boundary as production but with full traceability.
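To see the two modes side by side, a runnable sketch with stand-in types (the real classes ship with the SDK; `FailingBlock` is illustrative):

```python
import asyncio


class FunctionBlockResult:
    """Minimal stand-in for the SDK result type (assumption)."""

    def __init__(self, message: str, success: bool, data=None):
        self.message, self.success, self.data = message, success, data


class FailingBlock:
    """Reproduces the execute_function_block() boundary around a failing run()."""

    async def run(self, params, context):
        raise RuntimeError("device unreachable")

    async def execute_function_block(self, params, context, *, propagate_exceptions=False):
        if propagate_exceptions:
            return await self.run(params, context)  # let pytest see the traceback
        try:
            return await self.run(params, context)
        except Exception as e:
            return FunctionBlockResult(f"Error executing function block: {e!s}", False)


block = FailingBlock()

# Production mode: the exception becomes a failed result
prod = asyncio.run(block.execute_function_block(None, None))

# Test mode: the exception propagates with a full stack trace
try:
    asyncio.run(block.execute_function_block(None, None, propagate_exceptions=True))
    raised = False
except RuntimeError:
    raised = True
```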