"""SQLite-backed database for audio clip transcription and topic storage."""
from __future__ import annotations
import contextlib
import json
import logging
import uuid
from typing import TYPE_CHECKING, Any

from pydantic import BaseModel
from pydantic_sqlite import DataBase

if TYPE_CHECKING:
    from pathlib import Path
# Name of the SQLite table that holds one row per clip; every clip read/write
# in ``Database`` goes through ``self.db._db[_TABLE]``.
_TABLE = "audio_clips"
[docs]
class AudioClip(BaseModel):
    """One row of the ``audio_clips`` SQLite table.

    Fields ``transcription`` and ``topics`` are stored as JSON-encoded
    strings. :meth:`Database.get_clip` returns a plain ``model_dump()`` of
    this model, so those two fields come back still JSON-encoded; callers
    must ``json.loads`` them. The ``whisper_model`` / ``provider`` /
    ``llm_model`` columns are populated by :meth:`Database.finalize_topics`
    to record which backends produced the stored data.

    Field declaration order is load-bearing: ``Database._ensure_schema``
    derives the table's column list (all TEXT) from ``model_fields`` in
    this order, so reordering fields changes the declared schema.
    """

    # Primary key of the row (the clip's session id).
    id: str
    # Identity columns written once by ``Database.create_clip``.
    title: str
    file_path: str
    created_at: str
    # JSON-encoded word list, written by ``Database.store_transcription``.
    transcription: str | None = None
    # JSON-encoded topic list, written by ``Database.finalize_topics``.
    topics: str | None = None
    # Backend identifiers, written by ``Database.finalize_topics``.
    whisper_model: str | None = None
    provider: str | None = None
    llm_model: str | None = None
[docs]
class Database:
    """SQLite façade for AudioRAG clip state, keyed by ``session_id``.

    Every ``audio_clips`` read/write goes through the raw
    ``sqlite_utils`` table handle as a **column-scoped** upsert / raw
    read. It deliberately does *not* round-trip the row through
    ``pydantic_sqlite``'s model registry: that registry is per-instance
    and in-process, so a freshly-constructed :class:`Database` (a
    different worker process, or just a second ``AutoRAG`` persist call)
    would not see another instance's rows — and the old read-modify-write
    via ``model_from_table`` + a full-object ``add`` then upserted a
    *blank* model over the on-disk row, silently nulling the transcript a
    different process had written. Column-scoped upserts touch only the
    columns a method owns, so the clobber is impossible by construction
    regardless of instance or process.

    ``pydantic_sqlite`` is still used for the separate ``jobs`` table
    (see :class:`autorag.services.jobs.JobStore`), where a whole-record
    write is the intended semantics; ``self.db`` is kept for that reuse.

    Creates the SQLite file (and any missing parent directories) and the
    ``audio_clips`` schema on construction.
    """

    def __init__(self, db_path: Path) -> None:
        """Open the SQLite database at *db_path*, creating it if needed.

        Args:
            db_path: Location of the SQLite file. Missing parent
                directories are created before the file is opened.
        """
        db_path.parent.mkdir(parents=True, exist_ok=True)
        self.db = DataBase(db_path)
        self._ensure_schema()

    @property
    def _clips(self) -> Any:
        """Raw ``sqlite_utils`` handle for the ``audio_clips`` table.

        The single site that reaches the private ``pydantic_sqlite``
        sqlite_utils database; all clip CRUD funnels through here.
        """
        return self.db._db[_TABLE]  # pyright: ignore[reportPrivateUsage]

    def _ensure_schema(self) -> None:
        """Create the full ``audio_clips`` table when absent.

        Column-scoped upserts auto-create a *truncated* table (only the
        written columns) if they run first, so the full nine-column
        schema is declared up front. It is byte-identical to the schema
        ``pydantic_sqlite`` produced before, so this is a no-op
        ``CREATE TABLE IF NOT EXISTS`` on pre-existing databases — no
        migration.
        """
        self._clips.create(
            dict.fromkeys(AudioClip.model_fields, str),
            pk="id",
            if_not_exists=True,
        )

    def _row(self, session_id: str) -> AudioClip | None:
        """Read one clip row from disk (cross-instance/process safe).

        Returns:
            The row as an :class:`AudioClip`, or ``None`` when no row has
            ``id == session_id``.
        """
        rows = list(self._clips.rows_where("id = ?", [session_id]))
        if not rows:
            return None
        row = dict(rows[0])
        # create-if-absent (store_transcription / finalize_topics on a
        # row create_clip has not populated yet) can leave the required
        # text columns NULL. That ordering never happens in the real
        # pipelines, but keep reads total rather than raising on it.
        for required in ("title", "file_path", "created_at"):
            if row.get(required) is None:
                row[required] = ""
        return AudioClip(**row)

    def add_analytics_event(
        self,
        session_id: str,
        *,
        category: str,
        message: str,
        metadata: dict[str, Any],
        marked_at_utc: Any,
    ) -> dict[str, Any]:
        """Build the analytics-event dict written into a clip's ``topics`` JSON.

        Does not touch the database itself — callers accumulate the
        returned dicts and pass them to :meth:`finalize_topics`.

        Args:
            session_id: Accepted for call-site symmetry; not used here.
            category: Stored verbatim under ``"category"``.
            message: Stored verbatim under ``"message"`` (becomes the
                topic title in :meth:`finalize_topics`).
            metadata: Optional ``"transcription"`` sub-dict supplying
                ``level``, ``word_start_s``, ``number_label`` and
                ``summary``. A missing or ``None`` sub-dict falls back to
                the defaults below.
            marked_at_utc: Stored verbatim under ``"marked_at_utc"``.

        Returns:
            A new dict with a fresh ``event_id`` (UUID4 string); ``level``
            defaults to 1, ``start_s`` to 0.0, and the text fields to "".
        """
        event_id = str(uuid.uuid4())
        # ``or {}`` (not a .get default) so an explicit ``None`` value
        # does not crash the ``tx.get`` calls below.
        tx = metadata.get("transcription") or {}
        return {
            "event_id": event_id,
            "category": category,
            "message": message,
            "level": int(tx.get("level") or 1),
            "start_s": float(tx.get("word_start_s") or 0.0),
            "number_label": str(tx.get("number_label") or ""),
            "summary": str(tx.get("summary") or ""),
            "marked_at_utc": marked_at_utc,
        }

    # --- CLI helpers ---

    def create_clip(
        self,
        session_id: str,
        *,
        title: str,
        file_path: str,
        created_at: str,
    ) -> None:
        """Insert an :class:`AudioClip` row if one doesn't already exist.

        First-writer-wins (``INSERT OR IGNORE``): a no-op when the
        ``session_id`` is already present, so a later ``create_clip``
        with a different title/path never overwrites the original — and,
        crucially, never resets the ``transcription`` / ``topics`` a
        different process wrote in between. Only the four identity
        columns are written; the rest default to NULL.
        """
        # sqlite_utils' insert(ignore=True) reads back the row by
        # last_rowid, which is 0 when the INSERT was ignored on
        # conflict; the lookup then errors. The conflict path is the
        # whole point of ignore=True, so swallow it.
        with contextlib.suppress(IndexError):
            self._clips.insert(
                {
                    "id": session_id,
                    "title": title,
                    "file_path": file_path,
                    "created_at": created_at,
                },
                pk="id",
                ignore=True,
            )

    def store_transcription(self, session_id: str, words: list[dict[str, Any]]) -> None:
        """Persist a JSON-encoded :class:`~autorag.types.WordSpan` list on the clip.

        Column-scoped: touches only ``transcription``. Create-if-absent
        (upsert), so a row is materialised even if ``create_clip`` has
        not run yet, and a concurrent ``finalize_topics`` cannot lose it.
        """
        self._clips.upsert({"id": session_id, "transcription": json.dumps(words)}, pk="id")

    def finalize_topics(
        self,
        session_id: str,
        transcript_end_s: float,
        *,
        events: list[dict[str, Any]],
        provider: str,
        llm_model: str,
        whisper_model: str,
    ) -> None:
        """Flatten topic events, compute durations, and write them to the clip.

        Within each level, ``duration_s`` is derived from the gap to the
        next sibling (or to ``transcript_end_s`` for the last node,
        clamped at 0). The ``provider`` / ``llm_model`` / ``whisper_model``
        columns record which backends produced the data. Column-scoped
        upsert (create-if-absent): touches only those columns, so the
        ``transcription`` written by an earlier stage/process survives.

        When *events* is empty nothing is written: any previously stored
        topics and backend columns are left as they are.

        The caller's event dicts are not modified.
        """
        if not events:
            return
        # Shallow copies: durations are attached to these, not to the
        # caller's dicts.
        evs = [dict(ev) for ev in events]
        by_level: dict[int, list[dict[str, Any]]] = {}
        for ev in evs:
            by_level.setdefault(ev["level"], []).append(ev)
        for level_evs in by_level.values():
            level_evs.sort(key=lambda e: e["start_s"])
            for i, ev in enumerate(level_evs):
                if i + 1 < len(level_evs):
                    ev["duration_s"] = round(level_evs[i + 1]["start_s"] - ev["start_s"], 3)
                else:
                    ev["duration_s"] = round(max(0.0, transcript_end_s - ev["start_s"]), 3)
        topics = [
            {
                "title": ev["message"],
                "level": ev["level"],
                "start_s": ev["start_s"],
                "duration_s": ev.get("duration_s", 0.0),
                "number": ev["number_label"],
                "summary": ev.get("summary", ""),
            }
            for ev in evs
        ]
        topics.sort(key=lambda t: (t["start_s"], t["level"]))
        self._clips.upsert(
            {
                "id": session_id,
                "topics": json.dumps(topics),
                "provider": provider,
                "llm_model": llm_model,
                "whisper_model": whisper_model,
            },
            pk="id",
        )

    def get_clip(self, session_id: str) -> dict[str, Any] | None:
        """Return the clip as a plain dict, or ``None`` if missing.

        ``transcription`` and ``topics`` are returned as stored
        (JSON-encoded strings); required text columns that are NULL on
        disk come back as ``""``.
        """
        clip = self._row(session_id)
        if clip is None:
            return None
        return clip.model_dump()

    def list_clips(self) -> list[dict[str, Any]]:
        """Return every clip row as a plain dict (empty list on error).

        Rows are returned raw from SQLite (no NULL normalisation). A
        failure is logged with its traceback and an empty list returned,
        so listing stays best-effort without hiding the cause.
        """
        try:
            return [dict(row) for row in self._clips.rows]
        except Exception:
            logging.getLogger(__name__).exception("Failed to list audio clips")
            return []