Titan — ML Runtime + Federated Fusion Node
Status & scope
Status: Partial — titan.fusion.{orchestrator,node_handler} and the ML runtime conform. The FusionWorker startup wrapper, FusionBroker, FusionNodeRegistry, and the server/ package are not yet implemented — see Open Items.
Package: titan
Depends on: platform.axonis-core, component.xanadu.messaging; imports uds.* (uds.core.dataspace, uds.userspace.*, uds.ops, uds.libs) from fedai-rest
Milestone: P1
Purpose
Titan is the platform's per-node ML runtime. It is not an external-facing service; it executes work on behalf of other services (cortex, oracle, parallax) and participates in cross-node federated runs.
Two distinct responsibilities sharing one library:
- ML runtime — model training, prediction, explanation, and serving across PyTorch, TensorFlow, JAX, XGBoost, and Seldon. Distributed data loading via Dask.
- Federated fusion node — receives blocking and vector requests from a coordinator (also Titan, on a peer node), executes them against the local DataSpace, and returns anonymised results without exposing raw records.
Architecture
parallax ← Lens specs, fusion runs
│
▼
titan (coordinator node) ← FusionOrchestrator drives the run
│
│ axonis.fusion.Message over Xanadu (RabbitMQ)
│
▼
titan (participant node 1..N) ← FusionNodeHandler executes locally
│
▼
uds.core.dataspace.DataSpace ← raw records never leave this boundary
forge / cortex / oracle ─────► titan (in-process or via Forge K8s job)
└─ training, prediction, serving
Titan is invoked in two patterns:
- Embedded compute — called as a library by services that need to run training, predict, or explain (typically via Forge-managed K8s jobs or Dask workers).
- Long-running fusion node — runs as a xanadu Worker (or Coordinator) under the FusionWorker startup wrapper.
Package Structure
titan/
titan/
__init__.py
schema.py # Index aliases, ES mappings specific to titan
fusion/
__init__.py
orchestrator.py # FusionOrchestrator — coordinator-side run driver
node_handler.py # FusionNodeHandler — participant-side blocking/vectors
modeling/
app.py # Training app entry
asset.py # Model asset lifecycle
assets/ # Asset adapters
data/ # Data loaders
libs/ # Codec, transform, geospatial helpers
pretrained_models/ # Pretrained model wrappers
serving/ # Seldon/K8s serving wrappers
train/ # Training routines (incl. federated_model_runner_v2)
system/
__init__.py
estimators/ # Estimator implementations
external/ # External integrations (e.g. FATE, third-party libs)
userspace/ # Titan-owned domain classes
charts/ # Helm chart (deployed by Forge when running as fusion node)
scripts/
tests/
Dockerfile
pyproject.toml
Naming exception: Titan does not yet ship a server/ package. It is a library imported by other services and a long-running FusionWorker invoked via xanadu CLI. A future minor revision should add a thin server/ per platform.service-contract to expose the MCP tools listed below.
Federated Fusion — coordinator side
titan.fusion.orchestrator.FusionOrchestrator drives a run.
class FusionOrchestrator:
    def __init__(
        self,
        lens_spec: dict,
        participating_nodes: list[str],
        *,  # arguments below are keyword-only, so send_fn may follow the defaulted ones
        threshold: float = 0.70,
        null_penalty: float = 0.1,
        send_fn,  # async (message) -> response, injected by the broker layer
    ): ...

    async def run(self) -> FederatedFusionResult: ...
Run phases (each step is an axonis.fusion.Message over xanadu):
- FUSION_RUN_START (broadcast) — announce run, distribute lens spec
- FUSION_BLOCKING_REQUEST (per node) → FUSION_BLOCKING_RESPONSE (opaque blocking keys, no values)
- Coordinator computes cross-node candidate pairs by intersecting blocking keys
- FUSION_VECTORS_REQUEST (per node, candidate record IDs only) → FUSION_VECTORS_RESPONSE (feature vectors for the requested fields only)
- Coordinator scores candidates locally (parallax.ops.fusion.scorer.score_pair)
- Coordinator clusters confirmed matches locally (parallax.ops.fusion.cluster.UnionFind)
- FUSION_RUN_COMPLETE (broadcast) — close run, surface metrics
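The candidate-pair and clustering phases above can be sketched in plain Python. This is an illustration only: the keys_by_node shape, the candidate_pairs helper, and the UnionFind class below are hypothetical stand-ins, not the parallax.ops.fusion implementations, and the scoring phase is skipped, so every candidate pair is treated as a confirmed match.

```python
from collections import defaultdict
from itertools import combinations


def candidate_pairs(keys_by_node):
    """Intersect blocking keys across nodes.

    keys_by_node: {node_id: {blocking_key: [record_id, ...]}} (assumed shape).
    Returns a set of ((node, record), (node, record)) cross-node pairs.
    """
    # Invert to blocking_key -> [(node_id, record_id), ...]
    buckets = defaultdict(list)
    for node_id, keys in keys_by_node.items():
        for key, record_ids in keys.items():
            for record_id in record_ids:
                buckets[key].append((node_id, record_id))

    pairs = set()
    for members in buckets.values():
        for left, right in combinations(sorted(members), 2):
            if left[0] != right[0]:  # keep cross-node pairs only
                pairs.add((left, right))
    return pairs


class UnionFind:
    """Minimal disjoint-set stand-in used only for this sketch."""

    def __init__(self):
        self.parent = {}

    def find(self, item):
        self.parent.setdefault(item, item)
        while self.parent[item] != item:
            # Path halving: point the item at its grandparent as we walk up.
            self.parent[item] = self.parent[self.parent[item]]
            item = self.parent[item]
        return item

    def union(self, a, b):
        root_a, root_b = self.find(a), self.find(b)
        if root_a != root_b:
            self.parent[root_b] = root_a


keys_by_node = {
    "node-a": {"k1": ["a1"], "k2": ["a2"]},
    "node-b": {"k1": ["b1"], "k3": ["b2"]},
    "node-c": {"k1": ["c1"], "k2": ["c2"]},
}
pairs = candidate_pairs(keys_by_node)

clusters = UnionFind()
for left, right in pairs:  # a real run would union only pairs scored above the threshold
    clusters.union(left, right)
```

Only opaque keys and record IDs appear in this step, which is what lets the coordinator compute pairs without seeing field values.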
Federated Fusion — participant side
titan.fusion.node_handler.FusionNodeHandler services incoming requests against the local uds.core.dataspace.DataSpace.
Required handlers:
- on_fusion_run_start(message) — initialise per-run state, validate lens spec
- on_fusion_blocking_request(message) — compute opaque blocking keys for the configured fields; return only hashes
- on_fusion_vectors_request(message) — load feature vectors for the requested record IDs and field list; return vectors only for the requested fields
- on_fusion_run_complete(message) — release run state
- on_lens_binding_request(message) — return the lens binding for a given spec (field mapping for a participant node)
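As an illustration of what "return only hashes" could look like in on_fusion_blocking_request, the sketch below derives an opaque key from a coarse projection of a field value. The blocking_key helper, the four-character prefix, and the per-run salt shared by all participants are assumptions made for this example; the spec does not say how FusionNodeHandler derives its keys.

```python
import hashlib
import hmac


def blocking_key(field, value, run_salt):
    """Return an opaque hex digest for a coarse projection of `value`.

    Hypothetical helper: the prefix length and the salted HMAC are
    illustrative choices, not the handler's documented behaviour.
    """
    coarse = str(value).strip().lower()[:4]  # e.g. the year prefix of an ISO date
    message = f"{field}:{coarse}".encode("utf-8")
    return hmac.new(run_salt, message, hashlib.sha256).hexdigest()


run_salt = b"example-run-salt"  # assumed to be distributed with the lens spec
key_one = blocking_key("date_of_birth", "1984-03-02", run_salt)
key_two = blocking_key("date_of_birth", "1984-11-30", run_salt)
key_three = blocking_key("date_of_birth", "1985-01-15", run_salt)
```

Records that share a year prefix produce the same key on every node that uses the same salt, so the coordinator can intersect keys without receiving the underlying value.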
FusionWorker startup wrapper (planned)
Production deployments should not instantiate the orchestrator/handler directly. The intended pattern is a long-lived FusionWorker composing:
- xanaqu.core.Worker (or Coordinator) — RabbitMQ peer membership, heartbeats, routing
- FusionNodeHandler — registered as the on_query callback for fusion message types
- FusionBroker — translates axonis.fusion.Message ↔ xanadu's HTTP-envelope (method, path, body) protocol
- FusionNodeRegistry — exposes worker.known_peers as the participant list seen by the coordinator
Current state: FusionWorker, FusionBroker, and FusionNodeRegistry are not yet implemented. Deployments instantiate FusionNodeHandler directly inside a xanaqu.core.Worker. See Open Items.
Sovereignty Invariants
- Blocking keys are opaque hashes (soundex, year prefix, etc). Raw field values must not cross node boundaries.
- Feature vectors are returned only for record IDs in candidate pairs and only for the explicitly requested field list.
- Scoring and clustering execute on the coordinator only, against already-anonymised vectors.
- The participant node may decline a request. Refusal returns a FUSION_NODE_REFUSED message; the run continues without that node.
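The second invariant (vectors only for candidate record IDs and only for the requested fields) amounts to a double filter on the participant side. A minimal sketch follows, assuming local vectors are held as a dict keyed by record ID; select_vectors and the data shape are hypothetical and not part of FusionNodeHandler.

```python
def select_vectors(local_vectors, candidate_ids, requested_fields):
    """Return vectors for candidate IDs only, restricted to the requested fields.

    local_vectors: {record_id: {field: value}} (assumed shape).
    Unknown record IDs are skipped rather than raising.
    """
    allowed = set(requested_fields)
    return {
        record_id: {
            field: value
            for field, value in local_vectors[record_id].items()
            if field in allowed
        }
        for record_id in candidate_ids
        if record_id in local_vectors
    }


local_vectors = {
    "r1": {"name_vec": [0.1, 0.9], "dob_vec": [0.4], "ssn_vec": [0.7]},
    "r2": {"name_vec": [0.2, 0.8], "dob_vec": [0.5], "ssn_vec": [0.6]},
    "r3": {"name_vec": [0.3, 0.7], "dob_vec": [0.6], "ssn_vec": [0.5]},
}
response = select_vectors(local_vectors, ["r1", "r3", "r9"], ["name_vec", "dob_vec"])
```

Here r2 is never returned because it is not a candidate, and ssn_vec is dropped because it was not requested.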
ML Runtime — operations
Library-level entry points used by other services:
| Operation | Module | Notes |
|---|---|---|
| Train | titan.modeling.train | Dask cluster + K8s job orchestration via uds.core.airflow |
| Federated train | titan.modeling.train.federated_model_runner_v2 | FATE-Flow integration; distinct from in-titan fusion |
| Predict | titan.system.estimators | Loaded model + DataSpace input |
| Serve | titan.modeling.serving | Seldon/K8s deployment |
| Explain | titan.modeling.libs.explain | LIME + custom explainers |
| Asset I/O | titan.modeling.asset | Checkpoint and artifact lifecycle |
Distributed Op Execution
Titan hosts the DistributedCluster half of the platform's two-cluster op execution model: the same /userspace/operation payload that a frontend previews on its in-process LocalCluster is re-submitted here against the full Dataset during training. Op authors should treat both clusters as one code path — map_partitions, no per-worker Authenticator, token forwarded on body["_auth_token"] via axonis.operations.dispatcher.attach_token. See component.fedai-rest.dataspace (Two-Cluster Op Execution) for the dispatcher contract and the operation_dispatch ownership map.
Estimator pipeline & quality analysis
The estimator pipeline is the end-to-end runtime path from a fitting request to a scored, persisted model: real-token auth → federated data loading → fit → quality metrics → recommended ops. It is the runtime behind cortex/oracle "fit this dataset" requests and runs identically across the platform's dataset types.
Pipeline stages
titan.system.estimators drives the fit; data arrives through titan.modeling.data against uds.core.dataspace.DataSpace. The stages are:
- Auth — fit requests carry a real Keycloak token on body["_auth_token"]; the loader resolves it through axonis.auth.authenticator.Authenticator (no AUTH_DISABLED bypass). ABAC scoping applies to every read.
- Federated load — datasets are loaded lazily as Dask DataFrames; partitions may be sourced across federate nodes via the fusion node path. Raw records stay within their owning node's boundary.
- Fit — the estimator fits against the loaded frame, staying lazy until the fit itself forces compute.
- Quality metrics — fit emits a quality report (see below).
- Recommended ops — the report surfaces follow-on op suggestions (e.g. missing best-practice transform, data-quality remediation) drawn from the dataset analysis.

- #REQ.estimator-real-auth — the estimator pipeline MUST authenticate with a real token via Authenticator; no auth-disabled or stubbed-identity path exists, including in dev.
- #REQ.estimator-write-isolation — model artifacts and metrics written by a fit MUST be isolated per run (no cross-run artifact clobbering); each run owns its workspace and ES write target.
- #REQ.estimator-lazy-load — federated data loading MUST keep Dask DataFrames lazy (map_partitions, .persist()), reserving .compute() for aggregations and final small results.
Quality metrics
Every fit produces a quality report covering two axes:
- Dataset quality — analysis routines determine dataset fitness: missingness, cardinality, type consistency, leakage signals. Surfaced as hints a caller (or Beacon workflow) can act on.
- Modeling quality — fit-level metrics appropriate to the estimator/task (e.g. classification scores + confusion matrix, regression error, time-series forecast error).

- #REQ.estimator-quality-report — a completed fit MUST emit both dataset-quality and modeling-quality metrics, persisted on the run's TrainedModel record for retrieval by describe_trainedmodel consumers.
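To make the dataset-quality axis concrete, the sketch below computes missingness, cardinality, and type consistency over a small in-memory sample. It is a toy illustration: dataset_quality and the report keys are hypothetical, and the real analysis routines would run over Dask partitions rather than a Python list.

```python
def dataset_quality(rows, columns):
    """Compute per-column missingness, cardinality, and observed types.

    rows: list of dicts; a missing key or a None value counts as missing.
    Report keys are illustrative, not the TrainedModel schema.
    """
    total = len(rows)
    report = {}
    for column in columns:
        values = [row.get(column) for row in rows]
        present = [value for value in values if value is not None]
        report[column] = {
            "missing_ratio": (total - len(present)) / total if total else 0.0,
            "cardinality": len(set(present)),
            # More than one type name signals a type-consistency problem.
            "types": sorted({type(value).__name__ for value in present}),
        }
    return report


rows = [
    {"age": 31, "city": "Oslo"},
    {"age": None, "city": "Oslo"},
    {"age": "31", "city": "Bergen"},
    {"city": None},
]
quality = dataset_quality(rows, ["age", "city"])
```

In this sample the age column is half missing and mixes int and str values, which is the kind of hint a recommended-op suggestion could be drawn from.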
Dataset-type coverage
The pipeline must fit across all platform dataset types from the same production wire (serialized fit request, not a hand-built domain object):
| Dataset type | Status | Notes |
|---|---|---|
| Tabular | conform | Reference end-to-end path |
| Time-series | conform | Uses the time-series ops below; forecast-error metrics |
| Image | blocked | Binary-column dtype not yet handled on the load path |
| Text / NLP | blocked | ES mapping depth limit on ingest |
- #REQ.estimator-dataset-parity — the fit contract (auth, load, fit, quality report) MUST be identical across dataset types; type-specific handling lives in the loader/estimator, never in a divergent control path.
Time-series ML ops
Time-series modeling support: serializable pipeline steps that transform a time-indexed Dataset and feed sequential model training. These are ML-runtime ops (the geospatial half of the originating requirement is owned by geodex and is not part of titan). Large series stay on Dask per #REQ.estimator-lazy-load.
Transformation & feature-extraction ops
Each op is a /userspace/operation step executed through the two-cluster path (#ml-runtime.distributed-op): previewed on LocalCluster, rerun full-dataset on DistributedCluster.
| Op | Purpose | Key params |
|---|---|---|
| Set time index | Parse a timestamp column into the primary time index | column, freq? |
| Filter by time range | Select rows within [start_time, end_time] | start_time, end_time |
| Resample / aggregate | Resample to an interval with an aggregation function | rule, agg_func, fill_method? |
| Rolling window | Rolling statistics over a time/row window | window, min_periods, metrics |
| Time alignment / join | Align multiple series on a common timeline with fill/interpolation | other_datasets, merge_type, interpolation? |
| Lag / lead creation | Shifted feature columns for temporal dependence | columns, shift_periods |
| Detrend / differencing | Remove trend/seasonality via differencing | columns, period |
| Feature extraction | Generate statistical features over the series | columns, feature_set?, window_size? |
- #REQ.timeseries-op-validation — each time-series op MUST validate its preconditions (e.g. a time-based window requires a datetime index; start_time <= end_time; agg_func is a recognised reducer) and surface a user-readable error rather than failing silently.
- #REQ.timeseries-resample-lazy — resample/rolling/feature-extraction ops on large series MUST run on Dask (chunked), not by forcing a full in-memory .compute().
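The precondition checks named in #REQ.timeseries-op-validation could look roughly like the following. Everything here is an assumption for illustration: the OpValidationError type, the validator names, and the RECOGNISED_REDUCERS set are not taken from the titan codebase.

```python
from datetime import datetime

# Assumed reducer whitelist; the actual set is defined by the op implementation.
RECOGNISED_REDUCERS = {"mean", "sum", "min", "max", "count", "median"}


class OpValidationError(ValueError):
    """Raised with a message intended to be shown to the op author."""


def validate_time_range(start_time, end_time):
    if start_time > end_time:
        raise OpValidationError(
            f"start_time ({start_time.isoformat()}) must not be after "
            f"end_time ({end_time.isoformat()})"
        )


def validate_resample(rule, agg_func):
    if not rule:
        raise OpValidationError("rule is required, e.g. a resampling interval")
    if agg_func not in RECOGNISED_REDUCERS:
        allowed = ", ".join(sorted(RECOGNISED_REDUCERS))
        raise OpValidationError(f"agg_func '{agg_func}' is not one of: {allowed}")


def validate_rolling(window, index_is_datetime):
    # A string window is treated as a time offset, which needs a datetime index.
    if isinstance(window, str) and not index_is_datetime:
        raise OpValidationError(
            f"time-based window '{window}' requires a datetime index; "
            "run 'Set time index' first"
        )


def error_message(check, *args):
    """Return the validation message, or None when the check passes."""
    try:
        check(*args)
    except OpValidationError as exc:
        return str(exc)
    return None
```

Raising one dedicated error type lets the dispatcher convert every failed precondition into a readable response instead of a stack trace from deep inside a Dask task.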
Sequential / federated time-series modeling
Time-aligned features feed sequential models (RNN/LSTM/Transformer) trained either in-titan or federated via FATE-Flow (titan.modeling.train.federated_model_runner_v2).
- #REQ.timeseries-alignment-before-fit — sequential/federated training MUST consume data aligned to uniform time steps; alignment (forward-fill or interpolation over a shared interval) happens in the op pipeline before fit, so each federate contributes comparable segments.
- Federation handles partial updates from nodes holding different time segments; only model updates cross the node boundary, never raw series (consistent with #sovereignty).
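The alignment step in #REQ.timeseries-alignment-before-fit can be illustrated with a forward-fill onto a shared grid. This is a pure-Python sketch on a tiny series; align_forward_fill is a hypothetical helper, and production ops would perform the same alignment on Dask per #REQ.timeseries-resample-lazy.

```python
from datetime import datetime, timedelta


def align_forward_fill(series, start, end, step):
    """Project (timestamp, value) samples onto the grid start, start+step, ..., <= end.

    series must be sorted by timestamp. Each grid tick takes the most recent
    sample at or before it; ticks before the first sample get None.
    """
    aligned = []
    index = 0
    last_value = None
    tick = start
    while tick <= end:
        # Consume every sample that is not later than the current tick.
        while index < len(series) and series[index][0] <= tick:
            last_value = series[index][1]
            index += 1
        aligned.append((tick, last_value))
        tick += step
    return aligned


start = datetime(2024, 1, 1, 0, 0)
end = datetime(2024, 1, 1, 1, 0)
step = timedelta(minutes=30)

node_a = [(datetime(2024, 1, 1, 0, 0), 1.0),
          (datetime(2024, 1, 1, 0, 25), 2.0),
          (datetime(2024, 1, 1, 0, 50), 3.0)]
node_b = [(datetime(2024, 1, 1, 0, 10), 5.0),
          (datetime(2024, 1, 1, 0, 40), 6.0)]

aligned_a = align_forward_fill(node_a, start, end, step)
aligned_b = align_forward_fill(node_b, start, end, step)
```

Both nodes end up with the same three ticks even though their raw samples fall at different times, which is what makes their segments comparable at fit time.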
Federated LLM model storage & serving
Titan owns the model-side contract for federated LLMs: how a fine-tuned LLM is stored, versioned, and made servable. The serving-infrastructure mechanics (pod/container provisioning, autoscaling, ingress) belong to forge and are specced there; this section is only the titan-side storage/serving contract that forge consumes.
Storage & versioning
A federated LLM is a TrainedModel whose asset is an LLM artifact (weights + adapter, e.g. a PEFT/LoRA delta over a base model). Asset I/O reuses titan.modeling.asset.
- #REQ.fed-llm-versioned-store — each federated fine-tune iteration MUST be stored as a distinct, retrievable version (no in-place overwrite of a prior iteration's artifact), so a model can be rolled back or compared across federated rounds.
- #REQ.fed-llm-secure-access — LLM artifacts MUST be access-controlled with the same token/ABAC scoping as any other TrainedModel; retrieval requires a real token, and raw artifacts never transit a node boundary that sovereignty forbids.
- Fine-tunes are produced federated via FATE (#ml-runtime); the storage contract is agnostic to whether the producing run was federated or local.
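The no-overwrite rule in #REQ.fed-llm-versioned-store can be pictured as an append-only registry. The sketch below is an in-memory illustration only; VersionedModelStore and its methods are hypothetical and do not describe titan.modeling.asset, and the artifact references are made-up example strings.

```python
class VersionedModelStore:
    """Append-only registry: each registration creates a new version."""

    def __init__(self):
        self._versions = {}  # model_id -> list of artifact references

    def register(self, model_id, artifact_ref):
        """Store a new iteration and return its 1-based version number."""
        versions = self._versions.setdefault(model_id, [])
        versions.append(artifact_ref)
        return len(versions)

    def get(self, model_id, version):
        versions = self._versions.get(model_id, [])
        if not 1 <= version <= len(versions):
            raise KeyError(f"{model_id} has no version {version}")
        return versions[version - 1]

    def latest(self, model_id):
        versions = self._versions.get(model_id, [])
        if not versions:
            raise KeyError(f"{model_id} has no stored versions")
        return len(versions), versions[-1]


store = VersionedModelStore()
round_one = store.register("llm-demo", "artifacts/llm-demo/round-1")
round_two = store.register("llm-demo", "artifacts/llm-demo/round-2")
```

Because registration never replaces an entry, rolling back to the first federated round is a lookup of version 1 rather than a restore operation.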
Serving contract
Titan exposes a servable LLM the same way it serves any model (titan.modeling.serving) — the artifact + metadata forge needs to stand up a serving pod.
- #REQ.fed-llm-serve-handle — a stored federated LLM version MUST be resolvable to a serving handle (artifact location + base-model reference + adapter reference + runtime requirements) that forge can deploy without re-deriving model internals.
- #REQ.fed-llm-fine-tune-from-served — the runtime MUST support producing a new fine-tuned LLM version from an existing served/stored base (apply a federated fine-tune to an existing artifact and register the result as a new version).
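One possible shape for the serving handle is a small immutable record carrying the four elements the requirement lists. The ServingHandle dataclass, its field names, the fine_tune_from helper, and all example values below are assumptions for illustration, not a schema that forge or titan defines.

```python
from dataclasses import asdict, dataclass, field, replace


@dataclass(frozen=True)
class ServingHandle:
    """Hypothetical handle for one stored federated LLM version."""

    model_id: str
    version: int
    artifact_location: str
    base_model_ref: str
    adapter_ref: str
    runtime_requirements: dict = field(default_factory=dict)


def fine_tune_from(handle, new_adapter_ref, new_artifact_location):
    """Derive the handle for a new version fine-tuned from an existing one.

    The base model reference and runtime requirements carry over; the prior
    handle is left untouched, mirroring the versioned-store rule.
    """
    return replace(
        handle,
        version=handle.version + 1,
        adapter_ref=new_adapter_ref,
        artifact_location=new_artifact_location,
    )


served = ServingHandle(
    model_id="llm-demo",
    version=1,
    artifact_location="artifacts/llm-demo/round-1",
    base_model_ref="base/example-7b",
    adapter_ref="adapters/llm-demo/round-1",
    runtime_requirements={"gpu": 1},
)
next_handle = fine_tune_from(
    served, "adapters/llm-demo/round-2", "artifacts/llm-demo/round-2"
)
payload = asdict(next_handle)  # plain dict a deployer could serialise
```

Keeping the handle flat and serialisable means the consumer needs no knowledge of how the adapter was trained to deploy it.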
Training metrics surface
Titan exposes the live training-metrics surface that a dashboard renders. The UI (the training dashboard view) is owned by beacon and is not specced here; this section covers only the backend surface titan must expose.
- #REQ.training-dask-metrics — during a training run, titan MUST expose live Dask metrics (cluster/worker status, task progress, resource utilisation) — minimally the Dask scheduler dashboard endpoint for the run's cluster — for a frontend to surface.
- #REQ.training-run-metrics — titan MUST expose per-run training info (current stage, status, accumulating quality metrics from #estimator-quality.metrics) queryable by model_status / describe_trainedmodel consumers while the run is in flight, not only after completion.
MCP Tools (planned)
When Titan adopts a server/ per platform.service-contract, the following tools should be exposed:
- train(model_spec, dataset_alias) → run_id
- predict(model_id, dataset_alias) → predictions
- explain(model_id, record_id) → feature contributions
- serve(model_id, replicas) → endpoint info
- model_status(model_id) → state, metrics
- fusion_run_start(lens_spec, participating_nodes) → fusion_run_id
- fusion_run_status(fusion_run_id) → state, metrics
Until then, these capabilities are reachable only as library calls or via xanaqu-gateway REST routes.
Dependencies
Heavy compute. Titan's pyproject pulls:
dependencies = [
    "axonis-core>=4.10.2",          # auth, schema, foundation
    # uds.* (DataSpace, ops, libs, userspace, adapters) imported from fedai-rest in-tree
    "t2s-fate-client>3.2.6",        # FATE-Flow v1
    "t2s-fate-client-v2>=3.8.0",    # FATE-Flow v2
    "fastapi",                      # planned server/
    "distributed==2023.3.0",        # Dask
    "seldon-core>=1.18.0",          # serving
    # plus: torch, jax, keras, xgboost, sentence-transformers, peft, gensim,
    # statsmodels, lime, imgaug, bokeh, annoy, …
]
Shared protocol module: Message and MessageType are imported from axonis.fusion (axonis-core/axonis/fusion.py). The earlier plan to migrate ownership to parallax.protocol.FusionMessage was superseded — the type landed in axonis-core as a shared foundation dependency. Note: some consumer wheels (parallax, cortex, prism) still ship a published axonis.protocol.FusionMessage / FusionMessageType surface from a parallel build; consumers should migrate to the axonis.fusion.Message / MessageType names per axonis-core main.
Configuration
Environment variables consumed by Titan at runtime:
| Var | Purpose |
|---|---|
| ATLAS_WORKSPACE | Working directory for logs and model artifacts (default: tempfile.gettempdir()) |
| ATLAS_LOG_LEVEL | Logging level (default: INFO) |
| FEDERATE_DOMAIN | Local REST API domain used for HTTP forwarding when running as a xanadu node |
| FEDERATE_UUID | Stable UUID for this node's federate record |
Titan inherits xanadu's XNQ_* configuration when running as a FusionWorker; see component.xanadu.messaging.
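A minimal sketch of reading these variables with their documented defaults follows. The load_settings and run_workspace helpers and the runs/<run_id> directory layout are hypothetical; the layout is shown only as one way a run could own its workspace, not as titan's actual on-disk structure.

```python
import os
import tempfile
from pathlib import Path


def load_settings(environ=None):
    """Read Titan's environment variables, applying the documented defaults."""
    env = os.environ if environ is None else environ
    return {
        "workspace": Path(env.get("ATLAS_WORKSPACE", tempfile.gettempdir())),
        "log_level": env.get("ATLAS_LOG_LEVEL", "INFO"),
        "federate_domain": env.get("FEDERATE_DOMAIN"),  # None when unset
        "federate_uuid": env.get("FEDERATE_UUID"),      # None when unset
    }


def run_workspace(settings, run_id):
    """Create and return a per-run directory under the workspace (assumed layout)."""
    path = settings["workspace"] / "runs" / run_id
    path.mkdir(parents=True, exist_ok=True)
    return path


defaults = load_settings({})
with tempfile.TemporaryDirectory() as root:
    custom = load_settings({"ATLAS_WORKSPACE": root, "ATLAS_LOG_LEVEL": "DEBUG"})
    first = run_workspace(custom, "run-001")
    second = run_workspace(custom, "run-002")
    first_exists = first.is_dir()
    isolated = first != second and first.parent == second.parent
```

Passing the environment as an argument keeps the reader testable without mutating os.environ.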
Code Style
- Async public APIs where they cross a network boundary (run, on_* handlers); sync where they don't.
- Dataclasses for payloads (FederatedFusionResult, axonis.fusion.Message).
- No unnecessary abstraction over Dask/torch/jax — use them directly. Wrap only when Forge or uds.* needs a uniform surface.
Open Items
- No server/ package. Titan should adopt platform.service-contract's dual-interface contract, exposing the MCP tools listed above. P2 work.
- FusionWorker startup wrapper not implemented. The FusionWorker, FusionBroker, and FusionNodeRegistry classes described in this spec do not yet exist. Production fusion nodes currently instantiate FusionNodeHandler directly inside a xanadu Worker. P2 work.
- Fusion message ownership in parallax. parallax originally called for parallax.protocol.FusionMessage; the type landed in axonis-core as axonis.fusion.Message / MessageType instead. The parallax spec should be updated to reflect the authoritative location.
- Fusion-vs-FATE overlap. titan.modeling.train.federated_model_runner_v2 performs federated training via FATE-Flow; titan.fusion performs federated entity resolution via xanadu. Same word, different mechanism. Document the boundary in the titan repo CLAUDE.md.
- Image / Text dataset fits blocked. Per #estimator-quality.dataset-types, image fits are blocked on binary-column dtype handling in the load path and text/NLP fits on the ES mapping depth limit. Both must reach parity with the tabular/time-series path.
- No live training-metrics endpoint. #training-metrics mandates a live Dask + per-run metrics surface; titan currently exposes metrics only via the post-hoc TrainedModel record. The in-flight surface is unimplemented pending the server/ package.
- Federated LLM storage/serving contract unimplemented. #fed-llm describes the versioned-store and serving-handle contract; the asset path does not yet treat LLM adapter artifacts as first-class versioned assets.
Depends on: component.xanadu.messaging, platform.axonis-core, platform.service-contract
Realizes: product.fusion, product.model, product.trained-model
Required by: component.parallax.fusionmatch-model