FusionMatch Model (FATE Pipeline Integration)

Status & scope

  • Stage: POC — VRS Use Case
  • Module: titan/modeling/train/library/fusion_match_op.py, titan/modeling/train/library/fusion_scorer.py
  • Milestone: M4 (Dask Integration)

Purpose

FusionMatch is a new model type registered in the existing FATE pipeline alongside HomoLR, HomoNN, HomoSecureBoost, HeteroKmeans, and the other existing models. Instead of training a statistical model, it performs entity matching: it receives data from the operations chain and produces match candidates.

Model Registration

models.json entry

{
  "FusionMatch": {
    "model_name": "Federated Entity Matching",
    "type": "entity_matching",
    "library": "federated",
    "framework": "federated",
    "model_description": "Matches entities across federated nodes using a Semantic Lens. Computes weighted similarity scores across configurable fields, applies blocking for performance, and produces confirmed match candidates with provenance.",
    "partitioning": "horizontal",
    "releases": [],
    "hyperparameters": {
      "type": "object",
      "x-display-name": "FusionMatch Parameters",
      "x-parameter-groups": {
        "basic": {
          "task_config": ["task_type", "output_type"],
          "objects": ["lens_config"],
          "dynamic": ["initial_threshold", "confirmation_threshold"]
        },
        "advanced": {
          "boolean": ["mint_passport"],
          "dynamic": ["null_penalty", "min_independent_sources"]
        }
      },
      "required": ["version", "lens_config", "task_type", "output_type"],
      "properties": {
        "version": {
          "type": "string",
          "default": "1.11",
          "x-show": false
        },
        "task_type": {
          "type": "string",
          "x-display-name": "Task Type",
          "enum": ["Entity Matching"],
          "default": "Entity Matching"
        },
        "output_type": {
          "type": "string",
          "x-display-name": "Output Type",
          "enum": ["Match Candidates"],
          "default": "Match Candidates"
        },
        "lens_config": {
          "type": "object",
          "x-display-name": "Semantic Lens",
          "description": "Lens YAML configuration defining field matching rules, weights, and thresholds."
        },
        "initial_threshold": {
          "type": "number",
          "format": "float",
          "x-display-name": "Initial Threshold",
          "description": "Minimum confidence to propose a match candidate.",
          "minimum": 0.0,
          "maximum": 1.0,
          "default": 0.65,
          "x-priority": true
        },
        "confirmation_threshold": {
          "type": "number",
          "format": "float",
          "x-display-name": "Confirmation Threshold",
          "description": "Minimum confidence to confirm a match.",
          "minimum": 0.0,
          "maximum": 1.0,
          "default": 0.80,
          "x-priority": true
        },
        "null_penalty": {
          "type": "number",
          "format": "float",
          "x-display-name": "Null Penalty",
          "description": "Confidence penalty per null field.",
          "minimum": 0.0,
          "maximum": 0.5,
          "default": 0.1
        },
        "min_independent_sources": {
          "type": "integer",
          "x-display-name": "Min Independent Sources",
          "description": "Minimum federate nodes confirming a match.",
          "minimum": 2,
          "default": 2
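        },
        "mint_passport": {
          "type": "boolean",
          "x-display-name": "Mint Passport",
          "description": "Mint passport tokens for confirmed matches."
        }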
      }
    }
  }
}
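
The defaults and bounds declared in this entry could be enforced before a run along the following lines. This is a minimal sketch, not the pipeline's actual validator: `validate_fusion_params` is a hypothetical helper, defaults are assumed to be applied before the required-key check, and the rule that `initial_threshold` must not exceed `confirmation_threshold` is an assumption that the schema itself does not state.

```python
# Defaults and bounds copied from the models.json entry above.
DEFAULTS = {
    "version": "1.11",
    "task_type": "Entity Matching",
    "output_type": "Match Candidates",
    "initial_threshold": 0.65,
    "confirmation_threshold": 0.80,
    "null_penalty": 0.1,
    "min_independent_sources": 2,
}
# (minimum, maximum); None means the schema declares no upper bound.
BOUNDS = {
    "initial_threshold": (0.0, 1.0),
    "confirmation_threshold": (0.0, 1.0),
    "null_penalty": (0.0, 0.5),
    "min_independent_sources": (2, None),
}
REQUIRED = ("version", "lens_config", "task_type", "output_type")


def validate_fusion_params(params: dict) -> dict:
    """Hypothetical helper: fill defaults, then check required keys and bounds."""
    merged = {**DEFAULTS, **params}
    missing = [key for key in REQUIRED if key not in merged]
    if missing:
        raise ValueError(f"missing required parameters: {missing}")
    for key, (low, high) in BOUNDS.items():
        value = merged[key]
        if value < low or (high is not None and value > high):
            raise ValueError(f"{key}={value} is outside [{low}, {high}]")
    # Assumption (not in the schema): a match cannot be confirmed at a
    # lower confidence than the one needed to propose it.
    if merged["initial_threshold"] > merged["confirmation_threshold"]:
        raise ValueError("initial_threshold exceeds confirmation_threshold")
    return merged
```

Only `lens_config` has no default, so a call such as `validate_fusion_params({"lens_config": {...}})` returns a fully populated parameter dict.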

script_mapping entry

# In federated_model_runner_v2.py
script_mapping = {
    "CoordinatedLR": "coordinated_lr",
    "HomoLR": "homo_lr",
    "HomoNN": "homo_nn",
    "HomoSecureBoost": "homo_secure_boost",
    "HeteroSecureBoost": "hetero_secure_boost",
    "HeteroKmeans": "hetero_kmeans",
    # ...existing models...
    "FusionMatch": "fusion_match",     # NEW
}

Pipeline Flow

Standard ML pipeline:

Reader → add_operations() → PSI → DataSplit → Model → Evaluation

FusionMatch pipeline (PSI + DataSplit bypassed):

Reader → add_operations(lens transforms) → FusionMatch

nocode_start() modification

def nocode_start(self):
    model_name = self.config.get("definition")

    if model_name == "FusionMatch":
        # Bypass PSI and DataSplit — not applicable to entity matching
        pipeline = self.build_fusion_pipeline()
    else:
        # Existing flow for ML models
        pipeline = self.build_ml_pipeline()
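
The two flows can also be written down as stage lists, which makes the bypass of PSI and DataSplit easy to assert in a test. This is only an illustrative sketch: `pipeline_stages` is a hypothetical function, and the stage names are plain labels taken from the diagrams above rather than real FATE component classes.

```python
# Stage labels copied from the two pipeline diagrams above.
ML_STAGES = ["Reader", "add_operations", "PSI", "DataSplit", "Model", "Evaluation"]
FUSION_STAGES = ["Reader", "add_operations", "FusionMatch"]


def pipeline_stages(model_name: str) -> list:
    """Hypothetical helper: return the ordered stage labels for a model."""
    if model_name == "FusionMatch":
        # Entity matching skips PSI and DataSplit.
        return list(FUSION_STAGES)
    return list(ML_STAGES)
```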

FusionMatch Implementation

fusion_match_op.py — FATE Pipeline Component

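import dask.dataframe

# Project helpers (parse_lens_from_dict, block_and_pair, score_all_candidates,
# FusionResult) are assumed to be imported from their own modules.


class FusionMatch: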
    """Entity matching model for the FATE pipeline.

    Receives:
      - Local feature vectors (from operations chain)
      - Remote feature vectors (from xanadu/federation exchange)
      - Lens config (from hyperparameters)

    Produces:
      - Match candidates with confidence scores
      - Confirmed matches above confirmation_threshold
      - Passport tokens for confirmed matches (if enabled)
      - Performance metrics (precision, recall, reduction ratio)
    """

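    def __init__(self, config: dict):
        self.lens_spec = parse_lens_from_dict(config["lens_config"])
        self.initial_threshold = config.get("initial_threshold", 0.65)
        self.confirmation_threshold = config.get("confirmation_threshold", 0.80)
        self.null_penalty = config.get("null_penalty", 0.1)
        # Remaining advanced hyperparameters from models.json.
        self.min_independent_sources = config.get("min_independent_sources", 2)
        # Assumption: minting is off unless requested (no default is specified).
        self.mint_passport = config.get("mint_passport", False)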

    def run(self, local_df: dask.dataframe.DataFrame,
            remote_df: dask.dataframe.DataFrame) -> FusionResult:
        """Execute the full fusion pipeline.

        1. Block both sides → candidate pairs
        2. Score each candidate pair
        3. Filter by thresholds
        4. Mint passports for confirmed matches (not shown in this excerpt)
        5. Compute performance metrics (see FusionResult.performance)
        """
        # Step 1: Blocking
        candidates, blocking_stats = block_and_pair(
            local_df, remote_df,
            self.lens_spec.identity_fusion.blocking_strategy
        )

        # Step 2: Scoring
        scored = score_all_candidates(
            local_df, remote_df, candidates,
            self.lens_spec.identity_fusion.match_function,
            self.null_penalty
        )

        # Step 3: Threshold filtering
        confirmed = scored[scored["confidence"] >= self.confirmation_threshold]
        proposed = scored[
            (scored["confidence"] >= self.initial_threshold) &
            (scored["confidence"] < self.confirmation_threshold)
        ]
        rejected = scored[scored["confidence"] < self.initial_threshold]

        return FusionResult(
            lens_id=self.lens_spec.lens_id,
            lens_version=self.lens_spec.version,
            confirmed=confirmed,
            proposed=proposed,
            rejected=rejected,
            blocking_stats=blocking_stats,
        )
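
The threshold step in `run()` splits scored candidates into three disjoint buckets. The boundary behaviour (a score exactly equal to a threshold falls into the higher bucket) can be sketched in plain Python as below. `partition_by_threshold` is a hypothetical stand-in that works on a list of dicts instead of a Dask DataFrame, purely to keep the example self-contained.

```python
def partition_by_threshold(scored, initial=0.65, confirmation=0.80):
    """Split scored candidates into (confirmed, proposed, rejected).

    Mirrors the comparisons used in FusionMatch.run():
      confirmed: confidence >= confirmation
      proposed:  initial <= confidence < confirmation
      rejected:  confidence < initial
    """
    confirmed, proposed, rejected = [], [], []
    for row in scored:
        confidence = row["confidence"]
        if confidence >= confirmation:
            confirmed.append(row)
        elif confidence >= initial:
            proposed.append(row)
        else:
            rejected.append(row)
    return confirmed, proposed, rejected
```

Because every candidate lands in exactly one bucket, the three lengths always sum to the number of scored candidates, which is the invariant that FIX-03 checks.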

fusion_scorer.py — Dask DataFrame Operation

import dask.dataframe


def dask_fusion_score(
    left_df: dask.dataframe.DataFrame,
    right_df: dask.dataframe.DataFrame,
    lens_config: dict,
) -> dask.dataframe.DataFrame:
    """Wrap score_pair to operate on Dask DataFrames.

    This is the Dask-native entry point called by FusionMatch.run().
    Input: two Dask DataFrames (left = local, right = remote).
    Output: Dask DataFrame with columns [left_id, right_id, confidence, per_field_scores].
    """
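
The per-pair scoring that `dask_fusion_score` wraps is not shown in this document. As a rough illustration of "weighted similarity with a per-null penalty", a pair scorer might look like the sketch below. Everything in it is an assumption: the `score_pair` signature, the use of exact equality as the per-field similarity, the renormalisation over non-null fields, and the clamping to [0, 1]. The real scoring engine may differ on every one of these points.

```python
def score_pair(left, right, weights, null_penalty=0.1):
    """Illustrative scorer: weighted exact-match similarity minus a null penalty.

    left, right: dicts of field name -> value (None means missing).
    weights:     dict of field name -> non-negative weight.
    """
    matched_weight = 0.0
    compared_weight = 0.0
    null_fields = 0
    for field, weight in weights.items():
        a, b = left.get(field), right.get(field)
        if a is None or b is None:
            # Field cannot be compared; it is penalised instead of scored.
            null_fields += 1
            continue
        compared_weight += weight
        if a == b:
            matched_weight += weight
    # Normalise over the fields that could actually be compared.
    base = matched_weight / compared_weight if compared_weight else 0.0
    penalised = base - null_penalty * null_fields
    return max(0.0, min(1.0, penalised))
```

Under these assumptions, two records that agree on every comparable field but have one missing field score `1.0 - null_penalty`, so a single null is enough to keep a pair out of the confirmed bucket only if the penalty exceeds the gap to `confirmation_threshold`.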

FusionResult

from dataclasses import dataclass

import pandas as pd


@dataclass
class FusionResult:
    lens_id: str
    lens_version: str
    confirmed: pd.DataFrame         # Matches above confirmation_threshold
    proposed: pd.DataFrame           # Between initial and confirmation
    rejected: pd.DataFrame           # Below initial_threshold
    blocking_stats: BlockingStats

    @property
    def performance(self) -> dict:
        """Performance metrics for TRAINEDMODEL node."""
        return {
            "Total Candidates": len(self.confirmed) + len(self.proposed) + len(self.rejected),
            "Confirmed Matches": len(self.confirmed),
            "Proposed (Ambiguous)": len(self.proposed),
            "Rejected": len(self.rejected),
            "Comparison Reduction": f"{self.blocking_stats.reduction_ratio:.1f}x",
            "Avg Confidence (Confirmed)": float(self.confirmed["confidence"].mean())
                if len(self.confirmed) > 0 else 0.0,
        }
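
The `Comparison Reduction` value is formatted with an `x` suffix and FIX-01 expects it to exceed 1.0, which suggests a multiplicative factor rather than a fraction. A sketch under that assumption, with `reduction_ratio` defined as the full cross-product size divided by the number of candidate pairs left after blocking, is shown below. `block_pairs` and `reduction_ratio` are hypothetical helpers using a single exact-match blocking key; the actual blocking engine and `BlockingStats` are not described here.

```python
from collections import defaultdict


def block_pairs(left, right, key):
    """Return (left_index, right_index) pairs whose blocking key is equal."""
    buckets = defaultdict(list)
    for j, row in enumerate(right):
        if row.get(key) is not None:  # never block on a missing key
            buckets[row[key]].append(j)
    return [
        (i, j)
        for i, row in enumerate(left)
        for j in buckets.get(row.get(key), [])
    ]


def reduction_ratio(n_left, n_right, n_candidates):
    """Assumed definition: all possible comparisons / comparisons kept."""
    if n_candidates == 0:
        return float("inf")
    return (n_left * n_right) / n_candidates


left = [{"zip": "a"}, {"zip": "b"}, {"zip": "c"}]
right = [{"zip": "a"}, {"zip": "a"}, {"zip": "b"}, {"zip": "z"}]
pairs = block_pairs(left, right, "zip")
ratio = reduction_ratio(len(left), len(right), len(pairs))
```

Here blocking keeps 3 of the 12 possible comparisons, so the ratio formats as `4.0x` with the same `:.1f` format string used in `performance`.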

UI Workflow Node

NO_CODE_MODEL node (before training)

{
  "type": "NO_CODE_MODEL",
  "id": "FusionMatch",
  "workflow": {
    "model_name": "Federated Entity Matching",
    "name": "FusionMatch",
    "partitioning": "horizontal",
    "type": "entity_matching",
    "class": "no-code"
  },
  "inputs": {
    "input_1": {"connections": [{"input": "output_1", "node": "<dataset_node_id>"}]}
  }
}

TRAINEDMODEL node (after completion)

{
  "type": "TRAINEDMODEL",
  "workflow": {
    "framework": "federated",
    "library": "federated",
    "model_name": "VRS Vulnerability Match",
    "definition": "FusionMatch",
    "status": "TRAINING COMPLETE",
    "type": "entity_matching",
    "nocode": true,
    "parameters": {
      "lens_id": "vrs_vulnerability_v1",
      "initial_threshold": 0.65,
      "confirmation_threshold": 0.80,
      "null_penalty": 0.1,
      "task_type": "Entity Matching",
      "output_type": "Match Candidates"
    },
    "performance": {
      "Total Candidates": 87,
      "Confirmed Matches": 18,
      "Proposed (Ambiguous)": 12,
      "Rejected": 57,
      "Comparison Reduction": "12.4x",
      "Avg Confidence (Confirmed)": 0.89
    }
  }
}

Test Fixtures

FIX-01: FusionMatch runs end-to-end

def test_fusion_match_e2e():
    config = {
        "lens_config": load_yaml("fixtures/vrs_vulnerability_v1.yaml"),
        "initial_threshold": 0.65,
        "confirmation_threshold": 0.80,
    }
    model = FusionMatch(config)

    df_a = load_dask_csv("fixtures/node_a_customers.csv")
    df_b = load_dask_csv("fixtures/node_b_customers.csv")

    # Apply feature extraction
    spec = model.lens_spec
    features_a = extract_features(df_a, spec)
    features_b = extract_features(df_b, spec)

    result = model.run(features_a, features_b)

    assert len(result.confirmed) > 0
    assert all(result.confirmed["confidence"] >= 0.80)
    assert result.blocking_stats.reduction_ratio > 1.0

FIX-02: Same results on Dask vs pandas

def test_dask_matches_pandas():
    """Dask-parallel results are identical to the pandas results."""
    config = load_yaml("fixtures/vrs_vulnerability_v1.yaml")
    # ... run both and compare

FIX-03: Performance metrics are correct

def test_performance_metrics():
    result = run_fusion_on_sample_data()
    perf = result.performance
    assert "Total Candidates" in perf
    assert "Confirmed Matches" in perf
    assert perf["Confirmed Matches"] + perf["Proposed (Ambiguous)"] + perf["Rejected"] == perf["Total Candidates"]

File Layout

titan/modeling/train/library/
├── fusion_match_op.py       # FusionMatch class
├── fusion_scorer.py         # dask_fusion_score
└── fusion_result.py         # FusionResult dataclass

titan/modeling/train/
└── federated_model_runner_v2.py   # Add "FusionMatch" to script_mapping

Integration Points


Depends on: component.parallax.blocking-engine, component.parallax.feature-extraction, component.parallax.lens-parser, component.parallax.scoring-engine, component.parallax.three-phase-protocol, component.titan.runtime

Realizes: product.fusion

Required by: component.parallax.adi-integration