//! Synor Compute client implementation. use crate::error::{Error, Result}; use crate::tensor::Tensor; use crate::types::*; use futures::stream::Stream; use reqwest::Client; use serde_json::{json, Value}; use std::pin::Pin; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; /// Synor Compute SDK client. pub struct SynorCompute { config: Config, client: Client, closed: Arc, } impl SynorCompute { /// Create a new client with an API key. pub fn new(api_key: impl Into) -> Self { Self::with_config(Config::new(api_key)) } /// Create a new client with configuration. pub fn with_config(config: Config) -> Self { let client = Client::builder() .timeout(std::time::Duration::from_secs(config.timeout_secs)) .build() .expect("Failed to create HTTP client"); Self { config, client, closed: Arc::new(AtomicBool::new(false)), } } // ==================== Matrix Operations ==================== /// Create a matrix multiplication request builder. pub fn matmul<'a>(&'a self, a: &'a Tensor, b: &'a Tensor) -> MatMulBuilder<'a> { MatMulBuilder::new(self, a, b) } /// Create a convolution request builder. pub fn conv2d<'a>(&'a self, input: &'a Tensor, kernel: &'a Tensor) -> Conv2dBuilder<'a> { Conv2dBuilder::new(self, input, kernel) } /// Create an attention request builder. pub fn attention<'a>( &'a self, query: &'a Tensor, key: &'a Tensor, value: &'a Tensor, ) -> AttentionBuilder<'a> { AttentionBuilder::new(self, query, key, value) } // ==================== LLM Inference ==================== /// Create an inference request builder. pub fn inference<'a>(&'a self, model: &'a str, prompt: &'a str) -> InferenceBuilder<'a> { InferenceBuilder::new(self, model, prompt) } /// Create a streaming inference request. pub async fn inference_stream( &self, model: &str, prompt: &str, ) -> Result> + Send>>> { self.check_closed()?; let body = json!({ "operation": "inference", "model": model, "prompt": prompt, "stream": true }); let response = self .client .post(format!("{}/inference/stream", self.config.base_url)) .header("Authorization", format!("Bearer {}", self.config.api_key)) .header("X-SDK-Version", format!("rust/{}", crate::VERSION)) .json(&body) .send() .await?; if !response.status().is_success() { return Err(Error::Api { status_code: response.status().as_u16(), message: response.text().await.unwrap_or_default(), }); } let stream = response.bytes_stream(); let mapped = futures::stream::unfold( (stream, String::new()), |(mut stream, mut buffer)| async move { use futures::StreamExt; loop { match stream.next().await { Some(Ok(chunk)) => { buffer.push_str(&String::from_utf8_lossy(&chunk)); while let Some(pos) = buffer.find('\n') { let line = buffer[..pos].to_string(); buffer = buffer[pos + 1..].to_string(); if line.starts_with("data: ") { let data = &line[6..]; if data == "[DONE]" { return None; } if let Ok(json) = serde_json::from_str::(data) { if let Some(token) = json.get("token").and_then(|t| t.as_str()) { return Some((Ok(token.to_string()), (stream, buffer))); } } } } } Some(Err(e)) => return Some((Err(Error::Http(e)), (stream, buffer))), None => return None, } } }, ); Ok(Box::pin(mapped)) } // ==================== Model Registry ==================== /// List available models. pub async fn list_models(&self, category: Option) -> Result> { self.check_closed()?; let url = match category { Some(c) => format!("{}/models?category={:?}", self.config.base_url, c), None => format!("{}/models", self.config.base_url), }; let response: Value = self.get(&url).await?; let models = response["models"] .as_array() .unwrap_or(&vec![]) .iter() .filter_map(|m| serde_json::from_value(m.clone()).ok()) .collect(); Ok(models) } /// Get model by ID. pub async fn get_model(&self, model_id: &str) -> Result { self.check_closed()?; let url = format!("{}/models/{}", self.config.base_url, model_id); self.get(&url).await } /// Search models. pub async fn search_models(&self, query: &str) -> Result> { self.check_closed()?; let url = format!("{}/models/search?q={}", self.config.base_url, query); let response: Value = self.get(&url).await?; let models = response["models"] .as_array() .unwrap_or(&vec![]) .iter() .filter_map(|m| serde_json::from_value(m.clone()).ok()) .collect(); Ok(models) } // ==================== Pricing & Usage ==================== /// Get pricing information. pub async fn get_pricing(&self) -> Result> { self.check_closed()?; let url = format!("{}/pricing", self.config.base_url); let response: Value = self.get(&url).await?; let pricing = response["pricing"] .as_array() .unwrap_or(&vec![]) .iter() .filter_map(|p| serde_json::from_value(p.clone()).ok()) .collect(); Ok(pricing) } /// Get usage statistics. pub async fn get_usage(&self) -> Result { self.check_closed()?; let url = format!("{}/usage", self.config.base_url); self.get(&url).await } // ==================== Health Check ==================== /// Check service health. pub async fn health_check(&self) -> bool { let url = format!("{}/health", self.config.base_url); match self.get::(&url).await { Ok(v) => v.get("status").and_then(|s| s.as_str()) == Some("healthy"), Err(_) => false, } } // ==================== Lifecycle ==================== /// Close the client. pub fn close(&self) { self.closed.store(true, Ordering::SeqCst); } /// Check if the client is closed. pub fn is_closed(&self) -> bool { self.closed.load(Ordering::SeqCst) } // ==================== Internal Methods ==================== fn check_closed(&self) -> Result<()> { if self.is_closed() { Err(Error::ClientClosed) } else { Ok(()) } } async fn get(&self, url: &str) -> Result { let response = self .client .get(url) .header("Authorization", format!("Bearer {}", self.config.api_key)) .header("X-SDK-Version", format!("rust/{}", crate::VERSION)) .send() .await?; if !response.status().is_success() { return Err(Error::Api { status_code: response.status().as_u16(), message: response.text().await.unwrap_or_default(), }); } Ok(response.json().await?) } async fn post(&self, path: &str, body: Value) -> Result { let response = self .client .post(format!("{}{}", self.config.base_url, path)) .header("Authorization", format!("Bearer {}", self.config.api_key)) .header("X-SDK-Version", format!("rust/{}", crate::VERSION)) .json(&body) .send() .await?; if !response.status().is_success() { return Err(Error::Api { status_code: response.status().as_u16(), message: response.text().await.unwrap_or_default(), }); } Ok(response.json().await?) } fn tensor_to_json(tensor: &Tensor) -> Value { json!({ "shape": tensor.shape(), "data": tensor.data(), "dtype": tensor.dtype().as_str() }) } } // ==================== Request Builders ==================== pub struct MatMulBuilder<'a> { client: &'a SynorCompute, a: &'a Tensor, b: &'a Tensor, options: MatMulOptions, } impl<'a> MatMulBuilder<'a> { fn new(client: &'a SynorCompute, a: &'a Tensor, b: &'a Tensor) -> Self { Self { client, a, b, options: MatMulOptions::default(), } } pub fn precision(mut self, precision: Precision) -> Self { self.options.precision = precision; self } pub fn processor(mut self, processor: ProcessorType) -> Self { self.options.processor = processor; self } pub fn priority(mut self, priority: Priority) -> Self { self.options.priority = priority; self } pub async fn send(self) -> Result> { self.client.check_closed()?; let body = json!({ "operation": "matmul", "a": SynorCompute::tensor_to_json(self.a), "b": SynorCompute::tensor_to_json(self.b), "precision": self.options.precision.as_str(), "processor": self.options.processor.as_str(), "priority": serde_json::to_value(&self.options.priority).unwrap() }); self.client.post("/compute", body).await } } pub struct Conv2dBuilder<'a> { client: &'a SynorCompute, input: &'a Tensor, kernel: &'a Tensor, options: Conv2dOptions, } impl<'a> Conv2dBuilder<'a> { fn new(client: &'a SynorCompute, input: &'a Tensor, kernel: &'a Tensor) -> Self { Self { client, input, kernel, options: Conv2dOptions::default(), } } pub fn stride(mut self, stride: (usize, usize)) -> Self { self.options.stride = stride; self } pub fn padding(mut self, padding: (usize, usize)) -> Self { self.options.padding = padding; self } pub async fn send(self) -> Result> { self.client.check_closed()?; let body = json!({ "operation": "conv2d", "input": SynorCompute::tensor_to_json(self.input), "kernel": SynorCompute::tensor_to_json(self.kernel), "stride": [self.options.stride.0, self.options.stride.1], "padding": [self.options.padding.0, self.options.padding.1], "precision": self.options.precision.as_str() }); self.client.post("/compute", body).await } } pub struct AttentionBuilder<'a> { client: &'a SynorCompute, query: &'a Tensor, key: &'a Tensor, value: &'a Tensor, options: AttentionOptions, } impl<'a> AttentionBuilder<'a> { fn new( client: &'a SynorCompute, query: &'a Tensor, key: &'a Tensor, value: &'a Tensor, ) -> Self { Self { client, query, key, value, options: AttentionOptions::default(), } } pub fn num_heads(mut self, num_heads: usize) -> Self { self.options.num_heads = num_heads; self } pub fn flash(mut self, flash: bool) -> Self { self.options.flash = flash; self } pub async fn send(self) -> Result> { self.client.check_closed()?; let body = json!({ "operation": "attention", "query": SynorCompute::tensor_to_json(self.query), "key": SynorCompute::tensor_to_json(self.key), "value": SynorCompute::tensor_to_json(self.value), "num_heads": self.options.num_heads, "flash": self.options.flash, "precision": self.options.precision.as_str() }); self.client.post("/compute", body).await } } pub struct InferenceBuilder<'a> { client: &'a SynorCompute, model: &'a str, prompt: &'a str, options: InferenceOptions, } impl<'a> InferenceBuilder<'a> { fn new(client: &'a SynorCompute, model: &'a str, prompt: &'a str) -> Self { Self { client, model, prompt, options: InferenceOptions::default(), } } pub fn max_tokens(mut self, max_tokens: usize) -> Self { self.options.max_tokens = max_tokens; self } pub fn temperature(mut self, temperature: f64) -> Self { self.options.temperature = temperature; self } pub fn top_p(mut self, top_p: f64) -> Self { self.options.top_p = top_p; self } pub fn top_k(mut self, top_k: usize) -> Self { self.options.top_k = top_k; self } pub fn processor(mut self, processor: ProcessorType) -> Self { self.options.processor = Some(processor); self } pub async fn send(self) -> Result> { self.client.check_closed()?; let mut body = json!({ "operation": "inference", "model": self.model, "prompt": self.prompt, "max_tokens": self.options.max_tokens, "temperature": self.options.temperature, "top_p": self.options.top_p, "top_k": self.options.top_k }); if let Some(processor) = &self.options.processor { body["processor"] = json!(processor.as_str()); } self.client.post("/inference", body).await } }