Fusion Primitives Framework
Status & scope
- Stage: Architecture — Covers POC through Production
- Module: parallax/ops/fusion/ (ALL primitives: transforms, metrics, algorithms), cortex/tools/ (MCP tool wrappers only), titan/ (FATE model runner adapter only)
- Milestone: M2 (Foundation) → M5+ (Full Coverage)
- Reference: Axonis_Fusion_Primitives_Implementation_Matrix.xlsx
Implementation note (base class): Throughout this spec, primitives are written as class X(FusionTransform): .... The concrete FusionTransform/FusionOp base lives in parallax/ops/fusion/base.py and is duck-typed, NOT a subclass of axonis-core's Command. axonis-core's Command constructs a Redis client at init, which parallax (a leaf compute package) intentionally avoids. The dispatch contract (__init__(body, data, serialize), .execute(), .parameters, .command) is sufficient: uds.ops.feature.feature_engineer never type-checks the class.
Purpose
The fusion pipeline requires 94 distinct primitives across 6 categories (Canonical Attributes, Normalization, Matching Features, Matching Algorithms, Tracking, Configuration). The current specs hard-code a handful of operations (fusion_hash, fusion_soundex, fusion_geo_prefix, fusion_year, jaro_winkler, exact, geo_prefix, soundex). That covers VRS POC but not the full vision.
This spec defines:
1. A registry pattern: how primitives are discovered, loaded, and invoked
2. A base class contract: what every primitive must implement
3. Where they live: parallax ops for ALL primitives (transforms, metrics, algorithms), titan for the FATE model runner, cortex for MCP tool wrappers only
4. How the lens drives selection: lens YAML references primitives by ID, and the registry resolves them
Elasticsearch and Dask: Division of Responsibility
Elasticsearch is the core database. Its ingest pipeline normalizes incoming data into ES primitive types — dates to UTC epoch millis, locations to geo_point (WGS84), text through basic analyzers. This is standard ES type coercion for storage, not fusion-specific logic.
All 94 fusion primitives run in Dask via parallax and titan. Every transform, every metric, every blocking algorithm, scoring operation, and clustering step executes as a Dask operation against data pulled from ES (or from CSV/DataFrame in standalone test mode). ES stores and retrieves; Dask computes.
ES also provides aggregation capabilities (terms, geohash_grid, date_histogram, composite) that could optimize blocking in the future, but the primary compute path is Dask.
The 6 Primitive Categories
| Category | Count | Where They Run | Storage/Registration |
|---|---|---|---|
| Canonical Attributes (CA) | 23 | UDS object model + parallax extraction | UDS type system + parallax/ops/fusion/ |
| Normalization (NM) | 19 | parallax Dask transform ops | parallax/ops/fusion/ — FUSION_OPS dict |
| Matching Features (MF) | 17 | parallax fusion engine | parallax/ops/fusion/metrics.py — METRIC_REGISTRY |
| Matching Algorithms (MA) | 20 | parallax fusion engine | parallax/ops/fusion/algorithms/ — ALGORITHM_REGISTRY |
| Tracking (TR) | 7 | parallax Dask ops (Phase 5) | parallax/ops/fusion/tracking.py |
| Configuration (CF) | 8 | Lens Spec fields | Lens YAML config |
Architecture: Three Registries
┌───────────────────────────────────────────────────────────────┐
│                       LENS SPEC (YAML)                        │
│  References primitives by ID:                                 │
│    transforms: [nfkc, casefold, soundex, sha256_hash]         │
│    metrics:    [jaro_winkler, haversine, exact]               │
│    blocking:   [hash_blocking]                                │
│    scoring:    [weighted_aggregate]                           │
└───────────────────────────────┬───────────────────────────────┘
                                │ resolve by ID
          ┌─────────────────────┼─────────────────────┐
          ▼                     ▼                     ▼
┌────────────────────┐ ┌─────────────────┐ ┌────────────────────┐
│ TRANSFORM_REGISTRY │ │ METRIC_REGISTRY │ │ ALGORITHM_REGISTRY │
│ (parallax ops)     │ │ (parallax)      │ │ (parallax)         │
│                    │ │                 │ │                    │
│ NM-01..NM-19       │ │ MF-01..MF-17    │ │ MA-01..MA-20       │
│ + CA extractions   │ │                 │ │                    │
│                    │ │                 │ │                    │
│ Each: Command      │ │ Each:           │ │ Each:              │
│ subclass with      │ │ (a, b) → [0,1]  │ │ candidates →       │
│ .execute()         │ │                 │ │ scored_pairs       │
└────────────────────┘ └─────────────────┘ └────────────────────┘
Registry 1: Transform Registry (Normalization + Extraction)
Location: parallax/ops/fusion/
Pattern: Extends existing parallax OPS dict
Covers: NM-01 through NM-19, plus CA field extraction ops
Base Class
Every transform op extends parallax's existing Command base:
# parallax/ops/fusion/base.py
from parallax.ops.command import Command


class FusionTransform(Command):
    """Base class for all fusion transform operations.

    Subclasses implement execute() which transforms a Dask DataFrame.

    The Command base class provides:
    - self.data: the input Dask DataFrame
    - self.parameters: dict from the operation config
    - self.features: list of column names to operate on

    Every FusionTransform must:
    1. Accept a 'features' list in parameters (columns to transform)
    2. Return the modified DataFrame
    3. Handle nulls gracefully (null in → null out)
    4. Be idempotent (running twice produces same result)
    5. Add new columns rather than mutating originals (unless explicitly replacing)
    """

    # Primitive metadata, set by each subclass
    primitive_id: str = ""                # e.g. "NM-01"
    primitive_name: str = ""              # e.g. "unicode_nfkc"
    category: str = "normalization"       # or "extraction"
    domain_applicability: list = []       # e.g. ["All"] or ["PNT", "Border"]
    dependencies: list = []               # pip packages needed

    def validate_params(self) -> list[str]:
        """Optional parameter validation. Returns list of error messages."""
        return []
Transform Catalog
Text Normalization (NM-01 through NM-07)
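# parallax/ops/fusion/text.py
import unicodedata

import pandas as pd

from parallax.ops.fusion.base import FusionTransform


class UnicodeNFKC(FusionTransform):
    """NM-01: Unicode NFKC normalization + diacritics removal."""
    primitive_id = "NM-01"
    primitive_name = "unicode_nfkc"

    @staticmethod
    def _normalize(value: str) -> str:
        # NFKC alone keeps accents, so decompose (NFKD), drop the combining
        # marks, then recompose to NFKC.
        decomposed = unicodedata.normalize("NFKD", value)
        stripped = "".join(c for c in decomposed if not unicodedata.combining(c))
        return unicodedata.normalize("NFKC", stripped)

    def execute(self):
        normalize = self._normalize
        for col in self.parameters["features"]:
            self.data[f"{col}_norm"] = self.data[col].apply(
                lambda v: normalize(str(v)) if pd.notna(v) else None,
                meta=(f"{col}_norm", "str"),
            )
        return self.data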

class Casefold(FusionTransform):
    """NM-02: Casefolding + whitespace normalization."""
    primitive_id = "NM-02"
    primitive_name = "casefold"

    def execute(self):
        import re
        for col in self.parameters["features"]:
            self.data[f"{col}_cf"] = self.data[col].apply(
                lambda v: re.sub(r'\s+', ' ', str(v).casefold().strip()) if pd.notna(v) else None,
                meta=(f"{col}_cf", "str"),
            )
        return self.data

class Tokenize(FusionTransform):
    """NM-03: Tokenization + stopword removal."""
    primitive_id = "NM-03"
    primitive_name = "tokenize"
    dependencies = []  # stdlib only

    def execute(self):
        stopwords = set(self.parameters.get("stopwords", [
            "the", "of", "and", "inc", "ltd", "llc", "plc", "corp",
            "company", "limited", "group", "international",
        ]))
        for col in self.parameters["features"]:
            self.data[f"{col}_tokens"] = self.data[col].apply(
                lambda v: " ".join(
                    t for t in str(v).lower().split() if t not in stopwords
                ) if pd.notna(v) else None,
                meta=(f"{col}_tokens", "str"),
            )
        return self.data

class NameParser(FusionTransform):
    """NM-04: Name parsing into given/family components (middle names are not extracted)."""
    primitive_id = "NM-04"
    primitive_name = "name_parse"

    # Honorifics and suffixes dropped before parsing (illustrative, not exhaustive)
    TITLES = {"dr", "mr", "mrs", "ms", "prof"}
    SUFFIXES = {"jr", "sr", "ii", "iii"}

    def execute(self):
        parse = self._parse_name
        for col in self.parameters["features"]:
            self.data[f"{col}_given"] = self.data[col].apply(
                lambda v: parse(v).get("given", "") if pd.notna(v) else None,
                meta=(f"{col}_given", "str"),
            )
            self.data[f"{col}_family"] = self.data[col].apply(
                lambda v: parse(v).get("family", "") if pd.notna(v) else None,
                meta=(f"{col}_family", "str"),
            )
        return self.data

    @classmethod
    def _parse_name(cls, name: str) -> dict:
        """Parse 'John Smith' → {given: 'John', family: 'Smith'}

        Handles: 'SMITH, John', 'J. Smith', 'Dr. John A. Smith Jr.'
        """
        parts = str(name).strip().split()
        # Drop leading honorifics and trailing generational suffixes
        while parts and parts[0].rstrip(".").lower() in cls.TITLES:
            parts = parts[1:]
        while parts and parts[-1].rstrip(".").lower() in cls.SUFFIXES:
            parts = parts[:-1]
        if len(parts) == 0:
            return {"given": "", "family": ""}
        if "," in parts[0]:
            # "SMITH, John" format
            family = parts[0].rstrip(",")
            given = " ".join(parts[1:])
        else:
            given = parts[0]
            family = parts[-1] if len(parts) > 1 else ""
        return {"given": given, "family": family}

class AliasExpansion(FusionTransform):
    """NM-05: Alias/nickname expansion."""
    primitive_id = "NM-05"
    primitive_name = "alias_expand"

    # Common aliases, configurable via parameters
    DEFAULT_ALIASES = {
        "bob": "robert", "rob": "robert", "bobby": "robert",
        "bill": "william", "will": "william", "willy": "william",
        "dick": "richard", "rick": "richard", "rich": "richard",
        "mike": "michael", "mick": "michael",
        "jim": "james", "jimmy": "james", "jamie": "james",
        "dot": "dorothy", "dolly": "dorothy",
        "meg": "margaret", "maggie": "margaret", "peggy": "margaret",
        "art": "arthur", "artie": "arthur",
        "liz": "elizabeth", "beth": "elizabeth", "betty": "elizabeth",
        "ted": "edward", "ned": "edward", "ed": "edward",
        "tom": "thomas", "tommy": "thomas",
        "pat": "patricia", "patty": "patricia",
        "sue": "susan", "suzy": "susan",
        "joe": "joseph", "joey": "joseph",
        "dan": "daniel", "danny": "daniel",
        "chris": "christopher", "kit": "christopher",
        "sam": "samuel", "sammy": "samuel",
        "ben": "benjamin", "benny": "benjamin",
        "tony": "anthony",
        "dave": "david", "davy": "david",
        "steve": "stephen", "steph": "stephen",
        "matt": "matthew",
        "nick": "nicholas",
        "alex": "alexander",
        "andy": "andrew", "drew": "andrew",
    }

    def execute(self):
        aliases = {**self.DEFAULT_ALIASES, **self.parameters.get("custom_aliases", {})}

        def expand(value):
            parts = str(value).split()
            if not parts:
                return None
            # Swap a known nickname for its canonical form, then title-case
            first = aliases.get(parts[0].lower(), parts[0]).title()
            # Joining the parts avoids a trailing space on single-token names
            return " ".join([first, *parts[1:]])

        for col in self.parameters["features"]:
            self.data[f"{col}_canonical"] = self.data[col].apply(
                lambda v: expand(v) if pd.notna(v) else None,
                meta=(f"{col}_canonical", "str"),
            )
        return self.data

class PhoneticEncoding(FusionTransform):
    """NM-06: Phonetic encoding (Soundex, Metaphone, NYSIIS)."""
    primitive_id = "NM-06"
    primitive_name = "phonetic"
    dependencies = ["jellyfish"]

    def execute(self):
        import jellyfish
        algorithm = self.parameters.get("algorithm", "soundex")
        encoders = {
            "soundex": jellyfish.soundex,
            "metaphone": jellyfish.metaphone,
            "nysiis": jellyfish.nysiis,
        }
        if algorithm not in encoders:
            raise ValueError(
                f"Unknown phonetic algorithm '{algorithm}'. Available: {sorted(encoders)}"
            )
        encode = encoders[algorithm]

        def encode_first_token(value):
            tokens = str(value).split()
            # Blank strings have no token to encode
            return encode(tokens[0]) if tokens else None

        for col in self.parameters["features"]:
            self.data[f"{col}_{algorithm}"] = self.data[col].apply(
                lambda v: encode_first_token(v) if pd.notna(v) else None,
                meta=(f"{col}_{algorithm}", "str"),
            )
        return self.data

class AddressStandardize(FusionTransform):
    """NM-07: Address standardization."""
    primitive_id = "NM-07"
    primitive_name = "address_standardize"

    UK_ABBREVIATIONS = {
        "street": "st", "road": "rd", "avenue": "ave", "lane": "ln",
        "drive": "dr", "close": "cl", "court": "ct", "place": "pl",
        "gardens": "gdns", "terrace": "ter", "crescent": "cres",
    }

    def execute(self):
        for col in self.parameters["features"]:
            self.data[f"{col}_std"] = self.data[col].apply(
                lambda v: self._standardize(str(v)) if pd.notna(v) else None,
                meta=(f"{col}_std", "str"),
            )
        return self.data

    @classmethod
    def _standardize(cls, address: str) -> str:
        parts = address.lower().strip().split()
        return " ".join(cls.UK_ABBREVIATIONS.get(p, p) for p in parts)
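NM-01 names both NFKC normalization and diacritics removal, and the two are separate steps: NFKC folds compatibility forms (ligatures, full-width letters) but leaves accented letters intact. The sketch below shows one common stdlib way to get both, assuming that decomposing and dropping combining marks is acceptable for the scripts in scope. The helper name `strip_diacritics` is illustrative and not part of parallax.

```python
import unicodedata


def strip_diacritics(value: str) -> str:
    """Fold compatibility forms and drop combining accents (illustrative helper)."""
    # NFKD splits "\u00e9" into "e" + U+0301 and expands ligatures such as "\ufb01".
    decomposed = unicodedata.normalize("NFKD", value)
    # Drop the combining marks produced by the decomposition.
    without_marks = "".join(c for c in decomposed if not unicodedata.combining(c))
    # Recompose so the result is in NFKC form.
    return unicodedata.normalize("NFKC", without_marks)


raw = "Jos\u00e9 \ufb01nch"  # "José" with a precomposed e-acute, "finch" with an fi ligature
nfkc_only = unicodedata.normalize("NFKC", raw)
folded = strip_diacritics(raw)

print(nfkc_only)  # ligature expanded, accent still present
print(folded)     # ligature expanded and accent removed
```

Letters that carry no combining mark in Unicode (for example "ø" or "ł") pass through unchanged, so a lookup table would still be needed if those must be folded too.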
Geo Normalization (NM-08 through NM-11)
# parallax/ops/fusion/geo.py
import pandas as pd

from parallax.ops.fusion.base import FusionTransform


class CRSAlignment(FusionTransform):
    """NM-08: CRS alignment to WGS84."""
    primitive_id = "NM-08"
    primitive_name = "crs_align"

    def execute(self):
        # POC: assume input is already WGS84 lat/lon
        # Production: use pyproj for CRS conversion
        return self.data

class Geocode(FusionTransform):
    """NM-09: Address → lat/lon geocoding."""
    primitive_id = "NM-09"
    primitive_name = "geocode"

    def execute(self):
        # POC: UK postcode lookup table
        # Production: geocoding service integration
        for col in self.parameters["features"]:
            self.data[f"{col}_lat"] = self.data[col].apply(
                lambda v: self._postcode_to_lat(v) if pd.notna(v) else None,
                meta=(f"{col}_lat", "float64"),
            )
            self.data[f"{col}_lon"] = self.data[col].apply(
                lambda v: self._postcode_to_lon(v) if pd.notna(v) else None,
                meta=(f"{col}_lon", "float64"),
            )
        return self.data

    @staticmethod
    def _postcode_to_lat(postcode: str) -> float | None:
        """POC stub; production uses an ONS postcode lookup."""
        return None  # Implement with real lookup

    @staticmethod
    def _postcode_to_lon(postcode: str) -> float | None:
        """POC stub, mirrors _postcode_to_lat."""
        return None  # Implement with real lookup

class GeohashEncode(FusionTransform):
    """NM-10: Geohash / H3 / S2 multi-resolution encoding."""
    primitive_id = "NM-10"
    primitive_name = "geohash_encode"

    def execute(self):
        precision = int(self.parameters.get("precision", 5))
        system = self.parameters.get("system", "geohash")  # geohash | h3 | s2
        lat_col = self.parameters.get("lat_col", "lat")
        lon_col = self.parameters.get("lon_col", "lon")
        if system != "geohash":
            # h3 and s2 are named above but have no POC implementation
            raise NotImplementedError(f"Encoding system '{system}' is not implemented")
        encode = self._encode_geohash
        self.data["geohash"] = self.data.apply(
            lambda row: encode(row[lat_col], row[lon_col], precision)
            if pd.notna(row[lat_col]) and pd.notna(row[lon_col]) else None,
            axis=1, meta=("geohash", "str"),
        )
        return self.data

    @staticmethod
    def _encode_geohash(lat: float, lon: float, precision: int) -> str:
        """POC placeholder, NOT a real geohash.

        Formats lat/lon to `precision` decimal places. Production should use
        a geohash library (python-geohash or geohash2), which base32-encodes
        the interleaved lat/lon bits.
        """
        return f"geo_{lat:.{precision}f}_{lon:.{precision}f}"
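The POC encoder above returns a formatted coordinate string rather than a true geohash. For reference, here is a minimal stdlib sketch of the standard geohash algorithm (base32 over interleaved longitude/latitude bits). The function name `geohash_encode_point` is illustrative only; a maintained library is likely the better choice in production.

```python
BASE32 = "0123456789bcdefghjkmnpqrstuvwxyz"  # geohash alphabet (no a, i, l, o)


def geohash_encode_point(lat: float, lon: float, precision: int = 5) -> str:
    """Encode a WGS84 point as a geohash string of `precision` characters."""
    lat_lo, lat_hi = -90.0, 90.0
    lon_lo, lon_hi = -180.0, 180.0
    chars = []
    bit_count, value = 0, 0
    use_lon = True  # bits alternate, starting with longitude
    while len(chars) < precision:
        if use_lon:
            mid = (lon_lo + lon_hi) / 2
            if lon >= mid:
                value = (value << 1) | 1
                lon_lo = mid
            else:
                value <<= 1
                lon_hi = mid
        else:
            mid = (lat_lo + lat_hi) / 2
            if lat >= mid:
                value = (value << 1) | 1
                lat_lo = mid
            else:
                value <<= 1
                lat_hi = mid
        use_lon = not use_lon
        bit_count += 1
        if bit_count == 5:  # every 5 bits become one base32 character
            chars.append(BASE32[value])
            bit_count, value = 0, 0
    return "".join(chars)


cell = geohash_encode_point(57.64911, 10.40744, precision=5)
print(cell)
```

Because each extra character only refines the previous cell, a lower-precision hash is always a prefix of a higher-precision one, which is the property that makes geohash prefixes usable as blocking keys.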
Temporal Normalization (NM-12 through NM-15)
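# parallax/ops/fusion/temporal.py
import pandas as pd

from parallax.ops.fusion.base import FusionTransform


class UTCNormalize(FusionTransform):
    """NM-12: UTC normalization with timezone preservation."""
    primitive_id = "NM-12"
    primitive_name = "utc_normalize"

    @staticmethod
    def _to_utc(value) -> str:
        ts = pd.Timestamp(value)
        # tz_localize raises on tz-aware input, so convert those instead.
        # Naive timestamps are assumed to already be in UTC.
        ts = ts.tz_localize("UTC") if ts.tzinfo is None else ts.tz_convert("UTC")
        return ts.isoformat()

    def execute(self):
        to_utc = self._to_utc
        for col in self.parameters["features"]:
            self.data[f"{col}_utc"] = self.data[col].apply(
                lambda v: to_utc(v) if pd.notna(v) else None,
                meta=(f"{col}_utc", "str"),
            )
        return self.data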

class IntervalRepresent(FusionTransform):
    """NM-13: Interval representation (start, end, duration)."""
    primitive_id = "NM-13"
    primitive_name = "interval"

    def execute(self):
        import dask.dataframe as dd
        # Both columns are required; fail early with a KeyError if missing
        start_col = self.parameters["start_col"]
        end_col = self.parameters["end_col"]
        # dd.to_datetime keeps the conversion lazy on Dask Series
        self.data["interval_seconds"] = (
            dd.to_datetime(self.data[end_col]) - dd.to_datetime(self.data[start_col])
        ).dt.total_seconds()
        return self.data
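
class TemporalBucket(FusionTransform):
    """NM-15: Temporal rounding/bucketing."""
    primitive_id = "NM-15"
    primitive_name = "temporal_bucket"

    @staticmethod
    def _floor(value, bucket_size: str) -> str:
        ts = pd.Timestamp(value)
        # Calendar buckets are not fixed-width, so Timestamp.floor rejects them
        if bucket_size == "year":
            ts = ts.normalize().replace(month=1, day=1)
        elif bucket_size == "month":
            ts = ts.normalize().replace(day=1)
        else:
            ts = ts.floor(bucket_size)  # fixed widths such as "1h" or "15min"
        return ts.isoformat()

    def execute(self):
        bucket_size = self.parameters.get("bucket", "1h")
        floor = self._floor
        for col in self.parameters["features"]:
            self.data[f"{col}_bucket"] = self.data[col].apply(
                lambda v: floor(v, bucket_size) if pd.notna(v) else None,
                meta=(f"{col}_bucket", "str"),
            )
        return self.data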
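Order matters when NM-12 and NM-15 are chained: bucketing a local timestamp and bucketing its UTC equivalent can land in different days. The stdlib sketch below illustrates this with hypothetical helpers (`to_utc`, `floor_to_bucket`) and an assumed rule that naive datetimes are already UTC; it is not the parallax implementation.

```python
from datetime import datetime, timedelta, timezone


def to_utc(dt: datetime) -> datetime:
    """Assumption: naive datetimes are already UTC; aware ones are converted."""
    if dt.tzinfo is None:
        return dt.replace(tzinfo=timezone.utc)
    return dt.astimezone(timezone.utc)


def floor_to_bucket(dt: datetime, bucket: str) -> datetime:
    """Floor to 'hour', 'day', or 'year' (an illustrative subset of bucket names)."""
    if bucket == "hour":
        return dt.replace(minute=0, second=0, microsecond=0)
    if bucket == "day":
        return dt.replace(hour=0, minute=0, second=0, microsecond=0)
    if bucket == "year":
        return dt.replace(month=1, day=1, hour=0, minute=0, second=0, microsecond=0)
    raise ValueError(f"Unsupported bucket '{bucket}'")


# 00:30 on 1 June at UTC+1 is 23:30 on 31 May in UTC.
local = datetime(2024, 6, 1, 0, 30, tzinfo=timezone(timedelta(hours=1)))
utc = to_utc(local)

local_day = floor_to_bucket(local, "day")
utc_day = floor_to_bucket(utc, "day")
print(local_day.date(), utc_day.date())
```

Two sources reporting the same instant in different zones only share a day bucket if both are normalized to UTC first.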
Unit/Numeric Normalization (NM-16 through NM-19)
# parallax/ops/fusion/numeric.py
import pandas as pd

from parallax.ops.fusion.base import FusionTransform


class HashTransform(FusionTransform):
    """NM-19: SHA-256 hash transform (PII protection)."""
    primitive_id = "NM-19"
    primitive_name = "sha256_hash"

    def execute(self):
        import hashlib
        algorithm = self.parameters.get("algorithm", "sha256")
        for col in self.parameters["features"]:
            self.data[f"{col}_hash"] = self.data[col].apply(
                lambda v: hashlib.new(algorithm, str(v).encode()).hexdigest()
                if pd.notna(v) else None,
                meta=(f"{col}_hash", "str"),
            )
        return self.data
Transform Registration
All fusion transforms register into the existing parallax OPS dict:
# parallax/ops/fusion/__init__.py
from parallax.ops.fusion.text import (
    UnicodeNFKC, Casefold, Tokenize, NameParser,
    AliasExpansion, PhoneticEncoding, AddressStandardize,
)
from parallax.ops.fusion.geo import CRSAlignment, Geocode, GeohashEncode
from parallax.ops.fusion.temporal import UTCNormalize, IntervalRepresent, TemporalBucket
from parallax.ops.fusion.numeric import HashTransform
# Module paths for BloomFilter and NumericBucket are assumed from the
# NM-20 / NM-21 notes below.
from parallax.ops.fusion.transforms.text import BloomFilter
from parallax.ops.fusion.transforms.numeric import NumericBucket

FUSION_OPS = {
    # Text normalization
    "unicode_nfkc": UnicodeNFKC,                # NM-01
    "casefold": Casefold,                       # NM-02
    "tokenize": Tokenize,                       # NM-03
    "name_parse": NameParser,                   # NM-04
    "alias_expand": AliasExpansion,             # NM-05
    "phonetic": PhoneticEncoding,               # NM-06
    "address_standardize": AddressStandardize,  # NM-07
    # Geo normalization
    "crs_align": CRSAlignment,                  # NM-08
    "geocode": Geocode,                         # NM-09
    "geohash_encode": GeohashEncode,            # NM-10
    # Temporal normalization
    "utc_normalize": UTCNormalize,              # NM-12
    "interval": IntervalRepresent,              # NM-13
    "temporal_bucket": TemporalBucket,          # NM-15
    # Numeric/hash
    "sha256_hash": HashTransform,               # NM-19
    # PPRL
    "bloom_filter": BloomFilter,                # NM-20: q-gram Bloom (Schnell 2009)
    # Numeric binning
    "numeric_bucket": NumericBucket,            # NM-21: width-aligned bucket
}

# NM-21: Width-aligned numeric bucketing (parallax/ops/fusion/transforms/numeric.py)
#   numeric_bucket(value, width=1.0) -> float | None
#   floor(value / width) * width. Used for blocking on noisy continuous
#   measurements (RF frequency, bearing, currency amount): same-source
#   pairs fall in the same bucket when measurement noise is < width.
#   width <= 0 raises ValueError; unparseable input → None.
# NM-20: Bloom-filter PPRL encoding (parallax/ops/fusion/transforms/text.py)
#   bloom_encode(value, qgrams=2, bits=1024, hashes=30) -> base64 str | None
#   One-way (Schnell 2009 §4); deterministic for replay (FED-SPEC-02).
#   Pairs with metric `sorensen_dice` (MF-13e) for fuzzy similarity on
#   derived values across federation boundaries.


# Register into parallax's main OPS dict
def register_fusion_ops():
    """Call during parallax startup to register all fusion transforms."""
    from parallax.ops.feature import OPS
    OPS.update(FUSION_OPS)
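The NM-21 note describes `numeric_bucket` only in comments. A minimal sketch that follows that description (floor to a multiple of `width`, `ValueError` for non-positive widths, `None` for unparseable input) might look like this. Treating NaN and infinity as unparseable is an assumption added here, not something the note states.

```python
import math
from typing import Optional


def numeric_bucket(value, width: float = 1.0) -> Optional[float]:
    """Floor `value` to the nearest lower multiple of `width`."""
    if width <= 0:
        raise ValueError("width must be positive")
    try:
        number = float(value)
    except (TypeError, ValueError):
        return None  # unparseable input
    if math.isnan(number) or math.isinf(number):
        return None  # assumption: non-finite values are treated as unparseable
    return math.floor(number / width) * width


# Two noisy readings of the same RF frequency (MHz) share a 0.5 MHz bucket.
reading_a = numeric_bucket(433.92, width=0.5)
reading_b = numeric_bucket(433.61, width=0.5)
print(reading_a, reading_b)
```

Readings that straddle a bucket edge (for example 433.98 and 434.02 at width 0.5) still land in different buckets, so a blocking key built this way trades some recall for speed.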
Registry 2: Metric Registry (Matching Features)
Location: parallax/ops/fusion/metrics.py (expanded from component.parallax.scoring-engine)
Covers: MF-01 through MF-17
Base Contract
# parallax/ops/fusion/metric_base.py
class FusionMetric:
    """Base class for all fusion similarity metrics.

    A metric computes similarity between two values:
        (value_a, value_b, **params) → float in [0.0, 1.0]

    Where 1.0 = identical, 0.0 = completely different.
    """
    primitive_id: str = ""
    primitive_name: str = ""
    input_types: list[str] = []  # ["str"], ["float", "float"], etc.
    domain_applicability: list = []
    dependencies: list = []

    def compute(self, a, b, **params) -> float:
        """Compute similarity. Must return [0.0, 1.0]. None inputs → 0.0."""
        raise NotImplementedError

    def validate_inputs(self, a, b) -> bool:
        """Check input compatibility."""
        return True
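The contract above ("must return [0.0, 1.0], None inputs → 0.0") is stated in a docstring but not enforced. One way a caller could guard against a misbehaving metric is a thin wrapper like the sketch below. The name `safe_similarity` is hypothetical, and mapping NaN to 0.0 is an assumption.

```python
import math


def safe_similarity(compute, a, b, **params) -> float:
    """Call a metric's compute function and enforce the [0.0, 1.0] contract."""
    if a is None or b is None:
        return 0.0  # contract: None inputs score zero
    score = float(compute(a, b, **params))
    if math.isnan(score):
        return 0.0  # assumption: an undefined score counts as no similarity
    return min(1.0, max(0.0, score))


# A raw cosine can be negative; the wrapper clamps it into range.
clamped = safe_similarity(lambda a, b: -0.4, [1.0], [-1.0])
print(clamped)
```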
Metric Catalog
# parallax/ops/fusion/metrics.py
import math

import jellyfish
import pandas as pd  # used by the temporal metrics below

from parallax.ops.fusion.metric_base import FusionMetric


class ExactMatch(FusionMetric):
    """MF-15 / MA-13: Binary equality."""
    primitive_id = "MA-13"
    primitive_name = "exact"

    def compute(self, a, b, **params) -> float:
        if a is None or b is None:
            return 0.0
        return 1.0 if str(a).strip().lower() == str(b).strip().lower() else 0.0

class JaroWinkler(FusionMetric):
    """MF-13 / MA-19: Fuzzy name matching."""
    primitive_id = "MA-19"
    primitive_name = "jaro_winkler"
    dependencies = ["jellyfish"]

    def compute(self, a, b, **params) -> float:
        if a is None or b is None:
            return 0.0
        return jellyfish.jaro_winkler_similarity(str(a), str(b))


class TokenSetRatio(FusionMetric):
    """MF-14 / MA-20: Token-set similarity for name reordering."""
    primitive_id = "MA-20"
    primitive_name = "token_set_ratio"

    def compute(self, a, b, **params) -> float:
        if a is None or b is None:
            return 0.0
        tokens_a = set(str(a).lower().split())
        tokens_b = set(str(b).lower().split())
        if not tokens_a and not tokens_b:
            return 1.0
        if not tokens_a or not tokens_b:
            return 0.0
        intersection = tokens_a & tokens_b
        union = tokens_a | tokens_b
        return len(intersection) / len(union)  # Jaccard on tokens

class Levenshtein(FusionMetric):
    """MA-16: Edit distance normalized to [0,1]."""
    primitive_id = "MA-16"
    primitive_name = "levenshtein"
    dependencies = ["jellyfish"]

    def compute(self, a, b, **params) -> float:
        if a is None or b is None:
            return 0.0
        sa, sb = str(a), str(b)
        max_len = max(len(sa), len(sb))
        if max_len == 0:
            return 1.0
        dist = jellyfish.levenshtein_distance(sa, sb)
        return 1.0 - (dist / max_len)


class JaccardSimilarity(FusionMetric):
    """MA-14: Set similarity."""
    primitive_id = "MA-14"
    primitive_name = "jaccard"

    def compute(self, a, b, **params) -> float:
        if a is None or b is None:
            return 0.0
        set_a = set(str(a).lower().split()) if isinstance(a, str) else set(a)
        set_b = set(str(b).lower().split()) if isinstance(b, str) else set(b)
        if not set_a and not set_b:
            return 1.0
        union = set_a | set_b
        if not union:
            return 0.0
        return len(set_a & set_b) / len(union)

class CosineSimilarity(FusionMetric):
    """MA-15: Vector similarity."""
    primitive_id = "MA-15"
    primitive_name = "cosine"

    def compute(self, a, b, **params) -> float:
        if a is None or b is None:
            return 0.0
        # a and b are equal-length lists/arrays of floats; vectors of
        # different length are treated as incomparable
        if len(a) != len(b):
            return 0.0
        dot = sum(x * y for x, y in zip(a, b))
        mag_a = math.sqrt(sum(x * x for x in a))
        mag_b = math.sqrt(sum(x * x for x in b))
        if mag_a == 0 or mag_b == 0:
            return 0.0
        # Raw cosine lies in [-1, 1]; clamp to honor the [0, 1] contract
        return min(1.0, max(0.0, dot / (mag_a * mag_b)))

class HaversineDistance(FusionMetric):
    """MF-01 / MA-17: Geospatial distance similarity."""
    primitive_id = "MA-17"
    primitive_name = "haversine"

    def compute(self, a, b, **params) -> float:
        if a is None or b is None:
            return 0.0
        # a, b are (lat, lon) tuples
        max_distance_km = params.get("max_distance_km", 50.0)
        lat1, lon1 = float(a[0]), float(a[1])
        lat2, lon2 = float(b[0]), float(b[1])
        R = 6371.0  # Earth radius km
        dlat = math.radians(lat2 - lat1)
        dlon = math.radians(lon2 - lon1)
        a_val = (math.sin(dlat / 2) ** 2 +
                 math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) *
                 math.sin(dlon / 2) ** 2)
        # Guard against rounding pushing a_val just outside [0, 1]
        a_val = min(1.0, max(0.0, a_val))
        c = 2 * math.atan2(math.sqrt(a_val), math.sqrt(1 - a_val))
        distance = R * c
        if distance >= max_distance_km:
            return 0.0
        return 1.0 - (distance / max_distance_km)

class TemporalProximity(FusionMetric):
    """MF-07 / MA-18: Time-gap similarity."""
    primitive_id = "MA-18"
    primitive_name = "temporal_proximity"

    def compute(self, a, b, **params) -> float:
        if a is None or b is None:
            return 0.0
        max_gap_seconds = params.get("max_gap_seconds", 3600)
        ta = pd.Timestamp(a)
        tb = pd.Timestamp(b)
        gap = abs((ta - tb).total_seconds())
        if gap >= max_gap_seconds:
            return 0.0
        return 1.0 - (gap / max_gap_seconds)

class GeoPrefix(FusionMetric):
    """Postcode prefix matching (POC shortcut for haversine)."""
    primitive_id = "MF-04"
    primitive_name = "geo_prefix"

    def compute(self, a, b, **params) -> float:
        if a is None or b is None:
            return 0.0
        chars = params.get("chars", 3)
        pa = str(a).replace(" ", "")[:chars].upper()
        pb = str(b).replace(" ", "")[:chars].upper()
        return 1.0 if pa == pb else 0.0


class SoundexMatch(FusionMetric):
    """Phonetic equality via soundex."""
    primitive_id = "MF-13b"
    primitive_name = "soundex_match"
    dependencies = ["jellyfish"]

    def compute(self, a, b, **params) -> float:
        if a is None or b is None:
            return 0.0
        return 1.0 if jellyfish.soundex(str(a)) == jellyfish.soundex(str(b)) else 0.0

class IntervalOverlap(FusionMetric):
    """MF-06: Interval overlap ratio."""
    primitive_id = "MF-06"
    primitive_name = "interval_overlap"

    def compute(self, a, b, **params) -> float:
        # a, b are (start, end) tuples
        if a is None or b is None:
            return 0.0
        a_start, a_end = pd.Timestamp(a[0]), pd.Timestamp(a[1])
        b_start, b_end = pd.Timestamp(b[0]), pd.Timestamp(b[1])
        overlap_start = max(a_start, b_start)
        overlap_end = min(a_end, b_end)
        if overlap_start >= overlap_end:
            return 0.0
        overlap = (overlap_end - overlap_start).total_seconds()
        total = max(
            (a_end - a_start).total_seconds(),
            (b_end - b_start).total_seconds(),
        )
        return overlap / total if total > 0 else 0.0
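
class RecencyDecay(FusionMetric):
    """MF-09: Recency-weighted scoring."""
    primitive_id = "MF-09"
    primitive_name = "recency_decay"

    def compute(self, a, b, **params) -> float:
        if a is None or b is None:
            return 0.0
        half_life_hours = params.get("half_life_hours", 24)
        # Only `a` is aged; `b` is accepted to satisfy the metric signature.
        observed = pd.Timestamp(a)
        # Compare in UTC; naive timestamps are assumed to be UTC.
        if observed.tzinfo is None:
            observed = observed.tz_localize("UTC")
        else:
            observed = observed.tz_convert("UTC")
        age_hours = abs((pd.Timestamp.now(tz="UTC") - observed).total_seconds()) / 3600
        return 2.0 ** (-age_hours / half_life_hours)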

class AddressSimilarity(FusionMetric):
    """MF-16: Composite address similarity."""
    primitive_id = "MF-16"
    primitive_name = "address_similarity"

    def compute(self, a, b, **params) -> float:
        if a is None or b is None:
            return 0.0
        # Tokenize, normalize, then Jaccard
        tokens_a = set(str(a).lower().replace(",", " ").split())
        tokens_b = set(str(b).lower().replace(",", " ").split())
        if not tokens_a or not tokens_b:
            return 0.0
        return len(tokens_a & tokens_b) / len(tokens_a | tokens_b)

class CategoryAgreement(FusionMetric):
    """MF-17: Categorical/attribute set agreement."""
    primitive_id = "MF-17"
    primitive_name = "category_agreement"

    def compute(self, a, b, **params) -> float:
        if a is None or b is None:
            return 0.0
        # Strip whitespace so "a, b" and "a,b" yield the same set
        if isinstance(a, str):
            a = {t.strip() for t in a.split(",") if t.strip()}
        if isinstance(b, str):
            b = {t.strip() for t in b.split(",") if t.strip()}
        a, b = set(a), set(b)
        if not a and not b:
            return 1.0
        union = a | b
        return len(a & b) / len(union) if union else 0.0
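To make the distance-to-similarity conversion used by `haversine` concrete, here is a worked example with the same Earth radius (6371 km) and the default 50 km cutoff. The two points are hypothetical and sit 0.1 degrees of latitude apart; the helper names are illustrative.

```python
import math

EARTH_RADIUS_KM = 6371.0


def haversine_km(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Great-circle distance between two WGS84 points, in kilometres."""
    dlat = math.radians(lat2 - lat1)
    dlon = math.radians(lon2 - lon1)
    h = (math.sin(dlat / 2) ** 2
         + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2))
         * math.sin(dlon / 2) ** 2)
    return 2 * EARTH_RADIUS_KM * math.asin(math.sqrt(min(1.0, h)))


def linear_decay(distance_km: float, max_distance_km: float = 50.0) -> float:
    """1.0 at zero distance, falling linearly to 0.0 at the cutoff."""
    if distance_km >= max_distance_km:
        return 0.0
    return 1.0 - distance_km / max_distance_km


distance = haversine_km(51.0, 0.0, 51.1, 0.0)  # about 11.1 km
similarity = linear_decay(distance)            # about 0.78
print(round(distance, 2), round(similarity, 3))
```

The same linear-decay shape is used by `temporal_proximity`, with seconds in place of kilometres.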
Metric Registration
# parallax/ops/fusion/metrics.py
METRIC_REGISTRY: dict[str, FusionMetric] = {
    # Identity
    "exact": ExactMatch(),                      # MA-13
    "jaro_winkler": JaroWinkler(),              # MA-19 / MF-13
    "token_set_ratio": TokenSetRatio(),         # MA-20 / MF-14
    "levenshtein": Levenshtein(),               # MA-16
    "soundex_match": SoundexMatch(),            # MF-13b
    # Set/Vector
    "jaccard": JaccardSimilarity(),             # MA-14
    "cosine": CosineSimilarity(),               # MA-15
    # Spatial
    "haversine": HaversineDistance(),           # MA-17 / MF-01
    "geo_prefix": GeoPrefix(),                  # MF-04
    # Temporal
    "temporal_proximity": TemporalProximity(),  # MA-18 / MF-07
    "interval_overlap": IntervalOverlap(),      # MF-06
    "recency_decay": RecencyDecay(),            # MF-09
    # Composite
    "address_similarity": AddressSimilarity(),  # MF-16
    "category_agreement": CategoryAgreement(),  # MF-17
    # Phonetic-equivalence (string)
    "metaphone": MetaphoneMatch(),              # MF-13c
    "nysiis": NysiisMatch(),                    # MF-13d
    "sorensen_dice": SorensenDice(),            # MF-13e: Bloom-PPRL (NM-20)
    # Spatial (continuous)
    "geohash_match": GeohashMatch(),            # MF-05
    "uncertainty_aware_distance": UncertaintyAwareDistance(),  # MF-18
    # Cross-source / kinematic / directional (themis wishlist §1.4–1.6)
    "source_complementarity": SourceComplementarity(),  # MF-19
    "heading_proximity": HeadingProximity(),    # MF-20
    "speed_proximity": SpeedProximity(),        # MF-21
    # General-purpose numeric tolerance
    "numeric_proximity": NumericProximity(),    # MF-22
}

# MF-13e SorensenDice: Bloom-PPRL fuzzy similarity. Accepts raw strings
#   (auto-encoded with bloom_encode defaults) or pre-encoded base64 filters
#   from an NM-20 bloom_filter transform. Returns 2·|A∩B|/(|A|+|B|).
# MF-18 UncertaintyAwareDistance: Mahalanobis on (lat, lon, sigma_m) tuples
#   under uncorrelated isotropic uncertainty. exp(-(d/√(σ_a²+σ_b²))²/2)
#   with hard-clip at max_sigmas (default 5.0). Params: default_sigma_m=100.
# MF-19 SourceComplementarity: 1.0 iff inputs differ (cross-source bonus).
# MF-20 HeadingProximity: circular linear decay over max_diff_deg (default 45).
# MF-21 SpeedProximity: symmetric min/max ratio; both-zero → 1.0.
# MF-22 NumericProximity: linear decay over `tolerance`, absolute or relative.


def get_metric(name: str) -> FusionMetric:
    """Resolve a metric by name. Raises KeyError if not found."""
    if name not in METRIC_REGISTRY:
        raise KeyError(f"Unknown metric '{name}'. Available: {list(METRIC_REGISTRY.keys())}")
    return METRIC_REGISTRY[name]
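The NM-20 / MF-13e pairing (Bloom-filter encoding scored with Sørensen-Dice) is described only in comments. The sketch below shows the general shape under several assumptions that the spec does not confirm: q-grams are taken from a space-padded, lowercased string; bit positions come from double hashing over SHA-1 and SHA-256; and `bits` is a multiple of 8. Only the signature defaults (`qgrams=2, bits=1024, hashes=30`) and the Dice formula come from the notes above.

```python
import base64
import hashlib
from typing import Optional


def _qgram_set(value: str, q: int) -> set:
    padded = f" {value.lower()} "  # assumption: pad so edge characters form q-grams
    return {padded[i:i + q] for i in range(len(padded) - q + 1)}


def bloom_encode(value, qgrams: int = 2, bits: int = 1024, hashes: int = 30) -> Optional[str]:
    """Encode a string as a base64 Bloom filter of its q-grams."""
    if value is None or not str(value).strip():
        return None
    bitmap = bytearray(bits // 8)
    for gram in _qgram_set(str(value).strip(), qgrams):
        h1 = int.from_bytes(hashlib.sha1(gram.encode()).digest(), "big")
        h2 = int.from_bytes(hashlib.sha256(gram.encode()).digest(), "big")
        for i in range(hashes):
            pos = (h1 + i * h2) % bits  # double hashing: position i of this q-gram
            bitmap[pos // 8] |= 1 << (pos % 8)
    return base64.b64encode(bytes(bitmap)).decode("ascii")


def sorensen_dice(filter_a: str, filter_b: str) -> float:
    """2·|A∩B| / (|A|+|B|) over the set bits of two base64 filters."""
    a = int.from_bytes(base64.b64decode(filter_a), "big")
    b = int.from_bytes(base64.b64decode(filter_b), "big")
    ones_a, ones_b = bin(a).count("1"), bin(b).count("1")
    if ones_a + ones_b == 0:
        return 0.0
    return 2 * bin(a & b).count("1") / (ones_a + ones_b)


smith, smyth, jones = (bloom_encode(n) for n in ("smith", "smyth", "jones"))
print(sorensen_dice(smith, smyth) > sorensen_dice(smith, jones))
```

Because the encoding is deterministic, two parties that agree on the parameters can compare filters without exchanging the underlying values.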
Registry 3: Algorithm Registry (Blocking + Scoring + Clustering)
Location: parallax/ops/fusion/algorithms/
Covers: MA-01 through MA-12
Blocking Algorithms (MA-01 through MA-04)
All blocking runs as Dask operations in parallax. ES aggregation capabilities (terms, geohash_grid, date_histogram, composite) could provide a future optimization path for blocking, but are not the current implementation.
# parallax/ops/fusion/algorithms/blocking.py
class BlockingAlgorithm:
    """Base class for blocking strategies."""
    primitive_id: str = ""
    primitive_name: str = ""

    def generate_keys(self, df, blocking_config) -> dict[str, list[str]]:
        raise NotImplementedError


class HashBlocking(BlockingAlgorithm):
    """MA-01: Hash blocking (current implementation in component.parallax.blocking-engine)."""
    primitive_id = "MA-01"
    primitive_name = "hash_blocking"

    def generate_keys(self, df, blocking_config):
        # Existing implementation from component.parallax.blocking-engine
        ...


class SortedNeighborhood(BlockingAlgorithm):
    """MA-02: Sorted neighborhood blocking."""
    primitive_id = "MA-02"
    primitive_name = "sorted_neighborhood"

    def generate_keys(self, df, blocking_config):
        window = blocking_config.get("window_size", 5)
        sort_key = blocking_config.get("sort_key")
        # Sort by key, compare within sliding window
        ...


class CanopyClustering(BlockingAlgorithm):
    """MA-03: Canopy clustering for blocking."""
    primitive_id = "MA-03"
    primitive_name = "canopy"

    def generate_keys(self, df, blocking_config):
        t1 = blocking_config.get("loose_threshold", 0.8)
        t2 = blocking_config.get("tight_threshold", 0.5)
        ...


class LSHBlocking(BlockingAlgorithm):
    """MA-04: Locality-Sensitive Hashing."""
    primitive_id = "MA-04"
    primitive_name = "lsh"

    def generate_keys(self, df, blocking_config):
        num_hashes = blocking_config.get("num_hashes", 100)
        num_bands = blocking_config.get("num_bands", 20)
        ...


BLOCKING_REGISTRY = {
    "hash_blocking": HashBlocking(),
    "sorted_neighborhood": SortedNeighborhood(),
    "canopy": CanopyClustering(),
    "lsh": LSHBlocking(),
}
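The `SortedNeighborhood` stub describes its approach in one comment: sort by a key, then compare within a sliding window. A minimal in-memory sketch of that idea follows. It returns candidate ID pairs rather than the `dict[str, list[str]]` that `generate_keys` declares, and the record layout (`id`, `name`) is hypothetical.

```python
from typing import Callable


def sorted_neighborhood_pairs(
    records: list[dict],
    sort_key: Callable[[dict], str],
    window_size: int = 5,
) -> list[tuple]:
    """Pair each record with the next (window_size - 1) records in sort order."""
    ordered = sorted(records, key=sort_key)
    pairs = []
    for i, left in enumerate(ordered):
        for right in ordered[i + 1:i + window_size]:
            pairs.append((left["id"], right["id"]))
    return pairs


records = [
    {"id": 1, "name": "smith"},
    {"id": 2, "name": "smyth"},
    {"id": 3, "name": "jones"},
    {"id": 4, "name": "johns"},
]
# Sort order by name: johns(4), jones(3), smith(1), smyth(2)
candidates = sorted_neighborhood_pairs(records, lambda r: r["name"], window_size=2)
print(candidates)
```

With n records and window w, this yields roughly n·(w−1) candidate pairs instead of the n·(n−1)/2 of a full cross-comparison, at the cost of missing matches whose sort keys differ early in the string.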
Scoring Algorithms (MA-05 through MA-08)
# parallax/ops/fusion/algorithms/scoring.py
import pandas as pd


class ScoringAlgorithm:
    """Base class for scoring strategies."""
    primitive_id: str = ""

    def score(self, candidates, match_function, **params) -> pd.DataFrame:
        raise NotImplementedError


class WeightedAggregate(ScoringAlgorithm):
    """MA-05: Current implementation from component.parallax.scoring-engine."""
    primitive_id = "MA-05"

    def score(self, candidates, match_function, **params):
        # Existing score_pair / score_all_candidates from component.parallax.scoring-engine
        ...


class FellegiSunter(ScoringAlgorithm):
    """MA-06: Fellegi-Sunter probabilistic record linkage."""
    primitive_id = "MA-06"

    def score(self, candidates, match_function, **params):
        # Log-likelihood ratio scoring
        # m-probability (match) vs u-probability (non-match)
        ...


class TravelFeasibilityGate(ScoringAlgorithm):
    """MA-07: Space-time cone pre-filter."""
    primitive_id = "MA-07"

    def score(self, candidates, match_function, **params):
        max_speed_kmh = params.get("max_speed_kmh", 900)
        # Reject pairs where travel between locations in time gap
        # would require exceeding max_speed
        ...


SCORING_REGISTRY = {
    "weighted_aggregate": WeightedAggregate(),
    "fellegi_sunter": FellegiSunter(),
    "travel_feasibility_gate": TravelFeasibilityGate(),
}
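The `FellegiSunter` stub mentions log-likelihood ratios over m- and u-probabilities. In the classic formulation, each field contributes log2(m/u) when it agrees and log2((1−m)/(1−u)) when it disagrees, and the contributions are summed. The sketch below illustrates that arithmetic; the field names and probabilities are made-up example values, not calibrated parameters.

```python
import math


def fellegi_sunter_weight(agreements: dict, m: dict, u: dict) -> float:
    """Sum per-field log2 likelihood ratios for one candidate pair.

    agreements: field -> True if the two records agree on that field
    m: field -> P(agree | records truly match)
    u: field -> P(agree | records do not match)
    """
    total = 0.0
    for field, agrees in agreements.items():
        if agrees:
            total += math.log2(m[field] / u[field])
        else:
            total += math.log2((1 - m[field]) / (1 - u[field]))
    return total


# Hypothetical probabilities for two fields
m_probs = {"name": 0.90, "dob": 0.95}
u_probs = {"name": 0.10, "dob": 0.01}

both_agree = fellegi_sunter_weight({"name": True, "dob": True}, m_probs, u_probs)
dob_differs = fellegi_sunter_weight({"name": True, "dob": False}, m_probs, u_probs)
print(round(both_agree, 2), round(dob_differs, 2))
```

A pair is then classified by comparing the total weight with upper and lower thresholds, with the band in between typically routed to review.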
How the Lens Drives Primitive Selection
The lens YAML now references primitives by registry name:
# Extended lens format: field_groupings.identity_hints
# Entries that carry parameters are quoted, because an unquoted ': ' inside a
# scalar would be parsed by YAML as a nested mapping.
field_groupings:
  identity_hints:
    - field: full_name
      transforms:                  # CHAIN of transforms (replaces single 'transform')
        - unicode_nfkc             # NM-01
        - casefold                 # NM-02
        - name_parse               # NM-04
      match_metric: jaro_winkler   # MA-19
      label: Full Name
    - field: postcode
      transforms:
        - address_standardize                # NM-07
        - "geohash_encode | precision: 5"    # NM-10
      match_metric: "geo_prefix | chars: 3"
      label: Postcode Area
    - field: phone
      transforms:
        - sha256_hash              # NM-19
      match_metric: exact          # MA-13
      label: Phone (hashed)

# Extended blocking_strategy
identity_fusion:
  blocking_strategy:
    method: hash_blocking          # MA-01 (from BLOCKING_REGISTRY)
    blocking_keys:
      - "phonetic | field: full_name | algorithm: soundex"    # NM-06
      - "temporal_bucket | field: date_of_birth | bucket: year"
  scoring_algorithm: weighted_aggregate    # MA-05 (from SCORING_REGISTRY)
  # Or: fellegi_sunter for probabilistic linkage
Lens Parser Extension (component.parallax.lens-parser update)
# In lens_parser.py: resolve transform chains
def resolve_transforms(hints: list[FieldHint]) -> list[dict]:
    """Resolve transform names to registry entries.

    Validates that every referenced transform exists in FUSION_OPS.
    Returns operation configs for the parallax pipeline.
    """
    ops = []
    for hint in hints:
        for transform_str in hint.transforms:
            name, params = parse_transform(transform_str)
            if name not in FUSION_OPS:
                raise LensValidationError(f"Unknown transform '{name}'")
            params["features"] = [hint.field]
            ops.append({"command": name, "parameters": params})
    return ops
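`resolve_transforms` calls a `parse_transform` helper that this spec does not define. Based on the `name | key: value` strings in the lens examples, a plausible sketch is shown below. The integer coercion of numeric values and the error type are assumptions; the real helper may behave differently.

```python
def parse_transform(spec: str) -> tuple[str, dict]:
    """Split 'name | key: value | key: value' into (name, params)."""
    name, *pairs = [part.strip() for part in spec.split("|")]
    if not name:
        raise ValueError(f"Missing transform name in '{spec}'")
    params = {}
    for pair in pairs:
        key, sep, value = pair.partition(":")
        key, value = key.strip(), value.strip()
        if not sep or not key:
            raise ValueError(f"Malformed parameter '{pair}' in '{spec}'")
        # Assumption: plain integers are coerced, everything else stays a string
        params[key] = int(value) if value.lstrip("-").isdigit() else value
    return name, params


print(parse_transform("geohash_encode | precision: 5"))
print(parse_transform("phonetic | field: full_name | algorithm: soundex"))
print(parse_transform("casefold"))
```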
Primitive Lifecycle: From XLSX to Running Code
1. DESIGN (XLSX Matrix)
   Primitive ID: NM-04
   Category: Normalization > Text
   Status: New → Designed → Implemented → Tested
        ↓
2. IMPLEMENT (Python class)
   class NameParser(FusionTransform):
       primitive_id = "NM-04"
       ...
        ↓
3. REGISTER (in registry dict)
   FUSION_OPS["name_parse"] = NameParser
        ↓
4. REFERENCE (in lens YAML)
   transforms: [name_parse]
        ↓
5. EXECUTE (in Dask via parallax pipeline)
   Engineer.create() → feature_engineer() → NameParser.execute()
        ↓
6. CONSUME (scoring engine)
   Transformed values used by metric comparisons
Coverage Analysis: Current Specs vs 94 Primitives
| Category | Total | Covered by Current Specs | Gap |
|---|---|---|---|
| Canonical Attributes | 23 | 4 (entity_id, external_ids, evidence, observations) | 19 — most are UDS model extensions |
| Normalization | 19 | 4 (hash, soundex, geo_prefix, year) | 15 — this spec adds 14 |
| Matching Features | 17 | 4 (exact, jaro_winkler, geo_prefix, soundex) | 13 — this spec adds 12 |
| Matching Algorithms | 20 | 2 (hash_blocking, weighted_aggregate) | 18 — this spec adds 5 core |
| Tracking | 7 | 0 | 7 — Phase 5 (post-POC) |
| Configuration | 8 | 3 (thresholds, time_window, auto-merge) | 5 — mostly lens config |
| TOTAL | 94 | 17 | 77 |
After this spec: ~50 primitives have implementation code or clear stubs. Remaining gap is primarily: Tracking (7, Phase 5), CA model extensions (19, UDS), and advanced algorithms (ML ranking, connected components, LLM disambiguation).
Implementation Priority
POC (Phase 2-4): 35 primitives
Must have for VRS:
- Text: NM-01 (NFKC), NM-02 (casefold), NM-04 (name_parse), NM-05 (alias_expand), NM-06 (phonetic), NM-19 (hash)
- Geo: NM-10 (geohash)
- Temporal: NM-12 (UTC), NM-15 (bucket)
- Metrics: MA-13 (exact), MA-19 (jaro_winkler), MA-17 (haversine), MA-20 (token_set_ratio)
- Blocking: MA-01 (hash_blocking)
- Scoring: MA-05 (weighted_aggregate)
Growth (Phase 4-5): 25 primitives
Needed for AML + multi-domain:
- Text: NM-03 (tokenize), NM-07 (address)
- Geo: NM-08 (CRS), NM-09 (geocode)
- Temporal: NM-13 (interval), NM-14 (partial)
- Metrics: MF-06 (overlap), MF-07 (time gap), MF-09 (decay), MF-16 (address)
- Blocking: MA-02 (sorted neighborhood), MA-04 (LSH)
- Scoring: MA-06 (Fellegi-Sunter), MA-07 (travel gate)
- Clustering: MA-10 (union-find)
Production (Phase 5+): 34 primitives
- Tracking: TR-01 through TR-07
- Advanced: MA-08 (ML ranking), MA-11 (connected components), MA-12 (LLM)
- CA extensions: CA-11 through CA-22
Test Strategy
Per-Primitive Tests
Every primitive gets a unit test following this pattern:
import pandas as pd

# Module paths follow the File Layout section.
from parallax.ops.fusion.metrics import JaroWinkler
from parallax.ops.fusion.text import NameParser


def test_primitive_nm04_name_parse():
    op = NameParser()
    op.parameters = {"features": ["full_name"]}
    op.data = pd.DataFrame({"full_name": ["John Smith", "SMITH, John", "Dr. J. Smith Jr."]})
    result = op.execute()
    assert result["full_name_given"].tolist() == ["John", "John", "Dr."]
    assert result["full_name_family"].tolist() == ["Smith", "SMITH", "Jr."]


def test_primitive_ma19_jaro_winkler():
    metric = JaroWinkler()
    assert metric.compute("Margaret Chen", "Margaret Chen") >= 0.99
    assert metric.compute("Dorothy Williams", "Dot Williams") >= 0.70
    assert metric.compute("John Smith", "Jane Doe") < 0.50
    assert metric.compute(None, "test") == 0.0
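For calibrating thresholds like the ones in the metric test, a textbook Jaro-Winkler reference can be useful. The sketch below is a plain implementation of the standard definition (prefix scale 0.1, prefix capped at 4 characters, case-sensitive, no normalization). It is not the project's `JaroWinkler` class; the function names and the `None` handling are assumptions chosen to mirror the test.

```python
def jaro(s1: str, s2: str) -> float:
    """Standard Jaro similarity in [0, 1]."""
    if s1 == s2:
        return 1.0
    len1, len2 = len(s1), len(s2)
    if len1 == 0 or len2 == 0:
        return 0.0
    window = max(max(len1, len2) // 2 - 1, 0)
    matched1 = [False] * len1
    matched2 = [False] * len2
    matches = 0
    for i, ch in enumerate(s1):
        # Look for an unmatched equal character within the match window.
        for j in range(max(0, i - window), min(len2, i + window + 1)):
            if not matched2[j] and s2[j] == ch:
                matched1[i] = matched2[j] = True
                matches += 1
                break
    if matches == 0:
        return 0.0
    # Count matched characters that appear in a different order.
    out_of_order = 0
    k = 0
    for i in range(len1):
        if matched1[i]:
            while not matched2[k]:
                k += 1
            if s1[i] != s2[k]:
                out_of_order += 1
            k += 1
    t = out_of_order / 2
    return (matches / len1 + matches / len2 + (matches - t) / matches) / 3


def jaro_winkler(s1, s2, prefix_scale: float = 0.1) -> float:
    """Jaro similarity boosted by a shared prefix of up to 4 characters."""
    if s1 is None or s2 is None:
        return 0.0  # assumption: missing values score zero, as in the test
    base = jaro(s1, s2)
    prefix = 0
    for a, b in zip(s1[:4], s2[:4]):
        if a != b:
            break
        prefix += 1
    return base + prefix * prefix_scale * (1 - base)


print(round(jaro_winkler("MARTHA", "MARHTA"), 4))  # 0.9611, the classic worked example
```

Under this textbook definition the pair ("John Smith", "Jane Doe") scores roughly 0.60, because three characters match and the shared "J" prefix adds a small boost. A `< 0.50` bound for that pair therefore presumes that the project's metric applies extra normalization or a different variant.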
Registry Integration Tests
from parallax.ops.fusion import FUSION_OPS
from parallax.ops.fusion.metrics import METRIC_REGISTRY
# parse_lens and resolve_transforms are provided by lens_parser.py


def test_all_fusion_ops_registered():
    """Every implemented primitive must be in a registry."""
    assert "unicode_nfkc" in FUSION_OPS
    assert "casefold" in FUSION_OPS
    # ... all implemented ops


def test_metric_registry_complete():
    assert "exact" in METRIC_REGISTRY
    assert "jaro_winkler" in METRIC_REGISTRY
    assert "haversine" in METRIC_REGISTRY


def test_lens_resolves_all_transforms():
    spec = parse_lens("fixtures/vrs_vulnerability_v1.yaml")
    ops = resolve_transforms(spec.field_groupings.identity_hints)
    for op in ops:
        assert op["command"] in FUSION_OPS
File Layout
parallax/ops/fusion/
├── __init__.py # FUSION_OPS dict, register_fusion_ops()
├── base.py # FusionTransform base class
├── text.py # NM-01 through NM-07
├── geo.py # NM-08 through NM-11
├── temporal.py # NM-12 through NM-15
├── numeric.py # NM-16 through NM-19
├── metric_base.py # FusionMetric base class
├── metrics.py # METRIC_REGISTRY + all metric implementations
├── algorithms/
│ ├── __init__.py # BLOCKING_REGISTRY, SCORING_REGISTRY
│ ├── blocking.py # MA-01 through MA-04
│ ├── scoring.py # MA-05 through MA-08
│ └── clustering.py # MA-09 through MA-11
├── tracking.py # TR-01 through TR-07 (Phase 5)
└── tests/
├── test_text.py
├── test_geo.py
├── test_temporal.py
├── test_numeric.py
├── test_metrics.py
├── test_blocking.py
└── test_scoring.py
cortex/tools/
├── fusion.py # MCP tool wrappers (fusion_execute, fusion_status, fusion_results)
└── tests/
└── test_fusion_tools.py
titan/modeling/train/library/
├── fusion_match_op.py # FusionMatch FATE adapter (thin — calls parallax)
└── fusion_result.py # FusionResult for TRAINEDMODEL
Integration Points
- component.parallax.lens-parser → here: Lens transform chains resolve against FUSION_OPS in parallax
- component.parallax.feature-extraction → here: lens_to_operations() now produces chains, not single ops
- component.parallax.blocking-engine → here: Blocking uses BLOCKING_REGISTRY instead of hardcoded hash_blocking
- component.parallax.scoring-engine → here: score_pair uses METRIC_REGISTRY instead of hardcoded 4 metrics
- component.parallax.fusionmatch-model → here: FusionMatch in titan calls parallax's fusion engine for the real work
- cortex → here: MCP tools in cortex are thin wrappers calling parallax (cortex is MCP gateway, not compute)
- XLSX Matrix → here: Every row in the matrix maps to a class in a parallax registry
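The scoring-engine integration point can be pictured as a registry lookup per identity hint followed by a weighted average. The sketch below assumes that shape. `METRIC_REGISTRY_SKETCH`, `score_pair`'s signature, the `weight` and `params` keys, and the two toy metrics are all hypothetical, since the lens examples in this section do not show weights.

```python
def exact(a, b) -> float:
    """1.0 when both values are present and equal, else 0.0."""
    if a is None or b is None:
        return 0.0
    return 1.0 if a == b else 0.0


def prefix_match(a, b, chars: int = 3) -> float:
    """Toy stand-in for geo_prefix: compare the first `chars` characters."""
    if a is None or b is None:
        return 0.0
    return 1.0 if a[:chars] == b[:chars] else 0.0


# Hypothetical registry: metric name -> callable returning a score in [0, 1].
METRIC_REGISTRY_SKETCH = {"exact": exact, "geo_prefix": prefix_match}


def score_pair(left: dict, right: dict, hints: list[dict]) -> float:
    """Weighted aggregate of per-field metric scores, normalized by total weight."""
    total_weight = sum(h["weight"] for h in hints)
    if total_weight == 0:
        return 0.0
    score = 0.0
    for hint in hints:
        metric = METRIC_REGISTRY_SKETCH[hint["match_metric"]]
        value = metric(
            left.get(hint["field"]), right.get(hint["field"]), **hint.get("params", {})
        )
        score += hint["weight"] * value
    return score / total_weight


hints = [
    {"field": "phone", "match_metric": "exact", "weight": 0.6},
    {"field": "postcode", "match_metric": "geo_prefix", "weight": 0.4, "params": {"chars": 3}},
]
left = {"phone": "abc123", "postcode": "gcpvj"}
right = {"phone": "abc123", "postcode": "u10hb"}
print(score_pair(left, right, hints))  # phone matches, postcode prefix does not
```

Adding a metric then means adding one registry entry rather than another branch inside `score_pair`.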
Entity Search vs Entity Fusion vs Entity Tracking
Why This Distinction Matters
Industry approaches to "entity resolution" (e.g., Databricks KARL/KARLBench, traditional RAG pipelines) conflate three fundamentally different operations. Our 94 primitives cover fusion and tracking — operations that no retrieval-based system addresses. This section clarifies the boundaries.
Three Operations, Not One
| Operation | What It Does | Input | Output | Stateful? | Our Coverage |
|---|---|---|---|---|---|
| Entity Search | Find an entity (or all entities) matching constraints in a corpus | Query + constraints | Retrieved documents / entity list | No | Not in scope — this is retrieval, not fusion |
| Entity Fusion | Determine whether records across heterogeneous systems refer to the same real-world entity | Records from N federated sources + lens | Scored pairs, entity clusters, confidence values, evidence blocks | Per-run | CA (23) + NM (19) + MF (17) + MA (20) = 79 primitives |
| Entity Tracking | Maintain resolved entity identity over time as data changes | Entity clusters + new/updated records | Merge/split events, temporal decay, longitudinal identity | Yes — persistent | TR (7) = 7 primitives |
What Learned Retrieval Systems Do (KARL, Instructed Retriever)
Databricks KARL (KARLBench, March 2026) trains a multi-task RL agent across six enterprise search behaviors. Two are entity-related:
- BrowseComp-Plus (constraint-driven entity search) — find ONE entity matching multiple attribute constraints across documents. Progressive filtering. The entity already exists as a discrete object in a corpus.
- QAMPARI (exhaustive entity retrieval) — find ALL entities meeting a condition across dispersed passages. Comprehensive coverage.
Key finding: single-task training fails (59.6 on entity search alone), but multi-task RL across all six behaviors generalizes to match frontier models (67.5 with parallel compute). Training on any 2 of 6 tasks transfers to the remaining 4.
What KARL does NOT do:
- No normalization — takes text as-is from documents (no unicode_nfkc, casefold, phonetic encoding, address standardization)
- No cross-schema alignment — operates on a single centralized index (Unity Catalog), not N federated schemas
- No pairwise similarity computation — no jaro_winkler("Margaret Smith", "M. Smyth") → 0.82
- No probabilistic matching — no Fellegi-Sunter, no weighted aggregate scoring, no confidence intervals
- No entity clustering — no union-find across N×(N-1)/2 pairwise results from distributed nodes
- No tracking — stateless query-in, answer-out; no temporal persistence, merge/split detection, or longitudinal identity
- No evidence chain — no query hashes, no frozen blocks, no attestation (our Invariants #2, #3, #4, #6)
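To make the clustering point concrete, here is a minimal union-find sketch that turns scored pairs into entity clusters. It assumes pairs arrive as `(index_a, index_b, score)` tuples and that a pair is merged when its score meets a threshold. The function name and the 0.85 default are illustrative assumptions, not values from the spec.

```python
def cluster_pairs(n_records: int, scored_pairs, threshold: float = 0.85) -> list[list[int]]:
    """Union-find over pairwise scores; returns sorted clusters of record indices."""
    parent = list(range(n_records))

    def find(x: int) -> int:
        while parent[x] != x:
            parent[x] = parent[parent[x]]  # path halving keeps trees shallow
            x = parent[x]
        return x

    for a, b, score in scored_pairs:
        if score >= threshold:
            root_a, root_b = find(a), find(b)
            if root_a != root_b:
                # Attach the larger root to the smaller one for stable cluster ids.
                parent[max(root_a, root_b)] = min(root_a, root_b)

    clusters: dict[int, list[int]] = {}
    for i in range(n_records):
        clusters.setdefault(find(i), []).append(i)
    return sorted(clusters.values())


pairs = [(0, 1, 0.93), (1, 2, 0.88), (3, 4, 0.40)]
print(cluster_pairs(5, pairs))  # [[0, 1, 2], [3], [4]]
```

Records 0 and 2 land in the same cluster without ever being compared directly, which is the transitive closure that pairwise scoring alone does not provide.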
Where Learned Retrieval Complements Our Pipeline
The multi-task RL pattern is applicable to the non-deterministic stages of our pipeline where adaptability matters more than auditability:
| Our Stage | Current Approach | KARL-Style Enhancement | Phase |
|---|---|---|---|
| Schema Binding | LLM-assisted field discovery (SCHEMA-BINDING.md) | Multi-task model trained on lens semantics + local schema introspection. Combines PMBench-style fact extraction with BrowseComp-style constraint matching. | Future |
| Blocking Strategy | Lens author manually selects blocking keys | Adaptive blocking selection per federate based on data characteristics — tight where clean, loose where messy. | Future |
| Candidate Ranking | Deterministic blocking key match | Instructed retrieval at each federate, using full lens context to retrieve better candidates before scoring. | Future |
Critical constraint: The deterministic stages (transforms, metrics, weighted scoring, evidence blocks) MUST remain lens-driven and auditable. Invariant #6 (AI assists, humans attest) requires the decision chain to be explainable. A learned model can improve what enters the pipeline; it cannot replace the pipeline itself.
Architectural Principle
LEARNED (adaptive, approximate)          DETERMINISTIC (auditable, exact)
─────────────────────────────            ────────────────────────────────
Schema binding at federate     ─────►    Normalization transforms (NM)
Candidate retrieval            ─────►    Blocking keys (MA-01..04)
Blocking strategy selection    ─────►    Pairwise scoring (MF + MA-05..08)
                                         Entity clustering (MA-09..11)
                                         Evidence blocks + attestation
                                         Tracking (TR-01..07)
The lens is the bridge: it instructs the learned retrieval stage (what to look for, semantic context, field relationships) AND drives the deterministic pipeline (exact transforms, metrics, weights). Same YAML, two consumers.
Centralized vs Federated
KARL assumes all data is in one lakehouse (Databricks Unity Catalog). Our architecture is federated — data never moves (Invariant: "AI goes to the data, not data to AI"). This means:
- KARL builds one index, does instructed retrieval against it
- We send the lens to N federates, each runs local retrieval + transforms against its own schema
- The lens must instruct N different retrievers at N different nodes, each interpreting semantic intent against different local realities
- UDS (Invariant #1) enforces ABAC across all federates — learned retrieval at each node operates within UDS access boundaries
The multi-task RL insight is architecture-independent: it works regardless of where the data sits. But federation makes it harder because the learned model must generalize across schema diversity, not just task diversity.
Depends on: component.parallax.feature-extraction, component.parallax.lens-parser
Realizes: product.fusion
Required by: component.parallax.derived-features, component.parallax.tracking-integration