Files
Keysat 6be2e40f54 Phase 0 go-live polish: hands-off incremental sync + refresh action
- backend/ingest/sync_scheduler.py: periodic incremental-sync loop (every
  CRM_INGEST_SYNC_INTERVAL_MIN min); resilient, --once for testing.
- start9/0.4: "Refresh search index" action (incremental sync.py); entrypoint
  launches the scheduler as a background process when Spark/Qdrant are set;
  CRM_INGEST_SYNC_INTERVAL_MIN env; pre-release note on fastembed/mcp pins.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-05 09:36:06 -05:00

56 lines
1.9 KiB
Python

#!/usr/bin/env python3
"""Hands-off periodic incremental-sync loop.
Runs `sync.run()` every CRM_INGEST_SYNC_INTERVAL_MIN minutes so the Qdrant index
tracks CRM changes without manual action. Mirrors the email-sync / backup
scheduler pattern already used in this codebase. Resilient: a failed cycle is
logged and the loop continues. Intended to be launched as a background process
by the StartOS docker_entrypoint.sh (only when Spark/Qdrant are configured).
python3 backend/ingest/sync_scheduler.py --db /data/crm.db
python3 backend/ingest/sync_scheduler.py --db data/crm_dev.db --once # one cycle (test)
"""
import argparse
import os
import sys
import time
import traceback
import config
import sync
def _log(msg):
sys.stderr.write(f"[ingest-scheduler] {msg}\n")
sys.stderr.flush()
def loop(db, interval_min, fuzzy):
interval = max(60, int(interval_min) * 60)
_log(f"started: every {interval_min} min on {db} (fuzzy={fuzzy})")
while True:
try:
s = sync.run(db, fuzzy=fuzzy)
_log(f"{s['mode']}: embedded {s['rows_embedded']} chunk(s); {s['qdrant_points']} points")
except Exception as exc:
_log(f"sync FAILED (continuing): {exc}\n{traceback.format_exc()}")
time.sleep(interval)
def main():
    """Parse CLI options, then run a single sync cycle (``--once``) or loop forever.

    ``--interval-min`` defaults to ``$CRM_INGEST_SYNC_INTERVAL_MIN``, or 60 when
    that variable is unset or blank.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--db", default=config.DEFAULT_DB)
    # Pass the env value as a *string* default: argparse applies type=int to
    # string defaults only when the option is not given on the command line,
    # so a malformed value (e.g. "30m") yields a clean usage error rather than
    # an uncaught ValueError before parsing. A blank value falls back to 60.
    env_interval = (os.environ.get("CRM_INGEST_SYNC_INTERVAL_MIN") or "").strip() or "60"
    ap.add_argument("--interval-min", type=int, default=env_interval,
                    help="minutes between sync cycles (default: $CRM_INGEST_SYNC_INTERVAL_MIN or 60)")
    ap.add_argument("--fuzzy", action="store_true", help="also run the local-Qwen fuzzy tier each cycle")
    ap.add_argument("--once", action="store_true", help="run a single cycle and exit (testing)")
    args = ap.parse_args()
    if args.once:
        print(sync.run(args.db, fuzzy=args.fuzzy))
        return
    loop(args.db, args.interval_min, args.fuzzy)


if __name__ == "__main__":
    main()