Milestone 3.3.2 — Memory Write Pipeline (9-Step)

Status: Planned
Goal: 3.3 — Memory service
Phase: 3 — Memory Engine and Operator Platform
Estimated effort: 4–5 days
ADR required: ADR-0034 — Memory write pipeline design and PII handling


Why This Milestone Exists

Writing a memory is not a simple INSERT. ARCHITECTURE.md describes a 9-step pipeline. Each step can fail, and failures must be handled correctly:

  1. Input validation — Pydantic schema validation, content length check
  2. PII detection — Redact email, phone, SSN, credit card patterns before storage
  3. Content deduplication — SHA-256 hash check: if identical memory exists, return it (no insert)
  4. Embedding generation — Call embedder service; get 384-dim vector
  5. Near-duplicate detection — pgvector cosine similarity: if similarity > 0.92, trigger merge workflow
  6. Conflict detection — If new memory contradicts an existing memory (same category, opposite claim), flag it
  7. Database write — INSERT with all fields; update memory_versions
  8. Hot cache update — Refresh Redis sorted set for this agent
  9. Index notification — No explicit step; pgvector index updates automatically on INSERT

The pipeline is transactional for steps 3–7. Steps 8–9 are best-effort (cache failure does not fail the write).
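
The split between the transactional steps and the best-effort steps can be sketched as follows. This is a minimal illustration, not the project's implementation: `FakeSession`, `FailingCache`, and this simplified `write` function are hypothetical stand-ins for the real session, Redis cache, and pipeline.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeSession:
    """Hypothetical stand-in for a DB session; rows become visible only on commit."""

    def __init__(self) -> None:
        self.committed: list[str] = []
        self._pending: list[str] = []

    @asynccontextmanager
    async def begin(self):
        # Mimics a transaction scope: commit on success, roll back on error.
        self._pending = []
        try:
            yield self
        except Exception:
            self._pending = []  # rollback: discard uncommitted rows
            raise
        else:
            self.committed.extend(self._pending)  # commit
            self._pending = []

    def add(self, row: str) -> None:
        self._pending.append(row)


class FailingCache:
    """Hypothetical cache whose update always fails (for example, Redis is down)."""

    async def update(self, row: str) -> None:
        raise ConnectionError("cache unavailable")


async def write(session: FakeSession, cache, row: str, fail_in_tx: bool = False) -> bool:
    """Return True if the cache was refreshed, False if the refresh was skipped."""
    async with session.begin():  # steps 3-7: all-or-nothing
        session.add(row)
        if fail_in_tx:
            raise RuntimeError("embedder timed out")  # triggers rollback
    try:  # step 8: runs after commit, best-effort
        await cache.update(row)
        return True
    except Exception:
        return False  # the write itself has already succeeded


session = FakeSession()
cache_ok = asyncio.run(write(session, FailingCache(), "memory-1"))
print(session.committed, cache_ok)
```

In this sketch a cache failure leaves the committed row in place, while an error raised inside the transaction scope leaves no row behind.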


ADR-0034 — Memory write pipeline

Document:

  • PII detection approach: Regex patterns (Phase 3); spaCy NER (Phase 4 for higher accuracy)
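  • Why dedup checks content_hash first, then vector similarity: The hash lookup is an indexed equality check that needs no embedding; the vector search needs an embedder call plus an index scan that is O(log n) at best. Do the cheap check first.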
  • Near-duplicate threshold (0.92): Empirically validated; below 0.92 memories are different enough to keep. Above 0.92, the information is redundant.
  • Why the conflict flag does not block the write: Conflict resolution is a human-in-the-loop process. The new memory is written with status='pending_review' if a conflict is detected; it does not overwrite the existing memory automatically.
  • Transactionality scope: Steps 3–7 run in one DB transaction; steps 8–9 run after commit.
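
To make the two thresholds concrete, here is a small sketch of how a similarity score could be bucketed. The `classify` helper and its labels are assumptions for illustration only; in the pipeline the comparison happens inside pgvector, whose `<=>` operator returns cosine distance (1 minus similarity), so "similarity > 0.92" corresponds to "distance < 0.08". A score in the conflict band only marks a candidate: the "opposite claim" check still has to confirm it.

```python
import math

NEAR_DUPLICATE_THRESHOLD = 0.92  # from the milestone: above this, merge
CONFLICT_THRESHOLD = 0.85        # from the milestone: above this, check for conflict


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Plain-Python cosine similarity; raises on zero-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    if norm == 0.0:
        raise ValueError("cosine similarity is undefined for a zero vector")
    return dot / norm


def classify(similarity: float) -> str:
    """Hypothetical helper: bucket a score using the two thresholds."""
    if similarity > NEAR_DUPLICATE_THRESHOLD:
        return "merge_candidate"
    if similarity > CONFLICT_THRESHOLD:
        return "conflict_candidate"
    return "distinct"


same = cosine_similarity([1.0, 0.0], [1.0, 0.0])
close = cosine_similarity([1.0, 0.0], [0.9, 0.436])
unrelated = cosine_similarity([1.0, 0.0], [0.0, 1.0])
print(classify(same), classify(close), classify(unrelated))
```

Both comparisons are strict, so a score of exactly 0.92 falls into the conflict band rather than the merge band.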

Deliverables

src/memory/services/write_pipeline.py

Python
from __future__ import annotations
 
import hashlib
import re
from dataclasses import dataclass
from decimal import Decimal
from uuid import UUID
 
import httpx
from sqlalchemy.ext.asyncio import AsyncSession
 
from ibex_db.models.memory import Memory, MemoryCategory, MemoryStatus
from ibex_db.repositories.memory_repo import MemoryRepository
from memory.services.hot_cache import HotCacheService
 
# PII patterns (Phase 3 — regex based; Phase 4 upgrades to spaCy NER)
_PII_PATTERNS: list[tuple[str, str]] = [
    (r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b', '[EMAIL]'),
    (r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b',                        '[PHONE]'),
    (r'\b\d{3}-\d{2}-\d{4}\b',                                '[SSN]'),
    (r'\b(?:\d{4}[-\s]?){4}\b',                               '[CARD]'),
]
 
NEAR_DUPLICATE_THRESHOLD = 0.92  # cosine similarity; above = merge candidate
CONFLICT_THRESHOLD       = 0.85  # cosine similarity + opposite content = conflict
 
@dataclass(frozen=True)
class WriteMemoryParams:
    org_id:     UUID
    agent_id:   UUID
    session_id: UUID | None
    content:    str
    category:   MemoryCategory
    confidence: Decimal
    source:     str
    metadata:   dict
 
@dataclass(frozen=True)
class WriteMemoryResult:
    memory_id:     UUID
    was_duplicate: bool
    was_merged:    bool
    was_flagged:   bool  # True if conflict detected; memory written as pending_review
 
class MemoryWritePipeline:
    """
    Orchestrates the 9-step memory write pipeline.
    Steps 3–7 run in one DB transaction; steps 8–9 are best-effort and run after commit.
    """
 
    def __init__(
        self,
        repo: MemoryRepository,
        embedder: httpx.AsyncClient,
        hot_cache: HotCacheService,
        embedder_url: str,
    ) -> None:
        self._repo      = repo
        self._embedder  = embedder
        self._hot_cache = hot_cache
        self._embed_url = embedder_url
 
    async def write(self, params: WriteMemoryParams) -> WriteMemoryResult:
        # ── Step 1: Input validation ──────────────────────────────────────────
        content = params.content.strip()
        if not content:
            raise ValueError("Memory content cannot be empty")
        if len(content) > 8192:
            raise ValueError(f"Memory content exceeds 8192 chars: {len(content)}")
 
        # ── Step 2: PII redaction ─────────────────────────────────────────────
        content = _redact_pii(content)
 
        # ── Step 3: Content deduplication (hash check) ────────────────────────
        content_hash = hashlib.sha256(content.encode()).hexdigest()
        existing = await self._repo.find_by_content_hash(
            agent_id=params.agent_id, content_hash=content_hash
        )
        if existing is not None:
            return WriteMemoryResult(
                memory_id=existing.id, was_duplicate=True,
                was_merged=False, was_flagged=False
            )
 
        # ── Step 4: Embedding generation ──────────────────────────────────────
        embedding = await self._embed(content)
 
        # ── Step 5: Near-duplicate detection ─────────────────────────────────
        near_dups = await self._repo.find_near_duplicates(
            agent_id=params.agent_id,
            embedding=embedding,
            threshold=NEAR_DUPLICATE_THRESHOLD,
            limit=3,
        )
        if near_dups:
            # Merge: supersede the near-duplicate(s), write new merged memory.
            # (_handle_near_duplicate is not shown in this listing.)
            return await self._handle_near_duplicate(params, content, content_hash, embedding, near_dups)
 
        # ── Step 6: Conflict detection ────────────────────────────────────────
        conflicts = await self._repo.find_conflicts(
            agent_id=params.agent_id,
            category=params.category,
            embedding=embedding,
            threshold=CONFLICT_THRESHOLD,
        )
        status = MemoryStatus.PENDING_REVIEW if conflicts else MemoryStatus.ACTIVE
        was_flagged = bool(conflicts)
 
        # ── Step 7: Database write (transactional) ────────────────────────────
        memory = await self._repo.create(
            org_id=params.org_id, agent_id=params.agent_id,
            session_id=params.session_id, content=content,
            content_hash=content_hash, embedding=embedding,
            category=params.category, source=params.source,
            confidence=params.confidence, status=status,
            metadata=params.metadata,
        )
 
        # ── Step 8: Hot cache update (best-effort) ────────────────────────────
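        try:
            await self._hot_cache.update(params.agent_id, memory)
        except Exception as exc:
            # Best-effort: a cache miss is recovered on next read, so log and continue.
            import logging  # local import keeps this handler self-contained

            logging.getLogger(__name__).warning(
                "Hot cache update failed for agent %s: %s", params.agent_id, exc
            )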
 
        return WriteMemoryResult(
            memory_id=memory.id, was_duplicate=False,
            was_merged=False, was_flagged=was_flagged,
        )
 
    async def _embed(self, text: str) -> list[float]:
        resp = await self._embedder.post(
            f"{self._embed_url}/v1/embed",
            json={"text": text},
            timeout=10.0,
        )
        resp.raise_for_status()
        return resp.json()["embedding"]
 
def _redact_pii(content: str) -> str:
    """Replace PII patterns with placeholders. Does not log the original content."""
    for pattern, replacement in _PII_PATTERNS:
        content = re.sub(pattern, replacement, content)
    return content
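
As a quick check of steps 2 and 3 together, the sketch below runs the same four patterns over a made-up sample string and then hashes the redacted text. The sample text and the `redact_pii` / `content_hash` names are illustrative assumptions; the email pattern uses `[A-Za-z]` for the top-level domain.

```python
import hashlib
import re

# Same four patterns as the pipeline listing (email TLD class written as [A-Za-z]).
PII_PATTERNS: list[tuple[str, str]] = [
    (r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b', '[EMAIL]'),
    (r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b', '[PHONE]'),
    (r'\b\d{3}-\d{2}-\d{4}\b', '[SSN]'),
    (r'\b(?:\d{4}[-\s]?){4}\b', '[CARD]'),
]


def redact_pii(content: str) -> str:
    """Apply each pattern in order, replacing matches with a placeholder."""
    for pattern, replacement in PII_PATTERNS:
        content = re.sub(pattern, replacement, content)
    return content


def content_hash(content: str) -> str:
    """SHA-256 over the redacted text, as in step 3."""
    return hashlib.sha256(redact_pii(content).encode()).hexdigest()


sample = (
    "Reach Ana at ana@example.com or 555-867-5309. "
    "SSN 123-45-6789, card 4111 1111 1111 1111."
)
redacted = redact_pii(sample)
print(redacted)
# Reach Ana at [EMAIL] or [PHONE]. SSN [SSN], card [CARD].

# Two inputs that differ only in redacted PII hash to the same value.
same_hash = content_hash("Contact: a@x.com") == content_hash("Contact: b@y.org")
print(same_hash)
```

Because the hash is computed after redaction, memories that differ only in a redacted value are treated as exact duplicates by step 3. The phone pattern also matches any bare 10-digit number, so identifiers such as order numbers may be redacted as `[PHONE]`.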

Acceptance Criteria

  • Identical content → was_duplicate=True, no new DB row
  • Content with email pattern → email replaced with [EMAIL] in stored content
  • Near-duplicate (similarity > 0.92) → existing memory superseded, new merged memory written
  • Conflict detected (similarity > 0.85, same category) → new memory written with status='pending_review'
  • Steps 3–7 are atomic (transaction rolled back on any step failure before commit); steps 1–2 fail before any DB access
  • Step 8 failure (Redis down) does not fail the write (warning logged, result returned)
  • Write p95 < 200ms in integration tests with real embedder
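
For the latency criterion, one way to compute p95 from measured write times is the nearest-rank method sketched below. The `p95` and `measure` helpers and the `_fake_write` coroutine are hypothetical; a real integration test would pass a coroutine that calls the pipeline against the real embedder.

```python
import asyncio
import time
from collections.abc import Awaitable, Callable


def p95(samples_ms: list[float]) -> float:
    """Nearest-rank 95th percentile of latency samples in milliseconds."""
    if not samples_ms:
        raise ValueError("need at least one sample")
    ordered = sorted(samples_ms)
    rank = (95 * len(ordered) + 99) // 100  # ceil(0.95 * n) in integer arithmetic
    return ordered[rank - 1]


async def measure(write_once: Callable[[], Awaitable[None]], runs: int) -> list[float]:
    """Time `runs` sequential calls and return per-call latency in milliseconds."""
    latencies: list[float] = []
    for _ in range(runs):
        start = time.perf_counter()
        await write_once()
        latencies.append((time.perf_counter() - start) * 1000.0)
    return latencies


async def _fake_write() -> None:
    # Placeholder for a real pipeline write call.
    await asyncio.sleep(0)


latencies = asyncio.run(measure(_fake_write, runs=50))
print(f"p95 = {p95(latencies):.3f} ms")
```

With 100 samples the nearest-rank p95 is the 95th smallest value, so up to five slow outliers do not affect the result.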
