Conduit — Data Ingestion Orchestration Service

Milestone: P3 (optional — deployed when Airflow integration is required)

Purpose

Conduit is a standalone MCP service that wraps Apache Airflow, exposing DAG orchestration and connection management as first-class MCP tools and REST endpoints. It replaces the embedded AirflowRestClient in fedai-rest with a purpose-built service that can be optionally deployed, registered with Oracle, and driven by LLM agents.

Conduit owns the DataPipeline and DataPipelineConnection domain objects when deployed. While Conduit is down or not deployed, fedai-rest retains its Airflow integration for backwards compatibility.


Service Identity

Field Value
name conduit
version 0.1.0
port 8008
mcp_path /agentspace
health_path /health
api_path /userspace
tools_count 44
resources_count 5
capabilities dag-management, connection-management, airflow-orchestration, data-ingestion, pipeline-management, pipeline-scheduling, effect-engine

Domain Objects

Conduit uses the existing axonis-core userspace objects. It does not define new schema constants.

Object Schema constant ES index subtype discriminator
DataPipeline Schema.DATA_PIPELINE data-ingest "datapipeline"
DataPipelineConnection Schema.DATA_PIPELINE_CONNECTION data-ingest "datapipelineconnection"
DataEffect Schema.DATA_EFFECT data-ingest "dataeffect"

All three objects share the same ES index, discriminated by the subtype field. Read queries for one type exclude the others — _pipeline.read() filters by subtype: "datapipeline" so DataEffect records never appear in pipeline lists, and vice versa.
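The subtype discrimination can be pictured as a filter clause attached to every read. The following is a minimal sketch, assuming standard Elasticsearch query DSL; the helper name subtype_query is hypothetical and is not taken from the Conduit codebase.

```python
from typing import Any, Optional

# Subtype discriminators from the Domain Objects table.
SUBTYPES = {"datapipeline", "datapipelineconnection", "dataeffect"}


def subtype_query(subtype: str, extra_filters: Optional[list] = None) -> dict[str, Any]:
    """Build an ES query body restricted to one subtype (hypothetical helper)."""
    if subtype not in SUBTYPES:
        raise ValueError(f"unknown subtype: {subtype!r}")
    # A term filter on the discriminator keeps the other two object
    # types in the shared index out of the result set.
    filters = [{"term": {"subtype": subtype}}]
    filters.extend(extra_filters or [])
    return {"query": {"bool": {"filter": filters}}}


# A pipeline list query that cannot match DataEffect records.
pipeline_query = subtype_query("datapipeline")
```

Because the filter is built in one place, a read for any of the three types stays isolated from the other two without per-caller logic.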

Conduit treats DataPipeline and DataPipelineConnection as a read/write cache of Airflow state. DataEffect is Conduit's internal execution record for assembled multi-step effect DAGs dispatched from Cortex or created interactively.

DataEffect fields

Field Writable Semantics
dag_id user/cortex Airflow DAG identifier; namespaced cortex__{eff_id} for Cortex-dispatched effects
name user/cortex Human-readable label
uds.visibility user/cortex ABAC visibility marking
schedule user/cortex Cron expression for recurring runs; null for one-shot
run_on_deploy user/cortex Trigger an initial run automatically after CI passes
operators user/cortex Ordered list of {name, params} operator configs used to assemble the DAG
connections system Deduplicated list of connection_id values from the operators list
status system pending_validation → active / rejected / error; also paused, running
correlation_id cortex Cortex eff_ ID — opaque upstream link
edition_id cortex Attested Edition that triggered the effect (Cortex dispatch path only)
effect_type cortex DES effect type, e.g. external_dispatch (Cortex dispatch path only)
target cortex Implementation-defined target descriptor from the decision template
payload_hash cortex SHA-256 hex digest of the canonical dispatch payload; verified before assembly
deadline cortex ISO 8601 deadline; recorded but not enforced by Conduit
delete_on_success user/cortex Auto-suggest cleanup after a successful one-shot run (incompatible with schedule)
created_by system Username from auth context at create time; "cortex" for Cortex-dispatched effects
create_ts system Creation timestamp (ISO 8601 UTC)
airflow_next_run live Next scheduled run timestamp from Airflow (next_dagrun); returned on status/read, not stored in ES
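The system-written connections field is described as a deduplicated list derived from operators. A minimal sketch of that derivation follows. It assumes each operator config carries its connection under params["connection_id"]; that key location and the example parameter values are assumptions, not confirmed by this spec.

```python
def collect_connections(operators: list[dict]) -> list[str]:
    """Return unique connection_id values in first-seen order."""
    seen: dict[str, None] = {}  # dict preserves insertion order
    for op in operators:
        # Assumed location of the connection reference inside an operator config.
        conn_id = op.get("params", {}).get("connection_id")
        if conn_id:
            seen.setdefault(conn_id, None)
    return list(seen)


# Illustrative operator list; the params shown are hypothetical.
operators = [
    {"name": "sql_extract", "params": {"connection_id": "warehouse_pg"}},
    {"name": "http_dispatch", "params": {"connection_id": "alerts_http"}},
    {"name": "sql_extract", "params": {"connection_id": "warehouse_pg"}},
]
connections = collect_connections(operators)
```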

DataPipeline scheduling fields

The DataPipeline userspace object carries three scheduling fields, populated by the scheduling subsystem (§ Pipeline Scheduling):

Field Writable Semantics
schedule user A cron expression (or named interval) describing how often the pipeline should be re-run. Distinct from the DAG's own Airflow schedule_interval, which controls the DAG, not a run. Editable via datapipeline_update / PATCH /userspace/datapipeline/{uid}.
next_scheduled_run system The next wall-clock time a scheduled run will fire. Computed from schedule at create time, at each scheduled trigger, and at update time whenever schedule changes.
last_scheduled_run system The wall-clock time the most recent scheduled run was triggered.
  • #REQ.schedule-vs-dag-interval: schedule controls runs, never the DAG's own scheduling state. It is implemented by the scheduler DAG (§ Pipeline Scheduling), never by setting the managed pipeline's Airflow schedule_interval.
  • #REQ.next-run-recompute: next_scheduled_run is recomputed at create, at each scheduled trigger, and on any update that changes schedule. A schedule of null/empty clears next_scheduled_run and disables scheduling for that pipeline.
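The recompute rule can be sketched for the named-interval case using only the standard library. The interval names below are hypothetical (the spec does not list the accepted names), and cron expressions are out of scope for this sketch because evaluating them needs a cron parser such as the third-party croniter package.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

# Hypothetical named intervals; the accepted names are not given in this spec.
NAMED_INTERVALS = {
    "every_30_minutes": timedelta(minutes=30),
    "hourly": timedelta(hours=1),
    "daily": timedelta(days=1),
}


def recompute_next_run(schedule: Optional[str], now: datetime) -> Optional[datetime]:
    """Return the next scheduled run, or None when scheduling is disabled."""
    if not schedule:
        # A null/empty schedule clears next_scheduled_run.
        return None
    if schedule not in NAMED_INTERVALS:
        raise ValueError(f"unsupported schedule in this sketch: {schedule!r}")
    return now + NAMED_INTERVALS[schedule]


now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
next_run = recompute_next_run("hourly", now)
```

The same function would be called from all three recompute points (create, scheduled trigger, and schedule-changing update), which keeps the three code paths from drifting apart.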

MCP Tools (44)

DataPipeline Management (12)

Tool Description
datapipeline_deploy(dag_id, content, visibility, schedule, pipeline_type, run_on_deploy) Commit a Python DAG to the git repo pending branch; returns status: pending_validation. If run_on_deploy=true, triggers an initial run automatically after CI passes.
datapipeline_list() List Conduit-managed DataPipeline records with live Airflow run state
datapipeline_get(uid) Get a single pipeline record including live Airflow state
datapipeline_update(uid, content, schedule, action, params) Redeploy with updated content or schedule; action: "pause"|"unpause" controls DAG scheduling state
datapipeline_delete(uid) Remove pipeline from git and ES
datapipeline_status(uid) Poll GitLab CI for pending pipelines; return live Airflow state for active ones
datapipeline_pause(uid) Pause the Airflow DAG
datapipeline_unpause(uid) Resume a paused pipeline
datapipeline_run(uid, conf) Trigger a new DAG run for a pipeline
datapipeline_stop(uid) Stop the active run for a pipeline
datapipeline_runs(uid, state, limit) List run history for a pipeline
datapipeline_metrics(uid, since, limit) Get run metrics and statistics from the airflow-metrics ES index

DataPipeline Run Tools (6)

Tool Description
datapipeline_run_trigger(dag_id, conf, logical_date) Trigger a new DAG run by dag_id
datapipeline_run_list(dag_id, state, limit) List runs for a DAG
datapipeline_run_get(dag_id, dag_run_id) Get status and metadata for a specific run
datapipeline_run_delete(dag_id, dag_run_id) Cancel and delete a DAG run
datapipeline_run_status(dag_id, dag_run_id) Get task-level status breakdown for a run
datapipeline_run_errors(dag_id, dag_run_id) Get failed task logs for a run

Connection Management (7)

Tool Description
connection_schema(conn_type) Return the JSON schema for a connection type from local templates (e.g. postgres, s3, mongo)
connection_list() List all DataPipelineConnection records
connection_get(uid) Get a single connection record
connection_create(...) Create a new connection record with Airflow registration and live connectivity test
connection_update(uid, ...) Update an existing connection record
connection_delete(uid) Delete a connection record
connection_test(uid) Run Airflow's test_connection against a stored connection

Connection records are 1:1 with pipelines: a pipeline reads from exactly one connection. The set of connection types connection_schema / connection_create accept is defined in § Supported Connection Types.

Scheduling (3)

Tool Description
datapipeline_schedule_set(uid, schedule) Set or update the re-run schedule (cron/interval) for a pipeline; recomputes next_scheduled_run. A null/empty schedule disables scheduling.
datapipeline_schedule_get(uid) Return schedule, next_scheduled_run, last_scheduled_run for a pipeline
datapipeline_schedule_list(due_only) List pipelines with a schedule and their next/last run times; due_only=true filters to pipelines currently due

DataEffect Management (14)

Tool Description
effect_create(name, visibility, operators, schedule, run_on_deploy, connections) Assemble a multi-step effect DAG from operator configs; commit to git pending branch; return {uid, dag_id, status: pending_validation}. Pre-flight checks missing connections and uninstalled packages (warnings, non-blocking).
effect_list(limit) List all DataEffect records
effect_get(uid) Get a single DataEffect record including live Airflow state
effect_status(uid) Poll CI if pending; return live Airflow state for active effects including airflow_next_run
effect_update(uid, name, description, visibility, action) Update metadata or pause/unpause via action: "pause"|"unpause"
effect_delete(uid, confirm) Remove DAG from git and delete ES record; requires confirm: true
effect_pause(uid) Pause the Airflow DAG for an effect
effect_unpause(uid) Resume a paused effect DAG
effect_run(uid, conf) Trigger a new DAG run for an effect
effect_stop(uid) Stop the active run for an effect
effect_runs(uid, state, limit) List run history for an effect
effect_metrics(uid, since, limit) Get run metrics from the airflow-metrics ES index
effect_schema(operator_name) Without argument: list all operators from the effect catalog. With argument: return full compose/runtime param schema for one operator.
effect_dispatch(effect_id, edition_id, effect_type, target, payload, payload_hash, ...) Inbound endpoint called by Cortex's agent_dispatch handler. Verifies SHA-256 payload_hash, assembles and deploys the DAG, returns {acknowledged, external_reference, dag_id, status: pending_validation}.
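The payload_hash check performed by effect_dispatch before assembly can be sketched as below. The canonical serialization (sorted keys, compact separators, UTF-8) is an assumption made for illustration; the real check has to use whatever canonical form Cortex uses when it computes the digest. The function names are hypothetical.

```python
import hashlib
import hmac
import json


def canonical_bytes(payload: dict) -> bytes:
    """Assumed canonical form: sorted keys, compact separators, UTF-8."""
    return json.dumps(payload, sort_keys=True, separators=(",", ":")).encode("utf-8")


def verify_payload_hash(payload: dict, payload_hash: str) -> bool:
    """Compare the SHA-256 hex digest of the payload with the supplied hash."""
    digest = hashlib.sha256(canonical_bytes(payload)).hexdigest()
    # Constant-time comparison; hex digests are compared case-insensitively.
    return hmac.compare_digest(digest, payload_hash.lower())


payload = {"b": 1, "a": 2}
good_hash = hashlib.sha256(b'{"a":2,"b":1}').hexdigest()
accepted = verify_payload_hash(payload, good_hash)
rejected = verify_payload_hash(payload, "0" * 64)
```

A dispatch whose hash does not verify would be refused before any DAG is assembled or committed.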

MCP Resources (5)

URI Description
catalog://operators Full ingest operator catalog — universal params, ingest patterns, capabilities, DAG structure conventions
templates://dags/{dag_type} Annotated reference DAG for a specific ingest pattern (e.g. postgres_ingest)
catalog://effects Effect operator catalog — operator names, categories, descriptions, compose_params, runtime_params, requires_packages
templates://effects/{operator} Raw Jinja2 snippet for a single effect operator; for inspection of task signatures, imports, and return shapes
capabilities://airflow Python packages available on Airflow workers, sourced from atlas-airflow requirements files at startup

Operational (2)

Tool Airflow API Description
airflow_health() GET /api/v1/health Check Airflow scheduler and metadata DB health
dag_list(active_only) GET /api/v1/dags List Airflow DAGs visible to the caller; active_only=true filters to unpaused DAGs

REST Endpoints (44)

All endpoints are mounted at both /userspace (canonical, fedai-rest compatible) and /api/v1 (legacy alias). Both prefixes route to the same handlers. MCP tools and REST routes call the same underlying domain layer.

DAG Proxy (8)

Method Path Description
GET /userspace/dag List Airflow DAGs visible to the caller
POST /userspace/dag/{dag_id}/run Trigger a DAG run
GET /userspace/dag/{dag_id}/run List runs for a DAG
GET /userspace/dag/{dag_id}/run/{run_id} Get a specific run
GET /userspace/dag/{dag_id}/run/{run_id}/status Task-level status breakdown
GET /userspace/dag/{dag_id}/run/{run_id}/errors Failed task logs
DELETE /userspace/dag/{dag_id}/run/{run_id} Delete a run
GET /userspace/airflow/health Airflow health

DataPipeline (13)

Method Path Description
GET /userspace/datapipeline List DataPipeline records. ?statistics=true inlines metrics from the airflow-metrics ES index.
POST /userspace/datapipeline Create/deploy a pipeline
GET /userspace/datapipeline/{uid} Get a pipeline record. ?statistics=true inlines metrics.
PATCH /userspace/datapipeline/{uid} Update pipeline; action: "pause"|"unpause" supported
DELETE /userspace/datapipeline/{uid} Delete a pipeline
GET /userspace/status/datapipeline Bulk status for all pipelines
GET /userspace/datapipeline/{uid}/status Status for a single pipeline
GET /userspace/datapipeline/schema List available DAG templates
GET /userspace/datapipeline/schema/{template_name} Get a template schema
GET /userspace/datapipeline/{uid}/runs List run history
POST /userspace/datapipeline/{uid}/run Trigger a run
POST /userspace/datapipeline/{uid}/stop Stop the active run
GET /userspace/datapipeline/{uid}/metrics Get run metrics from airflow-metrics index

DataPipeline Scheduling (3)

Method Path Description
GET /userspace/datapipeline/schedule List pipelines with a schedule; ?due_only=true filters to those currently due
GET /userspace/datapipeline/{uid}/schedule Get schedule, next_scheduled_run, last_scheduled_run for a pipeline
PUT /userspace/datapipeline/{uid}/schedule Set/update the re-run schedule; recomputes next_scheduled_run. Empty body/null schedule disables scheduling.

The re-run schedule is also editable inline via PATCH /userspace/datapipeline/{uid} (the schedule field), matching datapipeline_update.

DataPipelineConnection (7)

Method Path Description
GET /userspace/connection List DataPipelineConnection records
POST /userspace/connection Create a connection record
GET /userspace/connection/{uid} Get a connection record
PATCH /userspace/connection/{uid} Update a connection record
DELETE /userspace/connection/{uid} Delete a connection record
GET /userspace/status/connection Bulk status for all connections
GET /userspace/connection/{uid}/status Test connectivity for a connection

The /userspace/status/{target} bulk status path matches the fedai-rest convention. Valid targets are datapipeline, connection, and effect.

DataEffect (13)

Method Path Description
GET /userspace/effect/schema List all effect operators from the catalog
GET /userspace/effect/schema/{operator_name} Get compose/runtime param schema for one operator
GET /userspace/effect List DataEffect records
POST /userspace/effect Create an effect — same validation as effect_create MCP tool
GET /userspace/effect/{uid} Get a single DataEffect record including live Airflow state
PATCH /userspace/effect/{uid} Update metadata or pause/unpause (action: "pause"|"unpause")
DELETE /userspace/effect/{uid} Delete an effect (removes DAG from git and ES)
GET /userspace/status/effect Bulk status for all effects
GET /userspace/effect/{uid}/status Status for a single effect; polls CI if pending
GET /userspace/effect/{uid}/runs List run history
POST /userspace/effect/{uid}/run Trigger a DAG run
POST /userspace/effect/{uid}/stop Stop the active run
GET /userspace/effect/{uid}/metrics Get run metrics from airflow-metrics index

Pipeline Scheduling

Users can schedule a pipeline to be re-run on a recurring interval. Scheduling is owned by Conduit's ingestion-orchestration layer; it is distinct from a DAG's own Airflow schedule_interval (which controls the DAG itself, not a run).

Scheduler DAG

  • #REQ.scheduler-is-a-dag — scheduling is implemented as a single scheduler DAG, not by setting each managed pipeline's Airflow schedule_interval. The scheduler DAG wakes on a fixed tick, finds pipelines whose next_scheduled_run is due, and triggers a run of each via the same trigger_dag_run() path a manual run uses.
  • #REQ.schedule-granularity — the most frequent supported schedule interval is 30 minutes. The scheduler tick is sized accordingly.
  • #REQ.sequential-scheduled-runs — when multiple scheduled runs are found due in the same tick, they are triggered sequentially (no concurrent fan-out) for now.
  • After triggering, the scheduler updates last_scheduled_run to the trigger time and recomputes next_scheduled_run from schedule.
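One scheduler tick, as described in the list above, might look like the following sketch. It works on in-memory dicts instead of ES records, and the interval field is a hypothetical stand-in for parsing schedule. The trigger callable stands in for the trigger_dag_run() path, whose real signature is not given here.

```python
from datetime import datetime, timedelta, timezone
from typing import Callable


def run_tick(pipelines: list[dict], now: datetime,
             trigger: Callable[[str], None]) -> list[str]:
    """Trigger every due pipeline one at a time and advance its schedule state."""
    triggered = []
    for p in pipelines:
        due_at = p.get("next_scheduled_run")
        if due_at is None or due_at > now:
            continue  # unscheduled, or not yet due
        trigger(p["dag_id"])  # sequential: no concurrent fan-out
        p["last_scheduled_run"] = now
        p["next_scheduled_run"] = now + p["interval"]  # stand-in for schedule parsing
        triggered.append(p["uid"])
    return triggered


now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
pipelines = [
    {"uid": "p1", "dag_id": "dag_one", "interval": timedelta(minutes=30),
     "next_scheduled_run": now - timedelta(minutes=1)},
    {"uid": "p2", "dag_id": "dag_two", "interval": timedelta(hours=1),
     "next_scheduled_run": now + timedelta(minutes=5)},
    {"uid": "p3", "dag_id": "dag_three", "interval": timedelta(hours=1),
     "next_scheduled_run": None},
]
fired: list[str] = []
triggered = run_tick(pipelines, now, fired.append)
```

Only p1 is due in this example, so it is the only pipeline triggered and the only one whose last and next run times move.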

Scheduler index

A dedicated scheduler ES index tracks scheduling state independently of the airflow-metrics index.

  • #REQ.scheduler-index-keyed-by-pipeline — the scheduler index is keyed by pipeline ID (the DataPipeline uid), giving one record per pipeline and making duplicate scheduled runs impossible.
  • #REQ.scheduler-crash-safety — each record tracks status so the scheduler can reason about crash scenarios and long-running jobs: a pipeline whose previous scheduled run is still in-flight (or whose status was left mid-flight by a crash) is not double-triggered. This dedup is the reason the index is keyed by pipeline ID rather than per-run.
  • Scheduled-run metrics are emitted to the airflow-metrics index as usual (same path as manual runs); the scheduler index holds only scheduling/dedup state, not run metrics.
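The dedup behaviour that follows from keying the index by pipeline ID can be sketched with a plain dict in place of the ES index. The record fields and the status values are hypothetical; the spec only states that each record tracks status.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

# Hypothetical status values that mean a scheduled run may still be in flight.
IN_FLIGHT = {"triggering", "running"}


@dataclass
class SchedulerRecord:
    pipeline_uid: str  # doubles as the document ID, so one record per pipeline
    status: str = "idle"
    last_scheduled_run: Optional[datetime] = None


def should_trigger(index: dict[str, SchedulerRecord], pipeline_uid: str) -> bool:
    """Allow a trigger only when no earlier scheduled run is still in flight."""
    record = index.setdefault(pipeline_uid, SchedulerRecord(pipeline_uid))
    return record.status not in IN_FLIGHT


index = {"p1": SchedulerRecord("p1", status="running")}
p1_allowed = should_trigger(index, "p1")  # previous run still in flight
p2_allowed = should_trigger(index, "p2")  # first time seen: record created, allowed
```

A status left at an in-flight value by a crash blocks re-triggering in this sketch; how such stale records are reconciled is not specified here.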

Supported Connection Types

Connections (1:1 with pipelines) are registered with Airflow and exposed via connection_schema / connection_create. Each type ships a JSON schema template under conduit/templates/connections/<type>.json and a corresponding Airflow hook/operator wiring.

Available types

Type Category
postgres Relational DB
mysql Relational DB
sqlite Relational DB
mssql (SQL Server) Relational DB
oracle Relational DB
snowflake Cloud data warehouse
bigquery (Google BigQuery) Cloud data warehouse
databricks Lakehouse
mongo Document DB
cassandra Wide-column DB
couchbase Document DB
redis Key-value store
elasticsearch Search index (incl. scenario data index)
azure_blob (Azure Blob Storage) Object storage (file-based)
aws_s3 (AWS Cloud Storage) Object storage (file-based)
gcs (Google Cloud Storage) Object storage (file-based)
sharepoint Document store (file-based)
kafka (Apache Kafka) Streaming
rabbitmq Messaging (topic subscription)
spark (Apache Spark) Compute engine (tentative)
parquet File-based dataset
  • #REQ.connection-type-template — every supported connection type has a JSON schema template under conduit/templates/connections/; connection_schema(conn_type) returns it, and connection_create validates against it before Airflow registration + connectivity test.
  • #REQ.connection-pipeline-1to1 — connections and pipelines are 1:1; a pipeline reads from exactly one connection. Adding a connection type implies the matching pipeline ingest path (operator/hook) is wired, not the connection record alone.

Ingest Filetypes

Data pipelines ingest a defined set of file/data formats. This section covers the pipeline / ingest-orchestration aspect — which formats a pipeline's ingest path can read and the DAG-template/operator wiring that handles them. The downstream uds-adapter aspect (how a read payload becomes a UDS-mapped dataset) is owned by fedai-rest and specified separately.

Supported filetypes

Filetype Category
csv Tabular
xls / xlsx Tabular (spreadsheet)
parquet Tabular (columnar)
json Structured
avro Structured (row, schema-bearing)
xml Structured
pdf Document
raster Geospatial
geotiff Geospatial (raster)
vector Geospatial
geojson Geospatial (vector)
image types beyond jpg / png Imagery
  • #REQ.ingest-filetype-orchestration — for each supported filetype, the pipeline ingest path (DAG template + operator) is responsible for reading and orchestrating the format into the pipeline; conversion into a UDS-mapped dataset is the fedai-rest uds-adapter's responsibility, not conduit's.
  • #REQ.geospatial-ingest — geospatial filetypes (raster, geotiff, vector, geojson) are ingestible by pipelines; their geospatial processing semantics are owned downstream (geodex / uds-adapters), but the ingest-orchestration wiring lives here.

Directory Layout

Follows platform.service-contract service anatomy:

conduit/
├── conduit/                      # Domain library package
│   ├── __init__.py
│   ├── config.py                 # pydantic-settings Config, env var bindings
│   ├── airflow/
│   │   ├── __init__.py
│   │   └── client.py             # Async Airflow REST client (apache-airflow-client)
│   ├── domain/
│   │   ├── __init__.py
│   │   ├── pipeline.py           # DataPipeline domain class + singleton
│   │   ├── pipeline_connection.py# DataPipelineConnection domain class + singleton
│   │   ├── pipeline_metrics.py   # Read-only access to airflow-metrics ES index
│   │   ├── pipeline_scheduler.py # Schedule state + next/last run calc; scheduler ES index access
│   │   └── effect.py             # DataEffect domain class + singleton; CI poll, auto-start, Airflow sync
│   ├── git/
│   │   ├── __init__.py
│   │   └── client.py             # GitLab file API client for DAG repo commits
│   ├── models/
│   │   ├── __init__.py
│   │   └── conduit.py            # Pydantic request/response models (auto-generated)
│   ├── capabilities.py           # Worker package capability cache; refresh at startup from atlas-airflow requirements
│   └── templates/
│       ├── catalog.json          # Ingest operator catalog served via catalog://operators
│       ├── connections/          # Per-type JSON schema files (postgres.json, mongo.json, databricks.json, kafka.json, sharepoint.json, rabbitmq.json, …)
│       ├── dags/                 # Jinja2 DAG templates per pipeline_type (incl. scheduler.py.j2 — the scheduler DAG)
│       └── effects/
│           ├── catalog.json      # Effect operator catalog served via catalog://effects
│           └── operators/        # Jinja2 snippets per operator (sql_extract, http_dispatch, mqtt_publish, mqtt_await_confirmation, …)
├── server/                       # Thin web wrapper
│   ├── __init__.py
│   ├── __main__.py               # Starlette app, lifespan, uvicorn entry
│   ├── api/
│   │   ├── __init__.py
│   │   ├── routes.py             # FastAPI REST endpoints
│   │   └── schema/
│   │       └── conduit.yaml      # OpenAPI schema components
│   ├── mcp/
│   │   ├── __init__.py
│   │   ├── app.py                # FastMCP app, tool registration
│   │   └── resources.py          # MCP resources: catalog://operators, templates://dags/{dag_type}
│   ├── oracle/
│   │   ├── __init__.py
│   │   ├── memory.py             # fire-and-forget memory store via axonis-core Service
│   │   └── registration.py       # OracleRegistration — heartbeat self-registration
│   └── tools/
│       ├── __init__.py
│       ├── datapipeline_tools.py      # datapipeline_* tools
│       ├── datapipeline_run_tools.py  # datapipeline_run_* tools
│       ├── datapipeline_schedule_tools.py # datapipeline_schedule_* tools
│       ├── connection_tools.py        # connection_* tools
│       ├── effect_tools.py            # effect_* tools (create, list, get, status, update, delete, pause, unpause, run, stop, runs, metrics, schema)
│       ├── effect_dispatch_tools.py   # effect_dispatch — inbound Cortex agent_dispatch endpoint
│       └── operational_tools.py      # airflow_health, dag_list
├── tests/
├── charts/conduit/               # Helm chart
├── .gitlab-ci.yml
└── pyproject.toml

Journey-test authoring (conduit → testament)

Beyond ingest, conduit renders journey-test DAGs — the artifact that makes an A-to-Z integration test (product.data-pipeline § A-to-Z integration test) a deployable, runnable object.

Mechanism (wires existing pieces; no new infra):

  1. A journey_test template is registered in conduit/templates/catalog.json alongside the provider-ingest templates, with required_params (e.g. dag_id, hops, verifications).
  2. conduit/templates/renderer.py::render("journey_test", params, dag_id, schedule=None) renders the Jinja2 template conduit/templates/dags/journey_test.py.j2 into a DAG that uses testament's dags/src/call.py::call task for each REST/MCP hop and attaches a verification block per asserting task. (Renderer is Jinja2 over .py.j2; catalog section is ingest_patterns — the new entry sits alongside the 11 provider-ingest templates.)
  3. The rendered DAG is committed into testament/dags/ like every other testament DAG — that is its home; conduit is the authoring/rendering surface, not the storage location. testament's Airflow runs it from there.
  4. Results are read via datapipeline_run_status / datapipeline_run_errors (or directly from the testament Airflow run).

Why conduit owns rendering: conduit is the platform's DAG authoring surface (it already owns render + datapipeline_deploy). testament owns execution + verification. The SDD pipeline's done-bar therefore runs: spec mandate → conduit render(journey_test) → testament DAG run + verifications → done.

Gap this closes: today conduit has zero knowledge of testament (the ingest_patterns catalog has only provider-ingest templates, and grep testament in the conduit repo returns nothing). This section is the contract for the missing journey_test template and the conduit→testament handoff.

Response-body verification (built with this contract). REST/MCP journeys assert on a JSON response body, but every existing DataVerifier method asserts on tabular/ES output, and the call task currently discards its response. The contract therefore builds, as core testament infrastructure: (a) response capture in the call task, and (b) a general verify_response_json(path, ...) matcher that reads a key-path out of the captured response. This is the standard assertion mechanism for all REST/MCP journeys — not deferred to a later ticket.
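The key-path lookup that such a matcher needs can be sketched as follows. The dotted path syntax, the integer list indexing, and both function names are assumptions for illustration; they are not the signature of verify_response_json, whose remaining parameters are not given here.

```python
from typing import Any

_MISSING = object()


def read_key_path(body: Any, path: str) -> Any:
    """Walk a dotted key-path; integer segments index into lists."""
    current = body
    for segment in path.split("."):
        if isinstance(current, list) and segment.isdigit():
            idx = int(segment)
            current = current[idx] if idx < len(current) else _MISSING
        elif isinstance(current, dict):
            current = current.get(segment, _MISSING)
        else:
            current = _MISSING
        if current is _MISSING:
            raise KeyError(f"path {path!r} not found at segment {segment!r}")
    return current


def response_matches(body: Any, path: str, expected: Any) -> bool:
    """True when the value at path equals expected; a missing path is a mismatch."""
    try:
        return read_key_path(body, path) == expected
    except KeyError:
        return False


# Captured response body from a hypothetical REST hop.
body = {"status": "pending_validation", "items": [{"uid": "a1"}]}
status_ok = response_matches(body, "status", "pending_validation")
first_uid = read_key_path(body, "items.0.uid")
```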

Authentication

Inbound (client → Conduit)

Conduit uses AxonisMiddleware from axonis.middleware — Bearer token validation against the Keycloak JWKS endpoint. Every endpoint requires a valid Bearer token via Depends(require_auth).

Unauthenticated paths: /health, /service-info

Outbound (Conduit → Airflow)

Conduit authenticates to Airflow via Keycloak token exchange — not Basic Auth. The caller's Bearer token is exchanged for a user-level Airflow token using axonis.auth.async_token_exchange.exchange_token(). The resulting token is set via api_client.set_default_header("Authorization", f"Bearer {token}") on each call.

The service account (CC grant) is used only for Oracle registration. It must never be used for Airflow or Elasticsearch operations — those must always use the caller's user token.

Outbound (Conduit → Elasticsearch)

ES ABAC is enforced by the axonis middleware on every request. No manual Authenticator priming is performed — all ES operations run in a user request context where the middleware has already set the auth state.


Auto-Start (run_on_deploy)

When a pipeline is created with run_on_deploy: true, Conduit automatically triggers an initial Airflow DAG run after CI validation passes.

Implementation: asyncio.create_task() is called within the deploy request handler. The task inherits the caller's contextvars.Context snapshot (including the access token), so get_auth_context() remains valid without any token storage. The task polls deploy_check() every 10 seconds for up to 200 seconds. Once CI passes, it retries trigger_dag_run() up to 24 times with 15-second gaps (6 minutes total), giving Airflow's DAG directory scan time to pick up the new DAG.
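The two-stage wait can be sketched as below. deploy_check and trigger_dag_run are passed in as zero-argument async callables returning a bool, which is an assumed shape; their real signatures are not given in this spec. The sleep parameter is injectable only so the sketch can be exercised without real delays.

```python
import asyncio
from typing import Awaitable, Callable

POLL_INTERVAL_S = 10   # deploy_check() cadence
POLL_BUDGET_S = 200    # stop waiting for CI after this long
TRIGGER_ATTEMPTS = 24  # 24 attempts spaced 15 s apart, about 6 minutes
TRIGGER_GAP_S = 15


async def auto_start(
    deploy_check: Callable[[], Awaitable[bool]],
    trigger_dag_run: Callable[[], Awaitable[bool]],
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> bool:
    """Wait for CI to pass, then retry the trigger until Airflow accepts it."""
    waited = 0
    while not await deploy_check():
        if waited >= POLL_BUDGET_S:
            return False  # CI did not pass within the budget
        await sleep(POLL_INTERVAL_S)
        waited += POLL_INTERVAL_S
    for attempt in range(TRIGGER_ATTEMPTS):
        if await trigger_dag_run():
            return True
        if attempt < TRIGGER_ATTEMPTS - 1:
            await sleep(TRIGGER_GAP_S)
    return False


# Demo with stand-ins: CI passes on the third poll, Airflow accepts the second trigger.
async def _demo() -> bool:
    polls, triggers = iter([False, False, True]), iter([False, True])

    async def fake_check() -> bool:
        return next(polls)

    async def fake_trigger() -> bool:
        return next(triggers)

    async def no_sleep(_seconds: float) -> None:
        return None

    return await auto_start(fake_check, fake_trigger, sleep=no_sleep)


started = asyncio.run(_demo())
```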

Token rule: Tokens are held in-memory only for the lifetime of the async task. They are never persisted to ES, disk, or any cache.
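The context inheritance this relies on is standard asyncio behaviour: a task copies the current contextvars context when it is created. A small standalone demonstration follows; the access_token variable is a hypothetical stand-in for whatever state get_auth_context() reads.

```python
import asyncio
import contextvars

# Hypothetical stand-in for the auth state that get_auth_context() reads.
access_token = contextvars.ContextVar("access_token")


async def background_job() -> str:
    await asyncio.sleep(0)      # yield so the handler keeps running first
    return access_token.get()   # sees the value captured at task creation


async def handle_request(token: str) -> str:
    access_token.set(token)
    task = asyncio.create_task(background_job())  # copies the current context
    access_token.set("changed-after-spawn")       # later changes do not reach the task
    return await task


seen_by_task = asyncio.run(handle_request("caller-token"))
```

The token exists only inside the task's context copy, which is released with the task, so nothing needs to be written to ES, disk, or a cache.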


Oracle Registration

Conduit self-registers with Oracle using axonis.gateway.oracle.OracleRegistration. server/oracle/registration.py instantiates it with conduit-specific config and exports the singleton.

from server.oracle.registration import oracle_registration

# In server/__main__.py lifespan:
await oracle_registration.start()   # registers immediately, then heartbeats at TTL/2
# ...on shutdown:
await oracle_registration.stop()

Registration uses the client_credentials grant via SSO_TOKEN_URL, SSO_CLIENT_ID, and SSO_CLIENT_SECRET. Heartbeat fires at ORACLE_TTL_SECONDS / 2. All failures are swallowed.
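The register-then-heartbeat behaviour can be sketched as a loop like the one below. This is not the OracleRegistration implementation; register is a hypothetical async callable, max_beats exists only to make the sketch terminate, and the debug log line is an addition of this sketch.

```python
import asyncio
import logging
from typing import Awaitable, Callable, Optional

log = logging.getLogger("conduit.oracle.sketch")


async def heartbeat_loop(
    register: Callable[[], Awaitable[None]],
    ttl_seconds: float,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
    max_beats: Optional[int] = None,
) -> int:
    """Register immediately, then re-register every TTL/2, swallowing failures."""
    beats = 0
    while max_beats is None or beats < max_beats:
        try:
            await register()
        except Exception:
            # Swallowed: a failed registration must not take the service down.
            log.debug("registration attempt failed", exc_info=True)
        beats += 1
        await sleep(ttl_seconds / 2)
    return beats


# Demo: the first registration fails, the loop carries on regardless.
async def _demo() -> list:
    delays, calls = [], []

    async def flaky_register() -> None:
        calls.append(1)
        if len(calls) == 1:
            raise ConnectionError("Oracle unreachable")

    async def record_sleep(seconds: float) -> None:
        delays.append(seconds)

    await heartbeat_loop(flaky_register, ttl_seconds=300,
                         sleep=record_sleep, max_beats=3)
    return delays


delays = asyncio.run(_demo())
```

With the default ORACLE_TTL_SECONDS of 300, each gap is 150 seconds, so a single missed heartbeat still leaves one more attempt before the registration expires.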


Environment Variables

Variable Default Required Purpose
AIRFLOW_BASE_URL Yes Airflow REST API base URL
AIRFLOW_API_VERSION v1 No Airflow REST API version path segment
AIRFLOW_VERIFY true No TLS verification for Airflow: true (default), false (skip — dev only), or a path to a CA bundle
AIRFLOW_HOOK_CHECK_TIMEOUT 5.0 No Seconds to wait for Airflow hook validation during connection creation
CONDUIT_SCHEDULER_TICK_MINUTES 30 No Scheduler DAG tick interval; also the most-frequent supported re-run schedule
CONDUIT_SCHEDULER_INDEX conduit-scheduler No ES index (keyed by pipeline ID) tracking scheduled-run status for crash safety / dedup
CONDUIT_HOST 0.0.0.0 No Bind host
CONDUIT_PORT 8008 No Bind port
CONDUIT_WORKERS 1 No uvicorn worker count
CONDUIT_LOG_LEVEL info No Log level
CONDUIT_DOMAIN localhost No Public DNS name; used in MCP DNS-rebinding allowed-hosts list
MCP_EXTRA_HOSTS No Comma-separated additional hostnames for the MCP DNS-rebinding allowlist (e.g. data-ingest for the k8s service name so in-cluster callers are accepted alongside the public domain)
CONDUIT_PUBLIC_URL No Full public URL Oracle uses to reach Conduit
ORACLE_URL http://localhost:8001 No Oracle base URL for self-registration
ORACLE_TTL_SECONDS 300 No Registration TTL; heartbeat fires at TTL/2
SSO_WELLKNOWN https://sso.axonis.ai/realms/T2S/.well-known/openid-configuration No OIDC well-known endpoint
SSO_ISSUER https://sso.axonis.ai/realms/T2S No Expected JWT issuer claim
SSO_AUDIENCE account No Expected JWT audience claim
SSO_TLS_VERIFY true No Set false only for self-signed certs in dev/staging
SSO_TOKEN_URL No Token endpoint for Oracle registration CC grant and Airflow token exchange
SSO_CLIENT_ID No Service account client ID (Oracle registration only)
SSO_CLIENT_SECRET No Service account client secret (Oracle registration only)
DAG_REPO_URL No GitLab instance URL
DAG_REPO_TOKEN No GitLab project/personal access token
DAG_REPO_PROJECT No GitLab namespace/project path
DAG_REPO_BRANCH main No Target branch for live DAGs
DAG_REPO_DAG_PATH dags No Path within repo for validated live DAGs
DAG_REPO_PENDING_PATH pending No Path for DAGs awaiting CI validation
DAG_REPO_CATALOG_PATH catalog.json No Path to operator catalog JSON
ELASTIC_HOST https://127.0.0.1:9200 Yes Elasticsearch base URL
ELASTIC_USERNAME Yes Elasticsearch username
ELASTIC_PASSWORD Yes Elasticsearch password
ELASTIC_VERIFY true No Set false to skip TLS cert verification (dev only)
ELASTIC_TIMEOUT 20 No Elasticsearch request timeout in seconds
ELASTIC_SCROLL 5m No Scroll window TTL for scan queries
ELASTIC_PKI_CA No Path to CA certificate for mutual TLS

The env file lives at: developers-environment/conduit/conduit.env
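AIRFLOW_VERIFY is the one variable in the table with three possible shapes: true, false, or a CA bundle path. A minimal parsing sketch follows. The function name is hypothetical, and treating an empty value as the default is an assumption.

```python
import os
from typing import Union


def parse_verify(raw: str) -> Union[bool, str]:
    """Map the AIRFLOW_VERIFY string to a bool or a CA bundle path."""
    value = raw.strip()
    if value.lower() in ("", "true"):
        return True   # default: verify against the system trust store
    if value.lower() == "false":
        return False  # skip verification (dev only)
    return value      # anything else is treated as a path to a CA bundle


airflow_verify = parse_verify(os.environ.get("AIRFLOW_VERIFY", "true"))
```

The bool-or-path result matches the shape that common Python HTTP clients accept for their verify option, so it can usually be passed through unchanged.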


Migration from fedai-rest

Conduit is a replacement for, not an extension of, the Airflow integration in fedai-rest. REST endpoints are path-compatible with fedai-rest's /userspace/{target} convention: datapipeline and connection are the same target names fedai-rest uses.

Phase 1 — Conduit deployed alongside fedai-rest

  • fedai-rest Airflow endpoints remain unchanged.
  • Conduit is deployed and registered with Oracle.
  • Oracle begins routing Airflow tool calls to Conduit.

Phase 2 — fedai-rest Airflow code removed

After Conduit is validated in production:

  • Remove AirflowRestClient from fedai-rest.
  • Remove /datapipeline and /connection REST endpoints from fedai-rest.
  • Remove Airflow env vars from fedai-rest configuration.

Do not remove fedai-rest Airflow code until Conduit is deployed and confirmed working.


Compliance with platform.service-contract

Requirement Status
/health endpoint Included — probes Airflow on 3s timeout; returns degraded if unreachable
/service-info endpoint Included
MCP at /agentspace Included
REST at /userspace Included
Bearer token validation on all endpoints Included (AxonisMiddleware + require_auth dependency)
Oracle self-registration Included (axonis.gateway.oracle.OracleRegistration)
uv + hatchling Included
ruff linting Included
Bitnami Helm chart Included (charts/conduit/)
CI/CD pipeline Partial — test stage active; deploy stage commented out pending environment config
No cross-service code imports Compliant

Depends on: component.oracle.gateway, platform.axonis-core, platform.service-contract

Realizes: product.data-pipeline

Required by: component.conduit.effect-engine