069e60053b
When a sent/received email is matched to an investor, a local-model agent drafts a
one-line dated note and queues it as a PENDING proposal (it never writes the grid
itself). On the Email Capture page a partner sees "Proposed grid notes", can edit the
text, and Approve (appends to that investor's grid notes cell, newest at bottom,
stamped with the approver) or Dismiss. Going-forward only: a cutoff (app_settings
email_activity_since, set on first run) means email dated before the feature was
enabled is never summarized, so the historical backfill makes no noise. Sovereign:
summaries run entirely on the local model (no redaction needed). Gmail sync interval
tightened 180 -> 15 min so outgoing email surfaces quickly.
Backend: migration 0002 (email_activity_proposals); propose_email_activity_notes()
runs via a new scheduler post_sync hook; list/decide functions + routes
GET /api/activity/proposals, POST .../{id}/approve|dismiss. Grid append stamps the
approving user (fundraising_state.updated_by has a FK to users). Test
test_email_activity.py (propose cutoff/idempotency, approve appends + edited note,
dismiss, already-decided guard) under FK enforcement.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
152 lines
4.7 KiB
Python
152 lines
4.7 KiB
Python
"""
|
|
Background sync scheduler.
|
|
|
|
Runs as a daemon thread started from server.py main(). One thread; it wakes
|
|
every `sync_interval_sec`, processes all accounts serially, sleeps again.
|
|
|
|
Singleton: start_sync_scheduler() is idempotent — calling twice won't spawn
|
|
a second thread. stop_sync_scheduler() gracefully signals shutdown (not
|
|
strictly needed since it's daemon, but useful for tests).
|
|
"""
|
|
|
|
import logging
|
|
import sqlite3
|
|
import threading
|
|
import time
|
|
from typing import Callable, Optional
|
|
|
|
from . import config as _cfg
|
|
from . import credentials as _creds
|
|
from . import sync as _sync
|
|
from .matcher import InvestorIndex
|
|
|
|
|
|
log = logging.getLogger("email_integration.scheduler")
|
|
|
|
|
|
_state: dict[str, object] = {
|
|
"thread": None,
|
|
"stop": threading.Event(),
|
|
"last_run": 0.0,
|
|
"last_result": None,
|
|
"running_now": False,
|
|
}
|
|
|
|
|
|
def _conn_factory_from_env() -> Callable[[], sqlite3.Connection]:
|
|
"""Build a get_db() compatible with server.py's pattern.
|
|
|
|
We don't import server.py (avoid circular / startup ordering). Instead
|
|
we re-implement the same settings. If server.py's DB path differs from
|
|
the default, CRM_DB_PATH env var should be set — same mechanism.
|
|
"""
|
|
import os
|
|
db_path = os.environ.get(
|
|
"CRM_DB_PATH",
|
|
os.path.join(_cfg.CONFIG.data_dir, "crm.db"),
|
|
)
|
|
|
|
def get_db() -> sqlite3.Connection:
|
|
conn = sqlite3.connect(db_path)
|
|
conn.execute("PRAGMA journal_mode=WAL")
|
|
conn.execute("PRAGMA foreign_keys=ON")
|
|
conn.execute("PRAGMA busy_timeout=5000")
|
|
conn.row_factory = sqlite3.Row
|
|
return conn
|
|
|
|
return get_db
|
|
|
|
|
|
def start_sync_scheduler(conn_factory: Optional[Callable] = None,
|
|
post_sync: Optional[Callable] = None) -> None:
|
|
"""Start the periodic Gmail sync loop. `post_sync`, if given, is called after each
|
|
sync pass (best-effort) — used to run the email-activity summarizer."""
|
|
if _state["thread"] is not None:
|
|
return # already running
|
|
|
|
if not _cfg.CONFIG.enabled:
|
|
log.info("email_integration not enabled; scheduler will not start")
|
|
return
|
|
|
|
factory = conn_factory or _conn_factory_from_env()
|
|
|
|
try:
|
|
provider = _creds.build_provider(factory)
|
|
except Exception as e:
|
|
log.exception("cannot build credential provider: %s", e)
|
|
return
|
|
|
|
index = InvestorIndex(own_domain=_cfg.CONFIG.workspace_domain)
|
|
try:
|
|
index.rebuild(factory)
|
|
except Exception:
|
|
log.exception("initial investor-index build failed; scheduler continues")
|
|
|
|
stop = threading.Event()
|
|
_state["stop"] = stop
|
|
|
|
def _loop():
|
|
log.info("email sync scheduler started; interval=%ss", _cfg.CONFIG.sync_interval_sec)
|
|
# First cycle: short delay to let server finish startup.
|
|
if stop.wait(10):
|
|
return
|
|
while not stop.is_set():
|
|
_state["running_now"] = True
|
|
t0 = time.time()
|
|
try:
|
|
result = _sync.sync_all(factory, provider, index)
|
|
_state["last_result"] = result
|
|
except Exception:
|
|
log.exception("sync loop crashed; will retry next cycle")
|
|
finally:
|
|
_state["running_now"] = False
|
|
_state["last_run"] = t0
|
|
if post_sync is not None:
|
|
try:
|
|
post_sync()
|
|
except Exception:
|
|
log.exception("post_sync hook failed; continuing")
|
|
if stop.wait(_cfg.CONFIG.sync_interval_sec):
|
|
return
|
|
|
|
t = threading.Thread(target=_loop, name="email-sync", daemon=True)
|
|
t.start()
|
|
_state["thread"] = t
|
|
_state["provider"] = provider
|
|
_state["index"] = index
|
|
_state["factory"] = factory
|
|
|
|
|
|
def stop_sync_scheduler() -> None:
|
|
ev: threading.Event = _state["stop"] # type: ignore
|
|
ev.set()
|
|
t = _state.get("thread")
|
|
if t:
|
|
try:
|
|
t.join(timeout=5)
|
|
except Exception:
|
|
pass
|
|
_state["thread"] = None
|
|
|
|
|
|
def trigger_run_now() -> dict:
|
|
"""Force a single sync pass synchronously (admin 'sync now' endpoint)."""
|
|
if _state.get("running_now"):
|
|
return {"status": "already_running"}
|
|
factory = _state.get("factory")
|
|
provider = _state.get("provider")
|
|
index = _state.get("index")
|
|
if not (factory and provider and index):
|
|
return {"status": "not_initialized"}
|
|
return _sync.sync_all(factory, provider, index) # type: ignore
|
|
|
|
|
|
def status_snapshot() -> dict:
|
|
return {
|
|
"enabled": _cfg.CONFIG.enabled,
|
|
"running": _state["running_now"],
|
|
"last_run_unix": _state.get("last_run"),
|
|
"last_result": _state.get("last_result"),
|
|
"interval_sec": _cfg.CONFIG.sync_interval_sec,
|
|
}
|