Milestone 3.3.2 — Memory Write Pipeline (9-Step)
Status: Planned
Goal: 3.3 — Memory service
Phase: 3 — Memory Engine and Operator Platform
Estimated effort: 4–5 days
ADR required: ADR-0034 — Memory write pipeline design and PII handling
Why This Milestone Exists
Writing a memory is not a simple INSERT. ARCHITECTURE.md describes a 9-step pipeline. Each step can fail, and failures must be handled correctly:
1. Input validation: Pydantic schema validation, content length check
2. PII detection: Redact email, phone, SSN, credit card patterns before storage
3. Content deduplication: SHA-256 hash check; if an identical memory exists, return it (no insert)
4. Embedding generation: Call embedder service; get 384-dim vector
5. Near-duplicate detection: pgvector cosine similarity; if similarity > 0.92, trigger merge workflow
6. Conflict detection: If new memory contradicts an existing memory (same category, opposite claim), flag it
7. Database write: INSERT with all fields; update memory_versions
8. Hot cache update: Refresh Redis sorted set for this agent
9. Index notification: No explicit step; pgvector index updates automatically on INSERT
The pipeline is transactional for steps 3–7. Steps 8–9 are best-effort (cache failure does not fail the write).
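The split between the transactional steps and the best-effort steps can be sketched as follows. This is a minimal illustration under stated assumptions, not the pipeline itself: `FakeTransaction`, `FlakyCache`, and `write_with_best_effort_cache` are hypothetical names, and an in-memory list stands in for the database.

```python
import asyncio
import logging

logger = logging.getLogger("write_pipeline_sketch")


class FakeTransaction:
    """Hypothetical stand-in for a DB transaction: staged rows are applied only on clean exit."""

    def __init__(self, table: list[str]) -> None:
        self._table = table
        self._staged: list[str] = []

    async def __aenter__(self) -> "FakeTransaction":
        return self

    async def __aexit__(self, exc_type, exc, tb) -> bool:
        if exc_type is None:
            self._table.extend(self._staged)  # commit: staged rows become visible
        self._staged.clear()                  # on failure this discards the staged rows
        return False                          # never swallow the exception

    def insert(self, row: str) -> None:
        self._staged.append(row)


class FlakyCache:
    """Hypothetical cache that always fails, simulating Redis being down."""

    async def update(self, row: str) -> None:
        raise ConnectionError("cache unavailable")


async def write_with_best_effort_cache(
    table: list[str], cache: FlakyCache, row: str, fail_in_tx: bool = False
) -> bool:
    # Steps 3-7: everything inside the transaction commits or rolls back together.
    async with FakeTransaction(table) as tx:
        tx.insert(row)
        if fail_in_tx:
            raise RuntimeError("step failed before commit")
    # Steps 8-9: run after commit; a failure is logged and does not undo the write.
    try:
        await cache.update(row)
    except Exception:
        logger.warning("cache update failed; the committed write stands", exc_info=True)
    return True
```

With a cache that always raises, `asyncio.run(write_with_best_effort_cache(table, FlakyCache(), "m1"))` still returns `True` and leaves the row in `table`, while a failure inside the transaction leaves `table` unchanged.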
ADR-0034 — Memory write pipeline
Document:
- PII detection approach: Regex patterns (Phase 3); spaCy NER (Phase 4 for higher accuracy)
- Why dedup is content_hash first, then vector similarity: Hash is O(1); vector search is O(log n) minimum. Do the cheap check first.
- Near-duplicate threshold (0.92): Empirically validated; below 0.92 memories are different enough to keep. Above 0.92, the information is redundant.
- Why the conflict flag does not block the write: Conflict resolution is a human-in-the-loop process. The new memory is written with status='pending_review' if a conflict is detected; it does not overwrite the existing memory automatically.
- Transactionality scope: Steps 3–7 run in one DB transaction; steps 8–9 run after commit.
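The ordering argument (cheap exact check first, similarity search second) can be illustrated with a pure-Python sketch. Everything here is an assumption made for illustration: a dict and a linear scan stand in for the content_hash lookup and the pgvector query, toy 3-dimensional vectors replace the 384-dim embeddings, and `content_hash`, `cosine_similarity`, and `classify_write` are hypothetical helper names.

```python
import hashlib
import math
from typing import Callable

NEAR_DUPLICATE_THRESHOLD = 0.92  # same value the milestone proposes


def content_hash(content: str) -> str:
    """SHA-256 hex digest of the (already redacted) content."""
    return hashlib.sha256(content.encode()).hexdigest()


def cosine_similarity(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0  # treat a zero vector as similar to nothing
    return dot / (norm_a * norm_b)


def classify_write(
    content: str,
    embed: Callable[[str], list[float]],
    store: dict[str, list[float]],
) -> str:
    """store maps content hash -> embedding. Returns 'duplicate', 'near_duplicate' or 'new'."""
    # Cheap check first: an exact match is a single dict lookup,
    # and the embedder is never called for exact duplicates.
    if content_hash(content) in store:
        return "duplicate"
    # Expensive check second: embed, then compare against stored vectors.
    embedding = embed(content)
    for existing in store.values():
        if cosine_similarity(embedding, existing) > NEAR_DUPLICATE_THRESHOLD:
            return "near_duplicate"
    return "new"
```

Passing the embedder as a callable makes the saving visible: for an exact duplicate the function returns before `embed` is invoked, which is the point of running the hash check ahead of the vector search.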
Deliverables
src/memory/services/write_pipeline.py
```python
from __future__ import annotations

import hashlib
import logging
import re
from dataclasses import dataclass
from decimal import Decimal
from uuid import UUID

import httpx
from sqlalchemy.ext.asyncio import AsyncSession

from ibex_db.models.memory import Memory, MemoryCategory, MemoryStatus
from ibex_db.repositories.memory_repo import MemoryRepository
from memory.services.hot_cache import HotCacheService

logger = logging.getLogger(__name__)

# PII patterns (Phase 3: regex based; Phase 4 upgrades to spaCy NER)
_PII_PATTERNS: list[tuple[str, str]] = [
    # TLD class is [A-Za-z]; a "|" inside a character class would match a literal pipe.
    (r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b', '[EMAIL]'),
    (r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b', '[PHONE]'),
    (r'\b\d{3}-\d{2}-\d{4}\b', '[SSN]'),
    # No optional separator after the last group, so trailing whitespace is not consumed.
    (r'\b(?:\d{4}[-\s]?){3}\d{4}\b', '[CARD]'),
]

NEAR_DUPLICATE_THRESHOLD = 0.92  # cosine similarity; above = merge candidate
CONFLICT_THRESHOLD = 0.85        # cosine similarity + opposite content = conflict


@dataclass(frozen=True)
class WriteMemoryParams:
    org_id: UUID
    agent_id: UUID
    session_id: UUID | None
    content: str
    category: MemoryCategory
    confidence: Decimal
    source: str
    metadata: dict


@dataclass(frozen=True)
class WriteMemoryResult:
    memory_id: UUID
    was_duplicate: bool
    was_merged: bool
    was_flagged: bool  # True if conflict detected; memory written as pending_review


class MemoryWritePipeline:
    """
    Orchestrates the 9-step memory write pipeline.
    Steps 3–7 run in one DB transaction; steps 8–9 are best-effort.
    """

    def __init__(
        self,
        repo: MemoryRepository,
        embedder: httpx.AsyncClient,
        hot_cache: HotCacheService,
        embedder_url: str,
    ) -> None:
        self._repo = repo
        self._embedder = embedder
        self._hot_cache = hot_cache
        self._embed_url = embedder_url

    async def write(self, params: WriteMemoryParams) -> WriteMemoryResult:
        # ── Step 1: Input validation ──────────────────────────────────────────
        content = params.content.strip()
        if not content:
            raise ValueError("Memory content cannot be empty")
        if len(content) > 8192:
            raise ValueError(f"Memory content exceeds 8192 chars: {len(content)}")

        # ── Step 2: PII redaction ─────────────────────────────────────────────
        content = _redact_pii(content)

        # ── Step 3: Content deduplication (hash check) ────────────────────────
        # The hash is taken over the redacted content.
        content_hash = hashlib.sha256(content.encode()).hexdigest()
        existing = await self._repo.find_by_content_hash(
            agent_id=params.agent_id, content_hash=content_hash
        )
        if existing is not None:
            return WriteMemoryResult(
                memory_id=existing.id, was_duplicate=True,
                was_merged=False, was_flagged=False
            )

        # ── Step 4: Embedding generation ──────────────────────────────────────
        embedding = await self._embed(content)

        # ── Step 5: Near-duplicate detection ─────────────────────────────────
        near_dups = await self._repo.find_near_duplicates(
            agent_id=params.agent_id,
            embedding=embedding,
            threshold=NEAR_DUPLICATE_THRESHOLD,
            limit=3,
        )
        if near_dups:
            # Merge: supersede the near-duplicate(s), write new merged memory.
            # _handle_near_duplicate is not defined in this listing.
            return await self._handle_near_duplicate(params, content, content_hash, embedding, near_dups)

        # ── Step 6: Conflict detection ────────────────────────────────────────
        conflicts = await self._repo.find_conflicts(
            agent_id=params.agent_id,
            category=params.category,
            embedding=embedding,
            threshold=CONFLICT_THRESHOLD,
        )
        status = MemoryStatus.PENDING_REVIEW if conflicts else MemoryStatus.ACTIVE
        was_flagged = bool(conflicts)

        # ── Step 7: Database write (transactional) ────────────────────────────
        memory = await self._repo.create(
            org_id=params.org_id, agent_id=params.agent_id,
            session_id=params.session_id, content=content,
            content_hash=content_hash, embedding=embedding,
            category=params.category, source=params.source,
            confidence=params.confidence, status=status,
            metadata=params.metadata,
        )

        # ── Step 8: Hot cache update (best-effort) ────────────────────────────
        try:
            await self._hot_cache.update(params.agent_id, memory)
        except Exception:
            # Cache miss is recovered on next read; log without the memory content.
            logger.warning(
                "Hot cache update failed for agent %s", params.agent_id, exc_info=True
            )

        return WriteMemoryResult(
            memory_id=memory.id, was_duplicate=False,
            was_merged=False, was_flagged=was_flagged,
        )

    async def _embed(self, text: str) -> list[float]:
        resp = await self._embedder.post(
            f"{self._embed_url}/v1/embed",
            json={"text": text},
            timeout=10.0,
        )
        resp.raise_for_status()
        return resp.json()["embedding"]


def _redact_pii(content: str) -> str:
    """Replace PII patterns with placeholders. Does not log the original content."""
    for pattern, replacement in _PII_PATTERNS:
        content = re.sub(pattern, replacement, content)
    return content
```

Acceptance Criteria
- Identical content → was_duplicate=True, no new DB row
- Content with email pattern → email replaced with [EMAIL] in stored content
- Near-duplicate (similarity > 0.92) → existing memory superseded, new merged memory written
- Conflict detected (similarity > 0.85, same category) → new memory written with status='pending_review'
- Steps 1–7 are atomic (transaction rolled back on any step failure before commit)
- Step 8 failure (Redis down) does not fail the write (warning logged, result returned)
- Write p95 < 200ms in integration tests with real embedder
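
The PII criterion can be checked without a database or embedder. The sketch below is a standalone approximation of the redaction step: it reuses the four regex patterns from write_pipeline.py with two adjustments that are assumptions of this example (the email TLD class is written as `[A-Za-z]`, and the card pattern has no optional separator after the last digit group). The sample strings are made up, and `redact_pii` is a local copy for illustration rather than an import of the real module.

```python
import re

# Pattern table modeled on _PII_PATTERNS; see the lead-in for the two adjustments.
PII_PATTERNS: list[tuple[str, str]] = [
    (r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b', '[EMAIL]'),
    (r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b', '[PHONE]'),
    (r'\b\d{3}-\d{2}-\d{4}\b', '[SSN]'),
    (r'\b(?:\d{4}[-\s]?){3}\d{4}\b', '[CARD]'),
]


def redact_pii(content: str) -> str:
    """Apply each pattern in order, replacing matches with a placeholder."""
    for pattern, replacement in PII_PATTERNS:
        content = re.sub(pattern, replacement, content)
    return content


if __name__ == "__main__":
    print(redact_pii("Contact jane.doe@example.com or 555-123-4567; SSN 123-45-6789."))
    print(redact_pii("card 4111 1111 1111 1111 on file"))
```

Because the pipeline hashes the content after redaction, checks like these also pin down what the stored content and its content_hash are computed from.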