Milestone 3.2.3 — Embedding Cache (Redis Content-Hash → Vector)

Status: Planned
Goal: 3.2 — Embedding service
Phase: 3 — Memory Engine and Operator Platform
Estimated effort: 1–2 days


Why This Milestone Exists

The memory extraction worker generates embeddings for every extracted memory. Many extracted memories repeat across conversations, and without a cache the worker calls the embedder again for text it has already embedded. More importantly, context assembly frequently needs to embed the current user query to find similar memories, and the same short queries recur constantly ("how do I restart the server?", "what's our API rate limit?"). Caching by content hash eliminates repeated inference for identical text. It does not help with near-identical memories (the same fact expressed in slightly different words): any difference in the text produces a different hash and therefore a cache miss.

The cache key is SHA-256(text), not the text itself. This is important: we never store the raw text in Redis. The vector is not sensitive, but the text might be.
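
To make the exact-match behaviour concrete, here is a minimal sketch of key derivation. The `embed:v1:` prefix comes from the cache module in the Deliverables; the `cache_key` helper is a hypothetical name used only for illustration.

```python
import hashlib

CACHE_KEY_PREFIX = "embed:v1:"  # same prefix as the cache module in this milestone


def cache_key(text: str) -> str:
    """Hypothetical helper: prefix plus SHA-256 hex digest of the UTF-8 text."""
    return CACHE_KEY_PREFIX + hashlib.sha256(text.encode("utf-8")).hexdigest()


a = cache_key("what's our API rate limit?")
b = cache_key("what's our API rate limit?")
c = cache_key("What's our API rate limit?")  # differs only in capitalisation

print(a == b)  # True: identical text maps to the same key
print(a == c)  # False: any byte-level difference yields a different key
print(len(a) - len(CACHE_KEY_PREFIX))  # 64: length of a SHA-256 hex digest
```

The key contains only the prefix and the digest, so the query text itself never appears in Redis.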


Deliverables

Cache middleware in embedder service

Python
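# src/embedder/cache.py
import hashlib
import json
import logging
import os

import redis.asyncio as redis
from redis.exceptions import RedisError

logger = logging.getLogger(__name__)

CACHE_KEY_PREFIX = "embed:v1:"
# Default 1 hour; embeddings don't change unless the model changes.
# Override with IBEX_EMBED_CACHE_TTL_SECONDS.
CACHE_TTL_SECONDS = int(os.environ.get("IBEX_EMBED_CACHE_TTL_SECONDS", "3600"))


def content_hash(text: str) -> str:
    """SHA-256 hex digest of text. Used as cache key. Never log or store the text."""
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


class EmbeddingCache:
    def __init__(self, redis_client: redis.Redis) -> None:
        self._redis = redis_client

    async def get(self, text: str) -> list[float] | None:
        key = f"{CACHE_KEY_PREFIX}{content_hash(text)}"
        try:
            data = await self._redis.get(key)
        except RedisError:
            # Fail open: a Redis error is treated as a cache miss.
            logger.warning("embedding cache read failed", exc_info=True)
            return None
        if data is None:
            return None
        return json.loads(data)

    async def set(self, text: str, embedding: list[float]) -> None:
        key = f"{CACHE_KEY_PREFIX}{content_hash(text)}"
        try:
            await self._redis.set(key, json.dumps(embedding), ex=CACHE_TTL_SECONDS)
        except RedisError:
            # A failed write only means the next lookup misses.
            logger.warning("embedding cache write failed", exc_info=True)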

Cache-aside pattern in embed endpoint:

Python
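import asyncio

# metrics, EmbeddingModel, and EmbeddingCache come from the embedder service.

# Keep references to in-flight cache writes; the event loop holds only weak
# references to tasks, so an unreferenced task can be garbage-collected early.
_pending_writes: set[asyncio.Task] = set()


async def embed_one_cached(text: str, model: EmbeddingModel, cache: EmbeddingCache) -> list[float]:
    # 1. Check cache
    cached = await cache.get(text)
    if cached is not None:
        metrics.embed_cache_hits.inc()
        return cached
    # 2. Compute
    metrics.embed_cache_misses.inc()
    embedding = await model.embed_one(text)
    # 3. Store (fire and forget; cache miss on error is acceptable)
    task = asyncio.create_task(cache.set(text, embedding))
    _pending_writes.add(task)
    task.add_done_callback(_pending_writes.discard)
    return embedding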

Acceptance Criteria

  • Cache hit path returns in < 5ms (no model inference)
  • Cache key is SHA-256 hash of text; raw text never stored in Redis
  • Cache TTL: 1 hour (configurable via IBEX_EMBED_CACHE_TTL_SECONDS)
  • embed_cache_hits_total and embed_cache_misses_total Prometheus metrics
  • Cache miss on Redis error (fail open — inference still happens)
