# Source code for autorag.api

"""FastAPI server for AutoRAG.

Wraps :class:`autorag.core.AutoRAG` behind HTTP endpoints. Mount this
``app`` with ``autorag serve`` or any ASGI runner.

Endpoints:

* ``GET /health`` — liveness probe.
* ``POST /ingest`` — document ingestion (see
  :class:`~autorag.schemas.IngestRequest`).
* ``POST /query`` — RAG query (see
  :class:`~autorag.schemas.QueryRequest`).

When the ``[rag]`` extra is installed, the ``/viz`` page, its JSON
endpoints (``/viz/data``, ``/viz/search``), and the React asset mount
at ``/viz-assets`` are added on top. A ``[server]``-only install
skips them, logging an info-level notice instead of failing to boot.

The async job API (``POST /jobs/audio``, ``GET /jobs/{id}``,
``GET /jobs/{id}/result``) is added unconditionally but degrades
gracefully: it returns ``503`` with an install hint when the
``[broker]`` / ``[rag]`` extras the worker path needs are absent. The
synchronous endpoints above never touch the broker.
"""

from __future__ import annotations

import logging
from contextlib import asynccontextmanager
from functools import lru_cache
from typing import TYPE_CHECKING

from fastapi import FastAPI, HTTPException

from autorag.core import AutoRAG
from autorag.schemas import (
    IngestRequest,
    IngestResponse,
    QueryRequest,
    QueryResponse,
)
from autorag.services.schemas import (
    AudioJobRequest,
    JobRecord,
    JobResultResponse,
    JobStatus,
    JobSubmitResponse,
)

if TYPE_CHECKING:
    from collections.abc import AsyncIterator

    from autorag.services.jobs import JobStore

logger = logging.getLogger(__name__)


@asynccontextmanager
async def _lifespan(_app: FastAPI) -> AsyncIterator[None]:
    """Initialise OpenTelemetry once the ASGI loop is running.

    :func:`autorag.otel.initialize_otel` is called on every startup and is
    intended to be a no-op when ``AUTORAG_OTEL_ENABLED=false`` (the
    default). ``FastAPIInstrumentor`` is *not* gated on that flag here. It
    is applied whenever the ``[observability]`` extra is importable, and is
    skipped only when that package is missing. The instrumentor is imported
    lazily so ``[server]``-only installs still boot.

    Args:
        _app: The application being started; handed to the instrumentor.

    Yields:
        Nothing. Control returns to the ASGI server until shutdown; no
        teardown runs afterwards.
    """
    from autorag.otel import initialize_otel

    initialize_otel("autorag-api")
    try:
        from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
    except ModuleNotFoundError:
        # `[observability]` extra not installed: serve uninstrumented.
        pass
    else:
        try:
            FastAPIInstrumentor.instrument_app(_app)
        except Exception as exc:
            # Best-effort by design: instrumentation must never block startup.
            # NOTE(review): when the lifespan runs, Starlette has usually built
            # its middleware stack already. An instrumentor that relies on
            # ``add_middleware`` may therefore raise "Cannot add middleware
            # after an application has started", or may silently not take
            # effect. Confirm against the installed
            # opentelemetry-instrumentation-fastapi version. If instrumentation
            # matters, consider instrumenting at import time instead.
            logger.debug("FastAPIInstrumentor.instrument_app skipped: %s", exc)
    yield


app = FastAPI(
    lifespan=_lifespan,
    title="AutoRAG",
    version="0.2.0",
)
"""The FastAPI application instance. Importable as ``autorag.api:app``."""

# The /viz page, its JSON endpoints and the bundled React assets all need the
# `[rag]` extra (umap, sklearn, chromadb). Without it, the import below fails,
# a notice is logged, and the rest of the API is served unchanged.
try:
    from autorag.viz import router as viz_router, viz_assets_dir
except ModuleNotFoundError as exc:
    logger.info("viz endpoints disabled (install autorag[rag] to enable): %s", exc)
else:
    from fastapi.staticfiles import StaticFiles

    app.include_router(viz_router)
    app.mount(
        "/viz-assets",
        StaticFiles(directory=viz_assets_dir),
        name="viz-assets",
    )


@lru_cache(maxsize=1)
def get_rag() -> AutoRAG:
    """Build the shared :class:`~autorag.core.AutoRAG` on first use, then reuse it.

    The zero-argument call is memoised by ``lru_cache``, so request handlers
    share one vector store / embedder instead of constructing a new one per
    request. ``get_rag.cache_clear()`` discards the cached instance.

    ``lru_cache`` keeps its cache coherent across threads but does not
    serialise the first call. If two threads race on the very first call,
    ``AutoRAG()`` may be constructed more than once, and only one result is
    kept.
    """
    instance = AutoRAG()
    return instance
@app.get("/health")
def health() -> dict[str, str]:
    """Liveness probe. Always returns ``{"status": "ok"}``."""
    return {"status": "ok"}


@app.post("/ingest", response_model=IngestResponse)
def ingest(req: IngestRequest) -> IngestResponse:
    """Load, chunk, embed, and store the documents referenced by ``req.paths``.

    NOTE(review): ``req.paths`` comes straight from the HTTP client and is
    handed to :meth:`AutoRAG.ingest` unchanged. Unless ``AutoRAG.ingest``
    validates it (not visible here), any caller can point the server at any
    path its process can read. Consider an allow-listed root and/or auth
    before exposing this endpoint beyond a trusted network.
    """
    return get_rag().ingest(req.paths)


@app.post("/query", response_model=QueryResponse)
def query(req: QueryRequest) -> QueryResponse:
    """Retrieve relevant chunks and generate an answer for ``req.question``."""
    return get_rag().query(req.question, top_k=req.top_k)


# --- Async job API (autorag.services) ---------------------------------
#
# These enqueue/poll only — no blocking work in the request thread. The
# extras the worker path needs (`[broker]` for pika, `[rag]` for the job
# store) are imported lazily inside the handlers so a `[server]`-only
# install still boots; a missing extra surfaces as a clean 503.


def _missing_extra(exc: ImportError) -> HTTPException:
    """Translate a missing-extra ``ImportError`` into a ``503`` response."""
    return HTTPException(status_code=503, detail=str(exc))


def _job_store() -> JobStore:
    """Open a :class:`JobStore` on the configured DB, or 503 if ``[rag]`` is missing.

    Both the import and the construction are guarded. A missing extra
    therefore becomes a 503 whether it surfaces at import time or when the
    store is built. This matches how :func:`submit_job` guards the broker.

    Raises:
        HTTPException: ``503`` on any ``ImportError``.
    """
    try:
        from autorag.services.jobs import JobStore

        return JobStore()
    except ImportError as exc:
        raise _missing_extra(exc) from exc


def _require_job(store: JobStore, job_id: str) -> JobRecord:
    """Fetch ``job_id`` from *store*.

    Raises:
        HTTPException: ``404`` when the store has no such job.
    """
    record = store.get_job(job_id)
    if record is None:
        raise HTTPException(status_code=404, detail=f"unknown job {job_id!r}")
    return record


@app.post("/jobs/audio", status_code=202, response_model=JobSubmitResponse)
def submit_job(req: AudioJobRequest) -> JobSubmitResponse:
    """Enqueue an audio→topics job. Returns ``202`` + ``job_id`` at once.

    Returns ``503`` when the ``[broker]`` / ``[rag]`` extras are not
    installed. That covers both a failed broker-module import and a
    ``MissingExtraError`` (an ``ImportError``) raised by
    ``submit_audio_job``.
    """
    try:
        from autorag.services.broker import submit_audio_job

        return submit_audio_job(req)
    except ImportError as exc:  # includes MissingExtraError("broker"/"rag")
        raise _missing_extra(exc) from exc


@app.get("/jobs/{job_id}", response_model=JobRecord)
def job_status(job_id: str) -> JobRecord:
    """Return the job's status + per-stage state. ``404`` if unknown."""
    return _require_job(_job_store(), job_id)


@app.get("/jobs/{job_id}/result", response_model=JobResultResponse)
def job_result(job_id: str) -> JobResultResponse:
    """Return the finished clip (the existing SQLite row).

    Status codes:

    * ``404`` if the job is unknown.
    * ``409`` if the job is not yet ``done``.
    * ``503`` if a needed extra is missing, including the lazily imported
      persistence layer.
    """
    store = _job_store()
    record = _require_job(store, job_id)
    if record.status is not JobStatus.done:
        raise HTTPException(
            status_code=409,
            detail=f"job {job_id!r} is {record.status.value}, not done",
        )

    # Lazy, like the other job-API imports, so a missing extra becomes a
    # 503 rather than an unhandled 500.
    try:
        from autorag.persistence import load_clip
    except ImportError as exc:
        raise _missing_extra(exc) from exc

    # Use ``result_session_id`` when it is set; otherwise fall back to the
    # job's own ``session_id``.
    clip = load_clip(store.database, record.result_session_id or record.session_id)
    return JobResultResponse(
        job_id=job_id,
        session_id=record.session_id,
        status=record.status,
        clip=clip,
    )