Phase 0 complete: fuzzy entity tier, incremental sync, Start9 packaging
- Fuzzy tier (backend/ingest/fuzzy_resolve.py + llm.py): local Qwen adjudicates the deterministic resolver's flagged name-variant candidates; merges are durable via entity_merges (deterministic re-runs respect them), losers soft-deleted, logged. Idempotent. - Incremental sync (backend/ingest/sync.py): re-embeds only rows changed since a watermark (ingest_sync_state); first run / --recreate = full. Tested full→0→1. - Start9 packaging (start9/0.4): Dockerfile bundles ingest+mcp + fastembed/mcp; "Build search index" action runs the init in a subcontainer; MCP shipped as a manual stdio server (not a daemon); version 0.1.0:44. INGEST_PACKAGING.md. - backfill.py: factored embed_and_upsert() shared with sync. Verified end-to-end on synthetic data + live Sparks/Qwen/Qdrant. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
+19
-12
@@ -17,17 +17,9 @@ import qdrant_io
|
||||
import sparse
|
||||
|
||||
|
||||
def run(db, recreate=False, batch=32):
|
||||
conn = sqlite3.connect(db)
|
||||
conn.row_factory = sqlite3.Row
|
||||
chunks = chunking.build_chunks(conn)
|
||||
conn.close()
|
||||
print(f"Built {len(chunks)} chunks from {db}")
|
||||
|
||||
state = qdrant_io.create_collection(recreate=recreate)
|
||||
qdrant_io.ensure_indexes()
|
||||
print(f"Collection '{config.COLLECTION}': {state}")
|
||||
|
||||
def embed_and_upsert(chunks, batch=32, progress=True):
|
||||
"""Embed (dense + sparse) and upsert a list of chunks to Qdrant. Shared by the
|
||||
full backfill and the incremental sync. Returns the number of points written."""
|
||||
total = 0
|
||||
for i in range(0, len(chunks), batch):
|
||||
group = chunks[i:i + batch]
|
||||
@@ -46,8 +38,23 @@ def run(db, recreate=False, batch=32):
|
||||
})
|
||||
qdrant_io.upsert(points)
|
||||
total += len(points)
|
||||
print(f" upserted {total}/{len(chunks)}")
|
||||
if progress:
|
||||
print(f" upserted {total}/{len(chunks)}")
|
||||
return total
|
||||
|
||||
|
||||
def run(db, recreate=False, batch=32):
    """Run a full backfill: chunk the CRM database, then embed and upsert to Qdrant.

    Args:
        db: path of the SQLite database to read.
        recreate: forwarded to ``qdrant_io.create_collection``.
        batch: batch size forwarded to ``embed_and_upsert``.
    """
    conn = sqlite3.connect(db)
    try:
        conn.row_factory = sqlite3.Row
        chunks = chunking.build_chunks(conn)
    finally:
        # Release the SQLite handle even when chunk building raises.
        conn.close()
    print(f"Built {len(chunks)} chunks from {db}")

    state = qdrant_io.create_collection(recreate=recreate)
    qdrant_io.ensure_indexes()
    print(f"Collection '{config.COLLECTION}': {state}")

    embed_and_upsert(chunks, batch=batch)
    print(f"Done. Qdrant '{config.COLLECTION}' now holds {qdrant_io.count()} points.")
|
||||
|
||||
|
||||
|
||||
@@ -24,5 +24,6 @@ SPARK_VERIFY_TLS = os.environ.get("SPARK_CONTROL_VERIFY_TLS", "false").lower() i
|
||||
QDRANT_URL = os.environ.get("QDRANT_URL", "").rstrip("/")
|
||||
COLLECTION = os.environ.get("CRM_QDRANT_COLLECTION", "crm_chunks")
|
||||
EMBED_MODEL = os.environ.get("CRM_EMBED_MODEL", "BAAI/bge-m3")
|
||||
CHAT_MODEL = os.environ.get("CRM_CHAT_MODEL", "RedHatAI/Qwen3.6-35B-A3B-NVFP4")
|
||||
DENSE_DIM = int(os.environ.get("CRM_EMBED_DIM", "1024"))
|
||||
DEFAULT_DB = os.environ.get("CRM_DEV_DB_PATH", os.path.join(_ROOT, "data", "crm_dev.db"))
|
||||
|
||||
@@ -69,6 +69,16 @@ def _split_name(full: str):
|
||||
return parts[0], parts[-1] if len(parts) > 1 else ""
|
||||
|
||||
|
||||
def _redirect(merge_map, eid):
    """Resolve *eid* through the durable fuzzy-merge map (entity_merges).

    Keeps hopping ``merged_id -> survivor_id`` until it reaches an id that has
    no redirect, so deterministic re-runs land on the surviving entity instead
    of recreating a merged-away one. An id that was already visited ends the
    walk, which keeps a cyclic map from looping forever.
    """
    visited = set()
    current = eid
    while True:
        if current in visited or current not in merge_map:
            return current
        visited.add(current)
        current = merge_map[current]
|
||||
|
||||
|
||||
# ── upsert helpers ────────────────────────────────────────────────────────────
|
||||
|
||||
def _upsert_entity(conn, eid, kind, display_name, primary_email):
|
||||
@@ -102,12 +112,13 @@ def _link(conn, canonical_id, source_model, source_id, match_value, match_kind,
|
||||
|
||||
# ── resolution passes ─────────────────────────────────────────────────────────
|
||||
|
||||
def resolve_organizations(conn):
|
||||
def resolve_organizations(conn, merge_map=None):
|
||||
"""Merge organizations + fundraising_investors by normalized name.
|
||||
|
||||
Returns (org_canon_by_orgid, org_canon_by_fundinv) so the people pass can
|
||||
attach each person to their firm's canonical id.
|
||||
"""
|
||||
merge_map = merge_map or {}
|
||||
groups = defaultdict(lambda: {"orgs": [], "investors": [], "name": "", "email": ""})
|
||||
|
||||
for r in conn.execute("SELECT id, name, email FROM organizations"):
|
||||
@@ -135,7 +146,7 @@ def resolve_organizations(conn):
|
||||
# An org we are actively raising from (has a fundraising row) is an 'lp';
|
||||
# otherwise a plain 'organization'.
|
||||
kind = "lp" if g["investors"] else "organization"
|
||||
cid = _eid("lp" if kind == "lp" else "org", key)
|
||||
cid = _redirect(merge_map, _eid("lp" if kind == "lp" else "org", key))
|
||||
_upsert_entity(conn, cid, kind, g["name"], g["email"])
|
||||
for oid in g["orgs"]:
|
||||
_link(conn, cid, "organizations", oid, key, "exact_name", 1.0)
|
||||
@@ -147,9 +158,10 @@ def resolve_organizations(conn):
|
||||
return org_canon_by_orgid, org_canon_by_fundinv
|
||||
|
||||
|
||||
def resolve_people(conn, org_canon_by_orgid, org_canon_by_fundinv):
|
||||
def resolve_people(conn, org_canon_by_orgid, org_canon_by_fundinv, merge_map=None):
|
||||
"""Merge contacts + fundraising_contacts by exact email, else exact name within
|
||||
the same canonical org. Returns contact_id -> person canonical id (for lp_profiles)."""
|
||||
merge_map = merge_map or {}
|
||||
# gather (model, source_id, full_name, email, org_canon)
|
||||
people = []
|
||||
for r in conn.execute("SELECT id, first_name, last_name, email, organization_id FROM contacts"):
|
||||
@@ -173,7 +185,7 @@ def resolve_people(conn, org_canon_by_orgid, org_canon_by_fundinv):
|
||||
match_kind, conf, match_value = "name_org", 0.8, name_norm
|
||||
else:
|
||||
continue
|
||||
cid = _eid("per", key)
|
||||
cid = _redirect(merge_map, _eid("per", key))
|
||||
display = full.strip() or email
|
||||
_upsert_entity(conn, cid, "person", display, email)
|
||||
_link(conn, cid, model, sid, match_value, match_kind, conf)
|
||||
@@ -210,17 +222,32 @@ def run(db_path: str):
|
||||
conn.row_factory = sqlite3.Row
|
||||
conn.execute("PRAGMA foreign_keys=ON")
|
||||
|
||||
org_by_oid, org_by_inv = resolve_organizations(conn)
|
||||
# Durable fuzzy-merge map (entity_merges) so deterministic re-runs respect
|
||||
# prior local-Qwen merges instead of recreating merged-away entities.
|
||||
conn.execute("""CREATE TABLE IF NOT EXISTS entity_merges (
|
||||
merged_id TEXT PRIMARY KEY,
|
||||
survivor_id TEXT NOT NULL,
|
||||
confidence REAL,
|
||||
reason TEXT,
|
||||
created_at TEXT DEFAULT (datetime('now'))
|
||||
)""")
|
||||
merge_map = {r["merged_id"]: r["survivor_id"]
|
||||
for r in conn.execute("SELECT merged_id, survivor_id FROM entity_merges")}
|
||||
|
||||
org_by_oid, org_by_inv = resolve_organizations(conn, merge_map)
|
||||
conn.commit()
|
||||
person_meta = resolve_people(conn, org_by_oid, org_by_inv)
|
||||
person_meta = resolve_people(conn, org_by_oid, org_by_inv, merge_map)
|
||||
conn.commit()
|
||||
candidates = find_fuzzy_candidates(person_meta)
|
||||
|
||||
# Counts report LIVE entities (deleted_at IS NULL); fuzzy-merged losers are
|
||||
# soft-deleted tombstones (guardrail #3) and excluded.
|
||||
live = "deleted_at IS NULL"
|
||||
counts = {
|
||||
"canonical_total": conn.execute("SELECT COUNT(*) FROM canonical_entities").fetchone()[0],
|
||||
"lp": conn.execute("SELECT COUNT(*) FROM canonical_entities WHERE entity_kind='lp'").fetchone()[0],
|
||||
"organization": conn.execute("SELECT COUNT(*) FROM canonical_entities WHERE entity_kind='organization'").fetchone()[0],
|
||||
"person": conn.execute("SELECT COUNT(*) FROM canonical_entities WHERE entity_kind='person'").fetchone()[0],
|
||||
"canonical_total": conn.execute(f"SELECT COUNT(*) FROM canonical_entities WHERE {live}").fetchone()[0],
|
||||
"lp": conn.execute(f"SELECT COUNT(*) FROM canonical_entities WHERE entity_kind='lp' AND {live}").fetchone()[0],
|
||||
"organization": conn.execute(f"SELECT COUNT(*) FROM canonical_entities WHERE entity_kind='organization' AND {live}").fetchone()[0],
|
||||
"person": conn.execute(f"SELECT COUNT(*) FROM canonical_entities WHERE entity_kind='person' AND {live}").fetchone()[0],
|
||||
"links": conn.execute("SELECT COUNT(*) FROM entity_links").fetchone()[0],
|
||||
"fuzzy_candidates": len(candidates),
|
||||
}
|
||||
|
||||
@@ -0,0 +1,116 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Phase-0 Workstream B3 — fuzzy entity-resolution tier (local Qwen).
|
||||
|
||||
The deterministic tier (entity_resolution.py) merges only provable matches and
|
||||
FLAGS the hard name-variant candidates (same firm + surname, different first
|
||||
name/email) without guessing. This tier asks the local Qwen model (Spark Control
|
||||
/v1/chat/completions — sovereign, on Ten31 infra) to adjudicate each candidate
|
||||
and merges the confirmed ones.
|
||||
|
||||
A merge repoints the loser's entity_links to the survivor and soft-deletes the
|
||||
loser canonical entity (deleted_at; never hard-deleted — guardrail #3). Every
|
||||
merge is written to the interaction_log (guardrail #5). Idempotent: re-running
|
||||
finds no new candidates once merged.
|
||||
|
||||
python3 backend/ingest/fuzzy_resolve.py --db data/crm_dev.db
|
||||
python3 backend/ingest/fuzzy_resolve.py --db data/crm_dev.db --dry-run
|
||||
"""
|
||||
import argparse
|
||||
import json
|
||||
import sqlite3
|
||||
import uuid
|
||||
from datetime import datetime, timezone
|
||||
|
||||
import entity_resolution as er
|
||||
import llm
|
||||
|
||||
# System prompt passed with every adjudication request (see _ask).
_SYSTEM = (
    "You are an entity-resolution assistant for a CRM. "
    "Decide if the listed people are the SAME individual recorded under name "
    "variants (e.g. nicknames like Kate/Katherine, Bill/William), or DIFFERENT "
    "people who happen to share a surname and firm. "
    "Be conservative: only say same when a nickname/abbreviation relationship "
    "or matching contact info makes it clear."
)
|
||||
|
||||
|
||||
def _now():
    """Return the current UTC time as a timezone-aware ISO-8601 string."""
    stamp = datetime.now(tz=timezone.utc)
    return stamp.isoformat()
|
||||
|
||||
|
||||
def _ask(members, firm):
    """Ask the local model whether the candidate members are one person.

    Args:
        members: iterable of 3-item tuples; the second item is used as the
            name and the third as the e-mail (omitted from the prompt when falsy).
        firm: firm name shown in the prompt; a falsy value is shown as 'unknown'.

    Returns:
        Whatever ``llm.chat_json`` returns when that is truthy, otherwise the
        fallback ``{"same": False, "confidence": 0.0}``.
    """
    described = []
    for _, name, email in members:
        entry = f"{name}"
        if email:
            entry += f" <{email}>"
        described.append(entry)
    people = "; ".join(described)

    prompt = (
        f"Firm: {firm or 'unknown'}\nPeople: {people}\n\n"
        "Are these the SAME person under name variants? "
        'Answer only JSON: {"same": true|false, "confidence": 0.0-1.0, "reason": "..."}'
    )
    verdict = llm.chat_json(prompt, system=_SYSTEM, max_tokens=160)
    if verdict:
        return verdict
    return {"same": False, "confidence": 0.0}
|
||||
|
||||
|
||||
def _survivor(members):
    """Pick the member that survives a merge.

    Members with a truthy e-mail (index 2) rank first, then the longest name
    (index 1) as the most complete one. Ties keep their input order.
    """
    def rank(member):
        return (bool(member[2]), len(member[1]))

    ranked = list(members)
    ranked.sort(key=rank, reverse=True)
    return ranked[0]
|
||||
|
||||
|
||||
def _confidence(verdict):
    """Return the verdict's confidence as a float; 0.0 when missing or non-numeric."""
    try:
        return float(verdict.get("confidence", 0))
    except (TypeError, ValueError):
        # The model may emit null or a word ("high"); treat that as no confidence
        # instead of aborting the whole run.
        return 0.0


def _apply_merge(conn, keep, losers, verdict, decision):
    """Persist one confirmed merge group on *conn* (the caller commits)."""
    stamp = _now()
    confidence = verdict.get("confidence", 0.7)
    for loser in losers:
        # Record the merge durably so deterministic re-runs respect it.
        conn.execute(
            """INSERT INTO entity_merges (merged_id, survivor_id, confidence, reason, created_at)
               VALUES (?,?,?,?,?)
               ON CONFLICT(merged_id) DO UPDATE SET survivor_id=excluded.survivor_id,
                   confidence=excluded.confidence, reason=excluded.reason""",
            (loser[0], keep[0], confidence, verdict.get("reason"), stamp))
        # Repoint the loser's source links to the survivor.
        conn.execute(
            "UPDATE entity_links SET canonical_id=?, match_kind='fuzzy_merge', confidence=? "
            "WHERE canonical_id=?", (keep[0], confidence, loser[0]))
        # Soft-delete only: the loser row stays as a tombstone.
        conn.execute("UPDATE canonical_entities SET deleted_at=?, updated_at=? WHERE id=?",
                     (stamp, stamp, loser[0]))
    # NOTE(review): one log row per merged group (payload lists every member);
    # confirm this matches the intended logging granularity.
    conn.execute(
        """INSERT INTO interaction_log
           (id, ts, actor_type, actor_id, action, target_type, target_id, payload, source, created_at)
           VALUES (?,?,?,?,?,?,?,?,?,?)""",
        (str(uuid.uuid4()), stamp, "agent", "qwen_entity_resolver", "entity.merged",
         "canonical_entity", keep[0], json.dumps(decision), "ingest", stamp))


def run(db, threshold=0.7, dry_run=False):
    """Adjudicate the deterministic tier's fuzzy candidates and merge confirmed ones.

    Args:
        db: path of the SQLite database.
        threshold: minimum model confidence required to accept a "same" verdict.
        dry_run: when true, decisions are computed but nothing is written.

    Returns:
        ``(merges, live_people)``: one decision dict per candidate group
        (merged groups carry a ``merged_into`` key) and the number of person
        entities whose ``deleted_at`` is NULL after the run.
    """
    _, candidates = er.run(db)  # ensure deterministic state + fresh candidates
    conn = sqlite3.connect(db)
    try:
        conn.row_factory = sqlite3.Row
        conn.execute("PRAGMA foreign_keys=ON")
        name_of = {r["id"]: r["display_name"]
                   for r in conn.execute("SELECT id, display_name FROM canonical_entities")}

        merges = []
        for cand in candidates:
            members = cand["members"]
            firm = name_of.get(cand["org"])
            verdict = _ask(members, firm)
            # Require a real JSON boolean: bool("false") would be truthy and could
            # merge two people the model judged to be different.
            same = verdict.get("same") is True and _confidence(verdict) >= threshold
            decision = {"surname": cand["surname"], "firm": firm,
                        "members": [{"id": m[0], "name": m[1]} for m in members],
                        "same": same, "confidence": verdict.get("confidence"),
                        "reason": verdict.get("reason")}
            if same:
                keep = _survivor(members)
                losers = [m for m in members if m[0] != keep[0]]
                decision["merged_into"] = {"id": keep[0], "name": keep[1]}
                if not dry_run:
                    _apply_merge(conn, keep, losers, verdict, decision)
            merges.append(decision)

        if not dry_run:
            conn.commit()
        live_people = conn.execute(
            "SELECT COUNT(*) FROM canonical_entities WHERE entity_kind='person' AND deleted_at IS NULL"
        ).fetchone()[0]
    finally:
        # Always release the handle; uncommitted work is discarded on error.
        conn.close()
    return merges, live_people
|
||||
|
||||
|
||||
def main():
    """CLI entry point: parse flags, run the fuzzy tier, and print every decision."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--db", default="data/crm_dev.db")
    parser.add_argument("--threshold", type=float, default=0.7)
    parser.add_argument("--dry-run", action="store_true")
    opts = parser.parse_args()

    merges, live = run(opts.db, threshold=opts.threshold, dry_run=opts.dry_run)

    suffix = " (dry run)" if opts.dry_run else ""
    print(f"Adjudicated {len(merges)} candidate group(s){suffix}:")
    for m in merges:
        names = " / ".join(person["name"] for person in m["members"])
        if m.get("merged_into"):
            outcome = f"MERGE -> {m['merged_into']['name']}"
        else:
            outcome = "keep separate"
        print(f" [{m['surname']}] {names}: same={m['same']} conf={m['confidence']} => {outcome}")
        if m.get("reason"):
            print(f" reason: {m['reason']}")
    print(f"Live person entities now: {live}")


if __name__ == "__main__":
    main()
|
||||
@@ -0,0 +1,39 @@
|
||||
"""Local Qwen chat client via Spark Control /v1/chat/completions.
|
||||
|
||||
Used for the privacy-sensitive, high-volume reasoning that must stay on Ten31
|
||||
infra (entity-resolution adjudication, triage). Frontier reasoning still goes to
|
||||
Claude; this is the local leg. Thinking is disabled for fast structured output.
|
||||
"""
|
||||
import json
|
||||
import re
|
||||
|
||||
import config
|
||||
import http_util
|
||||
|
||||
|
||||
def chat(prompt, system=None, max_tokens=200, temperature=0.0):
    """Send one chat-completion request to Spark Control and return the reply text.

    Args:
        prompt: user message content.
        system: optional system message, placed before the user message.
        max_tokens: forwarded as ``max_tokens``.
        temperature: forwarded as ``temperature``.

    Returns:
        The first choice's message content, stripped; ``""`` when it is empty
        or missing.

    Raises:
        RuntimeError: when the endpoint answers with a status other than 200.
    """
    messages = [{"role": "user", "content": prompt}]
    if system:
        messages.insert(0, {"role": "system", "content": system})

    payload = {
        "model": config.CHAT_MODEL,
        "messages": messages,
        "temperature": temperature,
        "max_tokens": max_tokens,
        # Thinking is switched off for this request.
        "chat_template_kwargs": {"enable_thinking": False},
    }
    url = f"{config.SPARK_CONTROL_URL}/v1/chat/completions"
    status, data = http_util.request("POST", url, payload, verify=config.SPARK_VERIFY_TLS)
    if status != 200:
        raise RuntimeError(f"/v1/chat/completions -> {status}: {data}")

    content = data["choices"][0]["message"].get("content")
    if not content:
        return ""
    return content.strip()
|
||||
|
||||
|
||||
def chat_json(prompt, system=None, max_tokens=200):
    """Chat and parse the first JSON object from the reply (tolerant of fences).

    Markdown code fences are stripped, then the reply is scanned left to right:
    decoding is attempted at each ``{`` and the first object that parses is
    returned, so prose or extra braces around the object do not break parsing.

    Returns:
        The decoded dict, or ``None`` when the reply contains no parseable
        JSON object.
    """
    raw = chat(prompt, system=system, max_tokens=max_tokens)
    raw = re.sub(r"^```(json)?|```$", "", raw.strip(), flags=re.MULTILINE).strip()

    decoder = json.JSONDecoder()
    start = raw.find("{")
    while start != -1:
        try:
            # raw_decode stops at the end of the first complete value, so any
            # text after the object is ignored rather than causing a failure.
            obj, _ = decoder.raw_decode(raw, start)
        except json.JSONDecodeError:
            start = raw.find("{", start + 1)
        else:
            return obj
    return None
|
||||
+32
-16
@@ -16,25 +16,41 @@ import hashlib
|
||||
import math
|
||||
import re
|
||||
|
||||
_TOKEN_RE = re.compile(r"[a-z0-9]+")
|
||||
# Prefer FastEmbed Qdrant/bm25 (the EMBEDDINGS.md-specified encoder) when it is
|
||||
# installable — true on the Start9 box (Python 3.11). Fall back to the
|
||||
# dependency-free encoder below where it is not (e.g. this dev Mac on 3.14).
|
||||
# Whichever is active, ingest and query in the SAME environment use it, so they
|
||||
# stay consistent; production rebuilds the index on the box, so it uses FastEmbed
|
||||
# end-to-end. BACKEND reports which is live.
|
||||
try:
|
||||
from fastembed import SparseTextEmbedding # type: ignore
|
||||
_MODEL = None
|
||||
|
||||
def _model():
|
||||
global _MODEL
|
||||
if _MODEL is None:
|
||||
_MODEL = SparseTextEmbedding(model_name="Qdrant/bm25")
|
||||
return _MODEL
|
||||
|
||||
def tokenize(text: str):
|
||||
return _TOKEN_RE.findall((text or "").lower())
|
||||
def encode(text: str):
|
||||
emb = next(_model().embed([text or ""]))
|
||||
return {"indices": [int(i) for i in emb.indices], "values": [float(v) for v in emb.values]}
|
||||
|
||||
BACKEND = "fastembed:Qdrant/bm25"
|
||||
except Exception:
|
||||
BACKEND = "pure-python-bm25"
|
||||
_TOKEN_RE = re.compile(r"[a-z0-9]+")
|
||||
|
||||
def _index(token: str) -> int:
|
||||
# Stable unsigned 32-bit index for a token (Qdrant sparse indices are u32).
|
||||
return int.from_bytes(hashlib.md5(token.encode("utf-8")).digest()[:4], "big")
|
||||
def tokenize(text: str):
|
||||
return _TOKEN_RE.findall((text or "").lower())
|
||||
|
||||
def _index(token: str) -> int:
|
||||
# Stable unsigned 32-bit index for a token (Qdrant sparse indices are u32).
|
||||
return int.from_bytes(hashlib.md5(token.encode("utf-8")).digest()[:4], "big")
|
||||
|
||||
def encode(text: str):
|
||||
"""Return a sparse vector {indices, values}. Value is 1 + ln(tf) (sublinear
|
||||
term frequency); IDF is applied by Qdrant via modifier:idf."""
|
||||
tf = {}
|
||||
for tok in tokenize(text):
|
||||
tf[tok] = tf.get(tok, 0) + 1
|
||||
idx_val = {}
|
||||
for tok, count in tf.items():
|
||||
idx_val[_index(tok)] = 1.0 + math.log(count)
|
||||
return {"indices": list(idx_val.keys()), "values": list(idx_val.values())}
|
||||
def encode(text: str):
|
||||
"""Sparse vector {indices, values}; value = 1 + ln(tf). Qdrant applies IDF."""
|
||||
tf = {}
|
||||
for tok in tokenize(text):
|
||||
tf[tok] = tf.get(tok, 0) + 1
|
||||
return {"indices": [_index(t) for t in tf], "values": [1.0 + math.log(c) for c in tf.values()]}
|
||||
|
||||
@@ -0,0 +1,126 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Phase-0 Workstream B4 — incremental, idempotent CRM -> Qdrant sync.
|
||||
|
||||
One command that keeps the index fresh:
|
||||
1. Re-run deterministic entity resolution (cheap, idempotent, respects durable
|
||||
fuzzy merges). Optionally re-run the local-Qwen fuzzy tier (--fuzzy).
|
||||
2. Re-embed ONLY the source rows changed since the last sync (by updated_at);
|
||||
the first run (or --recreate) is a full backfill.
|
||||
3. Upsert with deterministic point ids (overwrite in place) and advance the
|
||||
watermark. Logged to interaction_log.
|
||||
|
||||
Idempotent: re-running with no CRM changes embeds nothing. Watermark lives in an
|
||||
`ingest_sync_state` table the pipeline owns.
|
||||
|
||||
python3 backend/ingest/sync.py --db data/crm_dev.db # incremental (full on first run)
|
||||
python3 backend/ingest/sync.py --db data/crm_dev.db --recreate # force full rebuild
|
||||
python3 backend/ingest/sync.py --db data/crm_dev.db --fuzzy # also run the Qwen fuzzy tier
|
||||
|
||||
LIMITATION: the CRM hard-deletes today, so a removed row's chunk is not pruned
|
||||
incrementally (no tombstone). Until the DELETE handlers honor `deleted_at`, run a
|
||||
periodic `--recreate` (or `backfill.py --recreate`) to drop orphans. Structural
|
||||
entity-id changes (merges) are likewise best followed by a periodic full rebuild.
|
||||
"""
|
||||
import argparse
|
||||
import json
|
||||
import sqlite3
|
||||
import uuid
|
||||
from datetime import datetime, timezone
|
||||
|
||||
import backfill
|
||||
import chunking
|
||||
import config
|
||||
import entity_resolution as er
|
||||
import qdrant_io
|
||||
|
||||
# (table, source_model) pairs; for these tables the two names are identical.
_CHANGE_TABLES = [
    (name, name)
    for name in (
        "communications",
        "contacts",
        "lp_profiles",
        "opportunities",
        "organizations",
        "fundraising_investors",
    )
]
|
||||
|
||||
|
||||
def _now():
    """Return the current UTC time as a naive ISO-8601 string with a "Z" suffix."""
    # Match the CRM's updated_at format ("...Z") so the watermark compares
    # correctly against source-row updated_at (server.now() in server.py).
    naive_utc = datetime.now(timezone.utc).replace(tzinfo=None)
    return f"{naive_utc.isoformat()}Z"
|
||||
|
||||
|
||||
def _ensure_state(conn):
    """Create the ingest_sync_state key/value table when it does not exist yet."""
    ddl = """CREATE TABLE IF NOT EXISTS ingest_sync_state (
        key TEXT PRIMARY KEY, value TEXT, updated_at TEXT DEFAULT (datetime('now')))"""
    conn.execute(ddl)
|
||||
|
||||
|
||||
def _state_get(conn, key):
    """Return the value stored under *key* in ingest_sync_state, or None if absent."""
    cursor = conn.execute("SELECT value FROM ingest_sync_state WHERE key=?", (key,))
    row = cursor.fetchone()
    if row is None:
        return None
    return row[0]
|
||||
|
||||
|
||||
def _state_set(conn, key, value):
    """Insert or overwrite *key* in ingest_sync_state, stamping updated_at with _now().

    The statement is executed on *conn* without committing.
    """
    upsert = """INSERT INTO ingest_sync_state (key, value, updated_at) VALUES (?,?,?)
        ON CONFLICT(key) DO UPDATE SET value=excluded.value, updated_at=excluded.updated_at"""
    params = (key, value, _now())
    conn.execute(upsert, params)
|
||||
|
||||
|
||||
def _changed_source_ids(conn, since):
    """Collect the (source_model, id) pairs of rows with ``updated_at > since``.

    Every table in ``_CHANGE_TABLES`` is scanned. When an ``emails`` table
    exists, its rows with ``is_matched=1`` are included as well. Rows are read
    by column name, so *conn* needs a name-addressable row factory.
    """
    changed = set()
    for tbl, model in _CHANGE_TABLES:
        rows = conn.execute(f"SELECT id FROM {tbl} WHERE updated_at > ?", (since,))
        changed.update((model, row["id"]) for row in rows)

    if not chunking._has_table(conn, "emails"):
        return changed

    matched = conn.execute("SELECT id FROM emails WHERE updated_at > ? AND is_matched=1", (since,))
    changed.update(("emails", row["id"]) for row in matched)
    return changed
|
||||
|
||||
|
||||
def run(db, recreate=False, fuzzy=False, batch=32):
    """Refresh the canonical layer, then sync changed CRM rows into Qdrant.

    Args:
        db: path of the SQLite database.
        recreate: forwarded to ``qdrant_io.create_collection``; also forces a
            full re-embed.
        fuzzy: also run the local-Qwen fuzzy tier after the deterministic one.
        batch: batch size forwarded to ``backfill.embed_and_upsert``.

    Returns:
        A summary dict with ``mode`` ("full" or "incremental"),
        ``rows_embedded``, ``total_chunks`` and ``qdrant_points``.
    """
    # 1. refresh the canonical layer (deterministic always; fuzzy on request)
    er.run(db)
    if fuzzy:
        import fuzzy_resolve
        fuzzy_resolve.run(db)

    conn = sqlite3.connect(db)
    try:
        conn.row_factory = sqlite3.Row
        _ensure_state(conn)
        last = _state_get(conn, "last_sync_ts")
        # Taken before chunks are built; this value becomes the next watermark.
        run_start = _now()

        qdrant_io.create_collection(recreate=recreate)
        qdrant_io.ensure_indexes()

        # 2. pick the chunks to embed: everything on the first run or --recreate,
        #    otherwise only chunks whose source row changed since the watermark
        all_chunks = chunking.build_chunks(conn)
        if last is None or recreate:
            mode, target = "full", all_chunks
        else:
            changed = _changed_source_ids(conn, last)
            mode, target = "incremental", [c for c in all_chunks
                                           if (c["source_model"], c["source_id"]) in changed]

        # 3. upsert, advance the watermark, and log the run
        written = backfill.embed_and_upsert(target, batch=batch, progress=False)
        _state_set(conn, "last_sync_ts", run_start)

        summary = {"mode": mode, "rows_embedded": written, "total_chunks": len(all_chunks),
                   "qdrant_points": qdrant_io.count()}
        conn.execute(
            """INSERT INTO interaction_log
               (id, ts, actor_type, actor_id, action, target_type, payload, source, created_at)
               VALUES (?,?,?,?,?,?,?,?,?)""",
            (str(uuid.uuid4()), _now(), "system", "ingest_sync", "ingest.sync", "crm_chunks",
             json.dumps(summary), "ingest", _now()))
        conn.commit()
    finally:
        # Release the handle even when Qdrant, embedding, or chunking raises; the
        # watermark is only persisted by the commit above.
        conn.close()
    return summary
|
||||
|
||||
|
||||
def main():
    """CLI entry point: parse flags, run one sync, and print its summary."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--db", default=config.DEFAULT_DB)
    parser.add_argument("--recreate", action="store_true")
    parser.add_argument("--fuzzy", action="store_true")
    parser.add_argument("--batch", type=int, default=32)
    opts = parser.parse_args()

    s = run(opts.db, recreate=opts.recreate, fuzzy=opts.fuzzy, batch=opts.batch)
    embedded_part = f"Sync ({s['mode']}): embedded {s['rows_embedded']} chunk(s); "
    totals_part = f"{s['total_chunks']} total; Qdrant now holds {s['qdrant_points']} points."
    print(embedded_part + totals_part)


if __name__ == "__main__":
    main()
|
||||
Reference in New Issue
Block a user