synor/crates/synor-database/src/sql/transaction.rs
2026-02-02 05:58:22 +05:30

374 lines
9.7 KiB
Rust

//! ACID transaction support for SQL.
use super::row::RowId;
use super::types::{SqlError, SqlValue};
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};
/// Unique identifier of a transaction.
///
/// A newtype over `u64`. Its `Display` form is `txn_<n>`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct TransactionId(
    /// Raw numeric value of the identifier.
    pub u64,
);
impl TransactionId {
/// Creates a new transaction ID.
pub fn new() -> Self {
static COUNTER: AtomicU64 = AtomicU64::new(1);
TransactionId(COUNTER.fetch_add(1, Ordering::SeqCst))
}
}
impl Default for TransactionId {
fn default() -> Self {
Self::new()
}
}
impl std::fmt::Display for TransactionId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "txn_{}", self.0)
}
}
/// Lifecycle state of a transaction.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TransactionState {
    /// The transaction is still open.
    Active,
    /// The transaction has been committed.
    Committed,
    /// The transaction has been rolled back.
    RolledBack,
}
/// One recorded change belonging to a transaction.
///
/// Each variant names the table and row it touches and carries the column
/// values (keyed by `String`) involved in the change.
#[derive(Debug, Clone)]
pub enum TransactionOp {
    /// A row was inserted.
    Insert {
        /// Table the row belongs to.
        table: String,
        /// Identifier of the inserted row.
        row_id: RowId,
        /// Values of the inserted row.
        values: HashMap<String, SqlValue>,
    },
    /// A row was updated.
    Update {
        /// Table the row belongs to.
        table: String,
        /// Identifier of the updated row.
        row_id: RowId,
        /// Values recorded as the row's previous contents.
        old_values: HashMap<String, SqlValue>,
        /// Values recorded as the row's new contents.
        new_values: HashMap<String, SqlValue>,
    },
    /// A row was deleted.
    Delete {
        /// Table the row belonged to.
        table: String,
        /// Identifier of the deleted row.
        row_id: RowId,
        /// Values recorded as the row's contents before deletion.
        old_values: HashMap<String, SqlValue>,
    },
}
/// A transaction together with the log of changes recorded under it.
#[derive(Debug)]
pub struct Transaction {
    /// Identifier of this transaction.
    pub id: TransactionId,
    /// Current lifecycle state.
    pub state: TransactionState,
    /// Recorded operations, oldest first.
    operations: Vec<TransactionOp>,
    /// Creation time, in milliseconds since the Unix epoch.
    pub started_at: u64,
    /// Isolation level the transaction was created with.
    pub isolation: IsolationLevel,
}
/// Transaction isolation levels.
///
/// The default level is [`IsolationLevel::ReadCommitted`]. The code in this
/// file only stores the chosen level on the transaction; it does not enforce
/// it.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum IsolationLevel {
    /// Read uncommitted (dirty reads allowed).
    ReadUncommitted,
    /// Read committed (no dirty reads). This is the default.
    #[default]
    ReadCommitted,
    /// Repeatable read (no non-repeatable reads).
    RepeatableRead,
    /// Serializable (full isolation).
    Serializable,
}
impl Transaction {
    /// Creates a new, active transaction with an empty operation log.
    ///
    /// A fresh [`TransactionId`] is allocated and `started_at` is set to the
    /// current wall-clock time in milliseconds since the Unix epoch. If the
    /// system clock reports a time before the epoch, `started_at` is set to
    /// `0` instead of panicking.
    pub fn new(isolation: IsolationLevel) -> Self {
        let started_at = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            // `as_millis` yields a u128; the narrowing cast matches the
            // `u64` field and cannot truncate for any realistic date.
            .map_or(0, |since_epoch| since_epoch.as_millis() as u64);

        Self {
            id: TransactionId::new(),
            state: TransactionState::Active,
            operations: Vec::new(),
            started_at,
            isolation,
        }
    }

    /// Returns `true` while the transaction state is [`TransactionState::Active`].
    pub fn is_active(&self) -> bool {
        self.state == TransactionState::Active
    }

    /// Appends an insert operation to the log.
    ///
    /// The transaction state is not checked here.
    pub fn record_insert(
        &mut self,
        table: String,
        row_id: RowId,
        values: HashMap<String, SqlValue>,
    ) {
        self.operations.push(TransactionOp::Insert {
            table,
            row_id,
            values,
        });
    }

    /// Appends an update operation, with both old and new values, to the log.
    ///
    /// The transaction state is not checked here.
    pub fn record_update(
        &mut self,
        table: String,
        row_id: RowId,
        old_values: HashMap<String, SqlValue>,
        new_values: HashMap<String, SqlValue>,
    ) {
        self.operations.push(TransactionOp::Update {
            table,
            row_id,
            old_values,
            new_values,
        });
    }

    /// Appends a delete operation, with the row's old values, to the log.
    ///
    /// The transaction state is not checked here.
    pub fn record_delete(
        &mut self,
        table: String,
        row_id: RowId,
        old_values: HashMap<String, SqlValue>,
    ) {
        self.operations.push(TransactionOp::Delete {
            table,
            row_id,
            old_values,
        });
    }

    /// Returns the recorded operations newest-first, the order in which they
    /// would be undone.
    pub fn rollback_ops(&self) -> impl Iterator<Item = &TransactionOp> {
        self.operations.iter().rev()
    }

    /// Returns the recorded operations in the order they were recorded.
    pub fn commit_ops(&self) -> &[TransactionOp] {
        &self.operations
    }

    /// Sets the state to [`TransactionState::Committed`].
    pub fn mark_committed(&mut self) {
        self.state = TransactionState::Committed;
    }

    /// Sets the state to [`TransactionState::RolledBack`].
    pub fn mark_rolled_back(&mut self) {
        self.state = TransactionState::RolledBack;
    }
}
/// Registry of in-flight transactions.
///
/// Transactions are added by `begin` and removed again by `commit` or
/// `rollback`.
pub struct TransactionManager {
    /// Tracked transactions keyed by ID, guarded by a read/write lock.
    transactions: RwLock<HashMap<TransactionId, Transaction>>,
}
impl TransactionManager {
/// Creates a new transaction manager.
pub fn new() -> Self {
Self {
transactions: RwLock::new(HashMap::new()),
}
}
/// Begins a new transaction.
pub fn begin(&self, isolation: IsolationLevel) -> TransactionId {
let txn = Transaction::new(isolation);
let id = txn.id;
self.transactions.write().insert(id, txn);
id
}
/// Gets a transaction by ID.
pub fn get(&self, id: TransactionId) -> Option<Transaction> {
self.transactions.read().get(&id).cloned()
}
/// Records an operation in a transaction.
pub fn record_op(&self, id: TransactionId, op: TransactionOp) -> Result<(), SqlError> {
let mut txns = self.transactions.write();
let txn = txns
.get_mut(&id)
.ok_or_else(|| SqlError::Transaction(format!("Transaction {} not found", id)))?;
if !txn.is_active() {
return Err(SqlError::Transaction(format!(
"Transaction {} is not active",
id
)));
}
txn.operations.push(op);
Ok(())
}
/// Commits a transaction.
pub fn commit(&self, id: TransactionId) -> Result<Vec<TransactionOp>, SqlError> {
let mut txns = self.transactions.write();
let txn = txns
.get_mut(&id)
.ok_or_else(|| SqlError::Transaction(format!("Transaction {} not found", id)))?;
if !txn.is_active() {
return Err(SqlError::Transaction(format!(
"Transaction {} is not active",
id
)));
}
txn.mark_committed();
let ops = txn.operations.clone();
txns.remove(&id);
Ok(ops)
}
/// Rolls back a transaction, returning operations to undo.
pub fn rollback(&self, id: TransactionId) -> Result<Vec<TransactionOp>, SqlError> {
let mut txns = self.transactions.write();
let txn = txns
.get_mut(&id)
.ok_or_else(|| SqlError::Transaction(format!("Transaction {} not found", id)))?;
if !txn.is_active() {
return Err(SqlError::Transaction(format!(
"Transaction {} is not active",
id
)));
}
txn.mark_rolled_back();
let ops: Vec<TransactionOp> = txn.operations.iter().rev().cloned().collect();
txns.remove(&id);
Ok(ops)
}
/// Returns the number of active transactions.
pub fn active_count(&self) -> usize {
self.transactions.read().len()
}
/// Checks if a transaction exists and is active.
pub fn is_active(&self, id: TransactionId) -> bool {
self.transactions
.read()
.get(&id)
.map(|t| t.is_active())
.unwrap_or(false)
}
}
impl Default for TransactionManager {
fn default() -> Self {
Self::new()
}
}
impl Clone for Transaction {
fn clone(&self) -> Self {
Self {
id: self.id,
state: self.state,
operations: self.operations.clone(),
started_at: self.started_at,
isolation: self.isolation,
}
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an insert into `users`, row 1, with a single `name` column.
    fn insert_user(name: &str) -> TransactionOp {
        let mut values = HashMap::new();
        values.insert("name".to_string(), SqlValue::Text(name.to_string()));
        TransactionOp::Insert {
            table: "users".to_string(),
            row_id: RowId(1),
            values,
        }
    }

    /// Begin, record one op, commit: the op is returned and nothing stays tracked.
    #[test]
    fn test_transaction_lifecycle() {
        let manager = TransactionManager::new();
        let txn_id = manager.begin(IsolationLevel::ReadCommitted);
        assert!(manager.is_active(txn_id));
        assert_eq!(manager.active_count(), 1);

        manager.record_op(txn_id, insert_user("Alice")).unwrap();

        let committed = manager.commit(txn_id).unwrap();
        assert_eq!(committed.len(), 1);
        assert_eq!(manager.active_count(), 0);
    }

    /// Begin, record one op, roll back: the op is returned and nothing stays tracked.
    #[test]
    fn test_transaction_rollback() {
        let manager = TransactionManager::new();
        let txn_id = manager.begin(IsolationLevel::ReadCommitted);

        manager.record_op(txn_id, insert_user("Bob")).unwrap();

        let undone = manager.rollback(txn_id).unwrap();
        assert_eq!(undone.len(), 1);
        assert_eq!(manager.active_count(), 0);
    }

    /// An ID that was never issued by the manager is inactive and cannot be
    /// committed or rolled back.
    #[test]
    fn test_transaction_not_found() {
        let manager = TransactionManager::new();
        let fake_id = TransactionId(99999);
        assert!(!manager.is_active(fake_id));
        assert!(manager.commit(fake_id).is_err());
        assert!(manager.rollback(fake_id).is_err());
    }
}