Conduit — Effect DAG Engine

Purpose

The Effect engine is Conduit's second DAG engine, alongside the ingest engine. Where the ingest engine deploys a single DAG per data source using a fixed provider template, the Effect engine composes multi-step DAGs from a catalog of modular operators — each operator becoming a discrete Airflow step that executes at run time and passes its output to the next step via XCOM.

Oracle's LLM knows Conduit's operator catalog and acts as the orchestrator: it translates user intent ("take this data, convert to PDF, then save it in S3") into an ordered list of operators and calls Conduit to assemble and deploy the resulting DAG. Conduit is the assembler, executor, and owner of all DataEffect objects — create, read, update, delete, and run lifecycle.


The Two Engines

| | Ingest Engine | Effect Engine |
|---|---|---|
| Purpose | Pull data from a source into UDS | Execute multi-step action workflows |
| DAG authorship | One fixed provider template per source type | N operator templates composed into one DAG |
| Composition | Single Jinja2 template, rendered at deploy time | Operator templates chained at effect-create time; steps execute at DAG run time |
| Deploy path | Git branch → CI pipeline → main (same) | Git branch → CI pipeline → main (same) |
| Trigger | Schedule or manual | Manual or Oracle-orchestrated |
| Domain object | DataPipeline (Conduit-owned) | DataEffect (Conduit-owned) |
| Lifecycle | pending_validation → active → paused | pending_validation → active → paused |
| CRUD owner | Conduit | Conduit |

Operator Catalog

The Effect operator catalog documents every atomic operation available for DAG composition. Oracle reads this catalog (via MCP resource) to know what steps are available when translating user intent into an operator chain.

Each operator ships as a Jinja2 template snippet under conduit/templates/effects/. At effect-create time, Conduit stitches the selected operator snippets into a single coherent DAG file (one @task-decorated function per operator, wired sequentially as DAG steps), then commits it through the standard CI pipeline.

Available operators

| Operator | Category | Description | Required Connection | Required compose-time params |
|---|---|---|---|---|
| sql_extract | Extract | Execute a SQL query; pass the result set to the next step | Existing DataPipelineConnection (any relational/warehouse type) | connection_id, sql |
| http_fetch | Extract | Fetch data from an HTTP endpoint | connection_id in conf, or conduit__http_default | url |
| s3_fetch | Extract | Read a file from S3 | S3 DataPipelineConnection | connection_id, bucket, key |
| docling_extract | Extract | Extract text or markdown from a document (PDF, Office, etc.) stored in cloud storage or accessible via HTTP. Runs on the docling Airflow worker queue. | S3, GCS, Azure Blob, or HTTP DataPipelineConnection | connection_id, path |
| pdf_convert | Transform | Convert input data or document to PDF | None | |
| csv_export | Transform | Serialize input data to CSV | None | |
| json_transform | Transform | Apply a jq-style or dot-notation path expression to a JSON payload; falls back to simple dot-notation if jmespath is not installed | None | expression |
| chart_render | Transform | Render a bar or line chart from tabular input (sql_extract output or list of dicts); returns a base64-encoded PNG | None | x_field, y_field |
| s3_upload | Load | Upload a file or data payload to S3 | S3 DataPipelineConnection | connection_id, bucket, key |
| http_dispatch | Load | POST a payload to an HTTP endpoint | connection_id in conf, or conduit__http_default | endpoint |
| email_notify | Load | Send an email to specified recipients | SMTP DataPipelineConnection | connection_id, recipients, subject |
| webhook_notify | Load | POST a notification payload to a webhook URL | connection_id in conf, or conduit__http_default | url |
| cortex_task_create | Load | Create a Cortex WorkflowTask via POST /tasks. task_type must be one of review, risk_review, attest, refresh, acknowledge (default acknowledge). edition_id is required in dagrun_conf for review and attest types. At least one of assignee_user_id (runtime) or assignee_roles (compose-time) is required. insight_id is required at runtime. Returns task_id (tsk_ prefix), event_id, and status. Bearer token propagated via dagrun_conf. | None | cortex_url, task_title |
| mqtt_publish | Load | Publish a message to an MQTT broker topic. Payload defaults to upstream output (JSON-serialised); can be overridden with a static compose-time value. TLS auto-enabled on port 8883. WARNING: for Sparkplug B device writes, only tags in the platform writable-tag whitelist (Ranger Tag Guide) are accepted at runtime — the broker silently drops writes to read-only or non-existent tags. Chain with mqtt_await_confirmation for the full online-gated Relay write protocol. | MQTT DataPipelineConnection (conn_type=generic) | connection_id, topic |
| mqtt_await_confirmation | Load | Subscribe to an MQTT topic and block until a confirmation message arrives or the timeout expires. For Sparkplug B the confirmation topic is spBv1.0/{group_id}/NBIRTH/{edge_node_id}. Default timeout is 900s (15 min) to cover the eDRX wake window. Raises a descriptive error on timeout. Use as the second step of mqtt_publish → mqtt_await_confirmation for the full Relay write protocol, or omit for fire-and-forget. | MQTT DataPipelineConnection (conn_type=generic) | connection_id, confirmation_topic |
| relay_audit_emit | Load | Emit an immutable audit record to the relay-audit Elasticsearch ledger. Each invocation creates a new document keyed by a random UUID — records are never overwritten. Use as the final step in any relay write effect. entity_type and field are baked in at compose time; entity_id, before_value, after_value, and actor arrive via dagrun_conf at runtime. See Relay Write Pattern for the full wiring. | None | entity_type, field |
| sparkplug_read | Extract | Subscribe to a Sparkplug B sensor's BIRTH/DATA topics and return decoded metric values. Optionally forces an instant report via Properties/Report Interval NCMD. Returns metrics dict (name→value), message_type, is_birth, and topic. BIRTH messages carry full metric names; DATA (alias-only) returns an empty metrics dict with is_birth: false. Use as the first step in a write chain to capture before_value, or standalone to display current sensor configuration. | MQTT DataPipelineConnection | connection_id |
| platform_state_upsert | Load | Partial-upsert a single site_info field in Elasticsearch after a confirmed Sparkplug B write (#REQ.relay-dual-write). Takes confirming_value from the upstream DCMD/NCMD result and writes to site_info.{es_field} keyed by site:{source}:{eon_node_id}. Raises if the upstream step did not confirm — ES is never updated for unconfirmed writes. Raises if the site_info doc does not exist (sensor not yet provisioned). Middle step in dual-target chain: sparkplug_dcmd → platform_state_upsert → relay_audit_emit. | None | es_field |
| sparkplug_ncmd | Load | Online-gated single-metric NCMD write to a Sparkplug B edge node. Gates on NBIRTH/NDATA proof of life before publishing, then blocks until a confirming NBIRTH arrives. Optionally triggers an instant report via Properties/Report Interval rewrite for fast verification. Single-metric payloads only (Ranger firmware constraint). Requires pysparkplug and paho-mqtt. | MQTT DataPipelineConnection | connection_id, metric_name, datatype |
| sparkplug_dcmd | Load | Online-gated single-metric DCMD write to a Sparkplug B device (default Dev1). Same online-gate and instant-report logic as sparkplug_ncmd; waits for confirming DBIRTH or DDATA. For SignalFire Rangers, metric_name must include the Dev1/ prefix (e.g. Dev1/AIN1 Config/High Threshold). Requires pysparkplug and paho-mqtt. | MQTT DataPipelineConnection | connection_id, metric_name, datatype |
| sparkplug_fast_report | Load | Bundled 4-metric DCMD to configure Fast Reporting on a Sparkplug B device. Sends Enabled, Report Interval, Mode, and Duration in one payload; all four must be present. Gates on sensor online status and waits for confirming DBIRTH/DDATA. Requires pysparkplug and paho-mqtt. | MQTT DataPipelineConnection | connection_id |
| gitlab_issue_create | Load | Create a GitLab issue. Not available — removed from catalog pending EE-14 (per-customer token must arrive via dagrun_conf, not a static Airflow connection; Cortex-side token propagation contract not yet defined). Snippet exists at gitlab_issue_create.py.j2. | Blocked — see #REQ.gitlab-per-customer-token | |

#REQ.gitlab-per-customer-token — gitlab_issue_create does not use a static Airflow connection for auth. Each customer has their own GitLab token; Cortex passes the token to Conduit via axonis-core at trigger time, and it arrives in the operator as a run-time param via dagrun_conf. Conduit does not store or cache customer GitLab tokens. The token contract between Cortex and Conduit is owned by axonis-core and is consistent with how other per-customer credentials are propagated across platform services.

Compose-time params: project_id, labels (base label list), confidential (bool). Run-time params (via dagrun_conf): title, description, extra_labels (appended to compose-time labels), gitlab_token (injected by axonis-core from caller context).

#REQ.catalog-is-open — the operator catalog is not a whitelist. Operators are the documented, well-tested building blocks; custom operators can be added by extending the Jinja2 snippet library without changing Conduit's core assembly logic. Oracle reads the catalog to know what is available, not to enforce a closed set.

#REQ.operator-xcom — each operator step writes its primary output to a defined XCOM key (effect_output). The next operator reads from that key as its primary input. Operators that produce no downstream-usable output (e.g., email_notify) write null to effect_output so the chain does not break.
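The hand-off described in #REQ.operator-xcom can be illustrated with a small, Airflow-free simulation. This is only a sketch: `run_chain` and the three stand-in functions are hypothetical names, and the real operators run as Airflow tasks and exchange values through XCOM rather than a Python list.

```python
from typing import Any, Callable, Optional

EFFECT_OUTPUT_KEY = "effect_output"  # XCOM key named in #REQ.operator-xcom

Step = Callable[[Optional[Any]], Optional[Any]]


def run_chain(steps: list[Step]) -> list[dict]:
    """Run steps in order; each step reads the previous step's effect_output."""
    xcom_log = []
    upstream = None  # the first step has no upstream output
    for step in steps:
        result = step(upstream)
        xcom_log.append({EFFECT_OUTPUT_KEY: result})
        upstream = result
    return xcom_log


# Hypothetical stand-ins for three catalog operators.
def sql_extract(_upstream):
    return [{"site": "A", "level": 3.2}]


def csv_export(rows):
    header = ",".join(rows[0].keys())
    body = "\n".join(",".join(str(v) for v in row.values()) for row in rows)
    return header + "\n" + body


def email_notify(_payload):
    # No downstream-usable output: write null so the chain does not break.
    return None


log = run_chain([sql_extract, csv_export, email_notify])
```

Because the notify step still writes a (null) value under the same key, a later step could be appended without special-casing a missing key.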

Connection requirements

Operators that interact with external systems declare a required connection type. When an effect is created, Conduit checks whether each required connection already exists in Airflow. If a connection is missing, Conduit asks the user for the credentials before proceeding with DAG assembly.

#REQ.connection-collection — connection collection is interactive and conversational. Conduit returns a structured prompt listing each missing connection and the fields required for it (using the same JSON schema templates as connection_schema). The user provides credentials; Conduit creates the Airflow connection via connection_create before rendering the DAG. The effect-create call does not complete until all required connections exist.

#REQ.connection-reuse — if a suitable connection already exists in Airflow (matched by type and, where applicable, host/database), Conduit uses it without prompting. The user is shown which existing connection will be used and can override.

Generic HTTP fallback

Operators that call HTTP endpoints (http_fetch, http_dispatch, webhook_notify) accept an optional connection_id in their operator params. Resolution order:

  1. connection_id explicitly provided → use that Airflow connection
  2. No connection_id → use conduit__http_default, a platform-level generic HTTP connection that Conduit registers in Airflow on first startup

#REQ.generic-http-connection — Conduit registers conduit__http_default on startup, idempotently. This connection carries no credentials; it provides a named HTTP hook as a fallback for unauthenticated or bearer-token-in-payload targets.
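The two-step resolution order can be sketched as a single function. The function name and the shape of the params dict are assumptions for illustration; only the `connection_id` param and the `conduit__http_default` connection ID come from this document.

```python
HTTP_DEFAULT_CONN_ID = "conduit__http_default"


def resolve_http_connection(operator_params: dict) -> str:
    """Return the Airflow connection ID an HTTP operator should use."""
    conn_id = operator_params.get("connection_id")
    # 1. An explicit connection_id wins.
    # 2. Otherwise fall back to the platform-level generic HTTP connection.
    return conn_id if conn_id else HTTP_DEFAULT_CONN_ID


explicit = resolve_http_connection({"connection_id": "partner_api", "url": "/v1/items"})
fallback = resolve_http_connection({"url": "https://example.invalid/hook"})
```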


DAG Composition

When effect_create is called with an ordered operator list, Conduit assembles a single Python DAG file from the operator Jinja2 snippets. The assembled DAG contains one @task-decorated function per operator, wired sequentially as DAG steps. Operators execute at DAG run time; outputs pass between steps via XCOM.

Assembly process

effect_create(name, operators=[...], visibility, schedule?, run_on_deploy?)
  1. Validate each operator exists in the catalog
  2. Determine required connections for all operators
  3. For each missing connection: collect credentials from user (§ Connection Requirements)
  4. Create any new Airflow connections via connection_create
  5. Render assembled DAG:
       a. Load operator Jinja2 snippet for each step
       b. Stitch into a single DAG file (dag_id: {username}__{name})
       c. Wire operator steps sequentially; each reads from previous step's XCOM
  6. Commit rendered DAG to git pending branch (same as datapipeline_deploy)
  7. Create DataEffect ES record (status: pending_validation)
  8. Return { uid, dag_id, status: pending_validation }
  → GitLab CI runs: syntax check, import check, connection test
  → On CI pass: merge to main, GitSync to Airflow, DataEffect status → active
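Steps 1, 2, and 5 of the assembly process can be sketched in plain Python. Everything here is an assumption made for illustration: the mini-catalog, the `needs_connections` status, and the use of `string.Template` in place of the real Jinja2 snippets (chosen only so the sketch runs on the standard library). Git commit, the ES record, and CI are out of scope.

```python
from string import Template

# Hypothetical mini-catalog: operator name -> required connection type (or None).
CATALOG = {"sql_extract": "sql", "csv_export": None, "s3_upload": "s3"}

# Stand-in for an operator snippet; the real snippets are Jinja2 templates.
SNIPPET = Template(
    "@task\n"
    "def step_${index}_${name}(upstream=None):\n"
    "    ...\n"
)


def assemble(username, name, operators, existing_connection_types):
    # Step 1: validate each operator exists in the catalog.
    unknown = [op for op in operators if op not in CATALOG]
    if unknown:
        raise ValueError(f"unknown operators: {unknown}")
    # Step 2: determine required connections and find the missing ones.
    required = {CATALOG[op] for op in operators if CATALOG[op]}
    missing = sorted(required - set(existing_connection_types))
    if missing:
        # Hypothetical response shape for the connection-collection prompt.
        return {"status": "needs_connections", "missing": missing}
    # Step 5: stitch one task function per operator into a single DAG body.
    body = "\n".join(
        SNIPPET.substitute(index=i, name=op) for i, op in enumerate(operators, 1)
    )
    return {
        "dag_id": f"{username}__{name}",
        "status": "pending_validation",
        "dag_content": body,
    }


chain = ["sql_extract", "csv_export", "s3_upload"]
ready = assemble("alice", "weekly_report", chain, {"sql", "s3"})
blocked = assemble("alice", "weekly_report", chain, {"sql"})
```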

#REQ.ci-polling-timeout — the CI polling background process times out after 10 minutes, matching ingest auto-start bounds. On timeout the DataEffect record is marked error.

Operator params at compose time vs. run time

Operator parameters fall into two categories:

| Category | When Resolved | Example |
|---|---|---|
| Compose-time params | Baked into the rendered DAG at effect-create | S3 bucket name, SQL query, recipient email addresses, target URL |
| Run-time params | Passed via DAG conf at trigger time | Watermark values, specific record IDs, dynamic payloads from Cortex |

Compose-time params are substituted into the Jinja2 snippets during assembly, producing a static DAG. Run-time params arrive via dagrun_conf and are read by individual operators at execution time. Both are valid; operators declare which of their params support run-time override.
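One way to picture the split is a merge in which run-time values replace compose-time values only for params the operator has declared as overridable. This is a sketch under that assumption; `resolve_params` and the choice of `key` as the overridable param are hypothetical.

```python
def resolve_params(compose_params: dict, runtime_overridable: set, dagrun_conf: dict) -> dict:
    """Merge compose-time params with declared run-time overrides."""
    resolved = dict(compose_params)  # baked-in values are the baseline
    for key in runtime_overridable:
        if key in dagrun_conf:
            resolved[key] = dagrun_conf[key]
    return resolved


compose = {"connection_id": "s3_reports", "bucket": "reports", "key": "latest.csv"}
resolved = resolve_params(
    compose,
    runtime_overridable={"key"},  # assumed declaration for this example
    dagrun_conf={"key": "2024-01.csv", "bucket": "other-bucket"},
)
```

In this sketch the `bucket` value supplied at trigger time is ignored because the operator did not declare it as overridable.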


Domain Model: DataEffect

DataEffect is a Conduit-owned domain object, parallel to DataPipeline. Conduit owns all CRUD operations, status, run lifecycle, and metrics for DataEffect — the same scope it owns for DataPipeline.

DataEffect fields

| Field | Writable | Description |
|---|---|---|
| uid | system | Conduit-internal record ID |
| dag_id | system | Airflow DAG ID ({username}__{name}) |
| name | user | Human-readable effect name |
| description | user | Optional description |
| visibility | user | ABAC marking |
| operators | user | Ordered list of operator names and their compose-time params |
| connections | system | Airflow connection IDs used by this effect (resolved at create time) |
| status | system | pending_validation, active, paused, error, rejected |
| dag_content | system | Rendered DAG source (stored for redeploy/edit) |
| commit_sha | system | Git commit SHA under CI validation |
| schedule | user | Re-run schedule (cron or named interval); same semantics as DataPipeline scheduling |
| next_scheduled_run | system | Next scheduled trigger time |
| last_scheduled_run | system | Last scheduled trigger time |
| pipeline_type | system | Always "effect" — used by CI and UI to identify this DAG as an Effect |
| created_by | system | Username from auth context ("cortex" on Cortex-dispatched effects) |
| create_ts | system | Creation timestamp (ISO 8601 UTC) |
| correlation_id | user | Optional opaque ID supplied by the caller at create time; links this DataEffect back to an upstream object (e.g., a Cortex DecisionEffect eff_ ID or edition ID). Not interpreted by Conduit; stored and returned as-is for caller use. |
| edition_id | cortex | The attested Edition that triggered this DataEffect via Cortex dispatch. Populated by effect_dispatch; absent on DataEffects created directly via effect_create. |
| effect_type | cortex | DES effect type (external_dispatch, notification, human_process) as declared on the Cortex Decision Effect. Populated by effect_dispatch; absent on DataEffects created directly via effect_create. |
| target | cortex | Implementation-defined target descriptor from the Cortex decision template. Populated by effect_dispatch; absent on DataEffects created directly via effect_create. |
| deadline | cortex | ISO 8601 deadline inherited from the Cortex Decision Effect. Conduit records it but does not enforce it — deadline monitoring is Cortex's responsibility. |
| payload_hash | cortex | SHA-256 hex digest of the canonical dispatch payload, computed by Cortex at Decision Effect creation time. Verified by effect_dispatch before DAG assembly; stored for audit purposes. |

ES index

DataEffect records live in the same userspace ES index as DataPipeline and DataPipelineConnection. The pipeline_type: "effect" field discriminates them from ingest pipeline records.

#REQ.effect-type-discriminator — all DataEffect documents carry pipeline_type: "effect". Ingest pipeline documents carry pipeline_type equal to the provider template name (e.g., "postgres_ingest"). Queries that fetch only effects filter on pipeline_type: "effect".
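A query that fetches only effects could look like the sketch below. It builds a standard Elasticsearch term query as a plain dict; that `pipeline_type` is mapped as a keyword field, and the helper names, are assumptions not stated in this document.

```python
def effects_only_query(size: int = 100) -> dict:
    """Build a search body that matches only DataEffect documents."""
    return {"size": size, "query": {"term": {"pipeline_type": "effect"}}}


def is_effect(doc: dict) -> bool:
    """Client-side equivalent of the discriminator check."""
    return doc.get("pipeline_type") == "effect"


docs = [
    {"uid": "a1", "pipeline_type": "effect"},
    {"uid": "b2", "pipeline_type": "postgres_ingest"},
]
effects = [d["uid"] for d in docs if is_effect(d)]
body = effects_only_query(size=10)
```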


MCP Tools

DataEffect Management (12)

The DataEffect management tool surface mirrors the DataPipeline management surface exactly.

| Tool | Description |
|---|---|
| effect_create(name, operators, visibility, schedule?, description?, run_on_deploy?) | Assemble and deploy an Effect DAG from an ordered operator list. Collects missing connections interactively. Returns {uid, dag_id, status: pending_validation}. |
| effect_list() | List Conduit-managed DataEffect records with live Airflow run state |
| effect_get(uid) | Get a single DataEffect record including operator list and live Airflow state |
| effect_update(uid, operators?, schedule?, description?, visibility?) | Redeploy with updated operator chain or metadata |
| effect_delete(uid) | Remove effect DAG from git and ES |
| effect_status(uid) | Poll GitLab CI for pending effects; return live Airflow state for active ones |
| effect_pause(uid) | Pause the Airflow DAG |
| effect_unpause(uid) | Resume a paused effect |
| effect_run(uid, conf?) | Trigger a new DAG run; optional conf supplies run-time operator params |
| effect_stop(uid) | Stop the active run for an effect |
| effect_runs(uid, state?, limit?) | List run history for an effect |
| effect_metrics(uid, since?, limit?) | Get run metrics and statistics |

Effect Catalog (2)

| Tool | Description |
|---|---|
| effect_schema() | List all available operators from the Effect operator catalog |
| effect_schema(operator_name) | Get the full parameter schema for a specific operator, including which params are compose-time vs. run-time |

Cortex Dispatch (1)

| Tool | Description |
|---|---|
| effect_dispatch(effect_id, edition_id, effect_type, target, payload, payload_hash, correlation_id?, deadline?) | Inbound endpoint called by Cortex's agent_dispatch handler to execute a Decision Effect. Verifies payload_hash before proceeding. Assembles and deploys the described DAG, creates a DataEffect ES record, and returns {acknowledged, external_reference, dag_id, status: pending_validation}. external_reference is the DataEffect uid — Cortex stores this on its DecisionEffect record and uses it to poll effect_status. |

REST Endpoints

DataEffect (13)

Mirrors the DataPipeline REST surface.

| Method | Path | Description |
|---|---|---|
| POST | /userspace/effect | Create and deploy an Effect DAG |
| GET | /userspace/effect | List DataEffect records |
| GET | /userspace/effect/{uid} | Get a single DataEffect record |
| PATCH | /userspace/effect/{uid} | Update effect; action: "pause"\|"unpause" supported |
| DELETE | /userspace/effect/{uid} | Delete an effect |
| GET | /userspace/status/effect | Bulk status for all effects |
| GET | /userspace/effect/{uid}/status | Status for a single effect |
| GET | /userspace/effect/schema | List available operators |
| GET | /userspace/effect/schema/{operator_name} | Get operator parameter schema |
| GET | /userspace/effect/{uid}/runs | List run history |
| POST | /userspace/effect/{uid}/run | Trigger a run |
| POST | /userspace/effect/{uid}/stop | Stop the active run |
| GET | /userspace/effect/{uid}/metrics | Get run metrics |

Oracle Integration

There are two distinct paths by which a DataEffect is created. They use different entry-point tools and have different callers.

User / Oracle path

Oracle reads the Effect operator catalog via the catalog://effects MCP resource. When a user expresses intent that maps to a multi-step action workflow, Oracle:

  1. Reads the catalog to identify relevant operators and their required connections
  2. Calls effect_create with the assembled operator list and compose-time params
  3. Responds to connection-collection prompts from Conduit by asking the user for missing credentials
  4. Monitors effect_status until the DAG is active, then optionally triggers a run

Cortex dispatch path

When an Edition is attested and its decision template declares effects, Cortex's EffectDispatcher fires its agent_dispatch handler, which calls Conduit's effect_dispatch MCP tool directly — Oracle is not in the loop. The Cortex Decision Effect types map to specific operators or operator chains:

| Cortex effect_type | Operator chain |
|---|---|
| external_dispatch | http_dispatch (or typed connection if one exists) |
| notification (email) | email_notify |
| notification (webhook) | webhook_notify |
| human_process / task_creation | cortex_task_create |
| human_process / ticket_creation | gitlab_issue_create (not yet available — see EE-14) |

effect_dispatch verifies the payload hash, assembles the DAG, creates the DataEffect ES record, and returns {acknowledged, external_reference, dag_id, status: pending_validation}. Cortex stores the returned external_reference (the DataEffect uid) on its own DecisionEffect record.
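The payload-hash check might be sketched as follows. The canonical form is not defined in this document, so the canonicalization used here (JSON with sorted keys and compact separators, UTF-8 encoded) is purely an assumption, as are the function names; the real contract is whatever Cortex uses when it computes the digest.

```python
import hashlib
import hmac
import json


def canonical_payload_hash(payload: dict) -> str:
    """SHA-256 hex digest over an ASSUMED canonical JSON form of the payload."""
    canonical = json.dumps(
        payload, sort_keys=True, separators=(",", ":"), ensure_ascii=False
    )
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def verify_payload_hash(payload: dict, payload_hash: str) -> bool:
    """Constant-time comparison of the recomputed digest with the supplied one."""
    return hmac.compare_digest(canonical_payload_hash(payload), payload_hash.lower())


payload = {"target": "ops-webhook", "body": {"level": 3, "site": "A"}}
digest = canonical_payload_hash(payload)
reordered = {"body": {"site": "A", "level": 3}, "target": "ops-webhook"}
tampered = {"target": "ops-webhook", "body": {"level": 4, "site": "A"}}
```

Sorting keys makes the digest independent of key order, which matters when the payload is re-serialized between services.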

Status feedback — Cortex polls Conduit. Conduit does not push status back to Cortex. After calling effect_dispatch, Cortex is responsible for polling effect_status(uid) to track DAG activation and run completion. Cortex uses the returned Airflow run state to drive its own DecisionEffect lifecycle transitions (pending → acknowledged → completed | failed). The correlation_id field on DataEffect carries the Cortex eff_ ID so Cortex can match poll results back to the originating DecisionEffect record.


MCP Resources

Two MCP resources expose Effect engine metadata to Oracle and other agents:

| Resource | Description |
|---|---|
| catalog://effects | Full Effect operator catalog — operator names, categories, descriptions, required connections, compose-time and run-time param schemas |
| templates://effects/{operator_name} | Annotated reference snippet for a specific operator, showing step structure, XCOM usage, and param wiring |

Relationship to the Ingest Engine

Shared components

  • conduit/airflow/client.py — same AirflowClient
  • conduit/git/client.py — same git commit/CI pipeline path
  • conduit/templates/renderer.py — extended to support multi-snippet stitching in addition to single-template rendering
  • AxonisMiddleware + require_auth — same auth on all DataEffect endpoints
  • Oracle self-registration — DataEffect tools appear in the same Conduit tool catalog
  • Userspace ES index — same index, discriminated by pipeline_type
  • Scheduling subsystem — same scheduler DAG, same schedule/next_scheduled_run/last_scheduled_run fields

Not shared

  • conduit/domain/pipeline.py — DataEffect is a separate domain class in conduit/domain/effect.py
  • conduit/templates/catalog.json — Effect operators are catalogued separately in conduit/templates/effects/catalog.json
  • Connection collection — ingest requires a connection upfront; DataEffect creation collects connections interactively during effect_create

Relay Write Pattern

The relay write pattern is the standard operator chain for writing a configuration value to a physical field device via MQTT/Sparkplug B and emitting an immutable audit record. "Relay" is a use case of the Effect engine — never a separate component, service, or capability contract (decision 2026-06-10, simplicity#124; the non-normative pointer doc at developers-environment/specs/platform/relay.md records the history). This section is the normative home for the entire device write-back use case (simplicity #107, #111): operator chains, safety invariants, registry, equipment, audit, and the Cortex client surface.

Operator chains

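Four Sparkplug B chains cover the writable field types; a fifth, legacy chain serves non-Sparkplug MQTT targets. Choose based on the target metric level, whether the field also carries platform-side state, and whether you need bundled Fast Reporting config.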

Node-level write (NCMD) — e.g. Properties/Name, Properties/Report Interval:

sparkplug_ncmd  →  relay_audit_emit

sparkplug_ncmd gates on sensor online status, publishes a single-metric NCMD, and optionally triggers an instant report for fast verification. relay_audit_emit writes the immutable audit record. Set trigger_instant_report: true for any write where you need verification within seconds rather than at the next eDRX wake cycle.

Device-level write (DCMD) — broker-only, e.g. Reporting Frequency, Rebirth:

sparkplug_dcmd  →  relay_audit_emit

sparkplug_dcmd gates on sensor online status, publishes a single-metric DCMD, and optionally triggers an instant report via Properties/Report Interval NCMD to force DBIRTH confirmation.

Dual-target device-level write (DCMD + ES platform state) — e.g. Sensor Elevation, Flooding Possible/Likely:

sparkplug_dcmd  →  platform_state_upsert  →  relay_audit_emit

After DBIRTH confirmation, platform_state_upsert writes the confirmed value to site_info.{es_field} in Elasticsearch so the normalization pipeline picks up the new calibration value on the next NBIRTH cycle. Set trigger_instant_report: true on the DCMD step if you need the normalization pipeline to re-run quickly. If the DCMD was not confirmed, platform_state_upsert raises and the effect is marked failed; the ES write is never performed for unconfirmed device writes.

Fast Reporting configuration (bundled 4-metric DCMD):

sparkplug_fast_report  →  relay_audit_emit

sparkplug_fast_report sends Enabled, Report Interval, Mode, and Duration in one DCMD payload — the Fast Reporting subsystem requires all four together.

Legacy generic chain (JSON publish, no online gate):

mqtt_publish  →  mqtt_await_confirmation  →  relay_audit_emit

Use only when the target device does not speak Sparkplug B or when pysparkplug is unavailable. mqtt_publish sends a raw JSON payload; mqtt_await_confirmation blocks until any message arrives on the confirmation topic. No online-gate and no protobuf encoding.

A Sparkplug B device write MUST use the sparkplug_* chains. The legacy JSON chain cannot drive a Ranger and carries none of the safety invariants below; it is not a permitted fallback for device writes, only for genuinely non-Sparkplug MQTT targets.
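The chain choice can be summarized as a lookup. The chains themselves are the ones listed in this section; the dictionary keys (`node`, `device`, `device_dual_target`, `fast_report`) and the function name are hypothetical labels introduced for this sketch.

```python
SPARKPLUG_CHAINS = {
    "node": ["sparkplug_ncmd", "relay_audit_emit"],
    "device": ["sparkplug_dcmd", "relay_audit_emit"],
    "device_dual_target": ["sparkplug_dcmd", "platform_state_upsert", "relay_audit_emit"],
    "fast_report": ["sparkplug_fast_report", "relay_audit_emit"],
}
LEGACY_CHAIN = ["mqtt_publish", "mqtt_await_confirmation", "relay_audit_emit"]


def select_chain(kind: str, speaks_sparkplug_b: bool = True) -> list:
    """Pick the operator chain for a write; legacy only for non-Sparkplug targets."""
    if not speaks_sparkplug_b:
        return list(LEGACY_CHAIN)
    # A Sparkplug B device write must use a sparkplug_* chain; an unknown
    # kind raises KeyError rather than silently falling back to legacy.
    return list(SPARKPLUG_CHAINS[kind])


dual = select_chain("device_dual_target")
legacy = select_chain("device", speaks_sparkplug_b=False)
```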

Safety invariants

The sparkplug_* operators already deliver Sparkplug B protobuf encoding (pysparkplug), runtime device targeting (group_id/eon_node_id/metric_value via dagrun_conf; metric + datatype compose-time per field-class is the accepted design), the retained-NDEATH online gate, and cleanup-on-finally (EE-19..EE-21). Device writes are durable, asynchronous jobs with pollable status across the eDRX window — no surface exposes a blocking write call. The remaining invariants are mandated here:

#REQ.no-mqtt-core-duplication — the MQTT/Sparkplug write mechanics live only in this operator library. No consumer (Cortex, Beacon, ADI-UI, scripts) embeds its own MQTT client for device writes — every write path runs through a Conduit effect DAG.

#REQ.sparkplug-datatype-fidelity — the compose-time datatype MUST match the datatype the device declares for that metric in its DBIRTH payload, exactly. Before an effect for a new metric is created, the datatype is confirmed from the EMQX field tables or a live DBIRTH capture — never guessed from the measurement's physical kind. (Blocking discipline recorded on simplicity#107, 2026-06-10: the five device-level fields — PRESS1 Zero Offset, DIN1/DIN2 Average Frequency, AIN1 High/Low Threshold — await DBIRTH datatype confirmation.)

#REQ.confirm-value-verified — a confirmed write means the confirming NBIRTH/DBIRTH carries the written metric at the written value. The operators currently report confirmed + confirming_value without asserting; the step MUST fail (or mark the effect failed) on a value mismatch, and an NDATA-alias confirmation (where metric names are unavailable) MUST surface as verified: false rather than silently passing. Mismatch details land in the airflow-metrics failure record via _effect_failure_callback (see #REQ.effect-failure-callback) — the RuntimeError message raised by the operator is the carrier; it is captured in history[].error alongside the full run context.
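The assertion that #REQ.confirm-value-verified asks for could be sketched like this. The function name, return shape, and exact-equality comparison are assumptions; treating a BIRTH message that lacks the metric as a mismatch is also an assumption, since the requirement only describes the value-mismatch and alias-only cases.

```python
def check_confirmation(metric_name, written_value, confirming_metrics, is_birth):
    """Verify that a confirming message carries the written metric at the written value."""
    if not is_birth:
        # DATA (alias-only): metric names are unavailable, so the value
        # cannot be asserted. Surface that instead of silently passing.
        return {"confirmed": True, "verified": False, "confirming_value": None}
    confirming_value = confirming_metrics.get(metric_name)
    if confirming_value != written_value:
        # The RuntimeError message is what the failure callback would record.
        raise RuntimeError(
            f"confirm mismatch for {metric_name!r}: "
            f"wrote {written_value!r}, device reported {confirming_value!r}"
        )
    return {"confirmed": True, "verified": True, "confirming_value": confirming_value}


metric = "Dev1/AIN1 Config/High Threshold"
ok = check_confirmation(metric, 4.5, {metric: 4.5}, is_birth=True)
alias_only = check_confirmation(metric, 4.5, {}, is_birth=False)
```

A production check for floating-point metrics would likely need a tolerance rather than exact equality; that detail is left open here.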

#REQ.writable-tag-whitelist-enforcement — before publish, the write is validated against the curated writable-tag whitelist (Ranger Tag Guide — the broker silently drops writes to read-only or non-existent tags; the firmware writable flag is unreliable). Out-of-whitelist metrics and out-of-range/enum values are rejected with a descriptive error before any MQTT traffic. The whitelist registry (metric, datatype, range/enum, device model) lives in Conduit and is the single source of truth for the field → metric → datatype → platform-state mapping (#REQ.field-mapping-authority, see #relay-write-pattern.registry) — it is also where #REQ.sparkplug-datatype-fidelity's confirmed datatypes are recorded.
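A pre-publish check against the whitelist might be shaped like the sketch below. The registry layout, the function name, and every range or enum value are placeholders for illustration; the only datatype taken from this document is BOOLEAN for Node Control/Rebirth, and the Report Interval datatype shown is not confirmed anywhere in the text.

```python
# Placeholder registry: metric name -> constraints. Values are NOT real
# Ranger limits; they exist only to exercise the checks.
WHITELIST = {
    "Node Control/Rebirth": {"datatype": "BOOLEAN", "enum": [True]},
    "Properties/Report Interval": {"datatype": "INT32", "min": 60, "max": 86400},
}


def validate_write(metric_name, value, registry=WHITELIST):
    """Reject out-of-whitelist metrics and out-of-range/enum values before any MQTT traffic."""
    entry = registry.get(metric_name)
    if entry is None:
        raise ValueError(f"{metric_name!r} is not in the writable-tag whitelist")
    if "enum" in entry and value not in entry["enum"]:
        raise ValueError(f"{value!r} is not an allowed value for {metric_name!r}")
    low, high = entry.get("min"), entry.get("max")
    if (low is not None and value < low) or (high is not None and value > high):
        raise ValueError(f"{value!r} is outside [{low}, {high}] for {metric_name!r}")
    return entry


def rejects(metric_name, value):
    """Helper for the examples: True when validate_write raises."""
    try:
        validate_write(metric_name, value)
    except ValueError:
        return True
    return False


accepted = validate_write("Properties/Report Interval", 300)
```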

#REQ.relay-attested-dispatch (the #107 "secondary confirmation") — a device-write effect (any DAG containing a sparkplug_ncmd, sparkplug_dcmd, or sparkplug_fast_report step) dispatches only from an attested-edition context: the Cortex dispatch path (effect_dispatch carrying edition_id) or an equivalently attested effect_run. A direct, unattested effect_run on a device-write DAG is rejected and the attempt audited. The upstream firewall is Edition attestation (product.decision-effect #REQ.requires-attested-edition). Dispatch ships behind a feature flag until the eDRX confirm cycle is validated against real Ranger nodes; autonomous (un-attested) dispatch is never enabled.
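The gate could be sketched as a check run before any trigger. How an "equivalently attested effect_run" proves attestation is not specified here, so this sketch assumes the presence of an `edition_id` stands in for it; the function names and the audit record shape are likewise hypothetical.

```python
DEVICE_WRITE_OPERATORS = {"sparkplug_ncmd", "sparkplug_dcmd", "sparkplug_fast_report"}


def is_device_write(operators) -> bool:
    """A DAG is a device-write effect if it contains any sparkplug write step."""
    return any(op in DEVICE_WRITE_OPERATORS for op in operators)


def authorize_run(operators, edition_id, audit_log) -> bool:
    """Allow the run, or audit and reject an unattested device write."""
    if is_device_write(operators) and not edition_id:
        audit_log.append({"event": "unattested_device_write_rejected",
                          "operators": list(operators)})
        raise PermissionError("device-write effects require an attested-edition context")
    return True


audit = []
allowed = authorize_run(["sparkplug_dcmd", "relay_audit_emit"], "ed_123", audit)
plain = authorize_run(["sql_extract", "csv_export"], None, audit)
try:
    authorize_run(["sparkplug_ncmd", "relay_audit_emit"], None, audit)
    rejected = False
except PermissionError:
    rejected = True
```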

#REQ.relay-dual-write — for fields that also exist as platform-side state (ES sensor calibration fields, AlertThreshold docs — see the dual-target field map in #relay-write-pattern.registry), the effect DAG updates the platform state as a step after write confirmation, broker-first; a platform-state failure after a confirmed device write marks the effect failed, records the divergence in the audit ledger, and raises an escalation Signal.

#REQ.sparkplug-read-operator — a read operator (sparkplug_read) subscribes to the target's BIRTH/DATA topics (optionally forcing an instant report, reusing the sparkplug_ncmd Phase-2 mechanic) and returns current decoded metric values via effect_output, enabling pre-write display and automated before_value capture (closing the caller-supplied interim in #REQ.relay-before-value).

#REQ.device-rebirth-recipe — device rebirth is realized as a sparkplug_ncmd effect with metric_name: "Node Control/Rebirth", datatype: BOOLEAN, metric_value: true — no dedicated operator (per the #107 checklist completion, 2026-06-10). The recipe is documented in the catalog and the rebirth effect is attestation-gated like any other device write.

#REQ.effect-failure-callback — every assembled effect DAG includes _effect_failure_callback in its default_args, wired by assemble_effect() at render time. The callback fires on any task failure, connects to Elasticsearch via the standard ELASTIC_HOST / ELASTIC_USERNAME / ELASTIC_PASSWORD / ELASTIC_VERIFY env vars, and indexes one document per run (keyed by run_id) to the airflow-metrics index. The document carries dag_id, task_id, timestamp, and a run_summary metric entry with status: Error and the full exception message in error — matching the schema used by atlas-airflow's pipeline_failure_callback so effect runs are visible alongside ingest runs in the same metrics index. The callback is non-fatal: any ES error is swallowed so it cannot mask the original task failure. When ELASTIC_HOST is absent the callback exits immediately.
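The non-fatal behaviour of the callback can be sketched without Airflow or an Elasticsearch client. The context keys, the `index_doc` callable, and the exact document layout are assumptions; the real callback receives an Airflow context and its schema follows atlas-airflow's pipeline_failure_callback, which is not reproduced here.

```python
import os
from datetime import datetime, timezone


def effect_failure_callback(context, index_doc, env=None):
    """Index one failure document per run; never raise from the callback itself."""
    env = os.environ if env is None else env
    if not env.get("ELASTIC_HOST"):
        return None  # no Elasticsearch configured: exit immediately
    doc = {
        "dag_id": context["dag_id"],
        "task_id": context["task_id"],
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "metrics": [{"name": "run_summary", "status": "Error",
                     "error": str(context["exception"])}],
    }
    try:
        # Keyed by run_id so each run produces a single document.
        index_doc("airflow-metrics", context["run_id"], doc)
    except Exception:
        pass  # swallow ES errors so they cannot mask the original task failure
    return doc


def failing_index(index, doc_id, doc):
    raise ConnectionError("ES unreachable")


ctx = {"dag_id": "alice__set_threshold", "task_id": "step_1_sparkplug_dcmd",
       "run_id": "manual__1", "exception": RuntimeError("confirm mismatch")}
skipped = effect_failure_callback(ctx, failing_index, env={})
recorded = effect_failure_callback(ctx, failing_index, env={"ELASTIC_HOST": "es:9200"})
```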

Registry, write classes & field readiness

#REQ.registry-es-site-info — the configured sensor/equipment inventory is read from the ES site_info / sensors indices (maintained by the Airflow ingest path). Site/topic inventory never uses the EMQX /clients REST endpoint. Where component.sentinel.alerting is deployed, its Sensor registry is honored and write outcomes are additionally emitted as AlertEvents (source_type: relay) — conditional on Sentinel's presence, never a prerequisite (Sentinel is not deployed in the Simplicity engagement).

#REQ.write-classes — the use case admits three write classes, all attestation-gated and audited: dual-target (broker + platform state, #REQ.relay-dual-write ordering), broker-only (e.g. Reporting Frequency, Rebirth), and platform-state-only (e.g. the Sensor Location override, pending confirmation — no broker leg). The field-mapping table records each field's class.

#REQ.field-mapping-authority — the field → broker metric → DBIRTH-confirmed datatype → platform-state mapping is maintained with the writable-tag whitelist (one source of truth); the #107 mapping seeds it. Dual-target fields per #107:

| Field (#107) | Broker metric | Platform state |
| --- | --- | --- |
| Sensor Elevation (MSL) | Dev1/PRESS1 Config/Zero Offset | ES sensor calibration fields |
| Bottom of stream (MSL) | Dev1/DIN1 Average Frequency (repurposed) | ES stream_bottom_elevation_ft_d |
| Top of stream (MSL) | Dev1/DIN2 Average Frequency (repurposed) | ES sensor calibration fields |
| Flooding Possible level | Dev1/AIN1 Config/High Threshold | ES site_info.potential_flood_level_d (operator-entered override; normalize copies to flooding_possible_ft_d per reading) |
| Flooding Likely level | Dev1/AIN1 Config/Low Threshold | ES site_info.likely_flood_level_d (operator-entered override; normalize copies to flooding_likely_ft_d per reading) |
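
As a rough illustration of how a field-mapping entry can carry its write class, the sketch below derives the class from which legs (broker, platform state) an entry has. The `FieldMapping` and `WriteClass` names are hypothetical and do not reflect the actual layout of `conduit/relay/whitelist.py`; the `sensor_location` platform field is a placeholder, since that mapping is still pending confirmation.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class WriteClass(Enum):
    DUAL_TARGET = "dual-target"
    BROKER_ONLY = "broker-only"
    PLATFORM_STATE_ONLY = "platform-state-only"


@dataclass(frozen=True)
class FieldMapping:
    field: str
    broker_metric: Optional[str] = None   # broker leg, if any
    platform_state: Optional[str] = None  # ES site_info leg, if any

    @property
    def write_class(self) -> WriteClass:
        """Derive the write class from the legs present on this entry."""
        if self.broker_metric and self.platform_state:
            return WriteClass.DUAL_TARGET
        if self.broker_metric:
            return WriteClass.BROKER_ONLY
        if self.platform_state:
            return WriteClass.PLATFORM_STATE_ONLY
        raise ValueError(f"{self.field}: mapping has no write target")


MAPPINGS = {
    m.field: m
    for m in (
        FieldMapping("Bottom of stream (MSL)", "Dev1/DIN1 Average Frequency",
                     "stream_bottom_elevation_ft_d"),
        FieldMapping("Flooding Possible level", "Dev1/AIN1 Config/High Threshold",
                     "potential_flood_level_d"),
        FieldMapping("Flooding Likely level", "Dev1/AIN1 Config/Low Threshold",
                     "likely_flood_level_d"),
        FieldMapping("Rebirth", broker_metric="Node Control/Rebirth"),
        # Placeholder platform field name; the real one is not yet confirmed.
        FieldMapping("Sensor Location", platform_state="sensor_location"),
    )
}
```

Deriving the class rather than storing it separately keeps the table a single source of truth: an entry cannot claim to be dual-target while lacking one of its legs.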

Field readiness (#107 triage, 2026-06-10, informative): 9 fields unblocked (Sensor Name, Reporting Frequency, Fast Reporting, Rebirth already delivered; the four device-level fields above await DBIRTH datatype confirmation). 4 blocked pending customer resolution: Sensor Location (no writable GPS metric — likely platform-state-only), Installation Date (mapping looks wrong), Serial Number (separate non-broker workflow), Service period (unmapped).

Add-on equipment

The use case covers add-on field equipment (warning lights, sirens, barriers, pumps/irrigation) attached to sensor sites (simplicity#107).

#REQ.equipment-read-surface — read surface exposes equipment presence, type, online status, control state (On | Off | Auto), and operating context (manual vs. threshold-driven Auto), sourced per #REQ.registry-es-site-info.

#REQ.equipment-control-state — write surface issues control-state commands (On | Off | Auto) and Auto trigger-threshold updates through the same operator chains, whitelist, attestation, and audit invariants as sensor writes. Equipment writes are not a side channel.

Compose-time params

These are baked into the DAG at effect_create time. A separate effect is required for each field being written (e.g. one effect for Reporting Frequency, another for Sensor Elevation).

| Operator | Param | Value |
| --- | --- | --- |
| sparkplug_ncmd | connection_id | Airflow connection for the EMQX broker |
| sparkplug_ncmd | metric_name | Sparkplug B metric path, e.g. "Properties/Report Interval" |
| sparkplug_ncmd | datatype | Sparkplug B type string, e.g. "UINT32" or "STRING" |
| sparkplug_ncmd | trigger_instant_report | true to force immediate NBIRTH after write |
| sparkplug_dcmd | connection_id | Airflow connection for the EMQX broker |
| sparkplug_dcmd | metric_name | Metric path including device prefix, e.g. "Dev1/AIN1 Config/High Threshold" |
| sparkplug_dcmd | datatype | Sparkplug B type string |
| sparkplug_dcmd | device_id | "Dev1" (default for SignalFire Rangers) |
| sparkplug_fast_report | connection_id | Airflow connection for the EMQX broker |
| sparkplug_fast_report | device_id | "Dev1" (default) |
| platform_state_upsert | es_field | site_info field name, e.g. "sensor_elevation_ft_d", "stream_bottom_elevation_ft_d", "potential_flood_level_d", "likely_flood_level_d" |
| platform_state_upsert | source | Sensor source string for the site_info doc key. Defaults to "mqtt_sparkplug" (correct for all SignalFire Rangers) |
| relay_audit_emit | entity_type | "sensor" or "equipment" |
| relay_audit_emit | field | Broker metric path, e.g. "Properties/Report Interval" |
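
To make the compose-time shape concrete, here is a hedged sketch of one operator chain for a single dual-target field (Flooding Possible level), plus a check that flags params not listed in the table above. The chain representation as a list of `(operator, params)` tuples, the `emqx_default` connection name, the `"FLOAT"` datatype, and the position of the audit step are all assumptions for illustration; the real `effect_create` payload shape may differ, and the datatype must come from the DBIRTH-confirmed whitelist.

```python
from typing import Dict, List, Set, Tuple

# Compose-time params per operator, transcribed from the table above.
COMPOSE_PARAMS: Dict[str, Set[str]] = {
    "sparkplug_ncmd": {"connection_id", "metric_name", "datatype",
                       "trigger_instant_report"},
    "sparkplug_dcmd": {"connection_id", "metric_name", "datatype", "device_id"},
    "sparkplug_fast_report": {"connection_id", "device_id"},
    "platform_state_upsert": {"es_field", "source"},
    "relay_audit_emit": {"entity_type", "field"},
}

Step = Tuple[str, Dict[str, object]]


def unknown_compose_params(chain: List[Step]) -> List[Tuple[str, str]]:
    """Return (operator, param) pairs that the table does not list."""
    problems: List[Tuple[str, str]] = []
    for operator, params in chain:
        known = COMPOSE_PARAMS.get(operator)
        if known is None:
            problems.append((operator, "<unknown operator>"))
            continue
        problems.extend(
            (operator, name) for name in sorted(params) if name not in known
        )
    return problems


# One effect per field: broker write, then platform state, then audit record.
FLOODING_POSSIBLE_CHAIN: List[Step] = [
    ("sparkplug_dcmd", {
        "connection_id": "emqx_default",   # hypothetical connection name
        "metric_name": "Dev1/AIN1 Config/High Threshold",
        "datatype": "FLOAT",               # placeholder, pending DBIRTH confirmation
        "device_id": "Dev1",
    }),
    ("platform_state_upsert", {
        "es_field": "potential_flood_level_d",
        "source": "mqtt_sparkplug",
    }),
    ("relay_audit_emit", {
        "entity_type": "sensor",
        "field": "Dev1/AIN1 Config/High Threshold",
    }),
]
```

Calling `unknown_compose_params(FLOODING_POSSIBLE_CHAIN)` yields an empty list, while a step that passes a param the operator does not accept is reported by name.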

Runtime params (dagrun_conf)

These are supplied at trigger time via effect_run(uid, conf={...}).

| Param | Required by | Description |
| --- | --- | --- |
| group_id | sparkplug_ncmd, sparkplug_dcmd, sparkplug_fast_report | Sparkplug B group ID for the target sensor |
| eon_node_id | sparkplug_ncmd, sparkplug_dcmd, sparkplug_fast_report | EON node ID (IMEI or named node) |
| metric_value | sparkplug_ncmd, sparkplug_dcmd | Value to write; must match the compose-time datatype |
| enabled | sparkplug_fast_report | Enable/disable Fast Reporting (boolean, default true) |
| report_interval | sparkplug_fast_report | Fast Reporting interval in seconds (default 60) |
| mode | sparkplug_fast_report | Fast Reporting mode (default "Duration") |
| duration | sparkplug_fast_report | Fast Reporting run duration in seconds (default 1200) |
| entity_id | relay_audit_emit | UID of the sensor or equipment record being changed |
| after_value | relay_audit_emit | The value being written to the device |
| actor | relay_audit_emit | Username of the user initiating the write, from Bearer token claims |
| before_value | relay_audit_emit | Last-known value of the field before the write (optional, see note below) |
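
A client can check a `conf` payload against the table before calling `effect_run`. The sketch below is illustrative only: it treats params with a documented default (`enabled`, `report_interval`, `mode`, `duration`) and `before_value` as optional, which is an interpretation of the table, and the helper name and sample values are hypothetical.

```python
from typing import Dict, Iterable, List, Set

# Runtime params without a documented default, per operator (from the table).
REQUIRED_RUNTIME: Dict[str, Set[str]] = {
    "sparkplug_ncmd": {"group_id", "eon_node_id", "metric_value"},
    "sparkplug_dcmd": {"group_id", "eon_node_id", "metric_value"},
    "sparkplug_fast_report": {"group_id", "eon_node_id"},
    "relay_audit_emit": {"entity_id", "after_value", "actor"},
}


def missing_runtime_params(operators: Iterable[str],
                           conf: Dict[str, object]) -> List[str]:
    """Return the sorted runtime params the chain needs but conf lacks."""
    required: Set[str] = set()
    for operator in operators:
        required |= REQUIRED_RUNTIME.get(operator, set())
    return sorted(required - set(conf))


# Sample conf for a sparkplug_dcmd + relay_audit_emit chain (values are made up).
conf = {
    "group_id": "GroupA",
    "eon_node_id": "node-01",
    "metric_value": 12.5,
    "entity_id": "sensor-123",
    "after_value": 12.5,
    "actor": "jdoe",
    # before_value omitted: the audit record will carry null
}
```

Here `missing_runtime_params(["sparkplug_dcmd", "relay_audit_emit"], conf)` returns an empty list, so the payload is complete for that chain.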

#REQ.relay-before-value: before_value is caller-supplied. Requirements (#111) did not specify how to capture prior state; the triggering client is expected to pass the value currently displayed to the user. The audit record emits null if the caller omits it, making the gap visible rather than silent. A future mqtt_read operator could automate before-value capture and close this gap. This decision is recorded in simplicity issue #111.

Relay audit ledger

relay_audit_emit writes to the relay-audit Elasticsearch index (Schema.RELAY_AUDIT in conduit/schema.py). This index is separate from data-ingest and holds only relay audit records. Each document is created with op_type: create (never index or upsert), so records are immutable by construction.

Every audit document contains:

| Field | Source |
| --- | --- |
| entity_type | Compose-time param |
| field | Compose-time param |
| entity_id | dagrun_conf.entity_id |
| before_value | dagrun_conf.before_value (null if omitted) |
| after_value | dagrun_conf.after_value |
| actor | dagrun_conf.actor |
| ts | UTC timestamp at emit time |
| dag_id | Airflow DAG ID of the triggering effect |
| run_id | Airflow run ID |
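
The document assembly and the create-only write can be sketched as below. This is an illustration under stated assumptions: `CreateOnlyLedger` is an in-memory stand-in for an Elasticsearch index written with `op_type: create`, and the `dag_id:run_id` document key is hypothetical, since the source does not say how audit documents are keyed.

```python
from datetime import datetime, timezone
from typing import Any, Dict, Mapping


class DuplicateDocument(Exception):
    """Raised when a document ID already exists (mirrors a create conflict)."""


class CreateOnlyLedger:
    """In-memory stand-in for an index that only accepts creates."""

    def __init__(self) -> None:
        self._docs: Dict[str, dict] = {}

    def create(self, doc_id: str, doc: Mapping[str, Any]) -> None:
        if doc_id in self._docs:
            raise DuplicateDocument(doc_id)  # existing records are never overwritten
        self._docs[doc_id] = dict(doc)

    def get(self, doc_id: str) -> dict:
        return dict(self._docs[doc_id])


def build_audit_doc(compose: Mapping[str, Any], conf: Mapping[str, Any],
                    dag_id: str, run_id: str) -> dict:
    """Merge compose-time params, dagrun_conf, and run identity into one record."""
    return {
        "entity_type": compose["entity_type"],
        "field": compose["field"],
        "entity_id": conf["entity_id"],
        "before_value": conf.get("before_value"),  # None (null) if omitted
        "after_value": conf["after_value"],
        "actor": conf["actor"],
        "ts": datetime.now(timezone.utc).isoformat(),
        "dag_id": dag_id,
        "run_id": run_id,
    }


ledger = CreateOnlyLedger()
doc = build_audit_doc(
    {"entity_type": "sensor", "field": "Properties/Report Interval"},
    {"entity_id": "sensor-123", "after_value": 300, "actor": "jdoe"},
    dag_id="effect_example", run_id="manual__1",
)
ledger.create(f"{doc['dag_id']}:{doc['run_id']}", doc)  # hypothetical key scheme
```

A second `create` with the same key raises instead of overwriting, which is the property that makes the ledger immutable by construction.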

#REQ.audit-covers-all-editables — audit coverage spans the full #107 editable-fields list (sensor configuration + add-on equipment) and both targets of a dual-target write. Realizes simplicity#111; the ledger is queryable per entity and per actor (relay_audit_list, GET /userspace/relay/audit).

Relay clients (Cortex)

Like §oracle-integration.cortex, this subsection prescribes the consumer side of the contract. Clients are thin: no MQTT (#REQ.no-mqtt-core-duplication), no broker credentials — they stage Decision Effects and poll status.

#REQ.cortex-tool-surface — Cortex exposes the user-facing MCP tools as pure clients of the registry + Decision Effect dispatch:

| Tool | Action |
| --- | --- |
| sensor_config_read(sensor_uid) | Current configuration per #REQ.registry-es-site-info + last-known broker values; includes pending-write state |
| sensor_config_write(sensor_uid, field, value, before_value) | Validate against the whitelist registry; stage a Decision Effect for attestation. Does not dispatch. |
| sensor_write_status(effect_id) | Poll lifecycle state across the eDRX window (per §oracle-integration.cortex, Cortex polls Conduit effect_status) |
| sensor_write_cancel(effect_id) | Cancel before dispatch |
| device_rebirth(sensor_uid) | Stage the #REQ.device-rebirth-recipe effect; attestation-gated like any write |

#REQ.role-gated-writes — write/rebirth tools follow the #107 authorization matrix ("Simplicity Admin Only" / "Admin Can Edit" / read-only per field), enforced via platform capability gating (sensor_write capability in the domain profile pack). The Beacon/ADI-UI sensor configuration page is a thin client of these tools (offloadable per the #107 core/offload boundary) and must represent staged → pending-wake → confirmed states across the eDRX window.
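
A thin client that tracks a write across the eDRX window might poll along these lines. This is a sketch, not the Beacon or Cortex implementation: `fetch_status` is a hypothetical callable wrapping `sensor_write_status`, the state strings follow the staged → pending-wake → confirmed sequence named above, and treating `error` as a second terminal state is an assumption.

```python
import time
from typing import Callable, List

TERMINAL_STATES = {"confirmed", "error"}  # "error" as terminal is an assumption


def poll_write_status(
    effect_id: str,
    fetch_status: Callable[[str], str],
    *,
    interval_s: float = 30.0,
    timeout_s: float = 3600.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> List[str]:
    """Poll until a terminal state; return the distinct states in order seen."""
    seen: List[str] = []
    deadline = clock() + timeout_s
    while True:
        state = fetch_status(effect_id)
        if not seen or seen[-1] != state:
            seen.append(state)  # record each transition once, for UI display
        if state in TERMINAL_STATES:
            return seen
        if clock() >= deadline:
            raise TimeoutError(f"{effect_id} still {state!r} after {timeout_s}s")
        sleep(interval_s)
```

The long default timeout reflects that a sleeping device may not wake for some time; the actual eDRX window length is not given here, so both defaults are placeholders.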


Implementation Status

| Gap | Description | Priority | Status |
| --- | --- | --- | --- |
| EE-1 | DataEffect domain class (conduit/domain/effect.py) | P1 | ✅ Done |
| EE-2 | Multi-snippet DAG assembly in conduit/templates/renderer.py | P1 | ✅ Done |
| EE-3 | Effect operator Jinja2 snippets (initial set: sql_extract, http_dispatch, s3_upload, email_notify, webhook_notify, pdf_convert, cortex_task_create) | P1 | ✅ Done |
| EE-4 | Effect operator catalog (conduit/templates/effects/catalog.json) | P1 | ✅ Done |
| EE-5 | effect_create MCP tool with interactive connection collection | P1 | ✅ Done |
| EE-6 | effect_status MCP tool | P1 | ✅ Done |
| EE-7 | conduit__http_default Airflow connection registered on startup | P1 | ✅ Done |
| EE-8 | catalog://effects and templates://effects/{operator} MCP resources | P2 | ✅ Done |
| EE-9 | Remaining DataEffect management tools (effect_list, effect_get, effect_update, effect_delete, effect_pause, effect_unpause, effect_run, effect_stop, effect_runs, effect_metrics) | P2 | ⚠️ Partial: metadata update done; operator/schedule redeploy is EE-30 |
| EE-D | effect_dispatch MCP tool: Cortex agent_dispatch inbound endpoint with payload hash verification | P1 | ✅ Done |
| EE-10 | REST endpoints for DataEffect | P2 | ✅ Done (full CRUD + status/runs/metrics/run/stop in routes.py) |
| EE-11 | pipeline_type: "effect" discriminator on ES list queries; effects must not appear in datapipeline_list results | P2 | ✅ Done (handled by subtype: "dataeffect" filter in _pipeline.read()) |
| EE-12 | cortex_task_create operator; Cortex API contract (endpoint, payload schema) not yet confirmed | P2 | ✅ Done |
| EE-13 | DataEffect scheduling: expose next_dagrun from Airflow in status response | P3 | ✅ Done (airflow_next_run field in _get_airflow_dag_info) |
| EE-14 | gitlab_issue_create operator implementation (per-customer token via axonis-core dagrun_conf; requires Cortex-side token propagation contract) | P2 | ⚠️ Partial: snippet exists at gitlab_issue_create.py.j2 but removed from catalog; blocked on Cortex-side token propagation contract |
| EE-15 | Operator snippets: http_fetch, s3_fetch, csv_export, json_transform, chart_render, docling_extract | P2 | ✅ Done |
| EE-16 | effect_schema MCP tool: list operators and get per-operator param schema | P2 | ✅ Done |
| EE-17 | Persist edition_id, effect_type, target, deadline, payload_hash fields on DataEffect ES writes in effect_dispatch | P2 | ✅ Done |
| EE-18 | relay_audit_emit operator: immutable audit record to relay-audit ES ledger; Schema.RELAY_AUDIT registered in conduit/schema.py; relay write recipe documented (simplicity #111) | P1 | ✅ Done |
| EE-19 | sparkplug_ncmd operator: online-gated single-metric NCMD with instant-report trigger; protobuf via pysparkplug; single-metric constraint documented (Ranger firmware) | P1 | ✅ Done |
| EE-20 | sparkplug_dcmd operator: online-gated single-metric DCMD with Dev1 default; optional instant-report trigger; SignalFire metric-name prefix convention documented | P1 | ✅ Done |
| EE-21 | sparkplug_fast_report operator: bundled 4-metric DCMD for Fast Reporting configuration; all four metrics sent in one payload (subsystem requirement) | P1 | ✅ Done |
| EE-22 | Writable-tag whitelist registry + pre-publish validation incl. confirmed DBIRTH datatypes (#REQ.writable-tag-whitelist-enforcement, #REQ.sparkplug-datatype-fidelity, Ranger Tag Guide); conduit/relay/whitelist.py | P1 | ✅ Done |
| EE-28 | sensor_config_read, sensor_config_write, sensor_config_fields MCP tools: whitelist-validated relay write staging, site_info read, pending-effect surface (#REQ.cortex-tool-surface) | P2 | ⚠️ Partial: read/write/fields done; sensor_write_status, sensor_write_cancel, device_rebirth are EE-31–33 |
| EE-23 | Confirm-value enforcement: fail on confirming-value mismatch; NDATA-alias confirms surface verified: false; mismatch details written to airflow-metrics via _effect_failure_callback (#REQ.confirm-value-verified) | P1 | ✅ Done |
| EE-29 | _effect_failure_callback injected into every assembled effect DAG via default_args; writes one run_summary metric doc to airflow-metrics per run on any task failure; index name resolved at render time (#REQ.effect-failure-callback) | P1 | ✅ Done |
| EE-24 | Attested-dispatch enforcement on device-write DAGs (#REQ.relay-attested-dispatch): effect_run rejects unattested runs when CONDUIT_RELAY_ATTESTATION_REQUIRED=true; ships false by default until eDRX cycle validated | P1 | ✅ Done |
| EE-25 | Dual-write platform-state step (platform_state_upsert operator: ES site_info partial update after confirmed DCMD; #REQ.relay-dual-write) | P2 | ✅ Done |
| EE-26 | sparkplug_read operator: decoded current metric values, automated before_value (#REQ.sparkplug-read-operator) | P2 | ✅ Done |
| EE-27 | Device-rebirth recipe documented in catalog (#REQ.device-rebirth-recipe: sparkplug_ncmd + Node Control/Rebirth, no dedicated operator) | P3 | ✅ Done |
| EE-30 | effect_update operator/schedule redeploy: reassemble DAG from new operator chain or schedule, resubmit to git, reset status: pending_validation, resume CI poll; domain/effect.py:update() + effect_tools.py:effect_update() (#[component.conduit.effect-engine#mcp-tools.effect-management](../conduit-effect-engine/index.md#mcp-tools.effect-management)) | P2 | ✅ Done |
| EE-31 | sensor_write_status(effect_id) MCP tool: thin status wrapper for sensor write effects (#REQ.cortex-tool-surface) | P2 | ✅ Done |
| EE-32 | sensor_write_cancel(effect_id) MCP tool: delete staged sensor write effect with confirmation (#REQ.cortex-tool-surface) | P2 | ✅ Done |
| EE-33 | device_rebirth(sensor_uid, connection_id, visibility) MCP tool: assemble and stage sparkplug_ncmd Node Control/Rebirth effect; requires manual trigger after CI (#REQ.cortex-tool-surface) | P2 | ✅ Done |
| EE-34 | Idempotency hardening: deterministic UID from dag_id hash + op_type="create" in ES write; ConflictError handled as duplicate signal; closes ~1 s scan-refresh race window in domain/effect.py:create() | P1 | ✅ Done |
| EE-35 | _auto_start_poll persistence: polling_until field persisted at create/redeploy; _resume_pending_polls() on startup resumes live tasks and marks expired-pending records as error; wired in server/__main__.py lifespan | P1 | ✅ Done |
| EE-36 | run_on_deploy race: loser of _claim_run_on_deploy clears update = {} so it cannot overwrite status: running written by the claimer | P1 | ✅ Done |
| EE-37 | Integration tests: effect_dispatch → ES persist → effect_status polling round-trip; full system-level journeys for #REQ.done-effects-on-attest, #REQ.done-validated-and-logged, #REQ.done-one-handler | P2 | ⏳ Pending: deferred until effects engine fully hooked up end-to-end |
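
The idempotency approach in EE-34 (deterministic UID plus a create-only write) can be illustrated with a small sketch. The hash function, the 32-character truncation, and the in-memory store are assumptions; the source states only that the UID is derived from a `dag_id` hash and that a `ConflictError` is treated as a duplicate signal.

```python
import hashlib
from typing import Dict, Tuple


class ConflictError(Exception):
    """Stand-in for the conflict raised by a create-only write."""


class InMemoryStore:
    """Minimal create-only store used in place of Elasticsearch."""

    def __init__(self) -> None:
        self.docs: Dict[str, dict] = {}

    def create(self, uid: str, doc: dict) -> None:
        if uid in self.docs:
            raise ConflictError(uid)
        self.docs[uid] = doc


def effect_uid(dag_id: str) -> str:
    """Same dag_id always yields the same UID (SHA-256 is an assumed choice)."""
    return hashlib.sha256(dag_id.encode("utf-8")).hexdigest()[:32]


def create_effect(store: InMemoryStore, dag_id: str, doc: dict) -> Tuple[str, bool]:
    """Return (uid, created). A conflict means another writer got there first."""
    uid = effect_uid(dag_id)
    try:
        store.create(uid, doc)
        return uid, True
    except ConflictError:
        # Duplicate signal: report the existing UID instead of writing twice.
        return uid, False
```

Because the UID is a pure function of `dag_id`, two concurrent creators race on the same document ID and the store arbitrates, so no scan-then-write check (and its refresh window) is needed.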

Directory Layout Changes

New files:

conduit/
└── conduit/
    ├── domain/
    │   └── effect.py                               # DataEffect domain class
    └── templates/
        └── effects/
            ├── catalog.json                        # Effect operator catalog
            ├── effect_failure_callback.py.j2       # DAG-level on_failure_callback (EE-29)
            └── operators/                          # Jinja2 operator snippets
                ├── sql_extract.py.j2
                ├── http_fetch.py.j2
                ├── s3_fetch.py.j2
                ├── pdf_convert.py.j2
                ├── csv_export.py.j2
                ├── json_transform.py.j2
                ├── s3_upload.py.j2
                ├── http_dispatch.py.j2
                ├── email_notify.py.j2
                ├── webhook_notify.py.j2
                ├── cortex_task_create.py.j2
                ├── mqtt_publish.py.j2
                ├── mqtt_await_confirmation.py.j2
                ├── relay_audit_emit.py.j2
                ├── sparkplug_ncmd.py.j2
                ├── sparkplug_dcmd.py.j2
                ├── sparkplug_fast_report.py.j2
                ├── platform_state_upsert.py.j2
                ├── sparkplug_read.py.j2
                └── gitlab_issue_create.py.j2
conduit/
└── relay/
    └── whitelist.py                                # writable-tag registry (EE-22)
server/
└── tools/
    ├── effect_tools.py                             # effect_* MCP tools
    └── sensor_tools.py                             # sensor_config_* MCP tools (EE-28)

Files modified:

| File | Change |
| --- | --- |
| conduit/templates/renderer.py | Add multi-snippet assembly path alongside existing single-template render |
| server/api/routes.py | Add DataEffect REST endpoints |
| server/mcp/app.py | Register effect_* tools and Effect catalog resources |
| server/__main__.py | Register conduit__http_default Airflow connection on startup |
| conduit/schema.py | Add DataEffect schema constant; add Schema.RELAY_AUDIT registered to relay-audit index |
| specs/service.md | Update tool count, resource count, capability list, and directory layout |

Depends on: component.conduit.service, component.cortex.decision-evidence, component.cortex.task-and-effect

Realizes: product.decision-effect

Required by: platform.relay