DataPipeline
Definition
A DataPipeline is a declarative, deployable description of a data journey through the Axonis platform, realised as an Apache Airflow DAG. Two kinds share one mechanism: (1) ingest pipelines — pull external data through a connection into Elasticsearch (the production use conduit ships today via its provider-ingest templates); (2) journey tests — exercise a production path end to end (client → oracle/REST → service → store → back) and assert the outcome, so that "the behaviour works in production" is a runnable, repeatable artifact rather than a claim. conduit owns DataPipeline state when deployed; Airflow is the system of record; testament executes DAGs and runs their verifications. The journey-test kind is the SDD pipeline's definition of done: a task is finished when its A-to-Z journey test is green through the DAG path.
A-to-Z integration test
An A-to-Z integration test is a DataPipeline of the journey-test kind. It has four parts:
- The production journey: the real sequence of services the behaviour traverses in production, entered at the earliest platform wire (a client REST/MCP call), not a single repo's internal entrypoint. Example: a cortex monitor block is exercised as client → oracle → cortex MCP tool → fedai-rest → ES → response, not as a direct call to the Python handler.
- The DAG: an Airflow DAG that drives that journey. For REST/MCP journeys this is built from testament's call task (dags/src/call.py): one authenticated call per hop, chained, each checked for a non-error status. For federated/FATE journeys it is the existing homo/hetero operation DAG shape.
- The verification block: explicit assertions on the outcome, run by testament's verification mechanism (dags/src/fate.py::create_verification_function). Tabular/ES outputs use the DataVerifier methods (verify_row_count, verify_column_exists, verify_value_range, verify_no_nan, verify_unique_values, verify_values_in_set, plus op-specific verifiers). For REST/MCP journeys, the response body is asserted with verify_response_json(path, ...), a general matcher that reads a key-path (e.g. operations.op1._truncated) out of the captured response and checks it. This is core testament infrastructure: the call task captures its response; the matcher reads it. A test with no verification block is a smoke test, not an A-to-Z test, and does not satisfy done.
- The done-bar: the test is rendered by conduit (from a catalog template) and run by testament through the Airflow DAG path. "Green" means the DAG run succeeded AND every verification passed. A local run that only checks status == 'success' does NOT satisfy done (see testament/CLAUDE.md contract).
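The key-path matching described for REST/MCP journeys can be pictured with a minimal sketch. This is not testament's implementation: the names read_key_path and check_response_json, the keyword-argument signature, and the treatment of a missing key are assumptions made for illustration only.

```python
from typing import Any

_MISSING = object()  # sentinel: distinguishes "key absent" from a stored None


def read_key_path(body: Any, path: str) -> Any:
    """Walk a dotted key-path such as 'operations.op1._truncated' through nested dicts."""
    node = body
    for key in path.split("."):
        if not isinstance(node, dict) or key not in node:
            return _MISSING
        node = node[key]
    return node


def check_response_json(body: Any, path: str, *, equals: Any = _MISSING,
                        contains: Any = _MISSING, exists: Any = _MISSING) -> bool:
    """Hypothetical matcher: True only if every supplied operator holds at `path`."""
    value = read_key_path(body, path)
    if exists is not _MISSING and (value is not _MISSING) != bool(exists):
        return False
    if equals is not _MISSING and (value is _MISSING or value != equals):
        return False
    if contains is not _MISSING:
        if value is _MISSING:
            return False
        try:
            if contains not in value:
                return False
        except TypeError:
            # The value at `path` is not a container (assumed to count as a failed check).
            return False
    return True


# A captured response body, shaped after the key-path example in the text.
captured = {"operations": {"op1": {"_truncated": True, "rows": [1, 2, 3]}}}
truncated_ok = check_response_json(captured, "operations.op1._truncated", equals=True)
missing_ok = check_response_json(captured, "operations.op2", exists=False)
```

The sentinel keeps "key not present" separate from "key present with a null value", which matters once an exists-style operator is in play.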
Not every gap is journey-shaped. A behaviour is journey-testable only if its outcome is observable at a client wire — in the asserted response body, or in a downstream store the verification block reads. A change with no cross-service surface — a pure in-repo transform whose result never reaches a client response (e.g. an LLM-input compressor that shapes only model context) — is a unit mandate, not an A-to-Z journey; test it at the unit surface. Containment and journey-shape are opposites: the more deliberately self-contained a ticket, the less it is a journey.
The acceptance test is authored RED, against the golden state. Because a task exists to close a
gap, its A-to-Z test fails at the start of the session — that red state IS the executable
spec-gap. The session's job is to drive it green: implement the feature AND build whatever the test
needs to even run (a missing verifier, the journey template, the conduit→testament wiring) — all in
the same session ("Blockers are in-scope"). The red test is a working condition held in
specs/staging/{slug}/; it is committed green, never red, and is never weakened to pass.
"Done" = this test green through the DAG path, within the session.
Lifecycle
drafted → deployed(pending_validation) → validated → run(queued → running → {success | failed}).
- drafted: DAG source rendered (conduit render) but not committed.
- deployed(pending_validation): committed to the conduit git pending branch via datapipeline_deploy; CI validates that the DAG imports and parses.
- validated: CI passed; the DAG is visible to Airflow.
- run: triggered (datapipeline_run / datapipeline_run_trigger); Airflow executes tasks. A journey-test run is success only if all tasks succeeded and all verifications passed.
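The lifecycle can be read as a small forward-only state machine. The sketch below is illustrative, not conduit's code: the State enum, the advance and is_green helpers, and the decision to give terminal run states no successors (a re-trigger is assumed to start a fresh run) are all assumptions.

```python
from enum import Enum
from typing import List


class State(Enum):
    DRAFTED = "drafted"
    PENDING_VALIDATION = "deployed(pending_validation)"
    VALIDATED = "validated"
    QUEUED = "queued"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"


# Transitions read off the lifecycle line; success/failed are terminal for one run.
ALLOWED = {
    State.DRAFTED: {State.PENDING_VALIDATION},
    State.PENDING_VALIDATION: {State.VALIDATED},
    State.VALIDATED: {State.QUEUED},
    State.QUEUED: {State.RUNNING},
    State.RUNNING: {State.SUCCESS, State.FAILED},
    State.SUCCESS: set(),
    State.FAILED: set(),
}


def advance(current: State, target: State) -> State:
    """Return `target` if the transition is allowed, otherwise raise ValueError."""
    if target not in ALLOWED[current]:
        raise ValueError(f"illegal transition {current.value} -> {target.value}")
    return target


def is_green(task_ok: List[bool], verification_ok: List[bool]) -> bool:
    """Green = every task succeeded AND every verification passed.

    An empty verification list is treated as not green, following the rule
    that a test with no verification block does not satisfy done.
    """
    return bool(verification_ok) and all(task_ok) and all(verification_ok)


state = State.DRAFTED
for nxt in (State.PENDING_VALIDATION, State.VALIDATED, State.QUEUED,
            State.RUNNING, State.SUCCESS):
    state = advance(state, nxt)
```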
A journey test is append-only with respect to a spec mandate: closing a mandate adds or updates its journey test; it is never deleted to make a build green.
Journey through the code
Authoring a journey test (the SDD done path):
- The implementer identifies the production journey for the task (the earliest-platform-wire sequence of hops).
- conduit renders a DAG from a catalog template: conduit/templates/renderer.py::render(template_name, params, dag_id, schedule), a Jinja2 render over a .py.j2 template registered in conduit/templates/catalog.json with its required_params.
- The rendered DAG is committed into testament/dags/ (its home, like all testament DAGs) and uses dags/src/call.py::call for each REST/MCP hop (or the FATE operation components for federated hops).
- testament runs the DAG (Airflow); create_verification_function executes the verification block against DataVerifier (or verify_response_json against the captured response).
- The result (success + verifications) is the task's done signal, read back via conduit datapipeline_run_status / datapipeline_run_errors or the testament Airflow run.
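The per-hop chaining used for REST/MCP journeys (one call per hop, each checked for a non-error status) might look roughly like the sketch below. It does not reproduce dags/src/call.py: the HopResult type, the run_journey helper, the stub hops, and the reading of "non-error" as an HTTP-style status below 400 are assumptions for illustration.

```python
from dataclasses import dataclass
from typing import Any, Callable, List, Optional, Tuple


@dataclass
class HopResult:
    name: str
    status: int
    body: Any


# A hop receives the previous hop's body (None for the first hop)
# and returns (status, body).
Hop = Callable[[Optional[Any]], Tuple[int, Any]]


def run_journey(hops: List[Tuple[str, Hop]]) -> List[HopResult]:
    """Run hops in order, feeding each body to the next; stop on the first error status."""
    results: List[HopResult] = []
    previous: Optional[Any] = None
    for name, hop in hops:
        status, body = hop(previous)
        results.append(HopResult(name, status, body))
        if status >= 400:  # assumption: HTTP-style status codes
            raise RuntimeError(f"hop {name!r} returned error status {status}")
        previous = body
    return results


# Stub hops standing in for real authenticated calls.
def client_to_oracle(_prev: Optional[Any]) -> Tuple[int, Any]:
    return 200, {"tool": "monitor"}


def oracle_to_service(prev: Optional[Any]) -> Tuple[int, Any]:
    return 200, {"via": prev["tool"], "operations": {"op1": {"_truncated": True}}}


results = run_journey([
    ("client->oracle", client_to_oracle),
    ("oracle->service", oracle_to_service),
])
final_body = results[-1].body  # the captured response a verification would read
```

In an Airflow DAG each hop would be its own task; the loop here only shows the ordering and the status check.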
Ingest pipelines follow the same render → deploy → run path; their journey is
connection → fetch → transform → ES, verified with verify_row_count / column checks against the
landed index (the provider-ingest templates, today's production example).
Data shape
A DataPipeline (conduit DataPipeline domain object; see component.conduit.service): uid, dag_id,
content (rendered DAG source), pipeline_type (ingest | journey_test), schedule (nullable;
None for journey tests, which are triggered on demand), visibility, run_on_deploy.
A journey-test DAG additionally carries, by convention, a verification structure keyed by role,
each a list of { method: "verify_*", args: {...} } (the shape testament's
create_verification_function dispatches — full verify_* method name + args, e.g.
{"method": "verify_no_nan", "args": {"columns": ["test.x0"]}}; REST journeys use
{"method": "verify_response_json", "args": {"path": "operations.op1._truncated", "equals": true}}).
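To make the role-keyed verification structure concrete, here is a minimal dispatch sketch. It is not create_verification_function: TinyVerifier is a hypothetical stand-in for DataVerifier that works on plain row dicts, the role name "guest" and the argument name expected are assumptions, and only the columns argument of verify_no_nan is taken from the example above.

```python
from typing import Any, Dict, List, Tuple


class TinyVerifier:
    """Hypothetical stand-in for DataVerifier, operating on a list of row dicts."""

    def __init__(self, rows: List[Dict[str, Any]]):
        self.rows = rows

    def verify_row_count(self, expected: int) -> bool:
        return len(self.rows) == expected

    def verify_no_nan(self, columns: List[str]) -> bool:
        for row in self.rows:
            for col in columns:
                value = row.get(col)
                # NaN is the only value that is unequal to itself.
                if value is None or value != value:
                    return False
        return True


def run_verifications(verification: Dict[str, List[Dict[str, Any]]],
                      verifiers: Dict[str, TinyVerifier]) -> List[Tuple[str, str]]:
    """Dispatch each {method, args} entry by name; return the (role, method) pairs that failed."""
    failures = []
    for role, checks in verification.items():
        target = verifiers[role]
        for check in checks:
            method = check["method"]
            if not method.startswith("verify_"):
                raise ValueError(f"not a verifier method: {method}")
            if not getattr(target, method)(**check.get("args", {})):
                failures.append((role, method))
    return failures


verification = {
    "guest": [
        {"method": "verify_no_nan", "args": {"columns": ["test.x0"]}},
        {"method": "verify_row_count", "args": {"expected": 2}},
    ]
}
clean = TinyVerifier([{"test.x0": 0.5}, {"test.x0": 1.5}])
failures = run_verifications(verification, {"guest": clean})
```

Collecting failures instead of stopping at the first one gives a run report that names every broken assertion.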
Catalog template entry: { file, description, required_params{}, optional_params{} }.
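A pre-render parameter check against such an entry could be sketched as follows. The catalog layout (entries keyed by template name), the template name journey_rest, its parameter names, and the check_params helper are all hypothetical; only the four entry fields come from the line above.

```python
import json
from typing import Any, Dict, List, Tuple

# Hypothetical catalog content; real entries live in conduit/templates/catalog.json.
CATALOG_JSON = """
{
  "journey_rest": {
    "file": "journey_rest.py.j2",
    "description": "Hypothetical REST journey template",
    "required_params": {"base_url": "service base URL", "path": "response key-path"},
    "optional_params": {"timeout_s": "per-hop timeout in seconds"}
  }
}
"""


def check_params(catalog: Dict[str, Any], template_name: str,
                 params: Dict[str, Any]) -> Tuple[List[str], List[str]]:
    """Return (missing required params, params the template does not declare)."""
    entry = catalog[template_name]
    required = set(entry["required_params"])
    allowed = required | set(entry.get("optional_params", {}))
    missing = sorted(required - set(params))
    unknown = sorted(set(params) - allowed)
    return missing, unknown


catalog = json.loads(CATALOG_JSON)
missing, unknown = check_params(
    catalog,
    "journey_rest",
    {"base_url": "http://localhost:8080", "timeout_s": 5, "retries": 1},
)
```

Here missing reports the undeclared-but-required path parameter and unknown reports retries, so a bad render request fails before any template is touched.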
Invariants
- A journey test proves production, not internals. It enters at the earliest platform wire (client REST/MCP), never a single repo's internal function.
- No verification = not done. A DAG that runs green but asserts nothing does not satisfy a spec mandate.
- Done is the DAG path. Only an Airflow DAG run executes verifications; a local status=='success' check is necessary-not-sufficient.
- Every journey is autonomously runnable; there is effectively no waiver. All credentials live in dev env files and every federate is always live in k8s, so an agent can author AND run any journey unattended (launching developers-environment/helpful-scripts/port_forward.py if a port-forward is missing). The one exclusion is GPU training, which is forbidden outright: the dev system has no GPU capability, so a journey requiring it is not waived, it is not written. No human-only-creds or federation-spin-up waiver exists.
- Tests are append-only against mandates. Never delete a journey test to pass CI.
- State-writing tests tear down what they write. A journey that ingests data, creates an object, or deploys a pipeline MUST clean up at the end (testament's call task delete path) so it is repeatable: runnable again tomorrow without colliding with its own prior state. Read-only journeys need no teardown. The federate itself is always live and is never started/stopped by a test.
- Single-session atomicity. A task plus everything its red acceptance test drags in (journey template, wiring, blocker fixes) is completed in one session and the test driven green before commit. The red test is a staging condition, never a committed state; it commits green or not at all.
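The teardown invariant can be illustrated with a small sketch in which a state-writing journey always deletes what it created, so a second run does not collide with the first. FakeStore and run_state_writing_journey are hypothetical stand-ins; a real journey would perform the cleanup as a delete hop in the DAG rather than a Python finally block.

```python
from typing import Any, Dict


class FakeStore:
    """In-memory stand-in for a service the journey writes to (hypothetical)."""

    def __init__(self) -> None:
        self.objects: Dict[str, Any] = {}

    def create(self, uid: str, payload: Any) -> None:
        if uid in self.objects:
            raise RuntimeError(f"collision with prior state: {uid}")
        self.objects[uid] = payload

    def get(self, uid: str) -> Any:
        return self.objects[uid]

    def delete(self, uid: str) -> None:
        self.objects.pop(uid, None)


def run_state_writing_journey(store: FakeStore, uid: str) -> bool:
    """Write, verify, and always tear down, even if the verification step raises."""
    store.create(uid, {"rows": 3})
    try:
        return store.get(uid)["rows"] == 3
    finally:
        store.delete(uid)


store = FakeStore()
first = run_state_writing_journey(store, "journey-demo")
second = run_state_writing_journey(store, "journey-demo")  # repeatable: no collision
```

Without the cleanup step the second run would raise on create, which is exactly the non-repeatability the invariant rules out.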
Related products
- component.conduit.service: the service that renders + deploys these DAGs.
- testament (repo-local CLAUDE.md): executes the DAGs and runs verifications.
- product.dataset / product.trained-model: ingest pipelines produce the datasets these depend on.
- component.oracle.gateway: the MCP entry hop for agent-driven journeys.
Open questions
- Response-matcher vocabulary. verify_response_json ships with equals/contains/exists; whether it needs more operators (regex, numeric compare, list-length) is decided per first real need.
- Verification vocabulary for REST journeys more broadly: are key-path assertions on the response enough, or do some journeys need to assert on landed ES state after the call (combining verify_response_json with a follow-up ES query task)?
Realized by: component.conduit.service