"""Daily activity-digest scheduler (Phase B). Co-located with the Gmail sync scheduler (it shares the same conn-factory and daemon-thread idiom). One daemon thread wakes every 60s and fires the daily activity digest once per local day, at/after the configured send hour. Control lives in the DB, set from Settings -> Admin (digest_builder.load_digest_policy -> app_settings 'digest_policy'): {enabled, send_hour}. The thread always runs and re-reads the policy each cycle, so toggling the digest on/off or changing the time takes effect on the next loop — no restart. CRM_DIGEST_ENABLED/SEND_HOUR only seed the first-boot default before an admin sets the policy. The send is an internal ops email to the team's own admins — exempt from the "agents draft, humans send" rule (which governs outward LP/prospect contact). Digest content is summarized on Spark (local), never Claude — see digest_builder. Window: the content covers (last successful send, now]. Tracked in app_settings so a missed day's activity rolls into the next digest rather than being dropped; the first-ever run covers the prior 24h. The once-per-day guard is a separate local-date key. The transport (Gmail-DWD -> SMTP) is digest_mailer's job. """ import json import logging import sqlite3 import threading from datetime import datetime, timedelta, timezone from .scheduler import _conn_factory_from_env log = logging.getLogger("email_integration.digest_scheduler") _LAST_DATE_KEY = "digest_last_sent_date" # local YYYY-MM-DD — once-per-day guard _LAST_AT_KEY = "digest_last_sent_at" # UTC ISO — content-window cursor _state: dict[str, object] = {"thread": None, "stop": threading.Event()} def _utc_iso(dt: datetime) -> str: return dt.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") # app_settings access kept local (no server.py import — avoid the startup cycle); # the value_json/JSON encoding matches server.get/set_app_setting exactly. 
def _get_setting(conn, key):
    """Return the JSON-decoded app_settings value stored under *key*.

    Yields None when the query raises sqlite3.OperationalError, when no row
    matches, or when the stored value cannot be decoded.
    """
    query = "SELECT value_json FROM app_settings WHERE key = ?"
    try:
        record = conn.execute(query, (key,)).fetchone()
    except sqlite3.OperationalError:
        return None
    if not record:
        return None
    try:
        return json.loads(record["value_json"])
    except Exception:
        # Best-effort read: an undecodable value is treated as "not set".
        return None


def _set_setting(conn, key, value) -> None:
    """Upsert *value* (JSON-encoded) under *key* in app_settings.

    updated_at is stamped with the current UTC time. The statement is only
    executed here; committing is left to the caller.
    """
    encoded = json.dumps(value)
    stamped_at = _utc_iso(datetime.now(timezone.utc))
    conn.execute(
        "INSERT INTO app_settings (key, value_json, updated_at) VALUES (?, ?, ?) "
        "ON CONFLICT(key) DO UPDATE SET value_json = excluded.value_json, "
        "updated_at = excluded.updated_at",
        (key, encoded, stamped_at),
    )


def _admin_recipients(conn) -> list[str]:
    """List the stripped, non-empty email addresses of active admin users."""
    admin_rows = conn.execute(
        "SELECT email FROM users WHERE role = 'admin' AND is_active = 1 "
        "AND email IS NOT NULL AND TRIM(email) != ''"
    ).fetchall()
    # Re-strip in Python and drop anything that ends up empty.
    return [
        address
        for row in admin_rows
        if (address := str(row["email"]).strip())
    ]


def _build_and_send(conn, since_iso, until_iso, *, build_fn=None, send_fn=None):
    """Build the digest for the window and pass it to the transport.

    build_fn / send_fn override digest_builder.build_digest and
    digest_mailer.send_digest respectively (injectable for tests).

    Returns a summary dict: recipients, transport, the digest's activity
    counters and the [since_iso, until_iso] window.

    Raises digest_mailer.NoTransport when no active admin has an email
    address (nothing is built or sent in that case). Callers map that to a
    clear 400; the daily loop logs it.
    """
    import digest_builder
    import digest_mailer

    builder = build_fn if build_fn else digest_builder.build_digest
    sender = send_fn if send_fn else digest_mailer.send_digest

    recipients = _admin_recipients(conn)
    if not recipients:
        raise digest_mailer.NoTransport(
            "No active admin has an email address — give one an address to receive the digest.")

    digest = builder(conn, since_iso, until_iso)
    send_result = sender(conn, recipients, digest["subject"], digest["body"])
    # The transport may return nothing; treat that as "no details".
    send_info = send_result or {}
    return {
        "recipients": recipients,
        "transport": send_info.get("transport"),
        "has_activity": digest["has_activity"],
        "user_count": digest["user_count"],
        "email_count": digest["email_count"],
        "investor_count": digest.get("investor_count"),
        "window": [since_iso, until_iso],
    }


def maybe_send_digest(conn_factory=None, *, force=False, now_local=None,
                      now_utc=None, build_fn=None, send_fn=None):
    """Send the daily digest when it is due, or unconditionally with force=True.

    Scheduled path (force=False): returns early when the policy is disabled,
    when the local hour is before the policy's send hour, or when today's
    local date is already recorded as sent. Otherwise the content window runs
    from the stored cursor (or the prior 24h when none is stored) to now, and
    on success both the once-per-day date key and the cursor are written and
    committed.

    Forced path (the admin 'send now' endpoint): ignores the policy and the
    guards, uses a fixed last-24h window, and does NOT advance the daily
    cursor — so an on-demand preview never suppresses the scheduled send.

    now_local / now_utc / build_fn / send_fn are injectable for tests. The
    connection obtained from the factory is always closed before returning.
    """
    import digest_builder

    open_conn = conn_factory or _conn_factory_from_env()
    conn = open_conn()
    try:
        policy = digest_builder.load_digest_policy(conn)
        scheduled = not force
        if scheduled and not policy["enabled"]:
            return {"status": "disabled"}

        local_now = now_local or datetime.now()
        utc_now = now_utc or datetime.now(timezone.utc)

        if scheduled:
            if local_now.hour < policy["send_hour"]:
                return {"status": "before_send_hour", "send_hour": policy["send_hour"]}
            today = local_now.strftime("%Y-%m-%d")
            if _get_setting(conn, _LAST_DATE_KEY) == today:
                return {"status": "already_sent_today"}

        until_iso = _utc_iso(utc_now)
        cursor = _get_setting(conn, _LAST_AT_KEY) if scheduled else None
        # No cursor (first run, or forced send): fall back to the prior 24h.
        since_iso = cursor if cursor else _utc_iso(utc_now - timedelta(hours=24))

        summary = _build_and_send(conn, since_iso, until_iso,
                                  build_fn=build_fn, send_fn=send_fn)

        if scheduled:
            # Record the send only after it succeeded, so a failure is retried.
            _set_setting(conn, _LAST_DATE_KEY, local_now.strftime("%Y-%m-%d"))
            _set_setting(conn, _LAST_AT_KEY, until_iso)
            conn.commit()
        return {"status": "sent", **summary}
    finally:
        conn.close()


def start_digest_scheduler(conn_factory=None) -> None:
    """Start the daily digest loop; a no-op when a thread is already recorded.

    The thread always runs and reads the DB policy each cycle (admin-panel
    control), so it sends only when the policy is enabled — no env gate, no
    restart needed to toggle.
    """
    if _state["thread"] is not None:
        return

    factory = conn_factory or _conn_factory_from_env()
    # A fresh Event per start: a previous worker keeps its own (already set)
    # Event and is unaffected by this one.
    halt = threading.Event()
    _state["stop"] = halt

    def _run():
        log.info("digest scheduler started (policy-controlled via Settings -> Admin)")
        if halt.wait(15):  # let server finish startup
            return
        while not halt.is_set():
            try:
                outcome = maybe_send_digest(factory)
                if outcome.get("status") == "sent":
                    log.info("daily digest sent: %s", outcome)
            except Exception:
                log.exception("digest send failed; will retry next cycle")
            if halt.wait(60):
                return

    worker = threading.Thread(target=_run, name="digest", daemon=True)
    worker.start()
    _state["thread"] = worker


def stop_digest_scheduler() -> None:
    """Signal the digest loop to stop and wait up to 5s for the thread.

    The recorded thread is cleared regardless of whether the join finished,
    so a later start_digest_scheduler() call launches a new thread.
    """
    halt: threading.Event = _state["stop"]  # type: ignore
    halt.set()

    worker = _state.get("thread")
    if worker:
        try:
            worker.join(timeout=5)  # type: ignore
        except Exception:
            # Best-effort shutdown: a failed join must not block stopping.
            pass
    _state["thread"] = None