//! Parallel Execution Scheduler (Sealevel-style). //! //! Enables parallel execution of transactions with non-conflicting state access. //! Inspired by Solana's Sealevel runtime, adapted for Synor's DAG architecture. //! //! # How It Works //! //! 1. Transactions declare which accounts/storage they will read/write //! 2. Scheduler groups transactions into batches with non-overlapping write sets //! 3. Each batch executes in parallel across multiple threads //! 4. Results are collected and state changes applied sequentially //! //! # Benefits //! //! - 10-100x throughput improvement for non-conflicting transactions //! - Natural fit for DAG's parallel block production //! - Maintains determinism through careful ordering use std::collections::{HashMap, HashSet}; use std::sync::Arc; use parking_lot::{Mutex, RwLock}; use crate::{ContractId, ExecutionResult, VmError}; /// Account/state access declaration for a transaction. #[derive(Clone, Debug, Default)] pub struct AccessSet { /// Accounts/contracts that will be read. pub reads: HashSet, /// Accounts/contracts that will be written. pub writes: HashSet, } impl AccessSet { /// Creates a new empty access set. pub fn new() -> Self { Self::default() } /// Adds a read access. pub fn add_read(&mut self, contract: ContractId) { self.reads.insert(contract); } /// Adds a write access. pub fn add_write(&mut self, contract: ContractId) { // Writes implicitly include reads self.reads.insert(contract); self.writes.insert(contract); } /// Checks if two access sets conflict (can't execute in parallel). pub fn conflicts_with(&self, other: &AccessSet) -> bool { // Write-Write conflict if !self.writes.is_disjoint(&other.writes) { return true; } // Write-Read conflict if !self.writes.is_disjoint(&other.reads) { return true; } // Read-Write conflict if !self.reads.is_disjoint(&other.writes) { return true; } false } /// Merges another access set into this one. pub fn merge(&mut self, other: &AccessSet) { self.reads.extend(other.reads.iter().cloned()); self.writes.extend(other.writes.iter().cloned()); } } /// A transaction ready for parallel execution. pub struct ScheduledTransaction { /// The transaction data. pub tx: T, /// Transaction index (for ordering). pub index: usize, /// Declared access set. pub access: AccessSet, } /// Result of executing a scheduled transaction. pub struct ScheduledResult { /// Original transaction. pub tx: T, /// Transaction index. pub index: usize, /// Execution result or error. pub result: Result, } /// A batch of transactions that can execute in parallel. pub struct ExecutionBatch { /// Transactions in this batch. pub transactions: Vec>, /// Combined access set for the batch. pub combined_access: AccessSet, } impl Default for ExecutionBatch { fn default() -> Self { Self { transactions: Vec::new(), combined_access: AccessSet::default(), } } } impl ExecutionBatch { /// Creates a new empty batch. pub fn new() -> Self { Self::default() } /// Tries to add a transaction to this batch. /// Returns false if it conflicts with existing transactions. pub fn try_add(&mut self, tx: ScheduledTransaction) -> Result<(), ScheduledTransaction> { // Check for conflicts with current batch if self.combined_access.conflicts_with(&tx.access) { return Err(tx); } // Add to batch self.combined_access.merge(&tx.access); self.transactions.push(tx); Ok(()) } /// Returns the number of transactions in this batch. pub fn len(&self) -> usize { self.transactions.len() } /// Returns true if the batch is empty. pub fn is_empty(&self) -> bool { self.transactions.is_empty() } } /// Statistics for parallel execution. #[derive(Clone, Debug, Default)] pub struct SchedulerStats { /// Total transactions processed. pub total_transactions: usize, /// Number of batches created. pub batch_count: usize, /// Transactions that ran in parallel. pub parallel_transactions: usize, /// Transactions that had conflicts. pub conflicting_transactions: usize, /// Maximum batch size achieved. pub max_batch_size: usize, /// Average parallelism factor. pub avg_parallelism: f64, } impl SchedulerStats { /// Computes the parallelism improvement factor. pub fn parallelism_factor(&self) -> f64 { if self.batch_count == 0 { return 1.0; } self.total_transactions as f64 / self.batch_count as f64 } } /// Configuration for the parallel scheduler. #[derive(Clone, Debug)] pub struct SchedulerConfig { /// Maximum batch size. pub max_batch_size: usize, /// Number of worker threads. pub worker_threads: usize, /// Enable speculative execution. pub enable_speculation: bool, /// Maximum pending batches in queue. pub queue_depth: usize, } impl Default for SchedulerConfig { fn default() -> Self { Self { max_batch_size: 256, worker_threads: num_cpus::get(), enable_speculation: false, queue_depth: 16, } } } /// The parallel execution scheduler. pub struct ParallelScheduler { /// Configuration. config: SchedulerConfig, /// Current batches being built. batches: Vec>, /// Statistics. stats: Mutex, } impl ParallelScheduler { /// Creates a new scheduler with default configuration. pub fn new() -> Self { Self::with_config(SchedulerConfig::default()) } /// Creates a scheduler with custom configuration. pub fn with_config(config: SchedulerConfig) -> Self { Self { config, batches: Vec::new(), stats: Mutex::new(SchedulerStats::default()), } } /// Schedules transactions into parallel batches. /// /// Uses a greedy algorithm: /// 1. Try to add each transaction to the first batch it doesn't conflict with /// 2. If no batch works, create a new batch pub fn schedule( &mut self, transactions: Vec>, ) -> Vec> { let mut batches: Vec> = Vec::new(); let tx_count = transactions.len(); for tx in transactions { // Find first non-conflicting batch let mut current_tx = Some(tx); for batch in batches.iter_mut() { if batch.len() < self.config.max_batch_size { if let Some(tx_to_add) = current_tx.take() { match batch.try_add(tx_to_add) { Ok(()) => { // Successfully placed, current_tx is now None break; } Err(returned_tx) => { // Failed, put it back for next iteration current_tx = Some(returned_tx); } } } } } // Create new batch if transaction wasn't placed if let Some(tx_to_add) = current_tx { let mut new_batch = ExecutionBatch::new(); // This should always succeed for a new batch let _ = new_batch.try_add(tx_to_add); batches.push(new_batch); } } // Update statistics { let mut stats = self.stats.lock(); stats.total_transactions += tx_count; stats.batch_count += batches.len(); let max_size = batches.iter().map(|b| b.len()).max().unwrap_or(0); stats.max_batch_size = stats.max_batch_size.max(max_size); if !batches.is_empty() { stats.parallel_transactions += tx_count.saturating_sub(batches.len()); stats.avg_parallelism = stats.total_transactions as f64 / stats.batch_count as f64; } } batches } /// Returns current statistics. pub fn stats(&self) -> SchedulerStats { self.stats.lock().clone() } /// Resets statistics. pub fn reset_stats(&self) { *self.stats.lock() = SchedulerStats::default(); } } impl Default for ParallelScheduler { fn default() -> Self { Self::new() } } /// Executor trait for parallel execution. pub trait ParallelExecutor { /// Executes a single transaction. fn execute(&self, tx: &ScheduledTransaction) -> Result; } /// Executes batches in parallel using the provided executor. pub async fn execute_batches_parallel( batches: Vec>, executor: Arc, ) -> Vec> where T: Send + Sync + 'static + Clone, E: ParallelExecutor + Send + Sync + 'static, { let mut all_results: Vec> = Vec::new(); // Execute batches sequentially (order matters for state) // but transactions within each batch in parallel for batch in batches { let batch_results = execute_batch_parallel(batch, executor.clone()).await; all_results.extend(batch_results); } // Sort by original index to maintain order all_results.sort_by_key(|r| r.index); all_results } /// Executes a single batch in parallel. async fn execute_batch_parallel( batch: ExecutionBatch, executor: Arc, ) -> Vec> where T: Send + Sync + 'static + Clone, E: ParallelExecutor + Send + Sync + 'static, { let mut handles = Vec::new(); for tx in batch.transactions { let executor = executor.clone(); let handle = tokio::spawn(async move { let result = executor.execute(&tx); ScheduledResult { tx: tx.tx, index: tx.index, result, } }); handles.push(handle); } let mut results = Vec::new(); for handle in handles { if let Ok(result) = handle.await { results.push(result); } } results } /// Builder for creating scheduled transactions with access declarations. pub struct TransactionBuilder { tx: T, index: usize, access: AccessSet, } impl TransactionBuilder { /// Creates a new builder. pub fn new(tx: T, index: usize) -> Self { Self { tx, index, access: AccessSet::new(), } } /// Declares a read access. pub fn reads(mut self, contract: ContractId) -> Self { self.access.add_read(contract); self } /// Declares a write access. pub fn writes(mut self, contract: ContractId) -> Self { self.access.add_write(contract); self } /// Builds the scheduled transaction. pub fn build(self) -> ScheduledTransaction { ScheduledTransaction { tx: self.tx, index: self.index, access: self.access, } } } /// Lock manager for ensuring safe parallel access. pub struct LockManager { /// Currently locked contracts. locks: RwLock>, } #[derive(Clone, Debug)] enum LockState { /// Read lock with count. Read(usize), /// Write lock. Write, } impl LockManager { /// Creates a new lock manager. pub fn new() -> Self { Self { locks: RwLock::new(HashMap::new()), } } /// Tries to acquire locks for an access set. pub fn try_acquire(&self, access: &AccessSet) -> bool { let mut locks = self.locks.write(); // Check all writes first for contract in &access.writes { if locks.get(contract).is_some() { return false; // Any lock blocks write } } // Check reads (only blocked by write locks) for contract in &access.reads { if !access.writes.contains(contract) { if let Some(LockState::Write) = locks.get(contract) { return false; } } } // Acquire all locks for contract in &access.writes { locks.insert(*contract, LockState::Write); } for contract in &access.reads { if !access.writes.contains(contract) { let entry = locks.entry(*contract).or_insert(LockState::Read(0)); if let LockState::Read(count) = entry { *count += 1; } } } true } /// Releases locks for an access set. pub fn release(&self, access: &AccessSet) { let mut locks = self.locks.write(); for contract in &access.writes { locks.remove(contract); } for contract in &access.reads { if !access.writes.contains(contract) { if let Some(LockState::Read(count)) = locks.get_mut(contract) { *count -= 1; if *count == 0 { locks.remove(contract); } } } } } } impl Default for LockManager { fn default() -> Self { Self::new() } } #[cfg(test)] mod tests { use super::*; fn make_contract(id: u8) -> ContractId { ContractId::from_bytes([id; 32]) } #[test] fn test_access_set_no_conflict() { let mut a = AccessSet::new(); a.add_read(make_contract(1)); a.add_read(make_contract(2)); let mut b = AccessSet::new(); b.add_read(make_contract(1)); b.add_read(make_contract(3)); // Read-read is fine assert!(!a.conflicts_with(&b)); } #[test] fn test_access_set_write_conflict() { let mut a = AccessSet::new(); a.add_write(make_contract(1)); let mut b = AccessSet::new(); b.add_write(make_contract(1)); // Write-write conflicts assert!(a.conflicts_with(&b)); } #[test] fn test_access_set_read_write_conflict() { let mut a = AccessSet::new(); a.add_read(make_contract(1)); let mut b = AccessSet::new(); b.add_write(make_contract(1)); // Read-write conflicts assert!(a.conflicts_with(&b)); } #[test] fn test_scheduler_parallel() { // Three transactions that don't conflict let tx1 = TransactionBuilder::new("tx1", 0) .writes(make_contract(1)) .build(); let tx2 = TransactionBuilder::new("tx2", 1) .writes(make_contract(2)) .build(); let tx3 = TransactionBuilder::new("tx3", 2) .writes(make_contract(3)) .build(); let mut scheduler = ParallelScheduler::new(); let batches = scheduler.schedule(vec![tx1, tx2, tx3]); // Should all be in one batch (no conflicts) assert_eq!(batches.len(), 1); assert_eq!(batches[0].len(), 3); } #[test] fn test_scheduler_conflicts() { // Three transactions that all conflict (write to same contract) let tx1 = TransactionBuilder::new("tx1", 0) .writes(make_contract(1)) .build(); let tx2 = TransactionBuilder::new("tx2", 1) .writes(make_contract(1)) .build(); let tx3 = TransactionBuilder::new("tx3", 2) .writes(make_contract(1)) .build(); let mut scheduler = ParallelScheduler::new(); let batches = scheduler.schedule(vec![tx1, tx2, tx3]); // Each should be in its own batch (all conflict) assert_eq!(batches.len(), 3); } #[test] fn test_scheduler_mixed() { // Mix of conflicting and non-conflicting let tx1 = TransactionBuilder::new("tx1", 0) .writes(make_contract(1)) .build(); let tx2 = TransactionBuilder::new("tx2", 1) .writes(make_contract(2)) .build(); let tx3 = TransactionBuilder::new("tx3", 2) .writes(make_contract(1)) .build(); let tx4 = TransactionBuilder::new("tx4", 3) .writes(make_contract(3)) .build(); let mut scheduler = ParallelScheduler::new(); let batches = scheduler.schedule(vec![tx1, tx2, tx3, tx4]); // tx1, tx2, tx4 can be parallel; tx3 conflicts with tx1 assert_eq!(batches.len(), 2); } #[test] fn test_lock_manager() { let manager = LockManager::new(); let mut access1 = AccessSet::new(); access1.add_write(make_contract(1)); let mut access2 = AccessSet::new(); access2.add_read(make_contract(1)); // First should succeed assert!(manager.try_acquire(&access1)); // Second should fail (read blocked by write) assert!(!manager.try_acquire(&access2)); // After release, should succeed manager.release(&access1); assert!(manager.try_acquire(&access2)); } #[test] fn test_scheduler_stats() { let tx1 = TransactionBuilder::new("tx1", 0) .writes(make_contract(1)) .build(); let tx2 = TransactionBuilder::new("tx2", 1) .writes(make_contract(2)) .build(); let mut scheduler = ParallelScheduler::new(); scheduler.schedule(vec![tx1, tx2]); let stats = scheduler.stats(); assert_eq!(stats.total_transactions, 2); assert_eq!(stats.batch_count, 1); assert!(stats.parallelism_factor() > 1.0); } }