// synor/crates/synor-storage/src/db.rs

//! RocksDB database wrapper for Synor.
//!
//! Provides a typed interface to the underlying RocksDB instance.
use crate::cf;
use rocksdb::{ColumnFamily, ColumnFamilyDescriptor, DBCompressionType, Options, WriteBatch, DB};
use std::path::Path;
use std::sync::Arc;
use thiserror::Error;
/// Database configuration options.
///
/// Tuning knobs translated into RocksDB `Options` at open time (see
/// `to_options`). Preset constructors (`ssd`, `low_memory`, `for_testing`)
/// cover common profiles; `Default` provides balanced values.
#[derive(Clone, Debug)]
pub struct DatabaseConfig {
/// Maximum number of open files (passed to `set_max_open_files`).
pub max_open_files: i32,
/// Write buffer (memtable) size in bytes.
pub write_buffer_size: usize,
/// Maximum number of write buffers (memtables) kept in memory.
pub max_write_buffer_number: i32,
/// Target file size for level-1 files, in bytes.
pub target_file_size_base: u64,
/// Maximum total WAL size in bytes before forced memtable flush.
pub max_total_wal_size: u64,
/// Enable LZ4 compression for SST files.
pub enable_compression: bool,
/// Block cache size in bytes.
/// NOTE(review): not currently applied by `to_options` — verify before
/// relying on this knob.
pub block_cache_size: usize,
/// Enable RocksDB-internal statistics collection (see `Database::stats`).
pub enable_statistics: bool,
/// Create the database (and missing column families) if absent.
pub create_if_missing: bool,
/// Number of background threads (passed to `increase_parallelism`).
pub parallelism: i32,
}
impl Default for DatabaseConfig {
fn default() -> Self {
DatabaseConfig {
max_open_files: 512,
write_buffer_size: 64 * 1024 * 1024, // 64 MB
max_write_buffer_number: 3,
target_file_size_base: 64 * 1024 * 1024, // 64 MB
max_total_wal_size: 256 * 1024 * 1024, // 256 MB
enable_compression: true,
block_cache_size: 128 * 1024 * 1024, // 128 MB
enable_statistics: false,
create_if_missing: true,
parallelism: 4,
}
}
}
impl DatabaseConfig {
    /// Creates a configuration optimized for SSDs: larger buffers, more
    /// open files, and higher background parallelism.
    pub fn ssd() -> Self {
        DatabaseConfig {
            max_open_files: 1024,
            write_buffer_size: 128 * 1024 * 1024,
            max_write_buffer_number: 4,
            target_file_size_base: 128 * 1024 * 1024,
            max_total_wal_size: 512 * 1024 * 1024,
            enable_compression: true,
            block_cache_size: 512 * 1024 * 1024,
            enable_statistics: false,
            create_if_missing: true,
            parallelism: 8,
        }
    }

    /// Creates a configuration for low-memory environments (small buffers
    /// and caches, minimal background threads).
    pub fn low_memory() -> Self {
        DatabaseConfig {
            max_open_files: 128,
            write_buffer_size: 16 * 1024 * 1024,
            max_write_buffer_number: 2,
            target_file_size_base: 32 * 1024 * 1024,
            max_total_wal_size: 64 * 1024 * 1024,
            enable_compression: true,
            block_cache_size: 32 * 1024 * 1024,
            enable_statistics: false,
            create_if_missing: true,
            parallelism: 2,
        }
    }

    /// Creates a configuration for testing: tiny buffers, no compression,
    /// statistics enabled for assertions against `Database::stats`.
    pub fn for_testing() -> Self {
        DatabaseConfig {
            max_open_files: 64,
            write_buffer_size: 4 * 1024 * 1024,
            max_write_buffer_number: 2,
            target_file_size_base: 4 * 1024 * 1024,
            max_total_wal_size: 16 * 1024 * 1024,
            enable_compression: false,
            block_cache_size: 8 * 1024 * 1024,
            enable_statistics: true,
            create_if_missing: true,
            parallelism: 1,
        }
    }

    /// Converts this configuration to RocksDB `Options`.
    fn to_options(&self) -> Options {
        let mut opts = Options::default();
        opts.create_if_missing(self.create_if_missing);
        opts.create_missing_column_families(true);
        opts.set_max_open_files(self.max_open_files);
        opts.set_write_buffer_size(self.write_buffer_size);
        opts.set_max_write_buffer_number(self.max_write_buffer_number);
        opts.set_target_file_size_base(self.target_file_size_base);
        opts.set_max_total_wal_size(self.max_total_wal_size);
        opts.increase_parallelism(self.parallelism);
        // Always set the compression type. Explicitly selecting `None` when
        // disabled matters: RocksDB's built-in default is not "no
        // compression", so the old `if enable_compression { ... }` form
        // silently left compression on when the flag was false.
        opts.set_compression_type(if self.enable_compression {
            DBCompressionType::Lz4
        } else {
            DBCompressionType::None
        });
        // Honor the statistics flag; previously this field was ignored,
        // making `for_testing()`'s `enable_statistics: true` a no-op.
        if self.enable_statistics {
            opts.enable_statistics();
        }
        // NOTE(review): `block_cache_size` is still not applied here —
        // wiring it up needs `BlockBasedOptions` plus a shared `Cache`,
        // whose constructor signature varies across rocksdb crate versions.
        // TODO: apply the block cache once the crate version is pinned.
        opts
    }
}
/// Database errors.
#[derive(Debug, Error)]
pub enum DbError {
/// Error propagated from the underlying RocksDB engine.
#[error("RocksDB error: {0}")]
RocksDb(#[from] rocksdb::Error),
/// A requested column family handle does not exist in the open database.
#[error("Column family not found: {0}")]
ColumnFamilyNotFound(String),
/// A value could not be serialized for storage.
#[error("Serialization error: {0}")]
Serialization(String),
/// A stored value could not be deserialized.
#[error("Deserialization error: {0}")]
Deserialization(String),
/// The requested key was not present.
#[error("Key not found")]
NotFound,
/// Operation attempted on a closed database handle.
#[error("Database is closed")]
Closed,
/// A key did not match the expected encoding.
#[error("Invalid key format")]
InvalidKey,
/// Underlying filesystem/OS error.
#[error("IO error: {0}")]
Io(#[from] std::io::Error),
}
/// The main database handle.
pub struct Database {
db: Arc<DB>,
path: String,
}
impl Database {
    /// Opens or creates a database at the given path.
    ///
    /// All column families listed by `cf::all()` are opened (and created if
    /// missing), each inheriting the configured compression setting.
    ///
    /// # Errors
    ///
    /// Returns `DbError::RocksDb` if RocksDB fails to open the database.
    pub fn open<P: AsRef<Path>>(path: P, config: &DatabaseConfig) -> Result<Self, DbError> {
        let path_str = path.as_ref().to_string_lossy().to_string();
        let opts = config.to_options();
        // Create column family descriptors
        let cf_descriptors: Vec<ColumnFamilyDescriptor> = cf::all()
            .into_iter()
            .map(|name| {
                let mut cf_opts = Options::default();
                // Mirror the top-level compression choice. Explicitly select
                // `None` when disabled: RocksDB's default is not "off", so
                // only setting the type when enabled leaves compression on.
                cf_opts.set_compression_type(if config.enable_compression {
                    DBCompressionType::Lz4
                } else {
                    DBCompressionType::None
                });
                ColumnFamilyDescriptor::new(name, cf_opts)
            })
            .collect();
        let db = DB::open_cf_descriptors(&opts, &path_str, cf_descriptors)?;
        Ok(Database {
            db: Arc::new(db),
            path: path_str,
        })
    }

    /// Opens a database in read-only mode.
    ///
    /// The final `false` argument leaves the on-disk log untouched
    /// (`error_if_log_file_exist = false`).
    pub fn open_read_only<P: AsRef<Path>>(
        path: P,
        config: &DatabaseConfig,
    ) -> Result<Self, DbError> {
        let path_str = path.as_ref().to_string_lossy().to_string();
        let opts = config.to_options();
        let cf_names: Vec<&str> = cf::all();
        let db = DB::open_cf_for_read_only(&opts, &path_str, cf_names, false)?;
        Ok(Database {
            db: Arc::new(db),
            path: path_str,
        })
    }

    /// Returns the database path.
    pub fn path(&self) -> &str {
        &self.path
    }

    /// Gets a value by key from a column family. `Ok(None)` means absent.
    pub fn get(&self, cf_name: &str, key: &[u8]) -> Result<Option<Vec<u8>>, DbError> {
        let cf = self.get_cf(cf_name)?;
        Ok(self.db.get_cf(cf, key)?)
    }

    /// Puts a key-value pair into a column family.
    pub fn put(&self, cf_name: &str, key: &[u8], value: &[u8]) -> Result<(), DbError> {
        let cf = self.get_cf(cf_name)?;
        self.db.put_cf(cf, key, value)?;
        Ok(())
    }

    /// Deletes a key from a column family (no error if it was absent).
    pub fn delete(&self, cf_name: &str, key: &[u8]) -> Result<(), DbError> {
        let cf = self.get_cf(cf_name)?;
        self.db.delete_cf(cf, key)?;
        Ok(())
    }

    /// Checks if a key exists in a column family.
    ///
    /// Note: this fetches the value to decide; use only when the value is
    /// not needed anyway.
    pub fn exists(&self, cf_name: &str, key: &[u8]) -> Result<bool, DbError> {
        let cf = self.get_cf(cf_name)?;
        Ok(self.db.get_cf(cf, key)?.is_some())
    }

    /// Creates a new, empty write batch bound to this database.
    pub fn batch(&self) -> WriteBatchWrapper {
        WriteBatchWrapper {
            batch: WriteBatch::default(),
            db: Arc::clone(&self.db),
        }
    }

    /// Writes a batch atomically, consuming it.
    pub fn write_batch(&self, batch: WriteBatchWrapper) -> Result<(), DbError> {
        self.db.write(batch.batch)?;
        Ok(())
    }

    /// Iterates over all key-value pairs in a column family, in key order.
    ///
    /// # Panics
    ///
    /// Panics if the RocksDB iterator reports an I/O or corruption error
    /// mid-scan; the item type cannot carry a `Result` without breaking
    /// existing callers.
    pub fn iter(
        &self,
        cf_name: &str,
    ) -> Result<impl Iterator<Item = (Box<[u8]>, Box<[u8]>)> + '_, DbError> {
        let cf = self.get_cf(cf_name)?;
        let iter = self.db.iterator_cf(cf, rocksdb::IteratorMode::Start);
        Ok(iter.map(|r| r.expect("RocksDB iterator error during full scan")))
    }

    /// Iterates over keys with a given prefix.
    ///
    /// The `take_while` guard is required for correctness: without a prefix
    /// extractor configured on the column family, `prefix_iterator_cf` may
    /// yield keys past the prefix range.
    ///
    /// # Panics
    ///
    /// Panics if the RocksDB iterator reports an error mid-scan (see
    /// [`Database::iter`]).
    pub fn prefix_iter(
        &self,
        cf_name: &str,
        prefix: &[u8],
    ) -> Result<impl Iterator<Item = (Box<[u8]>, Box<[u8]>)> + '_, DbError> {
        let cf = self.get_cf(cf_name)?;
        let iter = self.db.prefix_iterator_cf(cf, prefix);
        let prefix_owned = prefix.to_vec();
        Ok(iter
            .map(|r| r.expect("RocksDB iterator error during prefix scan"))
            .take_while(move |(k, _)| k.starts_with(&prefix_owned)))
    }

    /// Gets multiple values by keys; the result order matches `keys`.
    pub fn multi_get(
        &self,
        cf_name: &str,
        keys: &[&[u8]],
    ) -> Result<Vec<Option<Vec<u8>>>, DbError> {
        let cf = self.get_cf(cf_name)?;
        let cf_keys: Vec<_> = keys.iter().map(|k| (cf, *k)).collect();
        let results = self.db.multi_get_cf(cf_keys);
        results
            .into_iter()
            .map(|r| r.map_err(DbError::from))
            .collect()
    }

    /// Flushes in-memory memtables to SST files on disk.
    ///
    /// (Despite the old doc comment, `DB::flush` flushes memtables, not the
    /// write-ahead log.)
    pub fn flush(&self) -> Result<(), DbError> {
        self.db.flush()?;
        Ok(())
    }

    /// Compacts the full key range of every known column family.
    pub fn compact(&self) -> Result<(), DbError> {
        for cf_name in cf::all() {
            if let Ok(cf) = self.get_cf(cf_name) {
                self.db.compact_range_cf(cf, None::<&[u8]>, None::<&[u8]>);
            }
        }
        Ok(())
    }

    /// Returns the `rocksdb.stats` property, if available.
    ///
    /// Most useful when the database was opened with
    /// `enable_statistics: true`.
    pub fn stats(&self) -> Option<String> {
        self.db.property_value("rocksdb.stats").ok().flatten()
    }

    /// Returns the approximate live data size of a column family in bytes
    /// (RocksDB's `estimate-live-data-size` property; 0 if unavailable).
    pub fn cf_size(&self, cf_name: &str) -> Result<u64, DbError> {
        let cf = self.get_cf(cf_name)?;
        // `?` uses the `#[from] rocksdb::Error` conversion on `DbError`;
        // the previous `map_err(|e| DbError::RocksDb(e))` was redundant.
        let size = self
            .db
            .property_int_value_cf(cf, "rocksdb.estimate-live-data-size")?
            .unwrap_or(0);
        Ok(size)
    }

    /// Gets a column family handle by name.
    fn get_cf(&self, name: &str) -> Result<&ColumnFamily, DbError> {
        self.db
            .cf_handle(name)
            .ok_or_else(|| DbError::ColumnFamilyNotFound(name.to_string()))
    }

    /// Returns a shared handle to the underlying database.
    pub fn inner(&self) -> Arc<DB> {
        Arc::clone(&self.db)
    }
}
// Manual Debug: shows only the path; the raw RocksDB handle has no useful
// Debug output.
impl std::fmt::Debug for Database {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Database")
.field("path", &self.path)
.finish()
}
}
/// A wrapper around RocksDB WriteBatch with column family support.
///
/// Created via [`Database::batch`] and consumed by
/// [`Database::write_batch`], which applies all queued operations
/// atomically.
pub struct WriteBatchWrapper {
// Queued put/delete operations, applied atomically on write.
batch: WriteBatch,
// Kept so column family names can be resolved to handles at queue time.
db: Arc<DB>,
}
impl WriteBatchWrapper {
/// Adds a put operation to the batch.
pub fn put(&mut self, cf_name: &str, key: &[u8], value: &[u8]) -> Result<(), DbError> {
let cf = self
.db
.cf_handle(cf_name)
.ok_or_else(|| DbError::ColumnFamilyNotFound(cf_name.to_string()))?;
self.batch.put_cf(cf, key, value);
Ok(())
}
/// Adds a delete operation to the batch.
pub fn delete(&mut self, cf_name: &str, key: &[u8]) -> Result<(), DbError> {
let cf = self
.db
.cf_handle(cf_name)
.ok_or_else(|| DbError::ColumnFamilyNotFound(cf_name.to_string()))?;
self.batch.delete_cf(cf, key);
Ok(())
}
/// Returns the number of operations in the batch.
pub fn len(&self) -> usize {
self.batch.len()
}
/// Returns true if the batch is empty.
pub fn is_empty(&self) -> bool {
self.batch.is_empty()
}
/// Clears all operations from the batch.
pub fn clear(&mut self) {
self.batch.clear();
}
}
/// Helpers for building and parsing binary storage keys.
pub mod keys {
    /// Encodes a u64 in big-endian byte order so encoded keys sort
    /// numerically under lexicographic comparison.
    pub fn encode_u64(value: u64) -> [u8; 8] {
        value.to_be_bytes()
    }

    /// Decodes a big-endian u64 from the first 8 bytes of `bytes`.
    /// Returns `None` when fewer than 8 bytes are available; extra
    /// trailing bytes are ignored.
    pub fn decode_u64(bytes: &[u8]) -> Option<u64> {
        let head = bytes.get(..8)?;
        let mut buf = [0u8; 8];
        buf.copy_from_slice(head);
        Some(u64::from_be_bytes(buf))
    }

    /// Encodes a u32 in big-endian byte order.
    pub fn encode_u32(value: u32) -> [u8; 4] {
        value.to_be_bytes()
    }

    /// Decodes a big-endian u32 from the first 4 bytes of `bytes`.
    /// Returns `None` when fewer than 4 bytes are available.
    pub fn decode_u32(bytes: &[u8]) -> Option<u32> {
        let head = bytes.get(..4)?;
        let mut buf = [0u8; 4];
        buf.copy_from_slice(head);
        Some(u32::from_be_bytes(buf))
    }

    /// Builds a key consisting of a one-byte prefix followed by `key`.
    pub fn prefixed_key(prefix: u8, key: &[u8]) -> Vec<u8> {
        std::iter::once(prefix).chain(key.iter().copied()).collect()
    }

    /// Concatenates multiple key parts into a single composite key.
    pub fn composite_key(parts: &[&[u8]]) -> Vec<u8> {
        parts.concat()
    }
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::tempdir;
// Each test opens a fresh database in a tempdir that is removed on drop.
#[test]
fn test_database_open_close() {
let dir = tempdir().unwrap();
let config = DatabaseConfig::for_testing();
let db = Database::open(dir.path(), &config).unwrap();
// `contains` rather than `==` because `path()` is a lossy string form.
assert!(db.path().contains(dir.path().to_str().unwrap()));
}
// Round-trip of the three basic single-key operations.
#[test]
fn test_put_get_delete() {
let dir = tempdir().unwrap();
let config = DatabaseConfig::for_testing();
let db = Database::open(dir.path(), &config).unwrap();
// Put
db.put(cf::HEADERS, b"key1", b"value1").unwrap();
// Get
let value = db.get(cf::HEADERS, b"key1").unwrap();
assert_eq!(value, Some(b"value1".to_vec()));
// Delete
db.delete(cf::HEADERS, b"key1").unwrap();
let value = db.get(cf::HEADERS, b"key1").unwrap();
assert_eq!(value, None);
}
// A batch spanning two column families is applied atomically and all
// writes become visible after `write_batch`.
#[test]
fn test_batch_operations() {
let dir = tempdir().unwrap();
let config = DatabaseConfig::for_testing();
let db = Database::open(dir.path(), &config).unwrap();
let mut batch = db.batch();
batch.put(cf::HEADERS, b"key1", b"value1").unwrap();
batch.put(cf::HEADERS, b"key2", b"value2").unwrap();
batch.put(cf::BLOCKS, b"block1", b"data1").unwrap();
assert_eq!(batch.len(), 3);
db.write_batch(batch).unwrap();
assert_eq!(
db.get(cf::HEADERS, b"key1").unwrap(),
Some(b"value1".to_vec())
);
assert_eq!(
db.get(cf::HEADERS, b"key2").unwrap(),
Some(b"value2".to_vec())
);
assert_eq!(
db.get(cf::BLOCKS, b"block1").unwrap(),
Some(b"data1".to_vec())
);
}
// `exists` reflects key presence before and after a put.
#[test]
fn test_exists() {
let dir = tempdir().unwrap();
let config = DatabaseConfig::for_testing();
let db = Database::open(dir.path(), &config).unwrap();
assert!(!db.exists(cf::HEADERS, b"key1").unwrap());
db.put(cf::HEADERS, b"key1", b"value1").unwrap();
assert!(db.exists(cf::HEADERS, b"key1").unwrap());
}
// encode/decode round-trips for both integer widths.
#[test]
fn test_key_encoding() {
let value: u64 = 12345678901234567890;
let encoded = keys::encode_u64(value);
let decoded = keys::decode_u64(&encoded).unwrap();
assert_eq!(value, decoded);
let value: u32 = 1234567890;
let encoded = keys::encode_u32(value);
let decoded = keys::decode_u32(&encoded).unwrap();
assert_eq!(value, decoded);
}
// Prefix byte is prepended verbatim before the key bytes.
#[test]
fn test_prefixed_key() {
let key = keys::prefixed_key(0x01, b"test");
assert_eq!(key, vec![0x01, b't', b'e', b's', b't']);
}
}