package database import ( "bytes" "context" "encoding/json" "fmt" "io" "net/http" "net/url" "strconv" "time" ) // Client is the Synor Database SDK client type Client struct { config Config httpClient *http.Client closed bool // Stores KV *KeyValueStore Documents *DocumentStore Vectors *VectorStore TimeSeries *TimeSeriesStore } // New creates a new Database client func New(config Config) *Client { if config.Endpoint == "" { config.Endpoint = "https://database.synor.io/v1" } if config.Timeout == 0 { config.Timeout = 30 * time.Second } if config.Retries == 0 { config.Retries = 3 } client := &Client{ config: config, httpClient: &http.Client{ Timeout: config.Timeout, }, } client.KV = &KeyValueStore{client: client} client.Documents = &DocumentStore{client: client} client.Vectors = &VectorStore{client: client} client.TimeSeries = &TimeSeriesStore{client: client} return client } // GetStats returns database statistics func (c *Client) GetStats(ctx context.Context) (*DatabaseStats, error) { var stats DatabaseStats err := c.request(ctx, "GET", "/stats", nil, &stats) if err != nil { return nil, err } return &stats, nil } // HealthCheck performs a health check func (c *Client) HealthCheck(ctx context.Context) bool { var result struct { Status string `json:"status"` } err := c.request(ctx, "GET", "/health", nil, &result) return err == nil && result.Status == "healthy" } // Close closes the client func (c *Client) Close() { c.closed = true } // IsClosed returns whether the client is closed func (c *Client) IsClosed() bool { return c.closed } func (c *Client) request(ctx context.Context, method, path string, body interface{}, result interface{}) error { if c.closed { return NewDatabaseError("Client has been closed", "CLIENT_CLOSED", 0) } var lastErr error for attempt := 0; attempt < c.config.Retries; attempt++ { err := c.doRequest(ctx, method, path, body, result) if err == nil { return nil } lastErr = err if c.config.Debug { fmt.Printf("Attempt %d failed: %v\n", attempt+1, err) } if attempt < c.config.Retries-1 { time.Sleep(time.Duration(attempt+1) * time.Second) } } return lastErr } func (c *Client) doRequest(ctx context.Context, method, path string, body interface{}, result interface{}) error { var bodyReader io.Reader if body != nil { jsonBody, err := json.Marshal(body) if err != nil { return err } bodyReader = bytes.NewReader(jsonBody) } req, err := http.NewRequestWithContext(ctx, method, c.config.Endpoint+path, bodyReader) if err != nil { return err } req.Header.Set("Authorization", "Bearer "+c.config.APIKey) req.Header.Set("Content-Type", "application/json") req.Header.Set("X-SDK-Version", "go/0.1.0") resp, err := c.httpClient.Do(req) if err != nil { return err } defer resp.Body.Close() respBody, err := io.ReadAll(resp.Body) if err != nil { return err } if resp.StatusCode >= 400 { var errResp struct { Message string `json:"message"` Error string `json:"error"` Code string `json:"code"` } json.Unmarshal(respBody, &errResp) msg := errResp.Message if msg == "" { msg = errResp.Error } if msg == "" { msg = fmt.Sprintf("HTTP %d", resp.StatusCode) } return NewDatabaseError(msg, errResp.Code, resp.StatusCode) } if result != nil && len(respBody) > 0 { return json.Unmarshal(respBody, result) } return nil } // ==================== Key-Value Store ==================== // KeyValueStore provides key-value operations type KeyValueStore struct { client *Client } // Get retrieves a value by key func (kv *KeyValueStore) Get(ctx context.Context, key string) (interface{}, error) { var result struct { Value interface{} `json:"value"` } err := kv.client.request(ctx, "GET", "/kv/"+url.PathEscape(key), nil, &result) if err != nil { return nil, err } return result.Value, nil } // GetEntry retrieves a full entry with metadata func (kv *KeyValueStore) GetEntry(ctx context.Context, key string) (*KeyValueEntry, error) { var entry KeyValueEntry err := kv.client.request(ctx, "GET", "/kv/"+url.PathEscape(key)+"/entry", nil, &entry) if err != nil { return nil, err } return &entry, nil } // Set sets a value func (kv *KeyValueStore) Set(ctx context.Context, key string, value interface{}, opts *SetOptions) error { body := map[string]interface{}{ "value": value, } if opts != nil { if opts.TTL != nil { body["ttl"] = *opts.TTL } if opts.Metadata != nil { body["metadata"] = opts.Metadata } if opts.IfNotExists { body["if_not_exists"] = true } if opts.IfExists { body["if_exists"] = true } } return kv.client.request(ctx, "PUT", "/kv/"+url.PathEscape(key), body, nil) } // Delete deletes a key func (kv *KeyValueStore) Delete(ctx context.Context, key string) (bool, error) { var result struct { Deleted bool `json:"deleted"` } err := kv.client.request(ctx, "DELETE", "/kv/"+url.PathEscape(key), nil, &result) if err != nil { return false, err } return result.Deleted, nil } // Exists checks if a key exists func (kv *KeyValueStore) Exists(ctx context.Context, key string) (bool, error) { var result struct { Exists bool `json:"exists"` } err := kv.client.request(ctx, "GET", "/kv/"+url.PathEscape(key)+"/exists", nil, &result) if err != nil { return false, err } return result.Exists, nil } // List lists keys with optional prefix filtering func (kv *KeyValueStore) List(ctx context.Context, opts *ListOptions) (*ListResult, error) { params := url.Values{} if opts != nil { if opts.Prefix != "" { params.Set("prefix", opts.Prefix) } if opts.Cursor != "" { params.Set("cursor", opts.Cursor) } if opts.Limit > 0 { params.Set("limit", strconv.Itoa(opts.Limit)) } } path := "/kv" if len(params) > 0 { path += "?" + params.Encode() } var result ListResult err := kv.client.request(ctx, "GET", path, nil, &result) if err != nil { return nil, err } return &result, nil } // MGet retrieves multiple values at once func (kv *KeyValueStore) MGet(ctx context.Context, keys []string) (map[string]interface{}, error) { var result struct { Entries []struct { Key string `json:"key"` Value interface{} `json:"value"` } `json:"entries"` } err := kv.client.request(ctx, "POST", "/kv/mget", map[string]interface{}{"keys": keys}, &result) if err != nil { return nil, err } values := make(map[string]interface{}) for _, e := range result.Entries { values[e.Key] = e.Value } return values, nil } // MSet sets multiple values at once func (kv *KeyValueStore) MSet(ctx context.Context, entries map[string]interface{}) error { entriesList := make([]map[string]interface{}, 0, len(entries)) for k, v := range entries { entriesList = append(entriesList, map[string]interface{}{"key": k, "value": v}) } return kv.client.request(ctx, "POST", "/kv/mset", map[string]interface{}{"entries": entriesList}, nil) } // Incr increments a numeric value func (kv *KeyValueStore) Incr(ctx context.Context, key string, by int64) (int64, error) { var result struct { Value int64 `json:"value"` } err := kv.client.request(ctx, "POST", "/kv/"+url.PathEscape(key)+"/incr", map[string]interface{}{"by": by}, &result) if err != nil { return 0, err } return result.Value, nil } // ==================== Document Store ==================== // DocumentStore provides document operations type DocumentStore struct { client *Client } // Create creates a new document func (ds *DocumentStore) Create(ctx context.Context, collection string, data map[string]interface{}, opts *CreateDocumentOptions) (*Document, error) { body := map[string]interface{}{ "data": data, } if opts != nil { if opts.ID != "" { body["id"] = opts.ID } if opts.Metadata != nil { body["metadata"] = opts.Metadata } } var doc Document err := ds.client.request(ctx, "POST", "/documents/"+url.PathEscape(collection), body, &doc) if err != nil { return nil, err } return &doc, nil } // Get retrieves a document by ID func (ds *DocumentStore) Get(ctx context.Context, collection, id string) (*Document, error) { var doc Document err := ds.client.request(ctx, "GET", "/documents/"+url.PathEscape(collection)+"/"+url.PathEscape(id), nil, &doc) if err != nil { return nil, err } return &doc, nil } // Update updates a document func (ds *DocumentStore) Update(ctx context.Context, collection, id string, update map[string]interface{}, opts *UpdateDocumentOptions) (*Document, error) { body := map[string]interface{}{ "update": update, } if opts != nil { if opts.Upsert { body["upsert"] = true } if opts.ReturnDocument != "" { body["return_document"] = opts.ReturnDocument } } var doc Document err := ds.client.request(ctx, "PATCH", "/documents/"+url.PathEscape(collection)+"/"+url.PathEscape(id), body, &doc) if err != nil { return nil, err } return &doc, nil } // Delete deletes a document func (ds *DocumentStore) Delete(ctx context.Context, collection, id string) (bool, error) { var result struct { Deleted bool `json:"deleted"` } err := ds.client.request(ctx, "DELETE", "/documents/"+url.PathEscape(collection)+"/"+url.PathEscape(id), nil, &result) if err != nil { return false, err } return result.Deleted, nil } // Query queries documents func (ds *DocumentStore) Query(ctx context.Context, collection string, opts *QueryOptions) (*QueryResult, error) { body := make(map[string]interface{}) if opts != nil { if opts.Filter != nil { body["filter"] = opts.Filter } if opts.Sort != nil { body["sort"] = opts.Sort } if opts.Skip > 0 { body["skip"] = opts.Skip } if opts.Limit > 0 { body["limit"] = opts.Limit } if opts.Projection != nil { body["projection"] = opts.Projection } } var result QueryResult err := ds.client.request(ctx, "POST", "/documents/"+url.PathEscape(collection)+"/query", body, &result) if err != nil { return nil, err } return &result, nil } // Count counts documents matching a filter func (ds *DocumentStore) Count(ctx context.Context, collection string, filter QueryFilter) (int, error) { var result struct { Count int `json:"count"` } err := ds.client.request(ctx, "POST", "/documents/"+url.PathEscape(collection)+"/count", map[string]interface{}{"filter": filter}, &result) if err != nil { return 0, err } return result.Count, nil } // Aggregate runs an aggregation pipeline func (ds *DocumentStore) Aggregate(ctx context.Context, collection string, pipeline []AggregateStage) ([]interface{}, error) { var result struct { Results []interface{} `json:"results"` } err := ds.client.request(ctx, "POST", "/documents/"+url.PathEscape(collection)+"/aggregate", map[string]interface{}{"pipeline": pipeline}, &result) if err != nil { return nil, err } return result.Results, nil } // ==================== Vector Store ==================== // VectorStore provides vector operations type VectorStore struct { client *Client } // CreateCollection creates a vector collection func (vs *VectorStore) CreateCollection(ctx context.Context, config VectorCollectionConfig) error { return vs.client.request(ctx, "POST", "/vectors/collections", config, nil) } // DeleteCollection deletes a vector collection func (vs *VectorStore) DeleteCollection(ctx context.Context, name string) error { return vs.client.request(ctx, "DELETE", "/vectors/collections/"+url.PathEscape(name), nil, nil) } // GetCollectionStats gets collection statistics func (vs *VectorStore) GetCollectionStats(ctx context.Context, name string) (*VectorCollectionStats, error) { var stats VectorCollectionStats err := vs.client.request(ctx, "GET", "/vectors/collections/"+url.PathEscape(name)+"/stats", nil, &stats) if err != nil { return nil, err } return &stats, nil } // Upsert upserts vectors func (vs *VectorStore) Upsert(ctx context.Context, collection string, vectors []VectorEntry, opts *UpsertVectorOptions) (int, error) { body := map[string]interface{}{ "vectors": vectors, } if opts != nil && opts.Namespace != "" { body["namespace"] = opts.Namespace } var result struct { Upserted int `json:"upserted"` } err := vs.client.request(ctx, "POST", "/vectors/"+url.PathEscape(collection)+"/upsert", body, &result) if err != nil { return 0, err } return result.Upserted, nil } // Search searches for similar vectors func (vs *VectorStore) Search(ctx context.Context, collection string, vector []float64, opts *SearchOptions) ([]SearchResult, error) { body := map[string]interface{}{ "vector": vector, } if opts != nil { if opts.Namespace != "" { body["namespace"] = opts.Namespace } if opts.TopK > 0 { body["top_k"] = opts.TopK } if opts.Filter != nil { body["filter"] = opts.Filter } body["include_metadata"] = opts.IncludeMetadata body["include_vectors"] = opts.IncludeVectors if opts.MinScore != nil { body["min_score"] = *opts.MinScore } } var result struct { Results []SearchResult `json:"results"` } err := vs.client.request(ctx, "POST", "/vectors/"+url.PathEscape(collection)+"/search", body, &result) if err != nil { return nil, err } return result.Results, nil } // Delete deletes vectors by ID func (vs *VectorStore) Delete(ctx context.Context, collection string, ids []string, namespace string) (int, error) { body := map[string]interface{}{ "ids": ids, } if namespace != "" { body["namespace"] = namespace } var result struct { Deleted int `json:"deleted"` } err := vs.client.request(ctx, "POST", "/vectors/"+url.PathEscape(collection)+"/delete", body, &result) if err != nil { return 0, err } return result.Deleted, nil } // Fetch fetches vectors by ID func (vs *VectorStore) Fetch(ctx context.Context, collection string, ids []string, namespace string) ([]VectorEntry, error) { body := map[string]interface{}{ "ids": ids, } if namespace != "" { body["namespace"] = namespace } var result struct { Vectors []VectorEntry `json:"vectors"` } err := vs.client.request(ctx, "POST", "/vectors/"+url.PathEscape(collection)+"/fetch", body, &result) if err != nil { return nil, err } return result.Vectors, nil } // ==================== Time Series Store ==================== // TimeSeriesStore provides time series operations type TimeSeriesStore struct { client *Client } // Write writes data points to a series func (ts *TimeSeriesStore) Write(ctx context.Context, series string, points []DataPoint, opts *WritePointsOptions) (int, error) { body := map[string]interface{}{ "points": points, } if opts != nil && opts.Precision != "" { body["precision"] = opts.Precision } var result struct { Written int `json:"written"` } err := ts.client.request(ctx, "POST", "/timeseries/"+url.PathEscape(series)+"/write", body, &result) if err != nil { return 0, err } return result.Written, nil } // Query queries a time series func (ts *TimeSeriesStore) Query(ctx context.Context, series string, timeRange TimeRange, opts *TimeSeriesQueryOptions) (*TimeSeriesResult, error) { body := map[string]interface{}{ "range": timeRange, } if opts != nil { if opts.Tags != nil { body["tags"] = opts.Tags } if opts.Aggregation != "" { body["aggregation"] = opts.Aggregation } if opts.Interval != "" { body["interval"] = opts.Interval } if opts.Fill != nil { body["fill"] = opts.Fill } if opts.Limit > 0 { body["limit"] = opts.Limit } } var result TimeSeriesResult err := ts.client.request(ctx, "POST", "/timeseries/"+url.PathEscape(series)+"/query", body, &result) if err != nil { return nil, err } return &result, nil } // Delete deletes data points in a range func (ts *TimeSeriesStore) Delete(ctx context.Context, series string, timeRange TimeRange, tags map[string]string) (int, error) { body := map[string]interface{}{ "range": timeRange, } if tags != nil { body["tags"] = tags } var result struct { Deleted int `json:"deleted"` } err := ts.client.request(ctx, "POST", "/timeseries/"+url.PathEscape(series)+"/delete", body, &result) if err != nil { return 0, err } return result.Deleted, nil } // GetSeriesInfo gets series information func (ts *TimeSeriesStore) GetSeriesInfo(ctx context.Context, series string) (*SeriesInfo, error) { var info SeriesInfo err := ts.client.request(ctx, "GET", "/timeseries/"+url.PathEscape(series)+"/info", nil, &info) if err != nil { return nil, err } return &info, nil } // ListSeries lists all series func (ts *TimeSeriesStore) ListSeries(ctx context.Context, prefix string) ([]SeriesInfo, error) { path := "/timeseries" if prefix != "" { path += "?prefix=" + url.QueryEscape(prefix) } var result struct { Series []SeriesInfo `json:"series"` } err := ts.client.request(ctx, "GET", path, nil, &result) if err != nil { return nil, err } return result.Series, nil } // SetRetention sets retention policy for a series func (ts *TimeSeriesStore) SetRetention(ctx context.Context, series string, retentionDays int) error { return ts.client.request(ctx, "PUT", "/timeseries/"+url.PathEscape(series)+"/retention", map[string]interface{}{"retention_days": retentionDays}, nil) }