Fusion

Definition

Fusion is parallax's federated entity-resolution process: given a Lens and record sets held by two or more independent parties, it discovers which records across parties refer to the same real-world entity — without raw PII ever crossing the wire. It is realized as a three-phase, signal-first protocol: Phase 1 exchanges only bucket-count signals, Phase 2 exchanges derived feature vectors for shared buckets only, Phase 2.5 generates capped candidate pairs, and Phase 3 scores them by consensus and clusters the results. A single execution is an immutable FusionRun audit record; its outputs are EntityMatch / Correlation (pairwise linkage) and EntityCluster (transitive group). The phase algorithms live in parallax; titan hosts the long-lived FusionWorker that runs them against each party's local data; xanadu carries the messages over RabbitMQ.
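
The Phase 1 idea (only bucket counts cross the wire, and later phases run on shared buckets only) can be sketched as below. This is a minimal illustration, not parallax's implementation: the helper names bucket_key, bucket_counts and shared_buckets, the record fields, and the choice of a truncated SHA-256 over a surname prefix plus birth year are all assumptions made for the example.

```python
import hashlib
from collections import Counter

def bucket_key(record: dict) -> str:
    # Hypothetical blocking key: surname prefix + birth year, hashed so the
    # raw values are not sent. The real key derivation comes from the Lens.
    raw = f"{record['surname'][:3].lower()}|{record['birth_year']}"
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()[:16]

def bucket_counts(records: list[dict]) -> Counter:
    # Runs locally at each party; only this Counter leaves the party.
    return Counter(bucket_key(r) for r in records)

def shared_buckets(signals: dict[str, Counter]) -> set[str]:
    # Coordinator side: a bucket is "shared" if two or more parties report it.
    parties_per_bucket = Counter()
    for counts in signals.values():
        parties_per_bucket.update(counts.keys())
    return {bucket for bucket, n in parties_per_bucket.items() if n >= 2}

party_a = [{"surname": "Smith", "birth_year": 1980}]
party_b = [
    {"surname": "Smithson", "birth_year": 1980},
    {"surname": "Jones", "birth_year": 1975},
]
signals = {"a": bucket_counts(party_a), "b": bucket_counts(party_b)}
shared = shared_buckets(signals)
```

In this sketch, Phase 2 would then request feature vectors only for the keys in shared, so a bucket that exists at a single party never triggers any further exchange.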

Wire envelope: parallax/parallax/ops/fusion/federation/messages.py (Message, MessageType). Run record: parallax/parallax/ops/fusion/models/fusion_run.py:30 (compute) and the persisted FusionRun (UDS). Outputs persisted via parallax/parallax/userspace/fusion.py.

Lifecycle

FusionRun.status: running → completed | partial | failed (partial when some federates are missing), with phase booleans phase1_complete / phase2_complete / phase3_complete. The run-status response surfaces finer states (pending / running / extracted / matched / completed / failed / cancelled); ExecutionMode is ad_hoc | scheduled | reactive. Match/Correlation outputs have their own lifecycle (proposed → confirmed | rejected | stale | decayed | deferred).
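
One way to read the FusionRun.status rule is sketched below. The enum values come from the text above; the function name resolve_status, its parameters, and the precedence between failed, running and partial are assumptions for illustration, not the logic in fusion_run.py.

```python
from enum import Enum

class RunStatus(str, Enum):
    RUNNING = "running"
    COMPLETED = "completed"
    PARTIAL = "partial"
    FAILED = "failed"

def resolve_status(expected, participating, phases_complete, failed=False):
    """Derive a terminal status from federate participation and phase flags.

    Assumed precedence: an error wins, then unfinished phases keep the run
    in RUNNING, then missing federates downgrade COMPLETED to PARTIAL.
    """
    if failed:
        return RunStatus.FAILED
    if not all(phases_complete):
        return RunStatus.RUNNING
    missing = set(expected) - set(participating)
    return RunStatus.PARTIAL if missing else RunStatus.COMPLETED

status = resolve_status(
    expected=["node-a", "node-b", "node-c"],
    participating=["node-a", "node-b"],
    phases_complete=(True, True, True),
)
```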

Journey through the code

POST /fusion/run (routes.py:112, FusionRunStartRequest) → commands.start_run → _do_extract → _do_full_run → run_multi_fusion (pipeline.py:627), persisting a FusionRun via FusionRunUDS().create. The parallax ops pipeline is extract → block → score → cluster (extractor.py, blocker.py, scoring/scorer.py, algorithms/clustering.py UnionFind, algorithms/edge_pruning.py).
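
The final cluster step can be illustrated with a generic union-find over scored pairs. This is a textbook sketch, not the code in algorithms/clustering.py: the class body, the cluster_pairs helper, the record-reference strings and the threshold value are assumptions, and edge pruning is omitted.

```python
class UnionFind:
    def __init__(self):
        self.parent = {}

    def find(self, x):
        # Register unseen items as their own root, then walk to the root
        # with path halving to keep later lookups short.
        self.parent.setdefault(x, x)
        while self.parent[x] != x:
            self.parent[x] = self.parent[self.parent[x]]
            x = self.parent[x]
        return x

    def union(self, a, b):
        root_a, root_b = self.find(a), self.find(b)
        if root_a != root_b:
            self.parent[root_b] = root_a

def cluster_pairs(scored_pairs, threshold):
    """Group records transitively linked by pairs scoring >= threshold."""
    uf = UnionFind()
    for left, right, score in scored_pairs:
        if score >= threshold:
            uf.union(left, right)
    groups = {}
    for node in list(uf.parent):
        groups.setdefault(uf.find(node), set()).add(node)
    return sorted(sorted(members) for members in groups.values())

pairs = [
    ("a:1", "b:7", 0.93),
    ("b:7", "c:2", 0.88),
    ("a:4", "c:9", 0.40),  # below threshold, never linked
]
clusters = cluster_pairs(pairs, threshold=0.8)
```

Because linkage is transitive, a:1 and c:2 end up in the same cluster even though no pair scores them directly, which is the kind of chaining an edge-pruning step is usually there to keep in check.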

The federated protocol: titan's FusionWorker (titan/titan/fusion/startup.py:27) assembles a xanadu Worker + a FusionNodeHandler + a FusionBroker + a FusionNodeRegistry. The coordinator runs a FusionOrchestrator (orchestrator.py:50) that sequences the phases (_phase_signals → _phase_psi → _phase_artifacts → _phase_consensus → _phase_clustering). Each participant FusionNodeHandler (node_handler.py:33) imports parallax's Message, routes by MessageType, and runs the phase compute over its local Dask DataFrame, returning counts/vectors only, never records. The FusionBroker (broker.py:21) bridges to xanadu: outbound via worker.scatter_gather([node], "POST", "/fusion/message", …), inbound via worker.on_query → Message.from_dict → node_handler.handle_message; local-node calls short-circuit the broker. Transport is RabbitMQ (mutual TLS in prod). MessageTypes include FUSION_RUN_START/COMPLETE, FUSION_BLOCKING_REQUEST/RESPONSE, FUSION_VECTORS_REQUEST/RESPONSE, FUSION_PSI_ROUND1/2_*, LENS_BINDING_REQUEST/RESPONSE, FUSION_NODE_REFUSED.
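
The inbound path (dict off the wire, Message.from_dict, then routing by MessageType) can be sketched as follows. Only the MessageType names are taken from the text above; the Message fields (type, run_id, payload), the NodeHandler class, and the use of FUSION_NODE_REFUSED as the reply to an unrouted type are assumptions for illustration and may not match messages.py or node_handler.py.

```python
from dataclasses import dataclass, field
from enum import Enum

class MessageType(str, Enum):
    # Subset of the message types named in the spec.
    FUSION_BLOCKING_REQUEST = "FUSION_BLOCKING_REQUEST"
    FUSION_BLOCKING_RESPONSE = "FUSION_BLOCKING_RESPONSE"
    FUSION_VECTORS_REQUEST = "FUSION_VECTORS_REQUEST"
    FUSION_NODE_REFUSED = "FUSION_NODE_REFUSED"

@dataclass
class Message:
    # Hypothetical envelope fields; the real envelope may differ.
    type: MessageType
    run_id: str
    payload: dict = field(default_factory=dict)

    @classmethod
    def from_dict(cls, data: dict) -> "Message":
        return cls(MessageType(data["type"]), data["run_id"], dict(data.get("payload", {})))

class NodeHandler:
    def __init__(self, local_bucket_counts: dict):
        self._counts = local_bucket_counts
        self._routes = {
            MessageType.FUSION_BLOCKING_REQUEST: self._on_blocking_request,
        }

    def handle_message(self, msg: Message) -> Message:
        handler = self._routes.get(msg.type)
        if handler is None:
            # Assumed behaviour: refuse anything this node cannot serve.
            reason = f"unsupported type {msg.type.value}"
            return Message(MessageType.FUSION_NODE_REFUSED, msg.run_id, {"reason": reason})
        return handler(msg)

    def _on_blocking_request(self, msg: Message) -> Message:
        # Reply with counts only; no record content is placed in the payload.
        payload = {"bucket_counts": dict(self._counts)}
        return Message(MessageType.FUSION_BLOCKING_RESPONSE, msg.run_id, payload)

node = NodeHandler({"9f2c": 3, "b41e": 1})
wire = {"type": "FUSION_BLOCKING_REQUEST", "run_id": "run-1"}
reply = node.handle_message(Message.from_dict(wire))
refused = node.handle_message(Message(MessageType.FUSION_VECTORS_REQUEST, "run-1"))
```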

Data shape

FusionRun (fusion_run.py:30, "metadata only — no raw PII, no entity data"): required run_id, lens_id, lens_version, execution_mode; computed started_at / completed_at, status, expected / participating / missing_federates, phase booleans, total_candidates, total_matches, correlation_ids; a config snapshot (threshold, null penalty, blocking strategy); provenance triggered_by, actor_id. Outputs: EntityMatch (fusion.py:122): correlation_id, entity_type, lens id/version, status, confidence, a stable entity_ref, contributing_records (record refs, no field values), append-only source_lineage, composite blocking keys, a SHA-256 blocking_key_hash, STANAG-2022 accuracy/credibility, attestation. EntityCluster (fusion.py:330): cluster_id, members (node id + record id), aggregate_confidence, pairwise_confidences, contributing_sources, meets_evidence_threshold, contradictions. Storage: EntityMatch / EntityCluster / FusionRun → the data-fusion index; Correlation → fusion_correlations (parallax/parallax/schema.py:21).
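
As an illustration of the SHA-256 blocking_key_hash field, the sketch below hashes a set of composite blocking keys into one digest. The canonicalization (trim, lowercase, sort, join with a unit separator) and the example key strings are assumptions for the example; the spec does not state how fusion.py normalizes keys before hashing.

```python
import hashlib

def blocking_key_hash(blocking_keys: list[str]) -> str:
    # Assumed canonical form: normalized keys, sorted so that ordering does
    # not change the digest, joined with the ASCII unit separator.
    canonical = "\x1f".join(sorted(k.strip().lower() for k in blocking_keys))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()

digest = blocking_key_hash(["SMI|1980", "soundex:S530"])
same_digest = blocking_key_hash(["soundex:s530", " smi|1980 "])
```

Storing a digest rather than the keys themselves fits the "record refs, no field values" shape of the output, under the assumption that the keys could otherwise leak field fragments.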

Invariants

  • Raw data never leaves a party; only messages cross. Phase 1 is counts only; Phase 2 is one-way derived transforms plus a local block index; PSI mode crosses only opaque masked integers. Enforced by tests/federation/test_sovereignty.py (raw_records_transmitted == 0; the coordinator never accesses records).
  • The federated path equals the in-process path (component.parallax.three-phase-protocol): the node handler and run_three_phase_local run the same parallax primitives, and results are identical.
  • source_lineage is append-only; entity_ref is stable across runs.
  • Cross-node id-field collisions fail loud unless node_prefix = True.
  • Article-12 record-keeping — every run persists a FusionRun config snapshot.
  • product.lens — Fusion executes against a Lens; the orchestrator derives the match function and blocking key sets from the lens spec, and the FusionRun stamps lens_id / lens_version.
  • product.trained-model — the v2 scoring path resolves titan-side ranker and LLM-adapter models by registry key; titan also supplies the local Dask DataFrame each FusionWorker matches against.
  • product.signal / product.block — outputs feed downstream cortex evidence Blocks and Signals via the Lens's output_semantics (evidence class, signal type).
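
The id-collision invariant above can be sketched as a merge that either namespaces ids by node or refuses to continue. The function merge_record_ids, its argument shape and the "node:id" prefix format are hypothetical; only the node_prefix flag and the fail-loud behaviour come from the text.

```python
def merge_record_ids(ids_by_node: dict[str, list[str]], node_prefix: bool = False) -> list[str]:
    """Combine record ids from several nodes into one id space."""
    if node_prefix:
        # Namespacing makes collisions impossible by construction.
        return [f"{node}:{rid}" for node, ids in ids_by_node.items() for rid in ids]
    owner = {}
    for node, ids in ids_by_node.items():
        for rid in ids:
            if rid in owner and owner[rid] != node:
                # Fail loud: silently merging would conflate two records.
                raise ValueError(
                    f"record id {rid!r} appears on both {owner[rid]!r} and {node!r}"
                )
            owner[rid] = node
    return list(owner)

ids = {"node-a": ["1", "2"], "node-b": ["2", "3"]}
prefixed = merge_record_ids(ids, node_prefix=True)
try:
    merge_record_ids(ids)
    collided = False
except ValueError:
    collided = True
```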

Open questions

  • Production orchestration surface — two exist: parallax's local federation/hub.py (hub-spoke POC, two-ES) and titan's FusionOrchestrator (xanadu/RabbitMQ, N-party). Code comments suggest titan+xanadu is production and hub.py is a local POC; the spec should say so definitively.
  • correlation_id overload — the wire dedup field and the resolved-correlation record field share a name.
  • Lens binding flow: FUSION_RUN_START auto-binds by returning local field names, while the formal LensBinding / review workflow lives in prism; the titan binding handler returns {}. The intended flow needs confirming.
  • Scheduled/reactive modes: triggered_by = activation_id implies an activation trigger (cortex Signals?) whose entry point is not present in parallax code.

Realized by: component.parallax.adi-integration, component.parallax.attestation-rationale, component.parallax.blocking-engine, component.parallax.cds-message-mapping, component.parallax.consensus-mission-service, component.parallax.consensus-session, component.parallax.continuous-fusion, component.parallax.coordination-ledger, component.parallax.correlation-persistence, component.parallax.counter-isr, component.parallax.decay-scheduler, component.parallax.derived-features, component.parallax.dissent, component.parallax.fusion-adi-integration, component.parallax.fusion-binding, component.parallax.fusion-governance-lifecycle, component.parallax.fusionmatch-model, component.parallax.island-robustness, component.parallax.local-persistence-adapter, component.parallax.local-quickstart, component.parallax.multi-contributor-combination, component.parallax.node-scaling, component.parallax.observation-reverse-index, component.parallax.primitives-framework, component.parallax.quorum, component.parallax.scoring-engine, component.parallax.service-interface, component.parallax.signal-queue-contract, component.parallax.three-phase-protocol, component.parallax.tracking-integration, component.parallax.vrs-rest-api, component.parallax.wire-message-families, component.prism.lens-families, component.titan.runtime, component.xanadu.messaging