//! Block synchronization for Synor. use crate::config::SyncConfig; use crate::peer::PeerManager; use crate::protocol::SynorRequest; use hashbrown::{HashMap, HashSet}; use libp2p::PeerId; use parking_lot::RwLock; use std::collections::VecDeque; use std::sync::Arc; use std::time::{Duration, Instant}; use synor_types::{BlockHeader, BlockId, Hash256}; /// Synchronization state. #[derive(Clone, Copy, Debug, PartialEq, Eq)] pub enum SyncState { /// Not synchronizing, fully synced. Idle, /// Finding peers to sync from. FindingPeers, /// Downloading headers. DownloadingHeaders, /// Downloading blocks. DownloadingBlocks, /// Processing downloaded data. Processing, /// Synced and following the network. Synced, } impl SyncState { /// Returns true if we're actively syncing. pub fn is_syncing(&self) -> bool { matches!( self, SyncState::DownloadingHeaders | SyncState::DownloadingBlocks | SyncState::Processing ) } } /// Sync status information. #[derive(Clone, Debug)] pub struct SyncStatus { /// Current state. pub state: SyncState, /// Our best blue score. pub local_blue_score: u64, /// Network's best blue score. pub network_blue_score: u64, /// Progress percentage (0-100). pub progress: f32, /// Current sync peer. pub sync_peer: Option, /// Headers downloaded. pub headers_downloaded: u64, /// Blocks downloaded. pub blocks_downloaded: u64, /// Download rate (blocks/sec). pub download_rate: f64, /// Estimated time remaining. pub eta: Option, /// Sync start time. pub started_at: Option, } impl Default for SyncStatus { fn default() -> Self { SyncStatus { state: SyncState::Idle, local_blue_score: 0, network_blue_score: 0, progress: 0.0, sync_peer: None, headers_downloaded: 0, blocks_downloaded: 0, download_rate: 0.0, eta: None, started_at: None, } } } /// Pending block request. #[derive(Clone, Debug)] struct PendingRequest { /// Block IDs requested. block_ids: Vec, /// Peer we requested from. peer: PeerId, /// When the request was sent. sent_at: Instant, /// Number of retries. 
retries: u32, } /// Header chain for sync. #[derive(Clone, Debug)] pub struct HeaderChain { /// Headers in order. headers: Vec, /// Hash to index mapping. hash_to_index: HashMap, /// Blue scores. blue_scores: HashMap, } impl HeaderChain { /// Creates a new empty header chain. pub fn new() -> Self { HeaderChain { headers: Vec::new(), hash_to_index: HashMap::new(), blue_scores: HashMap::new(), } } /// Adds headers to the chain. pub fn add_headers(&mut self, headers: Vec, start_blue_score: u64) { let start_index = self.headers.len(); for (i, header) in headers.into_iter().enumerate() { let hash = header.block_id(); self.hash_to_index.insert(hash, start_index + i); self.blue_scores.insert(hash, start_blue_score + i as u64); self.headers.push(header); } } /// Returns the number of headers. pub fn len(&self) -> usize { self.headers.len() } /// Returns true if empty. pub fn is_empty(&self) -> bool { self.headers.is_empty() } /// Gets a header by hash. pub fn get(&self, hash: &Hash256) -> Option<&BlockHeader> { self.hash_to_index.get(hash).map(|&i| &self.headers[i]) } /// Gets the blue score for a hash. pub fn blue_score(&self, hash: &Hash256) -> Option { self.blue_scores.get(hash).copied() } /// Returns the last header. pub fn last(&self) -> Option<&BlockHeader> { self.headers.last() } /// Returns headers in a range. pub fn range(&self, start: usize, end: usize) -> &[BlockHeader] { &self.headers[start.min(self.headers.len())..end.min(self.headers.len())] } } impl Default for HeaderChain { fn default() -> Self { Self::new() } } /// Block synchronization manager. pub struct SyncManager { /// Configuration. config: SyncConfig, /// Current status. status: RwLock, /// Peer manager reference. peer_manager: Arc, /// Pending requests. pending_requests: RwLock>, /// Downloaded headers. header_chain: RwLock, /// Blocks to download. blocks_to_download: RwLock>, /// Downloaded blocks waiting to be processed. downloaded_blocks: RwLock>>, /// Blocks we already have. 
known_blocks: RwLock>, /// Our genesis hash. genesis_hash: Hash256, /// Our tips. tips: RwLock>, } impl SyncManager { /// Creates a new sync manager. pub fn new(config: SyncConfig, peer_manager: Arc, genesis_hash: Hash256) -> Self { SyncManager { config, status: RwLock::new(SyncStatus::default()), peer_manager, pending_requests: RwLock::new(HashMap::new()), header_chain: RwLock::new(HeaderChain::new()), blocks_to_download: RwLock::new(VecDeque::new()), downloaded_blocks: RwLock::new(HashMap::new()), known_blocks: RwLock::new(HashSet::new()), genesis_hash, tips: RwLock::new(Vec::new()), } } /// Returns the current sync status. pub fn status(&self) -> SyncStatus { self.status.read().clone() } /// Returns the current sync state. pub fn state(&self) -> SyncState { self.status.read().state } /// Sets the local blue score. pub fn set_local_blue_score(&self, score: u64) { self.status.write().local_blue_score = score; } /// Sets the current tips. pub fn set_tips(&self, tips: Vec) { *self.tips.write() = tips; } /// Marks a block as known (already in our database). pub fn mark_known(&self, block_id: BlockId) { self.known_blocks.write().insert(block_id); } /// Checks if we need to sync. pub fn needs_sync(&self) -> bool { let status = self.status.read(); status.network_blue_score > status.local_blue_score + 10 } /// Starts the synchronization process. pub fn start_sync(&self) { let mut status = self.status.write(); if status.state != SyncState::Idle && status.state != SyncState::Synced { return; } status.state = SyncState::FindingPeers; status.started_at = Some(Instant::now()); status.headers_downloaded = 0; status.blocks_downloaded = 0; } /// Called when we discover a peer's status. 
pub fn on_peer_status(&self, peer_id: PeerId, blue_score: u64, _daa_score: u64) { let mut status = self.status.write(); if blue_score > status.network_blue_score { status.network_blue_score = blue_score; } // If we're finding peers and this peer is ahead, start syncing from them if status.state == SyncState::FindingPeers && blue_score > status.local_blue_score { status.sync_peer = Some(peer_id); status.state = SyncState::DownloadingHeaders; } } /// Creates a request for headers. pub fn create_headers_request(&self) -> Option { let status = self.status.read(); if status.state != SyncState::DownloadingHeaders { return None; } let header_chain = self.header_chain.read(); let start = if let Some(last) = header_chain.last() { last.block_id() } else { // Start from our tips let tips = self.tips.read(); if let Some(tip) = tips.first() { *tip } else { self.genesis_hash } }; Some(SynorRequest::GetHeaders { start, max_count: self.config.max_blocks_per_request as u32, }) } /// Creates a request for blocks. pub fn create_blocks_request(&self) -> Option<(Vec, SynorRequest)> { let status = self.status.read(); if status.state != SyncState::DownloadingBlocks { return None; } // Check we're not at max concurrent downloads let pending = self.pending_requests.read(); if pending.len() >= self.config.max_concurrent_downloads { return None; } drop(pending); let mut to_download = self.blocks_to_download.write(); let known = self.known_blocks.read(); let mut block_ids = Vec::new(); while block_ids.len() < self.config.max_blocks_per_request && !to_download.is_empty() { if let Some(id) = to_download.pop_front() { if !known.contains(&id) { block_ids.push(id); } } } if block_ids.is_empty() { return None; } let request = SynorRequest::GetBlocks(block_ids.clone()); Some((block_ids, request)) } /// Records a pending request. 
pub fn record_request(
    &self,
    request_id: libp2p::request_response::OutboundRequestId,
    block_ids: Vec<BlockId>,
    peer: PeerId,
) {
    // A fresh entry: timestamped now, with a retry count of zero.
    let entry = PendingRequest {
        block_ids,
        peer,
        sent_at: Instant::now(),
        retries: 0,
    };
    self.pending_requests.write().insert(request_id, entry);
}

/// Handles a headers response.
///
/// An empty response is taken to mean there are no more headers, and the state
/// moves to `DownloadingBlocks`. Otherwise the headers are appended to the
/// header chain (each one scored by its index in the chain), the
/// downloaded-headers counter is increased, and the new block IDs are queued
/// for download in order.
///
/// Each lock is taken and released on its own. The header-chain lock is never
/// held while the status lock is acquired, because other methods acquire them
/// in the order status -> header chain and nesting them the other way round
/// could deadlock.
pub fn on_headers_response(&self, headers: Vec<BlockHeader>) {
    if headers.is_empty() {
        // No more headers, move to block download.
        self.status.write().state = SyncState::DownloadingBlocks;
        return;
    }

    let count = headers.len() as u64;

    // Append the batch and collect the IDs of the newly added headers while the
    // chain is locked; the guard is dropped at the end of this block.
    let new_ids: Vec<_> = {
        let mut chain = self.header_chain.write();
        let first_new = chain.len();
        chain.add_headers(headers, first_new as u64);
        chain.headers[first_new..]
            .iter()
            .map(|header| header.block_id())
            .collect()
    };

    self.status.write().headers_downloaded += count;

    // One lock acquisition for the whole batch instead of one per header.
    self.blocks_to_download.write().extend(new_ids);
}

/// Handles a blocks response.
pub fn on_blocks_response( &self, request_id: libp2p::request_response::OutboundRequestId, blocks: Vec, ) { // Remove from pending self.pending_requests.write().remove(&request_id); let mut status = self.status.write(); status.blocks_downloaded += blocks.len() as u64; // Store downloaded blocks let mut downloaded = self.downloaded_blocks.write(); for block in blocks { let id = block.header.block_id(); if let Ok(bytes) = borsh::to_vec(&block) { downloaded.insert(id, bytes); } } // Update progress let header_count = self.header_chain.read().len() as f32; if header_count > 0.0 { status.progress = (status.blocks_downloaded as f32 / header_count) * 100.0; } // Calculate download rate and ETA if let Some(started) = status.started_at { let elapsed = started.elapsed().as_secs_f64(); if elapsed > 0.0 { status.download_rate = status.blocks_downloaded as f64 / elapsed; let remaining = header_count as u64 - status.blocks_downloaded; if status.download_rate > 0.0 { let secs = remaining as f64 / status.download_rate; status.eta = Some(Duration::from_secs_f64(secs)); } } } // Check if done if self.blocks_to_download.read().is_empty() && self.pending_requests.read().is_empty() { status.state = SyncState::Processing; } } /// Handles a request failure. pub fn on_request_failed(&self, request_id: libp2p::request_response::OutboundRequestId) { if let Some(pending) = self.pending_requests.write().remove(&request_id) { // Re-add blocks to download queue for retry let mut queue = self.blocks_to_download.write(); for id in pending.block_ids { queue.push_back(id); } // Decrease peer reputation self.peer_manager.update_peer(&pending.peer, |p| { p.record_failure(); }); } } /// Returns downloaded blocks that are ready for processing. pub fn take_downloaded_blocks(&self) -> Vec<(BlockId, Vec)> { let mut downloaded = self.downloaded_blocks.write(); let result: Vec<_> = downloaded.drain().collect(); result } /// Marks sync as complete. 
pub fn complete_sync(&self) { let mut status = self.status.write(); status.state = SyncState::Synced; status.progress = 100.0; status.eta = None; // Clear temporary data self.header_chain.write().headers.clear(); self.header_chain.write().hash_to_index.clear(); self.blocks_to_download.write().clear(); } /// Checks for timed out requests. pub fn check_timeouts(&self) -> Vec<(libp2p::request_response::OutboundRequestId, PeerId)> { let mut timed_out = Vec::new(); let now = Instant::now(); for (id, pending) in self.pending_requests.read().iter() { if now.duration_since(pending.sent_at) > self.config.request_timeout { timed_out.push((*id, pending.peer)); } } timed_out } /// Selects the best peer to sync from. pub fn select_sync_peer(&self) -> Option { let peers = self.peer_manager.peers_by_blue_score(); let local_score = self.status.read().local_blue_score; peers .into_iter() .find(|p| p.best_blue_score().unwrap_or(0) > local_score) .map(|p| p.peer_id) } } #[cfg(test)] mod tests { use super::*; #[test] fn test_sync_status_default() { let status = SyncStatus::default(); assert_eq!(status.state, SyncState::Idle); assert_eq!(status.progress, 0.0); } #[test] fn test_header_chain() { let chain = HeaderChain::new(); assert!(chain.is_empty()); // Would need real headers to test properly assert_eq!(chain.len(), 0); } #[test] fn test_sync_state() { assert!(!SyncState::Idle.is_syncing()); assert!(SyncState::DownloadingHeaders.is_syncing()); assert!(SyncState::DownloadingBlocks.is_syncing()); } }