Consensus Session — M-of-N Quorum Lifecycle (Service Layer)

Status & scope

  • Stage: DRAFT — service layer (implementation target: titan/REST, NOT the parallax library)
  • Depends on: multi-contributor-combination (combiner), quorum (the pure function this session calls), dissent, CL-06 (consensus session primitive), CL-09 (evidence chain)
  • Milestone: P0 — gates production federation; gates Q3 2026 GA

Renumbered. This spec circulated as "SPEC-29" in the CPO-direction series before the compute-layer specs landed. The number 29 now belongs to the pure quorum evaluation function (SPEC-29-QUORUM); this spec defines the service-layer session that accumulates contributions over time and calls that evaluation. The session is an explicit state machine — which is why it lives at the titan/REST layer, never inside the parallax library (see CLAUDE.md Task Pattern boundary).

Purpose

A consensus session accumulates contributions from N federates and resolves to a joint output under a declared quorum policy. SPEC-28 defines the math when the session resolves; this spec defines when it resolves, what state it's in along the way, and how the policy parameters drive transitions.

This spec implements the REST-only session primitive defined in CL-06 §"Consensus Session Primitive" and integrates the SPEC-28 combiner.

Quorum Policy

Pack-configurable per session at creation. Five parameters:

quorum:
  required_contributors: 2          # M-of-N: minimum number of contributors
  minimum_authority_sum: 1.5        # sum of composite_rating must reach this
  conflict_threshold: 0.3           # SPEC-28 conflict_indicator above this → IN_CONFLICT
  conflict_policy: flag             # one of: suppress | flag | split
  deadline_seconds: 300             # if not satisfied in this window → WITHDRAWN

| Parameter | Effect |
|---|---|
| required_contributors | M in M-of-N. Session waits until at least this many distinct contributors have submitted. |
| minimum_authority_sum | Even with M contributors, weight matters. Sum of composite_rating(accuracy, credibility) across contributors must reach this floor. |
| conflict_threshold | After combiner runs, if conflict_indicator > threshold, session enters IN_CONFLICT instead of RATIFIED. |
| conflict_policy | What to do when conflict is detected: smooth (suppress, not recommended), expose (flag), or fork into per-contributor outputs (split). |
| deadline_seconds | Hard timeout for quorum satisfaction. Beyond this, session moves to WITHDRAWN. |
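
A minimal sketch of how the five parameters above might be validated at session creation. `PolicySketch` and `parse_policy` are hypothetical names, and the numeric bounds checked here (at least one contributor, non-negative floor and threshold, positive deadline) are assumptions; the spec itself only fixes the three allowed `conflict_policy` values.

```python
from dataclasses import dataclass

ALLOWED_CONFLICT_POLICIES = {"suppress", "flag", "split"}


@dataclass(frozen=True)
class PolicySketch:
    """Hypothetical immutable view of the `quorum:` block."""
    required_contributors: int
    minimum_authority_sum: float
    conflict_threshold: float
    conflict_policy: str
    deadline_seconds: int


def parse_policy(raw: dict) -> PolicySketch:
    """Coerce and validate a raw `quorum:` mapping; raise ValueError on bad input."""
    policy = PolicySketch(
        required_contributors=int(raw["required_contributors"]),
        minimum_authority_sum=float(raw["minimum_authority_sum"]),
        conflict_threshold=float(raw["conflict_threshold"]),
        conflict_policy=str(raw["conflict_policy"]),
        deadline_seconds=int(raw["deadline_seconds"]),
    )
    # Assumed bounds: the spec does not state them explicitly.
    if policy.required_contributors < 1:
        raise ValueError("required_contributors must be at least 1")
    if policy.minimum_authority_sum < 0:
        raise ValueError("minimum_authority_sum must be non-negative")
    if policy.conflict_threshold < 0:
        raise ValueError("conflict_threshold must be non-negative")
    if policy.deadline_seconds <= 0:
        raise ValueError("deadline_seconds must be positive")
    # The allowed values come from the policy definition above.
    if policy.conflict_policy not in ALLOWED_CONFLICT_POLICIES:
        raise ValueError(f"unknown conflict_policy: {policy.conflict_policy}")
    return policy


example = parse_policy({
    "required_contributors": 2,
    "minimum_authority_sum": 1.5,
    "conflict_threshold": 0.3,
    "conflict_policy": "flag",
    "deadline_seconds": 300,
})
```

Rejecting an invalid policy at creation keeps the later quorum check free of defensive branches, which fits the rule that the policy cannot change mid-session.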

State Machine

                     create()
                        │
                        ▼
                  ┌──────────┐
                  │ PROPOSED │
                  └─────┬────┘
                        │ first contribution arrives
                        ▼
              ┌─────────────────┐
              │ PENDING_QUORUM  │◄─────── more contributions
              └────────┬────────┘
                       │
       quorum check passes (M, authority_sum, no override pending)
                       │
                       ├───── conflict_indicator ≤ threshold ──► RATIFIED
                       │
                       └───── conflict_indicator > threshold ──► IN_CONFLICT
                                                                      │
                                                                      │ (conflict reconciled:
                                                                      │  additional contribution OR
                                                                      │  human attestation OR
                                                                      │  one contributor withdraws)
                                                                      ▼
                                                                  RATIFIED

       deadline expired without quorum   ─────────────────────► WITHDRAWN
       explicit cancel                   ─────────────────────► WITHDRAWN

State semantics

| State | Mutations allowed | Side effects on entry |
|---|---|---|
| PROPOSED | Add contribution → PENDING_QUORUM | Session created in ES, tenant-scoped per CL-10 |
| PENDING_QUORUM | Add contribution; check quorum on each | Update last_modified_ts |
| RATIFIED | None | Combiner runs (SPEC-28); session.classification set from CombinationResult.output_classification per CL-08; evidence Block emitted with session's tenant_id (CL-09); lens emission emitted (CL-06); SSE/notify subscribers |
| IN_CONFLICT | Add contribution (may resolve); resolve via human attestation; cancel | Machine dissent emitted (SPEC-30) with session's tenant_id and classification; audit entry; subscribers notified |
| WITHDRAWN | None | Audit entry; subscribers notified |

Field Authority (resolves SPEC-28 / CL-09 / CL-10 cross-spec)

| Field | Set when | Source |
|---|---|---|
| session.tenant_id | Session creation | From token_payload.tenant_id (CL-10) |
| session.classification (initial) | Session creation | From create_session(classification=...) argument; represents the session's max-handling level |
| session.classification (final, RATIFIED) | RATIFY transition | Updated to CombinationResult.output_classification (CL-08 propagation applied to contributions) |
| Contribution.classification | Per contribution | Contributor's declaration of its own derived feature; used as input to CL-08 propagation |
| Block.classification (emitted on RATIFY) | At Block emission | Set to session.classification (final); applied per CL-09 |

States are terminal in different ways:

  • RATIFIED and WITHDRAWN are truly terminal: no further mutations.
  • IN_CONFLICT is soft-terminal: additional contributions or human reconciliation may transition to RATIFIED. Without intervention, the session remains IN_CONFLICT until cancelled.
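
The terminal and soft-terminal distinction can be sketched as a transition table. This is an illustration under stated assumptions, not the service's actual guard: `ALLOWED_TRANSITIONS`, `can_transition`, and `is_terminal` are hypothetical names, and the table assumes that cancel applies from every non-terminal state (the diagram does not say which states a cancel may start from).

```python
from enum import Enum


class State(str, Enum):
    PROPOSED = "PROPOSED"
    PENDING_QUORUM = "PENDING_QUORUM"
    RATIFIED = "RATIFIED"
    IN_CONFLICT = "IN_CONFLICT"
    WITHDRAWN = "WITHDRAWN"


# Allowed state changes. Staying in the same state (for example, another
# contribution arriving while PENDING_QUORUM) is not modelled as a transition.
# Assumption: WITHDRAWN is reachable from every non-terminal state.
ALLOWED_TRANSITIONS = {
    State.PROPOSED: frozenset({State.PENDING_QUORUM, State.WITHDRAWN}),
    State.PENDING_QUORUM: frozenset(
        {State.RATIFIED, State.IN_CONFLICT, State.WITHDRAWN}
    ),
    State.IN_CONFLICT: frozenset({State.RATIFIED, State.WITHDRAWN}),
    State.RATIFIED: frozenset(),
    State.WITHDRAWN: frozenset(),
}


def can_transition(current: State, target: State) -> bool:
    """Return True if the table permits moving from current to target."""
    return target in ALLOWED_TRANSITIONS[current]


def is_terminal(state: State) -> bool:
    """A state is truly terminal when it has no outgoing transitions."""
    return not ALLOWED_TRANSITIONS[state]
```

In this encoding IN_CONFLICT is not terminal, because it still has outgoing edges, while RATIFIED and WITHDRAWN have none.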

Quorum Check Algorithm

Run on every contribution arrival OR on explicit resolve() call:

def check_quorum(session, policy):
    contributions = session.contributions
    if len(contributions) < policy.required_contributors:
        return ("PENDING", "insufficient contributors")

    distinct_contributors = {c.contributor_id for c in contributions}
    if len(distinct_contributors) < policy.required_contributors:
        return ("PENDING", "duplicate-contributor count below M")

    authority_sum = sum(composite_rating(c.accuracy, c.credibility) for c in contributions)
    if authority_sum < policy.minimum_authority_sum:
        return ("PENDING", f"authority_sum {authority_sum:.2f} below floor {policy.minimum_authority_sum}")

    # Quorum satisfied — run combiner
    result = combine(contributions, method=policy.combination_method)

    if result.conflict_indicator <= policy.conflict_threshold:
        return ("RATIFY", result)

    # Conflict — apply policy
    if policy.conflict_policy == "suppress":
        # Not recommended; smooth and ratify
        return ("RATIFY", result)
    elif policy.conflict_policy == "flag":
        return ("CONFLICT", result)
    elif policy.conflict_policy == "split":
        # Produce per-contributor outputs; SPEC-30 dissent
        return ("CONFLICT_SPLIT", result)

    raise ValueError(f"unknown conflict_policy: {policy.conflict_policy}")
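
Note that `check_quorum` sums authority over every contribution, so a contributor who submits twice is counted twice toward the floor. If the intent is a sum "across contributors", one possible reading is to rate only each contributor's most recent contribution. The sketch below illustrates that reading; `ContributionSketch`, `composite_rating_stub` (a plain average standing in for the real `composite_rating`), and `gates_pass` are all hypothetical.

```python
from dataclasses import dataclass
from typing import Dict, Iterable


@dataclass(frozen=True)
class ContributionSketch:
    contributor_id: str
    accuracy: float
    credibility: float


def composite_rating_stub(accuracy: float, credibility: float) -> float:
    """Hypothetical stand-in; the real composite_rating is defined by the combiner."""
    return (accuracy + credibility) / 2


def authority_by_contributor(
    contributions: Iterable[ContributionSketch],
) -> Dict[str, float]:
    """Rate only the most recent contribution from each contributor."""
    latest: Dict[str, ContributionSketch] = {}
    for c in contributions:
        latest[c.contributor_id] = c  # later entries replace earlier ones
    return {
        cid: composite_rating_stub(c.accuracy, c.credibility)
        for cid, c in latest.items()
    }


def gates_pass(contributions, required_contributors, minimum_authority_sum) -> bool:
    """Apply the M-of-N gate and the authority floor, once per contributor."""
    ratings = authority_by_contributor(contributions)
    return (
        len(ratings) >= required_contributors
        and sum(ratings.values()) >= minimum_authority_sum
    )


a1 = ContributionSketch("fed-a", accuracy=1.0, credibility=1.0)
a2 = ContributionSketch("fed-a", accuracy=1.0, credibility=1.0)
b1 = ContributionSketch("fed-b", accuracy=1.0, credibility=0.5)

duplicate_only = gates_pass([a1, a2], 2, 1.5)   # one distinct contributor
two_federates = gates_pass([a1, a2, b1], 2, 1.5)
```

Whether repeat submissions should count once or more than once is a policy decision for the spec; the sketch only shows what the per-contributor variant would look like.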

Reference Implementation

The REST endpoints drop into parallax/server/api/routes.py; state management lives in parallax/ops/consensus_session.py.

# parallax/ops/consensus_session.py — NEW FILE

from __future__ import annotations
import time
import uuid
from dataclasses import dataclass
from enum import Enum
from typing import Optional

from axonis_core.uds import UDS
from axonis_core.schema import Schema
from parallax.ops.fusion.combination import (
    Contribution, CombinationResult, combine, composite_rating
)


class SessionState(str, Enum):
    PROPOSED = "PROPOSED"
    PENDING_QUORUM = "PENDING_QUORUM"
    RATIFIED = "RATIFIED"
    IN_CONFLICT = "IN_CONFLICT"
    WITHDRAWN = "WITHDRAWN"


@dataclass(frozen=True)
class QuorumPolicy:
    required_contributors: int = 2
    minimum_authority_sum: float = 1.0
    conflict_threshold: float = 0.3
    conflict_policy: str = "flag"          # suppress | flag | split
    combination_method: str = "weighted_average"
    deadline_seconds: int = 300


class ConsensusSession(UDS):
    """REST-only consensus session — no MCP, works in all profiles."""

    def __init__(self, alias=Schema.CONSENSUS_SESSION):
        super().__init__(alias)
        self.namespace = Schema.USERSPACE


def create_session(lens_id: str, lens_version: str, policy: QuorumPolicy,
                   tenant_id: str, classification: str) -> dict:
    session_id = f"sess_{uuid.uuid4().hex[:12]}"
    now = int(time.time())
    session = {
        "session_id": session_id,
        "lens_id": lens_id,
        "lens_version": lens_version,
        "tenant_id": tenant_id,
        "classification": classification,
        "policy": policy.__dict__,
        "state": SessionState.PROPOSED.value,
        "contributions": [],
        "result": None,
        "created_ts": now,
        "deadline_ts": now + policy.deadline_seconds,
        "last_modified_ts": now,
    }
    ConsensusSession().create(session)
    return session


def add_contribution(session_id: str, contribution: Contribution) -> dict:
    session = ConsensusSession().read(uid=session_id)
    if session["state"] in (SessionState.RATIFIED.value,
                            SessionState.WITHDRAWN.value):
        raise ValueError(f"session {session_id} is terminal ({session['state']})")
    # Same boundary as the sweeper's "lte" filter: expired once now reaches deadline_ts.
    if int(time.time()) >= session["deadline_ts"]:
        return _transition(session, SessionState.WITHDRAWN, reason="deadline_expired")

    # Append (frozen) contribution
    session["contributions"].append(contribution.__dict__)
    session["last_modified_ts"] = int(time.time())

    if session["state"] == SessionState.PROPOSED.value:
        session["state"] = SessionState.PENDING_QUORUM.value

    # Try to satisfy quorum
    return _try_resolve(session)


def _try_resolve(session: dict) -> dict:
    policy = QuorumPolicy(**session["policy"])
    contribs = [Contribution(**c) for c in session["contributions"]]

    if len(contribs) < policy.required_contributors:
        ConsensusSession().update(session, session["session_id"])
        return session

    distinct = {c.contributor_id for c in contribs}
    if len(distinct) < policy.required_contributors:
        ConsensusSession().update(session, session["session_id"])
        return session

    authority_sum = sum(composite_rating(c.accuracy, c.credibility) for c in contribs)
    if authority_sum < policy.minimum_authority_sum:
        ConsensusSession().update(session, session["session_id"])
        return session

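    # Run combiner
    result = combine(contribs, method=policy.combination_method)

    # "suppress" ratifies despite conflict. "flag" and "split" both enter
    # IN_CONFLICT here; the per-contributor outputs that check_quorum marks
    # as CONFLICT_SPLIT are not produced by this reference implementation.
    if result.conflict_indicator <= policy.conflict_threshold or policy.conflict_policy == "suppress":
        return _transition(session, SessionState.RATIFIED, result=result)
    else:
        return _transition(session, SessionState.IN_CONFLICT, result=result)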


def _transition(session: dict, new_state: SessionState,
                result: Optional[CombinationResult] = None,
                reason: str = "") -> dict:
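    session["state"] = new_state.value
    session["last_modified_ts"] = int(time.time())
    if result is not None:
        session["result"] = result.__dict__
        if new_state == SessionState.RATIFIED:
            # Final classification comes from the combiner output (CL-08 propagation).
            session["classification"] = result.output_classification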
    if reason:
        session.setdefault("transition_reasons", []).append(
            {"to": new_state.value, "reason": reason, "ts": session["last_modified_ts"]}
        )

    ConsensusSession().update(session, session["session_id"])

    # Side effects on entry
    if new_state == SessionState.RATIFIED:
        _emit_evidence_block(session, result)         # CL-09
        _emit_lens_emission(session, result)          # CL-06
        _notify_subscribers(session, "ratified")
    elif new_state == SessionState.IN_CONFLICT:
        _emit_machine_dissent(session, result)        # SPEC-30
        _notify_subscribers(session, "in_conflict")
    elif new_state == SessionState.WITHDRAWN:
        _notify_subscribers(session, "withdrawn")

    return session


# stubs — implemented in their respective specs
def _emit_evidence_block(session, result): ...      # SPEC-28 / CL-09
def _emit_lens_emission(session, result): ...       # CL-06
def _emit_machine_dissent(session, result): ...     # SPEC-30
def _notify_subscribers(session, event): ...        # SSE / polling listener

REST Endpoints (per CL-06)

| Method | Path | Body / Returns |
|---|---|---|
| POST | /api/v2/consensus/session | {lens_id, lens_version, policy, classification} → {session_id, state, ...} |
| GET | /api/v2/consensus/session/{id} | → full session document |
| POST | /api/v2/consensus/session/{id}/contribution | {contributor_id, pair_score, accuracy, credibility, signer_key_id, classification} → updated session |
| POST | /api/v2/consensus/session/{id}/resolve | (force resolve attempt) → updated session |
| POST | /api/v2/consensus/session/{id}/cancel | → session in WITHDRAWN state |
| GET | /api/v2/consensus/session/{id}/events | SSE stream of state transitions (optional, per CL-06) |

All endpoints REST-only. No MCP. Polling-correct (SSE optional).
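
Because polling must be sufficient on its own, a client can track a session with nothing but repeated GETs. The sketch below keeps the HTTP call out of scope: `fetch_session` is a hypothetical callable that performs the GET and returns the session document as a dict, and `poll_until_terminal`, the default interval, and the default timeout are all assumptions.

```python
import time
from typing import Callable, List

TERMINAL_STATES = {"RATIFIED", "WITHDRAWN"}


def poll_until_terminal(
    fetch_session: Callable[[str], dict],
    session_id: str,
    interval_seconds: float = 2.0,
    timeout_seconds: float = 600.0,
    sleep: Callable[[float], None] = time.sleep,
) -> List[str]:
    """Poll a session until it is terminal; return the distinct states observed."""
    observed: List[str] = []
    give_up_at = time.monotonic() + timeout_seconds
    while True:
        state = fetch_session(session_id)["state"]
        # Record a state only when it differs from the previous observation.
        if not observed or observed[-1] != state:
            observed.append(state)
        if state in TERMINAL_STATES:
            return observed
        if time.monotonic() >= give_up_at:
            raise TimeoutError(f"session {session_id} still {state} after timeout")
        sleep(interval_seconds)


# Usage with a fake fetcher that replays a fixed sequence of documents.
_replay = iter(["PENDING_QUORUM", "PENDING_QUORUM", "IN_CONFLICT", "RATIFIED"])
sequence = poll_until_terminal(
    lambda sid: {"state": next(_replay)},
    "sess_example",
    sleep=lambda seconds: None,  # skip real waiting in the example
)
```

A poller only sees the state at each sample, so it can miss a state that the session passes through between two requests. A client that needs the full history would have to read it from the session document rather than infer it from samples.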

Deadline Handling

A background sweeper checks all PENDING_QUORUM sessions every 30 seconds:

def deadline_sweeper():
    now = int(time.time())
    sessions = ConsensusSession().read({
        "query": {"bool": {"filter": [
            {"term": {"state": "PENDING_QUORUM"}},
            {"range": {"deadline_ts": {"lte": now}}},
        ]}}
    })
    for s in sessions:
        _transition(s, SessionState.WITHDRAWN, reason="deadline_expired")

Run as a periodic task in the Parallax service. Cheap query (filtered + indexed) so cost is bounded.
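
The sweeper's filter and its schedule can be sketched without the datastore. `select_expired` mirrors the query above in memory, and `run_periodically` is one simple way to run a task at a fixed interval; both names are hypothetical and the scheduling mechanism used by the real service is not specified here.

```python
import threading
from typing import Callable, Iterable, List


def select_expired(sessions: Iterable[dict], now: int) -> List[dict]:
    """In-memory equivalent of the sweeper query: PENDING_QUORUM and deadline_ts <= now."""
    return [
        s for s in sessions
        if s["state"] == "PENDING_QUORUM" and s["deadline_ts"] <= now
    ]


def run_periodically(
    task: Callable[[], None],
    interval_seconds: float,
    stop: threading.Event,
) -> None:
    """Call task, then wait for the interval, until stop is set."""
    while not stop.is_set():
        task()
        # Event.wait returns early if stop is set during the wait.
        stop.wait(interval_seconds)


sessions = [
    {"session_id": "sess_a", "state": "PENDING_QUORUM", "deadline_ts": 100},
    {"session_id": "sess_b", "state": "PENDING_QUORUM", "deadline_ts": 101},
    {"session_id": "sess_c", "state": "IN_CONFLICT", "deadline_ts": 50},
]
expired_ids = [s["session_id"] for s in select_expired(sessions, now=100)]
```

With a 30-second interval, a session can remain PENDING_QUORUM for up to one interval past its deadline before the sweeper withdraws it.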

Conflict Reconciliation Path

When a session is IN_CONFLICT, three paths return it to RATIFIED:

  1. Additional contribution arrives that, when combined with the existing set, drops conflict_indicator below threshold.
  2. A contributor explicitly withdraws their contribution (e.g., they realize their data was stale). The session re-evaluates without that contribution.
  3. Human attestation overrides (per ADI/DES attestation model). An authorized human declares the joint output despite the conflict. The attestation Block carries the human's signature and reasoning; the audit chain shows IN_CONFLICT followed by HUMAN_RATIFIED. SPEC-30 machine dissent is preserved alongside.

Path 3 is critical for defense / regulated use where the system must produce an answer eventually, and the human carries accountability for that answer.

Invariants

  1. State transitions are append-only. Each transition writes a new entry; previous states are never erased.
  2. Contributions are append-only. A contribution is never modified or deleted from the session; a correction is a new contribution with an explicit corrects reference.
  3. Quorum is checked on every contribution. No deferred / batched evaluation that could miss a borderline transition.
  4. Deadline overrides quorum progress. A PENDING_QUORUM session that hits its deadline goes WITHDRAWN regardless of how close to quorum it was.
  5. RATIFIED and WITHDRAWN are truly terminal. No mutations after entry.
  6. Conflict policy is declared at session creation. Cannot change mid-session.
  7. Sessions are tenant-scoped (CL-10). A session belongs to exactly one tenant and is invisible across tenant boundaries.
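
Invariant 2 can be illustrated with a small append-only log in which a correction is a new entry that points at the one it corrects. This is a sketch only: `Entry`, `ContributionLog`, and the `contribution_id` field are hypothetical, since the spec names a `corrects` reference but does not define how contributions are identified.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass(frozen=True)
class Entry:
    contribution_id: str           # hypothetical identifier
    contributor_id: str
    pair_score: float
    corrects: Optional[str] = None  # id of the entry this one corrects


class ContributionLog:
    """Append-only store: entries can be added but never changed or removed."""

    def __init__(self) -> None:
        self._entries: List[Entry] = []

    def append(self, entry: Entry) -> None:
        known = {e.contribution_id for e in self._entries}
        if entry.contribution_id in known:
            # Reusing an id would amount to overwriting an existing entry.
            raise ValueError("contribution exists; submit a correction instead")
        if entry.corrects is not None and entry.corrects not in known:
            raise ValueError("corrects must reference an existing contribution")
        self._entries.append(entry)

    @property
    def entries(self) -> Tuple[Entry, ...]:
        # Return an immutable snapshot so callers cannot edit the log.
        return tuple(self._entries)


log = ContributionLog()
log.append(Entry("c1", "fed-a", 0.9))
log.append(Entry("c2", "fed-a", 0.7, corrects="c1"))
```

In a REST layer, the `ValueError` for a reused id is the kind of condition that the append-only test expects to surface as a 409.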

Test Expectations

  • Happy-path test: create session, add 2 contributions, quorum satisfied, RATIFIED with combiner result.
  • Insufficient quorum test: 1 contribution, session stays PENDING_QUORUM. Add 2nd, ratifies.
  • Authority floor test: 2 contributions, but authority_sum=0.8 < 1.5 floor → stays PENDING_QUORUM until 3rd contribution boosts authority.
  • Conflict-flag test: 2 contributions with conflicting scores (0.9, 0.2) → IN_CONFLICT.
  • Conflict-resolve test: IN_CONFLICT session receives 3rd contribution that pulls combiner below threshold → RATIFIED.
  • Deadline test: session with 1 contribution, deadline expires, sweeper transitions to WITHDRAWN.
  • Cancel test: cancel a PENDING_QUORUM session → WITHDRAWN.
  • Tenant isolation test: session for tenant A is invisible to tenant B's queries.
  • Append-only test: attempt to modify a contribution returns 409.
  • Polling parity test: SSE event stream and GET polling produce same state transition sequence.

Cross-Reference

  • SPEC-28: combiner math invoked at _try_resolve()
  • SPEC-30: machine dissent emitted on IN_CONFLICT entry
  • SPEC-31: Consensus Mission Service exposes these endpoints
  • CL-06: REST-only requirement, polling correctness
  • CL-09: evidence Block emission on RATIFIED entry
  • CL-10: tenant scoping

Depends on: component.parallax.dissent, component.parallax.multi-contributor-combination, component.parallax.quorum

Realizes: product.fusion

Required by: component.parallax.consensus-mission-service, component.parallax.wire-message-families