Phase 3 memory engine

Milestone 3.5.1 — Context Assembly Engine Skeleton (Python gRPC Server)

Status: Planned
Goal: 3.5 — Context assembly engine
Phase: 3 — Memory Engine and Operator Platform
Estimated effort: 3 days
ADR required: ADR-0038 — Context assembly service design and gRPC contract


Why This Milestone Exists

The context assembly engine is the most architecturally significant addition in Phase 3. It sits on the critical path of every LLM request. In Phase 2, the proxy handled directive injection inline (a Redis lookup). In Phase 3, context injection is far more complex — it involves querying memories, scoring and ranking hundreds of candidates, and fitting them within a model-specific token budget. This logic is too complex, too Python-heavy (ML scoring, tiktoken, numpy), and too likely to change to live inside the Go proxy.

The correct architecture is a dedicated Python gRPC service. The proxy calls it with the agent ID, session ID, current messages, and target model. The service returns enriched messages ready to send to the LLM.

This milestone establishes the service skeleton, protobuf contract, gRPC server, and the proxy-side Go client — without the domain logic (scoring, packing) which comes in later milestones.


Non-Goals

  • Memory scoring algorithm (3.5.4)
  • Greedy knapsack packing (3.5.5)
  • Context formatting logic (3.5.6)
  • Proxy hot-path integration (3.5.7)
  • Compression/summarisation of large contexts (Phase 4)

Branch

feat/m3-5-1-context-assembly-skeleton

PR Title

feat(context): context assembly gRPC service skeleton and proto contract (m3.5.1)


ADR-0038 — Context assembly service design

Write ADR-0038 covering:

  • Why a separate service (not inline in proxy): Memory ranking relies on NumPy vectorised operations, accurate token counting needs tiktoken, and the caching logic is complex. None of this belongs in a Go binary.
  • Why gRPC (not HTTP): The proxy already uses gRPC for auth. gRPC provides a typed contract, bidirectional streaming support for Phase 4, and lower per-call overhead than JSON over HTTP, which matters under a sub-50ms latency target.
  • Graceful degradation contract: If the context assembly gRPC call exceeds 45ms (5ms before the proxy's 50ms budget for the full context path), the proxy cancels the call and continues with directive-only context. This is the most important failure mode to handle correctly.
  • Why betterproto for Python gRPC: betterproto generates Python dataclasses instead of protobuf message objects. This means standard type hints, mypy compatibility, and clean async/await syntax instead of callback-based gRPC stubs.
  • The 40ms retrieval deadline: The parallel retrieval phase (directive + hot cache + cold search) has a 40ms hard deadline enforced by asyncio.wait_for. A component that misses the deadline is dropped, and assembly proceeds with the partial results from the components that finished in time. The system never waits for all components before proceeding.

Deliverables

1. Proto definition

// packages/proto/ibex/context/v1/context.proto
syntax = "proto3";
package ibex.context.v1;

option go_package = "github.com/Rick1330/ibex-harness/packages/proto/ibex/context/v1;contextv1";
// protobuf has no python_package file option; the Python import path
// (ibex_proto.context.v1) comes from the Python code-generation setup.

// AssembleContext is called by the proxy on every LLM request.
// It returns the enriched messages array with memories and directive injected.
service ContextAssemblyService {
    rpc AssembleContext(AssembleContextRequest) returns (AssembleContextResponse);
}

message AssembleContextRequest {
    string org_id      = 1; // (required) UUID string
    string agent_id    = 2; // (required) UUID string
    string session_id  = 3; // (optional) UUID string; empty if new session
    string model       = 4; // (required) e.g. "gpt-4o", "gpt-4o-mini"
    string request_id  = 5; // (required) for tracing/logging

    // The original messages from the client (before injection).
    // The service returns an enriched version of this array.
    repeated Message messages = 6;
}

message AssembleContextResponse {
    // Enriched messages ready to send to the LLM provider.
    // The service only adds content: the client's original messages are
    // always preserved, unmodified and in order, and injected content
    // (directive, memories) is prepended.
    repeated Message messages = 1;

    // Metadata about what was injected (for tracing and dashboard display).
    InjectionMetadata metadata = 2;
}

message Message {
    string role    = 1; // "system", "user", "assistant"
    string content = 2;
}

message InjectionMetadata {
    bool   directive_injected    = 1;
    int32  memories_injected     = 2;  // count of memories included
    int32  memories_available    = 3;  // count of memories found (before token truncation)
    int32  total_tokens_injected = 4;  // approximate token count of injected content
    int32  context_window_used   = 5;  // tokens used as % of context window
    bool   was_truncated         = 6;  // true if memories were cut to fit token budget
    string fallback_reason       = 7;  // non-empty if assembly fell back to directive-only
    repeated string memory_ids   = 8;  // UUIDs of injected memories (for hot cache score update)
}
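The AssembleContextResponse comment fixes an invariant that callers and tests can check: the client's original messages come through unchanged and in order, and anything injected sits in front of them. A minimal sketch, using a plain dataclass as a hypothetical stand-in for the generated Message type; `injected_prefix` is an illustrative helper, not part of the proto contract.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Message:
    """Hypothetical stand-in for the generated ibex.context.v1.Message."""
    role: str
    content: str


def injected_prefix(original: list[Message], enriched: list[Message]) -> list[Message]:
    """Return the injected messages, raising ValueError if the invariant is broken.

    Invariant: enriched == injected + original, with original unmodified.
    """
    split = len(enriched) - len(original)
    if split < 0:
        raise ValueError("enriched array is shorter than the original")
    # Compare against enriched[split:] rather than enriched[-n:], which
    # would return the whole list when the original is empty (n == 0).
    if enriched[split:] != original:
        raise ValueError("original messages were modified or reordered")
    return enriched[:split]


if __name__ == "__main__":
    original = [Message("user", "What did we decide about retries?")]
    enriched = [Message("system", "<directive>"), Message("system", "<memories>"), *original]
    print(len(injected_prefix(original, enriched)))  # 2
```

The directive-only fallback, which returns the original messages as-is, satisfies the invariant with an empty prefix.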

2. Service structure

services/context/
  pyproject.toml
  Dockerfile
  .env.example
  src/
    context/
      __init__.py
      app.py             # gRPC server startup
      settings.py        # pydantic-settings
      grpc_server.py     # ContextAssemblyServicer implementation
      services/
        budget.py          # Token budget calculator (3.5.2)
        retrieval.py       # Parallel retrieval orchestrator (3.5.3)
        scorer.py          # Composite scoring (3.5.4)
        packer.py          # Greedy knapsack (3.5.5)
        formatter.py       # Context formatting and injection (3.5.6)
      clients/
        memory_client.py   # HTTP client to memory service
        embedder_client.py # HTTP client to embedding service
        redis_client.py    # Direct Redis access for hot cache + directive
      proto/
        # generated by make proto-gen-python
  tests/
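Until milestones 3.5.2 to 3.5.6 fill in the modules under services/, the servicer can be wired to placeholders so that AssembleContext already returns a valid response with nothing injected. A minimal sketch, assuming plain dicts for messages; the `Stub*` classes, `Budget`, `Retrieval` and `assemble` are hypothetical, and only the method names and keyword arguments (`calculate`, `retrieve`, `score`, `pack`, `inject`) are taken from the servicer in this milestone.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass, field
from typing import Any

Msg = dict[str, str]  # hypothetical message shape: {"role": ..., "content": ...}


@dataclass
class Budget:
    memory_tokens: int = 0  # tokens left for memories (computed in 3.5.2)


@dataclass
class Retrieval:
    directive: str | None = None
    memories: list[Any] = field(default_factory=list)
    query_embedding: list[float] = field(default_factory=list)


class StubBudgetCalculator:  # placeholder for services/budget.py
    def calculate(self, model: str, messages: list[Msg]) -> Budget:
        return Budget()


class StubRetriever:  # placeholder for services/retrieval.py
    async def retrieve(self, **kwargs: Any) -> Retrieval:
        return Retrieval()


class StubScorer:  # placeholder for services/scorer.py
    def score(self, memories: list[Any], query_embedding: list[float]) -> list[Any]:
        return []


class StubPacker:  # placeholder for services/packer.py
    def pack(self, scored_memories: list[Any], token_budget: int) -> list[Any]:
        return []


class StubFormatter:  # placeholder for services/formatter.py
    def inject(self, original_messages: list[Msg], directive: str | None,
               packed_memories: list[Any], budget: Budget) -> tuple[list[Msg], dict[str, Any]]:
        # Nothing to inject yet: return the client's messages unchanged.
        meta = {"directive_injected": False, "memories_injected": 0}
        return list(original_messages), meta


async def assemble(model: str, messages: list[Msg]) -> tuple[list[Msg], dict[str, Any]]:
    """The servicer's five steps, run against the placeholders."""
    budget = StubBudgetCalculator().calculate(model=model, messages=messages)
    retrieval = await StubRetriever().retrieve(messages=messages, budget=budget)
    scored = StubScorer().score(
        memories=retrieval.memories, query_embedding=retrieval.query_embedding
    )
    packed = StubPacker().pack(scored_memories=scored, token_budget=budget.memory_tokens)
    return StubFormatter().inject(
        original_messages=messages,
        directive=retrieval.directive,
        packed_memories=packed,
        budget=budget,
    )


if __name__ == "__main__":
    print(asyncio.run(assemble("gpt-4o", [{"role": "user", "content": "hi"}])))
```

Each later milestone then swaps one stub for the real implementation without touching the servicer's control flow.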

3. gRPC server with graceful degradation

Python
# src/context/grpc_server.py
from __future__ import annotations

import asyncio
import logging
import time
from uuid import UUID

# betterproto names the generated base class after the proto service:
# ContextAssemblyService -> ContextAssemblyServiceBase.
from ibex_proto.context.v1 import (
    AssembleContextRequest,
    AssembleContextResponse,
    ContextAssemblyServiceBase,
    InjectionMetadata,
)

from context import metrics  # assumed: project module exposing a prometheus_client Histogram
from context.services.budget import TokenBudgetCalculator
from context.services.formatter import ContextFormatter
from context.services.packer import GreedyKnapsackPacker
from context.services.retrieval import ContextRetriever
from context.services.scorer import MemoryScorer

logger = logging.getLogger(__name__)

RETRIEVAL_DEADLINE_SECONDS = 0.040  # 40ms hard deadline on parallel retrieval
ASSEMBLY_DEADLINE_SECONDS  = 0.048  # 48ms total; leaves 2ms inside the proxy's 50ms context-path budget

class ContextAssemblyServicer(ContextAssemblyServiceBase):
    """
    gRPC servicer for context assembly.

    Graceful degradation contract:
    - If ANY part of assembly exceeds ASSEMBLY_DEADLINE_SECONDS:
      return directive-only context with metadata.fallback_reason set.
    - If directive retrieval fails: return original messages unchanged.
    - If memory retrieval partially fails: use whatever was retrieved in time.

    The proxy MUST enforce its own timeout (45ms) independently.
    Never trust the context service to stay within budget.
    """

    def __init__(
        self,
        retriever: ContextRetriever,
        budget_calc: TokenBudgetCalculator,
        scorer: MemoryScorer,
        packer: GreedyKnapsackPacker,
        formatter: ContextFormatter,
    ) -> None:
        self._retriever  = retriever
        self._budget     = budget_calc
        self._scorer     = scorer
        self._packer     = packer
        self._formatter  = formatter

    async def assemble_context(
        self, request: AssembleContextRequest
    ) -> AssembleContextResponse:
        # With betterproto 2.x, the generated base class reads the request from
        # the grpclib stream, calls this method, and sends the returned response.
        start = time.monotonic()

        try:
            # wait_for can only cancel at await points: synchronous steps
            # (scoring, packing) run to completion before a timeout is observed.
            response = await asyncio.wait_for(
                self._assemble(request),
                timeout=ASSEMBLY_DEADLINE_SECONDS,
            )
        except asyncio.TimeoutError:
            logger.warning(
                "context assembly timeout agent_id=%s request_id=%s",
                request.agent_id,
                request.request_id,
            )
            response = self._directive_only_fallback(
                request, reason="assembly_timeout"
            )
        except Exception as exc:
            logger.exception("context assembly error request_id=%s", request.request_id)
            response = self._directive_only_fallback(
                request, reason=f"assembly_error:{type(exc).__name__}"
            )

        # Emit latency metric (in seconds) regardless of success or fallback
        metrics.context_assembly_duration.observe(time.monotonic() - start)

        return response
 
    async def _assemble(self, req: AssembleContextRequest) -> AssembleContextResponse:
        org_id   = UUID(req.org_id)
        agent_id = UUID(req.agent_id)
 
        # Step 1: Calculate token budget for this model
        budget = self._budget.calculate(model=req.model, messages=req.messages)
 
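        # Step 2: Parallel retrieval (40ms deadline). The retriever (3.5.3) is
        # expected to keep partial results from slow components itself; if this
        # outer wait_for fires, the TimeoutError propagates and assemble_context
        # falls back to directive-only context.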
        retrieval = await asyncio.wait_for(
            self._retriever.retrieve(
                org_id=org_id, agent_id=agent_id,
                session_id=req.session_id or None,
                messages=req.messages,
                budget=budget,
            ),
            timeout=RETRIEVAL_DEADLINE_SECONDS,
        )
 
        # Step 3: Score and rank candidate memories
        scored = self._scorer.score(
            memories=retrieval.memories,
            query_embedding=retrieval.query_embedding,
        )
 
        # Step 4: Greedy knapsack — pack memories into token budget
        packed = self._packer.pack(
            scored_memories=scored,
            token_budget=budget.memory_tokens,
        )
 
        # Step 5: Format and inject into messages array
        enriched_messages, meta = self._formatter.inject(
            original_messages=req.messages,
            directive=retrieval.directive,
            packed_memories=packed,
            budget=budget,
        )
 
        return AssembleContextResponse(
            messages=enriched_messages,
            metadata=meta,
        )
 
    @staticmethod
    def _directive_only_fallback(
        req: AssembleContextRequest, reason: str
    ) -> AssembleContextResponse:
        """Directive-only fallback. In this skeleton it returns the original messages unchanged; directive prepending is not implemented yet."""
        return AssembleContextResponse(
            messages=list(req.messages),
            metadata=InjectionMetadata(
                directive_injected=False,
                memories_injected=0,
                fallback_reason=reason,
            ),
        )
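The degradation branch can be unit-tested without generated stubs or a running server. A minimal sketch, assuming a hypothetical `Result` type in place of AssembleContextResponse and plain strings for messages; `assemble_or_fallback` mirrors the servicer's try/except around `asyncio.wait_for` and exercises the three outcomes (success, timeout, error).

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable

ASSEMBLY_DEADLINE_SECONDS = 0.048


@dataclass
class Result:
    """Hypothetical stand-in for AssembleContextResponse."""
    messages: list[str]
    fallback_reason: str = ""


async def assemble_or_fallback(
    assemble: Callable[[list[str]], Awaitable[Result]],
    messages: list[str],
    deadline_s: float = ASSEMBLY_DEADLINE_SECONDS,
) -> Result:
    try:
        return await asyncio.wait_for(assemble(messages), timeout=deadline_s)
    except asyncio.TimeoutError:
        # Same reason string the servicer uses on timeout.
        return Result(messages=list(messages), fallback_reason="assembly_timeout")
    except Exception as exc:
        # Any other failure also degrades to the original messages.
        return Result(
            messages=list(messages),
            fallback_reason=f"assembly_error:{type(exc).__name__}",
        )


async def fast(msgs: list[str]) -> Result:
    return Result(messages=["<memories>", *msgs])


async def slow(msgs: list[str]) -> Result:
    await asyncio.sleep(1.0)  # well past the 48ms deadline
    return Result(messages=list(msgs))


async def broken(msgs: list[str]) -> Result:
    raise KeyError("agent_id")


if __name__ == "__main__":
    for fn in (fast, slow, broken):
        print(fn.__name__, asyncio.run(assemble_or_fallback(fn, ["hi"])))
```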

4. Go proxy client (skeleton)

Go
// services/proxy/internal/context/client.go

// Package context provides a gRPC client for the context assembly service.
package context

import (
    "context"
    "fmt"
    "log/slog"
    "time"

    "go.opentelemetry.io/otel/codes"
    "go.opentelemetry.io/otel/trace"
    "google.golang.org/grpc"
    "google.golang.org/grpc/status"

    contextv1 "github.com/Rick1330/ibex-harness/packages/proto/ibex/context/v1"
)

// assemblyTimeout leaves a 5ms margin inside the proxy's 50ms context-path budget.
const assemblyTimeout = 45 * time.Millisecond

// Client wraps the context assembly gRPC client with timeout enforcement.
type Client struct {
    inner  contextv1.ContextAssemblyServiceClient
    tracer trace.Tracer
    log    *slog.Logger
}

// NewClient builds a Client on top of an existing gRPC connection.
func NewClient(conn grpc.ClientConnInterface, tracer trace.Tracer, log *slog.Logger) *Client {
    return &Client{
        inner:  contextv1.NewContextAssemblyServiceClient(conn),
        tracer: tracer,
        log:    log,
    }
}

// AssembleContext enriches the messages array with the agent's memories and directive.
// On timeout or error, it returns the original messages with Metadata.FallbackReason set.
// The returned error is always nil: context assembly is a quality enhancement, not a gate,
// so this function NEVER fails the LLM request.
func (c *Client) AssembleContext(
    ctx context.Context,
    req *contextv1.AssembleContextRequest,
) (*contextv1.AssembleContextResponse, error) {
    ctx, cancel := context.WithTimeout(ctx, assemblyTimeout)
    defer cancel()

    ctx, span := c.tracer.Start(ctx, "ContextAssembly.AssembleContext")
    defer span.End()

    resp, err := c.inner.AssembleContext(ctx, req)
    if err != nil {
        // Do NOT propagate: log a warning and return the original messages.
        c.log.WarnContext(ctx, "context assembly failed; proceeding without memory injection",
            "error", err, "agent_id", req.GetAgentId())
        span.RecordError(err)
        span.SetStatus(codes.Error, err.Error())
        return &contextv1.AssembleContextResponse{
            Messages: req.GetMessages(),
            Metadata: &contextv1.InjectionMetadata{
                // gRPC status code, e.g. "grpc_error:DeadlineExceeded" when the 45ms timeout fires.
                FallbackReason: fmt.Sprintf("grpc_error:%s", status.Code(err)),
            },
        }, nil
    }
    return resp, nil
}

Acceptance Criteria

  • Proto compiles cleanly and passes buf lint and buf breaking (no breaking changes)
  • make proto-gen (Go) and make proto-gen-python both succeed
  • gRPC server starts, registers ContextAssemblyServicer, accepts connections
  • AssembleContext RPC returns a valid response (even if empty — no memories in DB yet)
  • asyncio.wait_for enforces 48ms timeout on assembly; TimeoutError triggers directive-only fallback
  • Proxy-side Go client returns original messages on any gRPC error (never propagates context failure)
  • ADR-0038 written and indexed
