Milestone 3.4.3 — LLM-Based Memory Extraction Task
Status: Planned
Goal: 3.4 — Memory extraction worker
Phase: 3 — Memory Engine and Operator Platform
Estimated effort: 5–6 days
ADR required: ADR-0036 — Memory extraction strategy and prompt design
Why This Milestone Exists
This is the "intelligence" of the system. After a session checkpoint is written, the extraction worker reads the conversation turns and uses a secondary LLM call to extract structured knowledge — facts the agent learned, preferences the user expressed, behaviours that were demonstrated, instructions that were given.
The extraction is model-guided (it uses an LLM), not rule-based. Rule-based extraction would miss paraphrased facts and context-dependent preferences, whereas LLM extraction generalises across conversation types without per-domain rules.
The key design constraints:
- Idempotent: Extracting the same turn twice must produce the same memories. content_hash dedup handles this at the DB level; the pipeline just re-calls write_pipeline, which returns was_duplicate=True.
- Structured output: The extraction LLM must return JSON, not free text. The memory service validates the JSON before writing.
- Cost-efficient: Use the smallest model that produces acceptable quality (gpt-4o-mini in Phase 3; self-hosted Llama in Phase 4).
- Incremental: Only extract turns > last_extracted_turn. Update the pointer atomically with memory writes.
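The idempotency constraint can be illustrated with a minimal, self-contained sketch. It assumes content_hash is a SHA-256 digest of case- and whitespace-normalised content scoped by organisation, and it replaces the database with an in-memory dict. The real write_pipeline and its normalisation rules are not specified here, so content_hash(), WriteResult, and InMemoryWritePipeline below are hypothetical names used only for illustration.

```python
from __future__ import annotations

import hashlib
from dataclasses import dataclass, field


def content_hash(org_id: str, content: str) -> str:
    """Hash normalised content so trivially different copies collide (assumed rule)."""
    normalised = " ".join(content.lower().split())
    return hashlib.sha256(f"{org_id}:{normalised}".encode("utf-8")).hexdigest()


@dataclass(frozen=True)
class WriteResult:
    memory_hash: str
    was_duplicate: bool


@dataclass
class InMemoryWritePipeline:
    """Hypothetical stand-in for write_pipeline; the dict plays the unique index."""

    _rows: dict[str, str] = field(default_factory=dict)

    def write(self, org_id: str, content: str) -> WriteResult:
        key = content_hash(org_id, content)
        if key in self._rows:
            # Same hash already stored: report the duplicate, write nothing.
            return WriteResult(memory_hash=key, was_duplicate=True)
        self._rows[key] = content
        return WriteResult(memory_hash=key, was_duplicate=False)


pipeline = InMemoryWritePipeline()
first = pipeline.write("org-1", "Uses Python 3.11")
second = pipeline.write("org-1", "uses  python 3.11")  # same turn extracted again
```

Hash-based dedup of this kind only catches re-extractions whose text normalises to the same string. A low sampling temperature reduces wording drift between runs but does not eliminate it, so the incremental pointer remains the primary guard against reprocessing a turn.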
ADR-0036 — Memory extraction strategy
Document:
- Why LLM extraction (not NLP pipelines): Traditional NLP (NER, relation extraction) requires training data per domain. LLM zero-shot extraction generalises to any domain without training.
- Why gpt-4o-mini for extraction: High quality at low cost. The extraction call consumes ~500 input tokens + ~200 output tokens per turn. At $0.15/1M input tokens, a 10-turn session uses ~5,000 input tokens, which costs about $0.0008; the ~2,000 output tokens are billed on top of that at the model's output rate.
- Why structured JSON output (not free text): Free text requires a second parsing step. JSON can be validated with Pydantic at extraction time.
- Extraction prompt design: The prompt instructs the model to extract only objective information (facts, preferences, instructions) and NOT subjective commentary, pleasantries, or conversation mechanics.
- Confidence scoring in extraction: The model assigns a confidence 0.0–1.0 to each extracted memory. Low-confidence extractions (< 0.5) are stored as pending_review.
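The confidence rule in the last bullet can be sketched as a small routing function. The 0.5 threshold and the pending_review status come from the text above; the Candidate type, the status_for() helper, and the "active" status name for normally stored memories are assumptions made for this example.

```python
from __future__ import annotations

from dataclasses import dataclass

REVIEW_THRESHOLD = 0.5  # from the ADR outline: confidence < 0.5 goes to review


@dataclass(frozen=True)
class Candidate:
    """Hypothetical container for one extracted memory."""

    content: str
    category: str
    confidence: float


def status_for(candidate: Candidate) -> str:
    """Return the storage status; 'active' is an assumed name for the normal state."""
    if not 0.0 <= candidate.confidence <= 1.0:
        raise ValueError(f"confidence out of range: {candidate.confidence}")
    if candidate.confidence < REVIEW_THRESHOLD:
        return "pending_review"
    return "active"
```

With a strict `< 0.5` comparison, a memory scored exactly 0.5 (the level the extraction prompt describes as uncertain) is stored normally rather than sent to review. Whether that boundary is intended is worth settling in the ADR.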
Deliverables
src/worker/tasks/extraction.py
from __future__ import annotations
import json
from dataclasses import dataclass
from decimal import Decimal
from uuid import UUID
from celery import shared_task
from openai import OpenAI
from pydantic import BaseModel, Field, field_validator
from worker.settings import settings
# ── Extraction output schema ──────────────────────────────────────────────────


class ExtractedMemory(BaseModel):
    """A single memory extracted from a conversation turn."""

    content: str = Field(min_length=5, max_length=1000)
    category: str = Field(pattern="^(factual|preference|behavioral|episodic|procedural)$")
    confidence: float = Field(ge=0.0, le=1.0)


class ExtractionResult(BaseModel):
    """The full result of one extraction LLM call."""

    memories: list[ExtractedMemory] = Field(default_factory=list)

    @field_validator("memories")
    @classmethod
    def limit_memories_per_turn(cls, v: list[ExtractedMemory]) -> list[ExtractedMemory]:
        # Cap at 10 memories per turn to prevent hallucination loops
        return v[:10]
EXTRACTION_SYSTEM_PROMPT = """
You are a knowledge extraction system for an AI agent memory platform.

Your task: Read a conversation turn (user message + assistant response) and extract
structured memories: durable facts, preferences, instructions, and behaviours that
an AI agent should remember about this user and context.

Rules:
1. Extract ONLY information explicitly stated or clearly implied. Do NOT infer or hallucinate.
2. Each memory must be self-contained (understandable without the conversation context).
3. Do NOT extract: pleasantries, questions without answers, conversational filler.
4. Assign a confidence score: 1.0 = explicitly stated; 0.7 = clearly implied; 0.5 = uncertain.
5. Categories:
   - factual: objective facts about the user, their systems, or domain (e.g. "Uses Python 3.11")
   - preference: stated preferences (e.g. "Prefers short bullet-point answers")
   - behavioral: patterns of behavior (e.g. "Always starts debugging by checking logs")
   - episodic: specific events or experiences (e.g. "Had an outage on 2024-03-15 due to DB migration")
   - procedural: instructions the agent should follow (e.g. "Always confirm before running SQL updates")

Output valid JSON matching this schema:
{"memories": [{"content": "...", "category": "...", "confidence": 0.0}]}

Return {"memories": []} if nothing worth extracting.
"""
def extract_memories_from_turn(
    user_message: str,
    assistant_response: str,
    client: OpenAI,
) -> ExtractionResult:
    """
    Call GPT-4o-mini with the extraction prompt.

    Returns structured ExtractionResult with validated memories.
    Raises ValueError if the model returns empty, invalid, or schema-violating JSON.
    """
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        response_format={"type": "json_object"},
        messages=[
            {"role": "system", "content": EXTRACTION_SYSTEM_PROMPT},
            {"role": "user", "content": (
                f"User message:\n{user_message}\n\n"
                f"Assistant response:\n{assistant_response}"
            )},
        ],
        max_tokens=500,
        temperature=0.1,  # low temperature for consistent extraction
        timeout=30,
    )
    raw = response.choices[0].message.content
    if raw is None:
        # An empty completion would make json.loads raise TypeError, not ValueError
        raise ValueError("Extraction model returned no content")
    # json.JSONDecodeError and pydantic's ValidationError both subclass ValueError
    data = json.loads(raw)
    return ExtractionResult.model_validate(data)
@shared_task(
    name="worker.tasks.extraction.extract_session_memories",
    bind=True,
    max_retries=3,
    default_retry_delay=10,
    soft_time_limit=120,
    time_limit=180,
    queue="extraction",
)
def extract_session_memories(self, session_id: str, org_id: str) -> dict:
    """
    Extract memories from all unprocessed checkpoints in a session.
    Idempotent: safe to call multiple times for the same session.

    Algorithm:
    1. Load session; check status == 'completed'
    2. Load checkpoints where turn_index > last_extracted_turn
    3. For each unprocessed turn:
       a. Read user message + assistant response from MinIO archive
       b. Call extract_memories_from_turn
       c. Call write_pipeline.write for each extracted memory
       d. Atomically update last_extracted_turn in the DB
    4. Mark session extraction as complete
    """
    try:
        # _run_extraction (not shown in this excerpt) carries out the algorithm above
        return _run_extraction(session_id=UUID(session_id), org_id=UUID(org_id))
    except Exception as exc:
        # Any failure is retried, up to max_retries, after default_retry_delay seconds
        raise self.retry(exc=exc)

Acceptance Criteria
- extract_session_memories is idempotent: running twice extracts no duplicates
- Extraction prompt produces valid JSON in > 99% of test cases
- last_extracted_turn updated atomically with memory writes (not separately)
- ExtractionResult with 0 memories (nothing to extract) is a valid, handled result
- Extraction task fails gracefully (retries 3×, then dead-letter) if OpenAI returns error
- Extracted memories have correct category and confidence from model output
- ADR-0036 written with prompt design rationale and example extractions
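The atomicity criterion can be sketched with the standard-library sqlite3 module. The table names (sessions, memories), their columns, and the make_db() and commit_turn() helpers are hypothetical; the production schema and database are not described in this document. The sketch only shows the shape of the guarantee: the memory inserts and the last_extracted_turn update share one transaction, so a failure rolls both back.

```python
from __future__ import annotations

import hashlib
import sqlite3


def make_db() -> sqlite3.Connection:
    """Create an in-memory database with a minimal, assumed schema."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        CREATE TABLE sessions (id TEXT PRIMARY KEY, last_extracted_turn INTEGER NOT NULL);
        CREATE TABLE memories (content_hash TEXT PRIMARY KEY, content TEXT NOT NULL);
        """
    )
    conn.execute("INSERT INTO sessions VALUES ('s1', -1)")
    conn.commit()
    return conn


def commit_turn(
    conn: sqlite3.Connection, session_id: str, turn_index: int, contents: list[str]
) -> int:
    """Write one turn's memories and advance the pointer in a single transaction.

    Returns the number of newly inserted (non-duplicate) memories.
    """
    inserted = 0
    with conn:  # commits on success, rolls back if anything inside raises
        for content in contents:
            digest = hashlib.sha256(content.encode("utf-8")).hexdigest()
            cur = conn.execute(
                "INSERT OR IGNORE INTO memories (content_hash, content) VALUES (?, ?)",
                (digest, content),
            )
            inserted += cur.rowcount  # 0 when the hash already exists
        # The pointer only moves forward, so replaying an old turn is harmless.
        conn.execute(
            "UPDATE sessions SET last_extracted_turn = ? "
            "WHERE id = ? AND last_extracted_turn < ?",
            (turn_index, session_id, turn_index),
        )
    return inserted
```

Replaying the same turn inserts nothing new and leaves the pointer where it was, which is the behaviour the first and third criteria ask for. If any write in the turn fails, neither the memories nor the pointer change, so the turn is picked up again on retry.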