Skip to content

Decay Scheduler + Retention

Status & scope

  • Stage: DRAFT
  • Module: parallax/ops/fusion/scheduler/ (new)
  • Builds on: existing apply_decay() in correlation.py
  • Part of: #18 Full Fusion Cycle · Umbrella #6
  • Milestone: M4 (OSS Substrate)

Problem

apply_decay() exists in parallax/ops/fusion/correlation.py as a primitive — but nothing calls it on a clock. Decay is a promise documented in EvidenceRulesConfig.confidence_decay and CorrelationStatus.DECAYED, but operationally it never fires. The same is true for retention: EvidenceRulesConfig.max_age says evidence expires after N days, but no process enforces it.

Three failure modes: 1. Stale confidence. A correlation attested 18 months ago with confidence 0.87 is treated as equally valid today, even if the lens declared a 180-day half-life. 2. Regulatory exposure. GDPR Article 5(1)(e) requires data be kept no longer than necessary. If the lens says 365-day retention, the system must enforce it. 3. No archival path. Some regulated customers require evidence to be moved to cold storage rather than deleted outright — today there's no primitive for this.

Decision

Ship a thin scheduler module with two modes:

  • CLI one-shot: python -m parallax.ops.fusion.scheduler decay --lens <lens_id> — reads lens config, applies decay, exits. For cron drivers or ad-hoc runs.
  • Daemon: python -m parallax.ops.fusion.scheduler daemon --config <path> — long-running worker that wakes on schedule.

No new decay math. The scheduler calls the existing apply_decay() primitive. This spec is scheduling + retention lifecycle only.

Retention = archive, not delete, by default. Archive correlations older than max_age to a separate table; purge (hard delete) is opt-in and requires explicit config. Either way, a retention-driven action emits a typed LineageAction.

Architecture

                    Scheduler (CLI or daemon)
                              │
                 reads EvidenceRulesConfig per lens
                              │
                 ┌────────────┴─────────────┐
                 │                          │
           Decay path                 Retention path
                 │                          │
                 ▼                          ▼
      For each correlation        For each correlation
      with status in              older than max_age:
      {PROPOSED, CONFIRMED}:
                 │                  if retention.archive:
      confidence *= decay_factor       move to archive
      if confidence < floor:            emit ARCHIVED event
         status = DECAYED            elif retention.purge:
         emit DECAYED event             hard delete
                                        emit PURGED event

Both paths use CorrelationStore interface methods — the scheduler works against any backend (SQLite / ES / memory).

Configuration

Per-lens config (already in EvidenceRulesConfig, documented here)

evidence_rules:
  confidence_decay:
    enabled: true
    half_life: "180d"            # Half-life for confidence decay
    floor: 0.2                   # Below this confidence, transition to DECAYED
    frequency: "1d"              # How often the scheduler should re-evaluate this lens
  max_age: "365d"                # Retention cap
  retention_action: "archive"    # "archive" | "purge" | "none" (default: archive)

Scheduler config (new — daemon mode only)

# scheduler.yaml
store:
  backend: sqlite                # or "elastic"
  path: "./axonis.db"            # for sqlite

schedules:
  - lens_id: "vrs_vulnerability_v1"
    mode: decay_and_retention
  - lens_id: "aml_entity_v2"
    mode: decay_only             # skip retention for this lens

global:
  max_concurrent_lenses: 2       # throttle concurrent decay runs
  dry_run: false                 # if true, log actions but don't write

Public API

# parallax/ops/fusion/scheduler/runner.py

def run_decay_once(
    store: CorrelationStore,
    lens_id: str,
    evidence_rules: EvidenceRulesConfig,
    now: datetime | None = None,   # injectable for testing
    dry_run: bool = False,
) -> DecayReport:
    """Apply decay to all active correlations for a lens. Returns summary.

    - Reads correlations with status in {PROPOSED, CONFIRMED}
    - Computes decayed confidence per correlation (uses apply_decay)
    - Transitions to DECAYED if below floor, emits DECAYED LineageEvent
    - Writes nothing if dry_run=True; returns the planned transitions
    """


def run_retention_once(
    store: CorrelationStore,
    lens_id: str,
    evidence_rules: EvidenceRulesConfig,
    now: datetime | None = None,
    dry_run: bool = False,
) -> RetentionReport:
    """Apply retention policy. Respects retention_action (archive/purge/none).

    - Reads correlations whose updated_at is older than max_age
    - Performs the configured action
    - Emits ARCHIVED or PURGED LineageEvent
    - PURGED: hard delete correlation + lineage + index rows (component.parallax.observation-reverse-index)
             — reverse-index cleanup is part of this operation
    - ARCHIVED: move correlation to archive table, retain lineage + index
    """


@dataclass(frozen=True)
class DecayReport:
    lens_id: str
    run_at: str
    correlations_scanned: int
    correlations_decayed: int
    correlations_below_floor: int
    dry_run: bool


@dataclass(frozen=True)
class RetentionReport:
    lens_id: str
    run_at: str
    correlations_scanned: int
    archived: int
    purged: int
    action: str                   # "archive" | "purge" | "none"
    dry_run: bool

New LineageAction values

# extends LineageAction enum from SPEC-24
class LineageAction(str, Enum):
    # ... existing ...
    ARCHIVED = "archived"    # NEW — retention-driven move to archive
    PURGED = "purged"        # NEW — retention-driven hard delete

New schema (archive table — SQLite addition)

-- Archive table mirrors correlations; used by retention.archive action
CREATE TABLE correlations_archive (
    correlation_id   TEXT PRIMARY KEY,
    entity_type      TEXT NOT NULL,
    lens_id          TEXT NOT NULL,
    lens_version     TEXT NOT NULL,
    status           TEXT NOT NULL,
    confidence       REAL NOT NULL,
    archived_at      TEXT NOT NULL,
    document         TEXT NOT NULL
);
CREATE INDEX idx_archive_lens ON correlations_archive(lens_id);

Archived correlations remain queryable via the archive table. Their lineage (in lineage_events) is retained. Reverse index (component.parallax.observation-reverse-index) is retained unless purge is specified.

CLI

$ python -m parallax.ops.fusion.scheduler decay --lens vrs_vulnerability_v1 --dry-run
[DECAY] lens=vrs_vulnerability_v1 scanned=1,247 decayed=89 below_floor=12 dry_run=True

$ python -m parallax.ops.fusion.scheduler decay --lens vrs_vulnerability_v1
[DECAY] lens=vrs_vulnerability_v1 scanned=1,247 decayed=89 below_floor=12 dry_run=False
[LEDGER] 12 DECAYED events appended

$ python -m parallax.ops.fusion.scheduler retention --lens vrs_vulnerability_v1
[RETENTION] lens=vrs_vulnerability_v1 scanned=1,247 archived=43 purged=0 action=archive

$ python -m parallax.ops.fusion.scheduler daemon --config scheduler.yaml
[DAEMON] started; 2 schedules active; next wake at 2026-04-16T00:00:00Z

Daemon mode uses APScheduler (Apache 2.0, pure-Python, minimal deps). One scheduler process per deployment; multi-process coordination is out of scope (single-writer assumption).

Invariants

  1. No action without a ledger event. Every decay transition, archive, and purge writes a typed LineageAction first. The ledger is the truth; the correlation state is a materialisation.
  2. Dry run is read-only. With dry_run=True, the scheduler performs all reads and computations but writes nothing. Use for validation before scheduling.
  3. Retention respects lens config. A lens with max_age="" (unset) is immune to retention — the scheduler skips it, never defaults.
  4. Purge is explicit. Default retention_action is archive. purge must be explicitly set in lens config. No silent deletion.
  5. Idempotent. Running decay twice on the same day produces the same end state — individual decay steps compound correctly because confidence is reduced, not replaced.

Testing

Tests in tests/test_scheduler.py and tests/test_retention.py.

Decay tests

  • test_decay_reduces_confidence_by_half_life
  • test_decay_below_floor_transitions_to_decayed
  • test_decay_emits_lineage_event
  • test_decay_dry_run_writes_nothing
  • test_decay_on_already_decayed_noop
  • test_decay_time_travel — stub clock, advance 360d, verify expected state at each half-life

Retention tests

  • test_retention_archive_moves_row_to_archive_table
  • test_retention_archive_retains_lineage
  • test_retention_purge_deletes_correlation_and_index_rows (component.parallax.observation-reverse-index integration)
  • test_retention_action_none_skips
  • test_retention_respects_per_lens_max_age
  • test_retention_dry_run_writes_nothing
  • test_archived_correlation_queryable_from_archive_table

Daemon tests

  • test_daemon_loads_config_and_schedules
  • test_daemon_graceful_shutdown — SIGTERM completes current task and exits
  • test_daemon_dry_run_mode_logs_actions

Integration

  • End-to-end: create 1000 correlations spanning 12 months of updated_at timestamps. Run decay + retention. Verify expected distribution of DECAYED / ARCHIVED / PURGED states and corresponding ledger events.

Acceptance

  • [ ] scheduler/ module with runner.py, daemon.py, __main__.py (CLI entry)
  • [ ] LineageAction.ARCHIVED and LineageAction.PURGED added to enum
  • [ ] correlations_archive table in SQLite schema
  • [ ] run_decay_once() and run_retention_once() callable from code + CLI
  • [ ] Dry-run mode validated by tests
  • [ ] Daemon mode starts, schedules, shuts down cleanly
  • [ ] Sample scheduler.yaml in examples/
  • [ ] All tests pass

Rollback plan

Scheduler is a separate process and a separate module. Nothing in the main fusion pipeline depends on it.

Rollback steps: 1. Stop the daemon if running. 2. Remove the scheduler/ directory. 3. Remove ARCHIVED / PURGED from LineageAction (they'll round-trip as strings; consumers must handle unknown actions gracefully — already an invariant). 4. Leave the correlations_archive table in place; it's idempotent (empty is fine). 5. Decay + retention become manual primitives again, called from scripts or tests.

No data loss on rollback. Archived correlations stay in the archive table until a future spec provides a restoration path.

Out of scope (tracked separately)

  • Restoration from archive → future spec if a customer requires it
  • Cross-process coordination / multi-writer → assume single scheduler per deployment
  • Distributed scheduling (kubernetes CronJob / Airflow) → trivial to wrap the CLI; not a primitive concern
  • Audit / compliance reports on retention activity → consumes ledger events, lives in the UX layer (#17)
  • Adaptive decay (half-life adjusts to observed drift) → research-grade, not 90-day

Depends on: component.parallax.attestation-rationale, component.parallax.local-persistence-adapter

Realizes: product.fusion

Required by: component.parallax.continuous-fusion, component.parallax.local-quickstart