diff --git a/sdk/go/go.mod b/sdk/go/go.mod
new file mode 100644
index 0000000..1b08db0
--- /dev/null
+++ b/sdk/go/go.mod
@@ -0,0 +1,7 @@
+module github.com/synor/synor/sdk/go
+
+go 1.21
+
+require (
+	github.com/goccy/go-json v0.10.2
+)
diff --git a/sdk/go/synor.go b/sdk/go/synor.go
new file mode 100644
index 0000000..7558735
--- /dev/null
+++ b/sdk/go/synor.go
@@ -0,0 +1,354 @@
+// Package synor provides a Go SDK for Synor Compute.
+//
+// Access distributed heterogeneous compute resources (CPU, GPU, TPU, NPU, LPU)
+// for AI/ML workloads at 90% cost reduction compared to traditional cloud.
+//
+// Example:
+//
+//	client := synor.NewClient("your-api-key")
+//	result, err := client.MatMul(ctx, a, b, synor.FP16)
+package synor
+
+import (
+	"bytes"
+	"context"
+	"encoding/base64"
+	"encoding/binary"
+	"encoding/json"
+	"fmt"
+	"io"
+	"math"
+	"net/http"
+	"time"
+)
+
+// Version of the SDK.
+const Version = "0.1.0"
+
+// DefaultEndpoint is the default API endpoint.
+const DefaultEndpoint = "https://compute.synor.cc/api/v1"
+
+// ProcessorType represents supported processor types.
+type ProcessorType string
+
+const (
+	CPU    ProcessorType = "cpu"
+	GPU    ProcessorType = "gpu"
+	TPU    ProcessorType = "tpu"
+	NPU    ProcessorType = "npu"
+	LPU    ProcessorType = "lpu"
+	FPGA   ProcessorType = "fpga"
+	WASM   ProcessorType = "wasm"
+	WebGPU ProcessorType = "webgpu"
+)
+
+// Precision represents computation precision levels.
+type Precision string
+
+const (
+	FP64 Precision = "fp64"
+	FP32 Precision = "fp32"
+	FP16 Precision = "fp16"
+	BF16 Precision = "bf16"
+	INT8 Precision = "int8"
+	INT4 Precision = "int4"
+)
+
+// BalancingStrategy represents job scheduling strategies.
+type BalancingStrategy string
+
+const (
+	Speed    BalancingStrategy = "speed"
+	Cost     BalancingStrategy = "cost"
+	Energy   BalancingStrategy = "energy"
+	Latency  BalancingStrategy = "latency"
+	Balanced BalancingStrategy = "balanced"
+)
+
+// JobStatus represents job states.
+type JobStatus string
+
+const (
+	Pending   JobStatus = "pending"
+	Queued    JobStatus = "queued"
+	Running   JobStatus = "running"
+	Completed JobStatus = "completed"
+	Failed    JobStatus = "failed"
+	Cancelled JobStatus = "cancelled"
+)
+
+// Config holds client configuration.
+type Config struct {
+	APIKey    string            // API key used as a Bearer token on every request
+	Endpoint  string            // base API URL; defaults to DefaultEndpoint
+	Strategy  BalancingStrategy // default scheduling strategy sent with jobs
+	Precision Precision         // default precision (currently unused by Client itself)
+	Timeout   time.Duration     // per-request HTTP timeout
+	Debug     bool              // enable debug behavior (not read by this file's code)
+}
+
+// DefaultConfig returns a default configuration.
+func DefaultConfig(apiKey string) Config {
+	return Config{
+		APIKey:    apiKey,
+		Endpoint:  DefaultEndpoint,
+		Strategy:  Balanced,
+		Precision: FP32,
+		Timeout:   30 * time.Second,
+		Debug:     false,
+	}
+}
+
+// Client is the main Synor Compute client.
+//
+// A Client is safe to reuse across requests; it wraps a single
+// http.Client configured with the Config's timeout.
+type Client struct {
+	config     Config
+	httpClient *http.Client
+}
+
+// NewClient creates a new client with the given API key.
+func NewClient(apiKey string) *Client {
+	return NewClientWithConfig(DefaultConfig(apiKey))
+}
+
+// NewClientWithConfig creates a new client with custom configuration.
+func NewClientWithConfig(config Config) *Client {
+	return &Client{
+		config: config,
+		httpClient: &http.Client{
+			Timeout: config.Timeout,
+		},
+	}
+}
+
+// JobResult represents the result of a job.
+type JobResult struct {
+	JobID   string      `json:"job_id"`
+	Status  JobStatus   `json:"status"`
+	Data    interface{} `json:"data,omitempty"`    // result payload when Status == Completed
+	Error   string      `json:"error,omitempty"`   // server-reported failure reason
+	Metrics *JobMetrics `json:"metrics,omitempty"` // execution metrics, if the server provides them
+}
+
+// JobMetrics contains execution metrics.
+type JobMetrics struct {
+	ExecutionTimeMs float64       `json:"execution_time_ms"` // wall-clock compute time
+	QueueTimeMs     float64       `json:"queue_time_ms"`     // time spent waiting for a processor
+	ProcessorType   ProcessorType `json:"processor_type"`
+	ProcessorID     string        `json:"processor_id"`
+	FLOPS           float64       `json:"flops"`
+	MemoryBytes     int64         `json:"memory_bytes"`
+	CostMicro       int64         `json:"cost_micro"` // cost in microcredits
+	EnergyMJ        float64       `json:"energy_mj"`  // energy in millijoules
+}
+
+// Tensor represents a multi-dimensional array.
+type Tensor struct { + Data []float32 `json:"data"` + Shape []int `json:"shape"` + DType Precision `json:"dtype"` +} + +// NewTensor creates a new tensor from data and shape. +func NewTensor(data []float32, shape []int, dtype Precision) *Tensor { + return &Tensor{ + Data: data, + Shape: shape, + DType: dtype, + } +} + +// Zeros creates a tensor of zeros. +func Zeros(shape []int, dtype Precision) *Tensor { + size := 1 + for _, dim := range shape { + size *= dim + } + return &Tensor{ + Data: make([]float32, size), + Shape: shape, + DType: dtype, + } +} + +// Serialize converts tensor to transmission format. +func (t *Tensor) Serialize() map[string]interface{} { + buf := new(bytes.Buffer) + for _, v := range t.Data { + var b [4]byte + *(*float32)((&b[0])) = v + buf.Write(b[:]) + } + return map[string]interface{}{ + "data": base64.StdEncoding.EncodeToString(buf.Bytes()), + "shape": t.Shape, + "dtype": t.DType, + } +} + +// MatMul performs matrix multiplication. +func (c *Client) MatMul(ctx context.Context, a, b *Tensor, precision Precision) (*JobResult, error) { + job, err := c.SubmitJob(ctx, "matmul", map[string]interface{}{ + "a": a.Serialize(), + "b": b.Serialize(), + "precision": precision, + }) + if err != nil { + return nil, err + } + return job.Wait(ctx) +} + +// Inference runs model inference. +func (c *Client) Inference(ctx context.Context, model, input string, maxTokens int) (*JobResult, error) { + job, err := c.SubmitJob(ctx, "inference", map[string]interface{}{ + "model": model, + "input": input, + "max_tokens": maxTokens, + "strategy": c.config.Strategy, + }) + if err != nil { + return nil, err + } + return job.Wait(ctx) +} + +// Job represents a submitted compute job. +type Job struct { + ID string + client *Client + status JobStatus +} + +// SubmitJob submits a new compute job. 
+func (c *Client) SubmitJob(ctx context.Context, operation string, params map[string]interface{}) (*Job, error) {
+	body := map[string]interface{}{
+		"operation": operation,
+		"params":    params,
+		"strategy":  c.config.Strategy,
+	}
+
+	var resp struct {
+		JobID string `json:"job_id"`
+	}
+
+	if err := c.request(ctx, "POST", "/jobs", body, &resp); err != nil {
+		return nil, err
+	}
+
+	return &Job{
+		ID:     resp.JobID,
+		client: c,
+		status: Pending,
+	}, nil
+}
+
+// Wait waits for the job to complete.
+//
+// It polls GetJobStatus every 500ms until the job reaches a terminal
+// state (Completed, Failed, or Cancelled) or ctx is cancelled, in which
+// case ctx.Err() is returned. Note: a Failed or Cancelled job is
+// returned as a result with a nil error; callers must check Status.
+func (j *Job) Wait(ctx context.Context) (*JobResult, error) {
+	ticker := time.NewTicker(500 * time.Millisecond)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return nil, ctx.Err()
+		case <-ticker.C:
+			result, err := j.client.GetJobStatus(ctx, j.ID)
+			if err != nil {
+				return nil, err
+			}
+			if result.Status == Completed || result.Status == Failed || result.Status == Cancelled {
+				return result, nil
+			}
+		}
+	}
+}
+
+// GetJobStatus gets the status of a job.
+func (c *Client) GetJobStatus(ctx context.Context, jobID string) (*JobResult, error) {
+	var result JobResult
+	if err := c.request(ctx, "GET", "/jobs/"+jobID, nil, &result); err != nil {
+		return nil, err
+	}
+	return &result, nil
+}
+
+// CancelJob cancels a running job.
+func (c *Client) CancelJob(ctx context.Context, jobID string) error {
+	return c.request(ctx, "DELETE", "/jobs/"+jobID, nil, nil)
+}
+
+// PricingInfo contains pricing information.
+type PricingInfo struct {
+	ProcessorType  ProcessorType `json:"processor_type"`
+	SpotPrice      float64       `json:"spot_price"`     // current spot price (microcredits/GFLOP)
+	AvgPrice24h    float64       `json:"avg_price_24h"`  // trailing 24h average
+	AWSEquivalent  float64       `json:"aws_equivalent"` // comparable AWS price
+	SavingsPercent float64       `json:"savings_percent"`
+}
+
+// GetPricing gets current pricing for all processor types.
+func (c *Client) GetPricing(ctx context.Context) ([]PricingInfo, error) { + var resp struct { + Pricing []PricingInfo `json:"pricing"` + } + if err := c.request(ctx, "GET", "/pricing", nil, &resp); err != nil { + return nil, err + } + return resp.Pricing, nil +} + +func (c *Client) request(ctx context.Context, method, path string, body interface{}, result interface{}) error { + url := c.config.Endpoint + path + + var bodyReader io.Reader + if body != nil { + bodyBytes, err := json.Marshal(body) + if err != nil { + return fmt.Errorf("failed to marshal request: %w", err) + } + bodyReader = bytes.NewReader(bodyBytes) + } + + req, err := http.NewRequestWithContext(ctx, method, url, bodyReader) + if err != nil { + return fmt.Errorf("failed to create request: %w", err) + } + + req.Header.Set("Authorization", "Bearer "+c.config.APIKey) + req.Header.Set("Content-Type", "application/json") + + resp, err := c.httpClient.Do(req) + if err != nil { + return fmt.Errorf("request failed: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode >= 400 { + var errResp struct { + Message string `json:"message"` + } + json.NewDecoder(resp.Body).Decode(&errResp) + return &SynorError{ + Message: errResp.Message, + StatusCode: resp.StatusCode, + } + } + + if result != nil { + if err := json.NewDecoder(resp.Body).Decode(result); err != nil { + return fmt.Errorf("failed to decode response: %w", err) + } + } + + return nil +} + +// SynorError represents an API error. 
+type SynorError struct { + Message string + StatusCode int +} + +func (e *SynorError) Error() string { + return fmt.Sprintf("synor: %s (status %d)", e.Message, e.StatusCode) +} diff --git a/sdk/js/package.json b/sdk/js/package.json new file mode 100644 index 0000000..e7522cf --- /dev/null +++ b/sdk/js/package.json @@ -0,0 +1,48 @@ +{ + "name": "@synor/compute-sdk", + "version": "0.1.0", + "description": "Synor Compute SDK for browser and Node.js - Access distributed GPU/TPU/NPU compute", + "main": "dist/index.js", + "module": "dist/index.mjs", + "types": "dist/index.d.ts", + "exports": { + ".": { + "types": "./dist/index.d.ts", + "import": "./dist/index.mjs", + "require": "./dist/index.js" + } + }, + "files": [ + "dist" + ], + "scripts": { + "build": "tsup src/index.ts --format cjs,esm --dts", + "dev": "tsup src/index.ts --format cjs,esm --dts --watch", + "test": "vitest", + "lint": "biome check src/", + "typecheck": "tsc --noEmit" + }, + "keywords": [ + "synor", + "compute", + "gpu", + "tpu", + "npu", + "ai", + "ml", + "distributed-computing", + "heterogeneous-compute" + ], + "author": "Synor Team", + "license": "MIT", + "repository": { + "type": "git", + "url": "https://github.com/synor/synor" + }, + "devDependencies": { + "@types/node": "^20.10.0", + "tsup": "^8.0.0", + "typescript": "^5.3.0", + "vitest": "^1.0.0" + } +} diff --git a/sdk/js/src/client.ts b/sdk/js/src/client.ts new file mode 100644 index 0000000..778e0a5 --- /dev/null +++ b/sdk/js/src/client.ts @@ -0,0 +1,228 @@ +/** + * Synor Compute Client + */ + +import type { + SynorConfig, + BalancingStrategy, + MatMulOptions, + Conv2dOptions, + AttentionOptions, + InferenceOptions, + JobResult, + PricingInfo, + ProcessorType, +} from './types'; +import { Tensor } from './tensor'; +import { ComputeJob } from './job'; + +const DEFAULT_ENDPOINT = 'https://compute.synor.cc/api/v1'; + +/** + * Main Synor Compute client. + * + * @example + * ```ts + * const compute = new SynorCompute({ apiKey: 'sk_...' 
}); + * + * // Run a matrix multiplication + * const result = await compute.matmul({ + * a: [[1, 2], [3, 4]], + * b: [[5, 6], [7, 8]] + * }); + * ``` + */ +export class SynorCompute { + private config: Required; + + constructor(config: SynorConfig) { + this.config = { + apiKey: config.apiKey, + endpoint: config.endpoint ?? DEFAULT_ENDPOINT, + strategy: config.strategy ?? 'balanced', + precision: config.precision ?? 'fp32', + timeout: config.timeout ?? 30000, + debug: config.debug ?? false, + }; + } + + /** + * Matrix multiplication on distributed compute. + */ + async matmul(options: MatMulOptions): Promise> { + const job = await this.submitJob('matmul', { + a: Tensor.from(options.a).serialize(), + b: Tensor.from(options.b).serialize(), + precision: options.precision ?? this.config.precision, + processor: options.processor, + priority: options.priority ?? 'normal', + }); + + return job.wait(); + } + + /** + * 2D Convolution on distributed compute. + */ + async conv2d(options: Conv2dOptions): Promise> { + const job = await this.submitJob('conv2d', { + input: Tensor.from(options.input).serialize(), + kernel: Tensor.from(options.kernel).serialize(), + stride: options.stride ?? 1, + padding: options.padding ?? 0, + precision: options.precision ?? this.config.precision, + }); + + return job.wait(); + } + + /** + * Self-attention on distributed compute. + */ + async attention(options: AttentionOptions): Promise> { + const job = await this.submitJob(options.flash ? 'flash_attention' : 'self_attention', { + query: Tensor.from(options.query).serialize(), + key: Tensor.from(options.key).serialize(), + value: Tensor.from(options.value).serialize(), + num_heads: options.numHeads, + precision: options.precision ?? this.config.precision, + }); + + return job.wait(); + } + + /** + * Run inference on a model. 
+ */ + async inference(options: InferenceOptions): Promise> { + const job = await this.submitJob('inference', { + model: options.model, + input: typeof options.input === 'string' + ? options.input + : Tensor.from(options.input).serialize(), + max_tokens: options.maxTokens ?? 256, + temperature: options.temperature ?? 0.7, + top_k: options.topK ?? 50, + strategy: options.strategy ?? this.config.strategy, + }); + + if (options.stream) { + return this.streamJob(job); + } + + return job.wait(); + } + + /** + * Get current pricing for different processor types. + */ + async getPricing(): Promise { + const response = await this.request('GET', '/pricing'); + return response.pricing; + } + + /** + * Get pricing for a specific processor type. + */ + async getProcessorPricing(processor: ProcessorType): Promise { + const response = await this.request('GET', `/pricing/${processor}`); + return response; + } + + /** + * Get available compute capacity. + */ + async getCapacity(): Promise<{ + totalGflops: number; + availableGflops: number; + processorCounts: Record; + }> { + const response = await this.request('GET', '/capacity'); + return response; + } + + /** + * Set the default balancing strategy. + */ + setStrategy(strategy: BalancingStrategy): void { + this.config.strategy = strategy; + } + + /** + * Submit a generic compute job. + */ + async submitJob(operation: string, params: Record): Promise { + const response = await this.request('POST', '/jobs', { + operation, + params, + strategy: this.config.strategy, + }); + + return new ComputeJob(response.job_id, this); + } + + /** + * Get job status. + */ + async getJobStatus(jobId: string): Promise { + const response = await this.request('GET', `/jobs/${jobId}`); + return response; + } + + /** + * Cancel a job. + */ + async cancelJob(jobId: string): Promise { + await this.request('DELETE', `/jobs/${jobId}`); + } + + /** + * Stream job results. 
+ */ + private async streamJob(job: ComputeJob): Promise> { + // In a full implementation, this would use SSE or WebSocket + return job.wait(); + } + + /** + * Make an API request. + */ + async request(method: string, path: string, body?: unknown): Promise { + const url = `${this.config.endpoint}${path}`; + + if (this.config.debug) { + console.log(`[SynorCompute] ${method} ${url}`); + } + + const response = await fetch(url, { + method, + headers: { + 'Authorization': `Bearer ${this.config.apiKey}`, + 'Content-Type': 'application/json', + }, + body: body ? JSON.stringify(body) : undefined, + signal: AbortSignal.timeout(this.config.timeout), + }); + + if (!response.ok) { + const error = await response.json().catch(() => ({ message: response.statusText })); + throw new SynorError(error.message || 'Request failed', response.status); + } + + return response.json(); + } +} + +/** + * Synor SDK error. + */ +export class SynorError extends Error { + constructor( + message: string, + public statusCode?: number, + public code?: string + ) { + super(message); + this.name = 'SynorError'; + } +} diff --git a/sdk/js/src/index.ts b/sdk/js/src/index.ts new file mode 100644 index 0000000..ba260d1 --- /dev/null +++ b/sdk/js/src/index.ts @@ -0,0 +1,39 @@ +/** + * Synor Compute SDK + * + * Access distributed heterogeneous compute resources (CPU, GPU, TPU, NPU, LPU) + * for AI/ML workloads at 90% cost reduction compared to traditional cloud. 
+ * + * @example + * ```ts + * import { SynorCompute } from '@synor/compute-sdk'; + * + * const compute = new SynorCompute({ apiKey: 'your-api-key' }); + * + * // Run matrix multiplication on GPUs + * const result = await compute.matmul({ + * a: tensorA, + * b: tensorB, + * precision: 'fp16' + * }); + * + * // Run inference on best available hardware + * const output = await compute.inference({ + * model: 'llama-70b', + * prompt: 'Hello, world!', + * strategy: 'latency' // or 'cost', 'energy' + * }); + * ``` + */ + +// Re-export all types +export * from './types'; +export * from './client'; +export * from './tensor'; +export * from './job'; + +// Main client +export { SynorCompute } from './client'; + +// Version +export const VERSION = '0.1.0'; diff --git a/sdk/js/src/job.ts b/sdk/js/src/job.ts new file mode 100644 index 0000000..16dd12d --- /dev/null +++ b/sdk/js/src/job.ts @@ -0,0 +1,255 @@ +/** + * Compute Job management for Synor Compute SDK + */ + +import type { JobResult, JobStatus, JobMetrics } from './types'; +import type { SynorCompute } from './client'; + +const POLL_INTERVAL_MS = 500; +const MAX_POLL_ATTEMPTS = 120; // 60 seconds max + +/** + * Represents a compute job on the Synor network. + */ +export class ComputeJob { + readonly jobId: string; + private client: SynorCompute; + private status: JobStatus = 'pending'; + private result?: T; + private error?: string; + private metrics?: JobMetrics; + private callbacks: Map) => void)[]> = new Map(); + + constructor(jobId: string, client: SynorCompute) { + this.jobId = jobId; + this.client = client; + } + + /** + * Get current job status. + */ + getStatus(): JobStatus { + return this.status; + } + + /** + * Check if job is complete (success or failure). + */ + isComplete(): boolean { + return this.status === 'completed' || this.status === 'failed' || this.status === 'cancelled'; + } + + /** + * Wait for job to complete. 
+ */ + async wait(options?: { timeout?: number; pollInterval?: number }): Promise> { + const timeout = options?.timeout ?? MAX_POLL_ATTEMPTS * POLL_INTERVAL_MS; + const pollInterval = options?.pollInterval ?? POLL_INTERVAL_MS; + const maxAttempts = Math.ceil(timeout / pollInterval); + + let attempts = 0; + while (!this.isComplete() && attempts < maxAttempts) { + await this.refresh(); + if (!this.isComplete()) { + await sleep(pollInterval); + } + attempts++; + } + + if (!this.isComplete()) { + throw new JobTimeoutError(this.jobId, timeout); + } + + return { + jobId: this.jobId, + status: this.status, + data: this.result, + error: this.error, + metrics: this.metrics, + }; + } + + /** + * Refresh job status from server. + */ + async refresh(): Promise { + const response = await this.client.getJobStatus(this.jobId); + const previousStatus = this.status; + + this.status = response.status; + this.result = response.data as T; + this.error = response.error; + this.metrics = response.metrics; + + // Trigger callbacks if status changed + if (previousStatus !== this.status) { + this.triggerCallbacks(this.status); + } + } + + /** + * Cancel the job. + */ + async cancel(): Promise { + await this.client.cancelJob(this.jobId); + this.status = 'cancelled'; + this.triggerCallbacks('cancelled'); + } + + /** + * Register a callback for status changes. + */ + on(status: JobStatus, callback: (job: ComputeJob) => void): this { + const callbacks = this.callbacks.get(status) ?? []; + callbacks.push(callback); + this.callbacks.set(status, callbacks); + return this; + } + + /** + * Get the result (throws if not complete or failed). + */ + getResult(): T { + if (this.status !== 'completed') { + throw new Error(`Job ${this.jobId} is not completed (status: ${this.status})`); + } + if (this.result === undefined) { + throw new Error(`Job ${this.jobId} has no result`); + } + return this.result; + } + + /** + * Get error message (if failed). 
+ */ + getError(): string | undefined { + return this.error; + } + + /** + * Get execution metrics (if available). + */ + getMetrics(): JobMetrics | undefined { + return this.metrics; + } + + /** + * Convert to JSON-serializable object. + */ + toJSON(): JobResult { + return { + jobId: this.jobId, + status: this.status, + data: this.result, + error: this.error, + metrics: this.metrics, + }; + } + + private triggerCallbacks(status: JobStatus): void { + const callbacks = this.callbacks.get(status) ?? []; + for (const callback of callbacks) { + try { + callback(this); + } catch (err) { + console.error(`Error in job callback for status ${status}:`, err); + } + } + } +} + +/** + * Error thrown when a job times out. + */ +export class JobTimeoutError extends Error { + constructor( + public jobId: string, + public timeout: number + ) { + super(`Job ${jobId} timed out after ${timeout}ms`); + this.name = 'JobTimeoutError'; + } +} + +/** + * Error thrown when a job fails. + */ +export class JobFailedError extends Error { + constructor( + public jobId: string, + public reason: string + ) { + super(`Job ${jobId} failed: ${reason}`); + this.name = 'JobFailedError'; + } +} + +/** + * Utility to sleep for a given duration. + */ +function sleep(ms: number): Promise { + return new Promise(resolve => setTimeout(resolve, ms)); +} + +/** + * Batch multiple jobs together for efficient execution. + */ +export class JobBatch { + private jobs: ComputeJob[] = []; + + /** + * Add a job to the batch. + */ + add(job: ComputeJob): this { + this.jobs.push(job); + return this; + } + + /** + * Get all jobs in the batch. + */ + getJobs(): ComputeJob[] { + return [...this.jobs]; + } + + /** + * Wait for all jobs to complete. + */ + async waitAll(options?: { timeout?: number }): Promise[]> { + return Promise.all(this.jobs.map(job => job.wait(options))); + } + + /** + * Wait for any job to complete. 
+ */ + async waitAny(options?: { timeout?: number }): Promise> { + return Promise.race(this.jobs.map(job => job.wait(options))); + } + + /** + * Cancel all jobs. + */ + async cancelAll(): Promise { + await Promise.all(this.jobs.map(job => job.cancel())); + } + + /** + * Get count of jobs by status. + */ + getStatusCounts(): Record { + const counts: Record = { + pending: 0, + queued: 0, + running: 0, + completed: 0, + failed: 0, + cancelled: 0, + }; + + for (const job of this.jobs) { + counts[job.getStatus()]++; + } + + return counts; + } +} diff --git a/sdk/js/src/tensor.ts b/sdk/js/src/tensor.ts new file mode 100644 index 0000000..1b75216 --- /dev/null +++ b/sdk/js/src/tensor.ts @@ -0,0 +1,284 @@ +/** + * Tensor utilities for Synor Compute SDK + */ + +import type { TensorData, TensorShape, Precision } from './types'; + +/** + * Tensor wrapper for compute operations. + */ +export class Tensor { + readonly data: Float32Array | Float64Array | Int32Array | Int8Array | Uint8Array; + readonly shape: number[]; + readonly dtype: Precision; + + private constructor( + data: Float32Array | Float64Array | Int32Array | Int8Array | Uint8Array, + shape: number[], + dtype: Precision + ) { + this.data = data; + this.shape = shape; + this.dtype = dtype; + } + + /** + * Create a tensor from various input formats. + */ + static from(data: TensorData, dtype: Precision = 'fp32'): Tensor { + if (data instanceof Float32Array) { + return new Tensor(data, [data.length], 'fp32'); + } + if (data instanceof Float64Array) { + return new Tensor(data, [data.length], 'fp64'); + } + if (data instanceof Int32Array) { + return new Tensor(data, [data.length], 'int8'); + } + if (data instanceof Int8Array) { + return new Tensor(data, [data.length], 'int8'); + } + if (data instanceof Uint8Array) { + return new Tensor(data, [data.length], 'int8'); + } + + // Handle nested arrays + const shape = Tensor.inferShape(data); + const flat = Tensor.flatten(data); + + const typedArray = dtype === 'fp64' + ? 
new Float64Array(flat) + : dtype === 'fp32' + ? new Float32Array(flat) + : dtype === 'int8' || dtype === 'int4' + ? new Int8Array(flat) + : new Float32Array(flat); + + return new Tensor(typedArray, shape, dtype); + } + + /** + * Create a tensor of zeros. + */ + static zeros(shape: number[], dtype: Precision = 'fp32'): Tensor { + const size = shape.reduce((a, b) => a * b, 1); + const data = dtype === 'fp64' + ? new Float64Array(size) + : dtype === 'fp32' + ? new Float32Array(size) + : new Int8Array(size); + return new Tensor(data, shape, dtype); + } + + /** + * Create a tensor of ones. + */ + static ones(shape: number[], dtype: Precision = 'fp32'): Tensor { + const size = shape.reduce((a, b) => a * b, 1); + const data = dtype === 'fp64' + ? new Float64Array(size).fill(1) + : dtype === 'fp32' + ? new Float32Array(size).fill(1) + : new Int8Array(size).fill(1); + return new Tensor(data, shape, dtype); + } + + /** + * Create a tensor with random values. + */ + static random(shape: number[], dtype: Precision = 'fp32'): Tensor { + const size = shape.reduce((a, b) => a * b, 1); + const data = dtype === 'fp64' + ? Float64Array.from({ length: size }, () => Math.random()) + : dtype === 'fp32' + ? Float32Array.from({ length: size }, () => Math.random()) + : Int8Array.from({ length: size }, () => Math.floor(Math.random() * 256) - 128); + return new Tensor(data, shape, dtype); + } + + /** + * Create a tensor with normal distribution. + */ + static randn(shape: number[], dtype: Precision = 'fp32'): Tensor { + const size = shape.reduce((a, b) => a * b, 1); + const values: number[] = []; + + for (let i = 0; i < size; i++) { + // Box-Muller transform + const u1 = Math.random(); + const u2 = Math.random(); + values.push(Math.sqrt(-2 * Math.log(u1)) * Math.cos(2 * Math.PI * u2)); + } + + const data = dtype === 'fp64' + ? new Float64Array(values) + : dtype === 'fp32' + ? 
new Float32Array(values) + : new Int8Array(values.map(v => Math.max(-128, Math.min(127, Math.round(v * 50))))); + + return new Tensor(data, shape, dtype); + } + + /** + * Get tensor size (total elements). + */ + get size(): number { + return this.shape.reduce((a, b) => a * b, 1); + } + + /** + * Get number of dimensions. + */ + get ndim(): number { + return this.shape.length; + } + + /** + * Get byte size. + */ + get byteSize(): number { + const bytesPerElement = this.dtype === 'fp64' ? 8 + : this.dtype === 'fp32' ? 4 + : this.dtype === 'fp16' || this.dtype === 'bf16' ? 2 + : 1; + return this.size * bytesPerElement; + } + + /** + * Reshape tensor. + */ + reshape(newShape: number[]): Tensor { + const newSize = newShape.reduce((a, b) => a * b, 1); + if (newSize !== this.size) { + throw new Error(`Cannot reshape tensor of size ${this.size} to shape [${newShape}]`); + } + return new Tensor(this.data, newShape, this.dtype); + } + + /** + * Convert to different precision. + */ + to(dtype: Precision): Tensor { + if (dtype === this.dtype) return this; + + const newData = dtype === 'fp64' + ? Float64Array.from(this.data) + : dtype === 'fp32' + ? Float32Array.from(this.data) + : Int8Array.from(this.data); + + return new Tensor(newData, this.shape, dtype); + } + + /** + * Get value at index. + */ + get(...indices: number[]): number { + const idx = this.flatIndex(indices); + return this.data[idx]; + } + + /** + * Set value at index. + */ + set(value: number, ...indices: number[]): void { + const idx = this.flatIndex(indices); + this.data[idx] = value; + } + + /** + * Convert to nested array. 
+ */ + toArray(): number[] | number[][] | number[][][] { + if (this.ndim === 1) { + return Array.from(this.data); + } + + const result: any[] = []; + const innerSize = this.shape.slice(1).reduce((a, b) => a * b, 1); + + for (let i = 0; i < this.shape[0]; i++) { + const slice = new Tensor( + this.data.slice(i * innerSize, (i + 1) * innerSize) as Float32Array, + this.shape.slice(1), + this.dtype + ); + result.push(slice.toArray()); + } + + return result; + } + + /** + * Serialize tensor for transmission. + */ + serialize(): { data: string; shape: number[]; dtype: Precision } { + // Convert to base64 for efficient transmission + const bytes = new Uint8Array(this.data.buffer); + const base64 = btoa(String.fromCharCode(...bytes)); + + return { + data: base64, + shape: this.shape, + dtype: this.dtype, + }; + } + + /** + * Deserialize tensor from transmission format. + */ + static deserialize(serialized: { data: string; shape: number[]; dtype: Precision }): Tensor { + const binary = atob(serialized.data); + const bytes = new Uint8Array(binary.length); + for (let i = 0; i < binary.length; i++) { + bytes[i] = binary.charCodeAt(i); + } + + const data = serialized.dtype === 'fp64' + ? new Float64Array(bytes.buffer) + : serialized.dtype === 'fp32' + ? new Float32Array(bytes.buffer) + : new Int8Array(bytes.buffer); + + return new Tensor(data, serialized.shape, serialized.dtype); + } + + /** + * Calculate flat index from multi-dimensional indices. + */ + private flatIndex(indices: number[]): number { + if (indices.length !== this.ndim) { + throw new Error(`Expected ${this.ndim} indices, got ${indices.length}`); + } + + let idx = 0; + let stride = 1; + for (let i = this.ndim - 1; i >= 0; i--) { + if (indices[i] < 0 || indices[i] >= this.shape[i]) { + throw new Error(`Index ${indices[i]} out of bounds for axis ${i} with size ${this.shape[i]}`); + } + idx += indices[i] * stride; + stride *= this.shape[i]; + } + return idx; + } + + /** + * Infer shape from nested array. 
+ */ + private static inferShape(data: number[] | number[][]): number[] { + if (!Array.isArray(data)) return []; + if (!Array.isArray(data[0])) return [data.length]; + return [data.length, ...Tensor.inferShape(data[0] as number[] | number[][])]; + } + + /** + * Flatten nested array. + */ + private static flatten(data: number[] | number[][]): number[] { + if (!Array.isArray(data)) return [data as unknown as number]; + return (data as any[]).flatMap(item => + Array.isArray(item) ? Tensor.flatten(item) : item + ); + } +} diff --git a/sdk/js/src/types.ts b/sdk/js/src/types.ts new file mode 100644 index 0000000..5f27f3e --- /dev/null +++ b/sdk/js/src/types.ts @@ -0,0 +1,198 @@ +/** + * Synor Compute SDK Types + */ + +/** Supported processor types */ +export type ProcessorType = + | 'cpu' + | 'gpu' + | 'tpu' + | 'npu' + | 'lpu' + | 'fpga' + | 'wasm' + | 'webgpu'; + +/** Precision levels for compute operations */ +export type Precision = 'fp64' | 'fp32' | 'fp16' | 'bf16' | 'int8' | 'int4'; + +/** Balancing strategy for job scheduling */ +export type BalancingStrategy = + | 'speed' // Minimize execution time + | 'cost' // Minimize cost (prefer consumer devices) + | 'energy' // Minimize energy consumption + | 'latency' // Minimize latency (for inference) + | 'balanced'; // Balance all factors + +/** Task priority levels */ +export type TaskPriority = 'critical' | 'high' | 'normal' | 'background'; + +/** Job status */ +export type JobStatus = + | 'pending' + | 'queued' + | 'running' + | 'completed' + | 'failed' + | 'cancelled'; + +/** SDK configuration */ +export interface SynorConfig { + /** API key for authentication */ + apiKey: string; + /** API endpoint (defaults to production) */ + endpoint?: string; + /** Default balancing strategy */ + strategy?: BalancingStrategy; + /** Default precision */ + precision?: Precision; + /** Request timeout in milliseconds */ + timeout?: number; + /** Enable debug logging */ + debug?: boolean; +} + +/** Tensor shape descriptor */ 
+export interface TensorShape { + /** Dimensions */ + dims: number[]; + /** Data type */ + dtype: Precision; +} + +/** Tensor data (can be typed array or nested array) */ +export type TensorData = + | Float64Array + | Float32Array + | Int32Array + | Int16Array + | Int8Array + | Uint8Array + | number[] + | number[][]; + +/** Matrix multiplication options */ +export interface MatMulOptions { + /** First matrix */ + a: TensorData; + /** Second matrix */ + b: TensorData; + /** Precision */ + precision?: Precision; + /** Preferred processor type */ + processor?: ProcessorType; + /** Task priority */ + priority?: TaskPriority; +} + +/** Convolution options */ +export interface Conv2dOptions { + /** Input tensor [batch, channels, height, width] */ + input: TensorData; + /** Kernel/filter tensor */ + kernel: TensorData; + /** Stride */ + stride?: number | [number, number]; + /** Padding */ + padding?: number | [number, number]; + /** Precision */ + precision?: Precision; +} + +/** Attention options */ +export interface AttentionOptions { + /** Query tensor */ + query: TensorData; + /** Key tensor */ + key: TensorData; + /** Value tensor */ + value: TensorData; + /** Number of attention heads */ + numHeads: number; + /** Use flash attention */ + flash?: boolean; + /** Precision */ + precision?: Precision; +} + +/** Inference options */ +export interface InferenceOptions { + /** Model name or CID */ + model: string; + /** Input prompt or data */ + input: string | TensorData; + /** Maximum tokens to generate */ + maxTokens?: number; + /** Temperature for sampling */ + temperature?: number; + /** Top-k sampling */ + topK?: number; + /** Balancing strategy */ + strategy?: BalancingStrategy; + /** Stream responses */ + stream?: boolean; +} + +/** Job submission result */ +export interface JobResult { + /** Job ID */ + jobId: string; + /** Job status */ + status: JobStatus; + /** Result data (if completed) */ + data?: T; + /** Error message (if failed) */ + error?: string; + /** 
Execution metrics */ + metrics?: JobMetrics; +} + +/** Job execution metrics */ +export interface JobMetrics { + /** Total execution time (ms) */ + executionTimeMs: number; + /** Queue wait time (ms) */ + queueTimeMs: number; + /** Processor type used */ + processorType: ProcessorType; + /** Processor ID */ + processorId: string; + /** Estimated FLOPS */ + flops: number; + /** Memory used (bytes) */ + memoryBytes: number; + /** Cost in microcredits */ + costMicro: number; + /** Energy consumed (mJ) */ + energyMj: number; +} + +/** Spot market order */ +export interface SpotOrder { + /** Order ID */ + orderId: string; + /** Order type */ + type: 'bid' | 'ask'; + /** Processor type */ + processorType: ProcessorType; + /** Price in microcredits per GFLOP */ + priceMicro: number; + /** Quantity (GFLOPS) */ + quantity: number; + /** Region preference */ + region?: string; +} + +/** Compute pricing info */ +export interface PricingInfo { + /** Processor type */ + processorType: ProcessorType; + /** Current spot price (microcredits/GFLOP) */ + spotPrice: number; + /** 24h average price */ + avgPrice24h: number; + /** AWS equivalent price */ + awsEquivalent: number; + /** Savings percentage vs AWS */ + savingsPercent: number; +} diff --git a/sdk/python/pyproject.toml b/sdk/python/pyproject.toml new file mode 100644 index 0000000..f0bf2ec --- /dev/null +++ b/sdk/python/pyproject.toml @@ -0,0 +1,51 @@ +[project] +name = "synor-compute" +version = "0.1.0" +description = "Synor Compute SDK for Python - Access distributed GPU/TPU/NPU compute" +readme = "README.md" +license = { text = "MIT" } +authors = [ + { name = "Synor Team", email = "dev@synor.cc" } +] +keywords = ["synor", "compute", "gpu", "tpu", "npu", "ai", "ml", "distributed-computing"] +classifiers = [ + "Development Status :: 3 - Alpha", + "Intended Audience :: Developers", + "License :: OSI Approved :: MIT License", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.10", + "Programming 
Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Topic :: Scientific/Engineering :: Artificial Intelligence", +] +requires-python = ">=3.10" +dependencies = [ + "httpx>=0.25.0", + "numpy>=1.24.0", + "pydantic>=2.0.0", +] + +[project.optional-dependencies] +dev = [ + "pytest>=7.0.0", + "pytest-asyncio>=0.21.0", + "ruff>=0.1.0", + "mypy>=1.0.0", +] + +[project.urls] +Homepage = "https://synor.cc" +Documentation = "https://docs.synor.cc/compute" +Repository = "https://github.com/synor/synor" + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.ruff] +line-length = 100 +target-version = "py310" + +[tool.mypy] +python_version = "3.10" +strict = true diff --git a/sdk/python/synor_compute/__init__.py b/sdk/python/synor_compute/__init__.py new file mode 100644 index 0000000..a876e2a --- /dev/null +++ b/sdk/python/synor_compute/__init__.py @@ -0,0 +1,44 @@ +""" +Synor Compute SDK for Python + +Access distributed heterogeneous compute resources (CPU, GPU, TPU, NPU, LPU) +for AI/ML workloads at 90% cost reduction compared to traditional cloud. 
+ +Example: + >>> from synor_compute import SynorCompute + >>> compute = SynorCompute(api_key="your-api-key") + >>> result = await compute.matmul(a, b, precision="fp16") +""" + +from .client import SynorCompute, SynorError +from .tensor import Tensor +from .job import ComputeJob, JobBatch +from .types import ( + ProcessorType, + Precision, + BalancingStrategy, + TaskPriority, + JobStatus, + SynorConfig, + JobResult, + JobMetrics, + PricingInfo, +) + +__version__ = "0.1.0" +__all__ = [ + "SynorCompute", + "SynorError", + "Tensor", + "ComputeJob", + "JobBatch", + "ProcessorType", + "Precision", + "BalancingStrategy", + "TaskPriority", + "JobStatus", + "SynorConfig", + "JobResult", + "JobMetrics", + "PricingInfo", +] diff --git a/sdk/python/synor_compute/client.py b/sdk/python/synor_compute/client.py new file mode 100644 index 0000000..eb1fdf7 --- /dev/null +++ b/sdk/python/synor_compute/client.py @@ -0,0 +1,298 @@ +"""Synor Compute Client.""" + +from typing import Any, Optional +import httpx +import numpy as np + +from .types import ( + SynorConfig, + BalancingStrategy, + Precision, + ProcessorType, + TaskPriority, + JobResult, + PricingInfo, +) +from .tensor import Tensor +from .job import ComputeJob + + +class SynorCompute: + """ + Main Synor Compute client. + + Example: + >>> compute = SynorCompute(api_key="sk_...") + >>> result = await compute.matmul( + ... a=np.random.randn(1024, 1024), + ... b=np.random.randn(1024, 1024), + ... precision=Precision.FP16 + ... 
) + """ + + def __init__( + self, + api_key: str, + endpoint: str = "https://compute.synor.cc/api/v1", + strategy: BalancingStrategy = BalancingStrategy.BALANCED, + precision: Precision = Precision.FP32, + timeout: float = 30.0, + debug: bool = False, + ): + self.config = SynorConfig( + api_key=api_key, + endpoint=endpoint, + strategy=strategy, + precision=precision, + timeout=timeout, + debug=debug, + ) + self._client = httpx.AsyncClient( + base_url=endpoint, + headers={ + "Authorization": f"Bearer {api_key}", + "Content-Type": "application/json", + }, + timeout=timeout, + ) + + async def __aenter__(self) -> "SynorCompute": + return self + + async def __aexit__(self, *args: Any) -> None: + await self.close() + + async def close(self) -> None: + """Close the client.""" + await self._client.aclose() + + async def matmul( + self, + a: np.ndarray | Tensor, + b: np.ndarray | Tensor, + precision: Optional[Precision] = None, + processor: Optional[ProcessorType] = None, + priority: TaskPriority = TaskPriority.NORMAL, + ) -> JobResult: + """ + Matrix multiplication on distributed compute. + + Args: + a: First matrix + b: Second matrix + precision: Compute precision + processor: Preferred processor type + priority: Task priority + + Returns: + Job result with computed matrix + """ + tensor_a = Tensor.from_numpy(a) if isinstance(a, np.ndarray) else a + tensor_b = Tensor.from_numpy(b) if isinstance(b, np.ndarray) else b + + job = await self.submit_job( + "matmul", + { + "a": tensor_a.serialize(), + "b": tensor_b.serialize(), + "precision": (precision or self.config.precision).value, + "processor": processor.value if processor else None, + "priority": priority.value, + }, + ) + return await job.wait() + + async def conv2d( + self, + input: np.ndarray | Tensor, + kernel: np.ndarray | Tensor, + stride: int | tuple[int, int] = 1, + padding: int | tuple[int, int] = 0, + precision: Optional[Precision] = None, + ) -> JobResult: + """ + 2D Convolution on distributed compute. 
+ + Args: + input: Input tensor [batch, channels, height, width] + kernel: Convolution kernel + stride: Stride + padding: Padding + precision: Compute precision + + Returns: + Job result with convolved tensor + """ + tensor_input = Tensor.from_numpy(input) if isinstance(input, np.ndarray) else input + tensor_kernel = Tensor.from_numpy(kernel) if isinstance(kernel, np.ndarray) else kernel + + job = await self.submit_job( + "conv2d", + { + "input": tensor_input.serialize(), + "kernel": tensor_kernel.serialize(), + "stride": stride, + "padding": padding, + "precision": (precision or self.config.precision).value, + }, + ) + return await job.wait() + + async def attention( + self, + query: np.ndarray | Tensor, + key: np.ndarray | Tensor, + value: np.ndarray | Tensor, + num_heads: int, + flash: bool = True, + precision: Optional[Precision] = None, + ) -> JobResult: + """ + Self-attention on distributed compute. + + Args: + query: Query tensor + key: Key tensor + value: Value tensor + num_heads: Number of attention heads + flash: Use FlashAttention + precision: Compute precision + + Returns: + Job result with attention output + """ + q = Tensor.from_numpy(query) if isinstance(query, np.ndarray) else query + k = Tensor.from_numpy(key) if isinstance(key, np.ndarray) else key + v = Tensor.from_numpy(value) if isinstance(value, np.ndarray) else value + + job = await self.submit_job( + "flash_attention" if flash else "self_attention", + { + "query": q.serialize(), + "key": k.serialize(), + "value": v.serialize(), + "num_heads": num_heads, + "precision": (precision or self.config.precision).value, + }, + ) + return await job.wait() + + async def inference( + self, + model: str, + input: str | np.ndarray | Tensor, + max_tokens: int = 256, + temperature: float = 0.7, + top_k: int = 50, + strategy: Optional[BalancingStrategy] = None, + stream: bool = False, + ) -> JobResult: + """ + Run inference on a model. 
+ + Args: + model: Model name or CID + input: Input prompt or tensor + max_tokens: Maximum tokens to generate + temperature: Sampling temperature + top_k: Top-k sampling + strategy: Balancing strategy + stream: Stream responses + + Returns: + Job result with inference output + """ + input_data: str | dict + if isinstance(input, str): + input_data = input + elif isinstance(input, np.ndarray): + input_data = Tensor.from_numpy(input).serialize() + else: + input_data = input.serialize() + + job = await self.submit_job( + "inference", + { + "model": model, + "input": input_data, + "max_tokens": max_tokens, + "temperature": temperature, + "top_k": top_k, + "strategy": (strategy or self.config.strategy).value, + }, + ) + return await job.wait() + + async def get_pricing(self) -> list[PricingInfo]: + """Get current pricing for all processor types.""" + response = await self._request("GET", "/pricing") + return [PricingInfo(**p) for p in response["pricing"]] + + async def get_processor_pricing(self, processor: ProcessorType) -> PricingInfo: + """Get pricing for a specific processor type.""" + response = await self._request("GET", f"/pricing/{processor.value}") + return PricingInfo(**response) + + async def get_capacity(self) -> dict[str, Any]: + """Get available compute capacity.""" + return await self._request("GET", "/capacity") + + def set_strategy(self, strategy: BalancingStrategy) -> None: + """Set the default balancing strategy.""" + self.config.strategy = strategy + + async def submit_job(self, operation: str, params: dict[str, Any]) -> ComputeJob: + """Submit a generic compute job.""" + response = await self._request( + "POST", + "/jobs", + { + "operation": operation, + "params": params, + "strategy": self.config.strategy.value, + }, + ) + return ComputeJob(response["job_id"], self) + + async def get_job_status(self, job_id: str) -> JobResult: + """Get job status.""" + response = await self._request("GET", f"/jobs/{job_id}") + return JobResult(**response) + + async 
def cancel_job(self, job_id: str) -> None: + """Cancel a job.""" + await self._request("DELETE", f"/jobs/{job_id}") + + async def _request( + self, + method: str, + path: str, + data: Optional[dict[str, Any]] = None, + ) -> dict[str, Any]: + """Make an API request.""" + if self.config.debug: + print(f"[SynorCompute] {method} {path}") + + response = await self._client.request( + method, + path, + json=data, + ) + + if response.status_code >= 400: + error = response.json() if response.content else {"message": response.reason_phrase} + raise SynorError( + error.get("message", "Request failed"), + response.status_code, + ) + + return response.json() + + +class SynorError(Exception): + """Synor SDK error.""" + + def __init__(self, message: str, status_code: Optional[int] = None, code: Optional[str] = None): + super().__init__(message) + self.status_code = status_code + self.code = code diff --git a/sdk/python/synor_compute/job.py b/sdk/python/synor_compute/job.py new file mode 100644 index 0000000..3a08792 --- /dev/null +++ b/sdk/python/synor_compute/job.py @@ -0,0 +1,199 @@ +"""Compute Job management for Synor Compute SDK.""" + +from typing import TYPE_CHECKING, Any, Callable, Optional +import asyncio + +from .types import JobStatus, JobMetrics, JobResult + +if TYPE_CHECKING: + from .client import SynorCompute + + +POLL_INTERVAL_SEC = 0.5 +MAX_POLL_ATTEMPTS = 120 # 60 seconds max + + +class ComputeJob: + """ + Represents a compute job on the Synor network. 
+ + Example: + >>> job = await compute.submit_job("matmul", {...}) + >>> result = await job.wait() + >>> print(result.data) + """ + + def __init__(self, job_id: str, client: "SynorCompute"): + self.job_id = job_id + self._client = client + self._status = JobStatus.PENDING + self._result: Optional[Any] = None + self._error: Optional[str] = None + self._metrics: Optional[JobMetrics] = None + self._callbacks: dict[JobStatus, list[Callable[["ComputeJob"], None]]] = {} + + @property + def status(self) -> JobStatus: + """Get current job status.""" + return self._status + + def is_complete(self) -> bool: + """Check if job is complete.""" + return self._status in (JobStatus.COMPLETED, JobStatus.FAILED, JobStatus.CANCELLED) + + async def wait( + self, + timeout: Optional[float] = None, + poll_interval: float = POLL_INTERVAL_SEC, + ) -> JobResult: + """ + Wait for job to complete. + + Args: + timeout: Maximum wait time in seconds + poll_interval: Time between status checks + + Returns: + JobResult with data or error + """ + timeout = timeout or MAX_POLL_ATTEMPTS * POLL_INTERVAL_SEC + max_attempts = int(timeout / poll_interval) + + attempts = 0 + while not self.is_complete() and attempts < max_attempts: + await self.refresh() + if not self.is_complete(): + await asyncio.sleep(poll_interval) + attempts += 1 + + if not self.is_complete(): + raise JobTimeoutError(self.job_id, timeout) + + return JobResult( + job_id=self.job_id, + status=self._status, + data=self._result, + error=self._error, + metrics=self._metrics, + ) + + async def refresh(self) -> None: + """Refresh job status from server.""" + result = await self._client.get_job_status(self.job_id) + previous_status = self._status + + self._status = result.status + self._result = result.data + self._error = result.error + self._metrics = result.metrics + + if previous_status != self._status: + self._trigger_callbacks(self._status) + + async def cancel(self) -> None: + """Cancel the job.""" + await 
self._client.cancel_job(self.job_id) + self._status = JobStatus.CANCELLED + self._trigger_callbacks(JobStatus.CANCELLED) + + def on(self, status: JobStatus, callback: Callable[["ComputeJob"], None]) -> "ComputeJob": + """Register a callback for status changes.""" + if status not in self._callbacks: + self._callbacks[status] = [] + self._callbacks[status].append(callback) + return self + + def get_result(self) -> Any: + """Get the result (raises if not complete).""" + if self._status != JobStatus.COMPLETED: + raise ValueError(f"Job {self.job_id} is not completed (status: {self._status})") + return self._result + + def get_error(self) -> Optional[str]: + """Get error message (if failed).""" + return self._error + + def get_metrics(self) -> Optional[JobMetrics]: + """Get execution metrics (if available).""" + return self._metrics + + def _trigger_callbacks(self, status: JobStatus) -> None: + """Trigger callbacks for a status.""" + for callback in self._callbacks.get(status, []): + try: + callback(self) + except Exception as e: + print(f"Error in job callback for status {status}: {e}") + + def __repr__(self) -> str: + return f"ComputeJob(id={self.job_id}, status={self._status.value})" + + +class JobTimeoutError(Exception): + """Error raised when a job times out.""" + + def __init__(self, job_id: str, timeout: float): + super().__init__(f"Job {job_id} timed out after {timeout}s") + self.job_id = job_id + self.timeout = timeout + + +class JobFailedError(Exception): + """Error raised when a job fails.""" + + def __init__(self, job_id: str, reason: str): + super().__init__(f"Job {job_id} failed: {reason}") + self.job_id = job_id + self.reason = reason + + +class JobBatch: + """ + Batch multiple jobs for efficient execution. 
+ + Example: + >>> batch = JobBatch() + >>> batch.add(job1).add(job2).add(job3) + >>> results = await batch.wait_all() + """ + + def __init__(self) -> None: + self._jobs: list[ComputeJob] = [] + + def add(self, job: ComputeJob) -> "JobBatch": + """Add a job to the batch.""" + self._jobs.append(job) + return self + + @property + def jobs(self) -> list[ComputeJob]: + """Get all jobs.""" + return list(self._jobs) + + async def wait_all(self, timeout: Optional[float] = None) -> list[JobResult]: + """Wait for all jobs to complete.""" + return await asyncio.gather( + *[job.wait(timeout=timeout) for job in self._jobs] + ) + + async def wait_any(self, timeout: Optional[float] = None) -> JobResult: + """Wait for any job to complete.""" + done, _ = await asyncio.wait( + [asyncio.create_task(job.wait(timeout=timeout)) for job in self._jobs], + return_when=asyncio.FIRST_COMPLETED, + ) + return done.pop().result() + + async def cancel_all(self) -> None: + """Cancel all jobs.""" + await asyncio.gather(*[job.cancel() for job in self._jobs]) + + def get_status_counts(self) -> dict[JobStatus, int]: + """Get count of jobs by status.""" + counts = {status: 0 for status in JobStatus} + for job in self._jobs: + counts[job.status] += 1 + return counts + + def __len__(self) -> int: + return len(self._jobs) diff --git a/sdk/python/synor_compute/tensor.py b/sdk/python/synor_compute/tensor.py new file mode 100644 index 0000000..cae8d0d --- /dev/null +++ b/sdk/python/synor_compute/tensor.py @@ -0,0 +1,153 @@ +"""Tensor utilities for Synor Compute SDK.""" + +from typing import Any +import base64 +import numpy as np + +from .types import Precision + + +class Tensor: + """ + Tensor wrapper for compute operations. 
+ + Example: + >>> tensor = Tensor.from_numpy(np.random.randn(1024, 1024)) + >>> tensor = Tensor.zeros((1024, 1024), dtype=Precision.FP16) + """ + + def __init__( + self, + data: np.ndarray, + dtype: Precision = Precision.FP32, + ): + self._data = data + self._dtype = dtype + + @classmethod + def from_numpy(cls, array: np.ndarray, dtype: Precision | None = None) -> "Tensor": + """Create a tensor from a numpy array.""" + if dtype is None: + dtype = cls._infer_dtype(array.dtype) + return cls(array, dtype) + + @classmethod + def zeros(cls, shape: tuple[int, ...], dtype: Precision = Precision.FP32) -> "Tensor": + """Create a tensor of zeros.""" + np_dtype = cls._to_numpy_dtype(dtype) + return cls(np.zeros(shape, dtype=np_dtype), dtype) + + @classmethod + def ones(cls, shape: tuple[int, ...], dtype: Precision = Precision.FP32) -> "Tensor": + """Create a tensor of ones.""" + np_dtype = cls._to_numpy_dtype(dtype) + return cls(np.ones(shape, dtype=np_dtype), dtype) + + @classmethod + def random(cls, shape: tuple[int, ...], dtype: Precision = Precision.FP32) -> "Tensor": + """Create a tensor with random values.""" + np_dtype = cls._to_numpy_dtype(dtype) + return cls(np.random.random(shape).astype(np_dtype), dtype) + + @classmethod + def randn(cls, shape: tuple[int, ...], dtype: Precision = Precision.FP32) -> "Tensor": + """Create a tensor with normal distribution.""" + np_dtype = cls._to_numpy_dtype(dtype) + return cls(np.random.randn(*shape).astype(np_dtype), dtype) + + @property + def data(self) -> np.ndarray: + """Get the underlying numpy array.""" + return self._data + + @property + def shape(self) -> tuple[int, ...]: + """Get tensor shape.""" + return self._data.shape + + @property + def dtype(self) -> Precision: + """Get tensor dtype.""" + return self._dtype + + @property + def size(self) -> int: + """Get total number of elements.""" + return self._data.size + + @property + def ndim(self) -> int: + """Get number of dimensions.""" + return self._data.ndim + + @property 
+ def nbytes(self) -> int: + """Get byte size.""" + return self._data.nbytes + + def reshape(self, shape: tuple[int, ...]) -> "Tensor": + """Reshape tensor.""" + return Tensor(self._data.reshape(shape), self._dtype) + + def to(self, dtype: Precision) -> "Tensor": + """Convert to different precision.""" + if dtype == self._dtype: + return self + np_dtype = self._to_numpy_dtype(dtype) + return Tensor(self._data.astype(np_dtype), dtype) + + def numpy(self) -> np.ndarray: + """Convert to numpy array.""" + return self._data + + def serialize(self) -> dict[str, Any]: + """Serialize tensor for transmission.""" + data_bytes = self._data.tobytes() + data_b64 = base64.b64encode(data_bytes).decode("ascii") + return { + "data": data_b64, + "shape": list(self._data.shape), + "dtype": self._dtype.value, + } + + @classmethod + def deserialize(cls, data: dict[str, Any]) -> "Tensor": + """Deserialize tensor from transmission format.""" + data_bytes = base64.b64decode(data["data"]) + dtype = Precision(data["dtype"]) + np_dtype = cls._to_numpy_dtype(dtype) + array = np.frombuffer(data_bytes, dtype=np_dtype).reshape(data["shape"]) + return cls(array, dtype) + + @staticmethod + def _infer_dtype(np_dtype: np.dtype) -> Precision: + """Infer Precision from numpy dtype.""" + if np_dtype == np.float64: + return Precision.FP64 + elif np_dtype == np.float32: + return Precision.FP32 + elif np_dtype == np.float16: + return Precision.FP16 + elif np_dtype == np.int8: + return Precision.INT8 + else: + return Precision.FP32 + + @staticmethod + def _to_numpy_dtype(dtype: Precision) -> np.dtype: + """Convert Precision to numpy dtype.""" + mapping = { + Precision.FP64: np.float64, + Precision.FP32: np.float32, + Precision.FP16: np.float16, + Precision.BF16: np.float16, # Approximate + Precision.INT8: np.int8, + Precision.INT4: np.int8, # Approximate + } + return np.dtype(mapping.get(dtype, np.float32)) + + def __repr__(self) -> str: + return f"Tensor(shape={self.shape}, dtype={self._dtype.value})" + + 
def __str__(self) -> str: + return f"Tensor({self._data}, dtype={self._dtype.value})" diff --git a/sdk/python/synor_compute/types.py b/sdk/python/synor_compute/types.py new file mode 100644 index 0000000..ea022fe --- /dev/null +++ b/sdk/python/synor_compute/types.py @@ -0,0 +1,94 @@ +"""Type definitions for Synor Compute SDK.""" + +from enum import Enum +from typing import Optional +from pydantic import BaseModel + + +class ProcessorType(str, Enum): + """Supported processor types.""" + CPU = "cpu" + GPU = "gpu" + TPU = "tpu" + NPU = "npu" + LPU = "lpu" + FPGA = "fpga" + WASM = "wasm" + WEBGPU = "webgpu" + + +class Precision(str, Enum): + """Precision levels for compute operations.""" + FP64 = "fp64" + FP32 = "fp32" + FP16 = "fp16" + BF16 = "bf16" + INT8 = "int8" + INT4 = "int4" + + +class BalancingStrategy(str, Enum): + """Balancing strategy for job scheduling.""" + SPEED = "speed" # Minimize execution time + COST = "cost" # Minimize cost + ENERGY = "energy" # Minimize energy consumption + LATENCY = "latency" # Minimize latency + BALANCED = "balanced" # Balance all factors + + +class TaskPriority(str, Enum): + """Task priority levels.""" + CRITICAL = "critical" + HIGH = "high" + NORMAL = "normal" + BACKGROUND = "background" + + +class JobStatus(str, Enum): + """Job status.""" + PENDING = "pending" + QUEUED = "queued" + RUNNING = "running" + COMPLETED = "completed" + FAILED = "failed" + CANCELLED = "cancelled" + + +class SynorConfig(BaseModel): + """SDK configuration.""" + api_key: str + endpoint: str = "https://compute.synor.cc/api/v1" + strategy: BalancingStrategy = BalancingStrategy.BALANCED + precision: Precision = Precision.FP32 + timeout: float = 30.0 + debug: bool = False + + +class JobMetrics(BaseModel): + """Job execution metrics.""" + execution_time_ms: float + queue_time_ms: float + processor_type: ProcessorType + processor_id: str + flops: float + memory_bytes: int + cost_micro: int + energy_mj: float + + +class JobResult(BaseModel): + """Job submission 
result."""
+    job_id: str
+    status: JobStatus
+    data: Optional[dict] = None  # operation output; populated when status is "completed"
+    error: Optional[str] = None  # failure message; populated when status is "failed"
+    metrics: Optional[JobMetrics] = None  # execution metrics, when reported by the server
+
+
+class PricingInfo(BaseModel):
+    """Compute pricing info for one processor type."""
+    processor_type: ProcessorType
+    spot_price: float  # current spot price, microcredits per GFLOP
+    avg_price_24h: float  # 24h average price, microcredits per GFLOP
+    aws_equivalent: float  # AWS equivalent price, for comparison
+    savings_percent: float  # savings percentage vs AWS