From 2d9caa814e2292634287951abb066688a5a0e2ae Mon Sep 17 00:00:00 2001 From: Keysat Date: Thu, 7 May 2026 10:39:37 -0500 Subject: [PATCH] Initial public commit --- .gitignore | 38 +++ LICENSE | 21 ++ README.md | 102 +++++++ examples/offline_verify.py | 45 +++ pyproject.toml | 48 +++ src/keysat_licensing_client/__init__.py | 90 ++++++ src/keysat_licensing_client/errors.py | 23 ++ src/keysat_licensing_client/fingerprint.py | 23 ++ src/keysat_licensing_client/key.py | 216 ++++++++++++++ src/keysat_licensing_client/online.py | 322 +++++++++++++++++++++ src/keysat_licensing_client/pubkey.py | 55 ++++ src/keysat_licensing_client/verify.py | 82 ++++++ tests/test_crosscheck.py | 176 +++++++++++ 13 files changed, 1241 insertions(+) create mode 100644 .gitignore create mode 100644 LICENSE create mode 100644 README.md create mode 100644 examples/offline_verify.py create mode 100644 pyproject.toml create mode 100644 src/keysat_licensing_client/__init__.py create mode 100644 src/keysat_licensing_client/errors.py create mode 100644 src/keysat_licensing_client/fingerprint.py create mode 100644 src/keysat_licensing_client/key.py create mode 100644 src/keysat_licensing_client/online.py create mode 100644 src/keysat_licensing_client/pubkey.py create mode 100644 src/keysat_licensing_client/verify.py create mode 100644 tests/test_crosscheck.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ca15e12 --- /dev/null +++ b/.gitignore @@ -0,0 +1,38 @@ +# Byte-compiled / optimized +__pycache__/ +*.py[cod] +*$py.class +*.so + +# Distribution / packaging +build/ +dist/ +*.egg-info/ +*.egg +.eggs/ +wheels/ +pip-wheel-metadata/ + +# Virtual environments +venv/ +.venv/ +env/ +.env +ENV/ + +# Testing / coverage +.pytest_cache/ +.coverage +.coverage.* +.tox/ +htmlcov/ +.mypy_cache/ +.ruff_cache/ + +# Editor / OS cruft +.DS_Store +.idea/ +.vscode/ +*.swp +*.bak +*.tmp diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..1b63053 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2026 Keysat + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..bc4dedc --- /dev/null +++ b/README.md @@ -0,0 +1,102 @@ +# keysat-licensing-client (Python) + +Python client for [Keysat](https://github.com/keysat-xyz/keysat) — a +self-hosted Bitcoin-paid software licensing server that runs on Start9. + +Verifies signed license keys offline using the issuer's public key, and +(optionally) wraps the HTTP API for live validation, purchase, and +free-license redemption. + +## Install + +```bash +pip install keysat-licensing-client # offline only +pip install keysat-licensing-client[online] # + httpx for the online client +``` + +Requires Python 3.10+. + +## Five-line offline check + +```python +from keysat_licensing_client import Verifier, PublicKey + +ISSUER_PUBKEY_PEM = open("assets/issuer.pub").read() # bake this into your app +verifier = Verifier(PublicKey.from_pem(ISSUER_PUBKEY_PEM)) + +ok = verifier.verify(key_from_user) # raises LicensingError on bad sig +print(f"licensed for product {ok.product_id}, expires {ok.expires_at}") +``` + +## Online check (with revocation + fingerprint binding) + +```python +from keysat_licensing_client import Client + +client = Client("https://license.example.com") +r = client.validate( + key_from_user, + product_slug="my-product", + fingerprint="machine-fingerprint", +) +if not r.ok: + print("server rejected:", r.reason) + # 'revoked', 'fingerprint_mismatch', 'not_found', 'product_mismatch', etc. +``` + +The recommended pattern is **offline-first, online-augmented**: do the +offline `Verifier.verify()` at boot. If that succeeds, also do an +async/background `client.validate()` to catch revocations and seat +mismatches. If the network fails, treat it as "status unknown" — don't +gate the user on your server's uptime. + +## Purchase flow (drives the whole BTCPay round trip) + +```python +from keysat_licensing_client import Client, StartPurchaseOptions +import webbrowser + +client = Client("https://license.example.com") +session = client.start_purchase( + "my-product", + StartPurchaseOptions(buyer_email="bob@example.com"), +) +webbrowser.open(session.checkout_url) +license_key = client.wait_for_license(session.invoice_id) +# Save license_key wherever you decided to store keys (config dir, keychain, env). +``` + +## Free-license code redemption + +For codes the seller created with kind `free_license` (no payment): + +```python +result = client.redeem_free_license( + "my-product", + "PRESSPASS", +) +print("redeemed:", result.license_key) +``` + +## Fingerprint binding + +The SDK doesn't decide WHAT to use as a fingerprint — that's a product +choice. Common sources, ordered by robustness: + +- Linux: `/etc/machine-id` +- macOS: `ioreg -d2 -c IOPlatformExpertDevice` +- Windows: registry `MachineGuid` +- Fallback: random UUID written into your app's config dir on first run + +Mix in a per-product salt so fingerprints from your app can't be +replayed against someone else's licensing server: + +```python +fp_input = f"{APP_NAME}|{machine_id}" +# The SDK SHA-256s this for you when you pass it to validate(...). +``` + +## License + +MIT OR Apache-2.0. See the upstream `LICENSE` file at +[github.com/keysat-xyz/keysat](https://github.com/keysat-xyz/keysat). diff --git a/examples/offline_verify.py b/examples/offline_verify.py new file mode 100644 index 0000000..5df47db --- /dev/null +++ b/examples/offline_verify.py @@ -0,0 +1,45 @@ +"""Example: verify a Keysat license key offline. + +Usage:: + + KEYSAT_PUBKEY_PEM=$(cat issuer.pub) python examples/offline_verify.py "LIC1-...-..." + +Or set the PEM string inline. The output reports the parsed payload +fields and exits non-zero on bad signature / bad format. +""" + +import os +import sys + +from keysat_licensing_client import LicensingError, PublicKey, Verifier + + +def main() -> int: + pem = os.environ.get("KEYSAT_PUBKEY_PEM") + if not pem: + print("error: set KEYSAT_PUBKEY_PEM to the issuer's PEM-encoded public key.", file=sys.stderr) + return 2 + if len(sys.argv) != 2: + print("usage: offline_verify.py ", file=sys.stderr) + return 2 + + verifier = Verifier(PublicKey.from_pem(pem)) + try: + ok = verifier.verify(sys.argv[1]) + except LicensingError as e: + print(f"INVALID: {e.kind}: {e}", file=sys.stderr) + return 1 + + print("VALID") + print(f" product_id: {ok.product_id}") + print(f" license_id: {ok.license_id}") + print(f" issued_at: {ok.issued_at}") + print(f" expires_at: {ok.expires_at if ok.expires_at != 0 else 'perpetual'}") + print(f" trial: {ok.is_trial}") + print(f" fp_bound: {ok.is_fingerprint_bound}") + print(f" entitlements: {ok.entitlements or '(none)'}") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..0d677d9 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,48 @@ +[build-system] +requires = ["setuptools>=68", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "keysat-licensing-client" +version = "0.1.0" +description = "Python client for Keysat — a self-hosted Bitcoin-paid software licensing server that runs on Start9. Verifies signed license keys offline and wraps the HTTP API for purchase, redemption, and revocation checks." +readme = "README.md" +requires-python = ">=3.10" +license = { text = "MIT OR Apache-2.0" } +authors = [{ name = "Keysat", email = "licensing@keysat.xyz" }] +keywords = ["bitcoin", "licensing", "btcpay", "start9", "ed25519"] +classifiers = [ + "Development Status :: 4 - Beta", + "Intended Audience :: Developers", + "License :: OSI Approved :: MIT License", + "License :: OSI Approved :: Apache Software License", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Topic :: Security :: Cryptography", +] +dependencies = [ + "cryptography>=42", +] + +[project.optional-dependencies] +online = ["httpx>=0.27"] +dev = [ + "pytest>=8", + "ruff>=0.5", + "mypy>=1.10", + "httpx>=0.27", +] + +[project.urls] +Homepage = "https://keysat.xyz" +Repository = "https://github.com/keysat-xyz/keysat" +Documentation = "https://github.com/keysat-xyz/keysat/blob/main/docs/INTEGRATION.md" + +[tool.setuptools.packages.find] +where = ["src"] + +[tool.pytest.ini_options] +testpaths = ["tests"] +addopts = "-q" diff --git a/src/keysat_licensing_client/__init__.py b/src/keysat_licensing_client/__init__.py new file mode 100644 index 0000000..a9fab6a --- /dev/null +++ b/src/keysat_licensing_client/__init__.py @@ -0,0 +1,90 @@ +""" +keysat_licensing_client +======================= + +Python client for Keysat — a self-hosted Bitcoin-paid software licensing +server that runs on Start9. Verifies signed license keys offline, and +(via the optional `online` extra) wraps the HTTP API for purchase, +free-license redemption, and revocation checks. + +Five-line offline check:: + + from keysat_licensing_client import Verifier, PublicKey + + verifier = Verifier(PublicKey.from_pem(ISSUER_PUBKEY_PEM)) + ok = verifier.verify(key_from_user) + print("licensed for product", ok.product_id) + +Online client (requires the `online` extra: `pip install keysat-licensing-client[online]`):: + + from keysat_licensing_client import Client + client = Client("https://license.example.com") + r = client.validate(key_from_user, product_slug="my-product", + fingerprint=machine_fingerprint) + if not r.ok: + ... +""" + +from .errors import LicensingError +from .key import ( + FLAG_FINGERPRINT_BOUND, + FLAG_TRIAL, + KEY_PREFIX, + KEY_VERSION_V1, + KEY_VERSION_V2, + LicensePayload, + has_entitlement, + is_expired_at, + parse_license_key, +) +from .pubkey import PublicKey +from .verify import VerifyOk, Verifier +from .fingerprint import hash_fingerprint + +__all__ = [ + "LicensingError", + "PublicKey", + "Verifier", + "VerifyOk", + "LicensePayload", + "parse_license_key", + "is_expired_at", + "has_entitlement", + "hash_fingerprint", + "FLAG_FINGERPRINT_BOUND", + "FLAG_TRIAL", + "KEY_PREFIX", + "KEY_VERSION_V1", + "KEY_VERSION_V2", +] + +# Online client is gated on optional dependency `httpx`. We try to +# expose it but degrade silently if httpx isn't installed — the offline +# verifier (above) doesn't need network at all. +try: + from .online import ( # noqa: F401 + Client, + ValidateResponse, + ValidateOptions, + StartPurchaseOptions, + PurchaseSession, + PollResponse, + RedeemFreeOptions, + RedeemFreeResponse, + ) + + __all__ += [ + "Client", + "ValidateResponse", + "ValidateOptions", + "StartPurchaseOptions", + "PurchaseSession", + "PollResponse", + "RedeemFreeOptions", + "RedeemFreeResponse", + ] +except ImportError: + # httpx not installed — that's fine, online client is optional. + pass + +__version__ = "0.1.0" diff --git a/src/keysat_licensing_client/errors.py b/src/keysat_licensing_client/errors.py new file mode 100644 index 0000000..a3c5d74 --- /dev/null +++ b/src/keysat_licensing_client/errors.py @@ -0,0 +1,23 @@ +"""Exception types for keysat_licensing_client.""" + + +class LicensingError(Exception): + """Base exception for all keysat_licensing_client errors. + + Subclasses (or `kind` strings on the base class) distinguish + specific failure modes: + + - `bad_signature`: the key's Ed25519 signature didn't verify. + - `bad_format`: the key string isn't a parseable LIC1-... key. + - `expired`: the key parsed and verified but is past its expiry. + - `revoked`: the server reported the key as revoked. + - `fingerprint_mismatch`: the key was bound to a different machine. + - `not_found`: the server doesn't know about this key. + - `product_mismatch`: the key is for a different product than checked. + - `network`: network error talking to the server (transient). + - `server_error`: server returned an unexpected response. + """ + + def __init__(self, kind: str, message: str = ""): + self.kind = kind + super().__init__(message or kind) diff --git a/src/keysat_licensing_client/fingerprint.py b/src/keysat_licensing_client/fingerprint.py new file mode 100644 index 0000000..047660a --- /dev/null +++ b/src/keysat_licensing_client/fingerprint.py @@ -0,0 +1,23 @@ +"""Machine-fingerprint helper. + +The SDK doesn't decide WHAT to use as a fingerprint — that's a product +choice, see PORTING_SDK_TO_NEW_LANGUAGES.md and UPGRADING_EXISTING_SOFTWARE.md +for tradeoffs. The SDK only standardizes how the chosen string is hashed +before being sent to the server (so the server never sees the raw value). +""" + +from __future__ import annotations + +import hashlib + + +def hash_fingerprint(raw: str) -> str: + """SHA-256 the raw fingerprint string and return a hex digest. + + Output is the same as Python's + `hashlib.sha256(raw.encode()).hexdigest()` — 64 lowercase hex chars. + Cross-language SDKs all use this exact hashing so a fingerprint + bound by (say) the TS client validates correctly against the same + machine's Python client. + """ + return hashlib.sha256(raw.encode("utf-8")).hexdigest() diff --git a/src/keysat_licensing_client/key.py b/src/keysat_licensing_client/key.py new file mode 100644 index 0000000..a61c282 --- /dev/null +++ b/src/keysat_licensing_client/key.py @@ -0,0 +1,216 @@ +"""License key payload struct and parser. + +Wire format reference: see PORTING_SDK_TO_NEW_LANGUAGES.md and +licensing-service/src/crypto/mod.rs. Both v1 (legacy fixed-74) and v2 +(variable-length with expires_at + entitlements) layouts are supported. + +Format summary:: + + LIC1-- + + base32: RFC 4648, uppercase, no padding. + signature: 64 bytes Ed25519 over the raw payload bytes. + + v1 payload (74 bytes, fixed): + [0] version (=1) + [1] flags (bit 0 FINGERPRINT_BOUND, bit 1 TRIAL) + [2..18] product_id (UUID big-endian) + [18..34] license_id (UUID big-endian) + [34..42] issued_at (u64 big-endian, unix seconds) + [42..74] fingerprint_hash (SHA-256 hex digest, 32 raw bytes) + + v2 payload (83+ bytes): + [0] version (=2) + [1] flags + [2..18] product_id + [18..34] license_id + [34..42] issued_at (u64 big-endian) + [42..50] expires_at (u64 big-endian; 0 = perpetual) + [50..82] fingerprint_hash + [82] num_entitlements (u8) + [83..] for each: +""" + +from __future__ import annotations + +import base64 +import uuid +from dataclasses import dataclass, field + +from .errors import LicensingError + +KEY_PREFIX = "LIC1" +KEY_VERSION_V1 = 1 +KEY_VERSION_V2 = 2 + +FLAG_FINGERPRINT_BOUND = 0b0000_0001 +FLAG_TRIAL = 0b0000_0010 + +V1_LEN = 74 +V2_HEAD_LEN = 83 # bytes 0..82 fixed, then variable entitlement tail + + +@dataclass(frozen=True) +class LicensePayload: + """Decoded license payload. Returned by `parse_license_key()`.""" + + version: int + flags: int + product_id: uuid.UUID + license_id: uuid.UUID + issued_at: int # unix seconds + expires_at: int # unix seconds; 0 = perpetual; v1 keys always 0 + fingerprint_hash: bytes # 32 bytes; all-zero if unbound + entitlements: list[str] = field(default_factory=list) + + @property + def is_fingerprint_bound(self) -> bool: + return bool(self.flags & FLAG_FINGERPRINT_BOUND) + + @property + def is_trial(self) -> bool: + return bool(self.flags & FLAG_TRIAL) + + +@dataclass(frozen=True) +class ParsedKey: + """Parsed (but NOT signature-verified) license key. + + Carries the payload and the raw payload bytes (needed for signature + verification — the signature is over the bytes, not the parsed + struct). + """ + + payload: LicensePayload + payload_bytes: bytes + signature: bytes # 64 bytes + + +def _b32_decode_nopad(s: str) -> bytes: + """Decode RFC4648 base32, uppercase, with no padding. Adds padding + if needed to satisfy stdlib's strict decoder.""" + s = s.upper() + pad = (-len(s)) % 8 + return base64.b32decode(s + "=" * pad) + + +def parse_license_key(key: str) -> ParsedKey: + """Parse a `LIC1--` key string. Does NOT verify the + signature — use `Verifier.verify()` for that. + + Raises `LicensingError(kind="bad_format")` on malformed input. + """ + if not isinstance(key, str): + raise LicensingError("bad_format", "key must be a string") + parts = key.strip().split("-") + if len(parts) != 3: + raise LicensingError( + "bad_format", + f"expected `LIC1--`, got {len(parts)} dash-separated parts", + ) + prefix, payload_b32, sig_b32 = parts + if prefix != KEY_PREFIX: + raise LicensingError("bad_format", f"unknown prefix '{prefix}'; expected '{KEY_PREFIX}'") + + try: + payload_bytes = _b32_decode_nopad(payload_b32) + signature = _b32_decode_nopad(sig_b32) + except Exception as e: + raise LicensingError("bad_format", f"base32 decode failed: {e}") from e + + if len(signature) != 64: + raise LicensingError( + "bad_format", + f"signature must be 64 bytes (got {len(signature)})", + ) + + return ParsedKey( + payload=_decode_payload(payload_bytes), + payload_bytes=payload_bytes, + signature=signature, + ) + + +def _decode_payload(payload: bytes) -> LicensePayload: + if len(payload) < 2: + raise LicensingError("bad_format", "payload too short") + version = payload[0] + flags = payload[1] + if version == KEY_VERSION_V1: + if len(payload) != V1_LEN: + raise LicensingError( + "bad_format", + f"v1 payload must be exactly {V1_LEN} bytes (got {len(payload)})", + ) + product_id = uuid.UUID(bytes=payload[2:18]) + license_id = uuid.UUID(bytes=payload[18:34]) + issued_at = int.from_bytes(payload[34:42], "big") + fingerprint_hash = payload[42:74] + return LicensePayload( + version=version, + flags=flags, + product_id=product_id, + license_id=license_id, + issued_at=issued_at, + expires_at=0, # v1 has no expiry + fingerprint_hash=fingerprint_hash, + entitlements=[], + ) + elif version == KEY_VERSION_V2: + if len(payload) < V2_HEAD_LEN: + raise LicensingError( + "bad_format", + f"v2 payload header is {V2_HEAD_LEN} bytes; got {len(payload)}", + ) + product_id = uuid.UUID(bytes=payload[2:18]) + license_id = uuid.UUID(bytes=payload[18:34]) + issued_at = int.from_bytes(payload[34:42], "big") + expires_at = int.from_bytes(payload[42:50], "big") + fingerprint_hash = payload[50:82] + num_ents = payload[82] + entitlements: list[str] = [] + cursor = 83 + for _ in range(num_ents): + if cursor >= len(payload): + raise LicensingError("bad_format", "v2 entitlement length byte missing") + slen = payload[cursor] + cursor += 1 + if cursor + slen > len(payload): + raise LicensingError("bad_format", "v2 entitlement extends past payload") + slug = payload[cursor : cursor + slen].decode("utf-8") + cursor += slen + entitlements.append(slug) + if cursor != len(payload): + raise LicensingError( + "bad_format", + f"v2 payload has {len(payload) - cursor} trailing bytes after entitlements", + ) + return LicensePayload( + version=version, + flags=flags, + product_id=product_id, + license_id=license_id, + issued_at=issued_at, + expires_at=expires_at, + fingerprint_hash=fingerprint_hash, + entitlements=entitlements, + ) + else: + raise LicensingError("bad_format", f"unknown key version: {version}") + + +def is_expired_at(payload: LicensePayload, when_unix: int) -> bool: + """Pure helper: is this key expired AT a given unix timestamp? + + Returns False for perpetual keys (expires_at == 0) regardless of + when. Caller supplies `when_unix` so this is a pure function — no + clock dependency. Typically the caller passes `int(time.time())`. + """ + if payload.expires_at == 0: + return False + return when_unix >= payload.expires_at + + +def has_entitlement(payload: LicensePayload, slug: str) -> bool: + """Does the license grant the given entitlement slug?""" + return slug in payload.entitlements diff --git a/src/keysat_licensing_client/online.py b/src/keysat_licensing_client/online.py new file mode 100644 index 0000000..c3b4c3e --- /dev/null +++ b/src/keysat_licensing_client/online.py @@ -0,0 +1,322 @@ +"""Online HTTP client for the Keysat REST API. + +Wraps the public endpoints (`/v1/validate`, `/v1/purchase`, +`/v1/redeem`, `/v1/machines/*`) over HTTPS. All methods are synchronous; +an async variant can be added later if anyone asks. + +Requires `httpx`. Install via the `online` extra:: + + pip install keysat-licensing-client[online] +""" + +from __future__ import annotations + +import time +from dataclasses import dataclass, field +from typing import Any + +import httpx # imported only when this module is loaded; gated in __init__.py + +from .errors import LicensingError + + +# ---------- Response shapes ---------- + + +@dataclass +class ValidateResponse: + ok: bool + reason: str | None = None + license_id: str | None = None + product_id: str | None = None + product_slug: str | None = None + issued_at: str | None = None + expires_at: str | None = None + grace_until: str | None = None + in_grace_period: bool | None = None + is_trial: bool | None = None + entitlements: list[str] = field(default_factory=list) + status: str | None = None + machine_id: str | None = None + max_machines: int | None = None + + +@dataclass +class ValidateOptions: + product_slug: str | None = None + fingerprint: str | None = None + hostname: str | None = None + platform: str | None = None + + +@dataclass +class StartPurchaseOptions: + buyer_email: str | None = None + buyer_note: str | None = None + redirect_url: str | None = None + code: str | None = None + + +@dataclass +class PurchaseSession: + invoice_id: str + btcpay_invoice_id: str + checkout_url: str + amount_sats: int + base_price_sats: int + discount_applied_sats: int + poll_url: str + + +@dataclass +class PollResponse: + invoice_id: str + status: str # 'pending' | 'settled' | 'expired' | 'invalid' + product_id: str + amount_sats: int + license_key: str | None + license_id: str | None + + +@dataclass +class RedeemFreeOptions: + buyer_email: str | None = None + buyer_note: str | None = None + + +@dataclass +class RedeemFreeResponse: + license_id: str + license_key: str + invoice_id: str + redemption_id: str + + +@dataclass +class MachineResponse: + ok: bool + reason: str | None = None + machine_id: str | None = None + active_count: int | None = None + max_machines: int | None = None + + +# ---------- Client ---------- + + +class Client: + """HTTP client pinned to one Keysat server URL. + + Construct with the public base URL (e.g. ``https://license.example.com``). + All methods raise `LicensingError(kind="network")` on transport + failure and `LicensingError(kind="server_error")` on unexpected + server responses; semantic-failure cases (revoked / fingerprint + mismatch / bad signature) come back as `ValidateResponse(ok=False, + reason="...")` so the caller can render different messaging per + reason without try/except. + """ + + def __init__(self, base_url: str, *, timeout: float = 15.0): + self._base = base_url.rstrip("/") + self._timeout = timeout + + @property + def base_url(self) -> str: + return self._base + + # ----- Public endpoints ----- + + def fetch_pubkey_pem(self) -> str: + data = self._get("/v1/pubkey") + return data["public_key_pem"] + + def validate( + self, + key: str, + product_slug: str | None = None, + fingerprint: str | None = None, + opts: ValidateOptions | None = None, + ) -> ValidateResponse: + merged = opts or ValidateOptions() + body = { + "key": key, + "product_slug": merged.product_slug or product_slug, + "fingerprint": merged.fingerprint or fingerprint, + "hostname": merged.hostname, + "platform": merged.platform, + } + body = {k: v for k, v in body.items() if v is not None} + raw = self._post("/v1/validate", body) + ents = raw.get("entitlements") or [] + return ValidateResponse( + ok=bool(raw.get("ok")), + reason=raw.get("reason"), + license_id=raw.get("license_id"), + product_id=raw.get("product_id"), + product_slug=raw.get("product_slug"), + issued_at=raw.get("issued_at"), + expires_at=raw.get("expires_at"), + grace_until=raw.get("grace_until"), + in_grace_period=raw.get("in_grace_period"), + is_trial=raw.get("is_trial"), + entitlements=[e for e in ents if isinstance(e, str)], + status=raw.get("status"), + machine_id=raw.get("machine_id"), + max_machines=raw.get("max_machines"), + ) + + # ----- Machine seat management ----- + + def heartbeat(self, key: str, fingerprint: str) -> MachineResponse: + raw = self._post("/v1/machines/heartbeat", {"key": key, "fingerprint": fingerprint}) + return _to_machine_response(raw) + + def activate( + self, + key: str, + fingerprint: str, + hostname: str | None = None, + platform: str | None = None, + ) -> MachineResponse: + body = {"key": key, "fingerprint": fingerprint, "hostname": hostname, "platform": platform} + body = {k: v for k, v in body.items() if v is not None} + raw = self._post("/v1/machines/activate", body) + return _to_machine_response(raw) + + def deactivate( + self, key: str, fingerprint: str, reason: str | None = None + ) -> MachineResponse: + body = {"key": key, "fingerprint": fingerprint, "reason": reason} + body = {k: v for k, v in body.items() if v is not None} + raw = self._post("/v1/machines/deactivate", body) + return _to_machine_response(raw) + + # ----- Purchase flow ----- + + def start_purchase( + self, + product_slug: str, + opts: StartPurchaseOptions | None = None, + ) -> PurchaseSession: + merged = opts or StartPurchaseOptions() + body = { + "product": product_slug, + "buyer_email": merged.buyer_email, + "buyer_note": merged.buyer_note, + "redirect_url": merged.redirect_url, + "code": merged.code, + } + body = {k: v for k, v in body.items() if v is not None} + raw = self._post("/v1/purchase", body) + return PurchaseSession( + invoice_id=raw["invoice_id"], + btcpay_invoice_id=raw["btcpay_invoice_id"], + checkout_url=raw["checkout_url"], + amount_sats=raw["amount_sats"], + base_price_sats=raw.get("base_price_sats", raw["amount_sats"]), + discount_applied_sats=raw.get("discount_applied_sats", 0), + poll_url=raw["poll_url"], + ) + + def poll_purchase(self, invoice_id: str) -> PollResponse: + raw = self._get(f"/v1/purchase/{invoice_id}") + return PollResponse( + invoice_id=raw["invoice_id"], + status=raw["status"], + product_id=raw["product_id"], + amount_sats=raw["amount_sats"], + license_key=raw.get("license_key"), + license_id=raw.get("license_id"), + ) + + def wait_for_license( + self, + invoice_id: str, + interval_s: float = 5.0, + timeout_s: float | None = None, + ) -> str: + """Poll `poll_purchase` until the license_key is non-null. + + Raises LicensingError on terminal invoice states (expired / + invalid) or timeout. Returns the license_key string on success. + """ + deadline = time.monotonic() + timeout_s if timeout_s else None + while True: + poll = self.poll_purchase(invoice_id) + if poll.license_key: + return poll.license_key + if poll.status in ("expired", "invalid"): + raise LicensingError( + "server_error", f"invoice ended in status {poll.status}" + ) + if deadline is not None and time.monotonic() > deadline: + raise LicensingError( + "server_error", "timed out waiting for license issuance" + ) + time.sleep(interval_s) + + # ----- Free-license redemption ----- + + def redeem_free_license( + self, + product_slug: str, + code: str, + opts: RedeemFreeOptions | None = None, + ) -> RedeemFreeResponse: + merged = opts or RedeemFreeOptions() + body = { + "product": product_slug, + "code": code, + "buyer_email": merged.buyer_email, + "buyer_note": merged.buyer_note, + } + body = {k: v for k, v in body.items() if v is not None} + raw = self._post("/v1/redeem", body) + return RedeemFreeResponse( + license_id=raw["license_id"], + license_key=raw["license_key"], + invoice_id=raw["invoice_id"], + redemption_id=raw["redemption_id"], + ) + + # ----- Internals ----- + + def _get(self, path: str) -> dict[str, Any]: + return self._request("GET", path, json_body=None) + + def _post(self, path: str, body: dict[str, Any]) -> dict[str, Any]: + return self._request("POST", path, json_body=body) + + def _request(self, method: str, path: str, json_body: dict[str, Any] | None) -> dict[str, Any]: + url = self._base + path + try: + resp = httpx.request(method, url, json=json_body, timeout=self._timeout) + except httpx.HTTPError as e: + raise LicensingError("network", f"{method} {url}: {e}") from e + if resp.status_code >= 500: + raise LicensingError( + "server_error", + f"{method} {url}: HTTP {resp.status_code} — {resp.text[:200]}", + ) + if resp.status_code >= 400: + raise LicensingError( + "server_error", + f"{method} {url}: HTTP {resp.status_code} — {resp.text[:200]}", + ) + try: + return resp.json() + except Exception as e: + raise LicensingError( + "server_error", + f"{method} {url}: response was not JSON: {e}", + ) from e + + +def _to_machine_response(raw: dict[str, Any]) -> MachineResponse: + return MachineResponse( + ok=bool(raw.get("ok")), + reason=raw.get("reason"), + machine_id=raw.get("machine_id"), + active_count=raw.get("active_count"), + max_machines=raw.get("max_machines"), + ) diff --git a/src/keysat_licensing_client/pubkey.py b/src/keysat_licensing_client/pubkey.py new file mode 100644 index 0000000..a84eada --- /dev/null +++ b/src/keysat_licensing_client/pubkey.py @@ -0,0 +1,55 @@ +"""Server public key loading and management.""" + +from __future__ import annotations + +from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey +from cryptography.hazmat.primitives.serialization import load_pem_public_key + +from .errors import LicensingError + + +class PublicKey: + """An Ed25519 public key issued by the Keysat server. + + Holds the underlying `cryptography.Ed25519PublicKey` and exposes + convenience constructors. Pass an instance to `Verifier(...)`. + """ + + def __init__(self, key: Ed25519PublicKey): + self._key = key + + @classmethod + def from_pem(cls, pem: str | bytes) -> "PublicKey": + """Load from a PEM string as returned by the server's `GET /v1/pubkey`. + + Raises `LicensingError(kind="bad_format")` if the PEM is not a + valid Ed25519 public key. + """ + if isinstance(pem, str): + pem = pem.encode("utf-8") + try: + loaded = load_pem_public_key(pem) + except Exception as e: + raise LicensingError("bad_format", f"could not parse PEM: {e}") from e + if not isinstance(loaded, Ed25519PublicKey): + raise LicensingError( + "bad_format", + "PEM is not an Ed25519 public key (got " + f"{type(loaded).__name__})", + ) + return cls(loaded) + + @classmethod + def from_raw(cls, raw: bytes) -> "PublicKey": + """Load from a 32-byte raw Ed25519 public key.""" + if len(raw) != 32: + raise LicensingError( + "bad_format", + f"raw Ed25519 public key must be 32 bytes (got {len(raw)})", + ) + return cls(Ed25519PublicKey.from_public_bytes(raw)) + + @property + def underlying(self) -> Ed25519PublicKey: + """Access the underlying cryptography object (for advanced use).""" + return self._key diff --git a/src/keysat_licensing_client/verify.py b/src/keysat_licensing_client/verify.py new file mode 100644 index 0000000..5099750 --- /dev/null +++ b/src/keysat_licensing_client/verify.py @@ -0,0 +1,82 @@ +"""Offline signature verification — the bulk of the value of the SDK.""" + +from __future__ import annotations + +import uuid +from dataclasses import dataclass + +from cryptography.exceptions import InvalidSignature + +from .errors import LicensingError +from .key import LicensePayload, parse_license_key +from .pubkey import PublicKey + + +@dataclass(frozen=True) +class VerifyOk: + """Result of a successful offline `Verifier.verify()` call.""" + + license_id: uuid.UUID + product_id: uuid.UUID + issued_at: int + expires_at: int # 0 = perpetual + is_trial: bool + is_fingerprint_bound: bool + fingerprint_hash: bytes # 32 bytes; all-zero if unbound + entitlements: list[str] + payload: LicensePayload # the full parsed payload + + +class Verifier: + """Offline license-key verifier. + + Given the issuer's PEM-encoded public key, verifies the cryptographic + integrity of a license key string with no network access. Suitable + for boot-time license checks where the licensing server may be + unreachable. + + Usage:: + + verifier = Verifier(PublicKey.from_pem(ISSUER_PUBKEY_PEM)) + ok = verifier.verify(key_from_user) + # raises LicensingError on bad signature / bad format + + For revocation and fingerprint binding, layer the online + `Client.validate(...)` on top of this — but only AFTER offline + verification has passed. + """ + + def __init__(self, public_key: PublicKey): + self._pubkey = public_key + + def verify(self, key: str) -> VerifyOk: + """Verify a `LIC1-...-...` key. + + On success, returns a `VerifyOk` with all parsed fields. On + failure, raises `LicensingError`: + + - `kind="bad_format"`: the key string is malformed. + - `kind="bad_signature"`: signature didn't verify against + the issuer's public key (key was edited, fabricated, or + issued by a different server). + """ + parsed = parse_license_key(key) + try: + self._pubkey.underlying.verify(parsed.signature, parsed.payload_bytes) + except InvalidSignature as e: + raise LicensingError( + "bad_signature", + "signature did not verify against the issuer's public key", + ) from e + p = parsed.payload + return VerifyOk( + license_id=p.license_id, + product_id=p.product_id, + issued_at=p.issued_at, + expires_at=p.expires_at, + is_trial=p.is_trial, + is_fingerprint_bound=p.is_fingerprint_bound, + fingerprint_hash=p.fingerprint_hash, + entitlements=list(p.entitlements), + payload=p, + ) diff --git a/tests/test_crosscheck.py b/tests/test_crosscheck.py new file mode 100644 index 0000000..2dc80f9 --- /dev/null +++ b/tests/test_crosscheck.py @@ -0,0 +1,176 @@ +"""Cross-check the Python SDK against the canonical wire-format test +vectors at ../../tests/crosscheck/vector.json. + +These vectors are also exercised by the Rust and TS SDKs. Any new SDK +must pass every fixture in vector.json — that's how we guarantee +wire-format compatibility across languages. + +Run with: `pytest -q` from the package root, OR `python -m pytest`. +""" + +from __future__ import annotations + +import json +from pathlib import Path + +import pytest + +from keysat_licensing_client import ( + PublicKey, + Verifier, + LicensingError, + has_entitlement, + hash_fingerprint, + is_expired_at, + parse_license_key, +) + + +# Locate vector.json relative to this file. The tests directory lives +# at licensing-client-python/tests/, the vector lives at +# tests/crosscheck/vector.json (repo root). Walk up until we find it. +def _vectors_path() -> Path: + here = Path(__file__).resolve().parent + for ancestor in [here, *here.parents]: + candidate = ancestor / "tests" / "crosscheck" / "vector.json" + if candidate.exists(): + return candidate + pytest.skip("crosscheck vector.json not found; run from repo with tests/ tree") + + +@pytest.fixture(scope="module") +def vectors() -> dict: + with _vectors_path().open() as f: + return json.load(f) + + +@pytest.fixture(scope="module") +def verifier(vectors: dict) -> Verifier: + return Verifier(PublicKey.from_pem(vectors["publicKeyPem"])) + + +# ------------------------------------------------------------------ +# v1 fixture: legacy fixed-74 layout, fingerprint-bound, no expiry, +# no entitlements. +# ------------------------------------------------------------------ + + +def test_v1_parses(vectors: dict) -> None: + parsed = parse_license_key(vectors["v1"]["licenseKey"]) + exp = vectors["v1"]["expected"] + assert parsed.payload.version == exp["version"] + assert str(parsed.payload.product_id) == exp["productUuid"] + assert str(parsed.payload.license_id) == exp["licenseUuid"] + assert parsed.payload.issued_at == exp["issuedAt"] + assert parsed.payload.expires_at == exp["expiresAt"] + assert parsed.payload.flags == exp["flags"] + assert parsed.payload.is_fingerprint_bound is exp["isFingerprintBound"] + assert parsed.payload.is_trial is exp["isTrial"] + assert parsed.payload.entitlements == exp["entitlements"] + assert parsed.payload.fingerprint_hash.hex() == exp["fingerprintHashHex"] + + +def test_v1_verifies(verifier: Verifier, vectors: dict) -> None: + ok = verifier.verify(vectors["v1"]["licenseKey"]) + assert str(ok.product_id) == vectors["v1"]["expected"]["productUuid"] + + +def test_v1_tamper_detected(verifier: Verifier, vectors: dict) -> None: + key = vectors["v1"]["licenseKey"] + # Flip one char in the payload section. The signature won't match. + payload_start = key.index("-") + 1 + tampered = key[:payload_start] + ("B" if key[payload_start] != "B" else "C") + key[payload_start + 1 :] + with pytest.raises(LicensingError) as excinfo: + verifier.verify(tampered) + assert excinfo.value.kind in ("bad_signature", "bad_format") + + +# ------------------------------------------------------------------ +# v2 fixture: trial, fingerprint-bound, explicit expiry, two entitlements. +# Stresses the variable-length tail parser. +# ------------------------------------------------------------------ + + +def test_v2_parses(vectors: dict) -> None: + parsed = parse_license_key(vectors["v2"]["licenseKey"]) + exp = vectors["v2"]["expected"] + assert parsed.payload.version == exp["version"] + assert str(parsed.payload.product_id) == exp["productUuid"] + assert str(parsed.payload.license_id) == exp["licenseUuid"] + assert parsed.payload.issued_at == exp["issuedAt"] + assert parsed.payload.expires_at == exp["expiresAt"] + assert parsed.payload.flags == exp["flags"] + assert parsed.payload.is_fingerprint_bound is exp["isFingerprintBound"] + assert parsed.payload.is_trial is exp["isTrial"] + assert parsed.payload.entitlements == exp["entitlements"] + + +def test_v2_verifies(verifier: Verifier, vectors: dict) -> None: + ok = verifier.verify(vectors["v2"]["licenseKey"]) + assert ok.is_trial + assert ok.is_fingerprint_bound + assert len(ok.entitlements) == len(vectors["v2"]["expected"]["entitlements"]) + + +def test_v2_expiry_boundary(vectors: dict) -> None: + parsed = parse_license_key(vectors["v2"]["licenseKey"]) + expires_at = parsed.payload.expires_at + assert is_expired_at(parsed.payload, expires_at) is True + assert is_expired_at(parsed.payload, expires_at - 1) is False + + +def test_v2_entitlements(vectors: dict) -> None: + parsed = parse_license_key(vectors["v2"]["licenseKey"]) + for slug in vectors["v2"]["expected"]["entitlements"]: + assert has_entitlement(parsed.payload, slug) + assert has_entitlement(parsed.payload, "definitely-not-a-real-entitlement") is False + + +# ------------------------------------------------------------------ +# v2_perpetual_unbound — common case for paid purchase: v2, no expiry, +# no fingerprint binding, no entitlements. +# ------------------------------------------------------------------ + + +def test_v2_perpetual_unbound_parses(vectors: dict) -> None: + if "v2_perpetual_unbound" not in vectors: + pytest.skip("vector.json doesn't include v2_perpetual_unbound") + parsed = parse_license_key(vectors["v2_perpetual_unbound"]["licenseKey"]) + assert parsed.payload.version == 2 + assert parsed.payload.expires_at == 0 + assert parsed.payload.is_fingerprint_bound is False + + +def test_v2_perpetual_unbound_verifies(verifier: Verifier, vectors: dict) -> None: + if "v2_perpetual_unbound" not in vectors: + pytest.skip("vector.json doesn't include v2_perpetual_unbound") + verifier.verify(vectors["v2_perpetual_unbound"]["licenseKey"]) + + +# ------------------------------------------------------------------ +# Cross-language fingerprint-hash compatibility. +# ------------------------------------------------------------------ + + +def test_hash_fingerprint_matches_python_stdlib() -> None: + import hashlib + raw = "hello" + expected = hashlib.sha256(raw.encode()).hexdigest() + assert hash_fingerprint(raw) == expected + + +# ------------------------------------------------------------------ +# Negative cases. +# ------------------------------------------------------------------ + + +def test_bad_format_short_key() -> None: + with pytest.raises(LicensingError) as excinfo: + parse_license_key("notakey") + assert excinfo.value.kind == "bad_format" + + +def test_bad_format_wrong_prefix() -> None: + with pytest.raises(LicensingError) as excinfo: + parse_license_key("LIC9-AAAA-BBBB") + assert excinfo.value.kind == "bad_format"