Files
ten31-database/backend/ingest/qdrant_io.py
T
Keysat 3c31b1e8a5 Soft-delete + source-count diagnostics; thesis v4 (0.1.0:47)
- DELETE handlers soft-delete (set deleted_at) + cascade contact -> opps/comms/lp
  instead of hard-deleting (guardrail #3); list queries filter deleted rows.
- ingest: chunking excludes soft-deleted records; qdrant delete-by-source-id;
  sync prunes soft-deleted records' vectors incrementally.
- /api/system/status returns raw source-record counts for sanity-checking.
- docs/thesis-seed-v4.md (no "bet" language, scarcity-forward, freedom-tech as
  a banner option, tightened pillars, reworked segments + edge).

Soft-delete verified via the running HTTP server (delete -> hidden + row kept).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-05 12:20:38 -05:00

64 lines
2.1 KiB
Python

"""Minimal Qdrant REST client for the ingest pipeline (direct to QDRANT_URL).
Creates the crm_chunks collection per EMBEDDINGS.md: a named dense vector
(1024, Cosine) + a named sparse vector with modifier:idf, plus payload indexes.
"""
import config
import http_util
# Base URL of the Qdrant REST API and the name of the collection every call in
# this module targets; both come straight from config.
Q, COL = config.QDRANT_URL, config.COLLECTION
def _req(method, path, body=None):
    """Send one request to the Qdrant REST API via http_util.

    `path` is appended verbatim to the configured base URL `Q`; `body` is
    passed through to http_util.request unchanged. The return value is
    whatever http_util.request returns (callers in this module unpack it as
    ``(status, data)``).
    """
    # NOTE(review): certificate verification is disabled here — presumably
    # because Qdrant is reached over an internal / self-signed endpoint;
    # confirm this is intended for every deployment.
    url = f"{Q}{path}"
    return http_util.request(method, url, body, verify=False)
def exists() -> bool:
    """Report whether the configured collection is present on the server.

    Issues ``GET /collections/{COL}``; only an HTTP 200 counts as present.
    """
    http_status, _body = _req("GET", f"/collections/{COL}")
    return http_status == 200
def create_collection(recreate=False, dim=config.DENSE_DIM):
    """Create the collection with a named dense and a named sparse vector.

    The dense vector is called "dense" (size `dim`, Cosine distance). The
    sparse vector is called "sparse" and uses the "idf" modifier.

    Args:
        recreate: if the collection already exists, drop it and build a
            fresh one instead of leaving it untouched.
        dim: size of the "dense" vector. The default is config.DENSE_DIM,
            evaluated once when this module is imported.

    Returns:
        "exists" when the collection was already present and `recreate` is
        False; otherwise "created".

    Raises:
        RuntimeError: if dropping the old collection or creating the new one
            does not return a 2xx status.
    """
    if exists():
        if not recreate:
            return "exists"
        # Check the drop explicitly. If it fails and the collection is still
        # there, the PUT below would likely fail too, but with an error that
        # hides the real cause.
        status, data = _req("DELETE", f"/collections/{COL}")
        if status not in (200, 201):
            raise RuntimeError(f"delete collection -> {status}: {data}")
    status, data = _req("PUT", f"/collections/{COL}", {
        "vectors": {"dense": {"size": dim, "distance": "Cosine"}},
        # "idf" modifier: Qdrant applies inverse-document-frequency weighting
        # to this sparse vector (see the Qdrant sparse-vector docs).
        "sparse_vectors": {"sparse": {"modifier": "idf"}},
    })
    if status not in (200, 201):
        raise RuntimeError(f"create collection -> {status}: {data}")
    return "created"
def ensure_indexes():
    """Create the payload indexes used to filter chunks.

    Indexes lp_id and doc_type as keyword fields and date_ts as an integer
    field, one ``PUT /collections/{COL}/index`` request per field.

    Raises:
        RuntimeError: if Qdrant rejects an index request (non-2xx status).
            This matches the error handling of the other calls in this
            module; previously the status was discarded and a failed index
            went unnoticed.
    """
    # NOTE(review): delete_by_source_ids filters on the payload field
    # "source_id", which is not indexed here. Consider adding an index once
    # its value type (keyword vs integer) is confirmed.
    for field, schema in (("lp_id", "keyword"), ("doc_type", "keyword"), ("date_ts", "integer")):
        status, data = _req("PUT", f"/collections/{COL}/index",
                            {"field_name": field, "field_schema": schema})
        if status not in (200, 201):
            raise RuntimeError(f"create index {field} -> {status}: {data}")
def upsert(points):
    """Insert or overwrite `points` in the collection.

    Sends ``PUT /collections/{COL}/points?wait=true``. The ``wait=true``
    flag asks Qdrant to apply the write before responding.

    Returns:
        The response data from _req on success.

    Raises:
        RuntimeError: on any status other than 200/201.
    """
    payload = {"points": points}
    status, data = _req("PUT", f"/collections/{COL}/points?wait=true", payload)
    if status in (200, 201):
        return data
    raise RuntimeError(f"upsert -> {status}: {data}")
def count():
    """Return the exact number of points in the collection, or None.

    Sends ``POST /collections/{COL}/points/count`` with ``exact: true``.

    None is returned, rather than raising, when:
    - the request does not succeed, or
    - the body is not a dict with a dict-valued ``result`` (e.g. an error
      payload or a non-JSON error page).
    The previous code raised AttributeError on a null ``result`` or a
    non-dict body.
    """
    status, data = _req("POST", f"/collections/{COL}/points/count", {"exact": True})
    if status not in (200, 201) or not isinstance(data, dict):
        return None
    result = data.get("result")
    if not isinstance(result, dict):
        return None
    return result.get("count")
def delete_by_source_ids(source_ids):
    """Remove every chunk whose payload source_id is in `source_ids`.

    This is used to prune the chunks of soft-deleted CRM source records from
    the index. `source_ids` may be any iterable. An empty one is a no-op that
    returns None without contacting Qdrant.

    Returns:
        None for empty input; otherwise the response data from _req.

    Raises:
        RuntimeError: if Qdrant answers with a status other than 200/201.
    """
    wanted = list(source_ids)
    if len(wanted) == 0:
        return None
    selector = {"filter": {"must": [{"key": "source_id", "match": {"any": wanted}}]}}
    status, data = _req("POST", f"/collections/{COL}/points/delete?wait=true", selector)
    if status in (200, 201):
        return data
    raise RuntimeError(f"delete points -> {status}: {data}")