Phase 3 memory engine

This is the "intelligence" of the system. After a session checkpoint is written, the extraction worker reads the conversation turns and uses a secondary LLM call to extract structured knowledge — facts the agent learned, preferences the user expressed, behaviours that were demonstrated, instructions that were given. Th

Milestone 3.4.3 — LLM-Based Memory Extraction Task

Status: Planned
Goal: 3.4 — Memory extraction worker
Phase: 3 — Memory Engine and Operator Platform
Estimated effort: 5–6 days
ADR required: ADR-0036 — Memory extraction strategy and prompt design


Why This Milestone Exists

This is the "intelligence" of the system. After a session checkpoint is written, the extraction worker reads the conversation turns and uses a secondary LLM call to extract structured knowledge — facts the agent learned, preferences the user expressed, behaviours that were demonstrated, instructions that were given.

The extraction is model-guided (uses an LLM) not rule-based. Rule-based extraction would miss paraphrased facts and context-dependent preferences. LLM extraction generalises across all conversation types.

The key design constraints:

  1. Idempotent: Extracting the same turn twice must produce the same memories (content_hash dedup handles this at the DB level — the pipeline just re-calls write_pipeline, which returns was_duplicate=True)
  2. Structured output: The extraction LLM must return JSON, not free text. The memory service validates the JSON before writing.
  3. Cost-efficient: Use the smallest model that produces acceptable quality (gpt-4o-mini in Phase 3; self-hosted Llama in Phase 4)
  4. Incremental: Only extract turns > last_extracted_turn. Update pointer atomically with memory writes.

ADR-0036 — Memory extraction strategy

Document:

  • Why LLM extraction (not NLP pipelines): Traditional NLP (NER, relation extraction) requires training data per domain. LLM zero-shot extraction generalises to any domain without training.
  • Why gpt-4o-mini for extraction: High quality at low cost. The extraction call consumes ~500 tokens input + ~200 tokens output per turn. At $0.15/1M input tokens, extracting a 10-turn session costs $0.0008.
  • Why structured JSON output (not free text): Free text requires a second parsing step. JSON can be validated with Pydantic at extraction time.
  • Extraction prompt design: The prompt instructs the model to extract only objective information (facts, preferences, instructions) and NOT subjective commentary, pleasantries, or conversation mechanics.
  • Confidence scoring in extraction: The model assigns a confidence 0.0–1.0 to each extracted memory. Low-confidence extractions (< 0.5) are stored as pending_review.

Deliverables

src/worker/tasks/extraction.py

Python
from __future__ import annotations
 
import json
from dataclasses import dataclass
from decimal import Decimal
from uuid import UUID
 
from celery import shared_task
from openai import OpenAI
from pydantic import BaseModel, Field, field_validator
 
from worker.settings import settings
 
# ── Extraction output schema ──────────────────────────────────────────────────
 
class ExtractedMemory(BaseModel):
    """A single memory extracted from a conversation turn."""
    content:    str   = Field(min_length=5, max_length=1000)
    category:   str   = Field(pattern="^(factual|preference|behavioral|episodic|procedural)$")
    confidence: float = Field(ge=0.0, le=1.0)
 
class ExtractionResult(BaseModel):
    """The full result of one extraction LLM call."""
    memories: list[ExtractedMemory] = Field(default_factory=list)
 
    @field_validator("memories")
    @classmethod
    def limit_memories_per_turn(cls, v: list[ExtractedMemory]) -> list[ExtractedMemory]:
        # Cap at 10 memories per turn to prevent hallucination loops
        return v[:10]
 
EXTRACTION_SYSTEM_PROMPT = """
You are a knowledge extraction system for an AI agent memory platform.
 
Your task: Read a conversation turn (user message + assistant response) and extract
structured memories — durable facts, preferences, instructions, and behaviours that
an AI agent should remember about this user and context.
 
Rules:
1. Extract ONLY information explicitly stated or clearly implied. Do NOT infer or hallucinate.
2. Each memory must be self-contained (understandable without the conversation context).
3. Do NOT extract: pleasantries, questions without answers, conversational filler.
4. Assign a confidence score: 1.0 = explicitly stated; 0.7 = clearly implied; 0.5 = uncertain.
5. Categories:
   - factual: objective facts about the user, their systems, or domain (e.g. "Uses Python 3.11")
   - preference: stated preferences (e.g. "Prefers short bullet-point answers")
   - behavioral: patterns of behavior (e.g. "Always starts debugging by checking logs")
   - episodic: specific events or experiences (e.g. "Had an outage on 2024-03-15 due to DB migration")
   - procedural: instructions the agent should follow (e.g. "Always confirm before running SQL updates")
 
Output valid JSON matching this schema:
{"memories": [{"content": "...", "category": "...", "confidence": 0.0}]}
 
Return {"memories": []} if nothing worth extracting.
"""
 
def extract_memories_from_turn(
    user_message: str,
    assistant_response: str,
    client: OpenAI,
) -> ExtractionResult:
    """
    Call GPT-4o-mini with the extraction prompt.
    Returns structured ExtractionResult with validated memories.
    Raises ValueError if the model returns invalid JSON.
    """
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        response_format={"type": "json_object"},
        messages=[
            {"role": "system", "content": EXTRACTION_SYSTEM_PROMPT},
            {"role": "user", "content": (
                f"User message:\n{user_message}\n\n"
                f"Assistant response:\n{assistant_response}"
            )},
        ],
        max_tokens=500,
        temperature=0.1,   # low temperature for consistent extraction
        timeout=30,
    )
    raw = response.choices[0].message.content
    data = json.loads(raw)
    return ExtractionResult.model_validate(data)
 
@shared_task(
    name="worker.tasks.extraction.extract_session_memories",
    bind=True,
    max_retries=3,
    default_retry_delay=10,
    soft_time_limit=120,
    time_limit=180,
    queue="extraction",
)
def extract_session_memories(self, session_id: str, org_id: str) -> dict:
    """
    Extract memories from all unprocessed checkpoints in a session.
    Idempotent: safe to call multiple times for the same session.
 
    Algorithm:
    1. Load session; check status == 'completed'
    2. Load checkpoints where turn_index > last_extracted_turn
    3. For each unprocessed turn:
       a. Read user message + assistant response from MinIO archive
       b. Call extract_memories_from_turn
       c. Call write_pipeline.write for each extracted memory
       d. Atomically update last_extracted_turn in the DB
    4. Mark session extraction as complete
    """
    try:
        return _run_extraction(session_id=UUID(session_id), org_id=UUID(org_id))
    except Exception as exc:
        raise self.retry(exc=exc)

Acceptance Criteria

  • extract_session_memories is idempotent: running twice extracts no duplicates
  • Extraction prompt produces valid JSON in > 99% of test cases
  • last_extracted_turn updated atomically with memory writes (not separately)
  • ExtractionResult with 0 memories (nothing to extract) is a valid, handled result
  • Extraction task fails gracefully (retries 3×, then dead-letter) if OpenAI returns error
  • Extracted memories have correct category and confidence from model output
  • ADR-0036 written with prompt design rationale and example extractions

Edit on GitHub

Last updated on

On this page

0%