"""Synor Storage SDK Client. Decentralized storage, pinning, and content retrieval. Example: >>> from synor_storage import SynorStorage >>> async with SynorStorage(api_key="sk_...") as storage: ... result = await storage.upload(b"Hello, World!") ... print(f"CID: {result.cid}") """ import base64 from typing import Optional, List, AsyncIterator, Union from dataclasses import asdict import httpx from .types import ( StorageConfig, UploadOptions, UploadResponse, DownloadOptions, Pin, PinRequest, ListPinsOptions, ListPinsResponse, GatewayUrl, CarFile, CarBlock, FileEntry, DirectoryEntry, ImportCarResponse, StorageStats, PinStatus, ) class StorageError(Exception): """Storage API error.""" def __init__(self, message: str, status_code: int, code: Optional[str] = None): super().__init__(message) self.message = message self.status_code = status_code self.code = code class SynorStorage: """Synor Storage SDK client.""" def __init__( self, api_key: Optional[str] = None, config: Optional[StorageConfig] = None, ): """Initialize the storage client. Args: api_key: API key for authentication. config: Full configuration object (overrides api_key). """ if config: self.config = config elif api_key: self.config = StorageConfig(api_key=api_key) else: raise ValueError("Either api_key or config must be provided") self._client: Optional[httpx.AsyncClient] = None async def __aenter__(self) -> "SynorStorage": """Enter async context manager.""" self._client = httpx.AsyncClient( timeout=self.config.timeout, headers={ "Authorization": f"Bearer {self.config.api_key}", }, ) return self async def __aexit__(self, exc_type, exc_val, exc_tb): """Exit async context manager.""" if self._client: await self._client.aclose() self._client = None @property def client(self) -> httpx.AsyncClient: """Get the HTTP client.""" if not self._client: raise RuntimeError("Client not initialized. Use 'async with' context manager.") return self._client async def upload( self, data: Union[bytes, str], options: Optional[UploadOptions] = None, ) -> UploadResponse: """Upload content to storage. Args: data: Content to upload (bytes or string). options: Upload options. Returns: Upload response with CID and size. """ opts = options or UploadOptions() if isinstance(data, str): data = data.encode() params = {} if opts.pin is not None: params["pin"] = str(opts.pin).lower() if opts.wrap_with_directory: params["wrapWithDirectory"] = "true" if opts.cid_version is not None: params["cidVersion"] = str(opts.cid_version) if opts.hash_algorithm: params["hashAlgorithm"] = opts.hash_algorithm.value files = {"file": ("file", data)} response = await self.client.post( f"{self.config.endpoint}/upload", files=files, params=params, ) self._check_response(response) result = response.json() return UploadResponse( cid=result["cid"], size=result["size"], name=result.get("name"), hash=result.get("hash"), ) async def download( self, cid: str, options: Optional[DownloadOptions] = None, ) -> bytes: """Download content by CID. Args: cid: Content ID. options: Download options. Returns: Content as bytes. """ opts = options or DownloadOptions() params = {} if opts.offset is not None: params["offset"] = opts.offset if opts.length is not None: params["length"] = opts.length response = await self.client.get( f"{self.config.endpoint}/content/{cid}", params=params, ) self._check_response(response) return response.content async def download_stream( self, cid: str, options: Optional[DownloadOptions] = None, ) -> AsyncIterator[bytes]: """Download content as a stream. Args: cid: Content ID. options: Download options. Yields: Content chunks as bytes. """ opts = options or DownloadOptions() params = {} if opts.offset is not None: params["offset"] = opts.offset if opts.length is not None: params["length"] = opts.length async with self.client.stream( "GET", f"{self.config.endpoint}/content/{cid}/stream", params=params, ) as response: self._check_response(response) async for chunk in response.aiter_bytes(): yield chunk async def pin(self, request: PinRequest) -> Pin: """Pin content by CID. Args: request: Pin request. Returns: Pin information. """ body = {"cid": request.cid} if request.name: body["name"] = request.name if request.duration: body["duration"] = request.duration if request.origins: body["origins"] = request.origins response = await self._request("POST", "/pins", body) return self._parse_pin(response) async def unpin(self, cid: str) -> None: """Unpin content by CID. Args: cid: Content ID. """ await self._request("DELETE", f"/pins/{cid}") async def get_pin_status(self, cid: str) -> Pin: """Get pin status by CID. Args: cid: Content ID. Returns: Pin information. """ response = await self._request("GET", f"/pins/{cid}") return self._parse_pin(response) async def list_pins( self, options: Optional[ListPinsOptions] = None, ) -> ListPinsResponse: """List pins. Args: options: List options. Returns: List of pins with pagination info. """ opts = options or ListPinsOptions() params = {} if opts.status: params["status"] = ",".join(s.value for s in opts.status) if opts.match: params["match"] = opts.match.value if opts.name: params["name"] = opts.name if opts.limit is not None: params["limit"] = opts.limit if opts.offset is not None: params["offset"] = opts.offset response = await self._request("GET", "/pins", params=params) pins = [self._parse_pin(p) for p in response.get("pins", [])] return ListPinsResponse( pins=pins, total=response.get("total", len(pins)), has_more=response.get("hasMore", False), ) def get_gateway_url(self, cid: str, path: Optional[str] = None) -> GatewayUrl: """Get gateway URL for content. Args: cid: Content ID. path: Optional path within content. Returns: Gateway URL information. """ full_path = f"/{cid}/{path}" if path else f"/{cid}" return GatewayUrl( url=f"{self.config.gateway}/ipfs{full_path}", cid=cid, path=path, ) async def create_car(self, files: List[FileEntry]) -> CarFile: """Create a CAR file from files. Args: files: Files to include in the CAR. Returns: CAR file information. """ file_data = [] for f in files: entry = {"name": f.name} if f.content: entry["content"] = base64.b64encode(f.content).decode() if f.cid: entry["cid"] = f.cid file_data.append(entry) response = await self._request("POST", "/car/create", {"files": file_data}) return CarFile( version=response["version"], roots=response["roots"], blocks=[ CarBlock( cid=b["cid"], data=b["data"], size=b.get("size"), ) for b in response.get("blocks", []) ], size=response.get("size"), ) async def import_car(self, car_data: bytes, pin: bool = True) -> ImportCarResponse: """Import a CAR file. Args: car_data: CAR file data. pin: Whether to pin imported content. Returns: Import result. """ encoded = base64.b64encode(car_data).decode() response = await self._request("POST", "/car/import", { "car": encoded, "pin": pin, }) return ImportCarResponse( roots=response["roots"], blocks_imported=response["blocksImported"], ) async def export_car(self, cid: str) -> bytes: """Export content as a CAR file. Args: cid: Content ID. Returns: CAR file data. """ response = await self.client.get( f"{self.config.endpoint}/car/export/{cid}" ) self._check_response(response) return response.content async def create_directory(self, files: List[FileEntry]) -> UploadResponse: """Create a directory from files. Args: files: Files to include in the directory. Returns: Upload response with directory CID. """ file_data = [] for f in files: entry = {"name": f.name} if f.content: entry["content"] = base64.b64encode(f.content).decode() if f.cid: entry["cid"] = f.cid file_data.append(entry) response = await self._request("POST", "/directory", {"files": file_data}) return UploadResponse( cid=response["cid"], size=response["size"], name=response.get("name"), ) async def list_directory( self, cid: str, path: Optional[str] = None, ) -> List[DirectoryEntry]: """List directory contents. Args: cid: Directory CID. path: Optional path within directory. Returns: List of directory entries. """ params = {"path": path} if path else {} response = await self._request("GET", f"/directory/{cid}", params=params) from .types import EntryType return [ DirectoryEntry( name=e["name"], cid=e["cid"], type=EntryType(e["type"]), size=e.get("size"), ) for e in response.get("entries", []) ] async def get_stats(self) -> StorageStats: """Get storage statistics. Returns: Storage statistics. """ response = await self._request("GET", "/stats") bandwidth = response.get("bandwidth", {}) return StorageStats( total_size=response["totalSize"], pin_count=response["pinCount"], upload_bandwidth=bandwidth.get("upload"), download_bandwidth=bandwidth.get("download"), ) async def exists(self, cid: str) -> bool: """Check if content exists. Args: cid: Content ID. Returns: True if content exists. """ try: response = await self.client.head( f"{self.config.endpoint}/content/{cid}" ) return response.status_code == 200 except Exception: return False async def get_metadata(self, cid: str) -> dict: """Get content metadata. Args: cid: Content ID. Returns: Metadata dictionary. """ return await self._request("GET", f"/content/{cid}/metadata") async def _request( self, method: str, path: str, body: Optional[dict] = None, params: Optional[dict] = None, ) -> dict: """Make an API request. Args: method: HTTP method. path: API path. body: Request body. params: Query parameters. Returns: Response data. """ url = f"{self.config.endpoint}{path}" if self.config.debug: print(f"[SynorStorage] {method} {path}") kwargs = {} if body: kwargs["json"] = body if params: kwargs["params"] = params response = await self.client.request(method, url, **kwargs) self._check_response(response) if response.status_code == 204: return {} return response.json() def _check_response(self, response: httpx.Response) -> None: """Check response for errors. Args: response: HTTP response. Raises: StorageError: If response indicates an error. """ if response.status_code >= 400: try: error = response.json() message = error.get("message", "Unknown error") code = error.get("code") except Exception: message = response.text or "Unknown error" code = None raise StorageError(message, response.status_code, code) def _parse_pin(self, data: dict) -> Pin: """Parse pin from API response. Args: data: API response data. Returns: Pin object. """ return Pin( cid=data["cid"], status=PinStatus(data["status"]), name=data.get("name"), size=data.get("size"), created_at=data.get("createdAt"), expires_at=data.get("expiresAt"), delegates=data.get("delegates", []), )