# Source code for artifacts.http

"""Low-level async HTTP transport for the Artifacts API."""

from __future__ import annotations

import asyncio
import base64
import json as _json
import logging
import random
import sys
from dataclasses import dataclass
from typing import Any, Optional, Type, TypeVar
from urllib.parse import urlencode

import pydantic
from pydantic import BaseModel

from .errors import ArtifactsAPIError, ArtifactsError, RetryExhaustedError, raise_for_error
from .models.pagination import DataPage

# Generic type variable for the typed request helpers; bound to pydantic
# BaseModel so parsed responses are always pydantic models.
T = TypeVar("T", bound=BaseModel)

# True when the installed pydantic major version is >= 2 (the v1 and v2
# parsing APIs differ: model_validate vs parse_obj).
_PYDANTIC_V2 = int(pydantic.VERSION.split(".")[0]) >= 2

# Module-level logger used by the retry loop and request tracing.
logger = logging.getLogger("artifacts.http")


def _parse(model: Type[T], data: Any) -> T:
    """Parse *data* into *model*, supporting both pydantic v1 and v2 APIs."""
    # Pick the version-appropriate validator once, then invoke it.
    validator = (
        model.model_validate if _PYDANTIC_V2 else model.parse_obj  # type: ignore[attr-defined]
    )
    return validator(data)


# True when running under Pyodide/WebAssembly ("emscripten" platform),
# where aiohttp cannot be used and requests go through pyodide.http.pyfetch.
_PYODIDE = sys.platform == "emscripten"

if not _PYODIDE:
    # aiohttp is only importable (and only needed) on native Python.
    import aiohttp


@dataclass
class RetryConfig:
    """Retry policy for automatic request retries.

    Parameters
    ----------
    max_retries:
        Maximum number of retry attempts (default 3).
    base_delay:
        Initial delay in seconds before the first retry (default 1.0).
    max_delay:
        Maximum delay cap in seconds for exponential backoff (default 30.0).
    retry_on_status:
        HTTP status codes that trigger a retry.
    retry_on_cooldown:
        If ``True``, automatically wait and retry on
        ``CooldownActiveError`` (status 499).
    """

    max_retries: int = 3
    base_delay: float = 1.0
    max_delay: float = 30.0
    retry_on_status: tuple[int, ...] = (429, 500, 502, 503, 504)
    retry_on_cooldown: bool = True
class HttpClient:
    """Async HTTP client wrapping aiohttp (native) or pyodide.http.pyfetch (WebAssembly).

    Manages the session lifecycle and provides typed request helpers that
    automatically unwrap the ``{"data": ...}`` envelope returned by the
    Artifacts API.

    Parameters
    ----------
    base_url:
        Root URL of the API; any trailing slash is stripped.
    token:
        Optional bearer token, sent as an ``Authorization`` header.
    retry:
        Retry policy; a default :class:`RetryConfig` is used when omitted.
    """

    def __init__(
        self,
        base_url: str = "https://api.artifactsmmo.com",
        token: Optional[str] = None,
        retry: Optional[RetryConfig] = None,
    ):
        self._base_url = base_url.rstrip("/")
        self._token = token
        self._retry = retry or RetryConfig()
        self._headers: dict[str, str] = {
            "Content-Type": "application/json",
            "Accept": "application/json",
        }
        if self._token:
            self._headers["Authorization"] = f"Bearer {self._token}"
        # Native-only: aiohttp session (created in start()).
        self._session: Optional[Any] = None

    # -- Lifecycle --

    async def start(self) -> None:
        """Create the underlying HTTP session (native only)."""
        if _PYODIDE:
            # pyfetch is stateless; there is no session to create.
            return
        self._session = aiohttp.ClientSession(
            base_url=self._base_url,
            headers=self._headers,
        )

    async def close(self) -> None:
        """Close the underlying session."""
        if _PYODIDE:
            return
        if self._session and not self._session.closed:
            await self._session.close()

    @property
    def session(self) -> Any:
        """Return the live aiohttp session.

        Raises :class:`ArtifactsError` in Pyodide or when the session has
        not been started (or was already closed).
        """
        if _PYODIDE:
            raise ArtifactsError(
                "Direct session access is not available in Pyodide."
            )
        if self._session is None or self._session.closed:
            raise ArtifactsError(
                "HttpClient session is not started. "
                "Use 'async with AsyncArtifactsClient(...) as client:' "
                "or call 'await client.start()' first."
            )
        return self._session

    # -- Retry helpers --

    def _backoff_delay(self, attempt: int) -> float:
        """Exponential backoff with 10 % jitter."""
        delay = min(self._retry.base_delay * (2 ** attempt), self._retry.max_delay)
        return delay + random.uniform(0, delay * 0.1)

    @staticmethod
    def _extract_cooldown_seconds(exc: ArtifactsAPIError) -> float:
        """Extract remaining cooldown from error data, with a safe fallback."""
        if exc.data and "remaining_seconds" in exc.data:
            return float(exc.data["remaining_seconds"])
        # No cooldown info in the payload — wait a conservative few seconds.
        return 3.0

    # -- Low-level request --

    async def request(
        self,
        method: str,
        path: str,
        *,
        json: Any = None,
        params: Optional[dict[str, Any]] = None,
        auth: Optional[tuple[str, str]] = None,
    ) -> dict[str, Any]:
        """Send a request and return the raw JSON response dict.

        Includes automatic retry with exponential backoff for transient
        errors, rate limits (429), and cooldown conflicts (499).

        Parameters
        ----------
        auth:
            Optional ``(username, password)`` tuple for HTTP Basic auth.

        Raises :class:`ArtifactsAPIError` (or a subclass) on non-2xx
        responses, or :class:`RetryExhaustedError` when all retries are
        exhausted.
        """
        # Strip None values from params
        if params:
            params = {k: v for k, v in params.items() if v is not None}

        last_exc: ArtifactsAPIError | None = None

        for attempt in range(self._retry.max_retries + 1):
            try:
                logger.debug("%s %s (attempt %d)", method, path, attempt + 1)
                if _PYODIDE:
                    return await self._request_pyodide(
                        method, path, json=json, params=params, auth=auth
                    )
                return await self._request_native(
                    method, path, json=json, params=params, auth=auth
                )
            except ArtifactsAPIError as exc:
                last_exc = exc

                # No retries left → re-raise
                if attempt >= self._retry.max_retries:
                    break

                # Rate-limit (429)
                if exc.code == 429:
                    delay = self._backoff_delay(attempt)
                    logger.warning(
                        "Rate limited on %s %s, retrying in %.1fs (attempt %d/%d)",
                        method,
                        path,
                        delay,
                        attempt + 1,
                        self._retry.max_retries,
                    )
                    await asyncio.sleep(delay)
                    continue

                # Cooldown active (499): wait out the reported cooldown
                # rather than backing off exponentially.
                if exc.code == 499 and self._retry.retry_on_cooldown:
                    cd_secs = self._extract_cooldown_seconds(exc)
                    logger.debug(
                        "Cooldown active on %s %s, waiting %.1fs",
                        method,
                        path,
                        cd_secs,
                    )
                    await asyncio.sleep(cd_secs)
                    continue

                # Server errors (500, 502, 503, 504)
                if exc.code in self._retry.retry_on_status:
                    delay = self._backoff_delay(attempt)
                    logger.warning(
                        "Server error %d on %s %s, retrying in %.1fs (attempt %d/%d)",
                        exc.code,
                        method,
                        path,
                        delay,
                        attempt + 1,
                        self._retry.max_retries,
                    )
                    await asyncio.sleep(delay)
                    continue

                # Non-retryable error → raise immediately
                raise

        # All retries exhausted
        if last_exc is not None:
            raise RetryExhaustedError(
                f"Request {method} {path} failed after {self._retry.max_retries + 1} attempts",
                last_exception=last_exc,
            )
        raise ArtifactsError("Unexpected retry loop exit")  # pragma: no cover

    async def _request_native(
        self,
        method: str,
        path: str,
        *,
        json: Any = None,
        params: Optional[dict[str, Any]] = None,
        auth: Optional[tuple[str, str]] = None,
    ) -> dict[str, Any]:
        """Send a request using aiohttp (native Python)."""
        aiohttp_auth = None
        if auth:
            aiohttp_auth = aiohttp.BasicAuth(auth[0], auth[1])

        # Bug fix: use the response as an async context manager so the
        # connection is always released back to the pool, including the
        # 204 and error paths where the body may never be read.
        async with self.session.request(
            method,
            path,
            json=json,
            params=params,
            auth=aiohttp_auth,
        ) as resp:
            if resp.status >= 400:
                try:
                    body = await resp.json()
                except Exception:
                    body = {}
                raise_for_error(resp.status, body)
            if resp.status == 204:
                return {}
            return await resp.json()

    async def _request_pyodide(
        self,
        method: str,
        path: str,
        *,
        json: Any = None,
        params: Optional[dict[str, Any]] = None,
        auth: Optional[tuple[str, str]] = None,
    ) -> dict[str, Any]:
        """Send a request using pyodide.http.pyfetch (WebAssembly)."""
        from pyodide.http import pyfetch  # type: ignore[import-not-found]

        headers = dict(self._headers)
        if auth:
            # pyfetch has no BasicAuth helper — build the header manually.
            cred = base64.b64encode(f"{auth[0]}:{auth[1]}".encode()).decode()
            headers["Authorization"] = f"Basic {cred}"

        url = f"{self._base_url}{path}"
        if params:
            url += "?" + urlencode(params)

        kwargs: dict[str, Any] = {"method": method, "headers": headers}
        if json is not None:
            kwargs["body"] = _json.dumps(json)

        resp = await pyfetch(url, **kwargs)
        if resp.status >= 400:
            try:
                body = await resp.json()
            except Exception:
                body = {}
            raise_for_error(resp.status, body)
        if resp.status == 204:
            return {}
        return await resp.json()

    # -- Convenience shortcuts --

    async def get(
        self,
        path: str,
        *,
        params: Optional[dict[str, Any]] = None,
    ) -> dict[str, Any]:
        return await self.request("GET", path, params=params)

    async def post(
        self,
        path: str,
        *,
        json: Any = None,
        auth: Optional[tuple[str, str]] = None,
    ) -> dict[str, Any]:
        return await self.request("POST", path, json=json, auth=auth)

    # -- Typed helpers (unwrap .data automatically) --

    async def get_model(
        self,
        path: str,
        model: Type[T],
        *,
        params: Optional[dict[str, Any]] = None,
    ) -> T:
        """GET, unwrap ``{"data": ...}``, parse into *model*."""
        data = await self.get(path, params=params)
        return _parse(model, data["data"])

    async def post_model(
        self,
        path: str,
        model: Type[T],
        *,
        json: Any = None,
        auth: Optional[tuple[str, str]] = None,
    ) -> T:
        """POST, unwrap ``{"data": ...}``, parse into *model*."""
        data = await self.post(path, json=json, auth=auth)
        return _parse(model, data["data"])

    async def get_page(
        self,
        path: str,
        item_model: Type[T],
        *,
        params: Optional[dict[str, Any]] = None,
    ) -> DataPage[T]:
        """GET a paginated endpoint and return a typed ``DataPage``."""
        raw = await self.get(path, params=params)
        items = [_parse(item_model, x) for x in raw["data"]]
        return DataPage(
            data=items,
            total=raw["total"],
            page=raw["page"],
            size=raw["size"],
            pages=raw["pages"],
        )

    async def get_all_pages(
        self,
        path: str,
        item_model: Type[T],
        *,
        params: Optional[dict[str, Any]] = None,
        page_size: int = 100,
    ) -> list[T]:
        """Fetch *all* pages of a paginated endpoint and return a flat list."""
        # Bug fix: copy the caller's dict — the original wrote "size" and
        # "page" into the caller-supplied params, mutating it as a side effect.
        params = {} if params is None else dict(params)
        params["size"] = page_size
        params["page"] = 1

        first = await self.get_page(path, item_model, params=params)
        all_items: list[T] = list(first.data)

        if first.pages <= 1:
            return all_items

        # Fetch remaining pages concurrently
        async def _fetch_page(p: int) -> list[T]:
            page_params = {**params, "page": p}
            page_result = await self.get_page(path, item_model, params=page_params)
            return list(page_result.data)

        remaining = await asyncio.gather(
            *[_fetch_page(p) for p in range(2, first.pages + 1)]
        )
        for page_items in remaining:
            all_items.extend(page_items)
        return all_items