//! Network service. use std::collections::HashMap; use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; use libp2p::{Multiaddr, PeerId}; use tokio::sync::{broadcast, RwLock}; use tracing::{debug, error, info, warn}; use synor_network::{ BlockAnnouncement, ChainId, NetworkConfig, NetworkEvent, NetworkHandle, NetworkService as SynorNetworkService, SyncStatus, TransactionAnnouncement, }; use synor_types::{BlockHeader, BlockId}; use crate::config::NodeConfig; /// Peer connection info. #[derive(Clone, Debug)] pub struct PeerInfo { /// Peer ID. pub id: String, /// Remote address. pub address: Option, /// Is inbound connection. pub inbound: bool, /// Protocol version. pub version: u32, /// User agent. pub user_agent: String, /// Last seen timestamp. pub last_seen: u64, /// Ping latency in ms. pub latency_ms: u32, /// Is syncing. pub syncing: bool, } /// Network message types. #[derive(Clone, Debug)] pub enum NetworkMessage { /// Block announcement. BlockAnnounce { hash: [u8; 32] }, /// Transaction announcement. TxAnnounce { hash: [u8; 32] }, /// Block request. GetBlocks { hashes: Vec<[u8; 32]> }, /// Block response. Blocks { data: Vec> }, /// Headers request. GetHeaders { locator: Vec<[u8; 32]>, stop: [u8; 32] }, /// Headers response. Headers { headers: Vec> }, } /// Network service manages P2P connections. pub struct NetworkService { /// Network handle from synor-network (interior mutability for start()). handle: RwLock>, /// Configuration. listen_addr: String, /// Seed nodes. seeds: Vec, /// Maximum inbound connections. #[allow(dead_code)] max_inbound: usize, /// Maximum outbound connections. #[allow(dead_code)] max_outbound: usize, /// Connected peers (cached locally). peers: RwLock>, /// Is running. running: RwLock, /// Shutdown sender for the network task. #[allow(dead_code)] shutdown_tx: Option>, /// Shutdown receiver. #[allow(dead_code)] shutdown_rx: RwLock>>, /// Message broadcast channel. 
message_tx: broadcast::Sender<(String, NetworkMessage)>, /// Network configuration for synor-network. network_config: NetworkConfig, } impl NetworkService { /// Creates a new network service. pub async fn new( config: &NodeConfig, shutdown_rx: broadcast::Receiver<()>, ) -> anyhow::Result { let (message_tx, _) = broadcast::channel(1000); let (shutdown_tx, _) = broadcast::channel(1); // Build synor-network configuration from node config let chain_id = match config.network.as_str() { "mainnet" => ChainId::Mainnet, "testnet" => ChainId::Testnet, _ => ChainId::Devnet, }; // Parse listen address let listen_addr_parsed: Multiaddr = config .p2p .listen_addr .parse() .unwrap_or_else(|_| format!("/ip4/0.0.0.0/tcp/{}", synor_network::DEFAULT_PORT).parse().unwrap()); // Parse seed/bootstrap peers let bootstrap_peers: Vec = config .p2p .seeds .iter() .filter_map(|s| s.parse().ok()) .collect(); let network_config = NetworkConfig { chain_id, listen_addresses: vec![listen_addr_parsed], bootstrap_peers, max_inbound: config.p2p.max_inbound, max_outbound: config.p2p.max_outbound, enable_mdns: config.network == "devnet", enable_kad: config.network != "devnet", idle_timeout: Duration::from_secs(30), ping_interval: Duration::from_secs(15), gossipsub: synor_network::config::GossipsubConfig::default(), sync: synor_network::config::SyncConfig::default(), external_address: None, node_name: Some(format!("synord-{}", &config.network)), }; Ok(NetworkService { handle: RwLock::new(None), listen_addr: config.p2p.listen_addr.clone(), seeds: config.p2p.seeds.clone(), max_inbound: config.p2p.max_inbound, max_outbound: config.p2p.max_outbound, peers: RwLock::new(HashMap::new()), running: RwLock::new(false), shutdown_tx: Some(shutdown_tx), shutdown_rx: RwLock::new(Some(shutdown_rx)), message_tx, network_config, }) } /// Starts the network service. 
pub async fn start(&self) -> anyhow::Result<()> { info!(addr = %self.listen_addr, "Starting network service"); // Create the synor-network service let (network_service, handle) = SynorNetworkService::new(self.network_config.clone()) .await .map_err(|e| anyhow::anyhow!("Failed to create network service: {}", e))?; // Store the handle *self.handle.write().await = Some(handle.clone()); // Subscribe to network events let mut event_rx = handle.subscribe(); let message_tx = self.message_tx.clone(); let peers = Arc::new(RwLock::new(HashMap::::new())); let peers_clone = peers.clone(); // Spawn event handler tokio::spawn(async move { while let Ok(event) = event_rx.recv().await { match event { NetworkEvent::NewBlock(announcement) => { debug!("Received block announcement: {}", announcement.hash); let msg = NetworkMessage::BlockAnnounce { hash: *announcement.hash.as_bytes(), }; let _ = message_tx.send(("network".to_string(), msg)); } NetworkEvent::NewTransaction(announcement) => { debug!("Received transaction announcement: {}", announcement.txid); let msg = NetworkMessage::TxAnnounce { hash: *announcement.txid.as_bytes(), }; let _ = message_tx.send(("network".to_string(), msg)); } NetworkEvent::PeerConnected(peer_id) => { info!("Peer connected: {}", peer_id); let info = PeerInfo { id: peer_id.to_string(), address: None, inbound: false, version: 1, user_agent: String::new(), last_seen: std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap() .as_secs(), latency_ms: 0, syncing: false, }; peers_clone.write().await.insert(peer_id.to_string(), info); } NetworkEvent::PeerDisconnected(peer_id) => { info!("Peer disconnected: {}", peer_id); peers_clone.write().await.remove(&peer_id.to_string()); } NetworkEvent::SyncStatusChanged(status) => { info!("Sync status changed: {:?}", status); } NetworkEvent::BlocksReceived(blocks) => { debug!("Received {} blocks", blocks.len()); } NetworkEvent::HeadersReceived(headers) => { debug!("Received {} headers", headers.len()); } } 
} }); // Spawn the network service runner tokio::spawn(async move { if let Err(e) = network_service.run().await { error!("Network service error: {}", e); } }); *self.running.write().await = true; // Connect to seed nodes let handle_guard = self.handle.read().await; if let Some(ref handle) = *handle_guard { for seed in &self.seeds { if let Ok(addr) = seed.parse::() { info!(seed = %seed, "Connecting to seed node"); if let Err(e) = handle.dial(addr).await { warn!("Failed to connect to seed node {}: {}", seed, e); } } } } info!("Network service started"); Ok(()) } /// Stops the network service. pub async fn stop(&self) -> anyhow::Result<()> { info!("Stopping network service"); *self.running.write().await = false; // Shutdown the network service via handle let handle_guard = self.handle.read().await; if let Some(ref handle) = *handle_guard { if let Err(e) = handle.shutdown().await { warn!("Error during network shutdown: {}", e); } } // Clear peers self.peers.write().await.clear(); info!("Network service stopped"); Ok(()) } /// Returns the number of connected peers. pub async fn peer_count(&self) -> usize { let handle_guard = self.handle.read().await; if let Some(ref handle) = *handle_guard { handle.peer_count().await.unwrap_or(0) } else { self.peers.read().await.len() } } /// Returns all peer info. pub async fn peers(&self) -> Vec { self.peers.read().await.values().cloned().collect() } /// Gets a specific peer. pub async fn get_peer(&self, id: &str) -> Option { self.peers.read().await.get(id).cloned() } /// Connects to a peer. 
pub async fn connect_peer(&self, address: &str) -> anyhow::Result<String> {
        info!(address = %address, "Connecting to peer");

        let handle_guard = self.handle.read().await;
        let handle = handle_guard
            .as_ref()
            .ok_or_else(|| anyhow::anyhow!("Network service not started"))?;

        let addr: Multiaddr = address
            .parse()
            .map_err(|e| anyhow::anyhow!("Invalid address: {}", e))?;
        handle
            .dial(addr)
            .await
            .map_err(|e| anyhow::anyhow!("Failed to dial: {}", e))?;

        // The dial has only been initiated here, so the returned string is a
        // placeholder built from the address rather than a peer ID.
        Ok(format!("dialing-{}", address))
    }

    /// Disconnects a peer.
    ///
    /// Asks the network handle (if the service has been started) to drop the
    /// connection, then removes `id` from the local peer cache. An `id` that
    /// is not a valid peer ID, or a failing disconnect, is logged; the cache
    /// entry is removed in every case.
    pub async fn disconnect_peer(&self, id: &str) {
        info!(peer = %id, "Disconnecting peer");

        {
            let handle_guard = self.handle.read().await;
            if let Some(handle) = handle_guard.as_ref() {
                match id.parse::<PeerId>() {
                    Ok(peer_id) => {
                        if let Err(e) = handle.disconnect(peer_id).await {
                            warn!("Failed to disconnect peer {}: {}", id, e);
                        }
                    }
                    Err(_) => {
                        warn!("Cannot disconnect peer {}: invalid peer ID", id);
                    }
                }
            }
        }

        self.peers.write().await.remove(id);
    }

    /// Bans a peer.
    ///
    /// Asks the network handle (if the service has been started) to ban the
    /// peer, then disconnects it via `disconnect_peer`. Failures are logged.
    pub async fn ban_peer(&self, id: &str, reason: &str) {
        warn!(peer = %id, reason = %reason, "Banning peer");

        // Release the read guard before calling `disconnect_peer`, which takes
        // the same lock again: with a write-preferring RwLock, re-acquiring a
        // read lock while still holding one can deadlock if a writer is queued
        // in between.
        {
            let handle_guard = self.handle.read().await;
            if let Some(handle) = handle_guard.as_ref() {
                match id.parse::<PeerId>() {
                    Ok(peer_id) => {
                        if let Err(e) = handle.ban(peer_id).await {
                            warn!("Failed to ban peer {}: {}", id, e);
                        }
                    }
                    Err(_) => {
                        warn!("Cannot ban peer {}: invalid peer ID", id);
                    }
                }
            }
        }

        self.disconnect_peer(id).await;
    }

    /// Broadcasts a message to all peers.
pub async fn broadcast(&self, message: NetworkMessage) { let handle_guard = self.handle.read().await; if let Some(ref handle) = *handle_guard { match message { NetworkMessage::BlockAnnounce { hash } => { // Create a minimal BlockAnnouncement // In practice, you'd get this from the actual block let header = BlockHeader::default(); let announcement = BlockAnnouncement::new(header, 0, 0); // Note: The hash from the message won't match the header hash // This is a placeholder - real implementation should use actual block data let _ = hash; // Suppress unused warning if let Err(e) = handle.broadcast_block(announcement).await { warn!("Failed to broadcast block: {}", e); } } NetworkMessage::TxAnnounce { hash } => { let announcement = TransactionAnnouncement::id_only( synor_types::TransactionId::from_bytes(hash) ); if let Err(e) = handle.broadcast_transaction(announcement).await { warn!("Failed to broadcast transaction: {}", e); } } _ => { debug!("Broadcast not implemented for this message type"); } } } } /// Sends a message to a specific peer. pub async fn send(&self, peer_id: &str, _message: NetworkMessage) -> anyhow::Result<()> { debug!(peer = %peer_id, "Sending message"); // For now, direct sends would be handled via request/response // This would need to be implemented based on the message type Ok(()) } /// Subscribes to network messages. pub fn subscribe(&self) -> broadcast::Receiver<(String, NetworkMessage)> { self.message_tx.subscribe() } /// Returns the network handle for advanced operations. pub async fn handle(&self) -> Option { self.handle.read().await.clone() } /// Announces a new block. pub async fn announce_block(&self, hash: [u8; 32]) { self.broadcast(NetworkMessage::BlockAnnounce { hash }).await; } /// Announces a new transaction. pub async fn announce_tx(&self, hash: [u8; 32]) { self.broadcast(NetworkMessage::TxAnnounce { hash }).await; } /// Requests blocks from a peer. 
pub async fn request_blocks( &self, peer_id: &str, hashes: Vec<[u8; 32]>, ) -> anyhow::Result<()> { let handle_guard = self.handle.read().await; if let Some(ref handle) = *handle_guard { let peer: PeerId = peer_id .parse() .map_err(|_| anyhow::anyhow!("Invalid peer ID"))?; let block_ids: Vec = hashes.iter().map(|h| BlockId::from_bytes(*h)).collect(); handle .request_blocks(peer, block_ids) .await .map_err(|e| anyhow::anyhow!("Request failed: {}", e))?; } Ok(()) } /// Requests headers from a peer. pub async fn request_headers( &self, peer_id: &str, locator: Vec<[u8; 32]>, _stop: [u8; 32], ) -> anyhow::Result<()> { let handle_guard = self.handle.read().await; if let Some(ref handle) = *handle_guard { let peer: PeerId = peer_id .parse() .map_err(|_| anyhow::anyhow!("Invalid peer ID"))?; let start = if locator.is_empty() { BlockId::from_bytes([0u8; 32]) } else { BlockId::from_bytes(locator[0]) }; handle .request_headers(peer, start, 500) .await .map_err(|e| anyhow::anyhow!("Request failed: {}", e))?; } Ok(()) } /// Gets the sync status. pub async fn sync_status(&self) -> Option { let handle_guard = self.handle.read().await; if let Some(ref handle) = *handle_guard { handle.sync_status().await.ok() } else { None } } /// Starts synchronization. pub async fn start_sync(&self) -> anyhow::Result<()> { let handle_guard = self.handle.read().await; if let Some(ref handle) = *handle_guard { handle .start_sync() .await .map_err(|e| anyhow::anyhow!("Failed to start sync: {}", e))?; } Ok(()) } } /// Network statistics. #[derive(Clone, Debug)] pub struct NetworkStats { pub total_peers: usize, pub inbound_peers: usize, pub outbound_peers: usize, pub bytes_sent: u64, pub bytes_received: u64, pub messages_sent: u64, pub messages_received: u64, } impl NetworkService { /// Gets network statistics. 
pub async fn stats(&self) -> NetworkStats {
        // Peer counts come from the local peer cache; the traffic counters are
        // not tracked here and are reported as zero.
        let peer_map = self.peers.read().await;
        let total_peers = peer_map.len();
        let inbound_peers = peer_map
            .values()
            .fold(0usize, |count, peer| if peer.inbound { count + 1 } else { count });
        let outbound_peers = total_peers - inbound_peers;

        NetworkStats {
            total_peers,
            inbound_peers,
            outbound_peers,
            bytes_sent: 0,
            bytes_received: 0,
            messages_sent: 0,
            messages_received: 0,
        }
    }
}