//! Dynamic resharding for load balancing and scaling.
//!
//! Handles shard splits and merges based on load and network conditions.

use serde::{Deserialize, Serialize};

use crate::{ShardConfig, ShardId};

/// Resharding event types.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum ReshardEvent {
    /// Split a shard into multiple shards.
    Split {
        /// Original shard.
        shard_id: ShardId,
        /// New shard IDs after split.
        new_shards: Vec<ShardId>,
    },
    /// Merge multiple shards into one.
    Merge {
        /// Shards to merge.
        shards: Vec<ShardId>,
        /// Target shard ID.
        into: ShardId,
    },
}

/// Resharding status.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ReshardStatus {
    /// No resharding in progress.
    Idle,
    /// Resharding planned, waiting for finalization.
    Planned,
    /// State migration in progress.
    Migrating,
    /// Validation in progress.
    Validating,
    /// Resharding complete.
    Complete,
    /// Resharding failed.
    Failed,
}

/// Metrics for resharding decisions.
#[derive(Clone, Debug)]
pub struct ShardMetrics {
    /// Shard ID.
    pub shard_id: ShardId,
    /// Average TPS over the measurement period.
    pub avg_tps: f64,
    /// Peak TPS observed.
    pub peak_tps: f64,
    /// Number of accounts.
    pub account_count: u64,
    /// State size in bytes.
    pub state_size: u64,
    /// Number of validators.
    pub validator_count: usize,
}

impl ShardMetrics {
    /// Creates empty metrics for a shard.
    pub fn new(shard_id: ShardId) -> Self {
        Self {
            shard_id,
            avg_tps: 0.0,
            peak_tps: 0.0,
            account_count: 0,
            state_size: 0,
            validator_count: 0,
        }
    }
}

/// Manages dynamic resharding operations.
pub struct ReshardManager {
    /// Configuration.
    config: ShardConfig,
    /// Current status.
    status: ReshardStatus,
    /// Per-shard metrics.
    metrics: Vec<ShardMetrics>,
    /// TPS threshold to trigger a split.
    split_threshold_tps: f64,
    /// TPS threshold to trigger a merge.
    merge_threshold_tps: f64,
    /// Minimum shards (cannot merge below this).
    min_shards: u16,
    /// Maximum shards (cannot split above this).
    max_shards: u16,
    /// Next available shard ID.
    next_shard_id: ShardId,
}

impl ReshardManager {
    /// Creates a new reshard manager.
    pub fn new(config: ShardConfig) -> Self {
        let mut metrics = Vec::new();
        for i in 0..config.num_shards {
            metrics.push(ShardMetrics::new(i));
        }
        Self {
            min_shards: config.num_shards,
            max_shards: config.num_shards * 4,
            next_shard_id: config.num_shards,
            config,
            status: ReshardStatus::Idle,
            metrics,
            split_threshold_tps: 2500.0, // 80% of target
            merge_threshold_tps: 500.0,  // 16% of target
        }
    }

    /// Updates metrics for a shard.
    pub fn update_metrics(&mut self, shard_id: ShardId, metrics: ShardMetrics) {
        if let Some(m) = self.metrics.iter_mut().find(|m| m.shard_id == shard_id) {
            *m = metrics;
        }
    }

    /// Checks if resharding is needed based on current metrics.
    ///
    /// Returns `None` when dynamic resharding is disabled, a reshard is
    /// already in progress, or no threshold is crossed.
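    ///
    /// # Example
    ///
    /// A minimal sketch of the intended monitoring loop. The TPS figure is
    /// hypothetical, and the snippet is marked `ignore` because the crate
    /// path for the imports is not shown here.
    ///
    /// ```ignore
    /// let mut manager = ReshardManager::new(ShardConfig::with_shards(4));
    ///
    /// // Feed in a fresh measurement for shard 0, then ask for a plan.
    /// let mut m = ShardMetrics::new(0);
    /// m.avg_tps = 3_000.0; // above the default split threshold of 2500 TPS
    /// manager.update_metrics(0, m);
    ///
    /// if let Some(event) = manager.check_reshard_needed() {
    ///     manager.execute(&event);
    ///     manager.reset_status();
    /// }
    /// ```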
    pub fn check_reshard_needed(&self) -> Option<ReshardEvent> {
        if !self.config.dynamic_resharding || self.status != ReshardStatus::Idle {
            return None;
        }

        // Check for overloaded shards (need split).
        for metric in &self.metrics {
            if metric.avg_tps > self.split_threshold_tps
                && self.metrics.len() < self.max_shards as usize
            {
                return Some(self.plan_split(metric.shard_id));
            }
        }

        // Check for underutilized shards (need merge).
        let underutilized: Vec<_> = self
            .metrics
            .iter()
            .filter(|m| m.avg_tps < self.merge_threshold_tps)
            .collect();

        if underutilized.len() >= 2 && self.metrics.len() > self.min_shards as usize {
            let shard1 = underutilized[0].shard_id;
            let shard2 = underutilized[1].shard_id;
            return Some(self.plan_merge(vec![shard1, shard2]));
        }

        None
    }

    /// Plans a shard split.
    fn plan_split(&self, shard_id: ShardId) -> ReshardEvent {
        // Split into 2 new shards.
        let new_shard1 = self.next_shard_id;
        let new_shard2 = self.next_shard_id + 1;
        ReshardEvent::Split {
            shard_id,
            new_shards: vec![new_shard1, new_shard2],
        }
    }

    /// Plans a shard merge.
    fn plan_merge(&self, shards: Vec<ShardId>) -> ReshardEvent {
        // Merge into the lowest shard ID.
        let into = *shards.iter().min().unwrap_or(&0);
        ReshardEvent::Merge { shards, into }
    }

    /// Executes a resharding event.
    pub fn execute(&mut self, event: &ReshardEvent) {
        self.status = ReshardStatus::Migrating;

        match event {
            ReshardEvent::Split { shard_id, new_shards } => {
                tracing::info!("Executing split of shard {} into {:?}", shard_id, new_shards);
                // Remove old shard metrics.
                self.metrics.retain(|m| m.shard_id != *shard_id);
                // Add new shard metrics.
                for &new_id in new_shards {
                    self.metrics.push(ShardMetrics::new(new_id));
                }
                // Update the next available shard ID.
                if let Some(&max_id) = new_shards.iter().max() {
                    self.next_shard_id = max_id + 1;
                }
            }
            ReshardEvent::Merge { shards, into } => {
                tracing::info!("Executing merge of shards {:?} into {}", shards, into);
                // Remove merged shards (except the target).
                self.metrics
                    .retain(|m| m.shard_id == *into || !shards.contains(&m.shard_id));
            }
        }

        self.status = ReshardStatus::Complete;
    }

    /// Gets the current resharding status.
    pub fn status(&self) -> ReshardStatus {
        self.status
    }

    /// Gets metrics for all shards.
    pub fn get_all_metrics(&self) -> &[ShardMetrics] {
        &self.metrics
    }

    /// Gets the current number of shards.
    pub fn num_shards(&self) -> usize {
        self.metrics.len()
    }

    /// Resets status to idle.
    pub fn reset_status(&mut self) {
        self.status = ReshardStatus::Idle;
    }

    /// Sets the split threshold.
    pub fn set_split_threshold(&mut self, tps: f64) {
        self.split_threshold_tps = tps;
    }

    /// Sets the merge threshold.
    pub fn set_merge_threshold(&mut self, tps: f64) {
        self.merge_threshold_tps = tps;
    }

    /// Calculates the total TPS across all shards.
    pub fn total_tps(&self) -> f64 {
        self.metrics.iter().map(|m| m.avg_tps).sum()
    }

    /// Calculates the load imbalance across shards, reported as the standard
    /// deviation of per-shard average TPS.
    pub fn load_variance(&self) -> f64 {
        if self.metrics.is_empty() {
            return 0.0;
        }
        let avg = self.total_tps() / self.metrics.len() as f64;
        let variance: f64 = self
            .metrics
            .iter()
            .map(|m| (m.avg_tps - avg).powi(2))
            .sum::<f64>()
            / self.metrics.len() as f64;
        // Take the square root so the result is in TPS units.
        variance.sqrt()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_reshard_manager_creation() {
        let config = ShardConfig::with_shards(4);
        let manager = ReshardManager::new(config);
        assert_eq!(manager.num_shards(), 4);
        assert_eq!(manager.status(), ReshardStatus::Idle);
    }

    #[test]
    fn test_no_reshard_when_disabled() {
        let mut config = ShardConfig::with_shards(4);
        config.dynamic_resharding = false;
        let manager = ReshardManager::new(config);
        assert!(manager.check_reshard_needed().is_none());
    }
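
    // Illustrative addition (not in the original suite): `total_tps` should
    // sum the per-shard averages. The TPS values are hypothetical.
    #[test]
    fn test_total_tps_sums_shard_averages() {
        let config = ShardConfig::with_shards(2);
        let mut manager = ReshardManager::new(config);

        let mut m0 = ShardMetrics::new(0);
        m0.avg_tps = 400.0;
        manager.update_metrics(0, m0);

        let mut m1 = ShardMetrics::new(1);
        m1.avg_tps = 600.0;
        manager.update_metrics(1, m1);

        assert_eq!(manager.total_tps(), 1000.0);
    }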

    #[test]
    fn test_split_on_high_load() {
        let config = ShardConfig::with_shards(4);
        let mut manager = ReshardManager::new(config);
        manager.set_split_threshold(1000.0);

        // Set high TPS on shard 0.
        let mut metrics = ShardMetrics::new(0);
        metrics.avg_tps = 2000.0;
        manager.update_metrics(0, metrics);

        let event = manager.check_reshard_needed();
        assert!(matches!(event, Some(ReshardEvent::Split { shard_id: 0, .. })));
    }

    #[test]
    fn test_merge_on_low_load() {
        let config = ShardConfig::with_shards(8);
        let mut manager = ReshardManager::new(config);
        manager.min_shards = 4; // Allow merging down to 4.

        // Set low TPS on two shards.
        let mut metrics0 = ShardMetrics::new(0);
        metrics0.avg_tps = 100.0;
        manager.update_metrics(0, metrics0);

        let mut metrics1 = ShardMetrics::new(1);
        metrics1.avg_tps = 50.0;
        manager.update_metrics(1, metrics1);

        let event = manager.check_reshard_needed();
        assert!(matches!(event, Some(ReshardEvent::Merge { .. })));
    }

    #[test]
    fn test_execute_split() {
        let config = ShardConfig::with_shards(2);
        let mut manager = ReshardManager::new(config);

        let event = ReshardEvent::Split {
            shard_id: 0,
            new_shards: vec![2, 3],
        };
        manager.execute(&event);

        // Should now have 3 shards (1, 2, 3).
        assert_eq!(manager.num_shards(), 3);
        assert_eq!(manager.status(), ReshardStatus::Complete);
    }

    #[test]
    fn test_execute_merge() {
        let config = ShardConfig::with_shards(4);
        let mut manager = ReshardManager::new(config);

        let event = ReshardEvent::Merge {
            shards: vec![0, 1],
            into: 0,
        };
        manager.execute(&event);

        // Should now have 3 shards (0, 2, 3).
        assert_eq!(manager.num_shards(), 3);
    }

    #[test]
    fn test_load_variance() {
        let config = ShardConfig::with_shards(4);
        let mut manager = ReshardManager::new(config);

        // Set uniform load.
        for i in 0..4 {
            let mut m = ShardMetrics::new(i);
            m.avg_tps = 1000.0;
            manager.update_metrics(i, m);
        }
        assert_eq!(manager.load_variance(), 0.0);

        // Set uneven load.
        let mut m = ShardMetrics::new(0);
        m.avg_tps = 2000.0;
        manager.update_metrics(0, m);
        assert!(manager.load_variance() > 0.0);
    }
}
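
// Illustrative end-to-end sketch, kept separate from the original suite:
// drives the manager through a full split cycle and shows that
// `reset_status` is required before another plan can be made. The threshold
// and TPS values are hypothetical.
#[cfg(test)]
mod flow_sketch {
    use super::*;

    #[test]
    fn split_cycle_requires_status_reset() {
        let config = ShardConfig::with_shards(2);
        let mut manager = ReshardManager::new(config);
        manager.set_split_threshold(1000.0);

        // Overload shard 1 so a split is planned for it.
        let mut metrics = ShardMetrics::new(1);
        metrics.avg_tps = 1500.0;
        manager.update_metrics(1, metrics);

        let event = manager.check_reshard_needed().expect("split expected");
        manager.execute(&event);

        // Shard 1 is replaced by two new shards, so the count grows by one.
        assert_eq!(manager.num_shards(), 3);
        assert_eq!(manager.status(), ReshardStatus::Complete);

        // While status is Complete, no new plan is produced.
        assert!(manager.check_reshard_needed().is_none());

        // Resetting returns the manager to Idle, allowing the next cycle.
        manager.reset_status();
        assert_eq!(manager.status(), ReshardStatus::Idle);
    }
}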