Source code for autorag.db

"""SQLite-backed database for audio clip transcription and topic storage."""

from __future__ import annotations

import contextlib
import json
import uuid
from typing import TYPE_CHECKING, Any

from pydantic import BaseModel
from pydantic_sqlite import DataBase

if TYPE_CHECKING:
    from pathlib import Path

# Name of the SQLite table that stores one AudioClip row per session id.
_TABLE = "audio_clips"


class AudioClip(BaseModel):
    """One row of the ``audio_clips`` SQLite table.

    Fields ``transcription`` and ``topics`` are JSON-encoded strings.
    :meth:`Database.get_clip` returns them exactly as stored (it does not
    decode them), so callers must ``json.loads`` them themselves.

    The ``whisper_model`` / ``provider`` / ``llm_model`` columns are
    populated by :meth:`Database.finalize_topics` to record which backends
    produced the stored data.

    :meth:`Database._ensure_schema` derives the table's column list from
    these fields in declaration order, so adding or reordering fields
    changes the schema it declares.
    """

    # Primary key of the table; the clip's session id.
    id: str
    # Identity columns written by Database.create_clip.
    title: str
    file_path: str
    created_at: str
    # JSON-encoded word list written by Database.store_transcription.
    transcription: str | None = None
    # JSON-encoded topic list written by Database.finalize_topics.
    topics: str | None = None
    # Backend identifiers recorded by Database.finalize_topics.
    whisper_model: str | None = None
    provider: str | None = None
    llm_model: str | None = None
class Database:
    """SQLite façade for AudioRAG clip state, keyed by ``session_id``.

    Every ``audio_clips`` read/write goes through the raw ``sqlite_utils``
    table handle as a **column-scoped** upsert / raw read. It deliberately
    does *not* round-trip the row through ``pydantic_sqlite``'s model
    registry. That registry is per-instance and in-process, so a
    freshly-constructed :class:`Database` (a different worker process, or
    just a second ``AutoRAG`` persist call) would not see another
    instance's rows. The old read-modify-write via ``model_from_table`` plus
    a full-object ``add`` then upserted a *blank* model over the on-disk
    row, silently nulling the transcript a different process had written.
    Column-scoped upserts touch only the columns a method owns, so that
    clobber is impossible by construction regardless of instance or process.

    ``pydantic_sqlite`` is still used for the separate ``jobs`` table (see
    :class:`autorag.services.jobs.JobStore`), where a whole-record write is
    the intended semantics; ``self.db`` is kept for that reuse.

    Creates the SQLite file (and any missing parent directories) and the
    ``audio_clips`` schema on construction.

    ``transcription`` and ``topics`` are stored as JSON-encoded strings, and
    :meth:`get_clip` / :meth:`list_clips` return them undecoded.
    """

    def __init__(self, db_path: Path) -> None:
        db_path.parent.mkdir(parents=True, exist_ok=True)
        self.db = DataBase(db_path)
        self._ensure_schema()

    @property
    def _clips(self) -> Any:
        """Raw ``sqlite_utils`` handle for the ``audio_clips`` table.

        The single site that reaches the private ``pydantic_sqlite``
        sqlite_utils database; all clip CRUD funnels through here.
        """
        return self.db._db[_TABLE]  # pyright: ignore[reportPrivateUsage]

    def _ensure_schema(self) -> None:
        """Create the full ``audio_clips`` table when absent.

        Column-scoped upserts auto-create a *truncated* table (only the
        written columns) if they run first, so the full nine-column schema
        is declared up front. It is byte-identical to the schema
        ``pydantic_sqlite`` produced before, so this is a no-op
        ``CREATE TABLE IF NOT EXISTS`` on pre-existing databases and needs
        no migration.
        """
        self._clips.create(
            dict.fromkeys(AudioClip.model_fields, str),
            pk="id",
            if_not_exists=True,
        )

    def _row(self, session_id: str) -> AudioClip | None:
        """Read one clip row from disk (cross-instance/process safe).

        Returns ``None`` when no row has ``id == session_id``.
        """
        rows = list(self._clips.rows_where("id = ?", [session_id]))
        if not rows:
            return None
        row = dict(rows[0])
        # Create-if-absent writes (store_transcription / finalize_topics on a
        # row create_clip has not populated yet) can leave the required
        # text columns NULL. That ordering never happens in the real
        # pipelines, but keep reads total rather than raising on it.
        for required in ("title", "file_path", "created_at"):
            if row.get(required) is None:
                row[required] = ""
        return AudioClip(**row)

    def add_analytics_event(
        self,
        session_id: str,
        *,
        category: str,
        message: str,
        metadata: dict[str, Any],
        marked_at_utc: Any,
    ) -> dict[str, Any]:
        """Build the analytics-event dict written into a clip's ``topics`` JSON.

        Does not touch the database itself. Callers accumulate the returned
        dicts and pass them to :meth:`finalize_topics`.

        Args:
            session_id: Clip identifier. Not read by this method; kept for
                API compatibility.
            category: Copied into the event verbatim.
            message: Event text; :meth:`finalize_topics` uses it as the
                topic title.
            metadata: Only the optional ``"transcription"`` sub-dict is read
                (keys ``level``, ``word_start_s``, ``number_label``,
                ``summary``). A missing or ``None`` sub-dict yields the
                defaults listed under Returns.
            marked_at_utc: Copied into the event verbatim.

        Returns:
            A dict with a fresh UUID4 string ``event_id`` plus the
            normalised fields. Falsy inputs fall back to defaults:
            ``level`` -> 1, ``start_s`` -> 0.0, ``number_label`` and
            ``summary`` -> ``""``.
        """
        event_id = str(uuid.uuid4())
        # ``or {}`` instead of a ``.get`` default so that an explicit
        # ``"transcription": None`` entry does not raise AttributeError
        # on the ``tx.get`` calls below.
        tx = metadata.get("transcription") or {}
        return {
            "event_id": event_id,
            "category": category,
            "message": message,
            "level": int(tx.get("level") or 1),
            "start_s": float(tx.get("word_start_s") or 0.0),
            "number_label": str(tx.get("number_label") or ""),
            "summary": str(tx.get("summary") or ""),
            "marked_at_utc": marked_at_utc,
        }

    # --- CLI helpers ---

    def create_clip(
        self,
        session_id: str,
        *,
        title: str,
        file_path: str,
        created_at: str,
    ) -> None:
        """Insert an :class:`AudioClip` row if one doesn't already exist.

        First-writer-wins (``INSERT OR IGNORE``): this is a no-op when the
        ``session_id`` is already present. A later ``create_clip`` with a
        different title/path therefore never overwrites the original, and
        never resets the ``transcription`` / ``topics`` a different process
        wrote in between. Only the four identity columns are written; the
        rest default to NULL.
        """
        # sqlite_utils' insert(ignore=True) reads back the row by
        # last_rowid, which is 0 when the INSERT was ignored on
        # conflict; the lookup then errors. The conflict path is the
        # whole point of ignore=True, so swallow it.
        with contextlib.suppress(IndexError):
            self._clips.insert(
                {
                    "id": session_id,
                    "title": title,
                    "file_path": file_path,
                    "created_at": created_at,
                },
                pk="id",
                ignore=True,
            )

    def store_transcription(self, session_id: str, words: list[dict[str, Any]]) -> None:
        """Persist a JSON-encoded :class:`~autorag.types.WordSpan` list on the clip.

        Column-scoped: touches only ``transcription``. The write is
        create-if-absent (upsert), so a row is materialised even if
        ``create_clip`` has not run yet, and a concurrent ``finalize_topics``
        cannot lose it.
        """
        self._clips.upsert({"id": session_id, "transcription": json.dumps(words)}, pk="id")

    def finalize_topics(
        self,
        session_id: str,
        transcript_end_s: float,
        *,
        events: list[dict[str, Any]],
        provider: str,
        llm_model: str,
        whisper_model: str,
    ) -> None:
        """Flatten topic events, compute durations, and write them to the clip.

        Events are grouped by ``level`` and ordered by ``start_s``. Each
        event's ``duration_s`` is the gap to the next event of the same
        level, and the last event of a level runs to ``transcript_end_s``
        (clamped at 0). Durations are rounded to 3 decimals. The resulting
        topic dicts are sorted by ``(start_s, level)`` and stored as JSON in
        ``topics``. The ``provider`` / ``llm_model`` / ``whisper_model``
        columns record which backends produced the data.

        The caller's ``events`` dicts are not modified.

        This is a column-scoped upsert (create-if-absent) that touches only
        the four columns above, so a ``transcription`` written by an earlier
        stage or process survives. An empty ``events`` list is a no-op:
        nothing is written.

        Args:
            session_id: Clip row to update (created if missing).
            transcript_end_s: End time used for the last event of each level.
            events: Event dicts, e.g. from :meth:`add_analytics_event`. Each
                needs ``message``, ``level``, ``start_s`` and
                ``number_label``; ``summary`` is optional.
            provider: Stored verbatim in the ``provider`` column.
            llm_model: Stored verbatim in the ``llm_model`` column.
            whisper_model: Stored verbatim in the ``whisper_model`` column.
        """
        if not events:
            return

        durations = self._sibling_durations(events, transcript_end_s)
        topics = [
            {
                "title": ev["message"],
                "level": ev["level"],
                "start_s": ev["start_s"],
                "duration_s": duration,
                "number": ev["number_label"],
                "summary": ev.get("summary", ""),
            }
            for ev, duration in zip(events, durations)
        ]
        topics.sort(key=lambda t: (t["start_s"], t["level"]))
        self._clips.upsert(
            {
                "id": session_id,
                "topics": json.dumps(topics),
                "provider": provider,
                "llm_model": llm_model,
                "whisper_model": whisper_model,
            },
            pk="id",
        )

    @staticmethod
    def _sibling_durations(
        events: list[dict[str, Any]], transcript_end_s: float
    ) -> list[float]:
        """Return each event's ``duration_s``, aligned index-for-index with ``events``."""
        # Group event *indices* rather than the dicts themselves, so the
        # caller's event dicts are never written to.
        by_level: dict[int, list[int]] = {}
        for idx, ev in enumerate(events):
            by_level.setdefault(ev["level"], []).append(idx)

        # NOTE(review): grouping is by level only, not by parent, so the last
        # L2 event under one L1 topic runs until the next L2 event anywhere in
        # the clip. Confirm this is intended.
        durations = [0.0] * len(events)
        for indices in by_level.values():
            # Stable sort: events with equal start times keep their input order.
            indices.sort(key=lambda i: events[i]["start_s"])
            for pos, idx in enumerate(indices):
                start = events[idx]["start_s"]
                if pos + 1 < len(indices):
                    next_start = events[indices[pos + 1]]["start_s"]
                    durations[idx] = round(next_start - start, 3)
                else:
                    durations[idx] = round(max(0.0, transcript_end_s - start), 3)
        return durations

    def get_clip(self, session_id: str) -> dict[str, Any] | None:
        """Return the clip as a plain dict, or ``None`` if missing.

        ``transcription`` and ``topics`` are returned exactly as stored, as
        JSON-encoded strings (or ``None``). They are not decoded.
        """
        clip = self._row(session_id)
        # ``_row`` signals "missing" with None; test for that explicitly
        # rather than relying on the model's truthiness.
        return None if clip is None else clip.model_dump()

    def list_clips(self) -> list[dict[str, Any]]:
        """Return every clip row as a plain dict (empty list on error).

        Rows are returned as stored, so ``transcription`` and ``topics``
        remain JSON-encoded strings. Reading is best-effort: any exception
        raised while fetching rows is logged with its traceback and an empty
        list is returned instead of propagating.
        """
        import logging

        try:
            return [dict(row) for row in self._clips.rows]
        except Exception:
            # Deliberately broad to keep the "empty list on error" contract,
            # but no longer silent: surface the cause in the logs.
            logging.getLogger(__name__).warning(
                "list_clips: failed to read clip rows; returning []",
                exc_info=True,
            )
            return []