//! Speculative Execution and Hot State Caching. //! //! This module provides optimistic transaction execution with rollback support //! and intelligent state caching for frequently accessed contracts. //! //! # Architecture //! //! ```text //! ┌─────────────────────────────────────────────────────────────────┐ //! │ SPECULATIVE EXECUTION ENGINE │ //! ├─────────────────────────────────────────────────────────────────┤ //! │ │ //! │ ┌──────────────┐ ┌──────────────┐ ┌──────────────────────┐ │ //! │ │ Pending │ │ Speculative │ │ Hot State │ │ //! │ │ Pool │──│ Executor │──│ Cache │ │ //! │ └──────────────┘ └──────────────┘ └──────────────────────┘ │ //! │ │ │ │ │ //! │ │ ┌──────▼───────┐ │ │ //! │ │ │ Version │ │ │ //! │ └─────────▶│ Manager │◀──────────┘ │ //! │ └──────────────┘ │ //! │ │ │ //! │ ┌──────▼───────┐ │ //! │ │ Rollback │ │ //! │ │ Handler │ │ //! │ └──────────────┘ │ //! │ │ //! └─────────────────────────────────────────────────────────────────┘ //! ``` //! //! # Benefits //! //! - Pre-execution reduces apparent latency by 30-70% //! - Hot state cache eliminates 90%+ of storage reads //! - Snapshot/rollback enables safe speculation //! - Version tracking prevents dirty reads use std::collections::{HashMap, HashSet, VecDeque}; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use parking_lot::{Mutex, RwLock}; use crate::{ContractId, ExecutionResult, StorageKey, StorageValue, VmError}; /// Configuration for speculative execution. #[derive(Clone, Debug)] pub struct SpeculativeConfig { /// Maximum pending speculative executions. pub max_pending: usize, /// Maximum speculative depth (blocks ahead). pub max_depth: u32, /// Enable hot state caching. pub enable_cache: bool, /// Hot cache size per contract. pub cache_size_per_contract: usize, /// Global cache size limit. pub global_cache_limit: usize, /// Snapshot retention count. pub snapshot_retention: usize, /// Auto-promote threshold (access count). pub hot_threshold: u32, } impl Default for SpeculativeConfig { fn default() -> Self { Self { max_pending: 1000, max_depth: 3, enable_cache: true, cache_size_per_contract: 1024, global_cache_limit: 100_000, snapshot_retention: 10, hot_threshold: 5, } } } /// Speculative execution state version. #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)] pub struct StateVersion(pub u64); impl StateVersion { /// Creates a new state version. pub fn new(version: u64) -> Self { Self(version) } /// Returns the next version. pub fn next(&self) -> Self { Self(self.0 + 1) } } /// A cached state entry with version tracking. #[derive(Clone, Debug)] pub struct CachedState { /// The cached value. pub value: Option, /// Version when cached. pub version: StateVersion, /// Access count for hot tracking. pub access_count: u32, /// Last access timestamp (block height). pub last_access: u64, } impl CachedState { /// Creates a new cached state. pub fn new(value: Option, version: StateVersion) -> Self { Self { value, version, access_count: 1, last_access: 0, } } /// Records an access. pub fn record_access(&mut self, block_height: u64) { self.access_count = self.access_count.saturating_add(1); self.last_access = block_height; } /// Checks if this entry is hot. pub fn is_hot(&self, threshold: u32) -> bool { self.access_count >= threshold } } /// Hot state cache for a single contract. #[derive(Debug)] pub struct ContractCache { /// Contract identifier. pub contract_id: ContractId, /// Cached key-value pairs. entries: HashMap, /// Maximum cache entries. max_size: usize, /// Cache statistics. stats: CacheStats, } impl ContractCache { /// Creates a new contract cache. pub fn new(contract_id: ContractId, max_size: usize) -> Self { Self { contract_id, entries: HashMap::new(), max_size, stats: CacheStats::default(), } } /// Gets a value from cache. pub fn get(&mut self, key: &StorageKey, block_height: u64) -> Option<&CachedState> { if let Some(entry) = self.entries.get_mut(key) { entry.record_access(block_height); self.stats.hits += 1; Some(entry) } else { self.stats.misses += 1; None } } /// Inserts a value into cache. pub fn insert(&mut self, key: StorageKey, value: Option, version: StateVersion) { // Evict if at capacity. if self.entries.len() >= self.max_size { self.evict_coldest(); } self.entries.insert(key, CachedState::new(value, version)); self.stats.inserts += 1; } /// Updates a cached value. pub fn update(&mut self, key: &StorageKey, value: Option, version: StateVersion) { if let Some(entry) = self.entries.get_mut(key) { entry.value = value; entry.version = version; self.stats.updates += 1; } else { self.insert(*key, value, version); } } /// Invalidates entries older than a version. pub fn invalidate_older_than(&mut self, version: StateVersion) { self.entries.retain(|_, v| v.version >= version); self.stats.invalidations += 1; } /// Evicts the coldest (least accessed) entry. fn evict_coldest(&mut self) { if let Some(coldest_key) = self .entries .iter() .min_by_key(|(_, v)| v.access_count) .map(|(k, _)| *k) { self.entries.remove(&coldest_key); self.stats.evictions += 1; } } /// Returns cache statistics. pub fn stats(&self) -> &CacheStats { &self.stats } /// Returns the hit rate. pub fn hit_rate(&self) -> f64 { let total = self.stats.hits + self.stats.misses; if total == 0 { 0.0 } else { self.stats.hits as f64 / total as f64 } } } /// Cache statistics. #[derive(Clone, Debug, Default)] pub struct CacheStats { /// Cache hits. pub hits: u64, /// Cache misses. pub misses: u64, /// Cache inserts. pub inserts: u64, /// Cache updates. pub updates: u64, /// Cache evictions. pub evictions: u64, /// Cache invalidations. pub invalidations: u64, } /// Global hot state cache manager. pub struct HotStateCache { /// Configuration. config: SpeculativeConfig, /// Per-contract caches. caches: RwLock>, /// Current version. current_version: AtomicU64, /// Global statistics. stats: RwLock, } impl HotStateCache { /// Creates a new hot state cache. pub fn new(config: SpeculativeConfig) -> Self { Self { config, caches: RwLock::new(HashMap::new()), current_version: AtomicU64::new(0), stats: RwLock::new(GlobalCacheStats::default()), } } /// Gets the current version. pub fn version(&self) -> StateVersion { StateVersion(self.current_version.load(Ordering::Acquire)) } /// Advances to the next version. pub fn advance_version(&self) -> StateVersion { let new_version = self.current_version.fetch_add(1, Ordering::AcqRel) + 1; StateVersion(new_version) } /// Gets a value from cache. pub fn get( &self, contract_id: ContractId, key: &StorageKey, block_height: u64, ) -> Option> { let mut caches = self.caches.write(); if let Some(cache) = caches.get_mut(&contract_id) { cache.get(key, block_height).map(|s| s.value.clone()) } else { None } } /// Inserts a value into cache. pub fn insert( &self, contract_id: ContractId, key: StorageKey, value: Option, ) { let version = self.advance_version(); let mut caches = self.caches.write(); // Check global limit. let total_entries: usize = caches.values().map(|c| c.entries.len()).sum(); if total_entries >= self.config.global_cache_limit { // Evict from coldest contract. self.evict_coldest_contract(&mut caches); } let cache = caches .entry(contract_id) .or_insert_with(|| ContractCache::new(contract_id, self.config.cache_size_per_contract)); cache.insert(key, value, version); } /// Batch update from execution result. pub fn apply_execution_result( &self, contract_id: ContractId, changes: &[(StorageKey, Option)], ) { let version = self.advance_version(); let mut caches = self.caches.write(); let cache = caches .entry(contract_id) .or_insert_with(|| ContractCache::new(contract_id, self.config.cache_size_per_contract)); for (key, value) in changes { cache.update(key, value.clone(), version); } } /// Invalidates all caches older than a version. pub fn invalidate_older_than(&self, version: StateVersion) { let mut caches = self.caches.write(); for cache in caches.values_mut() { cache.invalidate_older_than(version); } } /// Evicts entries from the coldest contract. fn evict_coldest_contract(&self, caches: &mut HashMap) { // Find contract with lowest average hit rate. if let Some(coldest_id) = caches .iter() .min_by(|(_, a), (_, b)| { a.hit_rate() .partial_cmp(&b.hit_rate()) .unwrap_or(std::cmp::Ordering::Equal) }) .map(|(id, _)| *id) { if let Some(cache) = caches.get_mut(&coldest_id) { cache.evict_coldest(); } } } /// Returns global cache statistics. pub fn stats(&self) -> GlobalCacheStats { let caches = self.caches.read(); let mut stats = GlobalCacheStats::default(); for cache in caches.values() { let cs = cache.stats(); stats.total_hits += cs.hits; stats.total_misses += cs.misses; stats.total_entries += cache.entries.len() as u64; } stats.contract_count = caches.len() as u64; stats } } /// Global cache statistics. #[derive(Clone, Debug, Default)] pub struct GlobalCacheStats { /// Total cache hits. pub total_hits: u64, /// Total cache misses. pub total_misses: u64, /// Total cached entries. pub total_entries: u64, /// Number of cached contracts. pub contract_count: u64, } /// A state snapshot for rollback support. #[derive(Clone, Debug)] pub struct StateSnapshot { /// Snapshot version. pub version: StateVersion, /// Block height when taken. pub block_height: u64, /// State changes to rollback. pub changes: HashMap<(ContractId, StorageKey), Option>, } impl StateSnapshot { /// Creates a new snapshot. pub fn new(version: StateVersion, block_height: u64) -> Self { Self { version, block_height, changes: HashMap::new(), } } /// Records the original value before a change. pub fn record_original( &mut self, contract_id: ContractId, key: StorageKey, original_value: Option, ) { // Only record the first original value (don't overwrite). self.changes .entry((contract_id, key)) .or_insert(original_value); } } /// Speculative execution result. #[derive(Clone, Debug)] pub struct SpeculativeResult { /// Transaction identifier. pub tx_id: [u8; 32], /// Execution result. pub result: Result, /// State version after execution. pub version: StateVersion, /// Contracts accessed (read). pub reads: HashSet, /// Contracts modified (write). pub writes: HashSet, /// State changes made. pub changes: Vec<(ContractId, StorageKey, Option)>, /// Whether this execution can be committed. pub committable: bool, } /// Pending speculative execution. #[derive(Debug)] pub struct PendingExecution { /// Transaction identifier. pub tx_id: [u8; 32], /// Predicted block height. pub predicted_block: u64, /// Execution result. pub result: SpeculativeResult, /// Snapshot for rollback. pub snapshot: StateSnapshot, } /// Speculative executor with rollback support. pub struct SpeculativeExecutor { /// Configuration. config: SpeculativeConfig, /// Hot state cache. cache: Arc, /// Pending executions. pending: Mutex>, /// State snapshots. snapshots: Mutex>, /// Current block height. current_block: AtomicU64, /// Execution statistics. stats: RwLock, } impl SpeculativeExecutor { /// Creates a new speculative executor. pub fn new(config: SpeculativeConfig) -> Self { let cache = Arc::new(HotStateCache::new(config.clone())); Self { config, cache, pending: Mutex::new(VecDeque::new()), snapshots: Mutex::new(VecDeque::new()), current_block: AtomicU64::new(0), stats: RwLock::new(SpeculativeStats::default()), } } /// Returns the hot state cache. pub fn cache(&self) -> Arc { self.cache.clone() } /// Sets the current block height. pub fn set_block_height(&self, height: u64) { self.current_block.store(height, Ordering::Release); } /// Gets the current block height. pub fn block_height(&self) -> u64 { self.current_block.load(Ordering::Acquire) } /// Creates a snapshot for speculative execution. pub fn create_snapshot(&self) -> StateSnapshot { let version = self.cache.advance_version(); let block_height = self.block_height(); let snapshot = StateSnapshot::new(version, block_height); let mut snapshots = self.snapshots.lock(); if snapshots.len() >= self.config.snapshot_retention { snapshots.pop_front(); } snapshots.push_back(snapshot.clone()); snapshot } /// Submits a transaction for speculative execution. pub fn submit_speculative( &self, tx_id: [u8; 32], predicted_block: u64, execute_fn: F, ) -> Result where F: FnOnce() -> Result<(ExecutionResult, SpeculativeContext), VmError>, { // Check depth limit. let current = self.block_height(); if predicted_block.saturating_sub(current) > self.config.max_depth as u64 { return Err(VmError::ExecutionError( "Speculative depth exceeded".to_string(), )); } // Check pending limit. { let pending = self.pending.lock(); if pending.len() >= self.config.max_pending { return Err(VmError::ExecutionError( "Too many pending speculative executions".to_string(), )); } } // Create snapshot. let snapshot = self.create_snapshot(); // Execute speculatively. let (result, context) = execute_fn()?; let spec_result = SpeculativeResult { tx_id, result: Ok(result), version: snapshot.version, reads: context.reads, writes: context.writes, changes: context.changes, committable: true, }; // Store pending execution. let pending_exec = PendingExecution { tx_id, predicted_block, result: spec_result.clone(), snapshot, }; self.pending.lock().push_back(pending_exec); // Update stats. { let mut stats = self.stats.write(); stats.speculative_executions += 1; } Ok(spec_result) } /// Commits a speculative execution. pub fn commit(&self, tx_id: &[u8; 32]) -> Result<(), VmError> { let mut pending = self.pending.lock(); let idx = pending .iter() .position(|p| &p.tx_id == tx_id) .ok_or_else(|| VmError::ExecutionError("Speculative execution not found".to_string()))?; let exec = pending.remove(idx).unwrap(); // Apply changes to cache. for (contract_id, key, value) in &exec.result.changes { self.cache.insert(*contract_id, *key, value.clone()); } // Update stats. { let mut stats = self.stats.write(); stats.committed += 1; } Ok(()) } /// Rolls back a speculative execution. pub fn rollback(&self, tx_id: &[u8; 32]) -> Result<(), VmError> { let mut pending = self.pending.lock(); let idx = pending .iter() .position(|p| &p.tx_id == tx_id) .ok_or_else(|| VmError::ExecutionError("Speculative execution not found".to_string()))?; let exec = pending.remove(idx).unwrap(); // Rollback changes using snapshot. for ((contract_id, key), original_value) in exec.snapshot.changes { self.cache.insert(contract_id, key, original_value); } // Invalidate newer cache entries. self.cache.invalidate_older_than(exec.snapshot.version.next()); // Update stats. { let mut stats = self.stats.write(); stats.rolled_back += 1; } Ok(()) } /// Checks for conflicts with confirmed state. pub fn check_conflicts(&self, tx_id: &[u8; 32]) -> bool { let pending = self.pending.lock(); if let Some(exec) = pending.iter().find(|p| &p.tx_id == tx_id) { // Check if any read contracts were modified by earlier commits. for contract_id in &exec.result.reads { // In a real implementation, check against confirmed state. // For now, assume no conflicts. let _ = contract_id; } false } else { true // Not found means already committed or rolled back. } } /// Prunes old pending executions. pub fn prune_old(&self, min_block: u64) { let mut pending = self.pending.lock(); let old_count = pending.len(); pending.retain(|p| p.predicted_block >= min_block); let pruned = old_count - pending.len(); if pruned > 0 { let mut stats = self.stats.write(); stats.pruned += pruned as u64; } } /// Returns speculation statistics. pub fn stats(&self) -> SpeculativeStats { self.stats.read().clone() } /// Returns pending execution count. pub fn pending_count(&self) -> usize { self.pending.lock().len() } } /// Context collected during speculative execution. #[derive(Clone, Debug, Default)] pub struct SpeculativeContext { /// Contracts read. pub reads: HashSet, /// Contracts written. pub writes: HashSet, /// State changes. pub changes: Vec<(ContractId, StorageKey, Option)>, } impl SpeculativeContext { /// Creates a new speculative context. pub fn new() -> Self { Self::default() } /// Records a read. pub fn record_read(&mut self, contract_id: ContractId) { self.reads.insert(contract_id); } /// Records a write. pub fn record_write( &mut self, contract_id: ContractId, key: StorageKey, value: Option, ) { self.writes.insert(contract_id); self.changes.push((contract_id, key, value)); } } /// Speculation statistics. #[derive(Clone, Debug, Default)] pub struct SpeculativeStats { /// Total speculative executions. pub speculative_executions: u64, /// Successfully committed. pub committed: u64, /// Rolled back due to conflicts. pub rolled_back: u64, /// Pruned due to age. pub pruned: u64, } impl SpeculativeStats { /// Returns the commit rate. pub fn commit_rate(&self) -> f64 { let total = self.committed + self.rolled_back; if total == 0 { 1.0 } else { self.committed as f64 / total as f64 } } } #[cfg(test)] mod tests { use super::*; use crate::Hash256; fn make_contract_id(n: u8) -> ContractId { ContractId::new(Hash256::from_bytes([n; 32])) } fn make_key(n: u8) -> StorageKey { StorageKey([n; 32]) } fn make_value(n: u8) -> StorageValue { StorageValue(vec![n]) } #[test] fn test_contract_cache() { let id = make_contract_id(1); let mut cache = ContractCache::new(id, 10); // Insert and get. let key = make_key(1); cache.insert(key.clone(), Some(make_value(1)), StateVersion(1)); let result = cache.get(&key, 100); assert!(result.is_some()); assert_eq!(result.unwrap().value, Some(make_value(1))); // Check stats. assert_eq!(cache.stats().hits, 1); assert_eq!(cache.stats().inserts, 1); } #[test] fn test_cache_eviction() { let id = make_contract_id(1); let mut cache = ContractCache::new(id, 3); // Fill cache. for i in 0..3 { cache.insert(make_key(i), Some(make_value(i)), StateVersion(1)); } // Access one entry to make it hot. for _ in 0..10 { cache.get(&make_key(1), 100); } // Insert new entry, should evict coldest. cache.insert(make_key(10), Some(make_value(10)), StateVersion(2)); // Hot entry should still be there. assert!(cache.get(&make_key(1), 100).is_some()); // One cold entry should be evicted. assert_eq!(cache.entries.len(), 3); } #[test] fn test_hot_state_cache() { let config = SpeculativeConfig::default(); let cache = HotStateCache::new(config); let contract_id = make_contract_id(1); let key = make_key(1); let value = make_value(42); // Insert. cache.insert(contract_id, key.clone(), Some(value.clone())); // Get. let result = cache.get(contract_id, &key, 100); assert_eq!(result, Some(Some(value))); // Version advanced. assert!(cache.version().0 > 0); } #[test] fn test_speculative_executor() { let config = SpeculativeConfig::default(); let executor = SpeculativeExecutor::new(config); executor.set_block_height(100); assert_eq!(executor.block_height(), 100); // Create snapshot. let snapshot = executor.create_snapshot(); assert!(snapshot.version.0 > 0); } #[test] fn test_speculative_context() { let mut ctx = SpeculativeContext::new(); let contract = make_contract_id(1); ctx.record_read(contract); ctx.record_write(contract, make_key(1), Some(make_value(1))); assert!(ctx.reads.contains(&contract)); assert!(ctx.writes.contains(&contract)); assert_eq!(ctx.changes.len(), 1); } #[test] fn test_state_version_ordering() { let v1 = StateVersion(1); let v2 = StateVersion(2); assert!(v1 < v2); assert_eq!(v1.next(), v2); } #[test] fn test_speculative_stats() { let stats = SpeculativeStats { speculative_executions: 100, committed: 90, rolled_back: 10, pruned: 5, }; assert!((stats.commit_rate() - 0.9).abs() < 0.001); } #[test] fn test_cache_invalidation() { let id = make_contract_id(1); let mut cache = ContractCache::new(id, 10); // Insert entries at different versions. cache.insert(make_key(1), Some(make_value(1)), StateVersion(1)); cache.insert(make_key(2), Some(make_value(2)), StateVersion(2)); cache.insert(make_key(3), Some(make_value(3)), StateVersion(3)); // Invalidate older than version 2. cache.invalidate_older_than(StateVersion(2)); // Version 1 should be gone. assert!(cache.get(&make_key(1), 100).is_none()); // Versions 2 and 3 should remain. assert!(cache.get(&make_key(2), 100).is_some()); assert!(cache.get(&make_key(3), 100).is_some()); } }