Files
ten31-database/backend/email_integration/digest_scheduler.py
T
Keysat c7b74a2704 Email search/query + windowed digest preview (v0.1.0:83)
Communications tab (search/query roadmap items 1 & 2):
- Fix the investor dropdown: the facet only listed grid investors, so it
  came back empty whenever email matched a classic contact or org domain
  (no grid id — the common case). It now mirrors the email list, resolving
  each link to a typed identity (fund:/org:/contact:/addr:) with precedence
  grid -> org -> contact -> address; investor_id accepts the typed key
  (bare id = fund: for back-compat) and an unknown prefix matches nothing.
- Add a date-range filter and a click-to-expand full-body view
  (GET /api/email/detail, admin, soft-delete-gated; body_text only, never
  raw remote HTML).
- Add a "Search content" mode: GET /api/email/search wraps the ingest
  hybrid_search over the Qdrant email index (doc_type=email), hydrated and
  soft-delete-filtered against SQLite (canonical), 503 if Spark/Qdrant down.

Daily digest:
- Settings -> Admin builds a digest over a chosen window (last 24h or since
  a date) as an in-app preview before sending (POST /api/admin/digest/preview),
  so the local-Spark summarizer can be verified on demand even on a quiet day.
  Manual send uses the same window; neither advances the daily cursor, so a
  preview never suppresses the scheduled digest.

Code-only, migrations no-op. 22/22 backend tests, render-smoke pass.
2026-06-16 20:46:15 -05:00

207 lines
8.1 KiB
Python

"""Daily activity-digest scheduler (Phase B).
Co-located with the Gmail sync scheduler (it shares the same conn-factory and
daemon-thread idiom). One daemon thread wakes every 60s and fires the daily
activity digest once per local day, at/after the configured send hour.
Control lives in the DB, set from Settings -> Admin (digest_builder.load_digest_policy
-> app_settings 'digest_policy'): {enabled, send_hour}. The thread always runs and
re-reads the policy each cycle, so toggling the digest on/off or changing the time
takes effect on the next loop — no restart. CRM_DIGEST_ENABLED/SEND_HOUR only seed
the first-boot default before an admin sets the policy.
The send is an internal ops email to the team's own admins — exempt from the
"agents draft, humans send" rule (which governs outward LP/prospect contact).
Digest content is summarized on Spark (local), never Claude — see digest_builder.
Window: the content covers (last successful send, now]. Tracked in app_settings
so a missed day's activity rolls into the next digest rather than being dropped;
the first-ever run covers the prior 24h. The once-per-day guard is a separate
local-date key. The transport (Gmail-DWD -> SMTP) is digest_mailer's job.
"""
import json
import logging
import sqlite3
import threading
from datetime import datetime, timedelta, timezone
from .scheduler import _conn_factory_from_env
log = logging.getLogger("email_integration.digest_scheduler")

# app_settings keys owned by this module.
_LAST_DATE_KEY = "digest_last_sent_date"  # local YYYY-MM-DD of the last daily send (once-per-day guard)
_LAST_AT_KEY = "digest_last_sent_at"      # UTC ISO end of the last daily window (next window's start)

# Process-wide scheduler bookkeeping: the worker thread (None while stopped) and
# the Event used to ask it to exit. start_digest_scheduler installs a fresh Event.
_state: dict[str, object] = dict(thread=None, stop=threading.Event())
def _utc_iso(dt: datetime) -> str:
    """Render *dt* as a second-precision UTC ISO-8601 string with a 'Z' suffix.

    Aware datetimes are converted to UTC. Naive ones are treated as local time,
    which is how ``datetime.astimezone`` interprets them. Sub-second precision
    is dropped.
    """
    as_utc = dt.astimezone(timezone.utc)
    return f"{as_utc:%Y-%m-%dT%H:%M:%SZ}"
# app_settings is read and written here directly, without importing server.py,
# to avoid the startup import cycle. The value_json column holds JSON text,
# matching server.get/set_app_setting.
def _get_setting(conn, key):
    """Return the JSON-decoded app_settings value stored under *key*.

    Returns None when the query raises sqlite3.OperationalError (for example,
    the app_settings table does not exist yet), when no row matches, or when
    the stored value cannot be read or decoded. Expects a row factory that
    supports ``row["value_json"]``; a tuple row fails that lookup and also
    yields None.
    """
    try:
        row = conn.execute(
            "SELECT value_json FROM app_settings WHERE key = ?", (key,)
        ).fetchone()
    except sqlite3.OperationalError:
        row = None
    if not row:
        return None
    try:
        decoded = json.loads(row["value_json"])
    except Exception:
        # Best-effort read: a corrupt or unreadable value counts as "unset".
        decoded = None
    return decoded
def _set_setting(conn, key, value) -> None:
    """Upsert *value*, JSON-encoded, under *key* in app_settings.

    Stamps updated_at with the current UTC time. This function does not
    commit; the caller owns the transaction.
    """
    stamp = _utc_iso(datetime.now(timezone.utc))
    payload = json.dumps(value)
    upsert_sql = (
        "INSERT INTO app_settings (key, value_json, updated_at) VALUES (?, ?, ?) "
        "ON CONFLICT(key) DO UPDATE SET value_json = excluded.value_json, "
        "updated_at = excluded.updated_at"
    )
    conn.execute(upsert_sql, (key, payload, stamp))
def _admin_recipients(conn) -> list[str]:
    """Return the email addresses of every active admin, each listed once.

    Each address is whitespace-stripped. Values that are empty after stripping
    are skipped: SQL TRIM only removes spaces, so something like a tab-only
    value still reaches Python. An address shared by several admin rows is
    returned once, keeping the first occurrence, so nobody receives the digest
    twice. Order follows the query, which has no ORDER BY.
    """
    rows = conn.execute(
        "SELECT email FROM users WHERE role = 'admin' AND is_active = 1 "
        "AND email IS NOT NULL AND TRIM(email) != ''"
    ).fetchall()
    stripped = (str(r["email"]).strip() for r in rows)
    # dict.fromkeys de-duplicates while preserving first-seen order.
    return list(dict.fromkeys(addr for addr in stripped if addr))
def _build_and_send(conn, since_iso, until_iso, *, build_fn=None, send_fn=None):
    """Build the digest for (since_iso, until_iso] and deliver it to the active admins.

    Raises digest_mailer.NoTransport when no active admin has an email address.
    Per this module's contract it also signals a missing transport, presumably
    raised from the send function. Callers map it to a clear 400; the daily
    loop logs it. build_fn and send_fn default to digest_builder.build_digest
    and digest_mailer.send_digest and can be injected for tests.

    The built digest must provide subject, body, has_activity, user_count and
    email_count. investor_count is read with .get, so it becomes None when
    absent. Returns a summary dict of recipients, transport, counts and the
    window.
    """
    import digest_builder
    import digest_mailer

    build = build_fn or digest_builder.build_digest
    deliver = send_fn or digest_mailer.send_digest

    recipients = _admin_recipients(conn)
    if not recipients:
        # Reuse NoTransport so callers treat "nobody to mail" like "no way to mail".
        raise digest_mailer.NoTransport(
            "No active admin has an email address — give one an address to receive the digest.")

    digest = build(conn, since_iso, until_iso)
    receipt = deliver(conn, recipients, digest["subject"], digest["body"]) or {}
    return dict(
        recipients=recipients,
        transport=receipt.get("transport"),
        has_activity=digest["has_activity"],
        user_count=digest["user_count"],
        email_count=digest["email_count"],
        investor_count=digest.get("investor_count"),
        window=[since_iso, until_iso],
    )
def send_digest_window(conn_factory=None, *, since_iso, until_iso,
                       build_fn=None, send_fn=None):
    """Send the digest for an explicit (since_iso, until_iso] window right now.

    Backs the admin 'send now' endpoint. It uses the same recipient and
    transport rules as the daily path and propagates digest_mailer.NoTransport
    when nothing is configured or no admin has an address.

    It deliberately leaves the daily cursor alone, so a manual or preview send
    can never suppress the scheduled digest. Nothing is written here, so the
    connection is closed without a commit. Revisit that before adding any
    write.
    """
    conn = (conn_factory or _conn_factory_from_env())()
    try:
        outcome = _build_and_send(conn, since_iso, until_iso,
                                  build_fn=build_fn, send_fn=send_fn)
    finally:
        conn.close()
    return {"status": "sent", **outcome}
def maybe_send_digest(conn_factory=None, *, force=False,
                      now_local=None, now_utc=None, build_fn=None, send_fn=None):
    """Send the daily digest if it is due, or unconditionally when force=True.

    Daily path (force=False), used by the scheduler loop:
      * returns {"status": "disabled"} when the DB policy's ``enabled`` is falsy;
      * returns {"status": "before_send_hour", "send_hour": ...} before the
        policy's ``send_hour`` (local clock);
      * returns {"status": "already_sent_today"} when the stored local date
        equals today;
      * otherwise sends the window (last cursor, now]. With no cursor yet, the
        window is the last 24h. On success it stores today's date and the new
        cursor, then commits.

    Force path: skips the policy and both guards, always uses the last 24h, and
    never touches the cursor. The admin 'send now' and preview endpoints use
    send_digest_window; force remains for the fixed last-24h case and its
    tests.

    now_local and now_utc override the clocks, for tests. Exceptions from
    _build_and_send, such as digest_mailer.NoTransport, propagate uncommitted.

    NOTE(review): the cursor is written only after the email has gone out. If
    the _set_setting/commit step fails (e.g. a locked DB), a retrying caller
    will send the same digest again.
    """
    import digest_builder

    conn = (conn_factory or _conn_factory_from_env())()
    try:
        policy = digest_builder.load_digest_policy(conn)
        if not (force or policy["enabled"]):
            return {"status": "disabled"}

        local_now = now_local or datetime.now()
        utc_now = now_utc or datetime.now(timezone.utc)
        local_day = local_now.strftime("%Y-%m-%d")

        if not force:
            send_hour = policy["send_hour"]
            if local_now.hour < send_hour:
                return {"status": "before_send_hour", "send_hour": send_hour}
            if _get_setting(conn, _LAST_DATE_KEY) == local_day:
                return {"status": "already_sent_today"}

        until_iso = _utc_iso(utc_now)
        cursor = None if force else _get_setting(conn, _LAST_AT_KEY)
        since_iso = cursor or _utc_iso(utc_now - timedelta(hours=24))

        sent = _build_and_send(conn, since_iso, until_iso,
                               build_fn=build_fn, send_fn=send_fn)

        if not force:
            # Advance the guard and cursor only after a successful send.
            _set_setting(conn, _LAST_DATE_KEY, local_day)
            _set_setting(conn, _LAST_AT_KEY, until_iso)
            conn.commit()
        return {"status": "sent", **sent}
    finally:
        conn.close()
def start_digest_scheduler(conn_factory=None) -> None:
    """Start the background daily-digest loop in a daemon thread named "digest".

    Idempotent: if a thread is already recorded in _state, this does nothing.
    There is no env gate; the loop always runs and calls maybe_send_digest each
    cycle, which reads the DB policy. Toggling the digest from Settings -> Admin
    therefore needs no restart. The first check happens 15s after start, then
    every 60s until the stop Event is set. A failing cycle is logged and
    retried on the next one.
    """
    if _state["thread"] is not None:
        return

    factory = conn_factory or _conn_factory_from_env()
    stop_event = threading.Event()
    _state["stop"] = stop_event

    def _run_cycle():
        # One guarded attempt; errors are logged so the loop keeps running.
        try:
            outcome = maybe_send_digest(factory)
            if outcome.get("status") == "sent":
                log.info("daily digest sent: %s", outcome)
        except Exception:
            log.exception("digest send failed; will retry next cycle")

    def _loop():
        log.info("digest scheduler started (policy-controlled via Settings -> Admin)")
        if stop_event.wait(15):  # give the server time to finish starting up
            return
        while not stop_event.is_set():
            _run_cycle()
            if stop_event.wait(60):
                return

    worker = threading.Thread(target=_loop, name="digest", daemon=True)
    worker.start()
    _state["thread"] = worker
def stop_digest_scheduler() -> None:
    """Signal the digest loop to exit and wait briefly for it.

    Sets the current stop Event, then joins the recorded thread for up to 5s.
    Join errors are ignored; for example, Thread.join raises RuntimeError when
    called from the digest thread itself. The recorded thread is cleared even
    if the join timed out.
    """
    stop_event: threading.Event = _state["stop"]  # type: ignore
    stop_event.set()
    worker = _state.get("thread")
    if worker:
        try:
            worker.join(timeout=5)  # type: ignore
        except Exception:
            pass  # best-effort shutdown
    _state["thread"] = None