synor/sdk/python/synor_storage/client.py
Gulshan Yadav 59a7123535 feat(sdk): implement Phase 1 SDKs for Wallet, RPC, and Storage
Implements comprehensive SDK support for three core services across
four programming languages (JavaScript/TypeScript, Python, Go, Rust).

## New SDKs

### Wallet SDK
- Key management (create, import, export)
- Transaction signing
- Message signing and verification
- Balance and UTXO queries
- Stealth address support

### RPC SDK
- Block and transaction queries
- Chain state information
- Fee estimation
- Mempool information
- WebSocket subscriptions for real-time updates

### Storage SDK
- Content upload and download
- Pinning operations
- CAR file support
- Directory management
- Gateway URL generation

## Shared Infrastructure

- JSON Schema definitions for all 11 services
- Common type definitions (Address, Amount, UTXO, etc.)
- Unified error handling patterns
- Builder patterns for configuration

## Package Updates

- JavaScript: Updated to @synor/sdk with module exports
- Python: Updated to synor-sdk with websockets dependency
- Go: Added gorilla/websocket dependency
- Rust: Added base64, urlencoding, multipart support

## Fixes

- Fixed Tensor Default trait implementation
- Fixed ProcessorType enum casing
2026-01-27 00:46:24 +05:30

524 lines
14 KiB
Python

"""Synor Storage SDK Client.

Decentralized storage, pinning, and content retrieval.

Example:
    >>> from synor_storage import SynorStorage
    >>> async with SynorStorage(api_key="sk_...") as storage:
    ...     result = await storage.upload(b"Hello, World!")
    ...     print(f"CID: {result.cid}")
"""
import base64
from typing import Optional, List, AsyncIterator, Union
from dataclasses import asdict
import httpx
from .types import (
StorageConfig,
UploadOptions,
UploadResponse,
DownloadOptions,
Pin,
PinRequest,
ListPinsOptions,
ListPinsResponse,
GatewayUrl,
CarFile,
CarBlock,
FileEntry,
DirectoryEntry,
ImportCarResponse,
StorageStats,
PinStatus,
)
class StorageError(Exception):
    """Error raised when a Storage API call fails.

    Attributes:
        message: Human-readable description of the failure.
        status_code: Status code associated with the failure.
        code: Optional error code accompanying the failure, if any.
    """

    def __init__(self, message: str, status_code: int, code: Optional[str] = None):
        # Record the structured details first, then let Exception keep the
        # message in ``args`` so ``str(err)`` yields the message.
        self.code = code
        self.status_code = status_code
        self.message = message
        super().__init__(message)
class SynorStorage:
"""Synor Storage SDK client."""
def __init__(
    self,
    api_key: Optional[str] = None,
    config: Optional[StorageConfig] = None,
):
    """Store the client configuration; no HTTP client is created here.

    Args:
        api_key: API key used to build a ``StorageConfig`` when no
            ``config`` is given.
        config: Full configuration object; when provided it is used as-is
            and ``api_key`` is ignored.

    Raises:
        ValueError: If neither ``config`` nor ``api_key`` is provided.
    """
    if not config and not api_key:
        raise ValueError("Either api_key or config must be provided")
    # An explicit config always wins over a bare API key.
    self.config = config if config else StorageConfig(api_key=api_key)
    # Populated by __aenter__ and cleared again by __aexit__.
    self._client: Optional[httpx.AsyncClient] = None
async def __aenter__(self) -> "SynorStorage":
    """Create the HTTP client and return this instance.

    The client is built with the configured timeout and an
    ``Authorization: Bearer <api_key>`` default header.
    """
    auth_headers = {
        "Authorization": f"Bearer {self.config.api_key}",
    }
    self._client = httpx.AsyncClient(
        timeout=self.config.timeout,
        headers=auth_headers,
    )
    return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
    """Close the HTTP client, if one is open, and forget it.

    Returns ``None``, so an exception raised inside the ``async with``
    body is not suppressed.
    """
    active = self._client
    if not active:
        return
    await active.aclose()
    self._client = None
@property
def client(self) -> httpx.AsyncClient:
    """Return the open HTTP client.

    Raises:
        RuntimeError: If no client has been created yet, i.e. the instance
            is being used outside its ``async with`` block.
    """
    http = self._client
    if http:
        return http
    raise RuntimeError("Client not initialized. Use 'async with' context manager.")
async def upload(
    self,
    data: Union[bytes, str],
    options: Optional[UploadOptions] = None,
) -> UploadResponse:
    """Upload content via a multipart POST to ``/upload``.

    Args:
        data: Content to upload; a ``str`` is encoded to bytes first.
        options: Upload options. Only options that are set are forwarded
            as query parameters. Defaults to ``UploadOptions()``.

    Returns:
        Upload response carrying the CID and size, plus the name and hash
        when the API returns them.

    Raises:
        StorageError: If the API answers with an error status.
    """
    opts = options or UploadOptions()
    payload = data.encode() if isinstance(data, str) else data

    query = {}
    if opts.pin is not None:
        # Booleans are sent in lowercase ("true"/"false").
        query["pin"] = str(opts.pin).lower()
    if opts.wrap_with_directory:
        query["wrapWithDirectory"] = "true"
    if opts.cid_version is not None:
        query["cidVersion"] = str(opts.cid_version)
    if opts.hash_algorithm:
        query["hashAlgorithm"] = opts.hash_algorithm.value

    response = await self.client.post(
        f"{self.config.endpoint}/upload",
        files={"file": ("file", payload)},
        params=query,
    )
    self._check_response(response)

    body = response.json()
    return UploadResponse(
        cid=body["cid"],
        size=body["size"],
        name=body.get("name"),
        hash=body.get("hash"),
    )
async def download(
    self,
    cid: str,
    options: Optional[DownloadOptions] = None,
) -> bytes:
    """Fetch the content stored under a CID in one response.

    Args:
        cid: Content ID.
        options: Download options; ``offset`` and ``length`` are forwarded
            as query parameters when they are not ``None``.

    Returns:
        The response body as bytes.

    Raises:
        StorageError: If the API answers with an error status.
    """
    opts = options or DownloadOptions()
    requested = {"offset": opts.offset, "length": opts.length}
    # Drop unset values so they never reach the query string.
    query = {key: value for key, value in requested.items() if value is not None}

    response = await self.client.get(
        f"{self.config.endpoint}/content/{cid}",
        params=query,
    )
    self._check_response(response)
    return response.content
async def download_stream(
    self,
    cid: str,
    options: Optional[DownloadOptions] = None,
) -> AsyncIterator[bytes]:
    """Download content as a stream of byte chunks.

    Args:
        cid: Content ID.
        options: Download options; ``offset`` and ``length`` are forwarded
            as query parameters when they are not ``None``.

    Yields:
        Content chunks as bytes.

    Raises:
        StorageError: If the API answers with an error status.
    """
    opts = options or DownloadOptions()
    params = {}
    if opts.offset is not None:
        params["offset"] = opts.offset
    if opts.length is not None:
        params["length"] = opts.length
    async with self.client.stream(
        "GET",
        f"{self.config.endpoint}/content/{cid}/stream",
        params=params,
    ) as response:
        if response.status_code >= 400:
            # A streamed response has no body loaded yet. Read it so that
            # _check_response can parse the error payload; otherwise
            # accessing .json()/.text raises httpx.ResponseNotRead and the
            # caller never sees a StorageError.
            await response.aread()
        self._check_response(response)
        async for chunk in response.aiter_bytes():
            yield chunk
async def pin(self, request: PinRequest) -> Pin:
    """Create a pin by POSTing the request to ``/pins``.

    Args:
        request: Pin request. ``cid`` is always sent; ``name``,
            ``duration`` and ``origins`` are sent only when truthy.

    Returns:
        Pin information parsed from the API response.
    """
    payload = {"cid": request.cid}
    optional_fields = (
        ("name", request.name),
        ("duration", request.duration),
        ("origins", request.origins),
    )
    for key, value in optional_fields:
        if value:
            payload[key] = value

    created = await self._request("POST", "/pins", payload)
    return self._parse_pin(created)
async def unpin(self, cid: str) -> None:
    """Remove the pin for a CID by sending DELETE to ``/pins/{cid}``.

    Args:
        cid: Content ID.
    """
    pin_path = f"/pins/{cid}"
    await self._request("DELETE", pin_path)
async def get_pin_status(self, cid: str) -> Pin:
    """Fetch the pin record for a CID from ``/pins/{cid}``.

    Args:
        cid: Content ID.

    Returns:
        Pin information parsed from the API response.
    """
    pin_path = f"/pins/{cid}"
    record = await self._request("GET", pin_path)
    return self._parse_pin(record)
async def list_pins(
    self,
    options: Optional[ListPinsOptions] = None,
) -> ListPinsResponse:
    """List pins, optionally filtered and paginated.

    Args:
        options: List options. Defaults to ``ListPinsOptions()``; only
            filters that are set are sent as query parameters.

    Returns:
        The parsed pins plus ``total`` and ``has_more``. ``total`` falls
        back to the number of pins returned, and ``has_more`` to ``False``,
        when the API omits them.
    """
    opts = options or ListPinsOptions()

    query = {}
    if opts.status:
        # Multiple statuses are sent as a single comma-separated value.
        query["status"] = ",".join(state.value for state in opts.status)
    if opts.match:
        query["match"] = opts.match.value
    if opts.name:
        query["name"] = opts.name
    if opts.limit is not None:
        query["limit"] = opts.limit
    if opts.offset is not None:
        query["offset"] = opts.offset

    payload = await self._request("GET", "/pins", params=query)

    parsed = []
    for item in payload.get("pins", []):
        parsed.append(self._parse_pin(item))

    total = payload.get("total", len(parsed))
    has_more = payload.get("hasMore", False)
    return ListPinsResponse(pins=parsed, total=total, has_more=has_more)
def get_gateway_url(self, cid: str, path: Optional[str] = None) -> GatewayUrl:
    """Build the gateway URL for a CID without making a request.

    Args:
        cid: Content ID.
        path: Optional path within the content, appended after the CID.

    Returns:
        Gateway URL information: the URL under ``{gateway}/ipfs/`` together
        with the ``cid`` and ``path`` it was built from.
    """
    if path:
        full_path = f"/{cid}/{path}"
    else:
        full_path = f"/{cid}"
    url = f"{self.config.gateway}/ipfs{full_path}"
    return GatewayUrl(url=url, cid=cid, path=path)
async def create_car(self, files: List[FileEntry]) -> CarFile:
    """Ask the API to build a CAR file from the given entries.

    Args:
        files: Files to include. Each entry is sent with its name, its
            content base64-encoded when truthy, and its CID when truthy.

    Returns:
        CAR file information parsed from the API response.
    """

    def describe(item):
        # One JSON-serializable record per file entry.
        record = {"name": item.name}
        if item.content:
            record["content"] = base64.b64encode(item.content).decode()
        if item.cid:
            record["cid"] = item.cid
        return record

    payload = await self._request(
        "POST",
        "/car/create",
        {"files": [describe(item) for item in files]},
    )

    version = payload["version"]
    roots = payload["roots"]
    blocks = []
    for raw in payload.get("blocks", []):
        blocks.append(
            CarBlock(
                cid=raw["cid"],
                data=raw["data"],
                size=raw.get("size"),
            )
        )
    return CarFile(
        version=version,
        roots=roots,
        blocks=blocks,
        size=payload.get("size"),
    )
async def import_car(self, car_data: bytes, pin: bool = True) -> ImportCarResponse:
    """Import a CAR file by POSTing it, base64-encoded, to ``/car/import``.

    Args:
        car_data: Raw CAR file bytes.
        pin: Value sent as the ``pin`` flag of the import request.

    Returns:
        Import result with the roots and the number of blocks imported.
    """
    payload = {
        "car": base64.b64encode(car_data).decode(),
        "pin": pin,
    }
    result = await self._request("POST", "/car/import", payload)
    return ImportCarResponse(
        roots=result["roots"],
        blocks_imported=result["blocksImported"],
    )
async def export_car(self, cid: str) -> bytes:
    """Download the CAR export for a CID from ``/car/export/{cid}``.

    Args:
        cid: Content ID.

    Returns:
        The response body (CAR file data) as bytes.

    Raises:
        StorageError: If the API answers with an error status.
    """
    http = self.client
    export_url = f"{self.config.endpoint}/car/export/{cid}"
    response = await http.get(export_url)
    self._check_response(response)
    return response.content
async def create_directory(self, files: List[FileEntry]) -> UploadResponse:
    """Create a directory from the given entries via POST ``/directory``.

    Args:
        files: Files to include. Each entry is sent with its name, its
            content base64-encoded when truthy, and its CID when truthy.

    Returns:
        Upload response with the directory CID and size, plus the name
        when the API returns one.
    """

    def describe(item):
        # One JSON-serializable record per file entry.
        record = {"name": item.name}
        if item.content:
            record["content"] = base64.b64encode(item.content).decode()
        if item.cid:
            record["cid"] = item.cid
        return record

    entries = [describe(item) for item in files]
    created = await self._request("POST", "/directory", {"files": entries})
    return UploadResponse(
        cid=created["cid"],
        size=created["size"],
        name=created.get("name"),
    )
async def list_directory(
    self,
    cid: str,
    path: Optional[str] = None,
) -> List[DirectoryEntry]:
    """List the entries of a directory via GET ``/directory/{cid}``.

    Args:
        cid: Directory CID.
        path: Optional path within the directory; sent as the ``path``
            query parameter when truthy.

    Returns:
        One ``DirectoryEntry`` per item in the response's ``entries`` list
        (empty when the key is absent).
    """
    query = {}
    if path:
        query["path"] = path
    listing = await self._request("GET", f"/directory/{cid}", params=query)

    # Imported here, as in the original, rather than at module level.
    from .types import EntryType

    entries = []
    for raw in listing.get("entries", []):
        entries.append(
            DirectoryEntry(
                name=raw["name"],
                cid=raw["cid"],
                type=EntryType(raw["type"]),
                size=raw.get("size"),
            )
        )
    return entries
async def get_stats(self) -> StorageStats:
    """Get storage statistics from ``/stats``.

    Returns:
        Storage statistics. ``total_size`` and ``pin_count`` are required
        in the response; the bandwidth figures are ``None`` when the API
        does not report them.
    """
    response = await self._request("GET", "/stats")
    # "bandwidth" may be absent OR present with a null value; `or {}` covers
    # both, whereas a .get() default only covers the missing-key case and
    # would leave None to blow up on the lookups below.
    bandwidth = response.get("bandwidth") or {}
    return StorageStats(
        total_size=response["totalSize"],
        pin_count=response["pinCount"],
        upload_bandwidth=bandwidth.get("upload"),
        download_bandwidth=bandwidth.get("download"),
    )
async def exists(self, cid: str) -> bool:
    """Check if content exists using a HEAD request.

    Args:
        cid: Content ID.

    Returns:
        True if the HEAD request answers with status 200. False for any
        other status, or when the request itself fails (transport error,
        timeout, malformed URL).

    Raises:
        RuntimeError: If the client is used outside its ``async with``
            block.
    """
    # Resolve the client outside the try block: using the SDK before
    # entering the context manager is a programming error and must not be
    # reported as "content does not exist".
    client = self.client
    try:
        response = await client.head(
            f"{self.config.endpoint}/content/{cid}"
        )
    except (httpx.HTTPError, httpx.InvalidURL):
        # Best-effort probe: an unreachable or failing endpoint counts as
        # "not found" rather than an error.
        return False
    return response.status_code == 200
async def get_metadata(self, cid: str) -> dict:
    """Fetch the metadata document at ``/content/{cid}/metadata``.

    Args:
        cid: Content ID.

    Returns:
        The decoded JSON response as a dictionary.
    """
    metadata_path = f"/content/{cid}/metadata"
    metadata = await self._request("GET", metadata_path)
    return metadata
async def _request(
    self,
    method: str,
    path: str,
    body: Optional[dict] = None,
    params: Optional[dict] = None,
) -> dict:
    """Make a JSON API request against the configured endpoint.

    Args:
        method: HTTP method.
        path: API path, appended to the configured endpoint.
        body: JSON request body; omitted when empty or ``None``.
        params: Query parameters; omitted when empty or ``None``.

    Returns:
        The decoded JSON response, or an empty dict when the response
        carries no body (status 204 or an empty payload).

    Raises:
        StorageError: If the API answers with an error status.
    """
    url = f"{self.config.endpoint}{path}"
    if self.config.debug:
        print(f"[SynorStorage] {method} {path}")
    kwargs = {}
    if body:
        kwargs["json"] = body
    if params:
        kwargs["params"] = params
    response = await self.client.request(method, url, **kwargs)
    self._check_response(response)
    # Besides 204, other success responses (e.g. 200/202 to a DELETE) may
    # come back without a body; decoding an empty payload as JSON would
    # raise, so treat every body-less success the same way.
    if response.status_code == 204 or not response.content:
        return {}
    return response.json()
def _check_response(self, response: httpx.Response) -> None:
    """Raise ``StorageError`` if the response has an error status.

    Args:
        response: HTTP response to inspect.

    Raises:
        StorageError: If the status code is 400 or above. The message and
            code come from the JSON error body when it can be parsed;
            otherwise the raw response text (or "Unknown error") is used
            and the code is ``None``.
    """
    if response.status_code < 400:
        return

    try:
        payload = response.json()
        message = payload.get("message", "Unknown error")
        code = payload.get("code")
    except Exception:
        # Body is not JSON, or not a JSON object: fall back to plain text.
        message = response.text or "Unknown error"
        code = None
    raise StorageError(message, response.status_code, code)
def _parse_pin(self, data: dict) -> Pin:
    """Build a ``Pin`` from one pin record of an API response.

    Args:
        data: Pin record. ``cid`` and ``status`` are required; the other
            fields default to ``None`` (``delegates`` to an empty list)
            when absent.

    Returns:
        Pin object.
    """
    cid = data["cid"]
    status = PinStatus(data["status"])
    # Optional fields: Pin attribute name -> key used by the API.
    optional_keys = (
        ("name", "name"),
        ("size", "size"),
        ("created_at", "createdAt"),
        ("expires_at", "expiresAt"),
    )
    optional = {attr: data.get(key) for attr, key in optional_keys}
    return Pin(
        cid=cid,
        status=status,
        delegates=data.get("delegates", []),
        **optional,
    )