synor/sdk/python/src/synor_dex/client.py
Gulshan Yadav e7dc8f70a0 feat(sdk): implement DEX SDK with perpetual futures for all 12 languages
Complete decentralized exchange client implementation featuring:
- AMM swaps (constant product, stable, concentrated liquidity)
- Liquidity provision with impermanent loss tracking
- Perpetual futures (up to 100x leverage, funding rates, liquidation)
- Order books with limit orders (GTC, IOC, FOK, GTD)
- Yield farming & staking with reward claiming
- Real-time WebSocket subscriptions
- Analytics (OHLCV, trade history, volume, TVL)

Languages: JS/TS, Python, Go, Rust, Java, Kotlin, Swift, Flutter,
C, C++, C#, Ruby
2026-01-28 12:32:04 +05:30

496 lines
17 KiB
Python

"""
Synor DEX SDK Client
Complete decentralized exchange client with support for:
- AMM swaps (constant product, stable, concentrated)
- Liquidity provision
- Perpetual futures (up to 100x leverage)
- Order books (limit orders)
- Farming & staking
"""
import asyncio
import json
import time
import uuid
from typing import Optional, List, Callable, Any, TypeVar, Dict
from urllib.parse import urlencode
import httpx
import websockets
from .types import (
DexConfig,
Token,
Pool,
PoolFilter,
Quote,
QuoteParams,
SwapParams,
SwapResult,
AddLiquidityParams,
RemoveLiquidityParams,
LiquidityResult,
LPPosition,
PerpMarket,
OpenPositionParams,
ClosePositionParams,
ModifyPositionParams,
PerpPosition,
PerpOrder,
FundingPayment,
OrderBook,
LimitOrderParams,
Order,
Farm,
StakeParams,
FarmPosition,
OHLCV,
TradeHistory,
VolumeStats,
TVLStats,
Subscription,
DexError,
)
T = TypeVar("T")
class PerpsClient:
"""Perpetual futures sub-client"""
def __init__(self, dex: "SynorDex"):
self._dex = dex
async def list_markets(self) -> List[PerpMarket]:
"""List all perpetual markets"""
return await self._dex._get("/perps/markets")
async def get_market(self, symbol: str) -> PerpMarket:
"""Get a specific perpetual market"""
return await self._dex._get(f"/perps/markets/{symbol}")
async def open_position(self, params: OpenPositionParams) -> PerpPosition:
"""Open a perpetual position"""
return await self._dex._post("/perps/positions", {
"market": params.market,
"side": params.side.value,
"size": str(params.size),
"leverage": params.leverage,
"order_type": params.order_type.value,
"limit_price": params.limit_price,
"stop_loss": params.stop_loss,
"take_profit": params.take_profit,
"margin_type": params.margin_type.value,
"reduce_only": params.reduce_only,
})
async def close_position(self, params: ClosePositionParams) -> PerpPosition:
"""Close a perpetual position"""
return await self._dex._post("/perps/positions/close", {
"market": params.market,
"size": str(params.size) if params.size else None,
"order_type": params.order_type.value,
"limit_price": params.limit_price,
})
async def modify_position(self, params: ModifyPositionParams) -> PerpPosition:
"""Modify a perpetual position"""
return await self._dex._post(f"/perps/positions/{params.position_id}/modify", {
"new_leverage": params.new_leverage,
"new_margin": str(params.new_margin) if params.new_margin else None,
"new_stop_loss": params.new_stop_loss,
"new_take_profit": params.new_take_profit,
})
async def get_positions(self) -> List[PerpPosition]:
"""Get all open positions"""
return await self._dex._get("/perps/positions")
async def get_position(self, market: str) -> Optional[PerpPosition]:
"""Get position for a specific market"""
return await self._dex._get(f"/perps/positions/{market}")
async def get_orders(self) -> List[PerpOrder]:
"""Get all open orders"""
return await self._dex._get("/perps/orders")
async def cancel_order(self, order_id: str) -> None:
"""Cancel an order"""
await self._dex._delete(f"/perps/orders/{order_id}")
async def cancel_all_orders(self, market: Optional[str] = None) -> int:
"""Cancel all orders, optionally for a specific market"""
path = f"/perps/orders?market={market}" if market else "/perps/orders"
result = await self._dex._delete(path)
return result.get("cancelled", 0)
async def get_funding_history(self, market: str, limit: int = 100) -> List[FundingPayment]:
"""Get funding payment history"""
return await self._dex._get(f"/perps/funding/{market}?limit={limit}")
async def get_funding_rate(self, market: str) -> Dict[str, Any]:
"""Get current funding rate"""
return await self._dex._get(f"/perps/funding/{market}/current")
async def subscribe_position(self, callback: Callable[[PerpPosition], None]) -> Subscription:
"""Subscribe to position updates"""
return await self._dex._subscribe("position", {}, callback)
class OrderBookClient:
"""Order book sub-client"""
def __init__(self, dex: "SynorDex"):
self._dex = dex
async def get_order_book(self, market: str, depth: int = 20) -> OrderBook:
"""Get order book for a market"""
return await self._dex._get(f"/orderbook/{market}?depth={depth}")
async def place_limit_order(self, params: LimitOrderParams) -> Order:
"""Place a limit order"""
return await self._dex._post("/orderbook/orders", {
"market": params.market,
"side": params.side,
"price": params.price,
"size": str(params.size),
"time_in_force": params.time_in_force.value,
"post_only": params.post_only,
})
async def cancel_order(self, order_id: str) -> None:
"""Cancel an order"""
await self._dex._delete(f"/orderbook/orders/{order_id}")
async def get_open_orders(self, market: Optional[str] = None) -> List[Order]:
"""Get all open orders"""
path = f"/orderbook/orders?market={market}" if market else "/orderbook/orders"
return await self._dex._get(path)
async def get_order_history(self, limit: int = 50) -> List[Order]:
"""Get order history"""
return await self._dex._get(f"/orderbook/orders/history?limit={limit}")
class FarmsClient:
"""Farms and staking sub-client"""
def __init__(self, dex: "SynorDex"):
self._dex = dex
async def list_farms(self) -> List[Farm]:
"""List all farms"""
return await self._dex._get("/farms")
async def get_farm(self, farm_id: str) -> Farm:
"""Get a specific farm"""
return await self._dex._get(f"/farms/{farm_id}")
async def stake(self, params: StakeParams) -> FarmPosition:
"""Stake tokens in a farm"""
return await self._dex._post("/farms/stake", {
"farm": params.farm,
"amount": str(params.amount),
})
async def unstake(self, farm: str, amount: int) -> FarmPosition:
"""Unstake tokens from a farm"""
return await self._dex._post("/farms/unstake", {
"farm": farm,
"amount": str(amount),
})
async def claim_rewards(self, farm: str) -> Dict[str, Any]:
"""Claim rewards from a farm"""
return await self._dex._post("/farms/claim", {"farm": farm})
async def get_my_farm_positions(self) -> List[FarmPosition]:
"""Get all farm positions"""
return await self._dex._get("/farms/positions")
class SynorDex:
"""
Synor DEX SDK Client
Complete decentralized exchange client with support for:
- AMM swaps (constant product, stable, concentrated)
- Liquidity provision
- Perpetual futures (up to 100x leverage)
- Order books (limit orders)
- Farming & staking
"""
def __init__(self, config: DexConfig):
self._config = config
self._closed = False
self._ws = None
self._subscriptions: Dict[str, Callable] = {}
self._client = httpx.AsyncClient(
base_url=config.endpoint,
timeout=config.timeout / 1000,
headers={
"Content-Type": "application/json",
"Authorization": f"Bearer {config.api_key}",
"X-SDK-Version": "python/0.1.0",
},
)
# Sub-clients
self.perps = PerpsClient(self)
self.orderbook = OrderBookClient(self)
self.farms = FarmsClient(self)
# Token Operations
async def get_token(self, address: str) -> Token:
"""Get token information"""
return await self._get(f"/tokens/{address}")
async def list_tokens(self) -> List[Token]:
"""List all tokens"""
return await self._get("/tokens")
async def search_tokens(self, query: str) -> List[Token]:
"""Search tokens"""
return await self._get(f"/tokens/search?q={query}")
# Pool Operations
async def get_pool(self, token_a: str, token_b: str) -> Pool:
"""Get pool for a token pair"""
return await self._get(f"/pools/{token_a}/{token_b}")
async def get_pool_by_id(self, pool_id: str) -> Pool:
"""Get pool by ID"""
return await self._get(f"/pools/{pool_id}")
async def list_pools(self, filter: Optional[PoolFilter] = None) -> List[Pool]:
"""List pools with optional filtering"""
params = {}
if filter:
if filter.tokens:
params["tokens"] = ",".join(filter.tokens)
if filter.min_tvl:
params["min_tvl"] = str(filter.min_tvl)
if filter.min_volume_24h:
params["min_volume"] = str(filter.min_volume_24h)
if filter.verified is not None:
params["verified"] = str(filter.verified).lower()
if filter.limit:
params["limit"] = str(filter.limit)
if filter.offset:
params["offset"] = str(filter.offset)
query = urlencode(params) if params else ""
return await self._get(f"/pools?{query}")
# Swap Operations
async def get_quote(self, params: QuoteParams) -> Quote:
"""Get a swap quote"""
return await self._post("/swap/quote", {
"token_in": params.token_in,
"token_out": params.token_out,
"amount_in": str(params.amount_in),
"slippage": params.slippage,
})
async def swap(self, params: SwapParams) -> SwapResult:
"""Execute a swap"""
deadline = params.deadline or int(time.time()) + 1200
return await self._post("/swap", {
"token_in": params.token_in,
"token_out": params.token_out,
"amount_in": str(params.amount_in),
"min_amount_out": str(params.min_amount_out),
"deadline": deadline,
"recipient": params.recipient,
})
# Liquidity Operations
async def add_liquidity(self, params: AddLiquidityParams) -> LiquidityResult:
"""Add liquidity to a pool"""
deadline = params.deadline or int(time.time()) + 1200
return await self._post("/liquidity/add", {
"token_a": params.token_a,
"token_b": params.token_b,
"amount_a": str(params.amount_a),
"amount_b": str(params.amount_b),
"min_amount_a": str(params.min_amount_a) if params.min_amount_a else None,
"min_amount_b": str(params.min_amount_b) if params.min_amount_b else None,
"deadline": deadline,
})
async def remove_liquidity(self, params: RemoveLiquidityParams) -> LiquidityResult:
"""Remove liquidity from a pool"""
deadline = params.deadline or int(time.time()) + 1200
return await self._post("/liquidity/remove", {
"pool": params.pool,
"lp_amount": str(params.lp_amount),
"min_amount_a": str(params.min_amount_a) if params.min_amount_a else None,
"min_amount_b": str(params.min_amount_b) if params.min_amount_b else None,
"deadline": deadline,
})
async def get_my_positions(self) -> List[LPPosition]:
"""Get all LP positions"""
return await self._get("/liquidity/positions")
# Analytics
async def get_price_history(self, pair: str, interval: str, limit: int = 100) -> List[OHLCV]:
"""Get price history (OHLCV candles)"""
return await self._get(f"/analytics/candles/{pair}?interval={interval}&limit={limit}")
async def get_trade_history(self, pair: str, limit: int = 50) -> List[TradeHistory]:
"""Get trade history"""
return await self._get(f"/analytics/trades/{pair}?limit={limit}")
async def get_volume_stats(self) -> VolumeStats:
"""Get volume statistics"""
return await self._get("/analytics/volume")
async def get_tvl(self) -> TVLStats:
"""Get TVL statistics"""
return await self._get("/analytics/tvl")
# Subscriptions
async def subscribe_price(self, market: str, callback: Callable[[float], None]) -> Subscription:
"""Subscribe to price updates"""
return await self._subscribe("price", {"market": market}, callback)
async def subscribe_trades(self, market: str, callback: Callable[[TradeHistory], None]) -> Subscription:
"""Subscribe to trade updates"""
return await self._subscribe("trades", {"market": market}, callback)
async def subscribe_order_book(self, market: str, callback: Callable[[OrderBook], None]) -> Subscription:
"""Subscribe to order book updates"""
return await self._subscribe("orderbook", {"market": market}, callback)
# Lifecycle
async def health_check(self) -> bool:
"""Check if the service is healthy"""
try:
response = await self._get("/health")
return response.get("status") == "healthy"
except Exception:
return False
async def close(self) -> None:
"""Close the client"""
self._closed = True
if self._ws:
await self._ws.close()
self._ws = None
self._subscriptions.clear()
await self._client.aclose()
# Internal Methods
async def _get(self, path: str) -> Any:
"""Make a GET request"""
return await self._request("GET", path)
async def _post(self, path: str, body: dict) -> Any:
"""Make a POST request"""
return await self._request("POST", path, body)
async def _delete(self, path: str) -> Any:
"""Make a DELETE request"""
return await self._request("DELETE", path)
async def _request(self, method: str, path: str, body: Optional[dict] = None) -> Any:
"""Make an HTTP request with retries"""
if self._closed:
raise DexError("Client has been closed", "CLIENT_CLOSED")
last_error = None
for attempt in range(self._config.retries):
try:
if method == "GET":
response = await self._client.get(path)
elif method == "POST":
response = await self._client.post(path, json=body)
elif method == "DELETE":
response = await self._client.delete(path)
else:
raise DexError(f"Unknown method: {method}")
if not response.is_success:
try:
error = response.json()
except Exception:
error = {}
raise DexError(
error.get("message", f"HTTP {response.status_code}"),
error.get("code"),
response.status_code,
)
return response.json()
except DexError:
raise
except Exception as e:
last_error = e
if self._config.debug:
print(f"Attempt {attempt + 1} failed: {e}")
if attempt < self._config.retries - 1:
await asyncio.sleep(2 ** attempt)
raise last_error or DexError("Unknown error")
async def _subscribe(
self,
channel: str,
params: dict,
callback: Callable,
) -> Subscription:
"""Subscribe to a WebSocket channel"""
await self._ensure_websocket()
subscription_id = str(uuid.uuid4())[:8]
self._subscriptions[subscription_id] = callback
await self._ws.send(json.dumps({
"type": "subscribe",
"channel": channel,
"subscription_id": subscription_id,
**params,
}))
def cancel():
self._subscriptions.pop(subscription_id, None)
if self._ws:
asyncio.create_task(self._ws.send(json.dumps({
"type": "unsubscribe",
"subscription_id": subscription_id,
})))
return Subscription(id=subscription_id, channel=channel, cancel=cancel)
async def _ensure_websocket(self) -> None:
"""Ensure WebSocket connection is established"""
if self._ws and self._ws.open:
return
self._ws = await websockets.connect(self._config.ws_endpoint)
# Authenticate
await self._ws.send(json.dumps({
"type": "auth",
"api_key": self._config.api_key,
}))
# Start message handler
asyncio.create_task(self._handle_messages())
async def _handle_messages(self) -> None:
"""Handle incoming WebSocket messages"""
try:
async for message in self._ws:
data = json.loads(message)
subscription_id = data.get("subscription_id")
if subscription_id and subscription_id in self._subscriptions:
self._subscriptions[subscription_id](data.get("data"))
except Exception:
pass # Connection closed