synor/crates/synor-database/src/document.rs
Gulshan Yadav 78c226a098 feat(database): add Phase 10 Synor Database L2 foundation
Multi-model database layer for Synor blockchain:

- Key-Value Store: Redis-compatible API with TTL, INCR, MGET/MSET
- Document Store: MongoDB-compatible queries with filters
- Vector Store: AI/RAG optimized with cosine, euclidean, dot product similarity
- Time-Series Store: Metrics with downsampling and aggregations
- Query Engine: Unified queries across all data models
- Index Manager: B-tree, hash, unique, and compound indexes
- Schema Validator: Field validation with type checking
- Database Pricing: Pay-per-use model (0.1 SYNOR/GB/month)

Updates roadmap with Phase 10-12 milestones:
- Phase 10: Synor Database L2
- Phase 11: Economics & Billing
- Phase 12: Fiat Gateway (Ramp Network integration)

41 tests passing
2026-01-10 17:40:18 +05:30

621 lines
19 KiB
Rust

//! Document Store - MongoDB-compatible queries.
//!
//! Provides document storage with collections and rich queries.
use crate::error::DatabaseError;
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use serde_json::Value as JsonValue;
use std::collections::HashMap;
/// Document identifier.
///
/// A newtype over a 32-byte array. IDs produced by `DocumentId::new` carry a
/// little-endian nanosecond Unix timestamp in bytes `0..16` and a
/// pseudo-random value in bytes `16..32`; IDs built with `from_bytes` or
/// `from_hex` may contain arbitrary bytes.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct DocumentId(pub [u8; 32]);
impl DocumentId {
    /// Creates a new document ID from the current time plus a pseudo-random suffix.
    ///
    /// Bytes `0..16` hold the nanoseconds elapsed since the Unix epoch
    /// (little-endian) and bytes `16..32` hold the value returned by
    /// `rand_bytes` (little-endian).
    ///
    /// If the system clock reports a time before the Unix epoch the timestamp
    /// half is zero; ID generation never panics on a misconfigured clock.
    pub fn new() -> Self {
        let nanos = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_nanos();
        let random: u128 = rand_bytes();

        let mut bytes = [0u8; 32];
        bytes[..16].copy_from_slice(&nanos.to_le_bytes());
        bytes[16..].copy_from_slice(&random.to_le_bytes());
        Self(bytes)
    }

    /// Creates a document ID from bytes.
    pub fn from_bytes(bytes: [u8; 32]) -> Self {
        Self(bytes)
    }

    /// Returns the hex encoding of all 32 bytes.
    pub fn to_hex(&self) -> String {
        hex::encode(self.0)
    }

    /// Creates a document ID from a hex string.
    ///
    /// # Errors
    ///
    /// Returns `DatabaseError::InvalidOperation` when `s` is not valid hex or
    /// does not decode to exactly 32 bytes.
    pub fn from_hex(s: &str) -> Result<Self, DatabaseError> {
        let bytes = hex::decode(s)
            .map_err(|_| DatabaseError::InvalidOperation("Invalid hex string".into()))?;
        // Check the length first so `copy_from_slice` below cannot panic.
        if bytes.len() != 32 {
            return Err(DatabaseError::InvalidOperation(
                "Invalid document ID length".into(),
            ));
        }
        let mut arr = [0u8; 32];
        arr.copy_from_slice(&bytes);
        Ok(Self(arr))
    }
}
impl Default for DocumentId {
fn default() -> Self {
Self::new()
}
}
impl std::fmt::Display for DocumentId {
    /// Writes an abbreviated form of the ID: the hex encoding of its first
    /// 12 bytes only. Use `to_hex` for the full 32-byte representation.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let prefix = &self.0[..12];
        let abbreviated = hex::encode(prefix);
        f.write_str(&abbreviated)
    }
}
// Simple pseudo-random 128-bit value for document IDs.
//
// This relies only on the standard library: every `RandomState::new()` carries
// randomly seeded hasher keys, so finishing an (empty) hasher built from a
// fresh state yields an unpredictable-looking 64-bit value. It is meant to
// make ID collisions unlikely, not to be a cryptographic random source.
fn rand_bytes() -> u128 {
    use std::collections::hash_map::RandomState;
    use std::hash::{BuildHasher, Hasher};

    // The previous implementation hashed `Instant::now().elapsed()`, which is
    // always ~0 ns and therefore contributed nothing. Draw each half from its
    // own freshly keyed hasher instead.
    let next_u64 = || RandomState::new().build_hasher().finish();
    let high = next_u64();
    let low = next_u64();
    (u128::from(high) << 64) | u128::from(low)
}
/// A document in the store.
///
/// Pairs a JSON payload with bookkeeping metadata (ID, timestamps, version).
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Document {
/// Unique document ID.
pub id: DocumentId,
/// Document data as JSON.
///
/// `set` and `merge` only take effect when this value is a JSON object.
pub data: JsonValue,
/// Creation timestamp; the constructors store milliseconds since the Unix epoch.
pub created_at: u64,
/// Last modification timestamp; `set` and `merge` store milliseconds since
/// the Unix epoch.
pub updated_at: u64,
/// Document version (for optimistic locking).
///
/// The constructors start it at 1 and each effective `set` or `merge`
/// increments it by one.
pub version: u64,
}
impl Document {
/// Creates a new document.
pub fn new(data: JsonValue) -> Self {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
Self {
id: DocumentId::new(),
data,
created_at: now,
updated_at: now,
version: 1,
}
}
/// Creates a document with a specific ID.
pub fn with_id(id: DocumentId, data: JsonValue) -> Self {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
Self {
id,
data,
created_at: now,
updated_at: now,
version: 1,
}
}
/// Gets a field value.
pub fn get(&self, field: &str) -> Option<&JsonValue> {
self.data.get(field)
}
/// Gets a nested field (dot notation).
pub fn get_nested(&self, path: &str) -> Option<&JsonValue> {
let parts: Vec<&str> = path.split('.').collect();
let mut current = &self.data;
for part in parts {
current = current.get(part)?;
}
Some(current)
}
/// Updates a field.
pub fn set(&mut self, field: &str, value: JsonValue) {
if let Some(obj) = self.data.as_object_mut() {
obj.insert(field.to_string(), value);
self.updated_at = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
self.version += 1;
}
}
/// Merges another object into this document.
pub fn merge(&mut self, other: JsonValue) {
if let (Some(self_obj), Some(other_obj)) = (self.data.as_object_mut(), other.as_object()) {
for (key, value) in other_obj {
self_obj.insert(key.clone(), value.clone());
}
self.updated_at = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
self.version += 1;
}
}
}
/// A collection of documents.
///
/// All documents live in a single map guarded by one read/write lock. The
/// document count is derived from that map rather than tracked in a second
/// lock, so it can never drift from the stored documents or underflow when
/// inserts, deletes and `clear` run concurrently.
#[derive(Debug)]
pub struct Collection {
    /// Collection name.
    pub name: String,
    /// Documents indexed by ID.
    documents: RwLock<HashMap<DocumentId, Document>>,
}

impl Collection {
    /// Creates a new, empty collection.
    pub fn new(name: impl Into<String>) -> Self {
        Self {
            name: name.into(),
            documents: RwLock::new(HashMap::new()),
        }
    }

    /// Inserts a document and returns its ID.
    ///
    /// # Errors
    ///
    /// Returns `DatabaseError::AlreadyExists` when a document with the same
    /// ID is already stored; the existing document is left untouched.
    pub fn insert(&self, doc: Document) -> Result<DocumentId, DatabaseError> {
        // The entry API checks and inserts with a single lookup under one lock.
        match self.documents.write().entry(doc.id.clone()) {
            std::collections::hash_map::Entry::Occupied(_) => {
                Err(DatabaseError::AlreadyExists(doc.id.to_string()))
            }
            std::collections::hash_map::Entry::Vacant(slot) => {
                let id = slot.key().clone();
                slot.insert(doc);
                Ok(id)
            }
        }
    }

    /// Wraps JSON data in a new document and inserts it.
    pub fn insert_one(&self, data: JsonValue) -> Result<DocumentId, DatabaseError> {
        self.insert(Document::new(data))
    }

    /// Inserts multiple documents, returning their IDs in input order.
    ///
    /// Stops at the first failure; documents inserted before the failure
    /// remain in the collection.
    pub fn insert_many(&self, docs: Vec<JsonValue>) -> Result<Vec<DocumentId>, DatabaseError> {
        docs.into_iter()
            .map(|data| self.insert_one(data))
            .collect()
    }

    /// Finds a document by ID, returning a clone of it.
    pub fn find_by_id(&self, id: &DocumentId) -> Option<Document> {
        self.documents.read().get(id).cloned()
    }

    /// Returns clones of all documents matching a filter (in map iteration order).
    pub fn find(&self, filter: &DocumentFilter) -> Vec<Document> {
        self.documents
            .read()
            .values()
            .filter(|doc| filter.matches(doc))
            .cloned()
            .collect()
    }

    /// Returns a clone of one document matching a filter, if any.
    ///
    /// Which document is returned when several match depends on map
    /// iteration order.
    pub fn find_one(&self, filter: &DocumentFilter) -> Option<Document> {
        self.documents
            .read()
            .values()
            .find(|doc| filter.matches(doc))
            .cloned()
    }

    /// Merges `update` into the document with the given ID.
    ///
    /// Returns `Ok(true)` when the document exists and `Ok(false)` otherwise.
    pub fn update_by_id(&self, id: &DocumentId, update: JsonValue) -> Result<bool, DatabaseError> {
        let mut docs = self.documents.write();
        Ok(docs.get_mut(id).map(|doc| doc.merge(update)).is_some())
    }

    /// Merges `update` into every document matching a filter.
    ///
    /// Returns the number of matching documents.
    pub fn update_many(&self, filter: &DocumentFilter, update: JsonValue) -> Result<u64, DatabaseError> {
        let mut docs = self.documents.write();
        let mut updated = 0u64;
        for doc in docs.values_mut() {
            if filter.matches(doc) {
                doc.merge(update.clone());
                updated += 1;
            }
        }
        Ok(updated)
    }

    /// Deletes a document by ID.
    ///
    /// Returns `Ok(true)` when a document was removed.
    pub fn delete_by_id(&self, id: &DocumentId) -> Result<bool, DatabaseError> {
        Ok(self.documents.write().remove(id).is_some())
    }

    /// Deletes every document matching a filter and returns how many were removed.
    pub fn delete_many(&self, filter: &DocumentFilter) -> Result<u64, DatabaseError> {
        let mut docs = self.documents.write();
        let before = docs.len();
        docs.retain(|_, doc| !filter.matches(doc));
        Ok((before - docs.len()) as u64)
    }

    /// Returns the number of documents currently stored.
    pub fn count(&self) -> u64 {
        self.documents.read().len() as u64
    }

    /// Returns clones of all documents.
    pub fn all(&self) -> Vec<Document> {
        self.documents.read().values().cloned().collect()
    }

    /// Removes all documents.
    pub fn clear(&self) {
        self.documents.write().clear();
    }
}
/// Filter for querying documents.
///
/// Built with the chained methods on this type (`eq`, `gt`, `or`, ...). A
/// document matches when every stored condition holds; a filter without
/// conditions matches every document.
#[derive(Clone, Debug, Default)]
pub struct DocumentFilter {
/// Conditions that must all hold for a document to match.
conditions: Vec<FilterCondition>,
}
/// A single predicate evaluated against one document.
///
/// In the field-based variants the `String` is a dot-notation path that is
/// resolved with `Document::get_nested`.
#[derive(Clone, Debug)]
enum FilterCondition {
/// Field is present and equal to the value.
Eq(String, JsonValue),
/// Field is absent or differs from the value.
Ne(String, JsonValue),
/// Field is a number greater than the given number; never matches when
/// either side is not a number.
Gt(String, JsonValue),
/// Field is a number greater than or equal to the given number.
Gte(String, JsonValue),
/// Field is a number less than the given number.
Lt(String, JsonValue),
/// Field is a number less than or equal to the given number.
Lte(String, JsonValue),
/// Field is present and equal to one of the listed values.
In(String, Vec<JsonValue>),
/// Field is a string containing the given substring.
Contains(String, String),
/// Field presence equals the flag (`true` = must exist, `false` = must not).
Exists(String, bool),
/// Every nested filter matches.
And(Vec<DocumentFilter>),
/// At least one nested filter matches.
Or(Vec<DocumentFilter>),
}
impl DocumentFilter {
    /// Creates a new empty filter (matches all).
    pub fn new() -> Self {
        Self { conditions: Vec::new() }
    }

    /// Appends a condition and hands the filter back for chaining.
    fn push(mut self, condition: FilterCondition) -> Self {
        self.conditions.push(condition);
        self
    }

    /// Equality condition: the field must exist and equal `value`.
    pub fn eq(self, field: impl Into<String>, value: JsonValue) -> Self {
        self.push(FilterCondition::Eq(field.into(), value))
    }

    /// Not equal condition: the field is missing or differs from `value`.
    pub fn ne(self, field: impl Into<String>, value: JsonValue) -> Self {
        self.push(FilterCondition::Ne(field.into(), value))
    }

    /// Greater than (numeric operands only).
    pub fn gt(self, field: impl Into<String>, value: JsonValue) -> Self {
        self.push(FilterCondition::Gt(field.into(), value))
    }

    /// Greater than or equal (numeric operands only).
    pub fn gte(self, field: impl Into<String>, value: JsonValue) -> Self {
        self.push(FilterCondition::Gte(field.into(), value))
    }

    /// Less than (numeric operands only).
    pub fn lt(self, field: impl Into<String>, value: JsonValue) -> Self {
        self.push(FilterCondition::Lt(field.into(), value))
    }

    /// Less than or equal (numeric operands only).
    pub fn lte(self, field: impl Into<String>, value: JsonValue) -> Self {
        self.push(FilterCondition::Lte(field.into(), value))
    }

    /// In array: the field must exist and equal one of `values`.
    pub fn in_array(self, field: impl Into<String>, values: Vec<JsonValue>) -> Self {
        self.push(FilterCondition::In(field.into(), values))
    }

    /// String contains: the field must be a string containing `substring`.
    pub fn contains(self, field: impl Into<String>, substring: impl Into<String>) -> Self {
        self.push(FilterCondition::Contains(field.into(), substring.into()))
    }

    /// Field exists (`exists == true`) or is absent (`exists == false`).
    pub fn exists(self, field: impl Into<String>, exists: bool) -> Self {
        self.push(FilterCondition::Exists(field.into(), exists))
    }

    /// AND multiple filters: all of `filters` must match.
    ///
    /// An empty list always matches.
    pub fn and(self, filters: Vec<DocumentFilter>) -> Self {
        self.push(FilterCondition::And(filters))
    }

    /// OR multiple filters: at least one of `filters` must match.
    ///
    /// An empty list never matches.
    pub fn or(self, filters: Vec<DocumentFilter>) -> Self {
        self.push(FilterCondition::Or(filters))
    }

    /// Checks if document matches the filter.
    ///
    /// All conditions must hold; a filter without conditions matches every
    /// document.
    pub fn matches(&self, doc: &Document) -> bool {
        // `all` on an empty iterator is already `true`, so no special case is needed.
        self.conditions
            .iter()
            .all(|cond| Self::eval_condition(cond, doc))
    }

    /// Evaluates a single condition against a document.
    fn eval_condition(cond: &FilterCondition, doc: &Document) -> bool {
        use std::cmp::Ordering;

        match cond {
            FilterCondition::Eq(field, value) => doc.get_nested(field) == Some(value),
            // A missing field counts as "not equal".
            FilterCondition::Ne(field, value) => doc.get_nested(field) != Some(value),
            FilterCondition::Gt(field, value) => {
                Self::compare_values(doc.get_nested(field), value, |ord| ord == Ordering::Greater)
            }
            FilterCondition::Gte(field, value) => {
                Self::compare_values(doc.get_nested(field), value, |ord| ord != Ordering::Less)
            }
            FilterCondition::Lt(field, value) => {
                Self::compare_values(doc.get_nested(field), value, |ord| ord == Ordering::Less)
            }
            FilterCondition::Lte(field, value) => {
                Self::compare_values(doc.get_nested(field), value, |ord| ord != Ordering::Greater)
            }
            FilterCondition::In(field, values) => doc
                .get_nested(field)
                .map(|v| values.contains(v))
                .unwrap_or(false),
            FilterCondition::Contains(field, substring) => doc
                .get_nested(field)
                .and_then(|v| v.as_str())
                .map(|s| s.contains(substring.as_str()))
                .unwrap_or(false),
            FilterCondition::Exists(field, should_exist) => {
                doc.get_nested(field).is_some() == *should_exist
            }
            FilterCondition::And(filters) => filters.iter().all(|f| f.matches(doc)),
            FilterCondition::Or(filters) => filters.iter().any(|f| f.matches(doc)),
        }
    }

    /// Orders two JSON numbers and applies `cmp` to the resulting ordering.
    ///
    /// Returns `false` when the field is missing, when either operand is not
    /// a number, or when the numbers cannot be ordered.
    ///
    /// Integers are compared exactly as `i64` or `u64` whenever both operands
    /// fit the same integer type. Routing them through `f64` would silently
    /// merge distinct values above 2^53 (e.g. large token amounts). Only
    /// mixed or floating-point operands fall back to `f64`.
    fn compare_values<F>(a: Option<&JsonValue>, b: &JsonValue, cmp: F) -> bool
    where
        F: Fn(std::cmp::Ordering) -> bool,
    {
        match (a, b) {
            (Some(JsonValue::Number(lhs)), JsonValue::Number(rhs)) => {
                let ordering = if let (Some(l), Some(r)) = (lhs.as_i64(), rhs.as_i64()) {
                    Some(l.cmp(&r))
                } else if let (Some(l), Some(r)) = (lhs.as_u64(), rhs.as_u64()) {
                    Some(l.cmp(&r))
                } else {
                    match (lhs.as_f64(), rhs.as_f64()) {
                        (Some(l), Some(r)) => l.partial_cmp(&r),
                        _ => None,
                    }
                };
                ordering.map(|ord| cmp(ord)).unwrap_or(false)
            }
            _ => false,
        }
    }
}
/// Document store managing multiple collections.
///
/// Collections are stored behind `Arc` so that handles returned by
/// [`DocumentStore::collection`] refer to the very same collection the store
/// operates on.
#[derive(Debug)]
pub struct DocumentStore {
    collections: RwLock<HashMap<String, std::sync::Arc<Collection>>>,
}

impl DocumentStore {
    /// Creates a new document store.
    pub fn new() -> Self {
        Self {
            collections: RwLock::new(HashMap::new()),
        }
    }

    /// Looks up an existing collection and returns a shared handle to it.
    ///
    /// The store lock is released before returning, so callers operate on the
    /// collection without blocking other store operations.
    fn existing(&self, name: &str) -> Result<std::sync::Arc<Collection>, DatabaseError> {
        let found = self.collections.read().get(name).cloned();
        found.ok_or_else(|| DatabaseError::CollectionNotFound(name.to_string()))
    }

    /// Gets or creates a collection.
    ///
    /// The returned handle shares state with the store: documents written
    /// through it are visible to `insert`/`find`/`find_one` on the store and
    /// to every other handle for the same name. (Previously a fresh, detached
    /// collection was returned, so such writes were lost.)
    pub fn collection(&self, name: &str) -> std::sync::Arc<Collection> {
        // Fast path: the collection usually exists, so a read lock suffices.
        {
            let collections = self.collections.read();
            if let Some(existing) = collections.get(name) {
                return std::sync::Arc::clone(existing);
            }
        }

        // Slow path: `entry` re-checks under the write lock, so two racing
        // callers still end up sharing a single collection.
        let mut collections = self.collections.write();
        let handle = collections
            .entry(name.to_string())
            .or_insert_with(|| std::sync::Arc::new(Collection::new(name)));
        std::sync::Arc::clone(handle)
    }

    /// Creates a new collection.
    ///
    /// # Errors
    ///
    /// Returns `DatabaseError::AlreadyExists` when a collection with this
    /// name is already registered.
    pub fn create_collection(&self, name: &str) -> Result<(), DatabaseError> {
        let mut collections = self.collections.write();
        if collections.contains_key(name) {
            return Err(DatabaseError::AlreadyExists(name.to_string()));
        }
        collections.insert(
            name.to_string(),
            std::sync::Arc::new(Collection::new(name)),
        );
        Ok(())
    }

    /// Drops a collection, returning `Ok(true)` when it existed.
    ///
    /// Handles previously obtained from `collection` stay usable but are no
    /// longer reachable through the store.
    pub fn drop_collection(&self, name: &str) -> Result<bool, DatabaseError> {
        Ok(self.collections.write().remove(name).is_some())
    }

    /// Lists all collection names (in no particular order).
    pub fn list_collections(&self) -> Vec<String> {
        self.collections.read().keys().cloned().collect()
    }

    /// Inserts a document into a collection.
    ///
    /// # Errors
    ///
    /// Returns `DatabaseError::CollectionNotFound` when the collection does
    /// not exist, or any error reported by the collection's insert.
    pub fn insert(&self, collection: &str, data: JsonValue) -> Result<DocumentId, DatabaseError> {
        self.existing(collection)?.insert_one(data)
    }

    /// Finds documents in a collection.
    ///
    /// # Errors
    ///
    /// Returns `DatabaseError::CollectionNotFound` when the collection does
    /// not exist.
    pub fn find(&self, collection: &str, filter: &DocumentFilter) -> Result<Vec<Document>, DatabaseError> {
        Ok(self.existing(collection)?.find(filter))
    }

    /// Finds one document.
    ///
    /// # Errors
    ///
    /// Returns `DatabaseError::CollectionNotFound` when the collection does
    /// not exist.
    pub fn find_one(&self, collection: &str, filter: &DocumentFilter) -> Result<Option<Document>, DatabaseError> {
        Ok(self.existing(collection)?.find_one(filter))
    }
}
impl Default for DocumentStore {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// A fresh document exposes its top-level fields and starts at version 1.
    #[test]
    fn test_document_creation() {
        let payload = json!({ "name": "Alice", "age": 30 });
        let document = Document::new(payload);

        let expected_name = json!("Alice");
        let expected_age = json!(30);
        assert_eq!(document.get("name"), Some(&expected_name));
        assert_eq!(document.get("age"), Some(&expected_age));
        assert_eq!(document.version, 1);
    }

    /// An equality filter returns only the matching document.
    #[test]
    fn test_collection_insert_find() {
        let users = Collection::new("users");
        let people = vec![
            json!({ "name": "Alice", "age": 30 }),
            json!({ "name": "Bob", "age": 25 }),
        ];
        for person in people {
            users.insert_one(person).unwrap();
        }

        let by_name = DocumentFilter::new().eq("name", json!("Alice"));
        let found = users.find(&by_name);

        assert_eq!(found.len(), 1);
        assert_eq!(found[0].get("name"), Some(&json!("Alice")));
    }

    /// `gte` keeps documents whose numeric field is at or above the bound.
    #[test]
    fn test_filter_comparison() {
        let users = Collection::new("users");
        let people = vec![
            json!({ "name": "Alice", "age": 30 }),
            json!({ "name": "Bob", "age": 25 }),
            json!({ "name": "Charlie", "age": 35 }),
        ];
        for person in people {
            users.insert_one(person).unwrap();
        }

        let at_least_thirty = DocumentFilter::new().gte("age", json!(30));
        let found = users.find(&at_least_thirty);

        assert_eq!(found.len(), 2);
    }

    /// Dot-notation paths reach values nested inside objects.
    #[test]
    fn test_nested_fields() {
        let payload = json!({ "user": { "profile": { "name": "Alice" } } });
        let document = Document::new(payload);

        let leaf = document.get_nested("user.profile.name");

        assert_eq!(leaf, Some(&json!("Alice")));
    }

    /// Updating by ID overwrites the field and bumps the version.
    #[test]
    fn test_update_document() {
        let users = Collection::new("users");
        let alice_id = users
            .insert_one(json!({ "name": "Alice", "age": 30 }))
            .unwrap();

        users.update_by_id(&alice_id, json!({ "age": 31 })).unwrap();

        let stored = users.find_by_id(&alice_id).unwrap();
        assert_eq!(stored.get("age"), Some(&json!(31)));
        assert_eq!(stored.version, 2);
    }

    /// `delete_many` removes every match and the count reflects what remains.
    #[test]
    fn test_delete_many() {
        let users = Collection::new("users");
        let statuses = vec!["active", "active", "inactive"];
        for status in statuses {
            users.insert_one(json!({ "status": status })).unwrap();
        }

        let active_only = DocumentFilter::new().eq("status", json!("active"));
        let removed = users.delete_many(&active_only).unwrap();

        assert_eq!(removed, 2);
        assert_eq!(users.count(), 1);
    }
}