Continuous Fusion — Incremental Entity Management + Signal Emission
Status & scope
- Stage: DRAFT
- Module: parallax/ops/fusion/continuous/ (new)
- Part of: Umbrella #6 · Sentinel service (Oracle architecture)
- Milestone: M5 (Operational System)
Problem
Fusion today is a batch tool. You invoke run_fusion(node_a, node_b) on two complete datasets and get matches back. This works for VRS-style screening ("here are 175 customers, screen them now") but fails for operational scenarios where:
- Records arrive continuously. A sensor reading, a transaction, an AIS ping, a new customer registration. You can't wait to accumulate a batch.
- Matches must be assessed against known state. "Is this the same vessel we tracked yesterday?" requires matching the new record against the existing correlation store — not against a second dataset.
- Impact propagates. For example, a new match on entity X may mean that three open investigations referencing X now have new evidence, and that two Prism lens scores consuming X's correlation as an input layer are now stale. Today this propagation does not happen.
- Not every match deserves a signal. A low-confidence match on a low-priority entity in an area with no open investigations is noise. A high-confidence match on a watchlisted entity with three open investigations is urgent. Today there's no filtering — every match above threshold looks the same.
The result: Axonis is an analytical tool (run when asked), not an operational system (run continuously, emit when something changes). The gap is the always-on entity manager.
Decision
Build a continuous fusion pipeline that:
- Listens for ingest events (record arrival)
- Runs incremental matching against the persistent correlation store (component.parallax.local-persistence-adapter)
- Maintains entity state (create / update / conflict)
- Assesses downstream impact (what changed because of this match)
- Filters and prioritizes signals (suppress noise, promote urgency)
- Emits typed computed signals through the promotion boundary into DES
This is NOT a rewrite of run_fusion. Batch fusion stays for bulk screening. Continuous fusion is a new pipeline that reuses the same primitives (metrics, blocking, scoring, clustering) but operates record-by-record against a persistent store instead of set-by-set in memory.
Architecture
```
Ingest Event
(Airflow callback, webhook, ES write trigger, manual)
               │
               ▼
┌──────────────────────────────────────────────────────────┐
│ ContinuousFusionPipeline                                 │
│                                                          │
│  ┌───────────────────────┐                               │
│  │ 1. NORMALIZE + DERIVE │  Same primitives as batch     │
│  │    (per lens spec)    │  NM-01..NM-19 + derivations   │
│  └───────────┬───────────┘                               │
│              ▼                                           │
│  ┌───────────────────────┐                               │
│  │ 2. INCREMENTAL MATCH  │  Block against stored         │
│  │    vs correlation     │  entities, not a second       │
│  │    store              │  dataset                      │
│  └───────────┬───────────┘                               │
│              │                                           │
│      ┌───────┼────────┐                                  │
│      ▼       ▼        ▼                                  │
│     NEW    MATCH   CONFLICT                              │
│   ENTITY  (update  (two stored                           │
│   (store) cluster) entities claim                        │
│                    this record)                          │
│      └───────┬────────┘                                  │
│              ▼                                           │
│  ┌───────────────────────┐                               │
│  │ 3. IMPACT ASSESSMENT  │  What downstream state        │
│  │    (reverse index)    │  changed?                     │
│  └───────────┬───────────┘                               │
│              ▼                                           │
│  ┌───────────────────────┐                               │
│  │ 4. SIGNAL TRIAGE      │  Score: urgency ×             │
│  │    + FILTER           │  confidence × novelty ×       │
│  │                       │  impact. Suppress or          │
│  │                       │  promote.                     │
│  └───────────┬───────────┘                               │
│              ▼                                           │
│  Emit computed signal (typed, with signal_class          │
│  = "computed", producing_engine = "fusion",              │
│  producing_lens_id = lens_id)                            │
│                                                          │
└──────────────────────────────────────────────────────────┘
               │
               ▼
Promotion Boundary → DES Signal → Investigation lifecycle
```
Public API
Core pipeline
```python
# parallax/ops/fusion/continuous/pipeline.py
from __future__ import annotations  # CorrelationStore, LensSpec, TriageContext are defined elsewhere

from dataclasses import dataclass


@dataclass(frozen=True)
class IncrementalMatchResult:
    """Result of matching one record against the correlation store."""

    outcome: str                # "new_entity" | "matched" | "conflict"
    record: dict                # the incoming record (normalized)
    correlation_id: str | None  # set if matched or conflict
    confidence: float           # match confidence (0 if new_entity)
    per_field_scores: dict      # field-level breakdown
    candidate_count: int        # how many candidates were evaluated
    blocking_keys: list[str]    # keys used for lookup


@dataclass(frozen=True)
class ImpactEvent:
    """A downstream state change caused by a new match."""

    affected_type: str  # "investigation" | "lens_score" | "edition"
    affected_id: str    # the investigation/lens/edition that changed
    reason: str         # human-readable impact description
    severity: str       # "high" | "medium" | "low"


@dataclass(frozen=True)
class TriagedSignal:
    """A computed signal with triage scoring applied."""

    signal_class: str        # "computed"
    producing_engine: str    # "fusion"
    producing_lens_id: str
    match_result: IncrementalMatchResult
    impact_events: list[ImpactEvent]
    triage_score: float      # composite priority score
    suppressed: bool         # True if any suppression rule fired
    suppression_reason: str  # why it was suppressed (if applicable)


def match_incoming(
    record: dict,
    store: CorrelationStore,
    lens_spec: LensSpec,
    threshold: float | None = None,
    null_penalty: float = 0.1,
) -> IncrementalMatchResult:
    """Match one incoming record against all stored correlations for this lens.

    Steps:
    1. Normalize the record using the lens spec's field groupings.
    2. Derive features (one-way transforms for federation safety).
    3. Compute blocking keys.
    4. Query the store for existing correlations whose blocking keys
       overlap with the incoming record's keys.
    5. Score the incoming record against each candidate's stored
       derived features.
    6. Classify: new_entity (no candidates above threshold),
       matched (one candidate above threshold), or conflict
       (multiple candidates above threshold, or same-source
       contradiction).

    Does NOT write to the store. Returns the result for the caller
    to decide whether to persist (separation of concerns: the
    pipeline decides, the caller commits).
    """


def assess_impact(
    match_result: IncrementalMatchResult,
    store: CorrelationStore,
) -> list[ImpactEvent]:
    """Given a match result, determine what downstream state changed.

    Uses the observation reverse index
    (component.parallax.observation-reverse-index) to find:
    - Open investigations referencing the affected entity
    - Active lens scores that consume the affected correlation
    - Pending editions citing evidence from affected blocks

    Returns impact events sorted by severity, descending.
    """


def triage_signal(
    match_result: IncrementalMatchResult,
    impacts: list[ImpactEvent],
    context: TriageContext,
) -> TriagedSignal:
    """Score and filter a potential signal before emission.

    Triage score = urgency × confidence × novelty × impact, where:
    - urgency: lens-configured base urgency for this signal type
    - confidence: match_result.confidence
    - novelty: 1.0 if the entity is new or has not been seen for longer
      than decay_period; decays toward 0 for frequently matched entities
    - impact: the highest severity among impacts, scaled to [0, 1]

    Suppression rules (configurable per lens):
    - Below triage_threshold → suppress with reason
    - Duplicate of a signal emitted within dedup_window → suppress
    - Entity on a suppress_list → suppress with reason
    """
```
Triage configuration (in lens spec)
```yaml
# Extension to lens spec for continuous mode
continuous:
  enabled: true
  trigger: ingest_event          # ingest_event | webhook | schedule
  triage:
    threshold: 0.3               # below this triage score → suppress
    dedup_window: "1h"           # suppress duplicate signals within window
    urgency_base: 0.5            # lens-level urgency weight
    suppress_list: []            # entity IDs to always suppress
  impact:
    check_investigations: true   # look for affected open investigations
    check_lens_scores: true      # look for affected Prism scores
    check_editions: false        # don't check edition impact (expensive)
```
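Here is a sketch of how lens_parser could turn this block into typed settings. It assumes the YAML has already been loaded into a dict. ContinuousConfig, parse_continuous, and the "<integer><s|m|h|d>" duration syntax are hypothetical. The defaults follow the invariants below: nothing is suppressed unless configured, and edition checks are opt-in.

```python
from __future__ import annotations

import re
from dataclasses import dataclass
from datetime import timedelta

_UNITS = {"s": "seconds", "m": "minutes", "h": "hours", "d": "days"}
_TRIGGERS = {"ingest_event", "webhook", "schedule"}


def parse_duration(text: str) -> timedelta:
    """Parse strings like "30m", "1h", "7d" (assumed syntax)."""
    match = re.fullmatch(r"(\d+)([smhd])", text.strip())
    if match is None:
        raise ValueError(f"unrecognised duration: {text!r}")
    amount, unit = match.groups()
    return timedelta(**{_UNITS[unit]: int(amount)})


@dataclass(frozen=True)
class ContinuousConfig:
    enabled: bool = False
    trigger: str = "ingest_event"
    threshold: float | None = None         # unset: emit everything
    dedup_window: timedelta | None = None  # unset: no dedup suppression
    urgency_base: float = 0.5
    suppress_list: frozenset[str] = frozenset()
    check_investigations: bool = True      # on by default
    check_lens_scores: bool = True         # on by default
    check_editions: bool = False           # opt-in (expensive)


def parse_continuous(spec: dict) -> ContinuousConfig:
    """Build a ContinuousConfig from a loaded lens spec dict."""
    section = spec.get("continuous")
    if section is None:
        return ContinuousConfig()  # optional section: absent means disabled
    triage = section.get("triage", {})
    impact = section.get("impact", {})
    trigger = section.get("trigger", "ingest_event")
    if trigger not in _TRIGGERS:
        raise ValueError(f"unknown trigger: {trigger!r}")
    window = triage.get("dedup_window")
    return ContinuousConfig(
        enabled=bool(section.get("enabled", False)),
        trigger=trigger,
        threshold=triage.get("threshold"),
        dedup_window=parse_duration(window) if window else None,
        urgency_base=float(triage.get("urgency_base", 0.5)),
        suppress_list=frozenset(triage.get("suppress_list", [])),
        check_investigations=bool(impact.get("check_investigations", True)),
        check_lens_scores=bool(impact.get("check_lens_scores", True)),
        check_editions=bool(impact.get("check_editions", False)),
    )
```

A lens without a continuous: section parses to ContinuousConfig(), so continuous mode is simply disabled for it.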
Runner (CLI + daemon)
```python
# parallax/ops/fusion/continuous/runner.py
from __future__ import annotations  # CorrelationStore, LensSpec, TriagedSignal defined elsewhere

from collections.abc import Callable


def run_continuous_once(
    record: dict,
    store: CorrelationStore,
    lens_spec: LensSpec,
    emit_signal: Callable[[TriagedSignal], None],  # callback for signal emission
    dry_run: bool = False,
) -> TriagedSignal:
    """Full pipeline: match → impact → triage → emit. One record."""


def run_continuous_batch(
    records: list[dict],
    store: CorrelationStore,
    lens_spec: LensSpec,
    emit_signal: Callable[[TriagedSignal], None],
    dry_run: bool = False,
) -> list[TriagedSignal]:
    """Process a batch of incoming records through the continuous pipeline.

    Each record is matched incrementally against the store, not against
    the other records in the batch. Order-dependent: record N may match
    against an entity created by record N-1.
    """
```
How it reuses existing primitives
| Step | Existing primitive | How continuous uses it |
|---|---|---|
| Normalize | normalize_records() (NM-01..NM-19) | Called per-record instead of per-dataset |
| Derive | derive_features() + DERIVATION_TYPE_REGISTRY | Same derived features, same privacy envelope |
| Block | generate_blocking_keys() + _BLOCKING_METHODS | Keys generated for the incoming record, then used as a QUERY against the store (not a cross-product) |
| Score | score_pair_derived() + METRIC_REGISTRY | Same scorer, one pair at a time (incoming vs each candidate) |
| Cluster | UnionFind + EntityCluster | Update an existing cluster with the new member, or create a new cluster |
| Conflict | EntityCluster.contradictions | Same contradiction detection |
| Reverse index | component.parallax.observation-reverse-index find_correlations_by_observation() | Impact assessment queries affected investigations |
| Decay | apply_decay() + component.parallax.decay-scheduler | Novelty scoring uses the decay clock |
No new primitives. The continuous pipeline is a new orchestration over existing compute.
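The sketch below illustrates "blocking keys as a query" with a toy version of steps 3 to 6 of match_incoming() against an in-memory index. Every part is a hypothetical stand-in. ToyStore replaces CorrelationStore, a 3-character prefix replaces generate_blocking_keys(), and Jaccard similarity over token sets replaces score_pair_derived(). Only candidates that share a blocking key are scored, which is the property test_match_incoming_uses_blocking_acceleration checks.

```python
from __future__ import annotations

from collections import defaultdict
from dataclasses import dataclass, field


def blocking_keys(tokens: set[str]) -> list[str]:
    # Hypothetical blocking method: 3-character prefix of each derived token.
    return sorted({t[:3] for t in tokens})


def jaccard(a: set[str], b: set[str]) -> float:
    # Stand-in for score_pair_derived(); real metrics come from METRIC_REGISTRY.
    union = a | b
    return len(a & b) / len(union) if union else 0.0


@dataclass
class ToyStore:
    """Hypothetical in-memory stand-in for CorrelationStore (not its real API)."""

    features: dict[str, set[str]] = field(default_factory=dict)
    index: defaultdict[str, set[str]] = field(default_factory=lambda: defaultdict(set))

    def add(self, cr_id: str, tokens: set[str]) -> None:
        self.features[cr_id] = tokens
        for key in blocking_keys(tokens):
            self.index[key].add(cr_id)

    def candidates(self, keys: list[str]) -> set[str]:
        # Blocking keys act as a query: only correlations sharing a key come back.
        found: set[str] = set()
        for key in keys:
            found |= self.index.get(key, set())
        return found


def toy_match(tokens: set[str], store: ToyStore, threshold: float = 0.5):
    """Return (outcome, best_correlation_id, confidence, candidate_count)."""
    cands = store.candidates(blocking_keys(tokens))
    scores = {cr: jaccard(tokens, store.features[cr]) for cr in cands}
    above = sorted((cr for cr, s in scores.items() if s > threshold),
                   key=lambda cr: (-scores[cr], cr))
    if not above:
        return "new_entity", None, 0.0, len(cands)
    outcome = "matched" if len(above) == 1 else "conflict"
    return outcome, above[0], scores[above[0]], len(cands)
```

For example, after storing {"maria", "lopez", "1980"} and {"john", "smith", "1975"}, a record {"maria", "lopez", "1980", "madrid"} scores only the first correlation (candidate_count 1) and matches it at 0.75.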
Store interaction pattern
```
match_incoming(record, store, spec)
  │
  │  READ: store.list_correlations(lens_id, status=PROPOSED|CONFIRMED)
  │  READ: blocking keys from stored correlations
  │
  │  (the writes below are committed by the caller, not by match_incoming)
  │
  ├─→ NEW ENTITY:
  │     WRITE: store.write_correlation(new_cr)   # status=PROPOSED
  │     WRITE: store.append_lineage(cr_id, CREATED event)
  │
  ├─→ MATCH:
  │     WRITE: store.append_lineage(cr_id, RECORD_MATCHED_INCREMENTAL event)
  │     WRITE: update confidence on correlation
  │
  └─→ CONFLICT:
        WRITE: store.append_lineage(cr_id, CONFLICT_DETECTED event)
        # Does NOT auto-resolve: emits attention signal for human
```
All writes go through CorrelationStore interface (component.parallax.local-persistence-adapter). Works on SQLite (standalone) or ES (production). The continuous pipeline never touches storage directly.
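The split between deciding and committing can be sketched as a pure planning step followed by a commit step. InMemoryStore, plan_writes, commit, and update_confidence are hypothetical, and their method names only loosely follow the pattern above. Lineage actions are written as their LineageAction member names. dry_run skips the writes, which is what makes preview and rollback cheap.

```python
from __future__ import annotations

from dataclasses import dataclass, field

# (operation, correlation_id, payload)
Write = tuple[str, str, object]


@dataclass
class InMemoryStore:
    """Hypothetical CorrelationStore stand-in, not the adapter's real API."""

    correlations: dict[str, dict] = field(default_factory=dict)
    lineage: list[tuple[str, str]] = field(default_factory=list)

    def write_correlation(self, cr_id: str, cr: dict) -> None:
        self.correlations[cr_id] = dict(cr)

    def append_lineage(self, cr_id: str, action: str) -> None:
        self.lineage.append((cr_id, action))  # append-only ledger

    def update_confidence(self, cr_id: str, confidence: float) -> None:
        self.correlations[cr_id]["confidence"] = confidence


def plan_writes(outcome: str, correlation_id: str | None,
                confidence: float, new_id: str) -> list[Write]:
    """Decide which writes a match outcome implies, without performing them."""
    if outcome == "new_entity":
        return [("write_correlation", new_id, {"status": "PROPOSED", "confidence": confidence}),
                ("append_lineage", new_id, "CREATED")]
    if correlation_id is None:
        raise ValueError(f"{outcome} requires a correlation_id")
    if outcome == "matched":
        return [("append_lineage", correlation_id, "RECORD_MATCHED_INCREMENTAL"),
                ("update_confidence", correlation_id, confidence)]
    if outcome == "conflict":
        # No auto-resolution: only record the conflict; a human resolves it.
        return [("append_lineage", correlation_id, "CONFLICT_DETECTED")]
    raise ValueError(f"unknown outcome: {outcome!r}")


def commit(plan: list[Write], store: InMemoryStore, dry_run: bool = False) -> list[Write]:
    """Apply the planned writes unless dry_run; return the plan either way."""
    if not dry_run:
        for op, cr_id, payload in plan:
            getattr(store, op)(cr_id, payload)
    return plan
```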
New LineageAction type
```python
from enum import Enum


class LineageAction(str, Enum):
    # ... existing ...
    RECORD_MATCHED_INCREMENTAL = "record_matched_incremental"  # NEW
    CONFLICT_DETECTED = "conflict_detected"  # NEW
```
These distinguish continuous-mode events from batch-mode events in the lineage. Analysts reviewing the ledger can see whether a match came from a batch run or an incremental arrival.
Invariants
- Match is read-only until the caller commits. match_incoming() returns a result but does NOT write to the store. The caller (runner, service, test) decides whether to persist. This enables dry-run, preview, and rollback.
- Order matters. Records processed in sequence N, N+1, N+2 may produce different results than N+2, N, N+1 (because N creates an entity that N+1 matches against). The pipeline is deterministic for a given order but not commutative. Document this.
- Triage is conservative by default. If the triage threshold is unset, emit all signals (no suppression). Suppression is opt-in per lens.
- Conflicts always emit. Even if triage would suppress a match signal, a conflict signal is never suppressed — it always reaches a human.
- Impact assessment is bounded. Check investigations and lens scores by default. Edition impact check is opt-in (expensive — requires reading all pending editions for the entity).
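Here is a minimal sketch of bounded impact assessment. It assumes a precomputed reverse index that maps a correlation ID to (affected_type, affected_id, severity) tuples. That index shape, the reason text, and the tie-breaking order are all assumptions. In the real system, the point of check_editions=false is to avoid reading pending editions at all; here the flag only filters them out.

```python
from __future__ import annotations

from dataclasses import dataclass

_SEVERITY_RANK = {"high": 0, "medium": 1, "low": 2}


@dataclass(frozen=True)
class ImpactEvent:
    affected_type: str  # "investigation" | "lens_score" | "edition"
    affected_id: str
    reason: str
    severity: str       # "high" | "medium" | "low"


def assess_impact(
    correlation_id: str,
    reverse_index: dict[str, list[tuple[str, str, str]]],
    check_investigations: bool = True,
    check_lens_scores: bool = True,
    check_editions: bool = False,
) -> list[ImpactEvent]:
    """Return impact events for enabled reference types, highest severity first."""
    enabled = {
        "investigation": check_investigations,
        "lens_score": check_lens_scores,
        "edition": check_editions,  # opt-in: expensive in the real system
    }
    events = [
        ImpactEvent(kind, ref_id, f"{kind} {ref_id} references {correlation_id}", severity)
        for kind, ref_id, severity in reverse_index.get(correlation_id, [])
        if enabled.get(kind, False)
    ]
    return sorted(events, key=lambda e: (_SEVERITY_RANK[e.severity], e.affected_type, e.affected_id))
```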
Testing
Tests in tests/test_continuous_fusion.py.
Core pipeline tests
- test_match_incoming_new_entity: first record for a lens, no store state → NEW_ENTITY
- test_match_incoming_match: second record matching a stored entity → MATCHED
- test_match_incoming_conflict: record matches two different stored entities → CONFLICT
- test_match_incoming_below_threshold: close but not close enough → NEW_ENTITY (not a match)
- test_match_incoming_uses_blocking_acceleration: only candidates with overlapping blocking keys are scored (not a full scan)
- test_match_incoming_does_not_write: verify the store is unchanged after match_incoming returns
Impact assessment tests
- test_impact_finds_affected_investigation: match on entity X, investigation I references X → impact event
- test_impact_no_affected_state: match on an entity with no downstream references → empty list
- test_impact_severity_ranking: multiple impacts sorted by severity
Triage tests
- test_triage_suppresses_below_threshold: low confidence + low impact → suppressed
- test_triage_promotes_high_impact: high confidence + open investigation → high triage score
- test_triage_dedup_within_window: same entity matched twice in 30 minutes → second suppressed
- test_triage_conflict_never_suppressed: conflict signal emitted regardless of triage score
- test_triage_novelty_decay: frequently seen entity scores lower novelty
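The triage behaviours these tests pin down can be prototyped without a store. The sketch below makes three assumptions: a severity mapping (high=1.0, medium=0.6, low=0.3), a linear novelty ramp over decay_period, and timestamps in epoch seconds. TriageConfig, novelty, and triage are illustrative names, not the module's API.

```python
from __future__ import annotations

from dataclasses import dataclass

# Assumed mapping from ImpactEvent.severity to [0, 1].
SEVERITY_WEIGHT = {"high": 1.0, "medium": 0.6, "low": 0.3}


@dataclass(frozen=True)
class TriageConfig:
    """Hypothetical stand-in for the lens-level triage settings."""

    threshold: float | None = None       # None: suppression is opt-in, emit everything
    dedup_window_s: float | None = None  # e.g. 3600.0 for "1h"
    urgency_base: float = 0.5
    suppress_list: frozenset[str] = frozenset()
    decay_period_s: float = 86_400.0     # assumed novelty decay period


def novelty(last_seen_s: float | None, now_s: float, decay_period_s: float) -> float:
    """1.0 for a new entity or one unseen for longer than decay_period_s;
    otherwise a linear ramp toward 0 the more recently it was seen (assumed shape)."""
    if last_seen_s is None:
        return 1.0
    return min(1.0, max(0.0, (now_s - last_seen_s) / decay_period_s))


def triage(outcome: str, confidence: float, severities: list[str], entity_id: str,
           now_s: float, last_seen_s: float | None, last_emitted_s: float | None,
           cfg: TriageConfig) -> tuple[float, bool, str]:
    """Return (triage_score, suppressed, suppression_reason)."""
    impact = max((SEVERITY_WEIGHT[s] for s in severities), default=0.0)
    score = cfg.urgency_base * confidence * novelty(last_seen_s, now_s, cfg.decay_period_s) * impact

    if outcome == "conflict":
        return score, False, ""  # invariant: conflicts always reach a human
    if entity_id in cfg.suppress_list:
        return score, True, "entity on suppress_list"
    if (cfg.dedup_window_s is not None and last_emitted_s is not None
            and now_s - last_emitted_s < cfg.dedup_window_s):
        return score, True, "duplicate within dedup_window"
    if cfg.threshold is not None and score < cfg.threshold:
        return score, True, f"triage score {score:.3f} below threshold {cfg.threshold}"
    return score, False, ""
```

Because the score is a product, any zero factor drives it to 0. Examples include having no impact events, or confidence 0 for a new entity. Once a threshold is set, such signals are suppressed unless they are conflicts.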
Integration tests
- test_continuous_batch_order_dependent: process [A, B, C] vs [C, A, B], verify different entity counts
- test_continuous_end_to_end: 50 records through the pipeline; verify entities created, matches found, impacts assessed, signals emitted, lineage correct
- test_continuous_with_decay_scheduler: run continuous + decay in parallel, verify interactions are clean
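The order-dependence test can be prototyped with a toy one-dimensional matcher. run_sequence is hypothetical. A record joins an entity when it lies within threshold of any existing member (single linkage). A record that reaches two entities is flagged as a conflict rather than merged. The same three values yield one entity in one order and two entities plus a conflict in another.

```python
from __future__ import annotations


def run_sequence(values: list[float], threshold: float = 0.7) -> tuple[int, list[str]]:
    """Toy incremental matcher: returns (entity_count, per-record outcomes)."""
    entities: list[list[float]] = []
    outcomes: list[str] = []
    for v in values:
        # Entities with at least one member close enough to the new record.
        hits = [i for i, members in enumerate(entities)
                if any(abs(v - m) <= threshold for m in members)]
        if not hits:
            entities.append([v])
            outcomes.append("new_entity")
        elif len(hits) == 1:
            entities[hits[0]].append(v)
            outcomes.append("matched")
        else:
            # Two stored entities claim this record: leave it for a human.
            outcomes.append("conflict")
    return len(entities), outcomes


# A=0.0, B=0.6, C=1.2
print(run_sequence([0.0, 0.6, 1.2]))  # [A, B, C]
print(run_sequence([1.2, 0.0, 0.6]))  # [C, A, B]
```

In order [A, B, C], B bridges A and C into one entity. In order [C, A, B], A and C are already separate entities when B arrives, so B becomes a conflict.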
Performance
- test_incremental_match_latency: single record against 10K stored correlations, P99 < 200ms
- test_continuous_batch_throughput: 1000 records/second on commodity hardware with a SQLite store
Acceptance
- [ ] match_incoming() works end-to-end against a SQLite-backed CorrelationStore
- [ ] Three outcomes (new_entity / matched / conflict) all tested
- [ ] Impact assessment finds affected investigations via reverse index
- [ ] Triage scoring with suppression + dedup + novelty decay all tested
- [ ] Conflict signals never suppressed
- [ ] Continuous lens config section parseable by lens_parser
- [ ] CLI: axonis-cli continuous --lens <path> --record <json> processes one record
- [ ] CLI: axonis-cli continuous --lens <path> --stream <file> processes a JSONL stream
- [ ] Performance: P99 < 200ms per record against 10K stored correlations
- [ ] End-to-end demo: examples/continuous_fusion_demo.py processes a VRS-style stream and emits signals
Rollback plan
Continuous fusion is a new module (parallax/ops/fusion/continuous/). Nothing in batch fusion or the three-phase protocol depends on it.
Rollback:
1. Delete parallax/ops/fusion/continuous/
2. Remove RECORD_MATCHED_INCREMENTAL and CONFLICT_DETECTED from LineageAction enum (additive — no consumer breaks)
3. Remove continuous: section from any lens specs that use it (optional config — parser ignores unknown keys)
Batch fusion, three-phase protocol, and all existing functionality are unchanged.
What this enables
| Capability | Before component.parallax.continuous-fusion | After component.parallax.continuous-fusion |
|---|---|---|
| "Is this new person someone we've seen?" | Run batch fusion manually | Automatic on ingest |
| "What changed because of this match?" | Manual investigation | Impact events emitted |
| "Should I care about this match?" | Every match looks the same | Triage score prioritizes |
| "How fast can I respond to a new threat?" | Minutes to hours (batch cycle) | Seconds (per-record) |
| Sentinel service (Oracle architecture) | Not buildable — no continuous compute | Sentinel = this pipeline + threshold monitoring |
| DES computed signals from engines | Not emitted — engines write directly to ES | Signals flow through promotion boundary |
§B — Prism Continuous Reassessment Extension
Same pattern, different engine. Prism's continuous mode is triggered by Parallax's output — when an entity changes, Prism re-scores any lens that references it.
Trigger cascade
```
Parallax continuous (§A above)
  │  emits: computed signal (entity_changed)
  │  payload: { entity_id, correlation_id, delta_confidence, new_members }
  │
  ▼
Prism continuous (this section)
  │  "which lenses use entity X as an input layer?"
  │  re-score each → compare previous vs new
  │  emit only if score CHANGED meaningfully or status changed
  │
  ▼  emits: computed signal (assessment_changed)
  │  payload: { lens_id, entity_id, previous_score, new_score, delta,
  │             previous_status, new_status, status_changed, layers_affected }
  │
  ▼
Sentinel (triage + routing) → DES
```
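The rule "emit only if score CHANGED meaningfully" can be sketched together with the invariant that a status change always emits. The status bands in BANDS are invented for illustration. Real classification comes from threshold.evaluate(), and status_for and should_emit are hypothetical names.

```python
from __future__ import annotations

# Hypothetical (lower_bound, status) bands, highest first.
BANDS = [(0.8, "CRITICAL"), (0.6, "HIGH"), (0.4, "ELEVATED"), (0.2, "ADVISORY"), (0.0, "NORMAL")]


def status_for(score: float) -> str:
    for lower, status in BANDS:
        if score >= lower:
            return status
    return "NORMAL"  # scores below 0 fall into the lowest band


def should_emit(previous_score: float, new_score: float, noise_threshold: float = 0.02):
    """Return (emit, delta, previous_status, new_status)."""
    delta = new_score - previous_score
    prev_status, new_status = status_for(previous_score), status_for(new_score)
    # A status change always emits, even when |delta| is within the noise band.
    emit = abs(delta) > noise_threshold or prev_status != new_status
    return emit, delta, prev_status, new_status
```

For example, a move from 0.50 to 0.51 stays silent. A move from 0.39 to 0.40 emits, because it crosses ADVISORY → ELEVATED even though the delta is below noise_threshold.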
Public API
```python
# axonis-lens/lens/continuous/pipeline.py
from __future__ import annotations  # AssessmentStore, LensSpec defined elsewhere

from dataclasses import dataclass


@dataclass(frozen=True)
class EntityChangeEvent:
    """Produced by Parallax continuous. Consumed by Prism continuous."""

    entity_id: str
    correlation_id: str
    producing_lens_id: str
    delta_confidence: float
    new_members: list[str]  # record_refs added to the cluster
    timestamp: str          # ISO 8601


@dataclass(frozen=True)
class ReassessmentResult:
    """One lens re-scored for one entity."""

    lens_id: str
    entity_id: str
    previous_score: float
    new_score: float
    delta: float
    previous_status: str   # NORMAL / ADVISORY / ELEVATED / HIGH / CRITICAL
    new_status: str
    status_changed: bool   # True if a threshold boundary was crossed
    layers_affected: dict  # { layer_name: { old: float, new: float } }
    evidence_id: str       # frozen evidence block for the reassessment


def reassess_on_change(
    event: EntityChangeEvent,
    store: AssessmentStore,
    lens_specs: list[LensSpec],
    noise_threshold: float = 0.02,
) -> list[ReassessmentResult]:
    """Re-score every lens affected by an entity change.

    Steps:
    1. Query store for lenses whose input layers reference event.entity_id.
    2. For each affected lens, re-run execute_lens with updated inputs.
    3. Compare new score vs stored previous score.
    4. Return results ONLY where abs(delta) > noise_threshold or the
       status changed (a threshold boundary was crossed). Unchanged
       scores are silent: no signal emitted.
    5. Append the new score to the store for each returned result,
       so the next comparison uses it.

    Does NOT emit signals directly. Returns results for the caller
    (Sentinel or runner) to decide how to emit.
    """
```
AssessmentStore (new — mirrors CorrelationStore pattern)
```python
# axonis-lens/lens/continuous/store.py
from __future__ import annotations  # StoredScore defined elsewhere

from abc import ABC, abstractmethod


class AssessmentStore(ABC):
    """Persistent store for lens assessment scores per entity.

    Same interface pattern as CorrelationStore
    (component.parallax.local-persistence-adapter). SQLite backend for
    standalone; ES backend for production.
    """

    @abstractmethod
    def read_score(self, lens_id: str, entity_id: str) -> StoredScore | None:
        """Last known score for this entity under this lens."""

    @abstractmethod
    def write_score(self, lens_id: str, entity_id: str, score: StoredScore) -> None:
        """Persist the latest score. Append-only: history preserved."""

    @abstractmethod
    def find_lenses_for_entity(self, entity_id: str) -> list[str]:
        """Which lenses have ever scored this entity? Reverse lookup."""
```
What it reuses from existing Prism
| Step | Existing primitive | How continuous uses it |
|---|---|---|
| Re-score a lens | execute_lens() / run_lens() | Called per affected lens: same compute, triggered by an event instead of by an analyst |
| Layer scoring | COST_PRIMITIVES (10 cost fns) | Same cost models, same weights |
| Threshold classification | threshold.evaluate() | Same thresholds; status_changed is the delta |
| Evidence freeze | evidence.create_evidence_block() | Each reassessment produces a frozen evidence block |
Configuration (in Prism lens spec)
```yaml
# Extension to Prism lens spec for continuous mode
continuous:
  enabled: true
  trigger: entity_change    # entity_change | schedule | manual
  noise_threshold: 0.02     # suppress deltas below this
  reassess_layers:          # which layers to re-evaluate (default: all)
    - outage_severity
    - weather_threat
```
Invariants
- Silent when nothing changed. If re-scoring produces abs(delta) ≤ noise_threshold and no status change, no signal is emitted and no new score is written to the store.
- Status change always emits. Even if delta is small, crossing a threshold boundary (ADVISORY → ELEVATED) always produces a signal.
- Evidence is frozen per reassessment. Every re-score that produces a signal also produces a frozen evidence block with the previous and new state.
- Store is append-only. Previous scores are history, not overwritten. Enables replay and trend analysis.
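An in-memory backend is enough to exercise the append-only contract in tests. StoredScore's fields and the history() helper are assumptions, because the spec references StoredScore without defining it. The ABC is restated so the snippet runs on its own.

```python
from __future__ import annotations

from abc import ABC, abstractmethod
from collections import defaultdict
from dataclasses import dataclass


@dataclass(frozen=True)
class StoredScore:
    """Assumed shape: the spec references StoredScore but does not define it."""

    score: float
    status: str
    evidence_id: str
    timestamp: str  # ISO 8601


class AssessmentStore(ABC):
    """Restated from the interface above so this snippet runs standalone."""

    @abstractmethod
    def read_score(self, lens_id: str, entity_id: str) -> StoredScore | None: ...

    @abstractmethod
    def write_score(self, lens_id: str, entity_id: str, score: StoredScore) -> None: ...

    @abstractmethod
    def find_lenses_for_entity(self, entity_id: str) -> list[str]: ...


class InMemoryAssessmentStore(AssessmentStore):
    """Append-only: every write adds to a per-(lens, entity) history."""

    def __init__(self) -> None:
        self._history: defaultdict[tuple[str, str], list[StoredScore]] = defaultdict(list)
        self._lenses_by_entity: defaultdict[str, set[str]] = defaultdict(set)

    def read_score(self, lens_id: str, entity_id: str) -> StoredScore | None:
        history = self._history.get((lens_id, entity_id))
        return history[-1] if history else None  # latest entry wins

    def write_score(self, lens_id: str, entity_id: str, score: StoredScore) -> None:
        self._history[(lens_id, entity_id)].append(score)  # never overwrite
        self._lenses_by_entity[entity_id].add(lens_id)

    def find_lenses_for_entity(self, entity_id: str) -> list[str]:
        return sorted(self._lenses_by_entity.get(entity_id, set()))

    def history(self, lens_id: str, entity_id: str) -> list[StoredScore]:
        """Hypothetical helper for replay and trend analysis."""
        return list(self._history.get((lens_id, entity_id), []))
```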
Testing
- test_reassess_no_change_silent: entity updated but score delta < threshold → no result
- test_reassess_status_change_emits: score crosses ADVISORY → ELEVATED → result emitted
- test_reassess_multiple_lenses: entity change affects 3 lenses → 3 reassessments
- test_reassess_evidence_frozen: each result has a valid evidence block with hash
- test_cascade_parallax_to_prism: end-to-end: Parallax match_incoming → entity_change → Prism reassess → signal
Rollback
Delete axonis-lens/lens/continuous/. No Prism batch compute affected. AssessmentStore is new — empty on removal. Parallax continuous works independently of Prism continuous.
§C — Shared Operations: axonis.core Extraction
The continuous pipelines in both engines need a handful of shared pure functions that today exist independently in athena, parallax-prod, and axonis-lens (reimplemented 2-3 times each). Rather than unify all operations, extract only the proven shared ones into the axonis.core foundation package.
What moves to axonis.core
```
axonis.core/
├── transforms/            ← pure functions, zero heavy deps
│   ├── scale.py           ← min_max_normalize, log_scale, standard_scale
│   │                        (from athena/ops/scale.py + parallax numeric.py)
│   ├── encode.py          ← categorical encoding, one-hot
│   │                        (from athena/ops/encode.py)
│   └── codecs.py          ← numpy ↔ bytes, JSON serialization helpers
│                            (from athena/libs/codecs.py)
├── pipeline/
│   └── dag.py             ← topological sort + DAG execution ordering
│                            (from athena/libs/topological_graph.py)
└── federation/
    └── mergers.py         ← merge federated query results
                             (from athena/libs/mergers.py)
```
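Here is a sketch of what transforms/scale.py could contain under the no-heavy-dependencies rule. It is written in pure Python, although numpy is allowed. The three function names come from the tree above. The signatures and conventions are assumptions, not taken from the existing athena or parallax copies: a log offset of 1, population standard deviation, and zeros for constant input.

```python
from __future__ import annotations

import math
from collections.abc import Sequence


def min_max_normalize(values: Sequence[float]) -> list[float]:
    """Rescale to [0, 1]; a constant input maps to all zeros."""
    if not values:
        return []
    lo, hi = min(values), max(values)
    if hi == lo:
        return [0.0] * len(values)
    return [(v - lo) / (hi - lo) for v in values]


def log_scale(values: Sequence[float], offset: float = 1.0) -> list[float]:
    """Natural log of (v + offset); offset=1 keeps zeros finite."""
    if any(v + offset <= 0 for v in values):
        raise ValueError("log_scale needs v + offset > 0 for every value")
    return [math.log(v + offset) for v in values]


def standard_scale(values: Sequence[float]) -> list[float]:
    """Zero mean, unit (population) standard deviation."""
    n = len(values)
    if n == 0:
        return []
    mean = sum(values) / n
    std = math.sqrt(sum((v - mean) ** 2 for v in values) / n)
    if std == 0:
        return [0.0] * n
    return [(v - mean) / std for v in values]
```

Whichever conventions the canonical version adopts, both parallax numeric.py and athena scale.py callers would need to agree with them before the duplicates are deleted.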
What STAYS in each engine (does NOT move)
| Engine | Keeps | Why |
|---|---|---|
| Athena | All ML ops (PCA, t-SNE, UMAP, FFT, image, raster, spaCy NLP, augmentation) | Heavy deps (scikit-learn, opencv, GDAL). ML-training-specific. |
| Parallax | All fusion ops (metrics, blocking, scoring, clustering, tracking, derived features, transliteration, aliases) | Engine-specialized. More mature than athena equivalents. |
| Prism | All cost models, layer scoring, surface compute, observation primitives | Engine-specialized. Different domain. |
What gets DELETED after extraction (dedup)
| Duplicate | In | Canonical moves to |
|---|---|---|
| min_max_normalize | parallax numeric.py + athena scale.py | axonis.core.transforms.scale |
| log_scale | parallax numeric.py + athena scale.py | axonis.core.transforms.scale |
| codecs (numpy ↔ bytes) | athena libs/codecs.py (the only copy, but needed by multiple engines) | axonis.core.transforms.codecs |
Effort
| Task | Weeks |
|---|---|
| Create axonis.core package with 5 files | 0.5 |
| Update imports in parallax-prod (replace local scale/codecs with core) | 0.5 |
| Update imports in atlas-fl-athena (replace libs/codecs, libs/mergers, libs/topological_graph with core) | 0.5 |
| Tests: ensure existing test suites pass with imports redirected | 0.5 |
| Total | 2 weeks |
Invariant
axonis.core NEVER imports scikit-learn, opencv, GDAL, dask, torch, or any heavy ML dependency. If a function requires these, it stays in the engine that owns the dependency. Core is pure Python + numpy (numpy is acceptable — it's already everywhere).
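DAG execution ordering is one core function that needs nothing beyond the standard library, since Python 3.9+ ships graphlib.TopologicalSorter. The wrapper names execution_order and execution_levels are hypothetical. execution_levels groups steps whose dependencies are all satisfied, so the steps within one level could run in parallel.

```python
from graphlib import CycleError, TopologicalSorter


def execution_order(deps: dict[str, set[str]]) -> list[str]:
    """Order steps so each runs after its prerequisites.

    deps maps step -> set of prerequisite steps (graphlib's convention).
    Raises ValueError naming the cycle if the graph is not a DAG.
    """
    try:
        return list(TopologicalSorter(deps).static_order())
    except CycleError as exc:
        # graphlib stores the detected cycle in exc.args[1].
        raise ValueError(f"pipeline graph has a cycle: {exc.args[1]}") from exc


def execution_levels(deps: dict[str, set[str]]) -> list[list[str]]:
    """Group steps into levels; steps within a level are independent."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()
    levels: list[list[str]] = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        levels.append(ready)
        sorter.done(*ready)
    return levels
```

With deps = {"derive": {"normalize"}, "block": {"normalize"}, "score": {"derive", "block"}}, execution_levels returns [["normalize"], ["block", "derive"], ["score"]].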
Out of scope (tracked separately)
- Cross-org continuous fusion (federation + continuous) → requires Xanadu integration + streaming protocol, not batch three-phase
- Backfill mode ("replay historical ingest through continuous pipeline") → use replay_fusion (#8) for historical analysis, continuous for forward-looking
- Auto-investigation creation ("match of sufficient importance auto-opens an investigation") → DES lifecycle concern, not fusion concern. Cortex decides whether a signal becomes an investigation.
- Streaming transport (Kafka, Pulsar, Redis Streams) → v1 uses callback pattern; transport is pluggable. Don't over-invest in transport before the pipeline logic is proven.
Depends on: component.parallax.attestation-rationale, component.parallax.decay-scheduler, component.parallax.local-persistence-adapter, component.parallax.observation-reverse-index
Realizes: product.fusion
Required by: component.parallax.wire-message-families