All Python services (memory, context, API, worker) need typed, async database access. Without a shared model layer, each service will define its own SQLAlchemy models — leading to column name drift, forgotten indexes, and inconsistent typing. This milestone defines the canonical SQLAlchemy 2.0 async models in `packages
Milestone 3.1.2 — Python Database Models (SQLAlchemy 2.0 Async)
Status: Planned
Goal: 3.1 — Memory schema and data foundation
Phase: 3 — Memory Engine and Operator Platform
Estimated effort: 2 days
Why This Milestone Exists
All Python services (memory, context, API, worker) need typed, async database access. Without a shared model layer, each service will define its own SQLAlchemy models — leading to column name drift, forgotten indexes, and inconsistent typing. This milestone defines the canonical SQLAlchemy 2.0 async models in packages/python/db/ — a shared Python package imported by all services.
Branch
chore/m3-1-2-python-db-models
PR Title
chore(python): SQLAlchemy 2.0 async models for all ibex_core tables (m3.1.2)
Deliverables
Shared Python package: packages/python/db/
packages/python/db/
pyproject.toml
ibex_db/
__init__.py
base.py # DeclarativeBase, MetaData, async engine factory
models/
__init__.py
organization.py
user.py
agent.py
token.py
directive.py
session.py
checkpoint.py
memory.py # the most complex model
memory_version.py
memory_relationship.py
memory_tag.py
repositories/
__init__.py
base.py # BaseRepository[T] with generic CRUD
memory_repo.py # Memory-specific queries (vector search, dedup, hot cache)
session_repo.py
types.py # pgvector type, custom Postgres typesKey model: ibex_db/models/memory.py
from __future__ import annotations
from decimal import Decimal
from uuid import UUID
import enum
from pgvector.sqlalchemy import Vector
from sqlalchemy import (
CheckConstraint, DateTime, ForeignKey, Index, String,
Text, UniqueConstraint, func, text,
)
from sqlalchemy.dialects.postgresql import JSONB, UUID as PGUUID
from sqlalchemy.orm import Mapped, mapped_column, relationship
from ibex_db.base import Base
class MemoryCategory(str, enum.Enum):
FACTUAL = "factual"
PREFERENCE = "preference"
BEHAVIORAL = "behavioral"
EPISODIC = "episodic"
PROCEDURAL = "procedural"
class MemorySource(str, enum.Enum):
EXTRACTED = "extracted"
USER_PROVIDED = "user_provided"
IMPORTED = "imported"
INFERRED = "inferred"
class MemoryStatus(str, enum.Enum):
ACTIVE = "active"
SUPERSEDED = "superseded"
MERGED = "merged"
ARCHIVED = "archived"
PENDING_REVIEW = "pending_review"
class Memory(Base):
__tablename__ = "memories"
__table_args__ = (
UniqueConstraint("org_id", "agent_id", "content_hash",
name="memories_content_hash_org",
postgresql_where=text("status = 'active' AND deleted_at IS NULL")),
CheckConstraint("length(content) > 0", name="memories_content_not_empty"),
CheckConstraint("length(content) <= 8192", name="memories_content_max_length"),
CheckConstraint("confidence >= 0.000 AND confidence <= 1.000", name="memories_confidence_range"),
{"schema": "ibex_core"},
)
id: Mapped[UUID] = mapped_column(PGUUID, primary_key=True, server_default=func.gen_random_uuid())
org_id: Mapped[UUID] = mapped_column(PGUUID, ForeignKey("ibex_core.organizations.id", ondelete="CASCADE"), nullable=False, index=True)
agent_id: Mapped[UUID] = mapped_column(PGUUID, ForeignKey("ibex_core.agents.id", ondelete="CASCADE"), nullable=False)
session_id: Mapped[UUID | None] = mapped_column(PGUUID, ForeignKey("ibex_core.sessions.id", ondelete="SET NULL"), nullable=True)
content: Mapped[str] = mapped_column(Text, nullable=False)
content_hash: Mapped[str] = mapped_column(String(64), nullable=False)
embedding: Mapped[list[float]] = mapped_column(Vector(384), nullable=False)
category: Mapped[MemoryCategory]= mapped_column(String(32), nullable=False)
source: Mapped[MemorySource] = mapped_column(String(32), nullable=False, server_default="extracted")
status: Mapped[MemoryStatus] = mapped_column(String(32), nullable=False, server_default="active")
confidence: Mapped[Decimal] = mapped_column(nullable=False, server_default="0.800")
usefulness_score: Mapped[Decimal] = mapped_column(nullable=False, server_default="0.500")
retrieval_count: Mapped[int] = mapped_column(nullable=False, server_default="0")
last_retrieved_at:Mapped[DateTime|None] = mapped_column(nullable=True)
last_injected_at: Mapped[DateTime|None] = mapped_column(nullable=True)
superseded_by: Mapped[UUID | None] = mapped_column(PGUUID, ForeignKey("ibex_core.memories.id", ondelete="SET NULL"), nullable=True)
merged_into: Mapped[UUID | None] = mapped_column(PGUUID, ForeignKey("ibex_core.memories.id", ondelete="SET NULL"), nullable=True)
expires_at: Mapped[DateTime|None] = mapped_column(nullable=True)
deleted_at: Mapped[DateTime|None] = mapped_column(nullable=True)
metadata_: Mapped[dict] = mapped_column("metadata", JSONB, nullable=False, server_default="{}")
created_at: Mapped[DateTime] = mapped_column(nullable=False, server_default=func.now())
updated_at: Mapped[DateTime] = mapped_column(nullable=False, server_default=func.now(), onupdate=func.now())
# Relationships
tags: Mapped[list["MemoryTag"]] = relationship("MemoryTag", back_populates="memory", lazy="select")
versions: Mapped[list["MemoryVersion"]] = relationship("MemoryVersion", back_populates="memory", order_by="MemoryVersion.version_num", lazy="select")Base repository pattern
# ibex_db/repositories/base.py
from __future__ import annotations
from typing import Generic, TypeVar
from uuid import UUID
from sqlalchemy.ext.asyncio import AsyncSession
T = TypeVar("T")
class BaseRepository(Generic[T]):
"""
Generic async repository. All queries must include org_id for multi-tenant isolation.
SET LOCAL app.current_org_id is called by the transaction context manager before
any query, enabling RLS at the DB level as the second line of defence.
"""
def __init__(self, session: AsyncSession, org_id: UUID) -> None:
self.session = session
self.org_id = org_id
async def _set_rls_context(self) -> None:
"""Set the RLS session variable before any query."""
await self.session.execute(
text("SET LOCAL app.current_org_id = :org_id"),
{"org_id": str(self.org_id)},
)Acceptance Criteria
- All ibex_core tables have corresponding SQLAlchemy 2.0 mapped models
-
Memorymodel usesVector(384)frompgvector.sqlalchemy -
BaseRepositorysetsapp.current_org_idbefore every query - All models importable from
ibex_db -
pyproject.tomlspecifies all dependencies with pinned versions - Unit tests: model instantiation, column defaults, relationship loading
Last updated on