Multi-model database layer for Synor blockchain: - Key-Value Store: Redis-compatible API with TTL, INCR, MGET/MSET - Document Store: MongoDB-compatible queries with filters - Vector Store: AI/RAG optimized with cosine, euclidean, dot product similarity - Time-Series Store: Metrics with downsampling and aggregations - Query Engine: Unified queries across all data models - Index Manager: B-tree, hash, unique, and compound indexes - Schema Validator: Field validation with type checking - Database Pricing: Pay-per-use model (0.1 SYNOR/GB/month) Updates roadmap with Phase 10-12 milestones: - Phase 10: Synor Database L2 - Phase 11: Economics & Billing - Phase 12: Fiat Gateway (Ramp Network integration) 41 tests passing
442 lines
12 KiB
Rust
442 lines
12 KiB
Rust
//! Key-Value Store - Redis-compatible API.
//!
//! Provides fast in-memory key-value operations with an optional per-key TTL.
|
|
|
|
use crate::error::DatabaseError;
|
|
use parking_lot::RwLock;
|
|
use serde::{Deserialize, Serialize};
|
|
use std::collections::HashMap;
|
|
use std::time::{Duration, Instant};
|
|
|
|
/// Key-value entry with optional expiration.
|
|
#[derive(Clone, Debug, Serialize, Deserialize)]
|
|
pub struct KvEntry {
|
|
/// The value (stored as bytes).
|
|
pub value: Vec<u8>,
|
|
/// Creation timestamp (ms since epoch).
|
|
pub created_at: u64,
|
|
/// Last modified timestamp.
|
|
pub modified_at: u64,
|
|
/// TTL in seconds (0 = no expiry).
|
|
pub ttl: u64,
|
|
/// Expiration instant (internal).
|
|
#[serde(skip)]
|
|
expires_at: Option<Instant>,
|
|
}
|
|
|
|
impl KvEntry {
|
|
/// Creates a new entry.
|
|
pub fn new(value: Vec<u8>, ttl: u64) -> Self {
|
|
let now = std::time::SystemTime::now()
|
|
.duration_since(std::time::UNIX_EPOCH)
|
|
.unwrap()
|
|
.as_millis() as u64;
|
|
|
|
let expires_at = if ttl > 0 {
|
|
Some(Instant::now() + Duration::from_secs(ttl))
|
|
} else {
|
|
None
|
|
};
|
|
|
|
Self {
|
|
value,
|
|
created_at: now,
|
|
modified_at: now,
|
|
ttl,
|
|
expires_at,
|
|
}
|
|
}
|
|
|
|
/// Checks if the entry has expired.
|
|
pub fn is_expired(&self) -> bool {
|
|
self.expires_at.map(|e| Instant::now() > e).unwrap_or(false)
|
|
}
|
|
|
|
/// Returns remaining TTL in seconds.
|
|
pub fn remaining_ttl(&self) -> Option<u64> {
|
|
self.expires_at.map(|e| {
|
|
let now = Instant::now();
|
|
if now > e {
|
|
0
|
|
} else {
|
|
(e - now).as_secs()
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
/// A key together with its value, intended for iteration results.
#[derive(Debug, Clone)]
pub struct KeyValue {
    /// The key name.
    pub key: String,
    /// Raw value bytes.
    pub value: Vec<u8>,
    /// Optional TTL.
    // NOTE(review): presumably the remaining TTL in seconds with `None`
    // meaning "no expiry" — nothing in this file constructs the type; confirm.
    pub ttl: Option<u64>,
}
|
|
|
|
/// Redis-compatible key-value store.
|
|
pub struct KeyValueStore {
|
|
/// Data storage.
|
|
data: RwLock<HashMap<String, KvEntry>>,
|
|
/// Statistics.
|
|
stats: RwLock<KvStats>,
|
|
}
|
|
|
|
/// Operation counters maintained by the key-value store.
///
/// All counters start at zero (`Default`).
#[derive(Debug, Clone, Default)]
pub struct KvStats {
    /// Number of `get` calls.
    pub gets: u64,
    /// Number of successful writes (`set`, `set_nx`, `set_xx`, `incr`, `mset`).
    pub sets: u64,
    /// Number of keys removed through `delete`.
    pub deletes: u64,
    /// `get` calls that returned a value.
    pub hits: u64,
    /// `get` calls that returned nothing.
    pub misses: u64,
    /// Expired entries accounted for by `get` and `evict_expired`.
    pub expired: u64,
}
|
|
|
|
impl KeyValueStore {
|
|
/// Creates a new key-value store.
|
|
pub fn new() -> Self {
|
|
Self {
|
|
data: RwLock::new(HashMap::new()),
|
|
stats: RwLock::new(KvStats::default()),
|
|
}
|
|
}
|
|
|
|
/// Gets a value by key.
|
|
pub fn get(&self, key: &str) -> Option<Vec<u8>> {
|
|
let mut stats = self.stats.write();
|
|
stats.gets += 1;
|
|
|
|
let data = self.data.read();
|
|
if let Some(entry) = data.get(key) {
|
|
if entry.is_expired() {
|
|
stats.expired += 1;
|
|
stats.misses += 1;
|
|
drop(data);
|
|
self.delete(key).ok();
|
|
None
|
|
} else {
|
|
stats.hits += 1;
|
|
Some(entry.value.clone())
|
|
}
|
|
} else {
|
|
stats.misses += 1;
|
|
None
|
|
}
|
|
}
|
|
|
|
/// Gets a value as string.
|
|
pub fn get_string(&self, key: &str) -> Option<String> {
|
|
self.get(key)
|
|
.and_then(|v| String::from_utf8(v).ok())
|
|
}
|
|
|
|
/// Sets a value with optional TTL.
|
|
pub fn set(&self, key: &str, value: Vec<u8>, ttl: u64) -> Result<(), DatabaseError> {
|
|
let entry = KvEntry::new(value, ttl);
|
|
self.data.write().insert(key.to_string(), entry);
|
|
self.stats.write().sets += 1;
|
|
Ok(())
|
|
}
|
|
|
|
/// Sets a string value.
|
|
pub fn set_string(&self, key: &str, value: &str, ttl: u64) -> Result<(), DatabaseError> {
|
|
self.set(key, value.as_bytes().to_vec(), ttl)
|
|
}
|
|
|
|
/// Sets a value only if key doesn't exist (SETNX).
|
|
pub fn set_nx(&self, key: &str, value: Vec<u8>, ttl: u64) -> Result<bool, DatabaseError> {
|
|
let mut data = self.data.write();
|
|
|
|
// Check if exists and not expired
|
|
if let Some(entry) = data.get(key) {
|
|
if !entry.is_expired() {
|
|
return Ok(false);
|
|
}
|
|
}
|
|
|
|
data.insert(key.to_string(), KvEntry::new(value, ttl));
|
|
self.stats.write().sets += 1;
|
|
Ok(true)
|
|
}
|
|
|
|
/// Sets a value only if key exists (SETXX).
|
|
pub fn set_xx(&self, key: &str, value: Vec<u8>, ttl: u64) -> Result<bool, DatabaseError> {
|
|
let mut data = self.data.write();
|
|
|
|
if let Some(entry) = data.get(key) {
|
|
if entry.is_expired() {
|
|
return Ok(false);
|
|
}
|
|
data.insert(key.to_string(), KvEntry::new(value, ttl));
|
|
self.stats.write().sets += 1;
|
|
Ok(true)
|
|
} else {
|
|
Ok(false)
|
|
}
|
|
}
|
|
|
|
/// Deletes a key.
|
|
pub fn delete(&self, key: &str) -> Result<bool, DatabaseError> {
|
|
let removed = self.data.write().remove(key).is_some();
|
|
if removed {
|
|
self.stats.write().deletes += 1;
|
|
}
|
|
Ok(removed)
|
|
}
|
|
|
|
/// Checks if a key exists.
|
|
pub fn exists(&self, key: &str) -> bool {
|
|
let data = self.data.read();
|
|
data.get(key).map(|e| !e.is_expired()).unwrap_or(false)
|
|
}
|
|
|
|
/// Sets TTL on existing key.
|
|
pub fn expire(&self, key: &str, ttl: u64) -> Result<bool, DatabaseError> {
|
|
let mut data = self.data.write();
|
|
if let Some(entry) = data.get_mut(key) {
|
|
if entry.is_expired() {
|
|
return Ok(false);
|
|
}
|
|
entry.ttl = ttl;
|
|
entry.expires_at = if ttl > 0 {
|
|
Some(Instant::now() + Duration::from_secs(ttl))
|
|
} else {
|
|
None
|
|
};
|
|
Ok(true)
|
|
} else {
|
|
Ok(false)
|
|
}
|
|
}
|
|
|
|
/// Gets remaining TTL.
|
|
pub fn ttl(&self, key: &str) -> Option<u64> {
|
|
self.data.read().get(key).and_then(|e| e.remaining_ttl())
|
|
}
|
|
|
|
/// Increments a numeric value.
|
|
pub fn incr(&self, key: &str, delta: i64) -> Result<i64, DatabaseError> {
|
|
let mut data = self.data.write();
|
|
|
|
let current = if let Some(entry) = data.get(key) {
|
|
if entry.is_expired() {
|
|
0
|
|
} else {
|
|
let s = String::from_utf8(entry.value.clone())
|
|
.map_err(|_| DatabaseError::InvalidOperation("Value is not a string".into()))?;
|
|
s.parse::<i64>()
|
|
.map_err(|_| DatabaseError::InvalidOperation("Value is not an integer".into()))?
|
|
}
|
|
} else {
|
|
0
|
|
};
|
|
|
|
let new_value = current + delta;
|
|
let entry = KvEntry::new(new_value.to_string().into_bytes(), 0);
|
|
data.insert(key.to_string(), entry);
|
|
self.stats.write().sets += 1;
|
|
|
|
Ok(new_value)
|
|
}
|
|
|
|
/// Appends to a string value.
|
|
pub fn append(&self, key: &str, value: &[u8]) -> Result<usize, DatabaseError> {
|
|
let mut data = self.data.write();
|
|
|
|
let entry = data.entry(key.to_string()).or_insert_with(|| {
|
|
KvEntry::new(Vec::new(), 0)
|
|
});
|
|
|
|
if entry.is_expired() {
|
|
entry.value.clear();
|
|
}
|
|
|
|
entry.value.extend_from_slice(value);
|
|
entry.modified_at = std::time::SystemTime::now()
|
|
.duration_since(std::time::UNIX_EPOCH)
|
|
.unwrap()
|
|
.as_millis() as u64;
|
|
|
|
Ok(entry.value.len())
|
|
}
|
|
|
|
/// Gets multiple keys.
|
|
pub fn mget(&self, keys: &[&str]) -> Vec<Option<Vec<u8>>> {
|
|
keys.iter().map(|k| self.get(k)).collect()
|
|
}
|
|
|
|
/// Sets multiple key-value pairs.
|
|
pub fn mset(&self, pairs: &[(&str, Vec<u8>)], ttl: u64) -> Result<(), DatabaseError> {
|
|
let mut data = self.data.write();
|
|
for (key, value) in pairs {
|
|
data.insert(key.to_string(), KvEntry::new(value.clone(), ttl));
|
|
}
|
|
self.stats.write().sets += pairs.len() as u64;
|
|
Ok(())
|
|
}
|
|
|
|
/// Returns all keys matching a pattern (simple glob).
|
|
pub fn keys(&self, pattern: &str) -> Vec<String> {
|
|
let data = self.data.read();
|
|
data.keys()
|
|
.filter(|k| self.matches_pattern(k, pattern))
|
|
.cloned()
|
|
.collect()
|
|
}
|
|
|
|
/// Simple glob pattern matching.
|
|
fn matches_pattern(&self, key: &str, pattern: &str) -> bool {
|
|
if pattern == "*" {
|
|
return true;
|
|
}
|
|
|
|
let parts: Vec<&str> = pattern.split('*').collect();
|
|
if parts.len() == 1 {
|
|
return key == pattern;
|
|
}
|
|
|
|
let mut pos = 0;
|
|
for (i, part) in parts.iter().enumerate() {
|
|
if part.is_empty() {
|
|
continue;
|
|
}
|
|
if let Some(found) = key[pos..].find(part) {
|
|
if i == 0 && found != 0 {
|
|
return false;
|
|
}
|
|
pos += found + part.len();
|
|
} else {
|
|
return false;
|
|
}
|
|
}
|
|
|
|
if !parts.last().unwrap().is_empty() {
|
|
key.ends_with(parts.last().unwrap())
|
|
} else {
|
|
true
|
|
}
|
|
}
|
|
|
|
/// Returns the number of keys.
|
|
pub fn len(&self) -> usize {
|
|
self.data.read().len()
|
|
}
|
|
|
|
/// Checks if store is empty.
|
|
pub fn is_empty(&self) -> bool {
|
|
self.data.read().is_empty()
|
|
}
|
|
|
|
/// Clears all keys.
|
|
pub fn clear(&self) {
|
|
self.data.write().clear();
|
|
}
|
|
|
|
/// Returns statistics.
|
|
pub fn stats(&self) -> KvStats {
|
|
self.stats.read().clone()
|
|
}
|
|
|
|
/// Evicts expired entries.
|
|
pub fn evict_expired(&self) -> usize {
|
|
let mut data = self.data.write();
|
|
let before = data.len();
|
|
data.retain(|_, entry| !entry.is_expired());
|
|
let evicted = before - data.len();
|
|
self.stats.write().expired += evicted as u64;
|
|
evicted
|
|
}
|
|
}
|
|
|
|
impl Default for KeyValueStore {
|
|
fn default() -> Self {
|
|
Self::new()
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
mod tests {
    use super::*;

    /// A string value round-trips and disappears after deletion.
    #[test]
    fn test_basic_operations() {
        let kv = KeyValueStore::new();

        kv.set_string("key1", "value1", 0).unwrap();
        assert_eq!(kv.get_string("key1"), Some(String::from("value1")));

        kv.delete("key1").unwrap();
        assert_eq!(kv.get_string("key1"), None);
    }

    /// SETNX writes once and refuses to overwrite a live key.
    #[test]
    fn test_set_nx() {
        let kv = KeyValueStore::new();

        let first = kv.set_nx("key", b"value1".to_vec(), 0).unwrap();
        let second = kv.set_nx("key", b"value2".to_vec(), 0).unwrap();
        assert!(first);
        assert!(!second);

        assert_eq!(kv.get_string("key"), Some(String::from("value1")));
    }

    /// INCR starts from zero and accepts positive and negative deltas.
    #[test]
    fn test_incr() {
        let kv = KeyValueStore::new();

        let steps: [(i64, i64); 4] = [(1, 1), (1, 2), (5, 7), (-3, 4)];
        for &(delta, expected) in steps.iter() {
            assert_eq!(kv.incr("counter", delta).unwrap(), expected);
        }
    }

    /// MSET stores every pair; MGET reports hits and misses positionally.
    #[test]
    fn test_mget_mset() {
        let kv = KeyValueStore::new();

        let pairs = [
            ("k1", b"v1".to_vec()),
            ("k2", b"v2".to_vec()),
            ("k3", b"v3".to_vec()),
        ];
        kv.mset(&pairs, 0).unwrap();

        let fetched = kv.mget(&["k1", "k2", "k4"]);
        assert_eq!(fetched.len(), 3);
        assert_eq!(fetched[0], Some(b"v1".to_vec()));
        assert_eq!(fetched[1], Some(b"v2".to_vec()));
        assert_eq!(fetched[2], None);
    }

    /// Prefix globs and the catch-all `*` select the expected keys.
    #[test]
    fn test_keys_pattern() {
        let kv = KeyValueStore::new();

        for &(name, value) in [("user:1", "alice"), ("user:2", "bob"), ("session:1", "data")].iter() {
            kv.set_string(name, value, 0).unwrap();
        }

        assert_eq!(kv.keys("user:*").len(), 2);
        assert_eq!(kv.keys("*").len(), 3);
    }

    /// APPEND creates the key on first use and concatenates afterwards.
    #[test]
    fn test_append() {
        let kv = KeyValueStore::new();

        kv.append("msg", b"hello").unwrap();
        kv.append("msg", b" world").unwrap();

        assert_eq!(kv.get_string("msg"), Some(String::from("hello world")));
    }

    /// EXISTS flips from false to true once the key is written.
    #[test]
    fn test_exists() {
        let kv = KeyValueStore::new();

        assert!(!kv.exists("key"));
        kv.set_string("key", "value", 0).unwrap();
        assert!(kv.exists("key"));
    }
}
|