
Titan — ML Runtime + Federated Fusion Node

Status & scope

Status: Partial — titan.fusion.{orchestrator,node_handler} and the ML runtime conform. The FusionWorker startup wrapper, FusionBroker, FusionNodeRegistry, and the server/ package are not yet implemented — see Open Items.

Package: titan

Depends on: platform.axonis-core, component.xanadu.messaging; imports uds.* (uds.core.dataspace, uds.userspace.*, uds.ops, uds.libs) from fedai-rest

Milestone: P1

Purpose

Titan is the platform's per-node ML runtime. It is not an external-facing service; it executes work on behalf of other services (cortex, oracle, parallax) and participates in cross-node federated runs.

Titan has two distinct responsibilities that share one library:

  1. ML runtime — model training, prediction, explanation, and serving across PyTorch, TensorFlow, JAX, XGBoost, and Seldon. Distributed data loading via Dask.
  2. Federated fusion node — receives blocking and vector requests from a coordinator (also Titan, on a peer node), executes them against the local DataSpace, and returns anonymised results without exposing raw records.

Architecture

   parallax                            ← Lens specs, fusion runs
     │
     ▼
   titan (coordinator node)            ← FusionOrchestrator drives the run
     │
     │  axonis.fusion.Message over Xanadu (RabbitMQ)
     │
     ▼
   titan (participant node 1..N)       ← FusionNodeHandler executes locally
     │
     ▼
   uds.core.dataspace.DataSpace        ← raw records never leave this boundary

   forge / cortex / oracle  ─────►  titan (in-process or via Forge K8s job)
                                       └─ training, prediction, serving

Titan is invoked in two patterns:

  • Embedded compute — called as a library by services that need to train, predict, or explain (typically via Forge-managed K8s jobs or Dask workers).
  • Long-running fusion node — runs as a xanadu Worker (or Coordinator) under the FusionWorker startup wrapper.

Package Structure

titan/
  titan/
    __init__.py
    schema.py               # Index aliases, ES mappings specific to titan
    fusion/
      __init__.py
      orchestrator.py       # FusionOrchestrator — coordinator-side run driver
      node_handler.py       # FusionNodeHandler — participant-side blocking/vectors
    modeling/
      app.py                # Training app entry
      asset.py              # Model asset lifecycle
      assets/               # Asset adapters
      data/                 # Data loaders
      libs/                 # Codec, transform, geospatial helpers
      pretrained_models/    # Pretrained model wrappers
      serving/              # Seldon/K8s serving wrappers
      train/                # Training routines (incl. federated_model_runner_v2)
    system/
      __init__.py
      estimators/           # Estimator implementations
    external/               # External integrations (e.g. FATE, third-party libs)
    userspace/              # Titan-owned domain classes
  charts/                   # Helm chart (deployed by Forge when running as fusion node)
  scripts/
  tests/
  Dockerfile
  pyproject.toml

Naming exception: Titan does not yet ship a server/ package. It is a library imported by other services and a long-running FusionWorker invoked via xanadu CLI. A future minor revision should add a thin server/ per platform.service-contract to expose the MCP tools listed below.

Federated Fusion — coordinator side

titan.fusion.orchestrator.FusionOrchestrator drives a run.

class FusionOrchestrator:
    def __init__(
        self,
        lens_spec: dict,
        participating_nodes: list[str],
        threshold: float = 0.70,
        null_penalty: float = 0.1,
        *,                 # send_fn has no default, so it must be keyword-only to follow defaulted parameters
        send_fn,           # async (message) -> response, injected by the broker layer
    ): ...

    async def run(self) -> FederatedFusionResult: ...

Run phases (each step is an axonis.fusion.Message over xanadu):

  1. FUSION_RUN_START (broadcast) — announce run, distribute lens spec
  2. FUSION_BLOCKING_REQUEST (per node) → FUSION_BLOCKING_RESPONSE (opaque blocking keys, no values)
  3. Coordinator computes cross-node candidate pairs by intersecting blocking keys
  4. FUSION_VECTORS_REQUEST (per node, candidate record IDs only) → FUSION_VECTORS_RESPONSE (feature vectors for the requested fields only)
  5. Coordinator scores candidates locally (parallax.ops.fusion.scorer.score_pair)
  6. Coordinator clusters confirmed matches locally (parallax.ops.fusion.cluster.UnionFind)
  7. FUSION_RUN_COMPLETE (broadcast) — close run, surface metrics
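
Step 3 can be sketched as an inverted index over the blocking keys each node returns. The response shape used here (node ID → record ID → set of key hashes) and the function name candidate_pairs are assumptions for illustration, not the orchestrator's actual data structures.

```python
from collections import defaultdict
from itertools import combinations

# Hypothetical shape: {node_id: {record_id: {blocking_key_hash, ...}}}
BlockingResponses = dict[str, dict[str, set[str]]]
NodeRecord = tuple[str, str]  # (node_id, record_id)


def candidate_pairs(responses: BlockingResponses) -> set[tuple[NodeRecord, NodeRecord]]:
    """Return cross-node record pairs that share at least one blocking key."""
    # Invert the responses: blocking key -> every (node, record) that produced it.
    by_key: dict[str, set[NodeRecord]] = defaultdict(set)
    for node_id, records in responses.items():
        for record_id, keys in records.items():
            for key in keys:
                by_key[key].add((node_id, record_id))

    pairs: set[tuple[NodeRecord, NodeRecord]] = set()
    for members in by_key.values():
        # Sorting gives each pair one canonical orientation, so duplicates collapse.
        for left, right in combinations(sorted(members), 2):
            if left[0] != right[0]:  # keep only pairs held by different nodes
                pairs.add((left, right))
    return pairs


responses = {
    "node-a": {"a1": {"k1", "k2"}, "a2": {"k3"}},
    "node-b": {"b1": {"k2"}, "b2": {"k9"}},
}
pairs = candidate_pairs(responses)
```

Only record IDs and opaque keys are touched here, which is why this step can run on the coordinator without any raw field values.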

Federated Fusion — participant side

titan.fusion.node_handler.FusionNodeHandler services incoming requests against the local uds.core.dataspace.DataSpace.

Required handlers:

  • on_fusion_run_start(message) — initialise per-run state, validate lens spec
  • on_fusion_blocking_request(message) — compute opaque blocking keys for the configured fields; return only hashes
  • on_fusion_vectors_request(message) — load feature vectors for the requested record IDs and field list; return vectors only for the requested fields
  • on_fusion_run_complete(message) — release run state
  • on_lens_binding_request(message) — return the lens binding for a given spec (field mapping for a participant node)
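
One way to route incoming messages to these handlers is by name. The sketch below assumes the message type string maps onto the handler name (FUSION_RUN_START → on_fusion_run_start) and uses plain dicts as stand-ins for axonis.fusion.Message; EchoHandler and dispatch are hypothetical names, not part of titan.

```python
import asyncio


class EchoHandler:
    """Stand-in for FusionNodeHandler; the real handlers work against the local DataSpace."""

    async def on_fusion_run_start(self, message: dict) -> dict:
        # A real handler would initialise per-run state and validate the lens spec.
        return {"type": "ack", "run_id": message["run_id"]}


async def dispatch(handler, message: dict) -> dict:
    """Call the on_* coroutine that matches the message type."""
    # Assumed naming convention: FUSION_RUN_START -> on_fusion_run_start
    name = "on_" + message["type"].lower()
    method = getattr(handler, name, None)
    if method is None:
        raise ValueError(f"no handler for message type {message['type']!r}")
    return await method(message)


result = asyncio.run(
    dispatch(EchoHandler(), {"type": "FUSION_RUN_START", "run_id": "r1"})
)
```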

FusionWorker startup wrapper (planned)

Production deployments should not instantiate the orchestrator/handler directly. The intended pattern is a long-lived FusionWorker composing:

  • xanaqu.core.Worker (or Coordinator) — RabbitMQ peer membership, heartbeats, routing
  • FusionNodeHandler — registered as the on_query callback for fusion message types
  • FusionBroker — translates axonis.fusion.Message ↔ xanadu's HTTP-envelope (method, path, body) protocol
  • FusionNodeRegistry — exposes worker.known_peers as the participant list seen by the coordinator

Current state: FusionWorker, FusionBroker, and FusionNodeRegistry are not yet implemented. Deployments instantiate FusionNodeHandler directly inside a xanaqu.core.Worker. See Open Items.

Sovereignty Invariants

  • Blocking keys are opaque hashes of derived values (Soundex code, year prefix, etc.). Raw field values must not cross node boundaries.
  • Feature vectors are returned only for record IDs in candidate pairs and only for the explicitly requested field list.
  • Scoring and clustering execute on the coordinator only, against already-anonymised vectors.
  • The participant node may decline a request. Refusal returns a FUSION_NODE_REFUSED message; the run continues without that node.
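
A minimal sketch of the first invariant, assuming blocking keys are SHA-256 digests over a derived value (here a year prefix) mixed with a per-run salt. The salt, the key layout, and both function names are assumptions for illustration; the spec does not state how titan derives its hashes.

```python
import hashlib


def blocking_key(kind: str, derived_value: str, run_salt: str) -> str:
    """Hash a derived value so only an opaque digest leaves the node."""
    # The salt (assumed) keeps keys from being comparable across runs.
    payload = f"{run_salt}|{kind}|{derived_value}".encode("utf-8")
    return hashlib.sha256(payload).hexdigest()


def year_prefix_key(date_iso: str, run_salt: str) -> str:
    """Blocking key over the year part of an ISO date (YYYY-MM-DD)."""
    return blocking_key("year", date_iso[:4], run_salt)


k1 = year_prefix_key("1984-03-12", "run-42")
k2 = year_prefix_key("1984-11-30", "run-42")
k3 = year_prefix_key("1985-01-01", "run-42")
```

Records from the same year produce the same key (k1 and k2 match, k3 does not), so the coordinator can intersect keys without seeing a date.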

ML Runtime — operations

Library-level entry points used by other services:

| Operation | Module | Notes |
|---|---|---|
| Train | titan.modeling.train | Dask cluster + K8s job orchestration via uds.core.airflow |
| Federated train | titan.modeling.train.federated_model_runner_v2 | FATE-Flow integration; distinct from in-titan fusion |
| Predict | titan.system.estimators | Loaded model + DataSpace input |
| Serve | titan.modeling.serving | Seldon/K8s deployment |
| Explain | titan.modeling.libs.explain | LIME + custom explainers |
| Asset I/O | titan.modeling.asset | Checkpoint and artifact lifecycle |

Distributed Op Execution

Titan hosts the DistributedCluster half of the platform's two-cluster op execution model: the same /userspace/operation payload that a frontend previews on its in-process LocalCluster is re-submitted here against the full Dataset during training. Op authors should treat both clusters as one code path — map_partitions, no per-worker Authenticator, token forwarded on body["_auth_token"] via axonis.operations.dispatcher.attach_token. See component.fedai-rest.dataspace (Two-Cluster Op Execution) for the dispatcher contract and the operation_dispatch ownership map.

Estimator pipeline & quality analysis

The estimator pipeline is the end-to-end runtime path from a fitting request to a scored, persisted model: real-token auth → federated data loading → fit → quality metrics → recommended ops. It is the runtime behind cortex/oracle "fit this dataset" requests and runs identically across the platform's dataset types.

Pipeline stages

titan.system.estimators drives the fit; data arrives through titan.modeling.data against uds.core.dataspace.DataSpace. The stages are:

  1. Auth — fit requests carry a real Keycloak token on body["_auth_token"]; the loader resolves it through axonis.auth.authenticator.Authenticator (no AUTH_DISABLED bypass). ABAC scoping applies to every read.
  2. Federated load — datasets are loaded lazily as Dask DataFrames; partitions may be sourced across federate nodes via the fusion node path. Raw records stay within their owning node's boundary.
  3. Fit — the estimator fits against the loaded frame, staying lazy until the fit itself forces compute.
  4. Quality metrics — fit emits a quality report (see below).
  5. Recommended ops — the report surfaces follow-on op suggestions (e.g. missing best-practice transform, data-quality remediation) drawn from the dataset analysis.

  • #REQ.estimator-real-auth — the estimator pipeline MUST authenticate with a real token via Authenticator; no auth-disabled or stubbed-identity path exists, including in dev.
  • #REQ.estimator-write-isolation — model artifacts and metrics written by a fit MUST be isolated per run (no cross-run artifact clobbering); each run owns its workspace and ES write target.
  • #REQ.estimator-lazy-load — federated data loading MUST keep Dask DataFrames lazy (map_partitions, .persist()), reserving .compute() for aggregations and final small results.
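
The write-isolation requirement can be illustrated with a per-run directory that refuses to be reused. This is a sketch under assumptions: the runs/ layout and the run_workspace helper are hypothetical, and the ES write target is not modelled.

```python
import tempfile
import uuid
from pathlib import Path
from typing import Optional


def run_workspace(root: Path, run_id: Optional[str] = None) -> Path:
    """Create and return a directory owned by exactly one fit run."""
    run_id = run_id or uuid.uuid4().hex
    path = Path(root) / "runs" / run_id
    # exist_ok=False makes a reused run_id fail loudly instead of
    # silently writing over another run's artifacts.
    path.mkdir(parents=True, exist_ok=False)
    return path


root = Path(tempfile.mkdtemp(prefix="titan-demo-"))
first = run_workspace(root)
second = run_workspace(root)
```

Calling run_workspace(root, first.name) again raises FileExistsError, which is the behaviour a caller wants when two fits would otherwise collide.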

Quality metrics

Every fit produces a quality report covering two axes:

  • Dataset quality — analysis routines determine dataset fitness: missingness, cardinality, type consistency, leakage signals. Surfaced as hints a caller (or Beacon workflow) can act on.
  • Modeling quality — fit-level metrics appropriate to the estimator/task (e.g. classification scores + confusion matrix, regression error, time-series forecast error).

  • #REQ.estimator-quality-report — a completed fit MUST emit both dataset-quality and modeling-quality metrics, persisted on the run's TrainedModel record for retrieval by describe_trainedmodel consumers.
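
To make the dataset-quality axis concrete, the sketch below computes missingness and cardinality per column over plain Python rows. The real pipeline works on Dask DataFrames; ColumnQuality and dataset_quality are hypothetical names, and the report shape is an assumption.

```python
from dataclasses import dataclass


@dataclass
class ColumnQuality:
    missing_ratio: float  # share of rows where the value is absent
    cardinality: int      # number of distinct non-missing values


def dataset_quality(rows: list[dict], columns: list[str]) -> dict[str, ColumnQuality]:
    """Compute per-column missingness and cardinality."""
    report: dict[str, ColumnQuality] = {}
    for col in columns:
        values = [row.get(col) for row in rows]
        present = [v for v in values if v is not None]
        missing = 1.0 - len(present) / len(values) if values else 0.0
        report[col] = ColumnQuality(missing_ratio=missing, cardinality=len(set(present)))
    return report


rows = [
    {"age": 31, "city": "Oslo"},
    {"age": None, "city": "Oslo"},
    {"age": 45, "city": "Bergen"},
    {"age": 31, "city": None},
]
report = dataset_quality(rows, ["age", "city"])
```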

Dataset-type coverage

The pipeline must fit across all platform dataset types from the same production wire (serialized fit request, not a hand-built domain object):

| Dataset type | Status | Notes |
|---|---|---|
| Tabular | conform | Reference end-to-end path |
| Time-series | conform | Uses the time-series ops below; forecast-error metrics |
| Image | blocked | Binary-column dtype not yet handled on the load path |
| Text / NLP | blocked | ES mapping depth limit on ingest |

  • #REQ.estimator-dataset-parity — the fit contract (auth, load, fit, quality report) MUST be identical across dataset types; type-specific handling lives in the loader/estimator, never in a divergent control path.

Time-series ML ops

Time-series modeling support: serializable pipeline steps that transform a time-indexed Dataset and feed sequential model training. These are ML-runtime ops (the geospatial half of the originating requirement is owned by geodex and is not part of titan). Large series stay on Dask per #REQ.estimator-lazy-load.

Transformation & feature-extraction ops

Each op is a /userspace/operation step executed through the two-cluster path (#ml-runtime.distributed-op): previewed on LocalCluster, rerun full-dataset on DistributedCluster.

| Op | Purpose | Key params |
|---|---|---|
| Set time index | Parse a timestamp column into the primary time index | column, freq? |
| Filter by time range | Select rows within [start_time, end_time] | start_time, end_time |
| Resample / aggregate | Resample to an interval with an aggregation function | rule, agg_func, fill_method? |
| Rolling window | Rolling statistics over a time/row window | window, min_periods, metrics |
| Time alignment / join | Align multiple series on a common timeline with fill/interpolation | other_datasets, merge_type, interpolation? |
| Lag / lead creation | Shifted feature columns for temporal dependence | columns, shift_periods |
| Detrend / differencing | Remove trend/seasonality via differencing | columns, period |
| Feature extraction | Generate statistical features over the series | columns, feature_set?, window_size? |

  • #REQ.timeseries-op-validation — each time-series op MUST validate its preconditions (e.g. a time-based window requires a datetime index; start_time <= end_time; agg_func is a recognised reducer) and surface a user-readable error rather than failing silently.
  • #REQ.timeseries-resample-lazy — resample/rolling/feature-extraction ops on large series MUST run on Dask (chunked), not by forcing a full in-memory .compute().

Sequential / federated time-series modeling

Time-aligned features feed sequential models (RNN/LSTM/Transformer) trained either in-titan or federated via FATE-Flow (titan.modeling.train.federated_model_runner_v2).

  • #REQ.timeseries-alignment-before-fit — sequential/federated training MUST consume data aligned to uniform time steps; alignment (forward-fill or interpolation over a shared interval) happens in the op pipeline before fit, so each federate contributes comparable segments.
  • Federation handles partial updates from nodes holding different time segments; only model updates cross the node boundary, never raw series (consistent with #sovereignty).
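
Forward-fill alignment onto uniform time steps can be sketched in plain Python as below. The function name and the list-of-tuples representation are assumptions; in titan this would run as an op over a Dask-backed Dataset rather than an in-memory list.

```python
from datetime import datetime, timedelta
from typing import Optional


def align_forward_fill(
    points: list[tuple[datetime, float]],
    start: datetime,
    end: datetime,
    step: timedelta,
) -> list[tuple[datetime, Optional[float]]]:
    """Resample ascending (timestamp, value) points onto start..end at a fixed step.

    Each step carries the latest observation at or before it; steps that
    precede the first observation get None.
    """
    aligned: list[tuple[datetime, Optional[float]]] = []
    idx = 0
    last: Optional[float] = None
    t = start
    while t <= end:
        # Consume every observation up to and including this step.
        while idx < len(points) and points[idx][0] <= t:
            last = points[idx][1]
            idx += 1
        aligned.append((t, last))
        t += step
    return aligned


points = [
    (datetime(2024, 1, 1, 0, 0), 1.0),
    (datetime(2024, 1, 1, 0, 25), 2.0),
]
aligned = align_forward_fill(
    points,
    start=datetime(2024, 1, 1, 0, 0),
    end=datetime(2024, 1, 1, 0, 30),
    step=timedelta(minutes=10),
)
values = [value for _, value in aligned]
```

If every federate aligns to the same start, end, and step, the resulting segments have identical time grids, which is what makes their model updates comparable.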

Federated LLM model storage & serving

Titan owns the model-side contract for federated LLMs: how a fine-tuned LLM is stored, versioned, and made servable. The serving-infrastructure mechanics (pod/container provisioning, autoscaling, ingress) belong to forge and are specced there; this section is only the titan-side storage/serving contract that forge consumes.

Storage & versioning

A federated LLM is a TrainedModel whose asset is an LLM artifact (weights + adapter, e.g. a PEFT/LoRA delta over a base model). Asset I/O reuses titan.modeling.asset.

  • #REQ.fed-llm-versioned-store — each federated fine-tune iteration MUST be stored as a distinct, retrievable version (no in-place overwrite of a prior iteration's artifact), so a model can be rolled back or compared across federated rounds.
  • #REQ.fed-llm-secure-access — LLM artifacts MUST be access-controlled with the same token/ABAC scoping as any other TrainedModel; retrieval requires a real token, and raw artifacts never transit a node boundary that sovereignty forbids.
  • Fine-tunes are produced federated via FATE (#ml-runtime); the storage contract is agnostic to whether the producing run was federated or local.
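
The versioned-store requirement amounts to an append-only registry. The in-memory sketch below only illustrates the contract; VersionedStore and its methods are hypothetical names and do not reflect how titan.modeling.asset persists artifacts.

```python
class VersionedStore:
    """Append-only version registry: a registered artifact is never overwritten."""

    def __init__(self) -> None:
        self._versions: dict[str, list[str]] = {}

    def register(self, model_id: str, artifact_uri: str) -> int:
        """Record a new fine-tune iteration and return its 1-based version."""
        versions = self._versions.setdefault(model_id, [])
        versions.append(artifact_uri)
        return len(versions)

    def resolve(self, model_id: str, version: int) -> str:
        """Return the artifact for an exact version, e.g. for rollback or comparison."""
        versions = self._versions.get(model_id, [])
        if not 1 <= version <= len(versions):
            raise KeyError(f"{model_id!r} has no version {version}")
        return versions[version - 1]


store = VersionedStore()
v1 = store.register("demo-llm", "artifacts/demo-llm/round-1")  # hypothetical URIs
v2 = store.register("demo-llm", "artifacts/demo-llm/round-2")
rollback_target = store.resolve("demo-llm", v1)
```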

Serving contract

Titan exposes a servable LLM the same way it serves any model (titan.modeling.serving) — the artifact + metadata forge needs to stand up a serving pod.

  • #REQ.fed-llm-serve-handle — a stored federated LLM version MUST be resolvable to a serving handle (artifact location + base-model reference + adapter reference + runtime requirements) that forge can deploy without re-deriving model internals.
  • #REQ.fed-llm-fine-tune-from-served — the runtime MUST support producing a new fine-tuned LLM version from an existing served/stored base (apply a federated fine-tune to an existing artifact and register the result as a new version).

Training metrics surface

Titan exposes the live training-metrics surface that a dashboard renders. The UI (the training dashboard view) is owned by beacon and is not specced here; this section covers only the backend surface titan must expose.

  • #REQ.training-dask-metrics — during a training run, titan MUST expose live Dask metrics (cluster/worker status, task progress, resource utilisation) — minimally the Dask scheduler dashboard endpoint for the run's cluster — for a frontend to surface.
  • #REQ.training-run-metrics — titan MUST expose per-run training info (current stage, status, accumulating quality metrics from #estimator-quality.metrics) queryable by model_status / describe_trainedmodel consumers while the run is in flight, not only after completion.

MCP Tools (planned)

When Titan adopts a server/ per platform.service-contract, the following tools should be exposed:

  • train(model_spec, dataset_alias) → run_id
  • predict(model_id, dataset_alias) → predictions
  • explain(model_id, record_id) → feature contributions
  • serve(model_id, replicas) → endpoint info
  • model_status(model_id) → state, metrics
  • fusion_run_start(lens_spec, participating_nodes) → fusion_run_id
  • fusion_run_status(fusion_run_id) → state, metrics

Until then, these capabilities are reachable only as library calls or via xanaqu-gateway REST routes.

Dependencies

Titan is compute-heavy. Its pyproject.toml pulls in:

dependencies = [
  "axonis-core>=4.10.2",        # auth, schema, foundation
  # uds.* (DataSpace, ops, libs, userspace, adapters) imported from fedai-rest in-tree
  "t2s-fate-client>3.2.6",      # FATE-Flow v1
  "t2s-fate-client-v2>=3.8.0",  # FATE-Flow v2
  "fastapi",                    # planned server/
  "distributed==2023.3.0",      # Dask
  "seldon-core>=1.18.0",        # serving
  # plus: torch, jax, keras, xgboost, sentence-transformers, peft, gensim,
  # statsmodels, lime, imgaug, bokeh, annoy, …
]

Shared protocol module: Message and MessageType are imported from axonis.fusion (axonis-core/axonis/fusion.py). The earlier plan to migrate ownership to parallax.protocol.FusionMessage was superseded — the type landed in axonis-core as a shared foundation dependency. Note: some consumer wheels (parallax, cortex, prism) still ship a published axonis.protocol.FusionMessage / FusionMessageType surface from a parallel build; consumers should migrate to the axonis.fusion.Message / MessageType names per axonis-core main.

Configuration

Environment variables consumed by Titan at runtime:

| Var | Purpose |
|---|---|
| ATLAS_WORKSPACE | Working directory for logs and model artifacts (default: tempfile.gettempdir()) |
| ATLAS_LOG_LEVEL | Logging level (default: INFO) |
| FEDERATE_DOMAIN | Local REST API domain used for HTTP forwarding when running as a xanadu node |
| FEDERATE_UUID | Stable UUID for this node's federate record |

Titan inherits xanadu's XNQ_* configuration when running as a FusionWorker; see component.xanadu.messaging.
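
Reading these variables with the documented defaults could look like this. TitanEnv is a hypothetical helper, not a class that exists in titan, and the XNQ_* settings are left to xanadu.

```python
import os
import tempfile
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class TitanEnv:
    """Snapshot of the environment variables listed in the table above."""

    workspace: str
    log_level: str
    federate_domain: Optional[str]
    federate_uuid: Optional[str]

    @classmethod
    def from_env(cls, env: Mapping[str, str] = os.environ) -> "TitanEnv":
        return cls(
            # Defaults follow the table: temp dir for the workspace, INFO for logging.
            workspace=env.get("ATLAS_WORKSPACE", tempfile.gettempdir()),
            log_level=env.get("ATLAS_LOG_LEVEL", "INFO"),
            # No documented defaults; left as None when unset.
            federate_domain=env.get("FEDERATE_DOMAIN"),
            federate_uuid=env.get("FEDERATE_UUID"),
        )


defaults = TitanEnv.from_env({})
custom = TitanEnv.from_env({"ATLAS_LOG_LEVEL": "DEBUG", "FEDERATE_UUID": "node-1"})
```

Passing the mapping explicitly keeps the helper testable without mutating the process environment.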

Code Style

  • Async public APIs where they cross a network boundary (run, on_* handlers); sync where they don't.
  • Dataclasses for payloads (FederatedFusionResult, axonis.fusion.Message).
  • No unnecessary abstraction over Dask/torch/jax — use them directly. Wrap only when Forge or uds.* needs a uniform surface.

Open Items

  • No server/ package. Titan should adopt platform.service-contract's dual-interface contract, exposing the MCP tools listed above. P2 work.
  • FusionWorker startup wrapper not implemented. The FusionWorker, FusionBroker, and FusionNodeRegistry classes described in this spec do not yet exist. Production fusion nodes currently instantiate FusionNodeHandler directly inside a xanadu Worker. P2 work.
  • Fusion message ownership in parallax. parallax originally called for parallax.protocol.FusionMessage; the type landed in axonis-core as axonis.fusion.Message / MessageType instead. The parallax spec should be updated to reflect the authoritative location.
  • Fusion-vs-FATE overlap. titan.modeling.train.federated_model_runner_v2 performs federated training via FATE-Flow; titan.fusion performs federated entity resolution via xanadu. Same word, different mechanism. Document the boundary in the titan repo CLAUDE.md.
  • Image / Text dataset fits blocked. Per #estimator-quality.dataset-types, image fits are blocked on binary-column dtype handling in the load path and text/NLP fits on the ES mapping depth limit. Both must reach parity with the tabular/time-series path.
  • No live training-metrics endpoint. #training-metrics mandates a live Dask + per-run metrics surface; titan currently exposes metrics only via the post-hoc TrainedModel record. The in-flight surface is unimplemented pending the server/ package.
  • Federated LLM storage/serving contract unimplemented. #fed-llm describes the versioned-store and serving-handle contract; the asset path does not yet treat LLM adapter artifacts as first-class versioned assets.

Depends on: component.xanadu.messaging, platform.axonis-core, platform.service-contract

Realizes: product.fusion, product.model, product.trained-model

Required by: component.parallax.fusionmatch-model