//! Stress tests for high throughput scenarios. //! //! These tests verify: //! - High transaction throughput (TPS) //! - Network under load //! - Memory and resource management //! - Concurrent operations //! - Block production under pressure //! - Large DAG handling use std::sync::Arc; use std::time::{Duration, Instant}; use tempfile::TempDir; use tokio::sync::Semaphore; use tokio::time::sleep; use tracing::info; use synord::config::NodeConfig; use synord::node::{NodeState, SynorNode}; // ==================== Test Constants ==================== /// Timeout for stress test operations. #[allow(dead_code)] const STRESS_TIMEOUT: Duration = Duration::from_secs(60); /// Number of concurrent operations for stress tests. const CONCURRENT_OPS: usize = 100; // ==================== Test Helpers ==================== /// Creates a test node configuration optimized for stress testing. fn create_stress_config(temp_dir: &TempDir, node_index: u16) -> NodeConfig { let mut config = NodeConfig::for_network("devnet").unwrap(); config.data_dir = temp_dir.path().join(format!("stress_node_{}", node_index)); config.mining.enabled = false; // Use unique ports let port_base = 20000 + (std::process::id() % 500) as u16 * 10 + node_index * 3; config.p2p.listen_addr = format!("/ip4/127.0.0.1/tcp/{}", port_base); config.rpc.http_addr = format!("127.0.0.1:{}", port_base + 1); config.rpc.ws_addr = format!("127.0.0.1:{}", port_base + 2); // Increase limits for stress testing config.p2p.max_inbound = 200; config.p2p.max_outbound = 50; config.rpc.max_connections = 200; config.rpc.rate_limit = 0; // No rate limit for stress tests config } /// Stress test network configuration. struct StressTestNetwork { nodes: Vec>, _temp_dirs: Vec, } impl StressTestNetwork { async fn new(node_count: usize) -> anyhow::Result { let mut temp_dirs = Vec::new(); let mut nodes = Vec::new(); // First node (seed) let temp = TempDir::new()?; let seed_port = 20000 + (std::process::id() % 500) as u16 * 10; let config = create_stress_config(&temp, 0); temp_dirs.push(temp); nodes.push(Arc::new(SynorNode::new(config).await?)); // Other nodes connect to seed for i in 1..node_count { let temp = TempDir::new()?; let mut config = create_stress_config(&temp, i as u16); config.p2p.seeds = vec![format!("/ip4/127.0.0.1/tcp/{}", seed_port)]; temp_dirs.push(temp); nodes.push(Arc::new(SynorNode::new(config).await?)); } Ok(StressTestNetwork { nodes, _temp_dirs: temp_dirs, }) } async fn start_all(&self) -> anyhow::Result<()> { for node in &self.nodes { node.start().await?; } Ok(()) } async fn stop_all(&self) -> anyhow::Result<()> { for node in &self.nodes { node.stop().await?; } Ok(()) } } // ==================== Concurrent Query Tests ==================== #[tokio::test] async fn test_concurrent_consensus_queries() { let temp_dir = TempDir::new().unwrap(); let config = create_stress_config(&temp_dir, 0); let node = Arc::new(SynorNode::new(config).await.unwrap()); node.start().await.unwrap(); sleep(Duration::from_secs(1)).await; let start = Instant::now(); let mut handles = Vec::new(); // Spawn concurrent consensus queries for i in 0..CONCURRENT_OPS { let node_clone = node.clone(); let handle = tokio::spawn(async move { let consensus = node_clone.consensus(); // Mix of different query types match i % 5 { 0 => { let _ = consensus.current_blue_score().await; } 1 => { let _ = consensus.current_daa_score().await; } 2 => { let _: Vec<[u8; 32]> = consensus.tips().await; } 3 => { let _ = consensus.current_height().await; } _ => { let _ = consensus.current_difficulty().await; } } }); handles.push(handle); } // Wait for all queries for handle in handles { handle.await.unwrap(); } let elapsed = start.elapsed(); let qps = CONCURRENT_OPS as f64 / elapsed.as_secs_f64(); info!( queries = CONCURRENT_OPS, elapsed_ms = elapsed.as_millis(), qps = qps, "Concurrent consensus query performance" ); // Should handle 100 concurrent queries quickly assert!( elapsed < Duration::from_secs(5), "Concurrent queries took too long" ); node.stop().await.unwrap(); } #[tokio::test] async fn test_concurrent_network_queries() { let temp_dir = TempDir::new().unwrap(); let config = create_stress_config(&temp_dir, 0); let node = Arc::new(SynorNode::new(config).await.unwrap()); node.start().await.unwrap(); sleep(Duration::from_secs(1)).await; let start = Instant::now(); let mut handles = Vec::new(); for i in 0..CONCURRENT_OPS { let node_clone = node.clone(); let handle = tokio::spawn(async move { let network = node_clone.network(); match i % 3 { 0 => { let _ = network.peer_count().await; } 1 => { let _ = network.peers().await; } _ => { let _ = network.stats().await; } } }); handles.push(handle); } for handle in handles { handle.await.unwrap(); } let elapsed = start.elapsed(); info!( queries = CONCURRENT_OPS, elapsed_ms = elapsed.as_millis(), "Concurrent network query performance" ); assert!( elapsed < Duration::from_secs(5), "Network queries took too long" ); node.stop().await.unwrap(); } // ==================== Multi-Node Stress Tests ==================== #[tokio::test] async fn test_multi_node_concurrent_queries() { let network = StressTestNetwork::new(3).await.unwrap(); network.start_all().await.unwrap(); sleep(Duration::from_secs(2)).await; let start = Instant::now(); let mut handles = Vec::new(); // Distribute queries across all nodes for i in 0..CONCURRENT_OPS { let node = network.nodes[i % network.nodes.len()].clone(); let handle = tokio::spawn(async move { let consensus = node.consensus(); let _ = consensus.current_blue_score().await; let _: Vec<[u8; 32]> = consensus.tips().await; }); handles.push(handle); } for handle in handles { handle.await.unwrap(); } let elapsed = start.elapsed(); info!( queries = CONCURRENT_OPS, nodes = network.nodes.len(), elapsed_ms = elapsed.as_millis(), "Multi-node concurrent query performance" ); network.stop_all().await.unwrap(); } #[tokio::test] async fn test_many_node_network() { // Test with more nodes let node_count = 5; let network = StressTestNetwork::new(node_count).await.unwrap(); network.start_all().await.unwrap(); // Allow mesh to form sleep(Duration::from_secs(5)).await; // Check connectivity let mut total_peers = 0; for (i, node) in network.nodes.iter().enumerate() { let net = node.network(); let peers = net.peer_count().await; total_peers += peers; info!(node = i, peers = peers, "Node peer count"); } info!( total_peers = total_peers, avg_peers = total_peers / node_count, "Network connectivity" ); // With 5 nodes, should have reasonable connectivity assert!(total_peers > 0, "Network should have connections"); network.stop_all().await.unwrap(); } // ==================== Block Announcement Flood Tests ==================== #[tokio::test] async fn test_block_announcement_flood() { let network = StressTestNetwork::new(2).await.unwrap(); network.start_all().await.unwrap(); sleep(Duration::from_secs(2)).await; let start = Instant::now(); // Flood with block announcements let net0 = network.nodes[0].network(); for i in 0..100 { let hash = [i as u8; 32]; net0.announce_block(hash).await; } let elapsed = start.elapsed(); info!( announcements = 100, elapsed_ms = elapsed.as_millis(), rate = 100.0 / elapsed.as_secs_f64(), "Block announcement flood rate" ); // Should handle rapid announcements assert!( elapsed < Duration::from_secs(5), "Block announcements too slow" ); // Node should remain stable assert_eq!(network.nodes[0].state().await, NodeState::Running); assert_eq!(network.nodes[1].state().await, NodeState::Running); network.stop_all().await.unwrap(); } // ==================== Memory and Resource Tests ==================== #[tokio::test] async fn test_long_running_queries() { let temp_dir = TempDir::new().unwrap(); let config = create_stress_config(&temp_dir, 0); let node = Arc::new(SynorNode::new(config).await.unwrap()); node.start().await.unwrap(); sleep(Duration::from_secs(1)).await; // Run queries over an extended period let iterations = 50; let start = Instant::now(); for i in 0..iterations { let consensus = node.consensus(); let _ = consensus.current_blue_score().await; let _: Vec<[u8; 32]> = consensus.tips().await; let _: Vec<[u8; 32]> = consensus.get_selected_chain(100).await; if i % 10 == 0 { info!(iteration = i, "Long-running query progress"); } } let elapsed = start.elapsed(); info!( iterations = iterations, elapsed_ms = elapsed.as_millis(), avg_ms = elapsed.as_millis() / iterations as u128, "Long-running query performance" ); // Node should remain healthy assert_eq!(node.state().await, NodeState::Running); node.stop().await.unwrap(); } #[tokio::test] async fn test_rapid_start_stop_cycles() { let temp_dir = TempDir::new().unwrap(); for cycle in 0..5 { info!(cycle = cycle, "Start/stop cycle"); let config = create_stress_config(&temp_dir, 0); let node = SynorNode::new(config).await.unwrap(); node.start().await.unwrap(); sleep(Duration::from_millis(500)).await; assert_eq!(node.state().await, NodeState::Running); node.stop().await.unwrap(); assert_eq!(node.state().await, NodeState::Stopped); } info!("All start/stop cycles completed successfully"); } // ==================== Sync Service Stress Tests ==================== #[tokio::test] async fn test_concurrent_sync_queries() { let temp_dir = TempDir::new().unwrap(); let config = create_stress_config(&temp_dir, 0); let node = Arc::new(SynorNode::new(config).await.unwrap()); node.start().await.unwrap(); sleep(Duration::from_secs(1)).await; let start = Instant::now(); let mut handles = Vec::new(); for _ in 0..CONCURRENT_OPS { let node_clone = node.clone(); let handle = tokio::spawn(async move { let sync = node_clone.sync(); let _ = sync.state().await; let _ = sync.progress().await; let _ = sync.is_synced().await; }); handles.push(handle); } for handle in handles { handle.await.unwrap(); } let elapsed = start.elapsed(); info!( queries = CONCURRENT_OPS * 3, // 3 calls per iteration elapsed_ms = elapsed.as_millis(), "Concurrent sync service query performance" ); node.stop().await.unwrap(); } // ==================== Mempool Stress Tests ==================== #[tokio::test] async fn test_mempool_concurrent_access() { let temp_dir = TempDir::new().unwrap(); let config = create_stress_config(&temp_dir, 0); let node = Arc::new(SynorNode::new(config).await.unwrap()); node.start().await.unwrap(); sleep(Duration::from_secs(1)).await; let start = Instant::now(); let mut handles = Vec::new(); for _ in 0..CONCURRENT_OPS { let node_clone = node.clone(); let handle = tokio::spawn(async move { let mempool = node_clone.mempool(); let _ = mempool.size().await; }); handles.push(handle); } for handle in handles { handle.await.unwrap(); } let elapsed = start.elapsed(); info!( queries = CONCURRENT_OPS, elapsed_ms = elapsed.as_millis(), "Concurrent mempool access performance" ); node.stop().await.unwrap(); } // ==================== Connection Stress Tests ==================== #[tokio::test] async fn test_rapid_peer_operations() { let network = StressTestNetwork::new(2).await.unwrap(); network.start_all().await.unwrap(); sleep(Duration::from_secs(2)).await; let net0 = network.nodes[0].network(); // Rapid peer list queries let start = Instant::now(); for _ in 0..50 { let _ = net0.peers().await; let _ = net0.peer_count().await; } let elapsed = start.elapsed(); info!( operations = 100, elapsed_ms = elapsed.as_millis(), "Rapid peer operations" ); network.stop_all().await.unwrap(); } #[tokio::test] async fn test_subscription_under_load() { let network = StressTestNetwork::new(2).await.unwrap(); network.start_all().await.unwrap(); sleep(Duration::from_secs(2)).await; // Create multiple subscriptions let net0 = network.nodes[0].network(); let net1 = network.nodes[1].network(); let mut subscriptions = Vec::new(); for _ in 0..10 { subscriptions.push(net0.subscribe()); subscriptions.push(net1.subscribe()); } // Send announcements while subscriptions exist for i in 0..50 { net0.announce_block([i as u8; 32]).await; } sleep(Duration::from_millis(500)).await; // Subscriptions should not cause issues assert_eq!(network.nodes[0].state().await, NodeState::Running); network.stop_all().await.unwrap(); } // ==================== Throughput Measurement Tests ==================== #[tokio::test] async fn test_consensus_throughput() { let temp_dir = TempDir::new().unwrap(); let config = create_stress_config(&temp_dir, 0); let node = Arc::new(SynorNode::new(config).await.unwrap()); node.start().await.unwrap(); sleep(Duration::from_secs(1)).await; let consensus = node.consensus(); // Measure throughput of different operations let operations = 1000; // Blue score queries let start = Instant::now(); for _ in 0..operations { let _ = consensus.current_blue_score().await; } let blue_score_elapsed = start.elapsed(); // Tips queries let start = Instant::now(); for _ in 0..operations { let _: Vec<[u8; 32]> = consensus.tips().await; } let tips_elapsed = start.elapsed(); // Height queries let start = Instant::now(); for _ in 0..operations { let _ = consensus.current_height().await; } let height_elapsed = start.elapsed(); info!( blue_score_qps = operations as f64 / blue_score_elapsed.as_secs_f64(), tips_qps = operations as f64 / tips_elapsed.as_secs_f64(), height_qps = operations as f64 / height_elapsed.as_secs_f64(), "Consensus operation throughput" ); node.stop().await.unwrap(); } // ==================== Semaphore-Limited Stress Tests ==================== #[tokio::test] async fn test_bounded_concurrent_operations() { let temp_dir = TempDir::new().unwrap(); let config = create_stress_config(&temp_dir, 0); let node = Arc::new(SynorNode::new(config).await.unwrap()); node.start().await.unwrap(); sleep(Duration::from_secs(1)).await; // Limit concurrency with semaphore let semaphore = Arc::new(Semaphore::new(50)); let mut handles = Vec::new(); let start = Instant::now(); for _ in 0..500 { let node_clone = node.clone(); let sem_clone = semaphore.clone(); let handle = tokio::spawn(async move { let _permit = sem_clone.acquire().await.unwrap(); let consensus = node_clone.consensus(); let _ = consensus.current_blue_score().await; }); handles.push(handle); } for handle in handles { handle.await.unwrap(); } let elapsed = start.elapsed(); info!( total_ops = 500, max_concurrent = 50, elapsed_ms = elapsed.as_millis(), ops_per_sec = 500.0 / elapsed.as_secs_f64(), "Bounded concurrent operations" ); node.stop().await.unwrap(); } // ==================== Node Info Stress Tests ==================== #[tokio::test] async fn test_info_endpoint_stress() { let temp_dir = TempDir::new().unwrap(); let config = create_stress_config(&temp_dir, 0); let node = Arc::new(SynorNode::new(config).await.unwrap()); node.start().await.unwrap(); sleep(Duration::from_secs(1)).await; let start = Instant::now(); for _ in 0..100 { let info = node.info().await; // Verify info is valid assert!(!info.network.is_empty()); } let elapsed = start.elapsed(); info!( queries = 100, elapsed_ms = elapsed.as_millis(), qps = 100.0 / elapsed.as_secs_f64(), "Node info endpoint throughput" ); node.stop().await.unwrap(); } // ==================== Stability Under Load ==================== #[tokio::test] async fn test_stability_under_mixed_load() { let network = StressTestNetwork::new(3).await.unwrap(); network.start_all().await.unwrap(); sleep(Duration::from_secs(2)).await; let start = Instant::now(); let mut handles = Vec::new(); // Mix of different operations for i in 0..150 { let node = network.nodes[i % network.nodes.len()].clone(); let handle = tokio::spawn(async move { match i % 6 { 0 => { let consensus = node.consensus(); let _: Vec<[u8; 32]> = consensus.tips().await; } 1 => { let network_svc = node.network(); let _ = network_svc.peers().await; } 2 => { let sync = node.sync(); let _ = sync.progress().await; } 3 => { let mempool = node.mempool(); let _ = mempool.size().await; } 4 => { let _ = node.info().await; } _ => { let consensus = node.consensus(); let _ = consensus.current_blue_score().await; } } }); handles.push(handle); } for handle in handles { handle.await.unwrap(); } let elapsed = start.elapsed(); info!( operations = 150, elapsed_ms = elapsed.as_millis(), "Mixed load test completed" ); // All nodes should remain running for (i, node) in network.nodes.iter().enumerate() { assert_eq!( node.state().await, NodeState::Running, "Node {} should be running", i ); } network.stop_all().await.unwrap(); }