Local Persistence Adapter
Status & scope
- Stage: DRAFT
- Module: `parallax/ops/fusion/storage/` (new)
- Part of: #18 Full Fusion Cycle · Umbrella #6
- Milestone: M4 (OSS Substrate)
Problem
Parallax has no local persistence. CorrelationRecord, FusionRun, and LineageEvent dataclasses are produced in-memory by the fusion pipeline and disappear after a run. Today the only way to persist them is via Titan and Elasticsearch (ES), per component.parallax.correlation-persistence. This breaks three things:
- OSS strategy. Parallax-as-open-source requires `docker run` to produce queryable output without installing the full platform.
- Cold-start (#10). A customer evaluator needs matches they can query in <15 minutes. ES install + Titan wire-up is a 3-day engagement.
- Downstream 90-day work. Replay (#8), cold start (#10), UX surfaces (#17), reverse index (component.parallax.observation-reverse-index), decay scheduler (component.parallax.decay-scheduler), and rationale capture (component.parallax.attestation-rationale) all depend on a persistence layer that isn't gated on Titan.
Decision
Introduce a CorrelationStore abstract interface. Ship SQLite as the reference backend. Titan's ES-backed implementation (per component.parallax.correlation-persistence) implements the same interface — consumers don't know or care which backend is in use.
Invariants (non-negotiable):
- Append-only. No UPDATE or DELETE on CorrelationRecord field values or LineageEvent history. State transitions happen via new events, not mutations.
- Deterministic reads. The same inputs and the same backend always produce the same reads (the foundation for replay).
- Retention-driven deletes only (component.parallax.decay-scheduler), with audit trail. No silent erasure.
Architecture
```
Fusion pipeline (three_phase.py, pipeline.py)
                   │
                   ▼
┌──────────────────────────────────────┐
│ CorrelationStore interface           │
│ (abstract base class)                │
└────────┬─────────────────────┬───────┘
         │                     │
┌────────▼───────┐   ┌─────────▼─────────────────────┐
│ SQLiteStore    │   │ ElasticStore                  │
│ (this spec:    │   │ (component.parallax.          │
│ OSS + cold     │   │ correlation-persistence:      │
│ start)         │   │ Titan / prod)                 │
└────────────────┘   └───────────────────────────────┘
```
SQLite backend lives at parallax/ops/fusion/storage/sqlite_store.py. ES backend (not part of this spec, implemented by Titan) lives outside parallax and imports the interface.
Modes of operation
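The SQLite backend supports three modes. On-disk and in-memory use the same `SQLiteStore` class; the hardened mode uses `SQLCipherStore`, which implements the same interface: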
| Mode | Constructor | When to use |
|---|---|---|
| On-disk | `SQLiteStore("./axonis.db")` | Normal operation. Persistent, survives restart. |
| In-memory | `SQLiteStore(":memory:")` | Tests, CI, one-shot evaluations, demos, hot replay. Disposable. Zero I/O. |
| Hardened (SQLCipher) | `SQLCipherStore("./axonis.db", key=...)` | Classified or regulated deployments requiring encryption at rest. Same interface, separate module (95% copy-paste of `sqlite_store.py`). |
In-memory mode is a first-class mode, not a testing shim. It's the right default for #8 replay's counterfactual path: load a historical correlation set into RAM for fast what-if exploration without touching the on-disk ledger.
SQLCipher variant. SQLite with AES-256 encryption-at-rest. Open-source (BSD), used by Signal and similar high-trust apps. Drop-in replacement for the sqlite3 driver; the SQLCipherStore module is essentially SQLiteStore with one different import and key-management hooks at open time. Shipped as an opt-in module so the default OSS build does not require the pysqlcipher3 binary. Iron Bank-hardened image builds this in; standard OSS image ships stock SQLite.
Licensing notes
- SQLite is public domain — strictly more permissive than Apache 2.0. Zero conflict with Parallax's OSS licence.
- SQLCipher is BSD 3-clause. Compatible with Apache 2.0 redistribution.
- No copyleft exposure on either path.
Public API
```python
# parallax/ops/fusion/storage/base.py
from abc import ABC, abstractmethod

from parallax.ops.fusion.models.correlation import (
    CorrelationRecord,
    LineageEvent,
    CorrelationStatus,
)
from parallax.ops.fusion.models.fusion_run import FusionRun


class CorrelationStore(ABC):
    """Abstract storage interface for fusion outputs.

    All implementations MUST honour append-only semantics on field
    values and lineage events.
    """

    # ── Writes ─────────────────────────────────────────────

    @abstractmethod
    def write_correlation(self, cr: CorrelationRecord) -> None:
        """Persist a new CorrelationRecord. Raises if correlation_id exists."""

    @abstractmethod
    def append_lineage(self, correlation_id: str, event: LineageEvent) -> None:
        """Append an event to the correlation's lineage. Never rewrites history."""

    @abstractmethod
    def write_fusion_run(self, run: FusionRun) -> None:
        """Persist a FusionRun audit record."""

    # ── Reads ──────────────────────────────────────────────

    @abstractmethod
    def read_correlation(self, correlation_id: str) -> CorrelationRecord | None:
        """Read the current state of a correlation. None if not found."""

    @abstractmethod
    def read_lineage(self, correlation_id: str) -> list[LineageEvent]:
        """Read full lineage in chronological order."""

    @abstractmethod
    def list_correlations(
        self,
        lens_id: str | None = None,
        status: CorrelationStatus | None = None,
        since: str | None = None,  # ISO8601
        limit: int = 100,
    ) -> list[CorrelationRecord]:
        """Filtered listing for queue / dashboard use cases."""

    @abstractmethod
    def read_fusion_run(self, run_id: str) -> FusionRun | None:
        """Read a FusionRun audit record."""

    # ── Lifecycle ──────────────────────────────────────────

    @abstractmethod
    def close(self) -> None:
        """Release resources. Idempotent."""
```
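```python
# parallax/ops/fusion/storage/sqlite_store.py
from pathlib import Path

from parallax.ops.fusion.storage.base import CorrelationStore


class SQLiteStore(CorrelationStore):
    """SQLite-backed CorrelationStore. Single-file, zero-infrastructure.

    Schema is created on first write; see module docstring for DDL.
    Safe for single-process use; multi-process access requires WAL mode,
    which this store enables by default (SQLite itself defaults to a
    rollback journal).
    """

    def __init__(self, path: str | Path): ...
```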
Schema (SQLite)
```sql
-- Current correlation state (one row per correlation_id)
CREATE TABLE correlations (
    correlation_id TEXT PRIMARY KEY,
    entity_type    TEXT NOT NULL,
    lens_id        TEXT NOT NULL,
    lens_version   TEXT NOT NULL,
    status         TEXT NOT NULL,  -- CorrelationStatus enum value
    confidence     REAL NOT NULL,
    created_at     TEXT NOT NULL,
    updated_at     TEXT NOT NULL,
    document       TEXT NOT NULL   -- full JSON serialisation
);
CREATE INDEX idx_corr_lens_status ON correlations(lens_id, status);
CREATE INDEX idx_corr_updated ON correlations(updated_at);

-- Append-only lineage
CREATE TABLE lineage_events (
    id             INTEGER PRIMARY KEY AUTOINCREMENT,
    correlation_id TEXT NOT NULL REFERENCES correlations(correlation_id),
    action         TEXT NOT NULL,  -- LineageAction enum value
    timestamp      TEXT NOT NULL,
    fusion_run_id  TEXT,
    document       TEXT NOT NULL   -- full JSON serialisation
);
CREATE INDEX idx_lineage_corr ON lineage_events(correlation_id, timestamp);
CREATE INDEX idx_lineage_run ON lineage_events(fusion_run_id);

-- Fusion runs (audit)
CREATE TABLE fusion_runs (
    run_id       TEXT PRIMARY KEY,
    lens_id      TEXT NOT NULL,
    lens_version TEXT NOT NULL,
    started_at   TEXT NOT NULL,
    completed_at TEXT,
    document     TEXT NOT NULL
);

-- Derived features (for PSI-run replay fidelity)
-- Persists the derived vectors that crossed the wire during a PSI run,
-- scoped to the candidate pairs that survived blocking. Without this,
-- a PSI-driven historical run cannot be fully replayed because the
-- derived features aren't reconstructible from raw records alone
-- (they depend on lens config + primitive versions at run time).
CREATE TABLE derived_features (
    id             INTEGER PRIMARY KEY AUTOINCREMENT,
    fusion_run_id  TEXT NOT NULL REFERENCES fusion_runs(run_id),
    correlation_id TEXT REFERENCES correlations(correlation_id),  -- NULL if no match emerged
    federate_id    TEXT NOT NULL,
    local_id       TEXT NOT NULL,
    derived_vector TEXT NOT NULL,     -- JSON: {field: derived_value}
    crossed_wire   INTEGER NOT NULL,  -- 1 if sent in Phase 2, 0 if only local
    created_at     TEXT NOT NULL
);
CREATE INDEX idx_derived_run ON derived_features(fusion_run_id);
CREATE INDEX idx_derived_corr ON derived_features(correlation_id);
```
Replay invariant: Given a fusion_run_id and the pinned primitive + lens versions (see component.parallax.fusion-governance-lifecycle versioning), the derived_features table contains exactly what crossed the wire — enabling byte-for-byte reconstruction of the Phase 2 / Phase 3 computation. Raw records are not stored here; they remain at source nodes. The derived-feature table is the minimum viable record needed to prove what the system saw without exposing raw PII.
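A read-side sketch of the replay invariant, assuming stdlib `sqlite3` and the `derived_features` columns above. It selects exactly the vectors with `crossed_wire = 1` for one run and orders them by the autoincrement `id`, so a replay sees them in write order. The helper name `wire_payload` and the returned dict shape are hypothetical.

```python
import json
import sqlite3


def wire_payload(conn: sqlite3.Connection, fusion_run_id: str) -> list[dict]:
    """Return the derived vectors that crossed the wire in one run (hypothetical helper)."""
    rows = conn.execute(
        "SELECT federate_id, local_id, derived_vector "
        "FROM derived_features "
        "WHERE fusion_run_id = ? AND crossed_wire = 1 "
        "ORDER BY id",  # AUTOINCREMENT id gives a stable write order
        (fusion_run_id,),
    )
    return [
        {"federate_id": fed, "local_id": local, "vector": json.loads(vec)}
        for fed, local, vec in rows
    ]
```

Local-only rows (`crossed_wire = 0`) stay in the table for audit but are excluded here, since they were never part of the Phase 2 exchange.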
Append-only enforcement: no UPDATE triggers. A change to a correlation (status transition, confidence change) is written by appending a new LineageEvent AND re-persisting the correlation row. The correlations row is a materialised view of current state; the lineage is the source of truth.
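A minimal sketch of that write path, assuming stdlib `sqlite3` and trimmed copies of the two tables involved. The lineage row is only ever inserted, the materialised `correlations` row is rewritten, and both writes share one transaction so the view cannot drift from the lineage. `record_transition` and the plain-dict stand-ins for `LineageEvent` and `CorrelationRecord` are hypothetical.

```python
import json
import sqlite3

# Trimmed copies of the two tables involved (see Schema (SQLite)).
SCHEMA = """
CREATE TABLE correlations (
    correlation_id TEXT PRIMARY KEY,
    status         TEXT NOT NULL,
    confidence     REAL NOT NULL,
    updated_at     TEXT NOT NULL,
    document       TEXT NOT NULL
);
CREATE TABLE lineage_events (
    id             INTEGER PRIMARY KEY AUTOINCREMENT,
    correlation_id TEXT NOT NULL REFERENCES correlations(correlation_id),
    action         TEXT NOT NULL,
    timestamp      TEXT NOT NULL,
    fusion_run_id  TEXT,
    document       TEXT NOT NULL
);
"""


def record_transition(conn: sqlite3.Connection, correlation_id: str,
                      event: dict, new_state: dict) -> None:
    """Append a lineage event, then re-persist the materialised row."""
    with conn:  # one transaction: both writes commit, or both roll back
        conn.execute(
            "INSERT INTO lineage_events "
            "(correlation_id, action, timestamp, fusion_run_id, document) "
            "VALUES (?, ?, ?, ?, ?)",
            (correlation_id, event["action"], event["timestamp"],
             event.get("fusion_run_id"), json.dumps(event, sort_keys=True)),
        )
        cur = conn.execute(
            "UPDATE correlations SET status = ?, confidence = ?, "
            "updated_at = ?, document = ? WHERE correlation_id = ?",
            (new_state["status"], new_state["confidence"], new_state["updated_at"],
             json.dumps(new_state, sort_keys=True), correlation_id),
        )
        if cur.rowcount != 1:
            # Raising inside `with conn` also rolls back the lineage insert.
            raise LookupError(f"unknown correlation_id: {correlation_id}")
```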
Testing
Tests live in tests/test_sqlite_store.py and tests/test_correlation_store_contract.py (backend-agnostic contract tests).
Contract tests (run against every backend)
- `test_write_read_roundtrip`: write a CR, read it back, fields match
- `test_append_lineage_order`: append 5 events, read back in chronological order
- `test_list_correlations_filter_by_status`: write 10 CRs with mixed status, filter query returns correct subset
- `test_list_correlations_since_timestamp`: time-based filtering works
- `test_append_only_semantics`: attempting to delete a lineage event raises; attempting to rewrite a historical event raises
- `test_write_correlation_duplicate_id_raises`: second `write_correlation` with same ID raises (use `append_lineage` for updates)
- `test_close_is_idempotent`: calling `close()` twice does not raise
SQLite-specific tests
- `test_sqlite_schema_created_on_first_write`: fresh file gets schema
- `test_sqlite_concurrent_reads`: two readers, one writer, no corruption
- `test_sqlite_perf_10k_writes`: writing 10K correlations completes in <5s on commodity hardware
- `test_sqlite_perf_list_1m_rows`: listing with status filter on 1M rows returns in <200ms
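A sketch of the write pattern the 10K-writes target assumes, using stdlib `sqlite3`: all rows go into one transaction via `executemany`, rather than one commit per row. It also inspects the plan for a lens + status filter with `EXPLAIN QUERY PLAN`. `idx_corr_lens_status` leads with `lens_id`, so a status-only filter may not be able to use it; checking the plan for the exact query the 1M-row test runs is worthwhile. `synthetic_rows`, `bulk_write`, `filter_plan`, and the fixture values are hypothetical.

```python
import json
import sqlite3
import time

DDL = """
CREATE TABLE correlations (
    correlation_id TEXT PRIMARY KEY, entity_type TEXT NOT NULL,
    lens_id TEXT NOT NULL, lens_version TEXT NOT NULL,
    status TEXT NOT NULL, confidence REAL NOT NULL,
    created_at TEXT NOT NULL, updated_at TEXT NOT NULL,
    document TEXT NOT NULL
);
CREATE INDEX idx_corr_lens_status ON correlations(lens_id, status);
CREATE INDEX idx_corr_updated ON correlations(updated_at);
"""


def synthetic_rows(n: int) -> list[tuple]:
    """Hypothetical fixture data: n correlations split across two statuses."""
    ts = "2024-01-01T00:00:00Z"
    rows = []
    for i in range(n):
        cid = f"c{i:06d}"
        status = "CONFIRMED" if i % 2 else "PROPOSED"
        doc = json.dumps({"correlation_id": cid, "status": status})
        rows.append((cid, "person", "vrs", "1.0", status, 0.5, ts, ts, doc))
    return rows


def bulk_write(conn: sqlite3.Connection, rows: list[tuple]) -> float:
    """Insert all rows in ONE transaction and return elapsed seconds."""
    start = time.perf_counter()
    with conn:  # a single commit instead of one per row
        conn.executemany(
            "INSERT INTO correlations VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", rows
        )
    return time.perf_counter() - start


def filter_plan(conn: sqlite3.Connection) -> str:
    """Return SQLite's query-plan text for a lens + status filter."""
    plan = conn.execute(
        "EXPLAIN QUERY PLAN SELECT document FROM correlations "
        "WHERE lens_id = ? AND status = ?",
        ("vrs", "CONFIRMED"),
    ).fetchall()
    return " ".join(str(row[-1]) for row in plan)  # last column is the detail text
```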
Acceptance
- [ ] `CorrelationStore` abstract interface shipped in `parallax/ops/fusion/storage/base.py`
- [ ] `SQLiteStore` implementation shipped in `parallax/ops/fusion/storage/sqlite_store.py`
- [ ] All contract tests pass against `SQLiteStore`
- [ ] Performance targets met (5s for 10K writes, 200ms for 1M-row filtered read)
- [ ] `pipeline.run_fusion()` accepts optional `store: CorrelationStore` parameter; when provided, persists correlations + fusion_run
- [ ] Sample script `examples/local_persistence_demo.py` runs a VRS fusion end-to-end with SQLite persistence
Rollback plan
If this spec's implementation needs to be reverted:
1. Delete parallax/ops/fusion/storage/ directory.
2. Revert the optional store= parameter on run_fusion() (additive change — no downstream breakage).
3. No schema migration required (SQLite file is disposable).
Once step 2 is reverted, no other Parallax module imports from `storage/`. Rollback is mechanical and cannot cascade.
Out of scope (tracked separately)
- Observation-level reverse index → component.parallax.observation-reverse-index
- Attestation rationale capture → component.parallax.attestation-rationale
- Decay + retention scheduler → component.parallax.decay-scheduler
- OSS Docker quickstart → component.parallax.local-quickstart
- ES-backed implementation → component.parallax.correlation-persistence (Titan owns)
- Cross-process locking / multi-writer semantics (SQLite WAL mode is sufficient for v1)
Open questions
- Migration on schema change — v1 assumes breaking changes require a new DB file. Acceptable for OSS; revisit before first paying customer on SQLite.
- Encryption at rest — out of scope for v1. SQLCipher as a drop-in replacement if a customer requires it; additive change.
- Multi-tenant: one DB per lens vs a single DB for all lenses. Lens-per-DB gives isolation; a single DB gives simpler deployment. Proposed: one DB per `(lens_id, lens_version)` pair; revisit if operators push back.
Depends on: component.parallax.correlation-persistence
Realizes: product.fusion
Required by: component.parallax.attestation-rationale, component.parallax.continuous-fusion, component.parallax.decay-scheduler, component.parallax.local-quickstart, component.parallax.observation-reverse-index