Working with Data
Function blocks receive their data through the WorkflowContext -- a container of entity state that the workflow engine populates before execution. You read from it, modify it, and the platform persists the changes automatically. No manual database calls, no serialization code.
The WorkflowContext
When the workflow engine dispatches a job, it resolves the relevant entities from the CMS and bundles them into a JobExecutionContextDto. The SDK wraps this into a WorkflowContext that your run() method receives.
Platform Components
The workflow engine populates the context from CMS data and dispatches it with each job. See the engine's Context concept and Acquire phase for how entity resolution works on the engine side. CMS documentation is forthcoming.
The context exposes three entity collections and three convenience properties:
| Property | Type | Populated when |
|---|---|---|
| context.devices | list[DeviceTypeDto] \| None | Always (if devices exist in scope) |
| context.device_groups | list[DeviceGroupTypeDto] \| None | Always (if groups exist in scope) |
| context.interfaces | list[InterfaceTypeDto] \| None | Always (if interfaces exist in scope) |
| context.device | DeviceTypeDto \| None | run_on="device" |
| context.device_group | DeviceGroupTypeDto \| None | run_on="group" |
| context.interface | InterfaceTypeDto \| None | run_on="interface" |
The convenience properties (device, device_group, interface) are set based on the run_on value in your registration. When run_on="device", the SDK matches the job's entity_id against context.devices and assigns the matching entity to context.device.
For function blocks that don't target a specific entity (e.g. discovery tasks that scan a subnet), use run_on="global". All convenience properties remain None, and you work directly with the collections.
Entity ID mismatch
If run_on is "device", "group", or "interface" but the entity_id does not match any entity in the provided list, the context constructor raises a bare StopIteration rather than a descriptive error. Make sure your test fixtures include an entity whose ID matches the entity_id you pass to the context.
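The failure mode is easy to reproduce with plain Python, since the resolution uses next() over a generator with no default:

```python
devices = [{"id": 7, "hostname": "router-01"}]

# Matching entity_id: next() finds the device.
device = next(d for d in devices if d["id"] == 7)
print(device["hostname"])  # router-01

# Non-matching entity_id: the generator is exhausted and next() raises StopIteration.
try:
    next(d for d in devices if d["id"] == 99)
except StopIteration:
    print("no device with id 99")
```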
How convenience properties are resolved
def __init_run_on_entity(self) -> None:
    match self.run_on:
        case "device":
            if self.entity_id is None:
                raise ValueError("entity_id must be provided when run_on is 'device'")
            if self.devices is None:
                raise ValueError("devices must be provided in context when run_on is 'device'")
            self.device = next(d for d in self.devices if d.id == self.entity_id)
        case "group":
            if self.entity_id is None:
                raise ValueError("entity_id must be provided when run_on is 'group'")
            if self.device_groups is None:
                raise ValueError("device_groups must be provided in context when run_on is 'group'")
            self.device_group = next(g for g in self.device_groups if g.id == self.entity_id)
        case "interface":
            if self.entity_id is None:
                raise ValueError("entity_id must be provided when run_on is 'interface'")
            if self.interfaces is None:
                raise ValueError("interfaces must be provided in context when run_on is 'interface'")
            self.interface = next(i for i in self.interfaces if i.id == self.entity_id)
Accessing Entity Data
Device fields
Devices carry the richest data model. The most commonly used fields:
| Field | Type | Description |
|---|---|---|
| hostname | str \| None | Device hostname |
| ip | str \| None | Management IP address |
| username | str \| None | SSH/API username |
| password | str \| None | SSH/API password (encrypted at rest) |
| platform | object | Platform reference (vendor, OS) |
| connection_state | str \| None | NEW, OK, UNREACHABLE, NOSSH, AUTHFAILURE |
| facts | dict \| None | Versioned key-value data (JSON) |
| checks | dict \| None | Compliance/validation data (JSON) |
| serial | str \| None | Hardware serial number |
| vendor | str \| None | Device vendor |
| model | str \| None | Device model |
| software_release | str \| None | Running software version |
Here is a function block that reads device data to retrieve a software version:
class ShowVersion(FunctionBlock[ShowVersionParams, ShowVersionResult]):
    async def run(self, params: ShowVersionParams, context: WorkflowContext) -> FunctionBlockResult[ShowVersionResult]:
        del params
        device = context.device
        if device is None:
            return FunctionBlockResult(message="No device in context.", success=False, data=None)

        with VersionProxy.connect(device, fallback_to_default=True) as proxy:
            version_info = proxy.get_version()

        return FunctionBlockResult(
            message="Version retrieved successfully.",
            success=True,
            data=ShowVersionResult(version_info=version_info),
        )

    async def acquire(self, params: ShowVersionParams) -> FunctionBlockAcquireResult:
        del params
        return FunctionBlockAcquireResult(message="No resources required.", success=True, acquires=None)
Interfaces and groups
Interface fields include name, ifindex, state (UP, DOWN, ADMINISTRATIVE_SHUTDOWN, ERROR_DISABLED), description, facts, and checks. Access them through context.interface (for the current entity) or iterate context.interfaces for all interfaces in scope.
Device groups carry name, title, description, facts, and checks. Access them via context.device_group or context.device_groups.
Tip
Even when run_on="device", you still have access to context.devices (all devices in scope), context.interfaces, and context.device_groups. The convenience property just gives you the current entity for this execution cycle. Use the full lists when you need cross-entity logic.
Facts and Checks
Facts and checks are versioned key-value stores attached to every entity. They are auto-aggregated as JSON and accessible through device.facts, device.checks, interface.facts, and so on.
- Facts store operational data collected by function blocks -- interface counters, software versions, inventory details.
- Checks store compliance and validation results -- policy pass/fail, threshold violations, audit findings.
Both work identically from a data-access perspective:
Facts are stored as a flat JSON dictionary on each entity. Each top-level key represents a fact namespace (e.g. "inventory", "compliance", "routing"). When a function block writes to a key, the CMS creates a new version of that fact automatically. The aggregated view across all versions is what you see in device.facts.
The typical flow: a function block reads the current entity state, collects new data (from the device, an API, or computation), and writes the result back into facts. The platform handles versioning and persistence.
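That read-modify-write flow, sketched with a plain dict standing in for the DTO's facts field:

```python
# Existing aggregated view of facts on an entity (as exposed via device.facts).
facts = {"inventory": {"interface_count": 2}, "routing": {"protocol": "ospf"}}

# Read: .get() with a default avoids a KeyError on the first run.
previous = (facts or {}).get("inventory", {})

# Write: merge into a new dict and assign back, preserving other namespaces.
facts = {**facts, "inventory": {**previous, "interface_count": 3}}
print(facts["inventory"])  # {'interface_count': 3}
print(facts["routing"])    # {'protocol': 'ospf'}
```

Assigning a new dict (rather than mutating in place) keeps the update explicit and makes the change easy for the diff engine to detect.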
Note
Facts and checks live on all three entity types. device.facts, interface.facts, and device_group.facts all follow the same pattern. Choose the entity that best represents the data's scope -- per-device for hardware info, per-interface for port counters, per-group for site-level aggregates.
Change Tracking
The WorkflowContext tracks every modification you make to entities -- automatically. You never need to build update payloads or call save methods.
flowchart LR
    A["Context created<br/>snapshot taken"] --> B["run() executes<br/>entities modified"]
    B --> C["compute_db_updates()<br/>diffs current vs snapshot"]
    C --> D["Create / Patch / Delete<br/>operations generated"]
When the context is created, a deep copy snapshot of all entities is taken. After run() completes, the platform calls compute_db_updates() which compares the current entity state against that snapshot. Any differences are translated into typed database operations (EntityCreateDto, EntityPatchDto, EntityDeleteDto) and sent back to the CMS.
Snapshot creation in __init__
def __init__(self, context: JobExecutionContextDto, run_on: str, entity_id: int | float | None = None) -> None:
    self.devices: list[DeviceTypeDto] | None = context.devices
    self.device_groups: list[DeviceGroupTypeDto] | None = context.device_groups
    self.interfaces: list[InterfaceTypeDto] | None = context.interfaces
    self.run_on = run_on
    self.entity_id = entity_id

    self.device: DeviceTypeDto | None = None
    self.device_group: DeviceGroupTypeDto | None = None
    self.interface: InterfaceTypeDto | None = None
    self.__init_run_on_entity()

    # Snapshot for change tracking (deep copy to preserve original state)
    self._snapshot: dict[EntityType, list[Any]] = {
        EntityType.DEVICE: copy.deepcopy(self.devices) or [],
        EntityType.INTERFACE: copy.deepcopy(self.interfaces) or [],
        EntityType.GROUP: copy.deepcopy(self.device_groups) or [],
    }
    logger.debug("WorkflowContext initialized with snapshot for change tracking")
The compute_db_updates() method
def _get_current_entities(self) -> dict[EntityType, list[Any]]:
    """Get current entity state for diffing."""
    return {
        EntityType.DEVICE: self.devices or [],
        EntityType.INTERFACE: self.interfaces or [],
        EntityType.GROUP: self.device_groups or [],
    }

def compute_db_updates(self) -> JobExecutionResultDbUpdatesDto | None:
    """Compute database updates by diffing current state against snapshot.

    Returns None if no changes detected, otherwise returns the updates DTO.
    """
    current = self._get_current_entities()
    updates = generate_db_updates(self._snapshot, current)
    if not updates:
        return None
    # Return as the expected DTO structure
    # Note: The actual DTO types (EntityCreateDto, etc.) require API fixes.
    # For now, we return raw dicts that match the expected JSON structure.
    wrapped_updates = [JobExecutionResultDbUpdatesDtoUpdatesInner(u) for u in updates]
    return JobExecutionResultDbUpdatesDto(updates=wrapped_updates)
The diffing engine compares all top-level fields on each entity. Read-only fields (id, created_at, updated_at, lock states, computed counts) are excluded automatically. Nested changes (e.g. modifying a key inside facts) result in a full field replacement at the top level.
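A toy version of the snapshot-and-diff idea (not the SDK's generate_db_updates, just the principle it implements):

```python
import copy

device = {"id": 1, "hostname": "router-01", "facts": {"inventory": {"count": 1}}}
snapshot = copy.deepcopy(device)  # taken when the context is created

# A nested modification inside facts...
device["facts"]["inventory"]["count"] = 2

# ...shows up in the diff as a full replacement of the top-level field.
READ_ONLY = {"id"}  # the real engine also excludes created_at, updated_at, etc.
changed = {
    k: v for k, v in device.items()
    if k not in READ_ONLY and snapshot.get(k) != v
}
print(changed)  # {'facts': {'inventory': {'count': 2}}}
```

The deep copy is what makes this work: a shallow copy would share the nested facts dict between snapshot and current state, and the modification would be invisible to the diff.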
Modifying Entities
Modifying entity data is as straightforward as assigning Python attributes. The diff engine picks up every change:
device.hostname = "router-01.dc1"
device.facts = {**(device.facts or {}), "inventory": {"count": 42}}
Here is a complete function block that reads device data, inspects existing facts, and writes new inventory data:
class CollectInventory(FunctionBlock[InventoryParams, InventoryResult]):
    async def run(self, params: InventoryParams, context: WorkflowContext) -> FunctionBlockResult[InventoryResult]:
        del params
        device = context.device
        if device is None:
            return FunctionBlockResult(message="No device in context.", success=False, data=None)

        hostname = device.hostname or "unknown"
        interfaces = context.interfaces or []
        self.logger.info(f"Collecting inventory for {hostname} ({device.ip})")

        existing_facts = device.facts or {}
        previous_run = existing_facts.get("inventory", {})
        if previous_run:
            self.logger.info(f"Previous inventory: {previous_run.get('interface_count', '?')} interfaces")

        device.facts = {
            **existing_facts,
            "inventory": {
                "interface_count": len(interfaces),
                "interfaces": [iface.name for iface in interfaces if iface.name],
            },
        }

        return FunctionBlockResult(
            message=f"Inventory collected for {hostname}.",
            success=True,
            data=InventoryResult(hostname=hostname, interface_count=len(interfaces)),
        )

    async def acquire(self, params: InventoryParams) -> FunctionBlockAcquireResult:
        del params
        return FunctionBlockAcquireResult(message="No resources required.", success=True, acquires=None)
The three key patterns in this example:
- Read entity fields -- access device.hostname, device.ip, or any DTO field directly.
- Read existing facts -- device.facts or {} gives you the current fact dictionary; use .get() to safely access nested keys.
- Write facts -- merge new data into the existing dict and assign it back. The diff engine detects the change and generates a patch operation.
Warning
Some fields are read-only and excluded from diffing: id, created_at, updated_at, lock states, and computed counts. Writing to these fields has no effect -- the platform silently ignores them.
Next: Creating Entities to learn how to add new devices, interfaces, and groups from within a function block.