Milestone 3.5.1 — Context Assembly Engine Skeleton (Python gRPC Server)
Status: Planned
Goal: 3.5 — Context assembly engine
Phase: 3 — Memory Engine and Operator Platform
Estimated effort: 3 days
ADR required: ADR-0038 — Context assembly service design and gRPC contract
Why This Milestone Exists
The context assembly engine is the most architecturally significant addition in Phase 3. It sits on the critical path of every LLM request. In Phase 2, the proxy handled directive injection inline (a Redis lookup). In Phase 3, context injection is far more complex — it involves querying memories, scoring and ranking hundreds of candidates, and fitting them within a model-specific token budget. This logic is too complex, too Python-heavy (ML scoring, tiktoken, numpy), and too likely to change to live inside the Go proxy.
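To make "fitting within a model-specific token budget" concrete, here is a minimal sketch of the budget arithmetic. It assumes token counts are already known; the real calculator (3.5.2) would count them with tiktoken. The window table, the 4,096-token completion reserve, and the 2,000-token injection cap are illustrative assumptions, not values from this plan, and `calculate_budget` / `TokenBudget` are hypothetical names (the `memory_tokens` field mirrors what the servicer reads as `budget.memory_tokens`).

```python
from dataclasses import dataclass

# Illustrative window sizes; a real calculator would load these from config.
CONTEXT_WINDOWS: dict[str, int] = {"gpt-4o": 128_000, "gpt-4o-mini": 128_000}


@dataclass(frozen=True)
class TokenBudget:
    context_window: int
    message_tokens: int          # tokens already used by the client's messages
    reserved_output_tokens: int  # held back for the model's completion
    memory_tokens: int           # what is left for injected directive + memories


def calculate_budget(
    model: str,
    message_tokens: int,
    reserved_output_tokens: int = 4_096,  # assumption
    max_injection_tokens: int = 2_000,    # assumption: cap on injected context
) -> TokenBudget:
    window = CONTEXT_WINDOWS[model]  # raises KeyError for unknown models
    free = window - message_tokens - reserved_output_tokens
    # Never negative, never more than the injection cap.
    memory_tokens = max(0, min(free, max_injection_tokens))
    return TokenBudget(window, message_tokens, reserved_output_tokens, memory_tokens)
```

With a short conversation the cap applies; with a long one the injection budget shrinks to whatever the window still has room for, down to zero.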
The correct architecture is a dedicated Python gRPC service. The proxy calls it with the agent ID, session ID, current messages, and target model. The service returns enriched messages ready to send to the LLM.
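What "enriched" means is pinned down by the response contract: injected content is only prepended, and the client's own messages survive unmodified and in order. A minimal sketch of that invariant, using a hypothetical `Message` dataclass in place of the generated proto type; `enrich` and `preserves_client_messages` are illustrative names, useful as a property check in tests.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Message:
    role: str  # "system", "user", "assistant"
    content: str


def enrich(original: list[Message], injected: list[Message]) -> list[Message]:
    """Prepend injected context; never touch the client's own messages."""
    return [*injected, *original]


def preserves_client_messages(original: list[Message], enriched: list[Message]) -> bool:
    """True if the client's messages appear unchanged, in order, at the end."""
    if len(enriched) < len(original):
        return False
    return enriched[len(enriched) - len(original):] == original
```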
This milestone establishes the service skeleton, protobuf contract, gRPC server, and the proxy-side Go client — without the domain logic (scoring, packing) which comes in later milestones.
Non-Goals
- Memory scoring algorithm (3.5.4)
- Greedy knapsack packing (3.5.5)
- Context formatting logic (3.5.6)
- Proxy hot-path integration (3.5.7)
- Compression/summarisation of large contexts (Phase 4)
Branch
feat/m3-5-1-context-assembly-skeleton
PR Title
feat(context): context assembly gRPC service skeleton and proto contract (m3.5.1)
ADR-0038 — Context assembly service design
Write ADR-0038 covering:
- Why a separate service (not inline in proxy): Memory ranking requires NumPy vectorised operations; tiktoken for accurate token counting; complex caching logic. None of this belongs in a Go binary.
- Why gRPC (not HTTP): The proxy already uses gRPC for auth. gRPC provides a typed contract, bidirectional streaming support for Phase 4, and lower per-call overhead than JSON/HTTP for a <50ms latency target.
- Graceful degradation contract: If the context assembly gRPC call exceeds 45ms (5ms before the proxy's 50ms budget for the full context path), the proxy cancels the call and continues with directive-only context. This is the most important failure mode to handle correctly.
- Why betterproto for Python gRPC: betterproto generates Python dataclasses instead of protobuf message objects. This means standard type hints, mypy compatibility, and clean `async/await` syntax (on top of grpclib) instead of callback-based gRPC stubs.
- The 40ms retrieval deadline: The parallel retrieval phase (directive + hot cache + cold search) has a 40ms hard deadline, enforced per component with `asyncio.wait_for`. A component that misses the deadline contributes nothing, and assembly continues with the partial results from the components that finished. The system never waits for all components before proceeding.
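Partial results need per-component handling: a single `asyncio.wait_for` around the whole retrieval batch raises `TimeoutError` and throws away the components that did finish. The sketch below swaps in `asyncio.wait` with a timeout, which returns finished and pending tasks separately. `retrieve_with_deadline` and the component keys are hypothetical, not the 3.5.3 `ContextRetriever` API.

```python
import asyncio
from typing import Any, Awaitable

RETRIEVAL_DEADLINE_SECONDS = 0.040  # 40ms, as in the ADR


async def retrieve_with_deadline(
    components: dict[str, Awaitable[Any]],
    deadline: float = RETRIEVAL_DEADLINE_SECONDS,
) -> dict[str, Any]:
    """Run retrieval components concurrently; keep only those done by the deadline."""
    if not components:
        return {}  # asyncio.wait rejects an empty set
    # Map each task back to its component name (e.g. "directive", "hot", "cold").
    names = {asyncio.ensure_future(aw): name for name, aw in components.items()}
    done, pending = await asyncio.wait(names, timeout=deadline)
    for task in pending:
        task.cancel()  # late components are dropped, never awaited
    results: dict[str, Any] = {}
    for task in done:
        # A component that raised also contributes nothing.
        if task.exception() is None:
            results[names[task]] = task.result()
    return results
```

A slow cold search therefore costs at most the deadline, and a failing hot cache simply drops out of the result instead of failing the whole assembly.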
Deliverables
1. Proto definition
```proto
// packages/proto/ibex/context/v1/context.proto
syntax = "proto3";

package ibex.context.v1;

option go_package = "github.com/Rick1330/ibex-harness/packages/proto/ibex/context/v1;contextv1";
// protobuf has no python_package file option; the Python module path follows
// the proto package (ibex.context.v1) under the generator's output directory.

// AssembleContext is called by the proxy on every LLM request.
// It returns the enriched messages array with memories and directive injected.
service ContextAssemblyService {
  rpc AssembleContext(AssembleContextRequest) returns (AssembleContextResponse);
}

message AssembleContextRequest {
  string org_id = 1;      // (required) UUID string
  string agent_id = 2;    // (required) UUID string
  string session_id = 3;  // (optional) UUID string; empty if new session
  string model = 4;       // (required) e.g. "gpt-4o", "gpt-4o-mini"
  string request_id = 5;  // (required) for tracing/logging

  // The original messages from the client (before injection).
  // The service returns an enriched version of this array.
  repeated Message messages = 6;
}

message AssembleContextResponse {
  // Enriched messages ready to send to the LLM provider.
  // The service only adds content: the client's original messages are always
  // preserved, unmodified and in order, and injected content (directive,
  // memories) is prepended to them.
  repeated Message messages = 1;

  // Metadata about what was injected (for tracing and dashboard display).
  InjectionMetadata metadata = 2;
}

message Message {
  string role = 1;  // "system", "user", "assistant"
  string content = 2;
}

message InjectionMetadata {
  bool directive_injected = 1;
  int32 memories_injected = 2;      // count of memories included
  int32 memories_available = 3;     // count of memories found (before token truncation)
  int32 total_tokens_injected = 4;  // approximate token count of injected content
  int32 context_window_used = 5;    // tokens used, as a percentage (0-100) of the context window
  bool was_truncated = 6;           // true if memories were cut to fit token budget
  string fallback_reason = 7;       // non-empty if assembly fell back to directive-only
  repeated string memory_ids = 8;   // UUIDs of injected memories (for hot cache score update)
}
```
2. Service structure
```
services/context/
  pyproject.toml
  Dockerfile
  .env.example
  src/
    context/
      __init__.py
      app.py                # gRPC server startup
      settings.py           # pydantic-settings
      grpc_server.py        # ContextAssemblyServicer implementation
      services/
        budget.py           # Token budget calculator (3.5.2)
        retrieval.py        # Parallel retrieval orchestrator (3.5.3)
        scorer.py           # Composite scoring (3.5.4)
        packer.py           # Greedy knapsack (3.5.5)
        formatter.py        # Context formatting and injection (3.5.6)
      clients/
        memory_client.py    # HTTP client to memory service
        embedder_client.py  # HTTP client to embedding service
        redis_client.py     # Direct Redis access for hot cache + directive
      proto/                # generated by make proto-gen
  tests/
```
3. gRPC server with graceful degradation
```python
# src/context/grpc_server.py
from __future__ import annotations

import asyncio
import logging
import time
from uuid import UUID

# Generated by betterproto. The import path depends on the output layout of
# `make proto-gen-python` (assumed here to be the `ibex_proto` package).
# betterproto names the servicer base after the proto service: ContextAssemblyServiceBase.
from ibex_proto.context.v1 import (
    AssembleContextRequest,
    AssembleContextResponse,
    ContextAssemblyServiceBase,
    InjectionMetadata,
)

from context import metrics  # assumed project module exposing a Prometheus-style histogram
from context.services.budget import TokenBudgetCalculator
from context.services.formatter import ContextFormatter
from context.services.packer import GreedyKnapsackPacker
from context.services.retrieval import ContextRetriever
from context.services.scorer import MemoryScorer

logger = logging.getLogger(__name__)

RETRIEVAL_DEADLINE_SECONDS = 0.040  # 40ms hard deadline on parallel retrieval
# 48ms: 2ms inside the 50ms context-path budget. The proxy gives up at 45ms on
# its own, so this deadline is a server-side backstop that stops runaway work.
ASSEMBLY_DEADLINE_SECONDS = 0.048


class ContextAssemblyServicer(ContextAssemblyServiceBase):
    """
    gRPC servicer for context assembly.

    Graceful degradation contract:
    - If ANY part of assembly exceeds ASSEMBLY_DEADLINE_SECONDS:
      return directive-only context with metadata.fallback_reason set.
    - If directive retrieval fails: return original messages unchanged.
    - If memory retrieval partially fails: use whatever was retrieved in time.

    The proxy MUST enforce its own timeout (45ms) independently.
    Never trust the context service to stay within budget.
    """

    def __init__(
        self,
        retriever: ContextRetriever,
        budget_calc: TokenBudgetCalculator,
        scorer: MemoryScorer,
        packer: GreedyKnapsackPacker,
        formatter: ContextFormatter,
    ) -> None:
        self._retriever = retriever
        self._budget = budget_calc
        self._scorer = scorer
        self._packer = packer
        self._formatter = formatter

    async def assemble_context(
        self, request: AssembleContextRequest
    ) -> AssembleContextResponse:
        # The betterproto-generated base receives the message from the grpclib
        # stream, calls this method, and sends back the returned response.
        start = time.monotonic()
        try:
            response = await asyncio.wait_for(
                self._assemble(request),
                timeout=ASSEMBLY_DEADLINE_SECONDS,
            )
        except asyncio.TimeoutError:
            logger.warning(
                "context assembly timeout agent=%s request_id=%s",
                request.agent_id,
                request.request_id,
            )
            response = self._directive_only_fallback(request, reason="assembly_timeout")
        except Exception as exc:  # degrade, never fail the RPC
            logger.exception("context assembly error request_id=%s", request.request_id)
            response = self._directive_only_fallback(
                request, reason=f"assembly_error:{type(exc).__name__}"
            )
        # Emit latency (in seconds) regardless of success or fallback
        metrics.context_assembly_duration.observe(time.monotonic() - start)
        return response

    async def _assemble(self, req: AssembleContextRequest) -> AssembleContextResponse:
        org_id = UUID(req.org_id)  # ValueError on a malformed ID -> error fallback
        agent_id = UUID(req.agent_id)

        # Step 1: Calculate token budget for this model
        budget = self._budget.calculate(model=req.model, messages=req.messages)

        # Step 2: Parallel retrieval (40ms deadline). Per the ADR, the retriever
        # returns partial results at the deadline; this wait_for is a backstop.
        retrieval = await asyncio.wait_for(
            self._retriever.retrieve(
                org_id=org_id,
                agent_id=agent_id,
                session_id=req.session_id or None,
                messages=req.messages,
                budget=budget,
            ),
            timeout=RETRIEVAL_DEADLINE_SECONDS,
        )

        # Step 3: Score and rank candidate memories
        scored = self._scorer.score(
            memories=retrieval.memories,
            query_embedding=retrieval.query_embedding,
        )

        # Step 4: Greedy knapsack: pack memories into token budget
        packed = self._packer.pack(
            scored_memories=scored,
            token_budget=budget.memory_tokens,
        )

        # Step 5: Format and inject into messages array
        enriched_messages, meta = self._formatter.inject(
            original_messages=req.messages,
            directive=retrieval.directive,
            packed_memories=packed,
            budget=budget,
        )
        return AssembleContextResponse(
            messages=enriched_messages,
            metadata=meta,
        )

    @staticmethod
    def _directive_only_fallback(
        req: AssembleContextRequest, reason: str
    ) -> AssembleContextResponse:
        """Skeleton fallback: return the client's messages unchanged.

        TODO: prepend the cached directive here so that a timeout degrades to
        directive-only context rather than to no injected context at all.
        """
        return AssembleContextResponse(
            messages=list(req.messages),
            metadata=InjectionMetadata(
                directive_injected=False,
                memories_injected=0,
                fallback_reason=reason,
            ),
        )
```
4. Go proxy client (skeleton)
```go
// services/proxy/internal/context/client.go

// Package context provides a gRPC client for the context assembly service.
package context

import (
	"context"
	"fmt"
	"log/slog"
	"time"

	"go.opentelemetry.io/otel/codes"
	"go.opentelemetry.io/otel/trace"
	"google.golang.org/grpc"
	"google.golang.org/grpc/status"

	contextv1 "github.com/Rick1330/ibex-harness/packages/proto/ibex/context/v1"
)

// assemblyTimeout leaves a 5ms margin inside the 50ms context-path budget
// (ADR-0038). It is enforced here independently of the service's own 48ms deadline.
const assemblyTimeout = 45 * time.Millisecond

// Client wraps the context assembly gRPC client with timeout enforcement.
type Client struct {
	inner  contextv1.ContextAssemblyServiceClient
	tracer trace.Tracer
	log    *slog.Logger // stand-in for the proxy's own logger
}

// NewClient builds a Client on an existing gRPC connection.
func NewClient(conn grpc.ClientConnInterface, tracer trace.Tracer, log *slog.Logger) *Client {
	return &Client{
		inner:  contextv1.NewContextAssemblyServiceClient(conn),
		tracer: tracer,
		log:    log,
	}
}

// AssembleContext enriches the messages array with the agent's memories and directive.
// On timeout or error, it returns the original messages with FallbackReason set.
// It NEVER fails the LLM request (the returned error is always nil): context
// assembly is a quality enhancement, not a gate.
func (c *Client) AssembleContext(
	ctx context.Context,
	req *contextv1.AssembleContextRequest,
) (*contextv1.AssembleContextResponse, error) {
	ctx, cancel := context.WithTimeout(ctx, assemblyTimeout)
	defer cancel()

	ctx, span := c.tracer.Start(ctx, "ContextAssembly.AssembleContext")
	defer span.End()

	resp, err := c.inner.AssembleContext(ctx, req)
	if err != nil {
		// Do NOT propagate: return original messages, log warning
		c.log.WarnContext(ctx, "context assembly failed; proceeding without memory injection",
			"error", err, "agent_id", req.GetAgentId())
		span.SetStatus(codes.Error, err.Error())
		return &contextv1.AssembleContextResponse{
			Messages: req.GetMessages(),
			Metadata: &contextv1.InjectionMetadata{
				// e.g. "grpc_error:DeadlineExceeded" when the 45ms timeout fires
				FallbackReason: fmt.Sprintf("grpc_error:%s", status.Code(err)),
			},
		}, nil
	}
	return resp, nil
}
```
Acceptance Criteria
- Proto compiles cleanly with `buf lint` and `buf breaking` (no breaking changes)
- `make proto-gen` and `make proto-gen-python` both succeed
- gRPC server starts, registers `ContextAssemblyServicer`, and accepts connections
- `AssembleContext` RPC returns a valid response (even if empty, since there are no memories in the DB yet)
- `asyncio.wait_for` enforces the 48ms timeout on assembly; `TimeoutError` triggers the directive-only fallback
- Proxy-side Go client returns the original messages on any gRPC error (never propagates context failure)
- ADR-0038 written and indexed
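The timeout criterion can be exercised without a running server or generated code. A pytest-style sketch, assuming hypothetical stand-ins (`FakeResponse`, `slow_assemble`, `fast_assemble`, `assemble_with_fallback`) in place of the betterproto types and the real servicer; an actual test would construct `ContextAssemblyServicer` with stub dependencies instead.

```python
import asyncio
from dataclasses import dataclass

ASSEMBLY_DEADLINE_SECONDS = 0.048  # 48ms, as in grpc_server.py


@dataclass
class FakeResponse:
    """Stand-in for the generated AssembleContextResponse."""
    messages: list[str]
    fallback_reason: str = ""


async def assemble_with_fallback(assemble, messages: list[str]) -> FakeResponse:
    """Deadline-bound assembly; original messages plus a reason on timeout."""
    try:
        return await asyncio.wait_for(assemble(messages), timeout=ASSEMBLY_DEADLINE_SECONDS)
    except asyncio.TimeoutError:
        return FakeResponse(messages=list(messages), fallback_reason="assembly_timeout")


async def slow_assemble(messages: list[str]) -> FakeResponse:
    await asyncio.sleep(0.2)  # well past the 48ms deadline
    return FakeResponse(messages=["<memories>", *messages])


async def fast_assemble(messages: list[str]) -> FakeResponse:
    return FakeResponse(messages=["<memories>", *messages])


def test_timeout_triggers_fallback() -> None:
    resp = asyncio.run(assemble_with_fallback(slow_assemble, ["hi"]))
    assert resp.messages == ["hi"]
    assert resp.fallback_reason == "assembly_timeout"


def test_fast_assembly_is_not_a_fallback() -> None:
    resp = asyncio.run(assemble_with_fallback(fast_assemble, ["hi"]))
    assert resp.messages == ["<memories>", "hi"]
    assert resp.fallback_reason == ""
```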