- Implement SynorStorage class for decentralized storage operations including upload, download, pinning, and CAR file management. - Create supporting types and models for storage operations such as UploadOptions, Pin, and StorageConfig. - Implement SynorWallet class for wallet operations including wallet creation, address generation, transaction signing, and balance queries. - Create supporting types and models for wallet operations such as Wallet, Address, and Transaction. - Introduce error handling for both storage and wallet operations.
658 lines
17 KiB
Go
658 lines
17 KiB
Go
package database
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"net/url"
|
|
"strconv"
|
|
"time"
|
|
)
|
|
|
|
// Client is the Synor Database SDK client
|
|
type Client struct {
|
|
config Config
|
|
httpClient *http.Client
|
|
closed bool
|
|
|
|
// Stores
|
|
KV *KeyValueStore
|
|
Documents *DocumentStore
|
|
Vectors *VectorStore
|
|
TimeSeries *TimeSeriesStore
|
|
}
|
|
|
|
// New creates a new Database client
|
|
func New(config Config) *Client {
|
|
if config.Endpoint == "" {
|
|
config.Endpoint = "https://database.synor.io/v1"
|
|
}
|
|
if config.Timeout == 0 {
|
|
config.Timeout = 30 * time.Second
|
|
}
|
|
if config.Retries == 0 {
|
|
config.Retries = 3
|
|
}
|
|
|
|
client := &Client{
|
|
config: config,
|
|
httpClient: &http.Client{
|
|
Timeout: config.Timeout,
|
|
},
|
|
}
|
|
|
|
client.KV = &KeyValueStore{client: client}
|
|
client.Documents = &DocumentStore{client: client}
|
|
client.Vectors = &VectorStore{client: client}
|
|
client.TimeSeries = &TimeSeriesStore{client: client}
|
|
|
|
return client
|
|
}
|
|
|
|
// GetStats returns database statistics
|
|
func (c *Client) GetStats(ctx context.Context) (*DatabaseStats, error) {
|
|
var stats DatabaseStats
|
|
err := c.request(ctx, "GET", "/stats", nil, &stats)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &stats, nil
|
|
}
|
|
|
|
// HealthCheck performs a health check
|
|
func (c *Client) HealthCheck(ctx context.Context) bool {
|
|
var result struct {
|
|
Status string `json:"status"`
|
|
}
|
|
err := c.request(ctx, "GET", "/health", nil, &result)
|
|
return err == nil && result.Status == "healthy"
|
|
}
|
|
|
|
// Close closes the client
|
|
func (c *Client) Close() {
|
|
c.closed = true
|
|
}
|
|
|
|
// IsClosed returns whether the client is closed
|
|
func (c *Client) IsClosed() bool {
|
|
return c.closed
|
|
}
|
|
|
|
func (c *Client) request(ctx context.Context, method, path string, body interface{}, result interface{}) error {
|
|
if c.closed {
|
|
return NewDatabaseError("Client has been closed", "CLIENT_CLOSED", 0)
|
|
}
|
|
|
|
var lastErr error
|
|
|
|
for attempt := 0; attempt < c.config.Retries; attempt++ {
|
|
err := c.doRequest(ctx, method, path, body, result)
|
|
if err == nil {
|
|
return nil
|
|
}
|
|
|
|
lastErr = err
|
|
if c.config.Debug {
|
|
fmt.Printf("Attempt %d failed: %v\n", attempt+1, err)
|
|
}
|
|
|
|
if attempt < c.config.Retries-1 {
|
|
time.Sleep(time.Duration(attempt+1) * time.Second)
|
|
}
|
|
}
|
|
|
|
return lastErr
|
|
}
|
|
|
|
func (c *Client) doRequest(ctx context.Context, method, path string, body interface{}, result interface{}) error {
|
|
var bodyReader io.Reader
|
|
if body != nil {
|
|
jsonBody, err := json.Marshal(body)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
bodyReader = bytes.NewReader(jsonBody)
|
|
}
|
|
|
|
req, err := http.NewRequestWithContext(ctx, method, c.config.Endpoint+path, bodyReader)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
req.Header.Set("Authorization", "Bearer "+c.config.APIKey)
|
|
req.Header.Set("Content-Type", "application/json")
|
|
req.Header.Set("X-SDK-Version", "go/0.1.0")
|
|
|
|
resp, err := c.httpClient.Do(req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
respBody, err := io.ReadAll(resp.Body)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if resp.StatusCode >= 400 {
|
|
var errResp struct {
|
|
Message string `json:"message"`
|
|
Error string `json:"error"`
|
|
Code string `json:"code"`
|
|
}
|
|
json.Unmarshal(respBody, &errResp)
|
|
|
|
msg := errResp.Message
|
|
if msg == "" {
|
|
msg = errResp.Error
|
|
}
|
|
if msg == "" {
|
|
msg = fmt.Sprintf("HTTP %d", resp.StatusCode)
|
|
}
|
|
|
|
return NewDatabaseError(msg, errResp.Code, resp.StatusCode)
|
|
}
|
|
|
|
if result != nil && len(respBody) > 0 {
|
|
return json.Unmarshal(respBody, result)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// ==================== Key-Value Store ====================
|
|
|
|
// KeyValueStore provides key-value operations
|
|
type KeyValueStore struct {
|
|
client *Client
|
|
}
|
|
|
|
// Get retrieves a value by key
|
|
func (kv *KeyValueStore) Get(ctx context.Context, key string) (interface{}, error) {
|
|
var result struct {
|
|
Value interface{} `json:"value"`
|
|
}
|
|
err := kv.client.request(ctx, "GET", "/kv/"+url.PathEscape(key), nil, &result)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return result.Value, nil
|
|
}
|
|
|
|
// GetEntry retrieves a full entry with metadata
|
|
func (kv *KeyValueStore) GetEntry(ctx context.Context, key string) (*KeyValueEntry, error) {
|
|
var entry KeyValueEntry
|
|
err := kv.client.request(ctx, "GET", "/kv/"+url.PathEscape(key)+"/entry", nil, &entry)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &entry, nil
|
|
}
|
|
|
|
// Set sets a value
|
|
func (kv *KeyValueStore) Set(ctx context.Context, key string, value interface{}, opts *SetOptions) error {
|
|
body := map[string]interface{}{
|
|
"value": value,
|
|
}
|
|
if opts != nil {
|
|
if opts.TTL != nil {
|
|
body["ttl"] = *opts.TTL
|
|
}
|
|
if opts.Metadata != nil {
|
|
body["metadata"] = opts.Metadata
|
|
}
|
|
if opts.IfNotExists {
|
|
body["if_not_exists"] = true
|
|
}
|
|
if opts.IfExists {
|
|
body["if_exists"] = true
|
|
}
|
|
}
|
|
return kv.client.request(ctx, "PUT", "/kv/"+url.PathEscape(key), body, nil)
|
|
}
|
|
|
|
// Delete deletes a key
|
|
func (kv *KeyValueStore) Delete(ctx context.Context, key string) (bool, error) {
|
|
var result struct {
|
|
Deleted bool `json:"deleted"`
|
|
}
|
|
err := kv.client.request(ctx, "DELETE", "/kv/"+url.PathEscape(key), nil, &result)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
return result.Deleted, nil
|
|
}
|
|
|
|
// Exists checks if a key exists
|
|
func (kv *KeyValueStore) Exists(ctx context.Context, key string) (bool, error) {
|
|
var result struct {
|
|
Exists bool `json:"exists"`
|
|
}
|
|
err := kv.client.request(ctx, "GET", "/kv/"+url.PathEscape(key)+"/exists", nil, &result)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
return result.Exists, nil
|
|
}
|
|
|
|
// List lists keys with optional prefix filtering
|
|
func (kv *KeyValueStore) List(ctx context.Context, opts *ListOptions) (*ListResult, error) {
|
|
params := url.Values{}
|
|
if opts != nil {
|
|
if opts.Prefix != "" {
|
|
params.Set("prefix", opts.Prefix)
|
|
}
|
|
if opts.Cursor != "" {
|
|
params.Set("cursor", opts.Cursor)
|
|
}
|
|
if opts.Limit > 0 {
|
|
params.Set("limit", strconv.Itoa(opts.Limit))
|
|
}
|
|
}
|
|
|
|
path := "/kv"
|
|
if len(params) > 0 {
|
|
path += "?" + params.Encode()
|
|
}
|
|
|
|
var result ListResult
|
|
err := kv.client.request(ctx, "GET", path, nil, &result)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &result, nil
|
|
}
|
|
|
|
// MGet retrieves multiple values at once
|
|
func (kv *KeyValueStore) MGet(ctx context.Context, keys []string) (map[string]interface{}, error) {
|
|
var result struct {
|
|
Entries []struct {
|
|
Key string `json:"key"`
|
|
Value interface{} `json:"value"`
|
|
} `json:"entries"`
|
|
}
|
|
err := kv.client.request(ctx, "POST", "/kv/mget", map[string]interface{}{"keys": keys}, &result)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
values := make(map[string]interface{})
|
|
for _, e := range result.Entries {
|
|
values[e.Key] = e.Value
|
|
}
|
|
return values, nil
|
|
}
|
|
|
|
// MSet sets multiple values at once
|
|
func (kv *KeyValueStore) MSet(ctx context.Context, entries map[string]interface{}) error {
|
|
entriesList := make([]map[string]interface{}, 0, len(entries))
|
|
for k, v := range entries {
|
|
entriesList = append(entriesList, map[string]interface{}{"key": k, "value": v})
|
|
}
|
|
return kv.client.request(ctx, "POST", "/kv/mset", map[string]interface{}{"entries": entriesList}, nil)
|
|
}
|
|
|
|
// Incr increments a numeric value
|
|
func (kv *KeyValueStore) Incr(ctx context.Context, key string, by int64) (int64, error) {
|
|
var result struct {
|
|
Value int64 `json:"value"`
|
|
}
|
|
err := kv.client.request(ctx, "POST", "/kv/"+url.PathEscape(key)+"/incr", map[string]interface{}{"by": by}, &result)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
return result.Value, nil
|
|
}
|
|
|
|
// ==================== Document Store ====================
|
|
|
|
// DocumentStore provides document operations
|
|
type DocumentStore struct {
|
|
client *Client
|
|
}
|
|
|
|
// Create creates a new document
|
|
func (ds *DocumentStore) Create(ctx context.Context, collection string, data map[string]interface{}, opts *CreateDocumentOptions) (*Document, error) {
|
|
body := map[string]interface{}{
|
|
"data": data,
|
|
}
|
|
if opts != nil {
|
|
if opts.ID != "" {
|
|
body["id"] = opts.ID
|
|
}
|
|
if opts.Metadata != nil {
|
|
body["metadata"] = opts.Metadata
|
|
}
|
|
}
|
|
|
|
var doc Document
|
|
err := ds.client.request(ctx, "POST", "/documents/"+url.PathEscape(collection), body, &doc)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &doc, nil
|
|
}
|
|
|
|
// Get retrieves a document by ID
|
|
func (ds *DocumentStore) Get(ctx context.Context, collection, id string) (*Document, error) {
|
|
var doc Document
|
|
err := ds.client.request(ctx, "GET", "/documents/"+url.PathEscape(collection)+"/"+url.PathEscape(id), nil, &doc)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &doc, nil
|
|
}
|
|
|
|
// Update updates a document
|
|
func (ds *DocumentStore) Update(ctx context.Context, collection, id string, update map[string]interface{}, opts *UpdateDocumentOptions) (*Document, error) {
|
|
body := map[string]interface{}{
|
|
"update": update,
|
|
}
|
|
if opts != nil {
|
|
if opts.Upsert {
|
|
body["upsert"] = true
|
|
}
|
|
if opts.ReturnDocument != "" {
|
|
body["return_document"] = opts.ReturnDocument
|
|
}
|
|
}
|
|
|
|
var doc Document
|
|
err := ds.client.request(ctx, "PATCH", "/documents/"+url.PathEscape(collection)+"/"+url.PathEscape(id), body, &doc)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &doc, nil
|
|
}
|
|
|
|
// Delete deletes a document
|
|
func (ds *DocumentStore) Delete(ctx context.Context, collection, id string) (bool, error) {
|
|
var result struct {
|
|
Deleted bool `json:"deleted"`
|
|
}
|
|
err := ds.client.request(ctx, "DELETE", "/documents/"+url.PathEscape(collection)+"/"+url.PathEscape(id), nil, &result)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
return result.Deleted, nil
|
|
}
|
|
|
|
// Query queries documents
|
|
func (ds *DocumentStore) Query(ctx context.Context, collection string, opts *QueryOptions) (*QueryResult, error) {
|
|
body := make(map[string]interface{})
|
|
if opts != nil {
|
|
if opts.Filter != nil {
|
|
body["filter"] = opts.Filter
|
|
}
|
|
if opts.Sort != nil {
|
|
body["sort"] = opts.Sort
|
|
}
|
|
if opts.Skip > 0 {
|
|
body["skip"] = opts.Skip
|
|
}
|
|
if opts.Limit > 0 {
|
|
body["limit"] = opts.Limit
|
|
}
|
|
if opts.Projection != nil {
|
|
body["projection"] = opts.Projection
|
|
}
|
|
}
|
|
|
|
var result QueryResult
|
|
err := ds.client.request(ctx, "POST", "/documents/"+url.PathEscape(collection)+"/query", body, &result)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &result, nil
|
|
}
|
|
|
|
// Count counts documents matching a filter
|
|
func (ds *DocumentStore) Count(ctx context.Context, collection string, filter QueryFilter) (int, error) {
|
|
var result struct {
|
|
Count int `json:"count"`
|
|
}
|
|
err := ds.client.request(ctx, "POST", "/documents/"+url.PathEscape(collection)+"/count", map[string]interface{}{"filter": filter}, &result)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
return result.Count, nil
|
|
}
|
|
|
|
// Aggregate runs an aggregation pipeline
|
|
func (ds *DocumentStore) Aggregate(ctx context.Context, collection string, pipeline []AggregateStage) ([]interface{}, error) {
|
|
var result struct {
|
|
Results []interface{} `json:"results"`
|
|
}
|
|
err := ds.client.request(ctx, "POST", "/documents/"+url.PathEscape(collection)+"/aggregate", map[string]interface{}{"pipeline": pipeline}, &result)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return result.Results, nil
|
|
}
|
|
|
|
// ==================== Vector Store ====================
|
|
|
|
// VectorStore provides vector operations
|
|
type VectorStore struct {
|
|
client *Client
|
|
}
|
|
|
|
// CreateCollection creates a vector collection
|
|
func (vs *VectorStore) CreateCollection(ctx context.Context, config VectorCollectionConfig) error {
|
|
return vs.client.request(ctx, "POST", "/vectors/collections", config, nil)
|
|
}
|
|
|
|
// DeleteCollection deletes a vector collection
|
|
func (vs *VectorStore) DeleteCollection(ctx context.Context, name string) error {
|
|
return vs.client.request(ctx, "DELETE", "/vectors/collections/"+url.PathEscape(name), nil, nil)
|
|
}
|
|
|
|
// GetCollectionStats gets collection statistics
|
|
func (vs *VectorStore) GetCollectionStats(ctx context.Context, name string) (*VectorCollectionStats, error) {
|
|
var stats VectorCollectionStats
|
|
err := vs.client.request(ctx, "GET", "/vectors/collections/"+url.PathEscape(name)+"/stats", nil, &stats)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &stats, nil
|
|
}
|
|
|
|
// Upsert upserts vectors
|
|
func (vs *VectorStore) Upsert(ctx context.Context, collection string, vectors []VectorEntry, opts *UpsertVectorOptions) (int, error) {
|
|
body := map[string]interface{}{
|
|
"vectors": vectors,
|
|
}
|
|
if opts != nil && opts.Namespace != "" {
|
|
body["namespace"] = opts.Namespace
|
|
}
|
|
|
|
var result struct {
|
|
Upserted int `json:"upserted"`
|
|
}
|
|
err := vs.client.request(ctx, "POST", "/vectors/"+url.PathEscape(collection)+"/upsert", body, &result)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
return result.Upserted, nil
|
|
}
|
|
|
|
// Search searches for similar vectors
|
|
func (vs *VectorStore) Search(ctx context.Context, collection string, vector []float64, opts *SearchOptions) ([]SearchResult, error) {
|
|
body := map[string]interface{}{
|
|
"vector": vector,
|
|
}
|
|
if opts != nil {
|
|
if opts.Namespace != "" {
|
|
body["namespace"] = opts.Namespace
|
|
}
|
|
if opts.TopK > 0 {
|
|
body["top_k"] = opts.TopK
|
|
}
|
|
if opts.Filter != nil {
|
|
body["filter"] = opts.Filter
|
|
}
|
|
body["include_metadata"] = opts.IncludeMetadata
|
|
body["include_vectors"] = opts.IncludeVectors
|
|
if opts.MinScore != nil {
|
|
body["min_score"] = *opts.MinScore
|
|
}
|
|
}
|
|
|
|
var result struct {
|
|
Results []SearchResult `json:"results"`
|
|
}
|
|
err := vs.client.request(ctx, "POST", "/vectors/"+url.PathEscape(collection)+"/search", body, &result)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return result.Results, nil
|
|
}
|
|
|
|
// Delete deletes vectors by ID
|
|
func (vs *VectorStore) Delete(ctx context.Context, collection string, ids []string, namespace string) (int, error) {
|
|
body := map[string]interface{}{
|
|
"ids": ids,
|
|
}
|
|
if namespace != "" {
|
|
body["namespace"] = namespace
|
|
}
|
|
|
|
var result struct {
|
|
Deleted int `json:"deleted"`
|
|
}
|
|
err := vs.client.request(ctx, "POST", "/vectors/"+url.PathEscape(collection)+"/delete", body, &result)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
return result.Deleted, nil
|
|
}
|
|
|
|
// Fetch fetches vectors by ID
|
|
func (vs *VectorStore) Fetch(ctx context.Context, collection string, ids []string, namespace string) ([]VectorEntry, error) {
|
|
body := map[string]interface{}{
|
|
"ids": ids,
|
|
}
|
|
if namespace != "" {
|
|
body["namespace"] = namespace
|
|
}
|
|
|
|
var result struct {
|
|
Vectors []VectorEntry `json:"vectors"`
|
|
}
|
|
err := vs.client.request(ctx, "POST", "/vectors/"+url.PathEscape(collection)+"/fetch", body, &result)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return result.Vectors, nil
|
|
}
|
|
|
|
// ==================== Time Series Store ====================
|
|
|
|
// TimeSeriesStore provides time series operations
|
|
type TimeSeriesStore struct {
|
|
client *Client
|
|
}
|
|
|
|
// Write writes data points to a series
|
|
func (ts *TimeSeriesStore) Write(ctx context.Context, series string, points []DataPoint, opts *WritePointsOptions) (int, error) {
|
|
body := map[string]interface{}{
|
|
"points": points,
|
|
}
|
|
if opts != nil && opts.Precision != "" {
|
|
body["precision"] = opts.Precision
|
|
}
|
|
|
|
var result struct {
|
|
Written int `json:"written"`
|
|
}
|
|
err := ts.client.request(ctx, "POST", "/timeseries/"+url.PathEscape(series)+"/write", body, &result)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
return result.Written, nil
|
|
}
|
|
|
|
// Query queries a time series
|
|
func (ts *TimeSeriesStore) Query(ctx context.Context, series string, timeRange TimeRange, opts *TimeSeriesQueryOptions) (*TimeSeriesResult, error) {
|
|
body := map[string]interface{}{
|
|
"range": timeRange,
|
|
}
|
|
if opts != nil {
|
|
if opts.Tags != nil {
|
|
body["tags"] = opts.Tags
|
|
}
|
|
if opts.Aggregation != "" {
|
|
body["aggregation"] = opts.Aggregation
|
|
}
|
|
if opts.Interval != "" {
|
|
body["interval"] = opts.Interval
|
|
}
|
|
if opts.Fill != nil {
|
|
body["fill"] = opts.Fill
|
|
}
|
|
if opts.Limit > 0 {
|
|
body["limit"] = opts.Limit
|
|
}
|
|
}
|
|
|
|
var result TimeSeriesResult
|
|
err := ts.client.request(ctx, "POST", "/timeseries/"+url.PathEscape(series)+"/query", body, &result)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &result, nil
|
|
}
|
|
|
|
// Delete deletes data points in a range
|
|
func (ts *TimeSeriesStore) Delete(ctx context.Context, series string, timeRange TimeRange, tags map[string]string) (int, error) {
|
|
body := map[string]interface{}{
|
|
"range": timeRange,
|
|
}
|
|
if tags != nil {
|
|
body["tags"] = tags
|
|
}
|
|
|
|
var result struct {
|
|
Deleted int `json:"deleted"`
|
|
}
|
|
err := ts.client.request(ctx, "POST", "/timeseries/"+url.PathEscape(series)+"/delete", body, &result)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
return result.Deleted, nil
|
|
}
|
|
|
|
// GetSeriesInfo gets series information
|
|
func (ts *TimeSeriesStore) GetSeriesInfo(ctx context.Context, series string) (*SeriesInfo, error) {
|
|
var info SeriesInfo
|
|
err := ts.client.request(ctx, "GET", "/timeseries/"+url.PathEscape(series)+"/info", nil, &info)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &info, nil
|
|
}
|
|
|
|
// ListSeries lists all series
|
|
func (ts *TimeSeriesStore) ListSeries(ctx context.Context, prefix string) ([]SeriesInfo, error) {
|
|
path := "/timeseries"
|
|
if prefix != "" {
|
|
path += "?prefix=" + url.QueryEscape(prefix)
|
|
}
|
|
|
|
var result struct {
|
|
Series []SeriesInfo `json:"series"`
|
|
}
|
|
err := ts.client.request(ctx, "GET", path, nil, &result)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return result.Series, nil
|
|
}
|
|
|
|
// SetRetention sets retention policy for a series
|
|
func (ts *TimeSeriesStore) SetRetention(ctx context.Context, series string, retentionDays int) error {
|
|
return ts.client.request(ctx, "PUT", "/timeseries/"+url.PathEscape(series)+"/retention", map[string]interface{}{"retention_days": retentionDays}, nil)
|
|
}
|