
Feature Extraction Operations

Status & scope

  • Stage: POC — VRS Use Case
  • Module: parallax/ops/fusion/extractor.py + parallax operation chain
  • Milestone: M5 (Feature Extraction)

Runtime API note: Signatures below are shown with dask.dataframe.DataFrame for the platform-mode end state. The standalone-mode implementation in parallax/ops/fusion/extractor.py accepts list[dict] — pure Python, no Dask dependency — and the dispatch wrappers in parallax/ops/fusion/dispatch.py (ExtractVectorsOp) provide the Dask bridge via map_partitions when invoked through axonis-core's feature_engineer. Both forms are supported. lens_to_operations(spec) (this spec, §lens compilation) emits the op-chain payloads that titan submits through the dispatch path.

Purpose

Convert raw records into feature vectors using lens-defined transforms. Feature extraction runs as a sequence of parallax operations in the existing add_operations() chain, not as bespoke extraction code. The lens parser (component.parallax.lens-parser) produces the operation sequence; this spec defines how each lens transform maps to a parallax operation.

Architecture

LensSpec.field_groupings
        │
        ▼
┌─────────────────────────────────┐
│ lens_to_operations(spec)        │  Compile lens → parallax op sequence
│ Returns: list[dict]             │
└─────────────────────────────────┘
        │
        ▼
┌─────────────────────────────────┐
│ parallax add_operations() chain │  Existing pipeline infrastructure
│ DaskCommand / HomoDaskStandardOp│  Each op: Redis in → transform → Redis out
└─────────────────────────────────┘
        │
        ▼
Feature Vector DataFrame (in Redis)
  Columns: [entity_id, full_name, date_of_birth, postcode_prefix, phone_hash, email_hash, full_name_soundex, date_of_birth_year]
  Suppressed fields: ABSENT (dropped by operation chain)

Transform → Parallax Operation Mapping

| Lens Transform | Parallax Operation | Parameters | Output Column |
|----------------|--------------------|------------|---------------|
| exact | (no-op, passthrough) | (none) | Same column, unchanged |
| hash \| algorithm: sha256 | Custom op: fusion_hash | {"features": ["phone"], "algorithm": "sha256"} | phone_hash |
| jaro_winkler | (no-op for extraction; scoring uses raw value) | (none) | Same column, unchanged |
| soundex | Custom op: fusion_soundex | {"features": ["full_name"]} | full_name_soundex |
| geo_prefix \| chars: 3 | Custom op: fusion_geo_prefix | {"features": ["postcode"], "chars": 3} | postcode_prefix |
| year | Custom op: fusion_year | {"features": ["date_of_birth"]} | date_of_birth_year |
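The table amounts to a lookup from lens transform to op config. A minimal sketch, assuming each transform arrives in the string notation used in the table ("name | key: value"); the real LensSpec produced by the lens parser may carry structured fields instead, and parse_transform, transform_to_op, PASSTHROUGH and COMMANDS are hypothetical names.

```python
from typing import Optional

# Transforms that leave the raw column untouched during extraction.
PASSTHROUGH = {"exact", "jaro_winkler"}

# Lens transform name -> parallax command name, per the mapping table.
COMMANDS = {
    "hash": "fusion_hash",
    "soundex": "fusion_soundex",
    "geo_prefix": "fusion_geo_prefix",
    "year": "fusion_year",
}


def parse_transform(text: str) -> tuple[str, dict]:
    """Split 'geo_prefix | chars: 3' into ('geo_prefix', {'chars': 3})."""
    name, *options = [part.strip() for part in text.split("|")]
    params: dict = {}
    for option in options:
        key, _, value = option.partition(":")
        value = value.strip()
        # Integer-looking values (chars: 3) become ints; everything else stays a string.
        params[key.strip()] = int(value) if value.isdigit() else value
    return name, params


def transform_to_op(field: str, transform: str) -> Optional[dict]:
    """Return one op config for a field, or None when the transform is a passthrough."""
    name, params = parse_transform(transform)
    if name in PASSTHROUGH:
        return None
    if name not in COMMANDS:
        raise ValueError(f"Unknown lens transform: {name!r}")
    return {"command": COMMANDS[name], "parameters": {"features": [field], **params}}
```

For example, transform_to_op("phone", "hash | algorithm: sha256") yields the fusion_hash config shown in the table, while "exact" and "jaro_winkler" emit no op at all.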

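Suppression is implemented as a final drop operation. It keeps entity_id, the derived feature columns, and any raw identity-hint column still needed for scoring (passthrough transforms such as exact and jaro_winkler); every other column, including suppressed fields, context fields, and raw PII that has been hashed, is removed.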
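A minimal sketch of computing that final drop op, assuming the feature vector keeps entity_id plus the raw column of any field whose transforms include a passthrough (exact or jaro_winkler), as in the VRS output table. Derived columns such as phone_hash are not in the input schema, so they never land in the drop list as long as the drop runs last. build_drop_op and the hints shape are hypothetical.

```python
# Lens transforms that score on the raw value (see the mapping table).
PASSTHROUGH = {"exact", "jaro_winkler"}


def build_drop_op(input_columns: list[str], hints: dict[str, list[str]]) -> dict:
    """Build the final 'drop' op config for the operation chain.

    input_columns: raw input columns, in schema order.
    hints: identity-hint field -> lens transform names applied to it
           (hypothetical shape; the real LensSpec may differ).
    """
    keep = {"entity_id"}
    # A raw column survives only if some transform still needs the raw value.
    keep |= {field for field, transforms in hints.items() if PASSTHROUGH & set(transforms)}
    # Everything else (suppressed, context, hashed-away PII) is dropped, in schema order.
    drop = [col for col in input_columns if col not in keep]
    return {"command": "drop", "parameters": {"features": drop, "applyOnAllFeatures": False}}


# Assumed VRS hints, reconstructed from the examples in this spec.
vrs_columns = ["entity_id", "full_name", "date_of_birth", "postcode", "phone", "email",
               "vulnerability_category", "vulnerability_status", "case_notes", "risk_score"]
vrs_hints = {"full_name": ["jaro_winkler", "soundex"], "date_of_birth": ["exact", "year"],
             "postcode": ["geo_prefix"], "phone": ["hash"], "email": ["hash"]}
print(build_drop_op(vrs_columns, vrs_hints)["parameters"]["features"])
# ['postcode', 'phone', 'email', 'vulnerability_category', 'vulnerability_status', 'case_notes', 'risk_score']
```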

Public API

import dask.dataframe

# LensSpec is produced by component.parallax.lens-parser.


def lens_to_operations(spec: LensSpec) -> list[dict]:
    """Convert a LensSpec into a sequence of parallax operation configs.

    Returns list of operation dicts in the format expected by
    Engineer.create() / feature_engineer():

    [
        {"command": "fusion_hash", "parameters": {"features": ["phone"], "algorithm": "sha256"}},
        {"command": "fusion_hash", "parameters": {"features": ["email"], "algorithm": "sha256"}},
        {"command": "fusion_soundex", "parameters": {"features": ["full_name"]}},
        {"command": "fusion_geo_prefix", "parameters": {"features": ["postcode"], "chars": 3}},
        {"command": "drop", "parameters": {"features": ["vulnerability_status", "case_notes", ...]}},
    ]
    """

def extract_features(df: dask.dataframe.DataFrame, spec: LensSpec) -> dask.dataframe.DataFrame:
    """Apply lens transforms directly to a Dask DataFrame.

    This is the standalone path (used in tests and by the FusionMatch model).
    In the full pipeline, lens_to_operations() feeds the parallax chain instead.

    Returns a DataFrame containing only entity_id and the identity-hint feature
    columns (raw passthrough columns plus transformed columns). All suppressed
    and context fields are dropped.
    """

New Parallax Operations (Custom)

These register in the parallax OPS dict alongside existing operations:

fusion_hash

import hashlib

import pandas as pd


class FusionHash(Command):
    """SHA-256 hash of field values (algorithm configurable). For PII protection in transit."""
    def execute(self):
        # Read the algorithm once; hashlib.new raises ValueError for unsupported names
        algorithm = self.parameters.get("algorithm", "sha256")
        for col in self.parameters["features"]:
            self.data[f"{col}_hash"] = self.data[col].apply(
                lambda v: hashlib.new(algorithm, str(v).encode()).hexdigest()
                if pd.notna(v) else None,
                meta=(f"{col}_hash", "str"),
            )
        return self.data

fusion_soundex

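import pandas as pd


class FusionSoundex(Command):
    """Soundex phonetic encoding of the first token (e.g. given name). For blocking key generation."""
    def execute(self):
        import jellyfish  # optional dependency, imported lazily

        def first_token_soundex(v):
            tokens = str(v).split() if pd.notna(v) else []
            # Empty or whitespace-only values have no token to encode
            return jellyfish.soundex(tokens[0]) if tokens else None

        for col in self.parameters["features"]:
            self.data[f"{col}_soundex"] = self.data[col].apply(
                first_token_soundex,
                meta=(f"{col}_soundex", "str"),
            )
        return self.data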

fusion_geo_prefix

import pandas as pd


class FusionGeoPrefix(Command):
    """Extract the first N characters of a postcode (spaces removed, upper-cased) for area-level matching."""
    def execute(self):
        chars = int(self.parameters.get("chars", 3))
        for col in self.parameters["features"]:
            self.data[f"{col}_prefix"] = self.data[col].apply(
                lambda v: str(v).replace(" ", "")[:chars].upper() if pd.notna(v) else None,
                meta=(f"{col}_prefix", "str"),
            )
        return self.data

fusion_year

import pandas as pd


class FusionYear(Command):
    """Extract the year from a date field. For blocking key generation."""
    def execute(self):
        for col in self.parameters["features"]:
            # Assumes ISO-8601 strings ("1947-03-15") or datetimes, whose str() starts with the year
            self.data[f"{col}_year"] = self.data[col].apply(
                lambda v: str(v)[:4] if pd.notna(v) else None,
                meta=(f"{col}_year", "str"),
            )
        return self.data
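The standalone list[dict] mode described in the runtime API note needs the same per-value logic without Dask or pandas. A minimal sketch, assuming missing values arrive as None rather than NaN: the functions mirror the four Command bodies above, and soundex() is a hand-written American Soundex used as a stand-in for jellyfish.soundex so the example has no third-party dependency. All function names here are hypothetical.

```python
import hashlib
from typing import Any, Optional

# American Soundex digit classes; letters not listed (vowels, h, w, y) have no code.
SOUNDEX_CODES = {
    ch: digit
    for digit, letters in {"1": "bfpv", "2": "cgjkqsxz", "3": "dt",
                           "4": "l", "5": "mn", "6": "r"}.items()
    for ch in letters
}


def fusion_hash(value: Any, algorithm: str = "sha256") -> Optional[str]:
    """Hex digest of str(value); None passes through."""
    if value is None:
        return None
    return hashlib.new(algorithm, str(value).encode()).hexdigest()


def soundex(word: str) -> str:
    """American Soundex: first letter plus three digits, zero-padded."""
    letters = [ch for ch in word.lower() if ch.isalpha()]
    if not letters:
        return ""
    result = letters[0].upper()
    prev = SOUNDEX_CODES.get(letters[0], "")
    for ch in letters[1:]:
        code = SOUNDEX_CODES.get(ch, "")
        if code and code != prev:
            result += code
            if len(result) == 4:
                break
        # Vowels (and y) reset the previous code; 'h' and 'w' do not.
        if ch not in "hw":
            prev = code
    return result.ljust(4, "0")


def fusion_soundex(value: Any) -> Optional[str]:
    """Soundex of the first whitespace-separated token; None for missing or blank values."""
    tokens = str(value).split() if value is not None else []
    return soundex(tokens[0]) if tokens else None


def fusion_geo_prefix(value: Any, chars: int = 3) -> Optional[str]:
    """First `chars` characters of the postcode, spaces removed, upper-cased."""
    return None if value is None else str(value).replace(" ", "")[:chars].upper()


def fusion_year(value: Any) -> Optional[str]:
    """Leading four characters; assumes ISO-8601 dates such as '1947-03-15'."""
    return None if value is None else str(value)[:4]
```

The M626 in the VRS output table follows from encoding only the first token: fusion_soundex("Margaret Chen") encodes "Margaret".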

VRS Example: Full Operation Sequence

For the VRS lens, lens_to_operations() produces:

[
    # 1. Hash PII fields
    {"command": "fusion_hash", "parameters": {"features": ["phone"], "algorithm": "sha256"}},
    {"command": "fusion_hash", "parameters": {"features": ["email"], "algorithm": "sha256"}},

    # 2. Soundex for blocking (name)
    {"command": "fusion_soundex", "parameters": {"features": ["full_name"]}},

    # 3. Geo prefix for area matching
    {"command": "fusion_geo_prefix", "parameters": {"features": ["postcode"], "chars": 3}},

    # 4. Year extraction for blocking (DOB)
    {"command": "fusion_year", "parameters": {"features": ["date_of_birth"]}},

    # 5. Drop suppressed, context, and transformed raw fields (keep only feature columns)
    {"command": "drop", "parameters": {
        "features": ["vulnerability_status", "case_notes", "risk_score",
                     "vulnerability_category", "postcode", "phone", "email"],
        "applyOnAllFeatures": False
    }}
]

Input DataFrame (25 rows; first row shown):

| entity_id | full_name     | date_of_birth | postcode | phone       | email             | vulnerability_category | vulnerability_status | case_notes | risk_score |
|-----------|---------------|---------------|----------|-------------|-------------------|------------------------|----------------------|------------|------------|
| A001      | Margaret Chen | 1947-03-15    | SW1A 1AA | 07700900001 | mchen@example.com | elderly_isolation      | active               | ...        | 7          |

Output Feature Vector (25 rows; first row shown):

| entity_id | full_name     | date_of_birth | postcode_prefix | phone_hash  | email_hash  | full_name_soundex | date_of_birth_year |
|-----------|---------------|---------------|-----------------|-------------|-------------|-------------------|--------------------|
| A001      | Margaret Chen | 1947-03-15    | SW1             | a1b2c3d4... | e5f6g7h8... | M626              | 1947               |

Note: full_name is kept raw for jaro_winkler scoring and date_of_birth is kept raw for exact scoring; the transformed columns are added alongside them.
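For the standalone list[dict] mode, the VRS sequence can be replayed record by record. A minimal sketch, assuming a hypothetical registry RECORD_OPS that maps each command name to a function updating one record; only fusion_hash, fusion_geo_prefix, fusion_year and drop are registered to keep it short (fusion_soundex would need a similar entry), and the drop handler ignores applyOnAllFeatures.

```python
import hashlib
from typing import Callable

Record = dict
RecordOp = Callable[[Record, dict], Record]


def _derive(record: Record, params: dict, suffix: str, fn: Callable) -> Record:
    """Copy the record and add '<col><suffix>' per feature column; None stays None."""
    out = dict(record)
    for col in params["features"]:
        value = record.get(col)
        out[f"{col}{suffix}"] = None if value is None else fn(value)
    return out


def _drop(record: Record, params: dict) -> Record:
    """Remove the listed columns; 'applyOnAllFeatures' is ignored in this sketch."""
    dropped = set(params["features"])
    return {k: v for k, v in record.items() if k not in dropped}


# Hypothetical registry: command name -> record-level handler.
RECORD_OPS: dict[str, RecordOp] = {
    "fusion_hash": lambda r, p: _derive(
        r, p, "_hash",
        lambda v: hashlib.new(p.get("algorithm", "sha256"), str(v).encode()).hexdigest()),
    "fusion_geo_prefix": lambda r, p: _derive(
        r, p, "_prefix",
        lambda v: str(v).replace(" ", "")[: int(p.get("chars", 3))].upper()),
    "fusion_year": lambda r, p: _derive(r, p, "_year", lambda v: str(v)[:4]),
    "drop": _drop,
}


def apply_operations(records: list[Record], ops: list[dict]) -> list[Record]:
    """Run each op config in order over every record (KeyError for unregistered commands)."""
    for op in ops:
        handler = RECORD_OPS[op["command"]]
        records = [handler(record, op["parameters"]) for record in records]
    return records


vrs_subset = [
    {"command": "fusion_hash", "parameters": {"features": ["phone"], "algorithm": "sha256"}},
    {"command": "fusion_geo_prefix", "parameters": {"features": ["postcode"], "chars": 3}},
    {"command": "fusion_year", "parameters": {"features": ["date_of_birth"]}},
    {"command": "drop", "parameters": {"features": ["postcode", "phone", "risk_score"]}},
]
rows = [{"entity_id": "A001", "date_of_birth": "1947-03-15", "postcode": "SW1A 1AA",
         "phone": "07700900001", "risk_score": 7}]
result = apply_operations(rows, vrs_subset)
print(sorted(result[0]))
# ['date_of_birth', 'date_of_birth_year', 'entity_id', 'phone_hash', 'postcode_prefix']
```

Because every handler returns a new dict, the input records are left untouched, which keeps the determinism check in FIX-02 meaningful for this mode too.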

Suppression Enforcement (GDPR Critical)

class SuppressionBreachError(Exception):
    """Raised when a suppressed field survives extraction."""


def verify_suppression(features: list[dict], suppressed_fields: list[str]) -> bool:
    """Verify that NO suppressed field exists in any feature vector.

    This is a defense-in-depth check that runs AFTER extraction: the drop
    operation should already have removed these fields, but we verify.
    `suppressed_fields` is the union of both suppression sources,
    field_groupings.suppressed_fields ∪ policy_envelope.field_suppression
    (product.lens#invariants), as emitted by lens_to_operations.

    Raises SuppressionBreachError if any suppressed field is present.
    """
    suppressed = set(suppressed_fields)
    for fv in features:
        breach = suppressed & set(fv.keys())
        if breach:
            raise SuppressionBreachError(f"Suppressed fields in feature vector: {sorted(breach)}")
    return True
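The union of the two suppression sources can be computed before calling verify_suppression. In this sketch the lens is a plain dict shaped like the dotted paths in the docstring; the real LensSpec is an object, field_suppression is assumed to be a plain list of field names, and suppressed_union is a hypothetical helper.

```python
def suppressed_union(lens: dict) -> list[str]:
    """field_groupings.suppressed_fields ∪ policy_envelope.field_suppression, sorted."""
    from_groupings = lens.get("field_groupings", {}).get("suppressed_fields", [])
    from_policy = lens.get("policy_envelope", {}).get("field_suppression", [])
    # Sorting gives the drop op and the verification check the same stable list.
    return sorted(set(from_groupings) | set(from_policy))


# Hypothetical lens fragment: the policy envelope adds risk_score on top of the lens groupings.
lens = {
    "field_groupings": {"suppressed_fields": ["vulnerability_status", "case_notes"]},
    "policy_envelope": {"field_suppression": ["risk_score", "case_notes"]},
}
suppressed = suppressed_union(lens)
print(suppressed)  # ['case_notes', 'risk_score', 'vulnerability_status']

# A feature vector that only the policy-envelope source would catch.
record = {"entity_id": "A001", "risk_score": 7}
print(set(suppressed) & set(record))  # {'risk_score'}
```

Checking field_groupings alone would let risk_score through in this example, which is why the verification must use the union.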

Test Fixtures

FIX-01: VRS extraction

def test_extract_vrs_features():
    spec = parse_lens("fixtures/vrs_vulnerability_v1.yaml")
    raw_df = load_csv("fixtures/node_a_customers.csv")
    features = extract_features(raw_df, spec)

    # Correct columns present
    assert "full_name" in features.columns
    assert "phone_hash" in features.columns
    assert "postcode_prefix" in features.columns

    # Suppressed fields absent
    assert "vulnerability_status" not in features.columns
    assert "case_notes" not in features.columns
    assert "risk_score" not in features.columns

    # Raw PII absent
    assert "phone" not in features.columns
    assert "email" not in features.columns

    # Row count preserved
    assert len(features) == len(raw_df)

FIX-02: Hash determinism

def test_hash_determinism():
    """Same input always produces same hash."""
    spec = parse_lens("fixtures/vrs_vulnerability_v1.yaml")
    df1 = extract_features(load_csv("fixtures/node_a_customers.csv"), spec)
    df2 = extract_features(load_csv("fixtures/node_a_customers.csv"), spec)
    assert (df1["phone_hash"] == df2["phone_hash"]).all().compute()

FIX-03: Null handling

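import dask.dataframe as dd
import pandas as pd


def test_null_fields_produce_null_features():
    spec = parse_lens("fixtures/vrs_vulnerability_v1.yaml")
    # Dask DataFrames do not support .loc assignment, so inject the null in pandas
    pdf = load_csv("fixtures/node_a_customers.csv").compute()
    pdf.loc[pdf.index[0], "phone"] = None
    features = extract_features(dd.from_pandas(pdf, npartitions=1), spec).compute()
    # Missing input -> missing hash (None, or NaN/<NA> depending on the string dtype)
    assert pd.isna(features["phone_hash"].iloc[0])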

FIX-04: Suppression breach detection

import pytest


def test_suppression_breach_detected():
    # A feature vector that still carries a suppressed field
    bad_features = [{"entity_id": "A001", "vulnerability_status": "active"}]
    with pytest.raises(SuppressionBreachError):
        verify_suppression(bad_features, ["vulnerability_status", "case_notes", "risk_score"])

FIX-05: lens_to_operations produces valid parallax configs

def test_lens_to_operations_format():
    spec = parse_lens("fixtures/vrs_vulnerability_v1.yaml")
    ops = lens_to_operations(spec)
    for op in ops:
        assert "command" in op
        assert "parameters" in op
        assert isinstance(op["parameters"], dict)
    # Last operation should be drop (suppression)
    assert ops[-1]["command"] == "drop"

File Layout

parallax/ops/fusion/
├── extractor.py            # lens_to_operations, extract_features, verify_suppression
├── dispatch.py             # ExtractVectorsOp (Dask bridge via map_partitions)
└── tests/
    └── test_extractor.py

parallax/ops/
├── fusion_hash.py          # FusionHash(Command)
├── fusion_soundex.py       # FusionSoundex(Command)
├── fusion_geo_prefix.py    # FusionGeoPrefix(Command)
├── fusion_year.py          # FusionYear(Command)
└── feature.py              # Register in OPS dict: fusion_hash, fusion_soundex, fusion_geo_prefix, fusion_year

Integration Points


Depends on: component.parallax.lens-parser

Realizes: product.lens

Required by: component.parallax.blocking-engine, component.parallax.fusion-binding, component.parallax.fusionmatch-model, component.parallax.primitives-framework