Blocking Engine
Status & scope
- Stage: POC — VRS Use Case
- Module: parallax/ops/fusion/blocker.py (+ parallax/ops/fusion/algorithms/blocking.py for MA-01..MA-04 entry points)
- Milestone: M3 (Blocking / Candidate Generation)
Runtime API note: Signatures below show dask.dataframe.DataFrame for the platform end state. The standalone implementation in blocker.py uses list[dict] (pure Python). BlockingKeyOp in parallax/ops/fusion/dispatch.py is the Dask bridge invoked through axonis-core's feature_engineer. algorithms/blocking.py names the four MA-01..MA-04 strategies; only hash_block (MA-01) is currently implemented, and the rest raise NotImplementedError.
Purpose
Reduce the comparison space from N×M (all pairs) to a manageable set of candidate pairs. For the VRS fixtures (25 records per node), exhaustive comparison means 25×25 = 625 pairs; blocking cuts this to roughly 60-100 candidate pairs. At production scale (100K×100K), blocking is the difference between 10 billion comparisons and roughly 1 million.
Blocking runs AFTER feature extraction (component.parallax.feature-extraction) and BEFORE scoring (component.parallax.scoring-engine).
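The scale figures above are plain arithmetic. Here is a quick check that takes 100 as the upper end of the VRS estimate and 1 million as the production estimate. Both are the document's estimates, not measured results, and the helper name is illustrative:

```python
def reduction_ratio(n: int, m: int, candidate_pairs: int) -> float:
    """Exhaustive pairs (N × M) divided by the number of candidate pairs."""
    return (n * m) / candidate_pairs


vrs_total = 25 * 25  # 625 exhaustive pairs for the VRS fixtures
prod_total = 100_000 * 100_000  # 10 billion exhaustive pairs at production scale

print(vrs_total, reduction_ratio(25, 25, 100))  # 625 6.25
print(prod_total, reduction_ratio(100_000, 100_000, 1_000_000))  # 10000000000 10000.0
```

Even at the top of the VRS range the ratio is 6.25, above the 5x floor asserted in FIX-01.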
Architecture
```text
Feature Vector A (node A)        Feature Vector B (node B)
           │                                 │
           ▼                                 ▼
   ┌───────────────┐                 ┌───────────────┐
   │ generate_     │                 │ generate_     │
   │ blocking_keys │                 │ blocking_keys │
   │ (soundex +    │                 │ (soundex +    │
   │  dob_year)    │                 │  dob_year)    │
   └───────┬───────┘                 └───────┬───────┘
           │                                 │
           ▼                                 ▼
  {block_key: [ids]}                {block_key: [ids]}
           │                                 │
           └────────────────┬────────────────┘
                            ▼
                   ┌─────────────────┐
                   │ generate_       │
                   │ candidates()    │
                   │ Match blocks    │
                   │ by key          │
                   └────────┬────────┘
                            ▼
          Candidate Pairs: [(id_a, id_b), ...]
```
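The following is a minimal pure-Python sketch of the diagram. It follows the spirit of the standalone list[dict] implementation rather than the Dask path. It assumes feature extraction has already produced one column per blocking key. The column names (full_name_soundex, date_of_birth_year) and the helper names group_by_block_key and pair_blocks are hypothetical.

```python
from collections import defaultdict
from itertools import product


def group_by_block_key(
    records: list[dict],
    key_columns: list[str],
    id_field: str = "id",
) -> dict[str, list[str]]:
    """Left half of the diagram: {composite_key: [ids]} for one node."""
    blocks: dict[str, list[str]] = defaultdict(list)
    for rec in records:
        # Missing or None key parts become a literal "NULL" component.
        parts = ["NULL" if rec.get(col) is None else str(rec[col]) for col in key_columns]
        blocks[":".join(parts)].append(rec[id_field])
    return dict(blocks)


def pair_blocks(
    blocks_a: dict[str, list[str]],
    blocks_b: dict[str, list[str]],
) -> list[tuple[str, str]]:
    """Right half: cross product of ids, only for keys present on BOTH sides."""
    pairs: list[tuple[str, str]] = []
    for key in sorted(blocks_a.keys() & blocks_b.keys()):  # sorted for a stable order
        pairs.extend(product(blocks_a[key], blocks_b[key]))
    return pairs


keys = ["full_name_soundex", "date_of_birth_year"]
records_a = [
    {"id": "A001", "full_name_soundex": "M626", "date_of_birth_year": 1947},
    {"id": "A003", "full_name_soundex": "S530", "date_of_birth_year": 1970},
    {"id": "A015", "full_name_soundex": "M626", "date_of_birth_year": 1947},
]
records_b = [
    {"id": "B001", "full_name_soundex": "M626", "date_of_birth_year": 1947},
]
blocks_a = group_by_block_key(records_a, keys)
blocks_b = group_by_block_key(records_b, keys)
print(blocks_a)  # {'M626:1947': ['A001', 'A015'], 'S530:1970': ['A003']}
print(pair_blocks(blocks_a, blocks_b))  # [('A001', 'B001'), ('A015', 'B001')]
```

Each shared key contributes len(a) × len(b) pairs, so a single oversized block can dominate the candidate count. This is why a statistic like largest_block is worth tracking.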
Public API
```python
from __future__ import annotations

import dask.dataframe

# BlockingStrategy: the parsed lens config (spec.identity_fusion.blocking_strategy).
# BlockingStats: defined in blocker_types.py.


def generate_blocking_keys(
    df: dask.dataframe.DataFrame,
    blocking_config: BlockingStrategy,
) -> dict[str, list[str]]:
    """Group records by composite blocking key.

    For VRS with blocking_keys [soundex(full_name), year(date_of_birth)]:
      - "M626:1947" → ["A001", "A015"]  (Margaret, soundex M626, born 1947)
      - "S530:1970" → ["A003"]  (John Smith, soundex S530, born 1970)

    Returns: {composite_key: [entity_ids]}
    """


def generate_candidates(
    blocks_a: dict[str, list[str]],
    blocks_b: dict[str, list[str]],
) -> list[tuple[str, str]]:
    """Generate candidate pairs from matching blocks.

    For each block key present in BOTH nodes, generate all (a, b) pairs.

    Returns: [(entity_id_a, entity_id_b), ...]
    """


def block_and_pair(
    df_a: dask.dataframe.DataFrame,
    df_b: dask.dataframe.DataFrame,
    blocking_config: BlockingStrategy,
) -> tuple[list[tuple[str, str]], BlockingStats]:
    """End-to-end: block both sides, generate candidate pairs, return with stats.

    Returns:
        candidates: list of (id_a, id_b) pairs
        stats: BlockingStats with reduction ratio, block distribution
    """
```
BlockingStats
```python
from dataclasses import dataclass


@dataclass
class BlockingStats:
    total_possible_pairs: int  # N × M
    candidate_pairs: int  # After blocking
    reduction_ratio: float  # total / candidate
    num_blocks: int  # Unique block keys
    largest_block: int  # Max records in one block
    empty_blocks_a: int  # Block keys in A but not B
    empty_blocks_b: int  # Block keys in B but not A
```
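Here is one way the fields could be derived from the two block maps, sketched under stated assumptions:

- num_blocks counts the union of keys from both sides.
- largest_block is the biggest single-side block.
- Every record sits in exactly one block.
- No max_block_size cap is applied.

blocker.py may define these differently, and the helper name compute_stats is hypothetical.

```python
from dataclasses import dataclass


@dataclass
class BlockingStats:
    total_possible_pairs: int
    candidate_pairs: int
    reduction_ratio: float
    num_blocks: int
    largest_block: int
    empty_blocks_a: int
    empty_blocks_b: int


def compute_stats(
    blocks_a: dict[str, list[str]],
    blocks_b: dict[str, list[str]],
) -> BlockingStats:
    n = sum(len(ids) for ids in blocks_a.values())  # records on side A
    m = sum(len(ids) for ids in blocks_b.values())  # records on side B
    shared = blocks_a.keys() & blocks_b.keys()
    # Each shared block contributes the full cross product of its two sides.
    candidates = sum(len(blocks_a[k]) * len(blocks_b[k]) for k in shared)
    sizes = [len(ids) for ids in (*blocks_a.values(), *blocks_b.values())]
    return BlockingStats(
        total_possible_pairs=n * m,
        candidate_pairs=candidates,
        reduction_ratio=(n * m) / candidates if candidates else float("inf"),
        num_blocks=len(blocks_a.keys() | blocks_b.keys()),
        largest_block=max(sizes, default=0),
        empty_blocks_a=len(blocks_a.keys() - blocks_b.keys()),
        empty_blocks_b=len(blocks_b.keys() - blocks_a.keys()),
    )


stats = compute_stats(
    {"M626:1947": ["A001", "A015"], "S530:1970": ["A003"]},
    {"M626:1947": ["B001"], "S530:1955": ["B003"]},
)
print(stats.candidate_pairs, stats.reduction_ratio)  # 2 3.0
```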
Composite Blocking Key
For VRS: soundex(first_token(full_name)) + ":" + year(date_of_birth)
```python
from __future__ import annotations

import pandas as pd


def compute_composite_key(row: pd.Series, blocking_keys: list[BlockingKey]) -> str:
    """Compute composite blocking key for a single record.

    Example:
        row = {"full_name_soundex": "M626", "date_of_birth_year": "1947"}
        blocking_keys = [BlockingKey("soundex", "full_name"), BlockingKey("year", "date_of_birth")]
        → "M626:1947"
    """
    parts = []
    for bk in blocking_keys:
        col = f"{bk.field}_{bk.method}"  # e.g. "full_name_soundex", "date_of_birth_year"
        val = row.get(col)
        # A missing column and a null value (None/NaN) both land in the "NULL" bucket.
        parts.append("NULL" if pd.isna(val) else str(val))
    return ":".join(parts)
```
VRS Example
Input: 25 records per node
Node A blocking keys:
M626:1947 → [A001] (Margaret Chen)
R163:1952 → [A002] (Robert Flanagan)
S530:1970 → [A003] (John Smith — TRAP)
D632:1940 → [A004] (Dorothy Williams)
...
Node B blocking keys:
M626:1947 → [B001] (Margaret Chen)
R163:1952 → [B002] (Robert Flanagan)
S530:1955 → [B003] (John Smith — different DOB year!)
D632:1940 → [B004] (Dot Williams — same soundex as Dorothy)
...
Key insight: John Smith trap
- Node A John Smith: soundex S530, DOB year 1970
- Node B John Smith: soundex S530, DOB year 1955
- Composite keys: S530:1970 vs S530:1955, so the two records land in different blocks
The false-positive trap is eliminated before scoring even starts, because the two records are never emitted as a candidate pair.
Output: ~60-100 candidate pairs (down from 625)
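The four keys listed for each node are enough to check the John Smith claim directly. The following self-contained sketch uses only those block keys. The remaining 21 records per node are omitted, so the pair count here is not the ~60-100 figure.

```python
node_a = {
    "M626:1947": ["A001"],  # Margaret Chen
    "R163:1952": ["A002"],  # Robert Flanagan
    "S530:1970": ["A003"],  # John Smith (DOB year 1970)
    "D632:1940": ["A004"],  # Dorothy Williams
}
node_b = {
    "M626:1947": ["B001"],  # Margaret Chen
    "R163:1952": ["B002"],  # Robert Flanagan
    "S530:1955": ["B003"],  # John Smith (DOB year 1955, different person)
    "D632:1940": ["B004"],  # Dot Williams
}

# A pair is a candidate only if both records landed in the same block.
pairs = {
    (a, b)
    for key in node_a.keys() & node_b.keys()
    for a in node_a[key]
    for b in node_b[key]
}
print(sorted(pairs))  # [('A001', 'B001'), ('A002', 'B002'), ('A004', 'B004')]
print(("A003", "B003") in pairs)  # False
```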
Multi-pass execution
When the lens declares blocking_keys as a list of lists (see component.parallax.lens-parser §"Multi-pass blocking syntax"), the pipeline runs each pass independently and unions the resulting candidate sets.
```python
# Call shapes here differ from the Dask signatures above (records_a, id_field,
# max_block_size). Candidates from every pass are unioned, so duplicate pairs collapse.
all_candidates: set[tuple[str, str]] = set()
for pass_keys in spec.identity_fusion.blocking_strategy.blocking_passes:
    blocks_a = generate_blocking_keys(records_a, pass_keys, id_field)
    blocks_b = generate_blocking_keys(records_b, pass_keys, id_field)
    all_candidates.update(generate_candidates(blocks_a, blocks_b, max_block_size=...))
candidates = list(all_candidates)
```
Partial-pass tolerance. _build_blocking_key_sets() filters out any pass whose fields are not present after field-mapping/normalization, rather than failing the whole run. A pass with a missing field is skipped; remaining passes still execute. This is the standard PPRL multi-pass technique — recall comes from pass coverage, not from any single pass being complete.
Why multi-pass. Any single pass caps recall by construction, because two records are only compared if they agree on every key of that pass. Records whose postcode changed ("moved") miss any postcode-based pass. If the same record's surname has also drifted to a different metaphone code, a [surname_metaphone, dob_year] pass misses it too, and the true match is unrecoverable. Adding a pass on [firstname_metaphone, dob_year] restores moved-record recall without weakening the lens contract (Invariant 3: blocks are evidence, the query hash is the source of truth).
Recommended pass sets (PPRL-style, illustrative):
| Pass | Keys | Captures |
|---|---|---|
| 1 | [surname_metaphone, dob_year] | Same-surname people, postcode-tolerant |
| 2 | [firstname_metaphone, dob_year] | Surname drift (marriage, transliteration), postcode-tolerant |
| 3 | [dob_year, postcode_area] | Same-residence people, name-drift tolerant |
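The following self-contained sketch shows the pass loop with partial-pass tolerance. It assumes records are dicts of already-normalized key columns. The helper names (block_ids, run_passes) and the national_id field are hypothetical, and the metaphone codes are illustrative placeholders rather than computed metaphone output. Skipping a pass when either side lacks one of its fields is one reading of "not present after field-mapping/normalization".

```python
from collections import defaultdict


def block_ids(records: list[dict], keys: list[str]) -> dict[str, list[str]]:
    """Group record ids by the composite key of one pass."""
    blocks: dict[str, list[str]] = defaultdict(list)
    for rec in records:
        parts = ["NULL" if rec.get(k) is None else str(rec[k]) for k in keys]
        blocks[":".join(parts)].append(rec["id"])
    return blocks


def run_passes(
    records_a: list[dict],
    records_b: list[dict],
    passes: list[list[str]],
) -> set[tuple[str, str]]:
    """Union of candidate pairs across passes, skipping passes with absent fields."""
    fields_a = set().union(*(r.keys() for r in records_a))
    fields_b = set().union(*(r.keys() for r in records_b))
    all_candidates: set[tuple[str, str]] = set()
    for keys in passes:
        # Partial-pass tolerance: skip a pass if either side lacks one of its fields.
        if not set(keys) <= (fields_a & fields_b):
            continue
        blocks_a = block_ids(records_a, keys)
        blocks_b = block_ids(records_b, keys)
        for key in blocks_a.keys() & blocks_b.keys():
            all_candidates.update((a, b) for a in blocks_a[key] for b in blocks_b[key])
    return all_candidates


# Illustrative codes: B1 has moved (new postcode_area) and changed surname.
a = {"id": "A1", "surname_metaphone": "JNS", "firstname_metaphone": "MR",
     "dob_year": 1970, "postcode_area": "LS"}
b = {"id": "B1", "surname_metaphone": "BRN", "firstname_metaphone": "MR",
     "dob_year": 1970, "postcode_area": "M"}
passes = [
    ["surname_metaphone", "dob_year"],
    ["firstname_metaphone", "dob_year"],
    ["dob_year", "postcode_area"],
    ["national_id", "dob_year"],  # hypothetical field absent on both sides: skipped
]
print(run_passes([a], [b], passes))  # {('A1', 'B1')}, recovered only by pass 2
```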
time_bucket variants
_BLOCKING_METHODS registers four temporal bucket sizes for event-correlation scenarios (multi-INT, PNT, ADS-B):
| Method | Bucket | Use |
|---|---|---|
| time_bucket | 1h (default) | Coarse co-occurrence |
| time_bucket_30m | 30m | Medium co-occurrence |
| time_bucket_10m | 10m | Tight co-occurrence (multi_int_island scenarios) |
| time_bucket_5m | 5m | Real-time / streaming correlation |
Lens-level usage:
```yaml
blocking_strategy:
  blocking_keys:
    - [time_bucket_10m | field: observed_at, geohash_prefix | field: location, chars: 4]
```
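The following sketch shows how a time_bucket-style key could be derived: floor a timezone-aware timestamp to the bucket width and render the bucket start. The widths mirror the table above. The function name, the BUCKET_SECONDS mapping, and the YYYY-MM-DDTHH:MM key format are assumptions, not the _BLOCKING_METHODS implementation.

```python
from datetime import datetime, timezone

# Bucket widths in seconds, mirroring the table (key format below is assumed).
BUCKET_SECONDS = {
    "time_bucket": 3600,
    "time_bucket_30m": 1800,
    "time_bucket_10m": 600,
    "time_bucket_5m": 300,
}


def time_bucket_key(observed_at: datetime, method: str = "time_bucket") -> str:
    """Floor a timezone-aware timestamp to its bucket start (UTC)."""
    width = BUCKET_SECONDS[method]
    epoch = int(observed_at.timestamp())
    start = epoch - epoch % width  # floor to the start of the bucket
    return datetime.fromtimestamp(start, tz=timezone.utc).strftime("%Y-%m-%dT%H:%M")


t1 = datetime(2024, 5, 1, 12, 4, tzinfo=timezone.utc)
t2 = datetime(2024, 5, 1, 12, 11, tzinfo=timezone.utc)
print(time_bucket_key(t1, "time_bucket_10m"), time_bucket_key(t2, "time_bucket_10m"))
# 2024-05-01T12:00 2024-05-01T12:10
```

Fixed-width buckets split events that straddle a boundary. For example, 12:09 and 12:11 fall into different 10m buckets, so narrower buckets trade recall for smaller candidate sets.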
DH-PSI privacy mode
When the convenience wrapper run_fusion(..., privacy="psi") (component.parallax.three-phase-protocol §13) is used, each blocking pass runs DH-PSI on blocks_a/blocks_b before generate_candidates. Only buckets present on both sides survive the PSI filter; non-shared bucket keys never leak. The candidate union behaviour is unchanged — PSI is applied per pass, then candidates are unioned across passes. See component.parallax.three-phase-protocol §5 for the DH-PSI protocol details.
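The following toy sketch illustrates the commutative-blinding idea behind DH-PSI over bucket keys:

1. Each side hashes its keys into a group and blinds them with a secret exponent.
2. Each side sends its blinded keys across, and the other side re-blinds them.
3. Because (h^a)^b = (h^b)^a mod p, only keys held by both sides produce matching double-blinded values (barring hash collisions).

The modulus, the hash-to-group mapping, and the function names are illustrative assumptions. It is not secure as written and is not the protocol specified in component.parallax.three-phase-protocol §5.

```python
import hashlib
import math
import secrets

P = 2**127 - 1  # Mersenne prime: toy modulus only, not a vetted PSI group


def hash_to_group(key: str) -> int:
    """Hash a bucket key to an element of [2, P-1] (toy hash-to-group)."""
    digest = hashlib.sha256(key.encode()).digest()
    return int.from_bytes(digest, "big") % (P - 2) + 2


def random_exponent() -> int:
    """Secret exponent coprime to P-1, so x -> x^e mod P is a bijection."""
    while True:
        e = secrets.randbelow(P - 3) + 2
        if math.gcd(e, P - 1) == 1:
            return e


def toy_psi(keys_a: list[str], keys_b: list[str]) -> set[str]:
    """Keys held by both sides, learned by side A via double blinding."""
    a, b = random_exponent(), random_exponent()
    # Each side blinds its own hashed keys with its secret and sends them over.
    sent_by_a = [pow(hash_to_group(k), a, P) for k in keys_a]
    sent_by_b = [pow(hash_to_group(k), b, P) for k in keys_b]
    # B re-blinds A's values (keeping their order); A re-blinds B's values.
    double_a = [pow(v, b, P) for v in sent_by_a]
    double_b = {pow(v, a, P) for v in sent_by_b}
    # H(k)^(ab) == H(k)^(ba), so equal values mark the shared keys.
    return {k for k, v in zip(keys_a, double_a) if v in double_b}


shared = toy_psi(["M626:1947", "S530:1970", "R163:1952"],
                 ["M626:1947", "S530:1955", "R163:1952"])
print(sorted(shared))  # ['M626:1947', 'R163:1952']
```

In the pipeline described above, a filter of this kind would run once per pass on the keys of blocks_a and blocks_b, and generate_candidates would then see only the surviving keys.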
Test Fixtures
FIX-01: Blocking reduces comparisons
```python
def test_blocking_reduces_comparisons():
    spec = parse_lens("fixtures/vrs_vulnerability_v1.yaml")
    df_a = extract_features(load_csv("fixtures/node_a_customers.csv"), spec)
    df_b = extract_features(load_csv("fixtures/node_b_customers.csv"), spec)
    candidates, stats = block_and_pair(df_a, df_b, spec.identity_fusion.blocking_strategy)
    assert stats.total_possible_pairs == 625  # 25 × 25
    assert stats.candidate_pairs < 100  # Significant reduction
    assert stats.reduction_ratio > 5.0  # At least 5x reduction
```
FIX-02: No true matches lost
```python
def test_blocking_preserves_true_matches():
    spec = parse_lens("fixtures/vrs_vulnerability_v1.yaml")
    df_a = extract_features(load_csv("fixtures/node_a_customers.csv"), spec)
    df_b = extract_features(load_csv("fixtures/node_b_customers.csv"), spec)
    ground_truth = load_csv("fixtures/ground_truth_matches.csv")
    candidates, _ = block_and_pair(df_a, df_b, spec.identity_fusion.blocking_strategy)
    candidate_set = set(candidates)
    # Every true match (easy + medium) must be in candidates
    for _, row in ground_truth.iterrows():
        if row["difficulty"] in ("easy", "medium"):
            assert (row["node_a_id"], row["node_b_id"]) in candidate_set, \
                f"True match lost: {row['node_a_id']} ↔ {row['node_b_id']}"
```
FIX-03: John Smith false positive separated by blocking
```python
def test_john_smith_different_blocks():
    spec = parse_lens("fixtures/vrs_vulnerability_v1.yaml")
    df_a = extract_features(load_csv("fixtures/node_a_customers.csv"), spec)
    df_b = extract_features(load_csv("fixtures/node_b_customers.csv"), spec)
    candidates, _ = block_and_pair(df_a, df_b, spec.identity_fusion.blocking_strategy)
    # John Smith A (1970) and John Smith B (1955) should NOT be candidates
    # because their blocking keys differ (S530:1970 vs S530:1955)
    john_a = "A003"  # John Smith node A
    john_b = "B003"  # John Smith node B (different person)
    assert (john_a, john_b) not in set(candidates)
```
FIX-04: Null blocking key handling
```python
def test_null_blocking_key():
    """Records with null blocking keys go into a 'NULL' bucket."""
    spec = parse_lens("fixtures/vrs_vulnerability_v1.yaml")
    df_a = load_csv("fixtures/node_a_customers.csv")
    df_a.loc[0, "date_of_birth"] = None
    features = extract_features(df_a, spec)
    blocks = generate_blocking_keys(features, spec.identity_fusion.blocking_strategy)
    # Record with null DOB should be in a "M626:NULL" block
    null_blocks = [k for k in blocks if "NULL" in k]
    assert len(null_blocks) >= 1
```
FIX-05: Stats are accurate
```python
def test_blocking_stats():
    spec = parse_lens("fixtures/vrs_vulnerability_v1.yaml")
    df_a = extract_features(load_csv("fixtures/node_a_customers.csv"), spec)
    df_b = extract_features(load_csv("fixtures/node_b_customers.csv"), spec)
    # Keep the candidates: the second assertion compares against them
    candidates, stats = block_and_pair(df_a, df_b, spec.identity_fusion.blocking_strategy)
    assert stats.total_possible_pairs == 25 * 25
    assert stats.candidate_pairs == len(candidates)
    assert stats.reduction_ratio == stats.total_possible_pairs / stats.candidate_pairs
    assert stats.largest_block >= 1
```
File Layout
```text
parallax/ops/fusion/
├── blocker.py          # generate_blocking_keys, generate_candidates, block_and_pair
├── blocker_types.py    # BlockingStats
└── tests/
    ├── fixtures/
    │   ├── node_a_customers.csv
    │   ├── node_b_customers.csv
    │   └── ground_truth_matches.csv
    └── test_blocker.py
```
Integration Points
- component.parallax.feature-extraction → here: Feature vectors include blocking key columns (soundex, year)
- Here → component.parallax.scoring-engine: Candidate pairs are the input to scoring
- Here → component.parallax.fusionmatch-model: block_and_pair() is called inside the FusionMatch model run
Depends on: component.parallax.feature-extraction, component.parallax.lens-parser
Realizes: product.fusion
Required by: component.parallax.correlation-persistence, component.parallax.counter-isr, component.parallax.fusion-binding, component.parallax.fusionmatch-model, component.parallax.scoring-engine, component.parallax.three-phase-protocol