ADI Integration (Signals + Evidence)
Status & scope
- Stage: POC — VRS Use Case
- Module:
cortex/tools/fusion.py - Milestone: M8 (ADI Signal Generation)
Purpose
Connect fusion output to the existing ADI decision workflow. Confirmed matches become Signals that enter the investigation → decision → attestation flow. Evidence blocks carry the full audit trail: which lens, which federates, at what confidence, with what provenance.
Key principle: Fusion is a source of signals and evidence, not a new workflow. All seven invariants are maintained.
Signal Generation
def generate_fusion_signal(
match: ScoredPair,
lens_spec: LensSpec,
run_id: str,
contributing_federates: list[str],
) -> dict:
"""Create an ADI Signal from a confirmed match.
Returns a Signal object matching the existing signal schema.
Type and severity come from lens output_semantics.
"""
return {
"schema_version": 2,
"signal_id": f"sig_fusion_{uuid4()}",
"signal_type": lens_spec.output_semantics.signal_type,
"source": {
"type": "computed",
"system_id": "fusion_engine",
"system_name": "Semantic Lens Fusion",
"adapter_id": "fusion_v1",
"raw_payload_ref": f"fusion_run:{run_id}",
},
"severity": lens_spec.output_semantics.signal_severity,
"title": _build_signal_title(match, lens_spec),
"subject": {
"type": "correlation",
"id": f"corr_{uuid4()}",
"name": _build_subject_name(match, lens_spec),
"federate": None,
},
"detected_at": datetime.utcnow().isoformat() + "Z",
"status": "new",
"confidence": match.confidence,
"metadata": {
"lens_id": lens_spec.lens_id,
"lens_version": lens_spec.version,
"run_id": run_id,
"contributing_sources": contributing_federates,
"per_field_scores": match.per_field_scores,
"correlation_type": lens_spec.output_semantics.correlation_type,
},
"visibility_context": {
"federates": contributing_federates,
"visibility_keys": [f"FUSION_{lens_spec.domain.upper()}_KEY"],
},
"deny_mode": "omit",
"assigned_to": {
"roles_any": _roles_for_domain(lens_spec.domain),
},
}
Evidence Block Generation
def create_fusion_evidence_block(
match: ScoredPair,
lens_spec: LensSpec,
run_id: str,
contributing_federates: list[str],
) -> dict:
"""Create an evidence block from a confirmed match.
block_kind: query_result
evidence_class: from lens output_semantics
query_hash: deterministic from (lens_id, version, run_id, entity pair)
fusion_provenance: full audit trail
"""
# Deterministic query_hash
qh_input = json.dumps({
"tool_name": "fusion_query",
"lens_id": lens_spec.lens_id,
"lens_version": lens_spec.version,
"run_id": run_id,
"entity_a": match.entity_id_a,
"entity_b": match.entity_id_b,
}, sort_keys=True)
query_hash = f"qh:{hashlib.sha256(qh_input.encode()).hexdigest()[:12]}"
return {
"id": f"blk_fusion_{uuid4()}",
"block_kind": "query_result",
"evidence_class": lens_spec.output_semantics.evidence_class,
"evidence_tags": ["fusion", lens_spec.domain, "multi-source"],
"ts": datetime.utcnow().isoformat() + "Z",
"outcome": "OK",
"materialization_mode": "live",
"origin_surface": "monitor",
"lifecycle_stage": "transient",
"query_hash": query_hash,
"result_hash": None, # Set when frozen
"projections": {
"rows": [{
"entity_id_a": match.entity_id_a,
"entity_id_b": match.entity_id_b,
"confidence": match.confidence,
"per_field_scores": json.dumps(match.per_field_scores),
"contributing_federates": len(contributing_federates),
}],
"column_meta": {
"entity_id_a": {"role": "id", "type": "string", "label": "Entity A"},
"entity_id_b": {"role": "id", "type": "string", "label": "Entity B"},
"confidence": {"role": "metric", "type": "number", "label": "Confidence", "format": "percent"},
"per_field_scores": {"role": "dimension", "type": "string", "label": "Field Scores"},
"contributing_federates": {"role": "metric", "type": "number", "label": "Sources"},
},
"viz_hints": {"recommended": "table", "alternatives": ["kpi_card"]},
},
"fusion_provenance": {
"lens_id": lens_spec.lens_id,
"lens_version": lens_spec.version,
"run_id": run_id,
"contributing_sources": [
{"federate_id": f, "confidence": match.confidence}
for f in contributing_federates
],
},
}
Evidence Freezing
def freeze_evidence(block: dict) -> dict:
"""Freeze an evidence block — SHA-256 locks the content forever.
Invariant 4: Frozen means frozen.
The result_hash includes fusion_provenance, so any change to
provenance data would invalidate the hash.
"""
content = json.dumps({
"projections": block["projections"],
"fusion_provenance": block["fusion_provenance"],
}, sort_keys=True)
block["result_hash"] = f"rh:{hashlib.sha256(content.encode()).hexdigest()}"
block["lifecycle_stage"] = "frozen"
return block
Seven Invariants Verification
def verify_invariants(signal: dict, block: dict, lens_spec: LensSpec) -> list[str]:
"""Verify all 7 Axonis invariants are maintained. Returns violations (empty = pass)."""
violations = []
# 1. UDS is sole ABAC authority
if "deny_mode" not in signal or signal["deny_mode"] != "omit":
violations.append("INV-1: Signal missing deny_mode: omit")
# 2. Events are append-only (structural — no UPDATE/DELETE in signal creation)
# 3. Blocks are evidence — query_hash is source of truth
if not block.get("query_hash", "").startswith("qh:"):
violations.append("INV-3: Block missing valid query_hash")
# 4. Frozen means frozen
if block["lifecycle_stage"] == "frozen" and not block.get("result_hash"):
violations.append("INV-4: Frozen block missing result_hash")
# 5. Editions require frozen evidence (verified at Edition submission)
# 6. AI assists, humans attest
if signal["source"]["type"] != "computed":
violations.append("INV-6: Fusion signal must be source.type: computed")
# 7. "No action" is a decision (structural — enforced by signal lifecycle)
return violations
Test Fixtures
FIX-01: Signal generation
def test_generate_fusion_signal():
spec = parse_lens("fixtures/vrs_vulnerability_v1.yaml")
match = ScoredPair(
entity_id_a="A001", entity_id_b="B001",
confidence=0.95, per_field_scores={"name_match": 1.0, "dob_match": 1.0},
null_fields=[], null_count=0,
)
signal = generate_fusion_signal(match, spec, "run_001", ["node_a", "node_b"])
assert signal["signal_type"] == "fusion_vulnerability_match"
assert signal["severity"] == "high"
assert signal["confidence"] == 0.95
assert signal["source"]["type"] == "computed"
assert signal["deny_mode"] == "omit"
FIX-02: Evidence block with deterministic query_hash
def test_evidence_block_query_hash():
spec = parse_lens("fixtures/vrs_vulnerability_v1.yaml")
match = ScoredPair("A001", "B001", 0.95, {}, [], 0)
block = create_fusion_evidence_block(match, spec, "run_001", None, ["node_a", "node_b"])
assert block["query_hash"].startswith("qh:")
assert block["lifecycle_stage"] == "transient"
assert block["evidence_class"] == "vrs_fusion_correlation"
# Same inputs → same query_hash
block2 = create_fusion_evidence_block(match, spec, "run_001", None, ["node_a", "node_b"])
assert block["query_hash"] == block2["query_hash"]
FIX-03: Freeze evidence
def test_freeze_evidence():
block = create_sample_evidence_block()
assert block["lifecycle_stage"] == "transient"
assert block["result_hash"] is None
frozen = freeze_evidence(block)
assert frozen["lifecycle_stage"] == "frozen"
assert frozen["result_hash"].startswith("rh:")
# Deterministic
frozen2 = freeze_evidence(create_sample_evidence_block())
assert frozen["result_hash"] == frozen2["result_hash"]
FIX-04: Invariant verification passes
def test_invariants_pass():
spec = parse_lens("fixtures/vrs_vulnerability_v1.yaml")
match = ScoredPair("A001", "B001", 0.95, {}, [], 0)
signal = generate_fusion_signal(match, spec, "run_001", ["node_a", "node_b"])
block = create_fusion_evidence_block(match, spec, "run_001", None, ["node_a", "node_b"])
frozen = freeze_evidence(block)
violations = verify_invariants(signal, frozen, spec)
assert violations == []
File Layout
cortex/tools/
├── fusion.py # generate_fusion_signal, create_fusion_evidence_block,
│ # freeze_evidence, verify_invariants
└── tests/
└── test_fusion_adi.py
Integration Points
- component.parallax.fusionmatch-model → here: FusionResult.confirmed → signal + evidence generation
- Here → ADI workflow: Signals appear on Monitor page, trigger investigations
- Here → existing signal schema: No schema changes — uses existing signal + block contracts
Depends on: component.parallax.fusionmatch-model
Realizes: product.fusion, product.signal