Milestone 2.1.3 — OpenAI Streaming Forwarder (SSE Dual-Write)
Status: Planned
Goal: 2.1 — LLM provider abstraction and OpenAI forwarding
Phase: 2 — Single Provider End-to-End
Estimated effort: 4–5 days
ADR required: ADR-0024 — Streaming response strategy and dual-write design
Why This Milestone Exists
Streaming is non-optional for production AI applications. Users expect to see tokens appear immediately — waiting for the full response before displaying anything (non-streaming) is a noticeably worse user experience. Every major AI product (ChatGPT, GitHub Copilot, Cursor) streams. Any proxy that doesn't support streaming cannot serve real production workloads.
Streaming in the proxy creates a unique technical challenge: the proxy must simultaneously forward bytes to the caller (for low TTFB) and accumulate them (to extract token counts for session checkpoints and traces). This is the "dual-write" design. Both writes must be correct:
- Forward must be byte-exact (SSE format must be preserved; clients parse it directly)
- Accumulation must be complete (full response needed for post-request processing)
Failure mode analysis:
- Forward failure: client loses the stream → error response (proxy retry is not possible for SSE once bytes have been forwarded to the client)
- Accumulation failure: trace and session checkpoint are incomplete → acceptable degraded state (log warning, continue)
Non-Goals
- WebSocket streaming (Phase 4)
- Response caching for identical prompts (Phase 4)
- Streaming from other providers (Phase 4)
Branch
feature/m2-1-3-openai-streaming-forwarder
PR Title
feat(proxy): OpenAI streaming forwarder with SSE dual-write (m2.1.3)
ADR-0024 — Streaming strategy
Write docs/adr/ADR-0024-streaming-dual-write.md covering:
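- Why io.TeeReader over goroutine fan-out: io.TeeReader wraps the OpenAI response body so that every read feeding the client writer also writes the same bytes to an accumulation buffer, in a single read path. A goroutine fan-out would require a channel between goroutines and introduces ordering complexity.
- Why accumulation is best-effort: The stream forward cannot be blocked by accumulation failures. io.TeeReader completes each accumulation write before the read returns, so a slow accumulation write causes the client to receive bytes late, which is worse than missing a trace. io.TeeReader also reports any write error as a read error, so the accumulator's Write must always return len(p), nil and record failures internally.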
- Why SSE format is preserved verbatim: IBEX does not re-frame or transform the SSE stream. Any transformation risks incompatibility with provider-specific SSE extensions (tool call streaming, content filtering chunks). We pass through exactly what OpenAI sends.
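- Termination detection: The SSE stream ends with data: [DONE]\n\n. The accumulator reads until this sentinel, then signals the post-request async pipeline. If the body ends without the sentinel (provider error or client disconnect), the accumulator signals anyway and the trace is marked is_complete=false. When the request sets stream_options.include_usage, the usage chunk (empty choices, populated usage) arrives immediately before the sentinel.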
Deliverables
1. Streaming path in openai.Client.Complete
// When req.Stream = true:
// 1. Send request to OpenAI with Accept: text/event-stream
// 2. Verify response is 200 Content-Type: text/event-stream
// 3. Set response headers on the client connection BEFORE reading the body:
// Content-Type: text/event-stream
// Cache-Control: no-cache
// X-Accel-Buffering: no (disables Nginx buffering if present)
// 4. Call WriteHeader(http.StatusOK), then flush headers to the client immediately
// 5. Use io.TeeReader to dual-write:
// - Primary: http.ResponseWriter (forward to client)
// - Secondary: accumulation buffer (for token counting + session checkpoint)
// 6. Flush the ResponseWriter after each SSE event (not after the full stream)
// 7. On [DONE]: signal post-request pipeline with accumulated content
//    On EOF or read error without [DONE]: signal anyway, with is_complete=false

// StreamAccumulator collects the full content from a streaming response
// without blocking the stream forward path.
// It is written by the io.TeeReader on the forward path and read by the
// trace emitter after the stream ends; mu guards all fields below.
type StreamAccumulator struct {
	mu      sync.Mutex
	chunks  []openAIStreamChunk
	done    chan struct{}   // closed on [DONE] or when the body ends
	content strings.Builder // accumulated completion text
	usage   *provider.Usage // from the final usage chunk
}

// Write implements io.Writer so the accumulator can be the io.TeeReader sink.
// Writes are not aligned to SSE lines, so incomplete lines must be buffered
// between calls. It must always return len(p), nil: io.TeeReader reports a
// write error as a read error, which would abort the forward path.
func (a *StreamAccumulator) Write(p []byte) (int, error)

// Wait blocks until the stream is complete or ctx is cancelled.
// Returns the accumulated content and usage after the stream ends.
func (a *StreamAccumulator) Wait(ctx context.Context) (string, *provider.Usage, error)
2. http.Flusher enforcement
// SSE requires the ResponseWriter to flush after each chunk.
// Validate that the ResponseWriter supports flushing at handler entry.
flusher, ok := w.(http.Flusher)
if !ok {
	// net/http's own ResponseWriter implements http.Flusher, but middleware
	// wrappers may hide it, so guard defensively.
	log.ErrorCtx(ctx, "ResponseWriter does not support http.Flusher; streaming not possible")
	apierror.WriteHTTP(w, apierror.CodeInternalError, "Streaming not supported by server", requestID)
	return
}
3. Flush after each SSE chunk
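// Copy loop over body = io.TeeReader(resp.Body, acc): flush after each complete SSE event (blank line).
scanner := bufio.NewScanner(body)
scanner.Buffer(make([]byte, 0, 64*1024), 1<<20) // raise the 64 KiB default line limit; 1 MiB is an assumed cap
for scanner.Scan() {
	line := scanner.Bytes() // ScanLines strips the trailing \n (and any \r before it)
	_, errLine := w.Write(line)
	_, errNL := w.Write([]byte("\n"))
	if errLine != nil || errNL != nil {
		return // client disconnected: stop reading from the provider
	}
	if len(line) == 0 { // blank line = end of SSE event
		flusher.Flush()
	}
}
if err := scanner.Err(); err != nil {
	// Provider error mid-stream: client keeps the partial stream; record is_complete=false
}
Testing Requirements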
- TestStreaming_ForwardsChunksInOrder: mock provider sends 5 SSE chunks; client receives them in order and within 10ms of each being sent
- TestStreaming_AccumulatorComplete: accumulator holds the full concatenated content after stream ends
- TestStreaming_DoneSignalReached: [DONE] chunk triggers StreamAccumulator.Wait() to return
- TestStreaming_ProviderErrorMidStream: provider closes connection after 3 chunks → client receives partial stream + error, trace emitter receives is_complete=false
- TestStreaming_ClientDisconnect: client disconnects mid-stream → proxy stops reading from provider (context cancelled), no goroutine leak
Acceptance Criteria
- Streaming completions forward SSE bytes to client in real time (flush after each event)
- X-Accel-Buffering: no header set to prevent Nginx buffering
- StreamAccumulator collects full content without blocking the forward path
- Provider error mid-stream results in partial stream + trace with is_complete=false (not a server crash)
- Client disconnect cancels context; proxy stops forwarding and cleans up (no goroutine leak)
- ADR-0024 written and indexed