Milestone 3.2.3 — Embedding Cache (Redis Content-Hash → Vector)
Status: Planned
Goal: 3.2 — Embedding service
Phase: 3 — Memory Engine and Operator Platform
Estimated effort: 1–2 days
Why This Milestone Exists
The memory extraction worker generates embeddings for every extracted memory. Many extracted memories are near-identical (the same fact expressed in slightly different words across multiple conversations). Without a cache, the worker calls the embedder redundantly. More importantly, context assembly frequently needs to embed the current user query to find similar memories, and the same short queries recur constantly ("how do I restart the server?", "what's our API rate limit?"). Caching by content hash eliminates repeated inference for identical text. Note that a content hash matches only byte-identical text: near-identical memories that differ in wording still miss the cache.
The cache key is SHA-256(text), not the text itself. This is important: we never store the raw text in Redis. The vector is not sensitive, but the text might be.
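As a rough illustration of the keying scheme, the sketch below derives the Redis key for a few inputs. It reuses the `embed:v1:` prefix and the `content_hash` helper from the deliverable; `cache_key` is a hypothetical helper name introduced here only for illustration.

```python
import hashlib

CACHE_KEY_PREFIX = "embed:v1:"


def content_hash(text: str) -> str:
    """SHA-256 hex digest of the UTF-8 encoded text."""
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


def cache_key(text: str) -> str:
    """Hypothetical helper: build the Redis key for a piece of text."""
    return f"{CACHE_KEY_PREFIX}{content_hash(text)}"


query = "how do I restart the server?"
key = cache_key(query)

# The key is a fixed-length digest; the raw text does not appear in it.
print(len(key) - len(CACHE_KEY_PREFIX))  # 64
print(query in key)                      # False

# Identical text always maps to the same key...
print(cache_key(query) == key)           # True
# ...but any change in wording or casing produces a different key.
print(cache_key("How do I restart the server?") == key)  # False
```

Because the digest is one-way, someone with read access to Redis sees only hashes and vectors, not the queries or memories that produced them.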
Deliverables
Cache middleware in embedder service
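```python
# src/embedder/cache.py
import hashlib
import json
import logging
import os

import redis.asyncio as redis
from redis.exceptions import RedisError

logger = logging.getLogger(__name__)

CACHE_KEY_PREFIX = "embed:v1:"
# 1 hour by default; embeddings don't change unless the model changes
CACHE_TTL_SECONDS = int(os.environ.get("IBEX_EMBED_CACHE_TTL_SECONDS", "3600"))


def content_hash(text: str) -> str:
    """SHA-256 hex digest of text. Used as cache key. Never log or store the text."""
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


class EmbeddingCache:
    def __init__(self, redis_client: redis.Redis) -> None:
        self._redis = redis_client

    async def get(self, text: str) -> list[float] | None:
        key = f"{CACHE_KEY_PREFIX}{content_hash(text)}"
        try:
            data = await self._redis.get(key)
        except RedisError:
            # Fail open: treat a Redis error as a cache miss so inference still runs.
            logger.warning("embedding cache read failed", exc_info=True)
            return None
        if data is None:
            return None
        return json.loads(data)

    async def set(self, text: str, embedding: list[float]) -> None:
        key = f"{CACHE_KEY_PREFIX}{content_hash(text)}"
        try:
            await self._redis.set(key, json.dumps(embedding), ex=CACHE_TTL_SECONDS)
        except RedisError:
            # A failed write only costs a future cache miss.
            logger.warning("embedding cache write failed", exc_info=True)
```

Cache-aside pattern in embed endpoint: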
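```python
import asyncio

# `metrics` and `EmbeddingModel` are assumed to be defined elsewhere in the embedder service.

# Hold strong references to in-flight cache writes; the event loop only keeps weak ones,
# so an unreferenced task can be garbage-collected before it finishes.
_pending_writes: set[asyncio.Task] = set()


async def embed_one_cached(text: str, model: EmbeddingModel, cache: EmbeddingCache) -> list[float]:
    # 1. Check cache
    cached = await cache.get(text)
    if cached is not None:
        metrics.embed_cache_hits.inc()
        return cached
    # 2. Compute
    metrics.embed_cache_misses.inc()
    embedding = await model.embed_one(text)
    # 3. Store (fire and forget; cache miss on error is acceptable)
    task = asyncio.create_task(cache.set(text, embedding))
    _pending_writes.add(task)
    task.add_done_callback(_pending_writes.discard)
    return embedding
```

Acceptance Criteria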
- Cache hit path returns in < 5ms (no model inference)
- Cache key is SHA-256 hash of text; raw text never stored in Redis
- Cache TTL: 1 hour (configurable via `IBEX_EMBED_CACHE_TTL_SECONDS`)
- `embed_cache_hits_total` and `embed_cache_misses_total` Prometheus metrics
- Cache miss on Redis error (fail open: inference still happens)