synor/sdk/go/database/client.go
Gulshan Yadav 74b82d2bb2 Add Synor Storage and Wallet SDKs for Swift
- Implement SynorStorage class for decentralized storage operations including upload, download, pinning, and CAR file management.
- Create supporting types and models for storage operations such as UploadOptions, Pin, and StorageConfig.
- Implement SynorWallet class for wallet operations including wallet creation, address generation, transaction signing, and balance queries.
- Create supporting types and models for wallet operations such as Wallet, Address, and Transaction.
- Introduce error handling for both storage and wallet operations.
2026-01-27 01:56:45 +05:30

658 lines
17 KiB
Go

package database
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"strconv"
"time"
)
// Client is the Synor Database SDK client
type Client struct {
config Config
httpClient *http.Client
closed bool
// Stores
KV *KeyValueStore
Documents *DocumentStore
Vectors *VectorStore
TimeSeries *TimeSeriesStore
}
// New creates a new Database client
func New(config Config) *Client {
if config.Endpoint == "" {
config.Endpoint = "https://database.synor.io/v1"
}
if config.Timeout == 0 {
config.Timeout = 30 * time.Second
}
if config.Retries == 0 {
config.Retries = 3
}
client := &Client{
config: config,
httpClient: &http.Client{
Timeout: config.Timeout,
},
}
client.KV = &KeyValueStore{client: client}
client.Documents = &DocumentStore{client: client}
client.Vectors = &VectorStore{client: client}
client.TimeSeries = &TimeSeriesStore{client: client}
return client
}
// GetStats returns database statistics
func (c *Client) GetStats(ctx context.Context) (*DatabaseStats, error) {
var stats DatabaseStats
err := c.request(ctx, "GET", "/stats", nil, &stats)
if err != nil {
return nil, err
}
return &stats, nil
}
// HealthCheck performs a health check
func (c *Client) HealthCheck(ctx context.Context) bool {
var result struct {
Status string `json:"status"`
}
err := c.request(ctx, "GET", "/health", nil, &result)
return err == nil && result.Status == "healthy"
}
// Close closes the client
func (c *Client) Close() {
c.closed = true
}
// IsClosed returns whether the client is closed
func (c *Client) IsClosed() bool {
return c.closed
}
func (c *Client) request(ctx context.Context, method, path string, body interface{}, result interface{}) error {
if c.closed {
return NewDatabaseError("Client has been closed", "CLIENT_CLOSED", 0)
}
var lastErr error
for attempt := 0; attempt < c.config.Retries; attempt++ {
err := c.doRequest(ctx, method, path, body, result)
if err == nil {
return nil
}
lastErr = err
if c.config.Debug {
fmt.Printf("Attempt %d failed: %v\n", attempt+1, err)
}
if attempt < c.config.Retries-1 {
time.Sleep(time.Duration(attempt+1) * time.Second)
}
}
return lastErr
}
func (c *Client) doRequest(ctx context.Context, method, path string, body interface{}, result interface{}) error {
var bodyReader io.Reader
if body != nil {
jsonBody, err := json.Marshal(body)
if err != nil {
return err
}
bodyReader = bytes.NewReader(jsonBody)
}
req, err := http.NewRequestWithContext(ctx, method, c.config.Endpoint+path, bodyReader)
if err != nil {
return err
}
req.Header.Set("Authorization", "Bearer "+c.config.APIKey)
req.Header.Set("Content-Type", "application/json")
req.Header.Set("X-SDK-Version", "go/0.1.0")
resp, err := c.httpClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
respBody, err := io.ReadAll(resp.Body)
if err != nil {
return err
}
if resp.StatusCode >= 400 {
var errResp struct {
Message string `json:"message"`
Error string `json:"error"`
Code string `json:"code"`
}
json.Unmarshal(respBody, &errResp)
msg := errResp.Message
if msg == "" {
msg = errResp.Error
}
if msg == "" {
msg = fmt.Sprintf("HTTP %d", resp.StatusCode)
}
return NewDatabaseError(msg, errResp.Code, resp.StatusCode)
}
if result != nil && len(respBody) > 0 {
return json.Unmarshal(respBody, result)
}
return nil
}
// ==================== Key-Value Store ====================
// KeyValueStore provides key-value operations
type KeyValueStore struct {
client *Client
}
// Get retrieves a value by key
func (kv *KeyValueStore) Get(ctx context.Context, key string) (interface{}, error) {
var result struct {
Value interface{} `json:"value"`
}
err := kv.client.request(ctx, "GET", "/kv/"+url.PathEscape(key), nil, &result)
if err != nil {
return nil, err
}
return result.Value, nil
}
// GetEntry retrieves a full entry with metadata
func (kv *KeyValueStore) GetEntry(ctx context.Context, key string) (*KeyValueEntry, error) {
var entry KeyValueEntry
err := kv.client.request(ctx, "GET", "/kv/"+url.PathEscape(key)+"/entry", nil, &entry)
if err != nil {
return nil, err
}
return &entry, nil
}
// Set sets a value
func (kv *KeyValueStore) Set(ctx context.Context, key string, value interface{}, opts *SetOptions) error {
body := map[string]interface{}{
"value": value,
}
if opts != nil {
if opts.TTL != nil {
body["ttl"] = *opts.TTL
}
if opts.Metadata != nil {
body["metadata"] = opts.Metadata
}
if opts.IfNotExists {
body["if_not_exists"] = true
}
if opts.IfExists {
body["if_exists"] = true
}
}
return kv.client.request(ctx, "PUT", "/kv/"+url.PathEscape(key), body, nil)
}
// Delete deletes a key
func (kv *KeyValueStore) Delete(ctx context.Context, key string) (bool, error) {
var result struct {
Deleted bool `json:"deleted"`
}
err := kv.client.request(ctx, "DELETE", "/kv/"+url.PathEscape(key), nil, &result)
if err != nil {
return false, err
}
return result.Deleted, nil
}
// Exists checks if a key exists
func (kv *KeyValueStore) Exists(ctx context.Context, key string) (bool, error) {
var result struct {
Exists bool `json:"exists"`
}
err := kv.client.request(ctx, "GET", "/kv/"+url.PathEscape(key)+"/exists", nil, &result)
if err != nil {
return false, err
}
return result.Exists, nil
}
// List lists keys with optional prefix filtering
func (kv *KeyValueStore) List(ctx context.Context, opts *ListOptions) (*ListResult, error) {
params := url.Values{}
if opts != nil {
if opts.Prefix != "" {
params.Set("prefix", opts.Prefix)
}
if opts.Cursor != "" {
params.Set("cursor", opts.Cursor)
}
if opts.Limit > 0 {
params.Set("limit", strconv.Itoa(opts.Limit))
}
}
path := "/kv"
if len(params) > 0 {
path += "?" + params.Encode()
}
var result ListResult
err := kv.client.request(ctx, "GET", path, nil, &result)
if err != nil {
return nil, err
}
return &result, nil
}
// MGet retrieves multiple values at once
func (kv *KeyValueStore) MGet(ctx context.Context, keys []string) (map[string]interface{}, error) {
var result struct {
Entries []struct {
Key string `json:"key"`
Value interface{} `json:"value"`
} `json:"entries"`
}
err := kv.client.request(ctx, "POST", "/kv/mget", map[string]interface{}{"keys": keys}, &result)
if err != nil {
return nil, err
}
values := make(map[string]interface{})
for _, e := range result.Entries {
values[e.Key] = e.Value
}
return values, nil
}
// MSet sets multiple values at once
func (kv *KeyValueStore) MSet(ctx context.Context, entries map[string]interface{}) error {
entriesList := make([]map[string]interface{}, 0, len(entries))
for k, v := range entries {
entriesList = append(entriesList, map[string]interface{}{"key": k, "value": v})
}
return kv.client.request(ctx, "POST", "/kv/mset", map[string]interface{}{"entries": entriesList}, nil)
}
// Incr increments a numeric value
func (kv *KeyValueStore) Incr(ctx context.Context, key string, by int64) (int64, error) {
var result struct {
Value int64 `json:"value"`
}
err := kv.client.request(ctx, "POST", "/kv/"+url.PathEscape(key)+"/incr", map[string]interface{}{"by": by}, &result)
if err != nil {
return 0, err
}
return result.Value, nil
}
// ==================== Document Store ====================
// DocumentStore provides document operations
type DocumentStore struct {
client *Client
}
// Create creates a new document
func (ds *DocumentStore) Create(ctx context.Context, collection string, data map[string]interface{}, opts *CreateDocumentOptions) (*Document, error) {
body := map[string]interface{}{
"data": data,
}
if opts != nil {
if opts.ID != "" {
body["id"] = opts.ID
}
if opts.Metadata != nil {
body["metadata"] = opts.Metadata
}
}
var doc Document
err := ds.client.request(ctx, "POST", "/documents/"+url.PathEscape(collection), body, &doc)
if err != nil {
return nil, err
}
return &doc, nil
}
// Get retrieves a document by ID
func (ds *DocumentStore) Get(ctx context.Context, collection, id string) (*Document, error) {
var doc Document
err := ds.client.request(ctx, "GET", "/documents/"+url.PathEscape(collection)+"/"+url.PathEscape(id), nil, &doc)
if err != nil {
return nil, err
}
return &doc, nil
}
// Update updates a document
func (ds *DocumentStore) Update(ctx context.Context, collection, id string, update map[string]interface{}, opts *UpdateDocumentOptions) (*Document, error) {
body := map[string]interface{}{
"update": update,
}
if opts != nil {
if opts.Upsert {
body["upsert"] = true
}
if opts.ReturnDocument != "" {
body["return_document"] = opts.ReturnDocument
}
}
var doc Document
err := ds.client.request(ctx, "PATCH", "/documents/"+url.PathEscape(collection)+"/"+url.PathEscape(id), body, &doc)
if err != nil {
return nil, err
}
return &doc, nil
}
// Delete deletes a document
func (ds *DocumentStore) Delete(ctx context.Context, collection, id string) (bool, error) {
var result struct {
Deleted bool `json:"deleted"`
}
err := ds.client.request(ctx, "DELETE", "/documents/"+url.PathEscape(collection)+"/"+url.PathEscape(id), nil, &result)
if err != nil {
return false, err
}
return result.Deleted, nil
}
// Query queries documents
func (ds *DocumentStore) Query(ctx context.Context, collection string, opts *QueryOptions) (*QueryResult, error) {
body := make(map[string]interface{})
if opts != nil {
if opts.Filter != nil {
body["filter"] = opts.Filter
}
if opts.Sort != nil {
body["sort"] = opts.Sort
}
if opts.Skip > 0 {
body["skip"] = opts.Skip
}
if opts.Limit > 0 {
body["limit"] = opts.Limit
}
if opts.Projection != nil {
body["projection"] = opts.Projection
}
}
var result QueryResult
err := ds.client.request(ctx, "POST", "/documents/"+url.PathEscape(collection)+"/query", body, &result)
if err != nil {
return nil, err
}
return &result, nil
}
// Count counts documents matching a filter
func (ds *DocumentStore) Count(ctx context.Context, collection string, filter QueryFilter) (int, error) {
var result struct {
Count int `json:"count"`
}
err := ds.client.request(ctx, "POST", "/documents/"+url.PathEscape(collection)+"/count", map[string]interface{}{"filter": filter}, &result)
if err != nil {
return 0, err
}
return result.Count, nil
}
// Aggregate runs an aggregation pipeline
func (ds *DocumentStore) Aggregate(ctx context.Context, collection string, pipeline []AggregateStage) ([]interface{}, error) {
var result struct {
Results []interface{} `json:"results"`
}
err := ds.client.request(ctx, "POST", "/documents/"+url.PathEscape(collection)+"/aggregate", map[string]interface{}{"pipeline": pipeline}, &result)
if err != nil {
return nil, err
}
return result.Results, nil
}
// ==================== Vector Store ====================
// VectorStore provides vector operations
type VectorStore struct {
client *Client
}
// CreateCollection creates a vector collection
func (vs *VectorStore) CreateCollection(ctx context.Context, config VectorCollectionConfig) error {
return vs.client.request(ctx, "POST", "/vectors/collections", config, nil)
}
// DeleteCollection deletes a vector collection
func (vs *VectorStore) DeleteCollection(ctx context.Context, name string) error {
return vs.client.request(ctx, "DELETE", "/vectors/collections/"+url.PathEscape(name), nil, nil)
}
// GetCollectionStats gets collection statistics
func (vs *VectorStore) GetCollectionStats(ctx context.Context, name string) (*VectorCollectionStats, error) {
var stats VectorCollectionStats
err := vs.client.request(ctx, "GET", "/vectors/collections/"+url.PathEscape(name)+"/stats", nil, &stats)
if err != nil {
return nil, err
}
return &stats, nil
}
// Upsert upserts vectors
func (vs *VectorStore) Upsert(ctx context.Context, collection string, vectors []VectorEntry, opts *UpsertVectorOptions) (int, error) {
body := map[string]interface{}{
"vectors": vectors,
}
if opts != nil && opts.Namespace != "" {
body["namespace"] = opts.Namespace
}
var result struct {
Upserted int `json:"upserted"`
}
err := vs.client.request(ctx, "POST", "/vectors/"+url.PathEscape(collection)+"/upsert", body, &result)
if err != nil {
return 0, err
}
return result.Upserted, nil
}
// Search searches for similar vectors
func (vs *VectorStore) Search(ctx context.Context, collection string, vector []float64, opts *SearchOptions) ([]SearchResult, error) {
body := map[string]interface{}{
"vector": vector,
}
if opts != nil {
if opts.Namespace != "" {
body["namespace"] = opts.Namespace
}
if opts.TopK > 0 {
body["top_k"] = opts.TopK
}
if opts.Filter != nil {
body["filter"] = opts.Filter
}
body["include_metadata"] = opts.IncludeMetadata
body["include_vectors"] = opts.IncludeVectors
if opts.MinScore != nil {
body["min_score"] = *opts.MinScore
}
}
var result struct {
Results []SearchResult `json:"results"`
}
err := vs.client.request(ctx, "POST", "/vectors/"+url.PathEscape(collection)+"/search", body, &result)
if err != nil {
return nil, err
}
return result.Results, nil
}
// Delete deletes vectors by ID
func (vs *VectorStore) Delete(ctx context.Context, collection string, ids []string, namespace string) (int, error) {
body := map[string]interface{}{
"ids": ids,
}
if namespace != "" {
body["namespace"] = namespace
}
var result struct {
Deleted int `json:"deleted"`
}
err := vs.client.request(ctx, "POST", "/vectors/"+url.PathEscape(collection)+"/delete", body, &result)
if err != nil {
return 0, err
}
return result.Deleted, nil
}
// Fetch fetches vectors by ID
func (vs *VectorStore) Fetch(ctx context.Context, collection string, ids []string, namespace string) ([]VectorEntry, error) {
body := map[string]interface{}{
"ids": ids,
}
if namespace != "" {
body["namespace"] = namespace
}
var result struct {
Vectors []VectorEntry `json:"vectors"`
}
err := vs.client.request(ctx, "POST", "/vectors/"+url.PathEscape(collection)+"/fetch", body, &result)
if err != nil {
return nil, err
}
return result.Vectors, nil
}
// ==================== Time Series Store ====================
// TimeSeriesStore provides time series operations
type TimeSeriesStore struct {
client *Client
}
// Write writes data points to a series
func (ts *TimeSeriesStore) Write(ctx context.Context, series string, points []DataPoint, opts *WritePointsOptions) (int, error) {
body := map[string]interface{}{
"points": points,
}
if opts != nil && opts.Precision != "" {
body["precision"] = opts.Precision
}
var result struct {
Written int `json:"written"`
}
err := ts.client.request(ctx, "POST", "/timeseries/"+url.PathEscape(series)+"/write", body, &result)
if err != nil {
return 0, err
}
return result.Written, nil
}
// Query queries a time series
func (ts *TimeSeriesStore) Query(ctx context.Context, series string, timeRange TimeRange, opts *TimeSeriesQueryOptions) (*TimeSeriesResult, error) {
body := map[string]interface{}{
"range": timeRange,
}
if opts != nil {
if opts.Tags != nil {
body["tags"] = opts.Tags
}
if opts.Aggregation != "" {
body["aggregation"] = opts.Aggregation
}
if opts.Interval != "" {
body["interval"] = opts.Interval
}
if opts.Fill != nil {
body["fill"] = opts.Fill
}
if opts.Limit > 0 {
body["limit"] = opts.Limit
}
}
var result TimeSeriesResult
err := ts.client.request(ctx, "POST", "/timeseries/"+url.PathEscape(series)+"/query", body, &result)
if err != nil {
return nil, err
}
return &result, nil
}
// Delete deletes data points in a range
func (ts *TimeSeriesStore) Delete(ctx context.Context, series string, timeRange TimeRange, tags map[string]string) (int, error) {
body := map[string]interface{}{
"range": timeRange,
}
if tags != nil {
body["tags"] = tags
}
var result struct {
Deleted int `json:"deleted"`
}
err := ts.client.request(ctx, "POST", "/timeseries/"+url.PathEscape(series)+"/delete", body, &result)
if err != nil {
return 0, err
}
return result.Deleted, nil
}
// GetSeriesInfo gets series information
func (ts *TimeSeriesStore) GetSeriesInfo(ctx context.Context, series string) (*SeriesInfo, error) {
var info SeriesInfo
err := ts.client.request(ctx, "GET", "/timeseries/"+url.PathEscape(series)+"/info", nil, &info)
if err != nil {
return nil, err
}
return &info, nil
}
// ListSeries lists all series
func (ts *TimeSeriesStore) ListSeries(ctx context.Context, prefix string) ([]SeriesInfo, error) {
path := "/timeseries"
if prefix != "" {
path += "?prefix=" + url.QueryEscape(prefix)
}
var result struct {
Series []SeriesInfo `json:"series"`
}
err := ts.client.request(ctx, "GET", path, nil, &result)
if err != nil {
return nil, err
}
return result.Series, nil
}
// SetRetention sets retention policy for a series
func (ts *TimeSeriesStore) SetRetention(ctx context.Context, series string, retentionDays int) error {
return ts.client.request(ctx, "PUT", "/timeseries/"+url.PathEscape(series)+"/retention", map[string]interface{}{"retention_days": retentionDays}, nil)
}