Fusion
Definition
Fusion is parallax's federated entity-resolution process: given a Lens and record sets held by two or more independent parties, it discovers which records across parties refer to the same real-world entity, without raw PII ever crossing the wire. It is realized as a three-phase, signal-first protocol with an intermediate candidate-generation step: Phase 1 exchanges only bucket-count signals; Phase 2 exchanges derived feature vectors for shared buckets only; Phase 2.5 generates capped candidate pairs; and Phase 3 scores them by consensus and clusters the results. A single execution is recorded as an immutable FusionRun audit record; its outputs are EntityMatch / Correlation (pairwise linkage) and EntityCluster (transitive group). The phase algorithms live in parallax; titan hosts the long-lived FusionWorker that runs them against each party's local data; xanadu carries the messages over RabbitMQ.
Wire envelope: parallax/parallax/ops/fusion/federation/messages.py (Message, MessageType). Run record:
parallax/parallax/ops/fusion/models/fusion_run.py:30 (compute) and the persisted FusionRun (UDS). Outputs persisted
via parallax/parallax/userspace/fusion.py.
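The signal-first idea can be illustrated with a minimal sketch. It is not the parallax implementation: bucket_id, phase1_signals, shared_buckets and capped_candidates are hypothetical names, the truncated SHA-256 bucket id is an assumed hashing scheme, and "shared" is assumed to mean "reported by every party".

```python
import hashlib
from collections import Counter
from itertools import islice, product


def bucket_id(blocking_key: str) -> str:
    """Map a blocking key to an opaque bucket id (assumed hashing scheme)."""
    return hashlib.sha256(blocking_key.encode("utf-8")).hexdigest()[:16]


def phase1_signals(local_keys):
    """Phase 1: a party reports only bucket id -> record count."""
    return Counter(bucket_id(key) for key in local_keys)


def shared_buckets(*party_signals):
    """Buckets reported by every party; only these proceed to Phase 2."""
    bucket_sets = [set(signals) for signals in party_signals]
    return set.intersection(*bucket_sets)


def capped_candidates(refs_a, refs_b, cap):
    """Phase 2.5: candidate pairs within one shared bucket, truncated at cap."""
    return list(islice(product(refs_a, refs_b), cap))


signals_a = phase1_signals(["smith|1980", "jones|1975", "smith|1980"])
signals_b = phase1_signals(["smith|1980", "brown|1990"])
print(shared_buckets(signals_a, signals_b))
print(capped_candidates(["a1", "a2", "a3"], ["b1", "b2"], cap=4))
```

Only the counters cross between parties in this sketch; the record references passed to capped_candidates stay local to whichever side builds the pairs.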
Lifecycle
FusionRun.status: running → completed | partial | failed (partial when some federates are missing), with phase
booleans phase1_complete / phase2_complete / phase3_complete. The run-status response surfaces finer states
(pending / running / extracted / matched / completed / failed / cancelled); ExecutionMode is ad_hoc | scheduled |
reactive. Match/Correlation outputs have their own lifecycle (proposed → confirmed | rejected | stale | decayed |
deferred).
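A rough sketch of how the terminal FusionRun.status could be derived from federate participation follows. The RunStatus enum and final_status helper are hypothetical, and the rule for failed is an assumption, since the text above only defines when a run is partial.

```python
from enum import Enum


class RunStatus(str, Enum):
    RUNNING = "running"
    COMPLETED = "completed"
    PARTIAL = "partial"
    FAILED = "failed"


def final_status(expected, participating, errored=False):
    """Resolve the terminal status of a run from federate participation."""
    missing = set(expected) - set(participating)
    if errored or not participating:
        # Assumption: the source does not say exactly when a run is 'failed'.
        return RunStatus.FAILED
    # 'partial' when some expected federates are missing, as described above.
    return RunStatus.PARTIAL if missing else RunStatus.COMPLETED


print(final_status(["node-a", "node-b", "node-c"], ["node-a", "node-b"]).value)
```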
Journey through the code
POST /fusion/run (routes.py:112, FusionRunStartRequest) → commands.start_run → _do_extract → _do_full_run →
run_multi_fusion (pipeline.py:627), persisting a FusionRun via FusionRunUDS().create. The parallax ops pipeline
is extract → block → score → cluster (extractor.py, blocker.py, scoring/scorer.py,
algorithms/clustering.py UnionFind, algorithms/edge_pruning.py).
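The final cluster step can be pictured as union-find over scored pairs. The sketch below is a generic union-find with a score threshold, not the code in algorithms/clustering.py; the class layout, the cluster helper and the threshold value are assumptions, and edge pruning is omitted.

```python
class UnionFind:
    """Generic disjoint-set structure with path halving."""

    def __init__(self):
        self.parent = {}

    def find(self, item):
        self.parent.setdefault(item, item)
        while self.parent[item] != item:
            # Path halving: point each visited node at its grandparent.
            self.parent[item] = self.parent[self.parent[item]]
            item = self.parent[item]
        return item

    def union(self, left, right):
        root_left, root_right = self.find(left), self.find(right)
        if root_left != root_right:
            self.parent[root_right] = root_left


def cluster(scored_pairs, threshold):
    """Group record refs transitively over pairs scoring at or above threshold."""
    uf = UnionFind()
    for left, right, score in scored_pairs:
        uf.find(left)   # register both refs even if the pair is rejected
        uf.find(right)
        if score >= threshold:
            uf.union(left, right)
    groups = {}
    for item in list(uf.parent):
        groups.setdefault(uf.find(item), set()).add(item)
    return sorted(sorted(group) for group in groups.values())


pairs = [("a:1", "b:7", 0.93), ("b:7", "c:2", 0.88), ("a:1", "d:4", 0.40)]
print(cluster(pairs, threshold=0.8))
```

Here a:1 and c:2 end up in the same group without ever being compared directly, which is the transitive behaviour an EntityCluster represents.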
The federated protocol: titan's FusionWorker (titan/titan/fusion/startup.py:27) assembles a xanadu Worker + a
FusionNodeHandler + a FusionBroker + a FusionNodeRegistry. The coordinator runs a FusionOrchestrator
(orchestrator.py:50) that sequences the phases (_phase_signals → _phase_psi → _phase_artifacts →
_phase_consensus → _phase_clustering). Each participant FusionNodeHandler (node_handler.py:33) imports
parallax's Message, routes by MessageType, and runs the phase compute over its local Dask DataFrame, returning
counts/vectors only, never records. The FusionBroker (broker.py:21) bridges to xanadu: outbound via
worker.scatter_gather([node], "POST", "/fusion/message", …), inbound via worker.on_query →
Message.from_dict → node_handler.handle_message; local-node calls short-circuit the broker. Transport is RabbitMQ
(mutual TLS in prod). MessageTypes include FUSION_RUN_START/COMPLETE, FUSION_BLOCKING_REQUEST/RESPONSE,
FUSION_VECTORS_REQUEST/RESPONSE, FUSION_PSI_ROUND1/2_*, LENS_BINDING_REQUEST/RESPONSE, FUSION_NODE_REFUSED.
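The routing described above (inbound dict → Message.from_dict → handle_message → phase compute) might look roughly like the following. This is a self-contained stand-in, not parallax's Message or titan's FusionNodeHandler: the field names (type, run_id, payload), the enum values, the bucket_counts payload key, and the use of FUSION_NODE_REFUSED for unsupported types are all assumptions.

```python
from collections import Counter
from dataclasses import dataclass, field
from enum import Enum


class MessageType(str, Enum):
    # Subset of the names listed above; the string values are assumed.
    FUSION_BLOCKING_REQUEST = "FUSION_BLOCKING_REQUEST"
    FUSION_BLOCKING_RESPONSE = "FUSION_BLOCKING_RESPONSE"
    FUSION_NODE_REFUSED = "FUSION_NODE_REFUSED"


@dataclass
class Message:
    type: MessageType
    run_id: str
    payload: dict = field(default_factory=dict)

    @classmethod
    def from_dict(cls, raw):
        return cls(MessageType(raw["type"]), raw["run_id"], dict(raw.get("payload", {})))


class NodeHandlerSketch:
    def __init__(self, local_bucket_ids):
        # Stand-in for the party's local data: one bucket id per local record.
        self._bucket_ids = list(local_bucket_ids)
        self._routes = {MessageType.FUSION_BLOCKING_REQUEST: self._on_blocking}

    def handle_message(self, message):
        route = self._routes.get(message.type)
        if route is None:
            reason = f"unsupported type {message.type.value}"
            return Message(MessageType.FUSION_NODE_REFUSED, message.run_id, {"reason": reason})
        return route(message)

    def _on_blocking(self, message):
        # The reply carries counts only, never the underlying records.
        counts = dict(Counter(self._bucket_ids))
        return Message(MessageType.FUSION_BLOCKING_RESPONSE, message.run_id, {"bucket_counts": counts})


handler = NodeHandlerSketch(["b1", "b1", "b2"])
inbound = Message.from_dict({"type": "FUSION_BLOCKING_REQUEST", "run_id": "run-1"})
print(handler.handle_message(inbound).payload)
```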
Data shape
FusionRun (fusion_run.py:30, "metadata only — no raw PII, no entity data"): required run_id, lens_id,
lens_version, execution_mode; computed started_at / completed_at, status, expected /
participating / missing_federates, phase booleans, total_candidates, total_matches, correlation_ids; a config
snapshot (threshold, null penalty, blocking strategy); provenance triggered_by, actor_id. Outputs: EntityMatch
(fusion.py:122) — correlation_id, entity_type, lens id/version, status, confidence, a stable entity_ref,
contributing_records (record refs, no field values), append-only source_lineage, composite blocking keys, a
SHA-256 blocking_key_hash, STANAG-2022 accuracy/credibility, attestation. EntityCluster (fusion.py:330) —
cluster_id, members (node id + record id), aggregate_confidence, pairwise_confidences, contributing_sources,
meets_evidence_threshold, contradictions. Storage: EntityMatch / EntityCluster / FusionRun → the data-fusion
index; Correlation → fusion_correlations (parallax/parallax/schema.py:21).
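To make the "record refs, no field values" shape concrete, here is a minimal sketch of a match record carrying a SHA-256 blocking_key_hash. The dataclasses are hypothetical reductions of EntityMatch, and the canonicalization (sorted keys joined with a unit-separator character) is an assumption; the source only states that the hash is SHA-256 over the composite blocking keys.

```python
import hashlib
from dataclasses import dataclass


def blocking_key_hash(composite_keys):
    """SHA-256 over the composite blocking keys (assumed canonical form)."""
    canonical = "\x1f".join(sorted(composite_keys))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


@dataclass(frozen=True)
class RecordRef:
    node_id: str
    record_id: str  # a reference only; no field values are carried


@dataclass(frozen=True)
class MatchSketch:
    correlation_id: str
    confidence: float
    contributing_records: tuple
    blocking_key_hash: str


match = MatchSketch(
    correlation_id="corr-001",
    confidence=0.91,
    contributing_records=(RecordRef("node-a", "17"), RecordRef("node-b", "903")),
    blocking_key_hash=blocking_key_hash(["surname:smith", "yob:1980"]),
)
print(match.blocking_key_hash)
```

Sorting before hashing makes the digest independent of the order in which the keys were produced, which is one way to keep the hash comparable across nodes.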
Invariants
- Raw data never leaves a party; only messages cross. Phase 1 is counts only; Phase 2 is one-way derived transforms plus a local block index; PSI mode crosses only opaque masked integers. Enforced by tests/federation/test_sovereignty.py (raw_records_transmitted == 0; the coordinator never accesses records).
- The federated path equals the in-process path (component.parallax.three-phase-protocol): the node handler and run_three_phase_local run the same parallax primitives, and results are identical.
- source_lineage is append-only; entity_ref is stable across runs.
- Cross-node id-field collisions fail loud unless node_prefix = True.
- Article-12 record-keeping: every run persists a FusionRun config snapshot.
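The id-collision invariant can be sketched as follows. merge_record_ids is a hypothetical helper, and the "node:record" prefix format is an assumption; the source only states that collisions fail loudly unless node_prefix is set.

```python
def merge_record_ids(ids_by_node, node_prefix=False):
    """Combine record ids from several nodes, failing loudly on collisions."""
    owner_by_id = {}
    for node_id, record_ids in ids_by_node.items():
        for record_id in record_ids:
            # Assumed prefix format; only applied when node_prefix is True.
            merged_id = f"{node_id}:{record_id}" if node_prefix else str(record_id)
            if merged_id in owner_by_id:
                raise ValueError(
                    f"id {merged_id!r} from {node_id} collides with "
                    f"{owner_by_id[merged_id]}; set node_prefix=True"
                )
            owner_by_id[merged_id] = node_id
    return list(owner_by_id)


ids = {"node-a": ["1", "2"], "node-b": ["2", "3"]}
print(merge_record_ids(ids, node_prefix=True))
```

Calling merge_record_ids(ids) without the prefix raises ValueError for the shared id "2" rather than silently merging two unrelated records.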
Related products
- product.lens: Fusion executes against a Lens; the orchestrator derives the match function and blocking key sets from the lens spec, and the FusionRun stamps lens_id / lens_version.
- product.trained-model: the v2 scoring path resolves titan-side ranker and LLM-adapter models by registry key; titan also supplies the local Dask DataFrame each FusionWorker matches against.
- product.signal / product.block: outputs feed downstream cortex evidence Blocks and Signals via the Lens's output_semantics (evidence class, signal type).
Open questions
- Production orchestration surface: two exist, parallax's local federation/hub.py (hub-spoke POC, two-ES) and titan's FusionOrchestrator (xanadu/RabbitMQ, N-party). Code comments suggest titan+xanadu is production and hub.py is a local POC; the spec should say so definitively.
- correlation_id overload: the wire dedup field and the resolved-correlation record field share a name.
- Lens binding flow: FUSION_RUN_START auto-binds by returning local field names, while the formal LensBinding / review workflow lives in prism; the titan binding handler returns {}. The intended flow needs confirming.
- Scheduled/reactive modes: triggered_by = activation_id implies an activation trigger (cortex Signals?) whose entry point is not present in parallax code.
Realized by: component.parallax.adi-integration, component.parallax.attestation-rationale, component.parallax.blocking-engine, component.parallax.cds-message-mapping, component.parallax.consensus-mission-service, component.parallax.consensus-session, component.parallax.continuous-fusion, component.parallax.coordination-ledger, component.parallax.correlation-persistence, component.parallax.counter-isr, component.parallax.decay-scheduler, component.parallax.derived-features, component.parallax.dissent, component.parallax.fusion-adi-integration, component.parallax.fusion-binding, component.parallax.fusion-governance-lifecycle, component.parallax.fusionmatch-model, component.parallax.island-robustness, component.parallax.local-persistence-adapter, component.parallax.local-quickstart, component.parallax.multi-contributor-combination, component.parallax.node-scaling, component.parallax.observation-reverse-index, component.parallax.primitives-framework, component.parallax.quorum, component.parallax.scoring-engine, component.parallax.service-interface, component.parallax.signal-queue-contract, component.parallax.three-phase-protocol, component.parallax.tracking-integration, component.parallax.vrs-rest-api, component.parallax.wire-message-families, component.prism.lens-families, component.titan.runtime, component.xanadu.messaging