//! Content Resolver - Fetches content from storage network //! //! Resolves CIDs by contacting storage nodes, fetching chunks, //! and reassembling content. use crate::cid::ContentId; use crate::error::{Error, Result}; use std::collections::HashMap; use std::sync::Arc; use tokio::sync::RwLock; /// Content resolver configuration #[derive(Debug, Clone)] pub struct ResolverConfig { /// Storage node endpoints pub nodes: Vec, /// Request timeout in seconds pub timeout_secs: u64, /// Maximum concurrent requests pub max_concurrent: usize, /// Retry count for failed requests pub retries: u32, } impl Default for ResolverConfig { fn default() -> Self { Self { nodes: vec![], timeout_secs: 30, max_concurrent: 10, retries: 3, } } } /// Resolves content from the storage network pub struct ContentResolver { /// Resolver configuration config: ResolverConfig, /// Storage node endpoints nodes: Vec, /// Node health status node_health: Arc>>, } /// Node health tracking #[derive(Debug, Clone)] #[derive(Default)] struct NodeHealth { /// Successful requests successes: u64, /// Failed requests failures: u64, /// Last response time (ms) latency_ms: u32, /// Last check timestamp last_check: u64, } impl ContentResolver { /// Create a new resolver with node list pub fn new(nodes: Vec) -> Self { let config = ResolverConfig { nodes: nodes.clone(), ..Default::default() }; Self { config, nodes, node_health: Arc::new(RwLock::new(HashMap::new())), } } /// Create a new resolver with configuration pub fn with_config(config: ResolverConfig) -> Self { let nodes = config.nodes.clone(); Self { config, nodes, node_health: Arc::new(RwLock::new(HashMap::new())), } } /// Get the resolver configuration pub fn config(&self) -> &ResolverConfig { &self.config } /// Resolve content by CID pub async fn resolve(&self, cid: &ContentId) -> Result> { if self.nodes.is_empty() { return Err(Error::Network("No storage nodes configured".to_string())); } // Find providers for this CID let providers = self.find_providers(cid).await?; if providers.is_empty() { return Err(Error::NotFound(format!( "No providers found for CID {}", cid.to_string_repr() ))); } // For small content, try direct fetch if cid.size <= 1024 * 1024 { return self.fetch_direct(cid, &providers).await; } // Fetch metadata to determine chunk count let metadata = self.fetch_metadata(cid, &providers).await?; // Validate metadata matches CID if metadata.size != cid.size { return Err(Error::ReassemblyFailed(format!( "Metadata size {} doesn't match CID size {}", metadata.size, cid.size ))); } // Fetch all chunks and reassemble let mut all_data = Vec::with_capacity(metadata.size as usize); for chunk_index in 0..metadata.chunk_count { let chunk_data = self.fetch_chunk(cid, chunk_index, &providers).await?; // Validate chunk size (except possibly the last chunk) if chunk_index < metadata.chunk_count - 1 && chunk_data.len() != metadata.chunk_size { return Err(Error::ReassemblyFailed(format!( "Chunk {} size {} doesn't match expected {}", chunk_index, chunk_data.len(), metadata.chunk_size ))); } all_data.extend_from_slice(&chunk_data); } // Trim to exact size (last chunk might have padding) all_data.truncate(metadata.size as usize); // Verify CID matches if !cid.verify(&all_data) { return Err(Error::ReassemblyFailed( "Content hash does not match CID".to_string(), )); } Ok(all_data) } /// Fetch small content directly async fn fetch_direct(&self, cid: &ContentId, providers: &[String]) -> Result> { for provider in providers { match self.fetch_content_from_provider(provider, cid).await { Ok(data) => { // Verify before returning if cid.verify(&data) { return Ok(data); } } Err(_) => continue, } } Err(Error::NotFound(format!( "Could not fetch content for CID {}", cid.to_string_repr() ))) } /// Fetch content from a single provider async fn fetch_content_from_provider( &self, provider: &str, _cid: &ContentId, ) -> Result> { let start = std::time::Instant::now(); // TODO: HTTP request to provider // GET {provider}/content/{cid} let result: Result> = Err(Error::Network("Not implemented".to_string())); // Update health stats let latency_ms = start.elapsed().as_millis() as u32; self.update_health(provider, result.is_ok(), latency_ms).await; result } /// Find providers for a CID async fn find_providers(&self, _cid: &ContentId) -> Result> { // TODO: Query DHT for providers // For now, return all configured nodes Ok(self.nodes.clone()) } /// Fetch content metadata async fn fetch_metadata(&self, cid: &ContentId, providers: &[String]) -> Result { for provider in providers { match self.query_metadata(provider, cid).await { Ok(metadata) => return Ok(metadata), Err(_) => continue, // Try next provider } } Err(Error::NotFound("Could not fetch metadata".to_string())) } /// Query metadata from a single provider async fn query_metadata(&self, _provider: &str, cid: &ContentId) -> Result { // TODO: HTTP request to provider // For now, calculate from size let chunk_size = 1024 * 1024; // 1 MB let chunk_count = (cid.size as usize).div_ceil(chunk_size); Ok(ContentMetadata { size: cid.size, chunk_count, chunk_size, }) } /// Fetch a single chunk async fn fetch_chunk( &self, cid: &ContentId, chunk_index: usize, providers: &[String], ) -> Result> { // Try to fetch from multiple providers for provider in providers { match self.fetch_chunk_from_provider(provider, cid, chunk_index).await { Ok(data) => return Ok(data), Err(_) => continue, } } Err(Error::NotFound(format!( "Could not fetch chunk {} for CID {}", chunk_index, cid.to_string_repr() ))) } /// Fetch chunk from a single provider async fn fetch_chunk_from_provider( &self, provider: &str, _cid: &ContentId, _chunk_index: usize, ) -> Result> { let start = std::time::Instant::now(); // TODO: HTTP request to provider // GET {provider}/chunks/{cid}/{chunk_index} let result: Result> = Err(Error::Network("Not implemented".to_string())); // Update health stats let latency_ms = start.elapsed().as_millis() as u32; self.update_health(provider, result.is_ok(), latency_ms).await; result } /// Update node health stats async fn update_health(&self, node: &str, success: bool, latency_ms: u32) { let mut health = self.node_health.write().await; let entry = health.entry(node.to_string()).or_default(); if success { entry.successes += 1; } else { entry.failures += 1; } entry.latency_ms = latency_ms; entry.last_check = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap() .as_secs(); } /// Cleanup stale health entries pub async fn cleanup(&self, max_age_secs: u64) { let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap() .as_secs(); let mut health = self.node_health.write().await; health.retain(|_, h| now - h.last_check < max_age_secs); } /// Get nodes sorted by health pub async fn sorted_nodes(&self) -> Vec { let health = self.node_health.read().await; let mut nodes: Vec<_> = self.nodes.to_vec(); nodes.sort_by(|a, b| { let health_a = health.get(a); let health_b = health.get(b); match (health_a, health_b) { (Some(ha), Some(hb)) => { // Sort by success rate, then latency let rate_a = if ha.successes + ha.failures > 0 { ha.successes as f64 / (ha.successes + ha.failures) as f64 } else { 0.5 }; let rate_b = if hb.successes + hb.failures > 0 { hb.successes as f64 / (hb.successes + hb.failures) as f64 } else { 0.5 }; rate_b .partial_cmp(&rate_a) .unwrap_or(std::cmp::Ordering::Equal) .then_with(|| ha.latency_ms.cmp(&hb.latency_ms)) } (Some(_), None) => std::cmp::Ordering::Less, (None, Some(_)) => std::cmp::Ordering::Greater, (None, None) => std::cmp::Ordering::Equal, } }); nodes } } /// Content metadata from storage network #[derive(Debug, Clone)] struct ContentMetadata { /// Total size in bytes size: u64, /// Number of chunks chunk_count: usize, /// Size of each chunk chunk_size: usize, }