Scoring Engine
Status & scope
- Stage: POC — VRS Use Case
- Module: parallax/ops/fusion/metrics.py, parallax/ops/fusion/scorer.py
- Milestone: M1 (Metrics) + M2 (Weighted Scoring)
Purpose
The scoring engine is the core math. It takes two records and a lens config, computes per-field similarity scores, and produces a weighted aggregate confidence value in [0, 1]. This is where fusion lives or dies.
Two stages:
1. Single-field metrics — pure functions: (value_a, value_b) → float [0,1]
2. Weighted aggregate — combine metrics into score_pair() → confidence
Stage 1: Single-Field Metrics
metrics.py
from typing import Any

def exact_match(a: Any, b: Any) -> float:
    """Binary equality after lowercasing + stripping whitespace.
    Returns 1.0 if equal, 0.0 otherwise. None/empty → 0.0."""

def jaro_winkler(a: str, b: str) -> float:
    """String similarity using jellyfish.jaro_winkler_similarity.
    Returns [0, 1]. None → 0.0. This is the core name matcher."""

def geo_prefix(a: str, b: str, chars: int = 3) -> float:
    """Postcode prefix matching. Compare first N chars (stripped, uppercased).
    1.0 if exact prefix match, decay by character difference.
    Full haversine in future phase. None → 0.0."""

def soundex_match(a: str, b: str) -> float:
    """Phonetic matching using jellyfish.soundex.
    1.0 if same soundex code, 0.0 if different. None → 0.0."""
Metric Implementations
| Metric | Input | Output | Formula | Library |
|---|---|---|---|---|
| exact_match | any, any | {0, 1} | 1.0 if str(a).strip().lower() == str(b).strip().lower() else 0.0 | stdlib |
| jaro_winkler | str, str | [0, 1] | jellyfish.jaro_winkler_similarity(a, b) | jellyfish |
| geo_prefix | str, str | [0, 1] | 1.0 if prefix_a == prefix_b else 0.0 (POC), 1.0 - edit_dist/max_len (enhanced) | stdlib |
| soundex_match | str, str | {0, 1} | 1.0 if jellyfish.soundex(a) == jellyfish.soundex(b) else 0.0 | jellyfish |
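The two stdlib-only metrics can be sketched as follows. This is a minimal illustration, not the module's actual code: it follows the docstring's "None/empty → 0.0" rule, implements only the binary POC form of geo_prefix, and the `_normalize` helper is a hypothetical name introduced here.

```python
from typing import Any, Optional


def _normalize(value: Any) -> Optional[str]:
    """Hypothetical helper: lowercased, stripped string, or None for None/empty."""
    if value is None:
        return None
    text = str(value).strip().lower()
    return text or None


def exact_match(a: Any, b: Any) -> float:
    """1.0 when both values normalize to the same non-empty string, else 0.0."""
    norm_a, norm_b = _normalize(a), _normalize(b)
    if norm_a is None or norm_b is None:
        return 0.0
    return 1.0 if norm_a == norm_b else 0.0


def geo_prefix(a: Optional[str], b: Optional[str], chars: int = 3) -> float:
    """POC behaviour only: binary comparison of the first `chars` characters."""
    if a is None or b is None:
        return 0.0
    prefix_a = a.strip().upper()[:chars]
    prefix_b = b.strip().upper()[:chars]
    if not prefix_a or not prefix_b:
        return 0.0
    return 1.0 if prefix_a == prefix_b else 0.0
```

Treating None as "no evidence" inside the metric keeps each function total over its inputs; the scorer can still detect nulls separately for weight redistribution.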
Metric Registry
The registry is the union of the POC core (exact / jaro_winkler / geo_prefix / soundex) and the metrics added by Phase 3 and the R&D consolidation. Full primitive-level definitions live in component.parallax.primitives-framework; this section enumerates what the parser will accept in match_function.*.metric.
METRIC_REGISTRY: dict[str, Callable] = {
    # Identity (string)
    "exact": exact_match,                  # MA-13
    "jaro_winkler": jaro_winkler,          # MA-19 / MF-13
    "token_set_ratio": token_set_ratio,    # MA-20 / MF-14
    "levenshtein": levenshtein,            # MA-16
    "soundex": soundex_match,              # MF-13b
    "metaphone": metaphone_match,          # MF-13c (phonetic-equivalence)
    "nysiis": nysiis_match,                # MF-13d (phonetic-equivalence)
    "sorensen_dice": sorensen_dice,        # MF-13e (Schnell-PPRL Bloom)
    # Set / Vector
    "jaccard": jaccard,                    # MA-14
    "cosine": cosine,                      # MA-15
    # Spatial
    "geo_prefix": geo_prefix,              # MF-04
    "geospatial_distance": geospatial_distance,  # MA-17
    "haversine": haversine,                # MA-17 / MF-01
    "geohash_match": geohash_match,        # MF-05
    "uncertainty_aware_distance": uncertainty_aware_distance,  # MF-18 (themis §1.1)
    # Temporal
    "temporal_proximity": temporal_proximity,  # MA-18 / MF-06
    "interval_overlap": interval_overlap,  # MF-06
    "time_gap": time_gap,                  # MF-07
    "recency_decay": recency_decay,        # MF-09
    "space_time_cone": space_time_cone,    # MA-07
    # Composite
    "address_similarity": address_similarity,  # MF-16
    "category_agreement": category_agreement,  # MF-17
    # Cross-source / kinematic / directional (themis wishlist §1.4–1.6)
    "source_complementarity": source_complementarity,  # MF-19
    "heading_proximity": heading_proximity,  # MF-20
    "speed_proximity": speed_proximity,    # MF-21
    # General-purpose numeric tolerance (Magellan ER fixtures)
    "numeric_proximity": numeric_proximity,  # MF-22
}
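A name-to-callable registry like this is typically consumed through a small lookup that fails loudly on unknown metric names. The sketch below is illustrative only: `resolve_metric` and `DEMO_REGISTRY` are hypothetical names, and the real parser may validate names differently.

```python
from typing import Callable, Dict

Metric = Callable[..., float]


def exact_match(a, b) -> float:
    """Stand-in metric so the sketch is self-contained."""
    if a is None or b is None:
        return 0.0
    return 1.0 if str(a).strip().lower() == str(b).strip().lower() else 0.0


# Hypothetical one-entry registry; the real one is METRIC_REGISTRY.
DEMO_REGISTRY: Dict[str, Metric] = {"exact": exact_match}


def resolve_metric(name: str, registry: Dict[str, Metric]) -> Metric:
    """Return the callable for `name`, or raise with the list of accepted names."""
    try:
        return registry[name]
    except KeyError:
        known = ", ".join(sorted(registry))
        raise ValueError(f"Unknown metric {name!r}; expected one of: {known}") from None
```

Resolving at parse time rather than at score time would surface a misspelled match_function.*.metric before any pairs are scored.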
Metric semantics — additions beyond the POC four
| Metric | Inputs | Output | Notes |
|---|---|---|---|
| metaphone | str, str | {0, 1} | 1.0 iff jellyfish.metaphone(a) == jellyfish.metaphone(b). Stricter phonetic equivalence than soundex for western names. |
| nysiis | str, str | {0, 1} | 1.0 iff NYSIIS codes match. Better than soundex on names with silent letters. |
| sorensen_dice | str, str | [0, 1] | Sørensen-Dice on Bloom-encoded q-grams. Fuzzy similarity that survives single-char typos that cross phonetic-code boundaries (Smyth/Smith, Cathryn/Catherine). Input may be raw string (auto-encoded at score time with default params) or pre-encoded base64 from a bloom_filter transform. See component.parallax.primitives-framework NM-20 / MF-13e. |
| geohash_match | str, str | [0, 1] | Geohash prefix similarity with continuous decay across cells. |
| uncertainty_aware_distance | (lat, lon, sigma_m), same | [0, 1] | Mahalanobis-flavored: exp(-(d/√(σ_a²+σ_b²))²/2), hard-clip beyond max_sigmas. Params: default_sigma_m=100.0, max_sigmas=5.0. Tighter pair gating than uniform geohash_match. |
| source_complementarity | any, any | {0, 1} | 1.0 iff a != b. Cross-source pairs (different federate/sensor) score higher than same-source repetition. |
| heading_proximity | bearing_deg, bearing_deg | [0, 1] | Circular linear decay from 1.0 at identical heading to 0.0 at max_diff_deg (default 45). Handles 359/1 wrap-around. |
| speed_proximity | float, float | [0, 1] | Symmetric ratio min/max. Both zero → 1.0; one zero → 0.0. |
| numeric_proximity | num, num | [0, 1] | Linear decay over tolerance (absolute when relative=False, fractional when True). Strips $ % , and whitespace. |
For metric-level test fixtures, derivation rationales, and parameter envelopes see component.parallax.primitives-framework Registry 2 (MF-01..MF-22).
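Three of the formulas in the table are compact enough to sketch directly. This is an illustration under stated assumptions, not the registered implementations: `haversine_m` and `EARTH_RADIUS_M` are hypothetical helpers, a sigma of None is assumed to fall back to default_sigma_m, and speeds are assumed non-negative.

```python
import math
from typing import Optional, Tuple

EARTH_RADIUS_M = 6_371_000.0  # assumed mean Earth radius, metres
Fix = Tuple[float, float, Optional[float]]  # (lat, lon, sigma_m)


def heading_proximity(a_deg: float, b_deg: float, max_diff_deg: float = 45.0) -> float:
    """Linear decay on the circular difference between two bearings."""
    diff = abs(a_deg - b_deg) % 360.0
    diff = min(diff, 360.0 - diff)  # 359 vs 1 is 2 degrees apart
    return max(0.0, 1.0 - diff / max_diff_deg)


def speed_proximity(a: float, b: float) -> float:
    """Symmetric min/max ratio; both zero -> 1.0, exactly one zero -> 0.0."""
    if a == 0 and b == 0:
        return 1.0
    if a == 0 or b == 0:
        return 0.0
    return min(a, b) / max(a, b)


def haversine_m(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Great-circle distance in metres (hypothetical helper)."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlmb = math.radians(lon2 - lon1)
    h = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2
    return 2 * EARTH_RADIUS_M * math.asin(min(1.0, math.sqrt(h)))


def uncertainty_aware_distance(a: Fix, b: Fix, default_sigma_m: float = 100.0,
                               max_sigmas: float = 5.0) -> float:
    """exp(-(d / sqrt(sigma_a^2 + sigma_b^2))^2 / 2), clipped to 0 beyond max_sigmas."""
    lat_a, lon_a, sigma_a = a
    lat_b, lon_b, sigma_b = b
    sigma_a = default_sigma_m if sigma_a is None else sigma_a
    sigma_b = default_sigma_m if sigma_b is None else sigma_b
    d = haversine_m(lat_a, lon_a, lat_b, lon_b)
    combined = math.hypot(sigma_a, sigma_b)
    if combined == 0:
        return 1.0 if d == 0 else 0.0
    z = d / combined  # distance in units of combined sigma
    if z > max_sigmas:
        return 0.0
    return math.exp(-0.5 * z * z)
```

With two 100 m sigmas the combined sigma is about 141 m, so the hard clip at five sigmas zeroes any pair more than roughly 707 m apart.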
Stage 2: Weighted Aggregate Scoring
scorer.py
def score_pair(
    record_a: dict,
    record_b: dict,
    match_function: list[MatchField],
    null_penalty: float = 0.1,
) -> ScoredPair:
    """Score a single pair of records against a match function.

    Algorithm:
    1. For each field in match_function:
       - Look up metric from METRIC_REGISTRY
       - Get values from record_a[field] and record_b[field]
       - If either is None: score = 0.0, mark as null field
       - Otherwise: score = metric(value_a, value_b)
    2. Redistribute null field weights proportionally to non-null fields
    3. Compute weighted sum: confidence = Σ(adjusted_weight × score)
    4. Apply null penalty: confidence -= (null_count × null_penalty)
    5. Clamp to [0.0, 1.0]

    Returns ScoredPair with confidence and per-field breakdown.
    """

def score_all_candidates(
    df_a: dask.dataframe.DataFrame,
    df_b: dask.dataframe.DataFrame,
    candidates: list[tuple[str, str]],
    match_function: list[MatchField],
    null_penalty: float = 0.1,
) -> dask.dataframe.DataFrame:
    """Score all candidate pairs. Returns DataFrame sorted by confidence desc.

    Columns: [entity_id_a, entity_id_b, confidence, per_field_scores, null_count]
    """
ScoredPair
from dataclasses import dataclass

@dataclass
class ScoredPair:
    entity_id_a: str
    entity_id_b: str
    confidence: float
    per_field_scores: dict[str, float]  # e.g. {"name_match": 0.92, "dob_match": 1.0, ...}
    null_fields: list[str]              # fields that were null on either side
    null_count: int
Null Handling Algorithm
Given match_function with N fields, weights W[1..N]:
    non_null_fields = fields where both values are non-null
    null_fields = fields where either value is null

    # Redistribute null weights proportionally
    total_non_null_weight = sum(W[i] for i in non_null_fields)
    if total_non_null_weight == 0:
        confidence = 0.0  # every field is null: nothing to score, avoid dividing by zero
    else:
        for i in non_null_fields:
            adjusted_weight[i] = W[i] / total_non_null_weight  # Normalize to sum to 1.0
        confidence = sum(adjusted_weight[i] * score[i] for i in non_null_fields)

    confidence -= len(null_fields) * null_penalty
    confidence = clamp(confidence, 0.0, 1.0)
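The null-handling steps above can be sketched as runnable Python. This is a minimal illustration, not the scorer.py implementation: the `MatchField` shape here (name, column, metric, weight) is a hypothetical stand-in for the lens-parser type, the metric is passed as a callable rather than looked up in METRIC_REGISTRY, and entity IDs are omitted from the result.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List


@dataclass
class MatchField:  # hypothetical stand-in for the lens-parser type
    name: str      # key used in per_field_scores, e.g. "dob_match"
    column: str    # key to read from each record
    metric: Callable[[object, object], float]
    weight: float


@dataclass
class ScoredPair:  # trimmed: entity IDs left out of this sketch
    confidence: float
    per_field_scores: Dict[str, float]
    null_fields: List[str]
    null_count: int


def score_pair(record_a: dict, record_b: dict, match_function: List[MatchField],
               null_penalty: float = 0.1) -> ScoredPair:
    scores: Dict[str, float] = {}
    null_fields: List[str] = []
    live: List[MatchField] = []
    for mf in match_function:
        value_a, value_b = record_a.get(mf.column), record_b.get(mf.column)
        if value_a is None or value_b is None:
            scores[mf.name] = 0.0          # recorded as 0.0, excluded from the sum
            null_fields.append(mf.name)
        else:
            scores[mf.name] = mf.metric(value_a, value_b)
            live.append(mf)

    # Renormalize the surviving weights so they sum to 1.0.
    total = sum(mf.weight for mf in live)
    confidence = 0.0
    if total > 0:
        confidence = sum(mf.weight / total * scores[mf.name] for mf in live)

    confidence -= len(null_fields) * null_penalty
    confidence = max(0.0, min(1.0, confidence))
    return ScoredPair(confidence, scores, null_fields, len(null_fields))


def exact(a, b) -> float:
    return 1.0 if a == b else 0.0


fields = [MatchField("dob_match", "date_of_birth", exact, 0.30),
          MatchField("phone_match", "phone_hash", exact, 0.15)]
result = score_pair({"date_of_birth": "1947-03-15", "phone_hash": "abc"},
                    {"date_of_birth": "1947-03-15", "phone_hash": None}, fields)
print(result.confidence, result.null_fields)
```

In the usage lines, the phone weight is redistributed to date_of_birth, which then carries the whole score before the single null penalty is subtracted.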
VRS Scoring Example
Easy match: Margaret Chen ↔ Margaret Chen
name_match: jaro_winkler("Margaret Chen", "Margaret Chen") = 1.00 × 0.25 = 0.250
dob_match: exact("1947-03-15", "1947-03-15") = 1.00 × 0.30 = 0.300
postcode_match: geo_prefix("SW1A 1AA", "SW1A 1AA", 3) = 1.00 × 0.15 = 0.150
phone_match: exact(hash_a, hash_a) = 1.00 × 0.15 = 0.150
email_match: exact(hash_a, hash_a) = 1.00 × 0.15 = 0.150
TOTAL = 1.000 → CONFIRMED
Medium match: Dorothy Williams ↔ Dot Williams
name_match: jaro_winkler("Dorothy Williams", "Dot Williams") = ~0.82 × 0.294 = 0.241
dob_match: exact("1940-08-22", "1940-08-22") = 1.00 × 0.353 = 0.353
postcode_match: geo_prefix("E1 6AN", "E1 6BN", 3) = 1.00 × 0.176 = 0.176
phone_match: exact(hash_a, None) → NULL = 0.00 (weight 0.15 redistributed)
email_match: exact(hash_a, hash_b_diff) = 0.00 × 0.176 = 0.000
(adjusted weight = original weight / 0.85, the sum of the non-null weights)
null_penalty = -0.10
TOTAL ≈ 0.771 → 0.671 after penalty → CANDIDATE
False positive trap: John Smith A ↔ John Smith B
name_match: jaro_winkler("John Smith", "John Smith") = 1.00 × 0.25 = 0.250
dob_match: exact("1970-04-15", "1955-12-01") = 0.00 × 0.30 = 0.000
postcode_match: geo_prefix("E2 8DP", "M4 1HQ", 3) = 0.00 × 0.15 = 0.000
phone_match: exact(hash_a, hash_b) = 0.00 × 0.15 = 0.000
email_match: exact(hash_a, hash_b) = 0.00 × 0.15 = 0.000
TOTAL = 0.250 → REJECTED
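The CONFIRMED / CANDIDATE / REJECTED labels in these examples imply a two-threshold banding, which might look like the sketch below. The 0.80 confirmation threshold appears in FIX-04; the 0.50 initial threshold is an assumption inferred from the FIX-03 assertion, and the `classify` function name is hypothetical.

```python
def classify(confidence: float,
             initial_threshold: float = 0.50,       # assumed from FIX-03
             confirmation_threshold: float = 0.80,  # stated in FIX-04
             ) -> str:
    """Band a confidence value into the three outcome labels."""
    if confidence >= confirmation_threshold:
        return "CONFIRMED"
    if confidence >= initial_threshold:
        return "CANDIDATE"
    return "REJECTED"


for label, confidence in [("easy", 1.000), ("false positive trap", 0.250)]:
    print(f"{label}: {confidence:.3f} -> {classify(confidence)}")
```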
Test Fixtures
FIX-01: Metric unit tests
@pytest.mark.parametrize("a,b,expected", [
    ("SW1A 1AA", "SW1A 1AA", 1.0),
    ("SW1A 1AA", "SW1A 1AB", 0.0),
    (None, "x", 0.0),
    ("", "", 0.0),  # empty → 0.0, per the exact_match docstring
])
def test_exact_match(a, b, expected):
    assert exact_match(a, b) == expected
@pytest.mark.parametrize("a,b,min_expected", [
    ("Margaret Chen", "Margaret Chen", 0.99),
    ("Dorothy", "Dot", 0.70),
    ("Mohammed", "Mohammad", 0.90),
    ("Arthur", "Art", 0.75),
])
def test_jaro_winkler(a, b, min_expected):
    assert jaro_winkler(a, b) >= min_expected
FIX-02: score_pair end-to-end
def test_score_pair_easy_match():
    spec = parse_lens("fixtures/vrs_vulnerability_v1.yaml")
    record_a = {"full_name": "Margaret Chen", "date_of_birth": "1947-03-15",
                "postcode_prefix": "SW1", "phone_hash": "abc123", "email_hash": "def456"}
    record_b = {"full_name": "Margaret Chen", "date_of_birth": "1947-03-15",
                "postcode_prefix": "SW1", "phone_hash": "abc123", "email_hash": "def456"}
    result = score_pair(record_a, record_b, spec.identity_fusion.match_function)
    assert result.confidence >= 0.95
    assert result.null_count == 0
FIX-03: False positive rejection
def test_john_smith_false_positive():
    spec = parse_lens("fixtures/vrs_vulnerability_v1.yaml")
    smith_a = {"full_name": "John Smith", "date_of_birth": "1970-04-15",
               "postcode_prefix": "E2", "phone_hash": "aaa", "email_hash": "bbb"}
    smith_b = {"full_name": "John Smith", "date_of_birth": "1955-12-01",
               "postcode_prefix": "M4", "phone_hash": "ccc", "email_hash": "ddd"}
    result = score_pair(smith_a, smith_b, spec.identity_fusion.match_function)
    assert result.confidence < 0.50  # Well below initial_threshold
FIX-04: Brute-force accuracy on sample data
def test_brute_force_accuracy():
    """Score all 625 pairs. Verify precision/recall against ground truth."""
    spec = parse_lens("fixtures/vrs_vulnerability_v1.yaml")
    df_a = extract_features(load_csv("fixtures/node_a_customers.csv"), spec)
    df_b = extract_features(load_csv("fixtures/node_b_customers.csv"), spec)
    ground_truth = load_csv("fixtures/ground_truth_matches.csv")
    # Score all pairs
    all_candidates = [(a, b) for a in df_a["entity_id"] for b in df_b["entity_id"]]
    results = score_all_candidates(df_a, df_b, all_candidates, spec.identity_fusion.match_function)
    # At confirmation_threshold (0.80); materialize the lazy dask result before zipping columns
    confirmed = results[results["confidence"] >= 0.80].compute()
    true_matches = set(zip(ground_truth["node_a_id"], ground_truth["node_b_id"]))
    confirmed_set = set(zip(confirmed["entity_id_a"], confirmed["entity_id_b"]))
    precision = len(confirmed_set & true_matches) / len(confirmed_set) if confirmed_set else 0
    recall = len(confirmed_set & true_matches) / len(true_matches) if true_matches else 0
    assert precision >= 0.80  # At least 80% precision at 0.80 threshold
FIX-05: Null penalty works
def test_null_penalty():
    spec = parse_lens("fixtures/vrs_vulnerability_v1.yaml")
    full = {"full_name": "X", "date_of_birth": "2000-01-01",
            "postcode_prefix": "SW1", "phone_hash": "a", "email_hash": "b"}
    partial = {"full_name": "X", "date_of_birth": "2000-01-01",
               "postcode_prefix": "SW1", "phone_hash": None, "email_hash": None}
    full_score = score_pair(full, full, spec.identity_fusion.match_function)
    partial_score = score_pair(full, partial, spec.identity_fusion.match_function)
    assert partial_score.confidence < full_score.confidence
    assert partial_score.null_count == 2
Performance Targets
| Scenario | Pairs | Target |
|---|---|---|
| POC (25×25) | 625 | < 1 second |
| Medium (1K×1K) | ~10K after blocking | < 5 seconds |
| Large (10K×10K) | ~100K after blocking | < 30 seconds |
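A rough way to check the POC row is to time a scoring callable over all 25×25 pairs. The harness below is a sketch only: `time_scoring` is a hypothetical helper, and the lambda is a trivial stand-in for score_pair, so the number it reports says nothing about the real engine.

```python
import time
from itertools import product
from typing import Callable, Iterable, Tuple


def time_scoring(pairs: Iterable[Tuple[str, str]],
                 score_fn: Callable[[str, str], float]) -> Tuple[int, float]:
    """Return (pairs scored, elapsed seconds) for one pass over `pairs`."""
    start = time.perf_counter()
    count = 0
    for a, b in pairs:
        score_fn(a, b)
        count += 1
    return count, time.perf_counter() - start


ids_a = [f"a{i}" for i in range(25)]
ids_b = [f"b{i}" for i in range(25)]
count, elapsed = time_scoring(product(ids_a, ids_b), lambda a, b: float(a == b))
print(f"{count} pairs in {elapsed:.4f}s")
```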
Dependencies
pip install jellyfish # jaro_winkler_similarity, soundex
File Layout
parallax/ops/fusion/
├── metrics.py # exact_match, jaro_winkler, geo_prefix, soundex_match, METRIC_REGISTRY
├── scorer.py # score_pair, score_all_candidates
├── scorer_types.py # ScoredPair
└── tests/
├── test_metrics.py
└── test_scorer.py
Integration Points
- component.parallax.lens-parser → here: MatchField objects drive metric selection and weights
- component.parallax.blocking-engine → here: Candidate pairs from blocking are the input
- Here → component.parallax.fusionmatch-model: score_all_candidates() is called inside FusionMatch model
Depends on: component.parallax.blocking-engine, component.parallax.lens-parser
Realizes: product.fusion
Required by: component.parallax.correlation-persistence, component.parallax.counter-isr, component.parallax.derived-features, component.parallax.fusion-binding, component.parallax.fusionmatch-model, component.parallax.multi-contributor-combination, component.parallax.quorum, component.parallax.three-phase-protocol, component.parallax.tracking-integration