// File: synor/apps/synord/tests/stress_tests.rs
// From commit 48949ebb3f (Gulshan Yadav, 2026-01-08 05:22:17 +05:30):
// "Initial commit: Synor blockchain monorepo"
// A complete blockchain implementation featuring:
// - synord: Full node with GHOSTDAG consensus
// - explorer-web: Modern React blockchain explorer with 3D DAG visualization
// - CLI wallet and tools
// - Smart contract SDK and example contracts (DEX, NFT, token)
// - WASM crypto library for browser/mobile
// (Listing metadata: 718 lines, 19 KiB, Rust)

//! Stress tests for high throughput scenarios.
//!
//! These tests verify:
//! - High transaction throughput (TPS)
//! - Network under load
//! - Memory and resource management
//! - Concurrent operations
//! - Block production under pressure
//! - Large DAG handling
use std::sync::Arc;
use std::time::{Duration, Instant};
use tempfile::TempDir;
use tokio::sync::Semaphore;
use tokio::time::sleep;
use tracing::info;
use synord::config::NodeConfig;
use synord::node::{NodeState, SynorNode};
// ==================== Test Constants ====================
/// Timeout for stress test operations.
///
/// NOTE(review): not referenced anywhere in the visible portion of this
/// file — presumably used further down (listing reports 718 lines total);
/// confirm before removing.
const STRESS_TIMEOUT: Duration = Duration::from_secs(60);
/// Number of concurrent operations for stress tests.
/// Used as the task fan-out count by the concurrent-query tests below.
const CONCURRENT_OPS: usize = 100;
// ==================== Test Helpers ====================
/// Builds a devnet node configuration tuned for stress testing.
///
/// Mining is disabled, P2P/RPC connection limits are raised, and RPC rate
/// limiting is switched off so the node itself is not the bottleneck.
/// Each node gets a triple of consecutive ports (P2P, HTTP RPC, WS RPC)
/// derived from the process id, so parallel test processes use distinct
/// port ranges.
fn create_stress_config(temp_dir: &TempDir, node_index: u16) -> NodeConfig {
    let mut config = NodeConfig::for_network("devnet").unwrap();

    // Per-process slot of 10 ports, plus 3 ports per node within the slot.
    // NOTE(review): for node_index >= 4 the node's port triple spills past
    // the 10-port process slot into the next process's range — confirm the
    // collision risk if tests ever create that many nodes concurrently.
    let pid_slot = (std::process::id() % 500) as u16 * 10;
    let port_base = 20000 + pid_slot + node_index * 3;
    config.p2p.listen_addr = format!("/ip4/127.0.0.1/tcp/{}", port_base);
    config.rpc.http_addr = format!("127.0.0.1:{}", port_base + 1);
    config.rpc.ws_addr = format!("127.0.0.1:{}", port_base + 2);

    // Isolated per-node data directory inside the caller's temp dir.
    config.data_dir = temp_dir.path().join(format!("stress_node_{}", node_index));
    config.mining.enabled = false;

    // Loosened limits for stress scenarios.
    config.p2p.max_inbound = 200;
    config.p2p.max_outbound = 50;
    config.rpc.max_connections = 200;
    config.rpc.rate_limit = 0; // No rate limit for stress tests
    config
}
/// Stress test network configuration.
///
/// Bundles a set of running-capable nodes with the temp directories that
/// back their data dirs, so both share one lifetime.
struct StressTestNetwork {
    /// All nodes in the test network; index 0 is the seed node.
    nodes: Vec<Arc<SynorNode>>,
    /// Held only to keep the per-node data directories alive for the
    /// duration of the test (dropped — and deleted — with the network).
    _temp_dirs: Vec<TempDir>,
}
impl StressTestNetwork {
    /// Creates `node_count` nodes: node 0 is the seed, and every other
    /// node is configured to dial it.
    ///
    /// The seed's dial address is read back from its own config rather
    /// than re-deriving the port formula here, so the two can never drift
    /// apart (previously the formula was duplicated and the seed config's
    /// unnecessary `mut` triggered an `unused_mut` warning).
    async fn new(node_count: usize) -> anyhow::Result<Self> {
        let mut temp_dirs = Vec::with_capacity(node_count);
        let mut nodes = Vec::with_capacity(node_count);

        // First node (seed): capture exactly what it will bind on.
        let temp = TempDir::new()?;
        let config = create_stress_config(&temp, 0);
        let seed_addr = config.p2p.listen_addr.clone();
        temp_dirs.push(temp);
        nodes.push(Arc::new(SynorNode::new(config).await?));

        // Remaining nodes connect to the seed.
        for i in 1..node_count {
            let temp = TempDir::new()?;
            let mut config = create_stress_config(&temp, i as u16);
            config.p2p.seeds = vec![seed_addr.clone()];
            temp_dirs.push(temp);
            nodes.push(Arc::new(SynorNode::new(config).await?));
        }
        Ok(StressTestNetwork {
            nodes,
            _temp_dirs: temp_dirs,
        })
    }

    /// Starts every node in creation order; fails fast on the first error.
    async fn start_all(&self) -> anyhow::Result<()> {
        for node in &self.nodes {
            node.start().await?;
        }
        Ok(())
    }

    /// Stops every node in creation order; fails fast on the first error.
    async fn stop_all(&self) -> anyhow::Result<()> {
        for node in &self.nodes {
            node.stop().await?;
        }
        Ok(())
    }
}
// ==================== Concurrent Query Tests ====================
#[tokio::test]
async fn test_concurrent_consensus_queries() {
    let temp_dir = TempDir::new().unwrap();
    let node = Arc::new(
        SynorNode::new(create_stress_config(&temp_dir, 0))
            .await
            .unwrap(),
    );
    node.start().await.unwrap();
    sleep(Duration::from_secs(1)).await;

    let started = Instant::now();
    // Fan out CONCURRENT_OPS tasks, cycling through five query kinds.
    let tasks: Vec<_> = (0..CONCURRENT_OPS)
        .map(|i| {
            let n = node.clone();
            tokio::spawn(async move {
                let consensus = n.consensus();
                match i % 5 {
                    0 => {
                        let _ = consensus.current_blue_score().await;
                    }
                    1 => {
                        let _ = consensus.current_daa_score().await;
                    }
                    2 => {
                        let _: Vec<[u8; 32]> = consensus.tips().await;
                    }
                    3 => {
                        let _ = consensus.current_height().await;
                    }
                    _ => {
                        let _ = consensus.current_difficulty().await;
                    }
                }
            })
        })
        .collect();

    // Join every task before measuring.
    for task in tasks {
        task.await.unwrap();
    }

    let elapsed = started.elapsed();
    let qps = CONCURRENT_OPS as f64 / elapsed.as_secs_f64();
    info!(
        queries = CONCURRENT_OPS,
        elapsed_ms = elapsed.as_millis(),
        qps = qps,
        "Concurrent consensus query performance"
    );
    // Should handle 100 concurrent queries quickly
    assert!(
        elapsed < Duration::from_secs(5),
        "Concurrent queries took too long"
    );
    node.stop().await.unwrap();
}
#[tokio::test]
async fn test_concurrent_network_queries() {
    let dir = TempDir::new().unwrap();
    let cfg = create_stress_config(&dir, 0);
    let node = Arc::new(SynorNode::new(cfg).await.unwrap());
    node.start().await.unwrap();
    sleep(Duration::from_secs(1)).await;

    let begin = Instant::now();
    // Rotate each spawned task through the three network query kinds.
    let handles: Vec<_> = (0..CONCURRENT_OPS)
        .map(|i| {
            let n = node.clone();
            tokio::spawn(async move {
                let network = n.network();
                match i % 3 {
                    0 => {
                        let _ = network.peer_count().await;
                    }
                    1 => {
                        let _ = network.peers().await;
                    }
                    _ => {
                        let _ = network.stats().await;
                    }
                }
            })
        })
        .collect();
    for h in handles {
        h.await.unwrap();
    }

    let elapsed = begin.elapsed();
    info!(
        queries = CONCURRENT_OPS,
        elapsed_ms = elapsed.as_millis(),
        "Concurrent network query performance"
    );
    assert!(
        elapsed < Duration::from_secs(5),
        "Network queries took too long"
    );
    node.stop().await.unwrap();
}
// ==================== Multi-Node Stress Tests ====================
#[tokio::test]
async fn test_multi_node_concurrent_queries() {
    let network = StressTestNetwork::new(3).await.unwrap();
    network.start_all().await.unwrap();
    sleep(Duration::from_secs(2)).await;

    let t0 = Instant::now();
    // Round-robin the queries over every node in the network.
    let node_total = network.nodes.len();
    let mut joins = Vec::with_capacity(CONCURRENT_OPS);
    for i in 0..CONCURRENT_OPS {
        let node = network.nodes[i % node_total].clone();
        joins.push(tokio::spawn(async move {
            let consensus = node.consensus();
            let _ = consensus.current_blue_score().await;
            let _: Vec<[u8; 32]> = consensus.tips().await;
        }));
    }
    for j in joins {
        j.await.unwrap();
    }

    let elapsed = t0.elapsed();
    info!(
        queries = CONCURRENT_OPS,
        nodes = network.nodes.len(),
        elapsed_ms = elapsed.as_millis(),
        "Multi-node concurrent query performance"
    );
    network.stop_all().await.unwrap();
}
#[tokio::test]
async fn test_many_node_network() {
    // Bring up a larger mesh than the other tests use.
    let node_count = 5;
    let network = StressTestNetwork::new(node_count).await.unwrap();
    network.start_all().await.unwrap();
    // Give gossip time to form the mesh.
    sleep(Duration::from_secs(5)).await;

    // Sum peer counts across the whole network.
    let mut total_peers = 0;
    for (i, node) in network.nodes.iter().enumerate() {
        let peers = node.network().peer_count().await;
        total_peers += peers;
        info!(node = i, peers = peers, "Node peer count");
    }
    info!(
        total_peers = total_peers,
        avg_peers = total_peers / node_count,
        "Network connectivity"
    );
    // With 5 nodes, should have reasonable connectivity
    assert!(total_peers > 0, "Network should have connections");
    network.stop_all().await.unwrap();
}
// ==================== Block Announcement Flood Tests ====================
#[tokio::test]
async fn test_block_announcement_flood() {
    let network = StressTestNetwork::new(2).await.unwrap();
    network.start_all().await.unwrap();
    sleep(Duration::from_secs(2)).await;

    let clock = Instant::now();
    // Fire 100 back-to-back announcements from the first node, each with
    // a distinct (repeated-byte) hash.
    let net0 = network.nodes[0].network();
    for i in 0u8..100 {
        net0.announce_block([i; 32]).await;
    }
    let elapsed = clock.elapsed();
    info!(
        announcements = 100,
        elapsed_ms = elapsed.as_millis(),
        rate = 100.0 / elapsed.as_secs_f64(),
        "Block announcement flood rate"
    );
    // Should handle rapid announcements
    assert!(
        elapsed < Duration::from_secs(5),
        "Block announcements too slow"
    );
    // Both nodes must still be healthy after the flood.
    assert_eq!(network.nodes[0].state().await, NodeState::Running);
    assert_eq!(network.nodes[1].state().await, NodeState::Running);
    network.stop_all().await.unwrap();
}
// ==================== Memory and Resource Tests ====================
#[tokio::test]
async fn test_long_running_queries() {
    let dir = TempDir::new().unwrap();
    let node = Arc::new(
        SynorNode::new(create_stress_config(&dir, 0))
            .await
            .unwrap(),
    );
    node.start().await.unwrap();
    sleep(Duration::from_secs(1)).await;

    // Issue the same trio of queries repeatedly, logging every 10th pass.
    let iterations = 50;
    let begin = Instant::now();
    for i in 0..iterations {
        let consensus = node.consensus();
        let _ = consensus.current_blue_score().await;
        let _: Vec<[u8; 32]> = consensus.tips().await;
        let _: Vec<[u8; 32]> = consensus.get_selected_chain(100).await;
        if i % 10 == 0 {
            info!(iteration = i, "Long-running query progress");
        }
    }
    let elapsed = begin.elapsed();
    info!(
        iterations = iterations,
        elapsed_ms = elapsed.as_millis(),
        avg_ms = elapsed.as_millis() / iterations as u128,
        "Long-running query performance"
    );
    // Node should remain healthy
    assert_eq!(node.state().await, NodeState::Running);
    node.stop().await.unwrap();
}
#[tokio::test]
async fn test_rapid_start_stop_cycles() {
    // One temp dir is reused across cycles, so every cycle restarts a node
    // over the same data directory.
    let temp_dir = TempDir::new().unwrap();
    for cycle in 0..5 {
        info!(cycle = cycle, "Start/stop cycle");
        let node = SynorNode::new(create_stress_config(&temp_dir, 0))
            .await
            .unwrap();
        node.start().await.unwrap();
        sleep(Duration::from_millis(500)).await;
        assert_eq!(node.state().await, NodeState::Running);
        node.stop().await.unwrap();
        assert_eq!(node.state().await, NodeState::Stopped);
    }
    info!("All start/stop cycles completed successfully");
}
// ==================== Sync Service Stress Tests ====================
#[tokio::test]
async fn test_concurrent_sync_queries() {
    let dir = TempDir::new().unwrap();
    let node = Arc::new(
        SynorNode::new(create_stress_config(&dir, 0))
            .await
            .unwrap(),
    );
    node.start().await.unwrap();
    sleep(Duration::from_secs(1)).await;

    let begin = Instant::now();
    // Each task hits all three read-only sync-service endpoints.
    let tasks: Vec<_> = (0..CONCURRENT_OPS)
        .map(|_| {
            let n = node.clone();
            tokio::spawn(async move {
                let sync = n.sync();
                let _ = sync.state().await;
                let _ = sync.progress().await;
                let _ = sync.is_synced().await;
            })
        })
        .collect();
    for t in tasks {
        t.await.unwrap();
    }
    let elapsed = begin.elapsed();
    info!(
        queries = CONCURRENT_OPS * 3, // 3 calls per iteration
        elapsed_ms = elapsed.as_millis(),
        "Concurrent sync service query performance"
    );
    node.stop().await.unwrap();
}
// ==================== Mempool Stress Tests ====================
#[tokio::test]
async fn test_mempool_concurrent_access() {
    let dir = TempDir::new().unwrap();
    let node = Arc::new(
        SynorNode::new(create_stress_config(&dir, 0))
            .await
            .unwrap(),
    );
    node.start().await.unwrap();
    sleep(Duration::from_secs(1)).await;

    let begin = Instant::now();
    // One read-only mempool size query per spawned task.
    let tasks: Vec<_> = (0..CONCURRENT_OPS)
        .map(|_| {
            let n = node.clone();
            tokio::spawn(async move {
                let _ = n.mempool().size().await;
            })
        })
        .collect();
    for t in tasks {
        t.await.unwrap();
    }
    let elapsed = begin.elapsed();
    info!(
        queries = CONCURRENT_OPS,
        elapsed_ms = elapsed.as_millis(),
        "Concurrent mempool access performance"
    );
    node.stop().await.unwrap();
}
// ==================== Connection Stress Tests ====================
#[tokio::test]
async fn test_rapid_peer_operations() {
    let network = StressTestNetwork::new(2).await.unwrap();
    network.start_all().await.unwrap();
    sleep(Duration::from_secs(2)).await;

    let net0 = network.nodes[0].network();
    // 50 iterations x 2 calls = the 100 operations reported below.
    let begin = Instant::now();
    for _ in 0..50 {
        let _ = net0.peers().await;
        let _ = net0.peer_count().await;
    }
    let elapsed = begin.elapsed();
    info!(
        operations = 100,
        elapsed_ms = elapsed.as_millis(),
        "Rapid peer operations"
    );
    network.stop_all().await.unwrap();
}
#[tokio::test]
async fn test_subscription_under_load() {
    let network = StressTestNetwork::new(2).await.unwrap();
    network.start_all().await.unwrap();
    sleep(Duration::from_secs(2)).await;

    // Hold 10 subscriptions per node open for the duration of the flood.
    let net0 = network.nodes[0].network();
    let net1 = network.nodes[1].network();
    let mut subscriptions = Vec::new();
    for _ in 0..10 {
        subscriptions.push(net0.subscribe());
        subscriptions.push(net1.subscribe());
    }
    // Announce blocks while the subscribers are live.
    for i in 0u8..50 {
        net0.announce_block([i; 32]).await;
    }
    sleep(Duration::from_millis(500)).await;
    // Subscriptions should not cause issues
    assert_eq!(network.nodes[0].state().await, NodeState::Running);
    network.stop_all().await.unwrap();
}
// ==================== Throughput Measurement Tests ====================
#[tokio::test]
async fn test_consensus_throughput() {
    let dir = TempDir::new().unwrap();
    let node = Arc::new(
        SynorNode::new(create_stress_config(&dir, 0))
            .await
            .unwrap(),
    );
    node.start().await.unwrap();
    sleep(Duration::from_secs(1)).await;

    let consensus = node.consensus();
    // Time `operations` sequential calls of each query kind separately.
    let operations = 1000;

    // 1) Blue score queries.
    let t = Instant::now();
    for _ in 0..operations {
        let _ = consensus.current_blue_score().await;
    }
    let blue_score_elapsed = t.elapsed();

    // 2) Tips queries.
    let t = Instant::now();
    for _ in 0..operations {
        let _: Vec<[u8; 32]> = consensus.tips().await;
    }
    let tips_elapsed = t.elapsed();

    // 3) Height queries.
    let t = Instant::now();
    for _ in 0..operations {
        let _ = consensus.current_height().await;
    }
    let height_elapsed = t.elapsed();

    info!(
        blue_score_qps = operations as f64 / blue_score_elapsed.as_secs_f64(),
        tips_qps = operations as f64 / tips_elapsed.as_secs_f64(),
        height_qps = operations as f64 / height_elapsed.as_secs_f64(),
        "Consensus operation throughput"
    );
    node.stop().await.unwrap();
}
// ==================== Semaphore-Limited Stress Tests ====================
#[tokio::test]
async fn test_bounded_concurrent_operations() {
    let dir = TempDir::new().unwrap();
    let node = Arc::new(
        SynorNode::new(create_stress_config(&dir, 0))
            .await
            .unwrap(),
    );
    node.start().await.unwrap();
    sleep(Duration::from_secs(1)).await;

    // 500 tasks total, but the semaphore caps in-flight work at 50.
    let limiter = Arc::new(Semaphore::new(50));
    let begin = Instant::now();
    let tasks: Vec<_> = (0..500)
        .map(|_| {
            let n = node.clone();
            let sem = limiter.clone();
            tokio::spawn(async move {
                // Permit is bound (not dropped) so it is held for the
                // whole query.
                let _permit = sem.acquire().await.unwrap();
                let _ = n.consensus().current_blue_score().await;
            })
        })
        .collect();
    for t in tasks {
        t.await.unwrap();
    }
    let elapsed = begin.elapsed();
    info!(
        total_ops = 500,
        max_concurrent = 50,
        elapsed_ms = elapsed.as_millis(),
        ops_per_sec = 500.0 / elapsed.as_secs_f64(),
        "Bounded concurrent operations"
    );
    node.stop().await.unwrap();
}
// ==================== Node Info Stress Tests ====================
#[tokio::test]
async fn test_info_endpoint_stress() {
    let dir = TempDir::new().unwrap();
    let node = Arc::new(
        SynorNode::new(create_stress_config(&dir, 0))
            .await
            .unwrap(),
    );
    node.start().await.unwrap();
    sleep(Duration::from_secs(1)).await;

    let begin = Instant::now();
    for _ in 0..100 {
        // Every response must carry a non-empty network name.
        let info = node.info().await;
        assert!(!info.network.is_empty());
    }
    let elapsed = begin.elapsed();
    info!(
        queries = 100,
        elapsed_ms = elapsed.as_millis(),
        qps = 100.0 / elapsed.as_secs_f64(),
        "Node info endpoint throughput"
    );
    node.stop().await.unwrap();
}
// ==================== Stability Under Load ====================
#[tokio::test]
async fn test_stability_under_mixed_load() {
    let network = StressTestNetwork::new(3).await.unwrap();
    network.start_all().await.unwrap();
    sleep(Duration::from_secs(2)).await;

    let begin = Instant::now();
    // 150 tasks cycling through six operation kinds, spread round-robin
    // across all three nodes.
    let tasks: Vec<_> = (0..150)
        .map(|i| {
            let node = network.nodes[i % network.nodes.len()].clone();
            tokio::spawn(async move {
                match i % 6 {
                    0 => {
                        let _: Vec<[u8; 32]> = node.consensus().tips().await;
                    }
                    1 => {
                        let _ = node.network().peers().await;
                    }
                    2 => {
                        let _ = node.sync().progress().await;
                    }
                    3 => {
                        let _ = node.mempool().size().await;
                    }
                    4 => {
                        let _ = node.info().await;
                    }
                    _ => {
                        let _ = node.consensus().current_blue_score().await;
                    }
                }
            })
        })
        .collect();
    for t in tasks {
        t.await.unwrap();
    }
    let elapsed = begin.elapsed();
    info!(
        operations = 150,
        elapsed_ms = elapsed.as_millis(),
        "Mixed load test completed"
    );
    // All nodes should remain running
    for (i, node) in network.nodes.iter().enumerate() {
        assert_eq!(
            node.state().await,
            NodeState::Running,
            "Node {} should be running",
            i
        );
    }
    network.stop_all().await.unwrap();
}