"""Coordination layer: swap lock lifecycle/expiry, schedule registry CRUD, and the webhook payload+signature. All offline — the lock takes an injectable `now` so expiry is tested without sleeping, and the webhook is exercised only on the disabled (no-network) path plus its pure payload/signature helpers. """ import asyncio from datetime import datetime, timedelta, timezone import pytest from app.coordination import ( LOCK_TTL_MAX, LOCK_TTL_MIN, LockHeld, ScheduleRegistry, SwapLockManager, WebhookNotifier, build_webhook_payload, sign_payload, valid_schedule_id, ) T0 = datetime(2026, 6, 17, 12, 0, 0, tzinfo=timezone.utc) # ----------------------------------------------------------------- swap lock ---- def test_acquire_free_lock_returns_token_and_status_held(): mgr = SwapLockManager() lock = mgr.acquire("openclaw", ttl_seconds=60, note="daily vol", now=T0) assert lock.token st = mgr.status(now=T0) assert st["held"] is True assert st["holder"] == "openclaw" assert st["note"] == "daily vol" assert st["seconds_remaining"] == 60 assert "token" not in st # public view never leaks the token def test_acquire_requires_holder(): with pytest.raises(ValueError): SwapLockManager().acquire(" ", now=T0) def test_acquire_held_by_other_raises_lockheld_with_state(): mgr = SwapLockManager() mgr.acquire("openclaw", ttl_seconds=60, now=T0) with pytest.raises(LockHeld) as ei: mgr.acquire("johnny5", ttl_seconds=60, now=T0) assert ei.value.state["holder"] == "openclaw" def test_reacquire_with_token_extends_and_keeps_token(): mgr = SwapLockManager() first = mgr.acquire("openclaw", ttl_seconds=60, now=T0) later = T0 + timedelta(seconds=30) second = mgr.acquire("openclaw", ttl_seconds=60, token=first.token, now=later) assert second.token == first.token # window extended from the later moment, not the original assert mgr.status(now=later)["seconds_remaining"] == 60 assert second.acquired_at == first.acquired_at # acquired_at preserved def test_reacquire_without_token_is_refused_even_for_same_holder_name(): # Holder name is descriptive, not a secret — matching it must not grant access. mgr = SwapLockManager() mgr.acquire("openclaw", ttl_seconds=60, now=T0) with pytest.raises(LockHeld): mgr.acquire("openclaw", ttl_seconds=60, now=T0) def test_ttl_is_clamped(): mgr = SwapLockManager() mgr.acquire("a", ttl_seconds=0, now=T0) assert mgr.status(now=T0)["seconds_remaining"] == LOCK_TTL_MIN mgr2 = SwapLockManager() mgr2.acquire("b", ttl_seconds=10**9, now=T0) assert mgr2.status(now=T0)["seconds_remaining"] == LOCK_TTL_MAX def test_lock_expires_and_clears_lazily(): mgr = SwapLockManager() tok = mgr.acquire("openclaw", ttl_seconds=10, now=T0).token after = T0 + timedelta(seconds=11) assert mgr.status(now=after) == {"held": False} assert mgr.verify(tok, now=after) is False # an expired lock is free to re-take by anyone mgr.acquire("johnny5", ttl_seconds=10, now=after) assert mgr.status(now=after)["holder"] == "johnny5" def test_verify_matches_only_active_token(): mgr = SwapLockManager() tok = mgr.acquire("openclaw", ttl_seconds=60, now=T0).token assert mgr.verify(tok, now=T0) is True assert mgr.verify("nope", now=T0) is False assert mgr.verify(None, now=T0) is False def test_release_requires_token_then_frees(): mgr = SwapLockManager() tok = mgr.acquire("openclaw", ttl_seconds=60, now=T0).token with pytest.raises(PermissionError): mgr.release("wrong", now=T0) assert mgr.release(tok, now=T0) is True assert mgr.status(now=T0) == {"held": False} def test_force_release_skips_token_and_release_of_free_lock_is_false(): mgr = SwapLockManager() mgr.acquire("openclaw", ttl_seconds=60, now=T0) assert mgr.release(force=True, now=T0) is True assert mgr.release(force=True, now=T0) is False # nothing held now def test_is_blocked_by_is_the_swap_gate(): # Mirrors the single-read decision the /api/swap endpoint makes. mgr = SwapLockManager() assert mgr.is_blocked_by(None, now=T0) is None # free lock blocks nobody tok = mgr.acquire("openclaw", ttl_seconds=10, now=T0).token blocked = mgr.is_blocked_by(None, now=T0) # no token -> blocked assert blocked is not None and blocked["holder"] == "openclaw" assert mgr.is_blocked_by("wrong", now=T0) is not None # wrong token -> blocked assert mgr.is_blocked_by(tok, now=T0) is None # holder's token -> allowed # At/after expiry the gate is open even without a token (the bug a separate # status()+verify() pair would get wrong). assert mgr.is_blocked_by(None, now=T0 + timedelta(seconds=11)) is None # ------------------------------------------------------------------- webhook ---- def test_build_webhook_payload_shape(): p = build_webhook_payload( event="swap_complete", job_id="abc123", model_key="gemma", state="ready", returncode=0, started_at="t0", finished_at="t1", dry_run=False, ) assert p == { "event": "swap_complete", "job_id": "abc123", "model_key": "gemma", "state": "ready", "returncode": 0, "started_at": "t0", "finished_at": "t1", "dry_run": False, } def test_sign_payload_is_deterministic_and_prefixed(): body = b'{"event":"swap_complete"}' sig = sign_payload("s3cr3t", body) assert sig.startswith("sha256=") assert sig == sign_payload("s3cr3t", body) assert sig != sign_payload("other", body) def test_disabled_webhook_fire_is_noop(): n = WebhookNotifier("", "") assert n.enabled is False # Must not attempt any network call or raise when no URL is configured. assert asyncio.run(n.fire("swap_complete", {"x": 1})) is None # --------------------------------------------------------- schedule registry ---- def test_register_and_list_schedule(): reg = ScheduleRegistry() e = reg.register(name="Daily Vol", owner="openclaw", cron="0 6 * * *") assert e.id and e.registered_at and e.updated_at listed = reg.list() assert len(listed) == 1 and listed[0]["name"] == "Daily Vol" def test_register_with_id_updates_in_place(): reg = ScheduleRegistry() reg.register(name="Daily Vol", id="dv", owner="openclaw", cron="0 6 * * *") reg.register(name="Daily Vol v2", id="dv", owner="openclaw", cron="0 7 * * *") listed = reg.list() assert len(listed) == 1 assert listed[0]["name"] == "Daily Vol v2" and listed[0]["cron"] == "0 7 * * *" def test_register_requires_name_and_validates_id(): reg = ScheduleRegistry() with pytest.raises(ValueError): reg.register(name=" ") with pytest.raises(ValueError): reg.register(name="ok", id="bad id; rm -rf") def test_delete_schedule(): reg = ScheduleRegistry() reg.register(name="Daily Vol", id="dv") assert reg.delete("dv") is True assert reg.delete("dv") is False assert reg.list() == [] def test_valid_schedule_id(): assert valid_schedule_id("daily-vol") assert valid_schedule_id("a.b_c-1") assert not valid_schedule_id("") assert not valid_schedule_id("../etc") assert not valid_schedule_id("has space") assert not valid_schedule_id("x" * 65)