Observation Reverse Index
Status & scope
- Stage: DRAFT
- Module: parallax/ops/fusion/storage/ (extends component.parallax.local-persistence-adapter)
- Part of: #18 Full Fusion Cycle · Umbrella #6
- Milestone: M4 (OSS Substrate)
Problem
An observation (a single record at a federate) may contribute to many correlations over its lifetime. Today the link is one-way only:
- Forward: CorrelationRecord.contributing_records[] lists all observations that formed the correlation. ✅ Already stored.
- Reverse: "Show me every correlation that ever touched observation barclays:cust_uk_00234" has no index and requires scanning every correlation.
Reverse lookup is core to:
- UX review surfaces (#17): "what else is this record linked to?"
- Incident response: "a record was mistakenly shared; which downstream correlations must be reviewed?"
- Right-to-erasure (GDPR Article 17): "delete this record; invalidate the correlations that used it."
- Source-credibility audits: "if source X is proven unreliable, which correlations depended on it?"
At N correlations × M observations per correlation, forward-only storage costs O(N·M) to answer a single reverse query. Unworkable at scale.
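To make the cost concrete, here is a minimal sketch of a reverse query against forward-only storage. The `correlations` mapping and the helper `scan_for_observation` are hypothetical stand-ins for the real store, and the HSBC records and the third correlation ID are made-up sample data; only the `contributing_records` idea comes from this spec.

```python
# Hypothetical in-memory stand-in for forward-only storage:
# correlation_id -> contributing records as (federate_id, record_ref) pairs.
correlations = {
    "CR-2026-03-vrs-00001": [("barclays", "cust_uk_00234"), ("hsbc", "cust_uk_00991")],
    "CR-2026-03-aml-00017": [("barclays", "cust_uk_00234"), ("barclays", "cust_uk_00410")],
    "CR-2026-03-aml-00018": [("hsbc", "cust_uk_00234")],  # illustrative ID
}


def scan_for_observation(correlations, federate_id, record_ref):
    """Reverse lookup without an index.

    Every correlation is visited and its contributing records are searched,
    so one query costs up to N * M comparisons.
    """
    target = (federate_id, record_ref)
    return [
        correlation_id
        for correlation_id, contributing_records in correlations.items()
        if target in contributing_records
    ]


print(scan_for_observation(correlations, "barclays", "cust_uk_00234"))
```

The work grows with the total number of stored contributing records, not with the number of matches, which is the behaviour the secondary index is meant to remove.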
Decision
Maintain a secondary index observation_ref → [correlation_ids] alongside every correlation write. The index is a write-time invariant — updated by the same transaction that writes the correlation.
Scope: every observation that appears in CorrelationRecord.contributing_records[] is indexed. No scanning. No async rebuild.
Key = (federate_id, record_ref) tuple. Observations are federate-scoped; cust_uk_00234 at Barclays is not the same as cust_uk_00234 at HSBC.
Raw-URI preservation: store a source_uri field alongside the record_ref to support "click through to original system" workflows. Optional at write time; required for UX deep-linking.
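As a rough illustration of the keying decision, the sketch below models the index as a plain dictionary keyed by the (federate_id, record_ref) tuple. The names `observation_index`, `source_uris`, and `index_observation` are hypothetical and exist only for this example; the real index is a storage-level structure, not an in-process dict.

```python
from collections import defaultdict

# Hypothetical in-memory model of the index.
# Key: (federate_id, record_ref). Value: correlation_ids that used the observation.
observation_index = defaultdict(list)
# Optional raw URI per observation, stored next to the key for deep-linking.
source_uris = {}


def index_observation(federate_id, record_ref, correlation_id, source_uri=None):
    key = (federate_id, record_ref)
    if correlation_id not in observation_index[key]:
        observation_index[key].append(correlation_id)
    if source_uri is not None:  # source_uri is optional at write time
        source_uris[key] = source_uri


index_observation(
    "barclays", "cust_uk_00234", "CR-2026-03-vrs-00001",
    source_uri="https://barclays.internal/crm/customers/cust_uk_00234",
)
index_observation("hsbc", "cust_uk_00234", "CR-2026-03-aml-00017")

# Same record_ref at two federates: two distinct index entries.
print(observation_index[("barclays", "cust_uk_00234")])
print(observation_index[("hsbc", "cust_uk_00234")])
```

Keying on record_ref alone would merge the Barclays and HSBC entries, which is exactly the collision the tuple key avoids.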
Architecture
CorrelationStore.write_correlation(cr)
        │
        ▼
┌───────────────────────────────┐
│ TRANSACTIONAL (atomic):       │
│ 1. write correlation row      │
│ 2. append lineage events      │
│ 3. upsert observation_index   │  ← this spec
│    rows, one per              │
│    contributing_record        │
└───────────────────────────────┘
        │
Reverse query: O(log N) index lookup
        │
        ▼
find_correlations_by_observation("barclays", "cust_uk_00234")
  → ["CR-2026-03-vrs-00001", "CR-2026-03-aml-00017", ...]
Public API
Extensions to the CorrelationStore interface from component.parallax.local-persistence-adapter:
from abc import ABC, abstractmethod


class CorrelationStore(ABC):
    # ... existing methods from component.parallax.local-persistence-adapter ...

    @abstractmethod
    def find_correlations_by_observation(
        self,
        federate_id: str,
        record_ref: str,
    ) -> list[str]:
        """Return correlation_ids that include this observation.

        Returns [] if the observation has never been part of any correlation.
        Order: most recently updated first.
        """

    @abstractmethod
    def find_observations_by_correlation(
        self,
        correlation_id: str,
    ) -> list[ObservationRef]:
        """Return ObservationRef objects for a correlation. Convenience
        over reading the full correlation document.
        """
New dataclass:
# parallax/ops/fusion/models/observation.py (new, small file)
from dataclasses import dataclass


@dataclass(frozen=True)
class ObservationRef:
    """Reference to a single observation at a federate.

    source_uri is optional; when present it enables deep-linking back to
    the source system ("open this record in Barclays' CRM").
    """
    federate_id: str
    record_ref: str
    source_uri: str | None = None  # e.g. "https://barclays.internal/crm/customers/cust_uk_00234"
    blocking_keys: tuple[str, ...] = ()
    last_seen_at: str = ""  # ISO8601
Schema (SQLite addition to component.parallax.local-persistence-adapter)
-- Reverse index: observation → correlations that touched it
CREATE TABLE observation_index (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    federate_id TEXT NOT NULL,
    record_ref TEXT NOT NULL,
    correlation_id TEXT NOT NULL REFERENCES correlations(correlation_id),
    source_uri TEXT,
    first_seen_at TEXT NOT NULL,
    last_seen_at TEXT NOT NULL,
    UNIQUE(federate_id, record_ref, correlation_id)
);

CREATE INDEX idx_obs_lookup ON observation_index(federate_id, record_ref);
CREATE INDEX idx_obs_corr ON observation_index(correlation_id);
Write path: on every write_correlation(cr), the store extracts cr.contributing_records[] and upserts one row per observation. The UNIQUE constraint is upsert-friendly (SQLite INSERT ... ON CONFLICT DO UPDATE).
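One way the upsert could look with Python's built-in `sqlite3` module is sketched below. The helper `upsert_index_rows`, the shape of its `contributing_records` argument, and the sample timestamps are assumptions made for this example. The rule of keeping an existing `source_uri` when a later write omits it (via `COALESCE`) is also an assumption, not something the spec decides. The foreign key to `correlations` is left out to keep the sketch standalone.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE observation_index (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    federate_id TEXT NOT NULL,
    record_ref TEXT NOT NULL,
    correlation_id TEXT NOT NULL,  -- FK to correlations omitted in this sketch
    source_uri TEXT,
    first_seen_at TEXT NOT NULL,
    last_seen_at TEXT NOT NULL,
    UNIQUE(federate_id, record_ref, correlation_id)
);
""")

UPSERT_SQL = """
INSERT INTO observation_index
    (federate_id, record_ref, correlation_id, source_uri, first_seen_at, last_seen_at)
VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT(federate_id, record_ref, correlation_id) DO UPDATE SET
    last_seen_at = excluded.last_seen_at,
    source_uri = COALESCE(excluded.source_uri, observation_index.source_uri)
"""


def upsert_index_rows(conn, correlation_id, contributing_records, now):
    """Upsert one index row per (federate_id, record_ref, source_uri) triple.

    first_seen_at is set on insert only; a conflict refreshes last_seen_at.
    """
    rows = [
        (federate_id, record_ref, correlation_id, source_uri, now, now)
        for federate_id, record_ref, source_uri in contributing_records
    ]
    with conn:  # all rows of one correlation commit together
        conn.executemany(UPSERT_SQL, rows)


uri = "https://barclays.internal/crm/customers/cust_uk_00234"
upsert_index_rows(conn, "CR-2026-03-vrs-00001",
                  [("barclays", "cust_uk_00234", uri)], now="2026-03-01T00:00:00Z")
# Re-writing the same correlation later touches the existing row instead of adding one.
upsert_index_rows(conn, "CR-2026-03-vrs-00001",
                  [("barclays", "cust_uk_00234", None)], now="2026-03-02T00:00:00Z")

rows = conn.execute(
    "SELECT source_uri, first_seen_at, last_seen_at FROM observation_index"
).fetchall()
print(rows)
```

In the real store this statement would run inside the same transaction as the correlation write rather than in its own `with conn:` block.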
Read path: the reverse lookup is a single indexed query:
SELECT correlation_id FROM observation_index
WHERE federate_id = ? AND record_ref = ?
ORDER BY last_seen_at DESC;
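The query above could be wrapped as in the following sketch, which uses a trimmed copy of the table (only the columns the lookup needs) and made-up timestamps. It assumes `last_seen_at` values are ISO8601 strings in one uniform format and timezone, since only then does the text ordering match chronological order.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
CREATE TABLE observation_index (
    federate_id TEXT NOT NULL,
    record_ref TEXT NOT NULL,
    correlation_id TEXT NOT NULL,
    last_seen_at TEXT NOT NULL,
    UNIQUE(federate_id, record_ref, correlation_id)
)""")
with conn:
    conn.executemany(
        "INSERT INTO observation_index VALUES (?, ?, ?, ?)",
        [
            ("barclays", "cust_uk_00234", "CR-2026-03-vrs-00001", "2026-03-01T09:00:00Z"),
            ("barclays", "cust_uk_00234", "CR-2026-03-aml-00017", "2026-03-05T14:30:00Z"),
            ("hsbc", "cust_uk_00234", "CR-2026-03-aml-00017", "2026-03-05T14:30:00Z"),
        ],
    )


def find_correlations_by_observation(conn, federate_id, record_ref):
    """Return correlation_ids for one observation, most recently seen first."""
    cursor = conn.execute(
        "SELECT correlation_id FROM observation_index "
        "WHERE federate_id = ? AND record_ref = ? "
        "ORDER BY last_seen_at DESC",
        (federate_id, record_ref),
    )
    return [row[0] for row in cursor]


print(find_correlations_by_observation(conn, "barclays", "cust_uk_00234"))
print(find_correlations_by_observation(conn, "barclays", "no_such_record"))
```

An unknown observation simply matches no rows, so the empty-list contract from the interface docstring falls out of the query without special handling.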
The ES-backed implementation (component.parallax.correlation-persistence / Titan) achieves the same result with an ES sub-field on the correlation document and exposes the same interface.
Invariants
- Write atomicity. Writing a correlation and updating the index happen in one transaction. The index can never lag the correlations table.
- No orphans. An observation_index row cannot exist for a non-existent correlation (enforced by FK; note that SQLite enforces foreign keys only when PRAGMA foreign_keys = ON is set on the connection).
- Retention-driven cleanup only. When a correlation is archived or purged (component.parallax.decay-scheduler), its index rows go with it. No silent index erasure.
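The last two invariants can be exercised in SQLite roughly as follows. This is a sketch under assumptions: the `correlations` table is reduced to its key, and the helpers `try_insert_orphan` and `purge_correlation` are hypothetical. Because the schema in this spec declares the foreign key without ON DELETE CASCADE, the sketch deletes index rows explicitly in the same transaction as the correlation.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")  # FK enforcement is off by default in SQLite
conn.executescript("""
CREATE TABLE correlations (correlation_id TEXT PRIMARY KEY);  -- assumed minimal shape
CREATE TABLE observation_index (
    federate_id TEXT NOT NULL,
    record_ref TEXT NOT NULL,
    correlation_id TEXT NOT NULL REFERENCES correlations(correlation_id),
    UNIQUE(federate_id, record_ref, correlation_id)
);
INSERT INTO correlations VALUES ('CR-2026-03-vrs-00001');
INSERT INTO observation_index VALUES ('barclays', 'cust_uk_00234', 'CR-2026-03-vrs-00001');
""")


def try_insert_orphan(conn):
    """Attempt to index an observation for a correlation that does not exist."""
    try:
        with conn:
            conn.execute(
                "INSERT INTO observation_index VALUES (?, ?, ?)",
                ("barclays", "cust_uk_00999", "CR-does-not-exist"),
            )
    except sqlite3.IntegrityError:
        return False  # rejected by the foreign key
    return True


def purge_correlation(conn, correlation_id):
    """Remove a correlation and its index rows in one transaction."""
    with conn:
        # Index rows go first: with FKs on, deleting the parent while
        # child rows still reference it would fail.
        conn.execute("DELETE FROM observation_index WHERE correlation_id = ?", (correlation_id,))
        conn.execute("DELETE FROM correlations WHERE correlation_id = ?", (correlation_id,))


orphan_accepted = try_insert_orphan(conn)
purge_correlation(conn, "CR-2026-03-vrs-00001")
remaining = conn.execute("SELECT COUNT(*) FROM observation_index").fetchone()[0]
print(orphan_accepted, remaining)
```

Declaring the foreign key with ON DELETE CASCADE would be an alternative way to get the same cleanup; which one the store uses is a decision this spec leaves open.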
Testing
Tests in tests/test_observation_reverse_index.py.
Contract tests (backend-agnostic)
- test_reverse_lookup_after_write: write a CR with 3 observations, all 3 resolve to that CR
- test_multi_correlation_same_observation: one observation in 2 CRs, reverse lookup returns both
- test_source_uri_preserved: URI written, URI read back
- test_reverse_lookup_nonexistent: unknown observation returns []
- test_index_cleaned_on_correlation_delete: when a correlation is purged (retention), its index rows go with it
- test_order_by_last_seen: most-recently-updated correlation first
Performance (SQLite)
- test_reverse_lookup_perf_100k: 100K correlations × avg 3 observations each (300K index rows). Reverse lookup <50ms P99.
- test_write_overhead_with_index: adding the index costs <10% write throughput
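A latency measurement along these lines is one way to approach the reverse-lookup target. Everything here is an assumption for illustration: the helper name `measure_reverse_lookup_p99`, the synthetic data layout, the in-memory database, and the simple sorted-sample percentile. The default size is deliberately smaller than the 100K target so the sketch runs quickly; pass `n_correlations=100_000` to match the spec.

```python
import sqlite3
import time


def measure_reverse_lookup_p99(n_correlations=10_000, obs_per_correlation=3, n_queries=1_000):
    """Build a synthetic index and return the P99 reverse-lookup latency in ms."""
    conn = sqlite3.connect(":memory:")
    conn.executescript("""
    CREATE TABLE observation_index (
        federate_id TEXT NOT NULL,
        record_ref TEXT NOT NULL,
        correlation_id TEXT NOT NULL,
        last_seen_at TEXT NOT NULL,
        UNIQUE(federate_id, record_ref, correlation_id)
    );
    CREATE INDEX idx_obs_lookup ON observation_index(federate_id, record_ref);
    """)
    # Synthetic rows: record_refs wrap around so each one lands in several correlations.
    rows = (
        ("barclays",
         f"cust_{(c * obs_per_correlation + o) % n_correlations:06d}",
         f"CR-{c:06d}",
         "2026-03-01T00:00:00Z")
        for c in range(n_correlations)
        for o in range(obs_per_correlation)
    )
    with conn:
        conn.executemany("INSERT INTO observation_index VALUES (?, ?, ?, ?)", rows)

    latencies = []
    for q in range(n_queries):
        record_ref = f"cust_{q % n_correlations:06d}"
        start = time.perf_counter()
        conn.execute(
            "SELECT correlation_id FROM observation_index "
            "WHERE federate_id = ? AND record_ref = ? ORDER BY last_seen_at DESC",
            ("barclays", record_ref),
        ).fetchall()
        latencies.append(time.perf_counter() - start)

    latencies.sort()
    p99_seconds = latencies[int(0.99 * (len(latencies) - 1))]
    conn.close()
    return p99_seconds * 1000.0


print(f"P99 reverse lookup: {measure_reverse_lookup_p99():.3f} ms")
```

An in-memory database understates disk effects, so a real perf test would likely run against a file-backed store on representative hardware.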
Invariant tests
- test_index_and_correlation_atomic: kill a write mid-transaction (simulated); neither the correlation nor the index row partially commits
Acceptance
- [ ] ObservationRef dataclass in models/observation.py
- [ ] Interface extensions in storage/base.py
- [ ] SQLite schema + write path + read path in storage/sqlite_store.py
- [ ] All contract tests pass
- [ ] Perf target met (<50ms P99 on 100K correlations)
- [ ] source_uri optional but stored when provided
- [ ] Documentation: a reverse-lookup example in the sample script (component.parallax.local-quickstart)
Rollback plan
Secondary index is additive. Rollback steps:
1. DROP TABLE observation_index; (SQLite) or equivalent on ES.
2. Remove the two methods from the interface.
3. Remove write-path update in write_correlation().
No downstream module breaks — reverse queries are opt-in; nothing today calls them.
Out of scope (tracked separately)
- UX screens that consume reverse lookup → #17
- Cross-lens observation deduplication (same observation fed to multiple lenses) → open question; acceptable to have separate index rows per (observation, correlation) pair and let consumers union across lenses.
- Observation content change-data-capture (a sensor corrects its data upstream) → out of scope for v1; customer conversation needed before committing scope.
Depends on: component.parallax.correlation-persistence, component.parallax.local-persistence-adapter
Realizes: product.fusion
Required by: component.parallax.continuous-fusion, component.parallax.local-quickstart