Milestone 2.1.3 — OpenAI Streaming Forwarder (SSE Dual-Write)

Status: Planned
Goal: 2.1 — LLM provider abstraction and OpenAI forwarding
Phase: 2 — Single Provider End-to-End
Estimated effort: 4–5 days
ADR required: ADR-0024 — Streaming response strategy and dual-write design


Why This Milestone Exists

Streaming is mandatory for production AI applications. Users expect tokens to appear immediately; waiting for the full response before displaying anything is a noticeably worse user experience. Every major AI product (ChatGPT, GitHub Copilot, Cursor) streams. A proxy that does not support streaming cannot serve real production workloads.

Streaming in the proxy creates a specific technical challenge: the proxy must simultaneously forward bytes to the caller (to keep time to first byte, TTFB, low) and accumulate them (to extract token counts for session checkpoints and traces). This is the "dual-write" design. Both writes must be correct:

  • Forward must be byte-exact (SSE format must be preserved; clients parse it directly)
  • Accumulation must be complete (full response needed for post-request processing)

Failure mode analysis:

  • Forward failure: the client loses the stream. The proxy cannot transparently retry an SSE stream that has already started, so the failure is surfaced to the client as an error.
  • Accumulation failure: trace and session checkpoint are incomplete → acceptable degraded state (log warning, continue)

Non-Goals

  • WebSocket streaming (Phase 4)
  • Response caching for identical prompts (Phase 4)
  • Streaming from other providers (Phase 4)

Branch

feature/m2-1-3-openai-streaming-forwarder

PR Title

feat(proxy): OpenAI streaming forwarder with SSE dual-write (m2.1.3)


ADR-0024 — Streaming strategy

Write docs/adr/ADR-0024-streaming-dual-write.md covering:

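  • Why io.TeeReader over goroutine fan-out: io.TeeReader(r, w) returns a reader that writes to w everything it reads from r. Wrapping the OpenAI response body this way, with the accumulation buffer as w, and copying the resulting reader to the client delivers every byte to both destinations in a single read path. A goroutine fan-out would require a channel between goroutines and introduces ordering complexity.
  • Why accumulation is best-effort: The stream forward must not be blocked or aborted by accumulation failures. io.TeeReader writes to the accumulator synchronously and reports any write error as a read error, so the accumulator's Write must be fast and must never return an error. A slow accumulation write causes the client to receive bytes late, which is worse than missing a trace.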
  • Why SSE format is preserved verbatim: IBEX does not re-frame or transform the SSE stream. Any transformation risks incompatibility with provider-specific SSE extensions (tool call streaming, content filtering chunks). We pass through exactly what OpenAI sends.
  • Termination detection: The SSE stream ends with data: [DONE]\n\n. The accumulator reads until this sentinel, then signals the post-request async pipeline.
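The TeeReader and best-effort points above can be sketched as follows. This is a minimal illustration, not the ADR's prescribed design: bestEffortWriter, forwardWithTee, failingWriter, runTee, and sampleStream are hypothetical names introduced here, and the client is modelled as a plain io.Writer rather than an http.ResponseWriter. Only io.TeeReader and io.Copy are standard library calls.

```go
package main

import (
	"bytes"
	"errors"
	"io"
)

const sampleStream = "data: {\"id\":1}\n\ndata: [DONE]\n\n"

// bestEffortWriter (hypothetical) forwards writes to dst until the first
// failure, then discards the rest. It always reports success, because
// io.TeeReader turns a write error into a read error, which would abort
// the client-facing copy.
type bestEffortWriter struct {
	dst    io.Writer
	failed bool
}

func (b *bestEffortWriter) Write(p []byte) (int, error) {
	if !b.failed {
		if _, err := b.dst.Write(p); err != nil {
			b.failed = true // caller can log a warning and mark the trace incomplete
		}
	}
	return len(p), nil
}

// forwardWithTee copies upstream to client while teeing every byte into acc.
// It reports whether accumulation stayed intact, plus any forwarding error.
func forwardWithTee(client io.Writer, upstream io.Reader, acc io.Writer) (bool, error) {
	be := &bestEffortWriter{dst: acc}
	_, err := io.Copy(client, io.TeeReader(upstream, be))
	return !be.failed, err
}

// failingWriter simulates a broken accumulation sink.
type failingWriter struct{}

func (failingWriter) Write([]byte) (int, error) {
	return 0, errors.New("accumulator unavailable")
}

// runTee drives the sketch with a healthy or a broken accumulator and
// returns what the client received, what was accumulated, and the outcome.
func runTee(breakAccumulator bool) (string, string, bool, error) {
	var client, acc bytes.Buffer
	var sink io.Writer = &acc
	if breakAccumulator {
		sink = failingWriter{}
	}
	ok, err := forwardWithTee(&client, bytes.NewBufferString(sampleStream), sink)
	return client.String(), acc.String(), ok, err
}
```

With a broken accumulator, runTee(true) still delivers the full stream to the client and only reports that accumulation is incomplete, which matches the "acceptable degraded state" described in the failure mode analysis.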

Deliverables

1. Streaming path in openai.Client.Complete

Go
// When req.Stream = true:
// 1. Send request to OpenAI with Accept: text/event-stream
// 2. Verify the response has status 200 and Content-Type: text/event-stream
// 3. Set response headers on the client connection BEFORE reading the body:
//    Content-Type: text/event-stream
//    Cache-Control: no-cache
//    X-Accel-Buffering: no  (disables Nginx buffering if present)
// 4. Flush headers to the client immediately
// 5. Use io.TeeReader to dual-write:
//    - Primary: http.ResponseWriter (forward to client)
//    - Secondary: accumulation buffer (for token counting + session checkpoint)
// 6. Flush the ResponseWriter after each SSE chunk (not after the full stream)
// 7. On [DONE]: signal post-request pipeline with accumulated content
Go
// StreamAccumulator collects the full content from a streaming response
// without blocking the stream forward path.
// It is safe for concurrent writes (from the TeeReader path) and a post-stream read (by the trace emitter).
type StreamAccumulator struct {
    mu      sync.Mutex
    chunks  []openAIStreamChunk
    done    chan struct{}
    content strings.Builder // accumulated completion text
    usage   *provider.Usage // from the final usage chunk
}
 
// Wait blocks until the stream is complete or ctx is cancelled.
// Returns the accumulated content and usage after the stream ends.
func (a *StreamAccumulator) Wait(ctx context.Context) (string, *provider.Usage, error)
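One possible shape for the accumulator is sketched below, assuming it acts as the io.Writer side of the TeeReader. Usage and streamChunk are hypothetical stand-ins for provider.Usage and openAIStreamChunk (only the fields needed here are declared), and the pending and finished fields, NewStreamAccumulator, and the accumulate driver are illustrative additions, not part of the struct above. The sketch keeps content and usage only, not the chunks slice.

```go
package main

import (
	"bytes"
	"context"
	"encoding/json"
	"strings"
	"sync"
)

// Usage and streamChunk are hypothetical stand-ins for the real types.
type Usage struct {
	CompletionTokens int `json:"completion_tokens"`
}

type streamChunk struct {
	Choices []struct {
		Delta struct {
			Content string `json:"content"`
		} `json:"delta"`
	} `json:"choices"`
	Usage *Usage `json:"usage"`
}

type StreamAccumulator struct {
	mu       sync.Mutex
	pending  []byte // tail of a line not yet terminated by '\n'
	content  strings.Builder
	usage    *Usage
	finished bool
	done     chan struct{}
}

func NewStreamAccumulator() *StreamAccumulator {
	return &StreamAccumulator{done: make(chan struct{})}
}

// Write never returns an error; malformed lines are skipped (best-effort).
func (a *StreamAccumulator) Write(p []byte) (int, error) {
	a.mu.Lock()
	defer a.mu.Unlock()
	a.pending = append(a.pending, p...)
	for {
		i := bytes.IndexByte(a.pending, '\n')
		if i < 0 {
			return len(p), nil
		}
		a.handleLine(bytes.TrimRight(a.pending[:i], "\r"))
		a.pending = a.pending[i+1:]
	}
}

// handleLine processes one SSE line; the caller holds a.mu.
func (a *StreamAccumulator) handleLine(line []byte) {
	if a.finished || !bytes.HasPrefix(line, []byte("data:")) {
		return
	}
	payload := bytes.TrimSpace(line[len("data:"):])
	if string(payload) == "[DONE]" {
		a.finished = true
		close(a.done)
		return
	}
	var c streamChunk
	if err := json.Unmarshal(payload, &c); err != nil {
		return // skip chunks that do not parse
	}
	for _, ch := range c.Choices {
		a.content.WriteString(ch.Delta.Content)
	}
	if c.Usage != nil {
		a.usage = c.Usage
	}
}

// Wait blocks until [DONE] has been seen or ctx is cancelled.
func (a *StreamAccumulator) Wait(ctx context.Context) (string, *Usage, error) {
	select {
	case <-a.done:
	case <-ctx.Done():
		return "", nil, ctx.Err()
	}
	a.mu.Lock()
	defer a.mu.Unlock()
	return a.content.String(), a.usage, nil
}

// accumulate is a driver for the sketch: it feeds arbitrary byte splits.
func accumulate(parts ...string) (string, *Usage, error) {
	acc := NewStreamAccumulator()
	for _, p := range parts {
		acc.Write([]byte(p))
	}
	return acc.Wait(context.Background())
}
```

Because network reads can split an SSE line at any byte, the sketch buffers the unterminated tail in pending. Note that OpenAI only emits a usage chunk when the request sets stream_options.include_usage to true, so usage may legitimately be nil.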

2. http.Flusher enforcement

Go
// SSE requires the ResponseWriter to flush after each chunk.
// Validate that the ResponseWriter supports flushing at handler entry.
flusher, ok := w.(http.Flusher)
if !ok {
    // Go's net/http server always provides a Flusher, but middleware that wraps the ResponseWriter can hide it, so guard defensively.
    log.ErrorCtx(ctx, "ResponseWriter does not support http.Flusher; streaming not possible")
    apierror.WriteHTTP(w, apierror.CodeInternalError, "Streaming not supported by server", requestID)
    return
}

3. Flush after each SSE chunk

Go
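// In the copy loop: flush after each complete SSE event (terminated by a blank line).
// Assumes the enclosing function returns an error.
for scanner.Scan() {
    line := scanner.Bytes()
    if _, err := w.Write(line); err != nil {
        return err // client is gone; stop reading from the provider
    }
    if _, err := w.Write([]byte("\n")); err != nil {
        return err
    }
    if len(line) == 0 { // blank line = end of SSE event
        flusher.Flush()
    }
}
if err := scanner.Err(); err != nil {
    return err // provider read failed mid-stream
}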
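The loop above relies on bufio.Scanner, which strips line terminators (so a CRLF stream would be rewritten with LF) and rejects lines longer than its buffer (64 KB by default). If the byte-exact requirement is read strictly, one alternative is bufio.Reader.ReadBytes, which returns each line with its terminator intact. The sketch below is an assumption-laden illustration: forwardEvents and runForwardEvents are hypothetical names, and flush is passed in as a plain function so that flusher.Flush can be supplied by the handler.

```go
package main

import (
	"bufio"
	"bytes"
	"io"
)

// forwardEvents copies r to w line by line, preserving line terminators
// exactly, and calls flush after every blank line (the SSE event boundary).
func forwardEvents(w io.Writer, flush func(), r io.Reader) error {
	br := bufio.NewReader(r)
	for {
		// ReadBytes returns the line including its trailing '\n' (and any '\r').
		line, err := br.ReadBytes('\n')
		if len(line) > 0 {
			if _, werr := w.Write(line); werr != nil {
				return werr // client is gone; stop reading from the provider
			}
			if len(bytes.TrimRight(line, "\r\n")) == 0 {
				flush()
			}
		}
		if err == io.EOF {
			flush() // push any trailing partial data before returning
			return nil
		}
		if err != nil {
			return err
		}
	}
}

// runForwardEvents is a driver for the sketch: it returns the forwarded
// bytes and how many times flush was called.
func runForwardEvents(input string) (string, int, error) {
	var out bytes.Buffer
	flushes := 0
	err := forwardEvents(&out, func() { flushes++ }, bytes.NewBufferString(input))
	return out.String(), flushes, err
}
```

In a handler this would be called as forwardEvents(w, flusher.Flush, resp.Body), where resp is the provider response.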

Testing Requirements

  • TestStreaming_ForwardsChunksInOrder: mock provider sends 5 SSE chunks; client receives them in order and within 10ms of each being sent
  • TestStreaming_AccumulatorComplete: accumulator holds the full concatenated content after stream ends
  • TestStreaming_DoneSignalReached: [DONE] chunk triggers StreamAccumulator.Wait() to return
  • TestStreaming_ProviderErrorMidStream: provider closes connection after 3 chunks → client receives partial stream + error, trace emitter receives is_complete=false
  • TestStreaming_ClientDisconnect: client disconnects mid-stream → proxy stops reading from provider (context cancelled), no goroutine leak
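As a starting point for TestStreaming_ProviderErrorMidStream, the provider failure can be simulated without a network by closing an io.Pipe with an error after three chunks. This is only a sketch of the scenario: relay, brokenProvider, runMidStreamFailure, and doneEvent are hypothetical helpers, and the boolean returned by relay stands in for the is_complete flag handed to the trace emitter.

```go
package main

import (
	"bytes"
	"io"
)

var doneEvent = []byte("data: [DONE]\n\n")

// relay forwards upstream to client while teeing into a local buffer, then
// reports the accumulated bytes, whether the stream ended with the [DONE]
// sentinel, and any error from the copy.
func relay(client io.Writer, upstream io.Reader) (string, bool, error) {
	var acc bytes.Buffer
	_, err := io.Copy(client, io.TeeReader(upstream, &acc))
	isComplete := err == nil && bytes.HasSuffix(acc.Bytes(), doneEvent)
	return acc.String(), isComplete, err
}

// brokenProvider returns a reader that yields n events and then fails,
// mimicking a provider that drops the connection mid-stream.
func brokenProvider(n int) io.Reader {
	pr, pw := io.Pipe()
	go func() {
		for i := 0; i < n; i++ {
			if _, err := pw.Write([]byte("data: chunk\n\n")); err != nil {
				return // reader went away; nothing left to do
			}
		}
		pw.CloseWithError(io.ErrUnexpectedEOF)
	}()
	return pr
}

// runMidStreamFailure drives the scenario and reports what the client
// received, what was accumulated, the completeness flag, and whether the
// relay returned an error.
func runMidStreamFailure() (string, string, bool, bool) {
	var client bytes.Buffer
	traced, isComplete, err := relay(&client, brokenProvider(3))
	return client.String(), traced, isComplete, err != nil
}
```

A real test would additionally assert on the error surfaced to the client and on the trace emitter call; both depend on interfaces this document does not define.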

Acceptance Criteria

  • Streaming completions forward SSE bytes to client in real time (flush after each event)
  • X-Accel-Buffering: no header set to prevent Nginx buffering
  • StreamAccumulator collects full content without blocking the forward path
  • Provider error mid-stream results in partial stream + trace with is_complete=false (not a server crash)
  • Client disconnect cancels context; proxy stops forwarding and cleans up (no goroutine leak)
  • ADR-0024 written and indexed
