# Source code for autorag.cli

from __future__ import annotations

import json
import time
from pathlib import Path  # noqa: TC003 — typer needs the runtime type
from typing import TYPE_CHECKING, Any

import typer

from autorag.audio_source import default_title_from
from autorag.core import AutoRAG

if TYPE_CHECKING:
    from autorag.types import WordSpan

# Root Typer application: the callback and the commands in this module are
# registered on it through ``@app.callback()`` / ``@app.command()``.
app = typer.Typer(help="AutoRAG — automated retrieval-augmented generation.")


@app.callback()
def _otel_init(ctx: typer.Context) -> None:
    """Best-effort OpenTelemetry initialisation for every CLI invocation.

    A no-op when ``AUTORAG_OTEL_ENABLED=false`` (the default), so the
    cold-start cost of the common case is unchanged.

    ``serve`` is the one subcommand that hosts a long-running API
    process; its FastAPI lifespan calls ``initialize_otel("autorag-api")``
    so traces and metrics land under the right service name. We skip the
    callback's init in that case because ``initialize_otel`` is
    idempotent (first call wins via a module-level ``_initialized``
    bool) — without this guard the lifespan call would be a no-op and
    every API span would register as ``autorag-cli``.
    """
    # Everything except ``serve`` is tagged with the CLI service name.
    launching_api = ctx.invoked_subcommand == "serve"
    if not launching_api:
        # Function-scope import: the otel module is only loaded on this path.
        from autorag.otel import initialize_otel

        initialize_otel("autorag-cli")


@app.command()
def ingest(paths: list[Path] = typer.Argument(..., exists=True, readable=True)) -> None:
    """Ingest one or more files/directories into the vector store."""
    # Typer supplies ``Path`` objects; the pipeline is called with plain strings.
    sources = list(map(str, paths))
    outcome = AutoRAG().ingest(sources)
    typer.echo(f"Ingested {outcome.ingested} docs → {outcome.chunks} chunks.")
@app.command()
def query(
    question: str = typer.Argument(...),
    top_k: int | None = typer.Option(None, "--top-k", "-k"),
) -> None:
    """Ask a question against the ingested corpus."""
    # ``top_k`` is forwarded exactly as given (``None`` when the flag is absent).
    response = AutoRAG().query(question, top_k=top_k)
    typer.echo(response.answer)
@app.command()
def serve(host: str = "127.0.0.1", port: int = 8000, reload: bool = False) -> None:
    """Run the HTTP API server."""
    # Function-scope import: uvicorn is only loaded when this command runs.
    from uvicorn import run as run_server

    # The app is handed over as an import string rather than an object.
    run_server("autorag.api:app", host=host, port=port, reload=reload)
@app.command()
def transcribe(
    source: str = typer.Argument(
        ...,
        help="Audio file path or YouTube URL (youtube.com / youtu.be / ...).",
    ),
    title: str | None = typer.Option(
        None, "--title", "-t", help="Clip title (defaults to filename stem or video id)"
    ),
    whisper_model: str = typer.Option(
        "base",
        "--whisper-model",
        "-w",
        help="Whisper model size: tiny/base/small/medium/large",
    ),
    language: str = typer.Option(
        "en",
        "--language",
        "-l",
        help="Whisper language code (default: English; pass '' to auto-detect).",
    ),
    persist: bool = typer.Option(
        True, "--persist/--no-persist", help="Write word spans to SQLite (default: true)."
    ),
    db_override: Path | None = typer.Option(None, "--db", help="Override database path"),
) -> None:
    """Transcribe an audio file or YouTube URL and output word spans as JSON."""
    from autorag.audio_source import resolve_audio_input

    rag = AutoRAG()
    store_elapsed = 0.0  # stays zero under --no-persist

    with resolve_audio_input(source) as audio:
        started = time.perf_counter()
        words = rag.transcribe(
            audio.path,
            whisper_model=whisper_model,
            # An empty --language becomes ``None``.
            language=language or None,
        )
        whisper_elapsed = time.perf_counter() - started

        if persist:
            # Title precedence: explicit flag, then the resolved source's
            # title, then the fallback derived from the raw source string.
            stored = rag.persist_transcription(
                audio.path,
                words,
                title=title or audio.title or default_title_from(source),
                db_path=db_override,
                source_url=audio.source_url,
                upload_date=audio.upload_date,
                duration_s=audio.duration_s,
            )
            store_elapsed = float(stored["timings"]["store_words"])

    stage_order = ["whisper", "cli_store_words"]
    timings: dict[str, float] = dict(zip(stage_order, (whisper_elapsed, store_elapsed)))

    from autorag import whisper_runner  # lazy: requires [audio] extra

    # The timing report goes to stderr so stdout carries only the JSON payload.
    label_width = max(map(len, stage_order))
    typer.echo("", err=True)
    typer.echo("=== Transcription Timing Breakdown ===", err=True)
    for stage in stage_order:
        padded = stage.ljust(label_width)
        elapsed = timings.get(stage, 0.0)
        typer.echo(f" {padded} {elapsed:8.3f}s", err=True)
    rule = "─" * (label_width + 11)
    typer.echo(f" {rule}", err=True)
    grand_total = sum(timings.values())
    total_label = "TOTAL".ljust(label_width)
    typer.echo(f" {total_label} {grand_total:8.3f}s", err=True)
    typer.echo(f" device: {whisper_runner.resolved_device()}", err=True)
    typer.echo("", err=True)

    typer.echo(json.dumps(words))
@app.command()
def generate_topics(
    source: str = typer.Argument(
        ...,
        help="Audio file path or YouTube URL (youtube.com / youtu.be / ...).",
    ),
    title: str | None = typer.Option(
        None, "--title", "-t", help="Clip title (defaults to filename stem or video id)"
    ),
    whisper_model: str = typer.Option(
        "base",
        "--whisper-model",
        "-w",
        help="Whisper model size: tiny/base/small/medium/large",
    ),
    provider: str = typer.Option(
        "ollama",
        "--provider",
        "-p",
        help="LLM provider (ollama)",
    ),
    llm_model: str = typer.Option(
        "gemma4:latest",
        "--llm-model",
        "-m",
        help="LLM model name (uses provider default if empty)",
    ),
    num_ctx_l1: int = typer.Option(
        8192,
        "--num-ctx-l1",
        min=512,
        help="LLM context for the Stage 2 L1-boundary call "
        "(raise to ~16384 for 1hr+ audio; costs one model reload).",
    ),
    num_ctx_fanout: int = typer.Option(
        8192,
        "--num-ctx-fanout",
        min=512,
        help="LLM context for the batched fan-out stages (3a/3b/4/5).",
    ),
    max_concurrency: int = typer.Option(
        8,
        "--max-concurrency",
        min=1,
        help="Max parallel LLM calls in batched stages (match OLLAMA_NUM_PARALLEL).",
    ),
    min_subdivide_duration_s: float = typer.Option(
        120.0,
        "--min-subdivide-duration-s",
        min=0.0,
        help="Minimum L1 span length (s) before the L2 subdivide decision runs.",
    ),
    reasoning: bool = typer.Option(
        False,
        "--reasoning/--no-reasoning",
        help="Enable chain-of-thought on thinking-capable models (slower; A/B quality).",
    ),
    boundary_block_seconds: int = typer.Option(
        30,
        "--boundary-block-seconds",
        min=1,
        help="Time-bucket window (s) for the L1/L2 boundary-prompt transcript; "
        "smaller = finer MM:SS anchors but more prompt tokens.",
    ),
    language: str = typer.Option(
        "en",
        "--language",
        "-l",
        help="Whisper language code (default: English; pass '' to auto-detect).",
    ),
    transcription_json: str | None = typer.Option(
        None,
        "--transcription",
        "-T",
        help="Pre-computed word spans as a JSON string (skips audio transcription).",
    ),
    persist: bool = typer.Option(
        True,
        "--persist/--no-persist",
        help="Write transcription and topics to SQLite/Chroma (default: true).",
    ),
    db_override: Path | None = typer.Option(None, "--db", help="Override database path"),
) -> None:
    """Generate topics for an audio file or YouTube URL, transcribing first if not cached."""
    # Word spans are obtained from the first source that yields them:
    #   1. the --transcription JSON string,
    #   2. the transcription stored in the database (only with --persist),
    #   3. a fresh Whisper run.
    # Topics are then generated, optionally persisted, a timing table is
    # written to stderr and the topics are written to stdout as JSON.
    #
    # Raises ``typer.BadParameter`` when --transcription is not valid JSON.
    from autorag.audio_source import is_youtube_url, resolve_audio_input

    rag = AutoRAG()

    words: list[WordSpan] | None = None
    whisper_secs = 0.0
    store_words_secs = 0.0
    ran_whisper = False
    resolved_title: str | None = None
    upload_date_for_persist: str | None = None
    source_url_for_persist: str | None = None

    # Priority 1: caller-supplied transcription JSON
    if transcription_json:
        try:
            words = json.loads(transcription_json)
        except json.JSONDecodeError as exc:
            # Report malformed input as a usage error, not a raw traceback.
            raise typer.BadParameter(
                f"not valid JSON: {exc}", param_hint="--transcription"
            ) from exc
        resolved_title = title or default_title_from(source)
        source_url_for_persist = source if is_youtube_url(source) else None

    # Priority 2: SQLite cache (only when --persist so we have a DB to look up)
    if words is None and persist:
        from autorag.db import Database
        from autorag.persistence import derive_session_id, load_transcription

        session_id = derive_session_id(source)
        resolved_db = (db_override or rag.settings.db_path).expanduser()
        db = Database(resolved_db)
        cached = load_transcription(db, session_id)
        if cached is not None:
            words = cached
            clip: dict[str, Any] | None = db.get_clip(session_id)
            # First ten characters of ``created_at`` with the dashes removed.
            upload_date_for_persist = clip["created_at"][:10].replace("-", "") if clip else None
            source_url_for_persist = source if is_youtube_url(source) else None
            resolved_title = (
                title or (clip.get("title") if clip else None) or default_title_from(source)
            )

    # Priority 3: run Whisper
    if words is None:
        with resolve_audio_input(source) as src:
            t0 = time.perf_counter()
            words = rag.transcribe(
                src.path,
                whisper_model=whisper_model,
                language=language or None,
            )
            whisper_secs = time.perf_counter() - t0
            ran_whisper = True
            resolved_title = title or src.title or default_title_from(source)
            upload_date_for_persist = src.upload_date
            source_url_for_persist = src.source_url
            if persist:
                persisted_words = rag.persist_transcription(
                    src.path,
                    words,
                    title=resolved_title,
                    db_path=db_override,
                    source_url=src.source_url,
                    upload_date=src.upload_date,
                    duration_s=src.duration_s,
                )
                store_words_secs = float(persisted_words["timings"]["store_words"])

    # Generate topics
    # NOTE(review): ``provider`` is not passed to ``rag.generate_topics``; it is
    # only forwarded to ``persist_topics`` below — confirm that is intended.
    t0 = time.perf_counter()
    topics = rag.generate_topics(
        words,
        llm_model=llm_model,
        num_ctx_l1=num_ctx_l1,
        num_ctx_fanout=num_ctx_fanout,
        max_concurrency=max_concurrency,
        min_subdivide_duration_s=min_subdivide_duration_s,
        reasoning=reasoning,
        boundary_block_seconds=boundary_block_seconds,
    )
    agent_secs = time.perf_counter() - t0

    # Persist topics
    finalize_secs = 0.0
    embed_secs = 0.0
    persisted_clip: dict[str, Any] | None = None
    if persist:
        persisted_topics = rag.persist_topics(
            source,
            topics,
            words=words,
            title=resolved_title,
            provider=provider,
            llm_model=llm_model,
            whisper_model=whisper_model,
            db_path=db_override,
            source_url=source_url_for_persist,
            upload_date=upload_date_for_persist,
        )
        p_timings = persisted_topics["timings"]
        finalize_secs = float(p_timings["finalize"])
        embed_secs = float(p_timings["embed"])
        persisted_clip = persisted_topics["clip"]

    timings: dict[str, float] = {
        "whisper": whisper_secs,
        "agent": agent_secs,
        "cli_store_words": store_words_secs,
        "cli_finalize": finalize_secs,
        "cli_embed": embed_secs,
    }
    stage_order = ["whisper", "agent", "cli_store_words", "cli_finalize", "cli_embed"]

    # The timing report goes to stderr; stdout is reserved for the result.
    typer.echo("", err=True)
    typer.echo("=== Topic Generation Timing Breakdown ===", err=True)
    max_label = max(len(s) for s in stage_order)
    for stage in stage_order:
        secs = timings.get(stage, 0.0)
        label = stage.ljust(max_label)
        typer.echo(f" {label} {secs:8.3f}s", err=True)
    typer.echo(f" {'─' * (max_label + 11)}", err=True)
    total = sum(timings.values())
    typer.echo(f" {'TOTAL'.ljust(max_label)} {total:8.3f}s", err=True)
    if ran_whisper:
        # The device line is only shown when Whisper ran in this invocation.
        from autorag import whisper_runner  # lazy: requires [audio] extra

        typer.echo(f" device: {whisper_runner.resolved_device()}", err=True)
    typer.echo("", err=True)

    if persist and persisted_clip and persisted_clip.get("topics"):
        # NOTE(review): ``created_at`` is written to stdout ahead of the JSON
        # document, so stdout is not a single JSON value on this path —
        # confirm that is intended.
        typer.echo(persisted_clip["created_at"])
        # The stored topics are a JSON string; decode and re-encode with indent.
        typer.echo(json.dumps(json.loads(persisted_clip["topics"]), indent=2))
    else:
        typer.echo(json.dumps(topics, indent=2))
@app.command()
def blocks(
    source: str = typer.Argument(
        ...,
        help="Audio file path or YouTube URL (youtube.com / youtu.be / ...).",
    ),
    seconds: int = typer.Option(
        10, "--seconds", "-n", min=1, help="Time-block window length in seconds."
    ),
    force_retranscribe: bool = typer.Option(
        False,
        "--force-retranscribe",
        help="Re-run transcription even if a cached copy exists.",
    ),
    title: str | None = typer.Option(
        None, "--title", "-t", help="Clip title (only used on cache miss)"
    ),
    db_override: Path | None = typer.Option(None, "--db", help="Override database path"),
    whisper_model: str = typer.Option("base", "--whisper-model", "-w"),
    language: str = typer.Option(
        "en",
        "--language",
        "-l",
        help="Whisper language code (default: English; pass '' to auto-detect).",
    ),
) -> None:
    """Print the transcription as N-second time blocks, one line per speaker turn.

    Reads from the cached SQLite row when present; otherwise runs Whisper
    transcription and persists the words first. Topic generation is not
    performed here; use the ``generate-topics`` command for that.
    """
    # All the work is delegated to ``AutoRAG.transcribe_blocks``; this command
    # only maps CLI options onto its keyword arguments and prints the result.
    rag = AutoRAG()
    text = rag.transcribe_blocks(
        source,
        seconds=seconds,
        force_retranscribe=force_retranscribe,
        db_path=db_override,
        whisper_model=whisper_model,
        # An empty --language becomes ``None``.
        language=language or None,
        title=title,
    )
    typer.echo(text)
# Sub-application for the broker-backed commands, mounted under ``jobs``.
jobs_app = typer.Typer(help="Async pipeline jobs (needs autorag[broker,rag] + RabbitMQ).")
app.add_typer(jobs_app, name="jobs")
@jobs_app.command("submit")
def jobs_submit(
    source: str = typer.Argument(..., help="Audio file path or YouTube URL."),
    title: str | None = typer.Option(None, "--title", "-t"),
    whisper_model: str = typer.Option("base", "--whisper-model", "-w"),
    llm_model: str = typer.Option("gemma4:latest", "--llm-model", "-m"),
    language: str = typer.Option("en", "--language", "-l"),
) -> None:
    """Enqueue an audio→topics job on the broker; prints the job id."""
    # The service modules are imported here; an ImportError is reported on
    # stderr and the command exits with status 1.
    try:
        from autorag.services.broker import submit_audio_job
        from autorag.services.schemas import AudioJobRequest
    except ImportError as exc:
        typer.echo(str(exc), err=True)
        raise typer.Exit(1) from exc

    request = AudioJobRequest(
        source=source,
        title=title,
        whisper_model=whisper_model,
        llm_model=llm_model,
        language=language,
    )
    receipt = submit_audio_job(request)
    payload = {"job_id": receipt.job_id, "session_id": receipt.session_id}
    typer.echo(json.dumps(payload))
@jobs_app.command("status")
def jobs_status(job_id: str = typer.Argument(...)) -> None:
    """Print a job's status + per-stage state as JSON."""
    # An ImportError from the jobs module is reported on stderr with exit status 1.
    try:
        from autorag.services.jobs import JobStore
    except ImportError as exc:
        typer.echo(str(exc), err=True)
        raise typer.Exit(1) from exc

    job = JobStore().get_job(job_id)
    if job is not None:
        typer.echo(job.model_dump_json(indent=2))
        return

    # Unknown id: message on stderr, exit status 1.
    typer.echo(f"unknown job {job_id!r}", err=True)
    raise typer.Exit(1)
# Run the Typer application when this module is executed as a script.
if __name__ == "__main__":
    app()