Three-Phase Federated Screening Protocol

Status & scope

  • Stage: IMPLEMENTED
  • Module: parallax/ops/fusion/federation/three_phase.py (in-process pipeline), wire.py (per-node / coordinator decomposition for the federated wire), hub.py (hub-spoke orchestration), psi.py (DH-PSI)
  • Milestone: M6 (Federation Protocol)

1. Purpose

Federated entity resolution requires comparing records across organisational boundaries. The naive approach, sharing raw data, violates data sovereignty and privacy regulations (GDPR Article 5, EU AI Act Article 12). The three-phase protocol solves this through progressive disclosure: each phase reveals only what the next phase needs, and no phase ever transmits raw PII.

Design principles:

  • Privacy by design. Raw PII never leaves its source node. Only derived, one-way-transformed values cross the wire.
  • Progressive disclosure. Phase 1 reveals counts only. Phase 2 reveals derived feature vectors for candidates only. Phase 3 reveals confidence scores only.
  • Minimum necessary. Each phase transmits the least information required for the next phase to proceed. Records not in shared buckets are never exchanged.
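  • Equivalence. The three-phase pipeline produces identical matches to the single-phase run_fusion_derived() pipeline for the same input data (Section 8). Any accuracy difference relative to raw-value scoring comes from the derived-feature transforms, not from the phasing (Section 13).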

2. Protocol Overview

The protocol runs in three sequential phases. In production, each phase is a request/response message exchange between the coordinator and the nodes (see Wire Protocol under Section 9). Locally, phases are simulated with function calls.

Node A (VRS)                                   Node B (Firm)
    │                                               │
    │  ── Phase 1: Bucket Signals ──────────────►   │
    │  {composite_key: record_count}                │
    │  ◄──────────────────────────────────────────  │
    │  {composite_key: record_count}                │
    │                                               │
    │  Shared buckets identified                    │
    │                                               │
    │  ── Phase 2: Targeted Vectors ────────────►   │
    │  derived feature vectors (shared buckets only)│
    │  ◄──────────────────────────────────────────  │
    │  derived feature vectors (shared buckets only)│
    │                                               │
    │  ── Phase 3: Consensus Scoring ───────────►   │
    │  confidence scores + per-field scores         │
    │  ◄──────────────────────────────────────────  │
    │                                               │

Phase 1: Bucket Signals

Both nodes compute blocking keys (per component.parallax.blocking-engine) and exchange counts only: {composite_key: record_count}. No entity IDs, no features, no PII. Purpose: identify shared buckets and estimate overlap. Records whose blocking keys have no counterpart on the other side are excluded from all subsequent phases.

Phase 2: Targeted Vectors

Only records in shared buckets exchange derived feature vectors. Derivation applies one-way transforms (soundex, year extraction, SHA-256 hashing, etc. per component.parallax.derived-features) to produce vectors that enable matching without exposing raw PII. Purpose: enable scoring without raw data.
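The derivation step can be pictured with a minimal sketch. The field names (dob, email), the output keys, and the helper derive_vector are hypothetical; the actual transforms are defined in component.parallax.derived-features and may differ.

```python
import hashlib


def sha256_hex(value: str) -> str:
    """One-way hash of a trimmed, lower-cased string value."""
    return hashlib.sha256(value.strip().lower().encode("utf-8")).hexdigest()


def derive_vector(record: dict) -> dict:
    """Hypothetical derivation: only one-way-transformed values are kept."""
    dob = record.get("dob") or ""
    email = record.get("email")
    has_year = len(dob) >= 4 and dob[:4].isdigit()
    return {
        "local_id": record["local_id"],
        "birth_year": int(dob[:4]) if has_year else None,  # year extraction
        "email_sha256": sha256_hex(email) if email else None,  # SHA-256 hashing
    }


raw = {"local_id": "a-1", "dob": "1984-03-12", "email": "Jane.Doe@example.com "}
vector = derive_vector(raw)
```

The derived vector carries the birth year and a digest, but neither the full date of birth nor the email address.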

Phase 3: Consensus Scoring

Derived vector pairs are scored using the derived match function. The output is confidence scores and per-field score breakdowns — no PII, no feature vectors. Purpose: identify matches above threshold.

3. Phase Functions

phase1_bucket_signals()

def phase1_bucket_signals(
    node_records: dict[str, list[dict]],
    blocking_key_sets: list[list[str]],
    id_field: str = "local_id",
) -> dict[str, dict[str, int]]:

Computes bucket signals for each node. Returns {node_id: {composite_bucket_key: record_count}}. Values are integer counts only — no entity IDs, no features.

Internally calls generate_blocking_keys() (component.parallax.blocking-engine) per blocking key set and aggregates counts per composite key.
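As an illustration of the return shape, the sketch below counts records per composite key. The composite_key helper and its "field=value" format are assumptions made for this example; the real key format comes from generate_blocking_keys().

```python
from collections import Counter
from typing import Optional


def composite_key(record: dict, key_set: list) -> Optional[str]:
    """Hypothetical key format: 'field=value' parts joined with '|'."""
    values = [record.get(field) for field in key_set]
    if any(v in (None, "") for v in values):
        return None  # a record missing a blocking field gets no key for this pass
    return "|".join(f"{f}={str(v).lower()}" for f, v in zip(key_set, values))


def bucket_signals(node_records: dict, blocking_key_sets: list) -> dict:
    """Return {node_id: {composite_key: record_count}} with integer counts only."""
    signals = {}
    for node_id, records in node_records.items():
        counts = Counter()
        for record in records:
            for key_set in blocking_key_sets:
                key = composite_key(record, key_set)
                if key is not None:
                    counts[key] += 1
        signals[node_id] = dict(counts)
    return signals


nodes = {
    "hub": [
        {"local_id": "h1", "surname_soundex": "S530", "birth_year": 1984},
        {"local_id": "h2", "surname_soundex": "S530", "birth_year": 1984},
    ],
    "firm": [
        {"local_id": "f1", "surname_soundex": "S530", "birth_year": 1984},
        {"local_id": "f2", "surname_soundex": "J520", "birth_year": None},
    ],
}
signals = bucket_signals(nodes, [["surname_soundex", "birth_year"]])
```

No local_id appears anywhere in the result; only keys and counts do.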

_find_shared_buckets()

def _find_shared_buckets(
    signals: dict[str, dict[str, int]],
) -> dict[tuple[str, str], set[str]]:

From bucket signals, finds which bucket keys are shared between each node pair. Returns {(node_a, node_b): {shared_key_1, shared_key_2, ...}}. Plaintext set intersection — both sides see all keys.
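A minimal sketch of the plaintext intersection follows. Omitting node pairs with no shared keys is an assumption of this example, not a documented behaviour of _find_shared_buckets().

```python
from itertools import combinations


def find_shared_buckets(signals: dict) -> dict:
    """Intersect bucket keys for every unordered node pair."""
    shared = {}
    for node_a, node_b in combinations(sorted(signals), 2):
        common = set(signals[node_a]) & set(signals[node_b])
        if common:  # assumption: pairs with no overlap are left out
            shared[(node_a, node_b)] = common
    return shared


example_signals = {
    "a": {"k1": 2, "k2": 1},
    "b": {"k2": 4, "k3": 1},
    "c": {"k9": 1},
}
shared = find_shared_buckets(example_signals)
```

Both sides must hold each other's full key set for this to work, which is the leakage the PSI variant avoids.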

_find_shared_buckets_psi()

def _find_shared_buckets_psi(
    signals: dict[str, dict[str, int]],
) -> tuple[dict[tuple[str, str], set[str]], int]:

Same functional result as _find_shared_buckets(), but uses DH-PSI (see Section 5) so neither side learns the other's non-matching bucket keys. Returns ({(node_a, node_b): shared_keys}, psi_ops).

phase2_targeted_vectors()

def phase2_targeted_vectors(
    node_records: dict[str, list[dict]],
    shared_buckets: dict[tuple[str, str], set[str]],
    match_function: list[dict],
    blocking_key_sets: list[list[str]],
    id_field: str = "local_id",
) -> tuple[
    dict[tuple[str, str], tuple[list[dict], list[dict]]],  # pair_filtered (raw)
    dict[tuple[str, str], tuple[list[dict], list[dict]]],  # pair_vectors (derived)
    int,                                                  # vectors_sent
    int,                                                  # vectors_total
]:

For each node pair with shared buckets:

  1. Filters to records in shared buckets only (via _records_in_buckets())
  2. Derives features using one-way transforms (via derive_features() from component.parallax.derived-features)
  3. Returns both the shared-bucket raw records (for Phase 2.5 blocking) and the derived vectors

Returns:

  • pair_filtered: {(node_a, node_b): (raw_filtered_a, raw_filtered_b)}, the shared-bucket raw records consumed by Phase 2.5
  • pair_vectors: {(node_a, node_b): (derived_vectors_a, derived_vectors_b)}
  • vectors_sent: number of derived vectors exchanged
  • vectors_total: total records across all nodes

phase25_candidates()

def phase25_candidates(
    pair_filtered: dict[tuple[str, str], tuple[list[dict], list[dict]]],
    blocking_key_sets: list[list[str]],
    max_block_size: int,
    id_field: str = "local_id",
) -> dict[tuple[str, str], list[tuple[str, str]]]:

Phase 2.5 sits between Phase 2 and Phase 3. It re-runs the lens-declared blocking passes on the shared-bucket records from Phase 2 and lets generate_candidates() (component.parallax.blocking-engine) drop buckets whose cartesian product exceeds max_block_size. This brings the three-phase blocking semantics in line with run_fusion_derived (the §8 equivalence target) and honors the max_block_size contract from §13.
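The size cap can be sketched as follows for a single node pair and a single blocking pass. capped_candidates and key_fn are hypothetical names; the real logic lives in generate_candidates().

```python
from collections import defaultdict


def capped_candidates(records_a, records_b, key_fn, max_block_size, id_field="local_id"):
    """Pair up IDs bucket by bucket, skipping buckets over the cartesian cap."""
    blocks_a, blocks_b = defaultdict(list), defaultdict(list)
    for record in records_a:
        blocks_a[key_fn(record)].append(record[id_field])
    for record in records_b:
        blocks_b[key_fn(record)].append(record[id_field])

    pairs = set()
    for key in blocks_a.keys() & blocks_b.keys():
        ids_a, ids_b = blocks_a[key], blocks_b[key]
        if len(ids_a) * len(ids_b) > max_block_size:
            continue  # oversized bucket: dropped entirely
        pairs.update((a, b) for a in ids_a for b in ids_b)
    return sorted(pairs)


def by_surname(record):
    return record["surname"]


side_a = [{"local_id": f"a{i}", "surname": "smith"} for i in range(1, 4)]
side_a.append({"local_id": "a4", "surname": "jones"})
side_b = [
    {"local_id": "b1", "surname": "smith"},
    {"local_id": "b2", "surname": "smith"},
    {"local_id": "b3", "surname": "jones"},
]
tight = capped_candidates(side_a, side_b, by_surname, max_block_size=4)
loose = capped_candidates(side_a, side_b, by_surname, max_block_size=6)
```

With a cap of 4 the "smith" bucket (3 x 2 = 6 pairs) is dropped and only the "jones" pair survives; with a cap of 6 all seven pairs are kept.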

phase3_consensus()

def phase3_consensus(
    pair_candidates: dict[tuple[str, str], list[tuple[str, str]]],
    pair_vectors: dict[tuple[str, str], tuple[list[dict], list[dict]]],
    derived_match_function: list[dict],
    threshold: float = 0.70,
    null_penalty: float = 0.1,
    id_field: str = "local_id",
    node_prefix: bool = False,
) -> tuple[list[FusionMatch], list[ScoredPair]]:

Scores the Phase 2.5 candidate pairs against the Phase 2 derived vectors using score_pair_derived(). Returns (matches, all_scored) where matches are pairs above threshold, sorted by confidence descending. When node_prefix is True, FusionMatch IDs are emitted as f"{node_id}:{local_id}" for safe N-party clustering.
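A sketch of the thresholding, ordering, and node-prefixing steps is shown below. The scorer is a hypothetical stand-in (share of equal fields, no null penalty), and treating the threshold as inclusive is an assumption; score_pair_derived() is not reproduced here.

```python
def toy_score(vec_a: dict, vec_b: dict, fields: list) -> float:
    """Hypothetical scorer: share of fields whose derived values are equal."""
    hits = sum(
        1 for f in fields
        if vec_a.get(f) is not None and vec_a.get(f) == vec_b.get(f)
    )
    return hits / len(fields)


def consensus(candidates, vectors_a, vectors_b, fields, threshold=0.70,
              node_prefix=False, node_ids=("a", "b"), id_field="local_id"):
    """Score candidate pairs; return (matches sorted by confidence, all scored)."""
    by_id_a = {v[id_field]: v for v in vectors_a}
    by_id_b = {v[id_field]: v for v in vectors_b}
    all_scored = []
    for id_a, id_b in candidates:
        confidence = toy_score(by_id_a[id_a], by_id_b[id_b], fields)
        if node_prefix:  # disambiguate IDs that may collide across nodes
            id_a, id_b = f"{node_ids[0]}:{id_a}", f"{node_ids[1]}:{id_b}"
        all_scored.append((id_a, id_b, confidence))
    matches = sorted(
        (s for s in all_scored if s[2] >= threshold),
        key=lambda s: s[2],
        reverse=True,
    )
    return matches, all_scored


fields = ["birth_year", "email_sha256", "surname_soundex"]
vectors_a = [
    {"local_id": "a1", "birth_year": 1984, "email_sha256": "e1", "surname_soundex": "S530"},
    {"local_id": "a2", "birth_year": 1990, "email_sha256": "e2", "surname_soundex": "J520"},
]
vectors_b = [
    {"local_id": "b1", "birth_year": 1984, "email_sha256": "e1", "surname_soundex": "S530"},
    {"local_id": "b2", "birth_year": 1990, "email_sha256": "zz", "surname_soundex": "J520"},
]
matches, all_scored = consensus(
    [("a1", "b1"), ("a2", "b2")], vectors_a, vectors_b, fields,
    node_prefix=True, node_ids=("hub", "firm"),
)
```

The second pair agrees on two of three fields, so it is scored but falls below the 0.70 threshold and is not returned as a match.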

run_three_phase_local()

def run_three_phase_local(
    nodes: dict[str, list[dict]],
    lens_path: str | None = None,
    spec: Any = None,
    threshold: float = 0.70,
    null_penalty: float = 0.1,
    id_field: str = "local_id",
    field_maps: dict[str, dict[str, str]] | None = None,
    max_block_size: int = 200,
    entity_registry: Any = None,
    use_psi: bool = False,
    node_prefix: bool = False,
) -> ThreePhaseResult:

Full local POC pipeline. Orchestrates all phases:

  1. Parses lens (if spec not provided)
  2. Normalizes all node records
  3. Builds match function and blocking key sets from first node's data
  4. Builds derived match function
  5. Phase 1: bucket signals
  6. Shared bucket discovery (plaintext or PSI depending on use_psi)
  7. Phase 2: targeted derived vectors
  8. Phase 2.5: candidate generation on the shared-bucket records, capped by max_block_size
  9. Phase 3: consensus scoring

Requires at least 2 nodes (raises ValueError otherwise).

ThreePhaseResult

@dataclass
class ThreePhaseResult:
    matches: list[FusionMatch]
    total_candidates: int
    total_pairs_possible: int
    all_scored: list[ScoredPair]
    phase1_signals: dict
    phase2_vectors_sent: int
    phase2_vectors_total: int
    entity_registry: Any
    psi_enabled: bool
    psi_ops: int
    per_pair_candidates: dict[tuple[str, str], int]

The in-process pipeline above and the federated wire run share one implementation: three_phase.py delegates each phase to the node-side and coordinator-side primitives in parallax/ops/fusion/federation/wire.py (node_bucket_signals, shared_buckets, node_phase2_artifacts, coordinator_candidates, score_consensus). titan's FusionNodeHandler / FusionOrchestrator call those same primitives over the wire, so the in-process and federated paths are equivalent by construction (§8).

4. Hub-Spoke Architecture

The hub-spoke model specialises the three-phase protocol for the VRS screening use case.

  • Node A (hub) = SC Central (authority). Owns VRS registry data, audit trail, consent enforcement, output gating, vulnerability code lookup, and conflict detection.
  • Node B (spoke) = firm. Thin node — ES + engine instance only. Submits customer data for screening.

run_hub_screening()

def run_hub_screening(
    node_a_records: list[dict],
    node_b_records: list[dict],
    vrs_payloads: dict[str, dict],
    lens_path: str | None = None,
    spec: Any = None,
    threshold: float = 0.50,
    null_penalty: float = 0.1,
    use_psi: bool = False,
    consent_policy_a: ConsentPolicy | None = None,
    consent_policy_b: ConsentPolicy | None = None,
    match_fields: list[str] | None = None,
    screening_purpose: str = "internal_compliance",
    actor_id: str = "system",
) -> HubResult:

Orchestrates the full vertical stack on Node A:

  1. Consent check — filter records without consent (pre-pipeline)
  2. Create FusionRun audit record
  3. Three-phase matching (with optional PSI)
  4. Output assembly with Type A/B permission gating + purpose consent
  5. Conflict detection — reclassify 1-to-many matches
  6. Complete FusionRun with results

Output Gating: Type A vs Type B

Output gating is enforced on Node A. The firm never sees raw VRS data.

Field                  Type A            Type B
match_status           Yes               Yes
confidence             Yes               Yes
vulnerability_codes    Yes (codes only)  Yes
vulnerability_count    Yes               Yes
decline_credit         Yes               Yes
registration_type      Yes               Yes
vulnerability_details  No (empty list)   Yes (full: code, name, description, fca_driver, outcomes)

The screening_purpose parameter (e.g. "internal_compliance", "regulated_third_party_disclosure") is checked against each VRS record's permitted_purposes field (pipe-delimited string). If the screening purpose is not in the person's permitted purposes, that match is suppressed — the _assemble_gated_output() function returns None and the match is excluded from results. The purpose_denied_count on HubResult tracks how many matches were suppressed.
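The purpose check and the Type A/B gating can be sketched together. The payload shape (permission_type, vulnerabilities) and the function name are assumptions for illustration; only the pipe-delimited permitted_purposes field and the None-on-denial behaviour come from the text above.

```python
from typing import Optional


def gate_output(vrs_payload: dict, screening_purpose: str) -> Optional[dict]:
    """Return the gated view of a VRS record, or None if the purpose is denied."""
    permitted = {
        p.strip()
        for p in vrs_payload.get("permitted_purposes", "").split("|")
        if p.strip()
    }
    if screening_purpose not in permitted:
        return None  # match suppressed: purpose not consented to

    vulnerabilities = vrs_payload.get("vulnerabilities", [])
    codes = [v["code"] for v in vulnerabilities]
    is_type_b = vrs_payload.get("permission_type") == "B"
    return {
        "permission_type": "B" if is_type_b else "A",
        "vulnerability_codes": codes,
        "vulnerability_count": len(codes),
        # Type A never receives details; Type B receives the full entries.
        "vulnerability_details": list(vulnerabilities) if is_type_b else [],
    }


payload_a = {
    "permission_type": "A",
    "permitted_purposes": "internal_compliance",
    "vulnerabilities": [{"code": "V01", "name": "example"}],
}
payload_b = {
    "permission_type": "B",
    "permitted_purposes": "internal_compliance|regulated_third_party_disclosure",
    "vulnerabilities": [{"code": "V01", "name": "example"}],
}
gated_a = gate_output(payload_a, "internal_compliance")
gated_b = gate_output(payload_b, "regulated_third_party_disclosure")
denied = gate_output(payload_a, "regulated_third_party_disclosure")
```

A caller would count the None results to populate something like purpose_denied_count.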

Match Classification

Matches are classified using per-field scores and overall confidence:

Status     Criteria
confirmed  All match fields >= 0.85 AND confidence >= 0.95
probable   Confidence >= 0.70
no_match   Confidence < 0.70
conflict   1-to-many: one customer matches multiple VRS records, or one VRS record matches multiple customers

Classification thresholds:

CONFIRMED_THRESHOLD = 0.95
PROBABLE_THRESHOLD = 0.70
FIELD_MATCH_THRESHOLD = 0.85
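Read as code, the criteria table and thresholds amount to roughly the following. classify_match is a hypothetical name, and treating an empty set of field scores as "not all fields strong" is an assumption.

```python
CONFIRMED_THRESHOLD = 0.95
PROBABLE_THRESHOLD = 0.70
FIELD_MATCH_THRESHOLD = 0.85


def classify_match(confidence: float, field_scores: dict) -> str:
    """Apply the confirmed / probable / no_match rules in priority order."""
    all_fields_strong = bool(field_scores) and all(
        score >= FIELD_MATCH_THRESHOLD for score in field_scores.values()
    )
    if all_fields_strong and confidence >= CONFIRMED_THRESHOLD:
        return "confirmed"
    if confidence >= PROBABLE_THRESHOLD:
        return "probable"
    return "no_match"


strong = classify_match(0.97, {"name": 0.90, "dob": 1.00})
weak_field = classify_match(0.97, {"name": 0.80, "dob": 1.00})
low = classify_match(0.69, {"name": 0.90, "dob": 0.90})
```

A high overall confidence with one weak field is only "probable"; the "conflict" status is assigned later by conflict detection, not here.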

Conflict Detection

The _detect_conflicts() function reclassifies matches where:

  • One customer ID maps to multiple VRS IDs (1-to-many on spoke side)
  • One VRS ID maps to multiple customer IDs (1-to-many on hub side)

All matches involved in a 1-to-many mapping are reclassified as "conflict". Only confirmed and probable matches participate in conflict detection — no_match entries are ignored.
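A minimal sketch of this reclassification, using plain dicts in place of GatedMatch and a hypothetical function name:

```python
from collections import Counter

ACTIVE = ("confirmed", "probable")


def detect_conflicts(matches: list) -> list:
    """Reclassify every active match that takes part in a 1-to-many mapping."""
    active = [m for m in matches if m["match_status"] in ACTIVE]
    per_customer = Counter(m["customer_id"] for m in active)
    per_vrs = Counter(m["vrs_id"] for m in active)

    result = []
    for m in matches:
        in_conflict = m["match_status"] in ACTIVE and (
            per_customer[m["customer_id"]] > 1 or per_vrs[m["vrs_id"]] > 1
        )
        result.append({**m, "match_status": "conflict"} if in_conflict else m)
    return result


screened = [
    {"customer_id": "c1", "vrs_id": "v1", "match_status": "confirmed"},
    {"customer_id": "c1", "vrs_id": "v2", "match_status": "probable"},
    {"customer_id": "c2", "vrs_id": "v3", "match_status": "confirmed"},
    {"customer_id": "c3", "vrs_id": "v3", "match_status": "no_match"},
]
statuses = [m["match_status"] for m in detect_conflicts(screened)]
```

The no_match entry for v3 does not count towards the 1-to-many test, so the c2/v3 match stays confirmed.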

HubResult

@dataclass
class HubResult:
    matches: list[GatedMatch]
    fusion_run: FusionRun | None
    consent_result_a: ConsentCheckResult | None
    consent_result_b: ConsentCheckResult | None
    three_phase: ThreePhaseResult | None
    confirmed_count: int
    probable_count: int
    conflict_count: int
    no_match_count: int
    purpose_denied_count: int
    total_screened: int
    total_vrs: int

GatedMatch

@dataclass
class GatedMatch:
    customer_id: str
    vrs_id: str
    match_status: str           # confirmed | probable | conflict | no_match
    confidence: float
    matched_fields: list[str]
    conflicted_fields: list[str]
    permission_type: str        # A or B
    vulnerability_codes: list[str]
    vulnerability_count: int
    decline_credit: str
    registration_type: str
    vulnerability_details: list[dict]   # [] for Type A

5. DH-PSI Integration

An optional privacy enhancement for Phase 1. When use_psi=True, shared bucket discovery uses Diffie-Hellman Private Set Intersection instead of plaintext set comparison. Neither side learns the other's non-matching bucket keys or counts.

Protocol

Uses RFC 3526 Group 14 (2048-bit MODP safe prime). Two rounds, four messages (two per round):

Round 1: Each side masks elements with their secret exponent
         A sends: {H(x_i)^a}  →  B
         B sends: {H(y_j)^b}  →  A

Round 2: Each side double-masks the received set with their own secret
         B sends: {H(x_i)^(ab)}  →  A
         A sends: {H(y_j)^(ab)}  →  B

Result:  H(x_i)^(ab) == H(y_j)^(ab)  iff  x_i == y_j

Privacy guarantees:

  • Neither side learns anything about non-intersecting elements
  • Masked values are computationally indistinguishable from random
  • Double-masked values only match for equal inputs
  • No raw blocking keys or derived features are revealed

Important limitation: PSI does NOT work through CDG (opaque integers). The protocol requires the actual string values to hash into the group.

PSI Functions

PsiNode

class PsiNode:
    def __init__(self, node_id: str): ...
    def mask(self, elements: list[str]) -> list[int]: ...
    def double_mask(self, received: list[int]) -> list[int]: ...

Each node holds a secret exponent (random in [2, p-2]). mask() computes H(element)^secret mod p. double_mask() raises already-masked values to the node's secret: (H(x)^a)^b = H(x)^(ab).
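The masking algebra can be exercised end to end with a toy sketch. It deliberately does not use the RFC 3526 Group 14 prime: the modulus below is the Mersenne prime 2**127 - 1, which is not a safe prime and is far too small to be secure. It also uses the built-in pow rather than gmpy2. ToyPsiNode and toy_psi_intersect are hypothetical names.

```python
import hashlib
import secrets

# Toy modulus for illustration only: NOT the RFC 3526 prime and NOT secure.
P = 2**127 - 1


def hash_to_group(element: str) -> int:
    """Hash a string with SHA-256 and map it into the group."""
    digest = int.from_bytes(hashlib.sha256(element.encode("utf-8")).digest(), "big")
    return digest % (P - 2) + 2  # offset so the value is never 0 or 1


class ToyPsiNode:
    def __init__(self, node_id: str):
        self.node_id = node_id
        self._secret = secrets.randbelow(P - 3) + 2  # random in [2, P-2]

    def mask(self, elements):
        """H(x)^secret mod P for each element."""
        return [pow(hash_to_group(e), self._secret, P) for e in elements]

    def double_mask(self, received):
        """Raise already-masked values to this node's secret."""
        return [pow(v, self._secret, P) for v in received]


def toy_psi_intersect(elements_a, elements_b):
    """Run both rounds locally and return side A's view of the intersection."""
    a, b = ToyPsiNode("a"), ToyPsiNode("b")
    masked_a, masked_b = a.mask(elements_a), b.mask(elements_b)  # round 1
    double_a = b.double_mask(masked_a)  # round 2: H(x)^(ab), order preserved
    double_b = a.double_mask(masked_b)  # round 2: H(y)^(ba)
    shared = set(double_a) & set(double_b)
    return [x for x, v in zip(elements_a, double_a) if v in shared]


common = toy_psi_intersect(["k1", "k2", "k3"], ["k3", "k4", "k1"])
```

Side A recovers its matching elements because B returns the double-masked values in the same order A sent them; that ordering convention is an assumption of this sketch.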

psi_intersect()

def psi_intersect(
    elements_a: list[str],
    elements_b: list[str],
) -> PsiResult:

Runs the full DH-PSI protocol locally. Returns PsiResult with intersection from both sides' perspectives.

PsiResult

@dataclass
class PsiResult:
    intersection_a: list[str]   # Elements from side A in intersection
    intersection_b: list[str]   # Elements from side B in intersection
    size: int                   # |intersection|
    set_a_size: int             # |A|
    set_b_size: int             # |B|
    rounds: int = 2             # Protocol rounds

psi_find_shared_buckets()

def psi_find_shared_buckets(
    node_a_keys: dict[str, int],
    node_b_keys: dict[str, int],
) -> set[str]:

Integration helper for the three-phase protocol. Wraps psi_intersect() to find shared blocking bucket keys between two nodes.

psi_filter_candidates()

def psi_filter_candidates(
    records_a: list[dict],
    records_b: list[dict],
    blocking_key_sets: list[list[str]],
    id_field: str = "local_id",
) -> tuple[list[dict], list[dict], int]:

Uses PSI to filter to candidate records across all blocking passes. Returns (filtered_a, filtered_b, psi_ops).

Hash-to-Group

def _hash_to_group(element: str) -> int:

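Maps a string to a group element using SHA-256: the digest is read as an integer, reduced mod (p-2), and offset by 2 so the result is never 0 or 1. Because a SHA-256 digest is only 256 bits while p is 2048 bits, the reduction leaves the digest unchanged in practice and the result falls well inside [2, p-2].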

Performance

Uses gmpy2.powmod (a hard dependency) for compiled modular exponentiation — ~50x faster than the Python built-in pow.

6. Consent

Consent is enforced at three layers, each progressively narrower.

The first layer is record-level consent. Before the three-phase protocol begins, records are filtered by a ConsentPolicy. Records without consent are excluded before blocking even starts.

@dataclass(frozen=True)
class ConsentPolicy:
    lens_id: str
    federate_id: str
    consent_field: str = "consent_fusion"
    consent_values: tuple[str, ...] = ("yes", "true", "1", "granted")
    default_consent: ConsentDefault = ConsentDefault.DENY
    log_denials: bool = True

ConsentDefault.DENY (opt-in model): missing consent field means no consent. ConsentDefault.ALLOW (opt-out model): missing consent field means implicit consent.

def filter_by_consent(
    records: list[dict],
    policy: ConsentPolicy,
    id_field: str = "local_id",
) -> ConsentCheckResult:

Returns ConsentCheckResult with eligible and excluded record lists, excluded_ids, total_checked, and consent_rate.
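The opt-in and opt-out defaults can be sketched with a cut-down policy. Policy, has_consent, and split_by_consent are hypothetical stand-ins; the enum values, the case-insensitive comparison, and treating None as a missing field are assumptions.

```python
from dataclasses import dataclass
from enum import Enum


class ConsentDefault(Enum):
    DENY = "deny"    # opt-in: missing field means no consent
    ALLOW = "allow"  # opt-out: missing field means implicit consent


@dataclass(frozen=True)
class Policy:
    """Hypothetical trimmed-down stand-in for ConsentPolicy."""
    consent_field: str = "consent_fusion"
    consent_values: tuple = ("yes", "true", "1", "granted")
    default_consent: ConsentDefault = ConsentDefault.DENY


def has_consent(record: dict, policy: Policy) -> bool:
    value = record.get(policy.consent_field)
    if value is None:  # field absent (or null): fall back to the default
        return policy.default_consent is ConsentDefault.ALLOW
    return str(value).strip().lower() in policy.consent_values


def split_by_consent(records: list, policy: Policy):
    """Return (eligible, excluded, consent_rate)."""
    eligible = [r for r in records if has_consent(r, policy)]
    excluded = [r for r in records if not has_consent(r, policy)]
    rate = len(eligible) / len(records) if records else 0.0
    return eligible, excluded, rate


records = [
    {"local_id": "1", "consent_fusion": "Yes"},
    {"local_id": "2", "consent_fusion": "no"},
    {"local_id": "3"},
]
opt_in = split_by_consent(records, Policy())
opt_out = split_by_consent(records, Policy(default_consent=ConsentDefault.ALLOW))
```

Only the record with no consent field changes sides between the two policies; an explicit "no" is excluded under both.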

The second layer is purpose consent. Each VRS record has permitted_purposes (pipe-delimited string, e.g. "internal_compliance|regulated_third_party_disclosure"). The hub checks the firm's screening_purpose against this list. If the purpose is not permitted, the match is suppressed, because the person did not consent to this type of disclosure.

The third layer is the permission type. Even for consented matches, the permission type controls disclosure depth. Type A returns codes and counts only. Type B returns full vulnerability details. This is set at VRS registration time and enforced on Node A.

def check_record_consent(record: dict, policy: ConsentPolicy) -> bool:

Checks a single record against the policy. Returns True if eligible.

def validate_correlation_consent(
    record_ids: list[str],
    consent_results: dict[str, ConsentCheckResult],
) -> list[str]:

Final check before creating a CorrelationRecord. Returns list of record IDs that lack consent.

7. FusionRun Audit Trail

Every screening run creates an immutable FusionRun audit record for Article 12 record-keeping. Contains metadata only — no raw PII, no entity data.

@dataclass(frozen=True)
class FusionRun:
    run_id: str
    lens_id: str
    lens_version: str
    execution_mode: str              # "ad_hoc" | "scheduled" | "reactive"
    started_at: str
    completed_at: str
    status: str                      # "running" | "completed" | "failed" | "partial"
    expected_federates: tuple[str, ...]
    participating_federates: tuple[str, ...]
    missing_federates: tuple[str, ...]
    phase1_complete: bool
    phase2_complete: bool
    phase3_complete: bool
    total_candidates: int
    total_matches: int
    correlation_ids: tuple[str, ...]
    threshold: float
    null_penalty: float
    blocking_strategy: str
    triggered_by: str                # activation_id or "manual"
    actor_id: str

The frozen dataclass guarantees immutability. Completion is handled by creating a new instance via dataclasses.replace():

def create_fusion_run(
    run_id: str, lens_id: str, lens_version: str, execution_mode: str,
    expected_federates: tuple[str, ...], threshold: float = 0.70,
    null_penalty: float = 0.1, blocking_strategy: str = "hash",
    triggered_by: str = "manual", actor_id: str = "",
) -> FusionRun:

def complete_fusion_run(
    run: FusionRun, correlation_ids: tuple[str, ...],
    participating_federates: tuple[str, ...],
    missing_federates: tuple[str, ...] = (),
    total_candidates: int = 0, total_matches: int = 0,
    phase1_complete: bool = True, phase2_complete: bool = True,
    phase3_complete: bool = True,
) -> FusionRun:

Status is set to "partial" if missing_federates is non-empty, otherwise "completed".
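The create/complete lifecycle relies on standard library behaviour, which a cut-down sketch can show. RunRecord and complete are hypothetical; only the frozen dataclass, dataclasses.replace(), and the partial/completed rule come from the text above.

```python
from dataclasses import FrozenInstanceError, dataclass, replace


@dataclass(frozen=True)
class RunRecord:
    """Hypothetical cut-down FusionRun with a handful of fields."""
    run_id: str
    status: str = "running"
    missing_federates: tuple = ()
    total_matches: int = 0


def complete(run: RunRecord, missing_federates=(), total_matches: int = 0) -> RunRecord:
    """Return a new record; the original stays untouched."""
    status = "partial" if missing_federates else "completed"
    return replace(
        run,
        status=status,
        missing_federates=tuple(missing_federates),
        total_matches=total_matches,
    )


started = RunRecord(run_id="run-001")
finished = complete(started, total_matches=12)
degraded = complete(started, missing_federates=("node-c",))

try:
    started.status = "completed"  # direct mutation is rejected
    mutation_blocked = False
except FrozenInstanceError:
    mutation_blocked = True
```

Note that frozen=True blocks attribute assignment; it does not deep-freeze mutable field values, which is one reason tuples are a natural choice for the collection fields.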

Serialization: fusion_run_to_dict() and fusion_run_from_dict() for ES/JSON persistence.

8. Equivalence Guarantee

The three-phase pipeline produces identical matches to the single-phase run_fusion_derived() pipeline for the same input data. This is asserted by the following tests:

  • TestTwoNodeEquivalence.test_same_matches_at_050 — same match pairs at threshold 0.50
  • TestTwoNodeEquivalence.test_same_matches_at_070 — same match pairs at threshold 0.70
  • TestHubWithPsi.test_psi_produces_same_matches — PSI mode produces same matches as plaintext

The equivalence holds because:

  1. The same normalization, blocking, and derived feature transforms are applied
  2. The same scoring function (score_pair_derived) is used
  3. Bucket filtering only eliminates pairs that share no blocking key, and the single-phase pipeline never generates those pairs as candidates either

9. Data Flow Summary

Phase  Direction    Content                                       NOT Included
1      Both         Blocking key counts ({composite_key: int})    No entity IDs, no features, no PII
2      Both         Derived feature vectors (one-way transforms)  No raw PII, no records outside shared buckets
3      Hub → Spoke  Confidence scores + per-field scores          No PII, no feature vectors

Wire Protocol (Production)

In production each phase is an axonis.fusion.Message exchanged over xanadu (RabbitMQ). titan's FusionOrchestrator (coordinator) sends phase requests to titan FusionNodeHandler nodes, which run the per-node compute from parallax.ops.fusion.federation.wire.

Phase 1:      FUSION_BLOCKING_REQUEST → FUSION_BLOCKING_RESPONSE  {bucket_signals: {key: count}}
Phase 1-PSI:  FUSION_PSI_ROUND1/2_REQUEST → ..._RESPONSE — optional DH-PSI variant (use_psi);
              only opaque masked integers cross the wire, the intersector returns shared keys
Phase 2:      FUSION_VECTORS_REQUEST  → FUSION_VECTORS_RESPONSE   {block_index, derived_vectors}
Phase 3:      scored on the coordinator from the Phase 2 artifacts; result rides FUSION_RUN_COMPLETE

axonis.fusion.MessageType is the canonical envelope contract. component.parallax.wire-protocol-adapters (superseded) holds the deployment-transport analysis (CDG / mTLS / Xanadu).

10. Test Coverage

tests/test_three_phase.py

Test Class                 What It Proves
TestPhase1BucketSignals    Phase 1 returns counts only: no entity IDs, no raw PII leaked into signals
TestPhase2TargetedVectors  Phase 2 exchanges derived vectors only for shared-bucket records: no raw PII in vectors, fewer vectors than total records
TestTwoNodeEquivalence     Three-phase produces identical matches to single-phase at 0.50 and 0.70 thresholds; TP >= 18, FP == 0; P018 false positive rejected; vectors sent < total
TestFiveNodeThreePhase     Multi-node (5-node) protocol produces matches and Phase 1 signals for all nodes
TestPartialFailure         Dropping one node (4/5) produces a subset of full results, with no false matches from partial failure
TestEdgeCases              Minimum 2 nodes enforced; disjoint data produces 0 matches; 3-node split works

tests/test_hub.py

Test Class             What It Proves
TestHubScreening       Full vertical stack: returns HubResult with GatedMatch objects, TP >= 16, FP == 0, P018 rejected, FusionRun created with correlation IDs, counts consistent
TestOutputGating       Type A has codes but no details; Type B has codes and full details with required fields; vulnerability count matches codes
TestConsentFiltering   DENY default filters all records without consent field; ALLOW default passes all; no policy passes all
TestPurposeConsent     internal_compliance returns all matches; regulated_third_party_disclosure suppresses Type A matches; unknown purpose suppresses all matches
TestHubWithPsi         PSI mode produces same matches as plaintext mode
TestClassification     Confirmed/probable/no_match classification logic per threshold rules
TestConflictDetection  1-customer-to-many-VRS and 1-VRS-to-many-customers both reclassified as conflict; unique matches unaffected; no_match not conflicted

11. Files

File                                           Purpose
parallax/ops/fusion/federation/three_phase.py  Three-phase in-process pipeline: phase functions, run_three_phase_local
parallax/ops/fusion/federation/wire.py         Per-node / coordinator phase primitives for the federated wire
parallax/ops/fusion/federation/hub.py          Hub-spoke orchestrator: consent, matching, output gating, conflict detection
parallax/ops/fusion/federation/psi.py          DH-PSI implementation: PsiNode, psi_intersect, integration helpers
parallax/ops/fusion/models/consent.py          Consent model: ConsentPolicy, filter_by_consent, validate_correlation_consent
parallax/ops/fusion/models/fusion_run.py       FusionRun audit model: immutable frozen dataclass, create/complete lifecycle
tests/test_three_phase.py                      Three-phase protocol tests (18 tests)
tests/test_hub.py                              Hub-spoke orchestrator tests

12. Dependencies

Spec                                                    Relationship
component.parallax.blocking-engine (Blocking Engine)    Phase 1 uses generate_blocking_keys() for bucket computation
component.parallax.scoring-engine (Scoring Engine)      Phase 3 uses scoring primitives (metrics, weighted aggregate)
component.parallax.derived-features (Derived Features)  Phase 2 uses derive_features() and build_derived_match_function() for one-way transforms

13. Convenience-wrapper API (run_fusion, run_multi_fusion)

parallax/ops/fusion/pipeline.py exposes the single-call API used by Themis benchmarks, integration tests, and in-process demos. These wrappers compose blocking + scoring + (optionally) DH-PSI and UnionFind clustering, returning a FusionResult or MultiFusionResult instead of the structured ThreePhaseResult returned by §3.

def run_fusion(
    node_a: list[dict],
    node_b: list[dict],
    lens_path: str | None = None,
    threshold: float = 0.70,
    null_penalty: float = 0.1,
    id_field: str = "local_id",
    field_map_a: dict[str, str] | None = None,
    field_map_b: dict[str, str] | None = None,
    spec: Any = None,
    privacy: str | None = None,      # None | "psi" | "three_phase"
    max_block_size: int = 200,
) -> FusionResult: ...

def run_multi_fusion(
    nodes: dict[str, list[dict]],
    lens_path: str | None = None,
    threshold: float = 0.70,
    null_penalty: float = 0.1,
    id_field: str = "local_id",
    field_maps: dict[str, dict[str, str]] | None = None,
    spec: Any = None,
    privacy: str | None = None,
    max_block_size: int = 200,
) -> MultiFusionResult: ...

privacy modes

_PRIVACY_MODES = (None, "psi", "three_phase"). Invalid values raise ValueError at call time.

Mode            Blocking-phase behaviour                                                                                  Bucket-existence leakage
None (default)  Plaintext bilateral set intersection of blocking-key universes                                            Hub learns each side's bucket-key universe
"psi"           DH-PSI (§5) on blocks_a/blocks_b per pass before generate_candidates                                      Neither side learns the other's non-shared bucket keys
"three_phase"   Dispatches to the full three-phase pipeline (§3): extract → blocking → score with progressive disclosure  Per §3 / §4

For multi-pass blocking (component.parallax.blocking-engine §"Multi-pass execution"), PSI is applied per pass, then candidates are unioned across passes. For N-party run_multi_fusion, PSI runs bilaterally on each of N-choose-2 node pairs; transitivity is recovered by the post-pair UnionFind step.
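How bilateral matches regain transitivity can be sketched with a small union-find. The class below is a generic textbook structure, not the UnionFind used by run_multi_fusion, and the node-prefixed IDs are illustrative.

```python
from itertools import combinations


class UnionFind:
    """Generic disjoint-set structure with path compression."""

    def __init__(self):
        self.parent = {}

    def find(self, item):
        self.parent.setdefault(item, item)
        root = item
        while self.parent[root] != root:
            root = self.parent[root]
        while self.parent[item] != root:  # compress the path walked above
            next_item = self.parent[item]
            self.parent[item] = root
            item = next_item
        return root

    def union(self, a, b):
        root_a, root_b = self.find(a), self.find(b)
        if root_a != root_b:
            self.parent[root_b] = root_a


def cluster(pairwise_matches):
    """Merge bilateral matches into entity clusters."""
    uf = UnionFind()
    for a, b in pairwise_matches:
        uf.union(a, b)
    groups = {}
    for item in list(uf.parent):
        groups.setdefault(uf.find(item), set()).add(item)
    return sorted(sorted(group) for group in groups.values())


node_ids = ["n1", "n2", "n3", "n4", "n5"]
pair_count = len(list(combinations(node_ids, 2)))  # N-choose-2 bilateral runs

bilateral = [("n1:a", "n2:b"), ("n2:b", "n3:c"), ("n1:x", "n3:y")]
clusters = cluster(bilateral)
```

n1:a and n3:c were never compared directly in this example, yet they land in the same cluster through n2:b. Five nodes require ten bilateral runs.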

max_block_size

Hard cap on the per-bucket cartesian product inside generate_candidates. Buckets with len(a) * len(b) > max_block_size are dropped to bound worst-case runtime under skewed key distributions. Applied in all three privacy modes — in privacy="three_phase" it is honored by Phase 2.5 (per-pair blocking with the cap on shared-bucket-filtered records). Default 200 (≈90s/pair at 1k records on the Disney 5-way reference); raise for higher recall on highly-skewed surnames, lower to bound tail latency.

Equivalence

privacy=None and privacy="psi" produce identical match sets on shared blocking keys — PSI changes what the hub sees, not what it matches. This is asserted by TestHubWithPsi.test_psi_produces_same_matches.

privacy="three_phase" produces identical matches to run_fusion_derived per §8 (both pipelines score on derived features through the same score_pair_derived, with the same blocking semantics now that Phase 2.5 routes through generate_candidates). This invariant is asserted by TestRunFusionThreePhase.test_three_phase_matches_run_fusion_derived and by TestTwoNodeEquivalence in tests/test_three_phase.py.

The relationship between privacy="three_phase" and privacy=None is not universal equivalence: privacy=None scores raw values via score_pair, while three_phase scores derived values via score_pair_derived. component.parallax.derived-features §"Accuracy Impact" documents the recall trade-off in general. On fixtures where raw and derived scoring agree — exact-match lenses (derived SHA-256 preserves equality) and the VRS POC fuzzy fixture (identical TP/FP per component.parallax.derived-features) — the two match sets coincide. TestRunFusionThreePhase.test_three_phase_parity_on_exact_match_lens asserts this restricted parity on the ADSB exact-match lens; do not generalise it to fuzzy lenses.

Authoritative reference

FED-component.parallax.orchestration (themis/docs/FED-component.parallax.orchestration-ADVERSARIAL-RESILIENCE.md) defines the federation-privacy adversarial model and EU AI Act Article 9 minimum-data principle that privacy="psi" operationalises. Cited from the run_fusion docstring; the spec itself lives in the themis repo.


Depends on: component.parallax.blocking-engine, component.parallax.derived-features, component.parallax.scoring-engine

Realizes: product.fusion

Required by: component.parallax.cds-message-mapping, component.parallax.fusionmatch-model, component.parallax.island-robustness, component.parallax.node-scaling, component.parallax.quorum, component.parallax.vrs-rest-api, component.parallax.wire-message-families