#!/usr/bin/env python3 """matrix-bridge bot — Phase 1. A text message in a mapped room launches a Claude Code session in that repo on the Mac (ssh -> gui-launch.sh -> launch-claude.sh -> claude), surfaced to the phone by Remote Control. A message in the all-projects room fans out to every mapped repo (each session named " - "). Launch failures are reported back into the room (fail loud). Runs on the Spark, where the SSH alias resolves. Config: ../config.toml Creds: ../.env """ import asyncio import datetime import os import shlex try: import tomllib # py >= 3.11 except ModuleNotFoundError: import tomli as tomllib # py < 3.11 from nio import AsyncClient, MatrixRoom, RoomMessageText REPO_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) # Headless "ask" mode tunables. ASK_TIMEOUT = 300 # seconds to wait for `claude -p` before giving up MAX_MSG_CHARS = 30000 # split answers into chunks well under Matrix's ~64KB event cap def load_env(path): env = {} with open(path) as f: for line in f: line = line.strip() if line and not line.startswith("#") and "=" in line: k, v = line.split("=", 1) env[k] = v return env def load_config(path): with open(path, "rb") as f: return tomllib.load(f) def split_message(text, limit=MAX_MSG_CHARS): """Split text into <=limit-char chunks on newline boundaries (no truncation).""" if len(text) <= limit: return [text] chunks, buf = [], "" for line in text.splitlines(keepends=True): while len(line) > limit: # one oversized line: hard-split it if buf: chunks.append(buf) buf = "" chunks.append(line[:limit]) line = line[limit:] if len(buf) + len(line) > limit: chunks.append(buf) buf = "" buf += line if buf: chunks.append(buf) return chunks async def main(): env = load_env(os.path.join(REPO_ROOT, ".env")) cfg = load_config(os.path.join(REPO_ROOT, "config.toml")) homeserver = env["MATRIX_HOMESERVER"] user_id = env["MATRIX_USER"] token = env["MATRIX_ACCESS_TOKEN"] device_id = env.get("MATRIX_DEVICE_ID", "matrix-bridge-bot") rooms = {r["room_id"]: r for r in cfg.get("room", [])} all_projects_room = cfg.get("all_projects", {}).get("room_id") ssh_alias = os.environ.get("MB_SSH_ALIAS") or cfg["mac"]["ssh_alias"] launcher = cfg["mac"]["launcher"] ask_launcher = cfg["mac"].get("ask_launcher") client = AsyncClient(homeserver, user_id) client.restore_login(user_id=user_id, device_id=device_id, access_token=token) async def launch(repo_dir, prompt, session_name=None): """Run gui-launch.sh on the Mac over SSH. Returns (returncode, combined_output). All user text is passed through shlex.quote so it survives the remote shell — this is where the cross-shell quoting footgun is actually solved. """ remote = f"{shlex.quote(launcher)} {shlex.quote(repo_dir)} {shlex.quote(prompt)}" if session_name: remote = f"MB_SESSION_NAME={shlex.quote(session_name)} " + remote proc = await asyncio.create_subprocess_exec( "ssh", ssh_alias, remote, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT, ) out, _ = await proc.communicate() return proc.returncode, out.decode(errors="replace").strip() async def run_ask(repo_dir, prompt): """Run ask-claude.sh on the Mac over SSH; return (rc, stdout, stderr). Headless `claude -p`: its stdout is the answer (captured here), stderr is diagnostics. This path never opens a GUI Terminal and is not surfaced to the phone. """ remote = f"{shlex.quote(ask_launcher)} {shlex.quote(repo_dir)} {shlex.quote(prompt)}" proc = await asyncio.create_subprocess_exec( "ssh", ssh_alias, remote, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) try: out, err = await asyncio.wait_for(proc.communicate(), timeout=ASK_TIMEOUT) except asyncio.TimeoutError: proc.kill() await proc.wait() # reap the killed ssh client (no zombie) return None, "", f"timed out after {ASK_TIMEOUT}s" return (proc.returncode, out.decode(errors="replace").strip(), err.decode(errors="replace").strip()) async def say(room_id, text): await client.room_send( room_id, "m.room.message", {"msgtype": "m.text", "body": text} ) async def launch_one(report_room, repo, prompt, session_name=None): rc, out = await launch(repo["repo_dir"], prompt, session_name) if rc == 0: print(f"launched {repo['label']} -> {repo['repo_dir']}", flush=True) return True print(f"FAILED {repo['label']}: rc={rc} {out[:300]}", flush=True) await say(report_room, f"⚠️ matrix-bridge: failed to launch {repo['label']} " f"(rc={rc}): {out[:300] or 'no output'}") return False async def ask(report_room, repo, prompt): """Headless ask: run `claude -p` in the repo and post the full answer back.""" if not ask_launcher: await say(report_room, "⚠️ matrix-bridge: ask mode not configured ([mac].ask_launcher missing).") return await say(report_room, f"🤔 asking claude in {repo['label']}…") rc, out, err = await run_ask(repo["repo_dir"], prompt) if rc == 0: # success — even an empty answer is not a failure print(f"ask {repo['label']}: {len(out)} chars", flush=True) for chunk in split_message(out or "(claude returned no output)"): await say(report_room, chunk) return detail = err or out or "no output" print(f"ASK FAILED {repo['label']}: rc={rc} {detail[:300]}", flush=True) await say(report_room, f"⚠️ matrix-bridge: ask failed in {repo['label']} " f"(rc={rc}): {detail[:500]}") async def on_message(room: MatrixRoom, event: RoomMessageText): if event.sender == user_id: return # never react to our own messages prompt = event.body.strip() if not prompt: return if room.room_id == all_projects_room: # fan-out room always launches, never asks date = datetime.date.today().isoformat() print(f"[all-projects] fan-out to {len(rooms)} repos: {prompt!r}", flush=True) results = await asyncio.gather(*[ launch_one(room.room_id, r, prompt, f"{r['label']} - {date}") for r in rooms.values() ]) await say(room.room_id, f"matrix-bridge: launched {sum(results)}/{len(rooms)} sessions ({date}).") elif room.room_id in rooms: r = rooms[room.room_id] if prompt.startswith("?"): # headless ask mode ask_prompt = prompt[1:].strip() if ask_prompt: await ask(room.room_id, r, ask_prompt) elif await launch_one(room.room_id, r, prompt): await say(room.room_id, f"matrix-bridge: launched {r['label']} — drive it on your phone.") # Prime the sync token past existing history, THEN register the callback, so the bot # only reacts to messages that arrive after startup (no backlog replay). print("priming sync (skipping backlog)...", flush=True) await client.sync(timeout=30000, full_state=False) client.add_event_callback(on_message, RoomMessageText) who = await client.whoami() print(f"listening as {who.user_id}; {len(rooms)} rooms + all-projects={all_projects_room}", flush=True) try: await client.sync_forever(timeout=30000) finally: await client.close() if __name__ == "__main__": try: asyncio.run(main()) except KeyboardInterrupt: pass