DataPipeline

Definition

A DataPipeline is a declarative, deployable description of a data journey through the Axonis platform, realised as an Apache Airflow DAG. Two kinds share one mechanism: (1) ingest pipelines, which pull external data through a connection into Elasticsearch (the production use that conduit ships today via its provider-ingest templates); (2) journey tests, which exercise a production path end to end (client → oracle/REST → service → store → back) and assert the outcome, so that "the behaviour works in production" is a runnable, repeatable artifact rather than a claim. conduit owns DataPipeline state when deployed; Airflow is the system of record; testament executes DAGs and runs their verifications. The journey-test kind is the SDD pipeline's definition of done: a task is finished when its A-to-Z journey test is green through the DAG path.

A-to-Z integration test

An A-to-Z integration test is a DataPipeline of the journey-test kind. It has four parts:

  1. The production journey — the real sequence of services the behaviour traverses in production, entered at the earliest platform wire (a client REST/MCP call), not a single repo's internal entrypoint. Example: a cortex monitor block is exercised as client → oracle → cortex MCP tool → fedai-rest → ES → response, not as a direct call to the Python handler.
  2. The DAG — an Airflow DAG that drives that journey. For REST/MCP journeys this is built from testament's call task (dags/src/call.py): one authenticated call per hop, chained, each checked for a non-error status. For federated/FATE journeys it is the existing homo/hetero operation DAG shape.
  3. The verification block — explicit assertions on the outcome, run by testament's verification mechanism (dags/src/fate.py::create_verification_function). Tabular/ES outputs use the DataVerifier methods (verify_row_count, verify_column_exists, verify_value_range, verify_no_nan, verify_unique_values, verify_values_in_set, plus op-specific verifiers). For REST/MCP journeys, the response body is asserted with verify_response_json(path, ...) — a general matcher that reads a key-path (e.g. operations.op1._truncated) out of the captured response and checks it. This is core testament infrastructure: the call task captures its response; the matcher reads it. A test with no verification block is a smoke test, not an A-to-Z test, and does not satisfy done.
  4. The done-bar — the test is rendered by conduit (from a catalog template) and run by testament through the Airflow DAG path. "Green" means the DAG run succeeded AND every verification passed. A local run that only checks status == 'success' does NOT satisfy done (see testament/CLAUDE.md contract).
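The key-path matching described in part 3 can be sketched as follows. This is a minimal illustration under stated assumptions, not testament's implementation: the function names `resolve_path` and `check_response_json`, their argument names, and the exact semantics given to `equals`, `contains`, and `exists` are all hypothetical.

```python
from typing import Any, Optional

_MISSING = object()  # sentinel: distinguishes "key absent" from a stored None


def resolve_path(body: Any, path: str) -> Any:
    """Walk a dotted key-path (e.g. 'operations.op1._truncated') through nested dicts."""
    current = body
    for key in path.split("."):
        if not isinstance(current, dict) or key not in current:
            return _MISSING
        current = current[key]
    return current


def check_response_json(body: dict, path: str, *, equals: Any = _MISSING,
                        contains: Any = _MISSING,
                        exists: Optional[bool] = None) -> bool:
    """Hypothetical matcher: True only if every supplied operator holds."""
    value = resolve_path(body, path)
    if exists is not None and (value is not _MISSING) != exists:
        return False
    if equals is not _MISSING and (value is _MISSING or value != equals):
        return False
    if contains is not _MISSING:
        if value is _MISSING:
            return False
        try:
            if contains not in value:
                return False
        except TypeError:  # the resolved value is not a container
            return False
    return True


# Invented response body standing in for what a call task might capture.
captured = {"operations": {"op1": {"_truncated": True, "rows": [1, 2, 3]}}}
print(check_response_json(captured, "operations.op1._truncated", equals=True))
print(check_response_json(captured, "operations.op2", exists=False))
```

The sentinel keeps a missing key distinct from a key whose value is null, which matters when a journey needs to assert that a field is present but empty.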

Not every gap is journey-shaped. A behaviour is journey-testable only if its outcome is observable at a client wire — in the asserted response body, or in a downstream store the verification block reads. A change with no cross-service surface — a pure in-repo transform whose result never reaches a client response (e.g. an LLM-input compressor that shapes only model context) — is a unit mandate, not an A-to-Z journey; test it at the unit surface. Containment and journey-shape are opposites: the more deliberately self-contained a ticket, the less it is a journey.

The acceptance test is authored RED, against the golden state. Because a task exists to close a gap, its A-to-Z test fails at the start of the session — that red state IS the executable spec-gap. The session's job is to drive it green: implement the feature AND build whatever the test needs to even run (a missing verifier, the journey template, the conduit→testament wiring) — all in the same session ("Blockers are in-scope"). The red test is a working condition held in specs/staging/{slug}/; it is committed green, never red, and is never weakened to pass. "Done" = this test green through the DAG path, within the session.

Lifecycle

drafted → deployed(pending_validation) → validated → run(queued → running → {success | failed}).

  • drafted — DAG source rendered (conduit render) but not committed.
  • deployed(pending_validation) — committed to the conduit git pending branch via datapipeline_deploy; CI validates that the DAG imports and parses.
  • validated — CI passed; the DAG is visible to Airflow.
  • run — triggered (datapipeline_run / datapipeline_run_trigger); Airflow executes tasks. A journey-test run is success only if all tasks succeeded and all verifications passed.
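The lifecycle above can be read as a small state machine plus a "green" predicate. The sketch below is illustrative only: the `State` enum, `TRANSITIONS` table, and both function names are hypothetical, and only the forward transitions named in the lifecycle line are encoded (the source does not say what follows a failed run).

```python
from enum import Enum
from typing import List


class State(Enum):
    DRAFTED = "drafted"
    PENDING_VALIDATION = "deployed(pending_validation)"
    VALIDATED = "validated"
    QUEUED = "queued"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"


# Allowed forward transitions, taken from the lifecycle line.
TRANSITIONS = {
    State.DRAFTED: {State.PENDING_VALIDATION},
    State.PENDING_VALIDATION: {State.VALIDATED},
    State.VALIDATED: {State.QUEUED},
    State.QUEUED: {State.RUNNING},
    State.RUNNING: {State.SUCCESS, State.FAILED},
}


def can_transition(src: State, dst: State) -> bool:
    """True if dst is a listed successor of src."""
    return dst in TRANSITIONS.get(src, set())


def journey_run_is_green(task_states: List[str], verification_results: List[bool]) -> bool:
    """Green = all tasks succeeded AND at least one verification ran AND all passed."""
    all_tasks_ok = bool(task_states) and all(s == "success" for s in task_states)
    return all_tasks_ok and bool(verification_results) and all(verification_results)


print(can_transition(State.DRAFTED, State.VALIDATED))          # skipping CI is not allowed
print(journey_run_is_green(["success", "success"], []))        # no verifications: not green
```

Treating an empty verification list as not green mirrors the rule that a run which asserts nothing is a smoke test rather than an A-to-Z test.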

A journey test is append-only with respect to a spec mandate: closing a mandate adds or updates its journey test; it is never deleted to make a build green.

Journey through the code

Authoring a journey test (the SDD done path):

  1. The implementer identifies the production journey for the task (the earliest-platform-wire sequence of hops).
  2. conduit renders a DAG from a catalog template: conduit/templates/renderer.py::render(template_name, params, dag_id, schedule), a Jinja2 render over a .py.j2 template registered in conduit/templates/catalog.json with its required_params.
  3. The rendered DAG is committed into testament/dags/ (its home, like all testament DAGs) and uses dags/src/call.py::call for each REST/MCP hop (or the FATE operation components for federated hops).
  4. testament runs the DAG (Airflow); create_verification_function executes the verification block against DataVerifier (or verify_response_json against the captured response).
  5. Result (success + verifications) is the task's done signal, read back via conduit datapipeline_run_status / datapipeline_run_errors or the testament Airflow run.
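Step 2 depends on the catalog entry declaring which parameters a template needs. A pre-render check along those lines might look like the sketch below. It is an assumption-laden illustration: the catalog being keyed by template name, the `rest_journey` entry, its parameter names, the `check_params` helper, and the choice to reject unknown parameters are all invented for the example and are not taken from conduit.

```python
import json
from typing import Any, Dict

# Hypothetical catalog content; the template name, file name, and params are invented.
CATALOG_JSON = """
{
  "rest_journey": {
    "file": "rest_journey.py.j2",
    "description": "Chained REST/MCP calls with a verification block",
    "required_params": {"base_url": "string", "hops": "list"},
    "optional_params": {"timeout_s": "int"}
  }
}
"""


def check_params(catalog: Dict[str, Any], template_name: str,
                 params: Dict[str, Any]) -> Dict[str, Any]:
    """Return the catalog entry if params satisfy it; raise otherwise."""
    if template_name not in catalog:
        raise KeyError(f"unknown template: {template_name}")
    entry = catalog[template_name]
    required = set(entry["required_params"])
    allowed = required | set(entry.get("optional_params", {}))
    missing = sorted(required - set(params))
    unknown = sorted(set(params) - allowed)
    if missing or unknown:
        raise ValueError(f"missing={missing} unknown={unknown}")
    return entry


catalog = json.loads(CATALOG_JSON)
entry = check_params(catalog, "rest_journey",
                     {"base_url": "http://localhost:8080", "hops": []})
print(entry["file"])
```

Failing before the render means a malformed request never reaches the drafted state, so the pending branch only ever sees DAG source produced from a complete parameter set.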

Ingest pipelines follow the same render → deploy → run path; their journey is connection → fetch → transform → ES, verified with verify_row_count / column checks against the landed index (the provider-ingest templates, today's production example).

Data shape

A DataPipeline (the conduit DataPipeline domain object; see component.conduit.service) has the fields: uid, dag_id, content (rendered DAG source), pipeline_type (ingest | journey_test), schedule (nullable; for journey tests it is None, meaning triggered on demand), visibility, run_on_deploy.

A journey-test DAG additionally carries, by convention, a verification structure keyed by role, each a list of { method: "verify_*", args: {...} } (the shape testament's create_verification_function dispatches — full verify_* method name + args, e.g. {"method": "verify_no_nan", "args": {"columns": ["test.x0"]}}; REST journeys use {"method": "verify_response_json", "args": {"path": "operations.op1._truncated", "equals": true}}). Catalog template entry: { file, description, required_params{}, optional_params{} }.
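The two shapes above can be sketched in code. This is a hedged illustration rather than conduit's or testament's actual types: the field defaults, the `StubVerifier` class, the argument names `expected` and `column`, the role name `guest`, and the `run_verifications` helper are assumptions. Only the field names and the `{method, args}` entry shape come from the text.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Optional


@dataclass
class DataPipeline:
    uid: str
    dag_id: str
    content: str                      # rendered DAG source
    pipeline_type: str                # "ingest" or "journey_test"
    schedule: Optional[str] = None    # None = triggered on demand
    visibility: str = "private"       # assumed default, not from the source
    run_on_deploy: bool = False       # assumed default, not from the source

    def __post_init__(self) -> None:
        if self.pipeline_type not in ("ingest", "journey_test"):
            raise ValueError(f"bad pipeline_type: {self.pipeline_type}")


class StubVerifier:
    """Stand-in verifier; the real verify_* methods live in testament."""

    def __init__(self, rows: List[Dict[str, Any]]):
        self.rows = rows

    def verify_row_count(self, expected: int) -> bool:
        return len(self.rows) == expected

    def verify_column_exists(self, column: str) -> bool:
        return all(column in row for row in self.rows)


def run_verifications(verifier: Any, checks: List[Dict[str, Any]]) -> List[bool]:
    """Dispatch each {method, args} entry to the same-named verifier method."""
    results = []
    for check in checks:
        method = check["method"]
        if not method.startswith("verify_"):
            raise ValueError(f"not a verify_* method: {method}")
        results.append(bool(getattr(verifier, method)(**check.get("args", {}))))
    return results


# Verification structure keyed by role, each a list of {method, args}.
verification = {
    "guest": [
        {"method": "verify_row_count", "args": {"expected": 2}},
        {"method": "verify_column_exists", "args": {"column": "x0"}},
    ]
}
verifier = StubVerifier([{"x0": 1.0}, {"x0": 2.0}])
results = {role: run_verifications(verifier, checks)
           for role, checks in verification.items()}
print(results)
```

Dispatching on the full `verify_*` method name keeps the verification structure declarative: adding a new assertion means adding a method to the verifier, not changing the dispatcher.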

Invariants

  1. A journey test proves production, not internals. It enters at the earliest platform wire (client REST/MCP), never a single repo's internal function.
  2. No verification = not done. A DAG that runs green but asserts nothing does not satisfy a spec mandate.
  3. Done is the DAG path. Only an Airflow DAG run executes verifications; a local status == 'success' check is necessary but not sufficient.
  4. Every journey is autonomously runnable — there is effectively no waiver. All credentials live in dev env files and every federate is always live in k8s, so an agent can author AND run any journey unattended (launching developers-environment/helpful-scripts/port_forward.py if a port-forward is missing). The one exclusion is GPU training, which is forbidden outright — the dev system has no GPU capability, so a journey requiring it is not waived, it is not written. No human-only-creds or federation-spin-up waiver exists.
  5. Tests are append-only against mandates. Never delete a journey test to pass CI.
  6. State-writing tests tear down what they write. A journey that ingests data, creates an object, or deploys a pipeline MUST clean up at the end (testament's call task delete path) so it is repeatable — runnable again tomorrow without colliding with its own prior state. Read-only journeys need no teardown. The federate itself is always live and is never started/stopped by a test.
  7. Single-session atomicity. A task plus everything its red acceptance test drags in (journey template, wiring, blocker fixes) is completed in one session and the test driven green before commit. The red test is a staging condition, never a committed state; it commits green or not at all.
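Invariant 6 (state-writing tests tear down what they write) is the one most easily shown in code. The sketch below uses an in-memory dictionary as a stand-in for a real store and a context manager to guarantee cleanup. `FAKE_STORE`, `written_object`, and `run_journey` are hypothetical names, and this is not how testament's call task delete path is implemented.

```python
from contextlib import contextmanager
from typing import Dict, Iterator

FAKE_STORE: Dict[str, dict] = {}  # in-memory stand-in for a real store such as an ES index


@contextmanager
def written_object(object_id: str, payload: dict) -> Iterator[str]:
    """Create an object for the journey and always delete it afterwards."""
    if object_id in FAKE_STORE:
        raise RuntimeError(f"{object_id} left over from a prior run")
    FAKE_STORE[object_id] = payload
    try:
        yield object_id
    finally:
        FAKE_STORE.pop(object_id, None)  # teardown runs even if an assertion fails


def run_journey() -> bool:
    """Write, verify, and (via the context manager) clean up."""
    with written_object("journey-test-doc", {"value": 42}) as oid:
        return FAKE_STORE[oid]["value"] == 42


first = run_journey()
second = run_journey()  # repeatable: the second run does not collide with the first
print(first, second, FAKE_STORE)
```

In an Airflow DAG the equivalent would typically be a final cleanup task configured to run regardless of upstream failure, so that a failed verification does not leave state behind for the next run.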

Open questions

  • Response-matcher vocabulary. verify_response_json ships with equals / contains / exists; whether it needs more operators (regex, numeric compare, list-length) will be decided when the first real need arises.
  • Verification vocabulary for REST journeys more broadly: are key-path assertions on the response enough, or do some journeys need to assert on landed ES state after the call (combining verify_response_json with a follow-up ES query task)?

Realized by: component.conduit.service