
Continuous Fusion — Incremental Entity Management + Signal Emission

Status & scope

  • Stage: DRAFT
  • Module: parallax/ops/fusion/continuous/ (new)
  • Part of: Umbrella #6 · Sentinel service (Oracle architecture)
  • Milestone: M5 (Operational System)

Problem

Fusion today is a batch tool. You invoke run_fusion(node_a, node_b) on two complete datasets and get matches back. This works for VRS-style screening ("here are 175 customers, screen them now") but fails for operational scenarios where:

  1. Records arrive continuously. A sensor reading, a transaction, an AIS ping, a new customer registration. You can't wait to accumulate a batch.
  2. Matches must be assessed against known state. "Is this the same vessel we tracked yesterday?" requires matching the new record against the existing correlation store — not against a second dataset.
  3. Impact propagates. A new match on entity X can mean that three open investigations referencing X now have new evidence, and that two Prism lens scores consuming X's correlation as an input layer are now stale. Today this propagation doesn't happen.
  4. Not every match deserves a signal. A low-confidence match on a low-priority entity in an area with no open investigations is noise. A high-confidence match on a watchlisted entity with three open investigations is urgent. Today there's no filtering — every match above threshold looks the same.

The result: Axonis is an analytical tool (run when asked), not an operational system (run continuously, emit when something changes). The gap is the always-on entity manager.

Decision

Build a continuous fusion pipeline that:

  • Listens for ingest events (record arrival)
  • Runs incremental matching against the persistent correlation store (component.parallax.local-persistence-adapter)
  • Maintains entity state (create / update / conflict)
  • Assesses downstream impact (what changed because of this match)
  • Filters and prioritizes signals (suppress noise, promote urgency)
  • Emits typed computed signals through the promotion boundary into DES

This is NOT a rewrite of run_fusion. Batch fusion stays for bulk screening. Continuous fusion is a new pipeline that reuses the same primitives (metrics, blocking, scoring, clustering) but operates record-by-record against a persistent store instead of set-by-set in memory.

Architecture

Ingest Event
(Airflow callback, webhook, ES write trigger, manual)
        │
        ▼
┌──────────────────────────────────────────────────────────┐
│  ContinuousFusionPipeline                                │
│                                                          │
│  ┌────────────────────────┐                              │
│  │ 1. NORMALIZE + DERIVE  │  Same primitives as batch    │
│  │    (per lens spec)     │  NM-01..NM-19 + derivations  │
│  └───────────┬────────────┘                              │
│              ▼                                           │
│  ┌────────────────────────┐                              │
│  │ 2. INCREMENTAL MATCH   │  Block against stored        │
│  │    vs correlation      │  entities, not a second      │
│  │    store               │  dataset                     │
│  └───────────┬────────────┘                              │
│              │                                           │
│      ┌───────┼───────┐                                   │
│      ▼       ▼       ▼                                   │
│    NEW     MATCH    CONFLICT                             │
│    ENTITY  (update  (two stored                          │
│    (store) cluster) entities claim                       │
│                     this record)                         │
│              │                                           │
│              ▼                                           │
│  ┌────────────────────────┐                              │
│  │ 3. IMPACT ASSESSMENT   │  What downstream state       │
│  │    (reverse index)     │  changed?                    │
│  └───────────┬────────────┘                              │
│              ▼                                           │
│  ┌────────────────────────┐                              │
│  │ 4. SIGNAL TRIAGE       │  Score: urgency ×            │
│  │    + FILTER            │  confidence × novelty ×      │
│  │                        │  impact. Suppress or         │
│  │                        │  promote.                    │
│  └───────────┬────────────┘                              │
│              ▼                                           │
│  Emit computed signal (typed, with signal_class          │
│  = "computed", producing_engine = "fusion",              │
│  producing_lens_id = lens_id)                            │
│                                                          │
└──────────────────────────────────────────────────────────┘
        │
        ▼
  Promotion Boundary → DES Signal → Investigation lifecycle

Public API

Core pipeline

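# parallax/ops/fusion/continuous/pipeline.py
from __future__ import annotations  # CorrelationStore, LensSpec, TriageContext stay forward references

from dataclasses import dataclass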

@dataclass(frozen=True)
class IncrementalMatchResult:
    """Result of matching one record against the correlation store."""
    outcome: str               # "new_entity" | "matched" | "conflict"
    record: dict               # the incoming record (normalized)
    correlation_id: str | None # if matched or conflict
    confidence: float          # match confidence (0 if new_entity)
    per_field_scores: dict     # field-level breakdown
    candidate_count: int       # how many candidates were evaluated
    blocking_keys: list[str]   # keys used for lookup


@dataclass(frozen=True)
class ImpactEvent:
    """A downstream state change caused by a new match."""
    affected_type: str         # "investigation" | "lens_score" | "edition"
    affected_id: str           # the investigation/lens/edition that changed
    reason: str                # human-readable impact description
    severity: str              # "high" | "medium" | "low"


@dataclass(frozen=True)
class TriagedSignal:
    """A computed signal with triage scoring applied."""
    signal_class: str          # "computed"
    producing_engine: str      # "fusion"
    producing_lens_id: str
    match_result: IncrementalMatchResult
    impact_events: list[ImpactEvent]
    triage_score: float        # composite priority score
    suppressed: bool           # True if below triage threshold
    suppression_reason: str    # why it was suppressed (if applicable)


def match_incoming(
    record: dict,
    store: CorrelationStore,
    lens_spec: LensSpec,
    threshold: float | None = None,
    null_penalty: float = 0.1,
) -> IncrementalMatchResult:
    """Match one incoming record against all stored correlations for this lens.

    Steps:
      1. Normalize the record using the lens spec's field groupings.
      2. Derive features (one-way transforms for federation safety).
      3. Compute blocking keys.
      4. Query the store for existing correlations whose blocking keys
         overlap with the incoming record's keys.
      5. Score the incoming record against each candidate's stored
         derived features.
      6. Classify: new_entity (no candidates above threshold),
         matched (one candidate above threshold), or conflict
         (multiple candidates above threshold, or same-source
         contradiction).

    Does NOT write to the store. Returns the result for the caller
    to decide whether to persist (separation of concerns —
    the pipeline decides, the caller commits).
    """


def assess_impact(
    match_result: IncrementalMatchResult,
    store: CorrelationStore,
) -> list[ImpactEvent]:
    """Given a match result, determine what downstream state changed.

    Uses the observation reverse index (component.parallax.observation-reverse-index) to find:
      - Open investigations referencing the affected entity
      - Active lens scores that consume the affected correlation
      - Pending editions citing evidence from affected blocks

    Returns impact events sorted by severity descending.
    """


def triage_signal(
    match_result: IncrementalMatchResult,
    impacts: list[ImpactEvent],
    context: TriageContext,
) -> TriagedSignal:
    """Score and filter a potential signal before emission.

    Triage score = urgency × confidence × novelty × impact, where:
      - urgency: lens-configured base urgency for this signal type
      - confidence: match_result.confidence
      - novelty: 1.0 if entity is new or hasn't been seen in > decay_period;
                 decays toward 0 for frequently-matched entities
      - impact: highest impact_event.severity, mapped to [0, 1]

    Suppression rules (configurable per lens):
      - Below triage_threshold → suppress with reason
      - Duplicate of a signal emitted within dedup_window → suppress
      - Entity on a suppress_list → suppress with reason
    """

Triage configuration (in lens spec)

# Extension to lens spec for continuous mode
continuous:
  enabled: true
  trigger: ingest_event           # ingest_event | webhook | schedule
  triage:
    threshold: 0.3                # below this triage score → suppress
    dedup_window: "1h"            # suppress duplicate signals within window
    urgency_base: 0.5             # lens-level urgency weight
    suppress_list: []             # entity IDs to always suppress
  impact:
    check_investigations: true    # look for affected open investigations
    check_lens_scores: true       # look for affected Prism scores
    check_editions: false         # don't check edition impact (expensive)
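The impact flags above decide which downstream types assess_impact consults. Below is a minimal sketch. It models the reverse index as a dict from correlation_id to (affected_type, affected_id) pairs and assigns a fixed severity per type; how severity is really assigned is not defined in this spec. assess_impact_sketch, DEFAULT_SEVERITY, FLAG_FOR_TYPE and SEVERITY_RANK are hypothetical names. The flag defaults follow invariant 5 (investigations and lens scores on, editions opt-in).

```python
from __future__ import annotations

from dataclasses import dataclass

SEVERITY_RANK = {"high": 3, "medium": 2, "low": 1}
# Assumed severity per downstream type; the spec leaves this open.
DEFAULT_SEVERITY = {"investigation": "high", "lens_score": "medium", "edition": "low"}
# Config flag per type, with defaults from invariant 5 (editions are opt-in).
FLAG_FOR_TYPE = {
    "investigation": ("check_investigations", True),
    "lens_score": ("check_lens_scores", True),
    "edition": ("check_editions", False),
}


@dataclass(frozen=True)
class ImpactEvent:
    affected_type: str
    affected_id: str
    reason: str
    severity: str


def assess_impact_sketch(
    correlation_id: str,
    reverse_index: dict[str, list[tuple[str, str]]],
    impact_config: dict[str, bool],
) -> list[ImpactEvent]:
    """Collect downstream references for one correlation, highest severity first.

    reverse_index stands in for the observation reverse index:
    correlation_id -> [(affected_type, affected_id), ...].
    impact_config is the lens spec's `impact:` mapping.
    """
    events = []
    for kind, ref in reverse_index.get(correlation_id, []):
        flag, default = FLAG_FOR_TYPE[kind]
        if not impact_config.get(flag, default):
            continue  # this downstream type is switched off for the lens
        events.append(ImpactEvent(kind, ref, f"{kind} {ref} references {correlation_id}",
                                  DEFAULT_SEVERITY[kind]))
    # sorted() is stable, so equal severities keep reverse-index order.
    return sorted(events, key=lambda e: SEVERITY_RANK[e.severity], reverse=True)
```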

Runner (CLI + daemon)

# parallax/ops/fusion/continuous/runner.py
from __future__ import annotations

from collections.abc import Callable

def run_continuous_once(
    record: dict,
    store: CorrelationStore,
    lens_spec: LensSpec,
    emit_signal: Callable[[TriagedSignal], None],  # callback for signal emission
    dry_run: bool = False,
) -> TriagedSignal:
    """Full pipeline: match → impact → triage → emit. One record."""


def run_continuous_batch(
    records: list[dict],
    store: CorrelationStore,
    lens_spec: LensSpec,
    emit_signal: Callable[[TriagedSignal], None],
    dry_run: bool = False,
) -> list[TriagedSignal]:
    """Process a batch of incoming records through the continuous pipeline.
    Each record is matched incrementally (not against each other —
    against the store). Order-dependent: record N may match against
    an entity created by record N-1.
    """

How it reuses existing primitives

| Step | Existing primitive | How continuous uses it |
|---|---|---|
| Normalize | normalize_records() (NM-01..NM-19) | Called per record instead of per dataset |
| Derive | derive_features() + DERIVATION_TYPE_REGISTRY | Same derived features, same privacy envelope |
| Block | generate_blocking_keys() + _BLOCKING_METHODS | Keys generated for the incoming record, then used as a QUERY against the store (not a cross-product) |
| Score | score_pair_derived() + METRIC_REGISTRY | Same scorer, one pair at a time (incoming vs. each candidate) |
| Cluster | UnionFind + EntityCluster | Update an existing cluster with the new member, or create a new cluster |
| Conflict | EntityCluster.contradictions | Same contradiction detection |
| Reverse index | find_correlations_by_observation() (component.parallax.observation-reverse-index) | Impact assessment queries affected investigations |
| Decay | apply_decay() + component.parallax.decay-scheduler | Novelty scoring uses the decay clock |

No new primitives. The continuous pipeline is a new orchestration over existing compute.
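The Block row marks the main departure from batch mode: blocking keys become a lookup rather than a cross-product. Below is a minimal sketch of that lookup as an inverted index. It assumes the store can maintain a key-to-correlation mapping. BlockingIndex and the example key strings are hypothetical.

```python
from __future__ import annotations

from collections import defaultdict


class BlockingIndex:
    """Inverted index from blocking key to the correlation IDs carrying it.

    Hypothetical stand-in for the store-side lookup: a query touches only the
    postings for the record's keys, never the full store.
    """

    def __init__(self) -> None:
        self._postings: defaultdict[str, set[str]] = defaultdict(set)

    def add(self, correlation_id: str, keys: list[str]) -> None:
        for key in keys:
            self._postings[key].add(correlation_id)

    def candidates(self, keys: list[str]) -> set[str]:
        """Correlations sharing at least one blocking key with the record."""
        found: set[str] = set()
        for key in keys:
            found |= self._postings.get(key, set())  # .get avoids creating empty postings
        return found
```

Only the returned IDs go on to scoring. That is the property test_match_incoming_uses_blocking_acceleration is meant to check.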

Store interaction pattern

match_incoming(record, store, spec)        # read-only
    │
    │ READ: store.list_correlations(lens_id, status=PROPOSED|CONFIRMED)
    │ READ: blocking keys from stored correlations
    │
    ▼
caller commits (unless dry_run):
    │
    ├─→ NEW ENTITY:
    │     WRITE: store.write_correlation(new_cr)  # status=PROPOSED
    │     WRITE: store.append_lineage(cr_id, CREATED event)
    │
    ├─→ MATCH:
    │     WRITE: store.append_lineage(cr_id, RECORD_MATCHED_INCREMENTAL event)
    │     WRITE: update confidence on correlation
    │
    └─→ CONFLICT:
          WRITE: store.append_lineage(cr_id, CONFLICT_DETECTED event)
          # Does NOT auto-resolve; emits an attention signal for a human

All writes go through the CorrelationStore interface (component.parallax.local-persistence-adapter), which works on SQLite (standalone) or ES (production). The continuous pipeline never touches storage directly.
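Because match_incoming only reads, the write half of this pattern belongs to the caller. Below is a minimal sketch of that commit step against an in-memory fake. FakeStore, set_confidence and commit are assumptions, and so is the "created" action string: the spec names a CREATED event but not its value. Matches and conflicts are logged with the continuous-mode values from the LineageAction enum.

```python
from __future__ import annotations

import uuid
from dataclasses import dataclass, field


@dataclass
class FakeStore:
    """In-memory stand-in for CorrelationStore; method names are assumed."""
    correlations: dict[str, dict] = field(default_factory=dict)
    lineage: list[tuple[str, str]] = field(default_factory=list)

    def write_correlation(self, cr: dict) -> None:
        self.correlations[cr["id"]] = cr

    def append_lineage(self, cr_id: str, action: str) -> None:
        self.lineage.append((cr_id, action))

    def set_confidence(self, cr_id: str, confidence: float) -> None:
        self.correlations[cr_id]["confidence"] = confidence


def commit(store: FakeStore, outcome: str, correlation_id: str | None,
           confidence: float, dry_run: bool = False) -> str | None:
    """Persist one match result after match_incoming has returned."""
    if dry_run:
        return correlation_id  # preview: the store is left untouched
    if outcome == "new_entity":
        cr_id = f"cr-{uuid.uuid4().hex[:8]}"
        store.write_correlation({"id": cr_id, "status": "PROPOSED", "confidence": confidence})
        store.append_lineage(cr_id, "created")  # assumed value for the CREATED event
        return cr_id
    if correlation_id is None:
        raise ValueError(f"{outcome!r} requires a correlation_id")
    if outcome == "matched":
        store.append_lineage(correlation_id, "record_matched_incremental")
        store.set_confidence(correlation_id, confidence)
    elif outcome == "conflict":
        store.append_lineage(correlation_id, "conflict_detected")  # never auto-resolved
    else:
        raise ValueError(f"unknown outcome: {outcome!r}")
    return correlation_id
```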

New LineageAction type

from enum import Enum


class LineageAction(str, Enum):
    # ... existing ...
    RECORD_MATCHED_INCREMENTAL = "record_matched_incremental"  # NEW
    CONFLICT_DETECTED = "conflict_detected"                    # NEW

These distinguish continuous-mode events from batch-mode events in the lineage. Analysts reviewing the ledger can see whether a match came from a batch run or an incremental arrival.

Invariants

  1. Match is read-only until the caller commits. match_incoming() returns a result but does NOT write to the store. The caller (runner, service, test) decides whether to persist. This enables dry-run, preview, and rollback.
  2. Order matters. Records processed in sequence N, N+1, N+2 may produce different results than N+2, N, N+1 (because N creates an entity that N+1 matches against). The pipeline is deterministic for a given order but not commutative. Document this.
  3. Triage is conservative by default. If the triage threshold is unset, emit all signals (no suppression). Suppression is opt-in per lens.
  4. Conflicts always emit. Even if triage would suppress a match signal, a conflict signal is never suppressed — it always reaches a human.
  5. Impact assessment is bounded. Check investigations and lens scores by default. Edition impact check is opt-in (expensive — requires reading all pending editions for the entity).
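The triage formula from triage_signal, combined with invariants 3 and 4, can be sketched as follows. Several details are assumptions rather than spec:

  • the severity weights;
  • a linear novelty ramp that reaches 1.0 once an entity has gone unseen for a full decay_period;
  • the order in which the suppression rules are checked.

triage, novelty, TriageDecision and SEVERITY_WEIGHT are hypothetical names.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timedelta

# Assumed mapping from ImpactEvent.severity to [0, 1]; not defined in the spec.
SEVERITY_WEIGHT = {"high": 1.0, "medium": 0.6, "low": 0.3}


@dataclass(frozen=True)
class TriageDecision:
    score: float
    suppressed: bool
    reason: str


def novelty(last_seen: datetime | None, now: datetime, decay_period: timedelta) -> float:
    """1.0 for new or long-unseen entities; linear ramp for recently seen ones."""
    if last_seen is None:
        return 1.0
    return min(1.0, (now - last_seen) / decay_period)


def triage(
    *,
    outcome: str,
    entity_id: str,
    confidence: float,
    severities: list[str],
    urgency_base: float,
    last_seen: datetime | None,
    last_emitted: datetime | None,
    now: datetime,
    decay_period: timedelta,
    threshold: float | None = None,          # unset -> no threshold suppression
    dedup_window: timedelta | None = None,   # unset -> no dedup
    suppress_list: frozenset[str] = frozenset(),
) -> TriageDecision:
    impact = max((SEVERITY_WEIGHT[s] for s in severities), default=0.0)
    score = urgency_base * confidence * novelty(last_seen, now, decay_period) * impact
    if outcome == "conflict":
        return TriageDecision(score, False, "")  # invariant 4: conflicts always emit
    if entity_id in suppress_list:
        return TriageDecision(score, True, "entity on suppress_list")
    if dedup_window is not None and last_emitted is not None and now - last_emitted < dedup_window:
        return TriageDecision(score, True, "duplicate within dedup_window")
    if threshold is not None and score < threshold:
        return TriageDecision(score, True, f"triage score {score:.3f} below {threshold}")
    return TriageDecision(score, False, "")
```

Because the score is a pure product, a match with no impact events scores 0.0. Whenever a threshold is set, such a match is therefore suppressed.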

Testing

Tests in tests/test_continuous_fusion.py.

Core pipeline tests

  • test_match_incoming_new_entity — first record for a lens, no store state → NEW_ENTITY
  • test_match_incoming_match — second record matching a stored entity → MATCHED
  • test_match_incoming_conflict — record matches two different stored entities → CONFLICT
  • test_match_incoming_below_threshold — close but not close enough → NEW_ENTITY (not a match)
  • test_match_incoming_uses_blocking_acceleration — only candidates with overlapping blocking keys are scored (not full scan)
  • test_match_incoming_does_not_write — verify store is unchanged after match_incoming returns

Impact assessment tests

  • test_impact_finds_affected_investigation — match on entity X, investigation I references X → impact event
  • test_impact_no_affected_state — match on entity with no downstream references → empty list
  • test_impact_severity_ranking — multiple impacts sorted by severity

Triage tests

  • test_triage_suppresses_below_threshold — low confidence + low impact → suppressed
  • test_triage_promotes_high_impact — high confidence + open investigation → high triage score
  • test_triage_dedup_within_window — same entity matched twice in 30 minutes → second suppressed
  • test_triage_conflict_never_suppressed — conflict signal emitted regardless of triage score
  • test_triage_novelty_decay — frequently-seen entity scores lower novelty

Integration tests

  • test_continuous_batch_order_dependent — process [A, B, C] vs [C, A, B], verify different entity counts
  • test_continuous_end_to_end — 50 records through the pipeline, verify: entities created, matches found, impacts assessed, signals emitted, lineage correct
  • test_continuous_with_decay_scheduler — run continuous + decay in parallel, verify interactions are clean

Performance

  • test_incremental_match_latency — single record against 10K stored correlations, P99 < 200ms
  • test_continuous_batch_throughput — 1000 records/second on commodity hardware with SQLite store
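The latency target can be checked with the standard library alone. Below is a minimal sketch that assumes the function under test takes one record. p99_latency_ms is a hypothetical helper. statistics.quantiles(n=100) returns the 99 percentile cut points, so index 98 is P99.

```python
from __future__ import annotations

import statistics
import time
from collections.abc import Callable, Iterable


def p99_latency_ms(fn: Callable[[object], object], inputs: Iterable[object]) -> float:
    """Call fn once per input and return the 99th-percentile latency in ms."""
    samples = []
    for item in inputs:
        start = time.perf_counter()
        fn(item)
        samples.append((time.perf_counter() - start) * 1000.0)
    if len(samples) < 2:
        raise ValueError("need at least two samples for a percentile")
    # quantiles(n=100) yields 99 cut points; index 98 is the 99th percentile.
    return statistics.quantiles(samples, n=100)[98]
```

In test_incremental_match_latency, fn would wrap match_incoming against a store preloaded with 10K correlations, and the test would then assert that the result is below 200.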

Acceptance

  • [ ] match_incoming() works end-to-end against SQLite-backed CorrelationStore
  • [ ] Three outcomes (new_entity / matched / conflict) all tested
  • [ ] Impact assessment finds affected investigations via reverse index
  • [ ] Triage scoring with suppression + dedup + novelty decay all tested
  • [ ] Conflict signals never suppressed
  • [ ] Continuous lens config section parseable by lens_parser
  • [ ] CLI: axonis-cli continuous --lens <path> --record <json> processes one record
  • [ ] CLI: axonis-cli continuous --lens <path> --stream <file> processes a JSONL stream
  • [ ] Performance: P99 < 200ms per record against 10K stored correlations
  • [ ] End-to-end demo: examples/continuous_fusion_demo.py processes a VRS-style stream and emits signals

Rollback plan

Continuous fusion is a new module (parallax/ops/fusion/continuous/). Nothing in batch fusion or the three-phase protocol depends on it.

Rollback:

  1. Delete parallax/ops/fusion/continuous/.
  2. Remove RECORD_MATCHED_INCREMENTAL and CONFLICT_DETECTED from the LineageAction enum. Adding them was purely additive, so no existing consumer breaks.
  3. Remove the continuous: section from any lens specs that use it. This is optional: the section is optional config, and the parser ignores unknown keys.

Batch fusion, three-phase protocol, and all existing functionality are unchanged.

What this enables

| Capability | Before component.parallax.continuous-fusion | After component.parallax.continuous-fusion |
|---|---|---|
| "Is this new person someone we've seen?" | Run batch fusion manually | Automatic on ingest |
| "What changed because of this match?" | Manual investigation | Impact events emitted |
| "Should I care about this match?" | Every match looks the same | Triage score prioritizes |
| "How fast can I respond to a new threat?" | Minutes to hours (batch cycle) | Seconds (per-record) |
| Sentinel service (Oracle architecture) | Not buildable: no continuous compute | Sentinel = this pipeline + threshold monitoring |
| DES computed signals from engines | Not emitted: engines write directly to ES | Signals flow through the promotion boundary |

§B — Prism Continuous Reassessment Extension

Same pattern, different engine. Prism's continuous mode is triggered by Parallax's output — when an entity changes, Prism re-scores any lens that references it.

Trigger cascade

Parallax continuous (§A above)
  │ emits: computed signal (entity_changed)
  │   payload: { entity_id, correlation_id, delta_confidence, new_members }
  │
  ▼
Prism continuous (this section)
  │ "which lenses use entity X as an input layer?"
  │ re-score each → compare previous vs new
  │ emit only if score CHANGED meaningfully
  │
  ▼ emits: computed signal (assessment_changed)
  │   payload: { lens_id, entity_id, previous_score, new_score, delta,
  │              previous_status, new_status, status_changed, layers_affected }
  │
  ▼
Sentinel (triage + routing) → DES

Public API

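# axonis-lens/lens/continuous/pipeline.py
from __future__ import annotations  # AssessmentStore and LensSpec stay forward references

from dataclasses import dataclass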

@dataclass(frozen=True)
class EntityChangeEvent:
    """Produced by Parallax continuous. Consumed by Prism continuous."""
    entity_id: str
    correlation_id: str
    producing_lens_id: str
    delta_confidence: float
    new_members: list[str]       # record_refs added to the cluster
    timestamp: str               # ISO8601


@dataclass(frozen=True)
class ReassessmentResult:
    """One lens re-scored for one entity."""
    lens_id: str
    entity_id: str
    previous_score: float
    new_score: float
    delta: float
    previous_status: str         # NORMAL / ADVISORY / ELEVATED / HIGH / CRITICAL
    new_status: str
    status_changed: bool         # True if threshold crossed
    layers_affected: dict        # { layer_name: { old: float, new: float } }
    evidence_id: str             # frozen evidence block for the reassessment


def reassess_on_change(
    event: EntityChangeEvent,
    store: AssessmentStore,
    lens_specs: list[LensSpec],
    noise_threshold: float = 0.02,
) -> list[ReassessmentResult]:
    """Re-score every lens affected by an entity change.

    Steps:
      1. Query store for lenses whose input layers reference event.entity_id.
      2. For each affected lens, re-run execute_lens with updated inputs.
      3. Compare new score vs stored previous score.
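      4. Return results ONLY where abs(delta) > noise_threshold or the
         status changed (a threshold boundary was crossed). Unchanged
         scores are silent: no signal is emitted.
      5. For each returned result, append the new score to the store
         for the next comparison.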

    Does NOT emit signals directly — returns results for the caller
    (Sentinel or runner) to decide how to emit.
    """

AssessmentStore (new — mirrors CorrelationStore pattern)

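# axonis-lens/lens/continuous/store.py
from __future__ import annotations  # StoredScore stays a forward reference

from abc import ABC, abstractmethod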

class AssessmentStore(ABC):
    """Persistent store for lens assessment scores per entity.

    Same interface pattern as CorrelationStore (component.parallax.local-persistence-adapter). SQLite
    backend for standalone; ES backend for production.
    """

    @abstractmethod
    def read_score(self, lens_id: str, entity_id: str) -> StoredScore | None:
        """Last known score for this entity under this lens."""

    @abstractmethod
    def write_score(self, lens_id: str, entity_id: str, score: StoredScore) -> None:
        """Persist the latest score. Append-only — history preserved."""

    @abstractmethod
    def find_lenses_for_entity(self, entity_id: str) -> list[str]:
        """Which lenses have ever scored this entity? Reverse lookup."""

What it reuses from existing Prism

| Step | Existing primitive | How continuous uses it |
|---|---|---|
| Re-score a lens | execute_lens() / run_lens() | Called per affected lens: same compute, triggered by an event instead of by an analyst |
| Layer scoring | COST_PRIMITIVES (10 cost fns) | Same cost models, same weights |
| Threshold classification | threshold.evaluate() | Same thresholds; status_changed records whether a boundary was crossed |
| Evidence freeze | evidence.create_evidence_block() | Each reassessment produces a frozen evidence block |

Configuration (in Prism lens spec)

# Extension to Prism lens spec for continuous mode
continuous:
  enabled: true
  trigger: entity_change          # entity_change | schedule | manual
  noise_threshold: 0.02           # suppress deltas below this
  reassess_layers:                # which layers to re-evaluate (default: all)
    - outage_severity
    - weather_threat

Invariants

  1. Silent when nothing changed. If re-scoring produces abs(delta) <= noise_threshold and no status change, no signal is emitted and no stored score is updated.
  2. Status change always emits. Even if delta is small, crossing a threshold boundary (ADVISORY → ELEVATED) always produces a signal.
  3. Evidence is frozen per reassessment. Every re-score that produces a signal also produces a frozen evidence block with the previous and new state.
  4. Store is append-only. Previous scores are history, not overwritten. Enables replay and trend analysis.

Testing

  • test_reassess_no_change_silent — entity updated but score delta < threshold → no result
  • test_reassess_status_change_emits — score crosses ADVISORY → ELEVATED → result emitted
  • test_reassess_multiple_lenses — entity change affects 3 lenses → 3 reassessments
  • test_reassess_evidence_frozen — each result has a valid evidence block with hash
  • test_cascade_parallax_to_prism — end-to-end: Parallax match_incoming → entity_change → Prism reassess → signal

Rollback

Delete axonis-lens/lens/continuous/. No Prism batch compute affected. AssessmentStore is new — empty on removal. Parallax continuous works independently of Prism continuous.


§C — Shared Operations: axonis.core Extraction

The continuous pipelines in both engines need a handful of shared pure functions that today exist independently in athena, parallax-prod, and axonis-lens (reimplemented 2-3 times each). Rather than unify all operations, extract only the proven shared ones into the axonis.core foundation package.

What moves to axonis.core

axonis.core/
├── transforms/                     ← pure functions, zero heavy deps
│   ├── scale.py                    ← min_max_normalize, log_scale, standard_scale
│   │                                  (from athena/ops/scale.py + parallax numeric.py)
│   ├── encode.py                   ← categorical encoding, one-hot
│   │                                  (from athena/ops/encode.py)
│   └── codecs.py                   ← numpy ↔ bytes, JSON serialization helpers
│                                      (from athena/libs/codecs.py)
├── pipeline/
│   └── dag.py                      ← topological sort + DAG execution ordering
│                                      (from athena/libs/topological_graph.py)
├── federation/
│   └── mergers.py                  ← merge federated query results
│                                      (from athena/libs/mergers.py)
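Python's standard library already provides topological ordering for dag.py in graphlib.TopologicalSorter (Python 3.9+). Below is a minimal sketch of the two operations an orchestrator usually needs. It is not the existing athena/libs/topological_graph.py code, and execution_order and execution_waves are hypothetical names.

```python
from __future__ import annotations

from graphlib import CycleError, TopologicalSorter  # CycleError re-exported for callers


def execution_order(deps: dict[str, set[str]]) -> list[str]:
    """Return a run order in which every step follows its dependencies.

    deps maps step -> steps it depends on. Raises CycleError on cycles.
    """
    return list(TopologicalSorter(deps).static_order())


def execution_waves(deps: dict[str, set[str]]) -> list[list[str]]:
    """Group steps into waves; steps within one wave can run in parallel."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        waves.append(ready)
        sorter.done(*ready)
    return waves
```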

What STAYS in each engine (does NOT move)

| Engine | Keeps | Why |
|---|---|---|
| Athena | All ML ops (PCA, t-SNE, UMAP, FFT, image, raster, spaCy NLP, augmentation) | Heavy deps (scikit-learn, opencv, GDAL). ML-training-specific. |
| Parallax | All fusion ops (metrics, blocking, scoring, clustering, tracking, derived features, transliteration, aliases) | Engine-specialized. More mature than athena equivalents. |
| Prism | All cost models, layer scoring, surface compute, observation primitives | Engine-specialized. Different domain. |

What gets DELETED after extraction (dedup)

| Duplicate | Current location(s) | Canonical moves to |
|---|---|---|
| min_max_normalize | parallax numeric.py + athena scale.py | axonis.core.transforms.scale |
| log_scale | parallax numeric.py + athena scale.py | axonis.core.transforms.scale |
| codecs (numpy ↔ bytes) | athena libs/codecs.py (the only copy, but needed by multiple engines) | axonis.core.transforms.codecs |

Effort

| Task | Weeks |
|---|---|
| Create axonis.core package with 5 files | 0.5 |
| Update imports in parallax-prod (replace local scale/codecs with core) | 0.5 |
| Update imports in atlas-fl-athena (replace libs/codecs, libs/mergers, libs/topological_graph with core) | 0.5 |
| Tests: ensure existing test suites pass with imports redirected | 0.5 |
| Total | 2 |

Invariant

axonis.core NEVER imports scikit-learn, opencv, GDAL, dask, torch, or any heavy ML dependency. If a function requires these, it stays in the engine that owns the dependency. Core is pure Python + numpy (numpy is acceptable — it's already everywhere).
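This invariant can be enforced in CI with the ast module. Below is a minimal sketch that assumes the import names sklearn (scikit-learn), cv2 (opencv), osgeo (GDAL), dask and torch. The FORBIDDEN set would have to grow to cover "any heavy ML dependency". forbidden_imports and check_package are hypothetical names.

```python
from __future__ import annotations

import ast
from pathlib import Path

# Import names for scikit-learn, opencv, GDAL, dask, torch (extend as needed).
FORBIDDEN = {"sklearn", "cv2", "osgeo", "dask", "torch"}


def forbidden_imports(source: str) -> set[str]:
    """Top-level forbidden package names imported anywhere in `source`."""
    found: set[str] = set()
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.Import):
            names = [alias.name for alias in node.names]
        elif isinstance(node, ast.ImportFrom) and node.level == 0 and node.module:
            names = [node.module]  # relative imports (level > 0) stay inside core
        else:
            continue
        found |= {name.split(".")[0] for name in names} & FORBIDDEN
    return found


def check_package(root: Path) -> dict[str, set[str]]:
    """Map each offending .py file under root to the heavy packages it imports."""
    report: dict[str, set[str]] = {}
    for path in sorted(root.rglob("*.py")):
        bad = forbidden_imports(path.read_text(encoding="utf-8"))
        if bad:
            report[str(path)] = bad
    return report
```

A CI test could then assert that check_package(Path("axonis/core")) returns an empty dict; that path is also an assumption.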


Out of scope (tracked separately)

  • Cross-org continuous fusion (federation + continuous) → requires Xanadu integration + streaming protocol, not batch three-phase
  • Backfill mode ("replay historical ingest through continuous pipeline") → use replay_fusion (#8) for historical analysis, continuous for forward-looking
  • Auto-investigation creation ("match of sufficient importance auto-opens an investigation") → DES lifecycle concern, not fusion concern. Cortex decides whether a signal becomes an investigation.
  • Streaming transport (Kafka, Pulsar, Redis Streams) → v1 uses callback pattern; transport is pluggable. Don't over-invest in transport before the pipeline logic is proven.

Depends on: component.parallax.attestation-rationale, component.parallax.decay-scheduler, component.parallax.local-persistence-adapter, component.parallax.observation-reverse-index

Realizes: product.fusion

Required by: component.parallax.wire-message-families