"""Synor Compute Client.""" from typing import Any, Optional import httpx import numpy as np from .types import ( SynorConfig, BalancingStrategy, Precision, ProcessorType, TaskPriority, JobResult, PricingInfo, ) from .tensor import Tensor from .job import ComputeJob class SynorCompute: """ Main Synor Compute client. Example: >>> compute = SynorCompute(api_key="sk_...") >>> result = await compute.matmul( ... a=np.random.randn(1024, 1024), ... b=np.random.randn(1024, 1024), ... precision=Precision.FP16 ... ) """ def __init__( self, api_key: str, endpoint: str = "https://compute.synor.cc/api/v1", strategy: BalancingStrategy = BalancingStrategy.BALANCED, precision: Precision = Precision.FP32, timeout: float = 30.0, debug: bool = False, ): self.config = SynorConfig( api_key=api_key, endpoint=endpoint, strategy=strategy, precision=precision, timeout=timeout, debug=debug, ) self._client = httpx.AsyncClient( base_url=endpoint, headers={ "Authorization": f"Bearer {api_key}", "Content-Type": "application/json", }, timeout=timeout, ) async def __aenter__(self) -> "SynorCompute": return self async def __aexit__(self, *args: Any) -> None: await self.close() async def close(self) -> None: """Close the client.""" await self._client.aclose() async def matmul( self, a: np.ndarray | Tensor, b: np.ndarray | Tensor, precision: Optional[Precision] = None, processor: Optional[ProcessorType] = None, priority: TaskPriority = TaskPriority.NORMAL, ) -> JobResult: """ Matrix multiplication on distributed compute. Args: a: First matrix b: Second matrix precision: Compute precision processor: Preferred processor type priority: Task priority Returns: Job result with computed matrix """ tensor_a = Tensor.from_numpy(a) if isinstance(a, np.ndarray) else a tensor_b = Tensor.from_numpy(b) if isinstance(b, np.ndarray) else b job = await self.submit_job( "matmul", { "a": tensor_a.serialize(), "b": tensor_b.serialize(), "precision": (precision or self.config.precision).value, "processor": processor.value if processor else None, "priority": priority.value, }, ) return await job.wait() async def conv2d( self, input: np.ndarray | Tensor, kernel: np.ndarray | Tensor, stride: int | tuple[int, int] = 1, padding: int | tuple[int, int] = 0, precision: Optional[Precision] = None, ) -> JobResult: """ 2D Convolution on distributed compute. Args: input: Input tensor [batch, channels, height, width] kernel: Convolution kernel stride: Stride padding: Padding precision: Compute precision Returns: Job result with convolved tensor """ tensor_input = Tensor.from_numpy(input) if isinstance(input, np.ndarray) else input tensor_kernel = Tensor.from_numpy(kernel) if isinstance(kernel, np.ndarray) else kernel job = await self.submit_job( "conv2d", { "input": tensor_input.serialize(), "kernel": tensor_kernel.serialize(), "stride": stride, "padding": padding, "precision": (precision or self.config.precision).value, }, ) return await job.wait() async def attention( self, query: np.ndarray | Tensor, key: np.ndarray | Tensor, value: np.ndarray | Tensor, num_heads: int, flash: bool = True, precision: Optional[Precision] = None, ) -> JobResult: """ Self-attention on distributed compute. Args: query: Query tensor key: Key tensor value: Value tensor num_heads: Number of attention heads flash: Use FlashAttention precision: Compute precision Returns: Job result with attention output """ q = Tensor.from_numpy(query) if isinstance(query, np.ndarray) else query k = Tensor.from_numpy(key) if isinstance(key, np.ndarray) else key v = Tensor.from_numpy(value) if isinstance(value, np.ndarray) else value job = await self.submit_job( "flash_attention" if flash else "self_attention", { "query": q.serialize(), "key": k.serialize(), "value": v.serialize(), "num_heads": num_heads, "precision": (precision or self.config.precision).value, }, ) return await job.wait() async def inference( self, model: str, input: str | np.ndarray | Tensor, max_tokens: int = 256, temperature: float = 0.7, top_k: int = 50, strategy: Optional[BalancingStrategy] = None, stream: bool = False, ) -> JobResult: """ Run inference on a model. Args: model: Model name or CID input: Input prompt or tensor max_tokens: Maximum tokens to generate temperature: Sampling temperature top_k: Top-k sampling strategy: Balancing strategy stream: Stream responses Returns: Job result with inference output """ input_data: str | dict if isinstance(input, str): input_data = input elif isinstance(input, np.ndarray): input_data = Tensor.from_numpy(input).serialize() else: input_data = input.serialize() job = await self.submit_job( "inference", { "model": model, "input": input_data, "max_tokens": max_tokens, "temperature": temperature, "top_k": top_k, "strategy": (strategy or self.config.strategy).value, }, ) return await job.wait() async def get_pricing(self) -> list[PricingInfo]: """Get current pricing for all processor types.""" response = await self._request("GET", "/pricing") return [PricingInfo(**p) for p in response["pricing"]] async def get_processor_pricing(self, processor: ProcessorType) -> PricingInfo: """Get pricing for a specific processor type.""" response = await self._request("GET", f"/pricing/{processor.value}") return PricingInfo(**response) async def get_capacity(self) -> dict[str, Any]: """Get available compute capacity.""" return await self._request("GET", "/capacity") def set_strategy(self, strategy: BalancingStrategy) -> None: """Set the default balancing strategy.""" self.config.strategy = strategy async def submit_job(self, operation: str, params: dict[str, Any]) -> ComputeJob: """Submit a generic compute job.""" response = await self._request( "POST", "/jobs", { "operation": operation, "params": params, "strategy": self.config.strategy.value, }, ) return ComputeJob(response["job_id"], self) async def get_job_status(self, job_id: str) -> JobResult: """Get job status.""" response = await self._request("GET", f"/jobs/{job_id}") return JobResult(**response) async def cancel_job(self, job_id: str) -> None: """Cancel a job.""" await self._request("DELETE", f"/jobs/{job_id}") async def _request( self, method: str, path: str, data: Optional[dict[str, Any]] = None, ) -> dict[str, Any]: """Make an API request.""" if self.config.debug: print(f"[SynorCompute] {method} {path}") response = await self._client.request( method, path, json=data, ) if response.status_code >= 400: error = response.json() if response.content else {"message": response.reason_phrase} raise SynorError( error.get("message", "Request failed"), response.status_code, ) return response.json() class SynorError(Exception): """Synor SDK error.""" def __init__(self, message: str, status_code: Optional[int] = None, code: Optional[str] = None): super().__init__(message) self.status_code = status_code self.code = code