synor/sdk/python/synor_compute/job.py
Gulshan Yadav a808bb37a6 feat(sdk): add consumer SDKs for JavaScript, Python, and Go
Add complete SDK implementations for accessing Synor Compute:

JavaScript/TypeScript SDK (sdk/js/):
- Full async/await API with TypeScript types
- Tensor operations: matmul, conv2d, attention
- Model inference with streaming support
- WebSocket-based job monitoring
- Browser and Node.js compatible

Python SDK (sdk/python/):
- Async/await with aiohttp
- NumPy integration for tensors
- Context managers for cleanup
- Type hints throughout
- PyPI-ready package structure

Go SDK (sdk/go/):
- Idiomatic Go with context support
- Efficient binary tensor serialization
- HTTP client with configurable timeouts
- Zero external dependencies (stdlib only)

All SDKs support:
- Matrix multiplication (FP64 to INT4 precision)
- Convolution operations (2D, 3D)
- Flash attention
- LLM inference
- Spot pricing queries
- Job polling and cancellation
- Heterogeneous compute targeting (CPU/GPU/TPU/NPU/LPU)
2026-01-11 14:11:58 +05:30

199 lines
6 KiB
Python

"""Compute Job management for Synor Compute SDK."""
import asyncio
import logging
from typing import TYPE_CHECKING, Any, Callable, Optional

from .types import JobStatus, JobMetrics, JobResult

if TYPE_CHECKING:
    from .client import SynorCompute
# Default delay, in seconds, between status polls made by ComputeJob.wait().
POLL_INTERVAL_SEC: float = 0.5
# Used together with POLL_INTERVAL_SEC to derive ComputeJob.wait()'s default
# timeout: 120 polls x 0.5 s = 60 seconds.
MAX_POLL_ATTEMPTS: int = 120
class ComputeJob:
    """
    Represents a compute job on the Synor network.

    The status, result, error and metrics held by this object are a local
    snapshot; they change only when :meth:`refresh` or :meth:`cancel` runs.

    Example:
        >>> job = await compute.submit_job("matmul", {...})
        >>> result = await job.wait()
        >>> print(result.data)
    """

    def __init__(self, job_id: str, client: "SynorCompute"):
        self.job_id = job_id
        self._client = client
        self._status = JobStatus.PENDING
        self._result: Optional[Any] = None
        self._error: Optional[str] = None
        self._metrics: Optional[JobMetrics] = None
        # Callbacks registered via on(), keyed by the status that fires them.
        self._callbacks: dict[JobStatus, list[Callable[["ComputeJob"], None]]] = {}

    @property
    def status(self) -> JobStatus:
        """Get current (locally known) job status."""
        return self._status

    def is_complete(self) -> bool:
        """Check if job is in a terminal status (completed, failed or cancelled)."""
        return self._status in (JobStatus.COMPLETED, JobStatus.FAILED, JobStatus.CANCELLED)

    async def wait(
        self,
        timeout: Optional[float] = None,
        poll_interval: float = POLL_INTERVAL_SEC,
    ) -> JobResult:
        """
        Wait for job to complete.

        Unless the job is already known to be complete, it is refreshed at
        least once, then every ``poll_interval`` seconds until it reaches a
        terminal status or ``timeout`` seconds of wall-clock time elapse.

        Args:
            timeout: Maximum wait time in seconds. ``None`` uses the default of
                ``MAX_POLL_ATTEMPTS * POLL_INTERVAL_SEC`` (60 s). ``0`` means
                "check once, do not wait".
            poll_interval: Time between status checks, in seconds; must be > 0.

        Returns:
            JobResult with data or error

        Raises:
            ValueError: If ``poll_interval`` is not positive.
            JobTimeoutError: If the job is still not complete at the deadline.
        """
        if poll_interval <= 0:
            raise ValueError(f"poll_interval must be positive, got {poll_interval}")
        if timeout is None:
            # An explicit `is None` check: `timeout or default` would silently
            # turn a caller's timeout=0 into the 60 s default.
            timeout = MAX_POLL_ATTEMPTS * POLL_INTERVAL_SEC

        loop = asyncio.get_running_loop()
        # Track a deadline instead of counting attempts so the time spent in
        # each status request also counts against the timeout.
        deadline = loop.time() + timeout

        while not self.is_complete():
            await self.refresh()
            if self.is_complete():
                break
            remaining = deadline - loop.time()
            if remaining <= 0:
                raise JobTimeoutError(self.job_id, timeout)
            # Never sleep past the deadline; the next iteration performs one
            # last refresh at (or just after) the deadline before giving up.
            await asyncio.sleep(min(poll_interval, remaining))

        return JobResult(
            job_id=self.job_id,
            status=self._status,
            data=self._result,
            error=self._error,
            metrics=self._metrics,
        )

    async def refresh(self) -> None:
        """Refresh job status from server, firing callbacks if the status changed."""
        result = await self._client.get_job_status(self.job_id)
        previous_status = self._status
        self._status = result.status
        self._result = result.data
        self._error = result.error
        self._metrics = result.metrics
        if previous_status != self._status:
            self._trigger_callbacks(self._status)

    async def cancel(self) -> None:
        """
        Cancel the job.

        If the job is already in a terminal status this is a no-op, so a
        finished job's status is not overwritten with CANCELLED and the
        CANCELLED callbacks are not fired again. Otherwise the server is asked
        to cancel the job, the local status becomes CANCELLED and the
        CANCELLED callbacks run.
        """
        if self.is_complete():
            return
        await self._client.cancel_job(self.job_id)
        self._status = JobStatus.CANCELLED
        self._trigger_callbacks(JobStatus.CANCELLED)

    def on(self, status: JobStatus, callback: Callable[["ComputeJob"], None]) -> "ComputeJob":
        """Register a callback for status changes; returns self for chaining."""
        self._callbacks.setdefault(status, []).append(callback)
        return self

    def get_result(self) -> Any:
        """
        Get the result.

        Raises:
            ValueError: If the job's local status is not COMPLETED.
        """
        if self._status != JobStatus.COMPLETED:
            raise ValueError(f"Job {self.job_id} is not completed (status: {self._status})")
        return self._result

    def get_error(self) -> Optional[str]:
        """Get error message (if failed)."""
        return self._error

    def get_metrics(self) -> Optional[JobMetrics]:
        """Get execution metrics (if available)."""
        return self._metrics

    def _trigger_callbacks(self, status: JobStatus) -> None:
        """
        Invoke every callback registered for ``status``.

        A raising callback is logged with its traceback and does not stop the
        remaining callbacks or propagate to the caller.
        """
        # Iterate over a snapshot: a callback may call on() for this same
        # status, which would otherwise mutate the list mid-iteration.
        for callback in list(self._callbacks.get(status, ())):
            try:
                callback(self)
            except Exception:
                # Best-effort by design, but reported through logging (with a
                # traceback) instead of print() to stdout.
                logging.getLogger(__name__).exception(
                    "Error in job callback for status %s", status
                )

    def __repr__(self) -> str:
        return f"ComputeJob(id={self.job_id}, status={self._status.value})"
class JobTimeoutError(Exception):
    """Raised when a job is still unfinished after the allowed wait time.

    Attributes:
        job_id: Identifier of the job that timed out.
        timeout: The wait budget, in seconds, that was exceeded.
    """

    def __init__(self, job_id: str, timeout: float):
        self.job_id = job_id
        self.timeout = timeout
        message = "Job {} timed out after {}s".format(job_id, timeout)
        super().__init__(message)
class JobFailedError(Exception):
    """Exception describing a compute job that failed.

    Attributes:
        job_id: Identifier of the failed job.
        reason: Human-readable failure reason.
    """

    def __init__(self, job_id: str, reason: str):
        self.job_id = job_id
        self.reason = reason
        message = f"Job {job_id} failed: {reason}"
        super().__init__(message)
class JobBatch:
    """
    Batch multiple jobs for efficient execution.

    The batch only groups existing ComputeJob objects; waiting and cancelling
    is done per job, concurrently.

    Example:
        >>> batch = JobBatch()
        >>> batch.add(job1).add(job2).add(job3)
        >>> results = await batch.wait_all()
    """

    def __init__(self) -> None:
        self._jobs: list[ComputeJob] = []

    def add(self, job: ComputeJob) -> "JobBatch":
        """Add a job to the batch and return the batch (for chaining)."""
        self._jobs.append(job)
        return self

    @property
    def jobs(self) -> list[ComputeJob]:
        """Get a copy of the jobs list (mutating it does not affect the batch)."""
        return list(self._jobs)

    async def wait_all(self, timeout: Optional[float] = None) -> list[JobResult]:
        """
        Wait for all jobs to complete.

        Args:
            timeout: Passed to each job's ``wait()``; the jobs are waited on
                concurrently, each with the full timeout.

        Returns:
            One JobResult per job, in the order the jobs were added.

        Raises:
            Whatever the first failing ``wait()`` raises (e.g. JobTimeoutError).
            The remaining waits are cancelled before the exception propagates.
        """
        tasks = [asyncio.create_task(job.wait(timeout=timeout)) for job in self._jobs]
        try:
            return list(await asyncio.gather(*tasks))
        except BaseException:
            # gather() does not cancel its other awaitables when one raises;
            # cancel them so they do not keep polling in the background.
            for task in tasks:
                task.cancel()
            raise

    async def wait_any(self, timeout: Optional[float] = None) -> JobResult:
        """
        Wait for any job to complete.

        Args:
            timeout: Passed to each job's ``wait()``.

        Returns:
            The JobResult of the first job whose ``wait()`` finishes. If several
            finish in the same iteration, the earliest-added job wins.

        Raises:
            ValueError: If the batch is empty.
            Whatever the winning ``wait()`` raised (e.g. JobTimeoutError).
        """
        if not self._jobs:
            # asyncio.wait() would reject an empty set with a less clear message.
            raise ValueError("Cannot wait_any() on an empty JobBatch")
        tasks = [asyncio.create_task(job.wait(timeout=timeout)) for job in self._jobs]
        try:
            done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        finally:
            # Cancel the losing waits (also if we ourselves are cancelled);
            # otherwise they keep polling the server until their own timeout.
            for task in tasks:
                if not task.done():
                    task.cancel()
        # Pick deterministically in insertion order rather than set order.
        first = next(task for task in tasks if task in done)
        return first.result()

    async def cancel_all(self) -> None:
        """
        Cancel every job in the batch that is not already complete.

        Jobs already completed, failed or cancelled are skipped so their final
        status is not overwritten with CANCELLED.
        """
        await asyncio.gather(
            *(job.cancel() for job in self._jobs if not job.is_complete())
        )

    def get_status_counts(self) -> dict[JobStatus, int]:
        """Get count of jobs by status (every JobStatus member is present)."""
        counts = {status: 0 for status in JobStatus}
        for job in self._jobs:
            counts[job.status] += 1
        return counts

    def __len__(self) -> int:
        return len(self._jobs)