//! Network service for Synor. use crate::behaviour::SynorBehaviour; use crate::config::NetworkConfig; use crate::message::{BlockAnnouncement, TransactionAnnouncement}; use crate::peer::{Direction, PeerInfo, PeerManager, reputation}; use crate::protocol::{SynorRequest, SynorResponse}; use crate::rate_limit::{PerPeerLimiter, RateLimitConfig as TokenBucketConfig}; use crate::ratelimit::{RateLimitResult, RateLimiters}; use crate::reputation::{ReputationManager, ViolationType}; use crate::sync::{SyncManager, SyncStatus}; use crate::topics; use futures::StreamExt; use libp2p::{ gossipsub, identify, kad, mdns, ping, request_response, swarm::SwarmEvent, Multiaddr, PeerId, Swarm, SwarmBuilder, }; use parking_lot::RwLock; use std::sync::Arc; use synor_types::{Block, BlockHeader, Hash256}; use thiserror::Error; use tokio::sync::{broadcast, mpsc, oneshot}; use tracing::{debug, info, trace, warn}; /// Network events. #[derive(Clone, Debug)] pub enum NetworkEvent { /// New block received from network. NewBlock(BlockAnnouncement), /// New transaction received from network. NewTransaction(TransactionAnnouncement), /// Peer connected. PeerConnected(PeerId), /// Peer disconnected. PeerDisconnected(PeerId), /// Sync status changed. SyncStatusChanged(SyncStatus), /// Request blocks from peer. BlocksReceived(Vec), /// Headers received. HeadersReceived(Vec), } /// Commands to the network service. pub enum NetworkCommand { /// Dial a peer. Dial(Multiaddr), /// Disconnect a peer. Disconnect(PeerId), /// Ban a peer. Ban(PeerId), /// Unban a peer. Unban(PeerId), /// Broadcast a block. BroadcastBlock(BlockAnnouncement), /// Broadcast a transaction. BroadcastTransaction(TransactionAnnouncement), /// Request blocks from a peer. RequestBlocks { peer: PeerId, block_ids: Vec, response: oneshot::Sender, NetworkError>>, }, /// Request headers from a peer. RequestHeaders { peer: PeerId, start: synor_types::BlockId, max_count: u32, response: oneshot::Sender, NetworkError>>, }, /// Get peer count. GetPeerCount(oneshot::Sender), /// Get sync status. GetSyncStatus(oneshot::Sender), /// Start sync. StartSync, /// Stop the service. Shutdown, } /// Handle for interacting with the network service. #[derive(Clone)] pub struct NetworkHandle { command_tx: mpsc::Sender, event_rx: broadcast::Sender, } impl NetworkHandle { /// Dials a peer at the given address. pub async fn dial(&self, addr: Multiaddr) -> Result<(), NetworkError> { self.command_tx .send(NetworkCommand::Dial(addr)) .await .map_err(|_| NetworkError::ServiceStopped) } /// Disconnects a peer. pub async fn disconnect(&self, peer_id: PeerId) -> Result<(), NetworkError> { self.command_tx .send(NetworkCommand::Disconnect(peer_id)) .await .map_err(|_| NetworkError::ServiceStopped) } /// Bans a peer. pub async fn ban(&self, peer_id: PeerId) -> Result<(), NetworkError> { self.command_tx .send(NetworkCommand::Ban(peer_id)) .await .map_err(|_| NetworkError::ServiceStopped) } /// Unbans a peer. pub async fn unban(&self, peer_id: PeerId) -> Result<(), NetworkError> { self.command_tx .send(NetworkCommand::Unban(peer_id)) .await .map_err(|_| NetworkError::ServiceStopped) } /// Broadcasts a block to the network. pub async fn broadcast_block(&self, announcement: BlockAnnouncement) -> Result<(), NetworkError> { self.command_tx .send(NetworkCommand::BroadcastBlock(announcement)) .await .map_err(|_| NetworkError::ServiceStopped) } /// Broadcasts a transaction to the network. pub async fn broadcast_transaction(&self, announcement: TransactionAnnouncement) -> Result<(), NetworkError> { self.command_tx .send(NetworkCommand::BroadcastTransaction(announcement)) .await .map_err(|_| NetworkError::ServiceStopped) } /// Requests blocks from a peer. pub async fn request_blocks( &self, peer: PeerId, block_ids: Vec, ) -> Result, NetworkError> { let (tx, rx) = oneshot::channel(); self.command_tx .send(NetworkCommand::RequestBlocks { peer, block_ids, response: tx, }) .await .map_err(|_| NetworkError::ServiceStopped)?; rx.await.map_err(|_| NetworkError::ServiceStopped)? } /// Requests headers from a peer. pub async fn request_headers( &self, peer: PeerId, start: synor_types::BlockId, max_count: u32, ) -> Result, NetworkError> { let (tx, rx) = oneshot::channel(); self.command_tx .send(NetworkCommand::RequestHeaders { peer, start, max_count, response: tx, }) .await .map_err(|_| NetworkError::ServiceStopped)?; rx.await.map_err(|_| NetworkError::ServiceStopped)? } /// Gets the current peer count. pub async fn peer_count(&self) -> Result { let (tx, rx) = oneshot::channel(); self.command_tx .send(NetworkCommand::GetPeerCount(tx)) .await .map_err(|_| NetworkError::ServiceStopped)?; rx.await.map_err(|_| NetworkError::ServiceStopped) } /// Gets the current sync status. pub async fn sync_status(&self) -> Result { let (tx, rx) = oneshot::channel(); self.command_tx .send(NetworkCommand::GetSyncStatus(tx)) .await .map_err(|_| NetworkError::ServiceStopped)?; rx.await.map_err(|_| NetworkError::ServiceStopped) } /// Starts synchronization. pub async fn start_sync(&self) -> Result<(), NetworkError> { self.command_tx .send(NetworkCommand::StartSync) .await .map_err(|_| NetworkError::ServiceStopped) } /// Subscribes to network events. pub fn subscribe(&self) -> broadcast::Receiver { self.event_rx.subscribe() } /// Shuts down the network service. pub async fn shutdown(&self) -> Result<(), NetworkError> { self.command_tx .send(NetworkCommand::Shutdown) .await .map_err(|_| NetworkError::ServiceStopped) } } /// Network service errors. #[derive(Debug, Error)] pub enum NetworkError { #[error("Service has stopped")] ServiceStopped, #[error("Transport error: {0}")] Transport(String), #[error("Dial error: {0}")] DialError(String), #[error("Request timeout")] Timeout, #[error("Request failed: {0}")] RequestFailed(String), #[error("Peer not found")] PeerNotFound, #[error("Invalid response")] InvalidResponse, #[error("Serialization error: {0}")] Serialization(String), #[error("Rate limited: retry after {0} seconds")] RateLimited(u64), #[error("Peer in cooldown: wait {0} seconds")] PeerCooldown(u64), #[error("Peer is banned")] PeerBanned, } /// The network service. pub struct NetworkService { /// The libp2p swarm. swarm: Swarm, /// Peer manager. peer_manager: Arc, /// Sync manager. sync_manager: Arc, /// Reputation manager for peer banning. reputation_manager: Arc, /// Rate limiters for DoS protection (sliding window). rate_limiters: RateLimiters, /// Token bucket rate limiter for incoming messages. token_bucket_limiter: PerPeerLimiter, /// Configuration. config: NetworkConfig, /// Command receiver. command_rx: mpsc::Receiver, /// Event sender. event_tx: broadcast::Sender, /// Pending requests. pending_requests: RwLock>, } struct PendingRequest { response_tx: Option>>, } impl NetworkService { /// Creates a new network service. pub async fn new(config: NetworkConfig) -> Result<(Self, NetworkHandle), Box> { // Generate keypair let local_keypair = libp2p::identity::Keypair::generate_ed25519(); let local_peer_id = PeerId::from(local_keypair.public()); info!("Local peer ID: {}", local_peer_id); // Build swarm let swarm = SwarmBuilder::with_existing_identity(local_keypair) .with_tokio() .with_tcp( libp2p::tcp::Config::default(), libp2p::noise::Config::new, libp2p::yamux::Config::default, )? .with_quic() .with_dns()? .with_behaviour(|key| { SynorBehaviour::new(key, &config).expect("Behaviour creation failed") })? .with_swarm_config(|cfg| { cfg.with_idle_connection_timeout(config.idle_timeout) }) .build(); // Create peer, sync, and reputation managers let peer_manager = Arc::new(PeerManager::new(config.max_inbound, config.max_outbound)); let sync_manager = Arc::new(SyncManager::new( config.sync.clone(), Arc::clone(&peer_manager), Hash256::from_bytes([0u8; 32]), // Will be set properly when genesis is known )); let reputation_manager = Arc::new(ReputationManager::new()); // Create channels let (command_tx, command_rx) = mpsc::channel(256); let (event_tx, _) = broadcast::channel(1024); let handle = NetworkHandle { command_tx, event_rx: event_tx.clone(), }; // Create token bucket limiter with default config: 100 req/s, burst of 200 let token_bucket_config = TokenBucketConfig::default(); let token_bucket_limiter = PerPeerLimiter::new(token_bucket_config); let service = NetworkService { swarm, peer_manager, sync_manager, reputation_manager, rate_limiters: RateLimiters::default(), token_bucket_limiter, config, command_rx, event_tx, pending_requests: RwLock::new(std::collections::HashMap::new()), }; Ok((service, handle)) } /// Starts the network service. pub async fn run(mut self) -> Result<(), Box> { // Listen on configured addresses for addr in &self.config.listen_addresses { match self.swarm.listen_on(addr.clone()) { Ok(_) => info!("Listening on {}", addr), Err(e) => warn!("Failed to listen on {}: {}", addr, e), } } // Subscribe to topics if let Err(e) = self.swarm.behaviour_mut().subscribe_topics() { warn!("Failed to subscribe to topics: {}", e); } // Connect to bootstrap peers for addr in &self.config.bootstrap_peers { if let Err(e) = self.swarm.dial(addr.clone()) { warn!("Failed to dial bootstrap peer {}: {}", addr, e); } } // Main event loop loop { tokio::select! { // Handle swarm events event = self.swarm.select_next_some() => { self.handle_swarm_event(event).await; } // Handle commands Some(cmd) = self.command_rx.recv() => { if !self.handle_command(cmd).await { break; } } } } info!("Network service shutting down"); Ok(()) } /// Handles a swarm event. async fn handle_swarm_event(&mut self, event: SwarmEvent) { match event { SwarmEvent::NewListenAddr { address, .. } => { info!("Listening on {}", address); } SwarmEvent::ConnectionEstablished { peer_id, endpoint, .. } => { // Check if peer is banned via reputation system if self.reputation_manager.is_banned(&peer_id) { warn!("Rejecting connection from banned peer: {}", peer_id); let _ = self.swarm.disconnect_peer_id(peer_id); return; } let direction = if endpoint.is_dialer() { Direction::Outbound } else { Direction::Inbound }; let addr = endpoint.get_remote_address().clone(); let info = if direction == Direction::Inbound { PeerInfo::new_inbound(peer_id) } else { PeerInfo::new_outbound(peer_id, addr.clone()) }; if self.peer_manager.add_peer(peer_id, info) { info!("Connected to peer: {} ({:?})", peer_id, direction); self.swarm.behaviour_mut().add_address(&peer_id, addr); let _ = self.event_tx.send(NetworkEvent::PeerConnected(peer_id)); } } SwarmEvent::ConnectionClosed { peer_id, .. } => { self.peer_manager.remove_peer(&peer_id); // Clean up rate limiter state for disconnected peer self.rate_limiters.reset(&peer_id); self.token_bucket_limiter.reset_peer(&peer_id); info!("Disconnected from peer: {}", peer_id); let _ = self.event_tx.send(NetworkEvent::PeerDisconnected(peer_id)); } SwarmEvent::Behaviour(behaviour_event) => { self.handle_behaviour_event(behaviour_event).await; } _ => {} } } /// Handles a behaviour event. async fn handle_behaviour_event(&mut self, event: crate::behaviour::SynorBehaviourEvent) { use crate::behaviour::SynorBehaviourEvent; match event { SynorBehaviourEvent::Gossipsub(gossipsub::Event::Message { propagation_source, message_id: _, message, }) => { self.handle_gossip_message(propagation_source, message).await; } SynorBehaviourEvent::RequestResponse(request_response::Event::Message { peer, message, }) => { match message { request_response::Message::Request { request, channel, .. } => { self.handle_request(peer, request, channel).await; } request_response::Message::Response { request_id, response } => { self.handle_response(peer, request_id, response).await; } } } SynorBehaviourEvent::RequestResponse(request_response::Event::OutboundFailure { peer, request_id, error, }) => { warn!("Request to {} failed: {:?}", peer, error); self.sync_manager.on_request_failed(request_id); self.peer_manager.update_peer(&peer, |p| p.record_failure()); // Record violation in reputation system (no response from peer) let auto_banned = self.reputation_manager.record_violation(&peer, ViolationType::NoResponse); if auto_banned { warn!("Auto-banned peer {} for repeated failures", peer); let _ = self.swarm.disconnect_peer_id(peer); } } SynorBehaviourEvent::Identify(identify::Event::Received { peer_id, info, .. }) => { debug!("Identified peer {}: {}", peer_id, info.agent_version); self.peer_manager.update_peer(&peer_id, |p| { p.user_agent = Some(info.agent_version.clone()); for addr in &info.listen_addrs { if !p.addresses.contains(addr) { p.addresses.push(addr.clone()); } } }); // Add addresses to Kademlia for addr in info.listen_addrs { self.swarm.behaviour_mut().add_address(&peer_id, addr); } } SynorBehaviourEvent::Ping(ping::Event { peer, result, .. }) => { match result { Ok(rtt) => { trace!("Ping to {} succeeded: {:?}", peer, rtt); self.peer_manager.update_peer(&peer, |p| { p.latency = Some(rtt); }); } Err(e) => { debug!("Ping to {} failed: {}", peer, e); } } } SynorBehaviourEvent::Mdns(mdns::Event::Discovered(peers)) => { for (peer_id, addr) in peers { debug!("mDNS discovered peer: {} at {}", peer_id, addr); if self.peer_manager.can_connect_outbound() { self.swarm.behaviour_mut().add_address(&peer_id, addr.clone()); if let Err(e) = self.swarm.dial(addr) { debug!("Failed to dial discovered peer: {}", e); } } } } SynorBehaviourEvent::Mdns(mdns::Event::Expired(peers)) => { for (peer_id, _) in peers { debug!("mDNS peer expired: {}", peer_id); } } SynorBehaviourEvent::Kademlia(kad::Event::OutboundQueryProgressed { result, .. }) => { match result { kad::QueryResult::GetClosestPeers(Ok(ok)) => { for peer in ok.peers { debug!("Kademlia found peer: {:?}", peer); } } kad::QueryResult::Bootstrap(Ok(_)) => { info!("Kademlia bootstrap complete"); } _ => {} } } _ => {} } } /// Handles a gossip message. async fn handle_gossip_message(&mut self, source: PeerId, message: gossipsub::Message) { let topic = message.topic.as_str(); match topic { t if t == topics::BLOCKS => { if let Ok(announcement) = borsh::from_slice::(&message.data) { debug!("Received block announcement from {}: {}", source, announcement.hash); let _ = self.event_tx.send(NetworkEvent::NewBlock(announcement)); } } t if t == topics::TRANSACTIONS => { if let Ok(announcement) = borsh::from_slice::(&message.data) { debug!("Received tx announcement from {}: {}", source, announcement.txid); let _ = self.event_tx.send(NetworkEvent::NewTransaction(announcement)); } } _ => { trace!("Received message on unknown topic: {}", topic); } } } /// Handles a request from a peer. async fn handle_request( &mut self, peer: PeerId, request: SynorRequest, channel: request_response::ResponseChannel, ) { debug!("Received {} request from {}", request.request_type(), peer); // Check if peer is banned via reputation system if self.reputation_manager.is_banned(&peer) { warn!("Ignoring request from banned peer: {}", peer); let response = SynorResponse::Error("You are banned".to_string()); if let Err(e) = self.swarm.behaviour_mut().send_response(channel, response) { warn!("Failed to send banned response to {}: {:?}", peer, e); } return; } // First layer: Token bucket rate limiting (100 req/s with burst of 200) if !self.token_bucket_limiter.check_rate_limit(&peer) { // Check if peer is in cooldown if let Some(remaining) = self.token_bucket_limiter.cooldown_remaining(&peer) { warn!( "Token bucket: Peer {} in cooldown for {} request ({:?} remaining)", peer, request.request_type(), remaining ); self.peer_manager.update_peer(&peer, |p| { p.add_reputation(reputation::INVALID_DATA); }); // Record spam violation in reputation system (may auto-ban) let auto_banned = self.reputation_manager.record_violation(&peer, ViolationType::Spam); if auto_banned { warn!("Auto-banned peer {} for repeated spam violations", peer); let _ = self.swarm.disconnect_peer_id(peer); } let response = SynorResponse::Error(format!( "Rate limited (cooldown). Wait {} seconds.", remaining.as_secs() )); if let Err(e) = self.swarm.behaviour_mut().send_response(channel, response) { warn!("Failed to send cooldown response to {}: {:?}", peer, e); } return; } // Rate limited but not in cooldown warn!( "Token bucket: Rate limited {} request from {} (violations: {})", request.request_type(), peer, self.token_bucket_limiter.violations(&peer) ); self.peer_manager.update_peer(&peer, |p| { p.add_reputation(reputation::TIMEOUT); }); // Record spam violation in reputation system let auto_banned = self.reputation_manager.record_violation(&peer, ViolationType::Spam); if auto_banned { warn!("Auto-banned peer {} for spam violations", peer); let _ = self.swarm.disconnect_peer_id(peer); } let response = SynorResponse::Error( "Rate limited. Too many requests per second.".to_string() ); if let Err(e) = self.swarm.behaviour_mut().send_response(channel, response) { warn!("Failed to send rate limit response to {}: {:?}", peer, e); } return; } // Record the request in token bucket limiter self.token_bucket_limiter.record_request(&peer); // Second layer: Apply type-specific rate limiting (sliding window) let rate_limit_result = match &request { SynorRequest::GetBlock(_) | SynorRequest::GetBlocks(_) => { self.rate_limiters.check_block_request(&peer) } SynorRequest::GetHeader(_) | SynorRequest::GetHeaders { .. } => { self.rate_limiters.check_header_request(&peer) } SynorRequest::GetTransaction(_) | SynorRequest::GetTransactions(_) => { self.rate_limiters.check_transaction_request(&peer) } _ => { // For other requests (peers, status, tips), use global limiter self.rate_limiters.global.check(&peer) } }; // Handle rate limit violations match rate_limit_result { RateLimitResult::Denied { retry_after } => { warn!( "Rate limited {} request from {} (retry after {:?})", request.request_type(), peer, retry_after ); // Penalize reputation for hitting rate limit self.peer_manager.update_peer(&peer, |p| { p.add_reputation(reputation::TIMEOUT); }); // Record slow response violation self.reputation_manager.record_violation(&peer, ViolationType::SlowResponse); let response = SynorResponse::Error(format!( "Rate limited. Retry after {} seconds.", retry_after.as_secs() )); if let Err(e) = self.swarm.behaviour_mut().send_response(channel, response) { warn!("Failed to send rate limit response to {}: {:?}", peer, e); } return; } RateLimitResult::Cooldown { remaining } => { warn!( "Peer {} in cooldown for {} request ({:?} remaining)", peer, request.request_type(), remaining ); // Stronger reputation penalty for repeated violations self.peer_manager.update_peer(&peer, |p| { p.add_reputation(reputation::INVALID_DATA); }); // Record spam violation (may auto-ban) let auto_banned = self.reputation_manager.record_violation(&peer, ViolationType::Spam); if auto_banned { warn!("Auto-banned peer {} for repeated rate limit violations", peer); let _ = self.swarm.disconnect_peer_id(peer); } let response = SynorResponse::Error(format!( "In cooldown. Wait {} seconds.", remaining.as_secs() )); if let Err(e) = self.swarm.behaviour_mut().send_response(channel, response) { warn!("Failed to send cooldown response to {}: {:?}", peer, e); } return; } RateLimitResult::Allowed => { // Record the request for rate limiting match &request { SynorRequest::GetBlock(_) | SynorRequest::GetBlocks(_) => { self.rate_limiters.record_block_request(&peer); } SynorRequest::GetHeader(_) | SynorRequest::GetHeaders { .. } => { self.rate_limiters.record_header_request(&peer); } SynorRequest::GetTransaction(_) | SynorRequest::GetTransactions(_) => { self.rate_limiters.record_transaction_request(&peer); } _ => { self.rate_limiters.global.record(&peer); } } } } // Process the request - actual implementation would query storage let response = match request { SynorRequest::GetBlock(_) => SynorResponse::Block(None), SynorRequest::GetBlocks(_) => SynorResponse::Blocks(vec![]), SynorRequest::GetHeader(_) => SynorResponse::Header(None), SynorRequest::GetHeaders { .. } => SynorResponse::Headers(vec![]), SynorRequest::GetTransaction(_) => SynorResponse::Transaction(None), SynorRequest::GetTransactions(_) => SynorResponse::Transactions(vec![]), SynorRequest::GetPeers => { let addrs: Vec = self .peer_manager .active_peers() .iter() .flat_map(|p| p.addresses.iter().map(|a| a.to_string())) .take(20) .collect(); SynorResponse::Peers(addrs) } SynorRequest::GetStatus => { // Would need actual chain state SynorResponse::Error("Not implemented".to_string()) } SynorRequest::GetTips => SynorResponse::Tips(vec![]), SynorRequest::GetPruningPoint => SynorResponse::PruningPoint(None), _ => SynorResponse::Error("Unknown request".to_string()), }; if let Err(e) = self.swarm.behaviour_mut().send_response(channel, response) { warn!("Failed to send response to {}: {:?}", peer, e); } } /// Handles a response from a peer. async fn handle_response( &mut self, peer: PeerId, request_id: request_response::OutboundRequestId, response: SynorResponse, ) { debug!("Received response from {}", peer); // Record success in both peer manager and reputation system self.peer_manager.update_peer(&peer, |p| p.record_success()); self.reputation_manager.record_success(&peer); match response { SynorResponse::Headers(headers) => { self.sync_manager.on_headers_response(headers.clone()); let _ = self.event_tx.send(NetworkEvent::HeadersReceived(headers)); } SynorResponse::Blocks(blocks) => { self.sync_manager.on_blocks_response(request_id, blocks.clone()); let _ = self.event_tx.send(NetworkEvent::BlocksReceived(blocks)); } _ => {} } } /// Handles a command. async fn handle_command(&mut self, cmd: NetworkCommand) -> bool { match cmd { NetworkCommand::Dial(addr) => { if let Err(e) = self.swarm.dial(addr.clone()) { warn!("Failed to dial {}: {}", addr, e); } } NetworkCommand::Disconnect(peer_id) => { let _ = self.swarm.disconnect_peer_id(peer_id); } NetworkCommand::Ban(peer_id) => { self.peer_manager.ban(&peer_id); let _ = self.swarm.disconnect_peer_id(peer_id); } NetworkCommand::BroadcastBlock(announcement) => { if let Ok(data) = borsh::to_vec(&announcement) { if let Err(e) = self.swarm.behaviour_mut().publish_block(data) { warn!("Failed to publish block: {:?}", e); } } } NetworkCommand::BroadcastTransaction(announcement) => { if let Ok(data) = borsh::to_vec(&announcement) { if let Err(e) = self.swarm.behaviour_mut().publish_transaction(data) { warn!("Failed to publish transaction: {:?}", e); } } } NetworkCommand::RequestBlocks { peer, block_ids, response } => { let request = SynorRequest::GetBlocks(block_ids); let _request_id = self.swarm.behaviour_mut().send_request(&peer, request); // Would need to track the response sender let _ = response.send(Err(NetworkError::RequestFailed("Not implemented".to_string()))); } NetworkCommand::RequestHeaders { peer, start, max_count, response } => { let request = SynorRequest::GetHeaders { start, max_count }; let _request_id = self.swarm.behaviour_mut().send_request(&peer, request); let _ = response.send(Err(NetworkError::RequestFailed("Not implemented".to_string()))); } NetworkCommand::GetPeerCount(response) => { let _ = response.send(self.peer_manager.connected_count()); } NetworkCommand::GetSyncStatus(response) => { let _ = response.send(self.sync_manager.status()); } NetworkCommand::StartSync => { self.sync_manager.start_sync(); } NetworkCommand::Shutdown => { return false; } } true } } #[cfg(test)] mod tests { use super::*; #[tokio::test] async fn test_network_handle_clone() { let (tx, _rx) = mpsc::channel(1); let (event_tx, _) = broadcast::channel(1); let handle = NetworkHandle { command_tx: tx, event_rx: event_tx, }; let _handle2 = handle.clone(); } }