Phase 3 memory engine

All Python services (memory, context, API, worker) need typed, async database access. Without a shared model layer, each service will define its own SQLAlchemy models — leading to column name drift, forgotten indexes, and inconsistent typing. This milestone defines the canonical SQLAlchemy 2.0 async models in `packages

Milestone 3.1.2 — Python Database Models (SQLAlchemy 2.0 Async)

Status: Planned
Goal: 3.1 — Memory schema and data foundation
Phase: 3 — Memory Engine and Operator Platform
Estimated effort: 2 days


Why This Milestone Exists

All Python services (memory, context, API, worker) need typed, async database access. Without a shared model layer, each service will define its own SQLAlchemy models — leading to column name drift, forgotten indexes, and inconsistent typing. This milestone defines the canonical SQLAlchemy 2.0 async models in packages/python/db/ — a shared Python package imported by all services.


Branch

chore/m3-1-2-python-db-models

PR Title

chore(python): SQLAlchemy 2.0 async models for all ibex_core tables (m3.1.2)


Deliverables

Shared Python package: packages/python/db/

packages/python/db/
  pyproject.toml
  ibex_db/
    __init__.py
    base.py          # DeclarativeBase, MetaData, async engine factory
    models/
      __init__.py
      organization.py
      user.py
      agent.py
      token.py
      directive.py
      session.py
      checkpoint.py
      memory.py       # the most complex model
      memory_version.py
      memory_relationship.py
      memory_tag.py
    repositories/
      __init__.py
      base.py         # BaseRepository[T] with generic CRUD
      memory_repo.py  # Memory-specific queries (vector search, dedup, hot cache)
      session_repo.py
    types.py          # pgvector type, custom Postgres types

Key model: ibex_db/models/memory.py

Python
from __future__ import annotations
 
from decimal import Decimal
from uuid import UUID
import enum
 
from pgvector.sqlalchemy import Vector
from sqlalchemy import (
    CheckConstraint, DateTime, ForeignKey, Index, String,
    Text, UniqueConstraint, func, text,
)
from sqlalchemy.dialects.postgresql import JSONB, UUID as PGUUID
from sqlalchemy.orm import Mapped, mapped_column, relationship
 
from ibex_db.base import Base
 
class MemoryCategory(str, enum.Enum):
    FACTUAL    = "factual"
    PREFERENCE = "preference"
    BEHAVIORAL = "behavioral"
    EPISODIC   = "episodic"
    PROCEDURAL = "procedural"
 
class MemorySource(str, enum.Enum):
    EXTRACTED      = "extracted"
    USER_PROVIDED  = "user_provided"
    IMPORTED       = "imported"
    INFERRED       = "inferred"
 
class MemoryStatus(str, enum.Enum):
    ACTIVE         = "active"
    SUPERSEDED     = "superseded"
    MERGED         = "merged"
    ARCHIVED       = "archived"
    PENDING_REVIEW = "pending_review"
 
class Memory(Base):
    __tablename__ = "memories"
    __table_args__ = (
        UniqueConstraint("org_id", "agent_id", "content_hash",
                         name="memories_content_hash_org",
                         postgresql_where=text("status = 'active' AND deleted_at IS NULL")),
        CheckConstraint("length(content) > 0",         name="memories_content_not_empty"),
        CheckConstraint("length(content) <= 8192",     name="memories_content_max_length"),
        CheckConstraint("confidence >= 0.000 AND confidence <= 1.000", name="memories_confidence_range"),
        {"schema": "ibex_core"},
    )
 
    id:               Mapped[UUID]          = mapped_column(PGUUID, primary_key=True, server_default=func.gen_random_uuid())
    org_id:           Mapped[UUID]          = mapped_column(PGUUID, ForeignKey("ibex_core.organizations.id", ondelete="CASCADE"), nullable=False, index=True)
    agent_id:         Mapped[UUID]          = mapped_column(PGUUID, ForeignKey("ibex_core.agents.id", ondelete="CASCADE"), nullable=False)
    session_id:       Mapped[UUID | None]   = mapped_column(PGUUID, ForeignKey("ibex_core.sessions.id", ondelete="SET NULL"), nullable=True)
    content:          Mapped[str]           = mapped_column(Text, nullable=False)
    content_hash:     Mapped[str]           = mapped_column(String(64), nullable=False)
    embedding:        Mapped[list[float]]   = mapped_column(Vector(384), nullable=False)
    category:         Mapped[MemoryCategory]= mapped_column(String(32), nullable=False)
    source:           Mapped[MemorySource]  = mapped_column(String(32), nullable=False, server_default="extracted")
    status:           Mapped[MemoryStatus]  = mapped_column(String(32), nullable=False, server_default="active")
    confidence:       Mapped[Decimal]       = mapped_column(nullable=False, server_default="0.800")
    usefulness_score: Mapped[Decimal]       = mapped_column(nullable=False, server_default="0.500")
    retrieval_count:  Mapped[int]           = mapped_column(nullable=False, server_default="0")
    last_retrieved_at:Mapped[DateTime|None] = mapped_column(nullable=True)
    last_injected_at: Mapped[DateTime|None] = mapped_column(nullable=True)
    superseded_by:    Mapped[UUID | None]   = mapped_column(PGUUID, ForeignKey("ibex_core.memories.id", ondelete="SET NULL"), nullable=True)
    merged_into:      Mapped[UUID | None]   = mapped_column(PGUUID, ForeignKey("ibex_core.memories.id", ondelete="SET NULL"), nullable=True)
    expires_at:       Mapped[DateTime|None] = mapped_column(nullable=True)
    deleted_at:       Mapped[DateTime|None] = mapped_column(nullable=True)
    metadata_:        Mapped[dict]          = mapped_column("metadata", JSONB, nullable=False, server_default="{}")
    created_at:       Mapped[DateTime]      = mapped_column(nullable=False, server_default=func.now())
    updated_at:       Mapped[DateTime]      = mapped_column(nullable=False, server_default=func.now(), onupdate=func.now())
 
    # Relationships
    tags: Mapped[list["MemoryTag"]] = relationship("MemoryTag", back_populates="memory", lazy="select")
    versions: Mapped[list["MemoryVersion"]] = relationship("MemoryVersion", back_populates="memory", order_by="MemoryVersion.version_num", lazy="select")

Base repository pattern

Python
# ibex_db/repositories/base.py
from __future__ import annotations
from typing import Generic, TypeVar
from uuid import UUID
from sqlalchemy.ext.asyncio import AsyncSession
 
T = TypeVar("T")
 
class BaseRepository(Generic[T]):
    """
    Generic async repository. All queries must include org_id for multi-tenant isolation.
    SET LOCAL app.current_org_id is called by the transaction context manager before
    any query, enabling RLS at the DB level as the second line of defence.
    """
    def __init__(self, session: AsyncSession, org_id: UUID) -> None:
        self.session = session
        self.org_id = org_id
 
    async def _set_rls_context(self) -> None:
        """Set the RLS session variable before any query."""
        await self.session.execute(
            text("SET LOCAL app.current_org_id = :org_id"),
            {"org_id": str(self.org_id)},
        )

Acceptance Criteria

  • All ibex_core tables have corresponding SQLAlchemy 2.0 mapped models
  • Memory model uses Vector(384) from pgvector.sqlalchemy
  • BaseRepository sets app.current_org_id before every query
  • All models importable from ibex_db
  • pyproject.toml specifies all dependencies with pinned versions
  • Unit tests: model instantiation, column defaults, relationship loading

Edit on GitHub

Last updated on

On this page

0%