"""FastAPI server for AutoRAG.
Wraps :class:`autorag.core.AutoRAG` behind HTTP endpoints. Mount this
``app`` with ``autorag serve`` or any ASGI runner.
Endpoints:
* ``GET /health`` — liveness probe.
* ``POST /ingest`` — document ingestion (see
:class:`~autorag.schemas.IngestRequest`).
* ``POST /query`` — RAG query (see
:class:`~autorag.schemas.QueryRequest`).
When the ``[rag]`` extra is installed, the ``/viz`` page, its JSON
endpoints (``/viz/data``, ``/viz/search``), and the React asset mount
at ``/viz-assets`` are added on top. A ``[server]``-only install
skips them and logs the reason at ``INFO`` level.
The async job API (``POST /jobs/audio``, ``GET /jobs/{id}``,
``GET /jobs/{id}/result``) is added unconditionally but degrades
gracefully: it returns ``503`` with an install hint when the
``[broker]`` / ``[rag]`` extras the worker path needs are absent. The
synchronous endpoints above never touch the broker.
"""
from __future__ import annotations
import logging
from contextlib import asynccontextmanager
from functools import lru_cache
from typing import TYPE_CHECKING
from fastapi import FastAPI, HTTPException
from autorag.core import AutoRAG
from autorag.schemas import (
IngestRequest,
IngestResponse,
QueryRequest,
QueryResponse,
)
from autorag.services.schemas import (
AudioJobRequest,
JobRecord,
JobResultResponse,
JobStatus,
JobSubmitResponse,
)
if TYPE_CHECKING:
    # Annotation-only imports. With `from __future__ import annotations`
    # the annotations are never evaluated at runtime, so these names need
    # not be importable then -- notably the job store, which depends on
    # the optional `[rag]` extra.
    from collections.abc import AsyncIterator

    from autorag.services.jobs import JobStore

#: Module logger; used to report optional features that were skipped
#: (viz routes, OpenTelemetry FastAPI instrumentation).
logger: logging.Logger = logging.getLogger(__name__)
@asynccontextmanager
async def _lifespan(_app: FastAPI) -> AsyncIterator[None]:
    """Set up OpenTelemetry once the ASGI loop is running, then serve.

    Calls :func:`autorag.otel.initialize_otel` with the service name
    ``"autorag-api"``. That function is expected to be a no-op when
    ``AUTORAG_OTEL_ENABLED=false`` (the default); the flag is handled
    there, not here.

    ``FastAPIInstrumentor.instrument_app`` is then attempted whenever the
    instrumentation package (shipped in the ``[observability]`` extra) is
    importable. This function does not consult the enable flag itself.
    The import is lazy so ``[server]``-only installs still boot. Both a
    missing package and a failing ``instrument_app`` are logged at DEBUG
    and never block startup.

    NOTE(review): the instrumentor is applied while Starlette is already
    handling the lifespan scope. Depending on the Starlette and
    instrumentation versions, this may be rejected ("Cannot add
    middleware after an application has started") or may silently not
    take effect. Consider instrumenting right after ``app`` is created;
    confirm against the pinned versions.

    Nothing is torn down after ``yield``.
    """
    from autorag.otel import initialize_otel

    initialize_otel("autorag-api")
    try:
        from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
    except ModuleNotFoundError as exc:
        # Optional extra absent: HTTP request tracing is skipped. Logged
        # (rather than silently passed) so the skip is discoverable.
        logger.debug(
            "FastAPIInstrumentor unavailable (install autorag[observability]): %s",
            exc,
        )
    else:
        try:
            FastAPIInstrumentor.instrument_app(_app)
        except Exception as exc:  # best-effort: tracing must never block startup
            logger.debug("FastAPIInstrumentor.instrument_app skipped: %s", exc)
    yield
#: The FastAPI application instance, wired to :func:`_lifespan` for
#: startup work. Importable as ``autorag.api:app``.
app = FastAPI(
    lifespan=_lifespan,
    title="AutoRAG",
    version="0.2.0",
)
# Viz endpoints depend on the `[rag]` extra (umap, sklearn, chromadb).
# `[server]` users without `[rag]` get the API minus /viz.
#
# Catch ImportError, not only its subclass ModuleNotFoundError. The job
# endpoints in this module treat any ImportError (including
# MissingExtraError) as "extra missing". An extras guard inside
# autorag.viz that raises a plain ImportError should therefore disable
# /viz rather than abort app import. The logged message includes the
# exception text, so a genuine import bug stays visible.
try:
    from autorag.viz import router as viz_router
    from autorag.viz import viz_assets_dir
except ImportError as exc:
    logger.info("viz endpoints disabled (install autorag[rag] to enable): %s", exc)
else:
    from fastapi.staticfiles import StaticFiles

    app.include_router(viz_router)
    app.mount("/viz-assets", StaticFiles(directory=viz_assets_dir), name="viz-assets")
@lru_cache(maxsize=1)
def get_rag() -> AutoRAG:
    """Return a process-wide :class:`~autorag.core.AutoRAG` singleton.

    The instance is built lazily on the first call and then cached, so
    request handlers reuse the same vector store / embedder instead of
    re-instantiating per call. ``get_rag.cache_clear()`` (provided by
    :func:`functools.lru_cache`) drops the cached instance, e.g. between
    tests.

    Note: ``lru_cache`` keeps its cache coherent across threads but does
    not serialize a cache miss. Plain ``def`` endpoints such as ``ingest``
    and ``query`` run in FastAPI's threadpool, so two concurrent first
    requests may each construct an ``AutoRAG``; only one is kept.
    """
    return AutoRAG()
@app.get("/health")
def health() -> dict[str, str]:
    """Report that the process is up and serving requests.

    No dependencies are checked; the payload is always
    ``{"status": "ok"}``.
    """
    payload: dict[str, str] = {"status": "ok"}
    return payload
@app.post("/ingest", response_model=IngestResponse)
def ingest(req: IngestRequest) -> IngestResponse:
    """Ingest (load, chunk, embed, store) the documents in ``req.paths``.

    The work is delegated to ``AutoRAG.ingest`` on the cached instance
    returned by :func:`get_rag`.
    """
    pipeline = get_rag()
    return pipeline.ingest(req.paths)
@app.post("/query", response_model=QueryResponse)
def query(req: QueryRequest) -> QueryResponse:
    """Answer ``req.question`` by retrieving relevant chunks and generating.

    The work is delegated to ``AutoRAG.query`` on the cached
    :func:`get_rag` instance, forwarding ``req.top_k`` as ``top_k``.
    """
    pipeline = get_rag()
    return pipeline.query(req.question, top_k=req.top_k)
# --- Async job API (autorag.services) ---------------------------------
#
# Handlers in this section only enqueue or poll; no blocking work happens
# in the request thread. The extras the worker path relies on (`[broker]`
# for pika, `[rag]` for the job store) are imported inside the handlers.
# That way a `[server]`-only install still starts, and a missing extra
# reaches the client as a clean HTTP 503.
def _job_store() -> JobStore:
    """Construct a :class:`JobStore` with its default arguments.

    Presumably this opens the configured job database; see ``JobStore``.

    Raises:
        HTTPException: ``503`` when ``autorag.services.jobs`` cannot be
            imported (e.g. the ``[rag]`` extra is not installed).
    """
    try:
        from autorag.services.jobs import JobStore as _Store
    except ImportError as err:
        raise HTTPException(status_code=503, detail=str(err)) from err
    return _Store()
@app.post("/jobs/audio", status_code=202, response_model=JobSubmitResponse)
def submit_job(req: AudioJobRequest) -> JobSubmitResponse:
    """Enqueue an audio→topics job and return ``202`` with its ``job_id`` at once.

    Raises:
        HTTPException: ``503`` when the ``[broker]`` / ``[rag]`` extras
            are not installed. The failure may surface while importing
            the broker module, or as ``MissingExtraError`` (an
            ``ImportError``) raised by ``submit_audio_job`` itself; both
            map to the same response.
    """
    try:
        from autorag.services.broker import submit_audio_job

        return submit_audio_job(req)
    except ImportError as err:
        raise HTTPException(status_code=503, detail=str(err)) from err
@app.get("/jobs/{job_id}", response_model=JobRecord)
def job_status(job_id: str) -> JobRecord:
    """Look up a job's status and per-stage state.

    Raises:
        HTTPException: ``404`` for an unknown ``job_id``. ``503`` (from
            ``_job_store``) when the job store module cannot be imported.
    """
    found = _job_store().get_job(job_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail=f"unknown job {job_id!r}")
@app.get("/jobs/{job_id}/result", response_model=JobResultResponse)
def job_result(job_id: str) -> JobResultResponse:
    """Return the finished clip (the existing SQLite row) for a job.

    The clip is loaded for the record's ``result_session_id`` when that
    is truthy; otherwise its ``session_id`` is used.

    Raises:
        HTTPException: ``404`` if ``job_id`` is unknown. ``409`` if the
            job's status is anything other than ``done``. ``503`` (from
            ``_job_store``) when the job store module cannot be imported.
    """
    store = _job_store()
    record = store.get_job(job_id)
    if record is None:
        raise HTTPException(status_code=404, detail=f"unknown job {job_id!r}")

    state = record.status
    if state is not JobStatus.done:
        raise HTTPException(
            status_code=409,
            detail=f"job {job_id!r} is {state.value}, not done",
        )

    from autorag.persistence import load_clip

    # A falsy result_session_id (None / "") falls back to the job's session.
    target_session = record.result_session_id or record.session_id
    clip = load_clip(store.database, target_session)
    return JobResultResponse(
        job_id=job_id,
        session_id=record.session_id,
        status=state,
        clip=clip,
    )