Milestone 3.4.1 — Worker Service Skeleton (Celery + Redis Broker)
Status: Planned
Goal: 3.4 — Memory extraction worker
Phase: 3 — Memory Engine and Operator Platform
Estimated effort: 2 days
Why This Milestone Exists
The extraction worker is the async backbone of Phase 3. Without it, no memories are created — the proxy triggers extraction jobs but they go nowhere. This milestone establishes the Celery application, worker configuration, task routing, and monitoring setup. Domain logic (the actual extraction) comes in subsequent milestones.
Branch
feat/m3-4-1-worker-skeleton
PR Title
feat(worker): Celery worker skeleton with Redis broker and task routing (m3.4.1)
Deliverables
Service structure
```
services/worker/
  pyproject.toml
  Dockerfile
  .env.example
  src/
    worker/
      __init__.py
      celery_app.py        # Celery application instance
      settings.py
      tasks/
        __init__.py
        extraction.py      # Memory extraction tasks (3.4.3)
        embedding.py       # Memory embedding tasks (3.4.4)
        maintenance.py     # GC, expiry, dedup maintenance
      queues.py            # Queue definitions
  tests/
```
src/worker/celery_app.py
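```python
from celery import Celery
from celery.schedules import crontab
from kombu import Exchange, Queue

from worker.settings import settings

app = Celery(
    "ibex-worker",
    broker=settings.celery_broker_url,       # redis://...
    backend=settings.celery_result_backend,  # redis://...
    include=[
        "worker.tasks.extraction",
        "worker.tasks.embedding",
        "worker.tasks.maintenance",
    ],
)

# ── Queue topology ────────────────────────────────────────────────────────
# extraction:  high priority; triggered immediately after session close
# embedding:   medium priority; embedding computation after extraction
# maintenance: low priority; GC, expiry, dedup and idle-session sweeps (driven by beat)
QUEUES = [
    Queue("extraction", Exchange("extraction"), routing_key="extraction"),
    Queue("embedding", Exchange("embedding"), routing_key="embedding"),
    Queue("maintenance", Exchange("maintenance"), routing_key="maintenance"),
]

app.conf.update(
    task_queues=QUEUES,
    task_default_queue="extraction",
    # Route each task module to its own queue. Without explicit routes, every
    # task (embedding and maintenance included) lands in the default queue.
    task_routes={
        "worker.tasks.extraction.*": {"queue": "extraction"},
        "worker.tasks.embedding.*": {"queue": "embedding"},
        "worker.tasks.maintenance.*": {"queue": "maintenance"},
    },
    # "x-max-priority" is a RabbitMQ queue argument and has no effect on Redis.
    # The Redis transport emulates message priorities via priority_steps/sep
    # (0 = highest on Redis); queue_order_strategy="priority" polls the
    # consumed queues in a fixed order instead of round-robin.
    broker_transport_options={
        "priority_steps": list(range(10)),
        "sep": ":",
        "queue_order_strategy": "priority",
    },
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    task_acks_late=True,              # ack after completion; on Redis, unacked tasks are redelivered after visibility_timeout
    task_reject_on_worker_lost=True,  # re-queue if the worker process dies mid-task
    worker_prefetch_multiplier=1,     # each process reserves one task at a time; long tasks don't hoard the queue
    task_track_started=True,
    timezone="UTC",
    beat_schedule={
        # Nightly memory-expiry sweep at 02:00 UTC
        "memory-expiry-sweep": {
            "task": "worker.tasks.maintenance.expire_old_memories",
            "schedule": crontab(hour=2, minute=0),
        },
        "session-completion-sweep": {
            "task": "worker.tasks.maintenance.complete_idle_sessions",
            "schedule": 300.0,  # every 5 minutes
        },
    },
)
```
Acceptance Criteria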
- `celery -A worker.celery_app worker -Q extraction,embedding,maintenance` starts without errors
- Task routing: extraction tasks go to the `extraction` queue, not Celery's default `celery` queue
- `task_acks_late=True` (together with `task_reject_on_worker_lost=True`) ensures tasks are not lost on worker crash
- `/health` endpoint on the worker process (separate HTTP server): returns 200 if Celery workers are alive
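One possible shape for the `/health` criterion, sketched under assumptions that are not part of this milestone's spec: the health server runs on a daemon thread inside the worker process, liveness is decided by an injected `ping` callable (in production, a wrapper around Celery's `app.control.ping(timeout=...)`, which returns one reply per responding worker), and the names `make_handler` / `start_health_server`, the port `8081`, and the `503` failure status are all hypothetical choices for illustration.

```python
# Hypothetical /health sidecar for the worker process (sketch only).
# Assumed, not from the spec: function names, port 8081, 503 on failure.
import json
import threading
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from typing import Callable, List

PingFn = Callable[[], List[dict]]  # returns one reply dict per live worker


def make_handler(ping: PingFn) -> type:
    """Build a request handler class whose GET /health calls `ping`."""

    class HealthHandler(BaseHTTPRequestHandler):
        def do_GET(self) -> None:
            if self.path != "/health":
                self.send_error(404)
                return
            try:
                replies = ping() or []
            except Exception:  # broker unreachable, timeout, etc. count as unhealthy
                replies = []
            status = 200 if replies else 503
            body = json.dumps({"workers": len(replies)}).encode()
            self.send_response(status)
            self.send_header("Content-Type", "application/json")
            self.send_header("Content-Length", str(len(body)))
            self.end_headers()
            self.wfile.write(body)

        def log_message(self, format: str, *args) -> None:
            pass  # keep probe traffic out of the worker logs

    return HealthHandler


def start_health_server(
    ping: PingFn, host: str = "0.0.0.0", port: int = 8081
) -> ThreadingHTTPServer:
    """Serve /health from a daemon thread so it runs alongside the Celery worker."""
    server = ThreadingHTTPServer((host, port), make_handler(ping))
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server
```

Inside the worker this could be started, for example, from Celery's `worker_ready` signal with `start_health_server(lambda: app.control.ping(timeout=1.0))`. A broadcast ping is answered by any live worker on the broker, so a per-pod check might pass `destination=[hostname]` to `app.control.ping` instead. Injecting the probe keeps the HTTP layer testable without a running Redis.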