Conduit — Data Ingestion Orchestration Service
Milestone: P3 (optional — deployed when Airflow integration is required)
Purpose
Conduit is a standalone MCP service that wraps Apache Airflow, exposing DAG orchestration and connection management as first-class MCP tools and REST endpoints. It replaces the embedded AirflowRestClient in fedai-rest with a purpose-built service that can be optionally deployed, registered with Oracle, and driven by LLM agents.
Conduit owns the DataPipeline and DataPipelineConnection domain objects when deployed. While Conduit is down or not deployed, fedai-rest retains its Airflow integration for backwards compatibility.
Service Identity
| Field | Value |
|---|---|
| `name` | `conduit` |
| `version` | `0.1.0` |
| `port` | `8008` |
| `mcp_path` | `/agentspace` |
| `health_path` | `/health` |
| `api_path` | `/userspace` |
| `tools_count` | 44 |
| `resources_count` | 5 |
| `capabilities` | `dag-management`, `connection-management`, `airflow-orchestration`, `data-ingestion`, `pipeline-management`, `pipeline-scheduling`, `effect-engine` |
Domain Objects
Conduit uses the existing axonis-core userspace objects. It does not define new schema constants.
| Object | Schema constant | ES index | subtype discriminator |
|---|---|---|---|
| DataPipeline | `Schema.DATA_PIPELINE` | `data-ingest` | `"datapipeline"` |
| DataPipelineConnection | `Schema.DATA_PIPELINE_CONNECTION` | `data-ingest` | `"datapipelineconnection"` |
| DataEffect | `Schema.DATA_EFFECT` | `data-ingest` | `"dataeffect"` |
All three objects share the same ES index, discriminated by the subtype field. Read queries for one type exclude the others — _pipeline.read() filters by subtype: "datapipeline" so DataEffect records never appear in pipeline lists, and vice versa.
Conduit treats DataPipeline and DataPipelineConnection as a read/write cache of Airflow state. DataEffect is Conduit's internal execution record for assembled multi-step effect DAGs dispatched from Cortex or created interactively.
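The subtype discrimination described above can be sketched as an Elasticsearch query body. This is a minimal illustration, not Conduit's actual `_pipeline.read()` implementation: the helper name `subtype_query` is hypothetical, and it assumes `subtype` is mapped as a keyword field so that a `term` filter matches exactly.

```python
from typing import Any

# Shared index name taken from the Domain Objects table.
DATA_INGEST_INDEX = "data-ingest"


def subtype_query(subtype: str, size: int = 100) -> dict[str, Any]:
    """Build a search body that returns only records of one subtype.

    Hypothetical helper: assumes `subtype` is a keyword field.
    """
    return {
        "size": size,
        "query": {"bool": {"filter": [{"term": {"subtype": subtype}}]}},
    }


# One body per domain object; each excludes the other two subtypes.
pipeline_query = subtype_query("datapipeline")
effect_query = subtype_query("dataeffect")
```

Because the filter is an exact term match, a pipeline listing built this way cannot return DataEffect or DataPipelineConnection documents even though all three live in the same index.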
DataEffect fields
| Field | Writable | Semantics |
|---|---|---|
| `dag_id` | user/cortex | Airflow DAG identifier; namespaced `cortex__{eff_id}` for Cortex-dispatched effects |
| `name` | user/cortex | Human-readable label |
| `uds.visibility` | user/cortex | ABAC visibility marking |
| `schedule` | user/cortex | Cron expression for recurring runs; null for one-shot |
| `run_on_deploy` | user/cortex | Trigger an initial run automatically after CI passes |
| `operators` | user/cortex | Ordered list of `{name, params}` operator configs used to assemble the DAG |
| `connections` | system | Deduplicated list of `connection_id` values from the `operators` list |
| `status` | system | `pending_validation` → `active` / `rejected` / `error`; also `paused`, `running` |
| `correlation_id` | cortex | Cortex `eff_` ID; opaque upstream link |
| `edition_id` | cortex | Attested Edition that triggered the effect (Cortex dispatch path only) |
| `effect_type` | cortex | DES effect type, e.g. `external_dispatch` (Cortex dispatch path only) |
| `target` | cortex | Implementation-defined target descriptor from the decision template |
| `payload_hash` | cortex | SHA-256 hex digest of the canonical dispatch payload; verified before assembly |
| `deadline` | cortex | ISO 8601 deadline; recorded but not enforced by Conduit |
| `delete_on_success` | user/cortex | Auto-suggest cleanup after a successful one-shot run (incompatible with `schedule`) |
| `created_by` | system | Username from auth context at create time; `"cortex"` for Cortex-dispatched effects |
| `create_ts` | system | Creation timestamp (ISO 8601 UTC) |
| `airflow_next_run` | live | Next scheduled run timestamp from Airflow (`next_dagrun`); returned on status/read, not stored in ES |
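As an illustration of how the system-managed `connections` field could be derived, the sketch below collects `connection_id` values from an `operators` list while preserving first-seen order. The function name is hypothetical, and it assumes each operator's `connection_id` lives under its `params` mapping, which the table does not state explicitly.

```python
def collect_connections(operators: list[dict]) -> list[str]:
    """Return the distinct connection IDs referenced by an operators list.

    Assumption: `connection_id` is a key inside each operator's `params`.
    Operators without one (e.g. pure transforms) are skipped.
    """
    seen: dict[str, None] = {}  # dict keeps insertion order, so output is stable
    for op in operators:
        conn_id = op.get("params", {}).get("connection_id")
        if conn_id and conn_id not in seen:
            seen[conn_id] = None
    return list(seen)


# Illustrative operator configs; names follow the effect operator examples
# in the directory layout, parameters are made up for the sketch.
example_operators = [
    {"name": "sql_extract", "params": {"connection_id": "pg_main", "query": "SELECT 1"}},
    {"name": "http_dispatch", "params": {"url": "https://example.invalid/hook"}},
    {"name": "mqtt_publish", "params": {"connection_id": "mqtt_edge"}},
    {"name": "sql_extract", "params": {"connection_id": "pg_main", "query": "SELECT 2"}},
]
example_connections = collect_connections(example_operators)
```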
DataPipeline scheduling fields
The DataPipeline userspace object carries three scheduling fields, populated by the scheduling subsystem (§ Pipeline Scheduling):
| Field | Writable | Semantics |
|---|---|---|
| `schedule` | user | A cron expression (or named interval) describing how often the pipeline should be re-run. Distinct from the DAG's own Airflow `schedule_interval`, which controls the DAG, not a run. Editable via `datapipeline_update` / `PATCH /userspace/datapipeline/{uid}`. |
| `next_scheduled_run` | system | The next wall-clock time a scheduled run will fire. Computed from `schedule` at create time, at each scheduled run, and at update time whenever `schedule` changes. |
| `last_scheduled_run` | system | The wall-clock time the most recent scheduled run was triggered. |
- #REQ.schedule-vs-dag-interval: `schedule` controls runs, never the DAG's own scheduling state. It is implemented by the scheduler DAG (§ Pipeline Scheduling), never by setting the managed pipeline's Airflow `schedule_interval`.
- #REQ.next-run-recompute: `next_scheduled_run` is recomputed at create, at each scheduled trigger, and on any update that changes `schedule`. A `schedule` of null/empty clears `next_scheduled_run` and disables scheduling for that pipeline.
MCP Tools (44)
DataPipeline Management (12)
| Tool | Description |
|---|---|
| `datapipeline_deploy(dag_id, content, visibility, schedule, pipeline_type, run_on_deploy)` | Commit a Python DAG to the git repo pending branch; returns `status: pending_validation`. If `run_on_deploy=true`, triggers an initial run automatically after CI passes. |
| `datapipeline_list()` | List Conduit-managed DataPipeline records with live Airflow run state |
| `datapipeline_get(uid)` | Get a single pipeline record including live Airflow state |
| `datapipeline_update(uid, content, schedule, action, params)` | Redeploy with updated content or schedule; `action: "pause"\|"unpause"` controls DAG scheduling state |
| `datapipeline_delete(uid)` | Remove pipeline from git and ES |
| `datapipeline_status(uid)` | Poll GitLab CI for pending pipelines; return live Airflow state for active ones |
| `datapipeline_pause(uid)` | Pause the Airflow DAG |
| `datapipeline_unpause(uid)` | Resume a paused pipeline |
| `datapipeline_run(uid, conf)` | Trigger a new DAG run for a pipeline |
| `datapipeline_stop(uid)` | Stop the active run for a pipeline |
| `datapipeline_runs(uid, state, limit)` | List run history for a pipeline |
| `datapipeline_metrics(uid, since, limit)` | Get run metrics and statistics from the `airflow-metrics` ES index |
DataPipeline Run Tools (6)
| Tool | Description |
|---|---|
| `datapipeline_run_trigger(dag_id, conf, logical_date)` | Trigger a new DAG run by `dag_id` |
| `datapipeline_run_list(dag_id, state, limit)` | List runs for a DAG |
| `datapipeline_run_get(dag_id, dag_run_id)` | Get status and metadata for a specific run |
| `datapipeline_run_delete(dag_id, dag_run_id)` | Cancel and delete a DAG run |
| `datapipeline_run_status(dag_id, dag_run_id)` | Get task-level status breakdown for a run |
| `datapipeline_run_errors(dag_id, dag_run_id)` | Get failed task logs for a run |
Connection Management (7)
| Tool | Description |
|---|---|
| `connection_schema(conn_type)` | Return the JSON schema for a connection type from local templates (e.g. postgres, s3, mongo) |
| `connection_list()` | List all DataPipelineConnection records |
| `connection_get(uid)` | Get a single connection record |
| `connection_create(...)` | Create a new connection record with Airflow registration and live connectivity test |
| `connection_update(uid, ...)` | Update an existing connection record |
| `connection_delete(uid)` | Delete a connection record |
| `connection_test(uid)` | Run Airflow's `test_connection` against a stored connection |
Connection records are 1:1 with pipelines: a pipeline reads from exactly one connection. The set of connection types connection_schema / connection_create accept is defined in § Supported Connection Types.
Scheduling (3)
| Tool | Description |
|---|---|
| `datapipeline_schedule_set(uid, schedule)` | Set or update the re-run schedule (cron/interval) for a pipeline; recomputes `next_scheduled_run`. A null/empty schedule disables scheduling. |
| `datapipeline_schedule_get(uid)` | Return `schedule`, `next_scheduled_run`, `last_scheduled_run` for a pipeline |
| `datapipeline_schedule_list(due_only)` | List pipelines with a schedule and their next/last run times; `due_only=true` filters to pipelines currently due |
DataEffect Management (14)
| Tool | Description |
|---|---|
| `effect_create(name, visibility, operators, schedule, run_on_deploy, connections)` | Assemble a multi-step effect DAG from operator configs; commit to git pending branch; return `{uid, dag_id, status: pending_validation}`. Pre-flight checks missing connections and uninstalled packages (warnings, non-blocking). |
| `effect_list(limit)` | List all DataEffect records |
| `effect_get(uid)` | Get a single DataEffect record including live Airflow state |
| `effect_status(uid)` | Poll CI if pending; return live Airflow state for active effects including `airflow_next_run` |
| `effect_update(uid, name, description, visibility, action)` | Update metadata or pause/unpause via `action: "pause"\|"unpause"` |
| `effect_delete(uid, confirm)` | Remove DAG from git and delete ES record; requires `confirm: true` |
| `effect_pause(uid)` | Pause the Airflow DAG for an effect |
| `effect_unpause(uid)` | Resume a paused effect DAG |
| `effect_run(uid, conf)` | Trigger a new DAG run for an effect |
| `effect_stop(uid)` | Stop the active run for an effect |
| `effect_runs(uid, state, limit)` | List run history for an effect |
| `effect_metrics(uid, since, limit)` | Get run metrics from the `airflow-metrics` ES index |
| `effect_schema(operator_name)` | Without argument: list all operators from the effect catalog. With argument: return full compose/runtime param schema for one operator. |
| `effect_dispatch(effect_id, edition_id, effect_type, target, payload, payload_hash, ...)` | Inbound endpoint called by Cortex's `agent_dispatch` handler. Verifies SHA-256 `payload_hash`, assembles and deploys the DAG, returns `{acknowledged, external_reference, dag_id, status: pending_validation}`. |
MCP Resources (5)
| URI | Description |
|---|---|
| `catalog://operators` | Full ingest operator catalog: universal params, ingest patterns, capabilities, DAG structure conventions |
| `templates://dags/{dag_type}` | Annotated reference DAG for a specific ingest pattern (e.g. `postgres_ingest`) |
| `catalog://effects` | Effect operator catalog: operator names, categories, descriptions, `compose_params`, `runtime_params`, `requires_packages` |
| `templates://effects/{operator}` | Raw Jinja2 snippet for a single effect operator; for inspection of task signatures, imports, and return shapes |
| `capabilities://airflow` | Python packages available on Airflow workers, sourced from atlas-airflow requirements files at startup |
Operational (2)
| Tool | Airflow API | Description |
|---|---|---|
| `airflow_health()` | `GET /api/v1/health` | Check Airflow scheduler and metadata DB health |
| `dag_list(active_only)` | `GET /api/v1/dags` | List Airflow DAGs visible to the caller; `active_only=true` filters to unpaused DAGs |
REST Endpoints (44)
All endpoints are mounted at both /userspace (canonical, fedai-rest compatible) and /api/v1 (legacy alias). Both prefixes route to the same handlers. MCP tools and REST routes call the same underlying domain layer.
DAG Proxy (8)
| Method | Path | Description |
|---|---|---|
| GET | `/userspace/dag` | List Airflow DAGs visible to the caller |
| POST | `/userspace/dag/{dag_id}/run` | Trigger a DAG run |
| GET | `/userspace/dag/{dag_id}/run` | List runs for a DAG |
| GET | `/userspace/dag/{dag_id}/run/{run_id}` | Get a specific run |
| GET | `/userspace/dag/{dag_id}/run/{run_id}/status` | Task-level status breakdown |
| GET | `/userspace/dag/{dag_id}/run/{run_id}/errors` | Failed task logs |
| DELETE | `/userspace/dag/{dag_id}/run/{run_id}` | Delete a run |
| GET | `/userspace/airflow/health` | Airflow health |
DataPipeline (13)
| Method | Path | Description |
|---|---|---|
| GET | `/userspace/datapipeline` | List DataPipeline records. `?statistics=true` inlines metrics from the `airflow-metrics` ES index. |
| POST | `/userspace/datapipeline` | Create/deploy a pipeline |
| GET | `/userspace/datapipeline/{uid}` | Get a pipeline record. `?statistics=true` inlines metrics. |
| PATCH | `/userspace/datapipeline/{uid}` | Update pipeline; `action: "pause"\|"unpause"` supported |
| DELETE | `/userspace/datapipeline/{uid}` | Delete a pipeline |
| GET | `/userspace/status/datapipeline` | Bulk status for all pipelines |
| GET | `/userspace/datapipeline/{uid}/status` | Status for a single pipeline |
| GET | `/userspace/datapipeline/schema` | List available DAG templates |
| GET | `/userspace/datapipeline/schema/{template_name}` | Get a template schema |
| GET | `/userspace/datapipeline/{uid}/runs` | List run history |
| POST | `/userspace/datapipeline/{uid}/run` | Trigger a run |
| POST | `/userspace/datapipeline/{uid}/stop` | Stop the active run |
| GET | `/userspace/datapipeline/{uid}/metrics` | Get run metrics from the `airflow-metrics` index |
DataPipeline Scheduling (3)
| Method | Path | Description |
|---|---|---|
| GET | `/userspace/datapipeline/schedule` | List pipelines with a schedule; `?due_only=true` filters to those currently due |
| GET | `/userspace/datapipeline/{uid}/schedule` | Get `schedule`, `next_scheduled_run`, `last_scheduled_run` for a pipeline |
| PUT | `/userspace/datapipeline/{uid}/schedule` | Set/update the re-run schedule; recomputes `next_scheduled_run`. Empty body/null schedule disables scheduling. |
The re-run schedule is also editable inline via PATCH /userspace/datapipeline/{uid} (the schedule field), matching datapipeline_update.
DataPipelineConnection (7)
| Method | Path | Description |
|---|---|---|
| GET | `/userspace/connection` | List DataPipelineConnection records |
| POST | `/userspace/connection` | Create a connection record |
| GET | `/userspace/connection/{uid}` | Get a connection record |
| PATCH | `/userspace/connection/{uid}` | Update a connection record |
| DELETE | `/userspace/connection/{uid}` | Delete a connection record |
| GET | `/userspace/status/connection` | Bulk status for all connections |
| GET | `/userspace/connection/{uid}/status` | Test connectivity for a connection |
The /userspace/status/{target} bulk status path matches the fedai-rest convention. Valid targets are datapipeline, connection, and effect.
DataEffect (13)
| Method | Path | Description |
|---|---|---|
| GET | `/userspace/effect/schema` | List all effect operators from the catalog |
| GET | `/userspace/effect/schema/{operator_name}` | Get compose/runtime param schema for one operator |
| GET | `/userspace/effect` | List DataEffect records |
| POST | `/userspace/effect` | Create an effect; same validation as the `effect_create` MCP tool |
| GET | `/userspace/effect/{uid}` | Get a single DataEffect record including live Airflow state |
| PATCH | `/userspace/effect/{uid}` | Update metadata or pause/unpause (`action: "pause"\|"unpause"`) |
| DELETE | `/userspace/effect/{uid}` | Delete an effect (removes DAG from git and ES) |
| GET | `/userspace/status/effect` | Bulk status for all effects |
| GET | `/userspace/effect/{uid}/status` | Status for a single effect; polls CI if pending |
| GET | `/userspace/effect/{uid}/runs` | List run history |
| POST | `/userspace/effect/{uid}/run` | Trigger a DAG run |
| POST | `/userspace/effect/{uid}/stop` | Stop the active run |
| GET | `/userspace/effect/{uid}/metrics` | Get run metrics from the `airflow-metrics` index |
Pipeline Scheduling
Users can schedule a pipeline to be re-run on a recurring interval. Scheduling is owned by Conduit's ingestion-orchestration layer; it is distinct from a DAG's own Airflow schedule_interval (which controls the DAG itself, not a run).
Scheduler DAG
- #REQ.scheduler-is-a-dag: scheduling is implemented as a single scheduler DAG, not by setting each managed pipeline's Airflow `schedule_interval`. The scheduler DAG wakes on a fixed tick, finds pipelines whose `next_scheduled_run` is due, and triggers a run of each via the same `trigger_dag_run()` path a manual run uses.
- #REQ.schedule-granularity: the most frequent supported `schedule` interval is 30 minutes. The scheduler tick is sized accordingly.
- #REQ.sequential-scheduled-runs: when multiple scheduled runs are found due in the same tick, they are triggered sequentially (no concurrent fan-out) for now.
- After triggering, the scheduler updates `last_scheduled_run` to the trigger time and recomputes `next_scheduled_run` from `schedule`.
Scheduler index
A dedicated scheduler ES index tracks scheduling state independently of the airflow-metrics index.
- #REQ.scheduler-index-keyed-by-pipeline: the scheduler index is keyed by pipeline ID (the DataPipeline `uid`), giving one record per pipeline and making duplicate scheduled runs impossible.
- #REQ.scheduler-crash-safety: each record tracks status so the scheduler can reason about crash scenarios and long-running jobs: a pipeline whose previous scheduled run is still in-flight (or whose status was left mid-flight by a crash) is not double-triggered. This dedup is the reason the index is keyed by pipeline ID rather than per-run.
- Scheduled-run metrics are emitted to the `airflow-metrics` index as usual (same path as manual runs); the scheduler index holds only scheduling/dedup state, not run metrics.
Supported Connection Types
Connections (1:1 with pipelines) are registered with Airflow and exposed via connection_schema / connection_create. Each type ships a JSON schema template under conduit/templates/connections/<type>.json and a corresponding Airflow hook/operator wiring.
Available types
| Type | Category |
|---|---|
| `postgres` | Relational DB |
| `mysql` | Relational DB |
| `sqlite` | Relational DB |
| `mssql` (SQL Server) | Relational DB |
| `oracle` | Relational DB |
| `snowflake` | Cloud data warehouse |
| `bigquery` (Google BigQuery) | Cloud data warehouse |
| `databricks` | Lakehouse |
| `mongo` | Document DB |
| `cassandra` | Wide-column DB |
| `couchbase` | Document DB |
| `redis` | Key-value store |
| `elasticsearch` | Search index (incl. scenario data index) |
| `azure_blob` (Azure Blob Storage) | Object storage (file-based) |
| `aws_s3` (AWS Cloud Storage) | Object storage (file-based) |
| `gcs` (Google Cloud Storage) | Object storage (file-based) |
| `sharepoint` | Document store (file-based) |
| `kafka` (Apache Kafka) | Streaming |
| `rabbitmq` | Messaging (topic subscription) |
| `spark` (Apache Spark) | Compute engine (tentative) |
| `parquet` | File-based dataset |
- #REQ.connection-type-template: every supported connection type has a JSON schema template under `conduit/templates/connections/`; `connection_schema(conn_type)` returns it, and `connection_create` validates against it before Airflow registration + connectivity test.
- #REQ.connection-pipeline-1to1: connections and pipelines are 1:1; a pipeline reads from exactly one connection. Adding a connection type implies the matching pipeline ingest path (operator/hook) is wired, not the connection record alone.
Ingest Filetypes
Data pipelines ingest a defined set of file/data formats. This section covers the pipeline / ingest-orchestration aspect — which formats a pipeline's ingest path can read and the DAG-template/operator wiring that handles them. The downstream uds-adapter aspect (how a read payload becomes a UDS-mapped dataset) is owned by fedai-rest and specified separately.
Supported filetypes
| Filetype | Category |
|---|---|
| `csv` | Tabular |
| `xls` / `xlsx` | Tabular (spreadsheet) |
| `parquet` | Tabular (columnar) |
| `json` | Structured |
| `avro` | Structured (row, schema-bearing) |
| `xml` | Structured |
| `pdf` | Document |
| `raster` | Geospatial |
| `geotiff` | Geospatial (raster) |
| `vector` | Geospatial |
| `geojson` | Geospatial (vector) |
| image types beyond `jpg` / `png` | Imagery |
- #REQ.ingest-filetype-orchestration: for each supported filetype, the pipeline ingest path (DAG template + operator) is responsible for reading and orchestrating the format into the pipeline; conversion into a UDS-mapped dataset is the fedai-rest uds-adapter's responsibility, not conduit's.
- #REQ.geospatial-ingest: geospatial filetypes (`raster`, `geotiff`, `vector`, `geojson`) are ingestible by pipelines; their geospatial processing semantics are owned downstream (geodex / uds-adapters), but the ingest-orchestration wiring lives here.
Directory Layout
Follows platform.service-contract service anatomy:
conduit/
├── conduit/ # Domain library package
│ ├── __init__.py
│ ├── config.py # pydantic-settings Config, env var bindings
│ ├── airflow/
│ │ ├── __init__.py
│ │ └── client.py # Async Airflow REST client (apache-airflow-client)
│ ├── domain/
│ │ ├── __init__.py
│ │ ├── pipeline.py # DataPipeline domain class + singleton
│ │ ├── pipeline_connection.py# DataPipelineConnection domain class + singleton
│ │ ├── pipeline_metrics.py # Read-only access to airflow-metrics ES index
│ │ ├── pipeline_scheduler.py # Schedule state + next/last run calc; scheduler ES index access
│ │ └── effect.py # DataEffect domain class + singleton; CI poll, auto-start, Airflow sync
│ ├── git/
│ │ ├── __init__.py
│ │ └── client.py # GitLab file API client for DAG repo commits
│ ├── models/
│ │ ├── __init__.py
│ │ └── conduit.py # Pydantic request/response models (auto-generated)
│ ├── capabilities.py # Worker package capability cache; refresh at startup from atlas-airflow requirements
│ └── templates/
│ ├── catalog.json # Ingest operator catalog served via catalog://operators
│ ├── connections/ # Per-type JSON schema files (postgres.json, mongo.json, databricks.json, kafka.json, sharepoint.json, rabbitmq.json, …)
│ ├── dags/ # Jinja2 DAG templates per pipeline_type (incl. scheduler.py.j2 — the scheduler DAG)
│ └── effects/
│ ├── catalog.json # Effect operator catalog served via catalog://effects
│ └── operators/ # Jinja2 snippets per operator (sql_extract, http_dispatch, mqtt_publish, mqtt_await_confirmation, …)
├── server/ # Thin web wrapper
│ ├── __init__.py
│ ├── __main__.py # Starlette app, lifespan, uvicorn entry
│ ├── api/
│ │ ├── __init__.py
│ │ ├── routes.py # FastAPI REST endpoints
│ │ └── schema/
│ │ └── conduit.yaml # OpenAPI schema components
│ ├── mcp/
│ │ ├── __init__.py
│ │ ├── app.py # FastMCP app, tool registration
│ │ └── resources.py # MCP resources: catalog://operators, templates://dags/{dag_type}
│ ├── oracle/
│ │ ├── __init__.py
│ │ ├── memory.py # fire-and-forget memory store via axonis-core Service
│ │ └── registration.py # OracleRegistration — heartbeat self-registration
│ └── tools/
│ ├── __init__.py
│ ├── datapipeline_tools.py # datapipeline_* tools
│ ├── datapipeline_run_tools.py # datapipeline_run_* tools
│ ├── datapipeline_schedule_tools.py # datapipeline_schedule_* tools
│ ├── connection_tools.py # connection_* tools
│ ├── effect_tools.py # effect_* tools (create, list, get, status, update, delete, pause, unpause, run, stop, runs, metrics, schema)
│ ├── effect_dispatch_tools.py # effect_dispatch — inbound Cortex agent_dispatch endpoint
│ └── operational_tools.py # airflow_health, dag_list
├── tests/
├── charts/conduit/ # Helm chart
├── .gitlab-ci.yml
└── pyproject.toml
Journey-test authoring (conduit → testament)
Beyond ingest, conduit renders journey-test DAGs — the artifact that makes an A-to-Z integration test (product.data-pipeline § A-to-Z integration test) a deployable, runnable object.
Mechanism (wires existing pieces; no new infra):
- A `journey_test` template is registered in `conduit/templates/catalog.json` alongside the provider-ingest templates, with `required_params` (e.g. `dag_id`, `hops`, `verifications`).
- `conduit/templates/renderer.py::render("journey_test", params, dag_id, schedule=None)` renders the Jinja2 template `conduit/templates/dags/journey_test.py.j2` into a DAG that uses testament's `dags/src/call.py::call` task for each REST/MCP hop and attaches a `verification` block per asserting task. (Renderer is Jinja2 over `.py.j2`; catalog section is `ingest_patterns`; the new entry sits alongside the 11 provider-ingest templates.)
- The rendered DAG is committed into `testament/dags/` like every other testament DAG. That is its home; conduit is the authoring/rendering surface, not the storage location. testament's Airflow runs it from there.
- Results are read via `datapipeline_run_status` / `datapipeline_run_errors` (or directly from the testament Airflow run).
Why conduit owns rendering: conduit is the platform's DAG authoring surface (it already owns
render + datapipeline_deploy). testament owns execution + verification. The SDD pipeline's
done-bar therefore runs: spec mandate → conduit render(journey_test) → testament DAG run +
verifications → done.
Gap this closes: today conduit has zero knowledge of testament (the ingest_patterns catalog
has only provider-ingest templates, and grep testament in the conduit repo returns nothing). This
section is the contract for the missing journey_test template and the conduit→testament handoff.
Response-body verification (built with this contract). REST/MCP journeys assert on a JSON
response body, but every existing DataVerifier method asserts on tabular/ES output, and the
call task currently discards its response. The contract therefore builds, as core testament
infrastructure: (a) response capture in the call task, and (b) a general
verify_response_json(path, ...) matcher that reads a key-path out of the captured response. This
is the standard assertion mechanism for all REST/MCP journeys — not deferred to a later ticket.
Authentication
Inbound (client → Conduit)
Conduit uses AxonisMiddleware from axonis.middleware — Bearer token validation against the Keycloak JWKS endpoint. Every endpoint requires a valid Bearer token via Depends(require_auth).
Unauthenticated paths: /health, /service-info
Outbound (Conduit → Airflow)
Conduit authenticates to Airflow via Keycloak token exchange — not Basic Auth. The caller's Bearer token is exchanged for a user-level Airflow token using axonis.auth.async_token_exchange.exchange_token(). The resulting token is set via api_client.set_default_header("Authorization", f"Bearer {token}") on each call.
The service account (CC grant) is used only for Oracle registration. It must never be used for Airflow or Elasticsearch operations — those must always use the caller's user token.
Outbound (Conduit → Elasticsearch)
ES ABAC is enforced by the axonis middleware on every request. No manual Authenticator priming is performed — all ES operations run in a user request context where the middleware has already set the auth state.
Auto-Start (run_on_deploy)
When a pipeline is created with run_on_deploy: true, Conduit automatically triggers an initial Airflow DAG run after CI validation passes.
Implementation: asyncio.create_task() is called within the deploy request handler. The task inherits the caller's contextvars.Context snapshot (including the access token), so get_auth_context() remains valid without any token storage. The task polls deploy_check() every 10 seconds for up to 200 seconds, then retries trigger_dag_run() up to 24 times with 15-second gaps (6 minutes total) to allow Airflow's DAG directory scan interval to complete.
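The two-phase wait described above can be sketched with injectable callables. The timings come from the paragraph; everything else is assumed for illustration: that `deploy_check` returns a boolean, that `trigger_dag_run` raises while Airflow has not yet parsed the DAG, and that the task gives up quietly if CI does not pass within the polling budget.

```python
import asyncio
from typing import Awaitable, Callable

POLL_INTERVAL_S = 10    # deploy_check() cadence
POLL_BUDGET_S = 200     # maximum time spent waiting for CI
TRIGGER_ATTEMPTS = 24   # 24 attempts x 15 s gaps = 6 minutes
TRIGGER_GAP_S = 15


async def auto_start(
    deploy_check: Callable[[], Awaitable[bool]],
    trigger_dag_run: Callable[[], Awaitable[None]],
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> bool:
    """Wait for CI validation, then retry the initial trigger."""
    waited = 0
    while not await deploy_check():
        if waited >= POLL_BUDGET_S:
            return False  # assumed behaviour: give up without triggering
        await sleep(POLL_INTERVAL_S)
        waited += POLL_INTERVAL_S
    for attempt in range(TRIGGER_ATTEMPTS):
        try:
            await trigger_dag_run()
            return True
        except Exception:
            # The DAG may not be visible until Airflow rescans its DAG directory.
            if attempt < TRIGGER_ATTEMPTS - 1:
                await sleep(TRIGGER_GAP_S)
    return False
```

In a request handler this coroutine would be scheduled with `asyncio.create_task(auto_start(...))`, so the task runs in a copy of the caller's `contextvars` context. Passing `sleep` as a parameter is only a convenience for exercising the logic without real delays.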
Token rule: Tokens are held in-memory only for the lifetime of the async task. They are never persisted to ES, disk, or any cache.
Oracle Registration
Conduit self-registers with Oracle using axonis.gateway.oracle.OracleRegistration. server/oracle/registration.py instantiates it with conduit-specific config and exports the singleton.
from server.oracle.registration import oracle_registration
# In server/__main__.py lifespan:
await oracle_registration.start() # registers immediately, then heartbeats at TTL/2
# ...on shutdown:
await oracle_registration.stop()
Registration uses the client_credentials grant via SSO_TOKEN_URL, SSO_CLIENT_ID, and SSO_CLIENT_SECRET. Heartbeat fires at ORACLE_TTL_SECONDS / 2. All failures are swallowed.
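The register-then-heartbeat behaviour can be sketched as a small asyncio loop. This is not the `OracleRegistration` implementation from axonis-core; `heartbeat_loop` and its parameters are hypothetical, and `register` stands in for whatever call performs the actual registration request.

```python
import asyncio
from typing import Awaitable, Callable


async def heartbeat_loop(
    register: Callable[[], Awaitable[None]],
    ttl_seconds: float,
    stop: asyncio.Event,
) -> None:
    """Register immediately, then re-register every TTL/2 until stopped."""
    interval = ttl_seconds / 2
    while not stop.is_set():
        try:
            await register()
        except Exception:
            pass  # failures are swallowed; the next heartbeat retries
        try:
            # Sleep for one interval, but wake early if stop is requested.
            await asyncio.wait_for(stop.wait(), timeout=interval)
        except asyncio.TimeoutError:
            pass
```

Re-registering at half the TTL means a single failed heartbeat still leaves one more attempt before the registration expires, which is why swallowing individual failures is tolerable.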
Environment Variables
| Variable | Default | Required | Purpose |
|---|---|---|---|
| `AIRFLOW_BASE_URL` | (none) | Yes | Airflow REST API base URL |
| `AIRFLOW_API_VERSION` | `v1` | No | Airflow REST API version path segment |
| `AIRFLOW_VERIFY` | `true` | No | TLS verification for Airflow: `true` (default), `false` (skip; dev only), or a path to a CA bundle |
| `AIRFLOW_HOOK_CHECK_TIMEOUT` | `5.0` | No | Seconds to wait for Airflow hook validation during connection creation |
| `CONDUIT_SCHEDULER_TICK_MINUTES` | `30` | No | Scheduler DAG tick interval; also the most-frequent supported re-run schedule |
| `CONDUIT_SCHEDULER_INDEX` | `conduit-scheduler` | No | ES index (keyed by pipeline ID) tracking scheduled-run status for crash safety / dedup |
| `CONDUIT_HOST` | `0.0.0.0` | No | Bind host |
| `CONDUIT_PORT` | `8008` | No | Bind port |
| `CONDUIT_WORKERS` | `1` | No | uvicorn worker count |
| `CONDUIT_LOG_LEVEL` | `info` | No | Log level |
| `CONDUIT_DOMAIN` | `localhost` | No | Public DNS name; used in MCP DNS-rebinding allowed-hosts list |
| `MCP_EXTRA_HOSTS` | (none) | No | Comma-separated additional hostnames for the MCP DNS-rebinding allowlist (e.g. `data-ingest` for the k8s service name so in-cluster callers are accepted alongside the public domain) |
| `CONDUIT_PUBLIC_URL` | (none) | No | Full public URL Oracle uses to reach Conduit |
| `ORACLE_URL` | `http://localhost:8001` | No | Oracle base URL for self-registration |
| `ORACLE_TTL_SECONDS` | `300` | No | Registration TTL; heartbeat fires at TTL/2 |
| `SSO_WELLKNOWN` | `https://sso.axonis.ai/realms/T2S/.well-known/openid-configuration` | No | OIDC well-known endpoint |
| `SSO_ISSUER` | `https://sso.axonis.ai/realms/T2S` | No | Expected JWT issuer claim |
| `SSO_AUDIENCE` | `account` | No | Expected JWT audience claim |
| `SSO_TLS_VERIFY` | `true` | No | Set `false` only for self-signed certs in dev/staging |
| `SSO_TOKEN_URL` | (none) | No | Token endpoint for Oracle registration CC grant and Airflow token exchange |
| `SSO_CLIENT_ID` | (none) | No | Service account client ID (Oracle registration only) |
| `SSO_CLIENT_SECRET` | (none) | No | Service account client secret (Oracle registration only) |
| `DAG_REPO_URL` | (none) | No | GitLab instance URL |
| `DAG_REPO_TOKEN` | (none) | No | GitLab project/personal access token |
| `DAG_REPO_PROJECT` | (none) | No | GitLab namespace/project path |
| `DAG_REPO_BRANCH` | `main` | No | Target branch for live DAGs |
| `DAG_REPO_DAG_PATH` | `dags` | No | Path within repo for validated live DAGs |
| `DAG_REPO_PENDING_PATH` | `pending` | No | Path for DAGs awaiting CI validation |
| `DAG_REPO_CATALOG_PATH` | `catalog.json` | No | Path to operator catalog JSON |
| `ELASTIC_HOST` | `https://127.0.0.1:9200` | Yes | Elasticsearch base URL |
| `ELASTIC_USERNAME` | (none) | Yes | Elasticsearch username |
| `ELASTIC_PASSWORD` | (none) | Yes | Elasticsearch password |
| `ELASTIC_VERIFY` | `true` | No | Set `false` to skip TLS cert verification (dev only) |
| `ELASTIC_TIMEOUT` | `20` | No | Elasticsearch request timeout in seconds |
| `ELASTIC_SCROLL` | `5m` | No | Scroll window TTL for scan queries |
| `ELASTIC_PKI_CA` | (none) | No | Path to CA certificate for mutual TLS |
The env file lives at: developers-environment/conduit/conduit.env
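As one example of how these variables combine, the MCP DNS-rebinding allowlist could be assembled from `CONDUIT_DOMAIN` and `MCP_EXTRA_HOSTS` roughly as follows. The function is a hypothetical sketch rather than Conduit's `config.py`; in particular, whether port-qualified variants of each host are also added is not covered here.

```python
import os
from typing import Mapping, Optional


def allowed_hosts(env: Optional[Mapping[str, str]] = None) -> list[str]:
    """Combine the public domain with any extra hostnames, de-duplicated."""
    env = os.environ if env is None else env
    hosts = [env.get("CONDUIT_DOMAIN", "localhost")]
    extra = env.get("MCP_EXTRA_HOSTS", "")
    # Comma-separated list; surrounding whitespace and empty entries are dropped.
    hosts += [h.strip() for h in extra.split(",") if h.strip()]
    return list(dict.fromkeys(hosts))  # preserves order, removes duplicates


# Example values only; "conduit.example.org" is a placeholder domain.
example_hosts = allowed_hosts(
    {"CONDUIT_DOMAIN": "conduit.example.org", "MCP_EXTRA_HOSTS": "data-ingest, data-ingest ,conduit"}
)
```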
Migration from fedai-rest
Conduit is a replacement, not an extension, for the Airflow integration in fedai-rest. REST endpoints are path-compatible with fedai-rest's /userspace/{target} convention — datapipeline and connection are the same target names fedai-rest uses.
Phase 1 — Conduit deployed alongside fedai-rest
- `fedai-rest` Airflow endpoints remain unchanged.
- Conduit is deployed and registered with Oracle.
- Oracle begins routing Airflow tool calls to Conduit.
Phase 2 — fedai-rest Airflow code removed
After Conduit is validated in production:
- Remove AirflowRestClient from fedai-rest.
- Remove /datapipeline and /connection REST endpoints from fedai-rest.
- Remove Airflow env vars from fedai-rest configuration.
Do not remove fedai-rest Airflow code until Conduit is deployed and confirmed working.
Compliance with platform.service-contract
| Requirement | Status |
|---|---|
| `/health` endpoint | Included; probes Airflow with a 3-second timeout and returns `degraded` if unreachable |
| `/service-info` endpoint | Included |
| MCP at `/agentspace` | Included |
| REST at `/userspace` | Included |
| Bearer token validation on all endpoints | Included (AxonisMiddleware + require_auth dependency) |
| Oracle self-registration | Included (axonis.gateway.oracle.OracleRegistration) |
| uv + hatchling | Included |
| ruff linting | Included |
| Bitnami Helm chart | Included (charts/conduit/) |
| CI/CD pipeline | Partial — test stage active; deploy stage commented out pending environment config |
| No cross-service code imports | Compliant |
Depends on: component.oracle.gateway, platform.axonis-core, platform.service-contract
Realizes: product.data-pipeline
Required by: component.conduit.effect-engine