synor/sdk/python/synor_rpc/client.py
Gulshan Yadav 59a7123535 feat(sdk): implement Phase 1 SDKs for Wallet, RPC, and Storage
Implements comprehensive SDK support for three core services across
four programming languages (JavaScript/TypeScript, Python, Go, Rust).

## New SDKs

### Wallet SDK
- Key management (create, import, export)
- Transaction signing
- Message signing and verification
- Balance and UTXO queries
- Stealth address support

### RPC SDK
- Block and transaction queries
- Chain state information
- Fee estimation
- Mempool information
- WebSocket subscriptions for real-time updates

### Storage SDK
- Content upload and download
- Pinning operations
- CAR file support
- Directory management
- Gateway URL generation

## Shared Infrastructure

- JSON Schema definitions for all 11 services
- Common type definitions (Address, Amount, UTXO, etc.)
- Unified error handling patterns
- Builder patterns for configuration

## Package Updates

- JavaScript: Updated to @synor/sdk with module exports
- Python: Updated to synor-sdk with websockets dependency
- Go: Added gorilla/websocket dependency
- Rust: Added base64, urlencoding, multipart support

## Fixes

- Fixed Tensor Default trait implementation
- Fixed ProcessorType enum casing
2026-01-27 00:46:24 +05:30

454 lines
14 KiB
Python

"""Synor RPC Client."""
import asyncio
import json
import logging
import time
from typing import Any, Callable, Optional
from urllib.parse import urlencode

import httpx
import websockets

from .types import (
    RpcConfig,
    Network,
    Priority,
    TransactionStatus,
    SubscriptionType,
    BlockHeader,
    Block,
    TxInput,
    TxOutput,
    ScriptPubKey,
    Transaction,
    FeeEstimate,
    ChainInfo,
    MempoolInfo,
    UTXO,
    Balance,
    Subscription,
)
class RpcError(Exception):
    """Synor RPC SDK error.

    Attributes:
        status_code: Status code supplied by the raiser, or ``None`` when
            none was given.
        code: Error code string supplied by the raiser, or ``None`` when
            none was given.
    """

    def __init__(
        self,
        message: str,
        status_code: Optional[int] = None,
        code: Optional[str] = None,
    ):
        """Create the error.

        Args:
            message: Human-readable description; becomes ``str(error)``.
            status_code: Optional numeric status associated with the failure.
            code: Optional error code associated with the failure.
        """
        super().__init__(message)
        self.status_code = status_code
        self.code = code
class SynorRpc:
    """
    Synor RPC SDK client.

    HTTP queries go through a shared ``httpx.AsyncClient``; subscriptions use
    a single WebSocket connection that is opened lazily on the first
    ``subscribe_*`` call and read by a background listener task.

    Subscription callbacks may be plain functions or ``async def``
    coroutines; coroutine results are awaited by the listener.

    Example:
        >>> async with SynorRpc(api_key="sk_...") as rpc:
        ...     block = await rpc.get_latest_block()
        ...     print(f"Height: {block.height}")
        ...
        ...     # Subscribe to new blocks
        ...     async def on_block(block):
        ...         print(f"New block: {block.height}")
        ...     sub = await rpc.subscribe_blocks(on_block)
    """

    def __init__(
        self,
        api_key: str,
        endpoint: str = "https://rpc.synor.cc/api/v1",
        ws_endpoint: str = "wss://rpc.synor.cc/ws",
        network: Network = Network.MAINNET,
        timeout: float = 30.0,
        debug: bool = False,
    ):
        """Build the HTTP client; the WebSocket is not opened here.

        Args:
            api_key: Sent as a ``Bearer`` token on HTTP requests and as the
                ``apiKey`` query parameter on the WebSocket URL.
            endpoint: Base URL for HTTP requests.
            ws_endpoint: URL of the WebSocket endpoint.
            network: Network selector; its ``value`` is sent in the
                ``X-Network`` header and the WebSocket query string.
            timeout: Timeout passed to the HTTP client.
            debug: When true, request lines and WebSocket events are printed.
        """
        self.config = RpcConfig(
            api_key=api_key,
            endpoint=endpoint,
            ws_endpoint=ws_endpoint,
            network=network,
            timeout=timeout,
            debug=debug,
        )
        self._client = httpx.AsyncClient(
            base_url=endpoint,
            headers={
                "Authorization": f"Bearer {api_key}",
                "Content-Type": "application/json",
                "X-Network": network.value,
            },
            timeout=timeout,
        )
        self._ws: Optional[websockets.WebSocketClientProtocol] = None
        self._subscriptions: dict[str, Callable] = {}
        self._ws_task: Optional[asyncio.Task] = None
        # Strong references to fire-and-forget tasks (unsubscribe frames) so
        # they cannot be garbage-collected before they finish.
        self._pending_tasks: set[asyncio.Task] = set()

    async def __aenter__(self) -> "SynorRpc":
        """Return the client itself for use in ``async with``."""
        return self

    async def __aexit__(self, *args: Any) -> None:
        """Close the client when the ``async with`` block exits."""
        await self.close()

    async def close(self) -> None:
        """Close the client.

        Cancels the WebSocket listener, closes the WebSocket, and closes the
        HTTP client. The HTTP client is closed even if closing the WebSocket
        raises, and the WebSocket is closed even though the HTTP client is
        handled last, so neither resource is leaked by a failure in the other.
        """
        # Detach state first so a concurrent unsubscribe sees "no socket".
        listener, self._ws_task = self._ws_task, None
        socket, self._ws = self._ws, None
        self._subscriptions.clear()
        try:
            if listener is not None:
                listener.cancel()
            if socket is not None:
                await socket.close()
        finally:
            await self._client.aclose()

    # ==================== Block Operations ====================

    async def get_block(self, hash_or_height: str | int) -> Block:
        """Get block by hash or height.

        An ``int`` is looked up under ``/blocks/height/...``; anything else
        is treated as a block hash.
        """
        if isinstance(hash_or_height, int):
            path = f"/blocks/height/{hash_or_height}"
        else:
            path = f"/blocks/{hash_or_height}"
        data = await self._request("GET", path)
        return self._parse_block(data)

    async def get_latest_block(self) -> Block:
        """Get the latest block."""
        data = await self._request("GET", "/blocks/latest")
        return self._parse_block(data)

    async def get_block_header(self, hash_or_height: str | int) -> BlockHeader:
        """Get block header only, addressed by hash or by integer height."""
        if isinstance(hash_or_height, int):
            path = f"/blocks/height/{hash_or_height}/header"
        else:
            path = f"/blocks/{hash_or_height}/header"
        data = await self._request("GET", path)
        return self._parse_block_header(data)

    async def get_blocks(self, start_height: int, end_height: int) -> list[Block]:
        """Get blocks in a range.

        The bounds are sent as the ``start`` and ``end`` query parameters;
        a response without a ``blocks`` key yields an empty list.
        """
        data = await self._request(
            "GET",
            "/blocks",
            params={"start": str(start_height), "end": str(end_height)},
        )
        return [self._parse_block(b) for b in data.get("blocks", [])]

    # ==================== Transaction Operations ====================

    async def get_transaction(self, txid: str) -> Transaction:
        """Get transaction by hash."""
        data = await self._request("GET", f"/transactions/{txid}")
        return self._parse_transaction(data)

    async def get_raw_transaction(self, txid: str) -> str:
        """Get raw transaction hex (the ``raw`` field of the response)."""
        data = await self._request("GET", f"/transactions/{txid}/raw")
        return data["raw"]

    async def send_raw_transaction(self, raw_tx: str) -> str:
        """Send a raw transaction and return the ``txid`` from the response."""
        data = await self._request("POST", "/transactions", {"raw": raw_tx})
        return data["txid"]

    async def get_address_transactions(
        self,
        address: str,
        limit: int = 20,
        offset: int = 0,
    ) -> list[Transaction]:
        """Get transactions for an address.

        ``limit`` and ``offset`` are forwarded as query parameters; a
        response without a ``transactions`` key yields an empty list.
        """
        data = await self._request(
            "GET",
            f"/addresses/{address}/transactions",
            params={"limit": str(limit), "offset": str(offset)},
        )
        return [self._parse_transaction(tx) for tx in data.get("transactions", [])]

    # ==================== Fee Estimation ====================

    async def estimate_fee(self, priority: Priority = Priority.MEDIUM) -> FeeEstimate:
        """Estimate fee for a priority level."""
        data = await self._request(
            "GET", "/fees/estimate", params={"priority": priority.value}
        )
        return self._parse_fee_estimate(data)

    async def get_all_fee_estimates(self) -> list[FeeEstimate]:
        """Get all fee estimates (empty list if ``estimates`` is absent)."""
        data = await self._request("GET", "/fees/estimate/all")
        return [self._parse_fee_estimate(e) for e in data.get("estimates", [])]

    # ==================== Chain Information ====================

    async def get_chain_info(self) -> ChainInfo:
        """Get chain information."""
        data = await self._request("GET", "/chain")
        return ChainInfo(
            chain=data["chain"],
            network=data["network"],
            height=data["height"],
            best_block_hash=data["bestBlockHash"],
            difficulty=data["difficulty"],
            median_time=data["medianTime"],
            chain_work=data["chainWork"],
            syncing=data["syncing"],
            sync_progress=data["syncProgress"],
        )

    async def get_mempool_info(self) -> MempoolInfo:
        """Get mempool information."""
        data = await self._request("GET", "/mempool")
        return MempoolInfo(
            size=data["size"],
            bytes=data["bytes"],
            usage=data["usage"],
            max_mempool=data["maxMempool"],
            min_fee=data["minFee"],
        )

    async def get_mempool_transactions(self, limit: int = 100) -> list[str]:
        """Get mempool transaction IDs (empty list if ``txids`` is absent)."""
        data = await self._request(
            "GET", "/mempool/transactions", params={"limit": str(limit)}
        )
        return data.get("txids", [])

    # ==================== UTXO Operations ====================

    async def get_utxos(self, address: str) -> list[UTXO]:
        """Get UTXOs for an address.

        ``scriptPubKey`` is optional in each entry; all other fields are
        required and raise ``KeyError`` when missing.
        """
        data = await self._request("GET", f"/addresses/{address}/utxos")
        return [
            UTXO(
                txid=u["txid"],
                vout=u["vout"],
                amount=u["amount"],
                address=u["address"],
                confirmations=u["confirmations"],
                script_pub_key=u.get("scriptPubKey"),
            )
            for u in data.get("utxos", [])
        ]

    async def get_balance(self, address: str) -> Balance:
        """Get balance for an address."""
        data = await self._request("GET", f"/addresses/{address}/balance")
        return Balance(
            confirmed=data["confirmed"],
            unconfirmed=data["unconfirmed"],
            total=data["total"],
        )

    # ==================== Subscriptions ====================

    async def subscribe_blocks(
        self, callback: Callable[[Block], Any]
    ) -> Subscription:
        """Subscribe to new blocks.

        ``callback`` receives the parsed ``block`` field of each event and
        may be a regular function or a coroutine function.
        """
        return await self._subscribe(
            SubscriptionType.BLOCKS,
            lambda data: callback(self._parse_block(data["block"])),
        )

    async def subscribe_address(
        self, address: str, callback: Callable[[Transaction], Any]
    ) -> Subscription:
        """Subscribe to address transactions.

        ``callback`` receives the parsed ``transaction`` field of each event.
        """
        return await self._subscribe(
            SubscriptionType.ADDRESS,
            lambda data: callback(self._parse_transaction(data["transaction"])),
            {"address": address},
        )

    async def subscribe_mempool(
        self, callback: Callable[[Transaction], Any]
    ) -> Subscription:
        """Subscribe to mempool transactions.

        ``callback`` receives the parsed ``transaction`` field of each event.
        """
        return await self._subscribe(
            SubscriptionType.MEMPOOL,
            lambda data: callback(self._parse_transaction(data["transaction"])),
        )

    async def _subscribe(
        self,
        sub_type: SubscriptionType,
        callback: Callable,
        params: Optional[dict] = None,
    ) -> Subscription:
        """Create a subscription.

        Registers ``callback`` under a fresh id, sends the ``subscribe``
        frame, and returns a ``Subscription`` whose ``unsubscribe`` removes
        the callback and notifies the server.

        Args:
            sub_type: Kind of subscription; its ``value`` is sent to the server.
            callback: Invoked by the listener with each event's ``data`` dict.
            params: Extra keys merged into the subscribe frame.

        Raises:
            Whatever the WebSocket ``send`` raises; in that case the callback
            is unregistered again before the exception propagates.
        """
        await self._ensure_websocket()

        # Ids are "<type>_<ms timestamp>". Two subscriptions of the same type
        # created within one millisecond would otherwise share an id and the
        # second callback would silently replace the first, so nudge the
        # numeric suffix forward until the id is unused.
        stamp = int(time.time() * 1000)
        sub_id = f"{sub_type.value}_{stamp}"
        while sub_id in self._subscriptions:
            stamp += 1
            sub_id = f"{sub_type.value}_{stamp}"
        self._subscriptions[sub_id] = callback

        message = {
            "type": "subscribe",
            "id": sub_id,
            "subscription": sub_type.value,
            **(params or {}),
        }
        try:
            await self._ws.send(json.dumps(message))
        except BaseException:
            # The server never learned about this id (send failed or was
            # cancelled); do not leave a dead callback registered.
            self._subscriptions.pop(sub_id, None)
            raise

        def unsubscribe() -> None:
            """Drop the local callback and schedule the unsubscribe frame.

            Raises ``RuntimeError`` if a socket exists but no event loop is
            running, because the frame cannot be scheduled.
            """
            self._subscriptions.pop(sub_id, None)
            if self._ws is None:
                return
            # Resolve the loop before creating the coroutine so a missing
            # loop does not leave a never-awaited coroutine behind.
            loop = asyncio.get_running_loop()
            task = loop.create_task(self._send_unsubscribe(sub_id))
            # The event loop only keeps weak references to tasks.
            self._pending_tasks.add(task)
            task.add_done_callback(self._pending_tasks.discard)

        return Subscription(
            id=sub_id,
            type=sub_type,
            created_at=int(time.time() * 1000),
            unsubscribe=unsubscribe,
        )

    async def _send_unsubscribe(self, sub_id: str) -> None:
        """Best-effort: tell the server that ``sub_id`` is no longer wanted."""
        socket = self._ws
        if socket is None:
            return
        try:
            await socket.send(json.dumps({"type": "unsubscribe", "id": sub_id}))
        except websockets.ConnectionClosed:
            # The local callback is already gone and a closed connection has
            # nothing left to unsubscribe from.
            if self.config.debug:
                print(f"[SynorRpc] unsubscribe {sub_id} skipped: WebSocket closed")

    async def _ensure_websocket(self) -> None:
        """Ensure WebSocket connection is established.

        Reuses the current connection while it reports ``open``; otherwise
        connects and starts a new listener task. Existing subscriptions are
        not re-sent to the server on reconnect.
        """
        # NOTE(review): ``.open`` is taken from the original code; confirm it
        # exists on the connection class of the pinned ``websockets`` version.
        if self._ws and self._ws.open:
            return

        # URL-encode the credentials so keys containing "&", "+", "#", etc.
        # cannot corrupt the query string.
        query = urlencode(
            {
                "apiKey": self.config.api_key,
                "network": self.config.network.value,
            }
        )
        separator = "&" if "?" in self.config.ws_endpoint else "?"
        ws_url = f"{self.config.ws_endpoint}{separator}{query}"

        self._ws = await websockets.connect(ws_url)
        self._ws_task = asyncio.create_task(self._ws_listener())

    async def _ws_listener(self) -> None:
        """Listen for WebSocket messages.

        Each frame is decoded as JSON; frames that are not JSON objects or
        that carry an unknown ``subscriptionId`` are ignored. The matching
        callback is called with the frame's ``data`` (default ``{}``) and its
        result is awaited when it is a coroutine. A failing callback is
        logged and does not stop the listener.
        """
        try:
            async for message in self._ws:
                try:
                    data = json.loads(message)
                except ValueError:
                    # Covers JSONDecodeError and undecodable byte frames.
                    continue
                if not isinstance(data, dict):
                    continue

                sub_id = data.get("subscriptionId")
                if not isinstance(sub_id, str):
                    continue
                callback = self._subscriptions.get(sub_id)
                if callback is None:
                    continue

                try:
                    outcome = callback(data.get("data", {}))
                    if asyncio.iscoroutine(outcome):
                        # ``async def`` callbacks return a coroutine that
                        # would otherwise never run.
                        await outcome
                except Exception:
                    # One bad event or buggy callback must not end delivery
                    # for every other subscription.
                    logging.getLogger(__name__).exception(
                        "Subscription callback %s failed", sub_id
                    )
        except websockets.ConnectionClosed:
            if self.config.debug:
                print("[SynorRpc] WebSocket closed")

    # ==================== HTTP Request ====================

    async def _request(
        self,
        method: str,
        path: str,
        data: Optional[dict[str, Any]] = None,
        params: Optional[dict[str, str]] = None,
    ) -> dict[str, Any]:
        """Make an API request.

        Args:
            method: HTTP method.
            path: Path relative to the configured endpoint.
            data: Optional JSON body.
            params: Optional query parameters.

        Returns:
            The decoded JSON response body.

        Raises:
            RpcError: When the response status is 400 or above. The message
                and code come from the JSON error body when there is one;
                otherwise the HTTP reason phrase is used.
        """
        if self.config.debug:
            print(f"[SynorRpc] {method} {path}")

        response = await self._client.request(
            method,
            path,
            json=data,
            params=params,
        )

        if response.status_code >= 400:
            error: Any = None
            if response.content:
                try:
                    error = response.json()
                except ValueError:
                    # e.g. an HTML or plain-text page from a proxy
                    error = None
            if not isinstance(error, dict):
                error = {"message": response.reason_phrase}
            raise RpcError(
                error.get("message", "Request failed"),
                response.status_code,
                error.get("code"),
            )

        return response.json()

    # ==================== Parsers ====================

    def _parse_fee_estimate(self, data: dict) -> FeeEstimate:
        """Parse a fee estimate from API response."""
        return FeeEstimate(
            priority=Priority(data["priority"]),
            fee_rate=data["feeRate"],
            estimated_blocks=data["estimatedBlocks"],
        )

    def _parse_block_header(self, data: dict) -> BlockHeader:
        """Parse block header from API response (all fields required)."""
        return BlockHeader(
            hash=data["hash"],
            height=data["height"],
            version=data["version"],
            previous_hash=data["previousHash"],
            merkle_root=data["merkleRoot"],
            timestamp=data["timestamp"],
            difficulty=data["difficulty"],
            nonce=data["nonce"],
        )

    def _parse_block(self, data: dict) -> Block:
        """Parse block from API response.

        Header fields are required; ``transactions``, ``size``, ``weight``
        and ``txCount`` default to ``[]`` / ``0`` when absent.
        """
        return Block(
            hash=data["hash"],
            height=data["height"],
            version=data["version"],
            previous_hash=data["previousHash"],
            merkle_root=data["merkleRoot"],
            timestamp=data["timestamp"],
            difficulty=data["difficulty"],
            nonce=data["nonce"],
            transactions=data.get("transactions", []),
            size=data.get("size", 0),
            weight=data.get("weight", 0),
            tx_count=data.get("txCount", 0),
        )

    def _parse_transaction(self, data: dict) -> Transaction:
        """Parse transaction from API response.

        Only ``txid`` is required at the top level. ``status`` defaults to
        ``"pending"``, ``fee`` to ``"0"``, counters to ``0``, and the block
        and raw fields to ``None``. Every listed input and output must carry
        its full set of keys (``addresses`` excepted).
        """
        inputs = [
            TxInput(
                txid=i["txid"],
                vout=i["vout"],
                script_sig=i["scriptSig"],
                sequence=i["sequence"],
            )
            for i in data.get("inputs", [])
        ]
        outputs = [
            TxOutput(
                value=o["value"],
                n=o["n"],
                script_pub_key=ScriptPubKey(
                    asm=o["scriptPubKey"]["asm"],
                    hex=o["scriptPubKey"]["hex"],
                    type=o["scriptPubKey"]["type"],
                    addresses=o["scriptPubKey"].get("addresses", []),
                ),
            )
            for o in data.get("outputs", [])
        ]
        return Transaction(
            txid=data["txid"],
            confirmations=data.get("confirmations", 0),
            status=TransactionStatus(data.get("status", "pending")),
            size=data.get("size", 0),
            fee=data.get("fee", "0"),
            inputs=inputs,
            outputs=outputs,
            block_hash=data.get("blockHash"),
            block_height=data.get("blockHeight"),
            timestamp=data.get("timestamp"),
            raw=data.get("raw"),
        )