Conduit — Effect DAG Engine
Purpose
The Effect engine is Conduit's second DAG engine, alongside the ingest engine. Where the ingest engine deploys a single DAG per data source using a fixed provider template, the Effect engine composes multi-step DAGs from a catalog of modular operators. Each operator becomes a discrete Airflow step that executes at run time and passes its output to the next step via XCom.
Oracle's LLM reads Conduit's operator catalog and acts as the orchestrator: it translates user intent ("take this data, convert to PDF, then save it in S3") into an ordered list of operators and calls Conduit to assemble and deploy the resulting DAG. Conduit is the assembler, executor, and owner of all DataEffect objects across their full create, read, update, delete, and run lifecycle.
The Two Engines
|  | Ingest Engine | Effect Engine |
|---|---|---|
| Purpose | Pull data from a source into UDS | Execute multi-step action workflows |
| DAG authorship | One fixed provider template per source type | N operator templates composed into one DAG |
| Composition | Single Jinja2 template, rendered at deploy time | Operator templates chained at effect-create time; steps execute at DAG run time |
| Deploy path | Git branch → CI pipeline → main | Git branch → CI pipeline → main (same) |
| Trigger | Schedule or manual | Manual, schedule, Oracle-orchestrated, or Cortex dispatch |
| Domain object | DataPipeline (Conduit-owned) | DataEffect (Conduit-owned) |
| Lifecycle | pending_validation → active → paused | pending_validation → active → paused |
| CRUD owner | Conduit | Conduit |
Operator Catalog
The Effect operator catalog documents every atomic operation available for DAG composition. Oracle reads this catalog (via the catalog://effects MCP resource) to know which steps are available when translating user intent into an operator chain.
Each operator ships as a Jinja2 template snippet under conduit/templates/effects/. At
effect-create time, Conduit stitches the selected operator snippets into a single coherent DAG
file (one @task-decorated function per operator, wired sequentially as DAG steps), then commits
it through the standard CI pipeline.
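A minimal sketch of the stitching step, under stated assumptions: string.Template stands in for Jinja2, and the two-entry snippet library, the sql_extract()/csv_export() helper calls, and the step_N naming are all hypothetical. A real rendered file would also carry Airflow imports and the DAG context, which are omitted here.

```python
from string import Template

# Hypothetical snippet library. Real snippets are Jinja2 files under
# conduit/templates/effects/; string.Template stands in for Jinja2 here.
SNIPPETS = {
    "sql_extract": Template(
        "@task\n"
        "def step_$idx():\n"
        "    return sql_extract(connection_id=$connection_id, sql=$sql)\n"
    ),
    "csv_export": Template(
        "@task\n"
        "def step_$idx():\n"
        "    return csv_export()\n"
    ),
}


def stitch(dag_id: str, operators: list[dict]) -> str:
    """Render one snippet per operator, then wire the steps sequentially."""
    parts = [f"# dag_id: {dag_id}"]
    for idx, op in enumerate(operators):
        # Compose-time params are baked into the source as Python literals.
        params = {k: repr(v) for k, v in op.get("params", {}).items()}
        parts.append(SNIPPETS[op["name"]].substitute(idx=idx, **params))
    # Sequential wiring: step_0() >> step_1() >> ... >> step_N()
    parts.append(" >> ".join(f"step_{i}()" for i in range(len(operators))))
    return "\n".join(parts) + "\n"
```

Baking compose-time values in with repr() keeps the rendered file static, so the CI syntax and import checks see exactly what will run.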
Available operators
| Operator | Category | Description | Required Connection | Required compose-time params |
|---|---|---|---|---|
| sql_extract | Extract | Execute a SQL query; pass the result set to the next step | Existing DataPipelineConnection (any relational/warehouse type) | connection_id, sql |
| http_fetch | Extract | Fetch data from an HTTP endpoint | connection_id in conf, or conduit__http_default | url |
| s3_fetch | Extract | Read a file from S3 | S3 DataPipelineConnection | connection_id, bucket, key |
| docling_extract | Extract | Extract text or markdown from a document (PDF, Office, etc.) stored in cloud storage or accessible via HTTP. Runs on the docling Airflow worker queue. | S3, GCS, Azure Blob, or HTTP DataPipelineConnection | connection_id, path |
| pdf_convert | Transform | Convert input data or a document to PDF | None | None |
| csv_export | Transform | Serialize input data to CSV | None | None |
| json_transform | Transform | Apply a JMESPath path expression to a JSON payload; falls back to simple dot-notation paths if jmespath is not installed | None | expression |
| chart_render | Transform | Render a bar or line chart from tabular input (sql_extract output or a list of dicts); returns a base64-encoded PNG | None | x_field, y_field |
| s3_upload | Load | Upload a file or data payload to S3 | S3 DataPipelineConnection | connection_id, bucket, key |
| http_dispatch | Load | POST a payload to an HTTP endpoint | connection_id in conf, or conduit__http_default | endpoint |
| email_notify | Load | Send an email to specified recipients | SMTP DataPipelineConnection | connection_id, recipients, subject |
| webhook_notify | Load | POST a notification payload to a webhook URL | connection_id in conf, or conduit__http_default | url |
| cortex_task_create | Load | Create a Cortex WorkflowTask via POST /tasks. task_type must be one of review, risk_review, attest, refresh, acknowledge (default acknowledge). edition_id is required in dagrun_conf for the review and attest types. At least one of assignee_user_id (run-time) or assignee_roles (compose-time) is required. insight_id is required at run time. Returns task_id (tsk_ prefix), event_id, and status. The bearer token is propagated via dagrun_conf. | None | cortex_url, task_title |
| mqtt_publish | Load | Publish a message to an MQTT broker topic. Payload defaults to the upstream output (JSON-serialised) and can be overridden with a static compose-time value. TLS is auto-enabled on port 8883. Not for Sparkplug B device writes, which must use the sparkplug_* operators: the broker silently drops writes to read-only or non-existent tags, and only tags in the platform writable-tag whitelist (Ranger Tag Guide) are accepted. Chain with mqtt_await_confirmation for the legacy generic (non-Sparkplug) publish-and-confirm chain. | MQTT DataPipelineConnection (conn_type=generic) | connection_id, topic |
| mqtt_await_confirmation | Load | Subscribe to an MQTT topic and block until a confirmation message arrives or the timeout expires. For Sparkplug B the confirmation topic is spBv1.0/{group_id}/NBIRTH/{edge_node_id}. Default timeout is 900 s (15 min) to cover the eDRX wake window. Raises a descriptive error on timeout. Use as the second step of mqtt_publish → mqtt_await_confirmation in the legacy generic chain, or omit it for fire-and-forget. | MQTT DataPipelineConnection (conn_type=generic) | connection_id, confirmation_topic |
| relay_audit_emit | Load | Emit an immutable audit record to the relay-audit Elasticsearch ledger. Each invocation creates a new document keyed by a random UUID, so records are never overwritten. Use as the final step in any relay write effect. entity_type and field are baked in at compose time; entity_id, before_value, after_value, and actor arrive via dagrun_conf at run time. See Relay Write Pattern for the full wiring. | None | entity_type, field |
| sparkplug_read | Extract | Subscribe to a Sparkplug B sensor's BIRTH/DATA topics and return decoded metric values. Optionally forces an instant report via a Properties/Report Interval NCMD. Returns a metrics dict (name→value), message_type, is_birth, and topic. BIRTH messages carry full metric names; DATA messages (alias-only) return an empty metrics dict with is_birth: false. Use as the first step in a write chain to capture before_value, or standalone to display the current sensor configuration. | MQTT DataPipelineConnection | connection_id |
| platform_state_upsert | Load | Partial-upsert a single site_info field in Elasticsearch after a confirmed Sparkplug B write (#REQ.relay-dual-write). Takes confirming_value from the upstream DCMD/NCMD result and writes it to site_info.{es_field}, keyed by site:{source}:{eon_node_id}. Raises if the upstream step did not confirm, so ES is never updated for unconfirmed writes. Raises if the site_info doc does not exist (sensor not yet provisioned). Middle step in the dual-target chain sparkplug_dcmd → platform_state_upsert → relay_audit_emit. | None | es_field |
| sparkplug_ncmd | Load | Online-gated single-metric NCMD write to a Sparkplug B edge node. Gates on NBIRTH/NDATA proof of life before publishing, then blocks until a confirming NBIRTH arrives. Optionally triggers an instant report via a Properties/Report Interval rewrite for fast verification. Single-metric payloads only (Ranger firmware constraint). Requires pysparkplug and paho-mqtt. | MQTT DataPipelineConnection | connection_id, metric_name, datatype |
| sparkplug_dcmd | Load | Online-gated single-metric DCMD write to a Sparkplug B device (default Dev1). Same online-gate and instant-report logic as sparkplug_ncmd; waits for a confirming DBIRTH or DDATA. For SignalFire Rangers, metric_name must include the Dev1/ prefix (e.g. Dev1/AIN1 Config/High Threshold). Requires pysparkplug and paho-mqtt. | MQTT DataPipelineConnection | connection_id, metric_name, datatype |
| sparkplug_fast_report | Load | Bundled 4-metric DCMD that configures Fast Reporting on a Sparkplug B device. Sends Enabled, Report Interval, Mode, and Duration in one payload; all four must be present. Gates on sensor online status and waits for a confirming DBIRTH/DDATA. Requires pysparkplug and paho-mqtt. | MQTT DataPipelineConnection | connection_id |
| gitlab_issue_create | Load | Create a GitLab issue. Not available: removed from the catalog pending EE-14 (the per-customer token must arrive via dagrun_conf, not a static Airflow connection, and the Cortex-side token propagation contract is not yet defined). The snippet exists at gitlab_issue_create.py.j2. | Blocked; see #REQ.gitlab-per-customer-token | None |
#REQ.gitlab-per-customer-token — gitlab_issue_create does not use a static Airflow connection for auth. Each customer has their own GitLab token; Cortex passes the token to Conduit via axonis-core at trigger time, and it arrives in the operator as a run-time param via dagrun_conf. Conduit does not store or cache customer GitLab tokens. The token contract between Cortex and Conduit is owned by axonis-core and is consistent with how other per-customer credentials are propagated across platform services.
Compose-time params: project_id, labels (base label list), confidential (bool).
Run-time params (via dagrun_conf): title, description, extra_labels (appended to compose-time labels), gitlab_token (injected by axonis-core from caller context).
#REQ.catalog-is-open — the operator catalog is not a whitelist. Operators are the documented, well-tested building blocks; custom operators can be added by extending the Jinja2 snippet library without changing Conduit's core assembly logic. Oracle reads the catalog to know what is available, not to enforce a closed set.
#REQ.operator-xcom — each operator step writes its primary output to a defined XCom key
(effect_output). The next operator reads from that key as its primary input. Operators that
produce no downstream-usable output (e.g., email_notify) write null to effect_output so
the chain does not break.
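The contract can be simulated without Airflow. This sketch assumes an in-memory FakeXCom in place of Airflow's XCom backend and uses plain lambdas as operators; inside a real task the equivalent calls would be ti.xcom_push(key="effect_output", value=...) and ti.xcom_pull(task_ids=<previous task>, key="effect_output").

```python
from typing import Any, Callable, Optional

EFFECT_OUTPUT = "effect_output"  # XCom key named in #REQ.operator-xcom


class FakeXCom:
    """In-memory stand-in for Airflow XCom, keyed by (task_id, key)."""

    def __init__(self) -> None:
        self._store: dict[tuple[str, str], Any] = {}

    def push(self, task_id: str, key: str, value: Any) -> None:
        self._store[(task_id, key)] = value

    def pull(self, task_id: str, key: str) -> Any:
        return self._store.get((task_id, key))

    def has(self, task_id: str, key: str) -> bool:
        return (task_id, key) in self._store


Step = tuple[str, Callable[[Any], Any]]


def run_chain(steps: list[Step]) -> FakeXCom:
    """Run steps in order; each receives the previous step's effect_output."""
    xcom = FakeXCom()
    previous: Optional[str] = None
    for task_id, operator_fn in steps:
        upstream = xcom.pull(previous, EFFECT_OUTPUT) if previous else None
        # A None result is still pushed, so the key exists and the chain continues.
        xcom.push(task_id, EFFECT_OUTPUT, operator_fn(upstream))
        previous = task_id
    return xcom
```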
Connection requirements
Operators that interact with external systems declare a required connection type. When an effect is created, Conduit checks whether each required connection already exists in Airflow. If a connection is missing, Conduit asks the user for the credentials before proceeding with DAG assembly.
#REQ.connection-collection — connection collection is interactive and conversational. Conduit
returns a structured prompt listing each missing connection and the fields required for it (using
the same JSON schema templates as connection_schema). The user provides credentials; Conduit
creates the Airflow connection via connection_create before rendering the DAG. The effect-create
call does not complete until all required connections exist.
#REQ.connection-reuse — if a suitable connection already exists in Airflow (matched by type and, where applicable, host/database), Conduit uses it without prompting. The user is shown which existing connection will be used and can override.
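A sketch of the reuse-or-prompt decision, assuming existing Airflow connections are available as plain dicts (conn_id, conn_type, host, schema; Airflow stores the database name in a connection's schema field) and that each connection_schema template reduces to a list of required field names per type. Requirement, ConnectionPlan, and plan_connections are hypothetical names.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Requirement:
    operator: str
    conn_type: str
    host: Optional[str] = None      # matched only when the operator specifies it
    database: Optional[str] = None  # compared with the connection's schema field


@dataclass
class ConnectionPlan:
    reuse: dict = field(default_factory=dict)    # operator -> existing conn_id
    missing: list = field(default_factory=list)  # structured prompts for the user

    @property
    def complete(self) -> bool:
        return not self.missing


def _matches(conn: dict, req: Requirement) -> bool:
    return (conn["conn_type"] == req.conn_type
            and (req.host is None or conn.get("host") == req.host)
            and (req.database is None or conn.get("schema") == req.database))


def plan_connections(reqs, existing, schemas) -> ConnectionPlan:
    plan = ConnectionPlan()
    for req in reqs:
        match = next((c for c in existing if _matches(c, req)), None)
        if match is not None:
            plan.reuse[req.operator] = match["conn_id"]  # shown to user; overridable
        else:
            plan.missing.append({
                "operator": req.operator,
                "conn_type": req.conn_type,
                "fields": schemas.get(req.conn_type, []),
            })
    return plan
```

effect_create would show plan.reuse to the user and keep prompting for plan.missing until plan.complete is true.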
Generic HTTP fallback
Operators that dispatch to HTTP endpoints (http_fetch, http_dispatch, webhook_notify) accept
an optional connection_id in their operator params. Resolution order:

- connection_id explicitly provided → use that Airflow connection
- No connection_id → use conduit__http_default, a platform-level generic HTTP connection that Conduit registers in Airflow on first startup
#REQ.generic-http-connection — Conduit registers conduit__http_default on startup,
idempotently. This connection carries no credentials; it provides a named HTTP hook as a fallback
for unauthenticated or bearer-token-in-payload targets.
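The resolution order and idempotent registration reduce to a few lines. This sketch assumes a dict as a stand-in for Airflow's connection store and treats an empty connection_id the same as an absent one (an assumption; the text only covers absence).

```python
from typing import Any

DEFAULT_HTTP_CONN_ID = "conduit__http_default"


def resolve_http_conn_id(operator_params: dict[str, Any]) -> str:
    """An explicit connection_id wins; otherwise fall back to the platform default."""
    conn_id = operator_params.get("connection_id")
    return conn_id if conn_id else DEFAULT_HTTP_CONN_ID


def register_default_http_connection(store: dict[str, dict]) -> bool:
    """Idempotent startup registration; returns True only when it creates the entry."""
    if DEFAULT_HTTP_CONN_ID in store:
        return False
    store[DEFAULT_HTTP_CONN_ID] = {"conn_type": "http"}  # deliberately no credentials
    return True
```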
DAG Composition
When effect_create is called with an ordered operator list, Conduit assembles a single Python
DAG file from the operator Jinja2 snippets. The assembled DAG contains one @task-decorated
function per operator, wired sequentially as DAG steps. Operators execute at DAG run time; outputs
pass between steps via XCom.
Assembly process
effect_create(name, operators=[...], visibility, schedule?, run_on_deploy?)
1. Validate each operator exists in the catalog
2. Determine required connections for all operators
3. For each missing connection: collect credentials from user (§ Connection Requirements)
4. Create any new Airflow connections via connection_create
5. Render assembled DAG:
   a. Load the operator Jinja2 snippet for each step
   b. Stitch into a single DAG file (dag_id: {username}__{name})
   c. Wire operator steps sequentially; each reads from the previous step's XCom
6. Commit rendered DAG to git pending branch (same as datapipeline_deploy)
7. Create DataEffect ES record (status: pending_validation)
8. Return { uid, dag_id, status: pending_validation }
→ GitLab CI runs: syntax check, import check, connection test
→ On CI pass: merge to main, GitSync to Airflow, DataEffect status → active
#REQ.ci-polling-timeout — the CI polling background process times out after 10 minutes,
matching ingest auto-start bounds. On timeout the DataEffect record is marked error.
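A sketch of the polling loop, assuming a hypothetical get_pipeline_status callable that returns GitLab pipeline status strings (success, failed, canceled, running, and so on). The clock and sleep functions are injectable so the timeout can be exercised without waiting. Mapping a failed or canceled pipeline to rejected is an assumption; the text only specifies the timeout outcome.

```python
import time
from typing import Callable

CI_TIMEOUT_S = 600.0  # 10 minutes (#REQ.ci-polling-timeout)


def poll_ci(get_pipeline_status: Callable[[], str],
            interval_s: float = 15.0,
            timeout_s: float = CI_TIMEOUT_S,
            clock: Callable[[], float] = time.monotonic,
            sleep: Callable[[float], None] = time.sleep) -> str:
    """Poll CI and return the DataEffect status to record."""
    deadline = clock() + timeout_s
    while clock() < deadline:
        status = get_pipeline_status()
        if status == "success":
            return "active"    # CI passed; merge to main and GitSync follow
        if status in ("failed", "canceled"):
            return "rejected"  # ASSUMPTION: CI failure maps to rejected
        sleep(interval_s)
    return "error"             # timed out per #REQ.ci-polling-timeout
```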
Operator params at compose time vs. run time
Operator parameters fall into two categories:
| Category | When Resolved | Example |
|---|---|---|
| Compose-time params | Baked into the rendered DAG at effect-create time | S3 bucket name, SQL query, recipient email addresses, target URL |
| Run-time params | Passed via DAG conf at trigger time | Watermark values, specific record IDs, dynamic payloads from Cortex |
Compose-time params are substituted into the Jinja2 snippets during assembly, producing a static
DAG. Run-time params arrive via dagrun_conf and are read by individual operators at
execution time. Both are valid; operators declare which of their params support run-time override.
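A sketch of how one operator might merge the two, using relay_audit_emit's split from the catalog (compose-time entity_type and field; run-time entity_id, before_value, after_value, actor). It assumes each operator declares the set of run-time params it reads; undeclared keys in dagrun_conf, including attempts to override compose-time-only params, are ignored, so one conf can carry values for several steps. resolve_params is a hypothetical name.

```python
from typing import Any, Iterable


def resolve_params(compose_time: dict[str, Any],
                   dagrun_conf: dict[str, Any],
                   runtime_params: Iterable[str]) -> dict[str, Any]:
    """Start from baked-in compose-time values, then apply declared run-time params."""
    params = dict(compose_time)
    for key in runtime_params:
        if key in dagrun_conf:
            params[key] = dagrun_conf[key]  # only keys this operator declares
    return params
```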
Domain Model: DataEffect
DataEffect is a Conduit-owned domain object, parallel to DataPipeline. Conduit owns all CRUD
operations, status, run lifecycle, and metrics for DataEffect — the same scope it owns for
DataPipeline.
DataEffect fields
| Field | Writable | Description |
|---|---|---|
| uid | system | Conduit-internal record ID |
| dag_id | system | Airflow DAG ID ({username}__{name}) |
| name | user | Human-readable effect name |
| description | user | Optional description |
| visibility | user | ABAC marking |
| operators | user | Ordered list of operator names and their compose-time params |
| connections | system | Airflow connection IDs used by this effect (resolved at create time) |
| status | system | pending_validation, active, paused, error, rejected |
| dag_content | system | Rendered DAG source (stored for redeploy/edit) |
| commit_sha | system | Git commit SHA under CI validation |
| schedule | user | Re-run schedule (cron or named interval); same semantics as DataPipeline scheduling |
| next_scheduled_run | system | Next scheduled trigger time |
| last_scheduled_run | system | Last scheduled trigger time |
| pipeline_type | system | Always "effect"; used by CI and UI to identify this DAG as an Effect |
| created_by | system | Username from auth context ("cortex" on Cortex-dispatched effects) |
| create_ts | system | Creation timestamp (ISO 8601 UTC) |
| correlation_id | user | Optional opaque ID supplied by the caller at create time; links this DataEffect back to an upstream object (e.g., a Cortex DecisionEffect eff_ ID or edition ID). Not interpreted by Conduit; stored and returned as-is for caller use. |
| edition_id | cortex | The attested Edition that triggered this DataEffect via Cortex dispatch. Populated by effect_dispatch; absent on DataEffects created directly via effect_create. |
| effect_type | cortex | DES effect type (external_dispatch, notification, human_process) as declared on the Cortex Decision Effect. Populated by effect_dispatch; absent on DataEffects created directly via effect_create. |
| target | cortex | Implementation-defined target descriptor from the Cortex decision template. Populated by effect_dispatch; absent on DataEffects created directly via effect_create. |
| deadline | cortex | ISO 8601 deadline inherited from the Cortex Decision Effect. Conduit records it but does not enforce it; deadline monitoring is Cortex's responsibility. |
| payload_hash | cortex | SHA-256 hex digest of the canonical dispatch payload, computed by Cortex at Decision Effect creation time. Verified by effect_dispatch before DAG assembly; stored for audit purposes. |
ES index
DataEffect records live in the same userspace ES index as DataPipeline and
DataPipelineConnection. The pipeline_type: "effect" field discriminates them from ingest
pipeline records.
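For example, an Elasticsearch query body that selects only DataEffect documents filters on the discriminator. This sketch assumes pipeline_type and created_by are mapped as keyword fields (a term query against an analyzed text field may not match); effects_query and is_effect are hypothetical helper names.

```python
from typing import Optional

EFFECT_TYPE = "effect"


def effects_query(created_by: Optional[str] = None) -> dict:
    """Query body selecting only DataEffect documents, optionally by creator."""
    filters = [{"term": {"pipeline_type": EFFECT_TYPE}}]
    if created_by is not None:
        filters.append({"term": {"created_by": created_by}})
    return {"query": {"bool": {"filter": filters}}}


def is_effect(doc: dict) -> bool:
    """Client-side equivalent of the discriminator check."""
    return doc.get("pipeline_type") == EFFECT_TYPE
```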
#REQ.effect-type-discriminator — all DataEffect documents carry pipeline_type: "effect".
Ingest pipeline documents carry pipeline_type equal to the provider template name (e.g.,
"postgres_ingest"). Queries that fetch only effects filter on pipeline_type: "effect".
MCP Tools
DataEffect Management (12)
The DataEffect management tool surface mirrors the DataPipeline management surface exactly.
| Tool | Description |
|---|---|
| effect_create(name, operators, visibility, schedule?, description?, run_on_deploy?) | Assemble and deploy an Effect DAG from an ordered operator list. Collects missing connections interactively. Returns {uid, dag_id, status: pending_validation}. |
| effect_list() | List Conduit-managed DataEffect records with live Airflow run state |
| effect_get(uid) | Get a single DataEffect record including operator list and live Airflow state |
| effect_update(uid, operators?, schedule?, description?, visibility?) | Redeploy with updated operator chain or metadata |
| effect_delete(uid) | Remove effect DAG from git and ES |
| effect_status(uid) | Poll GitLab CI for pending effects; return live Airflow state for active ones |
| effect_pause(uid) | Pause the Airflow DAG |
| effect_unpause(uid) | Resume a paused effect |
| effect_run(uid, conf?) | Trigger a new DAG run; optional conf supplies run-time operator params |
| effect_stop(uid) | Stop the active run for an effect |
| effect_runs(uid, state?, limit?) | List run history for an effect |
| effect_metrics(uid, since?, limit?) | Get run metrics and statistics |
Effect Catalog (2)
| Tool | Description |
|---|---|
| effect_schema() | List all available operators from the Effect operator catalog |
| effect_schema(operator_name) | Get the full parameter schema for a specific operator, including which params are compose-time vs. run-time |
Cortex Dispatch (1)
| Tool | Description |
|---|---|
| effect_dispatch(effect_id, edition_id, effect_type, target, payload, payload_hash, correlation_id?, deadline?) | Inbound endpoint called by Cortex's agent_dispatch handler to execute a Decision Effect. Verifies payload_hash before proceeding. Assembles and deploys the described DAG, creates a DataEffect ES record, and returns {acknowledged, external_reference, dag_id, status: pending_validation}. external_reference is the DataEffect uid; Cortex stores it on its DecisionEffect record and uses it to poll effect_status. |
REST Endpoints
DataEffect (13)
Mirrors the DataPipeline REST surface.
| Method | Path | Description |
|---|---|---|
| POST | /userspace/effect | Create and deploy an Effect DAG |
| GET | /userspace/effect | List DataEffect records |
| GET | /userspace/effect/{uid} | Get a single DataEffect record |
| PATCH | /userspace/effect/{uid} | Update an effect; action: "pause"\|"unpause" supported |
| DELETE | /userspace/effect/{uid} | Delete an effect |
| GET | /userspace/status/effect | Bulk status for all effects |
| GET | /userspace/effect/{uid}/status | Status for a single effect |
| GET | /userspace/effect/schema | List available operators |
| GET | /userspace/effect/schema/{operator_name} | Get operator parameter schema |
| GET | /userspace/effect/{uid}/runs | List run history |
| POST | /userspace/effect/{uid}/run | Trigger a run |
| POST | /userspace/effect/{uid}/stop | Stop the active run |
| GET | /userspace/effect/{uid}/metrics | Get run metrics |
Oracle Integration
There are two distinct paths by which a DataEffect is created. They use different entry-point tools and have different callers.
User / Oracle path
Oracle reads the Effect operator catalog via the catalog://effects MCP resource. When a user expresses intent that maps to a multi-step action workflow, Oracle:
- Reads the catalog to identify relevant operators and their required connections
- Calls effect_create with the assembled operator list and compose-time params
- Responds to connection-collection prompts from Conduit by asking the user for missing credentials
- Monitors effect_status until the DAG is active, then optionally triggers a run
Cortex dispatch path
When an Edition is attested and its decision template declares effects, Cortex's EffectDispatcher fires its agent_dispatch handler, which calls Conduit's effect_dispatch MCP tool directly — Oracle is not in the loop. The Cortex Decision Effect types map to specific operators or operator chains:
| Cortex effect_type | Operator chain |
|---|---|
| external_dispatch | http_dispatch (or a typed connection if one exists) |
| notification (email) | email_notify |
| notification (webhook) | webhook_notify |
| human_process / task_creation | cortex_task_create |
| human_process / ticket_creation | gitlab_issue_create (not yet available; see EE-14) |
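The mapping can be expressed as a lookup table. This sketch assumes the notification and human_process variants arrive as a separate hypothetical variant argument (the text does not say how the variant is derived from the Decision Effect's target) and models the typed-connection override for external_dispatch as an optional operator name.

```python
from typing import Optional

CHAINS: dict[tuple[str, Optional[str]], list[str]] = {
    ("external_dispatch", None): ["http_dispatch"],
    ("notification", "email"): ["email_notify"],
    ("notification", "webhook"): ["webhook_notify"],
    ("human_process", "task_creation"): ["cortex_task_create"],
}

BLOCKED = {
    ("human_process", "ticket_creation"):
        "gitlab_issue_create is not yet available (EE-14)",
}


def chain_for(effect_type: str, variant: Optional[str] = None,
              typed_operator: Optional[str] = None) -> list[str]:
    """Map a Cortex effect_type (plus variant) to an operator chain."""
    key = (effect_type, variant)
    if key in BLOCKED:
        raise NotImplementedError(BLOCKED[key])
    if effect_type == "external_dispatch" and typed_operator:
        return [typed_operator]  # prefer a typed connection's operator when one exists
    if key not in CHAINS:
        raise ValueError(f"no operator chain for {key}")
    return list(CHAINS[key])  # copy so callers cannot mutate the table
```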
effect_dispatch verifies the payload hash, assembles the DAG, creates the DataEffect ES record, and returns {acknowledged, external_reference, dag_id, status: pending_validation}. Cortex stores the returned external_reference (the DataEffect uid) on its own DecisionEffect record.
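Hash verification only works if both sides agree on the canonical payload bytes. This sketch assumes canonical JSON means sorted keys, no insignificant whitespace, and UTF-8; if Cortex uses a different canonicalization (for example the RFC 8785 JSON Canonicalization Scheme, which also normalizes number formatting), canonical_bytes must follow it exactly. hmac.compare_digest provides a constant-time comparison.

```python
import hashlib
import hmac
import json


def canonical_bytes(payload: dict) -> bytes:
    # ASSUMPTION: canonical form = sorted keys, compact separators, UTF-8.
    return json.dumps(payload, sort_keys=True, separators=(",", ":"),
                      ensure_ascii=False).encode("utf-8")


def verify_payload_hash(payload: dict, payload_hash: str) -> None:
    """Raise before any DAG assembly if the payload does not match its hash."""
    expected = hashlib.sha256(canonical_bytes(payload)).hexdigest()
    if not hmac.compare_digest(expected, payload_hash.lower()):
        raise ValueError("payload_hash mismatch; refusing to assemble the DAG")
```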
Status feedback — Cortex polls Conduit. Conduit does not push status back to Cortex. After calling effect_dispatch, Cortex is responsible for polling effect_status(uid) to track DAG activation and run completion. Cortex uses the returned Airflow run state to drive its own DecisionEffect lifecycle transitions (pending → acknowledged → completed | failed). The correlation_id field on DataEffect carries the Cortex eff_ ID so Cortex can match poll results back to the originating DecisionEffect record.
MCP Resources
Two MCP resources expose Effect engine metadata to Oracle and other agents:
| Resource | Description |
|---|---|
| catalog://effects | Full Effect operator catalog: operator names, categories, descriptions, required connections, and compose-time and run-time param schemas |
| templates://effects/{operator_name} | Annotated reference snippet for a specific operator, showing step structure, XCom usage, and param wiring |
Relationship to the Ingest Engine
Shared components
- conduit/airflow/client.py: same AirflowClient
- conduit/git/client.py: same git commit/CI pipeline path
- conduit/templates/renderer.py: extended to support multi-snippet stitching in addition to single-template rendering
- AxonisMiddleware + require_auth: same auth on all DataEffect endpoints
- Oracle self-registration: DataEffect tools appear in the same Conduit tool catalog
- Userspace ES index: same index, discriminated by pipeline_type
- Scheduling subsystem: same scheduler DAG, same schedule/next_scheduled_run/last_scheduled_run fields
Not shared
- conduit/domain/pipeline.py: DataEffect is a separate domain class in conduit/domain/effect.py
- conduit/templates/catalog.json: Effect operators are catalogued separately in conduit/templates/effects/catalog.json
- Connection collection: ingest requires a connection upfront; DataEffect creation collects connections interactively during effect_create
Relay Write Pattern
The relay write pattern is the standard operator chain for writing a configuration value to a
physical field device via MQTT/Sparkplug B and emitting an immutable audit record. "Relay" is a
use case of the Effect engine — never a separate component, service, or capability contract
(decision 2026-06-10, simplicity#124; the non-normative pointer doc at
developers-environment/specs/platform/relay.md records the history). This section is the
normative home for the entire device write-back use case (simplicity #107, #111): operator
chains, safety invariants, registry, equipment, audit, and the Cortex client surface.
Operator chains
Three chains cover the writable field types. Choose based on the target metric level and whether you need bundled Fast Reporting config.
Node-level write (NCMD) — e.g. Properties/Name, Properties/Report Interval:
sparkplug_ncmd → relay_audit_emit
sparkplug_ncmd gates on sensor online status, publishes a single-metric NCMD, and optionally
triggers an instant report for fast verification. relay_audit_emit writes the immutable audit
record. Set trigger_instant_report: true for any write where you need verification within
seconds rather than at the next eDRX wake cycle.
Device-level write (DCMD) — broker-only, e.g. Reporting Frequency, Rebirth:
sparkplug_dcmd → relay_audit_emit
sparkplug_dcmd gates on sensor online status, publishes a single-metric DCMD, and optionally
triggers an instant report via Properties/Report Interval NCMD to force DBIRTH confirmation.
Dual-target device-level write (DCMD + ES platform state) — e.g. Sensor Elevation, Flooding Possible/Likely:
sparkplug_dcmd → platform_state_upsert → relay_audit_emit
After DBIRTH confirmation, platform_state_upsert writes the confirmed value to site_info.{es_field} in Elasticsearch so the normalization pipeline picks up the new calibration value on the next NBIRTH cycle. Set trigger_instant_report: true on the DCMD step if you need the normalize pipeline to re-run quickly. Raises and marks the effect failed if the DCMD was not confirmed — the ES write is never performed for unconfirmed device writes.
Fast Reporting configuration (bundled 4-metric DCMD):
sparkplug_fast_report → relay_audit_emit
sparkplug_fast_report sends Enabled, Report Interval, Mode, and Duration in one DCMD
payload — the Fast Reporting subsystem requires all four together.
Legacy generic chain (JSON publish, no online gate):
mqtt_publish → mqtt_await_confirmation → relay_audit_emit
Use only when the target device does not speak Sparkplug B or when pysparkplug is unavailable.
mqtt_publish sends a raw JSON payload; mqtt_await_confirmation blocks until any message
arrives on the confirmation topic. No online-gate and no protobuf encoding.
A Sparkplug B device write MUST use the sparkplug_* chains. The legacy JSON chain cannot
drive a Ranger and carries none of the safety invariants below; it is not a permitted fallback
for device writes, only for genuinely non-Sparkplug MQTT targets.
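The chain choice above can be written as a small selector. It is a sketch: the level, dual_target, and fast_report inputs are hypothetical and would come from the field-mapping registry, the optional leading sparkplug_read step is left out, and allowing a node-level dual-target chain is an extrapolation from platform_state_upsert accepting NCMD results.

```python
def relay_chain(level: str, *, sparkplug: bool = True,
                dual_target: bool = False, fast_report: bool = False) -> list[str]:
    """Pick the operator chain for a relay write (sketch)."""
    if not sparkplug:
        # Only for genuinely non-Sparkplug MQTT targets; never for device writes.
        return ["mqtt_publish", "mqtt_await_confirmation", "relay_audit_emit"]
    if fast_report:
        return ["sparkplug_fast_report", "relay_audit_emit"]
    middle = ["platform_state_upsert"] if dual_target else []
    if level == "node":
        return ["sparkplug_ncmd", *middle, "relay_audit_emit"]
    if level == "device":
        return ["sparkplug_dcmd", *middle, "relay_audit_emit"]
    raise ValueError(f"unknown metric level: {level!r}")
```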
Safety invariants
The sparkplug_* operators already deliver Sparkplug B protobuf encoding (pysparkplug), runtime
device targeting (group_id/eon_node_id/metric_value via dagrun_conf; metric + datatype
compose-time per field-class is the accepted design), the retained-NDEATH online gate, and
cleanup-on-finally (EE-19..EE-21). Device writes are durable, asynchronous jobs with pollable
status across the eDRX window — no surface exposes a blocking write call. The remaining
invariants are mandated here:
#REQ.no-mqtt-core-duplication — the MQTT/Sparkplug write mechanics live only in this operator library. No consumer (Cortex, Beacon, ADI-UI, scripts) embeds its own MQTT client for device writes — every write path runs through a Conduit effect DAG.
#REQ.sparkplug-datatype-fidelity — the compose-time datatype MUST match the datatype the
device declares for that metric in its DBIRTH payload, exactly. Before an effect for a new metric
is created, the datatype is confirmed from the EMQX field tables or a live DBIRTH capture — never
guessed from the measurement's physical kind. (Blocking discipline recorded on simplicity#107,
2026-06-10: the four device-level fields — PRESS1 Zero Offset, DIN1/DIN2 Average Frequency, AIN1
High/Low Threshold — await DBIRTH datatype confirmation.)
#REQ.confirm-value-verified — a confirmed write means the confirming NBIRTH/DBIRTH carries
the written metric at the written value. The operators currently report confirmed +
confirming_value without asserting; the step MUST fail (or mark the effect failed) on a value
mismatch, and an NDATA-alias confirmation (where metric names are unavailable) MUST surface as
verified: false rather than silently passing. Mismatch details land in the airflow-metrics
failure record via _effect_failure_callback (see #REQ.effect-failure-callback) — the
RuntimeError message raised by the operator is the carrier; it is captured in history[].error
alongside the full run context.
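A sketch of the required check, assuming the write operator exposes confirmed, confirming_value, and whether metric names were available in the confirming message (false for NDATA alias-only confirmations). Failing an unconfirmed write and tolerating float32 rounding on FLOAT metrics via math.isclose are assumptions; the raised RuntimeError message is what would land in history[].error.

```python
import math
from typing import Any, Optional


def _values_match(written: Any, reported: Any) -> bool:
    numeric = (int, float)
    if (isinstance(written, float) or isinstance(reported, float)) and \
            isinstance(written, numeric) and isinstance(reported, numeric):
        # ASSUMPTION: FLOAT metrics round-trip through 32-bit precision.
        return math.isclose(written, reported, rel_tol=1e-6)
    return written == reported


def verify_confirmation(written_value: Any, confirmed: bool,
                        confirming_value: Optional[Any],
                        names_available: bool) -> dict:
    """Turn an operator's confirmation result into a verified outcome or a failure."""
    if not confirmed:
        raise RuntimeError("write was not confirmed")  # ASSUMPTION: unconfirmed fails
    if not names_available:
        # NDATA alias-only confirmation: the value cannot be tied to the metric name.
        return {"confirmed": True, "verified": False}
    if not _values_match(written_value, confirming_value):
        raise RuntimeError(
            f"confirmation mismatch: wrote {written_value!r}, "
            f"device reported {confirming_value!r}")
    return {"confirmed": True, "verified": True}
```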
#REQ.writable-tag-whitelist-enforcement — before publish, the write is validated against the
curated writable-tag whitelist (Ranger Tag Guide — the broker silently drops writes to read-only
or non-existent tags; the firmware writable flag is unreliable). Out-of-whitelist metrics and
out-of-range/enum values are rejected with a descriptive error before any MQTT traffic. The
whitelist registry (metric, datatype, range/enum, device model) lives in Conduit and is the
single source of truth for the field → metric → datatype → platform-state mapping
(#REQ.field-mapping-authority, see #relay-write-pattern.registry) — it is also where
#REQ.sparkplug-datatype-fidelity's confirmed datatypes are recorded.
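A sketch of the pre-publish check, assuming the registry is keyed by (device model, metric). The Node Control/Rebirth entry (BOOLEAN, value true) comes from #REQ.device-rebirth-recipe; the Example/Setpoint entry, its INT32 datatype, and its range are hypothetical and exist only to exercise the range check. Real datatypes must be DBIRTH-confirmed before they are entered.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass(frozen=True)
class WritableTag:
    device_model: str
    metric: str
    datatype: str                        # DBIRTH-confirmed datatype
    min_value: Optional[float] = None
    max_value: Optional[float] = None
    allowed: Optional[frozenset] = None  # enum values, if the tag is enumerated


def validate_write(registry: dict, device_model: str, metric: str,
                   datatype: str, value: Any) -> WritableTag:
    """Reject a write before any MQTT traffic unless it is whitelisted and in range."""
    tag = registry.get((device_model, metric))
    if tag is None:
        raise ValueError(f"{metric!r} is not in the writable-tag whitelist for {device_model}")
    if datatype != tag.datatype:
        raise ValueError(f"{metric!r}: datatype {datatype} != registered {tag.datatype}")
    if tag.allowed is not None and value not in tag.allowed:
        raise ValueError(f"{metric!r}: {value!r} is not an allowed value")
    if tag.min_value is not None and value < tag.min_value:
        raise ValueError(f"{metric!r}: {value!r} is below {tag.min_value}")
    if tag.max_value is not None and value > tag.max_value:
        raise ValueError(f"{metric!r}: {value!r} is above {tag.max_value}")
    return tag


REGISTRY = {
    ("Ranger", "Node Control/Rebirth"): WritableTag(
        "Ranger", "Node Control/Rebirth", "BOOLEAN", allowed=frozenset({True})),
    # Hypothetical ranged entry for illustration only; not a real Ranger tag.
    ("Ranger", "Example/Setpoint"): WritableTag(
        "Ranger", "Example/Setpoint", "INT32", min_value=0, max_value=100),
}
```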
#REQ.relay-attested-dispatch (the #107 "secondary confirmation") — a device-write effect (any DAG containing a sparkplug_ncmd,
sparkplug_dcmd, or sparkplug_fast_report step) dispatches only from an attested-edition
context: the Cortex dispatch path (effect_dispatch carrying edition_id) or an equivalently
attested effect_run. A direct, unattested effect_run on a device-write DAG is rejected and
the attempt audited. The upstream firewall is Edition attestation
(product.decision-effect #REQ.requires-attested-edition). Dispatch ships behind a feature flag
until the eDRX confirm cycle is validated against real Ranger nodes; autonomous (un-attested)
dispatch is never enabled.
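A sketch of the gate, assuming a hypothetical audit callable and treating a non-empty edition_id in dagrun_conf as the attested context. A real implementation would need to verify the edition's attestation with Cortex rather than trust the field's presence; auditing the feature-flag rejection is also an assumption.

```python
from typing import Callable

DEVICE_WRITE_OPERATORS = frozenset(
    {"sparkplug_ncmd", "sparkplug_dcmd", "sparkplug_fast_report"})


def is_device_write(operators: list[str]) -> bool:
    """True if the DAG contains any Sparkplug B write step."""
    return any(op in DEVICE_WRITE_OPERATORS for op in operators)


def check_dispatch(operators: list[str], dagrun_conf: dict, *,
                   flag_enabled: bool, audit: Callable[[dict], None]) -> None:
    """Raise PermissionError unless a device-write run is enabled and attested."""
    if not is_device_write(operators):
        return  # non-device effects are not gated here
    if not flag_enabled:
        audit({"event": "device_write_rejected", "reason": "feature_flag_off"})
        raise PermissionError("device-write dispatch is behind a disabled feature flag")
    if not dagrun_conf.get("edition_id"):
        audit({"event": "device_write_rejected", "reason": "unattested"})
        raise PermissionError("device-write effects require an attested edition context")
```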
#REQ.relay-dual-write — for fields that also exist as platform-side state (ES sensor
calibration fields, AlertThreshold docs — see the dual-target field map in
#relay-write-pattern.registry), the effect DAG updates
the platform state as a step after write confirmation, broker-first; a platform-state failure
after a confirmed device write marks the effect failed, records the divergence in the audit
ledger, and raises an escalation Signal.
#REQ.sparkplug-read-operator — a read operator (sparkplug_read) subscribes to the target's
BIRTH/DATA topics (optionally forcing an instant report, reusing the sparkplug_ncmd Phase-2
mechanic) and returns current decoded metric values via effect_output, enabling pre-write
display and automated before_value capture (closing the caller-supplied interim in
#REQ.relay-before-value).
#REQ.device-rebirth-recipe — device rebirth is realized as a sparkplug_ncmd effect with
metric_name: "Node Control/Rebirth", datatype: BOOLEAN, metric_value: true — no dedicated
operator (per the #107 checklist completion, 2026-06-10). The recipe is documented in the catalog
and the rebirth effect is attestation-gated like any other device write.
#REQ.effect-failure-callback — every assembled effect DAG includes _effect_failure_callback
in its default_args, wired by assemble_effect() at render time. The callback fires on any task
failure, connects to Elasticsearch via the standard ELASTIC_HOST / ELASTIC_USERNAME /
ELASTIC_PASSWORD / ELASTIC_VERIFY env vars, and indexes one document per run (keyed by
run_id) to the airflow-metrics index. The document carries dag_id, task_id, timestamp, and
a run_summary metric entry with status: Error and the full exception message in error —
matching the schema used by atlas-airflow's pipeline_failure_callback so effect runs are
visible alongside ingest runs in the same metrics index. The callback is non-fatal: any ES error is
swallowed so it cannot mask the original task failure. When ELASTIC_HOST is absent the callback
exits immediately.
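A sketch of the callback's behavior, with two assumptions flagged. First, the indexing call is injected as a hypothetical index_fn; in production it would be built from the ELASTIC_* env vars, for instance an elasticsearch-py client's index(index=..., id=..., document=...). Second, the run_summary entry is nested under history to match the history[].error path named in #REQ.confirm-value-verified. Because Airflow passes callbacks only the context, index_fn would be bound with functools.partial before the callback goes into default_args.

```python
import os
from datetime import datetime, timezone
from typing import Any, Callable

METRICS_INDEX = "airflow-metrics"


def build_failure_doc(context: dict) -> tuple[str, dict]:
    """Return (document id, document) for one failed effect run."""
    ti = context["task_instance"]
    doc = {
        "dag_id": ti.dag_id,
        "task_id": ti.task_id,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        # ASSUMPTION: nesting of the run_summary metric entry.
        "history": [{
            "metric": "run_summary",
            "status": "Error",
            "error": str(context.get("exception")),
        }],
    }
    return context["run_id"], doc


def effect_failure_callback(context: dict,
                            index_fn: Callable[[str, str, dict], Any]) -> None:
    """Index a failure record; never raise, so the task's own error stays visible."""
    if not os.environ.get("ELASTIC_HOST"):
        return  # ES not configured: exit immediately
    try:
        doc_id, doc = build_failure_doc(context)
        index_fn(METRICS_INDEX, doc_id, doc)  # one document per run, keyed by run_id
    except Exception:
        pass  # non-fatal by design: swallow any ES or formatting error
```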
Registry, write classes & field readiness
#REQ.registry-es-site-info — the configured sensor/equipment inventory is read from the ES
site_info / sensors indices (maintained by the Airflow ingest path). Site/topic inventory
never uses the EMQX /clients REST endpoint. Where component.sentinel.alerting is deployed,
its Sensor registry is honored and write outcomes are additionally emitted as AlertEvents
(source_type: relay) — conditional on Sentinel's presence, never a prerequisite
(Sentinel is not deployed in the Simplicity engagement).
#REQ.write-classes — the use case admits three write classes, all attestation-gated and
audited: dual-target (broker + platform state, #REQ.relay-dual-write ordering),
broker-only (e.g. Reporting Frequency, Rebirth), and platform-state-only (e.g. the Sensor
Location override, pending confirmation — no broker leg). The field-mapping table records each
field's class.
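The three classes differ only in which legs a write performs. The sketch below records that difference as data: dual-target runs the broker leg before the platform-state leg (as in the confirmed-DCMD-then-site_info ordering of the dual write), and every class is audited. The enum, the leg names, and placing the audit leg last are illustrative assumptions.

```python
from enum import Enum


class WriteClass(Enum):
    DUAL_TARGET = "dual-target"                  # broker write, then platform state
    BROKER_ONLY = "broker-only"                  # e.g. Reporting Frequency, Rebirth
    PLATFORM_STATE_ONLY = "platform-state-only"  # e.g. Sensor Location override (pending)


# Ordered legs per class; "audit" is appended to every class because all are audited.
_LEGS = {
    WriteClass.DUAL_TARGET: ["broker", "platform_state"],
    WriteClass.BROKER_ONLY: ["broker"],
    WriteClass.PLATFORM_STATE_ONLY: ["platform_state"],
}


def write_legs(write_class: WriteClass) -> list[str]:
    """Return the ordered legs for a write class (hypothetical helper)."""
    return _LEGS[write_class] + ["audit"]
```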
#REQ.field-mapping-authority — the field → broker metric → DBIRTH-confirmed datatype → platform-state mapping is maintained alongside the writable-tag whitelist as a single source of truth, seeded from the #107 mapping. Dual-target fields per #107:
| Field (#107) | Broker metric | Platform state |
|---|---|---|
| Sensor Elevation (MSL) | Dev1/PRESS1 Config/Zero Offset | ES sensor calibration fields |
| Bottom of stream (MSL) | Dev1/DIN1 Average Frequency (repurposed) | ES stream_bottom_elevation_ft_d |
| Top of stream (MSL) | Dev1/DIN2 Average Frequency (repurposed) | ES sensor calibration fields |
| Flooding Possible level | Dev1/AIN1 Config/High Threshold | ES site_info.potential_flood_level_d (operator-entered override; normalize copies to flooding_possible_ft_d per reading) |
| Flooding Likely level | Dev1/AIN1 Config/Low Threshold | ES site_info.likely_flood_level_d (operator-entered override; normalize copies to flooding_likely_ft_d per reading) |
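One way to hold a mapping row and the pre-publish datatype check together is a frozen record per field, where a `None` datatype means "not yet confirmed from DBIRTH" and blocks the write. This is a hypothetical sketch and not the contents of conduit/relay/whitelist.py. The Python-type mapping and the FLOAT/DOUBLE spellings are assumptions. The Reporting Frequency entry pairs "Properties/Report Interval" with UINT32 only for illustration.

```python
from dataclasses import dataclass
from typing import Any, Optional

# Assumed mapping from Sparkplug B datatype strings to accepted Python types (subset).
_PY_TYPES: dict[str, tuple[type, ...]] = {
    "BOOLEAN": (bool,),
    "UINT32": (int,),
    "FLOAT": (int, float),
    "DOUBLE": (int, float),
    "STRING": (str,),
}


@dataclass(frozen=True)
class WhitelistEntry:
    field: str                     # #107 field name
    metric: Optional[str]          # broker metric path (None if platform-state-only)
    datatype: Optional[str]        # DBIRTH-confirmed datatype (None = unconfirmed)
    platform_state: Optional[str]  # ES target field (None if broker-only)


def validate_value(entry: WhitelistEntry, value: Any) -> None:
    """Raise unless value fits the entry's confirmed datatype."""
    if entry.datatype is None:
        raise ValueError(f"{entry.field}: datatype not yet confirmed from DBIRTH")
    allowed = _PY_TYPES.get(entry.datatype)
    if allowed is None:
        raise ValueError(f"{entry.field}: unsupported datatype {entry.datatype}")
    # bool subclasses int in Python, so accept a bool only when BOOLEAN is expected.
    if isinstance(value, bool) != (entry.datatype == "BOOLEAN") or not isinstance(value, allowed):
        raise TypeError(f"{entry.field}: {value!r} does not fit {entry.datatype}")
    if entry.datatype == "UINT32" and not 0 <= value <= 0xFFFFFFFF:
        raise ValueError(f"{entry.field}: {value} is outside the UINT32 range")


# Illustrative entries: the flood-level datatype is still awaiting DBIRTH confirmation.
FLOODING_POSSIBLE = WhitelistEntry(
    "Flooding Possible level", "Dev1/AIN1 Config/High Threshold", None, "potential_flood_level_d"
)
REPORTING_FREQUENCY = WhitelistEntry(
    "Reporting Frequency", "Properties/Report Interval", "UINT32", None
)
```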
Field readiness (#107 triage, 2026-06-10, informative): 9 fields are unblocked. Sensor Name, Reporting Frequency, Fast Reporting, and Rebirth are already delivered, and the five device-level fields above await DBIRTH datatype confirmation. 4 fields are blocked pending customer resolution: Sensor Location (no writable GPS metric; likely platform-state-only), Installation Date (mapping looks wrong), Serial Number (separate non-broker workflow), and Service period (unmapped).
Add-on equipment
The use case covers add-on field equipment (warning lights, sirens, barriers, pumps/irrigation) attached to sensor sites (simplicity#107).
#REQ.equipment-read-surface — read surface exposes equipment presence, type, online status,
control state (On | Off | Auto), and operating context (manual vs. threshold-driven Auto),
sourced per #REQ.registry-es-site-info.
#REQ.equipment-control-state — write surface issues control-state commands (On | Off |
Auto) and Auto trigger-threshold updates through the same operator chains, whitelist,
attestation, and audit invariants as sensor writes. Equipment writes are not a side channel.
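The control-state vocabulary is closed (On | Off | Auto), so a client can reject anything else before a command is staged. The sketch below is hypothetical. Case-insensitive matching is an assumption, and the result would still go through the same whitelist, attestation, and audit path as any sensor write.

```python
from enum import Enum


class ControlState(str, Enum):
    ON = "On"
    OFF = "Off"
    AUTO = "Auto"


def parse_control_state(raw: str) -> ControlState:
    """Normalize a requested control state; reject values outside On | Off | Auto."""
    wanted = raw.strip().lower()
    for state in ControlState:
        if state.value.lower() == wanted:
            return state
    raise ValueError(f"unsupported control state: {raw!r}")
```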
Compose-time params
These are baked into the DAG at effect_create time. A separate effect is required for each
field being written (e.g. one effect for Reporting Frequency, another for Sensor Elevation).
| Operator | Param | Description |
|---|---|---|
| sparkplug_ncmd | connection_id | Airflow connection for the EMQX broker |
| sparkplug_ncmd | metric_name | Sparkplug B metric path, e.g. "Properties/Report Interval" |
| sparkplug_ncmd | datatype | Sparkplug B type string, e.g. "UINT32" or "STRING" |
| sparkplug_ncmd | trigger_instant_report | true to force an immediate NBIRTH after the write |
| sparkplug_dcmd | connection_id | Airflow connection for the EMQX broker |
| sparkplug_dcmd | metric_name | Metric path including device prefix, e.g. "Dev1/AIN1 Config/High Threshold" |
| sparkplug_dcmd | datatype | Sparkplug B type string |
| sparkplug_dcmd | device_id | "Dev1" (default for SignalFire Rangers) |
| sparkplug_fast_report | connection_id | Airflow connection for the EMQX broker |
| sparkplug_fast_report | device_id | "Dev1" (default) |
| platform_state_upsert | es_field | site_info field name, e.g. "sensor_elevation_ft_d", "stream_bottom_elevation_ft_d", "potential_flood_level_d", "likely_flood_level_d" |
| platform_state_upsert | source | Sensor source string for the site_info doc key; defaults to "mqtt_sparkplug" (correct for all SignalFire Rangers) |
| relay_audit_emit | entity_type | "sensor" or "equipment" |
| relay_audit_emit | field | Broker metric path, e.g. "Properties/Report Interval" |
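Since these params are fixed once the DAG is rendered, it helps to resolve and check them before effect_create. The sketch below transcribes the table into a lookup, applies the documented defaults (device_id "Dev1", source "mqtt_sparkplug"), and rejects unknown or missing params. The helper name and the `trigger_instant_report` default of `False` are assumptions.

```python
from typing import Any

# Compose-time params per operator, transcribed from the table above.
COMPOSE_PARAMS: dict[str, set[str]] = {
    "sparkplug_ncmd": {"connection_id", "metric_name", "datatype", "trigger_instant_report"},
    "sparkplug_dcmd": {"connection_id", "metric_name", "datatype", "device_id"},
    "sparkplug_fast_report": {"connection_id", "device_id"},
    "platform_state_upsert": {"es_field", "source"},
    "relay_audit_emit": {"entity_type", "field"},
}

# device_id and source defaults come from the table; the ncmd default is assumed.
COMPOSE_DEFAULTS: dict[str, dict[str, Any]] = {
    "sparkplug_ncmd": {"trigger_instant_report": False},
    "sparkplug_dcmd": {"device_id": "Dev1"},
    "sparkplug_fast_report": {"device_id": "Dev1"},
    "platform_state_upsert": {"source": "mqtt_sparkplug"},
}


def resolve_compose_params(operator: str, params: dict[str, Any]) -> dict[str, Any]:
    """Apply defaults and verify the full compose-time param set (hypothetical helper)."""
    if operator not in COMPOSE_PARAMS:
        raise KeyError(f"unknown operator: {operator}")
    expected = COMPOSE_PARAMS[operator]
    unknown = set(params) - expected
    if unknown:
        raise ValueError(f"{operator}: unknown params {sorted(unknown)}")
    resolved = {**COMPOSE_DEFAULTS.get(operator, {}), **params}
    missing = expected - set(resolved)
    if missing:
        raise ValueError(f"{operator}: missing params {sorted(missing)}")
    if operator == "relay_audit_emit" and resolved["entity_type"] not in ("sensor", "equipment"):
        raise ValueError("entity_type must be 'sensor' or 'equipment'")
    return resolved
```

Because one effect covers one field, this check runs once per field-specific effect.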
Runtime params (dagrun_conf)
These are supplied at trigger time via effect_run(uid, conf={...}).
| Param | Required by | Description |
|---|---|---|
| group_id | sparkplug_ncmd, sparkplug_dcmd, sparkplug_fast_report | Sparkplug B group ID for the target sensor |
| eon_node_id | sparkplug_ncmd, sparkplug_dcmd, sparkplug_fast_report | EON node ID (IMEI or named node) |
| metric_value | sparkplug_ncmd, sparkplug_dcmd | Value to write; must match the compose-time datatype |
| enabled | sparkplug_fast_report | Enable/disable Fast Reporting (boolean, default true) |
| report_interval | sparkplug_fast_report | Fast Reporting interval in seconds (default 60) |
| mode | sparkplug_fast_report | Fast Reporting mode (default "Duration") |
| duration | sparkplug_fast_report | Fast Reporting run duration in seconds (default 1200) |
| entity_id | relay_audit_emit | UID of the sensor or equipment record being changed |
| after_value | relay_audit_emit | The value being written to the device |
| actor | relay_audit_emit | Username of the user initiating the write, from Bearer token claims |
| before_value | relay_audit_emit | Last-known value of the field before the write (optional; see note below) |
#REQ.relay-before-value — before_value is caller-supplied. The requirements (#111) did not specify how to capture prior state, so the triggering client is expected to pass the value currently displayed to the user. If the caller omits it, the audit record emits null, making the gap visible rather than silent. This decision is recorded in simplicity issue #111. The sparkplug_read operator (#REQ.sparkplug-read-operator) automates before-value capture and closes this gap.
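A client assembling the `conf` for effect_run(uid, conf={...}) can apply the Fast Reporting defaults from the table, set an explicit `before_value: None` when the caller has none, and fail fast on missing keys. This is a hypothetical client-side sketch. Treating only params without a documented default as mandatory is an assumption drawn from the table's defaults.

```python
from typing import Any

# Runtime keys that have no documented default, per the runtime params table.
RUNTIME_REQUIRED: dict[str, set[str]] = {
    "sparkplug_ncmd": {"group_id", "eon_node_id", "metric_value"},
    "sparkplug_dcmd": {"group_id", "eon_node_id", "metric_value"},
    "sparkplug_fast_report": {"group_id", "eon_node_id"},
    "relay_audit_emit": {"entity_id", "after_value", "actor"},
}

# Fast Reporting defaults from the table.
FAST_REPORT_DEFAULTS: dict[str, Any] = {
    "enabled": True,
    "report_interval": 60,
    "mode": "Duration",
    "duration": 1200,
}


def build_run_conf(operators: list[str], **values: Any) -> dict[str, Any]:
    """Build a dagrun_conf for the given operator chain (hypothetical helper)."""
    conf: dict[str, Any] = dict(values)
    if "sparkplug_fast_report" in operators:
        conf = {**FAST_REPORT_DEFAULTS, **conf}
    if "relay_audit_emit" in operators:
        # Explicit None keeps the gap visible: the audit record will carry null.
        conf.setdefault("before_value", None)
    required = set().union(*(RUNTIME_REQUIRED.get(op, set()) for op in operators))
    missing = required - set(conf)
    if missing:
        raise ValueError(f"missing dagrun_conf keys: {sorted(missing)}")
    return conf
```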
Relay audit ledger
relay_audit_emit writes to the relay-audit Elasticsearch index (Schema.RELAY_AUDIT in
conduit/schema.py). This index is separate from data-ingest and holds only relay audit
records. Each document is created with op_type: create (never index or upsert), so records
are immutable by construction.
Every audit document contains:
| Field | Source |
|---|---|
| entity_type | Compose-time param |
| field | Compose-time param |
| entity_id | dagrun_conf.entity_id |
| before_value | dagrun_conf.before_value (null if omitted) |
| after_value | dagrun_conf.after_value |
| actor | dagrun_conf.actor |
| ts | UTC timestamp at emit time |
| dag_id | Airflow DAG ID of the triggering effect |
| run_id | Airflow run ID |
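The table above maps directly to a document builder, and the create-only guarantee can be shown with an in-memory stand-in for the index. Against a real cluster, the same guarantee comes from `op_type: create`, for which Elasticsearch answers a duplicate document ID with a 409 conflict instead of overwriting. The ledger class, the exception, and the `dag_id:run_id` document ID used below are hypothetical.

```python
from datetime import datetime, timezone
from typing import Any


class DuplicateAuditRecord(Exception):
    """Raised on an existing ID, mirroring op_type=create semantics."""


class InMemoryAuditLedger:
    """Stand-in for the relay-audit index: documents can be created, never replaced."""

    def __init__(self) -> None:
        self._docs: dict[str, dict[str, Any]] = {}

    def create(self, doc_id: str, document: dict[str, Any]) -> None:
        if doc_id in self._docs:
            raise DuplicateAuditRecord(doc_id)
        self._docs[doc_id] = dict(document)

    def get(self, doc_id: str) -> dict[str, Any]:
        return dict(self._docs[doc_id])


def build_audit_doc(compose: dict, conf: dict, dag_id: str, run_id: str) -> dict[str, Any]:
    """Assemble one audit document from compose-time params and dagrun_conf."""
    return {
        "entity_type": compose["entity_type"],
        "field": compose["field"],
        "entity_id": conf["entity_id"],
        "before_value": conf.get("before_value"),  # null when the caller omitted it
        "after_value": conf["after_value"],
        "actor": conf["actor"],
        "ts": datetime.now(timezone.utc).isoformat(),
        "dag_id": dag_id,
        "run_id": run_id,
    }
```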
#REQ.audit-covers-all-editables — audit coverage spans the full #107 editable-fields list
(sensor configuration + add-on equipment) and both targets of a dual-target write. Realizes
simplicity#111; the ledger is queryable per entity and per actor (relay_audit_list,
GET /userspace/relay/audit).
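Per-entity and per-actor lookups translate naturally to an Elasticsearch bool query with term filters, newest first. The sketch below only builds the request body. It assumes `entity_id` and `actor` are keyword-mapped in Schema.RELAY_AUDIT, and the helper name and default page size are hypothetical. With no filters, the empty bool query matches every audit record.

```python
from typing import Any, Optional


def audit_query(
    entity_id: Optional[str] = None,
    actor: Optional[str] = None,
    size: int = 50,
) -> dict[str, Any]:
    """Search body for the relay-audit index, filtered by entity and/or actor."""
    filters: list[dict[str, Any]] = []
    if entity_id is not None:
        filters.append({"term": {"entity_id": entity_id}})
    if actor is not None:
        filters.append({"term": {"actor": actor}})
    return {
        "size": size,
        "query": {"bool": {"filter": filters}},  # filter context: no scoring needed
        "sort": [{"ts": {"order": "desc"}}],     # most recent changes first
    }
```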
Relay clients (Cortex)
Like §oracle-integration.cortex, this subsection prescribes the consumer side of the contract.
Clients are thin: no MQTT (#REQ.no-mqtt-core-duplication), no broker credentials — they
stage Decision Effects and poll status.
#REQ.cortex-tool-surface — Cortex exposes the user-facing MCP tools as pure clients of the registry + Decision Effect dispatch:
| Tool | Action |
|---|---|
| sensor_config_read(sensor_uid) | Current configuration per #REQ.registry-es-site-info + last-known broker values; includes pending-write state |
| sensor_config_write(sensor_uid, field, value, before_value) | Validate against the whitelist registry; stage a Decision Effect for attestation. Does not dispatch. |
| sensor_write_status(effect_id) | Poll lifecycle state across the eDRX window (per §oracle-integration.cortex, Cortex polls Conduit effect_status) |
| sensor_write_cancel(effect_id) | Cancel before dispatch |
| device_rebirth(sensor_uid) | Stage the #REQ.device-rebirth-recipe effect; attestation-gated like any write |
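A device in eDRX may sleep for long stretches, so a thin client polling sensor_write_status works best with capped exponential backoff and an overall deadline. If the deadline passes, the client returns the still-pending state and lets the UI keep showing "pending-wake". In this sketch, `get_status` stands in for the tool call. The terminal state names other than "confirmed" and all the timing values are hypothetical. `sleep` and `clock` are injectable so the loop can be tested without waiting.

```python
import time
from typing import Callable

# Hypothetical terminal states; "confirmed" comes from the staged -> pending-wake -> confirmed flow.
TERMINAL = {"confirmed", "error", "cancelled"}


def wait_for_write(
    get_status: Callable[[str], str],
    effect_id: str,
    timeout_s: float,
    initial_delay_s: float = 5.0,
    max_delay_s: float = 300.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> str:
    """Poll until a terminal state or the deadline; return the last observed state."""
    deadline = clock() + timeout_s
    delay = initial_delay_s
    while True:
        status = get_status(effect_id)
        if status in TERMINAL:
            return status
        remaining = deadline - clock()
        if remaining <= 0:
            return status  # still pending; the caller decides what to show
        sleep(min(delay, remaining))           # never sleep past the deadline
        delay = min(delay * 2, max_delay_s)    # capped exponential backoff
```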
#REQ.role-gated-writes — write/rebirth tools follow the #107 authorization matrix
("Simplicity Admin Only" / "Admin Can Edit" / read-only per field), enforced via platform
capability gating (sensor_write capability in the domain profile pack). The Beacon/ADI-UI
sensor configuration page is a thin client of these tools (offloadable per the #107 core/offload
boundary) and must represent staged → pending-wake → confirmed states across the eDRX window.
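The per-field matrix combines with the platform capability check. A write is allowed only if the caller holds the sensor_write capability and the field's policy admits the caller's role. In the sketch below, the policy labels come from #107, while the role names ("simplicity_admin", "admin") and the function are hypothetical.

```python
from enum import Enum


class EditPolicy(Enum):
    SIMPLICITY_ADMIN_ONLY = "Simplicity Admin Only"
    ADMIN_CAN_EDIT = "Admin Can Edit"
    READ_ONLY = "read-only"


def may_write(policy: EditPolicy, roles: set[str], capabilities: set[str]) -> bool:
    """Return True if the caller may write a field governed by policy (hypothetical roles)."""
    if "sensor_write" not in capabilities:
        return False  # capability gating from the domain profile pack comes first
    if policy is EditPolicy.READ_ONLY:
        return False
    if policy is EditPolicy.SIMPLICITY_ADMIN_ONLY:
        return "simplicity_admin" in roles
    # ADMIN_CAN_EDIT: either admin tier may write.
    return bool({"simplicity_admin", "admin"} & roles)
```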
Implementation Status
| Gap | Description | Priority | Status |
|---|---|---|---|
| EE-1 | DataEffect domain class (conduit/domain/effect.py) | P1 | ✅ Done |
| EE-2 | Multi-snippet DAG assembly in conduit/templates/renderer.py | P1 | ✅ Done |
| EE-3 | Effect operator Jinja2 snippets, initial set: sql_extract, http_dispatch, s3_upload, email_notify, webhook_notify, pdf_convert, cortex_task_create | P1 | ✅ Done |
| EE-4 | Effect operator catalog (conduit/templates/effects/catalog.json) | P1 | ✅ Done |
| EE-5 | effect_create MCP tool with interactive connection collection | P1 | ✅ Done |
| EE-6 | effect_status MCP tool | P1 | ✅ Done |
| EE-7 | conduit__http_default Airflow connection registered on startup | P1 | ✅ Done |
| EE-8 | catalog://effects and templates://effects/{operator} MCP resources | P2 | ✅ Done |
| EE-9 | Remaining DataEffect management tools (effect_list, effect_get, effect_update, effect_delete, effect_pause, effect_unpause, effect_run, effect_stop, effect_runs, effect_metrics) | P2 | ⚠️ Partial: metadata update done; operator/schedule redeploy is EE-30 |
| EE-D | effect_dispatch MCP tool: Cortex agent_dispatch inbound endpoint with payload hash verification | P1 | ✅ Done |
| EE-10 | REST endpoints for DataEffect | P2 | ✅ Done (full CRUD + status/runs/metrics/run/stop in routes.py) |
| EE-11 | pipeline_type: "effect" discriminator on ES list queries; effects must not appear in datapipeline_list results | P2 | ✅ Done (handled by subtype: "dataeffect" filter in _pipeline.read()) |
| EE-12 | cortex_task_create operator; Cortex API contract (endpoint, payload schema) not yet confirmed | P2 | ✅ Done |
| EE-13 | DataEffect scheduling: expose next_dagrun from Airflow in status response | P3 | ✅ Done (airflow_next_run field in _get_airflow_dag_info) |
| EE-14 | gitlab_issue_create operator implementation (per-customer token via axonis-core dagrun_conf; requires Cortex-side token propagation contract) | P2 | ⚠️ Partial: snippet exists at gitlab_issue_create.py.j2 but removed from catalog; blocked on Cortex-side token propagation contract |
| EE-15 | Operator snippets: http_fetch, s3_fetch, csv_export, json_transform, chart_render, docling_extract | P2 | ✅ Done |
| EE-16 | effect_schema MCP tool: list operators and get per-operator param schema | P2 | ✅ Done |
| EE-17 | Persist edition_id, effect_type, target, deadline, payload_hash fields on DataEffect ES writes in effect_dispatch | P2 | ✅ Done |
| EE-18 | relay_audit_emit operator: immutable audit record to relay-audit ES ledger; Schema.RELAY_AUDIT registered in conduit/schema.py; relay write recipe documented (simplicity #111) | P1 | ✅ Done |
| EE-19 | sparkplug_ncmd operator: online-gated single-metric NCMD with instant-report trigger; protobuf via pysparkplug; single-metric constraint documented (Ranger firmware) | P1 | ✅ Done |
| EE-20 | sparkplug_dcmd operator: online-gated single-metric DCMD with Dev1 default; optional instant-report trigger; SignalFire metric-name prefix convention documented | P1 | ✅ Done |
| EE-21 | sparkplug_fast_report operator: bundled 4-metric DCMD for Fast Reporting configuration; all four metrics sent in one payload (subsystem requirement) | P1 | ✅ Done |
| EE-22 | Writable-tag whitelist registry + pre-publish validation incl. confirmed DBIRTH datatypes (#REQ.writable-tag-whitelist-enforcement, #REQ.sparkplug-datatype-fidelity, Ranger Tag Guide); conduit/relay/whitelist.py | P1 | ✅ Done |
| EE-28 | sensor_config_read, sensor_config_write, sensor_config_fields MCP tools: whitelist-validated relay write staging, site_info read, pending-effect surface (#REQ.cortex-tool-surface) | P2 | ⚠️ Partial: read/write/fields done; sensor_write_status, sensor_write_cancel, device_rebirth are EE-31–33 |
| EE-23 | Confirm-value enforcement: fail on confirming-value mismatch; NDATA-alias confirms surface verified: false; mismatch details written to airflow-metrics via _effect_failure_callback (#REQ.confirm-value-verified) | P1 | ✅ Done |
| EE-29 | _effect_failure_callback injected into every assembled effect DAG via default_args; writes one run_summary metric doc to airflow-metrics per run on any task failure; index name resolved at render time (#REQ.effect-failure-callback) | P1 | ✅ Done |
| EE-24 | Attested-dispatch enforcement on device-write DAGs (#REQ.relay-attested-dispatch): effect_run rejects unattested runs when CONDUIT_RELAY_ATTESTATION_REQUIRED=true; ships false by default until eDRX cycle validated | P1 | ✅ Done |
| EE-25 | Dual-write platform-state step (platform_state_upsert operator: ES site_info partial update after confirmed DCMD; #REQ.relay-dual-write) | P2 | ✅ Done |
| EE-26 | sparkplug_read operator: decoded current metric values, automated before_value (#REQ.sparkplug-read-operator) | P2 | ✅ Done |
| EE-27 | Device-rebirth recipe documented in catalog (#REQ.device-rebirth-recipe: sparkplug_ncmd + Node Control/Rebirth, no dedicated operator) | P3 | ✅ Done |
| EE-30 | effect_update operator/schedule redeploy: reassemble DAG from new operator chain or schedule, resubmit to git, reset status: pending_validation, resume CI poll; domain/effect.py:update() + effect_tools.py:effect_update() (#[component.conduit.effect-engine#mcp-tools.effect-management](../conduit-effect-engine/index.md#mcp-tools.effect-management)) | P2 | ✅ Done |
| EE-31 | sensor_write_status(effect_id) MCP tool: thin status wrapper for sensor write effects (#REQ.cortex-tool-surface) | P2 | ✅ Done |
| EE-32 | sensor_write_cancel(effect_id) MCP tool: delete staged sensor write effect with confirmation (#REQ.cortex-tool-surface) | P2 | ✅ Done |
| EE-33 | device_rebirth(sensor_uid, connection_id, visibility) MCP tool: assemble and stage sparkplug_ncmd Node Control/Rebirth effect; requires manual trigger after CI (#REQ.cortex-tool-surface) | P2 | ✅ Done |
| EE-34 | Idempotency hardening: deterministic UID from dag_id hash + op_type="create" in ES write; ConflictError handled as duplicate signal; closes ~1 s scan-refresh race window in domain/effect.py:create() | P1 | ✅ Done |
| EE-35 | _auto_start_poll persistence: polling_until field persisted at create/redeploy; _resume_pending_polls() on startup resumes live tasks and marks expired-pending records as error; wired in server/__main__.py lifespan | P1 | ✅ Done |
| EE-36 | run_on_deploy race: loser of _claim_run_on_deploy clears update = {} so it cannot overwrite status: running written by the claimer | P1 | ✅ Done |
| EE-37 | Integration tests: effect_dispatch → ES persist → effect_status polling round-trip; full system-level journeys for #REQ.done-effects-on-attest, #REQ.done-validated-and-logged, #REQ.done-one-handler | P2 | ⏳ Pending: deferred until effects engine fully hooked up end-to-end |
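The EE-34 idempotency pattern can be sketched in a few lines. A pre-write duplicate check that scans the index can miss a record written within the last refresh interval (Elasticsearch's default refresh interval is 1 s). A create-by-ID, by contrast, is checked at write time, so the second of two racing creates for the same dag_id fails with a conflict that can be treated as "already exists". In the sketch below, the truncated-SHA-256 UID scheme, the dict-backed store, and `AlreadyExists` are hypothetical stand-ins. In the real path, the conflict is Elasticsearch's ConflictError.

```python
import hashlib
from typing import Any


class AlreadyExists(Exception):
    """Stand-in for Elasticsearch's ConflictError on an op_type=create write."""


def effect_uid(dag_id: str) -> str:
    """Stable UID derived from the DAG ID (hypothetical scheme: truncated SHA-256 hex)."""
    return hashlib.sha256(dag_id.encode("utf-8")).hexdigest()[:32]


def create_once(store: dict[str, Any], uid: str, record: dict[str, Any]) -> None:
    """Create-only write: never overwrite an existing record."""
    if uid in store:
        raise AlreadyExists(uid)
    store[uid] = record


def create_effect(store: dict[str, Any], dag_id: str, record: dict[str, Any]) -> tuple[str, bool]:
    """Return (uid, created); created is False when an earlier create already won."""
    uid = effect_uid(dag_id)
    try:
        create_once(store, uid, record)
    except AlreadyExists:
        return uid, False  # duplicate signal, not an error
    return uid, True
```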
Directory Layout Changes
New files:
conduit/
└── conduit/
├── domain/
│ └── effect.py # DataEffect domain class
└── templates/
└── effects/
├── catalog.json # Effect operator catalog
├── effect_failure_callback.py.j2 # DAG-level on_failure_callback (EE-29)
└── operators/ # Jinja2 operator snippets
├── sql_extract.py.j2
├── http_fetch.py.j2
├── s3_fetch.py.j2
├── pdf_convert.py.j2
├── csv_export.py.j2
├── json_transform.py.j2
├── s3_upload.py.j2
├── http_dispatch.py.j2
├── email_notify.py.j2
├── webhook_notify.py.j2
├── cortex_task_create.py.j2
├── mqtt_publish.py.j2
├── mqtt_await_confirmation.py.j2
├── relay_audit_emit.py.j2
├── sparkplug_ncmd.py.j2
├── sparkplug_dcmd.py.j2
├── sparkplug_fast_report.py.j2
├── platform_state_upsert.py.j2
├── sparkplug_read.py.j2
└── gitlab_issue_create.py.j2
conduit/
└── relay/
└── whitelist.py # writable-tag registry (EE-22)
server/
└── tools/
├── effect_tools.py # effect_* MCP tools
└── sensor_tools.py # sensor_config_* MCP tools (EE-28)
Files modified:
| File | Change |
|---|---|
| conduit/templates/renderer.py | Add multi-snippet assembly path alongside existing single-template render |
| server/api/routes.py | Add DataEffect REST endpoints |
| server/mcp/app.py | Register effect_* tools and Effect catalog resources |
| server/__main__.py | Register conduit__http_default Airflow connection on startup |
| conduit/schema.py | Add DataEffect schema constant; add Schema.RELAY_AUDIT, mapped to the relay-audit index |
| specs/service.md | Update tool count, resource count, capability list, and directory layout |
Depends on: component.conduit.service, component.cortex.decision-evidence, component.cortex.task-and-effect
Realizes: product.decision-effect
Required by: platform.relay