#!/usr/bin/env python3
"""One-time maintenance: redact already-resolved email-proposal review cards.

The bot redacts a card when it's decided going forward, but cards that were
decided BEFORE that behavior shipped (e.g. smoke-test remnants) are already
`closed` in the CRM, so the normal to_close sweep never touches them. This
walks the review room's history, finds the bot's own "proposed grid note"
cards, and redacts every one that is NOT still pending (i.e. not in the CRM
`open` work-list) — leaving the room showing only what still needs handling.

Safe by default: prints what it WOULD redact and does nothing. Pass --apply
to actually redact.

Run on the Spark via the bot's own creds/image:
    docker compose run --rm matrix-intake python -u backend/matrix_intake/redact_resolved.py
    docker compose run --rm matrix-intake python -u backend/matrix_intake/redact_resolved.py --apply
"""
import asyncio
import sys

from nio import AsyncClient, MessageDirection

import crm_client
import settings

CARD_MARKER = "📧 Proposed"  # present in every review card (old and dash-framed)
MAX_PAGES = 30  # 30 * 100 events is far more history than this room holds


async def _scan_history(client, room_id, start_token, bot_user_id):
    """Page backwards through ``room_id`` and collect redaction candidates.

    Args:
        client: the logged-in ``AsyncClient`` used to fetch history.
        room_id: the room whose timeline is walked.
        start_token: pagination token to start from (walking backwards).
        bot_user_id: only events from this sender count as card roots.

    Returns:
        A ``(cards, replies)`` pair of dicts:
        ``cards`` maps event_id -> body snippet for events sent by
        ``bot_user_id`` whose body contains ``CARD_MARKER``;
        ``replies`` maps event_id -> (thread_root_event_id, body snippet) for
        every event carrying an ``m.thread`` relation.

    At most ``MAX_PAGES`` pages of 100 events are fetched. A failed fetch or
    hitting the page cap is reported on stdout and the partial result is
    returned — a partial scan can only cause fewer redactions, never more.
    """
    cards = {}    # root event_id -> snippet (still-identifiable card bodies)
    replies = {}  # reply event_id -> (thread_root, snippet)
    token = start_token
    for _ in range(MAX_PAGES):
        resp = await client.room_messages(
            room_id, start=token, direction=MessageDirection.back, limit=100
        )
        if not hasattr(resp, "chunk"):
            # An error response carries no `chunk`; don't mistake it for end-of-history.
            print(f" ! history fetch failed, scan is incomplete: {resp}")
            break
        if not resp.chunk:
            break  # reached the start of the room's history
        for ev in resp.chunk:
            # Events without a text body (e.g. already-redacted ones) yield "".
            body = (getattr(ev, "body", "") or "").replace("\n", " ")
            content = (getattr(ev, "source", None) or {}).get("content", {}) or {}
            rel = content.get("m.relates_to") or {}
            if rel.get("rel_type") == "m.thread" and rel.get("event_id"):
                # A threaded reply; its root card may already be redacted, so the
                # thread-root id is the only thing tying it to a proposal.
                replies[ev.event_id] = (rel["event_id"], body[:50])
            elif getattr(ev, "sender", None) == bot_user_id and CARD_MARKER in body:
                cards[ev.event_id] = body[:70]  # an un-redacted card root
        token = getattr(resp, "end", None)
        if not token:
            break  # no further pagination token: nothing older to fetch
    else:
        # Loop ran out of pages while a pagination token was still available.
        print(f" ! stopped after {MAX_PAGES} pages; older history was not scanned")
    return cards, replies


async def main(apply: bool) -> None:
    """Find resolved review cards and their thread replies, then redact them.

    Args:
        apply: when false (the default from the CLI) only print what would be
            redacted; when true, actually send the redactions.

    Raises:
        RuntimeError: if the initial sync does not yield a pagination token,
            since the room history cannot be walked without one.
    """
    mx = settings.matrix_settings()
    review_room = settings.email_review_room()
    if not review_room:
        print("MATRIX_EMAIL_REVIEW_ROOM is not set — nothing to do.")
        return

    client = AsyncClient(mx["homeserver"], mx["user_id"])
    client.restore_login(
        user_id=mx["user_id"], device_id=mx["device_id"], access_token=mx["token"]
    )
    try:
        # Cards still pending (must be KEPT) — their thread-root event id is the card event id.
        open_ids = {
            it["event_id"]
            for it in crm_client.list_email_proposals().get("open", [])
            if it.get("event_id")
        }
        print(f"pending cards to keep: {len(open_ids)}")

        sync = await client.sync(timeout=10000, full_state=False)
        token = getattr(sync, "next_batch", None)
        if not token:
            # A failed sync has no `next_batch`; fail with the actual response
            # instead of an opaque AttributeError.
            raise RuntimeError(f"initial sync failed, cannot walk room history: {sync}")

        cards, replies = await _scan_history(client, review_room, token, mx["user_id"])

        # Redact card roots that aren't still pending, AND any reply whose thread isn't still pending.
        stale_cards = [
            (eid, "card :: " + snip) for eid, snip in cards.items() if eid not in open_ids
        ]
        stale_replies = [
            (eid, "reply :: " + snip)
            for eid, (root, snip) in replies.items()
            if root not in open_ids
        ]
        targets = stale_cards + stale_replies
        print(
            f"resolved cards: {len(stale_cards)}; "
            f"thread replies to clear: {len(stale_replies)}"
        )

        failed = 0
        for eid, label in targets:
            print(("APPLY redact " if apply else "WOULD redact ") + eid + " :: " + label)
            if apply:
                r = await client.room_redact(
                    review_room, eid, reason="retroactive cleanup of resolved review threads"
                )
                if not hasattr(r, "event_id"):
                    # A successful redaction response carries the redaction's event_id.
                    failed += 1
                    print(f" ! redact failed: {r}")

        if apply:
            # Report what was actually redacted, not merely what was attempted.
            summary = f"done — redacted {len(targets) - failed} event(s)"
            if failed:
                summary += f" ({failed} failed)"
            print(summary + ".")
        else:
            print(f"dry run — would redact {len(targets)} event(s).")
    finally:
        await client.close()


if __name__ == "__main__":
    asyncio.run(main(apply="--apply" in sys.argv[1:]))