
Observation Reverse Index

Status & scope

Problem

An observation (a single record at a federate) may contribute to many correlations over its lifetime. Today the link is one-way only:

  • Forward: CorrelationRecord.contributing_records[] lists all observations that formed the correlation. ✅ Already stored.
  • Reverse: "Show me every correlation that ever touched observation barclays:cust_uk_00234" — no index. Requires scanning every correlation.

Reverse lookup is core to:

  • UX review surfaces (#17): "what else is this record linked to?"
  • Incident response: "a record was mistakenly shared; which downstream correlations must be reviewed?"
  • Right-to-erasure (GDPR Article 17): "delete this record; invalidate the correlations that used it."
  • Source-credibility audits: "if source X is proven unreliable, which correlations depended on it?"

At N correlations × M observations per correlation, forward-only storage costs O(N·M) to answer a single reverse query. Unworkable at scale.
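To make the O(N·M) cost concrete, here is a minimal sketch of a reverse query answered by scanning forward-only storage. The in-memory dictionary and the `reverse_by_scan` helper are illustrative assumptions, not part of the store; the point is only that every contributing record of every correlation gets visited.

```python
def reverse_by_scan(correlations, target):
    """Answer a reverse query with forward-only data: visit every record."""
    visited = 0
    hits = []
    for correlation_id, records in correlations.items():
        for key in records:
            visited += 1
            if key == target:
                hits.append(correlation_id)
    return hits, visited


# Synthetic data (assumption): N = 1000 correlations, M = 3 observations each.
correlations = {
    f"CR-{i}": [("barclays", f"rec_{i}_{j}") for j in range(3)]
    for i in range(1000)
}

hits, visited = reverse_by_scan(correlations, ("barclays", "rec_500_1"))
print(hits, visited)  # one hit, but N * M = 3000 records visited to find it
```

A single lookup touches all 3000 records here, and that count grows with both N and M.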

Decision

Maintain a secondary index observation_ref → [correlation_ids] alongside every correlation write. The index is a write-time invariant — updated by the same transaction that writes the correlation.

Scope: every observation that appears in CorrelationRecord.contributing_records[] is indexed. No scanning. No async rebuild.

Key = (federate_id, record_ref) tuple. Observations are federate-scoped; cust_uk_00234 at Barclays is not the same as cust_uk_00234 at HSBC.

Raw-URI preservation: store a source_uri field alongside the record_ref to support "click through to original system" workflows. Optional at write time; required for UX deep-linking.

Architecture

                   CorrelationStore.write_correlation(cr)
                                  │
                                  ▼
                  ┌───────────────────────────────┐
                  │ TRANSACTIONAL (atomic):        │
                  │   1. write correlation row     │
                  │   2. append lineage events     │
                  │   3. upsert observation_index  │  ← this spec
                  │      rows, one per             │
                  │      contributing_record       │
                  └───────────────────────────────┘
                                  │
                    Reverse query: O(log N) index lookup
                                  │
                                  ▼
         find_correlations_by_observation("barclays", "cust_uk_00234")
           → ["CR-2026-03-vrs-00001", "CR-2026-03-aml-00017", ...]

Public API

Extensions to the CorrelationStore interface from component.parallax.local-persistence-adapter:

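from abc import ABC, abstractmethod

from parallax.ops.fusion.models.observation import ObservationRef  # dataclass defined below


class CorrelationStore(ABC):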
    # ... existing methods from component.parallax.local-persistence-adapter ...

    @abstractmethod
    def find_correlations_by_observation(
        self,
        federate_id: str,
        record_ref: str,
    ) -> list[str]:
        """Return correlation_ids that include this observation.

        Returns [] if the observation has never been part of any correlation.
        Order: most recently updated first.
        """

    @abstractmethod
    def find_observations_by_correlation(
        self,
        correlation_id: str,
    ) -> list[ObservationRef]:
        """Return ObservationRef objects for a correlation. Convenience
        over reading the full correlation document.
        """

New dataclass:

# parallax/ops/fusion/models/observation.py  (new, small file)

from dataclasses import dataclass


@dataclass(frozen=True)
class ObservationRef:
    """Reference to a single observation at a federate.

    source_uri is optional; when present it enables deep-linking back to
    the source system ("open this record in Barclays' CRM").
    """
    federate_id: str
    record_ref: str
    source_uri: str | None = None   # e.g. "https://barclays.internal/crm/customers/cust_uk_00234"
    blocking_keys: tuple[str, ...] = ()
    last_seen_at: str = ""          # ISO8601
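A short usage sketch of the frozen dataclass follows. It repeats the definition so the snippet runs on its own; the `index_key` tuple is an illustrative convention, not something the spec names. Because equality on a dataclass compares every field, two refs to the same observation that differ only in `source_uri` are not equal, which is one reason to key the index on the `(federate_id, record_ref)` pair rather than on the whole object.

```python
from __future__ import annotations

from dataclasses import FrozenInstanceError, dataclass, replace


@dataclass(frozen=True)
class ObservationRef:
    federate_id: str
    record_ref: str
    source_uri: str | None = None
    blocking_keys: tuple[str, ...] = ()
    last_seen_at: str = ""  # ISO8601


ref = ObservationRef("barclays", "cust_uk_00234")

# Frozen instances cannot be mutated; derive an updated copy instead.
linked = replace(ref, source_uri="https://barclays.internal/crm/customers/cust_uk_00234")

try:
    ref.source_uri = "https://example.invalid/"  # type: ignore[misc]
    mutated = True
except FrozenInstanceError:
    mutated = False

# Illustrative index key: identity of the observation, independent of optional fields.
index_key = (ref.federate_id, ref.record_ref)
same_observation = (linked.federate_id, linked.record_ref) == index_key
```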

Schema (SQLite addition to component.parallax.local-persistence-adapter)

-- Reverse index: observation → correlations that touched it
CREATE TABLE observation_index (
    id                INTEGER PRIMARY KEY AUTOINCREMENT,
    federate_id       TEXT NOT NULL,
    record_ref        TEXT NOT NULL,
    correlation_id    TEXT NOT NULL REFERENCES correlations(correlation_id),
    source_uri        TEXT,
    first_seen_at     TEXT NOT NULL,
    last_seen_at      TEXT NOT NULL,
    UNIQUE(federate_id, record_ref, correlation_id)
);
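-- Note: the UNIQUE constraint above already creates an index whose leading
-- columns are (federate_id, record_ref), so idx_obs_lookup may be redundant.
CREATE INDEX idx_obs_lookup ON observation_index(federate_id, record_ref);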
CREATE INDEX idx_obs_corr   ON observation_index(correlation_id);

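Write path: on every write_correlation(cr), the store extracts cr.contributing_records[] and upserts one row per observation. The UNIQUE(federate_id, record_ref, correlation_id) constraint is the conflict target for SQLite's INSERT ... ON CONFLICT DO UPDATE, so rewriting a correlation updates the existing rows (for example, last_seen_at) instead of inserting duplicates.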
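The write path could look roughly like the sketch below, using Python's built-in sqlite3 module. Several things here are assumptions: the one-column `correlations` table stands in for the real one, the function takes plain tuples rather than a CorrelationRecord, lineage events are omitted, and the choice to keep `first_seen_at` and fill in `source_uri` only when a new value arrives is one plausible policy rather than a documented rule.

```python
import sqlite3

SCHEMA = """
CREATE TABLE correlations (
    correlation_id TEXT PRIMARY KEY          -- assumed minimal shape
);
CREATE TABLE observation_index (
    id             INTEGER PRIMARY KEY AUTOINCREMENT,
    federate_id    TEXT NOT NULL,
    record_ref     TEXT NOT NULL,
    correlation_id TEXT NOT NULL REFERENCES correlations(correlation_id),
    source_uri     TEXT,
    first_seen_at  TEXT NOT NULL,
    last_seen_at   TEXT NOT NULL,
    UNIQUE(federate_id, record_ref, correlation_id)
);
"""

UPSERT = """
INSERT INTO observation_index
    (federate_id, record_ref, correlation_id, source_uri, first_seen_at, last_seen_at)
VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT(federate_id, record_ref, correlation_id) DO UPDATE SET
    last_seen_at = excluded.last_seen_at,
    source_uri   = COALESCE(excluded.source_uri, observation_index.source_uri)
"""


def write_correlation(conn, correlation_id, contributing_records, now):
    """contributing_records: iterable of (federate_id, record_ref, source_uri)."""
    with conn:  # one transaction: commit on success, roll back on exception
        conn.execute(
            "INSERT OR IGNORE INTO correlations (correlation_id) VALUES (?)",
            (correlation_id,),
        )
        conn.executemany(
            UPSERT,
            [(fed, ref, correlation_id, uri, now, now) for fed, ref, uri in contributing_records],
        )


conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)

write_correlation(conn, "CR-1", [("barclays", "cust_uk_00234", None)], "2026-03-01T00:00:00Z")
write_correlation(
    conn, "CR-1",
    [("barclays", "cust_uk_00234", "https://example.invalid/r/1")],
    "2026-03-02T00:00:00Z",
)

rows = conn.execute(
    "SELECT first_seen_at, last_seen_at, source_uri FROM observation_index"
).fetchall()
```

After the second write there is still a single index row: `first_seen_at` keeps the original timestamp, `last_seen_at` moves forward, and the URI supplied later is stored.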

Read path: the reverse lookup is a single indexed query:

SELECT correlation_id FROM observation_index
 WHERE federate_id = ? AND record_ref = ?
 ORDER BY last_seen_at DESC;
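Wrapped in Python, the query above might be called as follows. This is a sketch: the free function taking a connection, the trimmed table definition, and the sample rows are assumptions for illustration. Sorting on the text column gives chronological order only when every timestamp uses the same ISO8601 format and timezone.

```python
import sqlite3


def find_correlations_by_observation(conn, federate_id, record_ref):
    rows = conn.execute(
        "SELECT correlation_id FROM observation_index"
        " WHERE federate_id = ? AND record_ref = ?"
        " ORDER BY last_seen_at DESC",
        (federate_id, record_ref),
    ).fetchall()
    return [correlation_id for (correlation_id,) in rows]


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE observation_index ("
    " federate_id TEXT NOT NULL, record_ref TEXT NOT NULL,"
    " correlation_id TEXT NOT NULL, last_seen_at TEXT NOT NULL,"
    " UNIQUE(federate_id, record_ref, correlation_id))"
)
conn.executemany(
    "INSERT INTO observation_index VALUES (?, ?, ?, ?)",
    [
        ("barclays", "cust_uk_00234", "CR-2026-03-vrs-00001", "2026-03-01T09:00:00Z"),
        ("barclays", "cust_uk_00234", "CR-2026-03-aml-00017", "2026-03-05T09:00:00Z"),
        ("hsbc", "cust_uk_00234", "CR-2026-03-aml-00099", "2026-03-09T09:00:00Z"),
    ],
)

barclays_hits = find_correlations_by_observation(conn, "barclays", "cust_uk_00234")
unknown_hits = find_correlations_by_observation(conn, "barclays", "no_such_record")
```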

ES-backed implementation (component.parallax.correlation-persistence / Titan) accomplishes the same via an ES sub-field on the correlation document; same interface.

Invariants

  1. Write atomicity. Writing a correlation and updating the index happen in one transaction. Index can never lag the correlations table.
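  2. No orphans. An observation_index row cannot exist for a non-existent correlation (enforced by FK). SQLite enforces foreign keys only when PRAGMA foreign_keys = ON is set on the connection, so the store must enable it.
  3. Retention-driven cleanup only. When a correlation is archived or purged (component.parallax.decay-scheduler), its index rows go with it. As written, the FK declares no ON DELETE action, so the purge path has to delete the index rows explicitly in the same transaction (or the FK needs ON DELETE CASCADE). No silent index erasure.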
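Invariants 2 and 3 can be exercised with a small sqlite3 sketch. The trimmed tables and the `purge_correlation` helper are assumptions for illustration; the real purge lives in the decay scheduler. Foreign-key enforcement is switched on explicitly because SQLite leaves it off by default.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")  # required for the FK to be enforced
conn.executescript("""
CREATE TABLE correlations (correlation_id TEXT PRIMARY KEY);
CREATE TABLE observation_index (
    federate_id    TEXT NOT NULL,
    record_ref     TEXT NOT NULL,
    correlation_id TEXT NOT NULL REFERENCES correlations(correlation_id),
    UNIQUE(federate_id, record_ref, correlation_id)
);
""")


def orphan_insert_rejected(conn):
    """Invariant 2: an index row for a missing correlation must fail."""
    try:
        with conn:
            conn.execute(
                "INSERT INTO observation_index VALUES (?, ?, ?)",
                ("barclays", "cust_uk_00234", "CR-missing"),
            )
    except sqlite3.IntegrityError:
        return True
    return False


def purge_correlation(conn, correlation_id):
    """Invariant 3 (hypothetical helper): index rows leave with the correlation."""
    with conn:  # both deletes commit or roll back together
        conn.execute("DELETE FROM observation_index WHERE correlation_id = ?", (correlation_id,))
        conn.execute("DELETE FROM correlations WHERE correlation_id = ?", (correlation_id,))


with conn:
    conn.execute("INSERT INTO correlations VALUES ('CR-1')")
    conn.execute("INSERT INTO observation_index VALUES ('barclays', 'cust_uk_00234', 'CR-1')")

rejected = orphan_insert_rejected(conn)
purge_correlation(conn, "CR-1")
remaining = conn.execute("SELECT COUNT(*) FROM observation_index").fetchone()[0]
```

The index rows are deleted before the correlation row; with enforcement on and no cascade declared, the opposite order would be rejected by the foreign key.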

Testing

Tests in tests/test_observation_reverse_index.py.

Contract tests (backend-agnostic)

  • test_reverse_lookup_after_write — write a CR with 3 observations, all 3 resolve to that CR
  • test_multi_correlation_same_observation — one observation in 2 CRs, reverse lookup returns both
  • test_source_uri_preserved — URI written, URI read back
  • test_reverse_lookup_nonexistent — unknown observation returns []
  • test_index_cleaned_on_correlation_delete — when correlation is purged (retention), index rows go with it
  • test_order_by_last_seen — most-recently-updated correlation first

Performance (SQLite)

  • test_reverse_lookup_perf_100k — 100K correlations × avg 3 observations each (300K index rows). Reverse lookup <50ms P99.
  • test_write_overhead_with_index — adding index costs <10% write throughput
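One way the P99 measurement might be taken is sketched below. It runs at a reduced scale (10K correlations, 30K index rows) against an in-memory database so it finishes quickly; the row generator, the sample count, and the helper name `p99_lookup_seconds` are assumptions, and numbers from an in-memory run are not representative of an on-disk store.

```python
import sqlite3
import statistics
import time

QUERY = (
    "SELECT correlation_id FROM observation_index"
    " WHERE federate_id = ? AND record_ref = ?"
    " ORDER BY last_seen_at DESC"
)

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE observation_index ("
    " federate_id TEXT NOT NULL, record_ref TEXT NOT NULL,"
    " correlation_id TEXT NOT NULL, last_seen_at TEXT NOT NULL,"
    " UNIQUE(federate_id, record_ref, correlation_id))"
)
conn.execute("CREATE INDEX idx_obs_lookup ON observation_index(federate_id, record_ref)")

# Synthetic load: 10_000 correlations x 3 observations; each observation
# appears in exactly two correlations (i and i + 5000).
N = 10_000
with conn:
    conn.executemany(
        "INSERT INTO observation_index VALUES (?, ?, ?, ?)",
        [
            ("barclays", f"rec_{i % 5000}_{j}", f"CR-{i}", "2026-03-01T00:00:00Z")
            for i in range(N)
            for j in range(3)
        ],
    )


def p99_lookup_seconds(conn, samples=500):
    timings = []
    for k in range(samples):
        start = time.perf_counter()
        conn.execute(QUERY, ("barclays", f"rec_{k}_0")).fetchall()
        timings.append(time.perf_counter() - start)
    # quantiles(n=100) returns 99 cut points; index 98 is the 99th percentile.
    return statistics.quantiles(timings, n=100)[98]


p99 = p99_lookup_seconds(conn)
sample = conn.execute(QUERY, ("barclays", "rec_42_0")).fetchall()
```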

Invariant tests

  • test_index_and_correlation_atomic — kill a write mid-transaction (simulated); neither correlation nor index row partially commits
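A possible shape for the atomicity test is sketched here. The crash is simulated by raising an exception partway through the index writes, which is weaker than killing the process but exercises the same rollback; `SimulatedCrash`, the `fail_after_rows` parameter, and the trimmed tables are all assumptions.

```python
import sqlite3


class SimulatedCrash(Exception):
    """Stands in for a failure partway through a write."""


conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE correlations (correlation_id TEXT PRIMARY KEY);
CREATE TABLE observation_index (
    federate_id    TEXT NOT NULL,
    record_ref     TEXT NOT NULL,
    correlation_id TEXT NOT NULL,
    UNIQUE(federate_id, record_ref, correlation_id)
);
""")


def write_correlation(conn, correlation_id, records, fail_after_rows=None):
    with conn:  # rolls back everything below if an exception escapes
        conn.execute("INSERT INTO correlations VALUES (?)", (correlation_id,))
        for n, (federate_id, record_ref) in enumerate(records):
            if fail_after_rows is not None and n == fail_after_rows:
                raise SimulatedCrash()
            conn.execute(
                "INSERT INTO observation_index VALUES (?, ?, ?)",
                (federate_id, record_ref, correlation_id),
            )


def counts(conn):
    correlations = conn.execute("SELECT COUNT(*) FROM correlations").fetchone()[0]
    index_rows = conn.execute("SELECT COUNT(*) FROM observation_index").fetchone()[0]
    return correlations, index_rows


records = [("barclays", "rec_a"), ("barclays", "rec_b"), ("hsbc", "rec_c")]

try:
    write_correlation(conn, "CR-1", records, fail_after_rows=1)
except SimulatedCrash:
    pass
after_crash = counts(conn)   # nothing from the failed write should be visible

write_correlation(conn, "CR-1", records)
after_retry = counts(conn)   # the retry commits the correlation and all index rows
```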

Acceptance

  • [ ] ObservationRef dataclass in models/observation.py
  • [ ] Interface extensions in storage/base.py
  • [ ] SQLite schema + write path + read path in storage/sqlite_store.py
  • [ ] All contract tests pass
  • [ ] Perf target met (<50ms P99 on 100K correlations)
  • [ ] source_uri optional but stored when provided
  • [ ] Documentation: a reverse-lookup example in the sample script (component.parallax.local-quickstart)

Rollback plan

Secondary index is additive. Rollback steps:

  1. DROP TABLE observation_index; (SQLite) or equivalent on ES.
  2. Remove the two methods from the interface.
  3. Remove write-path update in write_correlation().

No downstream module breaks — reverse queries are opt-in; nothing today calls them.

Out of scope (tracked separately)

  • UX screens that consume reverse lookup → #17
  • Cross-lens observation deduplication (same observation fed to multiple lenses) → open question; acceptable to have separate index rows per (observation, correlation) pair and let consumers union across lenses.
  • Observation content change-data-capture (sensor corrects their data upstream) → out of scope for v1; customer conversation needed before committing scope.

Depends on: component.parallax.correlation-persistence, component.parallax.local-persistence-adapter

Realizes: product.fusion

Required by: component.parallax.continuous-fusion, component.parallax.local-quickstart