Advanced Patterns
This page covers patterns you will encounter as your function blocks grow beyond simple read-and-return operations: requesting additional data before execution, handling rollback, deciding when to split blocks, and structuring error handling.
The Acquire Phase
Every function block has an acquire() method that the workflow engine calls before run(). Its purpose is to request additional entity data that the block needs but that is not included in the default context.
sequenceDiagram
participant WE as Workflow Engine
participant FB as Function Block
WE->>FB: acquire(params)
FB-->>WE: FunctionBlockAcquireResult (acquires list)
WE->>WE: Process acquisitions, enrich context
WE->>FB: run(params, enriched_context)
FB-->>WE: FunctionBlockResult
acquire() returns a FunctionBlockAcquireResult containing an optional list of EntityAcquire items. Each item is one of:
| Type | Purpose |
|---|---|
| EntityAcquireByElasticQuery | Fetch entities matching an Elasticsearch query |
| EntityAcquireContext | Expand the context to include related entities (e.g., all interfaces of a device) |
The workflow engine processes these acquisitions, merges the results into the WorkflowContext, and then calls run() with the enriched context.
If your block does not need additional data, return an empty result:
async def acquire(self, params: MyParams) -> FunctionBlockAcquireResult:
    return FunctionBlockAcquireResult(
        message="No additional data required.",
        success=True,
        acquires=None,
    )
When to use acquire
Use the acquire phase when your function block operates on a device but also needs
data about its interfaces, parent group, or devices matching a dynamic query. Without
acquire, the context only contains the entities the workflow engine provides by default
based on run_on.
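As an illustration of a block that does request data, the sketch below returns one item of each acquire type. It is self-contained, so the SDK types are replaced by stand-in dataclasses: the field names `query` and `relation`, the `InterfaceAudit` block, and its params are assumptions made for this example, not the SDK's actual signatures.

```python
import asyncio
from dataclasses import dataclass
from typing import Optional, Union


# Stand-ins for the SDK types named above. The field names (query, relation)
# are assumptions for this sketch; check the SDK for the real constructors.
@dataclass
class EntityAcquireByElasticQuery:
    query: dict


@dataclass
class EntityAcquireContext:
    relation: str


EntityAcquire = Union[EntityAcquireByElasticQuery, EntityAcquireContext]


@dataclass
class FunctionBlockAcquireResult:
    message: str
    success: bool
    acquires: Optional[list[EntityAcquire]] = None


@dataclass
class InterfaceAuditParams:  # hypothetical params type
    site: str


class InterfaceAudit:  # hypothetical function block
    async def acquire(self, params: InterfaceAuditParams) -> FunctionBlockAcquireResult:
        # Ask for the device's interfaces plus every device at the same site.
        return FunctionBlockAcquireResult(
            message="Requesting interfaces and same-site devices.",
            success=True,
            acquires=[
                EntityAcquireContext(relation="interfaces"),
                EntityAcquireByElasticQuery(query={"term": {"site": params.site}}),
            ],
        )


result = asyncio.run(InterfaceAudit().acquire(InterfaceAuditParams(site="ams1")))
```

The query is built from `params` because acquire() receives only the parameters, not the context, so anything dynamic has to come from there.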
Rollback
The FunctionBlock base class defines a rollback() method that the workflow engine is intended to call when a subsequent step in the workflow fails and this block's changes need to be undone.
async def rollback(
    self,
    params: ParamsT,
    context: WorkflowContext,
    result_from_failed: FunctionBlockResult[ResultDataT],
) -> FunctionBlockRollbackResult:
    ...
The default implementation returns success=False with the message "Rollback not implemented".
Current status
Rollback support is defined in the SDK interface but is not yet fully integrated into the workflow engine's transaction model. Implement it if you want to future-proof your function blocks, but be aware that the engine does not currently invoke it automatically.
When designing for rollback, consider:
- Before state: store the "before" state in your result data so that rollback has something to restore.
- Idempotent rollbacks: calling rollback twice should not cause errors.
- Partial rollback: if your block attempted three changes and only two succeeded, rollback should handle the partial state.
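The three considerations above can be sketched together. This is a minimal, self-contained illustration, not the SDK interface: `Result` stands in for the SDK result types, `DEVICE` is a hypothetical in-memory device, and the method signatures are simplified. It also assumes the result passed to rollback carries this block's own data, which the ResultDataT type suggests but this page does not state.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class Result:  # stand-in for FunctionBlockResult / FunctionBlockRollbackResult
    message: str
    success: bool
    data: dict = field(default_factory=dict)


# Hypothetical in-memory "device": setting name -> current value.
DEVICE = {"mtu": 1500, "description": "uplink", "speed": "auto"}


class SetInterfaceOptions:  # hypothetical function block
    async def run(self, changes: dict) -> Result:
        before = {}
        for key, value in changes.items():
            if key not in DEVICE:
                # Partial failure: still report what was changed so far.
                return Result(f"Unknown setting {key!r}.", False, {"before": before})
            before[key] = DEVICE[key]  # record the old value before overwriting
            DEVICE[key] = value
        return Result("Applied.", True, {"before": before})

    async def rollback(self, result: Result) -> Result:
        before = result.data.get("before", {})
        # Writing back recorded values is idempotent: a second call
        # writes the same values again and changes nothing.
        DEVICE.update(before)
        return Result(f"Restored {len(before)} setting(s).", True)


block = SetInterfaceOptions()
failed = asyncio.run(block.run({"mtu": 9000, "description": "core", "vlan": 10}))
asyncio.run(block.rollback(failed))
asyncio.run(block.rollback(failed))  # calling twice is harmless
```

Only the settings that were actually changed appear in `before`, so rollback never touches state the block did not modify.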
When to Split Function Blocks
A function block should encapsulate a single concern. Signs that a block is doing too much:
- It opens connections to multiple devices sequentially.
- It mixes read and write operations (e.g., fetching config then deploying a template).
- It contains steps that could reasonably be retried independently.
- Different parts of its logic have different purity or idempotency characteristics.
Splitting provides concrete benefits:
| Benefit | Why it matters |
|---|---|
| Fine-grained retry | The workflow engine can retry a failed step without re-running successful ones |
| Accurate safety semantics | Each block can declare its own is_pure and is_idempotent flags |
| Workflow composition | Smaller blocks can be reused across multiple workflows |
| Clearer logging | Each block has its own execution log and result |
Example: config deploy pipeline
Instead of one large ConfigDeploy block:
1. ConfigBackup (facts, is_pure=True): read and store the current config
2. ConfigValidate (check, is_pure=True): validate the new template
3. ConfigPush (configure, is_idempotent=True): deploy the template
4. ConfigVerify (check, is_pure=True): verify post-deploy state
If step 3 fails, the engine can retry it without re-running steps 1 and 2.
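The retry benefit can be shown with a toy model. The runner below is not the workflow engine's scheduler; `run_pipeline`, `make_step`, and the call counters are hypothetical names invented for this sketch. Each step is retried on its own, so a failure in the push step never re-runs backup or validation.

```python
from typing import Callable

# Counts how many times each step was invoked.
calls = {"backup": 0, "validate": 0, "push": 0, "verify": 0}


def make_step(name: str, fail_times: int = 0) -> Callable[[], bool]:
    """Return a step that fails `fail_times` times, then succeeds."""
    def step() -> bool:
        calls[name] += 1
        return calls[name] > fail_times
    return step


def run_pipeline(steps: list, max_attempts: int = 3) -> bool:
    """Run steps in order, retrying only the step that failed."""
    for _name, step in steps:
        for _ in range(max_attempts):
            if step():
                break  # step succeeded, move on to the next one
        else:
            return False  # step exhausted its attempts, abort the pipeline
    return True


steps = [
    ("backup", make_step("backup")),
    ("validate", make_step("validate")),
    ("push", make_step("push", fail_times=1)),  # fails once, then succeeds
    ("verify", make_step("verify")),
]
ok = run_pipeline(steps)
```

After this run, `calls` shows two invocations of push and one of every other step. With a single monolithic ConfigDeploy block, the same transient failure would have repeated the backup and validation work as well.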
Error Handling Strategies
Function blocks can signal failure in two ways:
1. Return success=False
The preferred approach. Return a FunctionBlockResult with success=False and a descriptive message:
if device is None:
    return FunctionBlockResult(
        message="No device in context.",
        success=False,
        data=None,
    )
The workflow engine receives the result and decides how to proceed (retry, skip, abort) based on the workflow configuration.
2. Raise an exception
If your code raises an unhandled exception, the execute_function_block() wrapper catches it and converts it to a failed result automatically:
# This exception becomes:
# FunctionBlockResult(message="Error executing function block: ...", success=False)
raise ConnectionError("SSH connection timed out")
The propagate_exceptions parameter on execute_function_block() controls this behavior:
| Value | Behavior | Use case |
|---|---|---|
| False (default) | Exceptions are caught and converted to failed results | Production |
| True | Exceptions propagate with full stack traces | Testing and debugging |
The testing framework sets propagate_exceptions=True by default so that pytest shows the actual error location rather than a generic failure message.
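The two modes can be approximated with a small wrapper. This is a sketch of the behavior described above, not the SDK's implementation of execute_function_block(): the function name `execute_function_block_sketch`, its signature, and the stand-in `FunctionBlockResult` dataclass are assumptions for illustration.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable


@dataclass
class FunctionBlockResult:  # stand-in for the SDK result type
    message: str
    success: bool
    data: Any = None


async def execute_function_block_sketch(
    run: Callable[[], Awaitable[FunctionBlockResult]],
    propagate_exceptions: bool = False,
) -> FunctionBlockResult:
    """Hypothetical wrapper mirroring the documented behavior."""
    try:
        return await run()
    except Exception as exc:
        if propagate_exceptions:
            raise  # testing mode: keep the original traceback
        # Production mode: convert the exception into a failed result.
        return FunctionBlockResult(
            message=f"Error executing function block: {exc}",
            success=False,
        )


async def flaky_run() -> FunctionBlockResult:
    raise ConnectionError("SSH connection timed out")


# Default mode: the caller sees a failed result instead of an exception.
result = asyncio.run(execute_function_block_sketch(flaky_run))
```

Passing `propagate_exceptions=True` to the same call re-raises the ConnectionError, which is what lets a test runner point at the failing line.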
Prefer explicit failure over exceptions
Returning success=False with a clear message gives the workflow engine and operators
more actionable information than a raw exception. Reserve exceptions for truly
unexpected situations.