feat(database): add Phase 10 Synor Database L2 foundation

Multi-model database layer for Synor blockchain:

- Key-Value Store: Redis-compatible API with TTL, INCR, MGET/MSET
- Document Store: MongoDB-compatible queries with filters
- Vector Store: AI/RAG optimized with cosine, euclidean, dot product similarity
- Time-Series Store: Metrics with downsampling and aggregations
- Query Engine: Unified queries across all data models
- Index Manager: B-tree, hash, unique, and compound indexes
- Schema Validator: Field validation with type checking
- Database Pricing: Pay-per-use model (0.1 SYNOR/GB/month)

Updates roadmap with Phase 10-12 milestones:
- Phase 10: Synor Database L2
- Phase 11: Economics & Billing
- Phase 12: Fiat Gateway (Ramp Network integration)

41 tests passing
Gulshan Yadav 2026-01-10 17:40:18 +05:30
parent 47a04244ec
commit 78c226a098
12 changed files with 4185 additions and 0 deletions


@ -8,6 +8,7 @@ members = [
"crates/synor-network",
"crates/synor-storage",
"crates/synor-hosting",
"crates/synor-database",
"crates/synor-governance",
"crates/synor-rpc",
"crates/synor-vm",


@ -0,0 +1,39 @@
[package]
name = "synor-database"
version.workspace = true
edition.workspace = true
description = "Multi-model database layer for Synor blockchain"
license.workspace = true
[dependencies]
# Internal crates
synor-types = { path = "../synor-types" }
synor-crypto = { path = "../synor-crypto" }
synor-storage = { path = "../synor-storage" }
# Serialization
serde.workspace = true
serde_json.workspace = true
borsh.workspace = true
# Utilities
thiserror.workspace = true
parking_lot.workspace = true
tracing.workspace = true
hex.workspace = true
# Hashing
blake3.workspace = true
# Async
tokio = { workspace = true, features = ["sync", "rt-multi-thread"] }
# Data structures
lru = "0.12"
indexmap = "2.2"
# Vector operations (for AI/RAG)
# Using pure Rust for portability
[dev-dependencies]
tempfile.workspace = true


@ -0,0 +1,621 @@
//! Document Store - MongoDB-compatible queries.
//!
//! Provides document storage with collections and rich queries.
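//!
//! A sketch of intended usage (illustrative only, so the block is marked
//! `ignore` and is not compiled as a doctest):
//!
//! ```ignore
//! use serde_json::json;
//!
//! let store = DocumentStore::new();
//! store.create_collection("users")?;
//! store.insert("users", json!({"name": "Alice", "age": 30}))?;
//! // Filters compose through a builder API.
//! let adults = store.find("users", &DocumentFilter::new().gte("age", json!(18)))?;
//! assert_eq!(adults.len(), 1);
//! ```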
use crate::error::DatabaseError;
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use serde_json::Value as JsonValue;
use std::collections::HashMap;
/// Document identifier.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct DocumentId(pub [u8; 32]);
impl DocumentId {
/// Creates a new random document ID.
pub fn new() -> Self {
let mut bytes = [0u8; 32];
let timestamp = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos();
bytes[..16].copy_from_slice(&timestamp.to_le_bytes());
// Add random component
let random: u128 = rand_bytes();
bytes[16..].copy_from_slice(&random.to_le_bytes());
Self(bytes)
}
/// Creates a document ID from bytes.
pub fn from_bytes(bytes: [u8; 32]) -> Self {
Self(bytes)
}
/// Returns hex string representation.
pub fn to_hex(&self) -> String {
hex::encode(&self.0)
}
/// Creates from hex string.
pub fn from_hex(s: &str) -> Result<Self, DatabaseError> {
let bytes = hex::decode(s)
.map_err(|_| DatabaseError::InvalidOperation("Invalid hex string".into()))?;
if bytes.len() != 32 {
return Err(DatabaseError::InvalidOperation("Invalid document ID length".into()));
}
let mut arr = [0u8; 32];
arr.copy_from_slice(&bytes);
Ok(Self(arr))
}
}
impl Default for DocumentId {
fn default() -> Self {
Self::new()
}
}
impl std::fmt::Display for DocumentId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", hex::encode(&self.0[..12]))
}
}
// Simple pseudo-random component for document IDs (not cryptographically secure).
fn rand_bytes() -> u128 {
use std::collections::hash_map::RandomState;
use std::hash::{BuildHasher, Hasher};
// RandomState seeds each instance with fresh OS randomness; mix in the
// current wall-clock nanos as an additional varying input.
let nanos = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos() as u64;
let state = RandomState::new();
let mut hasher = state.build_hasher();
hasher.write_u64(nanos);
let a = hasher.finish();
hasher.write_u64(a);
let b = hasher.finish();
((a as u128) << 64) | (b as u128)
}
/// A document in the store.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Document {
/// Unique document ID.
pub id: DocumentId,
/// Document data as JSON.
pub data: JsonValue,
/// Creation timestamp.
pub created_at: u64,
/// Last modification timestamp.
pub updated_at: u64,
/// Document version (for optimistic locking).
pub version: u64,
}
impl Document {
/// Creates a new document.
pub fn new(data: JsonValue) -> Self {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
Self {
id: DocumentId::new(),
data,
created_at: now,
updated_at: now,
version: 1,
}
}
/// Creates a document with a specific ID.
pub fn with_id(id: DocumentId, data: JsonValue) -> Self {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
Self {
id,
data,
created_at: now,
updated_at: now,
version: 1,
}
}
/// Gets a field value.
pub fn get(&self, field: &str) -> Option<&JsonValue> {
self.data.get(field)
}
/// Gets a nested field (dot notation).
pub fn get_nested(&self, path: &str) -> Option<&JsonValue> {
let parts: Vec<&str> = path.split('.').collect();
let mut current = &self.data;
for part in parts {
current = current.get(part)?;
}
Some(current)
}
/// Updates a field.
pub fn set(&mut self, field: &str, value: JsonValue) {
if let Some(obj) = self.data.as_object_mut() {
obj.insert(field.to_string(), value);
self.updated_at = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
self.version += 1;
}
}
/// Merges another object into this document.
pub fn merge(&mut self, other: JsonValue) {
if let (Some(self_obj), Some(other_obj)) = (self.data.as_object_mut(), other.as_object()) {
for (key, value) in other_obj {
self_obj.insert(key.clone(), value.clone());
}
self.updated_at = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
self.version += 1;
}
}
}
/// A collection of documents.
#[derive(Debug)]
pub struct Collection {
/// Collection name.
pub name: String,
/// Documents indexed by ID.
documents: RwLock<HashMap<DocumentId, Document>>,
/// Document count.
count: RwLock<u64>,
}
impl Collection {
/// Creates a new collection.
pub fn new(name: impl Into<String>) -> Self {
Self {
name: name.into(),
documents: RwLock::new(HashMap::new()),
count: RwLock::new(0),
}
}
/// Inserts a document.
pub fn insert(&self, doc: Document) -> Result<DocumentId, DatabaseError> {
let id = doc.id.clone();
let mut docs = self.documents.write();
if docs.contains_key(&id) {
return Err(DatabaseError::AlreadyExists(id.to_string()));
}
docs.insert(id.clone(), doc);
*self.count.write() += 1;
Ok(id)
}
/// Inserts a new document from JSON data.
pub fn insert_one(&self, data: JsonValue) -> Result<DocumentId, DatabaseError> {
let doc = Document::new(data);
self.insert(doc)
}
/// Inserts multiple documents.
pub fn insert_many(&self, docs: Vec<JsonValue>) -> Result<Vec<DocumentId>, DatabaseError> {
let mut ids = Vec::with_capacity(docs.len());
for data in docs {
ids.push(self.insert_one(data)?);
}
Ok(ids)
}
/// Finds a document by ID.
pub fn find_by_id(&self, id: &DocumentId) -> Option<Document> {
self.documents.read().get(id).cloned()
}
/// Finds documents matching a filter.
pub fn find(&self, filter: &DocumentFilter) -> Vec<Document> {
self.documents
.read()
.values()
.filter(|doc| filter.matches(doc))
.cloned()
.collect()
}
/// Finds one document matching a filter.
pub fn find_one(&self, filter: &DocumentFilter) -> Option<Document> {
self.documents
.read()
.values()
.find(|doc| filter.matches(doc))
.cloned()
}
/// Updates a document by ID.
pub fn update_by_id(&self, id: &DocumentId, update: JsonValue) -> Result<bool, DatabaseError> {
let mut docs = self.documents.write();
if let Some(doc) = docs.get_mut(id) {
doc.merge(update);
Ok(true)
} else {
Ok(false)
}
}
/// Updates documents matching a filter.
pub fn update_many(&self, filter: &DocumentFilter, update: JsonValue) -> Result<u64, DatabaseError> {
let mut docs = self.documents.write();
let mut count = 0;
for doc in docs.values_mut() {
if filter.matches(doc) {
doc.merge(update.clone());
count += 1;
}
}
Ok(count)
}
/// Deletes a document by ID.
pub fn delete_by_id(&self, id: &DocumentId) -> Result<bool, DatabaseError> {
let removed = self.documents.write().remove(id).is_some();
if removed {
*self.count.write() -= 1;
}
Ok(removed)
}
/// Deletes documents matching a filter.
pub fn delete_many(&self, filter: &DocumentFilter) -> Result<u64, DatabaseError> {
let mut docs = self.documents.write();
let before = docs.len();
docs.retain(|_, doc| !filter.matches(doc));
let deleted = (before - docs.len()) as u64;
*self.count.write() -= deleted;
Ok(deleted)
}
/// Returns document count.
pub fn count(&self) -> u64 {
*self.count.read()
}
/// Returns all documents.
pub fn all(&self) -> Vec<Document> {
self.documents.read().values().cloned().collect()
}
/// Clears all documents.
pub fn clear(&self) {
self.documents.write().clear();
*self.count.write() = 0;
}
}
/// Filter for querying documents.
#[derive(Clone, Debug, Default)]
pub struct DocumentFilter {
conditions: Vec<FilterCondition>,
}
#[derive(Clone, Debug)]
enum FilterCondition {
Eq(String, JsonValue),
Ne(String, JsonValue),
Gt(String, JsonValue),
Gte(String, JsonValue),
Lt(String, JsonValue),
Lte(String, JsonValue),
In(String, Vec<JsonValue>),
Contains(String, String),
Exists(String, bool),
And(Vec<DocumentFilter>),
Or(Vec<DocumentFilter>),
}
impl DocumentFilter {
/// Creates a new empty filter (matches all).
pub fn new() -> Self {
Self { conditions: Vec::new() }
}
/// Equality condition.
pub fn eq(mut self, field: impl Into<String>, value: JsonValue) -> Self {
self.conditions.push(FilterCondition::Eq(field.into(), value));
self
}
/// Not equal condition.
pub fn ne(mut self, field: impl Into<String>, value: JsonValue) -> Self {
self.conditions.push(FilterCondition::Ne(field.into(), value));
self
}
/// Greater than.
pub fn gt(mut self, field: impl Into<String>, value: JsonValue) -> Self {
self.conditions.push(FilterCondition::Gt(field.into(), value));
self
}
/// Greater than or equal.
pub fn gte(mut self, field: impl Into<String>, value: JsonValue) -> Self {
self.conditions.push(FilterCondition::Gte(field.into(), value));
self
}
/// Less than.
pub fn lt(mut self, field: impl Into<String>, value: JsonValue) -> Self {
self.conditions.push(FilterCondition::Lt(field.into(), value));
self
}
/// Less than or equal.
pub fn lte(mut self, field: impl Into<String>, value: JsonValue) -> Self {
self.conditions.push(FilterCondition::Lte(field.into(), value));
self
}
/// In array.
pub fn in_array(mut self, field: impl Into<String>, values: Vec<JsonValue>) -> Self {
self.conditions.push(FilterCondition::In(field.into(), values));
self
}
/// String contains.
pub fn contains(mut self, field: impl Into<String>, substring: impl Into<String>) -> Self {
self.conditions.push(FilterCondition::Contains(field.into(), substring.into()));
self
}
/// Field exists.
pub fn exists(mut self, field: impl Into<String>, exists: bool) -> Self {
self.conditions.push(FilterCondition::Exists(field.into(), exists));
self
}
/// AND multiple filters.
pub fn and(mut self, filters: Vec<DocumentFilter>) -> Self {
self.conditions.push(FilterCondition::And(filters));
self
}
/// OR multiple filters.
pub fn or(mut self, filters: Vec<DocumentFilter>) -> Self {
self.conditions.push(FilterCondition::Or(filters));
self
}
/// Checks if document matches the filter.
pub fn matches(&self, doc: &Document) -> bool {
if self.conditions.is_empty() {
return true;
}
self.conditions.iter().all(|cond| self.eval_condition(cond, doc))
}
fn eval_condition(&self, cond: &FilterCondition, doc: &Document) -> bool {
match cond {
FilterCondition::Eq(field, value) => {
doc.get_nested(field).map(|v| v == value).unwrap_or(false)
}
FilterCondition::Ne(field, value) => {
doc.get_nested(field).map(|v| v != value).unwrap_or(true)
}
FilterCondition::Gt(field, value) => {
self.compare_values(doc.get_nested(field), value, |a, b| a > b)
}
FilterCondition::Gte(field, value) => {
self.compare_values(doc.get_nested(field), value, |a, b| a >= b)
}
FilterCondition::Lt(field, value) => {
self.compare_values(doc.get_nested(field), value, |a, b| a < b)
}
FilterCondition::Lte(field, value) => {
self.compare_values(doc.get_nested(field), value, |a, b| a <= b)
}
FilterCondition::In(field, values) => {
doc.get_nested(field)
.map(|v| values.contains(v))
.unwrap_or(false)
}
FilterCondition::Contains(field, substring) => {
doc.get_nested(field)
.and_then(|v| v.as_str())
.map(|s| s.contains(substring))
.unwrap_or(false)
}
FilterCondition::Exists(field, should_exist) => {
let exists = doc.get_nested(field).is_some();
exists == *should_exist
}
FilterCondition::And(filters) => {
filters.iter().all(|f| f.matches(doc))
}
FilterCondition::Or(filters) => {
filters.iter().any(|f| f.matches(doc))
}
}
}
fn compare_values<F>(&self, a: Option<&JsonValue>, b: &JsonValue, cmp: F) -> bool
where
F: Fn(f64, f64) -> bool,
{
match (a, b) {
(Some(JsonValue::Number(a)), JsonValue::Number(b)) => {
match (a.as_f64(), b.as_f64()) {
(Some(a), Some(b)) => cmp(a, b),
_ => false,
}
}
_ => false,
}
}
}
/// Document store managing multiple collections.
pub struct DocumentStore {
collections: RwLock<HashMap<String, std::sync::Arc<Collection>>>,
}
impl DocumentStore {
/// Creates a new document store.
pub fn new() -> Self {
Self {
collections: RwLock::new(HashMap::new()),
}
}
/// Gets or creates a collection, returning a shared handle to the stored
/// instance so inserts through the handle are visible to later lookups.
pub fn collection(&self, name: &str) -> std::sync::Arc<Collection> {
self.collections
.write()
.entry(name.to_string())
.or_insert_with(|| std::sync::Arc::new(Collection::new(name)))
.clone()
}
/// Creates a new collection.
pub fn create_collection(&self, name: &str) -> Result<(), DatabaseError> {
let mut collections = self.collections.write();
if collections.contains_key(name) {
return Err(DatabaseError::AlreadyExists(name.to_string()));
}
collections.insert(name.to_string(), std::sync::Arc::new(Collection::new(name)));
Ok(())
}
/// Drops a collection.
pub fn drop_collection(&self, name: &str) -> Result<bool, DatabaseError> {
Ok(self.collections.write().remove(name).is_some())
}
/// Lists all collection names.
pub fn list_collections(&self) -> Vec<String> {
self.collections.read().keys().cloned().collect()
}
/// Inserts a document into a collection.
pub fn insert(&self, collection: &str, data: JsonValue) -> Result<DocumentId, DatabaseError> {
let collections = self.collections.read();
let coll = collections
.get(collection)
.ok_or_else(|| DatabaseError::CollectionNotFound(collection.to_string()))?;
coll.insert_one(data)
}
/// Finds documents in a collection.
pub fn find(&self, collection: &str, filter: &DocumentFilter) -> Result<Vec<Document>, DatabaseError> {
let collections = self.collections.read();
let coll = collections
.get(collection)
.ok_or_else(|| DatabaseError::CollectionNotFound(collection.to_string()))?;
Ok(coll.find(filter))
}
/// Finds one document.
pub fn find_one(&self, collection: &str, filter: &DocumentFilter) -> Result<Option<Document>, DatabaseError> {
let collections = self.collections.read();
let coll = collections
.get(collection)
.ok_or_else(|| DatabaseError::CollectionNotFound(collection.to_string()))?;
Ok(coll.find_one(filter))
}
}
impl Default for DocumentStore {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
#[test]
fn test_document_creation() {
let doc = Document::new(json!({
"name": "Alice",
"age": 30
}));
assert_eq!(doc.get("name"), Some(&json!("Alice")));
assert_eq!(doc.get("age"), Some(&json!(30)));
assert_eq!(doc.version, 1);
}
#[test]
fn test_collection_insert_find() {
let coll = Collection::new("users");
coll.insert_one(json!({"name": "Alice", "age": 30})).unwrap();
coll.insert_one(json!({"name": "Bob", "age": 25})).unwrap();
let filter = DocumentFilter::new().eq("name", json!("Alice"));
let results = coll.find(&filter);
assert_eq!(results.len(), 1);
assert_eq!(results[0].get("name"), Some(&json!("Alice")));
}
#[test]
fn test_filter_comparison() {
let coll = Collection::new("users");
coll.insert_one(json!({"name": "Alice", "age": 30})).unwrap();
coll.insert_one(json!({"name": "Bob", "age": 25})).unwrap();
coll.insert_one(json!({"name": "Charlie", "age": 35})).unwrap();
let filter = DocumentFilter::new().gte("age", json!(30));
let results = coll.find(&filter);
assert_eq!(results.len(), 2);
}
#[test]
fn test_nested_fields() {
let doc = Document::new(json!({
"user": {
"profile": {
"name": "Alice"
}
}
}));
assert_eq!(doc.get_nested("user.profile.name"), Some(&json!("Alice")));
}
#[test]
fn test_update_document() {
let coll = Collection::new("users");
let id = coll.insert_one(json!({"name": "Alice", "age": 30})).unwrap();
coll.update_by_id(&id, json!({"age": 31})).unwrap();
let doc = coll.find_by_id(&id).unwrap();
assert_eq!(doc.get("age"), Some(&json!(31)));
assert_eq!(doc.version, 2);
}
#[test]
fn test_delete_many() {
let coll = Collection::new("users");
coll.insert_one(json!({"status": "active"})).unwrap();
coll.insert_one(json!({"status": "active"})).unwrap();
coll.insert_one(json!({"status": "inactive"})).unwrap();
let filter = DocumentFilter::new().eq("status", json!("active"));
let deleted = coll.delete_many(&filter).unwrap();
assert_eq!(deleted, 2);
assert_eq!(coll.count(), 1);
}
}


@ -0,0 +1,64 @@
//! Database error types.
use thiserror::Error;
/// Database errors.
#[derive(Debug, Error)]
pub enum DatabaseError {
#[error("Database already exists: {0}")]
AlreadyExists(String),
#[error("Database not found: {0}")]
NotFound(String),
#[error("Collection not found: {0}")]
CollectionNotFound(String),
#[error("Document not found: {0}")]
DocumentNotFound(String),
#[error("Key not found: {0}")]
KeyNotFound(String),
#[error("Index not found: {0}")]
IndexNotFound(String),
#[error("Schema validation failed: {0}")]
SchemaValidation(String),
#[error("Query error: {0}")]
QueryError(String),
#[error("Vector dimension mismatch: expected {expected}, got {got}")]
DimensionMismatch { expected: u32, got: u32 },
#[error("Storage error: {0}")]
StorageError(String),
#[error("Serialization error: {0}")]
SerializationError(String),
#[error("Capacity exceeded: {0}")]
CapacityExceeded(String),
#[error("TTL expired")]
TtlExpired,
#[error("Invalid operation: {0}")]
InvalidOperation(String),
#[error("Internal error: {0}")]
Internal(String),
}
impl From<serde_json::Error> for DatabaseError {
fn from(err: serde_json::Error) -> Self {
DatabaseError::SerializationError(err.to_string())
}
}
impl From<std::io::Error> for DatabaseError {
fn from(err: std::io::Error) -> Self {
DatabaseError::StorageError(err.to_string())
}
}


@ -0,0 +1,522 @@
//! Index Management for efficient queries.
//!
//! Supports B-tree, hash, and vector indexes.
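//!
//! A sketch of intended usage (illustrative only; marked `ignore` so it is
//! not compiled as a doctest):
//!
//! ```ignore
//! use serde_json::json;
//!
//! let index = Index::new(
//!     IndexConfig::new("age_idx", "users")
//!         .field("age")
//!         .index_type(IndexType::BTree),
//! );
//! index.insert(DocumentId::new(), &json!(30))?;
//! // Exact lookup and inclusive range scan over the B-tree index.
//! let exact = index.lookup(&json!(30));
//! let in_range = index.range(Some(&json!(18)), Some(&json!(65)));
//! ```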
use crate::document::DocumentId;
use crate::error::DatabaseError;
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use serde_json::Value as JsonValue;
use std::collections::{BTreeMap, HashMap, HashSet};
/// Index type.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum IndexType {
/// B-tree index for range queries.
BTree,
/// Hash index for equality lookups.
Hash,
/// Full-text search index.
FullText,
/// Vector index (HNSW).
Vector,
/// Compound index on multiple fields.
Compound,
/// Unique constraint index.
Unique,
}
/// Index configuration.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct IndexConfig {
/// Index name.
pub name: String,
/// Collection name.
pub collection: String,
/// Fields to index.
pub fields: Vec<String>,
/// Index type.
pub index_type: IndexType,
/// Whether index enforces uniqueness.
pub unique: bool,
/// Sparse index (skip null values).
pub sparse: bool,
}
impl IndexConfig {
/// Creates a new index config.
pub fn new(name: impl Into<String>, collection: impl Into<String>) -> Self {
Self {
name: name.into(),
collection: collection.into(),
fields: Vec::new(),
index_type: IndexType::BTree,
unique: false,
sparse: false,
}
}
/// Adds a field to index.
pub fn field(mut self, field: impl Into<String>) -> Self {
self.fields.push(field.into());
self
}
/// Sets index type.
pub fn index_type(mut self, t: IndexType) -> Self {
self.index_type = t;
self
}
/// Sets as unique.
pub fn unique(mut self) -> Self {
self.unique = true;
self
}
/// Sets as sparse.
pub fn sparse(mut self) -> Self {
self.sparse = true;
self
}
}
/// An index entry.
#[derive(Clone, Debug)]
struct IndexEntry {
/// Indexed value (serialized for comparison).
key: IndexKey,
/// Document IDs with this value.
doc_ids: HashSet<DocumentId>,
}
/// Index key for ordering.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
enum IndexKey {
Null,
Bool(bool),
Int(i64),
String(String),
Bytes(Vec<u8>),
}
impl PartialOrd for IndexKey {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
impl Ord for IndexKey {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
match (self, other) {
(IndexKey::Bool(a), IndexKey::Bool(b)) => a.cmp(b),
(IndexKey::Int(a), IndexKey::Int(b)) => a.cmp(b),
(IndexKey::String(a), IndexKey::String(b)) => a.cmp(b),
(IndexKey::Bytes(a), IndexKey::Bytes(b)) => a.cmp(b),
// Mixed types (including Null vs anything) order by a fixed type
// rank, keeping the ordering total as BTreeMap requires.
_ => self.type_rank().cmp(&other.type_rank()),
}
}
}
impl IndexKey {
fn type_rank(&self) -> u8 {
match self {
IndexKey::Null => 0,
IndexKey::Bool(_) => 1,
IndexKey::Int(_) => 2,
IndexKey::String(_) => 3,
IndexKey::Bytes(_) => 4,
}
}
}
impl From<&JsonValue> for IndexKey {
fn from(value: &JsonValue) -> Self {
match value {
JsonValue::Null => IndexKey::Null,
JsonValue::Bool(b) => IndexKey::Bool(*b),
// Non-integral numbers all collapse to 0 here; only integer keys
// are ordered meaningfully by this index.
JsonValue::Number(n) => IndexKey::Int(n.as_i64().unwrap_or(0)),
JsonValue::String(s) => IndexKey::String(s.clone()),
_ => IndexKey::Bytes(serde_json::to_vec(value).unwrap_or_default()),
}
}
}
/// A single index instance.
pub struct Index {
/// Index configuration.
pub config: IndexConfig,
/// B-tree index data.
btree: RwLock<BTreeMap<IndexKey, HashSet<DocumentId>>>,
/// Hash index data.
hash: RwLock<HashMap<IndexKey, HashSet<DocumentId>>>,
/// Statistics.
stats: RwLock<IndexStats>,
}
/// Index statistics.
#[derive(Clone, Debug, Default)]
pub struct IndexStats {
/// Total entries.
pub entries: u64,
/// Index lookups.
pub lookups: u64,
/// Index hits.
pub hits: u64,
}
impl Index {
/// Creates a new index.
pub fn new(config: IndexConfig) -> Self {
Self {
config,
btree: RwLock::new(BTreeMap::new()),
hash: RwLock::new(HashMap::new()),
stats: RwLock::new(IndexStats::default()),
}
}
/// Adds a document to the index.
pub fn insert(&self, doc_id: DocumentId, value: &JsonValue) -> Result<(), DatabaseError> {
let key = IndexKey::from(value);
// Check uniqueness if required
if self.config.unique {
let exists = match self.config.index_type {
IndexType::Hash | IndexType::Unique => {
self.hash.read().get(&key).map(|s| !s.is_empty()).unwrap_or(false)
}
_ => {
self.btree.read().get(&key).map(|s| !s.is_empty()).unwrap_or(false)
}
};
if exists {
return Err(DatabaseError::AlreadyExists(
format!("Unique constraint violation on index '{}'", self.config.name)
));
}
}
match self.config.index_type {
IndexType::Hash | IndexType::Unique => {
self.hash
.write()
.entry(key)
.or_insert_with(HashSet::new)
.insert(doc_id);
}
_ => {
self.btree
.write()
.entry(key)
.or_insert_with(HashSet::new)
.insert(doc_id);
}
}
self.stats.write().entries += 1;
Ok(())
}
/// Removes a document from the index.
pub fn remove(&self, doc_id: &DocumentId, value: &JsonValue) {
let key = IndexKey::from(value);
match self.config.index_type {
IndexType::Hash | IndexType::Unique => {
// Take the write guard once; re-locking the same parking_lot
// RwLock from this thread would deadlock.
let mut hash = self.hash.write();
if let Some(set) = hash.get_mut(&key) {
set.remove(doc_id);
if set.is_empty() {
hash.remove(&key);
}
}
}
_ => {
let mut btree = self.btree.write();
if let Some(set) = btree.get_mut(&key) {
set.remove(doc_id);
if set.is_empty() {
btree.remove(&key);
}
}
}
}
}
/// Looks up documents by exact value.
pub fn lookup(&self, value: &JsonValue) -> Vec<DocumentId> {
let key = IndexKey::from(value);
self.stats.write().lookups += 1;
let result: Vec<DocumentId> = match self.config.index_type {
IndexType::Hash | IndexType::Unique => {
self.hash
.read()
.get(&key)
.map(|s| s.iter().cloned().collect())
.unwrap_or_default()
}
_ => {
self.btree
.read()
.get(&key)
.map(|s| s.iter().cloned().collect())
.unwrap_or_default()
}
};
if !result.is_empty() {
self.stats.write().hits += 1;
}
result
}
/// Range query (only for B-tree indexes).
pub fn range(&self, start: Option<&JsonValue>, end: Option<&JsonValue>) -> Vec<DocumentId> {
if self.config.index_type != IndexType::BTree {
return Vec::new();
}
self.stats.write().lookups += 1;
let btree = self.btree.read();
let start_key = start.map(IndexKey::from);
let end_key = end.map(IndexKey::from);
let mut result = Vec::new();
for (key, doc_ids) in btree.iter() {
let in_range = match (&start_key, &end_key) {
(Some(s), Some(e)) => key >= s && key <= e,
(Some(s), None) => key >= s,
(None, Some(e)) => key <= e,
(None, None) => true,
};
if in_range {
result.extend(doc_ids.iter().cloned());
}
}
if !result.is_empty() {
self.stats.write().hits += 1;
}
result
}
/// Returns index statistics.
pub fn stats(&self) -> IndexStats {
self.stats.read().clone()
}
/// Clears the index.
pub fn clear(&self) {
self.btree.write().clear();
self.hash.write().clear();
self.stats.write().entries = 0;
}
}
/// Manages indexes for a database.
pub struct IndexManager {
/// Indexes by name, behind shared handles.
indexes: RwLock<HashMap<String, std::sync::Arc<Index>>>,
/// Index names by collection.
by_collection: RwLock<HashMap<String, Vec<String>>>,
}
impl IndexManager {
/// Creates a new index manager.
pub fn new() -> Self {
Self {
indexes: RwLock::new(HashMap::new()),
by_collection: RwLock::new(HashMap::new()),
}
}
/// Creates a new index.
pub fn create_index(&self, config: IndexConfig) -> Result<(), DatabaseError> {
let name = config.name.clone();
let collection = config.collection.clone();
let mut indexes = self.indexes.write();
if indexes.contains_key(&name) {
return Err(DatabaseError::AlreadyExists(name));
}
indexes.insert(name.clone(), std::sync::Arc::new(Index::new(config)));
self.by_collection
.write()
.entry(collection)
.or_insert_with(Vec::new)
.push(name);
Ok(())
}
/// Drops an index.
pub fn drop_index(&self, name: &str) -> Result<(), DatabaseError> {
let mut indexes = self.indexes.write();
let index = indexes
.remove(name)
.ok_or_else(|| DatabaseError::IndexNotFound(name.to_string()))?;
// Remove from collection mapping
let mut by_collection = self.by_collection.write();
if let Some(names) = by_collection.get_mut(&index.config.collection) {
names.retain(|n| n != name);
}
Ok(())
}
/// Gets a shared handle to an index by name.
pub fn get_index(&self, name: &str) -> Option<std::sync::Arc<Index>> {
self.indexes.read().get(name).cloned()
}
/// Gets indexes for a collection.
pub fn get_collection_indexes(&self, collection: &str) -> Vec<String> {
self.by_collection
.read()
.get(collection)
.cloned()
.unwrap_or_default()
}
/// Indexes a document.
pub fn index_document(
&self,
collection: &str,
doc_id: DocumentId,
document: &JsonValue,
) -> Result<(), DatabaseError> {
let index_names = self.get_collection_indexes(collection);
let indexes = self.indexes.read();
for name in index_names {
if let Some(index) = indexes.get(&name) {
for field in &index.config.fields {
if let Some(value) = document.get(field) {
index.insert(doc_id.clone(), value)?;
}
}
}
}
Ok(())
}
/// Removes a document from indexes.
pub fn unindex_document(
&self,
collection: &str,
doc_id: &DocumentId,
document: &JsonValue,
) {
let index_names = self.get_collection_indexes(collection);
let indexes = self.indexes.read();
for name in index_names {
if let Some(index) = indexes.get(&name) {
for field in &index.config.fields {
if let Some(value) = document.get(field) {
index.remove(doc_id, value);
}
}
}
}
}
/// Lists all indexes.
pub fn list_indexes(&self) -> Vec<IndexConfig> {
self.indexes
.read()
.values()
.map(|i| i.config.clone())
.collect()
}
}
impl Default for IndexManager {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
#[test]
fn test_btree_index() {
let config = IndexConfig::new("age_idx", "users")
.field("age")
.index_type(IndexType::BTree);
let index = Index::new(config);
let doc1 = DocumentId::new();
let doc2 = DocumentId::new();
let doc3 = DocumentId::new();
index.insert(doc1.clone(), &json!(25)).unwrap();
index.insert(doc2.clone(), &json!(30)).unwrap();
index.insert(doc3.clone(), &json!(35)).unwrap();
// Exact lookup
let results = index.lookup(&json!(30));
assert_eq!(results.len(), 1);
assert_eq!(results[0], doc2);
// Range query
let results = index.range(Some(&json!(28)), Some(&json!(36)));
assert_eq!(results.len(), 2);
}
#[test]
fn test_hash_index() {
let config = IndexConfig::new("email_idx", "users")
.field("email")
.index_type(IndexType::Hash);
let index = Index::new(config);
let doc1 = DocumentId::new();
index.insert(doc1.clone(), &json!("alice@example.com")).unwrap();
let results = index.lookup(&json!("alice@example.com"));
assert_eq!(results.len(), 1);
let results = index.lookup(&json!("bob@example.com"));
assert!(results.is_empty());
}
#[test]
fn test_unique_index() {
let config = IndexConfig::new("email_unique", "users")
.field("email")
.index_type(IndexType::Unique)
.unique();
let index = Index::new(config);
let doc1 = DocumentId::new();
let doc2 = DocumentId::new();
index.insert(doc1, &json!("alice@example.com")).unwrap();
// Should fail - duplicate
let result = index.insert(doc2, &json!("alice@example.com"));
assert!(result.is_err());
}
#[test]
fn test_index_manager() {
let manager = IndexManager::new();
let config = IndexConfig::new("age_idx", "users").field("age");
manager.create_index(config).unwrap();
let doc_id = DocumentId::new();
let doc = json!({"name": "Alice", "age": 30});
manager.index_document("users", doc_id.clone(), &doc).unwrap();
let indexes = manager.list_indexes();
assert_eq!(indexes.len(), 1);
}
}


@ -0,0 +1,442 @@
//! Key-Value Store - Redis-compatible API.
//!
//! Provides fast in-memory key-value operations with optional TTL.
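//!
//! Usage sketch (illustrative; mirrors the API below, marked `ignore` so it
//! is not compiled as a doctest):
//!
//! ```ignore
//! let kv = KeyValueStore::new();
//! // Set with a 60-second TTL (a TTL of 0 means no expiry).
//! kv.set_string("session:abc", "alice", 60)?;
//! assert_eq!(kv.get_string("session:abc").as_deref(), Some("alice"));
//! // SETNX semantics: only writes when the key is absent.
//! assert!(!kv.set_nx("session:abc", b"bob".to_vec(), 0)?);
//! ```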
use crate::error::DatabaseError;
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::time::{Duration, Instant};
/// Key-value entry with optional expiration.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct KvEntry {
/// The value (stored as bytes).
pub value: Vec<u8>,
/// Creation timestamp (ms since epoch).
pub created_at: u64,
/// Last modified timestamp.
pub modified_at: u64,
/// TTL in seconds (0 = no expiry).
pub ttl: u64,
/// Expiration instant (internal).
#[serde(skip)]
expires_at: Option<Instant>,
}
impl KvEntry {
/// Creates a new entry.
pub fn new(value: Vec<u8>, ttl: u64) -> Self {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
let expires_at = if ttl > 0 {
Some(Instant::now() + Duration::from_secs(ttl))
} else {
None
};
Self {
value,
created_at: now,
modified_at: now,
ttl,
expires_at,
}
}
/// Checks if the entry has expired.
pub fn is_expired(&self) -> bool {
self.expires_at.map(|e| Instant::now() > e).unwrap_or(false)
}
/// Returns remaining TTL in seconds.
pub fn remaining_ttl(&self) -> Option<u64> {
self.expires_at.map(|e| {
let now = Instant::now();
if now > e {
0
} else {
(e - now).as_secs()
}
})
}
}
/// Key-value pair for iteration.
#[derive(Clone, Debug)]
pub struct KeyValue {
pub key: String,
pub value: Vec<u8>,
pub ttl: Option<u64>,
}
/// Redis-compatible key-value store.
pub struct KeyValueStore {
/// Data storage.
data: RwLock<HashMap<String, KvEntry>>,
/// Statistics.
stats: RwLock<KvStats>,
}
/// Key-value store statistics.
#[derive(Clone, Debug, Default)]
pub struct KvStats {
pub gets: u64,
pub sets: u64,
pub deletes: u64,
pub hits: u64,
pub misses: u64,
pub expired: u64,
}
impl KeyValueStore {
/// Creates a new key-value store.
pub fn new() -> Self {
Self {
data: RwLock::new(HashMap::new()),
stats: RwLock::new(KvStats::default()),
}
}
/// Gets a value by key, lazily evicting it if expired.
pub fn get(&self, key: &str) -> Option<Vec<u8>> {
self.stats.write().gets += 1;
// Copy out what we need and release both guards before mutating:
// `delete` re-acquires `data` and `stats`, so holding either lock
// across the call would deadlock (parking_lot locks are not reentrant).
let lookup = {
let data = self.data.read();
data.get(key).map(|e| (e.is_expired(), e.value.clone()))
};
match lookup {
Some((true, _)) => {
{
let mut stats = self.stats.write();
stats.expired += 1;
stats.misses += 1;
}
self.delete(key).ok();
None
}
Some((false, value)) => {
self.stats.write().hits += 1;
Some(value)
}
None => {
self.stats.write().misses += 1;
None
}
}
}
/// Gets a value as string.
pub fn get_string(&self, key: &str) -> Option<String> {
self.get(key)
.and_then(|v| String::from_utf8(v).ok())
}
/// Sets a value with optional TTL.
pub fn set(&self, key: &str, value: Vec<u8>, ttl: u64) -> Result<(), DatabaseError> {
let entry = KvEntry::new(value, ttl);
self.data.write().insert(key.to_string(), entry);
self.stats.write().sets += 1;
Ok(())
}
/// Sets a string value.
pub fn set_string(&self, key: &str, value: &str, ttl: u64) -> Result<(), DatabaseError> {
self.set(key, value.as_bytes().to_vec(), ttl)
}
/// Sets a value only if key doesn't exist (SETNX).
pub fn set_nx(&self, key: &str, value: Vec<u8>, ttl: u64) -> Result<bool, DatabaseError> {
let mut data = self.data.write();
// Check if exists and not expired
if let Some(entry) = data.get(key) {
if !entry.is_expired() {
return Ok(false);
}
}
data.insert(key.to_string(), KvEntry::new(value, ttl));
self.stats.write().sets += 1;
Ok(true)
}
/// Sets a value only if key exists (SETXX).
pub fn set_xx(&self, key: &str, value: Vec<u8>, ttl: u64) -> Result<bool, DatabaseError> {
let mut data = self.data.write();
if let Some(entry) = data.get(key) {
if entry.is_expired() {
return Ok(false);
}
data.insert(key.to_string(), KvEntry::new(value, ttl));
self.stats.write().sets += 1;
Ok(true)
} else {
Ok(false)
}
}
/// Deletes a key.
pub fn delete(&self, key: &str) -> Result<bool, DatabaseError> {
let removed = self.data.write().remove(key).is_some();
if removed {
self.stats.write().deletes += 1;
}
Ok(removed)
}
/// Checks if a key exists.
pub fn exists(&self, key: &str) -> bool {
let data = self.data.read();
data.get(key).map(|e| !e.is_expired()).unwrap_or(false)
}
/// Sets TTL on existing key.
pub fn expire(&self, key: &str, ttl: u64) -> Result<bool, DatabaseError> {
let mut data = self.data.write();
if let Some(entry) = data.get_mut(key) {
if entry.is_expired() {
return Ok(false);
}
entry.ttl = ttl;
entry.expires_at = if ttl > 0 {
Some(Instant::now() + Duration::from_secs(ttl))
} else {
None
};
Ok(true)
} else {
Ok(false)
}
}
/// Gets remaining TTL in seconds (`None` if the key is missing or has no expiry).
pub fn ttl(&self, key: &str) -> Option<u64> {
self.data.read().get(key).and_then(|e| e.remaining_ttl())
}
/// Increments a numeric value, preserving any existing TTL (Redis INCR semantics).
pub fn incr(&self, key: &str, delta: i64) -> Result<i64, DatabaseError> {
let mut data = self.data.write();
let previous = match data.get(key) {
Some(entry) if !entry.is_expired() => Some(entry.clone()),
_ => None,
};
let current = match &previous {
Some(entry) => {
let s = String::from_utf8(entry.value.clone())
.map_err(|_| DatabaseError::InvalidOperation("Value is not a string".into()))?;
s.parse::<i64>()
.map_err(|_| DatabaseError::InvalidOperation("Value is not an integer".into()))?
}
None => 0,
};
let new_value = current
.checked_add(delta)
.ok_or_else(|| DatabaseError::InvalidOperation("Integer overflow".into()))?;
// Carry over creation time and expiry so INCR does not reset the TTL.
let mut entry = KvEntry::new(new_value.to_string().into_bytes(), 0);
if let Some(prev) = previous {
entry.created_at = prev.created_at;
entry.ttl = prev.ttl;
entry.expires_at = prev.expires_at;
}
data.insert(key.to_string(), entry);
self.stats.write().sets += 1;
Ok(new_value)
}
/// Appends to a string value, treating an expired entry as absent.
pub fn append(&self, key: &str, value: &[u8]) -> Result<usize, DatabaseError> {
let mut data = self.data.write();
let entry = data.entry(key.to_string()).or_insert_with(|| {
KvEntry::new(Vec::new(), 0)
});
if entry.is_expired() {
// Reset both the value and the expiry; otherwise the appended
// data would stay invisible behind the stale expiration.
entry.value.clear();
entry.ttl = 0;
entry.expires_at = None;
}
entry.value.extend_from_slice(value);
entry.modified_at = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
Ok(entry.value.len())
}
/// Gets multiple keys.
pub fn mget(&self, keys: &[&str]) -> Vec<Option<Vec<u8>>> {
keys.iter().map(|k| self.get(k)).collect()
}
/// Sets multiple key-value pairs.
pub fn mset(&self, pairs: &[(&str, Vec<u8>)], ttl: u64) -> Result<(), DatabaseError> {
let mut data = self.data.write();
for (key, value) in pairs {
data.insert(key.to_string(), KvEntry::new(value.clone(), ttl));
}
self.stats.write().sets += pairs.len() as u64;
Ok(())
}
/// Returns all keys matching a pattern (simple glob).
pub fn keys(&self, pattern: &str) -> Vec<String> {
let data = self.data.read();
data.keys()
.filter(|k| self.matches_pattern(k, pattern))
.cloned()
.collect()
}
/// Simple glob pattern matching.
fn matches_pattern(&self, key: &str, pattern: &str) -> bool {
if pattern == "*" {
return true;
}
let parts: Vec<&str> = pattern.split('*').collect();
if parts.len() == 1 {
return key == pattern;
}
let mut pos = 0;
for (i, part) in parts.iter().enumerate() {
if part.is_empty() {
continue;
}
if let Some(found) = key[pos..].find(part) {
if i == 0 && found != 0 {
return false;
}
pos += found + part.len();
} else {
return false;
}
}
if !parts.last().unwrap().is_empty() {
key.ends_with(parts.last().unwrap())
} else {
true
}
}
/// Returns the number of keys.
pub fn len(&self) -> usize {
self.data.read().len()
}
/// Checks if store is empty.
pub fn is_empty(&self) -> bool {
self.data.read().is_empty()
}
/// Clears all keys.
pub fn clear(&self) {
self.data.write().clear();
}
/// Returns statistics.
pub fn stats(&self) -> KvStats {
self.stats.read().clone()
}
/// Evicts expired entries.
pub fn evict_expired(&self) -> usize {
let mut data = self.data.write();
let before = data.len();
data.retain(|_, entry| !entry.is_expired());
let evicted = before - data.len();
self.stats.write().expired += evicted as u64;
evicted
}
}
impl Default for KeyValueStore {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_basic_operations() {
let store = KeyValueStore::new();
store.set_string("key1", "value1", 0).unwrap();
assert_eq!(store.get_string("key1"), Some("value1".to_string()));
store.delete("key1").unwrap();
assert_eq!(store.get_string("key1"), None);
}
#[test]
fn test_set_nx() {
let store = KeyValueStore::new();
assert!(store.set_nx("key", b"value1".to_vec(), 0).unwrap());
assert!(!store.set_nx("key", b"value2".to_vec(), 0).unwrap());
assert_eq!(store.get_string("key"), Some("value1".to_string()));
}
#[test]
fn test_incr() {
let store = KeyValueStore::new();
assert_eq!(store.incr("counter", 1).unwrap(), 1);
assert_eq!(store.incr("counter", 1).unwrap(), 2);
assert_eq!(store.incr("counter", 5).unwrap(), 7);
assert_eq!(store.incr("counter", -3).unwrap(), 4);
}
#[test]
fn test_mget_mset() {
let store = KeyValueStore::new();
store.mset(&[
("k1", b"v1".to_vec()),
("k2", b"v2".to_vec()),
("k3", b"v3".to_vec()),
], 0).unwrap();
let results = store.mget(&["k1", "k2", "k4"]);
assert_eq!(results.len(), 3);
assert_eq!(results[0], Some(b"v1".to_vec()));
assert_eq!(results[1], Some(b"v2".to_vec()));
assert_eq!(results[2], None);
}
#[test]
fn test_keys_pattern() {
let store = KeyValueStore::new();
store.set_string("user:1", "alice", 0).unwrap();
store.set_string("user:2", "bob", 0).unwrap();
store.set_string("session:1", "data", 0).unwrap();
let user_keys = store.keys("user:*");
assert_eq!(user_keys.len(), 2);
let all_keys = store.keys("*");
assert_eq!(all_keys.len(), 3);
}
#[test]
fn test_append() {
let store = KeyValueStore::new();
store.append("msg", b"hello").unwrap();
store.append("msg", b" world").unwrap();
assert_eq!(store.get_string("msg"), Some("hello world".to_string()));
}
#[test]
fn test_exists() {
let store = KeyValueStore::new();
assert!(!store.exists("key"));
store.set_string("key", "value", 0).unwrap();
assert!(store.exists("key"));
}
}
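The `*`-only glob matcher above is easy to get subtly wrong: the first literal part must anchor at the start of the key and the last at the end. A standalone sketch of the same algorithm (the name `glob_match` is illustrative, not part of the crate's API):

```rust
/// `*`-only glob matcher mirroring KeyValueStore::matches_pattern.
fn glob_match(key: &str, pattern: &str) -> bool {
    if pattern == "*" {
        return true;
    }
    let parts: Vec<&str> = pattern.split('*').collect();
    if parts.len() == 1 {
        // No `*` at all: exact match only.
        return key == pattern;
    }
    let mut pos = 0;
    for (i, part) in parts.iter().enumerate() {
        if part.is_empty() {
            continue;
        }
        match key[pos..].find(*part) {
            // The first literal part must sit at the very start of the key.
            Some(found) if i == 0 && found != 0 => return false,
            Some(found) => pos += found + part.len(),
            None => return false,
        }
    }
    // The last literal part must sit at the very end of the key.
    let last = *parts.last().unwrap();
    last.is_empty() || key.ends_with(last)
}

fn main() {
    assert!(glob_match("user:1", "user:*"));
    assert!(!glob_match("session:1", "user:*"));
    assert!(glob_match("anything", "*"));
    assert!(glob_match("ab", "a*b"));
    assert!(!glob_match("abc", "a*bX"));
    println!("glob matcher ok");
}
```

Note this matcher supports only `*`, not Redis's `?` or `[...]` character classes.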

View file

@@ -0,0 +1,436 @@
//! Synor Database L2 - Multi-Model Database Layer
//!
//! Provides decentralized database services built on Synor Storage:
//!
//! - **Key-Value Store**: Redis-compatible API for caching and sessions
//! - **Document Store**: MongoDB-compatible queries for app data
//! - **Vector Store**: AI/RAG optimized with similarity search
//! - **Time-Series**: Metrics and analytics storage
//!
//! # Architecture
//!
//! ```text
//! ┌─────────────────────────────────────────────────────────────────┐
//! │ SYNOR DATABASE L2 │
//! ├─────────────────────────────────────────────────────────────────┤
//! │ │
//! │ ┌──────────────┐ ┌──────────────┐ ┌──────────────────────┐ │
//! │ │ Key-Value │ │ Document │ │ Vector │ │
//! │ │ (Redis) │ │ (Mongo) │ │ (AI/RAG) │ │
//! │ └──────────────┘ └──────────────┘ └──────────────────────┘ │
//! │ │
//! │ ┌──────────────┐ ┌──────────────┐ ┌──────────────────────┐ │
//! │ │ Time-Series │ │ Index │ │ Query │ │
//! │ │ (Metrics) │ │ Manager │ │ Engine │ │
//! │ └──────────────┘ └──────────────┘ └──────────────────────┘ │
//! │ │
//! │ ┌────────────────────────────────────────────────────────────┐ │
//! │ │ STORAGE LAYER (Synor Storage) │ │
//! │ └────────────────────────────────────────────────────────────┘ │
//! │ │
//! └─────────────────────────────────────────────────────────────────┘
//! ```
//!
//! # Pricing
//!
//! | Operation | Cost (SYNOR) |
//! |-----------|--------------|
//! | Storage/GB/month | 0.1 |
//! | Read/million | 0.001 |
//! | Write/million | 0.01 |
//! | Vector search/million | 0.05 |
#![allow(dead_code)]
pub mod document;
pub mod error;
pub mod index;
pub mod keyvalue;
pub mod query;
pub mod schema;
pub mod timeseries;
pub mod vector;
pub use document::{Collection, Document, DocumentId, DocumentStore};
pub use error::DatabaseError;
pub use index::{Index, IndexConfig, IndexManager, IndexType};
pub use keyvalue::{KeyValue, KeyValueStore, KvEntry};
pub use query::{Filter, Query, QueryEngine, QueryResult, SortOrder};
pub use schema::{Field, FieldType, Schema, SchemaValidator};
pub use timeseries::{DataPoint, Metric, TimeSeries, TimeSeriesStore};
pub use vector::{Embedding, SimilarityMetric, VectorIndex, VectorStore};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use parking_lot::RwLock;
/// Database identifier.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct DatabaseId(pub [u8; 32]);
impl DatabaseId {
/// Creates a new database ID from name.
pub fn from_name(owner: &[u8; 32], name: &str) -> Self {
let mut input = Vec::new();
input.extend_from_slice(owner);
input.extend_from_slice(name.as_bytes());
DatabaseId(*blake3::hash(&input).as_bytes())
}
/// Returns the bytes.
pub fn as_bytes(&self) -> &[u8; 32] {
&self.0
}
}
impl std::fmt::Display for DatabaseId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "db_{}", hex::encode(&self.0[..8]))
}
}
/// Database configuration.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct DatabaseConfig {
/// Database name.
pub name: String,
/// Owner address.
pub owner: [u8; 32],
/// Maximum storage size (bytes).
pub max_size: u64,
/// Enable vector indexing.
pub vector_enabled: bool,
/// Vector dimensions (if enabled).
pub vector_dimensions: u32,
/// Replication factor.
pub replication: u8,
/// TTL for entries (0 = no expiry).
pub default_ttl: u64,
}
impl Default for DatabaseConfig {
fn default() -> Self {
Self {
name: "default".to_string(),
owner: [0u8; 32],
max_size: 1024 * 1024 * 1024, // 1 GB
vector_enabled: false,
vector_dimensions: 0,
replication: 3,
default_ttl: 0,
}
}
}
/// Database statistics.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct DatabaseStats {
/// Total documents/entries.
pub entry_count: u64,
/// Total storage used (bytes).
pub storage_used: u64,
/// Total reads.
pub reads: u64,
/// Total writes.
pub writes: u64,
/// Total vector searches.
pub vector_searches: u64,
/// Index count.
pub index_count: u32,
}
/// Unified database instance supporting multiple models.
pub struct Database {
/// Database ID.
pub id: DatabaseId,
/// Configuration.
pub config: DatabaseConfig,
/// Key-value store.
kv: Arc<KeyValueStore>,
/// Document store.
docs: Arc<DocumentStore>,
/// Vector store.
vectors: Arc<VectorStore>,
/// Time-series store.
timeseries: Arc<TimeSeriesStore>,
/// Query engine.
query_engine: Arc<QueryEngine>,
/// Index manager.
index_manager: Arc<IndexManager>,
/// Statistics.
stats: RwLock<DatabaseStats>,
}
impl Database {
/// Creates a new database.
pub fn new(config: DatabaseConfig) -> Self {
let id = DatabaseId::from_name(&config.owner, &config.name);
let kv = Arc::new(KeyValueStore::new());
let docs = Arc::new(DocumentStore::new());
let vectors = Arc::new(VectorStore::new(config.vector_dimensions));
let timeseries = Arc::new(TimeSeriesStore::new());
let index_manager = Arc::new(IndexManager::new());
let query_engine = Arc::new(QueryEngine::new(
docs.clone(),
vectors.clone(),
index_manager.clone(),
));
Self {
id,
config,
kv,
docs,
vectors,
timeseries,
query_engine,
index_manager,
stats: RwLock::new(DatabaseStats::default()),
}
}
/// Returns the key-value store.
pub fn kv(&self) -> &KeyValueStore {
&self.kv
}
/// Returns the document store.
pub fn documents(&self) -> &DocumentStore {
&self.docs
}
/// Returns the vector store.
pub fn vectors(&self) -> &VectorStore {
&self.vectors
}
/// Returns the time-series store.
pub fn timeseries(&self) -> &TimeSeriesStore {
&self.timeseries
}
/// Returns the query engine.
pub fn query(&self) -> &QueryEngine {
&self.query_engine
}
/// Returns current statistics.
pub fn stats(&self) -> DatabaseStats {
self.stats.read().clone()
}
/// Records a read operation.
pub fn record_read(&self) {
self.stats.write().reads += 1;
}
/// Records a write operation.
pub fn record_write(&self, bytes: u64) {
let mut stats = self.stats.write();
stats.writes += 1;
stats.storage_used += bytes;
}
/// Records a vector search.
pub fn record_vector_search(&self) {
self.stats.write().vector_searches += 1;
}
}
/// Database manager for multiple databases.
pub struct DatabaseManager {
/// Active databases.
databases: RwLock<HashMap<DatabaseId, Arc<Database>>>,
/// Default configuration.
default_config: DatabaseConfig,
}
impl DatabaseManager {
/// Creates a new database manager.
pub fn new() -> Self {
Self {
databases: RwLock::new(HashMap::new()),
default_config: DatabaseConfig::default(),
}
}
/// Creates a new database.
pub fn create(&self, config: DatabaseConfig) -> Result<Arc<Database>, DatabaseError> {
let db = Arc::new(Database::new(config));
let id = db.id;
let mut dbs = self.databases.write();
if dbs.contains_key(&id) {
return Err(DatabaseError::AlreadyExists(id.to_string()));
}
dbs.insert(id, db.clone());
Ok(db)
}
/// Gets a database by ID.
pub fn get(&self, id: &DatabaseId) -> Option<Arc<Database>> {
self.databases.read().get(id).cloned()
}
/// Gets a database by name.
pub fn get_by_name(&self, owner: &[u8; 32], name: &str) -> Option<Arc<Database>> {
let id = DatabaseId::from_name(owner, name);
self.get(&id)
}
/// Deletes a database.
pub fn delete(&self, id: &DatabaseId) -> Result<(), DatabaseError> {
let mut dbs = self.databases.write();
if dbs.remove(id).is_none() {
return Err(DatabaseError::NotFound(id.to_string()));
}
Ok(())
}
/// Lists all databases for an owner.
pub fn list(&self, owner: &[u8; 32]) -> Vec<Arc<Database>> {
self.databases
.read()
.values()
.filter(|db| &db.config.owner == owner)
.cloned()
.collect()
}
}
impl Default for DatabaseManager {
fn default() -> Self {
Self::new()
}
}
/// Pricing calculator for database operations.
pub struct DatabasePricing {
/// Storage cost per GB per month (in atomic SYNOR).
pub storage_per_gb_month: u64,
/// Read cost per million operations.
pub reads_per_million: u64,
/// Write cost per million operations.
pub writes_per_million: u64,
/// Vector search cost per million.
pub vector_search_per_million: u64,
}
impl Default for DatabasePricing {
fn default() -> Self {
Self {
storage_per_gb_month: 100_000_000, // 0.1 SYNOR
reads_per_million: 1_000_000, // 0.001 SYNOR
writes_per_million: 10_000_000, // 0.01 SYNOR
vector_search_per_million: 50_000_000, // 0.05 SYNOR
}
}
}
impl DatabasePricing {
/// Calculates cost for given usage.
pub fn calculate(&self, stats: &DatabaseStats, days: u64) -> u64 {
let gb = stats.storage_used as f64 / (1024.0 * 1024.0 * 1024.0);
let months = days as f64 / 30.0;
let storage_cost = (gb * months * self.storage_per_gb_month as f64) as u64;
let read_cost = stats.reads * self.reads_per_million / 1_000_000;
let write_cost = stats.writes * self.writes_per_million / 1_000_000;
let vector_cost = stats.vector_searches * self.vector_search_per_million / 1_000_000;
storage_cost + read_cost + write_cost + vector_cost
}
/// Estimates monthly cost for given parameters.
pub fn estimate_monthly(
&self,
storage_gb: f64,
reads_per_day: u64,
writes_per_day: u64,
vector_searches_per_day: u64,
) -> u64 {
let storage_cost = (storage_gb * self.storage_per_gb_month as f64) as u64;
let reads_monthly = reads_per_day * 30;
let writes_monthly = writes_per_day * 30;
let vectors_monthly = vector_searches_per_day * 30;
let read_cost = reads_monthly * self.reads_per_million / 1_000_000;
let write_cost = writes_monthly * self.writes_per_million / 1_000_000;
let vector_cost = vectors_monthly * self.vector_search_per_million / 1_000_000;
storage_cost + read_cost + write_cost + vector_cost
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_database_id() {
let owner = [1u8; 32];
let id1 = DatabaseId::from_name(&owner, "test");
let id2 = DatabaseId::from_name(&owner, "test");
let id3 = DatabaseId::from_name(&owner, "other");
assert_eq!(id1, id2);
assert_ne!(id1, id3);
}
#[test]
fn test_database_creation() {
let config = DatabaseConfig {
name: "mydb".to_string(),
owner: [1u8; 32],
..Default::default()
};
let db = Database::new(config);
assert_eq!(db.stats().entry_count, 0);
}
#[test]
fn test_database_manager() {
let manager = DatabaseManager::new();
let config = DatabaseConfig {
name: "test".to_string(),
owner: [1u8; 32],
..Default::default()
};
let db = manager.create(config.clone()).unwrap();
assert!(manager.get(&db.id).is_some());
// Duplicate should fail.
assert!(manager.create(config).is_err());
// Delete.
manager.delete(&db.id).unwrap();
assert!(manager.get(&db.id).is_none());
}
#[test]
fn test_pricing() {
let pricing = DatabasePricing::default();
let stats = DatabaseStats {
storage_used: 1024 * 1024 * 1024, // 1 GB
reads: 1_000_000,
writes: 100_000,
vector_searches: 10_000,
..Default::default()
};
let cost = pricing.calculate(&stats, 30);
assert!(cost > 0);
// Estimate monthly.
let monthly = pricing.estimate_monthly(1.0, 10000, 1000, 100);
assert!(monthly > 0);
}
}
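The default rates translate into concrete numbers as follows. A standalone sketch of the same arithmetic as `DatabasePricing::calculate`, assuming 1 SYNOR = 10^9 atomic units (which the `0.1 SYNOR == 100_000_000` constant implies):

```rust
// Reproduces the default-rate cost formula; constants copied from
// DatabasePricing::default. Illustrative only.
fn cost(storage_bytes: u64, reads: u64, writes: u64, searches: u64, days: u64) -> u64 {
    const STORAGE_PER_GB_MONTH: u64 = 100_000_000; // 0.1 SYNOR
    const READS_PER_MILLION: u64 = 1_000_000; // 0.001 SYNOR
    const WRITES_PER_MILLION: u64 = 10_000_000; // 0.01 SYNOR
    const VECTOR_PER_MILLION: u64 = 50_000_000; // 0.05 SYNOR
    let gb = storage_bytes as f64 / (1024.0 * 1024.0 * 1024.0);
    let months = days as f64 / 30.0;
    let storage = (gb * months * STORAGE_PER_GB_MONTH as f64) as u64;
    storage
        + reads * READS_PER_MILLION / 1_000_000
        + writes * WRITES_PER_MILLION / 1_000_000
        + searches * VECTOR_PER_MILLION / 1_000_000
}

fn main() {
    // 1 GB for 30 days + 1M reads + 100k writes + 10k vector searches:
    // 0.1 + 0.001 + 0.001 + 0.0005 = 0.1025 SYNOR.
    let c = cost(1u64 << 30, 1_000_000, 100_000, 10_000, 30);
    assert_eq!(c, 102_500_000);
    println!("total: {c} atomic units");
}
```

This matches the `test_pricing` inputs above: the bill for that workload works out to 0.1025 SYNOR per 30-day period, dominated by storage.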

View file

@@ -0,0 +1,570 @@
//! Query Engine - Unified query layer.
//!
//! Supports SQL-like queries, document filters, and vector similarity search.
use crate::document::{Document, DocumentFilter, DocumentStore};
use crate::error::DatabaseError;
use crate::index::IndexManager;
use crate::vector::VectorStore;
use serde::{Deserialize, Serialize};
use serde_json::Value as JsonValue;
use std::sync::Arc;
/// Sort order for query results.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum SortOrder {
Ascending,
Descending,
}
/// Filter operators.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum Filter {
/// Equality: field = value
Eq(String, JsonValue),
/// Not equal: field != value
Ne(String, JsonValue),
/// Greater than: field > value
Gt(String, JsonValue),
/// Greater than or equal: field >= value
Gte(String, JsonValue),
/// Less than: field < value
Lt(String, JsonValue),
/// Less than or equal: field <= value
Lte(String, JsonValue),
/// In array: field IN [values]
In(String, Vec<JsonValue>),
/// Not in array: field NOT IN [values]
NotIn(String, Vec<JsonValue>),
/// String contains: field LIKE %value%
Contains(String, String),
/// String starts with: field LIKE value%
StartsWith(String, String),
/// String ends with: field LIKE %value
EndsWith(String, String),
/// Field exists
Exists(String),
/// Field is null
IsNull(String),
/// Logical AND
And(Vec<Filter>),
/// Logical OR
Or(Vec<Filter>),
/// Logical NOT
Not(Box<Filter>),
/// Vector similarity search
VectorSimilar {
field: String,
vector: Vec<f32>,
limit: usize,
threshold: f32,
},
}
impl Filter {
/// Converts to DocumentFilter.
pub fn to_document_filter(&self) -> DocumentFilter {
match self {
Filter::Eq(field, value) => DocumentFilter::new().eq(field, value.clone()),
Filter::Ne(field, value) => DocumentFilter::new().ne(field, value.clone()),
Filter::Gt(field, value) => DocumentFilter::new().gt(field, value.clone()),
Filter::Gte(field, value) => DocumentFilter::new().gte(field, value.clone()),
Filter::Lt(field, value) => DocumentFilter::new().lt(field, value.clone()),
Filter::Lte(field, value) => DocumentFilter::new().lte(field, value.clone()),
Filter::In(field, values) => DocumentFilter::new().in_array(field, values.clone()),
Filter::Contains(field, substr) => DocumentFilter::new().contains(field, substr),
Filter::Exists(field) => DocumentFilter::new().exists(field, true),
Filter::IsNull(field) => DocumentFilter::new().eq(field, JsonValue::Null),
Filter::And(filters) => {
let doc_filters: Vec<_> = filters.iter().map(|f| f.to_document_filter()).collect();
DocumentFilter::new().and(doc_filters)
}
Filter::Or(filters) => {
let doc_filters: Vec<_> = filters.iter().map(|f| f.to_document_filter()).collect();
DocumentFilter::new().or(doc_filters)
}
// NotIn, StartsWith, EndsWith, Not, and VectorSimilar have no
// DocumentFilter equivalent yet; fall back to match-all so callers
// see a superset of matches rather than silently losing documents.
_ => DocumentFilter::new(),
}
}
}
/// Query builder.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct Query {
/// Collection to query.
pub collection: String,
/// Filter conditions.
pub filter: Option<Filter>,
/// Fields to select (empty = all).
pub select: Vec<String>,
/// Sort fields.
pub sort: Vec<(String, SortOrder)>,
/// Skip count.
pub skip: usize,
/// Limit count (0 = no limit).
pub limit: usize,
/// Include vector similarity.
pub vector_query: Option<VectorQuery>,
}
/// Vector similarity query.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct VectorQuery {
/// Field containing vectors.
pub field: String,
/// Query vector.
pub vector: Vec<f32>,
/// Number of results.
pub limit: usize,
/// Minimum similarity threshold.
pub threshold: f32,
}
impl Query {
/// Creates a new query for a collection.
pub fn new(collection: impl Into<String>) -> Self {
Self {
collection: collection.into(),
..Default::default()
}
}
/// Adds a filter.
pub fn filter(mut self, filter: Filter) -> Self {
self.filter = Some(filter);
self
}
/// Adds an equality filter.
pub fn eq(mut self, field: impl Into<String>, value: JsonValue) -> Self {
let new_filter = Filter::Eq(field.into(), value);
self.filter = Some(match self.filter {
Some(f) => Filter::And(vec![f, new_filter]),
None => new_filter,
});
self
}
/// Selects specific fields.
pub fn select(mut self, fields: Vec<String>) -> Self {
self.select = fields;
self
}
/// Adds a sort field.
pub fn sort(mut self, field: impl Into<String>, order: SortOrder) -> Self {
self.sort.push((field.into(), order));
self
}
/// Sets skip count.
pub fn skip(mut self, n: usize) -> Self {
self.skip = n;
self
}
/// Sets limit count.
pub fn limit(mut self, n: usize) -> Self {
self.limit = n;
self
}
/// Adds vector similarity search.
pub fn vector_similar(
mut self,
field: impl Into<String>,
vector: Vec<f32>,
limit: usize,
threshold: f32,
) -> Self {
self.vector_query = Some(VectorQuery {
field: field.into(),
vector,
limit,
threshold,
});
self
}
}
/// Query result.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct QueryResult {
/// Matched documents.
pub documents: Vec<Document>,
/// Total count (before skip/limit).
pub total: u64,
/// Query execution time (ms).
pub execution_time_ms: u64,
/// Whether more results exist.
pub has_more: bool,
}
impl QueryResult {
/// Creates an empty result.
pub fn empty() -> Self {
Self {
documents: Vec::new(),
total: 0,
execution_time_ms: 0,
has_more: false,
}
}
/// Returns the first document.
pub fn first(&self) -> Option<&Document> {
self.documents.first()
}
/// Maps documents to JSON values.
pub fn to_json(&self) -> Vec<JsonValue> {
self.documents.iter().map(|d| d.data.clone()).collect()
}
}
/// Query execution engine.
pub struct QueryEngine {
/// Document store reference.
docs: Arc<DocumentStore>,
/// Vector store reference.
vectors: Arc<VectorStore>,
/// Index manager reference.
indexes: Arc<IndexManager>,
}
impl QueryEngine {
/// Creates a new query engine.
pub fn new(
docs: Arc<DocumentStore>,
vectors: Arc<VectorStore>,
indexes: Arc<IndexManager>,
) -> Self {
Self {
docs,
vectors,
indexes,
}
}
/// Executes a query.
pub fn execute(&self, query: &Query) -> Result<QueryResult, DatabaseError> {
let start = std::time::Instant::now();
// Get documents from collection
let filter = query.filter.as_ref().map(|f| f.to_document_filter());
let default_filter = DocumentFilter::default();
let filter_ref = filter.as_ref().unwrap_or(&default_filter);
let mut documents = self.docs.find(&query.collection, filter_ref)?;
// Apply vector similarity if specified
if let Some(vq) = &query.vector_query {
documents = self.apply_vector_query(documents, vq)?;
}
let total = documents.len() as u64;
// Apply sorting
if !query.sort.is_empty() {
self.sort_documents(&mut documents, &query.sort);
}
// Apply skip and limit
let mut has_more = false;
if query.skip > 0 {
if query.skip >= documents.len() {
documents.clear();
} else {
documents = documents.split_off(query.skip);
}
}
if query.limit > 0 && documents.len() > query.limit {
documents.truncate(query.limit);
has_more = true;
}
// Apply field selection
if !query.select.is_empty() {
documents = self.select_fields(documents, &query.select);
}
let execution_time_ms = start.elapsed().as_millis() as u64;
Ok(QueryResult {
documents,
total,
execution_time_ms,
has_more,
})
}
fn apply_vector_query(
&self,
documents: Vec<Document>,
vq: &VectorQuery,
) -> Result<Vec<Document>, DatabaseError> {
// For now, filter documents in-memory
// Real implementation would use vector index
let mut scored: Vec<(Document, f32)> = documents
.into_iter()
.filter_map(|doc| {
let field_value = doc.get(&vq.field)?;
let arr = field_value.as_array()?;
let vec: Vec<f32> = arr
.iter()
.filter_map(|v| v.as_f64().map(|f| f as f32))
.collect();
if vec.len() != vq.vector.len() {
return None;
}
let similarity = cosine_similarity(&vec, &vq.vector);
if similarity >= vq.threshold {
Some((doc, similarity))
} else {
None
}
})
.collect();
// Sort by similarity descending
scored.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
// Apply limit
scored.truncate(vq.limit);
Ok(scored.into_iter().map(|(doc, _)| doc).collect())
}
fn sort_documents(&self, documents: &mut [Document], sort: &[(String, SortOrder)]) {
documents.sort_by(|a, b| {
for (field, order) in sort {
let a_val = a.get(field);
let b_val = b.get(field);
let cmp = match (a_val, b_val) {
(Some(av), Some(bv)) => self.compare_json(av, bv),
(Some(_), None) => std::cmp::Ordering::Less,
(None, Some(_)) => std::cmp::Ordering::Greater,
(None, None) => std::cmp::Ordering::Equal,
};
let cmp = match order {
SortOrder::Ascending => cmp,
SortOrder::Descending => cmp.reverse(),
};
if cmp != std::cmp::Ordering::Equal {
return cmp;
}
}
std::cmp::Ordering::Equal
});
}
fn compare_json(&self, a: &JsonValue, b: &JsonValue) -> std::cmp::Ordering {
match (a, b) {
(JsonValue::Number(an), JsonValue::Number(bn)) => {
let af = an.as_f64().unwrap_or(0.0);
let bf = bn.as_f64().unwrap_or(0.0);
af.partial_cmp(&bf).unwrap_or(std::cmp::Ordering::Equal)
}
(JsonValue::String(as_), JsonValue::String(bs)) => as_.cmp(bs),
(JsonValue::Bool(ab), JsonValue::Bool(bb)) => ab.cmp(bb),
_ => std::cmp::Ordering::Equal,
}
}
fn select_fields(&self, documents: Vec<Document>, fields: &[String]) -> Vec<Document> {
documents
.into_iter()
.map(|mut doc| {
if let Some(obj) = doc.data.as_object() {
let filtered: serde_json::Map<String, JsonValue> = obj
.iter()
.filter(|(k, _)| fields.contains(k))
.map(|(k, v)| (k.clone(), v.clone()))
.collect();
doc.data = JsonValue::Object(filtered);
}
doc
})
.collect()
}
/// Counts documents matching a filter.
pub fn count(&self, collection: &str, filter: Option<&Filter>) -> Result<u64, DatabaseError> {
let doc_filter = filter.map(|f| f.to_document_filter());
let default_filter = DocumentFilter::default();
let filter_ref = doc_filter.as_ref().unwrap_or(&default_filter);
let docs = self.docs.find(collection, filter_ref)?;
Ok(docs.len() as u64)
}
/// Aggregates values.
pub fn aggregate(
&self,
collection: &str,
field: &str,
op: AggregateOp,
filter: Option<&Filter>,
) -> Result<JsonValue, DatabaseError> {
let doc_filter = filter.map(|f| f.to_document_filter());
let default_filter = DocumentFilter::default();
let filter_ref = doc_filter.as_ref().unwrap_or(&default_filter);
let docs = self.docs.find(collection, filter_ref)?;
let values: Vec<f64> = docs
.iter()
.filter_map(|doc| {
doc.get(field)
.and_then(|v| v.as_f64())
})
.collect();
let result = match op {
AggregateOp::Count => JsonValue::Number(serde_json::Number::from(values.len())),
AggregateOp::Sum => {
let sum: f64 = values.iter().sum();
serde_json::to_value(sum).unwrap_or(JsonValue::Null)
}
AggregateOp::Avg => {
if values.is_empty() {
JsonValue::Null
} else {
let avg = values.iter().sum::<f64>() / values.len() as f64;
serde_json::to_value(avg).unwrap_or(JsonValue::Null)
}
}
AggregateOp::Min => {
values
.iter()
.copied()
.min_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal))
.map(|v| serde_json::to_value(v).unwrap_or(JsonValue::Null))
.unwrap_or(JsonValue::Null)
}
AggregateOp::Max => {
values
.iter()
.copied()
.max_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal))
.map(|v| serde_json::to_value(v).unwrap_or(JsonValue::Null))
.unwrap_or(JsonValue::Null)
}
};
Ok(result)
}
}
/// Aggregation operations.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum AggregateOp {
Count,
Sum,
Avg,
Min,
Max,
}
/// Cosine similarity between two vectors.
fn cosine_similarity(a: &[f32], b: &[f32]) -> f32 {
if a.len() != b.len() {
return 0.0;
}
let dot: f32 = a.iter().zip(b.iter()).map(|(x, y)| x * y).sum();
let mag_a: f32 = a.iter().map(|x| x * x).sum::<f32>().sqrt();
let mag_b: f32 = b.iter().map(|x| x * x).sum::<f32>().sqrt();
if mag_a == 0.0 || mag_b == 0.0 {
0.0
} else {
dot / (mag_a * mag_b)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::document::DocumentStore;
use crate::index::IndexManager;
use crate::vector::VectorStore;
use serde_json::json;
fn setup_engine() -> QueryEngine {
let docs = Arc::new(DocumentStore::new());
let vectors = Arc::new(VectorStore::new(3));
let indexes = Arc::new(IndexManager::new());
QueryEngine::new(docs, vectors, indexes)
}
#[test]
fn test_simple_query() {
let docs = Arc::new(DocumentStore::new());
docs.create_collection("users").unwrap();
docs.insert("users", json!({"name": "Alice", "age": 30})).unwrap();
docs.insert("users", json!({"name": "Bob", "age": 25})).unwrap();
let vectors = Arc::new(VectorStore::new(3));
let indexes = Arc::new(IndexManager::new());
let engine = QueryEngine::new(docs, vectors, indexes);
let query = Query::new("users").limit(10);
let result = engine.execute(&query).unwrap();
assert_eq!(result.documents.len(), 2);
assert_eq!(result.total, 2);
}
#[test]
fn test_filter_query() {
let docs = Arc::new(DocumentStore::new());
docs.create_collection("users").unwrap();
docs.insert("users", json!({"name": "Alice", "age": 30})).unwrap();
docs.insert("users", json!({"name": "Bob", "age": 25})).unwrap();
let vectors = Arc::new(VectorStore::new(3));
let indexes = Arc::new(IndexManager::new());
let engine = QueryEngine::new(docs, vectors, indexes);
let query = Query::new("users").filter(Filter::Eq("name".to_string(), json!("Alice")));
let result = engine.execute(&query).unwrap();
assert_eq!(result.documents.len(), 1);
assert_eq!(result.documents[0].get("name"), Some(&json!("Alice")));
}
#[test]
fn test_sorted_query() {
let docs = Arc::new(DocumentStore::new());
docs.create_collection("users").unwrap();
docs.insert("users", json!({"name": "Alice", "age": 30})).unwrap();
docs.insert("users", json!({"name": "Bob", "age": 25})).unwrap();
docs.insert("users", json!({"name": "Charlie", "age": 35})).unwrap();
let vectors = Arc::new(VectorStore::new(3));
let indexes = Arc::new(IndexManager::new());
let engine = QueryEngine::new(docs, vectors, indexes);
let query = Query::new("users").sort("age", SortOrder::Ascending);
let result = engine.execute(&query).unwrap();
assert_eq!(result.documents[0].get("name"), Some(&json!("Bob")));
assert_eq!(result.documents[2].get("name"), Some(&json!("Charlie")));
}
#[test]
fn test_cosine_similarity() {
let a = vec![1.0, 0.0, 0.0];
let b = vec![1.0, 0.0, 0.0];
assert!((cosine_similarity(&a, &b) - 1.0).abs() < 0.001);
let c = vec![0.0, 1.0, 0.0];
assert!((cosine_similarity(&a, &c) - 0.0).abs() < 0.001);
}
}


@ -0,0 +1,336 @@
//! Schema definitions and validation for documents.
use crate::error::DatabaseError;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
/// Field types supported in schemas.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum FieldType {
/// String field.
String,
/// Integer (i64).
Integer,
/// Floating point (f64).
Float,
/// Boolean.
Boolean,
/// Array of values.
Array(Box<FieldType>),
/// Nested object.
Object,
/// Binary data.
Binary,
/// Timestamp (milliseconds since epoch).
Timestamp,
/// Vector embedding (f32 array).
Vector(u32), // dimension
/// Any type (no validation).
Any,
}
/// Field definition in a schema.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Field {
/// Field name.
pub name: String,
/// Field type.
pub field_type: FieldType,
/// Whether field is required.
pub required: bool,
/// Whether field should be indexed.
pub indexed: bool,
/// Default value (JSON).
pub default: Option<serde_json::Value>,
/// Field description.
pub description: Option<String>,
}
impl Field {
/// Creates a new required field.
pub fn required(name: impl Into<String>, field_type: FieldType) -> Self {
Self {
name: name.into(),
field_type,
required: true,
indexed: false,
default: None,
description: None,
}
}
/// Creates a new optional field.
pub fn optional(name: impl Into<String>, field_type: FieldType) -> Self {
Self {
name: name.into(),
field_type,
required: false,
indexed: false,
default: None,
description: None,
}
}
/// Sets the field as indexed.
pub fn with_index(mut self) -> Self {
self.indexed = true;
self
}
/// Sets a default value.
pub fn with_default(mut self, default: serde_json::Value) -> Self {
self.default = Some(default);
self
}
/// Sets description.
pub fn with_description(mut self, desc: impl Into<String>) -> Self {
self.description = Some(desc.into());
self
}
}
/// Document schema for validation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Schema {
/// Schema name.
pub name: String,
/// Fields in the schema.
pub fields: Vec<Field>,
/// Whether to allow additional fields.
pub strict: bool,
/// Schema version.
pub version: u32,
}
impl Schema {
/// Creates a new schema.
pub fn new(name: impl Into<String>) -> Self {
Self {
name: name.into(),
fields: Vec::new(),
strict: false,
version: 1,
}
}
/// Adds a field to the schema.
pub fn field(mut self, field: Field) -> Self {
self.fields.push(field);
self
}
/// Sets strict mode (no additional fields allowed).
pub fn strict(mut self) -> Self {
self.strict = true;
self
}
/// Returns fields that should be indexed.
pub fn indexed_fields(&self) -> Vec<&Field> {
self.fields.iter().filter(|f| f.indexed).collect()
}
/// Returns required fields.
pub fn required_fields(&self) -> Vec<&Field> {
self.fields.iter().filter(|f| f.required).collect()
}
}
/// Schema validator.
pub struct SchemaValidator {
schemas: HashMap<String, Schema>,
}
impl SchemaValidator {
/// Creates a new validator.
pub fn new() -> Self {
Self {
schemas: HashMap::new(),
}
}
/// Registers a schema.
pub fn register(&mut self, schema: Schema) {
self.schemas.insert(schema.name.clone(), schema);
}
/// Gets a schema by name.
pub fn get(&self, name: &str) -> Option<&Schema> {
self.schemas.get(name)
}
/// Validates a document against a schema.
pub fn validate(
&self,
schema_name: &str,
document: &serde_json::Value,
) -> Result<(), DatabaseError> {
let schema = self
.schemas
.get(schema_name)
.ok_or_else(|| DatabaseError::NotFound(format!("Schema: {}", schema_name)))?;
let obj = document.as_object().ok_or_else(|| {
DatabaseError::SchemaValidation("Document must be an object".to_string())
})?;
// Check required fields
for field in &schema.fields {
if field.required && !obj.contains_key(&field.name) {
return Err(DatabaseError::SchemaValidation(format!(
"Missing required field: {}",
field.name
)));
}
}
// Check field types
for field in &schema.fields {
if let Some(value) = obj.get(&field.name) {
self.validate_field_type(value, &field.field_type, &field.name)?;
}
}
// Check for extra fields in strict mode
if schema.strict {
let known_fields: Vec<&str> = schema.fields.iter().map(|f| f.name.as_str()).collect();
for key in obj.keys() {
if !known_fields.contains(&key.as_str()) {
return Err(DatabaseError::SchemaValidation(format!(
"Unknown field in strict schema: {}",
key
)));
}
}
}
Ok(())
}
fn validate_field_type(
&self,
value: &serde_json::Value,
expected: &FieldType,
field_name: &str,
) -> Result<(), DatabaseError> {
let valid = match expected {
FieldType::String => value.is_string(),
FieldType::Integer => value.is_i64(),
FieldType::Float => value.is_f64() || value.is_i64(),
FieldType::Boolean => value.is_boolean(),
FieldType::Array(inner) => {
if let Some(arr) = value.as_array() {
for item in arr {
self.validate_field_type(item, inner, field_name)?;
}
true
} else {
false
}
}
FieldType::Object => value.is_object(),
FieldType::Binary => value.is_string(), // Base64 encoded
FieldType::Timestamp => value.is_i64() || value.is_u64(),
FieldType::Vector(dim) => {
if let Some(arr) = value.as_array() {
// Accept integer components too, consistent with the Float rule above.
arr.len() == *dim as usize && arr.iter().all(|v| v.is_number())
} else {
false
}
}
FieldType::Any => true,
};
if valid {
Ok(())
} else {
Err(DatabaseError::SchemaValidation(format!(
"Field '{}' has invalid type, expected {:?}",
field_name, expected
)))
}
}
}
impl Default for SchemaValidator {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
#[test]
fn test_schema_validation() {
let schema = Schema::new("user")
.field(Field::required("name", FieldType::String))
.field(Field::required("age", FieldType::Integer))
.field(Field::optional("email", FieldType::String));
let mut validator = SchemaValidator::new();
validator.register(schema);
// Valid document
let doc = json!({
"name": "Alice",
"age": 30,
"email": "alice@example.com"
});
assert!(validator.validate("user", &doc).is_ok());
// Missing required field
let doc = json!({
"name": "Bob"
});
assert!(validator.validate("user", &doc).is_err());
// Wrong type
let doc = json!({
"name": "Charlie",
"age": "thirty"
});
assert!(validator.validate("user", &doc).is_err());
}
#[test]
fn test_strict_schema() {
let schema = Schema::new("strict_user")
.field(Field::required("name", FieldType::String))
.strict();
let mut validator = SchemaValidator::new();
validator.register(schema);
// Extra field should fail
let doc = json!({
"name": "Alice",
"extra": "not allowed"
});
assert!(validator.validate("strict_user", &doc).is_err());
}
#[test]
fn test_vector_field() {
let schema = Schema::new("embedding")
.field(Field::required("vector", FieldType::Vector(3)));
let mut validator = SchemaValidator::new();
validator.register(schema);
// Valid vector
let doc = json!({
"vector": [1.0, 2.0, 3.0]
});
assert!(validator.validate("embedding", &doc).is_ok());
// Wrong dimension
let doc = json!({
"vector": [1.0, 2.0]
});
assert!(validator.validate("embedding", &doc).is_err());
}
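// Hedged sketch of the builder helpers above; assumes the accessors
// preserve declaration order (they filter a Vec, so they should).
#[test]
fn test_schema_field_helpers() {
let schema = Schema::new("profile")
.field(Field::required("id", FieldType::String).with_index())
.field(Field::optional("bio", FieldType::String).with_default(json!("")))
.strict();
assert_eq!(schema.indexed_fields().len(), 1);
assert_eq!(schema.required_fields()[0].name, "id");
assert_eq!(schema.fields[1].default, Some(json!("")));
assert!(schema.strict);
}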
}


@ -0,0 +1,534 @@
//! Time-Series Store - Metrics and analytics storage.
//!
//! Optimized for high-ingestion rate time-stamped data.
use crate::error::DatabaseError;
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, HashMap};
/// A single data point in time series.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct DataPoint {
/// Timestamp (milliseconds since epoch).
pub timestamp: u64,
/// Numeric value.
pub value: f64,
/// Optional tags/labels.
pub tags: HashMap<String, String>,
}
impl DataPoint {
/// Creates a new data point.
pub fn new(timestamp: u64, value: f64) -> Self {
Self {
timestamp,
value,
tags: HashMap::new(),
}
}
/// Creates a data point at current time.
pub fn now(value: f64) -> Self {
let timestamp = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
Self::new(timestamp, value)
}
/// Adds a tag.
pub fn with_tag(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
self.tags.insert(key.into(), value.into());
self
}
/// Adds multiple tags.
pub fn with_tags(mut self, tags: HashMap<String, String>) -> Self {
self.tags.extend(tags);
self
}
}
/// A metric time series.
#[derive(Debug)]
pub struct Metric {
/// Metric name.
pub name: String,
/// Data points ordered by timestamp.
data: RwLock<BTreeMap<u64, f64>>,
/// Tags for this metric.
pub tags: HashMap<String, String>,
/// Retention period (ms), 0 = infinite.
pub retention_ms: u64,
}
impl Metric {
/// Creates a new metric.
pub fn new(name: impl Into<String>) -> Self {
Self {
name: name.into(),
data: RwLock::new(BTreeMap::new()),
tags: HashMap::new(),
retention_ms: 0,
}
}
/// Sets retention period.
pub fn with_retention(mut self, retention_ms: u64) -> Self {
self.retention_ms = retention_ms;
self
}
/// Adds a tag.
pub fn with_tag(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
self.tags.insert(key.into(), value.into());
self
}
/// Records a data point. Note that `Metric` keys by timestamp only, so
/// the point's tags are dropped here; use `TimeSeries` for tagged data.
pub fn record(&self, point: DataPoint) {
self.data.write().insert(point.timestamp, point.value);
}
/// Records current value.
pub fn record_now(&self, value: f64) {
self.record(DataPoint::now(value));
}
/// Increments by 1.
pub fn increment(&self) {
self.add(1.0);
}
/// Adds to the current value.
pub fn add(&self, delta: f64) {
let point = DataPoint::now(delta);
let mut data = self.data.write();
let current = data.values().last().copied().unwrap_or(0.0);
data.insert(point.timestamp, current + delta);
}
/// Gets data points in a time range.
pub fn range(&self, start: u64, end: u64) -> Vec<DataPoint> {
self.data
.read()
.range(start..=end)
.map(|(&timestamp, &value)| DataPoint::new(timestamp, value))
.collect()
}
/// Gets the latest value.
pub fn latest(&self) -> Option<DataPoint> {
self.data
.read()
.iter()
.last()
.map(|(&timestamp, &value)| DataPoint::new(timestamp, value))
}
/// Calculates average in a time range.
pub fn avg(&self, start: u64, end: u64) -> Option<f64> {
let data = self.data.read();
let values: Vec<f64> = data.range(start..=end).map(|(_, &v)| v).collect();
if values.is_empty() {
None
} else {
Some(values.iter().sum::<f64>() / values.len() as f64)
}
}
/// Calculates min in a time range.
pub fn min(&self, start: u64, end: u64) -> Option<f64> {
self.data
.read()
.range(start..=end)
.map(|(_, &v)| v)
.min_by(|a, b| a.partial_cmp(b).unwrap())
}
/// Calculates max in a time range.
pub fn max(&self, start: u64, end: u64) -> Option<f64> {
self.data
.read()
.range(start..=end)
.map(|(_, &v)| v)
.max_by(|a, b| a.partial_cmp(b).unwrap())
}
/// Calculates sum in a time range.
pub fn sum(&self, start: u64, end: u64) -> f64 {
self.data
.read()
.range(start..=end)
.map(|(_, &v)| v)
.sum()
}
/// Counts data points in a time range.
pub fn count(&self, start: u64, end: u64) -> usize {
self.data.read().range(start..=end).count()
}
/// Returns total data points.
pub fn len(&self) -> usize {
self.data.read().len()
}
/// Checks if empty.
pub fn is_empty(&self) -> bool {
self.data.read().is_empty()
}
/// Evicts data older than retention period.
pub fn evict_old(&self) -> usize {
if self.retention_ms == 0 {
return 0;
}
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
let cutoff = now.saturating_sub(self.retention_ms);
let mut data = self.data.write();
let before = data.len();
data.retain(|&ts, _| ts >= cutoff);
before - data.len()
}
/// Downsamples data to reduce storage.
pub fn downsample(&self, interval_ms: u64, agg: Aggregation) -> Vec<DataPoint> {
let data = self.data.read();
if data.is_empty() {
return Vec::new();
}
let first_ts = *data.keys().next().unwrap();
let last_ts = *data.keys().last().unwrap();
let mut result = Vec::new();
let mut bucket_start = first_ts;
while bucket_start <= last_ts {
let bucket_end = bucket_start + interval_ms;
let values: Vec<f64> = data
.range(bucket_start..bucket_end)
.map(|(_, &v)| v)
.collect();
if !values.is_empty() {
let value = match agg {
Aggregation::Avg => values.iter().sum::<f64>() / values.len() as f64,
Aggregation::Sum => values.iter().sum(),
Aggregation::Min => values
.iter()
.copied()
.min_by(|a, b| a.partial_cmp(b).unwrap())
.unwrap(),
Aggregation::Max => values
.iter()
.copied()
.max_by(|a, b| a.partial_cmp(b).unwrap())
.unwrap(),
Aggregation::First => values[0],
Aggregation::Last => *values.last().unwrap(),
Aggregation::Count => values.len() as f64,
};
result.push(DataPoint::new(bucket_start, value));
}
bucket_start = bucket_end;
}
result
}
}
/// Aggregation function for downsampling.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum Aggregation {
Avg,
Sum,
Min,
Max,
First,
Last,
Count,
}
/// Time series store managing multiple metrics.
#[derive(Debug)]
pub struct TimeSeries {
/// Metric name.
pub name: String,
/// Series data (tag-set -> points).
series: RwLock<HashMap<String, Vec<DataPoint>>>,
}
impl TimeSeries {
/// Creates a new time series.
pub fn new(name: impl Into<String>) -> Self {
Self {
name: name.into(),
series: RwLock::new(HashMap::new()),
}
}
/// Adds a data point.
pub fn add(&self, tags: &HashMap<String, String>, point: DataPoint) {
let key = Self::tags_to_key(tags);
self.series
.write()
.entry(key)
.or_insert_with(Vec::new)
.push(point);
}
/// Queries data points.
pub fn query(
&self,
tags: Option<&HashMap<String, String>>,
start: u64,
end: u64,
) -> Vec<DataPoint> {
let series = self.series.read();
if let Some(tags) = tags {
let key = Self::tags_to_key(tags);
series
.get(&key)
.map(|points| {
points
.iter()
.filter(|p| p.timestamp >= start && p.timestamp <= end)
.cloned()
.collect()
})
.unwrap_or_default()
} else {
series
.values()
.flat_map(|points| {
points
.iter()
.filter(|p| p.timestamp >= start && p.timestamp <= end)
.cloned()
})
.collect()
}
}
fn tags_to_key(tags: &HashMap<String, String>) -> String {
let mut pairs: Vec<_> = tags.iter().collect();
pairs.sort_by_key(|(k, _)| *k);
pairs
.iter()
.map(|(k, v)| format!("{}={}", k, v))
.collect::<Vec<_>>()
.join(",")
}
}
/// Time series store.
pub struct TimeSeriesStore {
/// Metrics by name.
metrics: RwLock<HashMap<String, Metric>>,
/// Time series by name.
series: RwLock<HashMap<String, TimeSeries>>,
}
impl TimeSeriesStore {
/// Creates a new time series store.
pub fn new() -> Self {
Self {
metrics: RwLock::new(HashMap::new()),
series: RwLock::new(HashMap::new()),
}
}
/// Ensures a metric with the given name exists, creating it if needed.
pub fn metric(&self, name: &str) -> Result<(), DatabaseError> {
let mut metrics = self.metrics.write();
if !metrics.contains_key(name) {
metrics.insert(name.to_string(), Metric::new(name));
}
Ok(())
}
/// Records a metric value.
pub fn record(&self, name: &str, value: f64) -> Result<(), DatabaseError> {
let metrics = self.metrics.read();
if let Some(metric) = metrics.get(name) {
metric.record_now(value);
Ok(())
} else {
drop(metrics);
self.metric(name)?;
self.record(name, value)
}
}
/// Records a data point with timestamp.
pub fn record_point(&self, name: &str, point: DataPoint) -> Result<(), DatabaseError> {
let metrics = self.metrics.read();
if let Some(metric) = metrics.get(name) {
metric.record(point);
Ok(())
} else {
drop(metrics);
self.metric(name)?;
self.record_point(name, point)
}
}
/// Queries metric data.
pub fn query(&self, name: &str, start: u64, end: u64) -> Result<Vec<DataPoint>, DatabaseError> {
let metrics = self.metrics.read();
let metric = metrics
.get(name)
.ok_or_else(|| DatabaseError::NotFound(name.to_string()))?;
Ok(metric.range(start, end))
}
/// Gets latest metric value.
pub fn latest(&self, name: &str) -> Result<Option<DataPoint>, DatabaseError> {
let metrics = self.metrics.read();
let metric = metrics
.get(name)
.ok_or_else(|| DatabaseError::NotFound(name.to_string()))?;
Ok(metric.latest())
}
/// Aggregates metric data.
pub fn aggregate(
&self,
name: &str,
start: u64,
end: u64,
agg: Aggregation,
) -> Result<Option<f64>, DatabaseError> {
let metrics = self.metrics.read();
let metric = metrics
.get(name)
.ok_or_else(|| DatabaseError::NotFound(name.to_string()))?;
let result = match agg {
Aggregation::Avg => metric.avg(start, end),
Aggregation::Sum => Some(metric.sum(start, end)),
Aggregation::Min => metric.min(start, end),
Aggregation::Max => metric.max(start, end),
Aggregation::Count => Some(metric.count(start, end) as f64),
_ => None, // First/Last need point-level buckets; use downsample()
};
Ok(result)
}
/// Lists all metrics.
pub fn list_metrics(&self) -> Vec<String> {
self.metrics.read().keys().cloned().collect()
}
/// Deletes a metric.
pub fn delete_metric(&self, name: &str) -> Result<bool, DatabaseError> {
Ok(self.metrics.write().remove(name).is_some())
}
}
impl Default for TimeSeriesStore {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_data_point() {
let point = DataPoint::new(1000, 42.5)
.with_tag("host", "server1")
.with_tag("region", "us-east");
assert_eq!(point.timestamp, 1000);
assert_eq!(point.value, 42.5);
assert_eq!(point.tags.get("host"), Some(&"server1".to_string()));
}
#[test]
fn test_metric_record() {
let metric = Metric::new("cpu_usage");
metric.record(DataPoint::new(1000, 50.0));
metric.record(DataPoint::new(2000, 60.0));
metric.record(DataPoint::new(3000, 55.0));
let latest = metric.latest().unwrap();
assert_eq!(latest.timestamp, 3000);
assert_eq!(latest.value, 55.0);
}
#[test]
fn test_metric_range() {
let metric = Metric::new("requests");
metric.record(DataPoint::new(1000, 10.0));
metric.record(DataPoint::new(2000, 20.0));
metric.record(DataPoint::new(3000, 30.0));
metric.record(DataPoint::new(4000, 40.0));
let range = metric.range(1500, 3500);
assert_eq!(range.len(), 2);
assert_eq!(range[0].value, 20.0);
assert_eq!(range[1].value, 30.0);
}
#[test]
fn test_metric_aggregations() {
let metric = Metric::new("values");
metric.record(DataPoint::new(1000, 10.0));
metric.record(DataPoint::new(2000, 20.0));
metric.record(DataPoint::new(3000, 30.0));
assert_eq!(metric.avg(0, 5000), Some(20.0));
assert_eq!(metric.min(0, 5000), Some(10.0));
assert_eq!(metric.max(0, 5000), Some(30.0));
assert_eq!(metric.sum(0, 5000), 60.0);
assert_eq!(metric.count(0, 5000), 3);
}
#[test]
fn test_metric_downsample() {
let metric = Metric::new("samples");
// Add 10 points over 1000ms
for i in 0..10 {
metric.record(DataPoint::new(i * 100, i as f64));
}
// Downsample to 500ms buckets with avg
let downsampled = metric.downsample(500, Aggregation::Avg);
assert_eq!(downsampled.len(), 2);
}
#[test]
fn test_timeseries_store() {
let store = TimeSeriesStore::new();
store.record("cpu", 50.0).unwrap();
store.record("cpu", 60.0).unwrap();
store.record("memory", 1024.0).unwrap();
let metrics = store.list_metrics();
assert!(metrics.contains(&"cpu".to_string()));
assert!(metrics.contains(&"memory".to_string()));
let latest = store.latest("cpu").unwrap().unwrap();
assert_eq!(latest.value, 60.0);
}
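// Hedged sketch of `evict_old`: the fixed historical timestamp below is
// assumed to be far older than `now - retention`, so exactly one point
// is evicted.
#[test]
fn test_metric_retention_eviction() {
let metric = Metric::new("evictable").with_retention(60_000); // keep 1 minute
metric.record(DataPoint::new(1_000, 1.0)); // far in the past
metric.record_now(2.0); // inside the retention window
assert_eq!(metric.evict_old(), 1);
assert_eq!(metric.len(), 1);
}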
}


@ -0,0 +1,494 @@
//! Vector Store - AI/RAG optimized storage.
//!
//! Provides vector embeddings storage with similarity search.
//! Optimized for AI applications, RAG pipelines, and semantic search.
use crate::error::DatabaseError;
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
/// Vector embedding.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Embedding {
/// Unique embedding ID.
pub id: String,
/// Vector values.
pub vector: Vec<f32>,
/// Associated metadata.
pub metadata: serde_json::Value,
/// Namespace for organization.
pub namespace: String,
/// Creation timestamp.
pub created_at: u64,
}
impl Embedding {
/// Creates a new embedding.
pub fn new(id: impl Into<String>, vector: Vec<f32>) -> Self {
Self {
id: id.into(),
vector,
metadata: serde_json::Value::Null,
namespace: "default".to_string(),
created_at: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64,
}
}
/// Sets metadata.
pub fn with_metadata(mut self, metadata: serde_json::Value) -> Self {
self.metadata = metadata;
self
}
/// Sets namespace.
pub fn with_namespace(mut self, namespace: impl Into<String>) -> Self {
self.namespace = namespace.into();
self
}
/// Returns vector dimension.
pub fn dimension(&self) -> usize {
self.vector.len()
}
/// Normalizes the vector to unit length.
pub fn normalize(&mut self) {
let magnitude: f32 = self.vector.iter().map(|x| x * x).sum::<f32>().sqrt();
if magnitude > 0.0 {
for v in &mut self.vector {
*v /= magnitude;
}
}
}
}
/// Similarity metric for vector comparison.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum SimilarityMetric {
/// Cosine similarity (default for text embeddings).
Cosine,
/// Euclidean distance (L2).
Euclidean,
/// Dot product.
DotProduct,
/// Manhattan distance (L1).
Manhattan,
}
impl Default for SimilarityMetric {
fn default() -> Self {
SimilarityMetric::Cosine
}
}
/// Search result with similarity score.
#[derive(Clone, Debug)]
pub struct VectorSearchResult {
/// Matching embedding.
pub embedding: Embedding,
/// Similarity score (higher is more similar).
pub score: f32,
}
/// Vector index using HNSW-like structure.
#[derive(Debug)]
pub struct VectorIndex {
/// Index name.
pub name: String,
/// Vector dimension.
pub dimension: u32,
/// Similarity metric.
pub metric: SimilarityMetric,
/// Stored embeddings.
embeddings: RwLock<HashMap<String, Embedding>>,
/// Embeddings by namespace.
by_namespace: RwLock<HashMap<String, Vec<String>>>,
/// Statistics.
stats: RwLock<VectorStats>,
}
/// Vector index statistics.
#[derive(Clone, Debug, Default)]
pub struct VectorStats {
/// Total embeddings.
pub count: u64,
/// Total searches.
pub searches: u64,
/// Average search time (ms).
pub avg_search_time_ms: f64,
}
impl VectorIndex {
/// Creates a new vector index.
pub fn new(name: impl Into<String>, dimension: u32, metric: SimilarityMetric) -> Self {
Self {
name: name.into(),
dimension,
metric,
embeddings: RwLock::new(HashMap::new()),
by_namespace: RwLock::new(HashMap::new()),
stats: RwLock::new(VectorStats::default()),
}
}
/// Inserts an embedding.
pub fn insert(&self, embedding: Embedding) -> Result<(), DatabaseError> {
if embedding.vector.len() != self.dimension as usize {
return Err(DatabaseError::DimensionMismatch {
expected: self.dimension,
got: embedding.vector.len() as u32,
});
}
let id = embedding.id.clone();
let namespace = embedding.namespace.clone();
self.embeddings.write().insert(id.clone(), embedding);
self.by_namespace
.write()
.entry(namespace)
.or_insert_with(Vec::new)
.push(id);
self.stats.write().count += 1;
Ok(())
}
/// Inserts multiple embeddings.
pub fn insert_batch(&self, embeddings: Vec<Embedding>) -> Result<(), DatabaseError> {
for embedding in embeddings {
self.insert(embedding)?;
}
Ok(())
}
/// Gets an embedding by ID.
pub fn get(&self, id: &str) -> Option<Embedding> {
self.embeddings.read().get(id).cloned()
}
/// Deletes an embedding.
pub fn delete(&self, id: &str) -> Result<bool, DatabaseError> {
let mut embeddings = self.embeddings.write();
if let Some(embedding) = embeddings.remove(id) {
// Remove from namespace index
let mut by_ns = self.by_namespace.write();
if let Some(ids) = by_ns.get_mut(&embedding.namespace) {
ids.retain(|i| i != id);
}
self.stats.write().count -= 1;
Ok(true)
} else {
Ok(false)
}
}
/// Searches for similar vectors.
pub fn search(
&self,
query: &[f32],
limit: usize,
namespace: Option<&str>,
threshold: Option<f32>,
) -> Result<Vec<VectorSearchResult>, DatabaseError> {
if query.len() != self.dimension as usize {
return Err(DatabaseError::DimensionMismatch {
expected: self.dimension,
got: query.len() as u32,
});
}
let start = std::time::Instant::now();
let embeddings = self.embeddings.read();
let mut results: Vec<VectorSearchResult> = embeddings
.values()
.filter(|e| {
namespace.map(|ns| e.namespace == ns).unwrap_or(true)
})
.map(|e| {
let score = self.calculate_similarity(&e.vector, query);
VectorSearchResult {
embedding: e.clone(),
score,
}
})
.filter(|r| {
threshold.map(|t| r.score >= t).unwrap_or(true)
})
.collect();
// Sort by score descending
results.sort_by(|a, b| {
b.score.partial_cmp(&a.score).unwrap_or(std::cmp::Ordering::Equal)
});
// Apply limit
results.truncate(limit);
// Update stats
let elapsed = start.elapsed().as_millis() as f64;
let mut stats = self.stats.write();
stats.searches += 1;
stats.avg_search_time_ms =
(stats.avg_search_time_ms * (stats.searches - 1) as f64 + elapsed) / stats.searches as f64;
Ok(results)
}
/// Calculates similarity between two vectors.
fn calculate_similarity(&self, a: &[f32], b: &[f32]) -> f32 {
match self.metric {
SimilarityMetric::Cosine => cosine_similarity(a, b),
SimilarityMetric::Euclidean => {
// Convert distance to similarity (higher = more similar)
let dist = euclidean_distance(a, b);
1.0 / (1.0 + dist)
}
SimilarityMetric::DotProduct => dot_product(a, b),
SimilarityMetric::Manhattan => {
let dist = manhattan_distance(a, b);
1.0 / (1.0 + dist)
}
}
}
/// Returns index statistics.
pub fn stats(&self) -> VectorStats {
self.stats.read().clone()
}
/// Returns count of embeddings.
pub fn count(&self) -> u64 {
self.stats.read().count
}
/// Clears all embeddings.
pub fn clear(&self) {
self.embeddings.write().clear();
self.by_namespace.write().clear();
self.stats.write().count = 0;
}
}
/// Vector store managing multiple indexes.
pub struct VectorStore {
/// Default dimension.
default_dimension: u32,
/// Indexes by name.
indexes: RwLock<HashMap<String, VectorIndex>>,
/// Default index.
default_index: VectorIndex,
}
impl VectorStore {
/// Creates a new vector store.
pub fn new(dimension: u32) -> Self {
Self {
default_dimension: dimension,
indexes: RwLock::new(HashMap::new()),
default_index: VectorIndex::new("default", dimension, SimilarityMetric::Cosine),
}
}
/// Creates a new index.
pub fn create_index(
&self,
name: &str,
dimension: u32,
metric: SimilarityMetric,
) -> Result<(), DatabaseError> {
let mut indexes = self.indexes.write();
if indexes.contains_key(name) {
return Err(DatabaseError::AlreadyExists(name.to_string()));
}
indexes.insert(name.to_string(), VectorIndex::new(name, dimension, metric));
Ok(())
}
/// Gets an index by name.
///
/// Currently always returns `None`: named indexes are stored by value,
/// so handing out references would require wrapping them in `Arc` first.
pub fn get_index(&self, _name: &str) -> Option<&VectorIndex> {
None
}
/// Inserts an embedding into the default index.
pub fn insert(&self, embedding: Embedding) -> Result<(), DatabaseError> {
self.default_index.insert(embedding)
}
/// Searches the default index.
pub fn search(
&self,
query: &[f32],
limit: usize,
namespace: Option<&str>,
threshold: Option<f32>,
) -> Result<Vec<VectorSearchResult>, DatabaseError> {
self.default_index.search(query, limit, namespace, threshold)
}
/// Gets an embedding by ID.
pub fn get(&self, id: &str) -> Option<Embedding> {
self.default_index.get(id)
}
/// Deletes an embedding.
pub fn delete(&self, id: &str) -> Result<bool, DatabaseError> {
self.default_index.delete(id)
}
/// Returns embedding count.
pub fn count(&self) -> u64 {
self.default_index.count()
}
/// Lists all indexes.
pub fn list_indexes(&self) -> Vec<String> {
let mut names: Vec<_> = self.indexes.read().keys().cloned().collect();
names.push("default".to_string());
names
}
}
/// Cosine similarity between two vectors.
pub fn cosine_similarity(a: &[f32], b: &[f32]) -> f32 {
if a.len() != b.len() {
return 0.0;
}
let dot: f32 = a.iter().zip(b.iter()).map(|(x, y)| x * y).sum();
let mag_a: f32 = a.iter().map(|x| x * x).sum::<f32>().sqrt();
let mag_b: f32 = b.iter().map(|x| x * x).sum::<f32>().sqrt();
if mag_a == 0.0 || mag_b == 0.0 {
0.0
} else {
dot / (mag_a * mag_b)
}
}
/// Dot product of two vectors.
pub fn dot_product(a: &[f32], b: &[f32]) -> f32 {
a.iter().zip(b.iter()).map(|(x, y)| x * y).sum()
}
/// Euclidean distance (L2) between two vectors.
pub fn euclidean_distance(a: &[f32], b: &[f32]) -> f32 {
a.iter()
.zip(b.iter())
.map(|(x, y)| (x - y).powi(2))
.sum::<f32>()
.sqrt()
}
/// Manhattan distance (L1) between two vectors.
pub fn manhattan_distance(a: &[f32], b: &[f32]) -> f32 {
a.iter()
.zip(b.iter())
.map(|(x, y)| (x - y).abs())
.sum()
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
#[test]
fn test_embedding_creation() {
let embedding = Embedding::new("doc1", vec![1.0, 0.0, 0.0])
.with_metadata(json!({"title": "Hello World"}))
.with_namespace("documents");
assert_eq!(embedding.id, "doc1");
assert_eq!(embedding.dimension(), 3);
assert_eq!(embedding.namespace, "documents");
}
#[test]
fn test_vector_insert_search() {
let store = VectorStore::new(3);
store.insert(Embedding::new("a", vec![1.0, 0.0, 0.0])).unwrap();
store.insert(Embedding::new("b", vec![0.9, 0.1, 0.0])).unwrap();
store.insert(Embedding::new("c", vec![0.0, 1.0, 0.0])).unwrap();
let results = store.search(&[1.0, 0.0, 0.0], 2, None, None).unwrap();
assert_eq!(results.len(), 2);
assert_eq!(results[0].embedding.id, "a");
assert!((results[0].score - 1.0).abs() < 0.001);
}
#[test]
fn test_similarity_threshold() {
let store = VectorStore::new(3);
store.insert(Embedding::new("a", vec![1.0, 0.0, 0.0])).unwrap();
store.insert(Embedding::new("b", vec![0.0, 1.0, 0.0])).unwrap();
let results = store.search(&[1.0, 0.0, 0.0], 10, None, Some(0.5)).unwrap();
// Only "a" should match with high threshold
assert_eq!(results.len(), 1);
assert_eq!(results[0].embedding.id, "a");
}
#[test]
fn test_namespace_filter() {
let store = VectorStore::new(3);
store.insert(
Embedding::new("a", vec![1.0, 0.0, 0.0]).with_namespace("ns1")
).unwrap();
store.insert(
Embedding::new("b", vec![1.0, 0.0, 0.0]).with_namespace("ns2")
).unwrap();
let results = store.search(&[1.0, 0.0, 0.0], 10, Some("ns1"), None).unwrap();
assert_eq!(results.len(), 1);
assert_eq!(results[0].embedding.id, "a");
}
#[test]
fn test_dimension_mismatch() {
let store = VectorStore::new(3);
let result = store.insert(Embedding::new("a", vec![1.0, 0.0])); // 2D instead of 3D
assert!(result.is_err());
}
#[test]
fn test_similarity_metrics() {
let a = vec![1.0, 0.0, 0.0];
let b = vec![1.0, 0.0, 0.0];
let c = vec![0.0, 1.0, 0.0];
// Cosine
assert!((cosine_similarity(&a, &b) - 1.0).abs() < 0.001);
assert!((cosine_similarity(&a, &c) - 0.0).abs() < 0.001);
// Euclidean
assert!((euclidean_distance(&a, &b) - 0.0).abs() < 0.001);
assert!((euclidean_distance(&a, &c) - 1.414).abs() < 0.01);
// Dot product
assert!((dot_product(&a, &b) - 1.0).abs() < 0.001);
assert!((dot_product(&a, &c) - 0.0).abs() < 0.001);
}
#[test]
fn test_embedding_normalize() {
let mut embedding = Embedding::new("test", vec![3.0, 4.0, 0.0]);
embedding.normalize();
let magnitude: f32 = embedding.vector.iter().map(|x| x * x).sum::<f32>().sqrt();
assert!((magnitude - 1.0).abs() < 0.001);
}
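// Hedged sketch: a non-default metric on a standalone index. Euclidean
// distance d maps to a similarity of 1 / (1 + d), per
// `calculate_similarity` above.
#[test]
fn test_euclidean_index_scoring() {
let index = VectorIndex::new("l2", 2, SimilarityMetric::Euclidean);
index.insert(Embedding::new("origin", vec![0.0, 0.0])).unwrap();
let results = index.search(&[3.0, 4.0], 1, None, None).unwrap();
// distance is 5.0, so the score should be 1 / 6 ≈ 0.1667
assert!((results[0].score - 1.0 / 6.0).abs() < 1e-3);
}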
}


@ -16,6 +16,9 @@
| 7 | Production Readiness | 🔄 In Progress | 85% | 3 |
| 8 | Synor Storage L2 | ✅ Complete | 100% | 3 |
| 9 | Synor Hosting | ✅ Complete | 100% | 3 |
| 10 | Synor Database L2 | 🔄 In Progress | 0% | 3 |
| 11 | Economics & Billing | ⏳ Planned | 0% | 3 |
| 12 | Fiat Gateway | ⏳ Planned | 0% | 2 |
## Phase 7 Breakdown
@ -81,6 +84,118 @@
| CLI Deploy Command | 100% | `synor deploy push/init/list/delete` |
| Admin Dashboard | 0% | Web UI for management (deferred) |
## Phase 10 Breakdown (Synor Database L2)
| Milestone | Status | Progress |
|-----------|--------|----------|
| Database Core | 🔄 In Progress | 0% |
| Query Layer | ⏳ Planned | 0% |
| Database Gateway | ⏳ Planned | 0% |
### Database Components
| Component | Status | Notes |
|-----------|--------|-------|
| Key-Value Store | 0% | Redis-compatible API |
| Document Store | 0% | MongoDB-compatible queries |
| Vector Store | 0% | AI/RAG optimized, cosine similarity |
| Relational (SQL) | 0% | SQLite-compatible subset |
| Time-Series | 0% | Metrics and analytics |
| Graph Store | 0% | Relationship queries |
| Query Engine | 0% | Unified SQL + Vector + Graph |
| Replication | 0% | Raft consensus for consistency |
| Indexing | 0% | B-tree, LSM-tree, HNSW |
### Database Pricing Model
| Tier | Storage/GB/month | Queries/million | Vectors/million |
|------|------------------|-----------------|-----------------|
| Free | 0.5 GB | 1M | 100K |
| Standard | 0.1 SYNOR | 0.01 SYNOR | 0.05 SYNOR |
| Premium | 0.2 SYNOR | 0.005 SYNOR | 0.02 SYNOR |
| Enterprise | Custom | Custom | Custom |
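The Standard-tier monthly bill implied by the table above is a linear sum of the three meters. A sketch, assuming linear pay-per-use billing; the rate constants come from the table, while the function name and linearity assumption are illustrative, not a confirmed billing rule.

```rust
// Standard-tier rates from the pricing table (hypothetical constant names).
const STORAGE_SYNOR_PER_GB: f64 = 0.1; // per GB stored, per month
const QUERY_SYNOR_PER_M: f64 = 0.01;   // per million queries
const VECTOR_SYNOR_PER_M: f64 = 0.05;  // per million vector operations

/// Monthly cost in SYNOR for the Standard tier, assuming linear metering.
fn standard_monthly_cost(gb: f64, queries_m: f64, vectors_m: f64) -> f64 {
    gb * STORAGE_SYNOR_PER_GB
        + queries_m * QUERY_SYNOR_PER_M
        + vectors_m * VECTOR_SYNOR_PER_M
}
```

For example, 10 GB stored, 5M queries, and 2M vector operations would come to 1.0 + 0.05 + 0.1 = 1.15 SYNOR for the month.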
## Phase 11 Breakdown (Economics & Billing)
| Milestone | Status | Progress |
|-----------|--------|----------|
| Pricing Oracle | ⏳ Planned | 0% |
| Metering Service | ⏳ Planned | 0% |
| Billing Engine | ⏳ Planned | 0% |
### Economics Components
| Component | Status | Notes |
|-----------|--------|-------|
| Pricing Oracle | 0% | SYNOR/USD price feeds |
| Usage Metering | 0% | Real-time resource tracking |
| Billing Engine | 0% | Invoice generation, payments |
| Cost Calculator | 0% | CLI `synor cost estimate` |
| SLA Enforcement | 0% | Smart contract guarantees |
| Staking Rewards | 0% | Node operator incentives |
### Fee Distribution Model
```
Transaction Fees:
├── 10% → Burn (deflationary)
├── 60% → Stakers (rewards)
├── 20% → Community Pool (treasury)
└── 10% → Miners/Validators

L2 Service Fees (Storage, Hosting, Database):
├── 70% → Node Operators
├── 20% → Protocol Treasury
└── 10% → Burn
```
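The transaction-fee schedule above can be sketched as an integer split. The percentages come from the diagram; the struct and function names are hypothetical, and assigning the division remainder to the validator share (so the parts always sum to the input) is an illustrative choice, not a confirmed protocol rule.

```rust
// Illustrative 10/60/20/10 split of a transaction fee, in base units.
#[derive(Debug, PartialEq)]
struct TxFeeSplit {
    burn: u64,
    stakers: u64,
    community_pool: u64,
    validators: u64,
}

fn split_tx_fee(fee: u64) -> TxFeeSplit {
    let burn = fee * 10 / 100;
    let stakers = fee * 60 / 100;
    let community_pool = fee * 20 / 100;
    // Remainder absorbs integer-division dust so the shares sum to `fee`.
    let validators = fee - burn - stakers - community_pool;
    TxFeeSplit { burn, stakers, community_pool, validators }
}
```

Integer arithmetic avoids float rounding in on-chain accounting; the same pattern applies to the 70/20/10 L2 service-fee split.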
## Phase 12 Breakdown (Fiat Gateway)
| Milestone | Status | Progress |
|-----------|--------|----------|
| On-Ramp Integration | ⏳ Planned | 0% |
| Off-Ramp Integration | ⏳ Planned | 0% |
### Fiat Gateway Components
| Component | Status | Notes |
|-----------|--------|-------|
| Ramp Network SDK | 0% | Primary on-ramp provider |
| Off-Ramp Service | 0% | SYNOR → Fiat conversion |
| Swap Aggregator | 0% | Cross-chain token swaps |
| Payment Widget | 0% | Embeddable checkout |
### Ramp Network Integration
> Reference: https://rampnetwork.com/
**Planned Features:**
| Feature | Description | Priority |
|---------|-------------|----------|
| Buy SYNOR | Credit/debit card purchases | 🔴 Critical |
| Sell SYNOR | Fiat withdrawals to bank | 🔴 Critical |
| Apple Pay/Google Pay | Mobile wallet support | 🟡 High |
| Bank Transfer (SEPA/ACH) | Lower fee option | 🟡 High |
| Recurring Purchases | DCA functionality | 🟢 Medium |
| NFT Checkout | Direct NFT purchases | 🟢 Medium |
| B2B API | Enterprise integrations | 🟢 Medium |
**Coverage:**
- 150+ countries supported
- 40+ fiat currencies
- 90+ cryptocurrencies (SYNOR to be added)
- KYC/AML compliant
**Integration Endpoints:**
```
POST /api/v1/onramp/quote - Get purchase quote
POST /api/v1/onramp/buy - Initiate purchase
POST /api/v1/offramp/sell - Sell SYNOR for fiat
GET /api/v1/rates - Current exchange rates
POST /api/v1/swap - Cross-chain swap
```
## Directory Structure
```
@@ -120,6 +235,17 @@ docs/PLAN/
│ ├── 01-Milestone-01-HostingCore.md
│ ├── 01-Milestone-02-HostingGateway.md
│ └── 01-Milestone-03-HostingCLI.md
├── PHASE10-SynorDatabaseL2/
│ ├── 01-Milestone-01-DatabaseCore.md
│ ├── 01-Milestone-02-QueryLayer.md
│ └── 01-Milestone-03-DatabaseGateway.md
├── PHASE11-EconomicsBilling/
│ ├── 01-Milestone-01-PricingOracle.md
│ ├── 01-Milestone-02-MeteringService.md
│ └── 01-Milestone-03-BillingEngine.md
├── PHASE12-FiatGateway/
│ ├── 01-Milestone-01-OnRamp.md
│ └── 01-Milestone-02-OffRamp.md
```
## Validation Strategy