Phase 3 memory engine


Milestone 3.2.1 — Embedding Service Skeleton

Status: Planned
Goal: 3.2 — Embedding service
Phase: 3 — Memory Engine and Operator Platform
Estimated effort: 2 days
ADR required: ADR-0033 — Embedding service design and model selection


Why This Milestone Exists

The embedding service is the narrowest bottleneck in the memory pipeline. If the embedder is slow or unavailable, memory writes block, context assembly falls back to hot-cache-only, and the entire memory system degrades. Isolating embedding as a dedicated service rather than a library called inline provides:

  • Independent scaling: GPU instances only where needed (the embedder), not for every service
  • Model upgrades: Replace all-MiniLM-L6-v2 with a better model in Phase 4 by deploying a new version of the embedder — zero changes to memory service or worker
  • Caching: The embedder can cache embedding results (content_hash → vector) once for all callers; an inline library can only cache within its own process, so each service would recompute and store its own copy
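To make the caching point concrete, here is a minimal in-process sketch of content_hash → vector keying. The names `content_hash`, `EmbeddingCache`, and `get_or_compute` are hypothetical, a plain dict stands in for the shared Redis cache planned for 3.2.3, and SHA-256 is an assumed hash choice. Folding the model name into the key is also an assumption; it keeps a model upgrade from serving vectors produced by the previous model.

```python
import hashlib
from typing import Callable

Vector = list[float]


def content_hash(text: str, model_name: str) -> str:
    """Hypothetical cache key: SHA-256 over the model name and the UTF-8 text."""
    h = hashlib.sha256()
    h.update(model_name.encode("utf-8"))
    h.update(b"\x00")  # separator keeps the model-name / text boundary unambiguous
    h.update(text.encode("utf-8"))
    return h.hexdigest()


class EmbeddingCache:
    """In-process stand-in for the shared cache (hypothetical; 3.2.3 plans Redis)."""

    def __init__(self, model_name: str, compute: Callable[[str], Vector]) -> None:
        self._model_name = model_name
        self._compute = compute  # e.g. a call into the model; injected for testability
        self._store: dict[str, Vector] = {}
        self.hits = 0
        self.misses = 0

    def get_or_compute(self, text: str) -> Vector:
        key = content_hash(text, self._model_name)
        cached = self._store.get(key)
        if cached is not None:
            self.hits += 1
            return cached
        self.misses += 1
        vector = self._compute(text)
        self._store[key] = vector
        return vector
```

Because the key depends only on the model name and the text, every caller that embeds the same text lands on the same entry, which is the cross-caller sharing an inline library cannot provide.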

This milestone delivers the service skeleton: FastAPI application, model loading at startup, health probes, and the basic single-text endpoint.


Non-Goals

  • Batch endpoint (3.2.2)
  • Redis embedding cache (3.2.3)
  • GPU support (Phase 4)

Branch

feat/m3-2-1-embedder-skeleton

PR Title

feat(embedder): service skeleton with all-MiniLM-L6-v2 and single-text endpoint (m3.2.1)


ADR-0033 — Embedding service design

Write ADR-0033 covering:

  • Why all-MiniLM-L6-v2: 384 dimensions (fast to search), trained for semantic similarity (not generation), 90MB model (fits in 512MB container), 3000 sentences/sec on CPU. Phase 4 may add bge-large-en-v1.5 (1024-dim, better quality) as a second model.
  • Why a dedicated service over a library: See rationale above (scaling, model upgrades, caching).
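  • Why FastAPI and not gRPC: The embedder exposes only two embedding endpoints (single and batch) plus two health probes. HTTP is simpler, supported by every language in the stack, and easier to test. gRPC would add proto-definition and code-generation overhead for minimal benefit at this scale.
  • Model warmup: The first inference call takes 2–3 seconds (model loading + JIT). The service must report "not ready" until that first inference completes, while still reporting itself as alive. This is why liveness (/health) and readiness (/ready) are separate checks: an orchestrator should keep traffic away from a warming instance, not restart it.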
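One caveat worth checking against the chosen server: if EmbeddingModel is constructed synchronously inside the lifespan startup, uvicorn does not bind its socket until startup returns, so /ready never gets the chance to answer 503 during warmup. A minimal sketch of one alternative, assuming the lifespan starts the load in a background thread and yields immediately, is below. `ModelSlot`, `start_background_load`, `liveness`, and `readiness` are hypothetical names, the probes return plain `(status, body)` tuples instead of FastAPI responses, and the loader is any zero-argument callable (for example `EmbeddingModel`).

```python
import threading
from dataclasses import dataclass, field
from typing import Any, Callable, Optional


@dataclass
class ModelSlot:
    """Holds the model once loaded; `done` lets shutdown code or tests wait for the loader."""
    model: Optional[Any] = None
    error: Optional[BaseException] = None
    done: threading.Event = field(default_factory=threading.Event)


def start_background_load(slot: ModelSlot, loader: Callable[[], Any]) -> threading.Thread:
    """Run the slow loader off the event loop so /health can answer meanwhile."""

    def run() -> None:
        try:
            slot.model = loader()  # e.g. EmbeddingModel(), which also runs warmup
        except Exception as exc:  # keep the failure visible to the readiness probe
            slot.error = exc
        finally:
            slot.done.set()

    thread = threading.Thread(target=run, name="model-loader", daemon=True)
    thread.start()
    return thread


def liveness() -> tuple[int, dict]:
    # Liveness only says "the process is up"; it never depends on the model.
    return 200, {"status": "ok"}


def readiness(slot: ModelSlot) -> tuple[int, dict]:
    # Readiness flips to 200 only after the loader (including warmup) has returned.
    if slot.model is None:
        reason = "model load failed" if slot.error is not None else "model not ready"
        return 503, {"status": "unhealthy", "reason": reason}
    return 200, {"status": "ok"}
```

The load still starts from the lifespan, so the model is never loaded on a first request; the trade-off is that a failed load no longer aborts startup and must be surfaced through /ready and logs instead.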

Deliverables

Service structure

services/embedder/
  pyproject.toml
  Dockerfile
  .env.example
  src/
    embedder/
      __init__.py
      app.py           # FastAPI application factory
      settings.py      # pydantic-settings config
      model.py         # SentenceTransformer wrapper
      routers/
        embed.py       # /v1/embed endpoint
        health.py      # /health, /ready
      schemas.py       # Request/Response Pydantic models
  tests/
    conftest.py
    test_embed.py
    test_health.py

src/embedder/model.py — thread-safe model wrapper

Python
from __future__ import annotations

import asyncio
import os
from concurrent.futures import ThreadPoolExecutor
from typing import Final

import numpy as np
from sentence_transformers import SentenceTransformer

# all-MiniLM-L6-v2: 384 dimensions, best-in-class for semantic similarity tasks
# at this model size. CPU throughput: ~3000 sentences/sec.
MODEL_NAME: Final[str] = "all-MiniLM-L6-v2"
EMBEDDING_DIM: Final[int] = 384
# Inference is compute-bound, so threads beyond min(4, cpu_count) only add contention.
MAX_WORKERS: Final[int] = min(4, os.cpu_count() or 1)


class EmbeddingModel:
    """
    Thread-safe wrapper around SentenceTransformer.

    SentenceTransformer.encode() releases the GIL during inference (inside the
    PyTorch ops), so running it in a ThreadPoolExecutor does not block the
    asyncio event loop. The executor is sized to min(4, cpu_count): more threads
    provide no benefit since the bottleneck is the neural network forward pass,
    not I/O.

    The model is loaded exactly once at construction time. After construction,
    this object is safe to share across all requests.
    """

    def __init__(self, model_name: str = MODEL_NAME) -> None:
        self._model = SentenceTransformer(model_name)
        self._executor = ThreadPoolExecutor(max_workers=MAX_WORKERS, thread_name_prefix="embedder")
        self._ready = False
        self._warmup()

    def _warmup(self) -> None:
        """
        Run a throwaway inference to:
          1. Load model weights into memory cache
          2. Trigger any JIT compilation or other lazy initialisation
          3. Validate the model produces EMBEDDING_DIM outputs
        After warmup, is_ready() returns True.
        """
        warmup_vector = self._model.encode("warmup", convert_to_numpy=True)
        # Raise explicitly: `assert` statements are stripped when Python runs with -O.
        if len(warmup_vector) != EMBEDDING_DIM:
            raise RuntimeError(
                f"Model produced {len(warmup_vector)}-dim vector, expected {EMBEDDING_DIM}"
            )
        self._ready = True

    def is_ready(self) -> bool:
        return self._ready

    def close(self) -> None:
        """Stop the worker threads; call from the lifespan shutdown phase."""
        self._executor.shutdown(wait=False, cancel_futures=True)

    async def embed_one(self, text: str) -> list[float]:
        """Embed a single text asynchronously. Runs inference in the thread pool."""
        loop = asyncio.get_running_loop()
        result: np.ndarray = await loop.run_in_executor(
            self._executor,
            lambda: self._model.encode(text, convert_to_numpy=True, normalize_embeddings=True),
        )
        return result.tolist()

    async def embed_batch(self, texts: list[str]) -> list[list[float]]:
        """
        Embed a batch of texts with a single encode() call. SentenceTransformer
        splits the input into mini-batches of `batch_size` internally, so 512
        texts take four forward passes of 128: still much faster than N
        sequential encode() calls.
        Maximum batch size: 512 texts, enforced by the request schema.
        """
        if not texts:
            return []
        loop = asyncio.get_running_loop()
        results: np.ndarray = await loop.run_in_executor(
            self._executor,
            lambda: self._model.encode(
                texts,
                convert_to_numpy=True,
                normalize_embeddings=True,
                batch_size=128,       # internal mini-batch size; controls GPU/CPU memory
                show_progress_bar=False,
            ),
        )
        return results.tolist()  # (n, 384) array -> n lists of 384 floats
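The docstring's claim that running encode() in a thread pool leaves the event loop free can be checked with a stdlib-only sketch. Here `blocking_encode` is a hypothetical stand-in that sleeps instead of running a forward pass (time.sleep also releases the GIL), and `heartbeat` and `demo_offload` are illustrative names. The heartbeat coroutine keeps ticking while the executor works; if the encode call ran inline in the coroutine, it would tick at most once.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_encode(text: str) -> list[float]:
    """Hypothetical stand-in for SentenceTransformer.encode(): blocks ~0.2 s."""
    time.sleep(0.2)  # releases the GIL, as PyTorch ops do during inference
    return [float(len(text))]


async def heartbeat(stop: asyncio.Event, ticks: list[float]) -> None:
    """Record a timestamp roughly every 10 ms for as long as the loop is free."""
    while not stop.is_set():
        ticks.append(time.perf_counter())
        await asyncio.sleep(0.01)


async def demo_offload() -> tuple[list[float], int]:
    loop = asyncio.get_running_loop()
    stop = asyncio.Event()
    ticks: list[float] = []
    with ThreadPoolExecutor(max_workers=2, thread_name_prefix="embedder") as executor:
        beat = asyncio.create_task(heartbeat(stop, ticks))
        vector = await loop.run_in_executor(executor, blocking_encode, "hello")
        stop.set()
        await beat
    return vector, len(ticks)


if __name__ == "__main__":
    vector, tick_count = asyncio.run(demo_offload())
    # A tick_count well above 1 means the loop kept serving while the encode ran.
    print(vector, tick_count)
```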

src/embedder/routers/embed.py — API endpoints

Python
from __future__ import annotations

from fastapi import APIRouter, Depends, HTTPException, Request
from pydantic import BaseModel, Field, field_validator

from embedder.model import MODEL_NAME, EmbeddingModel

router = APIRouter(prefix="/v1")


class EmbedOneRequest(BaseModel):
    text: str = Field(min_length=1, max_length=8192)

    @field_validator("text")
    @classmethod
    def not_blank(cls, v: str) -> str:
        # Match the batch endpoint: whitespace-only input is rejected with 422.
        if not v.strip():
            raise ValueError("text must be non-empty")
        return v


class EmbedOneResponse(BaseModel):
    embedding: list[float]
    dim: int
    model: str


class EmbedBatchRequest(BaseModel):
    texts: list[str] = Field(min_length=1, max_length=512)

    @field_validator("texts")
    @classmethod
    def no_empty_texts(cls, v: list[str]) -> list[str]:
        if any(not text.strip() for text in v):
            raise ValueError("All texts must be non-empty")
        return v


class EmbedBatchResponse(BaseModel):
    embeddings: list[list[float]]
    count: int
    model: str


def get_model(request: Request) -> EmbeddingModel:
    """Return the shared model, or 503 if it is missing or still warming up."""
    model: EmbeddingModel | None = getattr(request.app.state, "model", None)
    if model is None or not model.is_ready():
        raise HTTPException(status_code=503, detail="model not ready")
    return model


@router.post("/embed", response_model=EmbedOneResponse, status_code=200)
async def embed_one(
    body: EmbedOneRequest, model: EmbeddingModel = Depends(get_model)
) -> EmbedOneResponse:
    """Embed a single text. Returns a 384-dimensional, L2-normalised vector."""
    embedding = await model.embed_one(body.text)
    return EmbedOneResponse(embedding=embedding, dim=len(embedding), model=MODEL_NAME)


@router.post("/embed/batch", response_model=EmbedBatchResponse, status_code=200)
async def embed_batch(
    body: EmbedBatchRequest, model: EmbeddingModel = Depends(get_model)
) -> EmbedBatchResponse:
    """Embed up to 512 texts in one encode() call; typically much faster than N single calls."""
    embeddings = await model.embed_batch(body.texts)
    return EmbedBatchResponse(embeddings=embeddings, count=len(embeddings), model=MODEL_NAME)
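Because /v1/embed/batch rejects more than 512 texts with a 422, a caller holding a larger list has to split it. A minimal caller-side sketch, assuming a hypothetical client helper in the memory service or worker: `chunked`, `embed_all`, and `make_http_poster` are illustrative names, and the base URL handed to `make_http_poster` is a deployment assumption. Only the endpoint path and the `texts`/`embeddings` fields come from the schemas above.

```python
import json
import urllib.request
from collections.abc import Callable, Iterator, Sequence

MAX_BATCH = 512  # mirrors the max_length on EmbedBatchRequest.texts

Vector = list[float]


def chunked(texts: Sequence[str], size: int = MAX_BATCH) -> Iterator[list[str]]:
    """Yield consecutive slices of at most `size` texts, preserving order."""
    if size < 1:
        raise ValueError("size must be >= 1")
    for start in range(0, len(texts), size):
        yield list(texts[start : start + size])


def embed_all(
    texts: Sequence[str], post_batch: Callable[[list[str]], list[Vector]]
) -> list[Vector]:
    """Embed any number of texts, sending at most MAX_BATCH per request."""
    vectors: list[Vector] = []
    for batch in chunked(texts):
        vectors.extend(post_batch(batch))
    return vectors


def make_http_poster(base_url: str, timeout: float = 10.0) -> Callable[[list[str]], list[Vector]]:
    """Return a post_batch function that calls POST {base_url}/v1/embed/batch."""

    def post_batch(batch: list[str]) -> list[Vector]:
        request = urllib.request.Request(
            f"{base_url}/v1/embed/batch",
            data=json.dumps({"texts": batch}).encode("utf-8"),
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        with urllib.request.urlopen(request, timeout=timeout) as response:
            return json.loads(response.read())["embeddings"]

    return post_batch
```

Injecting `post_batch` keeps the chunking logic testable without a running embedder; in production it would be the function returned by `make_http_poster`.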

Health probes

Python
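# src/embedder/routers/health.py
# /health: liveness, the process is alive and serving HTTP
# /ready:  readiness, the model is loaded and the warmup inference completed
from __future__ import annotations

from fastapi import APIRouter, HTTPException, Request

from embedder.model import EMBEDDING_DIM, MODEL_NAME, EmbeddingModel

router = APIRouter()  # no /v1 prefix: probes live at the root


@router.get("/health", status_code=200)
async def health() -> dict:
    return {"status": "ok"}


@router.get("/ready", status_code=200)
async def ready(request: Request) -> dict:
    model: EmbeddingModel | None = getattr(request.app.state, "model", None)
    if model is None or not model.is_ready():
        raise HTTPException(status_code=503, detail={"status": "unhealthy", "reason": "model not ready"})
    return {"status": "ok", "model": MODEL_NAME, "dim": EMBEDDING_DIM}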

Testing Requirements

  • test_embed_one_returns_384_dim_vector: POST /v1/embed with a text → response has embedding of length 384
  • test_embed_batch_returns_correct_count: POST /v1/embed/batch with 10 texts → response has 10 embeddings, each of length 384
  • test_embed_batch_max_512_texts: 513 texts → 422 validation error
  • test_health_returns_200: /health always 200
  • test_ready_returns_503_before_warmup: Mock is_ready() → False; /ready returns 503
  • test_ready_returns_200_after_warmup: Model loaded → /ready returns 200
  • test_embed_one_normalised: sum(x^2 for x in embedding) ≈ 1.0 (L2 normalisation verified)
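For the endpoint and probe tests, a test double that mimics EmbeddingModel's interface avoids loading model weights on every run. A minimal sketch, assuming it lives in tests/conftest.py and is installed as app.state.model; `FakeEmbeddingModel` is a hypothetical name and its vectors are seeded pseudo-random values, not real embeddings. test_embed_one_normalised should still run against the real model, since a fake that normalises its own output would make that check tautological.

```python
import asyncio
import hashlib
import math
import random

EMBEDDING_DIM = 384


class FakeEmbeddingModel:
    """Hypothetical test double exposing the same methods as EmbeddingModel."""

    def __init__(self, ready: bool = True) -> None:
        self._ready = ready

    def is_ready(self) -> bool:
        return self._ready

    def _vector(self, text: str) -> list[float]:
        # Deterministic per text: seed a private RNG from the text's SHA-256.
        seed = int.from_bytes(hashlib.sha256(text.encode("utf-8")).digest()[:8], "big")
        rng = random.Random(seed)
        raw = [rng.gauss(0.0, 1.0) for _ in range(EMBEDDING_DIM)]
        norm = math.sqrt(sum(x * x for x in raw))
        return [x / norm for x in raw]  # unit length, like normalize_embeddings=True

    async def embed_one(self, text: str) -> list[float]:
        return self._vector(text)

    async def embed_batch(self, texts: list[str]) -> list[list[float]]:
        return [self._vector(t) for t in texts]


if __name__ == "__main__":
    vec = asyncio.run(FakeEmbeddingModel().embed_one("hello"))
    print(len(vec), round(sum(x * x for x in vec), 6))
```

For test_ready_returns_503_before_warmup, installing `FakeEmbeddingModel(ready=False)` gives the same effect as mocking is_ready() to return False.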

Acceptance Criteria

  • /v1/embed returns 384-dim vector
  • /v1/embed/batch accepts up to 512 texts and embeds them in a single encode() call (mini-batched internally, 128 texts per forward pass)
  • /ready returns 503 until model warmup completes
  • Model loaded at startup via lifespan, not on first request
  • All embeddings are L2-normalised (for cosine similarity via dot product)
  • ADR-0033 written and indexed
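The reason for L2-normalising at write time: for unit vectors, cosine similarity reduces to a plain dot product, so a vector store can rank by inner product without dividing by norms at query time. A small worked example; `l2_normalise`, `dot`, and `cosine` are illustrative helper names, not part of the service.

```python
import math
from collections.abc import Sequence


def l2_normalise(v: Sequence[float]) -> list[float]:
    norm = math.sqrt(sum(x * x for x in v))
    if norm == 0.0:
        raise ValueError("cannot normalise a zero vector")
    return [x / norm for x in v]


def dot(a: Sequence[float], b: Sequence[float]) -> float:
    if len(a) != len(b):
        raise ValueError("dimension mismatch")
    return sum(x * y for x, y in zip(a, b))


def cosine(a: Sequence[float], b: Sequence[float]) -> float:
    return dot(a, b) / (math.sqrt(dot(a, a)) * math.sqrt(dot(b, b)))


a, b = [3.0, 4.0], [4.0, 3.0]
# By hand: cosine = 24 / (5 * 5) = 0.96, and (0.6, 0.8) . (0.8, 0.6) = 0.48 + 0.48 = 0.96
print(cosine(a, b), dot(l2_normalise(a), l2_normalise(b)))
```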
