Skip to content

ADI Integration (Signals + Evidence)

Status & scope

  • Stage: POC — VRS Use Case
  • Module: cortex/tools/fusion.py
  • Milestone: M8 (ADI Signal Generation)

Purpose

Connect fusion output to the existing ADI decision workflow. Confirmed matches become Signals that enter the investigation → decision → attestation flow. Evidence blocks carry the full audit trail: which lens, which federates, at what confidence, with what provenance.

Key principle: Fusion is a source of signals and evidence, not a new workflow. All seven invariants are maintained.

Signal Generation

def generate_fusion_signal(
    match: ScoredPair,
    lens_spec: LensSpec,
    run_id: str,
    contributing_federates: list[str],
) -> dict:
    """Create an ADI Signal from a confirmed match.

    Returns a Signal object matching the existing signal schema.
    Type and severity come from lens output_semantics.

    Args:
        match: Confirmed pair; its ``confidence`` and ``per_field_scores``
            are copied onto the signal.
        lens_spec: Lens that produced the match; supplies ``lens_id``,
            ``version``, ``domain`` and ``output_semantics``.
        run_id: Fusion run identifier, recorded in ``source`` and ``metadata``.
        contributing_federates: IDs of the federates that contributed.

    Returns:
        A new signal dict with ``status`` "new" and ``deny_mode`` "omit".
    """
    # Imported locally so this block does not depend on the file-level imports;
    # datetime.utcnow() is deprecated since Python 3.12.
    from datetime import timezone

    semantics = lens_spec.output_semantics

    # Take an aware UTC "now", then drop tzinfo so isoformat() emits no
    # "+00:00" offset and the existing "...Z" wire format is preserved.
    detected_at = (
        datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"
    )

    return {
        "schema_version": 2,
        "signal_id": f"sig_fusion_{uuid4()}",
        "signal_type": semantics.signal_type,
        "source": {
            "type": "computed",
            "system_id": "fusion_engine",
            "system_name": "Semantic Lens Fusion",
            "adapter_id": "fusion_v1",
            "raw_payload_ref": f"fusion_run:{run_id}",
        },
        "severity": semantics.signal_severity,
        "title": _build_signal_title(match, lens_spec),
        "subject": {
            "type": "correlation",
            "id": f"corr_{uuid4()}",
            "name": _build_subject_name(match, lens_spec),
            "federate": None,
        },
        "detected_at": detected_at,
        "status": "new",
        "confidence": match.confidence,
        "metadata": {
            "lens_id": lens_spec.lens_id,
            "lens_version": lens_spec.version,
            "run_id": run_id,
            # Independent copy: must not alias the caller's list or the
            # visibility_context list below.
            "contributing_sources": list(contributing_federates),
            "per_field_scores": match.per_field_scores,
            "correlation_type": semantics.correlation_type,
        },
        "visibility_context": {
            "federates": list(contributing_federates),
            "visibility_keys": [f"FUSION_{lens_spec.domain.upper()}_KEY"],
        },
        "deny_mode": "omit",
        "assigned_to": {
            "roles_any": _roles_for_domain(lens_spec.domain),
        },
    }

Evidence Block Generation

def create_fusion_evidence_block(
    match: ScoredPair,
    lens_spec: LensSpec,
    run_id: str,
    contributing_federates: list[str],
) -> dict:
    """Create an evidence block from a confirmed match.

    block_kind: query_result
    evidence_class: from lens output_semantics
    query_hash: deterministic from (lens_id, version, run_id, entity pair)
    fusion_provenance: full audit trail

    Args:
        match: Confirmed pair; supplies entity IDs, ``confidence`` and
            ``per_field_scores``.
        lens_spec: Lens that produced the match; supplies ``lens_id``,
            ``version``, ``domain`` and ``output_semantics.evidence_class``.
        run_id: Fusion run identifier.
        contributing_federates: IDs of the federates that contributed.

    Returns:
        A new block dict in the "transient" lifecycle stage with
        ``result_hash`` set to None.
    """
    # Imported locally so this block does not depend on the file-level imports;
    # datetime.utcnow() is deprecated since Python 3.12.
    from datetime import timezone

    # Deterministic query_hash: canonical JSON (sorted keys) of the identifying
    # inputs, truncated to the first 12 hex characters of the SHA-256 digest.
    qh_input = json.dumps({
        "tool_name": "fusion_query",
        "lens_id": lens_spec.lens_id,
        "lens_version": lens_spec.version,
        "run_id": run_id,
        "entity_a": match.entity_id_a,
        "entity_b": match.entity_id_b,
    }, sort_keys=True)
    query_hash = f"qh:{hashlib.sha256(qh_input.encode()).hexdigest()[:12]}"

    # The scores are embedded as a pre-serialized string, so an outer
    # json.dumps(..., sort_keys=True) cannot reorder them later. Sort here so
    # equal score mappings always yield the same string regardless of
    # insertion order.
    field_scores = json.dumps(match.per_field_scores, sort_keys=True)

    # Take an aware UTC "now", then drop tzinfo so isoformat() emits no
    # "+00:00" offset and the existing "...Z" wire format is preserved.
    ts = datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"

    return {
        "id": f"blk_fusion_{uuid4()}",
        "block_kind": "query_result",
        "evidence_class": lens_spec.output_semantics.evidence_class,
        "evidence_tags": ["fusion", lens_spec.domain, "multi-source"],
        "ts": ts,
        "outcome": "OK",
        "materialization_mode": "live",
        "origin_surface": "monitor",
        "lifecycle_stage": "transient",
        "query_hash": query_hash,
        "result_hash": None,  # Set when frozen
        "projections": {
            "rows": [{
                "entity_id_a": match.entity_id_a,
                "entity_id_b": match.entity_id_b,
                "confidence": match.confidence,
                "per_field_scores": field_scores,
                # Number of contributing federates, not their IDs.
                "contributing_federates": len(contributing_federates),
            }],
            "column_meta": {
                "entity_id_a": {"role": "id", "type": "string", "label": "Entity A"},
                "entity_id_b": {"role": "id", "type": "string", "label": "Entity B"},
                "confidence": {"role": "metric", "type": "number", "label": "Confidence", "format": "percent"},
                "per_field_scores": {"role": "dimension", "type": "string", "label": "Field Scores"},
                "contributing_federates": {"role": "metric", "type": "number", "label": "Sources"},
            },
            "viz_hints": {"recommended": "table", "alternatives": ["kpi_card"]},
        },
        "fusion_provenance": {
            "lens_id": lens_spec.lens_id,
            "lens_version": lens_spec.version,
            "run_id": run_id,
            # Every federate is recorded with the overall match confidence.
            "contributing_sources": [
                {"federate_id": f, "confidence": match.confidence}
                for f in contributing_federates
            ],
        },
    }

Evidence Freezing

def freeze_evidence(block: dict) -> dict:
    """Freeze an evidence block — SHA-256 locks the content forever.

    Invariant 4: Frozen means frozen.
    The result_hash includes fusion_provenance, so any change to
    provenance data would invalidate the hash.

    The hash covers only ``projections`` and ``fusion_provenance``. The block
    is updated in place and the same object is returned.

    Freezing is idempotent: a block that is already frozen and whose stored
    ``result_hash`` still matches its content is returned unchanged.

    Args:
        block: Evidence block containing ``projections`` and
            ``fusion_provenance``.

    Returns:
        The same ``block`` object, with ``result_hash`` set and
        ``lifecycle_stage`` equal to "frozen".

    Raises:
        KeyError: If ``projections`` or ``fusion_provenance`` is missing.
        ValueError: If the block is already frozen but its content no longer
            matches the stored ``result_hash``.
    """
    # Canonical JSON (sorted keys) so equal content always hashes the same.
    content = json.dumps({
        "projections": block["projections"],
        "fusion_provenance": block["fusion_provenance"],
    }, sort_keys=True)
    result_hash = f"rh:{hashlib.sha256(content.encode()).hexdigest()}"

    existing_hash = block.get("result_hash")
    if block.get("lifecycle_stage") == "frozen" and existing_hash:
        # Never silently re-hash frozen evidence: overwriting the stored hash
        # would hide any modification made after the original freeze.
        if existing_hash != result_hash:
            raise ValueError(
                "Frozen evidence block content does not match its result_hash"
            )
        return block

    block["result_hash"] = result_hash
    block["lifecycle_stage"] = "frozen"
    return block

Seven Invariants Verification

def verify_invariants(signal: dict, block: dict, lens_spec: LensSpec) -> list[str]:
    """Verify all 7 Axonis invariants are maintained. Returns violations (empty = pass).

    Only invariants 1, 3, 4 and 6 are checked here; 2, 5 and 7 are noted as
    structural or verified elsewhere. Missing or malformed fields are reported
    as violations rather than raised as exceptions, so a malformed signal or
    block still produces a complete report.

    Args:
        signal: Signal dict to check (``deny_mode``, ``source.type``).
        block: Evidence block dict to check (``query_hash``,
            ``lifecycle_stage``, ``result_hash``).
        lens_spec: Not used by the current checks; kept for interface
            compatibility.

    Returns:
        Violation messages in invariant order; an empty list means pass.
    """
    violations: list[str] = []

    # 1. UDS is sole ABAC authority
    if signal.get("deny_mode") != "omit":
        violations.append("INV-1: Signal missing deny_mode: omit")

    # 2. Events are append-only (structural — no UPDATE/DELETE in signal creation)

    # 3. Blocks are evidence — query_hash is source of truth
    # A None or non-string query_hash is a violation, not a crash.
    query_hash = block.get("query_hash")
    if not (isinstance(query_hash, str) and query_hash.startswith("qh:")):
        violations.append("INV-3: Block missing valid query_hash")

    # 4. Frozen means frozen
    if block.get("lifecycle_stage") == "frozen" and not block.get("result_hash"):
        violations.append("INV-4: Frozen block missing result_hash")

    # 5. Editions require frozen evidence (verified at Edition submission)

    # 6. AI assists, humans attest
    # A missing or malformed "source" cannot be "computed", so it is reported.
    source = signal.get("source")
    source_type = source.get("type") if isinstance(source, dict) else None
    if source_type != "computed":
        violations.append("INV-6: Fusion signal must be source.type: computed")

    # 7. "No action" is a decision (structural — enforced by signal lifecycle)

    return violations

Test Fixtures

FIX-01: Signal generation

def test_generate_fusion_signal():
    """FIX-01: a confirmed match yields a signal with the expected core fields."""
    lens = parse_lens("fixtures/vrs_vulnerability_v1.yaml")
    field_scores = {"name_match": 1.0, "dob_match": 1.0}
    pair = ScoredPair(
        entity_id_a="A001",
        entity_id_b="B001",
        confidence=0.95,
        per_field_scores=field_scores,
        null_fields=[],
        null_count=0,
    )
    federates = ["node_a", "node_b"]

    signal = generate_fusion_signal(pair, lens, "run_001", federates)

    # Type and severity are read from the lens; confidence from the match.
    expected_fields = (
        ("signal_type", "fusion_vulnerability_match"),
        ("severity", "high"),
        ("confidence", 0.95),
    )
    for key, expected in expected_fields:
        assert signal[key] == expected

    # Fixed values the invariant checks rely on.
    source = signal["source"]
    assert source["type"] == "computed"
    assert signal["deny_mode"] == "omit"

FIX-02: Evidence block with deterministic query_hash

def test_evidence_block_query_hash():
    """FIX-02: evidence blocks get a deterministic, well-formed query_hash."""
    spec = parse_lens("fixtures/vrs_vulnerability_v1.yaml")
    match = ScoredPair("A001", "B001", 0.95, {}, [], 0)
    # create_fusion_evidence_block takes exactly four arguments:
    # (match, lens_spec, run_id, contributing_federates).
    block = create_fusion_evidence_block(match, spec, "run_001", ["node_a", "node_b"])

    assert block["query_hash"].startswith("qh:")
    assert block["lifecycle_stage"] == "transient"
    assert block["evidence_class"] == "vrs_fusion_correlation"

    # Same inputs → same query_hash
    block2 = create_fusion_evidence_block(match, spec, "run_001", ["node_a", "node_b"])
    assert block["query_hash"] == block2["query_hash"]

FIX-03: Freeze evidence

def test_freeze_evidence():
    """FIX-03: freezing sets lifecycle_stage and a reproducible result_hash."""
    sample = create_sample_evidence_block()

    # Before freezing: still transient, with no result hash yet.
    stage_before = sample["lifecycle_stage"]
    hash_before = sample["result_hash"]
    assert stage_before == "transient"
    assert hash_before is None

    first = freeze_evidence(sample)
    first_hash = first["result_hash"]
    assert first["lifecycle_stage"] == "frozen"
    assert first_hash.startswith("rh:")

    # Deterministic: a second, freshly built sample block freezes to the same hash.
    second = freeze_evidence(create_sample_evidence_block())
    assert first_hash == second["result_hash"]

FIX-04: Invariant verification passes

def test_invariants_pass():
    """FIX-04: a generated signal plus its frozen block reports no violations."""
    spec = parse_lens("fixtures/vrs_vulnerability_v1.yaml")
    match = ScoredPair("A001", "B001", 0.95, {}, [], 0)
    signal = generate_fusion_signal(match, spec, "run_001", ["node_a", "node_b"])
    # create_fusion_evidence_block takes exactly four arguments:
    # (match, lens_spec, run_id, contributing_federates).
    block = create_fusion_evidence_block(match, spec, "run_001", ["node_a", "node_b"])
    frozen = freeze_evidence(block)

    violations = verify_invariants(signal, frozen, spec)
    assert violations == []

File Layout

cortex/tools/
├── fusion.py               # generate_fusion_signal, create_fusion_evidence_block,
│                           # freeze_evidence, verify_invariants
└── tests/
    └── test_fusion_adi.py

Integration Points

  • component.parallax.fusionmatch-model → here: FusionResult.confirmed → signal + evidence generation
  • Here → ADI workflow: Signals appear on Monitor page, trigger investigations
  • Here → existing signal schema: No schema changes — uses existing signal + block contracts

Depends on: component.parallax.fusionmatch-model

Realizes: product.fusion, product.signal