"""
Background sync scheduler.

Runs as a daemon thread started from server.py main(). One thread; it wakes
every `sync_interval_sec`, processes all accounts serially, then sleeps again.

Singleton: start_sync_scheduler() is idempotent — calling it twice won't
spawn a second thread. stop_sync_scheduler() signals shutdown (not strictly
needed since the thread is a daemon, but useful for tests).

Scheduled passes and manual passes (trigger_run_now) share one lock, so two
sync_all() calls never run at the same time.
"""

import logging
import sqlite3
import threading
import time
from typing import Callable, Optional

from . import config as _cfg
from . import credentials as _creds
from . import sync as _sync
from .matcher import InvestorIndex

log = logging.getLogger("email_integration.scheduler")

# Shared scheduler state. Extra keys ("provider", "index", "factory") are
# added by start_sync_scheduler() once initialization succeeds.
_state: dict[str, object] = {
    "thread": None,
    "stop": threading.Event(),
    "last_run": 0.0,
    "last_result": None,
    "running_now": False,
}

# Held for the full duration of a sync pass. A plain check-then-set on
# `running_now` is racy and did not stop the scheduled loop from starting
# while a manual pass was in progress.
_run_lock = threading.Lock()

# Sentinel returned by _run_pass() when another pass already holds the lock.
_BUSY = object()


def _conn_factory_from_env() -> Callable[[], sqlite3.Connection]:
    """Build a get_db() compatible with server.py's pattern.

    We don't import server.py (avoid circular imports / startup ordering);
    instead we re-implement the same connection settings. If server.py's DB
    path differs from the default, the CRM_DB_PATH env var should be set —
    same mechanism.

    Returns:
        A zero-argument callable that opens a NEW sqlite3 connection on every
        call (WAL journal, foreign keys on, 5 s busy timeout, Row factory).
        The connection is not closed here; that is left to the caller.
    """
    import os

    db_path = os.environ.get(
        "CRM_DB_PATH",
        os.path.join(_cfg.CONFIG.data_dir, "crm.db"),
    )

    def get_db() -> sqlite3.Connection:
        conn = sqlite3.connect(db_path)
        conn.execute("PRAGMA journal_mode=WAL")
        conn.execute("PRAGMA foreign_keys=ON")
        conn.execute("PRAGMA busy_timeout=5000")
        conn.row_factory = sqlite3.Row
        return conn

    return get_db


def _run_pass(factory: Callable, provider: object, index: object) -> object:
    """Run one sync_all() pass unless another pass is already running.

    Returns:
        The sync_all() result, or the `_BUSY` sentinel if a pass was already
        in progress (nothing is run in that case).

    Updates `running_now` while the pass runs, `last_run` (the pass start
    time) whether or not it succeeds, and `last_result` only on success.
    Exceptions from sync_all() propagate to the caller.
    """
    if not _run_lock.acquire(blocking=False):
        return _BUSY
    _state["running_now"] = True
    started = time.time()
    try:
        result = _sync.sync_all(factory, provider, index)
        _state["last_result"] = result
        return result
    finally:
        _state["running_now"] = False
        _state["last_run"] = started
        _run_lock.release()


def start_sync_scheduler(conn_factory: Optional[Callable] = None,
                         post_sync: Optional[Callable] = None) -> None:
    """Start the periodic Gmail sync loop.

    Does nothing if a scheduler thread is already registered or if
    email_integration is disabled in config. Failures to build the
    credential provider abort the start (logged); a failed initial
    investor-index build is logged but the scheduler still starts.

    Args:
        conn_factory: Zero-argument callable returning a sqlite3 connection.
            Defaults to one derived from CRM_DB_PATH / the config data dir.
        post_sync: If given, called after each scheduled cycle (best-effort;
            exceptions are logged and ignored) — used to run the
            email-activity summarizer.
    """
    if _state["thread"] is not None:
        return  # already running
    if not _cfg.CONFIG.enabled:
        log.info("email_integration not enabled; scheduler will not start")
        return

    factory = conn_factory or _conn_factory_from_env()
    try:
        provider = _creds.build_provider(factory)
    except Exception as e:
        log.exception("cannot build credential provider: %s", e)
        return

    index = InvestorIndex(own_domain=_cfg.CONFIG.workspace_domain)
    try:
        index.rebuild(factory)
    except Exception:
        log.exception("initial investor-index build failed; scheduler continues")

    stop = threading.Event()
    _state["stop"] = stop

    def _loop() -> None:
        log.info("email sync scheduler started; interval=%ss",
                 _cfg.CONFIG.sync_interval_sec)
        # First cycle: short delay to let the server finish startup.
        if stop.wait(10):
            return
        while not stop.is_set():
            try:
                if _run_pass(factory, provider, index) is _BUSY:
                    # A manual pass is running; it covers this cycle.
                    log.info("sync pass already in progress; skipping this cycle")
            except Exception:
                log.exception("sync loop crashed; will retry next cycle")
            if post_sync is not None:
                try:
                    post_sync()
                except Exception:
                    log.exception("post_sync hook failed; continuing")
            if stop.wait(_cfg.CONFIG.sync_interval_sec):
                return

    # Publish the collaborators before the thread starts so trigger_run_now()
    # never observes a running thread without them.
    _state["provider"] = provider
    _state["index"] = index
    _state["factory"] = factory

    t = threading.Thread(target=_loop, name="email-sync", daemon=True)
    t.start()
    _state["thread"] = t


def stop_sync_scheduler() -> None:
    """Signal the scheduler loop to exit and wait up to 5 s for it.

    The thread registration is cleared even if the thread is still finishing
    a pass. A later start_sync_scheduler() may then spawn a new loop. The
    shared run lock still prevents the two from syncing concurrently.
    """
    stop: threading.Event = _state["stop"]  # type: ignore[assignment]
    stop.set()
    t = _state.get("thread")
    # join() raises RuntimeError when called on the current thread.
    if isinstance(t, threading.Thread) and t is not threading.current_thread():
        t.join(timeout=5)
        if t.is_alive():
            log.warning("email sync thread did not exit within 5s; "
                        "it will stop after its current pass")
    _state["thread"] = None


def trigger_run_now() -> dict:
    """Force a single sync pass synchronously (admin 'sync now' endpoint).

    Returns:
        {"status": "not_initialized"} if the scheduler never finished
        starting; {"status": "already_running"} if a scheduled or manual
        pass is in progress; otherwise the sync_all() result. The pass is
        recorded in `last_run` / `last_result` like a scheduled one.
        Exceptions from sync_all() propagate.
    """
    factory = _state.get("factory")
    provider = _state.get("provider")
    index = _state.get("index")
    # Explicit None checks: these objects may define __len__/__bool__
    # (e.g. an index with no entries), which must not read as "missing".
    if factory is None or provider is None or index is None:
        return {"status": "not_initialized"}
    result = _run_pass(factory, provider, index)  # type: ignore[arg-type]
    if result is _BUSY:
        return {"status": "already_running"}
    return result  # type: ignore[return-value]


def status_snapshot() -> dict:
    """Return a JSON-friendly snapshot of scheduler config and last-run state."""
    return {
        "enabled": _cfg.CONFIG.enabled,
        "running": _state["running_now"],
        "last_run_unix": _state.get("last_run"),
        "last_result": _state.get("last_result"),
        "interval_sec": _cfg.CONFIG.sync_interval_sec,
    }