Add capture mode: /capture + per-room capture threads → cross-project inbox

A capture-thread message (or /capture <text> in any room) logs to standards/
INBOX.md via capture-note.sh — deterministic, no claude call — and confirms
in-thread; /triage stays the gate into each repo (D13). Thread roots seeded by
seed-capture-threads.py.
This commit is contained in:
Keysat
2026-06-16 14:59:38 -05:00
parent 4204b82c6b
commit 2bae2f3571
5 changed files with 310 additions and 21 deletions
+94 -13
View File
@@ -24,7 +24,9 @@ REPO_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# Headless "ask" mode tunables.
ASK_TIMEOUT = 300 # seconds to wait for `claude -p` before giving up
CAPTURE_TIMEOUT = 60 # seconds to wait for capture-note.sh (inbox append + git push)
MAX_MSG_CHARS = 30000 # split answers into chunks well under Matrix's ~64KB event cap
CAPTURE_PREFIX = "/capture" # zero-config capture trigger, valid in any mapped room
def load_env(path):
@@ -64,6 +66,14 @@ def split_message(text, limit=MAX_MSG_CHARS):
return chunks
def thread_root_of(event):
"""Return the thread root event_id if this message is a threaded reply, else None."""
relates = (event.source or {}).get("content", {}).get("m.relates_to") or {}
if relates.get("rel_type") == "m.thread":
return relates.get("event_id")
return None
async def main():
env = load_env(os.path.join(REPO_ROOT, ".env"))
cfg = load_config(os.path.join(REPO_ROOT, "config.toml"))
@@ -74,10 +84,12 @@ async def main():
device_id = env.get("MATRIX_DEVICE_ID", "matrix-bridge-bot")
rooms = {r["room_id"]: r for r in cfg.get("room", [])}
all_projects_room = cfg.get("all_projects", {}).get("room_id")
all_projects_cfg = cfg.get("all_projects", {})
all_projects_room = all_projects_cfg.get("room_id")
ssh_alias = os.environ.get("MB_SSH_ALIAS") or cfg["mac"]["ssh_alias"]
launcher = cfg["mac"]["launcher"]
ask_launcher = cfg["mac"].get("ask_launcher")
capture_launcher = cfg["mac"].get("capture_launcher")
client = AsyncClient(homeserver, user_id)
client.restore_login(user_id=user_id, device_id=device_id, access_token=token)
@@ -121,10 +133,16 @@ async def main():
out.decode(errors="replace").strip(),
err.decode(errors="replace").strip())
async def say(room_id, text):
await client.room_send(
room_id, "m.room.message", {"msgtype": "m.text", "body": text}
)
async def say(room_id, text, thread_root=None):
content = {"msgtype": "m.text", "body": text}
if thread_root: # keep confirmations inside the capture thread
content["m.relates_to"] = {
"rel_type": "m.thread",
"event_id": thread_root,
"is_falling_back": True,
"m.in_reply_to": {"event_id": thread_root},
}
await client.room_send(room_id, "m.room.message", content)
async def launch_one(report_room, repo, prompt, session_name=None):
rc, out = await launch(repo["repo_dir"], prompt, session_name)
@@ -154,6 +172,46 @@ async def main():
await say(report_room, f"⚠️ matrix-bridge: ask failed in {repo['label']} "
f"(rc={rc}): {detail[:500]}")
async def run_capture(project, text):
"""Run capture-note.sh on the Mac over SSH; return (rc, stdout, stderr).
Deterministic: appends one line to standards/INBOX.md and commits/pushes. stdout is the
exact inbox line written, echoed back into the thread as confirmation.
"""
remote = (f"{shlex.quote(capture_launcher)} "
f"{shlex.quote(project)} {shlex.quote(text)}")
proc = await asyncio.create_subprocess_exec(
"ssh", ssh_alias, remote,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
try:
out, err = await asyncio.wait_for(proc.communicate(), timeout=CAPTURE_TIMEOUT)
except asyncio.TimeoutError:
proc.kill()
await proc.wait() # reap the killed ssh client (no zombie)
return None, "", f"timed out after {CAPTURE_TIMEOUT}s"
return (proc.returncode,
out.decode(errors="replace").strip(),
err.decode(errors="replace").strip())
async def capture(report_room, project, text, thread_root=None):
"""Log a message to the cross-project inbox (standards/INBOX.md) tagged for `project`."""
if not capture_launcher:
await say(report_room,
"⚠️ matrix-bridge: capture mode not configured ([mac].capture_launcher missing).",
thread_root)
return
rc, out, err = await run_capture(project, text)
if rc == 0:
print(f"capture ({project}): {out}", flush=True)
await say(report_room, f"📥 captured → {out}", thread_root)
return
detail = err or out or "no output"
print(f"CAPTURE FAILED ({project}): rc={rc} {detail[:300]}", flush=True)
await say(report_room, f"⚠️ matrix-bridge: capture failed (rc={rc}): {detail[:500]}",
thread_root)
async def on_message(room: MatrixRoom, event: RoomMessageText):
if event.sender == user_id:
return # never react to our own messages
@@ -161,23 +219,46 @@ async def main():
if not prompt:
return
if room.room_id == all_projects_room: # fan-out room always launches, never asks
rid = room.room_id
if rid == all_projects_room:
cap_thread = all_projects_cfg.get("capture_thread")
cap_project = all_projects_cfg.get("capture_project", "?")
elif rid in rooms:
cap_thread = rooms[rid].get("capture_thread")
cap_project = rooms[rid].get("capture_project", rooms[rid]["label"])
else:
return # message in an unmapped room
# Capture is checked first: an explicit `/capture <text>` in any mapped room, or any
# message in this room's configured capture thread → logged to the inbox, not launched.
thread_root = thread_root_of(event)
head, _, rest = prompt.partition(" ")
if head == CAPTURE_PREFIX:
note = rest.strip()
if note:
await capture(rid, cap_project, note, thread_root)
return
if cap_thread and thread_root == cap_thread:
await capture(rid, cap_project, prompt, thread_root)
return
if rid == all_projects_room: # fan-out room: launch everywhere, never asks
date = datetime.date.today().isoformat()
print(f"[all-projects] fan-out to {len(rooms)} repos: {prompt!r}", flush=True)
results = await asyncio.gather(*[
launch_one(room.room_id, r, prompt, f"{r['label']} - {date}")
launch_one(rid, r, prompt, f"{r['label']} - {date}")
for r in rooms.values()
])
await say(room.room_id,
await say(rid,
f"matrix-bridge: launched {sum(results)}/{len(rooms)} sessions ({date}).")
elif room.room_id in rooms:
r = rooms[room.room_id]
else: # a mapped project room
r = rooms[rid]
if prompt.startswith("?"): # headless ask mode
ask_prompt = prompt[1:].strip()
if ask_prompt:
await ask(room.room_id, r, ask_prompt)
elif await launch_one(room.room_id, r, prompt):
await say(room.room_id,
await ask(rid, r, ask_prompt)
elif await launch_one(rid, r, prompt):
await say(rid,
f"matrix-bridge: launched {r['label']} — drive it on your phone.")
# Prime the sync token past existing history, THEN register the callback, so the bot