Advanced Patterns

This page covers patterns you will encounter as your function blocks grow beyond simple read-and-return operations: requesting additional data before execution, handling rollback, deciding when to split blocks, and structuring error handling.

The Acquire Phase

Every function block has an acquire() method that the workflow engine calls before run(). Its purpose is to request additional entity data that the block needs but that is not included in the default context.

sequenceDiagram
    participant WE as Workflow Engine
    participant FB as Function Block
    WE->>FB: acquire(params)
    FB-->>WE: FunctionBlockAcquireResult (acquires list)
    WE->>WE: Process acquisitions, enrich context
    WE->>FB: run(params, enriched_context)
    FB-->>WE: FunctionBlockResult

acquire() returns a FunctionBlockAcquireResult containing an optional list of EntityAcquire items. Each item is one of:

  • EntityAcquireByElasticQuery: fetch entities matching an Elasticsearch query.
  • EntityAcquireContext: expand the context to include related entities (e.g., all interfaces of a device).

The workflow engine processes these acquisitions, merges the results into the WorkflowContext, and then calls run() with the enriched context.

If your block does not need additional data, return an empty result:

async def acquire(self, params: MyParams) -> FunctionBlockAcquireResult:
    return FunctionBlockAcquireResult(
        message="No additional data required.",
        success=True,
        acquires=None,
    )

When to use acquire

Use the acquire phase when your function block operates on a device but also needs data about its interfaces, parent group, or devices matching a dynamic query. Without acquire, the context only contains the entities the workflow engine provides by default based on run_on.
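That scenario can be sketched as follows. The SDK types are replaced here with simplified stand-ins, and the fields on EntityAcquireContext (entity_id, relation) are assumptions for illustration, not the SDK's actual schema:

```python
from dataclasses import dataclass
from typing import Optional

# Simplified stand-ins for the SDK types, for illustration only.
# In a real block you would import these from the SDK; the fields on
# EntityAcquireContext below are assumed, not the actual schema.
@dataclass
class EntityAcquireContext:
    entity_id: str
    relation: str

@dataclass
class FunctionBlockAcquireResult:
    message: str
    success: bool
    acquires: Optional[list] = None

@dataclass
class MyParams:
    device_id: str

async def acquire(params: MyParams) -> FunctionBlockAcquireResult:
    # Ask the engine to enrich the context with the device's
    # interfaces before run() is called.
    return FunctionBlockAcquireResult(
        message="Requesting all interfaces of the target device.",
        success=True,
        acquires=[EntityAcquireContext(entity_id=params.device_id,
                                       relation="interfaces")],
    )
```

The engine would resolve this acquisition and make the interface entities available in the WorkflowContext passed to run().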

Rollback

The FunctionBlock base class defines a rollback() method that the workflow engine can call when a subsequent step in the workflow fails and the engine wants to undo this block's changes.

async def rollback(
    self,
    params: ParamsT,
    context: WorkflowContext,
    result_from_failed: FunctionBlockResult[ResultDataT],
) -> FunctionBlockRollbackResult:

The default implementation returns success=False with the message "Rollback not implemented".
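That default can be pictured as the following no-op (a sketch; FunctionBlockRollbackResult is stubbed out here rather than imported from the SDK):

```python
from dataclasses import dataclass

# Stand-in for the SDK's FunctionBlockRollbackResult, for illustration.
@dataclass
class FunctionBlockRollbackResult:
    message: str
    success: bool

async def rollback(params, context, result_from_failed) -> FunctionBlockRollbackResult:
    # Default behavior: report that no rollback logic exists.
    return FunctionBlockRollbackResult(
        message="Rollback not implemented",
        success=False,
    )
```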

Current status

Rollback support is defined in the SDK interface but is not yet fully integrated into the workflow engine's transaction model. Implement it if you want to future-proof your function blocks, but be aware that the engine does not currently invoke it automatically.

When designing for rollback, consider:

  • Store the "before" state in your result data so the rollback method has something to restore.
  • Make rollbacks idempotent: calling rollback twice should not cause errors.
  • Handle partial state: if your block made three changes and only two succeeded, rollback must cope with the partial result.
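A minimal sketch combining the first two points, assuming a configuration-style block: the "before" state was captured in the block's result data, and the helper is idempotent because re-applying it against an already-restored device does nothing. The apply_config callable and the result type are illustrative stand-ins, not SDK API:

```python
from dataclasses import dataclass

# Stand-ins for illustration; a real block would import the SDK types
# and talk to an actual device instead of the apply_config callable.
@dataclass
class FunctionBlockRollbackResult:
    message: str
    success: bool

async def rollback_config(before_config: str,
                          current_config: str,
                          apply_config) -> FunctionBlockRollbackResult:
    # Idempotent: if the device is already at the pre-change state
    # (e.g., rollback was called twice), do nothing.
    if current_config == before_config:
        return FunctionBlockRollbackResult(
            message="Device already at pre-change configuration.",
            success=True,
        )
    # Restore the "before" state captured in the block's result data.
    apply_config(before_config)
    return FunctionBlockRollbackResult(
        message="Restored pre-change configuration.",
        success=True,
    )
```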

When to Split Function Blocks

A function block should encapsulate a single concern. Signs that a block is doing too much:

  • It opens connections to multiple devices sequentially.
  • It mixes read and write operations (e.g., fetching config then deploying a template).
  • It contains steps that could reasonably be retried independently.
  • Different parts of its logic have different purity or idempotency characteristics.

Splitting provides concrete benefits:

  • Fine-grained retry: the workflow engine can retry a failed step without re-running successful ones.
  • Accurate safety semantics: each block can declare its own is_pure and is_idempotent flags.
  • Workflow composition: smaller blocks can be reused across multiple workflows.
  • Clearer logging: each block has its own execution log and result.

Example: config deploy pipeline

Instead of one large ConfigDeploy block:

  1. ConfigBackup (facts, is_pure=True) — read and store current config
  2. ConfigValidate (check, is_pure=True) — validate the new template
  3. ConfigPush (configure, is_idempotent=True) — deploy the template
  4. ConfigVerify (check, is_pure=True) — verify post-deploy state

If step 3 fails, the engine can retry it without re-running steps 1 and 2.
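The "accurate safety semantics" benefit can be sketched like this: after splitting, each block declares its own flags instead of one coarse flag for the whole pipeline. The class bodies are hypothetical placeholders; only the flag declarations matter here:

```python
# Hypothetical sketch: each small block carries its own safety
# semantics. Flag values mirror the pipeline steps listed above.
class ConfigBackup:
    is_pure = True        # read-only: safe to re-run at any point

class ConfigValidate:
    is_pure = True        # checks the template without touching devices

class ConfigPush:
    is_pure = False       # writes configuration to the device
    is_idempotent = True  # pushing the same template twice is safe

class ConfigVerify:
    is_pure = True        # post-deploy read-only check
```

A monolithic ConfigDeploy block would have to declare is_pure=False and, most likely, is_idempotent=False for the whole pipeline, losing the engine's ability to safely retry the read-only steps.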

Error Handling Strategies

Function blocks can signal failure in two ways:

1. Return success=False

The preferred approach. Return a FunctionBlockResult with success=False and a descriptive message:

if device is None:
    return FunctionBlockResult(
        message="No device in context.",
        success=False,
        data=None,
    )

The workflow engine receives the result and decides how to proceed (retry, skip, abort) based on the workflow configuration.

2. Raise an exception

If your code raises an unhandled exception, the execute_function_block() wrapper catches it and converts it to a failed result automatically:

# This exception becomes:
# FunctionBlockResult(message="Error executing function block: ...", success=False)
raise ConnectionError("SSH connection timed out")

The propagate_exceptions parameter on execute_function_block() controls this behavior:

  • False (default): exceptions are caught and converted to failed results. Use in production.
  • True: exceptions propagate with full stack traces. Use for testing and debugging.

The testing framework sets propagate_exceptions=True by default so that pytest shows the actual error location rather than a generic failure message.
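The contract can be illustrated with a minimal stand-in. This is not the SDK's real execute_function_block, only a sketch of the documented behavior:

```python
# Simplified stand-in mirroring the documented contract of
# execute_function_block; the real SDK wrapper is more involved.
def execute_function_block(block, propagate_exceptions=False):
    try:
        return {"success": True, "result": block()}
    except Exception as exc:
        if propagate_exceptions:
            raise  # testing/debugging: let pytest show the real traceback
        # production: convert the exception to a failed result
        return {"success": False,
                "message": f"Error executing function block: {exc}"}

def flaky_block():
    raise ConnectionError("SSH connection timed out")
```

With the default, calling execute_function_block(flaky_block) yields a failed result whose message embeds the exception text; with propagate_exceptions=True, the ConnectionError surfaces directly to the caller.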

Prefer explicit failure over exceptions

Returning success=False with a clear message gives the workflow engine and operators more actionable information than a raw exception. Reserve exceptions for truly unexpected situations.