Milestone 3.5.7 — Proxy Integration: Context Assembly in the Hot Path (Go)
Status: Planned
Goal: 3.5 — Context assembly engine
Phase: 3 — Memory Engine and Operator Platform
Estimated effort: 3 days
ADR required: None (follows ADR-0038 proxy integration contract)
Why This Milestone Exists
All previous context assembly milestones built the Python service. This milestone wires the gRPC call into the Go proxy's request handler — between directive routing and provider forwarding. It also wires the async post-response signal that triggers memory extraction after each session checkpoint.
The proxy integration has three non-trivial concerns:
- Context assembly is not a middleware — it is an operation inside the handler, after provider routing but before the forward. Unlike auth or rate limiting, it requires the parsed request body (messages array).
- Failure must not fail the request — the Go client (3.5.1) already handles this, but the integration must correctly use the fallback response.
- Memory extraction trigger — after each session checkpoint is written (async), the proxy must enqueue a Celery task via Redis. This is a Redis LPUSH to the extraction queue, not a gRPC call.
Deliverables
Updated middleware chain (Phase 3 complete)
```text
RequestID
  → OTel Span
  → Auth (LRU cache → gRPC fallback)
  → AgentVerify (LRU cache → gRPC fallback)
  → RateLimit (Redis)
  → DirectiveRouter (select provider)
  → [Handler]
      ├── Parse & validate request body (already done in M1.2.2)
      ├── AssembleContext gRPC call (45ms timeout; fallback = original messages)
      ├── Inject assembled context into request; set X-IBEX-* headers for client
      ├── Forward to LLM provider (streaming or non-streaming)
      └── [async post-response]
            ├── Write session checkpoint (M2.4.3)
            ├── Emit ClickHouse trace (M2.5.3)
            └── Enqueue extraction task (NEW in M3.5.7)
```

Memory extraction task enqueue
```go
// services/proxy/internal/worker/enqueuer.go

// Package worker provides a Redis-based Celery task enqueuer.
// The proxy does not import Celery; it pushes to the Redis queue directly.
package worker

import (
	"context"
	"encoding/json"
	"log/slog"
	"time"

	"github.com/google/uuid"
	"github.com/redis/go-redis/v9"
)

// CeleryTask is a Celery protocol v1 task body:
//
//	{"id":"...", "task":"...", "args":[...], "kwargs":{...}, "retries":0, "eta":null}
//
// Note: a Celery worker reading through kombu's Redis transport expects this
// body wrapped in a kombu message envelope (body, headers, properties) rather
// than pushed raw; verify the wire format against the deployed Celery version.
type CeleryTask struct {
	ID      string         `json:"id"`
	Task    string         `json:"task"`
	Args    []any          `json:"args"`
	Kwargs  map[string]any `json:"kwargs"`
	Retries int            `json:"retries"`
	ETA     *time.Time     `json:"eta"`
}

// Enqueuer pushes Celery tasks to a Redis queue.
type Enqueuer struct {
	client redis.UniversalClient
	log    *slog.Logger // assumption: the proxy logs via log/slog
}

// NewEnqueuer returns an Enqueuer backed by the given Redis client.
func NewEnqueuer(client redis.UniversalClient, log *slog.Logger) *Enqueuer {
	return &Enqueuer{client: client, log: log}
}

// EnqueueExtraction pushes an extraction task for the given session.
// It runs on the async post-response path, so it never delays the LLM
// response. The LPUSH itself is a synchronous Redis round trip, so ctx should
// be detached from the request context and carry a short deadline.
// Failure is logged as WARN; it does not affect the LLM response.
func (e *Enqueuer) EnqueueExtraction(ctx context.Context, sessionID, orgID uuid.UUID) {
	task := CeleryTask{
		ID:     uuid.New().String(),
		Task:   "worker.tasks.extraction.extract_session_memories",
		Args:   []any{},
		Kwargs: map[string]any{"session_id": sessionID.String(), "org_id": orgID.String()},
	}
	payload, err := json.Marshal(task)
	if err != nil {
		e.log.WarnContext(ctx, "failed to marshal extraction task", "error", err)
		return
	}
	if err := e.client.LPush(ctx, "extraction", payload).Err(); err != nil {
		e.log.WarnContext(ctx, "failed to enqueue extraction task",
			"session_id", sessionID, "error", err)
	}
}
```

X-IBEX context headers returned to client
```text
X-IBEX-Memories-Injected: 12     // number of memories in context
X-IBEX-Context-Tokens: 4320      // approximate tokens of injected content
X-IBEX-Context-Fallback: false   // true if assembly fell back to directive-only
```

Acceptance Criteria
- Context assembly gRPC called for every LLM request after provider routing
- 45ms timeout enforced; timeout produces fallback (original messages), not 503
- X-IBEX-Memories-Injected header correct in every response
- Extraction task enqueued asynchronously after every session checkpoint
- Enqueue failure does not affect LLM response (logged WARN only)
- Phase 1 and Phase 2 security tests still pass (no regression)
- Integration test: a real end-to-end request, with memories in the DB, shows those memories in the OTel trace spans