Initial public commit

This commit is contained in:
Keysat
2026-05-07 10:39:37 -05:00
commit 2d9caa814e
13 changed files with 1241 additions and 0 deletions
+38
View File
@@ -0,0 +1,38 @@
# Byte-compiled / optimized
__pycache__/
*.py[cod]
*$py.class
*.so
# Distribution / packaging
build/
dist/
*.egg-info/
*.egg
.eggs/
wheels/
pip-wheel-metadata/
# Virtual environments
venv/
.venv/
env/
.env
ENV/
# Testing / coverage
.pytest_cache/
.coverage
.coverage.*
.tox/
htmlcov/
.mypy_cache/
.ruff_cache/
# Editor / OS cruft
.DS_Store
.idea/
.vscode/
*.swp
*.bak
*.tmp
+21
View File
@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2026 Keysat
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
+102
View File
@@ -0,0 +1,102 @@
# keysat-licensing-client (Python)
Python client for [Keysat](https://github.com/keysat-xyz/keysat) — a
self-hosted Bitcoin-paid software licensing server that runs on Start9.
Verifies signed license keys offline using the issuer's public key, and
(optionally) wraps the HTTP API for live validation, purchase, and
free-license redemption.
## Install
```bash
pip install keysat-licensing-client # offline only
pip install keysat-licensing-client[online] # + httpx for the online client
```
Requires Python 3.10+.
## Five-line offline check
```python
from keysat_licensing_client import Verifier, PublicKey
ISSUER_PUBKEY_PEM = open("assets/issuer.pub").read() # bake this into your app
verifier = Verifier(PublicKey.from_pem(ISSUER_PUBKEY_PEM))
ok = verifier.verify(key_from_user) # raises LicensingError on bad sig
print(f"licensed for product {ok.product_id}, expires {ok.expires_at}")
```
## Online check (with revocation + fingerprint binding)
```python
from keysat_licensing_client import Client
client = Client("https://license.example.com")
r = client.validate(
key_from_user,
product_slug="my-product",
fingerprint="machine-fingerprint",
)
if not r.ok:
print("server rejected:", r.reason)
# 'revoked', 'fingerprint_mismatch', 'not_found', 'product_mismatch', etc.
```
The recommended pattern is **offline-first, online-augmented**: do the
offline `Verifier.verify()` at boot. If that succeeds, also do an
async/background `client.validate()` to catch revocations and seat
mismatches. If the network fails, treat it as "status unknown" — don't
gate the user on your server's uptime.
## Purchase flow (drives the whole BTCPay round trip)
```python
from keysat_licensing_client import Client, StartPurchaseOptions
import webbrowser
client = Client("https://license.example.com")
session = client.start_purchase(
"my-product",
StartPurchaseOptions(buyer_email="bob@example.com"),
)
webbrowser.open(session.checkout_url)
license_key = client.wait_for_license(session.invoice_id)
# Save license_key wherever you decided to store keys (config dir, keychain, env).
```
## Free-license code redemption
For codes the seller created with kind `free_license` (no payment):
```python
result = client.redeem_free_license(
"my-product",
"PRESSPASS",
)
print("redeemed:", result.license_key)
```
## Fingerprint binding
The SDK doesn't decide WHAT to use as a fingerprint — that's a product
choice. Common sources, ordered by robustness:
- Linux: `/etc/machine-id`
- macOS: `ioreg -d2 -c IOPlatformExpertDevice`
- Windows: registry `MachineGuid`
- Fallback: random UUID written into your app's config dir on first run
Mix in a per-product salt so fingerprints from your app can't be
replayed against someone else's licensing server:
```python
fp_input = f"{APP_NAME}|{machine_id}"
# The SDK SHA-256s this for you when you pass it to validate(...).
```
## License
MIT OR Apache-2.0. See the upstream `LICENSE` file at
[github.com/keysat-xyz/keysat](https://github.com/keysat-xyz/keysat).
+45
View File
@@ -0,0 +1,45 @@
"""Example: verify a Keysat license key offline.
Usage::
KEYSAT_PUBKEY_PEM=$(cat issuer.pub) python examples/offline_verify.py "LIC1-...-..."
Or set the PEM string inline. The output reports the parsed payload
fields and exits non-zero on bad signature / bad format.
"""
import os
import sys
from keysat_licensing_client import LicensingError, PublicKey, Verifier
def main() -> int:
pem = os.environ.get("KEYSAT_PUBKEY_PEM")
if not pem:
print("error: set KEYSAT_PUBKEY_PEM to the issuer's PEM-encoded public key.", file=sys.stderr)
return 2
if len(sys.argv) != 2:
print("usage: offline_verify.py <license-key>", file=sys.stderr)
return 2
verifier = Verifier(PublicKey.from_pem(pem))
try:
ok = verifier.verify(sys.argv[1])
except LicensingError as e:
print(f"INVALID: {e.kind}: {e}", file=sys.stderr)
return 1
print("VALID")
print(f" product_id: {ok.product_id}")
print(f" license_id: {ok.license_id}")
print(f" issued_at: {ok.issued_at}")
print(f" expires_at: {ok.expires_at if ok.expires_at != 0 else 'perpetual'}")
print(f" trial: {ok.is_trial}")
print(f" fp_bound: {ok.is_fingerprint_bound}")
print(f" entitlements: {ok.entitlements or '(none)'}")
return 0
if __name__ == "__main__":
raise SystemExit(main())
+48
View File
@@ -0,0 +1,48 @@
[build-system]
requires = ["setuptools>=68", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "keysat-licensing-client"
version = "0.1.0"
description = "Python client for Keysat — a self-hosted Bitcoin-paid software licensing server that runs on Start9. Verifies signed license keys offline and wraps the HTTP API for purchase, redemption, and revocation checks."
readme = "README.md"
requires-python = ">=3.10"
license = { text = "MIT OR Apache-2.0" }
authors = [{ name = "Keysat", email = "licensing@keysat.xyz" }]
keywords = ["bitcoin", "licensing", "btcpay", "start9", "ed25519"]
classifiers = [
"Development Status :: 4 - Beta",
"Intended Audience :: Developers",
"License :: OSI Approved :: MIT License",
"License :: OSI Approved :: Apache Software License",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Topic :: Security :: Cryptography",
]
dependencies = [
"cryptography>=42",
]
[project.optional-dependencies]
online = ["httpx>=0.27"]
dev = [
"pytest>=8",
"ruff>=0.5",
"mypy>=1.10",
"httpx>=0.27",
]
[project.urls]
Homepage = "https://keysat.xyz"
Repository = "https://github.com/keysat-xyz/keysat"
Documentation = "https://github.com/keysat-xyz/keysat/blob/main/docs/INTEGRATION.md"
[tool.setuptools.packages.find]
where = ["src"]
[tool.pytest.ini_options]
testpaths = ["tests"]
addopts = "-q"
+90
View File
@@ -0,0 +1,90 @@
"""
keysat_licensing_client
=======================
Python client for Keysat — a self-hosted Bitcoin-paid software licensing
server that runs on Start9. Verifies signed license keys offline, and
(via the optional `online` extra) wraps the HTTP API for purchase,
free-license redemption, and revocation checks.
Five-line offline check::
from keysat_licensing_client import Verifier, PublicKey
verifier = Verifier(PublicKey.from_pem(ISSUER_PUBKEY_PEM))
ok = verifier.verify(key_from_user)
print("licensed for product", ok.product_id)
Online client (requires the `online` extra: `pip install keysat-licensing-client[online]`)::
from keysat_licensing_client import Client
client = Client("https://license.example.com")
r = client.validate(key_from_user, product_slug="my-product",
fingerprint=machine_fingerprint)
if not r.ok:
...
"""
from .errors import LicensingError
from .key import (
FLAG_FINGERPRINT_BOUND,
FLAG_TRIAL,
KEY_PREFIX,
KEY_VERSION_V1,
KEY_VERSION_V2,
LicensePayload,
has_entitlement,
is_expired_at,
parse_license_key,
)
from .pubkey import PublicKey
from .verify import VerifyOk, Verifier
from .fingerprint import hash_fingerprint
__all__ = [
"LicensingError",
"PublicKey",
"Verifier",
"VerifyOk",
"LicensePayload",
"parse_license_key",
"is_expired_at",
"has_entitlement",
"hash_fingerprint",
"FLAG_FINGERPRINT_BOUND",
"FLAG_TRIAL",
"KEY_PREFIX",
"KEY_VERSION_V1",
"KEY_VERSION_V2",
]
# Online client is gated on optional dependency `httpx`. We try to
# expose it but degrade silently if httpx isn't installed — the offline
# verifier (above) doesn't need network at all.
try:
from .online import ( # noqa: F401
Client,
ValidateResponse,
ValidateOptions,
StartPurchaseOptions,
PurchaseSession,
PollResponse,
RedeemFreeOptions,
RedeemFreeResponse,
)
__all__ += [
"Client",
"ValidateResponse",
"ValidateOptions",
"StartPurchaseOptions",
"PurchaseSession",
"PollResponse",
"RedeemFreeOptions",
"RedeemFreeResponse",
]
except ImportError:
# httpx not installed — that's fine, online client is optional.
pass
__version__ = "0.1.0"
+23
View File
@@ -0,0 +1,23 @@
"""Exception types for keysat_licensing_client."""
class LicensingError(Exception):
"""Base exception for all keysat_licensing_client errors.
Subclasses (or `kind` strings on the base class) distinguish
specific failure modes:
- `bad_signature`: the key's Ed25519 signature didn't verify.
- `bad_format`: the key string isn't a parseable LIC1-... key.
- `expired`: the key parsed and verified but is past its expiry.
- `revoked`: the server reported the key as revoked.
- `fingerprint_mismatch`: the key was bound to a different machine.
- `not_found`: the server doesn't know about this key.
- `product_mismatch`: the key is for a different product than checked.
- `network`: network error talking to the server (transient).
- `server_error`: server returned an unexpected response.
"""
def __init__(self, kind: str, message: str = ""):
self.kind = kind
super().__init__(message or kind)
@@ -0,0 +1,23 @@
"""Machine-fingerprint helper.
The SDK doesn't decide WHAT to use as a fingerprint — that's a product
choice, see PORTING_SDK_TO_NEW_LANGUAGES.md and UPGRADING_EXISTING_SOFTWARE.md
for tradeoffs. The SDK only standardizes how the chosen string is hashed
before being sent to the server (so the server never sees the raw value).
"""
from __future__ import annotations
import hashlib
def hash_fingerprint(raw: str) -> str:
"""SHA-256 the raw fingerprint string and return a hex digest.
Output is the same as Python's
`hashlib.sha256(raw.encode()).hexdigest()` — 64 lowercase hex chars.
Cross-language SDKs all use this exact hashing so a fingerprint
bound by (say) the TS client validates correctly against the same
machine's Python client.
"""
return hashlib.sha256(raw.encode("utf-8")).hexdigest()
+216
View File
@@ -0,0 +1,216 @@
"""License key payload struct and parser.
Wire format reference: see PORTING_SDK_TO_NEW_LANGUAGES.md and
licensing-service/src/crypto/mod.rs. Both v1 (legacy fixed-74) and v2
(variable-length with expires_at + entitlements) layouts are supported.
Format summary::
LIC1-<base32(payload)>-<base32(signature)>
base32: RFC 4648, uppercase, no padding.
signature: 64 bytes Ed25519 over the raw payload bytes.
v1 payload (74 bytes, fixed):
[0] version (=1)
[1] flags (bit 0 FINGERPRINT_BOUND, bit 1 TRIAL)
[2..18] product_id (UUID big-endian)
[18..34] license_id (UUID big-endian)
[34..42] issued_at (u64 big-endian, unix seconds)
[42..74] fingerprint_hash (SHA-256 hex digest, 32 raw bytes)
v2 payload (83+ bytes):
[0] version (=2)
[1] flags
[2..18] product_id
[18..34] license_id
[34..42] issued_at (u64 big-endian)
[42..50] expires_at (u64 big-endian; 0 = perpetual)
[50..82] fingerprint_hash
[82] num_entitlements (u8)
[83..] for each: <u8 len><len bytes UTF-8 entitlement slug>
"""
from __future__ import annotations
import base64
import uuid
from dataclasses import dataclass, field
from .errors import LicensingError
KEY_PREFIX = "LIC1"
KEY_VERSION_V1 = 1
KEY_VERSION_V2 = 2
FLAG_FINGERPRINT_BOUND = 0b0000_0001
FLAG_TRIAL = 0b0000_0010
V1_LEN = 74
V2_HEAD_LEN = 83 # bytes 0..82 fixed, then variable entitlement tail
@dataclass(frozen=True)
class LicensePayload:
"""Decoded license payload. Returned by `parse_license_key()`."""
version: int
flags: int
product_id: uuid.UUID
license_id: uuid.UUID
issued_at: int # unix seconds
expires_at: int # unix seconds; 0 = perpetual; v1 keys always 0
fingerprint_hash: bytes # 32 bytes; all-zero if unbound
entitlements: list[str] = field(default_factory=list)
@property
def is_fingerprint_bound(self) -> bool:
return bool(self.flags & FLAG_FINGERPRINT_BOUND)
@property
def is_trial(self) -> bool:
return bool(self.flags & FLAG_TRIAL)
@dataclass(frozen=True)
class ParsedKey:
"""Parsed (but NOT signature-verified) license key.
Carries the payload and the raw payload bytes (needed for signature
verification — the signature is over the bytes, not the parsed
struct).
"""
payload: LicensePayload
payload_bytes: bytes
signature: bytes # 64 bytes
def _b32_decode_nopad(s: str) -> bytes:
"""Decode RFC4648 base32, uppercase, with no padding. Adds padding
if needed to satisfy stdlib's strict decoder."""
s = s.upper()
pad = (-len(s)) % 8
return base64.b32decode(s + "=" * pad)
def parse_license_key(key: str) -> ParsedKey:
"""Parse a `LIC1-<payload>-<sig>` key string. Does NOT verify the
signature — use `Verifier.verify()` for that.
Raises `LicensingError(kind="bad_format")` on malformed input.
"""
if not isinstance(key, str):
raise LicensingError("bad_format", "key must be a string")
parts = key.strip().split("-")
if len(parts) != 3:
raise LicensingError(
"bad_format",
f"expected `LIC1-<payload>-<sig>`, got {len(parts)} dash-separated parts",
)
prefix, payload_b32, sig_b32 = parts
if prefix != KEY_PREFIX:
raise LicensingError("bad_format", f"unknown prefix '{prefix}'; expected '{KEY_PREFIX}'")
try:
payload_bytes = _b32_decode_nopad(payload_b32)
signature = _b32_decode_nopad(sig_b32)
except Exception as e:
raise LicensingError("bad_format", f"base32 decode failed: {e}") from e
if len(signature) != 64:
raise LicensingError(
"bad_format",
f"signature must be 64 bytes (got {len(signature)})",
)
return ParsedKey(
payload=_decode_payload(payload_bytes),
payload_bytes=payload_bytes,
signature=signature,
)
def _decode_payload(payload: bytes) -> LicensePayload:
if len(payload) < 2:
raise LicensingError("bad_format", "payload too short")
version = payload[0]
flags = payload[1]
if version == KEY_VERSION_V1:
if len(payload) != V1_LEN:
raise LicensingError(
"bad_format",
f"v1 payload must be exactly {V1_LEN} bytes (got {len(payload)})",
)
product_id = uuid.UUID(bytes=payload[2:18])
license_id = uuid.UUID(bytes=payload[18:34])
issued_at = int.from_bytes(payload[34:42], "big")
fingerprint_hash = payload[42:74]
return LicensePayload(
version=version,
flags=flags,
product_id=product_id,
license_id=license_id,
issued_at=issued_at,
expires_at=0, # v1 has no expiry
fingerprint_hash=fingerprint_hash,
entitlements=[],
)
elif version == KEY_VERSION_V2:
if len(payload) < V2_HEAD_LEN:
raise LicensingError(
"bad_format",
f"v2 payload header is {V2_HEAD_LEN} bytes; got {len(payload)}",
)
product_id = uuid.UUID(bytes=payload[2:18])
license_id = uuid.UUID(bytes=payload[18:34])
issued_at = int.from_bytes(payload[34:42], "big")
expires_at = int.from_bytes(payload[42:50], "big")
fingerprint_hash = payload[50:82]
num_ents = payload[82]
entitlements: list[str] = []
cursor = 83
for _ in range(num_ents):
if cursor >= len(payload):
raise LicensingError("bad_format", "v2 entitlement length byte missing")
slen = payload[cursor]
cursor += 1
if cursor + slen > len(payload):
raise LicensingError("bad_format", "v2 entitlement extends past payload")
slug = payload[cursor : cursor + slen].decode("utf-8")
cursor += slen
entitlements.append(slug)
if cursor != len(payload):
raise LicensingError(
"bad_format",
f"v2 payload has {len(payload) - cursor} trailing bytes after entitlements",
)
return LicensePayload(
version=version,
flags=flags,
product_id=product_id,
license_id=license_id,
issued_at=issued_at,
expires_at=expires_at,
fingerprint_hash=fingerprint_hash,
entitlements=entitlements,
)
else:
raise LicensingError("bad_format", f"unknown key version: {version}")
def is_expired_at(payload: LicensePayload, when_unix: int) -> bool:
"""Pure helper: is this key expired AT a given unix timestamp?
Returns False for perpetual keys (expires_at == 0) regardless of
when. Caller supplies `when_unix` so this is a pure function — no
clock dependency. Typically the caller passes `int(time.time())`.
"""
if payload.expires_at == 0:
return False
return when_unix >= payload.expires_at
def has_entitlement(payload: LicensePayload, slug: str) -> bool:
"""Does the license grant the given entitlement slug?"""
return slug in payload.entitlements
+322
View File
@@ -0,0 +1,322 @@
"""Online HTTP client for the Keysat REST API.
Wraps the public endpoints (`/v1/validate`, `/v1/purchase`,
`/v1/redeem`, `/v1/machines/*`) over HTTPS. All methods are synchronous;
an async variant can be added later if anyone asks.
Requires `httpx`. Install via the `online` extra::
pip install keysat-licensing-client[online]
"""
from __future__ import annotations
import time
from dataclasses import dataclass, field
from typing import Any
import httpx # imported only when this module is loaded; gated in __init__.py
from .errors import LicensingError
# ---------- Response shapes ----------
@dataclass
class ValidateResponse:
ok: bool
reason: str | None = None
license_id: str | None = None
product_id: str | None = None
product_slug: str | None = None
issued_at: str | None = None
expires_at: str | None = None
grace_until: str | None = None
in_grace_period: bool | None = None
is_trial: bool | None = None
entitlements: list[str] = field(default_factory=list)
status: str | None = None
machine_id: str | None = None
max_machines: int | None = None
@dataclass
class ValidateOptions:
product_slug: str | None = None
fingerprint: str | None = None
hostname: str | None = None
platform: str | None = None
@dataclass
class StartPurchaseOptions:
buyer_email: str | None = None
buyer_note: str | None = None
redirect_url: str | None = None
code: str | None = None
@dataclass
class PurchaseSession:
invoice_id: str
btcpay_invoice_id: str
checkout_url: str
amount_sats: int
base_price_sats: int
discount_applied_sats: int
poll_url: str
@dataclass
class PollResponse:
invoice_id: str
status: str # 'pending' | 'settled' | 'expired' | 'invalid'
product_id: str
amount_sats: int
license_key: str | None
license_id: str | None
@dataclass
class RedeemFreeOptions:
buyer_email: str | None = None
buyer_note: str | None = None
@dataclass
class RedeemFreeResponse:
license_id: str
license_key: str
invoice_id: str
redemption_id: str
@dataclass
class MachineResponse:
ok: bool
reason: str | None = None
machine_id: str | None = None
active_count: int | None = None
max_machines: int | None = None
# ---------- Client ----------
class Client:
"""HTTP client pinned to one Keysat server URL.
Construct with the public base URL (e.g. ``https://license.example.com``).
All methods raise `LicensingError(kind="network")` on transport
failure and `LicensingError(kind="server_error")` on unexpected
server responses; semantic-failure cases (revoked / fingerprint
mismatch / bad signature) come back as `ValidateResponse(ok=False,
reason="...")` so the caller can render different messaging per
reason without try/except.
"""
def __init__(self, base_url: str, *, timeout: float = 15.0):
self._base = base_url.rstrip("/")
self._timeout = timeout
@property
def base_url(self) -> str:
return self._base
# ----- Public endpoints -----
def fetch_pubkey_pem(self) -> str:
data = self._get("/v1/pubkey")
return data["public_key_pem"]
def validate(
self,
key: str,
product_slug: str | None = None,
fingerprint: str | None = None,
opts: ValidateOptions | None = None,
) -> ValidateResponse:
merged = opts or ValidateOptions()
body = {
"key": key,
"product_slug": merged.product_slug or product_slug,
"fingerprint": merged.fingerprint or fingerprint,
"hostname": merged.hostname,
"platform": merged.platform,
}
body = {k: v for k, v in body.items() if v is not None}
raw = self._post("/v1/validate", body)
ents = raw.get("entitlements") or []
return ValidateResponse(
ok=bool(raw.get("ok")),
reason=raw.get("reason"),
license_id=raw.get("license_id"),
product_id=raw.get("product_id"),
product_slug=raw.get("product_slug"),
issued_at=raw.get("issued_at"),
expires_at=raw.get("expires_at"),
grace_until=raw.get("grace_until"),
in_grace_period=raw.get("in_grace_period"),
is_trial=raw.get("is_trial"),
entitlements=[e for e in ents if isinstance(e, str)],
status=raw.get("status"),
machine_id=raw.get("machine_id"),
max_machines=raw.get("max_machines"),
)
# ----- Machine seat management -----
def heartbeat(self, key: str, fingerprint: str) -> MachineResponse:
raw = self._post("/v1/machines/heartbeat", {"key": key, "fingerprint": fingerprint})
return _to_machine_response(raw)
def activate(
self,
key: str,
fingerprint: str,
hostname: str | None = None,
platform: str | None = None,
) -> MachineResponse:
body = {"key": key, "fingerprint": fingerprint, "hostname": hostname, "platform": platform}
body = {k: v for k, v in body.items() if v is not None}
raw = self._post("/v1/machines/activate", body)
return _to_machine_response(raw)
def deactivate(
self, key: str, fingerprint: str, reason: str | None = None
) -> MachineResponse:
body = {"key": key, "fingerprint": fingerprint, "reason": reason}
body = {k: v for k, v in body.items() if v is not None}
raw = self._post("/v1/machines/deactivate", body)
return _to_machine_response(raw)
# ----- Purchase flow -----
def start_purchase(
self,
product_slug: str,
opts: StartPurchaseOptions | None = None,
) -> PurchaseSession:
merged = opts or StartPurchaseOptions()
body = {
"product": product_slug,
"buyer_email": merged.buyer_email,
"buyer_note": merged.buyer_note,
"redirect_url": merged.redirect_url,
"code": merged.code,
}
body = {k: v for k, v in body.items() if v is not None}
raw = self._post("/v1/purchase", body)
return PurchaseSession(
invoice_id=raw["invoice_id"],
btcpay_invoice_id=raw["btcpay_invoice_id"],
checkout_url=raw["checkout_url"],
amount_sats=raw["amount_sats"],
base_price_sats=raw.get("base_price_sats", raw["amount_sats"]),
discount_applied_sats=raw.get("discount_applied_sats", 0),
poll_url=raw["poll_url"],
)
def poll_purchase(self, invoice_id: str) -> PollResponse:
raw = self._get(f"/v1/purchase/{invoice_id}")
return PollResponse(
invoice_id=raw["invoice_id"],
status=raw["status"],
product_id=raw["product_id"],
amount_sats=raw["amount_sats"],
license_key=raw.get("license_key"),
license_id=raw.get("license_id"),
)
def wait_for_license(
self,
invoice_id: str,
interval_s: float = 5.0,
timeout_s: float | None = None,
) -> str:
"""Poll `poll_purchase` until the license_key is non-null.
Raises LicensingError on terminal invoice states (expired /
invalid) or timeout. Returns the license_key string on success.
"""
deadline = time.monotonic() + timeout_s if timeout_s else None
while True:
poll = self.poll_purchase(invoice_id)
if poll.license_key:
return poll.license_key
if poll.status in ("expired", "invalid"):
raise LicensingError(
"server_error", f"invoice ended in status {poll.status}"
)
if deadline is not None and time.monotonic() > deadline:
raise LicensingError(
"server_error", "timed out waiting for license issuance"
)
time.sleep(interval_s)
# ----- Free-license redemption -----
def redeem_free_license(
self,
product_slug: str,
code: str,
opts: RedeemFreeOptions | None = None,
) -> RedeemFreeResponse:
merged = opts or RedeemFreeOptions()
body = {
"product": product_slug,
"code": code,
"buyer_email": merged.buyer_email,
"buyer_note": merged.buyer_note,
}
body = {k: v for k, v in body.items() if v is not None}
raw = self._post("/v1/redeem", body)
return RedeemFreeResponse(
license_id=raw["license_id"],
license_key=raw["license_key"],
invoice_id=raw["invoice_id"],
redemption_id=raw["redemption_id"],
)
# ----- Internals -----
def _get(self, path: str) -> dict[str, Any]:
return self._request("GET", path, json_body=None)
def _post(self, path: str, body: dict[str, Any]) -> dict[str, Any]:
return self._request("POST", path, json_body=body)
def _request(self, method: str, path: str, json_body: dict[str, Any] | None) -> dict[str, Any]:
url = self._base + path
try:
resp = httpx.request(method, url, json=json_body, timeout=self._timeout)
except httpx.HTTPError as e:
raise LicensingError("network", f"{method} {url}: {e}") from e
if resp.status_code >= 500:
raise LicensingError(
"server_error",
f"{method} {url}: HTTP {resp.status_code}{resp.text[:200]}",
)
if resp.status_code >= 400:
raise LicensingError(
"server_error",
f"{method} {url}: HTTP {resp.status_code}{resp.text[:200]}",
)
try:
return resp.json()
except Exception as e:
raise LicensingError(
"server_error",
f"{method} {url}: response was not JSON: {e}",
) from e
def _to_machine_response(raw: dict[str, Any]) -> MachineResponse:
return MachineResponse(
ok=bool(raw.get("ok")),
reason=raw.get("reason"),
machine_id=raw.get("machine_id"),
active_count=raw.get("active_count"),
max_machines=raw.get("max_machines"),
)
+55
View File
@@ -0,0 +1,55 @@
"""Server public key loading and management."""
from __future__ import annotations
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey
from cryptography.hazmat.primitives.serialization import load_pem_public_key
from .errors import LicensingError
class PublicKey:
"""An Ed25519 public key issued by the Keysat server.
Holds the underlying `cryptography.Ed25519PublicKey` and exposes
convenience constructors. Pass an instance to `Verifier(...)`.
"""
def __init__(self, key: Ed25519PublicKey):
self._key = key
@classmethod
def from_pem(cls, pem: str | bytes) -> "PublicKey":
"""Load from a PEM string as returned by the server's `GET /v1/pubkey`.
Raises `LicensingError(kind="bad_format")` if the PEM is not a
valid Ed25519 public key.
"""
if isinstance(pem, str):
pem = pem.encode("utf-8")
try:
loaded = load_pem_public_key(pem)
except Exception as e:
raise LicensingError("bad_format", f"could not parse PEM: {e}") from e
if not isinstance(loaded, Ed25519PublicKey):
raise LicensingError(
"bad_format",
"PEM is not an Ed25519 public key (got "
f"{type(loaded).__name__})",
)
return cls(loaded)
@classmethod
def from_raw(cls, raw: bytes) -> "PublicKey":
"""Load from a 32-byte raw Ed25519 public key."""
if len(raw) != 32:
raise LicensingError(
"bad_format",
f"raw Ed25519 public key must be 32 bytes (got {len(raw)})",
)
return cls(Ed25519PublicKey.from_public_bytes(raw))
@property
def underlying(self) -> Ed25519PublicKey:
"""Access the underlying cryptography object (for advanced use)."""
return self._key
+82
View File
@@ -0,0 +1,82 @@
"""Offline signature verification — the bulk of the value of the SDK."""
from __future__ import annotations
import uuid
from dataclasses import dataclass
from cryptography.exceptions import InvalidSignature
from .errors import LicensingError
from .key import LicensePayload, parse_license_key
from .pubkey import PublicKey
@dataclass(frozen=True)
class VerifyOk:
"""Result of a successful offline `Verifier.verify()` call."""
license_id: uuid.UUID
product_id: uuid.UUID
issued_at: int
expires_at: int # 0 = perpetual
is_trial: bool
is_fingerprint_bound: bool
fingerprint_hash: bytes # 32 bytes; all-zero if unbound
entitlements: list[str]
payload: LicensePayload # the full parsed payload
class Verifier:
"""Offline license-key verifier.
Given the issuer's PEM-encoded public key, verifies the cryptographic
integrity of a license key string with no network access. Suitable
for boot-time license checks where the licensing server may be
unreachable.
Usage::
verifier = Verifier(PublicKey.from_pem(ISSUER_PUBKEY_PEM))
ok = verifier.verify(key_from_user)
# raises LicensingError on bad signature / bad format
For revocation and fingerprint binding, layer the online
`Client.validate(...)` on top of this — but only AFTER offline
verification has passed.
"""
def __init__(self, public_key: PublicKey):
self._pubkey = public_key
def verify(self, key: str) -> VerifyOk:
"""Verify a `LIC1-...-...` key.
On success, returns a `VerifyOk` with all parsed fields. On
failure, raises `LicensingError`:
- `kind="bad_format"`: the key string is malformed.
- `kind="bad_signature"`: signature didn't verify against
the issuer's public key (key was edited, fabricated, or
issued by a different server).
"""
parsed = parse_license_key(key)
try:
self._pubkey.underlying.verify(parsed.signature, parsed.payload_bytes)
except InvalidSignature as e:
raise LicensingError(
"bad_signature",
"signature did not verify against the issuer's public key",
) from e
p = parsed.payload
return VerifyOk(
license_id=p.license_id,
product_id=p.product_id,
issued_at=p.issued_at,
expires_at=p.expires_at,
is_trial=p.is_trial,
is_fingerprint_bound=p.is_fingerprint_bound,
fingerprint_hash=p.fingerprint_hash,
entitlements=list(p.entitlements),
payload=p,
)
+176
View File
@@ -0,0 +1,176 @@
"""Cross-check the Python SDK against the canonical wire-format test
vectors at ../../tests/crosscheck/vector.json.
These vectors are also exercised by the Rust and TS SDKs. Any new SDK
must pass every fixture in vector.json — that's how we guarantee
wire-format compatibility across languages.
Run with: `pytest -q` from the package root, OR `python -m pytest`.
"""
from __future__ import annotations
import json
from pathlib import Path
import pytest
from keysat_licensing_client import (
PublicKey,
Verifier,
LicensingError,
has_entitlement,
hash_fingerprint,
is_expired_at,
parse_license_key,
)
# Locate vector.json relative to this file. The tests directory lives
# at licensing-client-python/tests/, the vector lives at
# tests/crosscheck/vector.json (repo root). Walk up until we find it.
def _vectors_path() -> Path:
here = Path(__file__).resolve().parent
for ancestor in [here, *here.parents]:
candidate = ancestor / "tests" / "crosscheck" / "vector.json"
if candidate.exists():
return candidate
pytest.skip("crosscheck vector.json not found; run from repo with tests/ tree")
@pytest.fixture(scope="module")
def vectors() -> dict:
with _vectors_path().open() as f:
return json.load(f)
@pytest.fixture(scope="module")
def verifier(vectors: dict) -> Verifier:
return Verifier(PublicKey.from_pem(vectors["publicKeyPem"]))
# ------------------------------------------------------------------
# v1 fixture: legacy fixed-74 layout, fingerprint-bound, no expiry,
# no entitlements.
# ------------------------------------------------------------------
def test_v1_parses(vectors: dict) -> None:
parsed = parse_license_key(vectors["v1"]["licenseKey"])
exp = vectors["v1"]["expected"]
assert parsed.payload.version == exp["version"]
assert str(parsed.payload.product_id) == exp["productUuid"]
assert str(parsed.payload.license_id) == exp["licenseUuid"]
assert parsed.payload.issued_at == exp["issuedAt"]
assert parsed.payload.expires_at == exp["expiresAt"]
assert parsed.payload.flags == exp["flags"]
assert parsed.payload.is_fingerprint_bound is exp["isFingerprintBound"]
assert parsed.payload.is_trial is exp["isTrial"]
assert parsed.payload.entitlements == exp["entitlements"]
assert parsed.payload.fingerprint_hash.hex() == exp["fingerprintHashHex"]
def test_v1_verifies(verifier: Verifier, vectors: dict) -> None:
ok = verifier.verify(vectors["v1"]["licenseKey"])
assert str(ok.product_id) == vectors["v1"]["expected"]["productUuid"]
def test_v1_tamper_detected(verifier: Verifier, vectors: dict) -> None:
key = vectors["v1"]["licenseKey"]
# Flip one char in the payload section. The signature won't match.
payload_start = key.index("-") + 1
tampered = key[:payload_start] + ("B" if key[payload_start] != "B" else "C") + key[payload_start + 1 :]
with pytest.raises(LicensingError) as excinfo:
verifier.verify(tampered)
assert excinfo.value.kind in ("bad_signature", "bad_format")
# ------------------------------------------------------------------
# v2 fixture: trial, fingerprint-bound, explicit expiry, two entitlements.
# Stresses the variable-length tail parser.
# ------------------------------------------------------------------
def test_v2_parses(vectors: dict) -> None:
parsed = parse_license_key(vectors["v2"]["licenseKey"])
exp = vectors["v2"]["expected"]
assert parsed.payload.version == exp["version"]
assert str(parsed.payload.product_id) == exp["productUuid"]
assert str(parsed.payload.license_id) == exp["licenseUuid"]
assert parsed.payload.issued_at == exp["issuedAt"]
assert parsed.payload.expires_at == exp["expiresAt"]
assert parsed.payload.flags == exp["flags"]
assert parsed.payload.is_fingerprint_bound is exp["isFingerprintBound"]
assert parsed.payload.is_trial is exp["isTrial"]
assert parsed.payload.entitlements == exp["entitlements"]
def test_v2_verifies(verifier: Verifier, vectors: dict) -> None:
ok = verifier.verify(vectors["v2"]["licenseKey"])
assert ok.is_trial
assert ok.is_fingerprint_bound
assert len(ok.entitlements) == len(vectors["v2"]["expected"]["entitlements"])
def test_v2_expiry_boundary(vectors: dict) -> None:
parsed = parse_license_key(vectors["v2"]["licenseKey"])
expires_at = parsed.payload.expires_at
assert is_expired_at(parsed.payload, expires_at) is True
assert is_expired_at(parsed.payload, expires_at - 1) is False
def test_v2_entitlements(vectors: dict) -> None:
parsed = parse_license_key(vectors["v2"]["licenseKey"])
for slug in vectors["v2"]["expected"]["entitlements"]:
assert has_entitlement(parsed.payload, slug)
assert has_entitlement(parsed.payload, "definitely-not-a-real-entitlement") is False
# ------------------------------------------------------------------
# v2_perpetual_unbound — common case for paid purchase: v2, no expiry,
# no fingerprint binding, no entitlements.
# ------------------------------------------------------------------
def test_v2_perpetual_unbound_parses(vectors: dict) -> None:
if "v2_perpetual_unbound" not in vectors:
pytest.skip("vector.json doesn't include v2_perpetual_unbound")
parsed = parse_license_key(vectors["v2_perpetual_unbound"]["licenseKey"])
assert parsed.payload.version == 2
assert parsed.payload.expires_at == 0
assert parsed.payload.is_fingerprint_bound is False
def test_v2_perpetual_unbound_verifies(verifier: Verifier, vectors: dict) -> None:
if "v2_perpetual_unbound" not in vectors:
pytest.skip("vector.json doesn't include v2_perpetual_unbound")
verifier.verify(vectors["v2_perpetual_unbound"]["licenseKey"])
# ------------------------------------------------------------------
# Cross-language fingerprint-hash compatibility.
# ------------------------------------------------------------------
def test_hash_fingerprint_matches_python_stdlib() -> None:
import hashlib
raw = "hello"
expected = hashlib.sha256(raw.encode()).hexdigest()
assert hash_fingerprint(raw) == expected
# ------------------------------------------------------------------
# Negative cases.
# ------------------------------------------------------------------
def test_bad_format_short_key() -> None:
with pytest.raises(LicensingError) as excinfo:
parse_license_key("notakey")
assert excinfo.value.kind == "bad_format"
def test_bad_format_wrong_prefix() -> None:
with pytest.raises(LicensingError) as excinfo:
parse_license_key("LIC9-AAAA-BBBB")
assert excinfo.value.kind == "bad_format"