Async pipeline and deployment

The optional broker-driven path in autorag.services runs many audio→topics requests concurrently, alongside the unchanged synchronous SDK / CLI / API. The synchronous path keeps its in-process flow and never touches the broker. Everything broker-side sits behind the [broker] extra (pika, lazy-imported only in services/broker.py), so import autorag.services stays safe on a base install: only services/schemas.py (pure pydantic) is imported eagerly, and the rest resolves on first access via a PEP 562 module __getattr__.

Topology

RabbitMQ, with one work queue per stage on a direct exchange plus a dead-letter exchange (DLX):

stage.whisper -> stage.l1 -> stage.decide -> stage.l2 -> stage.summarize_l2 ->
                          \                                                  \
                           -> stage.summarize_l1 -----------------------------> stage.l0 -> stage.persist

l1 fans out to both decide and summarize_l1, so the L1-summary pass overlaps with decide and l2 instead of waiting behind l2 for a combined summarize. l0 is the join: it fires exactly once, after BOTH summarize_l1 and summarize_l2 have finished. stages._try_emit_l0 reads the per-job stage_states row, and because the single GPU-worker's drain loop handles these stages one at a time, that read is race-free.
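The join's exactly-once behaviour can be sketched with sqlite3. This assumes a simplified schema with one stage_states row per finished stage (the text describes a single per-job row) and a hypothetical l0_emitted marker that makes a redelivered summarize message harmless; mark_done_and_try_emit_l0 is an illustrative name, not stages._try_emit_l0 itself.

```python
import sqlite3
from typing import Callable

JOIN_INPUTS = ("summarize_l1", "summarize_l2")  # both branches must finish before l0
EMITTED = "l0_emitted"  # hypothetical marker row that makes the emit idempotent


def init_schema(conn: sqlite3.Connection) -> None:
    # Assumed layout: one row per (job, finished stage); not the project's real schema.
    conn.execute(
        "CREATE TABLE IF NOT EXISTS stage_states ("
        " job_id TEXT NOT NULL, stage TEXT NOT NULL, PRIMARY KEY (job_id, stage))"
    )


def mark_done_and_try_emit_l0(
    conn: sqlite3.Connection,
    job_id: str,
    stage: str,
    publish: Callable[[str, str], None],
) -> bool:
    """Record that `stage` finished for `job_id`; publish stage.l0 exactly once.

    The read-then-write below is only race-free because a single consumer
    (the one GPU-worker drain loop) runs it; concurrent consumers would need
    a transaction or a conditional INSERT instead.
    """
    conn.execute("INSERT OR IGNORE INTO stage_states VALUES (?, ?)", (job_id, stage))
    done = {
        row[0]
        for row in conn.execute("SELECT stage FROM stage_states WHERE job_id = ?", (job_id,))
    }
    fire = all(s in done for s in JOIN_INPUTS) and EMITTED not in done
    if fire:
        conn.execute("INSERT INTO stage_states VALUES (?, ?)", (job_id, EMITTED))
    conn.commit()
    if fire:
        publish("stage.l0", job_id)
    return fire
```

The first summarize to finish only records its row; the second one sees both rows and publishes; any redelivery after that finds the marker and does nothing.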

Retry is bounded and handler-driven: a failing stage republishes the envelope with attempt+1, up to broker.MAX_ATTEMPTS; after that the message is dead-lettered and the job is marked failed.
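The bounded-retry rule reduces to a few lines. A minimal sketch, assuming attempts are counted from 1 and MAX_ATTEMPTS = 3 (the real value lives in broker.MAX_ATTEMPTS); Envelope, run_stage and the four callbacks are hypothetical. In a RabbitMQ consumer the dead_letter step would typically be a reject without requeue, which the broker routes to the configured DLX.

```python
from dataclasses import dataclass, replace
from typing import Callable

MAX_ATTEMPTS = 3  # assumed value; the real limit is broker.MAX_ATTEMPTS


@dataclass(frozen=True)
class Envelope:
    """Hypothetical minimal envelope; attempts are counted from 1 in this sketch."""

    job_id: str
    stage: str
    attempt: int = 1


def run_stage(
    env: Envelope,
    handler: Callable[[Envelope], None],
    republish: Callable[[Envelope], None],
    dead_letter: Callable[[Envelope], None],
    mark_failed: Callable[[str, str], None],
) -> None:
    """Handler-driven bounded retry: republish with attempt+1, then dead-letter."""
    try:
        handler(env)
    except Exception as exc:  # any stage failure consumes one attempt
        if env.attempt < MAX_ATTEMPTS:
            republish(replace(env, attempt=env.attempt + 1))
        else:
            dead_letter(env)
            mark_failed(env.job_id, repr(exc))
```

Because the retry decision lives in the handler rather than in broker redelivery, the attempt counter travels inside the envelope and survives a worker restart.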

The AMQP envelope (StageMessage) is fixed-size — the WordSpan transcript travels by session_id reference in SQLite (services/blobs.py, reusing Database.store_transcription / persistence.load_transcription); the evolving tree lives in the jobs row’s partial_tree.
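Keeping the envelope small by passing a reference can be illustrated as follows. StageMessageSketch, store_transcript and load_transcript are hypothetical stand-ins (for StageMessage, Database.store_transcription and persistence.load_transcription), and the field set and table layout are assumed.

```python
import json
import sqlite3
from dataclasses import asdict, dataclass


@dataclass(frozen=True)
class StageMessageSketch:
    """Hypothetical fields; the real StageMessage is a pydantic model."""

    job_id: str
    stage: str
    session_id: str  # reference to the transcript row, never the transcript itself
    attempt: int = 1

    def to_bytes(self) -> bytes:
        return json.dumps(asdict(self)).encode()

    @classmethod
    def from_bytes(cls, body: bytes) -> "StageMessageSketch":
        return cls(**json.loads(body))


def store_transcript(conn: sqlite3.Connection, session_id: str, words: list) -> None:
    # Stand-in for Database.store_transcription: the large blob lives in SQLite.
    conn.execute(
        "CREATE TABLE IF NOT EXISTS transcripts (session_id TEXT PRIMARY KEY, words TEXT)"
    )
    conn.execute(
        "INSERT OR REPLACE INTO transcripts VALUES (?, ?)", (session_id, json.dumps(words))
    )
    conn.commit()


def load_transcript(conn: sqlite3.Connection, session_id: str) -> list:
    # Stand-in for persistence.load_transcription.
    row = conn.execute(
        "SELECT words FROM transcripts WHERE session_id = ?", (session_id,)
    ).fetchone()
    return json.loads(row[0])
```

However long the transcript grows, the message body carries only ids and the attempt counter; the consuming stage pays one SQLite read to get the words back.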

Workers

  • A single GPU-worker (autorag-gpu-worker) owns whisper and every LLM stage; a separate IO-worker (autorag-io-worker) owns persist. The GPU-worker drains stages in reverse pipeline order, so in-flight jobs flow to completion before new whisper work starts, which means fewer tenancy flips between whisper and the LLM.

  • GpuArbiter (services/model_manager.py) is the in-process residency state machine (none / whisper / llm) plus a VRAM-budget gate. preload() warms the CPU standbys: wav2vec2 and pyannote via the existing _offload_* idioms, and the whisperX CT2 int8 model (CT2 is not movable, so it is destroyed and rebuilt, never moved with .to()). When the VRAM probe shows headroom, preload() also builds the CUDA fp16 CT2 instance up front. Combined with the residency contract (transcribe_segment does not destroy that instance inline, and _default_offload_whisper does not drop the cache on the whisper -> llm flip), this means autorag.whisper.load_model fires at most once per worker lifetime. acquire() smart-unloads the prior tenant first: it evicts the LLM via the keep_alive=0 call from agent.build_stage_handlers()["evict"], or offloads the torch parts of the whisper stack (wav2vec2 align + pyannote) to CPU while keeping CT2 resident on CUDA.

  • The batched worker keeps the LLM warm across jobs (evicting after every L0 would thrash). The in-process services/runner.py, by contrast, evicts after each L0 via on_l0_complete; it serves as the reference stage sequence and as the no-broker test vehicle.

  • build_stage_handlers() (agent.py) exposes the five stage closures + evict, sharing _build_stage_closures with build_topic_runnable so the distributed and in-process paths construct identical warm Ollama chains.
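The drain order and the arbiter's tenancy flips can be sketched together. This is a simplified model under stated assumptions: ArbiterSketch, drain, tenant_for and the callback names are hypothetical, the VRAM gate simply raises when the budget is short, and CT2 is built on the first whisper acquire rather than in preload().

```python
from collections import deque
from enum import Enum
from typing import Callable, Iterable

# GPU-worker stages in reverse pipeline order (persist belongs to the IO-worker).
# Where summarize_l1 sits relative to l2/decide is an assumption of this sketch.
DRAIN_ORDER = ("l0", "summarize_l2", "summarize_l1", "l2", "decide", "l1", "whisper")


class Tenant(Enum):
    NONE = "none"
    WHISPER = "whisper"
    LLM = "llm"


def tenant_for(stage: str) -> Tenant:
    return Tenant.WHISPER if stage == "whisper" else Tenant.LLM


class ArbiterSketch:
    """Hypothetical stand-in for GpuArbiter: residency state plus a VRAM gate."""

    def __init__(
        self,
        evict_llm: Callable[[], None],              # e.g. the keep_alive=0 "evict" handler
        offload_whisper_torch: Callable[[], None],  # wav2vec2 align + pyannote -> CPU
        build_ct2_cuda: Callable[[], object],       # CT2 is rebuilt, never moved with .to()
        free_vram_mb: Callable[[], int],
        need_mb: dict,                              # assumed per-tenant VRAM budget
    ) -> None:
        self.tenant = Tenant.NONE
        self._evict_llm = evict_llm
        self._offload_whisper_torch = offload_whisper_torch
        self._build_ct2_cuda = build_ct2_cuda
        self._free_vram_mb = free_vram_mb
        self._need_mb = need_mb
        self._ct2 = None

    def acquire(self, tenant: Tenant) -> None:
        if tenant is self.tenant:
            return  # already resident: no flip, no unload
        if self.tenant is Tenant.LLM:
            self._evict_llm()
        elif self.tenant is Tenant.WHISPER:
            self._offload_whisper_torch()  # CT2 itself stays resident on CUDA
        self.tenant = Tenant.NONE
        if self._free_vram_mb() < self._need_mb[tenant]:
            raise RuntimeError(f"VRAM budget too small for {tenant.value}")
        if tenant is Tenant.WHISPER and self._ct2 is None:
            self._ct2 = self._build_ct2_cuda()  # built at most once per worker
        self.tenant = tenant


def drain(
    queues: dict,
    arbiter: ArbiterSketch,
    run: Callable[[str, str], Iterable[tuple]],
) -> list:
    """Always take the latest pipeline stage that has work; returns (stage, msg) in run order."""
    taken = []
    while True:
        stage = next((s for s in DRAIN_ORDER if queues.get(s)), None)
        if stage is None:
            return taken
        msg = queues[stage].popleft()
        arbiter.acquire(tenant_for(stage))
        taken.append((stage, msg))
        for next_stage, next_msg in run(stage, msg):
            queues.setdefault(next_stage, deque()).append(next_msg)
```

Feeding two jobs into the whisper queue shows the intended effect: job 1 runs all the way to l0 before job 2's whisper stage starts, and the CT2 builder runs once even though the tenant flips whisper -> llm -> whisper -> llm.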

Job state and the DB-path contract

services/jobs.JobStore is a jobs table in the same SQLite DB that the synchronous path uses; the full JobRecord lives in a JSON record_json column. Writes go through pydantic_sqlite, while reads go through raw sqlite_utils (rows_where), so the API process sees the rows the workers wrote. The clip row is cross-process safe too: the Database audio_clips path consists entirely of column-scoped sqlite_utils upserts plus raw reads (persistence.load_clip / Database.get_clip), and the in-process pydantic_sqlite registry is now used only for the jobs table.
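The write-here, read-there split can be illustrated with the standard library alone. A minimal sketch: write_job and read_job are hypothetical names, plain sqlite3 stands in for both pydantic_sqlite and sqlite_utils, and the (id, record_json) column layout is assumed from the description. The point is only that the reader opens its own connection and issues a raw SELECT, so it sees whatever another process has committed; both sides run in one process here for brevity.

```python
import json
import sqlite3


def write_job(db_path: str, job_id: str, record: dict) -> None:
    """Worker side: persist the whole record as JSON (stand-in for the pydantic_sqlite write)."""
    conn = sqlite3.connect(db_path)
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "CREATE TABLE IF NOT EXISTS jobs (id TEXT PRIMARY KEY, record_json TEXT NOT NULL)"
            )
            conn.execute(
                "INSERT OR REPLACE INTO jobs (id, record_json) VALUES (?, ?)",
                (job_id, json.dumps(record)),
            )
    finally:
        conn.close()


def read_job(db_path: str, job_id: str):
    """API side: a fresh connection and a raw SELECT, so other processes' commits are visible.

    Plays the role of sqlite_utils' rows_where in the text; no in-process cache is involved.
    """
    conn = sqlite3.connect(db_path)
    try:
        row = conn.execute("SELECT record_json FROM jobs WHERE id = ?", (job_id,)).fetchone()
    finally:
        conn.close()
    return json.loads(row[0]) if row else None
```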

Warning

This only holds if the host autorag serve process shares the workers’ DB. The workers use AUTORAG_DB_PATH=/data/autorag.db (the container side of the ${AUTORAG_STACK_DATA_DIR:-./.stack-data}:/data mount), so the host API must set AUTORAG_DB_PATH to the absolute path of .stack-data/autorag.db, not to the ~/.autorag/autorag.db default. Put it in the repo .env; Settings reads that file via SettingsConfigDict(env_file=".env"). The environment: block of the workers’ compose services hardcodes /data/autorag.db and wins inside the containers, so .env only steers host processes and the two never conflict.

A mismatch leaves every /jobs/{id} stuck on queued forever, even after the job has failed and been dead-lettered. The real error is written to the workers’ DB, which the API never reads: handle_batch’s except clause records it only in the JobStore, with no logger call, so the gpu-worker log stays silent too.
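A host-side sanity check along these lines could catch the mismatch before any job is queued. It is a hypothetical helper, not part of autorag: parse_dotenv only handles plain KEY=VALUE lines, and resolve_db_path mimics the pydantic-settings priority order (process environment, then .env, then the field default) rather than calling pydantic-settings itself.

```python
from collections.abc import Mapping
from pathlib import Path

DEFAULT_DB_PATH = Path("~/.autorag/autorag.db")  # the default named in the text


def parse_dotenv(text: str) -> dict:
    """Minimal KEY=VALUE reader (no quotes, no export); a stand-in for the real .env loader."""
    values = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        values[key.strip()] = value.strip()
    return values


def resolve_db_path(environ: Mapping[str, str], dotenv_text: str = "") -> Path:
    """Process environment first, then .env, then the default (pydantic-settings priority)."""
    raw = environ.get("AUTORAG_DB_PATH") or parse_dotenv(dotenv_text).get("AUTORAG_DB_PATH")
    return Path(raw or DEFAULT_DB_PATH).expanduser()


def shares_worker_db(db_path: Path, stack_data_dir: Path) -> bool:
    """True when the host points at the file the workers see as /data/autorag.db."""
    return db_path.expanduser().resolve() == (stack_data_dir / "autorag.db").expanduser().resolve()
```

On the host, shares_worker_db(resolve_db_path(os.environ, Path(".env").read_text()), Path(".stack-data")) returning False corresponds to the stuck-on-queued configuration described above.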

Deployment

The repo-root docker-compose.yml is the single source of truth for the whole host stack: rabbitmq, ollama (whose service definition is the only copy of the server-side tuning contract; see Ollama tuning), one gpu-worker, an io-worker, and a docker-socket-proxy.

Worker image

The worker image is built from a lean .devcontainer/worker.Dockerfile. Only ./src and ./pyproject.toml are bind-mounted read-only, not the repo root, so .env never enters a worker mount. The container starts through uv run, so a code edit needs only a worker restart, not a rebuild. The image bakes in uv sync --extra audio --extra diarize --extra youtube --extra rag --extra broker.

Note

youtube is load-bearing in that extras list: the whisper stage runs AutoRAG.transcribe(source), and URL inputs need yt_dlp. Dropping it sends every YouTube job to the DLQ with a MissingExtraError. Any dependency change here requires ./scripts/stack.sh rebuild.

Control plane = docker-socket-proxy

No bespoke service. A pinned tecnativa/docker-socket-proxy:0.3.0 is the only container that mounts /var/run/docker.sock (read-only, with no published port). Every endpoint group defaults to deny except the CONTAINERS / POST / NETWORKS / SERVICES / TASKS / INFO / VERSION groups that docker compose ps / logs / restart need, so build / exec / run / images are refused at the proxy and there is no path to executing code on the host.

Devcontainer

A thin sandbox: no dockerd, no mounted socket, no GPU. It joins the stack’s shared, user-defined autorag-net network (declared external in docker-compose.yml; created idempotently by both ./scripts/stack.sh and the devcontainer initializeCommand, so either side may come up first) and reaches services by name:

  • AUTORAG_OLLAMA_BASE_URL=http://ollama:11434

  • AUTORAG_BROKER_URL=amqp://rabbitmq:5672

  • a client-only docker CLI with DOCKER_HOST=tcp://docker-socket-proxy:2375.
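From inside the sandbox, the by-name wiring can be spot-checked with a plain TCP probe. This is an illustrative helper, not something the project ships (check-stack.sh uses docker compose ps instead); probe_targets and reachable are hypothetical names, and the values listed above are used only as fallbacks when the variables are unset.

```python
import socket
from collections.abc import Mapping
from urllib.parse import urlsplit

# Env var and in-network fallback URL for each service, as listed above.
SERVICES = {
    "ollama": ("AUTORAG_OLLAMA_BASE_URL", "http://ollama:11434"),
    "rabbitmq": ("AUTORAG_BROKER_URL", "amqp://rabbitmq:5672"),
    "docker-socket-proxy": ("DOCKER_HOST", "tcp://docker-socket-proxy:2375"),
}


def probe_targets(environ: Mapping[str, str]) -> dict[str, tuple[str, int]]:
    """Resolve each service URL to the (host, port) a TCP probe would dial."""
    targets = {}
    for name, (var, default) in SERVICES.items():
        parts = urlsplit(environ.get(var, default))
        targets[name] = (parts.hostname, parts.port)
    return targets


def reachable(host: str, port: int, timeout: float = 1.0) -> bool:
    """Plain TCP connect; the service names only resolve on autorag-net."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # includes DNS failures (socket.gaierror) and timeouts
        return False
```

For example, {name: reachable(*target) for name, target in probe_targets(os.environ).items()} should report True for all three once the stack is up and the devcontainer has joined autorag-net.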

The per-edit loop is plain docker compose -p autorag restart <svc> (logs / ps likewise) — the proxy enforces the same ps / logs / restart-only surface for everyone; there is no token, no autorag-ctl, no ctl.py. .devcontainer/check-stack.sh (postStartCommand) only probes via docker compose ps and always exits 0.

The devcontainer’s project venv is UV_PROJECT_ENVIRONMENT=/opt/autorag-venv, outside the bind-mounted workspace, so the sandbox and host never share or thrash one ./.venv (their Pythons differ).

Stack lifecycle

  • ./scripts/stack.sh up — idempotent: create net → build → wait healthy → pull models. Pass --with-observability to bring up the OTel profile (see Observability (autorag.otel)).

  • ./scripts/stack.sh rebuild — dependency-only rebuild.

  • ./scripts/stack.sh down — keeps named volumes (model-cache, ollama-models, rabbitmq-data) and never removes the external autorag-net.

  • ./scripts/stack.sh down -v — drops all three named volumes too.
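The idempotent parts of up (create the network, then wait for health) can be sketched in Python. stack.sh is a shell script, so this only illustrates the pattern: ensure_network and wait_until are hypothetical, and the runner and clock are injectable so the logic can be exercised without Docker.

```python
import subprocess
import time
from typing import Callable

NETWORK = "autorag-net"


def ensure_network(
    name: str = NETWORK,
    run: Callable[..., subprocess.CompletedProcess] = subprocess.run,
) -> bool:
    """Create the shared external network only if it is missing; True if this call created it."""
    probe = run(["docker", "network", "inspect", name], capture_output=True)
    if probe.returncode == 0:
        return False  # already there: whichever side came up first created it
    # If the other side creates it between inspect and create, this raises
    # CalledProcessError; a real script might re-inspect instead of failing.
    run(["docker", "network", "create", name], check=True)
    return True


def wait_until(
    predicate: Callable[[], bool],
    timeout_s: float = 300.0,
    interval_s: float = 2.0,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Poll predicate (e.g. 'every service reports healthy') until it holds or time runs out."""
    deadline = clock() + timeout_s
    while True:
        if predicate():
            return True
        if clock() >= deadline:
            return False
        sleep(interval_s)
```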

The gpu-worker’s HF Hub and torch.hub downloads (whisperX ASR, pyannote diarization, torchaudio wav2vec2 alignment) go to the persistent model-cache named volume: worker.Dockerfile sets HF_HOME / TORCH_HOME to /opt/model-cache (outside the /app bind target, for the same reason as /opt/venv, and chowned to 1000:1000), so the multi-GB weights download only once.
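Where the two caches land can be worked out from the environment variables. This sketch mirrors the documented defaults of huggingface_hub (HF_HUB_CACHE, else $HF_HOME/hub) and torch.hub ($TORCH_HOME/hub), each falling back under XDG_CACHE_HOME or ~/.cache; treat it as an approximation, and inside the worker prefer huggingface_hub.constants.HF_HUB_CACHE and torch.hub.get_dir(). hf_hub_cache and torch_hub_dir are hypothetical names.

```python
import os
from collections.abc import Mapping
from pathlib import Path


def hf_hub_cache(env: Mapping[str, str]) -> Path:
    """Approximates huggingface_hub's hub cache: HF_HUB_CACHE, else $HF_HOME/hub."""
    if "HF_HUB_CACHE" in env:
        return Path(env["HF_HUB_CACHE"]).expanduser()
    xdg = env.get("XDG_CACHE_HOME", "~/.cache")
    hf_home = env.get("HF_HOME", os.path.join(xdg, "huggingface"))
    return Path(hf_home).expanduser() / "hub"


def torch_hub_dir(env: Mapping[str, str]) -> Path:
    """Approximates torch.hub.get_dir(): $TORCH_HOME/hub, default under the XDG cache."""
    xdg = env.get("XDG_CACHE_HOME", "~/.cache")
    torch_home = env.get("TORCH_HOME", os.path.join(xdg, "torch"))
    return Path(torch_home).expanduser() / "hub"
```

If both variables point at /opt/model-cache, as the text describes, both libraries write under /opt/model-cache/hub; huggingface_hub uses models--<org>--<name> folders and torch.hub keeps state-dict downloads in a checkpoints/ subfolder, so the two coexist on the one volume.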

Trust surface (accepted)

Anything in the sandbox can restart / inspect / log any container the host daemon manages (project scoping is convention, not enforcement) — but not host root, build, or exec.

Accepted trade-off: the in-process GPU pytest (test_real_whisper_tenancy_transitions) and tests/test_pipeline_docker_stack.py don’t run inside the devcontainer (no GPU / no docker). Both auto-SKIP in-container and run on the host via ./scripts/stack.sh test-stack. CI stays Python-only; pika is pure-Python and lazy, so the base-install guard holds.