Fusion Primitives Framework

Status & scope

  • Stage: Architecture — Covers POC through Production
  • Module: parallax/ops/fusion/ (ALL primitives — transforms, metrics, algorithms), cortex/tools/ (MCP tool wrappers only), titan/ (FATE model runner adapter only)
  • Milestone: M2 (Foundation) → M5+ (Full Coverage)
  • Reference: Axonis_Fusion_Primitives_Implementation_Matrix.xlsx

Implementation note — base class: Throughout this spec, primitives are written as class X(FusionTransform): .... The concrete FusionTransform / FusionOp base lives in parallax/ops/fusion/base.py and is duck-typed, NOT a subclass of axonis-core's Command. axonis-core's Command constructs a Redis client at init, which parallax (a leaf compute package) intentionally avoids. The dispatch contract (__init__(body, data, serialize), .execute(), .parameters, .command) is sufficient — uds.ops.feature.feature_engineer never type-checks the class.

Purpose

The fusion pipeline requires 94 distinct primitives across 6 categories (Canonical Attributes, Normalization, Matching Features, Matching Algorithms, Tracking, Configuration). The current specs hard-code a handful of operations (fusion_hash, fusion_soundex, fusion_geo_prefix, fusion_year, jaro_winkler, exact, geo_prefix, soundex). That covers the VRS POC but not the full vision.

This spec defines:

  1. A registry pattern: how primitives are discovered, loaded, and invoked
  2. A base class contract: what every primitive must implement
  3. Where they live: parallax ops for ALL primitives (transforms, metrics, algorithms), titan for the FATE model runner, cortex for MCP tool wrappers only
  4. How the lens drives selection: lens YAML references primitives by ID, and the registry resolves them

Elasticsearch and Dask: Division of Responsibility

Elasticsearch is the core database. Its ingest pipeline normalizes incoming data into ES primitive types — dates to UTC epoch millis, locations to geo_point (WGS84), text through basic analyzers. This is standard ES type coercion for storage, not fusion-specific logic.

All 94 fusion primitives run in Dask via parallax and titan. Every transform, every metric, every blocking algorithm, scoring operation, and clustering step executes as a Dask operation against data pulled from ES (or from CSV/DataFrame in standalone test mode). ES stores and retrieves; Dask computes.

ES also provides aggregation capabilities (terms, geohash_grid, date_histogram, composite) that could optimize blocking in the future, but the primary compute path is Dask.

The 6 Primitive Categories

Category | Count | Where They Run | Storage/Registration
Canonical Attributes (CA) | 23 | UDS object model + parallax extraction | UDS type system + parallax/ops/fusion/
Normalization (NM) | 19 | parallax Dask transform ops | parallax/ops/fusion/ (FUSION_OPS dict)
Matching Features (MF) | 17 | parallax fusion engine | parallax/ops/fusion/metrics.py (METRIC_REGISTRY)
Matching Algorithms (MA) | 20 | parallax fusion engine | parallax/ops/fusion/algorithms/ (ALGORITHM_REGISTRY)
Tracking (TR) | 7 | parallax Dask ops (Phase 5) | parallax/ops/fusion/tracking.py
Configuration (CF) | 8 | Lens Spec fields | Lens YAML config

Architecture: Three Registries

┌─────────────────────────────────────────────────────────────────┐
│                        LENS SPEC (YAML)                         │
│  References primitives by ID:                                   │
│    transforms: [unicode_nfkc, casefold, phonetic, sha256_hash]  │
│    metrics: [jaro_winkler, haversine, exact]                    │
│    blocking: [hash_blocking]                                    │
│    scoring: [weighted_aggregate]                                │
└────────────────────────────────┬────────────────────────────────┘
                                 │ resolve by ID
          ┌──────────────────────┼──────────────────────┐
          ▼                      ▼                      ▼
┌───────────────────┐  ┌───────────────────┐  ┌───────────────────┐
│ TRANSFORM_REGISTRY│  │ METRIC_REGISTRY   │  │ ALGORITHM_REGISTRY│
│ (parallax ops)    │  │ (parallax)        │  │ (parallax)        │
│                   │  │                   │  │                   │
│ NM-01..NM-19      │  │ MF-01..MF-17      │  │ MA-01..MA-12      │
│ + CA extractions  │  │ + MA-13..MA-20    │  │                   │
│                   │  │                   │  │                   │
│ Each: duck-typed  │  │ Each:             │  │ Each:             │
│   FusionTransform │  │ (a, b) → [0,1]    │  │ candidates →      │
│   with .execute() │  │                   │  │   scored_pairs    │
└───────────────────┘  └───────────────────┘  └───────────────────┘
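A minimal sketch of the resolve-by-ID step, assuming the lens YAML has already been parsed into a dict mapping each section to a list of IDs. REGISTRIES, ResolvedLens and resolve_lens are hypothetical stand-ins: the registry values are class-name strings rather than real primitives, so the example runs without parallax installed.

```python
from dataclasses import dataclass, field

# Hypothetical stand-ins for FUSION_OPS, METRIC_REGISTRY and the algorithm
# registries; values are class names (strings) so the sketch runs standalone.
REGISTRIES = {
    "transforms": {"unicode_nfkc": "UnicodeNFKC", "casefold": "Casefold",
                   "phonetic": "PhoneticEncoding", "sha256_hash": "HashTransform"},
    "metrics": {"jaro_winkler": "JaroWinkler", "haversine": "HaversineDistance",
                "exact": "ExactMatch"},
    "blocking": {"hash_blocking": "HashBlocking"},
    "scoring": {"weighted_aggregate": "WeightedAggregate"},
}


@dataclass
class ResolvedLens:
    primitives: dict = field(default_factory=dict)  # section -> {id: primitive}
    errors: list = field(default_factory=list)      # every unresolved reference


def resolve_lens(lens: dict, registries: dict = REGISTRIES) -> ResolvedLens:
    """Resolve each primitive ID in a parsed lens spec against its section's registry."""
    resolved = ResolvedLens()
    for section, ids in lens.items():
        registry = registries.get(section)
        if registry is None:
            resolved.errors.append(f"unknown section '{section}'")
            continue
        for pid in ids:
            if pid in registry:
                resolved.primitives.setdefault(section, {})[pid] = registry[pid]
            else:
                resolved.errors.append(f"{section}: unknown primitive '{pid}'")
    return resolved
```

Collecting every unresolved ID, rather than raising on the first one, lets a lens author fix all bad references from a single load; a per-lookup helper such as get_metric can still raise KeyError for programmatic use.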

Registry 1: Transform Registry (Normalization + Extraction)

Location: parallax/ops/fusion/
Pattern: Extends the existing parallax OPS dict
Covers: NM-01 through NM-19 (plus the NM-20 and NM-21 extensions registered below), plus CA field extraction ops

Base Class

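Every transform op follows parallax's Command dispatch contract. The sketch below writes the base as a Command subclass; per the implementation note above, the concrete base in parallax/ops/fusion/base.py is duck-typed instead: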

# parallax/ops/fusion/base.py
from parallax.ops.command import Command

class FusionTransform(Command):
    """Base class for all fusion transform operations.

    Subclasses implement execute() which transforms a Dask DataFrame.
    The Command base class provides:
      - self.data: the input Dask DataFrame
      - self.parameters: dict from the operation config
      - self.features: list of column names to operate on

    Every FusionTransform must:
      1. Accept a 'features' list in parameters (columns to transform)
      2. Return the modified DataFrame
      3. Handle nulls gracefully (null in → null out)
      4. Be idempotent (running twice produces same result)
      5. Add new columns rather than mutating originals (unless explicitly replacing)
    """

    # Primitive metadata — set by each subclass
    primitive_id: str = ""          # e.g. "NM-01"
    primitive_name: str = ""        # e.g. "unicode_nfkc"
    category: str = "normalization" # or "extraction"
    domain_applicability: list = [] # e.g. ["All"] or ["PNT", "Border"]
    dependencies: list = []         # pip packages needed

    def validate_params(self) -> list[str]:
        """Optional parameter validation. Returns list of error messages."""
        return []
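Rule 3 (null in, null out) can be checked without a Dask cluster if a transform's per-value logic is factored out into a plain function. The sketch below also checks a stronger value-level property, f(f(x)) == f(x). That property suits text normalizers such as NM-01/NM-02, but not one-way encoders such as NM-19, whose rule-4 idempotence comes from always re-reading the original column. casefold_value mirrors the NM-02 lambda; it and check_normalizer_contract are hypothetical helpers, not existing parallax APIs.

```python
import math
import re
from typing import Callable, Iterable, Optional


def casefold_value(v) -> Optional[str]:
    """Per-value logic of NM-02 Casefold, factored out so it runs without Dask."""
    if v is None or (isinstance(v, float) and math.isnan(v)):
        return None
    return re.sub(r"\s+", " ", str(v).casefold().strip())


def check_normalizer_contract(fn: Callable, samples: Iterable) -> list:
    """Return violations of rule 3 (null in -> null out) and of f(f(x)) == f(x)."""
    problems = []
    for null in (None, float("nan")):
        out = fn(null)
        if out is not None:
            problems.append(f"null input {null!r} produced {out!r}")
    for s in samples:
        once = fn(s)
        twice = fn(once)
        if once != twice:
            problems.append(f"not stable on {s!r}: {once!r} -> {twice!r}")
    return problems
```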

Transform Catalog

Text Normalization (NM-01 through NM-07)

# parallax/ops/fusion/text.py
import pandas as pd

from parallax.ops.fusion.base import FusionTransform

class UnicodeNFKC(FusionTransform):
    """NM-01: Unicode NFKC normalization + diacritics removal."""
    primitive_id = "NM-01"
    primitive_name = "unicode_nfkc"

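    def execute(self):
        import unicodedata

        def _normalize(v):
            if pd.isna(v):
                return None
            # NFKD applies compatibility decomposition and splits accents into
            # combining marks, which are dropped; NFKC then recomposes the rest.
            decomposed = unicodedata.normalize("NFKD", str(v))
            stripped = "".join(c for c in decomposed if not unicodedata.combining(c))
            return unicodedata.normalize("NFKC", stripped)

        for col in self.parameters["features"]:
            self.data[f"{col}_norm"] = self.data[col].apply(
                _normalize, meta=(f"{col}_norm", "str")
            )
        return self.data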


class Casefold(FusionTransform):
    """NM-02: Casefolding + whitespace normalization."""
    primitive_id = "NM-02"
    primitive_name = "casefold"

    def execute(self):
        import re
        for col in self.parameters["features"]:
            self.data[f"{col}_cf"] = self.data[col].apply(
                lambda v: re.sub(r'\s+', ' ', str(v).casefold().strip()) if pd.notna(v) else None,
                meta=(f"{col}_cf", "str")
            )
        return self.data


class Tokenize(FusionTransform):
    """NM-03: Tokenization + stopword removal."""
    primitive_id = "NM-03"
    primitive_name = "tokenize"
    dependencies = []  # stdlib only

    def execute(self):
        stopwords = set(self.parameters.get("stopwords", [
            "the", "of", "and", "inc", "ltd", "llc", "plc", "corp",
            "company", "limited", "group", "international",
        ]))
        for col in self.parameters["features"]:
            self.data[f"{col}_tokens"] = self.data[col].apply(
                lambda v: " ".join(
                    t for t in str(v).lower().split() if t not in stopwords
                ) if pd.notna(v) else None,
                meta=(f"{col}_tokens", "str")
            )
        return self.data


class NameParser(FusionTransform):
    """NM-04: Name parsing into given/middle/family components."""
    primitive_id = "NM-04"
    primitive_name = "name_parse"

    def execute(self):
        for col in self.parameters["features"]:
            self.data[f"{col}_given"] = self.data[col].apply(
                lambda v: self._parse_name(v).get("given", "") if pd.notna(v) else None,
                meta=(f"{col}_given", "str")
            )
            self.data[f"{col}_family"] = self.data[col].apply(
                lambda v: self._parse_name(v).get("family", "") if pd.notna(v) else None,
                meta=(f"{col}_family", "str")
            )
        return self.data

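    TITLES = {"mr", "mrs", "ms", "miss", "dr", "prof", "sir"}
    SUFFIXES = {"jr", "sr", "ii", "iii", "iv"}

    @classmethod
    def _parse_name(cls, name: str) -> dict:
        """Parse 'John Smith' → {given: 'John', family: 'Smith'}
        Handles: 'SMITH, John', 'J. Smith', 'Dr. John A. Smith Jr.'
        """
        text = str(name).strip()
        if "," in text:
            # "SMITH, John" format: the family name precedes the first comma
            family, _, rest = text.partition(",")
            given = [p for p in rest.split() if p.rstrip(".").lower() not in cls.TITLES]
            return {"given": given[0].rstrip(",") if given else "", "family": family.strip()}
        # Drop honorifics ("Dr.") and generational suffixes ("Jr.") before splitting
        skip = cls.TITLES | cls.SUFFIXES
        parts = [p for p in text.split() if p.rstrip(".").lower() not in skip]
        if not parts:
            return {"given": "", "family": ""}
        return {"given": parts[0], "family": parts[-1] if len(parts) > 1 else ""}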


class AliasExpansion(FusionTransform):
    """NM-05: Alias/nickname expansion."""
    primitive_id = "NM-05"
    primitive_name = "alias_expand"

    # Common aliases — configurable via parameters
    DEFAULT_ALIASES = {
        "bob": "robert", "rob": "robert", "bobby": "robert",
        "bill": "william", "will": "william", "willy": "william",
        "dick": "richard", "rick": "richard", "rich": "richard",
        "mike": "michael", "mick": "michael",
        "jim": "james", "jimmy": "james", "jamie": "james",
        "dot": "dorothy", "dolly": "dorothy",
        "meg": "margaret", "maggie": "margaret", "peggy": "margaret",
        "art": "arthur", "artie": "arthur",
        "liz": "elizabeth", "beth": "elizabeth", "betty": "elizabeth",
        "ted": "edward", "ned": "edward", "ed": "edward",
        "tom": "thomas", "tommy": "thomas",
        "pat": "patricia", "patty": "patricia",
        "sue": "susan", "suzy": "susan",
        "joe": "joseph", "joey": "joseph",
        "dan": "daniel", "danny": "daniel",
        "chris": "christopher", "kit": "christopher",
        "sam": "samuel", "sammy": "samuel",
        "ben": "benjamin", "benny": "benjamin",
        "tony": "anthony",
        "dave": "david", "davy": "david",
        "steve": "stephen", "steph": "stephen",
        "matt": "matthew",
        "nick": "nicholas",
        "alex": "alexander",
        "andy": "andrew", "drew": "andrew",
    }

    def execute(self):
        aliases = {**self.DEFAULT_ALIASES, **self.parameters.get("custom_aliases", {})}

        def _expand(v):
            if pd.isna(v) or not str(v).strip():
                return None
            first, *rest = str(v).split()
            # Only the leading token is treated as a nickname; the rest is kept verbatim.
            # Joining [canonical, *rest] avoids a trailing space on one-word names.
            return " ".join([aliases.get(first.lower(), first).title(), *rest])

        for col in self.parameters["features"]:
            self.data[f"{col}_canonical"] = self.data[col].apply(
                _expand, meta=(f"{col}_canonical", "str")
            )
        return self.data


class PhoneticEncoding(FusionTransform):
    """NM-06: Phonetic encoding (Soundex, Metaphone, NYSIIS)."""
    primitive_id = "NM-06"
    primitive_name = "phonetic"
    dependencies = ["jellyfish"]

    def execute(self):
        import jellyfish
        algorithm = self.parameters.get("algorithm", "soundex")
        encoders = {
            "soundex": jellyfish.soundex,
            "metaphone": jellyfish.metaphone,
            "nysiis": jellyfish.nysiis,
        }
        encode = encoders[algorithm]
        for col in self.parameters["features"]:
            self.data[f"{col}_{algorithm}"] = self.data[col].apply(
                lambda v: encode(str(v).split()[0]) if pd.notna(v) and str(v).strip() else None,
                meta=(f"{col}_{algorithm}", "str")
            )
        return self.data


class AddressStandardize(FusionTransform):
    """NM-07: Address standardization."""
    primitive_id = "NM-07"
    primitive_name = "address_standardize"

    UK_ABBREVIATIONS = {
        "street": "st", "road": "rd", "avenue": "ave", "lane": "ln",
        "drive": "dr", "close": "cl", "court": "ct", "place": "pl",
        "gardens": "gdns", "terrace": "ter", "crescent": "cres",
    }

    def execute(self):
        for col in self.parameters["features"]:
            self.data[f"{col}_std"] = self.data[col].apply(
                lambda v: self._standardize(str(v)) if pd.notna(v) else None,
                meta=(f"{col}_std", "str")
            )
        return self.data

    @classmethod
    def _standardize(cls, address: str) -> str:
        parts = address.lower().strip().split()
        return " ".join(cls.UK_ABBREVIATIONS.get(p, p) for p in parts)

Geo Normalization (NM-08 through NM-11)

# parallax/ops/fusion/geo.py
import pandas as pd

from parallax.ops.fusion.base import FusionTransform

class CRSAlignment(FusionTransform):
    """NM-08: CRS alignment to WGS84."""
    primitive_id = "NM-08"
    primitive_name = "crs_align"

    def execute(self):
        # POC: assume input is already WGS84 lat/lon
        # Production: use pyproj for CRS conversion
        return self.data


class Geocode(FusionTransform):
    """NM-09: Address → lat/lon geocoding."""
    primitive_id = "NM-09"
    primitive_name = "geocode"

    def execute(self):
        # POC: UK postcode lookup table
        # Production: geocoding service integration
        for col in self.parameters["features"]:
            self.data[f"{col}_lat"] = self.data[col].apply(
                lambda v: self._postcode_to_lat(v) if pd.notna(v) else None,
                meta=(f"{col}_lat", "float64")
            )
            self.data[f"{col}_lon"] = self.data[col].apply(
                lambda v: self._postcode_to_lon(v) if pd.notna(v) else None,
                meta=(f"{col}_lon", "float64")
            )
        return self.data

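    @staticmethod
    def _postcode_to_lat(postcode: str) -> float | None:
        """POC stub; production uses the ONS postcode lookup."""
        return None  # Implement with real lookup

    @staticmethod
    def _postcode_to_lon(postcode: str) -> float | None:
        """POC stub; production uses the ONS postcode lookup."""
        return None  # Implement with real lookup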


class GeohashEncode(FusionTransform):
    """NM-10: Geohash / H3 / S2 multi-resolution encoding."""
    primitive_id = "NM-10"
    primitive_name = "geohash_encode"

    def execute(self):
        precision = int(self.parameters.get("precision", 5))
        system = self.parameters.get("system", "geohash")  # geohash | h3 | s2
        lat_col = self.parameters.get("lat_col", "lat")
        lon_col = self.parameters.get("lon_col", "lon")

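        if system == "geohash":
            self.data["geohash"] = self.data.apply(
                lambda row: self._encode_geohash(row[lat_col], row[lon_col], precision)
                if pd.notna(row[lat_col]) and pd.notna(row[lon_col]) else None,
                axis=1, meta=("geohash", "str")
            )
        return self.data

    _BASE32 = "0123456789bcdefghjkmnpqrstuvwxyz"

    @classmethod
    def _encode_geohash(cls, lat: float, lon: float, precision: int) -> str:
        """Standard geohash: `precision` base32 characters of 5 interleaved bits each.

        Bits alternate longitude/latitude, each bit halving the current interval.
        A library such as python-geohash can replace this in production.
        """
        lat_lo, lat_hi, lon_lo, lon_hi = -90.0, 90.0, -180.0, 180.0
        chars, bits, n_bits, even = [], 0, 0, True
        while len(chars) < precision:
            if even:  # even bit positions refine longitude
                mid = (lon_lo + lon_hi) / 2
                bit = int(lon > mid)
                lon_lo, lon_hi = (mid, lon_hi) if bit else (lon_lo, mid)
            else:     # odd bit positions refine latitude
                mid = (lat_lo + lat_hi) / 2
                bit = int(lat > mid)
                lat_lo, lat_hi = (mid, lat_hi) if bit else (lat_lo, mid)
            bits, n_bits, even = (bits << 1) | bit, n_bits + 1, not even
            if n_bits == 5:
                chars.append(cls._BASE32[bits])
                bits, n_bits = 0, 0
        return "".join(chars)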

Temporal Normalization (NM-12 through NM-15)

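# parallax/ops/fusion/temporal.py
import dask.dataframe as dd
import pandas as pd

from parallax.ops.fusion.base import FusionTransform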

class UTCNormalize(FusionTransform):
    """NM-12: UTC normalization with timezone preservation."""
    primitive_id = "NM-12"
    primitive_name = "utc_normalize"

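    def execute(self):
        def _to_utc(v):
            if pd.isna(v):
                return None
            ts = pd.Timestamp(v)
            # Naive timestamps are assumed to be UTC already; tz-aware ones are
            # converted (tz_localize alone raises TypeError on aware timestamps).
            ts = ts.tz_localize("UTC") if ts.tzinfo is None else ts.tz_convert("UTC")
            return ts.isoformat()

        for col in self.parameters["features"]:
            self.data[f"{col}_utc"] = self.data[col].apply(
                _to_utc, meta=(f"{col}_utc", "str")
            )
        return self.data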


class IntervalRepresent(FusionTransform):
    """NM-13: Interval representation (start, end, duration)."""
    primitive_id = "NM-13"
    primitive_name = "interval"

    def execute(self):
        start_col = self.parameters["start_col"]
        end_col = self.parameters["end_col"]
        # dd.to_datetime is Dask's lazy counterpart of pd.to_datetime
        self.data["interval_seconds"] = (
            dd.to_datetime(self.data[end_col]) - dd.to_datetime(self.data[start_col])
        ).dt.total_seconds()
        return self.data


class TemporalBucket(FusionTransform):
    """NM-15: Temporal rounding/bucketing."""
    primitive_id = "NM-15"
    primitive_name = "temporal_bucket"

    def execute(self):
        bucket_size = self.parameters.get("bucket", "1h")
        for col in self.parameters["features"]:
            self.data[f"{col}_bucket"] = self.data[col].apply(
                lambda v: pd.Timestamp(v).floor(bucket_size).isoformat()
                if pd.notna(v) else None,
                meta=(f"{col}_bucket", "str")
            )
        return self.data

Unit/Numeric Normalization (NM-16 through NM-19)

# parallax/ops/fusion/numeric.py
import pandas as pd

from parallax.ops.fusion.base import FusionTransform

class HashTransform(FusionTransform):
    """NM-19: SHA-256 hash transform (PII protection)."""
    primitive_id = "NM-19"
    primitive_name = "sha256_hash"

    def execute(self):
        import hashlib
        algorithm = self.parameters.get("algorithm", "sha256")
        for col in self.parameters["features"]:
            self.data[f"{col}_hash"] = self.data[col].apply(
                lambda v: hashlib.new(algorithm, str(v).encode()).hexdigest()
                if pd.notna(v) else None,
                meta=(f"{col}_hash", "str")
            )
        return self.data

Transform Registration

All fusion transforms register into the existing parallax OPS dict:

# parallax/ops/fusion/__init__.py

from parallax.ops.fusion.text import (
    UnicodeNFKC, Casefold, Tokenize, NameParser,
    AliasExpansion, PhoneticEncoding, AddressStandardize,
)
from parallax.ops.fusion.geo import CRSAlignment, Geocode, GeohashEncode
from parallax.ops.fusion.temporal import UTCNormalize, IntervalRepresent, TemporalBucket
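from parallax.ops.fusion.numeric import HashTransform
# NM-20 / NM-21 module paths as given in the notes below
from parallax.ops.fusion.transforms.text import BloomFilter
from parallax.ops.fusion.transforms.numeric import NumericBucket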

FUSION_OPS = {
    # Text normalization
    "unicode_nfkc": UnicodeNFKC,       # NM-01
    "casefold": Casefold,               # NM-02
    "tokenize": Tokenize,              # NM-03
    "name_parse": NameParser,           # NM-04
    "alias_expand": AliasExpansion,     # NM-05
    "phonetic": PhoneticEncoding,       # NM-06
    "address_standardize": AddressStandardize,  # NM-07

    # Geo normalization
    "crs_align": CRSAlignment,         # NM-08
    "geocode": Geocode,                # NM-09
    "geohash_encode": GeohashEncode,   # NM-10

    # Temporal normalization
    "utc_normalize": UTCNormalize,      # NM-12
    "interval": IntervalRepresent,      # NM-13
    "temporal_bucket": TemporalBucket,  # NM-15

    # Numeric/hash
    "sha256_hash": HashTransform,       # NM-19

    # PPRL
    "bloom_filter": BloomFilter,        # NM-20 — q-gram Bloom (Schnell 2009)

    # Numeric binning
    "numeric_bucket": NumericBucket,    # NM-21 — width-aligned bucket
}

# NM-21: Width-aligned numeric bucketing (parallax/ops/fusion/transforms/numeric.py)
#   numeric_bucket(value, width=1.0) -> float | None
#   floor(value / width) * width. Used for blocking on noisy continuous
#   measurements (RF frequency, bearing, currency amount): same-source
#   pairs usually fall in the same bucket when measurement noise is well
#   below width, but values near a bucket edge can still split across
#   adjacent buckets.
#   width <= 0 raises ValueError; unparseable input → None.

# NM-20: Bloom-filter PPRL encoding (parallax/ops/fusion/transforms/text.py)
#   bloom_encode(value, qgrams=2, bits=1024, hashes=30) -> base64 str | None
#   One-way (Schnell 2009 §4); deterministic for replay (FED-SPEC-02).
#   Pairs with metric `sorensen_dice` (MF-13e) for fuzzy similarity on
#   derived values across federation boundaries.

# Register into parallax's main OPS dict
def register_fusion_ops():
    """Call during parallax startup to register all fusion transforms."""
    from parallax.ops.feature import OPS
    OPS.update(FUSION_OPS)
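The NM-20 and NM-21 notes above fix signatures and defaults but leave some internal choices open. This is a minimal, self-contained sketch of both transforms, plus the Sorensen-Dice comparison that NM-20 pairs with. Assumptions that are not from the spec: q-grams are lowercased and padded with q-1 spaces; bit positions come from unkeyed SHA-1/MD5 double hashing; NaN and inf inputs to numeric_bucket are treated as unparseable; and this sorensen_dice accepts only already-encoded filters, whereas the registered MF-13e also auto-encodes raw strings.

```python
import base64
import hashlib
import math


def numeric_bucket(value, width: float = 1.0):
    """NM-21 as specified: floor(value / width) * width; None if unparseable."""
    if width <= 0:
        raise ValueError("width must be > 0")
    try:
        x = float(value)
    except (TypeError, ValueError):
        return None
    if math.isnan(x) or math.isinf(x):  # assumption: treat NaN/inf as unparseable
        return None
    return float(math.floor(x / width) * width)


def _qgrams(text: str, q: int) -> set:
    # Assumption: lowercase, then pad with q-1 spaces so edge characters get own q-grams
    padded = " " * (q - 1) + text.lower().strip() + " " * (q - 1)
    return {padded[i:i + q] for i in range(len(padded) - q + 1)}


def bloom_encode(value, qgrams: int = 2, bits: int = 1024, hashes: int = 30):
    """NM-20 sketch: set `hashes` bits per q-gram; return the filter as base64."""
    if value is None or not str(value).strip():
        return None
    if bits % 8:
        raise ValueError("bits must be a multiple of 8")
    filt = bytearray(bits // 8)
    for gram in _qgrams(str(value), qgrams):
        data = gram.encode("utf-8")
        # Double hashing g_i = (h1 + i*h2) mod bits. Unkeyed SHA-1/MD5 is an assumption;
        # a deployment would normally key these (e.g. HMAC) with a shared secret.
        h1 = int.from_bytes(hashlib.sha1(data).digest(), "big")
        h2 = int.from_bytes(hashlib.md5(data).digest(), "big")
        for i in range(hashes):
            pos = (h1 + i * h2) % bits
            filt[pos // 8] |= 1 << (pos % 8)
    return base64.b64encode(bytes(filt)).decode("ascii")


def sorensen_dice(a: str, b: str) -> float:
    """MF-13e on two encoded filters: 2*|A & B| / (|A| + |B|) over set bits."""
    fa, fb = base64.b64decode(a), base64.b64decode(b)
    common = sum(bin(x & y).count("1") for x, y in zip(fa, fb))
    total = sum(bin(x).count("1") for x in fa) + sum(bin(y).count("1") for y in fb)
    return 2.0 * common / total if total else 0.0
```

For example, numeric_bucket(0.999) returns 0.0 while numeric_bucket(1.001) returns 1.0. Two readings only 0.002 apart land in different buckets, which is why blocking on a single bucket can miss same-source pairs near a bucket edge.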

Registry 2: Metric Registry (Matching Features)

Location: parallax/ops/fusion/metrics.py (expanded from component.parallax.scoring-engine)
Covers: MF-01 through MF-17 (the registry below also carries the MF-18 to MF-22 extensions)

Base Contract

# parallax/ops/fusion/metric_base.py

class FusionMetric:
    """Base class for all fusion similarity metrics.

    A metric computes similarity between two values:
      (value_a, value_b, **params) → float in [0.0, 1.0]

    Where 1.0 = identical, 0.0 = completely different.
    """
    primitive_id: str = ""
    primitive_name: str = ""
    input_types: list[str] = []     # ["str"], ["float", "float"], etc.
    domain_applicability: list = []
    dependencies: list = []

    def compute(self, a, b, **params) -> float:
        """Compute similarity. Must return [0.0, 1.0]. None inputs → 0.0."""
        raise NotImplementedError

    def validate_inputs(self, a, b) -> bool:
        """Check input compatibility."""
        return True
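A small property check makes the [0, 1] range and the "None inputs score 0.0" rule testable for any metric with the compute(a, b) shape. The sketch also checks symmetry, which the contract above does not state. RecencyDecay, for instance, ignores b and is not symmetric in general. levenshtein_similarity is a pure-Python stand-in for MA-16 so the example runs without jellyfish. Both functions are illustrative, not parallax APIs.

```python
import math
from typing import Callable, Iterable


def levenshtein_similarity(a, b) -> float:
    """Pure-Python stand-in for MA-16: 1 - edit_distance / max_len; None -> 0.0."""
    if a is None or b is None:
        return 0.0
    sa, sb = str(a), str(b)
    if not sa and not sb:
        return 1.0
    prev = list(range(len(sb) + 1))  # distances from the empty prefix of sa
    for i, ca in enumerate(sa, start=1):
        cur = [i]
        for j, cb in enumerate(sb, start=1):
            cur.append(min(prev[j] + 1,                # delete ca
                           cur[j - 1] + 1,             # insert cb
                           prev[j - 1] + (ca != cb)))  # substitute (free if equal)
        prev = cur
    return 1.0 - prev[-1] / max(len(sa), len(sb))


def check_metric_contract(metric: Callable, pairs: Iterable[tuple]) -> list:
    """Report range and None-handling violations, plus asymmetry (an extra check)."""
    problems = []
    if metric(None, "x") != 0.0 or metric("x", None) != 0.0:
        problems.append("None input must score 0.0")
    for a, b in pairs:
        s = metric(a, b)
        if not 0.0 <= s <= 1.0:  # also catches NaN
            problems.append(f"score {s!r} for {(a, b)!r} outside [0, 1]")
        elif not math.isclose(s, metric(b, a)):
            problems.append(f"asymmetric on {(a, b)!r}")
    return problems
```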

Metric Catalog

# parallax/ops/fusion/metrics.py

import math

import jellyfish
import pandas as pd

from parallax.ops.fusion.metric_base import FusionMetric

class ExactMatch(FusionMetric):
    """MF-15 / MA-13: Binary equality."""
    primitive_id = "MA-13"
    primitive_name = "exact"

    def compute(self, a, b, **params) -> float:
        if a is None or b is None:
            return 0.0
        return 1.0 if str(a).strip().lower() == str(b).strip().lower() else 0.0


class JaroWinkler(FusionMetric):
    """MF-13 / MA-19: Fuzzy name matching."""
    primitive_id = "MA-19"
    primitive_name = "jaro_winkler"
    dependencies = ["jellyfish"]

    def compute(self, a, b, **params) -> float:
        if a is None or b is None:
            return 0.0
        return jellyfish.jaro_winkler_similarity(str(a), str(b))


class TokenSetRatio(FusionMetric):
    """MF-14 / MA-20: Token-set similarity for name reordering."""
    primitive_id = "MA-20"
    primitive_name = "token_set_ratio"

    def compute(self, a, b, **params) -> float:
        if a is None or b is None:
            return 0.0
        tokens_a = set(str(a).lower().split())
        tokens_b = set(str(b).lower().split())
        if not tokens_a and not tokens_b:
            return 1.0
        if not tokens_a or not tokens_b:
            return 0.0
        intersection = tokens_a & tokens_b
        union = tokens_a | tokens_b
        return len(intersection) / len(union)  # Jaccard on tokens


class Levenshtein(FusionMetric):
    """MA-16: Edit distance normalized to [0,1]."""
    primitive_id = "MA-16"
    primitive_name = "levenshtein"
    dependencies = ["jellyfish"]

    def compute(self, a, b, **params) -> float:
        if a is None or b is None:
            return 0.0
        sa, sb = str(a), str(b)
        max_len = max(len(sa), len(sb))
        if max_len == 0:
            return 1.0
        dist = jellyfish.levenshtein_distance(sa, sb)
        return 1.0 - (dist / max_len)


class JaccardSimilarity(FusionMetric):
    """MA-14: Set similarity."""
    primitive_id = "MA-14"
    primitive_name = "jaccard"

    def compute(self, a, b, **params) -> float:
        if a is None or b is None:
            return 0.0
        set_a = set(str(a).lower().split()) if isinstance(a, str) else set(a)
        set_b = set(str(b).lower().split()) if isinstance(b, str) else set(b)
        if not set_a and not set_b:
            return 1.0
        union = set_a | set_b
        if not union:
            return 0.0
        return len(set_a & set_b) / len(union)


class CosineSimilarity(FusionMetric):
    """MA-15: Vector similarity."""
    primitive_id = "MA-15"
    primitive_name = "cosine"

    def compute(self, a, b, **params) -> float:
        if a is None or b is None:
            return 0.0
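        # a and b are equal-length lists/arrays of floats
        if len(a) != len(b):
            raise ValueError("cosine requires vectors of equal length")
        dot = sum(x * y for x, y in zip(a, b))
        mag_a = math.sqrt(sum(x * x for x in a))
        mag_b = math.sqrt(sum(x * x for x in b))
        if mag_a == 0 or mag_b == 0:
            return 0.0
        # Raw cosine lies in [-1, 1]; clamp so opposed vectors score 0.0 per the [0, 1] contract
        return max(0.0, min(1.0, dot / (mag_a * mag_b)))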


class HaversineDistance(FusionMetric):
    """MF-01 / MA-17: Geospatial distance similarity."""
    primitive_id = "MA-17"
    primitive_name = "haversine"

    def compute(self, a, b, **params) -> float:
        if a is None or b is None:
            return 0.0
        # a, b are (lat, lon) tuples
        max_distance_km = params.get("max_distance_km", 50.0)
        lat1, lon1 = float(a[0]), float(a[1])
        lat2, lon2 = float(b[0]), float(b[1])

        R = 6371.0  # Earth radius km
        dlat = math.radians(lat2 - lat1)
        dlon = math.radians(lon2 - lon1)
        a_val = (math.sin(dlat/2)**2 +
                 math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) *
                 math.sin(dlon/2)**2)
        c = 2 * math.atan2(math.sqrt(a_val), math.sqrt(1 - a_val))
        distance = R * c

        if distance >= max_distance_km:
            return 0.0
        return 1.0 - (distance / max_distance_km)


class TemporalProximity(FusionMetric):
    """MF-06 / MA-18: Time-based similarity."""
    primitive_id = "MA-18"
    primitive_name = "temporal_proximity"

    def compute(self, a, b, **params) -> float:
        if a is None or b is None:
            return 0.0
        max_gap_seconds = params.get("max_gap_seconds", 3600)
        ta = pd.Timestamp(a)
        tb = pd.Timestamp(b)
        gap = abs((ta - tb).total_seconds())
        if gap >= max_gap_seconds:
            return 0.0
        return 1.0 - (gap / max_gap_seconds)


class GeoPrefix(FusionMetric):
    """Postcode prefix matching (POC shortcut for haversine)."""
    primitive_id = "MF-04"
    primitive_name = "geo_prefix"

    def compute(self, a, b, **params) -> float:
        if a is None or b is None:
            return 0.0
        chars = params.get("chars", 3)
        pa = str(a).replace(" ", "")[:chars].upper()
        pb = str(b).replace(" ", "")[:chars].upper()
        return 1.0 if pa == pb else 0.0


class SoundexMatch(FusionMetric):
    """Phonetic equality via soundex."""
    primitive_id = "MF-13b"
    primitive_name = "soundex_match"
    dependencies = ["jellyfish"]

    def compute(self, a, b, **params) -> float:
        if a is None or b is None:
            return 0.0
        return 1.0 if jellyfish.soundex(str(a)) == jellyfish.soundex(str(b)) else 0.0


class IntervalOverlap(FusionMetric):
    """MF-06: Interval overlap ratio."""
    primitive_id = "MF-06"
    primitive_name = "interval_overlap"

    def compute(self, a, b, **params) -> float:
        # a, b are (start, end) tuples
        if a is None or b is None:
            return 0.0
        a_start, a_end = pd.Timestamp(a[0]), pd.Timestamp(a[1])
        b_start, b_end = pd.Timestamp(b[0]), pd.Timestamp(b[1])
        overlap_start = max(a_start, b_start)
        overlap_end = min(a_end, b_end)
        if overlap_start >= overlap_end:
            return 0.0
        overlap = (overlap_end - overlap_start).total_seconds()
        total = max(
            (a_end - a_start).total_seconds(),
            (b_end - b_start).total_seconds(),
        )
        return overlap / total if total > 0 else 0.0


class RecencyDecay(FusionMetric):
    """MF-09: Recency-weighted scoring."""
    primitive_id = "MF-09"
    primitive_name = "recency_decay"

    def compute(self, a, b, **params) -> float:
        if a is None or b is None:
            return 0.0
        half_life_hours = params.get("half_life_hours", 24)
        age_hours = abs((pd.Timestamp("now") - pd.Timestamp(a)).total_seconds()) / 3600
        return 2.0 ** (-age_hours / half_life_hours)


class AddressSimilarity(FusionMetric):
    """MF-16: Composite address similarity."""
    primitive_id = "MF-16"
    primitive_name = "address_similarity"

    def compute(self, a, b, **params) -> float:
        if a is None or b is None:
            return 0.0
        # Tokenize, normalize, then Jaccard
        tokens_a = set(str(a).lower().replace(",", " ").split())
        tokens_b = set(str(b).lower().replace(",", " ").split())
        if not tokens_a or not tokens_b:
            return 0.0
        return len(tokens_a & tokens_b) / len(tokens_a | tokens_b)


class CategoryAgreement(FusionMetric):
    """MF-17: Categorical/attribute set agreement."""
    primitive_id = "MF-17"
    primitive_name = "category_agreement"

    def compute(self, a, b, **params) -> float:
        if a is None or b is None:
            return 0.0
        if isinstance(a, str):
            a = {t.strip() for t in a.split(",") if t.strip()}  # "x, y" == "x,y"
        if isinstance(b, str):
            b = {t.strip() for t in b.split(",") if t.strip()}
        a, b = set(a), set(b)
        if not a and not b:
            return 1.0
        union = a | b
        return len(a & b) / len(union) if union else 0.0

Metric Registration

# parallax/ops/fusion/metrics.py

METRIC_REGISTRY: dict[str, FusionMetric] = {
    # Identity
    "exact": ExactMatch(),               # MA-13
    "jaro_winkler": JaroWinkler(),       # MA-19 / MF-13
    "token_set_ratio": TokenSetRatio(),  # MA-20 / MF-14
    "levenshtein": Levenshtein(),        # MA-16
    "soundex_match": SoundexMatch(),     # MF-13b

    # Set/Vector
    "jaccard": JaccardSimilarity(),      # MA-14
    "cosine": CosineSimilarity(),        # MA-15

    # Spatial
    "haversine": HaversineDistance(),     # MA-17 / MF-01
    "geo_prefix": GeoPrefix(),           # MF-04

    # Temporal
    "temporal_proximity": TemporalProximity(),  # MA-18 / MF-06
    "interval_overlap": IntervalOverlap(),      # MF-06
    "recency_decay": RecencyDecay(),            # MF-09

    # Composite
    "address_similarity": AddressSimilarity(),  # MF-16
    "category_agreement": CategoryAgreement(),  # MF-17

    # Phonetic-equivalence (string)
    "metaphone": MetaphoneMatch(),       # MF-13c
    "nysiis": NysiisMatch(),             # MF-13d
    "sorensen_dice": SorensenDice(),     # MF-13e — Bloom-PPRL (NM-20)

    # Spatial (continuous)
    "geohash_match": GeohashMatch(),                       # MF-05
    "uncertainty_aware_distance": UncertaintyAwareDistance(),  # MF-18

    # Cross-source / kinematic / directional (themis wishlist §1.4–1.6)
    "source_complementarity": SourceComplementarity(),  # MF-19
    "heading_proximity": HeadingProximity(),            # MF-20
    "speed_proximity": SpeedProximity(),                # MF-21

    # General-purpose numeric tolerance
    "numeric_proximity": NumericProximity(),            # MF-22
}

# MF-13e SorensenDice: Bloom-PPRL fuzzy similarity. Accepts raw strings
#   (auto-encoded with bloom_encode defaults) or pre-encoded base64 filters
#   from an NM-20 bloom_filter transform. Returns 2·|A∩B|/(|A|+|B|).
# MF-18 UncertaintyAwareDistance: Mahalanobis on (lat, lon, sigma_m) tuples
#   under uncorrelated isotropic uncertainty. exp(-(d/√(σ_a²+σ_b²))²/2)
#   with hard-clip at max_sigmas (default 5.0). Params: default_sigma_m=100.
# MF-19 SourceComplementarity: 1.0 iff inputs differ (cross-source bonus).
# MF-20 HeadingProximity: circular linear decay over max_diff_deg (default 45).
# MF-21 SpeedProximity: symmetric min/max ratio; both-zero → 1.0.
# MF-22 NumericProximity: linear decay over `tolerance`, absolute or relative.

def get_metric(name: str) -> FusionMetric:
    """Resolve a metric by name. Raises KeyError if not found."""
    if name not in METRIC_REGISTRY:
        raise KeyError(f"Unknown metric '{name}'. Available: {list(METRIC_REGISTRY.keys())}")
    return METRIC_REGISTRY[name]
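The MF-18 and MF-20 to MF-22 notes above state their formulas tersely. Below is a minimal sketch of each as a plain function, under these assumptions: MF-18 inputs are (lat, lon) or (lat, lon, sigma_m) tuples, with distance computed by haversine in metres; MF-21 compares absolute speeds; and MF-22's relative mode scales the tolerance by max(|a|, |b|). These are illustrations of the formulas, not the registered classes themselves.

```python
import math

EARTH_RADIUS_M = 6_371_000.0


def _haversine_m(lat1, lon1, lat2, lon2) -> float:
    """Great-circle distance in metres (same formula as HaversineDistance)."""
    dlat = math.radians(lat2 - lat1)
    dlon = math.radians(lon2 - lon1)
    h = (math.sin(dlat / 2) ** 2
         + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) * math.sin(dlon / 2) ** 2)
    return 2 * EARTH_RADIUS_M * math.asin(min(1.0, math.sqrt(h)))


def uncertainty_aware_distance(a, b, default_sigma_m=100.0, max_sigmas=5.0) -> float:
    """MF-18: exp(-(d / sqrt(sa^2 + sb^2))^2 / 2), hard-clipped to 0 beyond max_sigmas."""
    if a is None or b is None:
        return 0.0
    (lat1, lon1, *s1), (lat2, lon2, *s2) = a, b
    sa = s1[0] if s1 and s1[0] is not None else default_sigma_m
    sb = s2[0] if s2 and s2[0] is not None else default_sigma_m
    d = _haversine_m(lat1, lon1, lat2, lon2)
    sigma = math.sqrt(sa ** 2 + sb ** 2)
    if sigma == 0:
        return 1.0 if d == 0 else 0.0
    z = d / sigma  # separation measured in combined standard deviations
    return 0.0 if z > max_sigmas else math.exp(-z * z / 2)


def heading_proximity(a, b, max_diff_deg=45.0) -> float:
    """MF-20: linear decay over the circular heading difference."""
    if a is None or b is None:
        return 0.0
    diff = abs(float(a) - float(b)) % 360.0
    diff = min(diff, 360.0 - diff)  # shortest way round, e.g. 350 vs 10 -> 20
    return max(0.0, 1.0 - diff / max_diff_deg)


def speed_proximity(a, b) -> float:
    """MF-21: symmetric min/max ratio of two speeds; both zero -> 1.0."""
    if a is None or b is None:
        return 0.0
    lo, hi = sorted((abs(float(a)), abs(float(b))))
    return 1.0 if hi == 0 else lo / hi


def numeric_proximity(a, b, tolerance=1.0, relative=False) -> float:
    """MF-22: linear decay reaching 0 at `tolerance` (absolute or relative)."""
    if a is None or b is None:
        return 0.0
    a, b = float(a), float(b)
    tol = tolerance * max(abs(a), abs(b)) if relative else tolerance
    if tol <= 0:
        return 1.0 if a == b else 0.0
    return max(0.0, 1.0 - abs(a - b) / tol)
```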

Registry 3: Algorithm Registry (Blocking + Scoring + Clustering)

Location: parallax/ops/fusion/algorithms/ Covers: MA-01 through MA-12

Blocking Algorithms (MA-01 through MA-04)

All blocking runs as Dask operations in parallax. ES aggregation capabilities (terms, geohash_grid, date_histogram, composite) could provide a future optimization path for blocking, but are not the current implementation.

# parallax/ops/fusion/algorithms/blocking.py

class BlockingAlgorithm:
    """Base class for blocking strategies."""
    primitive_id: str = ""
    primitive_name: str = ""

    def generate_keys(self, df, blocking_config) -> dict[str, list[str]]:
        raise NotImplementedError


class HashBlocking(BlockingAlgorithm):
    """MA-01: Hash blocking (current implementation in component.parallax.blocking-engine)."""
    primitive_id = "MA-01"
    primitive_name = "hash_blocking"

    def generate_keys(self, df, blocking_config):
        # Existing implementation from component.parallax.blocking-engine
        ...


class SortedNeighborhood(BlockingAlgorithm):
    """MA-02: Sorted neighborhood blocking."""
    primitive_id = "MA-02"
    primitive_name = "sorted_neighborhood"

    def generate_keys(self, df, blocking_config):
        window = blocking_config.get("window_size", 5)
        sort_key = blocking_config.get("sort_key")
        # Sort by key, compare within sliding window
        ...


class CanopyClustering(BlockingAlgorithm):
    """MA-03: Canopy clustering for blocking."""
    primitive_id = "MA-03"
    primitive_name = "canopy"

    def generate_keys(self, df, blocking_config):
        # Thresholds are distances: canopy clustering requires loose (T1) > tight (T2)
        t1 = blocking_config.get("loose_threshold", 0.8)
        t2 = blocking_config.get("tight_threshold", 0.5)
        ...


class LSHBlocking(BlockingAlgorithm):
    """MA-04: Locality-Sensitive Hashing."""
    primitive_id = "MA-04"
    primitive_name = "lsh"

    def generate_keys(self, df, blocking_config):
        num_hashes = blocking_config.get("num_hashes", 100)
        num_bands = blocking_config.get("num_bands", 20)
        # Rows per band = num_hashes / num_bands (100 / 20 = 5 by default)
        ...


BLOCKING_REGISTRY = {
    "hash_blocking": HashBlocking(),
    "sorted_neighborhood": SortedNeighborhood(),
    "canopy": CanopyClustering(),
    "lsh": LSHBlocking(),
}
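The blocking stubs above leave MA-02 and MA-04 unimplemented. The sketch below shows the two textbook techniques those entries name: sorted-neighbourhood windowing and MinHash LSH banding. It is not the parallax code. It assumes plain Python lists instead of Dask DataFrames and assumes `generate_keys` maps block key to record ids. The names `sorted_neighborhood_pairs`, `minhash_signature`, and `lsh_blocks` are hypothetical. The 100-hash / 20-band defaults come from the `LSHBlocking` stub.

```python
import random
import zlib
from collections import defaultdict

_PRIME = (1 << 61) - 1  # Mersenne prime for universal (a*h + b) mod p hashing


def sorted_neighborhood_pairs(records, window=5):
    """MA-02 sketch: sort by key, pair each record with the next window-1 records."""
    ordered = sorted(records, key=lambda r: r[1])  # records are (record_id, sort_key)
    pairs = set()
    for i, (rid, _) in enumerate(ordered):
        for j in range(i + 1, min(i + window, len(ordered))):
            pairs.add(tuple(sorted((rid, ordered[j][0]))))
    return pairs


def minhash_signature(shingles, num_hashes=100, seed=0):
    """For each of num_hashes random affine hash functions, keep the minimum hash."""
    rng = random.Random(seed)  # same seed => same hash family for every record
    coeffs = [(rng.randrange(1, _PRIME), rng.randrange(0, _PRIME)) for _ in range(num_hashes)]
    hashed = [zlib.crc32(s.encode("utf-8")) for s in shingles]
    return [min((a * h + b) % _PRIME for h in hashed) for a, b in coeffs]


def lsh_blocks(docs, num_hashes=100, num_bands=20, k=3):
    """MA-04 sketch: records that share any whole band of their signature share a block."""
    if num_hashes % num_bands:
        raise ValueError("num_hashes must be divisible by num_bands")
    rows = num_hashes // num_bands
    blocks = defaultdict(list)
    for rid, text in docs.items():
        # Character k-shingles; strings shorter than k become a single shingle
        shingles = {text[i:i + k] for i in range(max(1, len(text) - k + 1))}
        sig = minhash_signature(shingles, num_hashes)
        for band in range(num_bands):
            chunk = sig[band * rows:(band + 1) * rows]
            blocks[f"b{band}:" + ",".join(map(str, chunk))].append(rid)
    # Singleton buckets produce no candidate pairs, so drop them
    return {key: ids for key, ids in blocks.items() if len(ids) > 1}
```

With `b` bands of `r` rows, two records whose Jaccard similarity is `s` share at least one block with probability `1 - (1 - s^r)^b`. Raising `num_bands` at a fixed `num_hashes` therefore loosens blocking, and lowering it tightens blocking.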

Scoring Algorithms (MA-05 through MA-08)

# parallax/ops/fusion/algorithms/scoring.py

import pandas as pd

class ScoringAlgorithm:
    """Base class for scoring strategies."""
    primitive_id: str = ""

    def score(self, candidates, match_function, **params) -> pd.DataFrame:
        raise NotImplementedError


class WeightedAggregate(ScoringAlgorithm):
    """MA-05: Current implementation from component.parallax.scoring-engine."""
    primitive_id = "MA-05"

    def score(self, candidates, match_function, **params):
        # Existing score_pair / score_all_candidates from component.parallax.scoring-engine
        ...


class FellegiSunter(ScoringAlgorithm):
    """MA-06: Fellegi-Sunter probabilistic record linkage."""
    primitive_id = "MA-06"

    def score(self, candidates, match_function, **params):
        # Log-likelihood ratio scoring
        # m-probability (match) vs u-probability (non-match)
        ...


class TravelFeasibilityGate(ScoringAlgorithm):
    """MA-07: Space-time cone pre-filter."""
    primitive_id = "MA-07"

    def score(self, candidates, match_function, **params):
        max_speed_kmh = params.get("max_speed_kmh", 900)
        # Reject pairs whose distance could only be covered within the
        # observed time gap by exceeding max_speed_kmh
        ...


SCORING_REGISTRY = {
    "weighted_aggregate": WeightedAggregate(),
    "fellegi_sunter": FellegiSunter(),
    "travel_feasibility_gate": TravelFeasibilityGate(),
}
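MA-06 and MA-07 are described only in comments. The sketch below shows the underlying arithmetic: Fellegi-Sunter log2 likelihood-ratio weights, and a space-time cone check. It assumes each field comparison is already reduced to agree/disagree with known `m` and `u` probabilities, and it assumes observations are `(lat, lon, utc_epoch_millis)` tuples. The epoch-millis convention matches what the ES ingest pipeline produces. The function names and the `slack_km` allowance for positional noise are hypothetical.

```python
import math

EARTH_RADIUS_KM = 6371.0


def fs_field_weight(agrees: bool, m: float, u: float) -> float:
    """Fellegi-Sunter weight for one compared field.

    m = P(field agrees | records match), u = P(field agrees | records do not match).
    """
    if agrees:
        return math.log2(m / u)
    return math.log2((1 - m) / (1 - u))


def fs_match_weight(agreement: dict, mu: dict) -> float:
    """Total weight is the sum of per-field weights (assumes conditional independence)."""
    return sum(fs_field_weight(agreement[f], *mu[f]) for f in agreement)


def haversine_km(lat1, lon1, lat2, lon2):
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    a = (math.sin((phi2 - phi1) / 2) ** 2
         + math.cos(phi1) * math.cos(phi2) * math.sin(math.radians(lon2 - lon1) / 2) ** 2)
    return 2 * EARTH_RADIUS_KM * math.asin(math.sqrt(a))


def travel_feasible(a, b, max_speed_kmh=900.0, slack_km=0.0) -> bool:
    """MA-07 sketch: a and b are (lat, lon, utc_epoch_millis) observations."""
    dt_hours = abs(b[2] - a[2]) / 3_600_000
    # Feasible iff the distance fits inside the space-time cone plus a positional slack
    return haversine_km(a[0], a[1], b[0], b[1]) <= max_speed_kmh * dt_hours + slack_km
```

With `m = 0.9` and `u = 0.1`, an agreeing field contributes `log2(9)`, about +3.17, and a disagreeing field contributes about -3.17. Match and non-match thresholds are then cut on the summed weight.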

How the Lens Drives Primitive Selection

The lens YAML now references primitives by registry name:

# Extended lens format — field_groupings.identity_hints
field_groupings:
  identity_hints:
    - field: full_name
      transforms:                    # CHAIN of transforms (replaces single 'transform')
        - unicode_nfkc               # NM-01
        - casefold                   # NM-02
        - name_parse                 # NM-04
      match_metric: jaro_winkler     # MA-19
      label: Full Name

    - field: postcode
      transforms:
        - address_standardize        # NM-07
        - "geohash_encode | precision: 5"  # NM-10 (quoted: an unquoted ": " parses as a YAML mapping)
      match_metric: "geo_prefix | chars: 3"
      label: Postcode Area

    - field: phone
      transforms:
        - sha256_hash                # NM-19
      match_metric: exact            # MA-13
      label: Phone (hashed)

# Extended blocking_strategy
identity_fusion:
  blocking_strategy:
    method: hash_blocking            # MA-01 (from BLOCKING_REGISTRY)
    blocking_keys:
      - "phonetic | field: full_name | algorithm: soundex"  # NM-06
      - "temporal_bucket | field: date_of_birth | bucket: year"

  scoring_algorithm: weighted_aggregate  # MA-05 (from SCORING_REGISTRY)
  # Or: fellegi_sunter for probabilistic linkage

Lens Parser Extension (component.parallax.lens-parser update)

# In lens_parser.py — resolve transform chains
def resolve_transforms(hints: list[FieldHint]) -> list[dict]:
    """Resolve transform names to registry entries.

    Validates that every referenced transform exists in FUSION_OPS.
    Returns operation configs for the parallax pipeline.
    """
    ops = []
    for hint in hints:
        for transform_str in hint.transforms:
            name, params = parse_transform(transform_str)
            if name not in FUSION_OPS:
                raise LensValidationError(f"Unknown transform '{name}'")
            params["features"] = [hint.field]
            ops.append({"command": name, "parameters": params})
    return ops
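`resolve_transforms` relies on a `parse_transform` helper whose body is not specified. The sketch below is a hypothetical version for the pipe syntax used in the lens (`name | key: value | key: value`). It assumes each entry reaches the parser as a single string. It also assumes numeric-looking values should become `int` or `float`, while everything else, including `nan` and `inf`, stays a string.

```python
import math


def _coerce(raw: str):
    """Turn '5' into 5 and '0.5' into 0.5; leave everything else as a string."""
    try:
        return int(raw)
    except ValueError:
        pass
    try:
        value = float(raw)
    except ValueError:
        return raw
    return value if math.isfinite(value) else raw  # keep 'nan'/'inf' as strings


def parse_transform(spec: str) -> tuple[str, dict]:
    """Split 'name | key: value | key: value' into (name, {key: value})."""
    name, *pairs = [part.strip() for part in spec.split("|")]
    if not name:
        raise ValueError(f"Missing transform name in {spec!r}")
    params = {}
    for pair in pairs:
        key, sep, raw = pair.partition(":")
        if not sep or not key.strip():
            raise ValueError(f"Malformed parameter {pair!r} in {spec!r}")
        params[key.strip()] = _coerce(raw.strip())
    return name, params
```

Each call returns a fresh `params` dict. This matters because `resolve_transforms` mutates it by adding `features`.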

Primitive Lifecycle: From XLSX to Running Code

1. DESIGN (XLSX Matrix)
   Primitive ID: NM-04
   Category: Normalization > Text
   Status: New → Designed → Implemented → Tested
   ↓
2. IMPLEMENT (Python class)
   class NameParser(FusionTransform):
       primitive_id = "NM-04"
       ...
   ↓
3. REGISTER (in registry dict)
   FUSION_OPS["name_parse"] = NameParser
   ↓
4. REFERENCE (in lens YAML)
   transforms: [name_parse]
   ↓
5. EXECUTE (in Dask via parallax pipeline)
   Engineer.create() → feature_engineer() → NameParser.execute()
   ↓
6. CONSUME (scoring engine)
   Transformed values used by metric comparisons
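Steps 2, 3, and 5 of the lifecycle can be illustrated with a minimal duck-typed base that honours the dispatch contract from the implementation note (`__init__(body, data, serialize)`, `.execute()`, `.parameters`, `.command`). This is a sketch under assumptions. It assumes `body` has the `{"command": ..., "parameters": ...}` shape that `resolve_transforms` emits. It also assumes all constructor arguments have defaults. `run_op` is a hypothetical stand-in for the `feature_engineer()` dispatch, not its actual code.

```python
import pandas as pd


class FusionTransform:
    """Hypothetical minimal base: duck-typed, no Redis client, no Command subclass."""
    primitive_id = ""

    def __init__(self, body=None, data=None, serialize=False):
        body = body or {}
        self.command = body.get("command", "")
        self.parameters = dict(body.get("parameters", {}))
        self.data = data
        self.serialize = serialize

    def execute(self):
        raise NotImplementedError


class Casefold(FusionTransform):
    """NM-02 sketch: Unicode casefolding of each configured feature column."""
    primitive_id = "NM-02"

    def execute(self):
        df = self.data.copy()  # never mutate the caller's frame
        for col in self.parameters.get("features", []):
            df[col] = df[col].str.casefold()
        return df


FUSION_OPS = {"casefold": Casefold}  # step 3: REGISTER


def run_op(op_config: dict, df: pd.DataFrame) -> pd.DataFrame:
    """Step 5 stand-in: look up the class by name, construct it, execute it."""
    cls = FUSION_OPS[op_config["command"]]
    return cls(body=op_config, data=df, serialize=False).execute()
```

Casefolding is stronger than lowercasing. For example, `"Straße"` folds to `"strasse"`, which is why NM-02 is a separate primitive from simple case normalization.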

Coverage Analysis: Current Specs vs 94 Primitives

| Category | Total | Covered by Current Specs | Gap |
|---|---|---|---|
| Canonical Attributes | 23 | 4 (entity_id, external_ids, evidence, observations) | 19 (most are UDS model extensions) |
| Normalization | 19 | 4 (hash, soundex, geo_prefix, year) | 15 (this spec adds 14) |
| Matching Features | 17 | 4 (exact, jaro_winkler, geo_prefix, soundex) | 13 (this spec adds 12) |
| Matching Algorithms | 20 | 2 (hash_blocking, weighted_aggregate) | 18 (this spec adds 5 core) |
| Tracking | 7 | 0 | 7 (Phase 5, post-POC) |
| Configuration | 8 | 3 (thresholds, time_window, auto-merge) | 5 (mostly lens config) |
| **TOTAL** | **94** | **17** | **77** |

After this spec: ~50 primitives have implementation code or clear stubs. Remaining gap is primarily: Tracking (7, Phase 5), CA model extensions (19, UDS), and advanced algorithms (ML ranking, connected components, LLM disambiguation).

Implementation Priority

POC (Phase 2-4): 35 primitives

Must have for VRS:

  • Text: NM-01 (NFKC), NM-02 (casefold), NM-04 (name_parse), NM-05 (alias_expand), NM-06 (phonetic), NM-19 (hash)
  • Geo: NM-10 (geohash)
  • Temporal: NM-12 (UTC), NM-15 (bucket)
  • Metrics: MA-13 (exact), MA-19 (jaro_winkler), MA-17 (haversine), MA-20 (token_set_ratio)
  • Blocking: MA-01 (hash_blocking)
  • Scoring: MA-05 (weighted_aggregate)

Growth (Phase 4-5): 25 primitives

Needed for AML + multi-domain:

  • Text: NM-03 (tokenize), NM-07 (address)
  • Geo: NM-08 (CRS), NM-09 (geocode)
  • Temporal: NM-13 (interval), NM-14 (partial)
  • Metrics: MF-06 (overlap), MF-07 (time gap), MF-09 (decay), MF-16 (address)
  • Blocking: MA-02 (sorted neighborhood), MA-04 (LSH)
  • Scoring: MA-06 (Fellegi-Sunter), MA-07 (travel gate)
  • Clustering: MA-10 (union-find)

Production (Phase 5+): 34 primitives

  • Tracking: TR-01 through TR-07
  • Advanced: MA-08 (ML ranking), MA-11 (connected components), MA-12 (LLM)
  • CA extensions: CA-11 through CA-22

Test Strategy

Per-Primitive Tests

Every primitive gets a unit test following this pattern:

def test_primitive_nm04_name_parse():
    op = NameParser()
    op.parameters = {"features": ["full_name"]}
    op.data = pd.DataFrame({"full_name": ["John Smith", "SMITH, John", "Dr. J. Smith Jr."]})
    result = op.execute()
    # Honorifics ("Dr.") and generational suffixes ("Jr.") are not given/family names
    assert result["full_name_given"].tolist() == ["John", "John", "J."]
    assert result["full_name_family"].tolist() == ["Smith", "SMITH", "Smith"]

def test_primitive_ma19_jaro_winkler():
    metric = JaroWinkler()
    assert metric.compute("Margaret Chen", "Margaret Chen") >= 0.99
    assert metric.compute("Dorothy Williams", "Dot Williams") >= 0.70
    assert metric.compute("John Smith", "Jane Doe") < 0.70
    assert metric.compute(None, "test") == 0.0
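When writing expectations like these, a reference implementation of the standard Jaro-Winkler formula is a useful oracle. The sketch below is that textbook formula: Jaro matching within a `max(len)/2 - 1` window, halved transpositions, and a prefix boost with scale 0.1 over at most 4 characters, applied only when Jaro is at least 0.7. Whether parallax's `JaroWinkler` uses the same prefix scale, boost threshold, and case handling is an assumption. The function names are illustrative.

```python
def jaro(s1: str, s2: str) -> float:
    """Jaro similarity: matches within a sliding window, penalised by transpositions."""
    if s1 == s2:
        return 1.0
    len1, len2 = len(s1), len(s2)
    if len1 == 0 or len2 == 0:
        return 0.0
    window = max(max(len1, len2) // 2 - 1, 0)
    matched1, matched2 = [False] * len1, [False] * len2
    matches = 0
    for i, ch in enumerate(s1):
        for j in range(max(0, i - window), min(i + window + 1, len2)):
            if not matched2[j] and s2[j] == ch:
                matched1[i] = matched2[j] = True
                matches += 1
                break
    if matches == 0:
        return 0.0
    # Transpositions: matched characters appearing in a different order, halved
    out_of_order, k = 0, 0
    for i in range(len1):
        if matched1[i]:
            while not matched2[k]:
                k += 1
            if s1[i] != s2[k]:
                out_of_order += 1
            k += 1
    t = out_of_order / 2
    return (matches / len1 + matches / len2 + (matches - t) / matches) / 3


def jaro_winkler(s1, s2, prefix_scale=0.1, max_prefix=4, boost_threshold=0.7) -> float:
    """Jaro-Winkler: boost Jaro by a shared prefix of up to max_prefix characters."""
    if s1 is None or s2 is None:
        return 0.0  # same None handling the metric test expects
    j = jaro(s1, s2)
    if j < boost_threshold:
        return j  # Winkler's original applies the prefix boost only above 0.7
    prefix = 0
    for a, b in zip(s1[:max_prefix], s2[:max_prefix]):
        if a != b:
            break
        prefix += 1
    return j + prefix * prefix_scale * (1 - j)
```

Under this formula, `jaro_winkler("Dorothy Williams", "Dot Williams")` is about 0.93. `jaro_winkler("John Smith", "Jane Doe")` is about 0.56, because three characters (`J`, `n`, and the space) still match inside the window. Thresholds for clearly different names need to sit above that value.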

Registry Integration Tests

def test_all_fusion_ops_registered():
    """Every implemented primitive must be in a registry."""
    from parallax.ops.fusion import FUSION_OPS
    assert "unicode_nfkc" in FUSION_OPS
    assert "casefold" in FUSION_OPS
    # ... all implemented ops

def test_metric_registry_complete():
    from parallax.ops.fusion.metrics import METRIC_REGISTRY
    assert "exact" in METRIC_REGISTRY
    assert "jaro_winkler" in METRIC_REGISTRY
    assert "haversine" in METRIC_REGISTRY

def test_lens_resolves_all_transforms():
    from parallax.ops.fusion import FUSION_OPS
    spec = parse_lens("fixtures/vrs_vulnerability_v1.yaml")
    ops = resolve_transforms(spec.field_groupings.identity_hints)
    for op in ops:
        assert op["command"] in FUSION_OPS

File Layout

parallax/ops/fusion/
├── __init__.py             # FUSION_OPS dict, register_fusion_ops()
├── base.py                 # FusionTransform base class
├── text.py                 # NM-01 through NM-07
├── geo.py                  # NM-08 through NM-11
├── temporal.py             # NM-12 through NM-15
├── numeric.py              # NM-16 through NM-19
├── metric_base.py          # FusionMetric base class
├── metrics.py              # METRIC_REGISTRY + all metric implementations
├── algorithms/
│   ├── __init__.py         # BLOCKING_REGISTRY, SCORING_REGISTRY
│   ├── blocking.py         # MA-01 through MA-04
│   ├── scoring.py          # MA-05 through MA-08
│   └── clustering.py       # MA-09 through MA-11
├── tracking.py             # TR-01 through TR-07 (Phase 5)
└── tests/
    ├── test_text.py
    ├── test_geo.py
    ├── test_temporal.py
    ├── test_numeric.py
    ├── test_metrics.py
    ├── test_blocking.py
    └── test_scoring.py

cortex/tools/
├── fusion.py               # MCP tool wrappers (fusion_execute, fusion_status, fusion_results)
└── tests/
    └── test_fusion_tools.py

titan/modeling/train/library/
├── fusion_match_op.py      # FusionMatch FATE adapter (thin — calls parallax)
└── fusion_result.py        # FusionResult for TRAINEDMODEL

Integration Points

Entity Search vs Entity Fusion vs Entity Tracking

Why This Distinction Matters

Industry approaches to "entity resolution" (e.g., Databricks KARL/KARLBench, traditional RAG pipelines) conflate three fundamentally different operations. Our 94 primitives cover fusion and tracking, operations that retrieval-based systems do not address. This section clarifies the boundaries.

Three Operations, Not One

| Operation | What It Does | Input | Output | Stateful? | Our Coverage |
|---|---|---|---|---|---|
| Entity Search | Find an entity (or all entities) matching constraints in a corpus | Query + constraints | Retrieved documents / entity list | No | Not in scope (this is retrieval, not fusion) |
| Entity Fusion | Determine whether records across heterogeneous systems refer to the same real-world entity | Records from N federated sources + lens | Scored pairs, entity clusters, confidence values, evidence blocks | Per-run | CA (23) + NM (19) + MF (17) + MA (20) = 79 primitives |
| Entity Tracking | Maintain resolved entity identity over time as data changes | Entity clusters + new/updated records | Merge/split events, temporal decay, longitudinal identity | Yes (persistent) | TR (7) = 7 primitives |

What Learned Retrieval Systems Do (KARL, Instructed Retriever)

Databricks KARL (KARLBench, March 2026) trains a multi-task RL agent across six enterprise search behaviors. Two are entity-related:

  • BrowseComp-Plus (constraint-driven entity search) — find ONE entity matching multiple attribute constraints across documents. Progressive filtering. The entity already exists as a discrete object in a corpus.
  • QAMPARI (exhaustive entity retrieval) — find ALL entities meeting a condition across dispersed passages. Comprehensive coverage.

Key finding: single-task training fails (59.6 on entity search alone), but multi-task RL across all six behaviors generalizes to match frontier models (67.5 with parallel compute). Training on any 2 of 6 tasks transfers to the remaining 4.

What KARL does NOT do:

  • No normalization: takes text as-is from documents (no unicode_nfkc, casefold, phonetic encoding, address standardization)
  • No cross-schema alignment: operates on a single centralized index (Unity Catalog), not N federated schemas
  • No pairwise similarity computation: no jaro_winkler("Margaret Smith", "M. Smyth") style string scoring
  • No probabilistic matching: no Fellegi-Sunter, no weighted aggregate scoring, no confidence intervals
  • No entity clustering: no union-find across N×(N-1)/2 pairwise results from distributed nodes
  • No tracking: stateless query-in, answer-out; no temporal persistence, merge/split detection, or longitudinal identity
  • No evidence chain: no query hashes, no frozen blocks, no attestation (our Invariants #2, #3, #4, #6)

Where Learned Retrieval Complements Our Pipeline

The multi-task RL pattern is applicable to the non-deterministic stages of our pipeline where adaptability matters more than auditability:

| Our Stage | Current Approach | KARL-Style Enhancement | Phase |
|---|---|---|---|
| Schema Binding | LLM-assisted field discovery (SCHEMA-BINDING.md) | Multi-task model trained on lens semantics + local schema introspection. Combines PMBench-style fact extraction with BrowseComp-style constraint matching. | Future |
| Blocking Strategy | Lens author manually selects blocking keys | Adaptive blocking selection per federate based on data characteristics: tight where clean, loose where messy. | Future |
| Candidate Ranking | Deterministic blocking key match | Instructed retrieval at each federate, using full lens context to retrieve better candidates before scoring. | Future |

Critical constraint: The deterministic stages (transforms, metrics, weighted scoring, evidence blocks) MUST remain lens-driven and auditable. Invariant #6 (AI assists, humans attest) requires the decision chain to be explainable. A learned model can improve what enters the pipeline; it cannot replace the pipeline itself.

Architectural Principle

LEARNED (adaptive, approximate)          DETERMINISTIC (auditable, exact)
─────────────────────────────           ────────────────────────────────
Schema binding at federate    ─────►    Normalization transforms (NM)
Candidate retrieval           ─────►    Blocking keys (MA-01..04)
Blocking strategy selection   ─────►    Pairwise scoring (MF + MA-05..08)
                                        Entity clustering (MA-09..11)
                                        Evidence blocks + attestation
                                        Tracking (TR-01..07)

The lens is the bridge: it instructs the learned retrieval stage (what to look for, semantic context, field relationships) AND drives the deterministic pipeline (exact transforms, metrics, weights). Same YAML, two consumers.

Centralized vs Federated

KARL assumes all data is in one lakehouse (Databricks Unity Catalog). Our architecture is federated — data never moves (Invariant: "AI goes to the data, not data to AI"). This means:

  • KARL builds one index, does instructed retrieval against it
  • We send the lens to N federates, each runs local retrieval + transforms against its own schema
  • The lens must instruct N different retrievers at N different nodes, each interpreting semantic intent against different local realities
  • UDS (Invariant #1) enforces ABAC across all federates — learned retrieval at each node operates within UDS access boundaries

The multi-task RL insight is architecture-independent: it works regardless of where the data sits. But federation makes it harder because the learned model must generalize across schema diversity, not just task diversity.


Depends on: component.parallax.feature-extraction, component.parallax.lens-parser

Realizes: product.fusion

Required by: component.parallax.derived-features, component.parallax.tracking-integration