80 lines
3.2 KiB
Python
80 lines
3.2 KiB
Python
"""Qdrant hybrid collection: create + upsert distilled propositions (§4.3).

Collection management and upserts go DIRECT to Qdrant (§13.2 "(Qdrant direct) :6333"); retrieval
goes through the gateway's /api/search. The collection has a named dense vector `bge_m3` (1024-d
cosine) and a sparse vector `bm25` (modifier IDF). The point id is a deterministic UUID5 of
claim_id, so re-upsert is idempotent.
"""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import sqlite3
|
|
import uuid
|
|
|
|
from qdrant_client import QdrantClient, models
|
|
|
|
from .embedder import SparseEmbedder, dense_embed
|
|
|
|
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Name of the Qdrant collection this module creates and writes to.
COLLECTION = "propositions"

# Names of the two vector slots stored on every point.
DENSE = "bge_m3"  # named dense vector
SPARSE = "bm25"  # named sparse vector

# Fixed UUID5 namespace for point ids; changing it changes every id derived from a claim_id.
_NS = uuid.UUID("5f9b7e10-0000-4000-8000-000000000001")
|
|
|
|
# Filterable payload (§4.3). Stance, topic, cluster and date are stored so that stance
# distributions, time-windowed consensus and corroboration lookups can filter on them.
# NEVER infer stance from vector distance (§2.2/§5.3).
# Every name listed here is read from the claim row by key when a point payload is built.
_PAYLOAD_FIELDS = (
    "claim_id",
    "doc_id",
    "source_id",
    "source_cluster",
    "topic_canonical",
    "date",
    "claim_type",
    "time_horizon",
    "confidence",
    "rel_polarity",
    "engages_consensus",
    "counters_position",
    "thesis_seam",
    "salience",
    "claimant",
    "proposition",
)
|
|
|
|
|
|
def get_client(qdrant_url: str) -> QdrantClient:
    """Build a Qdrant client for ``qdrant_url``.

    The client is constructed with ``prefer_grpc=False`` and ``timeout=60``.
    """
    options = {"url": qdrant_url, "prefer_grpc": False, "timeout": 60}
    return QdrantClient(**options)
|
|
|
|
|
|
def ensure_collection(client: QdrantClient, *, dim: int = 1024) -> bool:
    """Create the hybrid collection if no collection named COLLECTION is listed yet.

    Args:
        client: Qdrant client used to list and create collections.
        dim: Size of the named dense vector.

    Returns:
        True when this call created the collection, False when it was already listed.
    """
    listed = client.get_collections().collections
    if any(entry.name == COLLECTION for entry in listed):
        return False

    # One named dense vector (cosine distance) plus one named sparse vector with the IDF modifier.
    dense_cfg = {DENSE: models.VectorParams(size=dim, distance=models.Distance.COSINE)}
    sparse_cfg = {SPARSE: models.SparseVectorParams(modifier=models.Modifier.IDF)}
    client.create_collection(
        collection_name=COLLECTION,
        vectors_config=dense_cfg,
        sparse_vectors_config=sparse_cfg,
    )
    log.info("created Qdrant collection %r (dense %s %dd + sparse %s/idf)", COLLECTION, DENSE, dim, SPARSE)
    return True
|
|
|
|
|
|
def _point_id(claim_id: str) -> str:
    """Return the Qdrant point id for ``claim_id``.

    The id is the string form of the UUID5 of ``claim_id`` in the module namespace ``_NS``,
    so the same claim id always maps to the same point id.
    """
    derived = uuid.uuid5(_NS, claim_id)
    return str(derived)
|
|
|
|
|
|
def upsert_pending(conn: sqlite3.Connection, sc, client: QdrantClient,
                   sparse: SparseEmbedder | None = None, *, batch: int = 64) -> int:
    """Embed + upsert every claim that has no qdrant_point_id yet; back-link the id into SQLite.

    Claims are handled in chunks of ``batch``. Each chunk is embedded, upserted into
    COLLECTION, and then its rows get ``qdrant_point_id`` written and committed, so an
    exception in a later chunk leaves the earlier chunks committed.

    Args:
        conn: SQLite connection holding the ``claims`` table. Rows are read by column
            name, so the connection needs a mapping-style row factory such as
            ``sqlite3.Row``.
        sc: Passed through unchanged to ``dense_embed``.
        client: Qdrant client that receives the upserts.
        sparse: Optional sparse embedder; when None, points carry only the dense vector.
        batch: Number of claims per embed/upsert round; must be at least 1.

    Returns:
        Number of claims upserted and back-linked.

    Raises:
        ValueError: If ``batch`` is smaller than 1.
        RuntimeError: If an embedder returns a different number of vectors than texts.
    """
    if batch < 1:
        # range() with a negative step is empty, which would silently skip every pending
        # claim; a zero step raises ValueError anyway. Reject both up front.
        raise ValueError(f"batch must be >= 1, got {batch}")

    rows = conn.execute("SELECT * FROM claims WHERE qdrant_point_id IS NULL").fetchall()
    if not rows:
        return 0

    total = 0
    for start in range(0, len(rows), batch):
        chunk = rows[start:start + batch]
        texts = [r["proposition"] for r in chunk]
        dvecs = list(dense_embed(sc, texts))
        svecs = list(sparse.embed(texts)) if sparse is not None else [None] * len(texts)

        # zip() stops at the shortest input. Without this check a short embedder reply
        # would leave claims back-linked below although no point was written for them.
        if len(dvecs) != len(chunk) or len(svecs) != len(chunk):
            raise RuntimeError(
                f"embedder returned {len(dvecs)} dense / {len(svecs)} sparse vectors "
                f"for {len(chunk)} texts"
            )

        # Derive each point id once; it is used for the Qdrant point and the SQLite back-link.
        ids = [_point_id(r["claim_id"]) for r in chunk]

        points = []
        for r, pid, dv, sv in zip(chunk, ids, dvecs, svecs):
            vectors: dict = {DENSE: dv}
            if sv is not None:
                vectors[SPARSE] = models.SparseVector(indices=sv["indices"], values=sv["values"])
            payload = {f: r[f] for f in _PAYLOAD_FIELDS}
            points.append(models.PointStruct(id=pid, vector=vectors, payload=payload))

        client.upsert(collection_name=COLLECTION, points=points)

        # Back-link only after the upsert call returned, then commit this chunk.
        conn.executemany(
            "UPDATE claims SET qdrant_point_id=? WHERE claim_id=?",
            [(pid, r["claim_id"]) for pid, r in zip(ids, chunk)],
        )
        conn.commit()
        total += len(chunk)
    return total
|