From fa81aba944d8dc0040b5c542e11b6b5eb8e70181 Mon Sep 17 00:00:00 2001 From: Stefan Schmidt-Egermann Date: Sat, 25 Apr 2026 12:26:03 +0200 Subject: [PATCH] feat: initial python content (v0.1.0) - Installierbares Package mit pyproject.toml - Client-Klasse mit Sync, Async (mit Polling), Webhook, Verify, Download - Typisierte Exception-Hierarchie - Webhook-Signatur-Verifikation (HMAC-SHA-256) - Pytest-Suite + Quickstart und Webhook-Receiver-Beispiel --- LICENSE | 20 +- README.md | 138 ++++++++++---- SECURITY.md | 6 - examples/quickstart.py | 56 ++++++ examples/webhook_receiver.py | 60 ++++++ hightrusted_capture/__init__.py | 37 ++++ hightrusted_capture/client.py | 322 ++++++++++++++++++++++++++++++++ hightrusted_capture/errors.py | 87 +++++++++ hightrusted_capture/webhooks.py | 48 +++++ pyproject.toml | 69 +++++++ tests/__init__.py | 0 tests/test_client.py | 181 ++++++++++++++++++ 12 files changed, 974 insertions(+), 50 deletions(-) create mode 100644 examples/quickstart.py create mode 100644 examples/webhook_receiver.py create mode 100644 hightrusted_capture/__init__.py create mode 100644 hightrusted_capture/client.py create mode 100644 hightrusted_capture/errors.py create mode 100644 hightrusted_capture/webhooks.py create mode 100644 pyproject.toml create mode 100644 tests/__init__.py create mode 100644 tests/test_client.py diff --git a/LICENSE b/LICENSE index c7ebe92..ed12564 100644 --- a/LICENSE +++ b/LICENSE @@ -1,9 +1,21 @@ MIT License -Copyright (c) 2026 hightrusted +Copyright (c) 2026 hightrusted GmbH -Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: -The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/README.md b/README.md index b03527d..b6b9d67 100644 --- a/README.md +++ b/README.md @@ -11,14 +11,10 @@ in der Verarbeitungskette. Quelloffen unter MIT-Lizenz. ## Was die CAPTURE API tut Sie nimmt eine URL entgegen, rendert die Seite vollständig (inkl. JavaScript), -liefert sie als PDF/A zurück und versieht das Ergebnis mit einem qualifizierten +liefert sie als PDF/A-3 zurück und versieht das Ergebnis mit einem qualifizierten Zeitstempel. Das Ergebnis ist gerichtsverwertbar und Jahre später noch verifizierbar — auch nachdem die Original-Seite längst offline ist. -Anwendungsfälle: Markenrechtsverletzungen dokumentieren, Auftragsbedingungen -zum Buchungszeitpunkt sichern, Compliance-Nachweise für Behörden, Beweissicherung -durch Anwälte und Sachverständige. - ## Installation ```bash @@ -35,17 +31,101 @@ client = Client(api_key="ht_live_...") # Synchron — wartet bis zu 30 s auf das fertige PDF capture = client.capture(url="https://example.com") -print(capture.id) -print(capture.verify_url) -print(capture.timestamp.issued_at) +print(capture["id"]) +print(capture["verify_url"]) +print(capture["timestamp"]["issued_at"]) # PDF herunterladen -capture.download("./beweis.pdf") +client.download_pdf(capture["id"], "./beweis.pdf") ``` -## Authentifizierung +## Drei Aufruf-Modi -Bearer-Token mit API-Key. Key-Erzeugung im Dashboard: +### Synchron (Default) +```python +capture = client.capture(url="https://example.com") +# Blocks bis zu 30 s, gibt fertige Capture zurück +``` + +### Asynchron mit Polling +```python +capture = client.capture_async( + url="https://example.com", + reference="case-001", + wait_for_ready=True, # Default — pollt bis ready + max_wait=60.0, +) +``` + +### Webhook +```python +job = client.capture_webhook( + url="https://example.com", + webhook_url="https://your-app.tld/webhooks/capture", + reference="case-001", +) +# job["status"] == "queued" +# Server liefert das fertige Capture per HTTP-POST aus +``` + +## Verifikation (kostenlos, ohne Quota) + +```python +# Per Capture-ID +result = client.verify(source="cap_...") + +# Per Verify-URL +result = client.verify(source="https://verify.hightrusted.net/c/...") + +# Per PDF-Upload (Datei oder File-Objekt) +result = client.verify(pdf="./beweis.pdf") + +print(result["valid"]) # True +print(result["audit_log"]["chain_valid"]) # True +``` + +## Webhook-Signatur prüfen + +```python +from flask import Flask, request +from hightrusted_capture import verify_webhook_signature + +@app.post("/webhooks/capture") +def webhook(): + body = request.get_data() # raw bytes! + sig = request.headers.get("X-Hightrusted-Signature", "") + if not verify_webhook_signature(body, sig, "wh_secret_..."): + return {"error": "invalid_signature"}, 401 + payload = json.loads(body) + # ... +``` + +## Fehler-Behandlung + +```python +from hightrusted_capture import ( + Client, + InvalidApiKeyError, + QuotaExceededError, + RateLimitedError, + UnreachableUrlError, +) + +try: + capture = client.capture(url="https://example.com") +except InvalidApiKeyError: + print("API-Key ungültig") +except QuotaExceededError: + print("Monats-Quota erschöpft") +except RateLimitedError as e: + print(f"Zu viele Anfragen, retry in {e.retry_after_seconds}s") +except UnreachableUrlError: + print("Quelle nicht erreichbar") +``` + +## API-Key + +Bearer-Token mit API-Key. Erzeugen unter https://capture.hightrusted.net/dashboard/api-keys ```python @@ -53,32 +133,10 @@ client = Client(api_key="ht_live_...") # alternativ via Umgebungsvariable HIGHTRUSTED_API_KEY ``` -## Asynchrone Captures + Webhooks - -```python -job = client.capture_async( - url="https://example.com", - webhook_url="https://your-app.tld/webhooks/capture", -) -# job.status == "queued" - -# später, sobald der Webhook capture.ready geliefert hat: -capture = client.get(job.id) -capture.download("./beweis.pdf") -``` - -## Verify - -```python -result = client.verify(capture_id="cap_...") -print(result.valid) # True -print(result.timestamp) # 2026-04-25T11:29:40Z -``` - ## Rate Limits -Limits werden pro API-Key gemessen. Bei Überschreitung: HTTP 429 mit -`Retry-After`-Header. Das SDK respektiert den Header automatisch und retried. +Limits pro API-Key. Bei Überschreitung: HTTP 429 mit `Retry-After`-Header. +Das SDK respektiert den Header automatisch (`max_retries` ist Default 3). | Plan | req/min | Calls/Monat | |-----------|---------|-------------| @@ -104,9 +162,9 @@ pytest ## Roadmap -- [ ] v0.1 — Basis-Client (sync + async), Verify, Download -- [ ] v0.2 — Retry-Logik, Webhook-Verifikation, Type-Hints vollständig -- [ ] v0.3 — Async/Await-Variante (`httpx`) +- [x] v0.1 — Basis-Client (sync + async + webhook), Verify, Download, typisierte Errors +- [ ] v0.2 — Async/Await-Variante mit `httpx`, retry-after-Helper +- [ ] v0.3 — Streaming für große PDFs - [ ] v1.0 — Stabile API, semantische Versionierung ## Verwandte Repositorys @@ -121,8 +179,8 @@ pytest **Plattform-übergreifend** ([`hightrusted`](https://git.hightrusted.net/hightrusted)): -- [`platform`](https://git.hightrusted.net/hightrusted/platform) — Plattform-Übersicht, Architektur, Produkt-Liste -- [`developer-portal`](https://git.hightrusted.net/hightrusted/developer-portal) — gemeinsame Konventionen, Auth, Errors, Rate-Limits +- [`platform`](https://git.hightrusted.net/hightrusted/platform) — Plattform-Übersicht +- [`developer-portal`](https://git.hightrusted.net/hightrusted/developer-portal) — gemeinsame Konventionen - [`compliance`](https://git.hightrusted.net/hightrusted/compliance) — DSGVO, AGB-Templates, Whitepaper ## Support diff --git a/SECURITY.md b/SECURITY.md index 9ed887a..63ed5b2 100644 --- a/SECURITY.md +++ b/SECURITY.md @@ -25,9 +25,3 @@ Während der `v0.x`-Phase werden nur die jeweils aktuellste Minor-Version und deren letzte zwei Patch-Versionen aktiv mit Sicherheits-Updates versorgt. Ab `v1.0` gilt: aktuelle Major + vorherige Major (12 Monate Übergangsfrist). - -## Out of Scope - -Diese Policy gilt für die SDKs in diesem Repository und die zugehörige -hightrusted CAPTURE API. Für Schwachstellen in anderen hightrusted-Produkten -(SIGN, ID, MEET, PAY, …) gilt jeweils die dortige `SECURITY.md`. diff --git a/examples/quickstart.py b/examples/quickstart.py new file mode 100644 index 0000000..dee24e6 --- /dev/null +++ b/examples/quickstart.py @@ -0,0 +1,56 @@ +"""hightrusted CAPTURE — Quickstart. + +Voraussetzung: + pip install hightrusted-capture + +API-Key holen: + https://capture.hightrusted.net/dashboard/api-keys +""" + +import os + +from hightrusted_capture import Client, RateLimitedError + +API_KEY = os.environ.get("HIGHTRUSTED_API_KEY", "ht_live_REPLACE_ME") + +client = Client(api_key=API_KEY) + +# ─────────────────────────────────────────────────────────────────── +# 1. Synchrone Capture +# ─────────────────────────────────────────────────────────────────── +print("→ Erstelle Capture (synchron)...") +capture = client.capture( + "https://example.com", + reference="quickstart-demo", + viewport={"width": 1920, "height": 1080}, +) +print(f" ID: {capture['id']}") +print(f" Status: {capture['status']}") +print(f" Verify-URL: {capture['verify_url']}") +print(f" Zeitstempel: {capture['timestamp']['issued_at']}") + +# ─────────────────────────────────────────────────────────────────── +# 2. Verifikation (kostenlos) +# ─────────────────────────────────────────────────────────────────── +print("\n→ Verifiziere die Capture...") +result = client.verify(source=capture["id"]) +print(f" Gültig: {result['valid']}") +print(f" Audit-Log: {result['audit_log']['chain_valid']}") + +# ─────────────────────────────────────────────────────────────────── +# 3. PDF herunterladen +# ─────────────────────────────────────────────────────────────────── +print("\n→ Lade PDF herunter...") +pdf_path = client.download_pdf(capture["id"], f"./capture_{capture['id'][:8]}.pdf") +print(f" Gespeichert: {pdf_path}") + +# ─────────────────────────────────────────────────────────────────── +# 4. Quota prüfen +# ─────────────────────────────────────────────────────────────────── +print("\n→ Quota-Status...") +try: + usage = client.usage() + print(f" Plan: {usage['plan']}") + print(f" Verbraucht: {usage['used_calls']}/{usage['included_calls']}") +except RateLimitedError as e: + print(f" Rate-limited, retry in {e.retry_after_seconds}s") diff --git a/examples/webhook_receiver.py b/examples/webhook_receiver.py new file mode 100644 index 0000000..ceecf74 --- /dev/null +++ b/examples/webhook_receiver.py @@ -0,0 +1,60 @@ +"""Webhook-Empfänger für hightrusted CAPTURE — minimaler Flask-Server. + +Voraussetzung: + pip install hightrusted-capture flask + +Lauf: + export HIGHTRUSTED_WEBHOOK_SECRET=wh_secret_... + python examples/webhook_receiver.py + +Capture mit Webhook anlegen: + client.capture_webhook( + url="https://example.com", + webhook_url="https://your-domain.tld/webhooks/capture", + reference="case-001", + ) +""" + +import json +import os + +from flask import Flask, request + +from hightrusted_capture import verify_webhook_signature + +WEBHOOK_SECRET = os.environ["HIGHTRUSTED_WEBHOOK_SECRET"] + +app = Flask(__name__) + + +@app.post("/webhooks/capture") +def capture_webhook(): + body = request.get_data() # raw bytes — wichtig für HMAC! + signature = request.headers.get("X-Hightrusted-Signature", "") + + if not verify_webhook_signature(body, signature, WEBHOOK_SECRET): + app.logger.warning("Webhook signature mismatch") + return {"error": "invalid_signature"}, 401 + + payload = json.loads(body) + event = payload["event"] + capture = payload["capture"] + + if event == "capture.ready": + capture_id = capture["id"] + verify_url = capture["verify_url"] + reference = capture.get("reference", "") + app.logger.info( + "Capture %s ready (ref=%s, verify=%s)", + capture_id, reference, verify_url, + ) + # → hier: PDF herunterladen, ins Archiv legen, Mandant benachrichtigen, ... + + elif event == "capture.failed": + app.logger.error("Capture failed: %s", payload.get("error")) + + return {"ok": True}, 200 + + +if __name__ == "__main__": + app.run(host="0.0.0.0", port=8000) diff --git a/hightrusted_capture/__init__.py b/hightrusted_capture/__init__.py new file mode 100644 index 0000000..b4e2b17 --- /dev/null +++ b/hightrusted_capture/__init__.py @@ -0,0 +1,37 @@ +"""hightrusted CAPTURE — Python SDK. + +Forensische Web-Captures mit qualifiziertem Zeitstempel nach RFC 3161 / eIDAS Art. 41. + +Quickstart: + >>> from hightrusted_capture import Client + >>> client = Client(api_key="ht_live_...") + >>> capture = client.capture(url="https://example.com") + >>> print(capture["verify_url"]) + +Dokumentation: https://capture.hightrusted.net/api/docs +""" + +from hightrusted_capture.client import Client +from hightrusted_capture.errors import ( + CaptureNotFoundError, + HightrustedError, + InvalidApiKeyError, + QuotaExceededError, + RateLimitedError, + UnreachableUrlError, +) +from hightrusted_capture.webhooks import verify_webhook_signature + +__version__ = "0.1.0" + +__all__ = [ + "Client", + "HightrustedError", + "InvalidApiKeyError", + "QuotaExceededError", + "RateLimitedError", + "CaptureNotFoundError", + "UnreachableUrlError", + "verify_webhook_signature", + "__version__", +] diff --git a/hightrusted_capture/client.py b/hightrusted_capture/client.py new file mode 100644 index 0000000..de73b39 --- /dev/null +++ b/hightrusted_capture/client.py @@ -0,0 +1,322 @@ +"""hightrusted CAPTURE Client.""" + +from __future__ import annotations + +import os +import time +from pathlib import Path +from typing import Any, BinaryIO + +import requests + +from hightrusted_capture.errors import ( + CaptureNotReadyError, + HightrustedError, + RateLimitedError, + error_for_code, +) + +DEFAULT_BASE_URL = "https://capture.hightrusted.net/api/v1" +DEFAULT_TIMEOUT = 35 +DEFAULT_USER_AGENT = "hightrusted-capture-python/0.1.0" + + +class Client: + """Synchroner Client für die hightrusted CAPTURE API. + + Args: + api_key: API-Key. Wenn nicht gesetzt, wird `HIGHTRUSTED_API_KEY` aus der + Umgebung gelesen. + base_url: API-Endpoint, default `https://capture.hightrusted.net/api/v1`. + timeout: HTTP-Timeout in Sekunden, default 35. + max_retries: Anzahl automatischer Retries bei 429 / 5xx, default 3. + + Beispiel:: + + from hightrusted_capture import Client + + client = Client(api_key="ht_live_...") + capture = client.capture(url="https://example.com") + print(capture["verify_url"]) + """ + + def __init__( + self, + api_key: str | None = None, + *, + base_url: str = DEFAULT_BASE_URL, + timeout: int = DEFAULT_TIMEOUT, + max_retries: int = 3, + ) -> None: + key = api_key or os.environ.get("HIGHTRUSTED_API_KEY") + if not key: + raise ValueError( + "API-Key fehlt. Setze entweder den Parameter `api_key` oder die " + "Umgebungsvariable HIGHTRUSTED_API_KEY." + ) + self.api_key = key + self.base_url = base_url.rstrip("/") + self.timeout = timeout + self.max_retries = max_retries + + self._session = requests.Session() + self._session.headers.update( + { + "Authorization": f"Bearer {self.api_key}", + "User-Agent": DEFAULT_USER_AGENT, + } + ) + + # ────────────────────────────────────────────────────────────────── + # Public API + # ────────────────────────────────────────────────────────────────── + def capture( + self, + url: str, + *, + reference: str | None = None, + viewport: dict[str, int] | None = None, + wait_until: str | None = None, + full_page: bool | None = None, + co_branding: dict[str, str] | None = None, + ) -> dict[str, Any]: + """Synchrones Capture — wartet bis zu 30 s auf das fertige PDF.""" + body = self._build_capture_body( + url=url, + mode=None, # sync = default + reference=reference, + viewport=viewport, + wait_until=wait_until, + full_page=full_page, + co_branding=co_branding, + ) + return self._request("POST", "/captures", json=body) + + def capture_async( + self, + url: str, + *, + reference: str | None = None, + wait_for_ready: bool = True, + poll_interval: float = 2.0, + max_wait: float = 60.0, + **kwargs: Any, + ) -> dict[str, Any]: + """Asynchrones Capture mit optionalem Polling. + + Wenn `wait_for_ready=True` (Default), wird gepollt bis das Capture fertig + ist oder `max_wait` Sekunden um sind. Setze `wait_for_ready=False`, um nur + die Queue-Bestätigung zurückzubekommen. + """ + body = self._build_capture_body(url=url, mode="async", reference=reference, **kwargs) + queued = self._request("POST", "/captures", json=body) + + if not wait_for_ready: + return queued + + capture_id = queued["id"] + deadline = time.monotonic() + max_wait + time.sleep(min(3.0, poll_interval)) # erstes Polling nach 3 s + + while time.monotonic() < deadline: + detail = self.get(capture_id) + if detail["status"] == "ready": + return detail + if detail["status"] == "failed": + raise HightrustedError( + f"Capture failed: {detail.get('error', 'unknown')}", + code="capture_failed", + raw=detail, + ) + time.sleep(poll_interval) + + raise TimeoutError(f"Capture {capture_id} nicht fertig nach {max_wait}s") + + def capture_webhook( + self, + url: str, + webhook_url: str, + *, + reference: str | None = None, + **kwargs: Any, + ) -> dict[str, Any]: + """Capture starten, Server liefert das Ergebnis per HTTP-POST aus.""" + body = self._build_capture_body( + url=url, + mode="webhook", + webhook_url=webhook_url, + reference=reference, + **kwargs, + ) + return self._request("POST", "/captures", json=body) + + def get(self, capture_id: str) -> dict[str, Any]: + """Status / Detail einer einzelnen Capture.""" + return self._request("GET", f"/captures/{capture_id}") + + def list( + self, + *, + status: str | None = None, + reference: str | None = None, + limit: int = 25, + cursor: str | None = None, + ) -> dict[str, Any]: + """Captures auflisten, paginiert per Cursor.""" + params: dict[str, Any] = {"limit": limit} + if status: + params["status"] = status + if reference: + params["reference"] = reference + if cursor: + params["cursor"] = cursor + return self._request("GET", "/captures", params=params) + + def download_pdf(self, capture_id: str, target_path: str | Path) -> Path: + """Lädt das PDF einer fertigen Capture lokal herunter.""" + url = f"{self.base_url}/captures/{capture_id}/pdf" + target = Path(target_path) + + with self._session.get(url, timeout=self.timeout, stream=True) as resp: + self._raise_for_error(resp) + with target.open("wb") as f: + for chunk in resp.iter_content(chunk_size=8192): + if chunk: + f.write(chunk) + + return target + + def verify( + self, + source: str | None = None, + *, + pdf: str | Path | BinaryIO | None = None, + ) -> dict[str, Any]: + """Verifiziert ein Capture per ID, Verify-URL oder PDF-Upload. + + Args: + source: Capture-ID oder Verify-URL. + pdf: Pfad zu einer PDF-Datei oder offenes File-Objekt. + + Genau einer von `source` oder `pdf` muss gesetzt sein. + """ + if (source is None) == (pdf is None): + raise ValueError("Genau einer von `source` oder `pdf` muss gesetzt sein.") + + url = f"{self.base_url}/verify" + + if source is not None: + resp = self._session.post( + url, json={"source": source}, timeout=self.timeout + ) + else: + assert pdf is not None # mypy + if isinstance(pdf, (str, Path)): + with open(pdf, "rb") as f: + resp = self._session.post(url, files={"pdf": f}, timeout=self.timeout) + else: + resp = self._session.post(url, files={"pdf": pdf}, timeout=self.timeout) + + self._raise_for_error(resp) + return resp.json() + + def usage(self) -> dict[str, Any]: + """Aktueller Verbrauch und Restkontingent.""" + return self._request("GET", "/usage") + + # ────────────────────────────────────────────────────────────────── + # Internals + # ────────────────────────────────────────────────────────────────── + def _build_capture_body( + self, + *, + url: str, + mode: str | None = None, + webhook_url: str | None = None, + reference: str | None = None, + viewport: dict[str, int] | None = None, + wait_until: str | None = None, + full_page: bool | None = None, + co_branding: dict[str, str] | None = None, + ) -> dict[str, Any]: + body: dict[str, Any] = {"url": url} + if mode is not None: + body["mode"] = mode + if webhook_url is not None: + body["webhook_url"] = webhook_url + if reference is not None: + body["reference"] = reference + if viewport is not None: + body["viewport"] = viewport + if wait_until is not None: + body["wait_until"] = wait_until + if full_page is not None: + body["full_page"] = full_page + if co_branding is not None: + body["co_branding"] = co_branding + return body + + def _request( + self, + method: str, + path: str, + *, + json: dict[str, Any] | None = None, + params: dict[str, Any] | None = None, + ) -> dict[str, Any]: + url = f"{self.base_url}{path}" + + attempt = 0 + while True: + resp = self._session.request( + method, url, json=json, params=params, timeout=self.timeout + ) + + if resp.status_code == 429 and attempt < self.max_retries: + retry_after = int(resp.headers.get("Retry-After", "1")) + time.sleep(retry_after) + attempt += 1 + continue + + if 500 <= resp.status_code < 600 and attempt < self.max_retries: + time.sleep(2**attempt) + attempt += 1 + continue + + self._raise_for_error(resp) + return resp.json() + + def _raise_for_error(self, resp: requests.Response) -> None: + if resp.ok: + return + + try: + payload = resp.json() + err = payload.get("error", {}) if isinstance(payload, dict) else {} + code = err.get("code", "unknown_error") + message = err.get("message", resp.text or resp.reason) + request_id = err.get("request_id") + except ValueError: + code = "unknown_error" + message = resp.text or resp.reason or "Unknown error" + request_id = None + payload = None + + exc_cls = error_for_code(code) + kwargs: dict[str, Any] = { + "code": code, + "request_id": request_id, + "status_code": resp.status_code, + "raw": payload, + } + if exc_cls is RateLimitedError: + try: + kwargs["retry_after_seconds"] = int(resp.headers.get("Retry-After", "0")) + except ValueError: + kwargs["retry_after_seconds"] = None + + # CaptureNotReadyError ist 409 — Special-Case + if resp.status_code == 409 and code == "capture_not_ready": + raise CaptureNotReadyError(message, **kwargs) + + raise exc_cls(message, **kwargs) diff --git a/hightrusted_capture/errors.py b/hightrusted_capture/errors.py new file mode 100644 index 0000000..591c9c4 --- /dev/null +++ b/hightrusted_capture/errors.py @@ -0,0 +1,87 @@ +"""Typisierte Exception-Hierarchie für SDK-Konsumenten.""" + +from __future__ import annotations + +from typing import Any + + +class HightrustedError(Exception): + """Basisklasse für alle hightrusted CAPTURE Fehler.""" + + def __init__( + self, + message: str, + *, + code: str | None = None, + request_id: str | None = None, + status_code: int | None = None, + raw: dict[str, Any] | None = None, + ) -> None: + super().__init__(message) + self.message = message + self.code = code + self.request_id = request_id + self.status_code = status_code + self.raw = raw + + def __str__(self) -> str: + parts = [self.message] + if self.code: + parts.append(f"code={self.code}") + if self.request_id: + parts.append(f"request_id={self.request_id}") + return " | ".join(parts) + + +class InvalidApiKeyError(HightrustedError): + """HTTP 401 — invalid_api_key""" + + +class QuotaExceededError(HightrustedError): + """HTTP 402 — quota_exceeded""" + + +class CaptureNotFoundError(HightrustedError): + """HTTP 404 — capture_not_found""" + + +class CaptureNotReadyError(HightrustedError): + """HTTP 409 — capture_not_ready""" + + +class UnreachableUrlError(HightrustedError): + """HTTP 422 — unreachable_url""" + + +class RateLimitedError(HightrustedError): + """HTTP 429 — rate_limited. + + Attribut `retry_after_seconds` enthält den Wert aus dem `Retry-After`-Header. + """ + + def __init__(self, *args: Any, retry_after_seconds: int | None = None, **kwargs: Any) -> None: + super().__init__(*args, **kwargs) + self.retry_after_seconds = retry_after_seconds + + +class TsaUnavailableError(HightrustedError): + """HTTP 503 — tsa_unavailable""" + + +# ───────────────────────────────────────────────────────────────────── +# Code → Exception Mapping +# ───────────────────────────────────────────────────────────────────── +_ERROR_CODE_MAP: dict[str, type[HightrustedError]] = { + "invalid_api_key": InvalidApiKeyError, + "quota_exceeded": QuotaExceededError, + "capture_not_found": CaptureNotFoundError, + "capture_not_ready": CaptureNotReadyError, + "unreachable_url": UnreachableUrlError, + "rate_limited": RateLimitedError, + "tsa_unavailable": TsaUnavailableError, +} + + +def error_for_code(code: str) -> type[HightrustedError]: + """Mappt einen Error-Code auf die passende Exception-Klasse.""" + return _ERROR_CODE_MAP.get(code, HightrustedError) diff --git a/hightrusted_capture/webhooks.py b/hightrusted_capture/webhooks.py new file mode 100644 index 0000000..32756b9 --- /dev/null +++ b/hightrusted_capture/webhooks.py @@ -0,0 +1,48 @@ +"""Webhook-Signatur-Verifikation für hightrusted CAPTURE.""" + +from __future__ import annotations + +import hashlib +import hmac + + +def verify_webhook_signature(body: bytes, signature_header: str, secret: str) -> bool: + """Prüft die HMAC-SHA-256-Signatur eines eingehenden Webhooks. + + Der Header `X-Hightrusted-Signature` hat das Format:: + + sha256= + + Das Webhook-Secret wird im Dashboard pro API-Key konfiguriert. + + Args: + body: Roh-Body des Requests (bytes!), nicht den geparsten JSON. + signature_header: Wert des Headers `X-Hightrusted-Signature`. + secret: Dein Webhook-Secret aus dem Dashboard. + + Returns: + True wenn Signatur valid, False sonst. + + Beispiel:: + + from hightrusted_capture import verify_webhook_signature + + @app.post("/webhooks/capture") + def webhook(request): + body = request.body # raw bytes! + sig = request.headers["X-Hightrusted-Signature"] + if not verify_webhook_signature(body, sig, "wh_secret_..."): + return 401 + payload = json.loads(body) + ... + """ + if not signature_header or not secret or not body: + return False + + expected = "sha256=" + hmac.new( + secret.encode("utf-8"), + body, + hashlib.sha256, + ).hexdigest() + + return hmac.compare_digest(expected, signature_header) diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..9936896 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,69 @@ +[build-system] +requires = ["setuptools>=68", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "hightrusted-capture" +version = "0.1.0" +description = "Python SDK für die hightrusted CAPTURE API — forensische Web-Captures mit qualifiziertem Zeitstempel" +readme = "README.md" +requires-python = ">=3.9" +license = { text = "MIT" } +authors = [ + { name = "hightrusted GmbH", email = "developers@hightrusted.net" } +] +keywords = ["capture", "screenshot", "rfc3161", "timestamp", "eidas", "forensic", "evidence"] +classifiers = [ + "Development Status :: 4 - Beta", + "Intended Audience :: Developers", + "Intended Audience :: Legal Industry", + "License :: OSI Approved :: MIT License", + "Operating System :: OS Independent", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", + "Topic :: Internet :: WWW/HTTP", + "Topic :: Office/Business", + "Topic :: Security :: Cryptography", +] +dependencies = [ + "requests>=2.31", +] + +[project.optional-dependencies] +dev = [ + "pytest>=8.0", + "pytest-cov>=4.1", + "responses>=0.25", + "ruff>=0.5", + "mypy>=1.10", + "types-requests", +] + +[project.urls] +Homepage = "https://capture.hightrusted.net" +Documentation = "https://capture.hightrusted.net/api/docs" +Repository = "https://git.hightrusted.net/hightrusted-capture/python" +Changelog = "https://git.hightrusted.net/hightrusted-capture/python/src/branch/main/CHANGELOG.md" +"Bug Tracker" = "https://git.hightrusted.net/hightrusted-capture/python/issues" + +[tool.setuptools.packages.find] +include = ["hightrusted_capture*"] + +[tool.ruff] +line-length = 100 +target-version = "py39" + +[tool.ruff.lint] +select = ["E", "F", "W", "I", "B", "UP", "S", "C4"] +ignore = ["S101"] # asserts in tests sind ok + +[tool.ruff.lint.per-file-ignores] +"tests/*" = ["S"] + +[tool.pytest.ini_options] +testpaths = ["tests"] +addopts = "-ra --strict-markers --cov=hightrusted_capture --cov-report=term-missing" diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_client.py b/tests/test_client.py new file mode 100644 index 0000000..d1d72d6 --- /dev/null +++ b/tests/test_client.py @@ -0,0 +1,181 @@ +"""Tests für hightrusted_capture.Client.""" + +from __future__ import annotations + +import json + +import pytest +import responses + +from hightrusted_capture import ( + Client, + InvalidApiKeyError, + QuotaExceededError, + RateLimitedError, + verify_webhook_signature, +) +from hightrusted_capture.client import DEFAULT_BASE_URL + + +@pytest.fixture +def client() -> Client: + return Client(api_key="ht_test_abc123", max_retries=0) + + +# ─────────────────────────────────────────────────────────────────── +# Capture +# ─────────────────────────────────────────────────────────────────── +@responses.activate +def test_capture_sync_returns_object(client: Client) -> None: + responses.add( + responses.POST, + f"{DEFAULT_BASE_URL}/captures", + json={ + "id": "550e8400-e29b-41d4-a716-446655440000", + "status": "ready", + "url": "https://example.com", + "verify_url": "https://verify.hightrusted.net/c/550e", + "created_at": "2026-04-25T14:32:08Z", + }, + status=200, + ) + + capture = client.capture("https://example.com") + assert capture["status"] == "ready" + assert capture["id"] == "550e8400-e29b-41d4-a716-446655440000" + + +@responses.activate +def test_capture_sends_authorization_header(client: Client) -> None: + responses.add( + responses.POST, + f"{DEFAULT_BASE_URL}/captures", + json={"id": "x", "status": "ready", "created_at": "2026-04-25T00:00:00Z"}, + status=200, + ) + client.capture("https://example.com") + assert responses.calls[0].request.headers["Authorization"] == "Bearer ht_test_abc123" + + +@responses.activate +def test_capture_with_reference_and_viewport(client: Client) -> None: + responses.add( + responses.POST, + f"{DEFAULT_BASE_URL}/captures", + json={"id": "x", "status": "ready", "created_at": "2026-04-25T00:00:00Z"}, + status=200, + ) + client.capture( + "https://example.com", + reference="case-001", + viewport={"width": 1920, "height": 1080}, + ) + body = json.loads(responses.calls[0].request.body) + assert body["reference"] == "case-001" + assert body["viewport"]["width"] == 1920 + + +# ─────────────────────────────────────────────────────────────────── +# Verify +# ─────────────────────────────────────────────────────────────────── +@responses.activate +def test_verify_by_id(client: Client) -> None: + responses.add( + responses.POST, + f"{DEFAULT_BASE_URL}/verify", + json={"valid": True, "capture_id": "550e"}, + status=200, + ) + result = client.verify(source="550e8400-e29b-41d4-a716-446655440000") + assert result["valid"] is True + + +def test_verify_requires_exactly_one_source(client: Client) -> None: + with pytest.raises(ValueError, match="Genau einer"): + client.verify() + with pytest.raises(ValueError, match="Genau einer"): + client.verify(source="x", pdf="y") + + +# ─────────────────────────────────────────────────────────────────── +# Errors +# ─────────────────────────────────────────────────────────────────── +@responses.activate +def test_invalid_api_key_raises(client: Client) -> None: + responses.add( + responses.POST, + f"{DEFAULT_BASE_URL}/captures", + json={"error": {"code": "invalid_api_key", "message": "Bad key", "request_id": "r1"}}, + status=401, + ) + with pytest.raises(InvalidApiKeyError) as exc_info: + client.capture("https://example.com") + assert exc_info.value.code == "invalid_api_key" + assert exc_info.value.request_id == "r1" + + +@responses.activate +def test_quota_exceeded_raises(client: Client) -> None: + responses.add( + responses.POST, + f"{DEFAULT_BASE_URL}/captures", + json={"error": {"code": "quota_exceeded", "message": "X", "request_id": "r1"}}, + status=402, + ) + with pytest.raises(QuotaExceededError): + client.capture("https://example.com") + + +@responses.activate +def test_rate_limit_includes_retry_after(client: Client) -> None: + responses.add( + responses.POST, + f"{DEFAULT_BASE_URL}/captures", + json={"error": {"code": "rate_limited", "message": "X", "request_id": "r1"}}, + status=429, + headers={"Retry-After": "12"}, + ) + with pytest.raises(RateLimitedError) as exc_info: + client.capture("https://example.com") + assert exc_info.value.retry_after_seconds == 12 + + +# ─────────────────────────────────────────────────────────────────── +# Webhook-Signatur +# ─────────────────────────────────────────────────────────────────── +def test_verify_webhook_signature_matches() -> None: + body = b'{"event":"capture.ready"}' + secret = "wh_secret_test" + # Erwartet: sha256=hmac(body, secret) + import hashlib + import hmac + + sig = "sha256=" + hmac.new(secret.encode(), body, hashlib.sha256).hexdigest() + assert verify_webhook_signature(body, sig, secret) is True + + +def test_verify_webhook_signature_rejects_wrong_secret() -> None: + body = b'{"event":"capture.ready"}' + bad_sig = "sha256=" + "0" * 64 + assert verify_webhook_signature(body, bad_sig, "secret") is False + + +def test_verify_webhook_signature_rejects_empty_inputs() -> None: + assert verify_webhook_signature(b"", "sha256=...", "secret") is False + assert verify_webhook_signature(b"x", "", "secret") is False + assert verify_webhook_signature(b"x", "sha256=...", "") is False + + +# ─────────────────────────────────────────────────────────────────── +# Konfiguration +# ─────────────────────────────────────────────────────────────────── +def test_client_requires_api_key(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.delenv("HIGHTRUSTED_API_KEY", raising=False) + with pytest.raises(ValueError, match="API-Key fehlt"): + Client() + + +def test_client_reads_api_key_from_env(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("HIGHTRUSTED_API_KEY", "ht_test_from_env") + client = Client() + assert client.api_key == "ht_test_from_env"