Phase 3 memory engine

Milestone 3.4.1 — Worker Service Skeleton (Celery + Redis Broker)

Status: Planned
Goal: 3.4 — Memory extraction worker
Phase: 3 — Memory Engine and Operator Platform
Estimated effort: 2 days


Why This Milestone Exists

The extraction worker is the async backbone of Phase 3. Without it, no memories are created — the proxy triggers extraction jobs but they go nowhere. This milestone establishes the Celery application, worker configuration, task routing, and monitoring setup. Domain logic (the actual extraction) comes in subsequent milestones.


Branch

feat/m3-4-1-worker-skeleton

PR Title

feat(worker): Celery worker skeleton with Redis broker and task routing (m3.4.1)


Deliverables

Service structure

services/worker/
  pyproject.toml
  Dockerfile
  .env.example
  src/
    worker/
      __init__.py
      celery_app.py      # Celery application instance
      settings.py
      tasks/
        __init__.py
        extraction.py    # Memory extraction tasks (3.4.3)
        embedding.py     # Memory embedding tasks (3.4.4)
        maintenance.py   # GC, expiry, dedup maintenance
      queues.py          # Queue definitions
  tests/
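
The tree lists settings.py without showing its contents, while celery_app.py reads settings.celery_broker_url and settings.celery_result_backend from it. A minimal sketch of what that module might look like follows. The environment variable names and the localhost defaults are assumptions for illustration and do not come from the milestone text; the real service may well use a settings library instead of a plain dataclass.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class Settings:
    """Connection settings consumed by celery_app.py."""

    celery_broker_url: str
    celery_result_backend: str


def load_settings(env=None) -> Settings:
    """Build Settings from a mapping of environment variables.

    The variable names and defaults below are hypothetical.
    """
    env = os.environ if env is None else env
    return Settings(
        celery_broker_url=env.get(
            "CELERY_BROKER_URL", "redis://localhost:6379/0"
        ),
        celery_result_backend=env.get(
            "CELERY_RESULT_BACKEND", "redis://localhost:6379/1"
        ),
    )


# Module-level instance so that `from worker.settings import settings` works.
settings = load_settings()
```

Passing the mapping in explicitly keeps the loader easy to exercise in tests without touching the process environment.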

src/worker/celery_app.py

Python
from celery import Celery
from celery.schedules import crontab  # used by beat_schedule below
from kombu import Exchange, Queue

from worker.settings import settings

app = Celery(
    "ibex-worker",
    broker=settings.celery_broker_url,    # redis://...
    backend=settings.celery_result_backend,  # redis://...
    include=[
        "worker.tasks.extraction",
        "worker.tasks.embedding",
        "worker.tasks.maintenance",
    ],
)
 
# ── Queue topology ────────────────────────────────────────────────────────
# extraction: high-priority; triggered immediately after session close
# embedding:  medium-priority; embedding computation after extraction
# maintenance: low-priority; GC, expiry, dedup sweeps (run overnight)
QUEUES = [
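    # NOTE: "x-max-priority" is a RabbitMQ queue argument. With the Redis broker
    # used here it likely has no effect; on Redis, priorities are configured
    # through broker_transport_options instead.
    Queue("extraction",  Exchange("extraction"),  routing_key="extraction",
          queue_arguments={"x-max-priority": 10}),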
    Queue("embedding",   Exchange("embedding"),   routing_key="embedding"),
    Queue("maintenance", Exchange("maintenance"), routing_key="maintenance"),
]
 
app.conf.update(
    task_queues=QUEUES,
    task_default_queue="extraction",
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
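    task_acks_late=True,           # ack after the task finishes, not when it is received
    task_reject_on_worker_lost=True,  # re-queue the task if its worker process dies mid-run
    worker_prefetch_multiplier=1,  # each worker process reserves one task at a time, so long tasks do not hold back queued ones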
    task_track_started=True,
    beat_schedule={
        # Run maintenance sweep every night at 02:00 UTC
        "memory-expiry-sweep": {
            "task": "worker.tasks.maintenance.expire_old_memories",
            "schedule": crontab(hour=2, minute=0),
        },
        "session-completion-sweep": {
            "task": "worker.tasks.maintenance.complete_idle_sessions",
            "schedule": 300.0,  # every 5 minutes
        },
    },
)
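
The configuration above sets task_default_queue="extraction" but defines no task_routes, so embedding and maintenance tasks would also land on the extraction queue unless each call names its queue. One way to close that gap is a router function, which Celery accepts in task_routes. The sketch below is an assumption about how routing could be wired, not the milestone's confirmed design; it relies only on the three task module names already listed in include.

```python
# Module prefix -> queue name, mirroring the three queues in QUEUES.
ROUTES = {
    "worker.tasks.extraction.": "extraction",
    "worker.tasks.embedding.": "embedding",
    "worker.tasks.maintenance.": "maintenance",
}


def route_task(name, args=None, kwargs=None, options=None, task=None, **kw):
    """Celery router: choose a queue from the task's dotted name."""
    for prefix, queue in ROUTES.items():
        if name.startswith(prefix):
            return {"queue": queue, "routing_key": queue}
    return None  # no match: Celery falls back to task_default_queue


# Assumed registration in celery_app.py:
#   app.conf.task_routes = (route_task,)
```

With this in place, the beat entries for worker.tasks.maintenance.* would be routed to the maintenance queue rather than competing with extraction work.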

Acceptance Criteria

  • celery -A worker.celery_app worker -Q extraction,embedding,maintenance starts without errors
  • Task routing: extraction tasks go to the extraction queue, not Celery's default celery queue
  • With task_acks_late=True and task_reject_on_worker_lost=True, a task that is in flight when its worker crashes is redelivered rather than lost
  • /health endpoint on the worker process (served by a separate HTTP server) returns 200 if Celery workers are alive
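
The /health criterion can be sketched with the standard library alone. The example below is a hedged illustration, not the planned implementation: make_health_server, the port, the JSON body, and the 503 response for the unhealthy case are all assumptions. The liveness check is injected as a callable so the server can be tested without a broker; in the worker it could wrap Celery's app.control.ping.

```python
import json
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer


def make_health_server(ping, host="0.0.0.0", port=8080):
    """Return an HTTP server whose /health reflects the result of ping().

    `ping` is any zero-argument callable returning a list of worker
    replies; an empty list or a raised exception counts as unhealthy.
    """

    class HealthHandler(BaseHTTPRequestHandler):
        def do_GET(self):
            if self.path != "/health":
                self.send_error(404)
                return
            try:
                replies = ping()
            except Exception:
                replies = []
            status = 200 if replies else 503
            body = json.dumps({"workers": len(replies)}).encode()
            self.send_response(status)
            self.send_header("Content-Type", "application/json")
            self.send_header("Content-Length", str(len(body)))
            self.end_headers()
            self.wfile.write(body)

        def log_message(self, *args):
            pass  # keep probe traffic out of the worker logs

    return ThreadingHTTPServer((host, port), HealthHandler)


# Assumed wiring inside the worker service:
#   from worker.celery_app import app
#   server = make_health_server(lambda: app.control.ping(timeout=1.0))
#   server.serve_forever()
```

Running the HTTP server in its own process or thread keeps a slow broker round-trip from blocking task execution.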
