Decay Scheduler + Retention
Status & scope
- Stage: DRAFT
- Module:
parallax/ops/fusion/scheduler/(new) - Builds on: existing
apply_decay()incorrelation.py - Part of: #18 Full Fusion Cycle · Umbrella #6
- Milestone: M4 (OSS Substrate)
Problem
apply_decay() exists in parallax/ops/fusion/correlation.py as a primitive — but nothing calls it on a clock. Decay is a promise documented in EvidenceRulesConfig.confidence_decay and CorrelationStatus.DECAYED, but operationally it never fires. The same is true for retention: EvidenceRulesConfig.max_age says evidence expires after N days, but no process enforces it.
Three failure modes: 1. Stale confidence. A correlation attested 18 months ago with confidence 0.87 is treated as equally valid today, even if the lens declared a 180-day half-life. 2. Regulatory exposure. GDPR Article 5(1)(e) requires data be kept no longer than necessary. If the lens says 365-day retention, the system must enforce it. 3. No archival path. Some regulated customers require evidence to be moved to cold storage rather than deleted outright — today there's no primitive for this.
Decision
Ship a thin scheduler module with two modes:
- CLI one-shot:
python -m parallax.ops.fusion.scheduler decay --lens <lens_id>— reads lens config, applies decay, exits. For cron drivers or ad-hoc runs. - Daemon:
python -m parallax.ops.fusion.scheduler daemon --config <path>— long-running worker that wakes on schedule.
No new decay math. The scheduler calls the existing apply_decay() primitive. This spec is scheduling + retention lifecycle only.
Retention = archive, not delete, by default. Archive correlations older than max_age to a separate table; purge (hard delete) is opt-in and requires explicit config. Either way, a retention-driven action emits a typed LineageAction.
Architecture
Scheduler (CLI or daemon)
│
reads EvidenceRulesConfig per lens
│
┌────────────┴─────────────┐
│ │
Decay path Retention path
│ │
▼ ▼
For each correlation For each correlation
with status in older than max_age:
{PROPOSED, CONFIRMED}:
│ if retention.archive:
confidence *= decay_factor move to archive
if confidence < floor: emit ARCHIVED event
status = DECAYED elif retention.purge:
emit DECAYED event hard delete
emit PURGED event
Both paths use CorrelationStore interface methods — the scheduler works against any backend (SQLite / ES / memory).
Configuration
Per-lens config (already in EvidenceRulesConfig, documented here)
evidence_rules:
confidence_decay:
enabled: true
half_life: "180d" # Half-life for confidence decay
floor: 0.2 # Below this confidence, transition to DECAYED
frequency: "1d" # How often the scheduler should re-evaluate this lens
max_age: "365d" # Retention cap
retention_action: "archive" # "archive" | "purge" | "none" (default: archive)
Scheduler config (new — daemon mode only)
# scheduler.yaml
store:
backend: sqlite # or "elastic"
path: "./axonis.db" # for sqlite
schedules:
- lens_id: "vrs_vulnerability_v1"
mode: decay_and_retention
- lens_id: "aml_entity_v2"
mode: decay_only # skip retention for this lens
global:
max_concurrent_lenses: 2 # throttle concurrent decay runs
dry_run: false # if true, log actions but don't write
Public API
# parallax/ops/fusion/scheduler/runner.py
def run_decay_once(
store: CorrelationStore,
lens_id: str,
evidence_rules: EvidenceRulesConfig,
now: datetime | None = None, # injectable for testing
dry_run: bool = False,
) -> DecayReport:
"""Apply decay to all active correlations for a lens. Returns summary.
- Reads correlations with status in {PROPOSED, CONFIRMED}
- Computes decayed confidence per correlation (uses apply_decay)
- Transitions to DECAYED if below floor, emits DECAYED LineageEvent
- Writes nothing if dry_run=True; returns the planned transitions
"""
def run_retention_once(
store: CorrelationStore,
lens_id: str,
evidence_rules: EvidenceRulesConfig,
now: datetime | None = None,
dry_run: bool = False,
) -> RetentionReport:
"""Apply retention policy. Respects retention_action (archive/purge/none).
- Reads correlations whose updated_at is older than max_age
- Performs the configured action
- Emits ARCHIVED or PURGED LineageEvent
- PURGED: hard delete correlation + lineage + index rows (component.parallax.observation-reverse-index)
— reverse-index cleanup is part of this operation
- ARCHIVED: move correlation to archive table, retain lineage + index
"""
@dataclass(frozen=True)
class DecayReport:
lens_id: str
run_at: str
correlations_scanned: int
correlations_decayed: int
correlations_below_floor: int
dry_run: bool
@dataclass(frozen=True)
class RetentionReport:
lens_id: str
run_at: str
correlations_scanned: int
archived: int
purged: int
action: str # "archive" | "purge" | "none"
dry_run: bool
New LineageAction values
# extends LineageAction enum from SPEC-24
class LineageAction(str, Enum):
# ... existing ...
ARCHIVED = "archived" # NEW — retention-driven move to archive
PURGED = "purged" # NEW — retention-driven hard delete
New schema (archive table — SQLite addition)
-- Archive table mirrors correlations; used by retention.archive action
CREATE TABLE correlations_archive (
correlation_id TEXT PRIMARY KEY,
entity_type TEXT NOT NULL,
lens_id TEXT NOT NULL,
lens_version TEXT NOT NULL,
status TEXT NOT NULL,
confidence REAL NOT NULL,
archived_at TEXT NOT NULL,
document TEXT NOT NULL
);
CREATE INDEX idx_archive_lens ON correlations_archive(lens_id);
Archived correlations remain queryable via the archive table. Their lineage (in lineage_events) is retained. Reverse index (component.parallax.observation-reverse-index) is retained unless purge is specified.
CLI
$ python -m parallax.ops.fusion.scheduler decay --lens vrs_vulnerability_v1 --dry-run
[DECAY] lens=vrs_vulnerability_v1 scanned=1,247 decayed=89 below_floor=12 dry_run=True
$ python -m parallax.ops.fusion.scheduler decay --lens vrs_vulnerability_v1
[DECAY] lens=vrs_vulnerability_v1 scanned=1,247 decayed=89 below_floor=12 dry_run=False
[LEDGER] 12 DECAYED events appended
$ python -m parallax.ops.fusion.scheduler retention --lens vrs_vulnerability_v1
[RETENTION] lens=vrs_vulnerability_v1 scanned=1,247 archived=43 purged=0 action=archive
$ python -m parallax.ops.fusion.scheduler daemon --config scheduler.yaml
[DAEMON] started; 2 schedules active; next wake at 2026-04-16T00:00:00Z
Daemon mode uses APScheduler (Apache 2.0, pure-Python, minimal deps). One scheduler process per deployment; multi-process coordination is out of scope (single-writer assumption).
Invariants
- No action without a ledger event. Every decay transition, archive, and purge writes a typed LineageAction first. The ledger is the truth; the correlation state is a materialisation.
- Dry run is read-only. With
dry_run=True, the scheduler performs all reads and computations but writes nothing. Use for validation before scheduling. - Retention respects lens config. A lens with
max_age=""(unset) is immune to retention — the scheduler skips it, never defaults. - Purge is explicit. Default
retention_actionisarchive.purgemust be explicitly set in lens config. No silent deletion. - Idempotent. Running decay twice on the same day produces the same end state — individual decay steps compound correctly because confidence is reduced, not replaced.
Testing
Tests in tests/test_scheduler.py and tests/test_retention.py.
Decay tests
test_decay_reduces_confidence_by_half_lifetest_decay_below_floor_transitions_to_decayedtest_decay_emits_lineage_eventtest_decay_dry_run_writes_nothingtest_decay_on_already_decayed_nooptest_decay_time_travel— stub clock, advance 360d, verify expected state at each half-life
Retention tests
test_retention_archive_moves_row_to_archive_tabletest_retention_archive_retains_lineagetest_retention_purge_deletes_correlation_and_index_rows(component.parallax.observation-reverse-index integration)test_retention_action_none_skipstest_retention_respects_per_lens_max_agetest_retention_dry_run_writes_nothingtest_archived_correlation_queryable_from_archive_table
Daemon tests
test_daemon_loads_config_and_schedulestest_daemon_graceful_shutdown— SIGTERM completes current task and exitstest_daemon_dry_run_mode_logs_actions
Integration
- End-to-end: create 1000 correlations spanning 12 months of updated_at timestamps. Run decay + retention. Verify expected distribution of DECAYED / ARCHIVED / PURGED states and corresponding ledger events.
Acceptance
- [ ]
scheduler/module withrunner.py,daemon.py,__main__.py(CLI entry) - [ ]
LineageAction.ARCHIVEDandLineageAction.PURGEDadded to enum - [ ]
correlations_archivetable in SQLite schema - [ ]
run_decay_once()andrun_retention_once()callable from code + CLI - [ ] Dry-run mode validated by tests
- [ ] Daemon mode starts, schedules, shuts down cleanly
- [ ] Sample
scheduler.yamlinexamples/ - [ ] All tests pass
Rollback plan
Scheduler is a separate process and a separate module. Nothing in the main fusion pipeline depends on it.
Rollback steps:
1. Stop the daemon if running.
2. Remove the scheduler/ directory.
3. Remove ARCHIVED / PURGED from LineageAction (they'll round-trip as strings; consumers must handle unknown actions gracefully — already an invariant).
4. Leave the correlations_archive table in place; it's idempotent (empty is fine).
5. Decay + retention become manual primitives again, called from scripts or tests.
No data loss on rollback. Archived correlations stay in the archive table until a future spec provides a restoration path.
Out of scope (tracked separately)
- Restoration from archive → future spec if a customer requires it
- Cross-process coordination / multi-writer → assume single scheduler per deployment
- Distributed scheduling (kubernetes CronJob / Airflow) → trivial to wrap the CLI; not a primitive concern
- Audit / compliance reports on retention activity → consumes ledger events, lives in the UX layer (#17)
- Adaptive decay (half-life adjusts to observed drift) → research-grade, not 90-day
Depends on: component.parallax.attestation-rationale, component.parallax.local-persistence-adapter
Realizes: product.fusion
Required by: component.parallax.continuous-fusion, component.parallax.local-quickstart