Three-Phase Federated Screening Protocol
Status & scope
- Stage: IMPLEMENTED
- Module: parallax/ops/fusion/federation/ with three_phase.py (in-process pipeline), wire.py (per-node / coordinator decomposition for the federated wire), hub.py (hub-spoke orchestration), psi.py (DH-PSI)
- Milestone: M6 (Federation Protocol)
1. Purpose
Federated entity resolution requires comparing records across organisational boundaries. The naive approach, sharing raw data, violates data sovereignty and privacy regulations (GDPR Article 5, EU AI Act Article 12). The three-phase protocol solves this through progressive disclosure: each phase discloses only what the next phase needs, about a progressively narrower set of records, and no phase ever transmits raw PII.
Design principles:
- Privacy by design. Raw PII never leaves its source node. Only derived, one-way-transformed values cross the wire.
- Progressive disclosure. Phase 1 reveals counts only. Phase 2 reveals derived feature vectors for candidates only. Phase 3 reveals confidence scores and per-field score breakdowns only.
- Minimum necessary. Each phase transmits the least information required for the next phase to proceed. Records not in shared buckets are never exchanged.
- Equivalence. The three-phase pipeline produces identical matches to the single-phase run_fusion_derived() pipeline for the same input data (§8). Relative to derived-feature scoring, privacy costs nothing in accuracy.
2. Protocol Overview
The protocol runs in three sequential phases. In production, each phase is a request/response message exchange between the coordinator and the nodes (see Wire Protocol in §9). Locally, phases are simulated with function calls.
Node A (VRS)                          Node B (Firm)
│                                                 │
│ ── Phase 1: Bucket Signals ───────────────────► │
│    {composite_key: record_count}                │
│ ◄────────────────────────────────────────────── │
│    {composite_key: record_count}                │
│                                                 │
│    Shared buckets identified                    │
│                                                 │
│ ── Phase 2: Targeted Vectors ─────────────────► │
│    derived feature vectors (shared buckets only)│
│ ◄────────────────────────────────────────────── │
│    derived feature vectors (shared buckets only)│
│                                                 │
│ ── Phase 3: Consensus Scoring ────────────────► │
│    confidence scores + per-field scores         │
│ ◄────────────────────────────────────────────── │
│                                                 │
Phase 1: Bucket Signals
Both nodes compute blocking keys (per component.parallax.blocking-engine) and exchange counts only — {composite_key: record_count}. No entity IDs, no features, no PII. Purpose: identify shared buckets and estimate overlap. Records whose blocking keys have no counterpart on the other side are excluded from all subsequent phases.
Phase 2: Targeted Vectors
Only records in shared buckets exchange derived feature vectors. Derivation applies one-way transforms (soundex, year extraction, SHA-256 hashing, etc. per component.parallax.derived-features) to produce vectors that enable matching without exposing raw PII. Purpose: enable scoring without raw data.
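As a rough illustration of the one-way transforms named above, the sketch below derives a year and a SHA-256 digest from a record. It is not the derive_features() implementation: the function name derive_vector_sketch, the field names (date_of_birth, postcode) and the postcode normalisation are all assumptions made for the example.

```python
import hashlib


def derive_vector_sketch(record):
    """Hypothetical stand-in for derive_features(); field names are assumed."""
    dob = record.get("date_of_birth") or ""
    # Assumed normalisation: strip spaces and upper-case before hashing.
    postcode = (record.get("postcode") or "").replace(" ", "").upper()
    return {
        "local_id": record["local_id"],
        # Year extraction: keep only the first four characters of an ISO date.
        "dob_year": dob[:4] or None,
        # SHA-256 preserves equality but does not carry the raw value.
        "postcode_sha256": (
            hashlib.sha256(postcode.encode("utf-8")).hexdigest() if postcode else None
        ),
    }


vector = derive_vector_sketch(
    {"local_id": "a1", "date_of_birth": "1980-04-12", "postcode": "sw1a 1aa"}
)
```

Only the derived keys appear in the resulting vector; the raw date and postcode stay on the source node.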
Phase 3: Consensus Scoring
Derived vector pairs are scored using the derived match function. The output is confidence scores and per-field score breakdowns — no PII, no feature vectors. Purpose: identify matches above threshold.
3. Phase Functions
phase1_bucket_signals()
def phase1_bucket_signals(
    node_records: dict[str, list[dict]],
    blocking_key_sets: list[list[str]],
    id_field: str = "local_id",
) -> dict[str, dict[str, int]]: ...
Computes bucket signals for each node. Returns {node_id: {composite_bucket_key: record_count}}. Values are integer counts only — no entity IDs, no features.
Internally calls generate_blocking_keys() (component.parallax.blocking-engine) per blocking key set and aggregates counts per composite key.
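The per-node counting step can be pictured with the minimal sketch below. It does not call generate_blocking_keys(); the helper bucket_signals_sketch, the composite key format (field=value joined by "|") and the rule that records with a missing key field are skipped are assumptions for illustration only.

```python
from collections import Counter


def bucket_signals_sketch(records, blocking_key_sets):
    """Count records per composite blocking key for one node (illustrative)."""
    counts = Counter()
    for record in records:
        for key_fields in blocking_key_sets:
            values = [record.get(field) for field in key_fields]
            if any(value in (None, "") for value in values):
                continue  # assumption: incomplete keys do not form a bucket
            composite = "|".join(
                f"{field}={value}" for field, value in zip(key_fields, values)
            )
            counts[composite] += 1
    # Only integer counts leave this function: no IDs, no features.
    return dict(counts)


signals = bucket_signals_sketch(
    [
        {"surname_sx": "S530", "dob_year": "1980"},
        {"surname_sx": "S530", "dob_year": "1980"},
        {"surname_sx": "J520", "dob_year": None},
    ],
    [["surname_sx", "dob_year"], ["surname_sx"]],
)
```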
_find_shared_buckets()
def _find_shared_buckets(
    signals: dict[str, dict[str, int]],
) -> dict[tuple[str, str], set[str]]: ...
From bucket signals, finds which bucket keys are shared between each node pair. Returns {(node_a, node_b): {shared_key_1, shared_key_2, ...}}. Plaintext set intersection — both sides see all keys.
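A plaintext intersection over node pairs might look like the sketch below. The name find_shared_buckets_sketch is hypothetical, and two behaviours are assumptions rather than documented facts: pairs are ordered by sorted node ID, and pairs with no shared key are omitted from the result.

```python
from itertools import combinations


def find_shared_buckets_sketch(signals):
    """Return {(node_a, node_b): shared_keys} from per-node bucket signals."""
    shared = {}
    for node_a, node_b in combinations(sorted(signals), 2):
        common = set(signals[node_a]) & set(signals[node_b])
        if common:  # assumption: pairs without overlap are left out
            shared[(node_a, node_b)] = common
    return shared


shared = find_shared_buckets_sketch(
    {
        "A": {"k1": 2, "k2": 1},
        "B": {"k2": 5, "k3": 1},
        "C": {"k9": 1},
    }
)
```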
_find_shared_buckets_psi()
def _find_shared_buckets_psi(
    signals: dict[str, dict[str, int]],
) -> tuple[dict[tuple[str, str], set[str]], int]: ...
Same functional result as _find_shared_buckets(), but uses DH-PSI (see Section 5) so neither side learns the other's non-matching bucket keys. Returns ({(node_a, node_b): shared_keys}, psi_ops).
phase2_targeted_vectors()
def phase2_targeted_vectors(
    node_records: dict[str, list[dict]],
    shared_buckets: dict[tuple[str, str], set[str]],
    match_function: list[dict],
    blocking_key_sets: list[list[str]],
    id_field: str = "local_id",
) -> tuple[
    dict[tuple[str, str], tuple[list[dict], list[dict]]],  # pair_filtered (raw)
    dict[tuple[str, str], tuple[list[dict], list[dict]]],  # pair_vectors (derived)
    int,  # vectors_sent
    int,  # vectors_total
]: ...
For each node pair with shared buckets:
- Filters to records in shared buckets only (via _records_in_buckets())
- Derives features using one-way transforms (via derive_features() from component.parallax.derived-features)
- Returns both the shared-bucket raw records (for Phase 2.5 blocking) and the derived vectors
Returns:
- pair_filtered: {(node_a, node_b): (raw_filtered_a, raw_filtered_b)} — shared-bucket
raw records, consumed by Phase 2.5
- pair_vectors: {(node_a, node_b): (derived_vectors_a, derived_vectors_b)}
- vectors_sent: number of derived vectors exchanged
- vectors_total: total records across all nodes
phase25_candidates()
def phase25_candidates(
    pair_filtered: dict[tuple[str, str], tuple[list[dict], list[dict]]],
    blocking_key_sets: list[list[str]],
    max_block_size: int,
    id_field: str = "local_id",
) -> dict[tuple[str, str], list[tuple[str, str]]]: ...
Phase 2.5 sits between Phase 2 and Phase 3. It re-runs the lens-declared
blocking passes on the shared-bucket records from Phase 2 and lets
generate_candidates() (component.parallax.blocking-engine) drop buckets whose cartesian product exceeds
max_block_size. This brings the three-phase blocking semantics in line with
run_fusion_derived (the §8 equivalence target) and honors the max_block_size
contract from §13.
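The effect of the max_block_size cap can be sketched for a single blocking pass as follows. This is not generate_candidates(); the helper names, the single-pass simplification and the rule that records with a missing key field form no bucket are assumptions for the example.

```python
from collections import defaultdict
from itertools import product


def _blocks_sketch(records, key_fields, id_field):
    """Group record IDs by blocking key (records with a None field are skipped)."""
    blocks = defaultdict(list)
    for record in records:
        key = tuple(record.get(field) for field in key_fields)
        if None not in key:
            blocks[key].append(record[id_field])
    return blocks


def capped_candidates_sketch(records_a, records_b, key_fields,
                             max_block_size=200, id_field="local_id"):
    """Emit cross-node ID pairs, dropping buckets whose product exceeds the cap."""
    blocks_a = _blocks_sketch(records_a, key_fields, id_field)
    blocks_b = _blocks_sketch(records_b, key_fields, id_field)
    candidates = []
    for key in blocks_a.keys() & blocks_b.keys():
        ids_a, ids_b = blocks_a[key], blocks_b[key]
        if len(ids_a) * len(ids_b) > max_block_size:
            continue  # oversized bucket: dropped entirely, not truncated
        candidates.extend(product(ids_a, ids_b))
    return sorted(candidates)


side_a = [{"local_id": f"a{i}", "k": "x"} for i in (1, 2, 3)] + [{"local_id": "a4", "k": "y"}]
side_b = [{"local_id": f"b{i}", "k": "x"} for i in (1, 2)] + [{"local_id": "b3", "k": "y"}]
tight = capped_candidates_sketch(side_a, side_b, ["k"], max_block_size=4)
loose = capped_candidates_sketch(side_a, side_b, ["k"], max_block_size=6)
```

With a cap of 4 the 3 x 2 bucket is dropped and only the single-pair bucket survives; with a cap of 6 both buckets contribute.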
phase3_consensus()
def phase3_consensus(
    pair_candidates: dict[tuple[str, str], list[tuple[str, str]]],
    pair_vectors: dict[tuple[str, str], tuple[list[dict], list[dict]]],
    derived_match_function: list[dict],
    threshold: float = 0.70,
    null_penalty: float = 0.1,
    id_field: str = "local_id",
    node_prefix: bool = False,
) -> tuple[list[FusionMatch], list[ScoredPair]]: ...
Scores the Phase 2.5 candidate pairs against the Phase 2 derived vectors using
score_pair_derived(). Returns (matches, all_scored) where matches are pairs
above threshold, sorted by confidence descending. When node_prefix is True,
FusionMatch IDs are emitted as f"{node_id}:{local_id}" for safe N-party
clustering.
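The selection and ID-prefixing behaviour described above can be sketched as follows. Scoring itself is out of scope here: the input is an assumed list of already-scored tuples, the name select_matches_sketch is hypothetical, and treating the threshold as inclusive is an assumption (the text only says "above threshold").

```python
def select_matches_sketch(scored, threshold=0.70, node_prefix=False):
    """Filter scored pairs by threshold and sort by confidence, descending.

    scored: iterable of (node_a, id_a, node_b, id_b, confidence) tuples
    (an assumed shape, not the FusionMatch / ScoredPair models).
    """
    matches = []
    for node_a, id_a, node_b, id_b, confidence in scored:
        if confidence < threshold:  # assumption: threshold is inclusive
            continue
        left = f"{node_a}:{id_a}" if node_prefix else id_a
        right = f"{node_b}:{id_b}" if node_prefix else id_b
        matches.append((left, right, confidence))
    return sorted(matches, key=lambda match: match[2], reverse=True)


scored_pairs = [
    ("A", "1", "B", "7", 0.92),
    ("A", "2", "B", "8", 0.55),
    ("A", "3", "B", "9", 0.99),
]
plain = select_matches_sketch(scored_pairs)
prefixed = select_matches_sketch(scored_pairs, node_prefix=True)
```

Prefixing matters once more than two nodes participate, because two nodes may reuse the same local_id for different people.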
run_three_phase_local()
def run_three_phase_local(
    nodes: dict[str, list[dict]],
    lens_path: str | None = None,
    spec: Any = None,
    threshold: float = 0.70,
    null_penalty: float = 0.1,
    id_field: str = "local_id",
    field_maps: dict[str, dict[str, str]] | None = None,
    max_block_size: int = 200,
    entity_registry: Any = None,
    use_psi: bool = False,
    node_prefix: bool = False,
) -> ThreePhaseResult: ...
Full local POC pipeline. Orchestrates all three phases:
- Parses lens (if spec not provided)
- Normalizes all node records
- Builds match function and blocking key sets from first node's data
- Builds derived match function
- Phase 1: bucket signals
- Shared bucket discovery (plaintext or PSI depending on use_psi)
- Phase 2: targeted derived vectors
- Phase 2.5: candidate generation, capped by max_block_size
- Phase 3: consensus scoring
Requires at least 2 nodes (raises ValueError otherwise).
ThreePhaseResult
@dataclass
class ThreePhaseResult:
    matches: list[FusionMatch]
    total_candidates: int
    total_pairs_possible: int
    all_scored: list[ScoredPair]
    phase1_signals: dict
    phase2_vectors_sent: int
    phase2_vectors_total: int
    entity_registry: Any
    psi_enabled: bool
    psi_ops: int
    per_pair_candidates: dict[tuple[str, str], int]
The in-process pipeline above and the federated wire run share one
implementation: three_phase.py delegates each phase to the node-side and
coordinator-side primitives in parallax/ops/fusion/federation/wire.py
(node_bucket_signals, shared_buckets, node_phase2_artifacts,
coordinator_candidates, score_consensus). titan's FusionNodeHandler /
FusionOrchestrator call those same primitives over the wire, so the
in-process and federated paths are equivalent by construction (§8).
4. Hub-Spoke Architecture
The hub-spoke model specialises the three-phase protocol for the VRS screening use case.
- Node A (hub) = SC Central (authority). Owns VRS registry data, audit trail, consent enforcement, output gating, vulnerability code lookup, and conflict detection.
- Node B (spoke) = firm. Thin node — ES + engine instance only. Submits customer data for screening.
run_hub_screening()
def run_hub_screening(
    node_a_records: list[dict],
    node_b_records: list[dict],
    vrs_payloads: dict[str, dict],
    lens_path: str | None = None,
    spec: Any = None,
    threshold: float = 0.50,
    null_penalty: float = 0.1,
    use_psi: bool = False,
    consent_policy_a: ConsentPolicy | None = None,
    consent_policy_b: ConsentPolicy | None = None,
    match_fields: list[str] | None = None,
    screening_purpose: str = "internal_compliance",
    actor_id: str = "system",
) -> HubResult: ...
Orchestrates the full vertical stack on Node A:
- Consent check — filter records without consent (pre-pipeline)
- Create FusionRun audit record
- Three-phase matching (with optional PSI)
- Output assembly with Type A/B permission gating + purpose consent
- Conflict detection — reclassify 1-to-many matches
- Complete FusionRun with results
Output Gating: Type A vs Type B
Output gating is enforced on Node A. The firm never sees raw VRS data.
| Field | Type A | Type B |
|---|---|---|
| match_status | Yes | Yes |
| confidence | Yes | Yes |
| vulnerability_codes | Yes (codes only) | Yes |
| vulnerability_count | Yes | Yes |
| decline_credit | Yes | Yes |
| registration_type | Yes | Yes |
| vulnerability_details | No (empty list) | Yes (full: code, name, description, fca_driver, outcomes) |
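The table can be read as a simple projection on the hub side. The sketch below is not _assemble_gated_output(); the function name gate_output_sketch and the payload key vulnerabilities (a list of detail dicts, each with a code) are assumptions about the VRS payload shape.

```python
def gate_output_sketch(vrs_payload, permission_type):
    """Project a VRS payload onto the fields a Type A or Type B firm may see."""
    details = vrs_payload.get("vulnerabilities", [])  # assumed payload key
    codes = [detail["code"] for detail in details]
    return {
        "vulnerability_codes": codes,
        "vulnerability_count": len(codes),
        "decline_credit": vrs_payload.get("decline_credit", ""),
        "registration_type": vrs_payload.get("registration_type", ""),
        # Type A receives an empty list; only Type B receives full details.
        "vulnerability_details": list(details) if permission_type == "B" else [],
    }


payload = {
    "vulnerabilities": [
        {"code": "V01", "name": "n", "description": "d",
         "fca_driver": "health", "outcomes": ["o"]},
    ],
    "decline_credit": "N",
    "registration_type": "self",
}
type_a = gate_output_sketch(payload, "A")
type_b = gate_output_sketch(payload, "B")
```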
Purpose Consent Enforcement
The screening_purpose parameter (e.g. "internal_compliance", "regulated_third_party_disclosure") is checked against each VRS record's permitted_purposes field (pipe-delimited string). If the screening purpose is not in the person's permitted purposes, that match is suppressed — the _assemble_gated_output() function returns None and the match is excluded from results. The purpose_denied_count on HubResult tracks how many matches were suppressed.
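A minimal sketch of the purpose check, assuming the pipe-delimited format described above. The helper names are hypothetical, and treating a missing or empty permitted_purposes value as "nothing permitted" is an assumption, not documented behaviour.

```python
def purpose_permitted_sketch(permitted_purposes, screening_purpose):
    """Return True if screening_purpose appears in the pipe-delimited list."""
    allowed = {
        purpose.strip()
        for purpose in (permitted_purposes or "").split("|")
        if purpose.strip()
    }
    return screening_purpose in allowed


def suppress_by_purpose_sketch(vrs_records, screening_purpose):
    """Split records into (kept, purpose_denied_count)."""
    kept = [
        record for record in vrs_records
        if purpose_permitted_sketch(record.get("permitted_purposes"), screening_purpose)
    ]
    return kept, len(vrs_records) - len(kept)


vrs_records = [
    {"vrs_id": "v1", "permitted_purposes": "internal_compliance|regulated_third_party_disclosure"},
    {"vrs_id": "v2", "permitted_purposes": "internal_compliance"},
    {"vrs_id": "v3"},
]
kept, denied = suppress_by_purpose_sketch(vrs_records, "regulated_third_party_disclosure")
```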
Match Classification
Matches are classified using per-field scores and overall confidence:
| Status | Criteria |
|---|---|
| confirmed | All match fields >= 0.85 AND confidence >= 0.95 |
| probable | Confidence >= 0.70 |
| no_match | Confidence < 0.70 |
| conflict | 1-to-many: one customer matches multiple VRS records, or one VRS record matches multiple customers |
Classification thresholds:
CONFIRMED_THRESHOLD = 0.95
PROBABLE_THRESHOLD = 0.70
FIELD_MATCH_THRESHOLD = 0.85
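Read top to bottom, the table and thresholds amount to the decision order sketched below. The function name classify_sketch is hypothetical, and the handling of an empty per-field score dict (it can never be confirmed) is an assumption; conflict is assigned later by conflict detection, not here.

```python
CONFIRMED_THRESHOLD = 0.95
PROBABLE_THRESHOLD = 0.70
FIELD_MATCH_THRESHOLD = 0.85


def classify_sketch(confidence, field_scores):
    """Classify a scored pair as confirmed, probable or no_match."""
    all_fields_match = bool(field_scores) and all(
        score >= FIELD_MATCH_THRESHOLD for score in field_scores.values()
    )
    if all_fields_match and confidence >= CONFIRMED_THRESHOLD:
        return "confirmed"
    if confidence >= PROBABLE_THRESHOLD:
        return "probable"
    return "no_match"


examples = [
    classify_sketch(0.97, {"surname": 0.90, "dob": 1.00}),
    classify_sketch(0.97, {"surname": 0.80, "dob": 1.00}),
    classify_sketch(0.70, {"surname": 0.60}),
    classify_sketch(0.69, {"surname": 1.00}),
]
```

A high overall confidence with one weak field falls back to probable, which is the case the per-field threshold exists to catch.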
Conflict Detection
The _detect_conflicts() function reclassifies matches where:
- One customer ID maps to multiple VRS IDs (1-to-many on spoke side)
- One VRS ID maps to multiple customer IDs (1-to-many on hub side)
All matches involved in a 1-to-many mapping are reclassified as "conflict". Only confirmed and probable matches participate in conflict detection — no_match entries are ignored.
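The reclassification rule can be sketched as below. This is an illustration of the rule as stated, not the _detect_conflicts() source; matches are modelled as plain dicts rather than GatedMatch instances, and the function name is hypothetical.

```python
from collections import defaultdict

LIVE_STATUSES = ("confirmed", "probable")


def detect_conflicts_sketch(matches):
    """Return copies of matches with 1-to-many cases marked as conflict."""
    vrs_by_customer = defaultdict(set)
    customers_by_vrs = defaultdict(set)
    for match in matches:
        if match["match_status"] in LIVE_STATUSES:  # no_match entries are ignored
            vrs_by_customer[match["customer_id"]].add(match["vrs_id"])
            customers_by_vrs[match["vrs_id"]].add(match["customer_id"])

    result = []
    for match in matches:
        conflicted = match["match_status"] in LIVE_STATUSES and (
            len(vrs_by_customer[match["customer_id"]]) > 1
            or len(customers_by_vrs[match["vrs_id"]]) > 1
        )
        result.append({**match, "match_status": "conflict"} if conflicted else dict(match))
    return result


checked = detect_conflicts_sketch([
    {"customer_id": "c1", "vrs_id": "v1", "match_status": "confirmed"},
    {"customer_id": "c1", "vrs_id": "v2", "match_status": "probable"},
    {"customer_id": "c2", "vrs_id": "v3", "match_status": "confirmed"},
    {"customer_id": "c3", "vrs_id": "v3", "match_status": "no_match"},
])
```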
HubResult
@dataclass
class HubResult:
    matches: list[GatedMatch]
    fusion_run: FusionRun | None
    consent_result_a: ConsentCheckResult | None
    consent_result_b: ConsentCheckResult | None
    three_phase: ThreePhaseResult | None
    confirmed_count: int
    probable_count: int
    conflict_count: int
    no_match_count: int
    purpose_denied_count: int
    total_screened: int
    total_vrs: int
GatedMatch
@dataclass
class GatedMatch:
    customer_id: str
    vrs_id: str
    match_status: str  # confirmed | probable | conflict | no_match
    confidence: float
    matched_fields: list[str]
    conflicted_fields: list[str]
    permission_type: str  # A or B
    vulnerability_codes: list[str]
    vulnerability_count: int
    decline_credit: str
    registration_type: str
    vulnerability_details: list[dict]  # [] for Type A
5. DH-PSI Integration
An optional privacy enhancement for Phase 1. When use_psi=True, shared bucket discovery uses Diffie-Hellman Private Set Intersection instead of plaintext set comparison. Neither side learns the other's non-matching bucket keys or counts.
Protocol
Uses RFC 3526 Group 14 (2048-bit MODP safe prime). Two rounds, two messages per round:
Round 1: Each side masks elements with their secret exponent
A sends: {H(x_i)^a} → B
B sends: {H(y_j)^b} → A
Round 2: Each side double-masks the received set with their own secret
B sends: {H(x_i)^(ab)} → A
A sends: {H(y_j)^(ab)} → B
Result: H(x_i)^(ab) == H(y_j)^(ab) iff x_i == y_j
Privacy guarantees:
- Neither side learns anything about non-intersecting elements beyond the size of the other side's set
- Masked values are computationally indistinguishable from random
- Double-masked values only match for equal inputs
- No raw blocking keys or derived features are revealed
Important limitation: PSI does NOT work through CDG (opaque integers). The protocol requires the actual string values to hash into the group.
PSI Functions
PsiNode
class PsiNode:
    def __init__(self, node_id: str): ...
    def mask(self, elements: list[str]) -> list[int]: ...
    def double_mask(self, received: list[int]) -> list[int]: ...
Each node holds a secret exponent (random in [2, p-2]). mask() computes H(element)^secret mod p. double_mask() raises already-masked values to the node's secret: (H(x)^a)^b = H(x)^(ab).
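The commutativity that makes the protocol work, (H(x)^a)^b = (H(x)^b)^a, can be demonstrated with the toy sketch below. It is deliberately not the production implementation: it uses the built-in pow instead of gmpy2.powmod, and a small illustrative prime (2**255 - 19) instead of the RFC 3526 Group 14 modulus, so it offers no security. ToyPsiNode, toy_hash_to_group and toy_psi_intersect are hypothetical names.

```python
import hashlib
import secrets

# Toy modulus for illustration only; NOT the RFC 3526 Group 14 prime.
TOY_P = 2**255 - 19


def toy_hash_to_group(element):
    """Hash a string to an integer in [2, TOY_P - 2]."""
    digest = int.from_bytes(hashlib.sha256(element.encode("utf-8")).digest(), "big")
    return digest % (TOY_P - 3) + 2


class ToyPsiNode:
    def __init__(self):
        # Secret exponent drawn uniformly from [2, TOY_P - 2].
        self._secret = secrets.randbelow(TOY_P - 3) + 2

    def mask(self, elements):
        return [pow(toy_hash_to_group(e), self._secret, TOY_P) for e in elements]

    def double_mask(self, received):
        return [pow(value, self._secret, TOY_P) for value in received]


def toy_psi_intersect(elements_a, elements_b):
    """Run both rounds locally and return A's elements that are in the intersection."""
    node_a, node_b = ToyPsiNode(), ToyPsiNode()
    masked_a = node_a.mask(elements_a)          # round 1: A -> B
    masked_b = node_b.mask(elements_b)          # round 1: B -> A
    double_a = node_b.double_mask(masked_a)     # round 2: B -> A (order preserved)
    double_b = node_a.double_mask(masked_b)     # round 2: A -> B
    common = set(double_a) & set(double_b)
    return sorted(e for e, d in zip(elements_a, double_a) if d in common)


shared_keys = toy_psi_intersect(
    ["smith|1980", "jones|1975", "patel|1990"],
    ["patel|1990", "smith|1980", "nguyen|2001"],
)
```

Because the double-masked list for A's elements comes back in the order A sent them, A can map matching positions back to its own plaintext keys without ever seeing B's.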
psi_intersect()
def psi_intersect(
    elements_a: list[str],
    elements_b: list[str],
) -> PsiResult: ...
Runs the full DH-PSI protocol locally. Returns PsiResult with intersection from both sides' perspectives.
PsiResult
@dataclass
class PsiResult:
    intersection_a: list[str]  # Elements from side A in intersection
    intersection_b: list[str]  # Elements from side B in intersection
    size: int                  # |intersection|
    set_a_size: int            # |A|
    set_b_size: int            # |B|
    rounds: int = 2            # Protocol rounds
psi_find_shared_buckets()
def psi_find_shared_buckets(
    node_a_keys: dict[str, int],
    node_b_keys: dict[str, int],
) -> set[str]: ...
Integration helper for the three-phase protocol. Wraps psi_intersect() to find shared blocking bucket keys between two nodes.
psi_filter_candidates()
def psi_filter_candidates(
    records_a: list[dict],
    records_b: list[dict],
    blocking_key_sets: list[list[str]],
    id_field: str = "local_id",
) -> tuple[list[dict], list[dict], int]: ...
Uses PSI to filter to candidate records across all blocking passes. Returns (filtered_a, filtered_b, psi_ops).
Hash-to-Group
def _hash_to_group(element: str) -> int:
Maps a string to a group element in [2, p-2]: the SHA-256 digest is reduced mod (p-3) and offset by 2, which keeps the result in the safe range (avoiding 0, 1 and p-1).
Performance
Uses gmpy2.powmod (a hard dependency) for compiled modular exponentiation — ~50x faster than the Python built-in pow.
6. Consent Model
Consent is enforced at three layers, each progressively narrower.
Layer 1: Federate Opt-In (Pre-Pipeline)
Before the three-phase protocol begins, records are filtered by a ConsentPolicy. Records without consent are excluded before blocking even starts.
@dataclass(frozen=True)
class ConsentPolicy:
    lens_id: str
    federate_id: str
    consent_field: str = "consent_fusion"
    consent_values: tuple[str, ...] = ("yes", "true", "1", "granted")
    default_consent: ConsentDefault = ConsentDefault.DENY
    log_denials: bool = True
ConsentDefault.DENY (opt-in model): missing consent field means no consent. ConsentDefault.ALLOW (opt-out model): missing consent field means implicit consent.
def filter_by_consent(
    records: list[dict],
    policy: ConsentPolicy,
    id_field: str = "local_id",
) -> ConsentCheckResult: ...
Returns ConsentCheckResult with eligible and excluded record lists, excluded_ids, total_checked, and consent_rate.
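The opt-in / opt-out behaviour can be sketched as below. The classes are trimmed, hypothetical stand-ins for ConsentPolicy and ConsentDefault, and three details are assumptions: values are compared case-insensitively after stripping whitespace, a None value is treated like a missing field, and the consent rate of an empty input is 0.0.

```python
from dataclasses import dataclass
from enum import Enum


class ConsentDefaultSketch(Enum):
    DENY = "deny"    # opt-in: missing field means no consent
    ALLOW = "allow"  # opt-out: missing field means implicit consent


@dataclass(frozen=True)
class ConsentPolicySketch:
    consent_field: str = "consent_fusion"
    consent_values: tuple = ("yes", "true", "1", "granted")
    default_consent: ConsentDefaultSketch = ConsentDefaultSketch.DENY


def check_record_consent_sketch(record, policy):
    value = record.get(policy.consent_field)
    if value is None:
        return policy.default_consent is ConsentDefaultSketch.ALLOW
    return str(value).strip().lower() in policy.consent_values


def filter_by_consent_sketch(records, policy):
    """Return (eligible, excluded, consent_rate)."""
    eligible = [r for r in records if check_record_consent_sketch(r, policy)]
    excluded = [r for r in records if not check_record_consent_sketch(r, policy)]
    rate = len(eligible) / len(records) if records else 0.0
    return eligible, excluded, rate


people = [
    {"local_id": "1", "consent_fusion": "Yes"},
    {"local_id": "2", "consent_fusion": "no"},
    {"local_id": "3"},
]
opt_in = filter_by_consent_sketch(people, ConsentPolicySketch())
opt_out = filter_by_consent_sketch(
    people, ConsentPolicySketch(default_consent=ConsentDefaultSketch.ALLOW)
)
```

An explicit refusal is excluded under both defaults; only the record with no consent field changes sides.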
Layer 2: Purpose Consent (Per-Person)
Each VRS record has permitted_purposes (pipe-delimited string, e.g. "internal_compliance|regulated_third_party_disclosure"). The hub checks the firm's screening_purpose against this list. If the purpose is not permitted, the match is suppressed — the person did not consent to this type of disclosure.
Layer 3: Type A/B Permission Gating
Even for consented matches, the permission type controls disclosure depth. Type A returns codes and counts only. Type B returns full vulnerability details. This is set at VRS registration time and enforced on Node A.
Consent Check Functions
def check_record_consent(record: dict, policy: ConsentPolicy) -> bool:
Checks a single record against the policy. Returns True if eligible.
def validate_correlation_consent(
    record_ids: list[str],
    consent_results: dict[str, ConsentCheckResult],
) -> list[str]: ...
Final check before creating a CorrelationRecord. Returns list of record IDs that lack consent.
7. FusionRun Audit Trail
Every screening run creates an immutable FusionRun audit record for Article 12 record-keeping. Contains metadata only — no raw PII, no entity data.
@dataclass(frozen=True)
class FusionRun:
    run_id: str
    lens_id: str
    lens_version: str
    execution_mode: str  # "ad_hoc" | "scheduled" | "reactive"
    started_at: str
    completed_at: str
    status: str  # "running" | "completed" | "failed" | "partial"
    expected_federates: tuple[str, ...]
    participating_federates: tuple[str, ...]
    missing_federates: tuple[str, ...]
    phase1_complete: bool
    phase2_complete: bool
    phase3_complete: bool
    total_candidates: int
    total_matches: int
    correlation_ids: tuple[str, ...]
    threshold: float
    null_penalty: float
    blocking_strategy: str
    triggered_by: str  # activation_id or "manual"
    actor_id: str
The frozen dataclass guarantees immutability. Completion is handled by creating a new instance via dataclasses.replace():
def create_fusion_run(
    run_id: str, lens_id: str, lens_version: str, execution_mode: str,
    expected_federates: tuple[str, ...], threshold: float = 0.70,
    null_penalty: float = 0.1, blocking_strategy: str = "hash",
    triggered_by: str = "manual", actor_id: str = "",
) -> FusionRun: ...

def complete_fusion_run(
    run: FusionRun, correlation_ids: tuple[str, ...],
    participating_federates: tuple[str, ...],
    missing_federates: tuple[str, ...] = (),
    total_candidates: int = 0, total_matches: int = 0,
    phase1_complete: bool = True, phase2_complete: bool = True,
    phase3_complete: bool = True,
) -> FusionRun: ...
Status is set to "partial" if missing_federates is non-empty, otherwise "completed".
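The create-then-replace lifecycle can be shown on a trimmed stand-in for FusionRun. RunSketch and complete_run_sketch are hypothetical and carry only the fields needed to show the pattern; dataclasses.replace and FrozenInstanceError are standard library.

```python
from dataclasses import FrozenInstanceError, dataclass, replace


@dataclass(frozen=True)
class RunSketch:
    run_id: str
    status: str = "running"
    missing_federates: tuple = ()
    total_matches: int = 0


def complete_run_sketch(run, missing_federates=(), total_matches=0):
    """Return a completed copy; the original record is left untouched."""
    status = "partial" if missing_federates else "completed"
    return replace(
        run,
        status=status,
        missing_federates=tuple(missing_federates),
        total_matches=total_matches,
    )


started = RunSketch(run_id="r1")
done = complete_run_sketch(started, total_matches=3)
partial = complete_run_sketch(started, missing_federates=("firm_c",))

try:
    done.status = "failed"  # direct mutation is rejected on a frozen dataclass
    mutation_blocked = False
except FrozenInstanceError:
    mutation_blocked = True
```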
Serialization: fusion_run_to_dict() and fusion_run_from_dict() for ES/JSON persistence.
8. Equivalence Guarantee
The three-phase pipeline produces identical matches to the single-phase run_fusion_derived() pipeline for the same input data. This is proven by test:
- TestTwoNodeEquivalence.test_same_matches_at_050: same match pairs at threshold 0.50
- TestTwoNodeEquivalence.test_same_matches_at_070: same match pairs at threshold 0.70
- TestHubWithPsi.test_psi_produces_same_matches: PSI mode produces same matches as plaintext
The equivalence holds because:
1. The same normalization, blocking, and derived feature transforms are applied
2. The same scoring function (score_pair_derived) is used
3. Bucket filtering only eliminates pairs that share no blocking key, and the single-phase pipeline applies the same blocking, so it never scores those pairs either
9. Data Flow Summary
| Phase | Direction | Content | NOT Included |
|---|---|---|---|
| 1 | Both | Blocking key counts ({composite_key: int}) | No entity IDs, no features, no PII |
| 2 | Both | Derived feature vectors (one-way transforms) | No raw PII, no records outside shared buckets |
| 3 | Hub → Spoke | Confidence scores + per-field scores | No PII, no feature vectors |
Wire Protocol (Production)
In production each phase is an axonis.fusion.Message exchanged over xanadu
(RabbitMQ). titan's FusionOrchestrator (coordinator) sends phase requests to
titan FusionNodeHandler nodes, which run the per-node compute from
parallax.ops.fusion.federation.wire.
Phase 1: FUSION_BLOCKING_REQUEST → FUSION_BLOCKING_RESPONSE {bucket_signals: {key: count}}
Phase 1-PSI: FUSION_PSI_ROUND1/2_REQUEST → ..._RESPONSE — optional DH-PSI variant (use_psi);
only opaque masked integers cross the wire, the intersector returns shared keys
Phase 2: FUSION_VECTORS_REQUEST → FUSION_VECTORS_RESPONSE {block_index, derived_vectors}
Phase 3: scored on the coordinator from the Phase 2 artifacts; result rides FUSION_RUN_COMPLETE
axonis.fusion.MessageType is the canonical envelope contract. component.parallax.wire-protocol-adapters
(superseded) holds the deployment-transport analysis (CDG / mTLS / Xanadu).
10. Test Coverage
tests/test_three_phase.py
| Test Class | What It Proves |
|---|---|
| TestPhase1BucketSignals | Phase 1 returns counts only: no entity IDs, no raw PII leaked into signals |
| TestPhase2TargetedVectors | Phase 2 exchanges derived vectors only for shared-bucket records: no raw PII in vectors, fewer vectors than total records |
| TestTwoNodeEquivalence | Three-phase produces identical matches to single-phase at 0.50 and 0.70 thresholds; TP >= 18, FP == 0; P018 false positive rejected; vectors sent < total |
| TestFiveNodeThreePhase | Multi-node (5-node) protocol produces matches and Phase 1 signals for all nodes |
| TestPartialFailure | Dropping one node (4/5) produces a subset of full results: no false matches from partial failure |
| TestEdgeCases | Minimum 2 nodes enforced; disjoint data produces 0 matches; 3-node split works |
tests/test_hub.py
| Test Class | What It Proves |
|---|---|
| TestHubScreening | Full vertical stack: returns HubResult with GatedMatch objects, TP >= 16, FP == 0, P018 rejected, FusionRun created with correlation IDs, counts consistent |
| TestOutputGating | Type A has codes but no details; Type B has codes and full details with required fields; vulnerability count matches codes |
| TestConsentFiltering | DENY default filters all records without consent field; ALLOW default passes all; no policy passes all |
| TestPurposeConsent | internal_compliance returns all matches; regulated_third_party_disclosure suppresses Type A matches; unknown purpose suppresses all matches |
| TestHubWithPsi | PSI mode produces same matches as plaintext mode |
| TestClassification | Confirmed/probable/no_match classification logic per threshold rules |
| TestConflictDetection | 1-customer-to-many-VRS and 1-VRS-to-many-customers both reclassified as conflict; unique matches unaffected; no_match not conflicted |
11. Files
| File | Purpose |
|---|---|
| parallax/ops/fusion/federation/three_phase.py | Three-phase in-process pipeline: phase functions, run_three_phase_local |
| parallax/ops/fusion/federation/wire.py | Per-node / coordinator phase primitives for the federated wire |
| parallax/ops/fusion/federation/hub.py | Hub-spoke orchestrator: consent, matching, output gating, conflict detection |
| parallax/ops/fusion/federation/psi.py | DH-PSI implementation: PsiNode, psi_intersect, integration helpers |
| parallax/ops/fusion/models/consent.py | Consent model: ConsentPolicy, filter_by_consent, validate_correlation_consent |
| parallax/ops/fusion/models/fusion_run.py | FusionRun audit model: immutable frozen dataclass, create/complete lifecycle |
| tests/test_three_phase.py | Three-phase protocol tests (18 tests) |
| tests/test_hub.py | Hub-spoke orchestrator tests |
12. Dependencies
| Spec | Relationship |
|---|---|
| component.parallax.blocking-engine (Blocking Engine) | Phase 1 uses generate_blocking_keys() for bucket computation |
| component.parallax.scoring-engine (Scoring Engine) | Phase 3 uses scoring primitives (metrics, weighted aggregate) |
| component.parallax.derived-features (Derived Features) | Phase 2 uses derive_features() and build_derived_match_function() for one-way transforms |
13. Convenience-wrapper API (run_fusion, run_multi_fusion)
parallax/ops/fusion/pipeline.py exposes the single-call API used by Themis benchmarks, integration tests, and in-process demos. These wrappers compose blocking + scoring + (optionally) DH-PSI and UnionFind clustering, returning a FusionResult or MultiFusionResult instead of the structured ThreePhaseResult returned by §3.
def run_fusion(
    node_a: list[dict],
    node_b: list[dict],
    lens_path: str | None = None,
    threshold: float = 0.70,
    null_penalty: float = 0.1,
    id_field: str = "local_id",
    field_map_a: dict[str, str] | None = None,
    field_map_b: dict[str, str] | None = None,
    spec: Any = None,
    privacy: str | None = None,  # None | "psi" | "three_phase"
    max_block_size: int = 200,
) -> FusionResult: ...

def run_multi_fusion(
    nodes: dict[str, list[dict]],
    lens_path: str | None = None,
    threshold: float = 0.70,
    null_penalty: float = 0.1,
    id_field: str = "local_id",
    field_maps: dict[str, dict[str, str]] | None = None,
    spec: Any = None,
    privacy: str | None = None,
    max_block_size: int = 200,
) -> MultiFusionResult: ...
privacy modes
_PRIVACY_MODES = (None, "psi", "three_phase"). Invalid values raise ValueError at call time.
| Mode | Blocking-phase behaviour | Bucket-existence leakage |
|---|---|---|
| None (default) | Plaintext bilateral set intersection of blocking-key universes | Hub learns each side's bucket-key universe |
| "psi" | DH-PSI (§5) on blocks_a/blocks_b per pass before generate_candidates | Neither side learns the other's non-shared bucket keys |
| "three_phase" | Dispatches to the full three-phase pipeline (§3): extract → blocking → score with progressive disclosure | Per §3 / §4 |
For multi-pass blocking (component.parallax.blocking-engine §"Multi-pass execution"), PSI is applied per pass, then candidates are unioned across passes. For N-party run_multi_fusion, PSI runs bilaterally on each of N-choose-2 node pairs; transitivity is recovered by the post-pair UnionFind step.
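How a union-find pass recovers transitivity from bilateral matches can be sketched as follows. This is a generic union-find with path halving, not the clustering code in pipeline.py; the function name and the node-prefixed ID strings in the example are illustrative assumptions.

```python
def cluster_matches_sketch(pairs):
    """Group IDs into clusters given pairwise matches (union-find)."""
    parent = {}

    def find(item):
        parent.setdefault(item, item)
        while parent[item] != item:
            parent[item] = parent[parent[item]]  # path halving
            item = parent[item]
        return item

    def union(left, right):
        root_left, root_right = find(left), find(right)
        if root_left != root_right:
            parent[root_right] = root_left

    for left, right in pairs:
        union(left, right)

    clusters = {}
    for item in list(parent):
        clusters.setdefault(find(item), set()).add(item)
    return sorted(sorted(members) for members in clusters.values())


# A:1 matches B:7 and B:7 matches C:3, so A:1 and C:3 land in one cluster
# even though that pair was never matched directly.
clusters = cluster_matches_sketch([
    ("A:1", "B:7"),
    ("B:7", "C:3"),
    ("A:2", "C:9"),
])
```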
max_block_size
Hard cap on the per-bucket cartesian product inside generate_candidates. Buckets with len(a) * len(b) > max_block_size are dropped to bound worst-case runtime under skewed key distributions. Applied in all three privacy modes — in privacy="three_phase" it is honored by Phase 2.5 (per-pair blocking with the cap on shared-bucket-filtered records). Default 200 (≈90s/pair at 1k records on the Disney 5-way reference); raise for higher recall on highly-skewed surnames, lower to bound tail latency.
Equivalence
privacy=None and privacy="psi" produce identical match sets on shared blocking keys — PSI changes what the hub sees, not what it matches. This is asserted by TestHubWithPsi.test_psi_produces_same_matches.
privacy="three_phase" produces identical matches to run_fusion_derived per §8 (both pipelines score on derived features through the same score_pair_derived, with the same blocking semantics now that Phase 2.5 routes through generate_candidates). This invariant is asserted by TestRunFusionThreePhase.test_three_phase_matches_run_fusion_derived and by TestTwoNodeEquivalence in tests/test_three_phase.py.
The relationship between privacy="three_phase" and privacy=None is not universal equivalence: privacy=None scores raw values via score_pair, while three_phase scores derived values via score_pair_derived. component.parallax.derived-features §"Accuracy Impact" documents the recall trade-off in general. On fixtures where raw and derived scoring agree — exact-match lenses (derived SHA-256 preserves equality) and the VRS POC fuzzy fixture (identical TP/FP per component.parallax.derived-features) — the two match sets coincide. TestRunFusionThreePhase.test_three_phase_parity_on_exact_match_lens asserts this restricted parity on the ADSB exact-match lens; do not generalise it to fuzzy lenses.
Authoritative reference
FED-component.parallax.orchestration (themis/docs/FED-component.parallax.orchestration-ADVERSARIAL-RESILIENCE.md) defines the federation-privacy adversarial model and EU AI Act Article 9 minimum-data principle that privacy="psi" operationalises. Cited from the run_fusion docstring; the spec itself lives in the themis repo.
Depends on: component.parallax.blocking-engine, component.parallax.derived-features, component.parallax.scoring-engine
Realizes: product.fusion
Required by: component.parallax.cds-message-mapping, component.parallax.fusionmatch-model, component.parallax.island-robustness, component.parallax.node-scaling, component.parallax.quorum, component.parallax.vrs-rest-api, component.parallax.wire-message-families