Milestone 3.5.7 — Proxy Integration: Context Assembly in the Hot Path (Go)

Status: Planned
Goal: 3.5 — Context assembly engine
Phase: 3 — Memory Engine and Operator Platform
Estimated effort: 3 days
ADR required: None (follows ADR-0038 proxy integration contract)


Why This Milestone Exists

All previous context assembly milestones built the Python service. This milestone wires the gRPC call into the Go proxy's request handler — between directive routing and provider forwarding. It also wires the async post-response signal that triggers memory extraction after each session checkpoint.

The proxy integration has three non-trivial concerns:

  1. Context assembly is not a middleware — it is an operation inside the handler, after provider routing but before the forward. Unlike auth or rate limiting, it requires the parsed request body (messages array).
  2. Failure must not fail the request — the Go client (3.5.1) already handles this, but the integration must correctly use the fallback response.
  3. The memory extraction trigger is a queue push, not a gRPC call: after each session checkpoint is written (async), the proxy must enqueue a Celery task with a Redis LPUSH to the extraction queue.

Deliverables

Updated middleware chain (Phase 3 complete)

RequestID
  → OTel Span
  → Auth (LRU cache → gRPC fallback)
  → AgentVerify (LRU cache → gRPC fallback)
  → RateLimit (Redis)
  → DirectiveRouter (select provider)
  → [Handler]
      ├── Parse & validate request body (already done in M1.2.2)
      ├── AssembleContext gRPC call (45ms timeout; fallback = original messages)
      ├── Set context metadata headers on the response (X-IBEX-Memories-Injected etc., for the client)
      ├── Forward to LLM provider (streaming or non-streaming)
      └── [async post-response]
          ├── Write session checkpoint (M2.4.3)
          ├── Emit ClickHouse trace (M2.5.3)
          └── Enqueue extraction task (NEW in M3.5.7)

Memory extraction task enqueue

Go
// services/proxy/internal/worker/enqueuer.go
// Package worker provides a Redis-based Celery task enqueuer.
// The proxy does not import Celery; it pushes to the Redis queue directly.
package worker

import (
    "context"
    "encoding/json"
    "time"

    "github.com/google/uuid"
    "github.com/redis/go-redis/v9"
    // Also import the proxy's internal logger package (provides logger.Logger);
    // its import path is not shown in this document.
)

// extractionQueue is the Redis list consumed by the extraction worker.
const extractionQueue = "extraction"

// CeleryTask is the minimal Celery task message format for Redis broker.
// Format: LPUSH extraction {"id":"...", "task":"...", "args":[...], "kwargs":{...}}
// Note: a stock Celery worker on the Redis broker reads messages wrapped in a
// kombu envelope (body, headers, properties); confirm that the extraction
// worker accepts this bare task body.
type CeleryTask struct {
    ID      string         `json:"id"`
    Task    string         `json:"task"`
    Args    []any          `json:"args"`
    Kwargs  map[string]any `json:"kwargs"`
    Retries int            `json:"retries"`
    ETA     *time.Time     `json:"eta"` // nil marshals to null (run as soon as possible)
}

// Enqueuer pushes Celery tasks to a Redis queue.
type Enqueuer struct {
    client redis.UniversalClient
    log    *logger.Logger
}

// EnqueueExtraction pushes an extraction task for the given session.
// LPUSH is a synchronous Redis round trip, so call this from the async
// post-response path, never before the response is sent.
// Failure is logged as WARN; it does not affect the LLM response.
func (e *Enqueuer) EnqueueExtraction(ctx context.Context, sessionID, orgID uuid.UUID) {
    task := CeleryTask{
        ID:     uuid.New().String(),
        Task:   "worker.tasks.extraction.extract_session_memories",
        Args:   []any{}, // empty slice, not nil, so it marshals to [] rather than null
        Kwargs: map[string]any{"session_id": sessionID.String(), "org_id": orgID.String()},
    }
    payload, err := json.Marshal(task)
    if err != nil {
        e.log.WarnCtx(ctx, "failed to marshal extraction task", "error", err)
        return
    }
    if err := e.client.LPush(ctx, extractionQueue, payload).Err(); err != nil {
        e.log.WarnCtx(ctx, "failed to enqueue extraction task",
            "session_id", sessionID, "error", err)
    }
}
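
To make the wire format concrete, the sketch below marshals the same task shape with only the standard library, so the payload that lands in the Redis list can be checked in a unit test without a Redis connection. `celeryTask` mirrors the struct above; `extractionPayload` and the plain string IDs are hypothetical and stand in for the UUIDs used by the real enqueuer.

```go
package main

import (
    "encoding/json"
    "time"
)

// celeryTask mirrors the CeleryTask struct from the enqueuer above.
type celeryTask struct {
    ID      string         `json:"id"`
    Task    string         `json:"task"`
    Args    []any          `json:"args"`
    Kwargs  map[string]any `json:"kwargs"`
    Retries int            `json:"retries"`
    ETA     *time.Time     `json:"eta"`
}

// extractionPayload builds the JSON body for one extraction task.
// IDs are taken as strings here to keep the sketch dependency-free.
func extractionPayload(taskID, sessionID, orgID string) ([]byte, error) {
    return json.Marshal(celeryTask{
        ID:     taskID,
        Task:   "worker.tasks.extraction.extract_session_memories",
        Args:   []any{},
        Kwargs: map[string]any{"session_id": sessionID, "org_id": orgID},
    })
}
```

Calling `extractionPayload("t1", "s1", "o1")` yields `{"id":"t1","task":"worker.tasks.extraction.extract_session_memories","args":[],"kwargs":{"org_id":"o1","session_id":"s1"},"retries":0,"eta":null}`. Struct fields keep their declaration order, while `encoding/json` sorts map keys, which is why `org_id` precedes `session_id`.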

X-IBEX context headers returned to client

X-IBEX-Memories-Injected: 12        // number of memories in context
X-IBEX-Context-Tokens: 4320         // approximate tokens of injected content
X-IBEX-Context-Fallback: false      // true if assembly fell back to directive-only

Acceptance Criteria

  • Context assembly gRPC called for every LLM request after provider routing
  • 45ms timeout enforced; timeout produces fallback (original messages), not 503
  • X-IBEX-Memories-Injected header correct in every response
  • Extraction task enqueued asynchronously after every session checkpoint
  • Enqueue failure does not affect LLM response (logged WARN only)
  • Phase 1 and Phase 2 security tests still pass (no regression)
  • Integration test: an end-to-end request against a DB that contains memories shows the injected memories in the OTel trace spans
