FusionMatch Model (FATE Pipeline Integration)
Status & scope
- Stage: POC — VRS Use Case
- Module: titan/modeling/train/library/fusion_match_op.py, titan/modeling/train/library/fusion_scorer.py
- Milestone: M4 (Dask Integration)
Purpose
FusionMatch is a new model type that registers in the existing FATE pipeline alongside HomoLR, HomoNN, HomoSecureBoost, HeteroKmeans, etc. It performs entity matching instead of training a statistical model. It receives data from the operations chain and produces match candidates.
Model Registration
models.json entry
{
  "FusionMatch": {
    "model_name": "Federated Entity Matching",
    "type": "entity_matching",
    "library": "federated",
    "framework": "federated",
    "model_description": "Matches entities across federated nodes using a Semantic Lens. Computes weighted similarity scores across configurable fields, applies blocking for performance, and produces confirmed match candidates with provenance.",
    "partitioning": "horizontal",
    "releases": [],
    "hyperparameters": {
      "type": "object",
      "x-display-name": "FusionMatch Parameters",
      "x-parameter-groups": {
        "basic": {
          "task_config": ["task_type", "output_type"],
          "objects": ["lens_config"],
          "dynamic": ["initial_threshold", "confirmation_threshold"]
        },
        "advanced": {
          "boolean": ["mint_passport"],
          "dynamic": ["null_penalty", "min_independent_sources"]
        }
      },
      "required": ["version", "lens_config", "task_type", "output_type"],
      "properties": {
        "version": {
          "type": "string",
          "default": "1.11",
          "x-show": false
        },
        "task_type": {
          "type": "string",
          "x-display-name": "Task Type",
          "enum": ["Entity Matching"],
          "default": "Entity Matching"
        },
        "output_type": {
          "type": "string",
          "x-display-name": "Output Type",
          "enum": ["Match Candidates"],
          "default": "Match Candidates"
        },
        "lens_config": {
          "type": "object",
          "x-display-name": "Semantic Lens",
          "description": "Lens YAML configuration defining field matching rules, weights, and thresholds."
        },
        "initial_threshold": {
          "type": "number",
          "format": "float",
          "x-display-name": "Initial Threshold",
          "description": "Minimum confidence to propose a match candidate.",
          "minimum": 0.0,
          "maximum": 1.0,
          "default": 0.65,
          "x-priority": true
        },
        "confirmation_threshold": {
          "type": "number",
          "format": "float",
          "x-display-name": "Confirmation Threshold",
          "description": "Minimum confidence to confirm a match.",
          "minimum": 0.0,
          "maximum": 1.0,
          "default": 0.80,
          "x-priority": true
        },
        "null_penalty": {
          "type": "number",
          "format": "float",
          "x-display-name": "Null Penalty",
          "description": "Confidence penalty per null field.",
          "minimum": 0.0,
          "maximum": 0.5,
          "default": 0.1
        },
        "min_independent_sources": {
          "type": "integer",
          "x-display-name": "Min Independent Sources",
          "description": "Minimum federate nodes confirming a match.",
          "minimum": 2,
          "default": 2
        }
      }
    }
  }
}
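To make the constraints in this entry concrete, the sketch below applies the defaults and range limits listed above to a user-supplied parameter dict. `RANGES` and `resolve_hyperparameters` are hypothetical names introduced for illustration; the real pipeline may validate through a JSON Schema library instead. The check that `initial_threshold` must not exceed `confirmation_threshold` is an assumption, not something the schema states.

```python
# Hypothetical helper: not part of the FATE pipeline or the FusionMatch module.
# Bounds and defaults are copied from the models.json entry above.
RANGES = {
    # name: (minimum, maximum or None, default)
    "initial_threshold": (0.0, 1.0, 0.65),
    "confirmation_threshold": (0.0, 1.0, 0.80),
    "null_penalty": (0.0, 0.5, 0.1),
    "min_independent_sources": (2, None, 2),
}


def resolve_hyperparameters(params: dict) -> dict:
    """Fill in defaults and reject out-of-range values."""
    if "lens_config" not in params:
        raise ValueError("lens_config is required")
    resolved = dict(params)
    for name, (low, high, default) in RANGES.items():
        value = resolved.setdefault(name, default)
        if value < low or (high is not None and value > high):
            raise ValueError(f"{name}={value} is outside [{low}, {high}]")
    # Assumption: a proposal threshold above the confirmation threshold
    # would leave the "proposed" band empty, so treat it as an error.
    if resolved["initial_threshold"] > resolved["confirmation_threshold"]:
        raise ValueError("initial_threshold exceeds confirmation_threshold")
    return resolved


resolved = resolve_hyperparameters({"lens_config": {}, "null_penalty": 0.2})
print(resolved["initial_threshold"], resolved["null_penalty"])
```

Only the four numeric parameters are checked here; `task_type`, `output_type`, and `version` would need their own enum and presence checks.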
script_mapping entry
# In federated_model_runner_v2.py
script_mapping = {
    "CoordinatedLR": "coordinated_lr",
    "HomoLR": "homo_lr",
    "HomoNN": "homo_nn",
    "HomoSecureBoost": "homo_secure_boost",
    "HeteroSecureBoost": "hetero_secure_boost",
    "HeteroKmeans": "hetero_kmeans",
    # ...existing models...
    "FusionMatch": "fusion_match",  # NEW
}
Pipeline Flow
Standard ML pipeline:
Reader → add_operations() → PSI → DataSplit → Model → Evaluation
FusionMatch pipeline (PSI + DataSplit bypassed):
Reader → add_operations(lens transforms) → FusionMatch
nocode_start() modification
def nocode_start(self):
    model_name = self.config.get("definition")
    if model_name == "FusionMatch":
        # Bypass PSI and DataSplit: not applicable to entity matching
        pipeline = self.build_fusion_pipeline()
    else:
        # Existing flow for ML models
        pipeline = self.build_ml_pipeline()
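The branch above can be illustrated with a small stand-alone sketch that returns the ordered stage names for each flow. `pipeline_stages` is a hypothetical helper, not part of the runner. The stage names come from the Pipeline Flow listing; how `build_fusion_pipeline()` actually wires FATE components is not specified in this document.

```python
# Stage names as listed under Pipeline Flow; the lists are illustrative only.
ML_STAGES = ["Reader", "add_operations", "PSI", "DataSplit", "Model", "Evaluation"]
FUSION_STAGES = ["Reader", "add_operations", "FusionMatch"]


def pipeline_stages(config: dict) -> list:
    """Return the ordered stage names for the model named in config["definition"]."""
    if config.get("definition") == "FusionMatch":
        # PSI and DataSplit are skipped for entity matching.
        return list(FUSION_STAGES)
    return list(ML_STAGES)


fusion = pipeline_stages({"definition": "FusionMatch"})
standard = pipeline_stages({"definition": "HomoLR"})
print(fusion)
print(standard)
```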
FusionMatch Implementation
fusion_match_op.py — FATE Pipeline Component
import dask.dataframe

# parse_lens_from_dict, block_and_pair, score_all_candidates, and FusionResult
# are provided by the components listed under Integration Points; their import
# paths are not specified in this document.


class FusionMatch:
    """Entity matching model for the FATE pipeline.

    Receives:
    - Local feature vectors (from operations chain)
    - Remote feature vectors (from xanadu/federation exchange)
    - Lens config (from hyperparameters)

    Produces:
    - Match candidates with confidence scores
    - Confirmed matches at or above confirmation_threshold
    - Passport tokens for confirmed matches (if enabled)
    - Performance metrics (precision, recall, reduction ratio)
    """

    def __init__(self, config: dict):
        self.lens_spec = parse_lens_from_dict(config["lens_config"])
        self.initial_threshold = config.get("initial_threshold", 0.65)
        self.confirmation_threshold = config.get("confirmation_threshold", 0.80)
        self.null_penalty = config.get("null_penalty", 0.1)

    def run(self, local_df: dask.dataframe.DataFrame,
            remote_df: dask.dataframe.DataFrame) -> FusionResult:
        """Execute the full fusion pipeline.

        1. Block both sides → candidate pairs
        2. Score each candidate pair
        3. Filter by thresholds
        4. Mint passports for confirmed matches
        5. Compute performance metrics
        """
        # Step 1: Blocking
        candidates, blocking_stats = block_and_pair(
            local_df, remote_df,
            self.lens_spec.identity_fusion.blocking_strategy
        )

        # Step 2: Scoring
        scored = score_all_candidates(
            local_df, remote_df, candidates,
            self.lens_spec.identity_fusion.match_function,
            self.null_penalty
        )

        # Step 3: Threshold filtering
        confirmed = scored[scored["confidence"] >= self.confirmation_threshold]
        proposed = scored[
            (scored["confidence"] >= self.initial_threshold) &
            (scored["confidence"] < self.confirmation_threshold)
        ]
        rejected = scored[scored["confidence"] < self.initial_threshold]

        # Step 4 (passport minting) is not shown in this excerpt.
        # Step 5: metrics are derived from the result via FusionResult.performance.
        return FusionResult(
            lens_id=self.lens_spec.lens_id,
            lens_version=self.lens_spec.version,
            confirmed=confirmed,
            proposed=proposed,
            rejected=rejected,
            blocking_stats=blocking_stats,
        )
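The threshold step in `run()` splits scored pairs into three non-overlapping bands. The sketch below reproduces that logic with plain dicts standing in for DataFrame rows so it runs without Dask; `partition_by_threshold` and the sample rows are hypothetical and exist only for illustration.

```python
def partition_by_threshold(scored, initial=0.65, confirmation=0.80):
    """Split scored pairs into confirmed, proposed, and rejected lists."""
    confirmed, proposed, rejected = [], [], []
    for row in scored:
        if row["confidence"] >= confirmation:
            confirmed.append(row)
        elif row["confidence"] >= initial:
            proposed.append(row)
        else:
            rejected.append(row)
    return confirmed, proposed, rejected


# Made-up sample data, including both boundary values.
scored = [
    {"left_id": "a1", "right_id": "b7", "confidence": 0.91},
    {"left_id": "a2", "right_id": "b3", "confidence": 0.80},  # boundary: confirmed
    {"left_id": "a3", "right_id": "b9", "confidence": 0.70},
    {"left_id": "a4", "right_id": "b2", "confidence": 0.65},  # boundary: proposed
    {"left_id": "a5", "right_id": "b5", "confidence": 0.40},
]
confirmed, proposed, rejected = partition_by_threshold(scored)
print(len(confirmed), len(proposed), len(rejected))
```

Because both lower bounds are inclusive, a pair scoring exactly `confirmation_threshold` is confirmed and a pair scoring exactly `initial_threshold` is proposed, which matches the comparisons used in `run()`.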
fusion_scorer.py — Dask DataFrame Operation
def dask_fusion_score(
    left_df: dask.dataframe.DataFrame,
    right_df: dask.dataframe.DataFrame,
    lens_config: dict,
) -> dask.dataframe.DataFrame:
    """Wrap score_pair to operate on Dask DataFrames.

    This is the Dask-native entry point called by FusionMatch.run().
    Input: two Dask DataFrames (left = local, right = remote).
    Output: Dask DataFrame with columns [left_id, right_id, confidence, per_field_scores].
    """
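The `score_pair` function that this wrapper calls is not defined in this section. As a rough illustration of weighted field similarity with a per-null penalty, the sketch below scores one pair of records using `difflib.SequenceMatcher` from the standard library. The signature, the flat `weights` dict, the choice of similarity measure, and the decision to exclude null fields from weight normalization are all assumptions; the actual scoring engine may differ on every point.

```python
from difflib import SequenceMatcher


def score_pair(left: dict, right: dict, weights: dict, null_penalty: float = 0.1) -> dict:
    """Score one candidate pair; returns confidence and per-field scores."""
    per_field = {}
    weighted_sum = 0.0
    weight_total = 0.0
    null_fields = 0
    for field, weight in weights.items():
        a, b = left.get(field), right.get(field)
        if a is None or b is None:
            null_fields += 1  # field is skipped here and penalised below
            continue
        sim = SequenceMatcher(None, str(a).lower(), str(b).lower()).ratio()
        per_field[field] = sim
        weighted_sum += weight * sim
        weight_total += weight
    # Weighted mean over the fields that were actually compared.
    base = weighted_sum / weight_total if weight_total else 0.0
    confidence = max(0.0, base - null_penalty * null_fields)
    return {
        "left_id": left["id"],
        "right_id": right["id"],
        "confidence": confidence,
        "per_field_scores": per_field,
    }


# Made-up records and weights for illustration.
left = {"id": "a1", "name": "Jane Doe", "email": "jane@example.com", "phone": None}
right = {"id": "b7", "name": "jane doe", "email": "JANE@example.com", "phone": "555-0100"}
weights = {"name": 0.5, "email": 0.4, "phone": 0.1}
result = score_pair(left, right, weights)
print(result["confidence"], result["per_field_scores"])
```

The returned keys mirror the output columns named in the docstring above, so a Dask wrapper could apply a function like this per candidate pair within each partition.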
FusionResult
from dataclasses import dataclass

import pandas as pd


@dataclass
class FusionResult:
    lens_id: str
    lens_version: str
    confirmed: pd.DataFrame  # Matches at or above confirmation_threshold
    proposed: pd.DataFrame   # Between initial and confirmation
    rejected: pd.DataFrame   # Below initial_threshold
    blocking_stats: BlockingStats

    @property
    def performance(self) -> dict:
        """Performance metrics for TRAINEDMODEL node."""
        return {
            "Total Candidates": len(self.confirmed) + len(self.proposed) + len(self.rejected),
            "Confirmed Matches": len(self.confirmed),
            "Proposed (Ambiguous)": len(self.proposed),
            "Rejected": len(self.rejected),
            "Comparison Reduction": f"{self.blocking_stats.reduction_ratio:.1f}x",
            "Avg Confidence (Confirmed)": (
                float(self.confirmed["confidence"].mean())
                if len(self.confirmed) > 0 else 0.0
            ),
        }
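`blocking_stats.reduction_ratio` feeds the "Comparison Reduction" metric, but `BlockingStats` is not defined in this section. The sketch below assumes the ratio is the number of all possible cross-node pairs divided by the number of candidate pairs kept after blocking, and it uses a simple exact-key blocking strategy. Both are assumptions for illustration, and `block_pairs` is a hypothetical stand-in for `block_and_pair()`.

```python
from collections import defaultdict


def block_pairs(left_rows, right_rows, key):
    """Pair only records that share the same blocking key value."""
    buckets = defaultdict(list)
    for row in right_rows:
        buckets[row[key]].append(row["id"])
    candidates = [
        (row["id"], right_id)
        for row in left_rows
        for right_id in buckets.get(row[key], [])
    ]
    total_pairs = len(left_rows) * len(right_rows)
    # Assumed definition: full cross product over surviving candidate pairs.
    reduction_ratio = total_pairs / len(candidates) if candidates else float("inf")
    return candidates, reduction_ratio


# Made-up records blocked on a shared "zip" field.
left = [
    {"id": "a1", "zip": "10001"},
    {"id": "a2", "zip": "94105"},
    {"id": "a3", "zip": "60601"},
]
right = [
    {"id": "b1", "zip": "10001"},
    {"id": "b2", "zip": "10001"},
    {"id": "b3", "zip": "94105"},
    {"id": "b4", "zip": "73301"},
]
candidates, ratio = block_pairs(left, right, key="zip")
print(candidates, f"{ratio:.1f}x")
```

Here 3 of the 12 possible pairs survive blocking, so under the assumed definition the ratio is 4.0 and would be rendered as "4.0x" by the `performance` property.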
UI Workflow Node
NO_CODE_MODEL node (before training)
{
  "type": "NO_CODE_MODEL",
  "id": "FusionMatch",
  "workflow": {
    "model_name": "Federated Entity Matching",
    "name": "FusionMatch",
    "partitioning": "horizontal",
    "type": "entity_matching",
    "class": "no-code"
  },
  "inputs": {
    "input_1": {"connections": [{"input": "output_1", "node": "<dataset_node_id>"}]}
  }
}
TRAINEDMODEL node (after completion)
{
  "type": "TRAINEDMODEL",
  "workflow": {
    "framework": "federated",
    "library": "federated",
    "model_name": "VRS Vulnerability Match",
    "definition": "FusionMatch",
    "status": "TRAINING COMPLETE",
    "type": "entity_matching",
    "nocode": true,
    "parameters": {
      "lens_id": "vrs_vulnerability_v1",
      "initial_threshold": 0.65,
      "confirmation_threshold": 0.80,
      "null_penalty": 0.1,
      "task_type": "Entity Matching",
      "output_type": "Match Candidates"
    },
    "performance": {
      "Total Candidates": 87,
      "Confirmed Matches": 18,
      "Proposed (Ambiguous)": 12,
      "Rejected": 57,
      "Comparison Reduction": "12.4x",
      "Avg Confidence (Confirmed)": 0.89
    }
  }
}
Test Fixtures
FIX-01: FusionMatch runs end-to-end
def test_fusion_match_e2e():
    config = {
        "lens_config": load_yaml("fixtures/vrs_vulnerability_v1.yaml"),
        "initial_threshold": 0.65,
        "confirmation_threshold": 0.80,
    }
    model = FusionMatch(config)
    df_a = load_dask_csv("fixtures/node_a_customers.csv")
    df_b = load_dask_csv("fixtures/node_b_customers.csv")

    # Apply feature extraction
    spec = model.lens_spec
    features_a = extract_features(df_a, spec)
    features_b = extract_features(df_b, spec)

    result = model.run(features_a, features_b)
    assert len(result.confirmed) > 0
    assert all(result.confirmed["confidence"] >= 0.80)
    assert result.blocking_stats.reduction_ratio > 1.0
FIX-02: Same results on Dask vs pandas
def test_dask_matches_pandas():
    """Dask-parallel results identical to pure Python."""
    config = load_yaml("fixtures/vrs_vulnerability_v1.yaml")
    # ... run both and compare
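The comparison step of FIX-02 is elided above. One way it could be approached, sketched here under assumptions, is to index each result set by its (left_id, right_id) pair so that row order, which can differ between Dask partitions and a single-process run, does not affect the outcome. `same_matches` and the tolerance value are hypothetical; plain dicts stand in for DataFrame rows.

```python
import math


def same_matches(rows_a, rows_b, tol=1e-9):
    """True if both result sets contain the same pairs with equal confidence."""
    index_a = {(r["left_id"], r["right_id"]): r["confidence"] for r in rows_a}
    index_b = {(r["left_id"], r["right_id"]): r["confidence"] for r in rows_b}
    if index_a.keys() != index_b.keys():
        return False
    return all(math.isclose(index_a[k], index_b[k], abs_tol=tol) for k in index_a)


# Made-up results: same pairs in a different order.
run_one = [
    {"left_id": "a1", "right_id": "b7", "confidence": 0.91},
    {"left_id": "a2", "right_id": "b3", "confidence": 0.84},
]
run_two = list(reversed(run_one))
print(same_matches(run_one, run_two))
```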
FIX-03: Performance metrics are correct
def test_performance_metrics():
    result = run_fusion_on_sample_data()
    perf = result.performance
    assert "Total Candidates" in perf
    assert "Confirmed Matches" in perf
    assert (
        perf["Confirmed Matches"] + perf["Proposed (Ambiguous)"] + perf["Rejected"]
        == perf["Total Candidates"]
    )
File Layout
titan/modeling/train/library/
├── fusion_match_op.py # FusionMatch class
├── fusion_scorer.py # dask_fusion_score
└── fusion_result.py # FusionResult dataclass
titan/modeling/train/
└── federated_model_runner_v2.py # Add "FusionMatch" to script_mapping
Integration Points
- script_mapping → here: "FusionMatch": "fusion_match" in federated_model_runner_v2.py
- nocode_start() → here: Pipeline bypass (no PSI, no DataSplit)
- component.parallax.lens-parser → here: Lens config from hyperparameters → parse_lens_from_dict()
- component.parallax.feature-extraction → here: Feature extraction in add_operations() chain
- component.parallax.blocking-engine → here: block_and_pair() for candidate generation
- component.parallax.scoring-engine → here: score_all_candidates() for match scoring
- Here → component.parallax.adi-integration: FusionResult → ADI signal + evidence generation
- Here → xanadu: Remote feature vectors received via federation exchange
Depends on: component.parallax.blocking-engine, component.parallax.feature-extraction, component.parallax.lens-parser, component.parallax.scoring-engine, component.parallax.three-phase-protocol, component.titan.runtime
Realizes: product.fusion
Required by: component.parallax.adi-integration