Blocking Engine

Status & scope

  • Stage: POC — VRS Use Case
  • Module: parallax/ops/fusion/blocker.py (+ parallax/ops/fusion/algorithms/blocking.py for MA-01..MA-04 entry points)
  • Milestone: M3 (Blocking / Candidate Generation)

Runtime API note: Signatures below show dask.dataframe.DataFrame for the platform end state. The standalone implementation in blocker.py uses list[dict] (pure Python). BlockingKeyOp in parallax/ops/fusion/dispatch.py is the Dask bridge invoked through axonis-core's feature_engineer. algorithms/blocking.py names the four MA-01..MA-04 strategies; only hash_block (MA-01) is currently implemented — the rest raise NotImplementedError.

Purpose

Reduce the comparison space from N×M (all pairs) to a manageable set of candidate pairs. Without blocking, 25×25 = 625 comparisons. With blocking, ~60-100 candidate pairs. At production scale (100K×100K), blocking is the difference between 10 billion comparisons and ~1 million.

Blocking runs AFTER feature extraction (component.parallax.feature-extraction) and BEFORE scoring (component.parallax.scoring-engine).

Architecture

Feature Vector A (node A)    Feature Vector B (node B)
        │                            │
        ▼                            ▼
┌───────────────┐            ┌───────────────┐
│ generate_     │            │ generate_     │
│ blocking_keys │            │ blocking_keys │
│ (soundex +    │            │ (soundex +    │
│  dob_year)    │            │  dob_year)    │
└───────┬───────┘            └───────┬───────┘
        │                            │
        ▼                            ▼
   {block_key: [ids]}          {block_key: [ids]}
        │                            │
        └────────────┬───────────────┘
                     ▼
            ┌─────────────────┐
            │ generate_       │
            │ candidates()    │
            │ Match blocks    │
            │ by key          │
            └────────┬────────┘
                     ▼
            Candidate Pairs: [(id_a, id_b), ...]

Public API

def generate_blocking_keys(
    df: dask.dataframe.DataFrame,
    blocking_config: BlockingStrategy,
) -> dict[str, list[str]]:
    """Group records by composite blocking key.

    For VRS with blocking_keys [soundex(full_name), year(date_of_birth)]:
      - "M626:1947" → ["A001", "A015"]  (Margaret, soundex M626, born 1947)
      - "S530:1970" → ["A003"]           (John Smith, soundex S530, born 1970)

    Returns: {composite_key: [entity_ids]}
    """

def generate_candidates(
    blocks_a: dict[str, list[str]],
    blocks_b: dict[str, list[str]],
) -> list[tuple[str, str]]:
    """Generate candidate pairs from matching blocks.

    For each block key present in BOTH nodes, generate all (a, b) pairs.

    Returns: [(entity_id_a, entity_id_b), ...]
    """

def block_and_pair(
    df_a: dask.dataframe.DataFrame,
    df_b: dask.dataframe.DataFrame,
    blocking_config: BlockingStrategy,
) -> tuple[list[tuple[str, str]], BlockingStats]:
    """End-to-end: block both sides, generate candidate pairs, return with stats.

    Returns:
        candidates: list of (id_a, id_b) pairs
        stats: BlockingStats with reduction ratio, block distribution
    """

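The two core steps can be illustrated with a minimal pure-Python sketch over list[dict] records. The helper names `group_by_key` and `pair_shared_blocks`, the `id` field, and the precomputed `full_name_soundex` / `dob_year` columns are assumptions made for illustration; this is not the code in blocker.py.

```python
from collections import defaultdict
from itertools import product


def group_by_key(records, key_fields, id_field="id"):
    """Group record ids by a composite key built from key_fields (hypothetical helper)."""
    blocks = defaultdict(list)
    for rec in records:
        # Null or missing values fall into a literal "NULL" slot of the key
        parts = ["NULL" if rec.get(f) is None else str(rec[f]) for f in key_fields]
        blocks[":".join(parts)].append(rec[id_field])
    return dict(blocks)


def pair_shared_blocks(blocks_a, blocks_b):
    """Emit every (a, b) pair for block keys present on both sides."""
    pairs = []
    for key in sorted(blocks_a.keys() & blocks_b.keys()):
        pairs.extend(product(blocks_a[key], blocks_b[key]))
    return pairs


node_a = [
    {"id": "A001", "full_name_soundex": "M626", "dob_year": "1947"},
    {"id": "A003", "full_name_soundex": "S530", "dob_year": "1970"},
]
node_b = [
    {"id": "B001", "full_name_soundex": "M626", "dob_year": "1947"},
    {"id": "B003", "full_name_soundex": "S530", "dob_year": "1955"},
]

keys = ["full_name_soundex", "dob_year"]
blocks_a = group_by_key(node_a, keys)
blocks_b = group_by_key(node_b, keys)
pairs = pair_shared_blocks(blocks_a, blocks_b)
print(pairs)
```

Only the M626:1947 block exists on both sides, so the two S530 records never become a candidate pair.
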
BlockingStats

from dataclasses import dataclass

@dataclass
class BlockingStats:
    total_possible_pairs: int      # N × M
    candidate_pairs: int           # After blocking
    reduction_ratio: float         # total / candidate
    num_blocks: int                # Unique block keys
    largest_block: int             # Max records in one block
    empty_blocks_a: int            # Block keys in A but not B
    empty_blocks_b: int            # Block keys in B but not A
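
As a rough illustration of how these fields relate, the sketch below derives them from two block dictionaries. `BlockingStatsSketch` and `summarize_blocks` are hypothetical names. The sketch assumes single-pass blocking (each record sits in exactly one block), counts `num_blocks` over the union of both sides' keys, and returns an infinite ratio when no candidates survive. The real implementation may define these differently.

```python
import math
from dataclasses import dataclass


@dataclass
class BlockingStatsSketch:
    total_possible_pairs: int
    candidate_pairs: int
    reduction_ratio: float
    num_blocks: int
    largest_block: int
    empty_blocks_a: int
    empty_blocks_b: int


def summarize_blocks(blocks_a, blocks_b):
    """Derive stats from {key: [ids]} dictionaries (hypothetical helper)."""
    # Assumption: one block per record, so block sizes sum to the record count
    n_a = sum(len(ids) for ids in blocks_a.values())
    n_b = sum(len(ids) for ids in blocks_b.values())
    shared = blocks_a.keys() & blocks_b.keys()
    candidate_pairs = sum(len(blocks_a[k]) * len(blocks_b[k]) for k in shared)
    total = n_a * n_b
    all_sizes = [len(ids) for ids in [*blocks_a.values(), *blocks_b.values()]]
    return BlockingStatsSketch(
        total_possible_pairs=total,
        candidate_pairs=candidate_pairs,
        # Guard against division by zero when no block key is shared
        reduction_ratio=total / candidate_pairs if candidate_pairs else math.inf,
        num_blocks=len(blocks_a.keys() | blocks_b.keys()),
        largest_block=max(all_sizes, default=0),
        empty_blocks_a=len(blocks_a.keys() - blocks_b.keys()),
        empty_blocks_b=len(blocks_b.keys() - blocks_a.keys()),
    )


stats = summarize_blocks(
    {"M626:1947": ["A001", "A015"], "S530:1970": ["A003"]},
    {"M626:1947": ["B001"], "S530:1955": ["B003"]},
)
print(stats)
```
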

Composite Blocking Key

For VRS: soundex(first_token(full_name)) + ":" + year(date_of_birth)

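import pandas as pd

def compute_composite_key(row: pd.Series, blocking_keys: list[BlockingKey]) -> str:
    """Compute composite blocking key for a single record.

    Feature columns are looked up as "<field>_<method>". Missing or null
    values become the literal "NULL" so the record still lands in a block.

    Example:
        row = {"full_name_soundex": "M626", "date_of_birth_year": "1947"}
        blocking_keys = [BlockingKey("soundex", "full_name"), BlockingKey("year", "date_of_birth")]
        → "M626:1947"
    """
    parts = []
    for bk in blocking_keys:
        col = f"{bk.field}_{bk.method}"  # e.g., "full_name_soundex", "date_of_birth_year"
        val = row.get(col)
        # str(None) / str(NaN) would give "None" / "nan", so map nulls explicitly
        parts.append("NULL" if pd.isna(val) else str(val))
    return ":".join(parts)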
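
To make the VRS formula concrete, here is a small sketch that builds the key from raw fields. It assumes standard American Soundex and an ISO-8601 date string. `soundex` and `vrs_blocking_key` are illustrative names, and the project's feature-extraction component may normalize names or dates differently.

```python
# Standard American Soundex digit groups
_CODES = {
    letter: digit
    for digit, letters in {
        "1": "BFPV", "2": "CGJKQSXZ", "3": "DT", "4": "L", "5": "MN", "6": "R",
    }.items()
    for letter in letters
}


def soundex(name):
    """Four-character American Soundex code, or "" if name has no letters."""
    letters = [c for c in name.upper() if c.isalpha()]
    if not letters:
        return ""
    first = letters[0]
    digits = []
    prev = _CODES.get(first, "")
    for c in letters[1:]:
        if c in "HW":
            continue  # H and W do not separate letters that share a code
        code = _CODES.get(c, "")
        if code and code != prev:
            digits.append(code)
        prev = code  # vowels reset prev, so a repeated code is kept after a vowel
    return (first + "".join(digits) + "000")[:4]


def vrs_blocking_key(full_name, date_of_birth):
    """soundex(first_token(full_name)) + ":" + year(date_of_birth), with NULL fallbacks."""
    tokens = (full_name or "").split()
    name_part = (soundex(tokens[0]) if tokens else "") or "NULL"
    # Assumption: date_of_birth is "YYYY-MM-DD", so the year is the first 4 chars
    year_part = date_of_birth[:4] if date_of_birth else "NULL"
    return f"{name_part}:{year_part}"


# The day and month here are made up; only the year feeds the key
print(vrs_blocking_key("Margaret Chen", "1947-03-12"))
print(vrs_blocking_key("Robert Flanagan", "1952-11-02"))
print(vrs_blocking_key("Margaret Chen", None))
```
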

VRS Example

Input: 25 records per node

Node A blocking keys:

M626:1947 → [A001]  (Margaret Chen)
R163:1952 → [A002]  (Robert Flanagan)
S530:1970 → [A003]  (John Smith — TRAP)
D632:1940 → [A004]  (Dorothy Williams)
...

Node B blocking keys:

M626:1947 → [B001]  (Margaret Chen)
R163:1952 → [B002]  (Robert Flanagan)
S530:1955 → [B003]  (John Smith — different DOB year!)
D632:1940 → [B004]  (Dot Williams — same soundex as Dorothy)
...

Key insight: John Smith trap

Node A John Smith: soundex S530, DOB year 1970
Node B John Smith: soundex S530, DOB year 1955

Composite keys: S530:1970 vs S530:1955 → different blocks! The false positive trap is eliminated by blocking before we even score.

Output: ~60-100 candidate pairs (down from 625)

Multi-pass execution

When the lens declares blocking_keys as a list of lists (see component.parallax.lens-parser §"Multi-pass blocking syntax"), the pipeline runs each pass independently and unions the resulting candidate sets.

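# Uses the standalone list[dict] signatures (records, pass keys, id field),
# not the Dask signatures shown under Public API.
all_candidates: set[tuple[str, str]] = set()
for pass_keys in spec.identity_fusion.blocking_strategy.blocking_passes:
    blocks_a = generate_blocking_keys(records_a, pass_keys, id_field)
    blocks_b = generate_blocking_keys(records_b, pass_keys, id_field)
    # The set union de-duplicates pairs produced by more than one pass
    all_candidates.update(generate_candidates(blocks_a, blocks_b, max_block_size=...))
candidates = list(all_candidates)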

Partial-pass tolerance. _build_blocking_key_sets() filters out any pass whose fields are not present after field-mapping/normalization, rather than failing the whole run. A pass with a missing field is skipped; remaining passes still execute. This is the standard PPRL multi-pass technique — recall comes from pass coverage, not from any single pass being complete.
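
The filtering rule can be sketched in a few lines. `usable_passes` is a hypothetical stand-in for whatever `_build_blocking_key_sets()` does internally. The sketch assumes a pass is kept only when all of its fields are available, and that availability is given as a flat collection of field names.

```python
def usable_passes(passes, available_fields):
    """Keep only the passes whose fields are all available (hypothetical helper)."""
    available = set(available_fields)
    return [keys for keys in passes if set(keys) <= available]


passes = [
    ["surname_metaphone", "dob_year"],
    ["firstname_metaphone", "dob_year"],
    ["dob_year", "postcode_area"],
]

# postcode_area did not survive field-mapping, so the third pass is skipped
kept = usable_passes(passes, ["surname_metaphone", "firstname_metaphone", "dob_year"])
print(kept)
```
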

Why multi-pass. A single pass caps recall by construction: any true match whose blocking fields differ between nodes never reaches scoring. For postcode-change ("moved") records, a pass on [surname_metaphone, dob_year] still finds the match, but if the surname has also drifted across metaphone codes, that record is unrecoverable. Adding a pass on [firstname_metaphone, dob_year] (pass 2 in the table below) restores moved-record recall without weakening the lens contract (Invariant 3: blocks are evidence, query hash is source of truth).

Recommended pass sets (PPRL-style, illustrative):

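| Pass | Keys                            | Captures                                                     |
|------|---------------------------------|--------------------------------------------------------------|
| 1    | [surname_metaphone, dob_year]   | Same-surname people, postcode-tolerant                       |
| 2    | [firstname_metaphone, dob_year] | Surname drift (marriage, transliteration), postcode-tolerant |
| 3    | [dob_year, postcode_area]       | Same-residence people, name drift tolerant                   |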
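
A worked example of the union behaviour, under stated assumptions: the metaphone strings below are placeholders rather than computed codes, the helper names are hypothetical, and the single record pair models one person who has both moved and changed surname.

```python
from collections import defaultdict
from itertools import product

PASSES = [
    ["surname_metaphone", "dob_year"],
    ["firstname_metaphone", "dob_year"],
    ["dob_year", "postcode_area"],
]


def blocks_for(records, keys):
    """Group record ids by the composite key for one pass."""
    blocks = defaultdict(list)
    for rec in records:
        blocks[":".join(str(rec[k]) for k in keys)].append(rec["id"])
    return blocks


def candidates_for(records_a, records_b, keys):
    """Candidate pairs for one pass: cross product within each shared block."""
    blocks_a, blocks_b = blocks_for(records_a, keys), blocks_for(records_b, keys)
    shared = blocks_a.keys() & blocks_b.keys()
    return {pair for key in shared for pair in product(blocks_a[key], blocks_b[key])}


# Same person on both nodes: surname and postcode differ, first name and DOB agree
records_a = [{"id": "A1", "surname_metaphone": "JNS", "firstname_metaphone": "MRKRT",
              "dob_year": "1947", "postcode_area": "SW1"}]
records_b = [{"id": "B1", "surname_metaphone": "XN", "firstname_metaphone": "MRKRT",
              "dob_year": "1947", "postcode_area": "LS6"}]

per_pass = [candidates_for(records_a, records_b, keys) for keys in PASSES]
all_candidates = set().union(*per_pass)
print([len(c) for c in per_pass], all_candidates)
```

Passes 1 and 3 produce nothing for this pair; pass 2 alone recovers it, and the union carries it forward to scoring.
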

time_bucket variants

_BLOCKING_METHODS registers four temporal bucket sizes for event-correlation scenarios (multi-INT, PNT, ADS-B):

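| Method          | Bucket       | Use                                               |
|-----------------|--------------|---------------------------------------------------|
| time_bucket     | 1h (default) | Coarse co-occurrence                              |
| time_bucket_30m | 30m          | Medium co-occurrence                              |
| time_bucket_10m | 10m          | Tight co-occurrence (multi_int_island scenarios)  |
| time_bucket_5m  | 5m           | Real-time / streaming correlation                 |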

Lens-level usage:

blocking_strategy:
  blocking_keys:
    - [time_bucket_10m | field: observed_at, geohash_prefix | field: location, chars: 4]
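
One plausible way to derive such a key is to floor the timestamp to the bucket width. The sketch below assumes timezone-aware UTC timestamps and an ISO-like key format. `BUCKET_SECONDS` and `time_bucket_key` are illustrative names, not the contents of `_BLOCKING_METHODS`.

```python
from datetime import datetime, timezone

# Bucket widths in seconds for the four registered method names
BUCKET_SECONDS = {
    "time_bucket": 3600,
    "time_bucket_30m": 1800,
    "time_bucket_10m": 600,
    "time_bucket_5m": 300,
}


def time_bucket_key(ts, method="time_bucket"):
    """Floor a timezone-aware timestamp to the start of its bucket (hypothetical helper)."""
    size = BUCKET_SECONDS[method]
    epoch = int(ts.timestamp())
    floored = epoch - epoch % size
    # Assumption: the key is the bucket start rendered as a UTC minute string
    return datetime.fromtimestamp(floored, tz=timezone.utc).strftime("%Y-%m-%dT%H:%M")


observed_at = datetime(2024, 1, 1, 12, 37, 45, tzinfo=timezone.utc)
for method in BUCKET_SECONDS:
    print(method, time_bucket_key(observed_at, method))
```

With fixed boundaries, two events a few seconds apart can still land in adjacent buckets (for example 12:29:59 and 12:30:01 at 10m), which is one reason to combine temporal keys with multi-pass blocking.
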

DH-PSI privacy mode

When the convenience wrapper run_fusion(..., privacy="psi") (component.parallax.three-phase-protocol §13) is used, each blocking pass runs DH-PSI on blocks_a/blocks_b before generate_candidates. Only buckets present on both sides survive the PSI filter; non-shared bucket keys never leak. The candidate union behaviour is unchanged — PSI is applied per pass, then candidates are unioned across passes. See component.parallax.three-phase-protocol §5 for the DH-PSI protocol details.
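
The per-pass PSI filter can be illustrated with a toy Diffie-Hellman-style intersection. This is a single-process simulation for intuition only: the modulus, hashing, and helper names are assumptions, it is not hardened cryptography, and it is not the protocol specified in component.parallax.three-phase-protocol.

```python
import hashlib
import math
import secrets

P = 2**127 - 1  # Mersenne prime used as a toy modulus; far too small for real use


def _hash_to_group(key):
    """Map a bucket key to a nonzero residue mod P."""
    return int.from_bytes(hashlib.sha256(key.encode()).digest(), "big") % P or 1


def _secret_exponent():
    """Random exponent coprime to P - 1, so exponentiation is a bijection."""
    while True:
        s = secrets.randbelow(P - 3) + 2
        if math.gcd(s, P - 1) == 1:
            return s


def psi_shared_keys(keys_a, keys_b):
    """Return the keys of A that B also holds, comparing only blinded values."""
    a, b = _secret_exponent(), _secret_exponent()
    # A blinds its keys with a; B re-blinds them with b and returns them in order
    a_double = {pow(pow(_hash_to_group(k), a, P), b, P): k for k in keys_a}
    # B blinds its keys with b; A re-blinds them with a
    b_double = {pow(pow(_hash_to_group(k), b, P), a, P) for k in keys_b}
    # H(k)^(ab) matches on both sides exactly when the underlying keys are equal
    return {k for blinded, k in a_double.items() if blinded in b_double}


blocks_a = {"M626:1947": ["A001"], "S530:1970": ["A003"]}
blocks_b = {"M626:1947": ["B001"], "S530:1955": ["B003"]}

shared = psi_shared_keys(blocks_a, blocks_b)
filtered_a = {k: blocks_a[k] for k in shared}
filtered_b = {k: blocks_b[k] for k in shared}
print(shared)
```

Only the filtered dictionaries would then be handed to candidate generation, so S530:1970 and S530:1955 are never revealed in the clear to the other side.
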

Test Fixtures

FIX-01: Blocking reduces comparisons

def test_blocking_reduces_comparisons():
    spec = parse_lens("fixtures/vrs_vulnerability_v1.yaml")
    df_a = extract_features(load_csv("fixtures/node_a_customers.csv"), spec)
    df_b = extract_features(load_csv("fixtures/node_b_customers.csv"), spec)

    candidates, stats = block_and_pair(df_a, df_b, spec.identity_fusion.blocking_strategy)

    assert stats.total_possible_pairs == 625  # 25 × 25
    assert stats.candidate_pairs < 100        # Significant reduction
    assert stats.reduction_ratio > 5.0        # At least 5x reduction

FIX-02: No true matches lost

def test_blocking_preserves_true_matches():
    spec = parse_lens("fixtures/vrs_vulnerability_v1.yaml")
    df_a = extract_features(load_csv("fixtures/node_a_customers.csv"), spec)
    df_b = extract_features(load_csv("fixtures/node_b_customers.csv"), spec)
    ground_truth = load_csv("fixtures/ground_truth_matches.csv")

    candidates, _ = block_and_pair(df_a, df_b, spec.identity_fusion.blocking_strategy)
    candidate_set = set(candidates)

    # Every true match (easy + medium) must be in candidates
    for _, row in ground_truth.iterrows():
        if row["difficulty"] in ("easy", "medium"):
            assert (row["node_a_id"], row["node_b_id"]) in candidate_set, \
                f"True match lost: {row['node_a_id']} ↔ {row['node_b_id']}"

FIX-03: John Smith false positive separated by blocking

def test_john_smith_different_blocks():
    spec = parse_lens("fixtures/vrs_vulnerability_v1.yaml")
    df_a = extract_features(load_csv("fixtures/node_a_customers.csv"), spec)
    df_b = extract_features(load_csv("fixtures/node_b_customers.csv"), spec)

    candidates, _ = block_and_pair(df_a, df_b, spec.identity_fusion.blocking_strategy)

    # John Smith A (1970) and John Smith B (1955) should NOT be candidates
    # because their blocking keys differ (S530:1970 vs S530:1955)
    john_a = "A003"  # John Smith node A
    john_b = "B003"  # John Smith node B (different person)
    assert (john_a, john_b) not in set(candidates)

FIX-04: Null blocking key handling

def test_null_blocking_key():
    """Records with null blocking keys go into a 'NULL' bucket."""
    spec = parse_lens("fixtures/vrs_vulnerability_v1.yaml")
    df_a = load_csv("fixtures/node_a_customers.csv")
    df_a.loc[0, "date_of_birth"] = None
    features = extract_features(df_a, spec)

    blocks = generate_blocking_keys(features, spec.identity_fusion.blocking_strategy)
    # Record with null DOB should be in a "M626:NULL" block
    null_blocks = [k for k in blocks if "NULL" in k]
    assert len(null_blocks) >= 1

FIX-05: Stats are accurate

def test_blocking_stats():
    spec = parse_lens("fixtures/vrs_vulnerability_v1.yaml")
    df_a = extract_features(load_csv("fixtures/node_a_customers.csv"), spec)
    df_b = extract_features(load_csv("fixtures/node_b_customers.csv"), spec)

    candidates, stats = block_and_pair(df_a, df_b, spec.identity_fusion.blocking_strategy)

    assert stats.total_possible_pairs == 25 * 25
    assert stats.candidate_pairs == len(candidates)
    assert stats.reduction_ratio == stats.total_possible_pairs / stats.candidate_pairs
    assert stats.largest_block >= 1

File Layout

parallax/ops/fusion/
├── blocker.py              # generate_blocking_keys, generate_candidates, block_and_pair
├── blocker_types.py        # BlockingStats
└── tests/
    ├── fixtures/
    │   ├── node_a_customers.csv
    │   ├── node_b_customers.csv
    │   └── ground_truth_matches.csv
    └── test_blocker.py

Integration Points


Depends on: component.parallax.feature-extraction, component.parallax.lens-parser

Realizes: product.fusion

Required by: component.parallax.correlation-persistence, component.parallax.counter-isr, component.parallax.fusion-binding, component.parallax.fusionmatch-model, component.parallax.scoring-engine, component.parallax.three-phase-protocol