Xanadu — Federation Messaging
Purpose
Xanadu is the platform's federation messaging layer. It transports messages between Axonis nodes over RabbitMQ (aio-pika, TLS) and provides peer discovery, heartbeating, and lifecycle management.
It is not a platform.service-contract dual-interface service. It runs as a standalone process per node (coordinator or worker), exposes a thin REST gateway, and registers with Oracle with mcp_path="".
Naming exception: the repo is xanadu; the Python package is xanaqu. The package name predates the repo rename. Consumers (titan, parallax) import xanaqu.*. platform.service-contract's package-name rule does not apply.
Architecture
Two deployment modes:
Single-broker mode (default)
┌─────────────┐     ┌─────────┐     ┌─────────┐
│ Coordinator │     │ Worker  │     │ Worker  │
└──────┬──────┘     └────┬────┘     └────┬────┘
       │                 │               │
       └─────────────────┴───────────────┘
                         │
                  ┌──────▼──────┐
                  │  RabbitMQ   │
                  │  (single)   │
                  └─────────────┘
All federates connect to one RabbitMQ instance. Routing via shared exchanges.
Multi-broker mode
┌─────────────┐         ┌─────────┐          ┌─────────┐
│ Coordinator │         │ Worker  │          │ Worker  │
└──────┬──────┘         └────┬────┘          └────┬────┘
       │                     │                    │
 ┌─────▼─────┐         ┌─────▼─────┐        ┌─────▼─────┐
 │ RabbitMQ  │         │ RabbitMQ  │        │ RabbitMQ  │
 │  (coord)  │◄───────►│ (peer A)  │◄──────►│ (peer B)  │
 └───────────┘         └───────────┘        └───────────┘
Each federate has its own RabbitMQ. PeerBrokerManager connects to peer brokers on demand. Workers advertise their broker endpoint in registration metadata. Direct peer messages route via the target's broker.
Package Structure
xanadu/
  xanaqu/
    __init__.py
    main.py                     # CLI: xanaqu, xanaqu-gateway
    config.py                   # Config dataclass: env, dict, merge loaders
    auth.py                     # ServiceTokenProvider: OAuth2 client-credentials
    token_validator.py          # JWT validation via JWKS
    registry.py                 # FederateInfo + heartbeat health checks
    core/
      federate.py               # Federate (abstract)
      coordinator.py            # Coordinator implementation
      worker.py                 # Worker implementation (default: StreamBroker)
    messaging/
      broker.py                 # Broker: RabbitMQ connection, exchanges, queues
      stream_broker.py          # StreamBroker: RabbitMQ Streams support
      broker_endpoint.py        # BrokerEndpoint: connection details
      broker_admin.py           # HTTP Management API client (provisioning)
      peer_broker_manager.py    # Lazy peer connections (multi-broker mode)
      message.py                # Message + factory methods
      type.py                   # MessageType enum
      serialization.py          # Wire format
      broker_errors.py          # Exception hierarchy
    api/
      app.py                    # FastAPI factory
      gateway.py                # Gateway: wraps Federate with REST
      schemas.py                # Pydantic schemas
      routes/
        status.py               # GET /health, GET /status
        messages.py             # POST /messages
        federates.py            # GET /federates
        federation.py           # POST /federate/{send,query,deregister}
        proxy.py                # Catch-all federation proxy (registered last)
  chart/                        # Helm chart
  scripts/
  tests/
    conftest.py                 # SimulationBus, SimulationBroker fixtures
    simulation.py               # In-memory broker simulation infra
    sync_coordinator.py
    sync_worker.py
    unittests/{messaging,core,api,config}/
    integration/
  Dockerfile
  pyproject.toml
Federate Model
Federate is the abstract base. Two concrete implementations:
| Class | Default federate_id | Default broker | Purpose |
|---|---|---|---|
| Coordinator | always "coordinator" | Broker | Central registry, broadcasts, peer lifecycle |
| Worker | configurable | StreamBroker | Compute node, peer-to-peer, scatter-gather |
Each federate owns:
- A unique federate_id (workers) or the constant "coordinator"
- A Broker (or StreamBroker) connected to its RabbitMQ
- Optional federation_groups — scope for peer visibility
- Optional metadata — registration payload (UUID, party identifiers, location, advertised broker endpoint)
Federate Lifecycle
- Federate registers with coordinator (Type.REGISTER); registration includes broker endpoint metadata.
- Coordinator sends PEER_LIST of existing peers to the new federate.
- Coordinator broadcasts PEER_JOINED to existing federates.
- Federates can now call send_to_peer(peer_id, payload) for direct messaging.
- On missed heartbeats, coordinator marks the peer offline and broadcasts PEER_STATUS_CHANGED. The federate record persists, so restart and reactivation are non-destructive.
- On explicit deregister(), coordinator broadcasts PEER_LEFT. The federate record is removed permanently.
- Worker.stop() does not deregister. K8s pod restarts survive without destroying federate records.
- After 3 consecutive missed heartbeat ACKs, the worker auto-reregisters with the coordinator.
- REGISTER_REJECTED from the coordinator halts the worker (_running = False).
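The coordinator side of this lifecycle can be sketched as a small in-memory registry. This is an illustration only, not the real xanaqu.registry API: PeerRecord, PeerRegistry, the time-based heartbeat_timeout, and the choice to emit PEER_STATUS_CHANGED when an offline peer heartbeats again are all assumptions. It is written to show the key distinction above: a heartbeat timeout keeps the record, an explicit deregister deletes it.

```python
from dataclasses import dataclass, field


@dataclass
class PeerRecord(object):
    federate_id: str
    metadata: dict = field(default_factory=dict)
    online: bool = True
    last_seen: float = 0.0


class PeerRegistry(object):
    """Hypothetical coordinator-side registry (illustration only)."""

    def __init__(self, heartbeat_timeout=30.0):
        self.heartbeat_timeout = heartbeat_timeout  # assumed value, in seconds
        self.peers = {}
        self.events = []  # (event, federate_id) pairs the coordinator would broadcast

    def register(self, federate_id, metadata, now):
        existing = sorted(self.peers)  # would go to the newcomer as PEER_LIST
        self.peers[federate_id] = PeerRecord(federate_id, metadata, True, now)
        self.events.append(("PEER_JOINED", federate_id))
        return existing

    def heartbeat(self, federate_id, now):
        record = self.peers[federate_id]
        record.last_seen = now
        if not record.online:
            # Assumption: reactivation is announced as a status change.
            record.online = True
            self.events.append(("PEER_STATUS_CHANGED", federate_id))

    def sweep(self, now):
        # Missed heartbeats mark the peer offline but keep its record.
        for record in self.peers.values():
            if record.online and now - record.last_seen > self.heartbeat_timeout:
                record.online = False
                self.events.append(("PEER_STATUS_CHANGED", record.federate_id))

    def deregister(self, federate_id):
        # Explicit deregistration is the only path that deletes the record.
        if self.peers.pop(federate_id, None) is not None:
            self.events.append(("PEER_LEFT", federate_id))
```

Passing now explicitly keeps the sketch deterministic in tests; a real implementation would more likely read a monotonic clock.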
Message Routing
| Pattern | Exchange | Direction |
|---|---|---|
| Broadcast | fl.broadcast (fanout) | Coordinator → all workers |
| Point-to-point | fl.direct (direct) | Either direction; routing key = coordinator or {federate_id} |
Worker queues:
- fl.worker.{id} — direct queue
- fl.worker.{id}.broadcast — broadcast queue
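As a quick illustration of the naming scheme, the helpers below derive the exchange, routing key, and queue names from a federate id. The function names are hypothetical; only the fl.* names and the routing-key rule come from the tables above.

```python
def worker_queues(federate_id):
    """Return (direct_queue, broadcast_queue) for one worker."""
    return (
        "fl.worker.%s" % federate_id,
        "fl.worker.%s.broadcast" % federate_id,
    )


def route(target=None):
    """Return (exchange, routing_key); target=None means broadcast."""
    if target is None:
        # Fanout exchanges ignore the routing key, so it is left empty.
        return ("fl.broadcast", "")
    # Direct exchange: the key is "coordinator" or the worker's federate_id.
    return ("fl.direct", target)
```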
Federation Messaging — HTTP Envelope Forwarding
Two patterns for forwarding HTTP requests between federates via RabbitMQ.
Fire-and-forget CRUD (Type.DATA)
For create/update/delete operations where no response is needed.
worker.send_data(target_id, method, path, body)
worker.send_data_multi(target_ids, method, path, body)
worker.broadcast_data(method, path, body)
Receiver registers a callback:
worker.on_data(async fn(method, path, body, headers) -> None)
Scatter-gather queries (Type.QUERY_REQUEST / Type.QUERY_RESPONSE)
For read paths that need to aggregate responses from multiple federates.
responses = await worker.scatter_gather(
targets, # dict[federate_id, record_limit] OR list[federate_id]
method, path, body,
timeout=30,
record_quorum=None,
)
When targets is a dict keyed by federate_id with record-limit values, the quorum is auto-computed and responses are bounded. When targets is a list, no limits are applied.
Receiver registers a callback that returns the response payload:
worker.on_query(async fn(method, path, body, headers) -> response_payload)
Responses are matched by correlation_id; concurrent queries are isolated.
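A minimal sketch of correlation-based matching, assuming a hypothetical ScatterGather helper and an injected send coroutine (neither is part of xanaqu). Each query gets its own correlation_id and its own pending entry, which is what keeps concurrent queries isolated. Record limits and quorum handling are left out.

```python
import asyncio
import uuid


class ScatterGather(object):
    """Illustrative only: matches responses to queries by correlation_id."""

    def __init__(self, send):
        self._send = send    # async callable(target, correlation_id, request)
        self._pending = {}   # correlation_id -> per-query state

    async def query(self, targets, request, timeout=30):
        expected = set(targets)  # works for a list or a dict keyed by federate_id
        if not expected:
            return {}
        correlation_id = str(uuid.uuid4())
        done = asyncio.get_running_loop().create_future()
        state = {"expected": expected, "responses": {}, "done": done}
        self._pending[correlation_id] = state
        try:
            for target in expected:
                await self._send(target, correlation_id, request)
            try:
                await asyncio.wait_for(done, timeout)
            except asyncio.TimeoutError:
                pass  # return whatever arrived before the deadline
            return dict(state["responses"])
        finally:
            del self._pending[correlation_id]

    def on_response(self, correlation_id, source, payload):
        state = self._pending.get(correlation_id)
        if state is None:
            return  # late or unknown response: drop it
        state["responses"][source] = payload
        if set(state["responses"]) >= state["expected"] and not state["done"].done():
            state["done"].set_result(None)
```

On timeout the sketch returns the partial result instead of raising; whether the real scatter_gather() does the same is not stated here.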
Federation groups
Federates declare federation_groups in metadata (config or XNQ_FEDERATION_GROUPS). The coordinator scopes PEER_LIST, PEER_JOINED, PEER_LEFT, and PEER_STATUS_CHANGED to peers within shared groups. scatter_gather() accepts a groups parameter to restrict targets.
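Group scoping can be pictured as a set intersection. The helper below is a sketch with a hypothetical name and input shape. How federates that declare no groups are treated is not specified above; this sketch assumes they share no group with anyone.

```python
def visible_peers(federate_id, groups_by_federate):
    """Return the peers that share at least one federation group."""
    own = set(groups_by_federate.get(federate_id, ()))
    return sorted(
        peer
        for peer, groups in groups_by_federate.items()
        if peer != federate_id and own & set(groups)
    )
```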
REST Gateway (xanaqu/api/)
A FastAPI app wraps a Federate. Routes:
| Method | Path | File |
|---|---|---|
| GET | /health | routes/status.py |
| GET | /status | routes/status.py |
| POST | /messages | routes/messages.py |
| GET | /federates | routes/federates.py |
| POST | /federate/send | routes/federation.py (202 Accepted, fire-and-forget) |
| POST | /federate/query | routes/federation.py (scatter-gather with timeout) |
| POST | /federate/deregister | routes/federation.py (permanent removal, distinct from heartbeat timeout) |
| any | catch-all | routes/proxy.py (registered last) |
Federation proxy
The catch-all proxy.py route lets any HTTP request opt into transparent federation by embedding a federation block in its JSON body:
{
  "federation": {
    "federates": {"worker-a": 100, "worker-b": 50}
  },
  "query": "..."
}
- federates as dict (federate_id → record_limit) → scatter-gather. Local and remote responses are merged on hits/results, and a federation status map is added.
- federates as list → fire-and-forget. The local request executes synchronously; remote targets receive DATA messages asynchronously. The local response is returned immediately with a federation status map.
The proxy strips the federation block, runs the local request first, then routes to remote federates. The proxy router must be included last in create_app() to avoid shadowing named routes.
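The dict-versus-list decision can be sketched as a pure function over the request body. The function name and the mode strings are hypothetical; the real proxy.py may structure this differently.

```python
def split_federation(body):
    """Strip the federation block and pick a routing mode.

    Returns (local_body, mode, targets), where mode is one of
    "local", "scatter_gather", or "fire_and_forget" (names assumed).
    """
    local_body = dict(body)  # shallow copy so the caller's body is untouched
    federation = local_body.pop("federation", None) or {}
    federates = federation.get("federates")
    if isinstance(federates, dict) and federates:
        # federate_id -> record_limit: aggregate responses.
        return local_body, "scatter_gather", dict(federates)
    if isinstance(federates, (list, tuple)) and federates:
        # Plain list of ids: send DATA messages, do not wait.
        return local_body, "fire_and_forget", list(federates)
    return local_body, "local", None
```

Because the stripped body is returned separately, the local request can run first on local_body before any remote routing happens.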
Loop avoidance
When the gateway forwards a DATA or QUERY message to the local REST API, it injects federated: true in the request headers. Athena (or any service receiving forwarded calls) inspects this header and suppresses re-federation, preventing infinite forwarding loops.
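On the receiving side, the check amounts to a single header lookup. A sketch, assuming the header is literally named federated with the value true as described above; the function name is hypothetical.

```python
def should_federate(headers):
    """Return False when the request was already forwarded by a federate."""
    # HTTP header names are case-insensitive, so normalize before lookup.
    normalized = {
        str(name).lower(): str(value).strip().lower()
        for name, value in headers.items()
    }
    return normalized.get("federated") != "true"
```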
CLI Entry Points
Defined in xanaqu/main.py:
# Bare federate (no REST)
xanaqu coordinator
xanaqu worker --federate-id node-a
# Federate + REST gateway
xanaqu-gateway coordinator --coordinator --api-port 8000
xanaqu-gateway development --api-port 8001
Or via helper script with config files: ./scripts/run-local.sh {coordinator,development}.
Authentication
| Surface | Mechanism |
|---|---|
| Worker → Coordinator registration | OAuth2 client-credentials via ServiceTokenProvider (set token_provider= on Worker) |
| Coordinator JWT validation | TokenValidator validates JWTs against JWKS when federation_auth_enabled=True |
| RabbitMQ | TLS via aio-pika. Set RABBITMQ_VERIFY=False for dev |
Dependencies
dependencies = [
"fastapi>=0.100.0",
"aio-pika",
"Authlib", # OAuth2 client-credentials
"PyJWT[crypto]", # JWKS validation
"httpx",
"pydantic>=2",
]
Note: xanadu does not depend on axonis-core for runtime — only for auth helpers and schema constants in some integrations. This keeps the federation layer thin and independently versionable.
Configuration
Config dataclass loaded by:
- Config(): defaults
- Config.from_env(prefix='XNQ_'): env vars (XNQ_RABBITMQ_HOST, XNQ_HEARTBEAT_INTERVAL, …)
- Config.from_dict(data): dict, ignoring unknown keys
- config.merge(overrides): returns new Config
The CLI uses Config.from_env() then merges CLI overrides.
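The loader pattern can be sketched as follows. SketchConfig and its fields are hypothetical stand-ins for the real Config; only the XNQ_ prefix and the two env var names come from the text above. Skipping None overrides in merge() is an assumption that suits unset CLI flags. The sketch filters on f.init throughout, so a derived init=False field is recomputed rather than passed to the constructor.

```python
import os
from dataclasses import dataclass, field, fields, replace


@dataclass
class SketchConfig(object):
    rabbitmq_host: str = "localhost"
    rabbitmq_port: int = 5671
    heartbeat_interval: float = 10.0
    url: str = field(init=False, default="")  # derived, not a constructor argument

    def __post_init__(self):
        self.url = "amqps://%s:%d" % (self.rabbitmq_host, self.rabbitmq_port)

    @classmethod
    def _init_names(cls):
        return {f.name for f in fields(cls) if f.init}

    @classmethod
    def from_dict(cls, data):
        names = cls._init_names()
        return cls(**{k: v for k, v in data.items() if k in names})

    @classmethod
    def from_env(cls, prefix="XNQ_", environ=None):
        environ = os.environ if environ is None else environ
        values = {}
        for f in fields(cls):
            raw = environ.get(prefix + f.name.upper())
            if f.init and raw is not None:
                values[f.name] = f.type(raw)  # str/int/float only in this sketch
        return cls(**values)

    def merge(self, overrides):
        names = self._init_names()
        changes = {k: v for k, v in overrides.items() if k in names and v is not None}
        return replace(self, **changes)  # returns a new instance
```

A CLI would then call something like SketchConfig.from_env().merge(vars(args)), leaving the original instance unchanged.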
Multi-broker coordinator vars
For workers connecting to a coordinator on a different RabbitMQ:
XNQ_COORDINATOR_HOST
XNQ_COORDINATOR_PORT
XNQ_COORDINATOR_USER
XNQ_COORDINATOR_PASSWORD
XNQ_COORDINATOR_VHOST
XNQ_COORDINATOR_SSL
SSO service-token vars
SSO_TOKEN_URL
SSO_CLIENT_ID
SSO_CLIENT_SECRET
SSO_VERIFY
When SSO_TOKEN_URL follows Keycloak's /protocol/openid-connect/token pattern, SSO_JWKS_URL (/certs) and SSO_ISSUER (realm URL) are derived automatically.
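The derivation is plain string manipulation on the token URL. A sketch, assuming a hypothetical helper name and that non-matching URLs yield no derived values:

```python
_TOKEN_SUFFIX = "/protocol/openid-connect/token"


def derive_sso_urls(token_url):
    """Return (jwks_url, issuer), or (None, None) if the URL is not Keycloak-style."""
    if not token_url.endswith(_TOKEN_SUFFIX):
        return None, None
    realm_url = token_url[: -len(_TOKEN_SUFFIX)]
    # Keycloak serves the JWKS next to the token endpoint, under /certs.
    return realm_url + "/protocol/openid-connect/certs", realm_url
```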
Non-XNQ_-prefixed fallbacks
FEDERATE_DOMAIN # Local REST API for on_data/on_query forwarding
FEDERATE_PROTOCOL_V1 # Default scheme: https
XNQ_RMQ_USER # Runtime credential override (note: RMQ, not RABBITMQ)
XNQ_RMQ_PASSWORD
Registration metadata (worker → coordinator)
| Var | Purpose |
|---|---|
| FEDERATE_UUID | Stable UUID; defaults to federate_id |
| FEDERATE_PARTY_V{N} | Party identifiers per API version (party.v1, party.v2, …) |
| FEDERATE_LOCATION_LATITUDE | Geographic coordinate |
| FEDERATE_LOCATION_LONGITUDE | Geographic coordinate |
| XNQ_RABBITMQ_EXTERNAL_HOST | Advertised RabbitMQ host (defaults to internal) |
| XNQ_RABBITMQ_EXTERNAL_PORT | Advertised RabbitMQ port |
Testing
asyncio_mode = "auto" — async tests run without @pytest.mark.asyncio.
In-memory simulation infra (tests/simulation.py):
| Class | Replaces | Purpose |
|---|---|---|
| SimulationBus | shared RabbitMQ | Connects multiple SimulationBroker instances |
| SimulationBroker | Broker | In-memory, drop-in replacement |
| SimulationStreamBroker | StreamBroker | In-memory streams |
| SimulationStream | RabbitMQ Streams | Append-only log + reader offsets |
| SimulationPeerBrokerManager | PeerBrokerManager | Multi-broker tests without real RMQ |
Sync wrappers (tests/sync_coordinator.py, tests/sync_worker.py) run the async federates in a background thread for tests that can't use async/await.
Integration tests run against a real RabbitMQ:
docker compose -f docker-compose.test.yml up -d
uv run pytest tests/integration --integration --rabbitmq-config=config/development.yaml -v
Code Style
- Empty __init__.py (just module docstrings).
- No type hints in function signatures; dataclass fields are the exception.
- Keep (object) in class definitions.
- Delete old files on restructure: no re-export shims, no backwards-compat hacks.
- Conventional commits (fix:, feat:, chore:, fix(scope):); python-semantic-release derives versions and changelogs.
Operational Notes
- Windows: loop.add_signal_handler() raises NotImplementedError. Wrap in try/except.
- YAML 1.1: PyYAML parses on/off/yes/no as booleans. Use the strict loader pattern from fedai-rest's resolver if accepting YAML config.
- init=False dataclass fields: break from_dict/merge. Filter with f.init when iterating.
- federation_auth_enabled: when set, the coordinator rejects unauthenticated registrations. Make sure workers configure token_provider= before they start.
Federation Control Plane
Beyond the federate-to-federate messaging primitives above, xanadu hosts the federation control plane: the reliable, RabbitMQ-backed substrate that replaces the legacy point-to-point REST federation (Athena P2P registration + the Elasticsearch synchronization index + application-level retry/sync). The control plane delivers presence announcements, object replication, and topology discovery across a full mesh of federates, with delivery reliability provided by the broker rather than by application code.
- #REQ.no-p2p-rest — federation control messages (presence, object sync, topology) flow over RabbitMQ. No point-to-point REST calls are made between federates for control-plane purposes.
- #REQ.no-sync-index — reliability (retry, dead-lettering, redelivery) is provided by RabbitMQ infrastructure. The control plane does not maintain an application-level Elasticsearch synchronization index or application-level retry loop; the dead-letter queue replaces that role.
- #REQ.full-mesh-discovery: every federate discovers every other federate via broadcast presence announcements. A federate does not need to query a central federate inventory point-to-point to learn the mesh.
Control-plane topology
The control plane runs on a dedicated federation vhost with its own exchange/queue layout, distinct from the fl.* worker messaging exchanges in #message-routing:
| Resource | Kind | Role |
|---|---|---|
| <ns>.control | fanout exchange | broadcasts to all federates |
| <ns>.direct | direct exchange | targeted messages, routing key = federate domain |
| <ns>.dlx | fanout exchange | dead-letter exchange |
| <ns>.control.{domain} | queue | per-federate broadcast receiver (bound to <ns>.control) |
| <ns>.sync.{domain} | queue | per-federate object replication (bound to <ns>.direct, routing key = {domain}) |
| <ns>.register.{domain} | queue | per-federate registration events |
| <ns>.dlq.{domain} | queue | failed messages (bound to <ns>.dlx) |
- #REQ.dlx-on-control-queues: every durable control/sync queue is declared with x-dead-letter-exchange pointing at the DLX and a message TTL (default 24h). Messages that fail processing are nacked without requeue and land on the DLQ rather than spinning in place.
- #REQ.durable-control-topology: control-plane exchanges and queues are declared durable=true; control-plane messages are published persistent (delivery_mode=2).
broker_admin.py (the HTTP Management API client) provisions this topology; provisioning is idempotent and runs on federate startup.
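To make the layout concrete, the sketch below builds the declarations for one federate as plain data. The function and dict shape are hypothetical and do not reflect broker_admin.py. x-dead-letter-exchange and x-message-ttl (in milliseconds) are standard RabbitMQ queue arguments. The text does not say how the register queue is bound or whether it carries dead-letter arguments, so the sketch leaves it plain and unbound.

```python
DAY_MS = 24 * 60 * 60 * 1000  # the 24h default TTL, in milliseconds


def control_topology(ns, domain, ttl_ms=DAY_MS):
    """Return exchange and queue declarations for one federate (illustrative)."""
    control, direct, dlx = "%s.control" % ns, "%s.direct" % ns, "%s.dlx" % ns
    dead_letter = {"x-dead-letter-exchange": dlx, "x-message-ttl": ttl_ms}
    exchanges = [
        {"name": control, "type": "fanout", "durable": True},
        {"name": direct, "type": "direct", "durable": True},
        {"name": dlx, "type": "fanout", "durable": True},
    ]
    queues = [
        # Broadcast receiver: bound to the fanout exchange (key ignored).
        {"name": "%s.control.%s" % (ns, domain), "durable": True,
         "arguments": dict(dead_letter), "bind": (control, "")},
        # Object replication: bound to the direct exchange by domain.
        {"name": "%s.sync.%s" % (ns, domain), "durable": True,
         "arguments": dict(dead_letter), "bind": (direct, domain)},
        # Registration events: binding and arguments not specified in the text.
        {"name": "%s.register.%s" % (ns, domain), "durable": True,
         "arguments": {}, "bind": None},
        # Dead-letter queue: receives everything routed through the DLX.
        {"name": "%s.dlq.%s" % (ns, domain), "durable": True,
         "arguments": {}, "bind": (dlx, "")},
    ]
    return {"exchanges": exchanges, "queues": queues}
```

Declaring the same durable resources with identical arguments is a no-op in RabbitMQ, which is what makes running this on every startup safe.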
Per-federate provisioning
When a federate is brought up or a remote federate joins, the control plane provisions broker resources for it through the Management API:
- vhost per federation namespace (e.g. federation-{domain}).
- user per federate (fed-{domain}) with a generated credential and configure/write/read permissions scoped to the control-plane resource pattern. Generated credentials are stored as a Kubernetes Secret (never inline in config).
- #REQ.provision-on-registration: federate registration drives provisioning: local control-plane resources are ensured on startup, and a joining remote federate's upstream link is configured when its announce is received (or on explicit registration). Departure deprovisions the federate's upstream and per-federate resources.
Federation policies
The control plane applies broker policies (via the Management API) governing federation behavior:
- Federation-upstream policy: the control exchange is federated across the upstream set so a broadcast on any federate's <ns>.control reaches the whole mesh.
- HA policy: control-plane queues mirror across broker nodes when the broker is clustered (ha-mode: all, automatic sync).
- TTL policy: control messages carry a short TTL (default 5 min) and unused control queues expire, so stale presence/control traffic self-cleans.
Upstream links
In multi-broker deployments each federate's broker links to remote federates' brokers as federation upstreams (#architecture.multi-broker is the peer-broker view of this). On join, an upstream entry is created from the remote federate's advertised broker URI (amqps://…, drawn from registration metadata — see #configuration.registration-metadata) and appended to the upstream set; ack-mode=on-confirm, trust-user-id=false. On departure the upstream is removed.
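The upstream entry maps onto a RabbitMQ federation-upstream parameter, which the Management API sets with a PUT to /api/parameters/federation-upstream/{vhost}/{name}. The sketch below only builds the request; the function name and the upstream-{domain} naming scheme are assumptions, and no HTTP call is made.

```python
from urllib.parse import quote


def upstream_request(vhost, domain, broker_uri):
    """Return (method, path, body) for creating one federation upstream."""
    name = "upstream-%s" % domain  # hypothetical naming scheme
    path = "/api/parameters/federation-upstream/%s/%s" % (
        quote(vhost, safe=""),  # the default vhost "/" must become %2F
        quote(name, safe=""),
    )
    body = {
        "value": {
            "uri": broker_uri,
            "ack-mode": "on-confirm",
            "trust-user-id": False,
        }
    }
    return "PUT", path, body
```

Removing the upstream on departure would be a DELETE against the same path.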
Control Messages
The control plane carries a typed control envelope, distinct from the worker Message of #http-envelope-forwarding. Each control message carries:
| Field | Purpose |
|---|---|
| type | control message type (see below) |
| source_domain | originating federate's domain |
| timestamp | emission time |
| correlation_id | UUID for end-to-end tracing |
| payload | type-specific body |
| auth_context | optional token + markings for downstream authorization |
Control message types:
| Type | Meaning |
|---|---|
| federate.announce | federate presence broadcast (domain, name, location, broker URI, capabilities) |
| federate.depart | federate leaving the mesh |
| federate.heartbeat | liveness signal |
| object.create / object.update / object.delete | object replication across the mesh |
| topology.request / topology.response | mesh-topology discovery |
- #REQ.control-correlation-id: every control message carries a correlation_id that is preserved across the broadcast/consume path so a single logical operation is traceable end-to-end.
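The envelope can be sketched as a dataclass. The field names follow the table above; the class name, the JSON wire format, and the ISO-8601 timestamp are assumptions for illustration. A round trip through to_json()/from_json() keeps the correlation_id intact, which is the property #REQ.control-correlation-id asks for.

```python
import json
import uuid
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from typing import Optional


@dataclass
class ControlMessage(object):
    type: str
    source_domain: str
    payload: dict = field(default_factory=dict)
    # Generated once at creation, then carried unchanged.
    correlation_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )
    auth_context: Optional[dict] = None

    def to_json(self):
        return json.dumps(asdict(self), sort_keys=True)

    @classmethod
    def from_json(cls, raw):
        return cls(**json.loads(raw))
```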
Publish semantics
- Broadcast: published to the fanout control exchange; every federate's control queue receives it. Used for federate.announce, federate.depart, and object.* sync.
- Direct: published to the direct exchange with routing_key = target_domain; only the addressed federate's sync queue receives it.
Consume semantics
A control-plane consumer dispatches each message by type to a registered handler.
- #REQ.skip-self: a federate ignores control messages whose source_domain equals its own domain (ack and drop), so fanout broadcasts don't loop back onto the originator.
- #REQ.ack-or-dlq: a successfully handled message is acked; a handler that raises causes a nack without requeue, routing the message to the DLQ for inspection rather than reprocessing.
- Consumers apply a bounded prefetch (QoS) so a slow handler cannot be flooded.
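The dispatch rules can be sketched independently of the broker. The class below is hypothetical: it returns "ack" or "nack" and leaves the actual acknowledgement to the transport layer. Acking messages of an unregistered type is an assumption; the text does not say how they are handled.

```python
class ControlConsumer(object):
    """Illustrative dispatcher: decides ack vs. nack for each control message."""

    def __init__(self, own_domain):
        self.own_domain = own_domain
        self.handlers = {}

    def on(self, message_type, handler):
        self.handlers[message_type] = handler

    def dispatch(self, message):
        # skip-self: our own broadcasts come back via the fanout; ack and drop.
        if message.get("source_domain") == self.own_domain:
            return "ack"
        handler = self.handlers.get(message.get("type"))
        if handler is None:
            return "ack"  # assumption: unknown types are dropped
        try:
            handler(message)
        except Exception:
            # nack without requeue, so the queue's DLX routes it to the DLQ.
            return "nack"
        return "ack"
```

With aio-pika the two outcomes would correspond to acknowledging the incoming message or rejecting it with requeue disabled.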
Object replication
object.create / object.update / object.delete replicate a local object change to the federation. The originating federate performs its local write first, then broadcasts the change (object type, uid, payload, version).
- #REQ.replication-idempotent — a receiving federate compares the incoming object version against any local copy and skips application when the local version is greater than or equal to the incoming version, making replication idempotent and safe under redelivery.
- #REQ.federated-from-marker: a replicated object is written locally with a federation provenance marker (uds.federated_from = <source_domain>) so federated copies are distinguishable from locally originated objects.
- #REQ.federate-filter: only objects eligible for federation are broadcast; non-federated objects stay local.
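The version check and provenance marker can be sketched against a plain dict standing in for the local store. The function name and record shape are hypothetical, and storing the marker as a nested uds key is an assumption about how uds.federated_from is laid out. Deletes are not covered.

```python
def apply_replicated(store, source_domain, uid, payload, version):
    """Apply an object.create/object.update change; return True if written."""
    local = store.get(uid)
    if local is not None and local["version"] >= version:
        # Stale or redelivered change: applying it again would be a no-op
        # at best and a rollback at worst, so skip it.
        return False
    store[uid] = {
        "version": version,
        "payload": payload,
        "uds": {"federated_from": source_domain},  # provenance marker
    }
    return True
```

Because a redelivered message carries the same version as the stored copy, applying it twice leaves the store unchanged.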
Migration from REST Federation
The control plane supersedes the legacy REST federation path. Migration is gated by a federation-mode flag rather than a hard cutover:
- #REQ.federation-mode-flag: a FEDERATION_MODE setting selects rest | rabbitmq | dual. In dual mode both the legacy REST path and the RabbitMQ control plane run, so message flow can be validated for equivalence before the REST path is removed per federate.
Migration sequence: provision RMQ resources → run the control-plane consumer in dual mode → validate parity with REST behavior → switch each federate to rabbitmq → remove REST federation code → delete the Elasticsearch synchronization index. End state: all control traffic flows over RMQ, zero P2P REST calls, no synchronization index, and new-federate registration provisions RMQ resources automatically.
Control-Plane Observability
- Queue depth and consumer lag for control/sync queues are exported to Prometheus.
- DLQ growth is alerted on — a rising DLQ signals systematic handler failures.
- correlation_id is logged across publish and consume so a control operation can be traced through the mesh.
Open Items
- No platform.service-contract conformance. Intentional: xanadu is an infrastructure service, not a tool host. It registers with Oracle with mcp_path="". Consider documenting the registration shape explicitly in component.oracle.gateway.
- Package rename. The legacy xanaqu name persists. A future major version could align package and repo names; consumers (titan, parallax) would need a coordinated bump.
- Fusion message ownership resolved. Message and MessageType are owned by axonis-core at axonis.fusion (axonis-core/axonis/fusion.py); see parallax and component.titan.runtime "Shared protocol module". Xanadu carries the message verbatim regardless. Titan still has stale from protocol.messages import FusionMessage imports in titan/fusion/{orchestrator,broker,node_handler}.py that need to be repointed at axonis.fusion; this is tracked in titan and does not block xanadu. Some consumer wheels (parallax, cortex, prism) ship a parallel axonis.protocol.FusionMessage surface; alignment to axonis.fusion.Message is open work on those consumers.
Depends on: platform.axonis-core, platform.service-contract
Realizes: product.fusion
Required by: component.titan.runtime