Milestone 3.7.1 — MinIO Session Content Archives
Status: Planned
Goal: 3.7 — MinIO session content archives
Phase: 3 — Memory Engine and Operator Platform
Estimated effort: 3 days
ADR required: ADR-0044 — Session content storage and retention strategy
Why This Milestone Exists
Session content (the actual messages exchanged between users and agents) is high-value but high-volume data. Storing it in Postgres alongside metadata would cause the checkpoints table to balloon — Postgres is not designed for multi-MB JSONB blobs at high write rates. MinIO (S3-compatible object storage) is the right solution: optimised for large objects, cheap at scale, and compatible with lifecycle-based retention policies.
Every completed session is archived to MinIO as a newline-delimited JSON file. The Postgres checkpoints table retains only metadata (token counts, latency, hashes). The MinIO archive holds the full conversation.
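To illustrate the NDJSON layout described above, here is a minimal sketch of a round trip: serialising checkpoints one JSON object per line, then reading them back as a stream. The helper names (dump_ndjson, iter_ndjson) and the checkpoint fields (turn, role, content) are hypothetical and chosen only for illustration.

```python
import io
import json
from typing import BinaryIO, Iterable, Iterator


def dump_ndjson(records: Iterable[dict]) -> bytes:
    """Serialise records as newline-delimited JSON (one object per line)."""
    # json.dumps escapes newlines inside strings, so a record never spans lines.
    return "".join(json.dumps(r) + "\n" for r in records).encode("utf-8")


def iter_ndjson(stream: BinaryIO) -> Iterator[dict]:
    """Yield one record per non-empty line without loading the whole file."""
    for raw in stream:
        line = raw.strip()
        if line:
            yield json.loads(line)


# Hypothetical checkpoint shape, for illustration only.
checkpoints = [
    {"turn": 1, "role": "user", "content": "hello\nworld"},
    {"turn": 2, "role": "agent", "content": "hi"},
]

blob = dump_ndjson(checkpoints)
restored = list(iter_ndjson(io.BytesIO(blob)))
```

Because each line is independently parseable, a consumer can process a long session turn by turn instead of holding the full archive in memory.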
ADR-0044 — Session content storage
Document:
- Why MinIO (not Postgres JSONB): Postgres BLOB/JSONB performance degrades past 1 MB per row. A 100-turn conversation is ~50 KB, so storing 10M sessions would require roughly 500 GB (10M × 50 KB) of Postgres TOAST storage.
- Why NDJSON format: One JSON object per line; streamable; easily imported into data pipelines.
- Archive path: {org_id}/{agent_id}/{YYYY}/{MM}/{session_id}.ndjson (date-partitioned for lifecycle policy application).
- Retention policy: Default 90 days (configurable per org). MinIO lifecycle rules auto-delete expired archives.
- GDPR delete: When an org or agent is deleted, all of its archives are deleted via MinIO object expiry tag override (immediate deletion, rather than waiting for the lifecycle rule to run).
- Pre-signed URLs: 15-minute expiry; generated only on explicit operator request (session replay).
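The retention bullet above could be realised as one lifecycle expiration rule per org prefix. The following is a sketch under stated assumptions, not the project's implementation: RetentionRule, build_retention_rules and apply_retention_rules are hypothetical names, and the apply step assumes the minio-py lifecycle classes (LifecycleConfig, Rule, Expiration, Filter) and set_bucket_lifecycle. It emits one rule per org instead of a catch-all default so that no two rules cover the same prefix.

```python
from __future__ import annotations

from dataclasses import dataclass

DEFAULT_RETENTION_DAYS = 90  # default from the ADR; configurable per org


@dataclass(frozen=True)
class RetentionRule:
    rule_id: str
    prefix: str
    days: int


def build_retention_rules(
    org_retention: dict[str, int | None],
    default_days: int = DEFAULT_RETENTION_DAYS,
) -> list[RetentionRule]:
    """Plan one expiration rule per org; None means 'use the default'."""
    return [
        RetentionRule(
            rule_id=f"retain-{org_id}",
            prefix=f"{org_id}/",  # trailing slash keeps org1 from matching org10
            days=days if days is not None else default_days,
        )
        for org_id, days in sorted(org_retention.items())
    ]


def apply_retention_rules(client, bucket: str, rules: list[RetentionRule]) -> None:
    """Push the planned rules to MinIO (client is assumed to be a minio.Minio)."""
    # Imported lazily so the planning code above runs without the SDK installed.
    from minio.commonconfig import ENABLED, Filter
    from minio.lifecycleconfig import Expiration, LifecycleConfig, Rule

    config = LifecycleConfig(
        [
            Rule(
                ENABLED,
                rule_filter=Filter(prefix=r.prefix),
                rule_id=r.rule_id,
                expiration=Expiration(days=r.days),
            )
            for r in rules
        ]
    )
    client.set_bucket_lifecycle(bucket, config)


rules = build_retention_rules({"org-b": None, "org-a": 30})
```

Keeping the planning step free of SDK calls makes the per-org retention logic easy to unit test; only apply_retention_rules needs a live MinIO endpoint.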
Deliverables
packages/python/minio/ — shared MinIO client
# packages/python/minio/ibex_minio/client.py
from __future__ import annotations

import io
import json
from datetime import timedelta

from minio import Minio

BUCKET_NAME = "ibex-sessions"
ARCHIVE_EXPIRY_MINUTES = 15  # lifetime of pre-signed replay URLs


class SessionArchiveClient:
    """
    MinIO client for session content archives.

    All operations are synchronous (Minio Python SDK is sync);
    call from a thread pool if used in async context.
    """

    def __init__(self, endpoint: str, access_key: str, secret_key: str, secure: bool = True) -> None:
        self._minio = Minio(endpoint, access_key=access_key, secret_key=secret_key, secure=secure)
        self._ensure_bucket()

    def _ensure_bucket(self) -> None:
        if not self._minio.bucket_exists(BUCKET_NAME):
            self._minio.make_bucket(BUCKET_NAME)

    def _object_path(self, org_id: str, agent_id: str, session_id: str, year: int, month: int) -> str:
        # Date-partitioned layout: {org_id}/{agent_id}/{YYYY}/{MM}/{session_id}.ndjson
        return f"{org_id}/{agent_id}/{year:04d}/{month:02d}/{session_id}.ndjson"

    def write_archive(
        self,
        org_id: str,
        agent_id: str,
        session_id: str,
        checkpoints: list[dict],
        year: int,
        month: int,
    ) -> str:
        """
        Write all checkpoints as NDJSON to MinIO.
        Returns the object path.
        """
        path = self._object_path(org_id, agent_id, session_id, year, month)
        # One JSON object per line, with a trailing newline.
        ndjson = "\n".join(json.dumps(cp) for cp in checkpoints) + "\n"
        data = ndjson.encode("utf-8")
        self._minio.put_object(
            BUCKET_NAME, path,
            data=io.BytesIO(data),
            length=len(data),
            content_type="application/x-ndjson",
            metadata={"org-id": org_id, "agent-id": agent_id, "session-id": session_id},
        )
        return path

    def get_presigned_url(self, object_path: str) -> str:
        """Generate a pre-signed URL for session replay. Valid for 15 minutes."""
        return self._minio.presigned_get_object(
            BUCKET_NAME, object_path,
            expires=timedelta(minutes=ARCHIVE_EXPIRY_MINUTES),
        )

    def delete_org_archives(self, org_id: str) -> int:
        """GDPR deletion: delete all archives for an org. Returns count deleted."""
        # The trailing slash in the prefix prevents "org1" from matching "org10/...".
        objects = self._minio.list_objects(BUCKET_NAME, prefix=f"{org_id}/", recursive=True)
        count = 0
        for obj in objects:
            self._minio.remove_object(BUCKET_NAME, obj.object_name)
            count += 1
        return count

Archive trigger in proxy (after session completion)
// In the async post-response goroutine (services/proxy):
// After AppendCheckpoint succeeds and session is marked complete:
go func() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	if err := archiveEnqueuer.EnqueueArchive(ctx, sessionID, orgID, agentID); err != nil {
		log.WarnCtx(ctx, "archive enqueue failed", "session_id", sessionID, "error", err)
	}
}()
// Archive is written by the worker (Celery task), not the proxy directly.

Acceptance Criteria
- Completed session produces NDJSON archive in MinIO within 30 seconds
- Archive path includes org_id, agent_id, YYYY/MM for lifecycle partitioning
- Pre-signed URL expires in 15 minutes
- GDPR deletion removes all org archives (tested with MinIO testcontainer)
- Archive write failure does not affect LLM response or session completion
- ADR-0044 written with retention policy and GDPR procedure
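The GDPR criterion is specified against a MinIO testcontainer. As a complement, the prefix logic could also be checked at unit level with an in-memory stand-in. The sketch below is illustrative only: FakeMinio and FakeObject are hypothetical test doubles that mimic just list_objects and remove_object, and delete_org_archives here is a free-function copy of the client method's loop.

```python
from dataclasses import dataclass


@dataclass
class FakeObject:
    object_name: str


class FakeMinio:
    """In-memory stand-in exposing only list_objects and remove_object."""

    def __init__(self, names):
        self.names = set(names)

    def list_objects(self, bucket, prefix="", recursive=False):
        # Return a snapshot list so callers can delete while iterating.
        return [FakeObject(n) for n in sorted(self.names) if n.startswith(prefix)]

    def remove_object(self, bucket, name):
        self.names.discard(name)


def delete_org_archives(client, bucket, org_id):
    """Delete every object under '{org_id}/' and return how many were removed."""
    count = 0
    for obj in client.list_objects(bucket, prefix=f"{org_id}/", recursive=True):
        client.remove_object(bucket, obj.object_name)
        count += 1
    return count


store = FakeMinio([
    "org1/agent-a/2025/01/s1.ndjson",
    "org1/agent-b/2025/02/s2.ndjson",
    "org10/agent-c/2025/01/s3.ndjson",  # must survive deletion of org1
])
deleted = delete_org_archives(store, "ibex-sessions", "org1")
```

The org10 entry is the interesting case: without the trailing slash in the prefix, deleting org1 would also remove another tenant's archives.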