323f016f64
Sends a once-a-day internal email to all active admins summarizing each team member's email activity per investor, plus a team-wide by-investor view (inbound + outbound, deduped). Narratives are generated on the LOCAL Spark model, never Claude — the digest is intentionally un-anonymized, so substance stays on Ten31 infra. This is an internal ops email, exempt from the 'agents draft, humans send' rule (which governs outward LP contact). - backend/digest_builder.py: per-user + per-investor activity queries (soft-delete filtered), per-user Spark narrative with a deterministic fallback, two-section plain-text body, and the DB-backed policy resolver. - backend/email_integration/digest_scheduler.py: always-on daily thread that re-reads the policy each cycle and sends once/day; window cursor in app_settings so a missed day rolls forward. - server.py: POST /api/admin/digest/send-now and GET/PATCH /api/admin/digest/policy; scheduler wired into main(). - Control lives in Settings -> Admin (enable toggle + send-time dropdown), not StartOS actions; env vars only seed the first-boot default. - Tests: backend/test_digest_builder.py.
186 lines
7.0 KiB
Python
186 lines
7.0 KiB
Python
"""Daily activity-digest scheduler (Phase B).

Co-located with the Gmail sync scheduler (it shares the same conn-factory and
daemon-thread idiom). One daemon thread wakes every 60s and fires the daily
activity digest once per local day, at/after the configured send hour.

Control lives in the DB, set from Settings -> Admin (digest_builder.load_digest_policy
-> app_settings 'digest_policy'): {enabled, send_hour}. The thread always runs and
re-reads the policy each cycle, so toggling the digest on/off or changing the time
takes effect on the next loop — no restart. CRM_DIGEST_ENABLED/SEND_HOUR only seed
the first-boot default before an admin sets the policy.

The send is an internal ops email to the team's own admins — exempt from the
"agents draft, humans send" rule (which governs outward LP/prospect contact).
Digest content is summarized on Spark (local), never Claude — see digest_builder.

Window: the content covers (last successful send, now]. Tracked in app_settings
so a missed day's activity rolls into the next digest rather than being dropped;
the first-ever run covers the prior 24h. The once-per-day guard is a separate
local-date key. The transport (Gmail-DWD -> SMTP) is digest_mailer's job.
"""
|
|
import json
|
|
import logging
|
|
import sqlite3
|
|
import threading
|
|
from datetime import datetime, timedelta, timezone
|
|
|
|
from .scheduler import _conn_factory_from_env
|
|
|
|
|
|
log = logging.getLogger("email_integration.digest_scheduler")

# app_settings keys written after a successful scheduled send:
#   _LAST_DATE_KEY — local YYYY-MM-DD; the once-per-day guard
#   _LAST_AT_KEY   — UTC ISO timestamp; the content-window cursor
_LAST_DATE_KEY = "digest_last_sent_date"
_LAST_AT_KEY = "digest_last_sent_at"

# Handle on the scheduler: the worker thread (None while not started) and the
# Event used to ask it to stop.
_state: dict[str, object] = {
    "thread": None,
    "stop": threading.Event(),
}
|
|
|
|
|
|
def _utc_iso(dt: datetime) -> str:
    """Format *dt* as a second-precision UTC timestamp such as 2024-01-02T03:04:05Z.

    The value is converted to UTC first, so an aware datetime in any zone yields
    the same instant; sub-second precision is dropped by the format.
    """
    iso_seconds_z = "%Y-%m-%dT%H:%M:%SZ"
    in_utc = dt.astimezone(timezone.utc)
    return in_utc.strftime(iso_seconds_z)
|
|
|
|
|
|
# app_settings is read and written here directly rather than through server.py
# (importing it would create a startup import cycle); values are stored as JSON
# text in value_json, the same encoding server.get/set_app_setting use.
def _get_setting(conn, key):
    """Return the JSON-decoded app_settings value stored under *key*, or None.

    None is returned when the query raises sqlite3.OperationalError, when no row
    exists for *key*, or when the stored value cannot be read/decoded as JSON.
    """
    try:
        found = conn.execute(
            "SELECT value_json FROM app_settings WHERE key = ?", (key,)
        ).fetchone()
    except sqlite3.OperationalError:
        found = None

    decoded = None
    if found:
        try:
            decoded = json.loads(found["value_json"])
        except Exception:
            # Unreadable value: treat it the same as "not set".
            decoded = None
    return decoded
|
|
|
|
|
|
def _set_setting(conn, key, value) -> None:
    """Insert or overwrite the app_settings row for *key*.

    *value* is stored JSON-encoded in value_json and updated_at is stamped with
    the current UTC time. The statement is executed on *conn* but not committed
    here; committing is left to the caller.
    """
    upsert_sql = (
        "INSERT INTO app_settings (key, value_json, updated_at) VALUES (?, ?, ?) "
        "ON CONFLICT(key) DO UPDATE SET value_json = excluded.value_json, "
        "updated_at = excluded.updated_at"
    )
    encoded = json.dumps(value)
    stamped_at = _utc_iso(datetime.now(timezone.utc))
    conn.execute(upsert_sql, (key, encoded, stamped_at))
|
|
|
|
|
|
def _admin_recipients(conn) -> list[str]:
    """Return the trimmed email addresses of all active admin users.

    The query selects users with role 'admin', is_active = 1 and a non-NULL,
    non-blank email; each address is then stripped in Python and dropped if
    nothing is left.
    """
    query = (
        "SELECT email FROM users WHERE role = 'admin' AND is_active = 1 "
        "AND email IS NOT NULL AND TRIM(email) != ''"
    )
    stripped = (str(row["email"]).strip() for row in conn.execute(query).fetchall())
    return [address for address in stripped if address]
|
|
|
|
|
|
def _build_and_send(conn, since_iso, until_iso, *, build_fn=None, send_fn=None):
    """Build the digest for (since_iso, until_iso) and pass it to the transport.

    build_fn / send_fn default to digest_builder.build_digest and
    digest_mailer.send_digest; they are injectable for tests.

    Returns a summary dict: recipients, transport, has_activity, user_count,
    email_count, investor_count and window ([since_iso, until_iso]).

    Raises digest_mailer.NoTransport when no active admin has an email address.
    Anything raised by the builder or the transport propagates unchanged; per the
    original note the transport uses the same exception when no transport is
    available, HTTP callers map it to a 400, and the daily loop just logs it.
    """
    import digest_builder
    import digest_mailer

    build = build_fn or digest_builder.build_digest
    send = send_fn or digest_mailer.send_digest

    to_addresses = _admin_recipients(conn)
    if not to_addresses:
        raise digest_mailer.NoTransport(
            "No active admin has an email address — give one an address to receive the digest.")

    digest = build(conn, since_iso, until_iso)
    send_result = send(conn, to_addresses, digest["subject"], digest["body"])

    # The transport may return nothing; report transport=None in that case.
    transport = (send_result or {}).get("transport")
    return {
        "recipients": to_addresses,
        "transport": transport,
        "has_activity": digest["has_activity"],
        "user_count": digest["user_count"],
        "email_count": digest["email_count"],
        "investor_count": digest.get("investor_count"),
        "window": [since_iso, until_iso],
    }
|
|
|
|
|
|
def maybe_send_digest(conn_factory=None, *, force=False,
                      now_local=None, now_utc=None, build_fn=None, send_fn=None):
    """Send the daily digest when it is due, or unconditionally with force=True.

    Scheduled path (force=False): returns {"status": "disabled"} when the policy
    is off, {"status": "before_send_hour", ...} before the configured hour, and
    {"status": "already_sent_today"} when the local-date guard matches today.
    Otherwise the window runs from the stored cursor (or the prior 24h when no
    cursor is stored) to now, and on success both the date guard and the cursor
    are written and committed.

    Forced path (the admin 'send now' endpoint): skips the policy and both
    guards, always uses the last 24h, and leaves the guard and cursor untouched,
    so an on-demand send never suppresses the scheduled one.

    now_local / now_utc / build_fn / send_fn are injectable for tests. A
    successful send returns {"status": "sent", ...summary from _build_and_send}.
    The connection opened from the factory is always closed before returning.
    """
    import digest_builder

    open_conn = conn_factory or _conn_factory_from_env()
    conn = open_conn()
    try:
        policy = digest_builder.load_digest_policy(conn)
        scheduled = not force
        if scheduled and not policy["enabled"]:
            return {"status": "disabled"}

        local_now = now_local or datetime.now()
        utc_now = now_utc or datetime.now(timezone.utc)

        today = None
        if scheduled:
            today = local_now.strftime("%Y-%m-%d")
            send_hour = policy["send_hour"]
            if local_now.hour < send_hour:
                return {"status": "before_send_hour", "send_hour": send_hour}
            if _get_setting(conn, _LAST_DATE_KEY) == today:
                return {"status": "already_sent_today"}

        until_iso = _utc_iso(utc_now)
        # Only the scheduled path resumes from the stored cursor.
        cursor = _get_setting(conn, _LAST_AT_KEY) if scheduled else None
        if cursor:
            since_iso = cursor
        else:
            since_iso = _utc_iso(utc_now - timedelta(hours=24))

        summary = _build_and_send(conn, since_iso, until_iso, build_fn=build_fn, send_fn=send_fn)

        if scheduled:
            # Advance the guard and the cursor only after the send went through.
            for setting_key, setting_value in (
                (_LAST_DATE_KEY, today),
                (_LAST_AT_KEY, until_iso),
            ):
                _set_setting(conn, setting_key, setting_value)
            conn.commit()
        return {"status": "sent", **summary}
    finally:
        conn.close()
|
|
|
|
|
|
def start_digest_scheduler(conn_factory=None) -> None:
    """Start the daily digest loop in a daemon thread; no-op if one is registered.

    The loop waits 15s, then calls maybe_send_digest every 60s until the stop
    event is set. maybe_send_digest reads the DB policy on each call, so whether
    anything is sent is controlled from the admin panel — no env gate and no
    restart to toggle. Any exception from a cycle is logged and the loop goes on.
    """
    if _state["thread"] is not None:
        return

    factory = conn_factory or _conn_factory_from_env()
    halt = threading.Event()
    _state["stop"] = halt

    startup_delay_s = 15  # let server finish startup
    poll_interval_s = 60

    def _attempt_send():
        # One cycle: log a successful send, log-and-swallow any failure so the
        # thread survives to try again.
        try:
            outcome = maybe_send_digest(factory)
            if outcome.get("status") == "sent":
                log.info("daily digest sent: %s", outcome)
        except Exception:
            log.exception("digest send failed; will retry next cycle")

    def _loop():
        log.info("digest scheduler started (policy-controlled via Settings -> Admin)")
        if halt.wait(startup_delay_s):
            return
        while not halt.is_set():
            _attempt_send()
            if halt.wait(poll_interval_s):
                return

    worker = threading.Thread(target=_loop, name="digest", daemon=True)
    worker.start()
    _state["thread"] = worker
|
|
|
|
|
|
def stop_digest_scheduler() -> None:
    """Ask the digest loop to stop and wait up to 5s for its thread to exit.

    Shutdown is best-effort: a failed join never raises to the caller, but it is
    logged instead of being silently discarded. Thread.join() returns None
    whether or not the timeout expired, so liveness is checked explicitly and a
    worker that is still mid-cycle is reported. The thread handle is cleared in
    every case so the scheduler can be started again.
    """
    ev: threading.Event = _state["stop"]  # type: ignore
    ev.set()

    worker = _state.get("thread")
    if worker:
        try:
            worker.join(timeout=5)  # type: ignore
            if worker.is_alive():  # type: ignore
                # The stop event is already set, so the worker exits on its
                # own once the in-flight cycle finishes.
                log.warning("digest scheduler thread still running 5s after stop was requested")
        except Exception:
            log.warning("digest scheduler thread join failed", exc_info=True)
    _state["thread"] = None
|