Feature Extraction Operations
Status & scope
- Stage: POC — VRS Use Case
- Module: parallax/ops/fusion/extractor.py + parallax operation chain
- Milestone: M5 (Feature Extraction)
Runtime API note: Signatures below are shown with dask.dataframe.DataFrame for the platform-mode end state. The standalone-mode implementation in parallax/ops/fusion/extractor.py accepts list[dict] (pure Python, no Dask dependency), and the dispatch wrappers in parallax/ops/fusion/dispatch.py (ExtractVectorsOp) provide the Dask bridge via map_partitions when invoked through axonis-core's feature_engineer. Both forms are supported. lens_to_operations(spec) (this spec, §lens compilation) emits the op-chain payloads that titan submits through the dispatch path.
Purpose
Convert raw records into feature vectors using lens-defined transforms. Feature extraction runs as a sequence of parallax operations in the existing add_operations() chain rather than as a separate custom code path. The lens parser (component.parallax.lens-parser) produces the operation sequence; this spec defines how each lens transform maps to a parallax operation.
Architecture
LensSpec.field_groupings
│
▼
┌─────────────────────────────────┐
│ lens_to_operations(spec) │ Compile lens → parallax op sequence
│ Returns: list[OperationConfig] │
└─────────────────────────────────┘
│
▼
┌─────────────────────────────────┐
│ parallax add_operations() chain │ Existing pipeline infrastructure
│ DaskCommand / HomoDaskStandardOp│ Each op: Redis in → transform → Redis out
└─────────────────────────────────┘
│
▼
Feature Vector DataFrame (in Redis)
Columns: [entity_id, name_jw, dob, postcode_prefix, phone_hash, email_hash]
Suppressed fields: ABSENT (dropped by operation chain)
Transform → Parallax Operation Mapping
| Lens Transform | Parallax Operation | Parameters | Output Column |
|---|---|---|---|
| exact | (no-op, passthrough) | — | Same column, unchanged |
| hash \| algorithm: sha256 | Custom op: fusion_hash | {"features": ["phone"], "algorithm": "sha256"} | phone_hash |
| jaro_winkler | (no-op for extraction, scoring uses raw value) | — | Same column, unchanged |
| soundex | Custom op: fusion_soundex | {"features": ["full_name"]} | full_name_soundex |
| geo_prefix \| chars: 3 | Custom op: fusion_geo_prefix | {"features": ["postcode"], "chars": 3} | postcode_prefix |
| year | Custom op: fusion_year | {"features": ["date_of_birth"]} | dob_year |
Suppression is implemented as a final drop operation that removes all fields NOT in identity_hints + entity_id.
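The mapping above can be read as a small compiler. The following is a minimal sketch under stated assumptions, not the actual lens_to_operations implementation: the real LensSpec shape is not shown in this spec, so a plain list of hint dicts stands in for it, and the names compile_lens, identity_hints, all_fields and id_field are hypothetical. It also assumes that the raw input of every transformed field is dropped, which is one reading of the suppression rule.

```python
# Hypothetical sketch: a list of dicts stands in for LensSpec.field_groupings.
TRANSFORM_TO_COMMAND = {
    "hash": "fusion_hash",
    "soundex": "fusion_soundex",
    "geo_prefix": "fusion_geo_prefix",
    "year": "fusion_year",
}
# Transforms that emit no extraction op; the raw column is passed through.
PASSTHROUGH = {"exact", "jaro_winkler"}


def compile_lens(identity_hints, all_fields, id_field="entity_id"):
    """Return a list of {"command", "parameters"} dicts ending in a drop."""
    ops = []
    keep = {id_field}
    for hint in identity_hints:
        field, transform = hint["field"], hint["transform"]
        if transform in PASSTHROUGH:
            keep.add(field)  # raw value stays available for scoring
            continue
        params = {"features": [field]}
        # Copy transform-specific options such as "algorithm" or "chars".
        params.update(
            {k: v for k, v in hint.items() if k not in ("field", "transform")}
        )
        ops.append({"command": TRANSFORM_TO_COMMAND[transform], "parameters": params})
    # Final drop: every input field that was not explicitly kept.
    dropped = [f for f in all_fields if f not in keep]
    ops.append({"command": "drop", "parameters": {"features": dropped}})
    return ops


hints = [
    {"field": "full_name", "transform": "jaro_winkler"},
    {"field": "phone", "transform": "hash", "algorithm": "sha256"},
    {"field": "postcode", "transform": "geo_prefix", "chars": 3},
]
fields = ["entity_id", "full_name", "phone", "postcode", "case_notes"]
ops = compile_lens(hints, fields)
```

Emitting the drop last means every transform still sees its raw input, and a field absent from the hints never survives by default.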
Public API
def lens_to_operations(spec: LensSpec) -> list[dict]:
    """Convert a LensSpec into a sequence of parallax operation configs.

    Returns list of operation dicts in the format expected by
    Engineer.create() / feature_engineer():
    [
        {"command": "fusion_hash", "parameters": {"features": ["phone"], "algorithm": "sha256"}},
        {"command": "fusion_hash", "parameters": {"features": ["email"], "algorithm": "sha256"}},
        {"command": "fusion_soundex", "parameters": {"features": ["full_name"]}},
        {"command": "fusion_geo_prefix", "parameters": {"features": ["postcode"], "chars": 3}},
        {"command": "drop", "parameters": {"features": ["vulnerability_status", "case_notes", ...]}},
    ]
    """


def extract_features(df: dask.dataframe.DataFrame, spec: LensSpec) -> dask.dataframe.DataFrame:
    """Apply lens transforms directly to a Dask DataFrame.

    This is the standalone path (used in tests and FusionMatch model).
    In the full pipeline, lens_to_operations() feeds the parallax chain instead.
    Returns DataFrame with only: entity_id + identity_hint columns (transformed).
    All suppressed and context fields are dropped.
    """
New Parallax Operations (Custom)
These register in the parallax OPS dict alongside existing operations:
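Registration and dispatch might look like the sketch below. The Command base class and the shape of OPS are assumptions, since the real parallax definitions are not shown in this spec; the stand-in works on list[dict] to stay dependency-free, and run_chain is a hypothetical helper name.

```python
class Command:
    """Stand-in for the parallax base class (assumed interface)."""

    def __init__(self, data, parameters):
        self.data = data              # list[dict] in this sketch
        self.parameters = parameters

    def execute(self):
        raise NotImplementedError


class Drop(Command):
    """Remove the listed fields from every record."""

    def execute(self):
        doomed = set(self.parameters["features"])
        return [{k: v for k, v in row.items() if k not in doomed} for row in self.data]


class FusionYear(Command):
    """Add a <col>_year field holding the first four characters of <col>."""

    def execute(self):
        for row in self.data:
            for col in self.parameters["features"]:
                value = row.get(col)
                row[f"{col}_year"] = str(value)[:4] if value is not None else None
        return self.data


# Command name -> class; custom ops sit next to the existing ones.
OPS = {"drop": Drop}
OPS.update({"fusion_year": FusionYear})


def run_chain(records, operations):
    """Apply each {"command", "parameters"} dict in order."""
    for op in operations:
        records = OPS[op["command"]](records, op["parameters"]).execute()
    return records


rows = run_chain(
    [{"entity_id": "A001", "date_of_birth": "1947-03-15", "risk_score": 7}],
    [
        {"command": "fusion_year", "parameters": {"features": ["date_of_birth"]}},
        {"command": "drop", "parameters": {"features": ["risk_score"]}},
    ],
)
```

Because the chain looks commands up by name, the compiled lens output stays plain data and adding an op is a registry entry rather than a change to the chain.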
fusion_hash
import pandas as pd


class FusionHash(Command):
    """Hash field values (SHA-256 by default). For PII protection in transit."""

    def execute(self):
        import hashlib

        # The algorithm is the same for every column, so resolve it once.
        algorithm = self.parameters.get("algorithm", "sha256")
        for col in self.parameters["features"]:
            self.data[f"{col}_hash"] = self.data[col].apply(
                lambda v: hashlib.new(algorithm, str(v).encode()).hexdigest()
                if pd.notna(v) else None,
                meta=(f"{col}_hash", "str"),
            )
        return self.data
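The per-value logic can be exercised without Dask. A minimal sketch (hash_value is a hypothetical helper name, not part of the module) showing the two properties the fixtures rely on, determinism and null passthrough:

```python
import hashlib


def hash_value(value, algorithm="sha256"):
    """Hex digest of str(value), or None for missing input."""
    if value is None:
        return None
    return hashlib.new(algorithm, str(value).encode()).hexdigest()


digest = hash_value("07700900001")
assert digest == hash_value("07700900001")   # deterministic
assert len(digest) == 64                     # SHA-256 hex digest length
assert hash_value(None) is None              # nulls stay null
# The value is hashed exactly as given, so formatting differences
# (here, an embedded space) produce a different digest.
assert hash_value("07700 900001") != digest
```

Since no normalisation happens before hashing, two nodes only produce matching digests if they format the field identically.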
fusion_soundex
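import pandas as pd


class FusionSoundex(Command):
    """Soundex phonetic encoding. For blocking key generation."""

    def execute(self):
        import jellyfish

        def encode(v):
            # Encode only the first whitespace-separated token; empty or
            # whitespace-only values yield None instead of an IndexError.
            tokens = str(v).split() if pd.notna(v) else []
            return jellyfish.soundex(tokens[0]) if tokens else None

        for col in self.parameters["features"]:
            self.data[f"{col}_soundex"] = self.data[col].apply(
                encode, meta=(f"{col}_soundex", "str")
            )
        return self.data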
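To make the encoding concrete, here is a pure-Python stand-in for jellyfish.soundex. It is a sketch of standard American Soundex, not the library's code, and may differ from jellyfish on edge cases such as non-ASCII input. The helper first_token_soundex is hypothetical and mirrors the first-token behaviour of the op above.

```python
# Letter -> digit table for American Soundex.
_CODES = {}
for _letters, _digit in (("BFPV", "1"), ("CGJKQSXZ", "2"), ("DT", "3"),
                         ("L", "4"), ("MN", "5"), ("R", "6")):
    for _ch in _letters:
        _CODES[_ch] = _digit


def soundex(word):
    """Return the 4-character Soundex code, or None if word has no letters."""
    letters = [c for c in word.upper() if c.isalpha()]
    if not letters:
        return None
    result = letters[0]
    prev = _CODES.get(letters[0], "")
    for ch in letters[1:]:
        if ch in "HW":
            continue  # H and W do not separate letters with the same code
        code = _CODES.get(ch, "")
        if code and code != prev:
            result += code
        prev = code  # a vowel resets prev, so a repeated code counts again
    return (result + "000")[:4]


def first_token_soundex(value):
    """Encode only the first whitespace-separated token of value."""
    tokens = str(value).split() if value is not None else []
    return soundex(tokens[0]) if tokens else None


assert first_token_soundex("Margaret Chen") == "M626"
assert soundex("Robert") == soundex("Rupert") == "R163"
assert first_token_soundex("   ") is None
```

Only the first token is encoded, so for a full_name value the blocking key reflects the given name and ignores the surname.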
fusion_geo_prefix
import pandas as pd


class FusionGeoPrefix(Command):
    """Extract first N characters of postcode for area-level matching."""

    def execute(self):
        chars = int(self.parameters.get("chars", 3))
        for col in self.parameters["features"]:
            self.data[f"{col}_prefix"] = self.data[col].apply(
                # Strip spaces first so "SW1A 1AA" and "SW1A1AA" agree.
                lambda v: str(v).replace(" ", "")[:chars].upper() if pd.notna(v) else None,
                meta=(f"{col}_prefix", "str"),
            )
        return self.data
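A few concrete values help show what the prefix does and does not capture. This sketch lifts the per-value expression into a hypothetical geo_prefix helper; the postcodes other than SW1A 1AA are illustrative inputs, not fixture data.

```python
def geo_prefix(postcode, chars=3):
    """First `chars` characters of the postcode, spaces removed, uppercased."""
    if postcode is None:
        return None
    return str(postcode).replace(" ", "")[:chars].upper()


assert geo_prefix("SW1A 1AA") == "SW1"
assert geo_prefix("sw1a1aa") == "SW1"    # spacing and case do not matter
assert geo_prefix(None) is None
# With a two-character outward code, a 3-character prefix reaches into
# the inward part of the postcode: "M1 1AE" -> "M11AE" -> "M11".
assert geo_prefix("M1 1AE") == "M11"
```

Because spaces are removed before slicing, a fixed chars value covers a different geographic granularity depending on the length of the outward code; whether that matters for blocking depends on the lens.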
fusion_year
import pandas as pd


class FusionYear(Command):
    """Extract year from date field. For blocking key generation."""

    def execute(self):
        for col in self.parameters["features"]:
            self.data[f"{col}_year"] = self.data[col].apply(
                # Assumes year-first (ISO 8601) values such as "1947-03-15".
                lambda v: str(v)[:4] if pd.notna(v) else None,
                meta=(f"{col}_year", "str"),
            )
        return self.data
VRS Example: Full Operation Sequence
For the VRS lens, lens_to_operations() produces:
[
    # 1. Hash PII fields
    {"command": "fusion_hash", "parameters": {"features": ["phone"], "algorithm": "sha256"}},
    {"command": "fusion_hash", "parameters": {"features": ["email"], "algorithm": "sha256"}},
    # 2. Soundex for blocking (name)
    {"command": "fusion_soundex", "parameters": {"features": ["full_name"]}},
    # 3. Geo prefix for area matching
    {"command": "fusion_geo_prefix", "parameters": {"features": ["postcode"], "chars": 3}},
    # 4. Year extraction for blocking (DOB)
    {"command": "fusion_year", "parameters": {"features": ["date_of_birth"]}},
    # 5. Drop suppressed + raw PII (keep only feature columns)
    {"command": "drop", "parameters": {
        "features": ["vulnerability_status", "case_notes", "risk_score",
                     "phone", "email"],
        "applyOnAllFeatures": False
    }}
]
Input DataFrame (25 rows):
| entity_id | full_name | date_of_birth | postcode | phone | email | vulnerability_category | vulnerability_status | case_notes | risk_score |
|-----------|-----------------|---------------|-----------|-------------|--------------------|-----------------------|---------------------|------------|------------|
| A001 | Margaret Chen | 1947-03-15 | SW1A 1AA | 07700900001 | mchen@example.com | elderly_isolation | active | ... | 7 |
Output Feature Vector (25 rows):
| entity_id | full_name | date_of_birth | postcode_prefix | phone_hash | email_hash | full_name_soundex | dob_year |
|-----------|-----------------|---------------|-----------------|----------------|----------------|-------------------|----------|
| A001 | Margaret Chen | 1947-03-15 | SW1 | a1b2c3d4... | e5f6g7h8... | M626 | 1947 |
Note: full_name kept raw for jaro_winkler scoring. date_of_birth kept raw for exact scoring. Transformed columns added alongside.
Suppression Enforcement (GDPR Critical)
def verify_suppression(features: list[dict], suppressed_fields: list[str]) -> bool:
    """Verify that NO suppressed field exists in any feature vector.

    This is a defense-in-depth check that runs AFTER extraction.
    The drop operation should have removed them, but we verify.
    `suppressed_fields` is the union of both suppression sources:
    field_groupings.suppressed_fields ∪ policy_envelope.field_suppression
    (product.lens#invariants), as emitted by lens_to_operations.
    Raises SuppressionBreachError if any suppressed field is present.
    """
    suppressed = set(suppressed_fields)
    for fv in features:
        breach = suppressed & set(fv.keys())
        if breach:
            raise SuppressionBreachError(f"Suppressed fields in feature vector: {breach}")
    return True
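A usage sketch for the check follows. SuppressionBreachError is referenced but not defined in this spec, so a stand-in exception is declared here, and the function is repeated (with a sorted breach list for a stable message) only to keep the example runnable. The two source lists are hypothetical values, not taken from the VRS lens file.

```python
class SuppressionBreachError(Exception):
    """Stand-in; the real exception class is defined elsewhere."""


def verify_suppression(features, suppressed_fields):
    suppressed = set(suppressed_fields)
    for fv in features:
        breach = suppressed & set(fv.keys())
        if breach:
            raise SuppressionBreachError(
                f"Suppressed fields in feature vector: {sorted(breach)}"
            )
    return True


# Hypothetical contents of the two suppression sources.
grouping_suppressed = ["vulnerability_status", "case_notes"]
policy_suppressed = ["risk_score", "case_notes"]
# The check receives the union, so a field listed in either source is caught.
suppressed = sorted(set(grouping_suppressed) | set(policy_suppressed))

clean = [{"entity_id": "A001", "phone_hash": "ab12"}]
leaky = [{"entity_id": "A002", "risk_score": 7}]

assert verify_suppression(clean, suppressed) is True
try:
    verify_suppression(clean + leaky, suppressed)
except SuppressionBreachError as exc:
    message = str(exc)
else:
    message = None
```

Here risk_score comes only from the policy source, which is the case the union exists to cover: checking field_groupings alone would let it through.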
Test Fixtures
FIX-01: VRS extraction
def test_extract_vrs_features():
    spec = parse_lens("fixtures/vrs_vulnerability_v1.yaml")
    raw_df = load_csv("fixtures/node_a_customers.csv")
    features = extract_features(raw_df, spec)
    # Correct columns present
    assert "full_name" in features.columns
    assert "phone_hash" in features.columns
    assert "postcode_prefix" in features.columns
    # Suppressed fields absent
    assert "vulnerability_status" not in features.columns
    assert "case_notes" not in features.columns
    assert "risk_score" not in features.columns
    # Raw PII absent
    assert "phone" not in features.columns
    assert "email" not in features.columns
    # Row count preserved
    assert len(features) == len(raw_df)
FIX-02: Hash determinism
def test_hash_determinism():
    """Same input always produces same hash."""
    spec = parse_lens("fixtures/vrs_vulnerability_v1.yaml")
    df1 = extract_features(load_csv("fixtures/node_a_customers.csv"), spec)
    df2 = extract_features(load_csv("fixtures/node_a_customers.csv"), spec)
    assert (df1["phone_hash"] == df2["phone_hash"]).all().compute()
FIX-03: Null handling
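def test_null_fields_produce_null_features():
    spec = parse_lens("fixtures/vrs_vulnerability_v1.yaml")
    df = load_csv("fixtures/node_a_customers.csv")
    # Dask does not support item assignment through .loc, so null the first
    # record's phone with a mask (assumes entity_id is unique).
    first_id = df["entity_id"].head(1).iloc[0]
    df["phone"] = df["phone"].mask(df["entity_id"] == first_id)
    features = extract_features(df, spec)
    row = features[features["entity_id"] == first_id].compute()
    # pd.isna covers None, NaN and pd.NA, whichever the dtype produces.
    assert pd.isna(row["phone_hash"].iloc[0])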
FIX-04: Suppression breach detection
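def test_suppression_breach_detected():
    # Manually create a DataFrame WITH suppressed fields
    bad_df = pd.DataFrame({"entity_id": ["A001"], "vulnerability_status": ["active"]})
    # VRS suppressed fields, as listed in the drop operation above
    suppressed = ["vulnerability_status", "case_notes", "risk_score"]
    with pytest.raises(SuppressionBreachError):
        # verify_suppression takes list[dict] and a list of field names
        verify_suppression(bad_df.to_dict("records"), suppressed)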
FIX-05: lens_to_operations produces valid parallax configs
def test_lens_to_operations_format():
    spec = parse_lens("fixtures/vrs_vulnerability_v1.yaml")
    ops = lens_to_operations(spec)
    for op in ops:
        assert "command" in op
        assert "parameters" in op
        assert isinstance(op["parameters"], dict)
    # Last operation should be drop (suppression)
    assert ops[-1]["command"] == "drop"
File Layout
parallax/ops/fusion/
├── extractor.py # lens_to_operations, extract_features, verify_suppression
└── tests/
└── test_extractor.py
parallax/ops/
├── fusion_hash.py # FusionHash(Command)
├── fusion_soundex.py # FusionSoundex(Command)
├── fusion_geo_prefix.py # FusionGeoPrefix(Command)
├── fusion_year.py # FusionYear(Command)
└── feature.py # Add to OPS dict: fusion_hash, fusion_soundex, etc.
Integration Points
- component.parallax.lens-parser → here: LensSpec.field_groupings drives the operation sequence
- Here → parallax: Custom ops register in OPS dict in feature.py
- Here → component.parallax.blocking-engine: Blocking keys (soundex, year) are added as columns during extraction
- Here → component.parallax.fusionmatch-model: lens_to_operations() output feeds add_operations() in FATE pipeline
- Here → Redis: Feature vectors stored in Redis cache, keyed by dataset name
Depends on: component.parallax.lens-parser
Realizes: product.lens
Required by: component.parallax.blocking-engine, component.parallax.fusion-binding, component.parallax.fusionmatch-model, component.parallax.primitives-framework