"""hightrusted CAPTURE Client."""

from __future__ import annotations

import os
import time
from pathlib import Path
from typing import Any, BinaryIO

import requests

from hightrusted_capture.errors import (
    CaptureNotReadyError,
    HightrustedError,
    RateLimitedError,
    error_for_code,
)

DEFAULT_BASE_URL = "https://capture.hightrusted.net/api/v1"
DEFAULT_TIMEOUT = 35
DEFAULT_USER_AGENT = "hightrusted-capture-python/0.1.0"


class Client:
    """Synchronous client for the hightrusted CAPTURE API.

    Args:
        api_key: API key. When omitted (or empty), ``HIGHTRUSTED_API_KEY`` is
            read from the environment.
        base_url: API endpoint, default
            ``https://capture.hightrusted.net/api/v1``. A trailing slash is
            stripped.
        timeout: HTTP timeout in seconds, default 35.
        max_retries: Number of automatic retries on 429 / 5xx responses,
            default 3.

    Raises:
        ValueError: If no API key is passed and none is found in the
            environment.

    The client owns a ``requests.Session``; call :meth:`close` or use the
    client as a context manager to release it.

    Example::

        from hightrusted_capture import Client

        client = Client(api_key="ht_live_...")
        capture = client.capture(url="https://example.com")
        print(capture["verify_url"])
    """

    def __init__(
        self,
        api_key: str | None = None,
        *,
        base_url: str = DEFAULT_BASE_URL,
        timeout: int = DEFAULT_TIMEOUT,
        max_retries: int = 3,
    ) -> None:
        key = api_key or os.environ.get("HIGHTRUSTED_API_KEY")
        if not key:
            raise ValueError(
                "API-Key fehlt. Setze entweder den Parameter `api_key` oder die "
                "Umgebungsvariable HIGHTRUSTED_API_KEY."
            )
        self.api_key = key
        self.base_url = base_url.rstrip("/")
        self.timeout = timeout
        self.max_retries = max_retries
        # One shared session so the auth and user-agent headers are sent on
        # every request issued through this client.
        self._session = requests.Session()
        self._session.headers.update(
            {
                "Authorization": f"Bearer {self.api_key}",
                "User-Agent": DEFAULT_USER_AGENT,
            }
        )

    def close(self) -> None:
        """Close the underlying HTTP session."""
        self._session.close()

    def __enter__(self) -> Client:
        """Return the client itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, *exc_info: object) -> None:
        """Close the session when leaving the ``with`` block."""
        self.close()

    # ──────────────────────────────────────────────────────────────────
    # Public API
    # ──────────────────────────────────────────────────────────────────

    def capture(
        self,
        url: str,
        *,
        reference: str | None = None,
        viewport: dict[str, int] | None = None,
        wait_until: str | None = None,
        full_page: bool | None = None,
        co_branding: dict[str, str] | None = None,
    ) -> dict[str, Any]:
        """Create a capture in the default (synchronous) mode.

        No ``mode`` field is sent, which the original documentation describes
        as the synchronous default that waits up to 30 s for the finished PDF.
        This method itself only issues a single ``POST /captures`` (with the
        usual retry handling) bounded by ``self.timeout``.

        Args:
            url: Page to capture.
            reference: Optional caller-defined reference.
            viewport: Optional viewport settings, forwarded unchanged.
            wait_until: Optional wait condition, forwarded unchanged.
            full_page: Optional full-page flag, forwarded unchanged.
            co_branding: Optional co-branding settings, forwarded unchanged.

        Returns:
            The decoded JSON response.
        """
        body = self._build_capture_body(
            url=url,
            mode=None,  # sync = default
            reference=reference,
            viewport=viewport,
            wait_until=wait_until,
            full_page=full_page,
            co_branding=co_branding,
        )
        return self._request("POST", "/captures", json=body)

    def capture_async(
        self,
        url: str,
        *,
        reference: str | None = None,
        wait_for_ready: bool = True,
        poll_interval: float = 2.0,
        max_wait: float = 60.0,
        **kwargs: Any,
    ) -> dict[str, Any]:
        """Create an asynchronous capture, optionally polling until it is done.

        With ``wait_for_ready=True`` (the default) the capture is polled until
        its status is ``"ready"`` or ``max_wait`` seconds have elapsed. The
        status is always checked at least once, and the waits between polls
        never extend past the deadline. Pass ``wait_for_ready=False`` to get
        only the queue confirmation back.

        Args:
            url: Page to capture.
            reference: Optional caller-defined reference.
            wait_for_ready: Whether to poll for completion.
            poll_interval: Seconds to wait between polls.
            max_wait: Maximum total seconds to wait for completion.
            **kwargs: Further capture options accepted by
                ``_build_capture_body`` (``viewport``, ``wait_until``,
                ``full_page``, ``co_branding``).

        Returns:
            The queue confirmation, or the capture detail once it is ready.

        Raises:
            HightrustedError: If the capture reports status ``"failed"``.
            TimeoutError: If the capture is not ready within ``max_wait``.
        """
        body = self._build_capture_body(
            url=url, mode="async", reference=reference, **kwargs
        )
        queued = self._request("POST", "/captures", json=body)
        if not wait_for_ready:
            return queued

        capture_id = queued["id"]
        deadline = time.monotonic() + max_wait
        # First poll happens after at most 3 s — sooner when poll_interval or
        # max_wait is shorter, so the initial wait cannot overshoot max_wait.
        time.sleep(min(3.0, poll_interval, max(0.0, max_wait)))
        while True:
            detail = self.get(capture_id)
            status = detail["status"]
            if status == "ready":
                return detail
            if status == "failed":
                raise HightrustedError(
                    f"Capture failed: {detail.get('error', 'unknown')}",
                    code="capture_failed",
                    raw=detail,
                )
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            # Never sleep past the deadline; the next iteration then performs
            # one last status check right at the deadline.
            time.sleep(min(poll_interval, remaining))
        raise TimeoutError(f"Capture {capture_id} nicht fertig nach {max_wait}s")

    def capture_webhook(
        self,
        url: str,
        webhook_url: str,
        *,
        reference: str | None = None,
        **kwargs: Any,
    ) -> dict[str, Any]:
        """Start a capture in ``webhook`` mode.

        Per the original documentation the server delivers the result via an
        HTTP POST to ``webhook_url``.

        Args:
            url: Page to capture.
            webhook_url: URL sent to the API as ``webhook_url``.
            reference: Optional caller-defined reference.
            **kwargs: Further capture options accepted by
                ``_build_capture_body``.

        Returns:
            The decoded JSON response of the create call.
        """
        body = self._build_capture_body(
            url=url,
            mode="webhook",
            webhook_url=webhook_url,
            reference=reference,
            **kwargs,
        )
        return self._request("POST", "/captures", json=body)

    def get(self, capture_id: str) -> dict[str, Any]:
        """Return the status / detail of a single capture."""
        return self._request("GET", f"/captures/{capture_id}")

    def list(
        self,
        *,
        status: str | None = None,
        reference: str | None = None,
        limit: int = 25,
        cursor: str | None = None,
    ) -> dict[str, Any]:
        """List captures, paginated by cursor.

        Args:
            status: Optional status filter; omitted from the query when empty.
            reference: Optional reference filter; omitted when empty.
            limit: Page size, always sent (default 25).
            cursor: Optional pagination cursor; omitted when empty.

        Returns:
            The decoded JSON response.
        """
        params: dict[str, Any] = {"limit": limit}
        if status:
            params["status"] = status
        if reference:
            params["reference"] = reference
        if cursor:
            params["cursor"] = cursor
        return self._request("GET", "/captures", params=params)

    def download_pdf(self, capture_id: str, target_path: str | Path) -> Path:
        """Download the PDF of a capture to a local file.

        The response is streamed to disk in 8 KiB chunks. The target file is
        only opened after the HTTP status has been checked.

        Args:
            capture_id: ID of the capture whose PDF should be fetched.
            target_path: Destination file path.

        Returns:
            ``target_path`` as a :class:`~pathlib.Path`.
        """
        url = f"{self.base_url}/captures/{capture_id}/pdf"
        target = Path(target_path)
        with self._session.get(url, timeout=self.timeout, stream=True) as resp:
            self._raise_for_error(resp)
            with target.open("wb") as f:
                for chunk in resp.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)
        return target

    def verify(
        self,
        source: str | None = None,
        *,
        pdf: str | Path | BinaryIO | None = None,
    ) -> dict[str, Any]:
        """Verify a capture by ID, verify URL, or PDF upload.

        Args:
            source: Capture ID or verify URL.
            pdf: Path to a PDF file, or an already opened binary file object.

        Exactly one of ``source`` or ``pdf`` must be given.

        Returns:
            The decoded JSON response.

        Raises:
            ValueError: If both or neither of ``source`` and ``pdf`` are set.
        """
        if (source is None) == (pdf is None):
            raise ValueError("Genau einer von `source` oder `pdf` muss gesetzt sein.")

        url = f"{self.base_url}/verify"
        if source is not None:
            resp = self._session.post(
                url, json={"source": source}, timeout=self.timeout
            )
        else:
            assert pdf is not None  # type narrowing only; guaranteed by the check above
            if isinstance(pdf, (str, Path)):
                # We opened the file, so we are responsible for closing it.
                with open(pdf, "rb") as f:
                    resp = self._session.post(
                        url, files={"pdf": f}, timeout=self.timeout
                    )
            else:
                # Caller-supplied file object: left open for the caller.
                resp = self._session.post(
                    url, files={"pdf": pdf}, timeout=self.timeout
                )
        self._raise_for_error(resp)
        return resp.json()

    def usage(self) -> dict[str, Any]:
        """Fetch ``/usage`` (documented upstream as usage and remaining quota)."""
        return self._request("GET", "/usage")

    # ──────────────────────────────────────────────────────────────────
    # Internals
    # ──────────────────────────────────────────────────────────────────

    def _build_capture_body(
        self,
        *,
        url: str,
        mode: str | None = None,
        webhook_url: str | None = None,
        reference: str | None = None,
        viewport: dict[str, int] | None = None,
        wait_until: str | None = None,
        full_page: bool | None = None,
        co_branding: dict[str, str] | None = None,
    ) -> dict[str, Any]:
        """Assemble the JSON body for ``POST /captures``.

        ``url`` is always included; every other field is included only when
        it is not ``None`` (so ``full_page=False`` is still sent).
        """
        optional: dict[str, Any] = {
            "mode": mode,
            "webhook_url": webhook_url,
            "reference": reference,
            "viewport": viewport,
            "wait_until": wait_until,
            "full_page": full_page,
            "co_branding": co_branding,
        }
        body: dict[str, Any] = {"url": url}
        body.update(
            {name: value for name, value in optional.items() if value is not None}
        )
        return body

    @staticmethod
    def _parse_retry_after(value: str | None) -> int | None:
        """Parse a ``Retry-After`` header given as whole seconds.

        Returns ``None`` when the header is missing or is not an integer
        (for example when it is sent as an HTTP-date).
        """
        if value is None:
            return None
        try:
            return int(value)
        except (TypeError, ValueError):
            return None

    def _request(
        self,
        method: str,
        path: str,
        *,
        json: dict[str, Any] | None = None,
        params: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """Send a request to ``base_url + path`` and return the decoded JSON.

        429 and 5xx responses are retried up to ``self.max_retries`` times:
        429 waits for the ``Retry-After`` seconds (1 s when the header is
        missing or unparseable), 5xx waits ``2**attempt`` seconds. Any other
        non-OK response, or an exhausted retry budget, is turned into an
        exception by ``_raise_for_error``.
        """
        url = f"{self.base_url}{path}"
        attempt = 0
        while True:
            resp = self._session.request(
                method, url, json=json, params=params, timeout=self.timeout
            )
            can_retry = attempt < self.max_retries
            if resp.status_code == 429 and can_retry:
                delay = self._parse_retry_after(resp.headers.get("Retry-After"))
                # A non-numeric header must not crash the retry loop, and a
                # negative value would make time.sleep raise.
                time.sleep(1 if delay is None else max(0, delay))
                attempt += 1
                continue
            if 500 <= resp.status_code < 600 and can_retry:
                time.sleep(2**attempt)  # exponential backoff: 1, 2, 4, ...
                attempt += 1
                continue
            self._raise_for_error(resp)
            return resp.json()

    def _raise_for_error(self, resp: requests.Response) -> None:
        """Raise the matching client exception for a non-OK response.

        The error details are read from the ``error`` member of the JSON
        body when it is an object; a plain-string ``error`` is used as the
        message. Anything else falls back to the response text / reason and
        the code ``"unknown_error"``.
        """
        if resp.ok:
            return

        fallback_message = resp.text or resp.reason or "Unknown error"
        code = "unknown_error"
        message = fallback_message
        request_id = None
        try:
            payload: Any = resp.json()
        except ValueError:
            payload = None
        else:
            err = payload.get("error") if isinstance(payload, dict) else None
            if isinstance(err, dict):
                code = err.get("code") or "unknown_error"
                message = err.get("message") or fallback_message
                request_id = err.get("request_id")
            elif isinstance(err, str) and err:
                message = err

        exc_cls = error_for_code(code)
        kwargs: dict[str, Any] = {
            "code": code,
            "request_id": request_id,
            "status_code": resp.status_code,
            "raw": payload,
        }
        if exc_cls is RateLimitedError:
            # Missing header -> 0, unparseable header -> None.
            kwargs["retry_after_seconds"] = self._parse_retry_after(
                resp.headers.get("Retry-After", "0")
            )

        # CaptureNotReadyError is a 409 — special case
        if resp.status_code == 409 and code == "capture_not_ready":
            raise CaptureNotReadyError(message, **kwargs)

        raise exc_cls(message, **kwargs)