Xanadu — Federation Messaging

Purpose

Xanadu is the platform's federation messaging layer. It transports messages between Axonis nodes over RabbitMQ (aio-pika, TLS) and provides peer discovery, heartbeating, and lifecycle management.

It is not a platform.service-contract dual-interface service. It runs as a standalone process per node (coordinator or worker), exposes a thin REST gateway, and registers with Oracle with mcp_path="".

Naming exception: the repo is xanadu; the Python package is xanaqu. The package name predates the repo rename. Consumers (titan, parallax) import xanaqu.*. platform.service-contract's package-name rule does not apply.

Architecture

Two deployment modes:

Single-broker mode (default)

  ┌─────────────┐   ┌─────────┐   ┌─────────┐
  │ Coordinator │   │ Worker  │   │ Worker  │
  └──────┬──────┘   └────┬────┘   └────┬────┘
         │               │              │
         └───────────────┴──────────────┘
                         │
                  ┌──────▼──────┐
                  │  RabbitMQ   │
                  │  (single)   │
                  └─────────────┘

All federates connect to one RabbitMQ instance. Messages are routed via shared exchanges.

Multi-broker mode

  ┌─────────────┐         ┌─────────┐         ┌─────────┐
  │ Coordinator │         │ Worker  │         │ Worker  │
  └──────┬──────┘         └────┬────┘         └────┬────┘
         │                     │                    │
   ┌─────▼─────┐         ┌─────▼─────┐        ┌─────▼─────┐
   │ RabbitMQ  │         │ RabbitMQ  │        │ RabbitMQ  │
   │ (coord)   │◄───────►│ (peer A)  │◄──────►│ (peer B)  │
   └───────────┘         └───────────┘        └───────────┘

Each federate has its own RabbitMQ. PeerBrokerManager connects to peer brokers on demand. Workers advertise their broker endpoint in registration metadata. Direct peer messages route via the target's broker.
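
The on-demand connection behaviour described above could look roughly like the following. This is a minimal sketch, not the actual PeerBrokerManager: the class name LazyPeerConnections, the connect callable, and the endpoint string are hypothetical and exist only for illustration.

```python
import asyncio


class LazyPeerConnections(object):
    """Hypothetical stand-in for PeerBrokerManager: connect on first use, then reuse."""

    def __init__(self, connect):
        self._connect = connect      # async callable: endpoint -> connection
        self._endpoints = {}         # peer_id -> advertised broker endpoint
        self._connections = {}       # peer_id -> open connection
        self._lock = asyncio.Lock()

    def advertise(self, peer_id, endpoint):
        # Called when a peer's registration metadata arrives.
        self._endpoints[peer_id] = endpoint

    async def get(self, peer_id):
        async with self._lock:
            if peer_id not in self._connections:
                endpoint = self._endpoints[peer_id]   # KeyError if the peer is unknown
                self._connections[peer_id] = await self._connect(endpoint)
            return self._connections[peer_id]


async def demo():
    connect_calls = []

    async def fake_connect(endpoint):
        # Stands in for opening a real broker connection.
        connect_calls.append(endpoint)
        return "conn:" + endpoint

    manager = LazyPeerConnections(fake_connect)
    manager.advertise("worker-a", "amqps://broker-a.example:5671")
    conn_first = await manager.get("worker-a")
    conn_second = await manager.get("worker-a")   # reuses the cached connection
    return conn_first, conn_second, connect_calls


conn_first, conn_second, connect_calls = asyncio.run(demo())
```

The lock keeps two concurrent sends to the same peer from opening two connections; whether the real manager serializes this way is an assumption.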

Package Structure

xanadu/
  xanaqu/
    __init__.py
    main.py                       # CLI: xanaqu, xanaqu-gateway
    config.py                     # Config dataclass — env, dict, merge loaders
    auth.py                       # ServiceTokenProvider — OAuth2 client-credentials
    token_validator.py            # JWT validation via JWKS
    registry.py                   # FederateInfo + heartbeat health checks
    core/
      federate.py                 # Federate (abstract)
      coordinator.py              # Coordinator implementation
      worker.py                   # Worker implementation (default: StreamBroker)
    messaging/
      broker.py                   # Broker — RabbitMQ connection, exchanges, queues
      stream_broker.py            # StreamBroker — RabbitMQ Streams support
      broker_endpoint.py          # BrokerEndpoint — connection details
      broker_admin.py             # HTTP Management API client (provisioning)
      peer_broker_manager.py      # Lazy peer connections (multi-broker mode)
      message.py                  # Message + factory methods
      type.py                     # MessageType enum
      serialization.py            # Wire format
      broker_errors.py            # Exception hierarchy
    api/
      app.py                      # FastAPI factory
      gateway.py                  # Gateway — wraps Federate with REST
      schemas.py                  # Pydantic schemas
      routes/
        status.py                 # GET /health, GET /status
        messages.py               # POST /messages
        federates.py              # GET /federates
        federation.py             # POST /federate/{send,query,deregister}
        proxy.py                  # Catch-all federation proxy (registered last)
  chart/                          # Helm chart
  scripts/
  tests/
    conftest.py                   # SimulationBus, SimulationBroker fixtures
    simulation.py                 # In-memory broker simulation infra
    sync_coordinator.py
    sync_worker.py
    unittests/{messaging,core,api,config}/
    integration/
  Dockerfile
  pyproject.toml

Federate Model

Federate is the abstract base. Two concrete implementations:

| Class | Default federate_id | Default broker | Purpose |
|---|---|---|---|
| Coordinator | always "coordinator" | Broker | Central registry, broadcasts, peer lifecycle |
| Worker | configurable | StreamBroker | Compute node, peer-to-peer, scatter-gather |

Each federate owns:

  • A unique federate_id (workers) or the constant "coordinator"
  • A Broker (or StreamBroker) connected to its RabbitMQ
  • Optional federation_groups (scope for peer visibility)
  • Optional metadata: the registration payload (UUID, party identifiers, location, advertised broker endpoint)

Federate Lifecycle

  1. Federate registers with coordinator (Type.REGISTER); registration includes broker endpoint metadata.
  2. Coordinator sends PEER_LIST of existing peers to the new federate.
  3. Coordinator broadcasts PEER_JOINED to existing federates.
  4. Federates can now call send_to_peer(peer_id, payload) for direct messaging.
  5. On missed heartbeats, coordinator marks the peer offline and broadcasts PEER_STATUS_CHANGED. The federate record persists — restart and reactivation are non-destructive.
  6. On explicit deregister(), coordinator broadcasts PEER_LEFT. The federate record is removed permanently.
  7. Worker.stop() does not deregister. K8s pod restarts survive without destroying federate records.
  8. After 3 consecutive missed heartbeat ACKs, the worker auto-reregisters with the coordinator.
  9. REGISTER_REJECTED from the coordinator halts the worker (_running = False).
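
Step 8 can be pictured as a small counter on the worker side. The sketch below is illustrative only: HeartbeatTracker and its method names are hypothetical, and the only value taken from the text is the threshold of 3 consecutive missed ACKs.

```python
MAX_MISSED_ACKS = 3  # threshold taken from step 8 above


class HeartbeatTracker(object):
    """Hypothetical helper: counts consecutive missed heartbeat ACKs."""

    def __init__(self, max_missed=MAX_MISSED_ACKS):
        self.max_missed = max_missed
        self.missed = 0

    def ack_received(self):
        # Any ACK ends the current streak of misses.
        self.missed = 0

    def ack_missed(self):
        """Record a miss; return True when the worker should re-register."""
        self.missed += 1
        if self.missed >= self.max_missed:
            self.missed = 0
            return True
        return False


tracker = HeartbeatTracker()
decisions = [tracker.ack_missed(), tracker.ack_missed()]
tracker.ack_received()                               # streak is reset here
decisions += [tracker.ack_missed() for _ in range(3)]
```

Only the third miss in an unbroken streak triggers re-registration; two misses followed by an ACK do not.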

Message Routing

| Pattern | Exchange | Direction |
|---|---|---|
| Broadcast | fl.broadcast (fanout) | Coordinator → all workers |
| Point-to-point | fl.direct (direct) | Either direction; routing key = coordinator or {federate_id} |

Worker queues:

  • fl.worker.{id} (direct queue)
  • fl.worker.{id}.broadcast (broadcast queue)
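
As a rough illustration of the naming scheme, the helpers below derive queue names and the (exchange, routing key) pair for a message. The function names are hypothetical; only the exchange names, queue patterns, and routing keys come from the tables above.

```python
BROADCAST_EXCHANGE = "fl.broadcast"   # fanout
DIRECT_EXCHANGE = "fl.direct"         # direct


def worker_queues(federate_id):
    """Return the two queue names a worker consumes from."""
    return {
        "direct": "fl.worker.{}".format(federate_id),
        "broadcast": "fl.worker.{}.broadcast".format(federate_id),
    }


def route(target=None):
    """Return (exchange, routing_key); target=None means broadcast."""
    if target is None:
        return BROADCAST_EXCHANGE, ""   # fanout exchanges ignore the routing key
    return DIRECT_EXCHANGE, target      # "coordinator" or a federate_id
```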

Federation Messaging — HTTP Envelope Forwarding

Two patterns for forwarding HTTP requests between federates via RabbitMQ.

Fire-and-forget CRUD (Type.DATA)

For create/update/delete operations where no response is needed.

worker.send_data(target_id, method, path, body)
worker.send_data_multi(target_ids, method, path, body)
worker.broadcast_data(method, path, body)

Receiver registers a callback:

worker.on_data(async fn(method, path, body, headers) -> None)

Scatter-gather queries (Type.QUERY_REQUEST / Type.QUERY_RESPONSE)

For read paths that need to aggregate responses from multiple federates.

responses = await worker.scatter_gather(
    targets,         # dict[federate_id, record_limit] OR list[federate_id]
    method, path, body,
    timeout=30,
    record_quorum=None,
)

When targets is a dict keyed by federate_id with record-limit values, the quorum is auto-computed and responses are bounded. When targets is a list, no limits are applied.

Receiver registers a callback that returns the response payload:

worker.on_query(async fn(method, path, body, headers) -> response_payload)

Responses are matched by correlation_id; concurrent queries are isolated.
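
The correlation_id matching can be sketched in memory, without a broker. PendingQueries and its methods are hypothetical names, and returning partial results on timeout is an assumption of this sketch rather than documented behaviour.

```python
import asyncio
import uuid


class PendingQueries(object):
    """Hypothetical tracker: one entry per in-flight scatter-gather call."""

    def __init__(self):
        self._pending = {}   # correlation_id -> expected targets, responses, done flag

    def open(self, targets):
        correlation_id = str(uuid.uuid4())
        self._pending[correlation_id] = {
            "expected": set(targets),
            "responses": {},
            "done": asyncio.Event(),
        }
        return correlation_id

    def on_response(self, correlation_id, federate_id, payload):
        entry = self._pending.get(correlation_id)
        if entry is None or federate_id not in entry["expected"]:
            return           # late, unknown, or unexpected response: ignore
        entry["responses"][federate_id] = payload
        if set(entry["responses"]) == entry["expected"]:
            entry["done"].set()

    async def gather(self, correlation_id, timeout):
        entry = self._pending[correlation_id]
        try:
            await asyncio.wait_for(entry["done"].wait(), timeout)
        except asyncio.TimeoutError:
            pass             # assumption: return whatever arrived before the deadline
        return self._pending.pop(correlation_id)["responses"]


async def demo():
    pending = PendingQueries()
    q1 = pending.open(["worker-a", "worker-b"])
    q2 = pending.open(["worker-a"])
    # Responses arrive interleaved; each lands in its own query's bucket.
    pending.on_response(q2, "worker-a", {"hits": 1})
    pending.on_response(q1, "worker-a", {"hits": 5})
    result_q1 = await pending.gather(q1, timeout=0.05)   # worker-b never answers
    result_q2 = await pending.gather(q2, timeout=0.05)
    return result_q1, result_q2


result_q1, result_q2 = asyncio.run(demo())
```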

Federation groups

Federates declare federation_groups in metadata (config or XNQ_FEDERATION_GROUPS). The coordinator scopes PEER_LIST, PEER_JOINED, PEER_LEFT, and PEER_STATUS_CHANGED to peers within shared groups. scatter_gather() accepts a groups parameter to restrict targets.
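
A minimal sketch of the scoping rule, assuming "shared groups" means a non-empty intersection of the two federates' group lists. The function names and the registry shape are hypothetical, and the treatment of federates with no groups (they see no peers here) is an assumption.

```python
def shares_group(groups_a, groups_b):
    """True when the two group lists have at least one group in common."""
    return bool(set(groups_a) & set(groups_b))


def visible_peers(federate_id, registry):
    """registry maps federate_id -> list of federation_groups."""
    own_groups = registry[federate_id]
    return sorted(
        peer for peer, groups in registry.items()
        if peer != federate_id and shares_group(own_groups, groups)
    )


registry = {
    "worker-a": ["eu"],
    "worker-b": ["eu", "us"],
    "worker-c": ["us"],
    "worker-d": [],
}
```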

REST Gateway (xanaqu/api/)

A FastAPI app wraps a Federate. Routes:

| Method | Path | File | Notes |
|---|---|---|---|
| GET | /health | routes/status.py | |
| GET | /status | routes/status.py | |
| POST | /messages | routes/messages.py | |
| GET | /federates | routes/federates.py | |
| POST | /federate/send | routes/federation.py | 202 Accepted, fire-and-forget |
| POST | /federate/query | routes/federation.py | scatter-gather with timeout |
| POST | /federate/deregister | routes/federation.py | permanent removal, distinct from heartbeat timeout |
| any | catch-all | routes/proxy.py | registered last |

Federation proxy

The catch-all proxy.py route lets any HTTP request opt into transparent federation by embedding a federation block in its JSON body:

{
  "federation": {
    "federates": {"worker-a": 100, "worker-b": 50}
  },
  "query": "..."
}

  • federates as dict (federate_id → record_limit) → scatter-gather. Local and remote responses are merged on hits/results, and a federation status map is added.
  • federates as list → fire-and-forget. The local request executes synchronously; remote targets receive DATA messages asynchronously. The local response is returned immediately with a federation status map.

The proxy strips the federation block, runs the local request first, then routes to remote federates. The proxy router must be included last in create_app() to avoid shadowing named routes.
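
The dispatch decision the proxy makes can be sketched as a pure function. split_federation and its mode labels are hypothetical; the dict-versus-list rule and the stripping of the federation block follow the description above.

```python
def split_federation(body):
    """Return (mode, targets, local_body) for a JSON request body."""
    local_body = dict(body)                      # leave the caller's dict untouched
    block = local_body.pop("federation", None)   # strip the federation block
    if block is None:
        return "local", None, local_body
    federates = block.get("federates")
    if isinstance(federates, dict):
        return "scatter_gather", federates, local_body    # federate_id -> record_limit
    if isinstance(federates, list):
        return "fire_and_forget", federates, local_body
    raise ValueError("federation.federates must be a dict or a list")


request_body = {
    "federation": {"federates": {"worker-a": 100, "worker-b": 50}},
    "query": "...",
}
mode, targets, local_body = split_federation(request_body)
```

The local handler only ever sees local_body, so it needs no knowledge of federation.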

Loop avoidance

When the gateway forwards a DATA or QUERY message to the local REST API, it injects federated: true in the request headers. Athena (or any service receiving forwarded calls) inspects this header and suppresses re-federation, preventing infinite forwarding loops.
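
On the receiving side, the check could be as small as the sketch below. The header name federated and the value true come from the paragraph above; the helper names and the case-insensitive comparison are assumptions.

```python
FEDERATED_HEADER = "federated"


def mark_forwarded(headers):
    """Return a copy of headers tagged as a forwarded federation call."""
    marked = dict(headers)
    marked[FEDERATED_HEADER] = "true"
    return marked


def should_federate(headers):
    """False when the request was already forwarded by a gateway."""
    for name, value in headers.items():
        if name.lower() == FEDERATED_HEADER:
            return str(value).lower() != "true"
    return True
```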

CLI Entry Points

Defined in xanaqu/main.py:

# Bare federate (no REST)
xanaqu coordinator
xanaqu worker --federate-id node-a

# Federate + REST gateway
xanaqu-gateway coordinator --coordinator --api-port 8000
xanaqu-gateway development --api-port 8001

Or via helper script with config files: ./scripts/run-local.sh {coordinator,development}.

Authentication

| Surface | Mechanism |
|---|---|
| Worker → Coordinator registration | OAuth2 client-credentials via ServiceTokenProvider (set token_provider= on Worker) |
| Coordinator JWT validation | TokenValidator validates JWTs against JWKS when federation_auth_enabled=True |
| RabbitMQ | TLS via aio-pika. Set RABBITMQ_VERIFY=False for dev |

Dependencies

dependencies = [
  "fastapi>=0.100.0",
  "aio-pika",
  "Authlib",                    # OAuth2 client-credentials
  "PyJWT[crypto]",              # JWKS validation
  "httpx",
  "pydantic>=2",
]

Note: xanadu has no hard runtime dependency on axonis-core. It is used only for auth helpers and schema constants in some integrations, which keeps the federation layer thin and independently versionable.

Configuration

Config dataclass loaded by:

  • Config() — defaults
  • Config.from_env(prefix='XNQ_') — env vars (XNQ_RABBITMQ_HOST, XNQ_HEARTBEAT_INTERVAL, …)
  • Config.from_dict(data) — dict, ignoring unknown keys
  • config.merge(overrides) — returns new Config

The CLI uses Config.from_env() then merges CLI overrides.
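
The three loaders and merge might be shaped like the sketch below. It is not the real Config: the two settings, the derived rabbitmq_url field, the default values, the environ parameter (added so the example runs without touching the process environment), and the rule that None overrides are ignored are all assumptions. It does show why init=False fields need filtering with f.init.

```python
import dataclasses
import os


@dataclasses.dataclass
class Config(object):
    # Illustrative fields only; defaults here are made up for the example.
    rabbitmq_host: str = "localhost"
    heartbeat_interval: int = 10
    # Derived field, excluded from __init__; loaders must skip it.
    rabbitmq_url: str = dataclasses.field(init=False, default="")

    def __post_init__(self):
        self.rabbitmq_url = "amqps://{}".format(self.rabbitmq_host)

    @classmethod
    def _init_fields(cls):
        # Only fields accepted by __init__ may be passed to the constructor.
        return {f.name: f for f in dataclasses.fields(cls) if f.init}

    @classmethod
    def from_dict(cls, data):
        known = cls._init_fields()
        return cls(**{k: v for k, v in data.items() if k in known})

    @classmethod
    def from_env(cls, prefix="XNQ_", environ=None):
        environ = os.environ if environ is None else environ
        values = {}
        for name, field in cls._init_fields().items():
            raw = environ.get(prefix + name.upper())
            if raw is not None:
                values[name] = field.type(raw)   # env values arrive as strings
        return cls(**values)

    def merge(self, overrides):
        current = {name: getattr(self, name) for name in self._init_fields()}
        current.update({k: v for k, v in overrides.items() if v is not None})
        return type(self).from_dict(current)     # new object; self is unchanged


env_config = Config.from_env(
    environ={"XNQ_RABBITMQ_HOST": "rmq", "XNQ_HEARTBEAT_INTERVAL": "30"}
)
cli_config = env_config.merge({"heartbeat_interval": 5, "rabbitmq_host": None})
```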

Multi-broker coordinator vars

For workers connecting to a coordinator on a different RabbitMQ:

XNQ_COORDINATOR_HOST
XNQ_COORDINATOR_PORT
XNQ_COORDINATOR_USER
XNQ_COORDINATOR_PASSWORD
XNQ_COORDINATOR_VHOST
XNQ_COORDINATOR_SSL

SSO service-token vars

SSO_TOKEN_URL
SSO_CLIENT_ID
SSO_CLIENT_SECRET
SSO_VERIFY

When SSO_TOKEN_URL follows Keycloak's /protocol/openid-connect/token pattern, SSO_JWKS_URL (/certs) and SSO_ISSUER (realm URL) are derived automatically.
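
The derivation follows from Keycloak's URL layout, where the token and certs endpoints sit under the same realm URL. A minimal sketch, with a hypothetical function name and an example hostname and realm:

```python
TOKEN_SUFFIX = "/protocol/openid-connect/token"
CERTS_SUFFIX = "/protocol/openid-connect/certs"


def derive_sso_urls(token_url):
    """Return (jwks_url, issuer), or (None, None) if the URL is not Keycloak-shaped."""
    if not token_url.endswith(TOKEN_SUFFIX):
        return None, None
    issuer = token_url[: -len(TOKEN_SUFFIX)]   # the realm URL
    return issuer + CERTS_SUFFIX, issuer


jwks_url, issuer = derive_sso_urls(
    "https://sso.example.com/realms/axonis/protocol/openid-connect/token"
)
```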

Fallback and override vars (non-standard names)

FEDERATE_DOMAIN          # Local REST API for on_data/on_query forwarding
FEDERATE_PROTOCOL_V1     # Default scheme: https
XNQ_RMQ_USER             # Runtime credential override (note: RMQ, not RABBITMQ)
XNQ_RMQ_PASSWORD

Registration metadata (worker → coordinator)

| Var | Purpose |
|---|---|
| FEDERATE_UUID | Stable UUID; defaults to federate_id |
| FEDERATE_PARTY_V{N} | Party identifiers per API version (party.v1, party.v2, …) |
| FEDERATE_LOCATION_LATITUDE | Geographic coordinate |
| FEDERATE_LOCATION_LONGITUDE | Geographic coordinate |
| XNQ_RABBITMQ_EXTERNAL_HOST | Advertised RabbitMQ host (defaults to internal) |
| XNQ_RABBITMQ_EXTERNAL_PORT | Advertised RabbitMQ port |

Testing

asyncio_mode = "auto" — async tests run without @pytest.mark.asyncio.

In-memory simulation infra (tests/simulation.py):

| Class | Replaces | Purpose |
|---|---|---|
| SimulationBus | shared RabbitMQ | Connects multiple SimulationBroker instances |
| SimulationBroker | Broker | In-memory, drop-in replacement |
| SimulationStreamBroker | StreamBroker | In-memory streams |
| SimulationStream | RabbitMQ Streams | Append-only log + reader offsets |
| SimulationPeerBrokerManager | PeerBrokerManager | Multi-broker tests without real RMQ |

Sync wrappers (tests/sync_coordinator.py, tests/sync_worker.py) run the async federates in a background thread for tests that can't use async/await.

Integration tests run against a real RabbitMQ:

docker compose -f docker-compose.test.yml up -d
uv run pytest tests/integration --integration --rabbitmq-config=config/development.yaml -v

Code Style

  • Empty __init__.py (just module docstrings).
  • No type hints in function signatures; dataclass fields are the exception.
  • Keep (object) in class definitions.
  • Delete old files on restructure — no re-export shims, no backwards-compat hacks.
  • Conventional commits (fix:, feat:, chore:, fix(scope):); python-semantic-release derives versions and changelogs.

Operational Notes

  • Windows: loop.add_signal_handler() raises NotImplementedError. Wrap in try/except.
  • YAML 1.1: PyYAML parses on/off/yes/no as booleans. Use the strict loader pattern from fedai-rest's resolver if accepting YAML config.
  • init=False dataclass fields: break from_dict / merge. Filter with f.init when iterating.
  • federation_auth_enabled: when set, the coordinator rejects unauthenticated registrations, so make sure workers configure token_provider= before start.
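
For the Windows note, the try/except wrapper could look like this. install_signal_handlers is a hypothetical helper; loop.add_signal_handler is the real asyncio call, and the fallback to plain KeyboardInterrupt handling is an assumption about what the caller does next.

```python
import signal


def install_signal_handlers(loop, callback):
    """Register callback for SIGINT/SIGTERM; return False where unsupported."""
    try:
        for signum in (signal.SIGINT, signal.SIGTERM):
            loop.add_signal_handler(signum, callback)
    except NotImplementedError:
        # Windows event loops do not implement add_signal_handler;
        # the caller can fall back to catching KeyboardInterrupt.
        return False
    return True
```

A caller would pass asyncio.get_running_loop() and a shutdown callback, and treat a False return as "no graceful signal handling on this platform".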

Federation Control Plane

Beyond the federate-to-federate messaging primitives above, xanadu hosts the federation control plane: the reliable, RabbitMQ-backed substrate that replaces the legacy point-to-point REST federation (Athena P2P registration + the Elasticsearch synchronization index + application-level retry/sync). The control plane delivers presence announcements, object replication, and topology discovery across a full mesh of federates, with delivery reliability provided by the broker rather than by application code.

  • #REQ.no-p2p-rest — federation control messages (presence, object sync, topology) flow over RabbitMQ. No point-to-point REST calls are made between federates for control-plane purposes.
  • #REQ.no-sync-index — reliability (retry, dead-lettering, redelivery) is provided by RabbitMQ infrastructure. The control plane does not maintain an application-level Elasticsearch synchronization index or application-level retry loop; the dead-letter queue replaces that role.
  • #REQ.full-mesh-discovery — every federate discovers every other federate via broadcast presence announcements. No central federate inventory must be queried point-to-point for a federate to learn the mesh.

Control-plane topology

The control plane runs on a dedicated federation vhost with its own exchange/queue layout, distinct from the fl.* worker messaging exchanges in #message-routing:

| Resource | Kind | Role |
|---|---|---|
| <ns>.control | fanout exchange | broadcasts to all federates |
| <ns>.direct | direct exchange | targeted messages, routing key = federate domain |
| <ns>.dlx | fanout exchange | dead-letter exchange |
| <ns>.control.{domain} | queue | per-federate broadcast receiver (bound to <ns>.control) |
| <ns>.sync.{domain} | queue | per-federate object replication (bound to <ns>.direct, routing key = {domain}) |
| <ns>.register.{domain} | queue | per-federate registration events |
| <ns>.dlq.{domain} | queue | failed messages (bound to <ns>.dlx) |

  • #REQ.dlx-on-control-queues — every durable control/sync queue is declared with x-dead-letter-exchange pointing at the DLX and a message TTL (default 24h). Messages that fail processing are nacked without requeue and land on the DLQ rather than spinning in place.
  • #REQ.durable-control-topology — control-plane exchanges and queues are declared durable=true; control-plane messages are published persistent (delivery_mode=2).

broker_admin.py (the HTTP Management API client) provisions this topology; provisioning is idempotent and runs on federate startup.
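
The layout and the two #REQ items above can be expressed as plain declaration specs. This is a sketch under assumptions: control_topology and the dict shape are hypothetical, and the register queue is left out because its binding is not stated. x-dead-letter-exchange and x-message-ttl (in milliseconds) are standard RabbitMQ queue arguments.

```python
DEFAULT_TTL_MS = 24 * 60 * 60 * 1000   # 24h, the documented default


def control_topology(ns, domain, ttl_ms=DEFAULT_TTL_MS):
    """Return (exchanges, queues) as dicts describing what to declare."""
    control = "{}.control".format(ns)
    direct = "{}.direct".format(ns)
    dlx = "{}.dlx".format(ns)
    dead_letter_args = {"x-dead-letter-exchange": dlx, "x-message-ttl": ttl_ms}
    exchanges = [
        {"name": control, "type": "fanout", "durable": True},
        {"name": direct, "type": "direct", "durable": True},
        {"name": dlx, "type": "fanout", "durable": True},
    ]
    queues = [
        {"name": "{}.control.{}".format(ns, domain), "bind": control,
         "routing_key": "", "durable": True, "arguments": dict(dead_letter_args)},
        {"name": "{}.sync.{}".format(ns, domain), "bind": direct,
         "routing_key": domain, "durable": True, "arguments": dict(dead_letter_args)},
        # The DLQ itself carries no dead-letter arguments in this sketch.
        {"name": "{}.dlq.{}".format(ns, domain), "bind": dlx,
         "routing_key": "", "durable": True, "arguments": {}},
    ]
    return exchanges, queues


exchanges, queues = control_topology("fed", "a.example")
```

Because the function is deterministic, running it on every startup yields the same declarations, which is what makes idempotent provisioning straightforward.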

Per-federate provisioning

When a federate is brought up or a remote federate joins, the control plane provisions broker resources for it through the Management API:

  • vhost per federation namespace (e.g. federation-{domain}).
  • user per federate (fed-{domain}) with a generated credential and configure/write/read permissions scoped to the control-plane resource pattern. Generated credentials are stored as a Kubernetes Secret (never inline in config).
  • #REQ.provision-on-registration — federate registration drives provisioning: local control-plane resources are ensured on startup, and a joining remote federate's upstream link is configured when its announce is received (or on explicit registration). Departure deprovisions the federate's upstream and per-federate resources.

Federation policies

The control plane applies broker policies (via the Management API) governing federation behavior:

  • Federation-upstream policy — the control exchange is federated across the upstream set so a broadcast on any federate's <ns>.control reaches the whole mesh.
  • HA policy — control-plane queues mirror across broker nodes when the broker is clustered (ha-mode: all, automatic sync).
  • TTL policy — control messages carry a short TTL (default 5 min) and unused control queues expire, so stale presence/control traffic self-cleans.

Upstream links

In multi-broker deployments each federate's broker links to remote federates' brokers as federation upstreams (#architecture.multi-broker is the peer-broker view of this). On join, an upstream entry is created from the remote federate's advertised broker URI (amqps://…, drawn from registration metadata — see #configuration.registration-metadata) and appended to the upstream set; ack-mode=on-confirm, trust-user-id=false. On departure the upstream is removed.
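
An upstream entry is a RabbitMQ runtime parameter, set through the Management API with a PUT to /api/parameters/federation-upstream/{vhost}/{name}. The sketch below only builds the request. The upstream-{domain} naming scheme and the function name are hypothetical; the ack-mode and trust-user-id values are the ones stated above.

```python
from urllib.parse import quote


def upstream_request(vhost, remote_domain, broker_uri):
    """Return (path, body) for a Management API PUT that defines one upstream."""
    name = "upstream-{}".format(remote_domain)   # hypothetical naming scheme
    path = "/api/parameters/federation-upstream/{}/{}".format(
        quote(vhost, safe=""), quote(name, safe="")   # "/" in a vhost becomes %2F
    )
    body = {
        "value": {
            "uri": broker_uri,
            "ack-mode": "on-confirm",
            "trust-user-id": False,
        }
    }
    return path, body


path, body = upstream_request("/", "b.example", "amqps://broker.b.example:5671")
```

Removing the upstream on departure would be a DELETE on the same path.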

Control Messages

The control plane carries a typed control envelope, distinct from the worker Message of #http-envelope-forwarding. Each control message carries:

| Field | Purpose |
|---|---|
| type | control message type (see below) |
| source_domain | originating federate's domain |
| timestamp | emission time |
| correlation_id | UUID for end-to-end tracing |
| payload | type-specific body |
| auth_context | optional token + markings for downstream authorization |

Control message types:

| Type | Meaning |
|---|---|
| federate.announce | federate presence broadcast (domain, name, location, broker URI, capabilities) |
| federate.depart | federate leaving the mesh |
| federate.heartbeat | liveness signal |
| object.create / object.update / object.delete | object replication across the mesh |
| topology.request / topology.response | mesh-topology discovery |

  • #REQ.control-correlation-id — every control message carries a correlation_id that is preserved across the broadcast/consume path so a single logical operation is traceable end-to-end.
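
The envelope fields map naturally onto a dataclass. The sketch below assumes a JSON wire format and an ISO 8601 timestamp; neither is confirmed by the text, and ControlMessage is a hypothetical class name.

```python
import dataclasses
import datetime
import json
import uuid


def _new_correlation_id():
    return str(uuid.uuid4())


def _utc_now():
    return datetime.datetime.now(datetime.timezone.utc).isoformat()


@dataclasses.dataclass
class ControlMessage(object):
    type: str
    source_domain: str
    payload: dict
    correlation_id: str = dataclasses.field(default_factory=_new_correlation_id)
    timestamp: str = dataclasses.field(default_factory=_utc_now)
    auth_context: dict = None    # optional token + markings

    def to_json(self):
        return json.dumps(dataclasses.asdict(self))

    @classmethod
    def from_json(cls, raw):
        # The correlation_id travels inside the envelope, so it survives the hop.
        return cls(**json.loads(raw))


announce = ControlMessage(
    type="federate.announce",
    source_domain="a.example",
    payload={"name": "Site A", "broker_uri": "amqps://broker.a.example:5671"},
)
received = ControlMessage.from_json(announce.to_json())
```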

Publish semantics

  • Broadcast — published to the fanout control exchange; every federate's control queue receives it. Used for federate.announce, federate.depart, and object.* sync.
  • Direct — published to the direct exchange with routing_key = target_domain; only the addressed federate's sync queue receives it.

Consume semantics

A control-plane consumer dispatches each message by type to a registered handler.

  • #REQ.skip-self — a federate ignores control messages whose source_domain equals its own domain (ack and drop), so fanout broadcasts don't loop back onto the originator.
  • #REQ.ack-or-dlq — a successfully handled message is acked; a handler that raises causes a nack without requeue, routing the message to the DLQ for inspection rather than reprocessing.
  • Consumers apply a bounded prefetch (QoS) so a slow handler cannot be flooded.
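
The skip-self and ack-or-dlq rules can be sketched without a broker by using a fake delivery object. All class names here are hypothetical. Treating a message with no registered handler as a failure (nack) is an assumption; prefetch is a broker-side setting and is not modelled.

```python
class FakeDelivery(object):
    """Stand-in for a broker delivery; records how it was settled."""

    def __init__(self, message):
        self.message = message
        self.outcome = None

    def ack(self):
        self.outcome = "ack"

    def nack(self, requeue):
        self.outcome = "requeue" if requeue else "nack"


class ControlConsumer(object):
    def __init__(self, own_domain):
        self.own_domain = own_domain
        self.handlers = {}       # message type -> handler callable

    def on(self, message_type, handler):
        self.handlers[message_type] = handler

    def dispatch(self, delivery):
        message = delivery.message
        if message["source_domain"] == self.own_domain:
            delivery.ack()                   # skip-self: ack and drop
            return
        try:
            self.handlers[message["type"]](message)
        except Exception:
            delivery.nack(requeue=False)     # dead-lettered via the queue's DLX
            return
        delivery.ack()


def failing_handler(message):
    raise RuntimeError("handler failed")


consumer = ControlConsumer("a.example")
handled = []
consumer.on("federate.announce", handled.append)
consumer.on("object.update", failing_handler)

own = FakeDelivery({"type": "federate.announce", "source_domain": "a.example"})
remote = FakeDelivery({"type": "federate.announce", "source_domain": "b.example"})
broken = FakeDelivery({"type": "object.update", "source_domain": "b.example"})
for delivery in (own, remote, broken):
    consumer.dispatch(delivery)
```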

Object replication

object.create / object.update / object.delete replicate a local object change to the federation. The originating federate performs its local write first, then broadcasts the change (object type, uid, payload, version).

  • #REQ.replication-idempotent — a receiving federate compares the incoming object version against any local copy and skips application when the local version is greater than or equal to the incoming version, making replication idempotent and safe under redelivery.
  • #REQ.federated-from-marker — a replicated object is written locally with a federation provenance marker (uds.federated_from = <source_domain>) so federated copies are distinguishable from locally originated objects.
  • #REQ.federate-filter — only objects eligible for federation are broadcast; non-federated objects stay local.
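
The version check and provenance marker could be applied as in the sketch below, which uses a plain dict as the local store. The function name and the record layout, including representing uds.federated_from as a nested dict, are assumptions.

```python
def apply_replicated(store, source_domain, obj):
    """Apply a replicated create/update to a dict store; return True if written."""
    local = store.get(obj["uid"])
    if local is not None and local["version"] >= obj["version"]:
        return False                                     # stale or duplicate: skip
    record = dict(obj)
    record["uds"] = {"federated_from": source_domain}    # provenance marker
    store[obj["uid"]] = record
    return True


store = {}
update_v2 = {"uid": "obj-1", "version": 2, "payload": {"name": "second"}}
applied = [
    apply_replicated(store, "b.example", update_v2),
    apply_replicated(store, "b.example", update_v2),     # redelivery of the same change
    apply_replicated(store, "b.example", {"uid": "obj-1", "version": 1, "payload": {}}),
]
```

Applying the same message twice leaves the store unchanged, which is what makes broker redelivery safe.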

Migration from REST Federation

The control plane supersedes the legacy REST federation path. Migration is gated by a federation-mode flag rather than a hard cutover:

  • #REQ.federation-mode-flag — a FEDERATION_MODE setting selects rest | rabbitmq | dual. In dual mode both the legacy REST path and the RabbitMQ control plane run, so message flow can be validated for equivalence before the REST path is removed per federate.
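
One way to model the flag is an enum with two derived switches. FederationMode, load_mode, and the default of rest when the variable is unset are assumptions; the three values come from the requirement above.

```python
import enum


class FederationMode(enum.Enum):
    REST = "rest"
    RABBITMQ = "rabbitmq"
    DUAL = "dual"

    @property
    def uses_rest(self):
        return self in (FederationMode.REST, FederationMode.DUAL)

    @property
    def uses_rabbitmq(self):
        return self in (FederationMode.RABBITMQ, FederationMode.DUAL)


def load_mode(environ):
    """Parse FEDERATION_MODE; raises ValueError on an unknown value."""
    raw = environ.get("FEDERATION_MODE", "rest")   # default is an assumption
    return FederationMode(raw.strip().lower())
```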

Migration sequence: provision RMQ resources → run the control-plane consumer in dual mode → validate parity with REST behavior → switch each federate to rabbitmq → remove REST federation code → delete the Elasticsearch synchronization index. End state: all control traffic flows over RMQ, zero P2P REST calls, no synchronization index, and new-federate registration provisions RMQ resources automatically.

Control-Plane Observability

  • Queue depth and consumer lag for control/sync queues are exported to Prometheus.
  • DLQ growth is alerted on — a rising DLQ signals systematic handler failures.
  • correlation_id is logged across publish and consume so a control operation can be traced through the mesh.

Open Items

  • No platform.service-contract conformance. Intentional — xanadu is an infrastructure service, not a tool host. It registers with Oracle with mcp_path="". Consider documenting the registration shape explicitly in component.oracle.gateway.
  • Package rename. xanaqu legacy name persists. A future major version could align package and repo names; consumers (titan, parallax) would need a coordinated bump.
  • Fusion message ownership resolved. Message and MessageType are owned by axonis-core at axonis.fusion (axonis-core/axonis/fusion.py); see parallax and component.titan.runtime "Shared protocol module". Xanadu carries the message verbatim regardless. Titan still has stale from protocol.messages import FusionMessage imports in titan/fusion/{orchestrator,broker,node_handler}.py that need to be repointed at axonis.fusion — tracked in titan, not blocking xanadu. Some consumer wheels (parallax, cortex, prism) ship a parallel axonis.protocol.FusionMessage surface; alignment to axonis.fusion.Message is open work on those consumers.

Depends on: platform.axonis-core, platform.service-contract

Realizes: product.fusion

Required by: component.titan.runtime