- Installierbares Package mit pyproject.toml
- Client-Klasse mit Sync, Async (mit Polling), Webhook, Verify, Download
- Typisierte Exception-Hierarchie
- Webhook-Signatur-Verifikation (HMAC-SHA-256)
- Pytest-Suite + Quickstart und Webhook-Receiver-Beispiel
322 lines
11 KiB
Python
322 lines
11 KiB
Python
"""hightrusted CAPTURE Client."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import time
|
|
from pathlib import Path
|
|
from typing import Any, BinaryIO
|
|
|
|
import requests
|
|
|
|
from hightrusted_capture.errors import (
|
|
CaptureNotReadyError,
|
|
HightrustedError,
|
|
RateLimitedError,
|
|
error_for_code,
|
|
)
|
|
|
|
DEFAULT_BASE_URL = "https://capture.hightrusted.net/api/v1"
DEFAULT_TIMEOUT = 35
DEFAULT_USER_AGENT = "hightrusted-capture-python/0.1.0"


class Client:
    """Synchronous client for the hightrusted CAPTURE API.

    The client owns a ``requests.Session``. Call :meth:`close` when done, or
    use the client as a context manager, to release pooled connections.

    Args:
        api_key: API key. When omitted, ``HIGHTRUSTED_API_KEY`` is read from
            the environment.
        base_url: API endpoint, default
            ``https://capture.hightrusted.net/api/v1``. A trailing slash is
            stripped.
        timeout: HTTP timeout in seconds, default 35.
        max_retries: Number of automatic retries on 429 / 5xx, default 3.

    Raises:
        ValueError: If no API key is given and none is found in the
            environment.

    Example::

        from hightrusted_capture import Client

        client = Client(api_key="ht_live_...")
        capture = client.capture(url="https://example.com")
        print(capture["verify_url"])
    """

    def __init__(
        self,
        api_key: str | None = None,
        *,
        base_url: str = DEFAULT_BASE_URL,
        timeout: float = DEFAULT_TIMEOUT,
        max_retries: int = 3,
    ) -> None:
        key = api_key or os.environ.get("HIGHTRUSTED_API_KEY")
        if not key:
            raise ValueError(
                "API-Key fehlt. Setze entweder den Parameter `api_key` oder die "
                "Umgebungsvariable HIGHTRUSTED_API_KEY."
            )
        self.api_key = key
        self.base_url = base_url.rstrip("/")
        self.timeout = timeout
        self.max_retries = max_retries

        # One shared session so every request carries the auth header and
        # reuses connections.
        self._session = requests.Session()
        self._session.headers.update(
            {
                "Authorization": f"Bearer {self.api_key}",
                "User-Agent": DEFAULT_USER_AGENT,
            }
        )

    def close(self) -> None:
        """Close the underlying HTTP session and its pooled connections."""
        self._session.close()

    def __enter__(self) -> Client:
        """Return the client itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, *exc_info: object) -> None:
        """Close the session on leaving the ``with`` block; never suppresses."""
        self.close()

    # ──────────────────────────────────────────────────────────────────
    # Public API
    # ──────────────────────────────────────────────────────────────────
    def capture(
        self,
        url: str,
        *,
        reference: str | None = None,
        viewport: dict[str, int] | None = None,
        wait_until: str | None = None,
        full_page: bool | None = None,
        co_branding: dict[str, str] | None = None,
    ) -> dict[str, Any]:
        """Run a synchronous capture and return the decoded JSON response.

        No ``mode`` field is sent, which selects the API's default
        (synchronous) mode. Options left as ``None`` are omitted from the
        request body.
        """
        body = self._build_capture_body(
            url=url,
            mode=None,  # sync = default
            reference=reference,
            viewport=viewport,
            wait_until=wait_until,
            full_page=full_page,
            co_branding=co_branding,
        )
        return self._request("POST", "/captures", json=body)

    def capture_async(
        self,
        url: str,
        *,
        reference: str | None = None,
        wait_for_ready: bool = True,
        poll_interval: float = 2.0,
        max_wait: float = 60.0,
        **kwargs: Any,
    ) -> dict[str, Any]:
        """Queue an asynchronous capture, optionally polling until it is done.

        Args:
            url: Page to capture.
            reference: Optional caller-side reference stored with the capture.
            wait_for_ready: When true (default), poll until the capture is
                ready, has failed, or ``max_wait`` seconds have elapsed. When
                false, return the queue confirmation immediately.
            poll_interval: Seconds between status polls.
            max_wait: Upper bound in seconds for the whole polling phase.
            **kwargs: Further capture options accepted by
                ``_build_capture_body`` (``viewport``, ``wait_until``,
                ``full_page``, ``co_branding``).

        Returns:
            The queue confirmation, or the capture detail once its status is
            ``"ready"``.

        Raises:
            HightrustedError: If the capture reports status ``"failed"``.
            TimeoutError: If the capture is not ready within ``max_wait``.
        """
        body = self._build_capture_body(url=url, mode="async", reference=reference, **kwargs)
        queued = self._request("POST", "/captures", json=body)

        if not wait_for_ready:
            return queued

        capture_id = queued["id"]
        deadline = time.monotonic() + max_wait

        # Give the server a short head start (at most 3 s, at most one poll
        # interval) but never sleep past the caller's deadline.
        time.sleep(max(0.0, min(3.0, poll_interval, max_wait)))

        # Poll at least once, and once more right at the deadline, so a small
        # ``max_wait`` cannot time out without ever asking the server.
        while True:
            detail = self.get(capture_id)
            status = detail["status"]
            if status == "ready":
                return detail
            if status == "failed":
                raise HightrustedError(
                    f"Capture failed: {detail.get('error', 'unknown')}",
                    code="capture_failed",
                    raw=detail,
                )

            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            time.sleep(max(0.0, min(poll_interval, remaining)))

        raise TimeoutError(f"Capture {capture_id} nicht fertig nach {max_wait}s")

    def capture_webhook(
        self,
        url: str,
        webhook_url: str,
        *,
        reference: str | None = None,
        **kwargs: Any,
    ) -> dict[str, Any]:
        """Start a capture in ``"webhook"`` mode with the given ``webhook_url``.

        Returns the server's immediate JSON response to the POST.
        """
        body = self._build_capture_body(
            url=url,
            mode="webhook",
            webhook_url=webhook_url,
            reference=reference,
            **kwargs,
        )
        return self._request("POST", "/captures", json=body)

    def get(self, capture_id: str) -> dict[str, Any]:
        """Return status / detail of a single capture."""
        return self._request("GET", f"/captures/{capture_id}")

    def list(
        self,
        *,
        status: str | None = None,
        reference: str | None = None,
        limit: int = 25,
        cursor: str | None = None,
    ) -> dict[str, Any]:
        """List captures, paginated by cursor.

        ``limit`` is always sent; the other filters are sent only when truthy.
        """
        params: dict[str, Any] = {"limit": limit}
        if status:
            params["status"] = status
        if reference:
            params["reference"] = reference
        if cursor:
            params["cursor"] = cursor
        return self._request("GET", "/captures", params=params)

    def download_pdf(self, capture_id: str, target_path: str | Path) -> Path:
        """Download the PDF of a capture to ``target_path``.

        The response is streamed in 8 KiB chunks. The target file is opened
        only after the response status has been checked, so an API error does
        not create the file.

        Returns:
            ``target_path`` as a ``Path``.
        """
        url = f"{self.base_url}/captures/{capture_id}/pdf"
        target = Path(target_path)

        with self._session.get(url, timeout=self.timeout, stream=True) as resp:
            self._raise_for_error(resp)
            with target.open("wb") as f:
                for chunk in resp.iter_content(chunk_size=8192):
                    if chunk:  # skip empty chunks
                        f.write(chunk)

        return target

    def verify(
        self,
        source: str | None = None,
        *,
        pdf: str | Path | BinaryIO | None = None,
    ) -> dict[str, Any]:
        """Verify a capture by ID, verify URL or PDF upload.

        Args:
            source: Capture ID or verify URL.
            pdf: Path to a PDF file, or an open binary file object.

        Exactly one of ``source`` or ``pdf`` must be given.

        Raises:
            ValueError: If both or neither of ``source`` / ``pdf`` are given.
        """
        if (source is None) == (pdf is None):
            raise ValueError("Genau einer von `source` oder `pdf` muss gesetzt sein.")

        url = f"{self.base_url}/verify"

        if source is not None:
            resp = self._session.post(
                url, json={"source": source}, timeout=self.timeout
            )
        else:
            assert pdf is not None  # type narrowing only; validated above
            if isinstance(pdf, (str, Path)):
                # We opened the file, so we close it again after the upload.
                with open(pdf, "rb") as f:
                    resp = self._session.post(url, files={"pdf": f}, timeout=self.timeout)
            else:
                # Caller-owned file object: upload it but leave it open.
                resp = self._session.post(url, files={"pdf": pdf}, timeout=self.timeout)

        self._raise_for_error(resp)
        return resp.json()

    def usage(self) -> dict[str, Any]:
        """Return the decoded JSON response of ``GET /usage``."""
        return self._request("GET", "/usage")

    # ──────────────────────────────────────────────────────────────────
    # Internals
    # ──────────────────────────────────────────────────────────────────
    def _build_capture_body(
        self,
        *,
        url: str,
        mode: str | None = None,
        webhook_url: str | None = None,
        reference: str | None = None,
        viewport: dict[str, int] | None = None,
        wait_until: str | None = None,
        full_page: bool | None = None,
        co_branding: dict[str, str] | None = None,
    ) -> dict[str, Any]:
        """Build the JSON body for ``POST /captures``.

        ``url`` is always included; every other option is included only when
        it is not ``None`` (so ``full_page=False`` is sent explicitly).
        """
        optional = {
            "mode": mode,
            "webhook_url": webhook_url,
            "reference": reference,
            "viewport": viewport,
            "wait_until": wait_until,
            "full_page": full_page,
            "co_branding": co_branding,
        }
        body: dict[str, Any] = {"url": url}
        body.update({key: value for key, value in optional.items() if value is not None})
        return body

    @staticmethod
    def _retry_after_seconds(value: str | None, default: int = 1) -> int:
        """Parse a ``Retry-After`` header given as whole seconds.

        Returns ``default`` when the header is missing, is not an integer
        (for example the HTTP-date form) or is negative, so a malformed
        header can never crash the retry loop.
        """
        try:
            seconds = int(value)  # type: ignore[arg-type]
        except (TypeError, ValueError):
            return default
        return seconds if seconds >= 0 else default

    def _request(
        self,
        method: str,
        path: str,
        *,
        json: dict[str, Any] | None = None,
        params: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """Send a request to ``base_url + path`` and return the decoded JSON.

        Retries up to ``max_retries`` times: on 429 after the delay given by
        ``Retry-After`` (1 s when absent or unparseable), on 5xx with
        exponential backoff (1 s, 2 s, 4 s, ...). Any other non-OK response,
        or a retryable one after the retries are used up, is raised via
        ``_raise_for_error``.
        """
        url = f"{self.base_url}{path}"

        attempt = 0
        while True:
            resp = self._session.request(
                method, url, json=json, params=params, timeout=self.timeout
            )

            if attempt < self.max_retries:
                if resp.status_code == 429:
                    time.sleep(self._retry_after_seconds(resp.headers.get("Retry-After")))
                    attempt += 1
                    continue
                if 500 <= resp.status_code < 600:
                    time.sleep(2**attempt)
                    attempt += 1
                    continue

            self._raise_for_error(resp)
            return resp.json()

    def _raise_for_error(self, resp: requests.Response) -> None:
        """Raise the matching typed exception for a non-OK response.

        Returns silently when ``resp.ok``. Otherwise reads ``code``,
        ``message`` and ``request_id`` from the ``error`` object of the JSON
        body when there is one, and falls back to the response text / reason
        for anything missing. The exception class is chosen by
        ``error_for_code``.
        """
        if resp.ok:
            return

        code = "unknown_error"
        message = None
        request_id = None
        try:
            payload = resp.json()
        except ValueError:
            # Body is not JSON (e.g. an HTML error page from a proxy).
            payload = None
        else:
            err = payload.get("error") if isinstance(payload, dict) else None
            if isinstance(err, dict):
                code = err.get("code") or "unknown_error"
                message = err.get("message")
                request_id = err.get("request_id")
            elif isinstance(err, str) and err:
                # Tolerate the flat {"error": "text"} shape instead of
                # failing with AttributeError and hiding the real error.
                message = err

        if not message:
            message = resp.text or resp.reason or "Unknown error"

        exc_cls = error_for_code(code)
        kwargs: dict[str, Any] = {
            "code": code,
            "request_id": request_id,
            "status_code": resp.status_code,
            "raw": payload,
        }
        if exc_cls is RateLimitedError:
            try:
                kwargs["retry_after_seconds"] = int(resp.headers.get("Retry-After", "0"))
            except ValueError:
                kwargs["retry_after_seconds"] = None

        # CaptureNotReadyError is a 409 — special case
        if resp.status_code == 409 and code == "capture_not_ready":
            raise CaptureNotReadyError(message, **kwargs)

        raise exc_cls(message, **kwargs)
|