Milestone 3.7.1 — MinIO Session Content Archives

Status: Planned
Goal: 3.7 — MinIO session content archives
Phase: 3 — Memory Engine and Operator Platform
Estimated effort: 3 days
ADR required: ADR-0044 — Session content storage and retention strategy


Why This Milestone Exists

Session content (the actual messages exchanged between users and agents) is high-value but high-volume data. Storing it in Postgres alongside metadata would cause the checkpoints table to balloon — Postgres is not designed for multi-MB JSONB blobs at high write rates. MinIO (S3-compatible object storage) is the right solution: optimised for large objects, cheap at scale, and compatible with lifecycle-based retention policies.

Every completed session is archived to MinIO as a newline-delimited JSON file. The Postgres checkpoints table retains only metadata (token counts, latency, hashes). The MinIO archive holds the full conversation.
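
As a rough illustration of the archive format, the sketch below encodes a list of records as NDJSON and reads them back one line at a time. The record fields (`turn`, `role`, `content`) and the helper names are assumptions made for this example, not the project's actual checkpoint schema.

```python
import io
import json
from typing import BinaryIO, Iterable, Iterator


def encode_ndjson(records: Iterable[dict]) -> bytes:
    """Serialise records as NDJSON: one compact JSON object per line."""
    # json.dumps escapes newlines inside strings, so each record stays on one line.
    lines = (json.dumps(r, separators=(",", ":")) + "\n" for r in records)
    return "".join(lines).encode("utf-8")


def iter_ndjson(stream: BinaryIO) -> Iterator[dict]:
    """Yield one record per non-empty line without loading the whole archive."""
    for raw in stream:
        line = raw.strip()
        if line:
            yield json.loads(line)


# Hypothetical checkpoint records; the field names are illustrative only.
turns = [
    {"turn": 1, "role": "user", "content": "hello\nworld"},
    {"turn": 2, "role": "agent", "content": "hi"},
]
payload = encode_ndjson(turns)
decoded = list(iter_ndjson(io.BytesIO(payload)))
```

Because the reader works line by line, a replay tool or data pipeline can process an archive of any size with roughly constant memory.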


ADR-0044 — Session content storage

Document:

  • Why MinIO (not Postgres JSONB): Postgres bytea/JSONB performance degrades past roughly 1MB per row. A 100-turn conversation is ~50KB, so storing 10M sessions would require about 500GB (10M × 50KB) of Postgres TOAST storage.
  • Why NDJSON format: One JSON object per line; streamable; easily imported into data pipelines.
  • Archive path: {org_id}/{agent_id}/{YYYY}/{MM}/{session_id}.ndjson — date-partitioned for lifecycle policy application.
  • Retention policy: Default 90 days (configurable per org). MinIO lifecycle rules auto-delete expired archives.
  • GDPR delete: When an org or agent is deleted, all of its archives are deleted via MinIO object expiry tag override (deletion is immediate rather than waiting for the lifecycle rule).
  • Pre-signed URLs: 15-minute expiry; generated only on explicit operator request (session replay).
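
The per-org retention policy above could be expressed as an S3-style lifecycle rule scoped to the org's key prefix. The sketch below builds such a rule as XML using only the standard library. The function name, the rule ID scheme, and the choice of one rule per org are assumptions for illustration; ADR-0044 may settle on a different mechanism, and in practice all rules for a bucket would live in a single lifecycle document.

```python
import xml.etree.ElementTree as ET


def expiry_rule_xml(org_id: str, retention_days: int = 90) -> str:
    """Build an S3-style lifecycle document that expires one org's archives."""
    if retention_days < 1:
        raise ValueError("retention_days must be at least 1")
    root = ET.Element("LifecycleConfiguration")
    rule = ET.SubElement(root, "Rule")
    ET.SubElement(rule, "ID").text = f"expire-{org_id}"  # hypothetical naming scheme
    ET.SubElement(rule, "Status").text = "Enabled"
    # The trailing slash keeps org "acme" from also matching "acme-eu".
    ET.SubElement(ET.SubElement(rule, "Filter"), "Prefix").text = f"{org_id}/"
    ET.SubElement(ET.SubElement(rule, "Expiration"), "Days").text = str(retention_days)
    return ET.tostring(root, encoding="unicode")


# An org that has configured 30-day retention instead of the 90-day default.
doc = expiry_rule_xml("org-123", retention_days=30)
```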

Deliverables

packages/python/minio/ — shared MinIO client

Python
# packages/python/minio/ibex_minio/client.py
from __future__ import annotations
 
import io
import json
from datetime import timedelta

from minio import Minio

BUCKET_NAME = "ibex-sessions"
# Lifetime of pre-signed replay URLs, in minutes.
ARCHIVE_EXPIRY_MINUTES = 15
 
class SessionArchiveClient:
    """
    MinIO client for session content archives.
    All operations are synchronous (Minio Python SDK is sync);
    call from a thread pool if used in async context.
    """
 
    def __init__(self, endpoint: str, access_key: str, secret_key: str, secure: bool = True) -> None:
        self._minio = Minio(endpoint, access_key=access_key, secret_key=secret_key, secure=secure)
        self._ensure_bucket()
 
    def _ensure_bucket(self) -> None:
        if not self._minio.bucket_exists(BUCKET_NAME):
            self._minio.make_bucket(BUCKET_NAME)
 
    def _object_path(self, org_id: str, agent_id: str, session_id: str, year: int, month: int) -> str:
        return f"{org_id}/{agent_id}/{year:04d}/{month:02d}/{session_id}.ndjson"
 
    def write_archive(
        self,
        org_id: str,
        agent_id: str,
        session_id: str,
        checkpoints: list[dict],
        year: int,
        month: int,
    ) -> str:
        """
        Write all checkpoints as NDJSON to MinIO.
        Returns the object path.
        """
        path = self._object_path(org_id, agent_id, session_id, year, month)
        # Terminate every record with "\n" so an empty session yields an empty object, not a blank line.
        ndjson = "".join(json.dumps(cp) + "\n" for cp in checkpoints)
        data = ndjson.encode("utf-8")
        self._minio.put_object(
            BUCKET_NAME, path,
            data=io.BytesIO(data),
            length=len(data),
            content_type="application/x-ndjson",
            metadata={"org-id": org_id, "agent-id": agent_id, "session-id": session_id},
        )
        return path
 
    def get_presigned_url(self, object_path: str) -> str:
        """Generate a pre-signed URL for session replay. Valid for 15 minutes."""
        return self._minio.presigned_get_object(
            BUCKET_NAME, object_path,
            expires=timedelta(minutes=ARCHIVE_EXPIRY_MINUTES),
        )
 
    def delete_org_archives(self, org_id: str) -> int:
        """GDPR deletion: delete all archives for an org. Returns count deleted."""
        # The trailing slash stops org "acme" from also matching keys under "acme-eu/".
        objects = self._minio.list_objects(BUCKET_NAME, prefix=f"{org_id}/", recursive=True)
        count = 0
        for obj in objects:
            self._minio.remove_object(BUCKET_NAME, obj.object_name)
            count += 1
        return count
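
The class docstring notes that the client is synchronous and should run in a thread pool when called from async code. A minimal sketch of that pattern follows, assuming Python 3.9+ for `asyncio.to_thread`. `FakeArchiveClient` and `archive_session` are hypothetical names used so the example runs without a MinIO server.

```python
import asyncio
import time


class FakeArchiveClient:
    """Stand-in for SessionArchiveClient so the sketch runs without MinIO."""

    def write_archive(self, org_id, agent_id, session_id, checkpoints, year, month) -> str:
        time.sleep(0.01)  # simulate blocking network I/O
        return f"{org_id}/{agent_id}/{year:04d}/{month:02d}/{session_id}.ndjson"


async def archive_session(client, **kwargs) -> str:
    # asyncio.to_thread runs the blocking call in the default thread pool,
    # leaving the event loop free to serve other coroutines in the meantime.
    return await asyncio.to_thread(client.write_archive, **kwargs)


path = asyncio.run(
    archive_session(
        FakeArchiveClient(),
        org_id="org-1", agent_id="agent-7", session_id="s-42",
        checkpoints=[{"turn": 1}], year=2025, month=3,
    )
)
```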

Archive trigger in proxy (after session completion)

Go
// In the async post-response goroutine (services/proxy):
// After AppendCheckpoint succeeds and session is marked complete:
go func() {
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()
    if err := archiveEnqueuer.EnqueueArchive(ctx, sessionID, orgID, agentID); err != nil {
        log.WarnCtx(ctx, "archive enqueue failed", "session_id", sessionID, "error", err)
    }
}()
// Archive is written by the worker (Celery task), not the proxy directly.
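
On the worker side, the task body might look roughly like the sketch below. It is written as a plain function so it runs without Celery or MinIO. `archive_session_task`, `load_checkpoints`, and `RecordingClient` are hypothetical names, and deriving the year and month from the session's UTC completion time is an assumption rather than something this milestone specifies.

```python
from datetime import datetime, timezone


class RecordingClient:
    """Stand-in for SessionArchiveClient that records calls instead of writing."""

    def __init__(self):
        self.calls = []

    def write_archive(self, org_id, agent_id, session_id, checkpoints, year, month) -> str:
        self.calls.append({"session_id": session_id, "count": len(checkpoints)})
        return f"{org_id}/{agent_id}/{year:04d}/{month:02d}/{session_id}.ndjson"


def archive_session_task(client, load_checkpoints, session_id, org_id, agent_id, completed_at=None):
    """Body of the worker task; a real deployment would wrap this in a Celery task."""
    completed_at = completed_at or datetime.now(timezone.utc)
    checkpoints = load_checkpoints(session_id)  # hypothetical loader
    if not checkpoints:
        return None  # nothing to archive; avoid writing an empty object
    return client.write_archive(
        org_id, agent_id, session_id, checkpoints,
        year=completed_at.year, month=completed_at.month,
    )


client = RecordingClient()
path = archive_session_task(
    client,
    load_checkpoints=lambda sid: [{"turn": 1}, {"turn": 2}],
    session_id="s-42", org_id="org-1", agent_id="agent-7",
    completed_at=datetime(2025, 3, 9, tzinfo=timezone.utc),
)
```

Passing the client and loader in as arguments keeps the task body testable without a broker or object store.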

Acceptance Criteria

  • Completed session produces NDJSON archive in MinIO within 30 seconds
  • Archive path includes org_id, agent_id, YYYY/MM for lifecycle partitioning
  • Pre-signed URL expires in 15 minutes
  • GDPR deletion removes all org archives (tested with MinIO testcontainer)
  • Archive write failure does not affect LLM response or session completion
  • ADR-0044 written with retention policy and GDPR procedure
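
The GDPR criterion calls for a MinIO testcontainer. As a fast unit-level complement, the prefix logic can also be checked against an in-memory fake, as sketched below. `InMemoryStore` is a hypothetical stand-in, and the function mirrors the client's deletion loop rather than calling MinIO.

```python
class InMemoryStore:
    """Minimal stand-in for a bucket: just a set of object keys."""

    def __init__(self, keys):
        self.keys = set(keys)

    def list_objects(self, prefix):
        return sorted(k for k in self.keys if k.startswith(prefix))

    def remove_object(self, key):
        self.keys.discard(key)


def delete_org_archives(store, org_id):
    """Mirror of the client's GDPR deletion loop, run against the fake store."""
    count = 0
    for key in store.list_objects(prefix=f"{org_id}/"):
        store.remove_object(key)
        count += 1
    return count


store = InMemoryStore([
    "acme/agent-1/2025/01/s1.ndjson",
    "acme/agent-2/2025/02/s2.ndjson",
    "acme-eu/agent-1/2025/01/s3.ndjson",  # different org sharing a name prefix
])
deleted = delete_org_archives(store, "acme")
```

The third key is the interesting case: without the trailing slash in the prefix, deleting `acme` would also remove archives belonging to `acme-eu`.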
