Files
ten31-database/backend/email_integration/scheduler.py
T
Keysat 069e60053b email-activity agent: propose -> review -> approve grid notes (v0.1.0:64)
When a sent/received email is matched to an investor, a local-model agent drafts a
one-line dated note and queues it as a PENDING proposal (it never writes the grid
itself). On the Email Capture page a partner sees "Proposed grid notes", can edit the
text, and Approve (appends to that investor's grid notes cell, newest at bottom,
stamped with the approver) or Dismiss. Going-forward only: a cutoff (app_settings
email_activity_since, set on first run) means email dated before the feature was
enabled is never summarized, so the historical backfill makes no noise. Sovereign:
summaries run entirely on the local model (no redaction needed). Gmail sync interval
tightened 180 -> 15 min so outgoing email surfaces quickly.

Backend: migration 0002 (email_activity_proposals); propose_email_activity_notes()
runs via a new scheduler post_sync hook; list/decide functions + routes
GET /api/activity/proposals, POST .../{id}/approve|dismiss. Grid append stamps the
approving user (fundraising_state.updated_by has a FK to users). Test
test_email_activity.py (propose cutoff/idempotency, approve appends + edited note,
dismiss, already-decided guard) under FK enforcement.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-06 15:55:26 -05:00

152 lines
4.7 KiB
Python

"""
Background sync scheduler.
Runs as a daemon thread started from server.py main(). One thread; it wakes
every `sync_interval_sec`, processes all accounts serially, sleeps again.
Singleton: start_sync_scheduler() is idempotent — calling twice won't spawn
a second thread. stop_sync_scheduler() gracefully signals shutdown (not
strictly needed since it's daemon, but useful for tests).
"""
import logging
import sqlite3
import threading
import time
from typing import Callable, Optional
from . import config as _cfg
from . import credentials as _creds
from . import sync as _sync
from .matcher import InvestorIndex
log = logging.getLogger("email_integration.scheduler")
_state: dict[str, object] = {
"thread": None,
"stop": threading.Event(),
"last_run": 0.0,
"last_result": None,
"running_now": False,
}
def _conn_factory_from_env() -> Callable[[], sqlite3.Connection]:
"""Build a get_db() compatible with server.py's pattern.
We don't import server.py (avoid circular / startup ordering). Instead
we re-implement the same settings. If server.py's DB path differs from
the default, CRM_DB_PATH env var should be set — same mechanism.
"""
import os
db_path = os.environ.get(
"CRM_DB_PATH",
os.path.join(_cfg.CONFIG.data_dir, "crm.db"),
)
def get_db() -> sqlite3.Connection:
conn = sqlite3.connect(db_path)
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA foreign_keys=ON")
conn.execute("PRAGMA busy_timeout=5000")
conn.row_factory = sqlite3.Row
return conn
return get_db
def start_sync_scheduler(conn_factory: Optional[Callable] = None,
post_sync: Optional[Callable] = None) -> None:
"""Start the periodic Gmail sync loop. `post_sync`, if given, is called after each
sync pass (best-effort) — used to run the email-activity summarizer."""
if _state["thread"] is not None:
return # already running
if not _cfg.CONFIG.enabled:
log.info("email_integration not enabled; scheduler will not start")
return
factory = conn_factory or _conn_factory_from_env()
try:
provider = _creds.build_provider(factory)
except Exception as e:
log.exception("cannot build credential provider: %s", e)
return
index = InvestorIndex(own_domain=_cfg.CONFIG.workspace_domain)
try:
index.rebuild(factory)
except Exception:
log.exception("initial investor-index build failed; scheduler continues")
stop = threading.Event()
_state["stop"] = stop
def _loop():
log.info("email sync scheduler started; interval=%ss", _cfg.CONFIG.sync_interval_sec)
# First cycle: short delay to let server finish startup.
if stop.wait(10):
return
while not stop.is_set():
_state["running_now"] = True
t0 = time.time()
try:
result = _sync.sync_all(factory, provider, index)
_state["last_result"] = result
except Exception:
log.exception("sync loop crashed; will retry next cycle")
finally:
_state["running_now"] = False
_state["last_run"] = t0
if post_sync is not None:
try:
post_sync()
except Exception:
log.exception("post_sync hook failed; continuing")
if stop.wait(_cfg.CONFIG.sync_interval_sec):
return
t = threading.Thread(target=_loop, name="email-sync", daemon=True)
t.start()
_state["thread"] = t
_state["provider"] = provider
_state["index"] = index
_state["factory"] = factory
def stop_sync_scheduler() -> None:
ev: threading.Event = _state["stop"] # type: ignore
ev.set()
t = _state.get("thread")
if t:
try:
t.join(timeout=5)
except Exception:
pass
_state["thread"] = None
def trigger_run_now() -> dict:
"""Force a single sync pass synchronously (admin 'sync now' endpoint)."""
if _state.get("running_now"):
return {"status": "already_running"}
factory = _state.get("factory")
provider = _state.get("provider")
index = _state.get("index")
if not (factory and provider and index):
return {"status": "not_initialized"}
return _sync.sync_all(factory, provider, index) # type: ignore
def status_snapshot() -> dict:
return {
"enabled": _cfg.CONFIG.enabled,
"running": _state["running_now"],
"last_run_unix": _state.get("last_run"),
"last_result": _state.get("last_result"),
"interval_sec": _cfg.CONFIG.sync_interval_sec,
}