Source code for autorag.topic_cluster
"""Semantic clustering and similarity-edge construction for topic embeddings."""
from __future__ import annotations
import numpy as np
from sklearn.cluster import AgglomerativeClustering # type: ignore[import-untyped]
from sklearn.metrics.pairwise import cosine_similarity # type: ignore[import-untyped]
[docs]
def cluster_embeddings(
embeddings: np.ndarray,
distance_threshold: float = 0.35,
) -> np.ndarray:
"""Assign cluster labels to topic embeddings using agglomerative clustering.
distance_threshold is cosine distance (0-2); 0.35 ~ similarity >= 0.65.
Returns an int array of shape (N,) with labels 0..K-1.
"""
n = len(embeddings)
if n == 0:
return np.array([], dtype=int)
if n == 1:
return np.zeros(1, dtype=int)
norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
safe = np.where(norms == 0, 1e-10, embeddings)
clust = AgglomerativeClustering(
n_clusters=None,
distance_threshold=distance_threshold,
metric="cosine",
linkage="average",
)
labels: np.ndarray = np.asarray(clust.fit_predict(safe), dtype=int)
return labels
[docs]
def build_edges(
embeddings: np.ndarray,
top_n: int = 5,
min_similarity: float = 0.60,
) -> list[tuple[int, int, float]]:
"""Return undirected similarity edges between topics.
For each topic, finds top_n most similar neighbours above min_similarity.
Returns a deduplicated list of (idx_a, idx_b, similarity) with idx_a < idx_b.
"""
if len(embeddings) < 2:
return []
norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
safe = np.where(norms == 0, 1e-10, embeddings)
sim = cosine_similarity(safe)
np.fill_diagonal(sim, -1.0)
seen: dict[tuple[int, int], float] = {}
for i in range(len(sim)):
candidates = np.argsort(sim[i])[::-1][:top_n]
for j in candidates:
s = float(sim[i, j])
if s < min_similarity:
break
key = (min(i, int(j)), max(i, int(j)))
if key not in seen:
seen[key] = s
return [(a, b, s) for (a, b), s in seen.items()]