From d0720201ac45e1469d8b3b38589e221cdffba468 Mon Sep 17 00:00:00 2001 From: Gulshan Yadav Date: Mon, 19 Jan 2026 14:19:17 +0530 Subject: [PATCH] feat(gateway): add CAR files, multi-pin redundancy, and CDN integration Milestone 4 of Phase 13 - Gateway Enhancements: - CAR (Content Addressed aRchive) files for trustless verification - Multi-pin redundancy service with geo-distributed storage - Subdomain gateway routing for origin isolation - CDN integration with provider-specific cache headers Technical highlights: - Varint-encoded CAR format with block verification - Redundancy levels: Standard (3x), Enhanced (5x), Critical (7x) - Geographic regions: NA, EU, AP, SA, AF, OC - CDN support: Cloudflare, Fastly, CloudFront, Vercel --- crates/synor-storage/Cargo.toml | 1 + crates/synor-storage/src/car.rs | 508 ++++++++++++++++ crates/synor-storage/src/error.rs | 9 + crates/synor-storage/src/gateway/cache.rs | 7 +- crates/synor-storage/src/gateway/mod.rs | 275 ++++++++- crates/synor-storage/src/lib.rs | 17 +- crates/synor-storage/src/pinning.rs | 710 ++++++++++++++++++++++ 7 files changed, 1512 insertions(+), 15 deletions(-) create mode 100644 crates/synor-storage/src/car.rs create mode 100644 crates/synor-storage/src/pinning.rs diff --git a/crates/synor-storage/Cargo.toml b/crates/synor-storage/Cargo.toml index 383fdd3..db6a8a7 100644 --- a/crates/synor-storage/Cargo.toml +++ b/crates/synor-storage/Cargo.toml @@ -16,6 +16,7 @@ serde_json = "1" tokio = { version = "1", features = ["full"] } async-trait = "0.1" bytes = "1" +parking_lot = "0.12" # Cryptography blake3 = "1" diff --git a/crates/synor-storage/src/car.rs b/crates/synor-storage/src/car.rs new file mode 100644 index 0000000..5fe4ec6 --- /dev/null +++ b/crates/synor-storage/src/car.rs @@ -0,0 +1,508 @@ +//! CAR (Content Addressed aRchive) file support for trustless verification. +//! +//! CAR files bundle content with its Merkle proofs, allowing clients to +//! verify content without trusting the gateway. +//! +//! 
# Format +//! +//! ```text +//! CAR v1 Format: +//! ┌─────────────────────────────────────────┐ +//! │ Header (CBOR) │ +//! │ - version: 1 │ +//! │ - roots: [CID...] │ +//! └─────────────────────────────────────────┘ +//! ┌─────────────────────────────────────────┐ +//! │ Block 1 │ +//! │ - CID (varint length + bytes) │ +//! │ - Data (varint length + bytes) │ +//! └─────────────────────────────────────────┘ +//! │ Block 2...N │ +//! └─────────────────────────────────────────┘ +//! ``` +//! +//! # Security +//! +//! Clients verify by: +//! 1. Parsing the CAR file +//! 2. Hashing each block +//! 3. Verifying hash matches declared CID +//! 4. Building Merkle tree from blocks +//! 5. Verifying root CID matches expected + +use crate::cid::ContentId; +use crate::error::{Error, Result}; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; + +/// CAR file version +pub const CAR_VERSION: u64 = 1; + +/// CAR file header +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CarHeader { + /// Version (always 1) + pub version: u64, + /// Root CIDs + pub roots: Vec, +} + +impl CarHeader { + /// Create a new CAR header with single root + pub fn new(root: ContentId) -> Self { + Self { + version: CAR_VERSION, + roots: vec![root], + } + } + + /// Create a header with multiple roots + pub fn with_roots(roots: Vec) -> Self { + Self { + version: CAR_VERSION, + roots, + } + } + + /// Encode header to bytes (simplified CBOR-like format) + pub fn encode(&self) -> Vec { + let mut buf = Vec::new(); + + // Version + buf.extend(encode_varint(self.version)); + + // Number of roots + buf.extend(encode_varint(self.roots.len() as u64)); + + // Each root CID + for root in &self.roots { + let cid_bytes = root.to_string_repr().into_bytes(); + buf.extend(encode_varint(cid_bytes.len() as u64)); + buf.extend(&cid_bytes); + } + + buf + } + + /// Decode header from bytes + pub fn decode(data: &[u8]) -> Result<(Self, usize)> { + let mut offset = 0; + + // Version + let (version, 
read) = decode_varint(&data[offset..])?; + offset += read; + + if version != CAR_VERSION { + return Err(Error::InvalidFormat(format!( + "Unsupported CAR version: {}", + version + ))); + } + + // Number of roots + let (num_roots, read) = decode_varint(&data[offset..])?; + offset += read; + + // Each root CID + let mut roots = Vec::with_capacity(num_roots as usize); + for _ in 0..num_roots { + let (cid_len, read) = decode_varint(&data[offset..])?; + offset += read; + + let cid_str = std::str::from_utf8(&data[offset..offset + cid_len as usize]) + .map_err(|e| Error::InvalidFormat(e.to_string()))?; + offset += cid_len as usize; + + let cid = ContentId::from_string(cid_str) + .map_err(|e| Error::InvalidCid(e.to_string()))?; + roots.push(cid); + } + + Ok((Self { version, roots }, offset)) + } +} + +/// A block in a CAR file +#[derive(Debug, Clone)] +pub struct CarBlock { + /// Content ID + pub cid: ContentId, + /// Block data + pub data: Vec, +} + +impl CarBlock { + /// Create a new block from data + pub fn new(data: Vec) -> Self { + let cid = ContentId::from_content(&data); + Self { cid, data } + } + + /// Create a block with pre-computed CID + pub fn with_cid(cid: ContentId, data: Vec) -> Self { + Self { cid, data } + } + + /// Verify that data matches CID + pub fn verify(&self) -> bool { + self.cid.verify(&self.data) + } + + /// Encode block to bytes + pub fn encode(&self) -> Vec { + let mut buf = Vec::new(); + + // CID + let cid_bytes = self.cid.to_string_repr().into_bytes(); + buf.extend(encode_varint(cid_bytes.len() as u64)); + buf.extend(&cid_bytes); + + // Data + buf.extend(encode_varint(self.data.len() as u64)); + buf.extend(&self.data); + + buf + } + + /// Decode block from bytes + pub fn decode(data: &[u8]) -> Result<(Self, usize)> { + let mut offset = 0; + + // CID + let (cid_len, read) = decode_varint(&data[offset..])?; + offset += read; + + let cid_str = std::str::from_utf8(&data[offset..offset + cid_len as usize]) + .map_err(|e| 
Error::InvalidFormat(e.to_string()))?; + offset += cid_len as usize; + + let mut cid = ContentId::from_string(cid_str) + .map_err(|e| Error::InvalidCid(e.to_string()))?; + + // Data + let (data_len, read) = decode_varint(&data[offset..])?; + offset += read; + + let block_data = data[offset..offset + data_len as usize].to_vec(); + offset += data_len as usize; + + // Set the size field since it's not encoded in the CID string + cid.size = block_data.len() as u64; + + Ok((Self { cid, data: block_data }, offset)) + } +} + +/// CAR file builder +pub struct CarBuilder { + /// Header with roots + header: CarHeader, + /// Blocks in order + blocks: Vec, + /// Block index by CID + index: HashMap, +} + +impl CarBuilder { + /// Create a new CAR builder with a root CID + pub fn new(root: ContentId) -> Self { + Self { + header: CarHeader::new(root), + blocks: Vec::new(), + index: HashMap::new(), + } + } + + /// Add a block to the CAR file + pub fn add_block(&mut self, block: CarBlock) { + let cid_str = block.cid.to_string_repr(); + if !self.index.contains_key(&cid_str) { + self.index.insert(cid_str, self.blocks.len()); + self.blocks.push(block); + } + } + + /// Add content and return its CID + pub fn add_content(&mut self, data: Vec) -> ContentId { + let block = CarBlock::new(data); + let cid = block.cid.clone(); + self.add_block(block); + cid + } + + /// Build the CAR file + pub fn build(self) -> CarFile { + CarFile { + header: self.header, + blocks: self.blocks, + } + } + + /// Number of blocks + pub fn num_blocks(&self) -> usize { + self.blocks.len() + } +} + +/// Complete CAR file +#[derive(Debug, Clone)] +pub struct CarFile { + /// Header + pub header: CarHeader, + /// Blocks + pub blocks: Vec, +} + +impl CarFile { + /// Create from content with automatic chunking + pub fn from_content(data: &[u8]) -> Self { + let root_cid = ContentId::from_content(data); + let mut builder = CarBuilder::new(root_cid.clone()); + + // For large files, we would chunk here + // For now, single 
block + builder.add_block(CarBlock::with_cid(root_cid, data.to_vec())); + + builder.build() + } + + /// Encode the entire CAR file to bytes + pub fn encode(&self) -> Vec { + let mut buf = Vec::new(); + + // Header with length prefix + let header_bytes = self.header.encode(); + buf.extend(encode_varint(header_bytes.len() as u64)); + buf.extend(&header_bytes); + + // Blocks + for block in &self.blocks { + let block_bytes = block.encode(); + buf.extend(&block_bytes); + } + + buf + } + + /// Decode a CAR file from bytes + pub fn decode(data: &[u8]) -> Result { + let mut offset = 0; + + // Header length + let (header_len, read) = decode_varint(&data[offset..])?; + offset += read; + + // Header + let (header, _) = CarHeader::decode(&data[offset..offset + header_len as usize])?; + offset += header_len as usize; + + // Blocks + let mut blocks = Vec::new(); + while offset < data.len() { + let (block, read) = CarBlock::decode(&data[offset..])?; + blocks.push(block); + offset += read; + } + + Ok(Self { header, blocks }) + } + + /// Verify all blocks in the CAR file + pub fn verify(&self) -> Result { + for block in &self.blocks { + if !block.verify() { + return Ok(false); + } + } + Ok(true) + } + + /// Get the root CID(s) + pub fn roots(&self) -> &[ContentId] { + &self.header.roots + } + + /// Get a block by CID + pub fn get_block(&self, cid: &ContentId) -> Option<&CarBlock> { + let cid_str = cid.to_string_repr(); + self.blocks.iter().find(|b| b.cid.to_string_repr() == cid_str) + } + + /// Total size in bytes + pub fn total_size(&self) -> u64 { + self.blocks.iter().map(|b| b.data.len() as u64).sum() + } + + /// Number of blocks + pub fn num_blocks(&self) -> usize { + self.blocks.len() + } +} + +/// Response with CAR file and verification headers +#[derive(Debug, Clone)] +pub struct TrustlessResponse { + /// CAR file data + pub car_data: Vec, + /// Root CID string for X-Ipfs-Roots header + pub root_cid: String, + /// Content type + pub content_type: String, + /// Whether 
verification passed + pub verified: bool, +} + +impl TrustlessResponse { + /// Create a trustless response from content + pub fn from_content(data: &[u8]) -> Self { + let car_file = CarFile::from_content(data); + let root_cid = car_file.roots().first() + .map(|c| c.to_string_repr()) + .unwrap_or_default(); + + let verified = car_file.verify().unwrap_or(false); + let car_data = car_file.encode(); + + Self { + car_data, + root_cid, + content_type: "application/vnd.ipld.car".to_string(), + verified, + } + } + + /// Get HTTP headers for trustless response + pub fn headers(&self) -> HashMap { + let mut headers = HashMap::new(); + headers.insert("Content-Type".to_string(), self.content_type.clone()); + headers.insert("X-Ipfs-Roots".to_string(), self.root_cid.clone()); + headers.insert("X-Content-Verified".to_string(), self.verified.to_string()); + headers.insert( + "Cache-Control".to_string(), + "public, max-age=31536000, immutable".to_string(), + ); + headers + } +} + +/// Encode a varint (unsigned LEB128) +fn encode_varint(mut value: u64) -> Vec { + let mut buf = Vec::new(); + loop { + let mut byte = (value & 0x7f) as u8; + value >>= 7; + if value != 0 { + byte |= 0x80; + } + buf.push(byte); + if value == 0 { + break; + } + } + buf +} + +/// Decode a varint (unsigned LEB128) +fn decode_varint(data: &[u8]) -> Result<(u64, usize)> { + let mut value = 0u64; + let mut shift = 0; + let mut bytes_read = 0; + + for &byte in data { + bytes_read += 1; + value |= ((byte & 0x7f) as u64) << shift; + if byte & 0x80 == 0 { + break; + } + shift += 7; + if shift >= 64 { + return Err(Error::InvalidFormat("Varint overflow".to_string())); + } + } + + Ok((value, bytes_read)) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_varint_roundtrip() { + let values = [0, 1, 127, 128, 255, 256, 16383, 16384, u64::MAX]; + for &val in &values { + let encoded = encode_varint(val); + let (decoded, _) = decode_varint(&encoded).unwrap(); + assert_eq!(val, decoded, "Failed for value 
{}", val); + } + } + + #[test] + fn test_car_block_verify() { + let data = b"Hello, CAR!".to_vec(); + let block = CarBlock::new(data.clone()); + + assert!(block.verify()); + + // Tampered block should fail + let tampered = CarBlock::with_cid(block.cid.clone(), b"Tampered!".to_vec()); + assert!(!tampered.verify()); + } + + #[test] + fn test_car_file_roundtrip() { + let data = b"Test content for CAR file"; + let car = CarFile::from_content(data); + + assert_eq!(car.num_blocks(), 1); + assert!(car.verify().unwrap()); + + // Encode and decode + let encoded = car.encode(); + let decoded = CarFile::decode(&encoded).unwrap(); + + assert_eq!(car.num_blocks(), decoded.num_blocks()); + assert!(decoded.verify().unwrap()); + } + + #[test] + fn test_car_builder() { + let root = ContentId::from_content(b"root"); + let mut builder = CarBuilder::new(root.clone()); + + let cid1 = builder.add_content(b"block 1".to_vec()); + let cid2 = builder.add_content(b"block 2".to_vec()); + + // Adding same content again should not create new block + let cid1_again = builder.add_content(b"block 1".to_vec()); + + assert_eq!(builder.num_blocks(), 2); + assert_eq!(cid1.to_string_repr(), cid1_again.to_string_repr()); + } + + #[test] + fn test_trustless_response() { + let data = b"Trustless verification test"; + let response = TrustlessResponse::from_content(data); + + assert!(response.verified); + assert!(!response.root_cid.is_empty()); + assert_eq!(response.content_type, "application/vnd.ipld.car"); + + let headers = response.headers(); + assert!(headers.contains_key("X-Ipfs-Roots")); + assert!(headers.contains_key("Cache-Control")); + } + + #[test] + fn test_car_header_encode_decode() { + let root = ContentId::from_content(b"header test"); + let header = CarHeader::new(root); + + let encoded = header.encode(); + let (decoded, _) = CarHeader::decode(&encoded).unwrap(); + + assert_eq!(decoded.version, CAR_VERSION); + assert_eq!(decoded.roots.len(), 1); + } +} diff --git 
a/crates/synor-storage/src/error.rs b/crates/synor-storage/src/error.rs index dded5f2..e3f4d23 100644 --- a/crates/synor-storage/src/error.rs +++ b/crates/synor-storage/src/error.rs @@ -30,6 +30,12 @@ pub enum Error { Serialization(String), /// Internal error Internal(String), + /// Invalid host header (for subdomain routing) + InvalidHost(String), + /// Not enough storage nodes available + InsufficientNodes(String), + /// Invalid file/data format + InvalidFormat(String), } impl std::error::Error for Error {} @@ -48,6 +54,9 @@ impl fmt::Display for Error { Self::Io(e) => write!(f, "I/O error: {}", e), Self::Serialization(msg) => write!(f, "Serialization error: {}", msg), Self::Internal(msg) => write!(f, "Internal error: {}", msg), + Self::InvalidHost(msg) => write!(f, "Invalid host: {}", msg), + Self::InsufficientNodes(msg) => write!(f, "Insufficient storage nodes: {}", msg), + Self::InvalidFormat(msg) => write!(f, "Invalid format: {}", msg), } } } diff --git a/crates/synor-storage/src/gateway/cache.rs b/crates/synor-storage/src/gateway/cache.rs index 0741024..33e019c 100644 --- a/crates/synor-storage/src/gateway/cache.rs +++ b/crates/synor-storage/src/gateway/cache.rs @@ -221,12 +221,7 @@ mod tests { fn make_response(data: &[u8]) -> GatewayResponse { let cid = ContentId::from_content(data); - GatewayResponse { - cid: cid.clone(), - content: data.to_vec(), - mime_type: "application/octet-stream".to_string(), - size: data.len() as u64, - } + GatewayResponse::standard(cid, data.to_vec(), "application/octet-stream".to_string()) } #[test] diff --git a/crates/synor-storage/src/gateway/mod.rs b/crates/synor-storage/src/gateway/mod.rs index 3ba4ee9..893a0bd 100644 --- a/crates/synor-storage/src/gateway/mod.rs +++ b/crates/synor-storage/src/gateway/mod.rs @@ -5,10 +5,32 @@ //! //! # URL Patterns //! +//! ## Path-based (legacy) //! - `GET /synor1abc...xyz` - Fetch content by CID //! - `GET /synor1abc...xyz/path/to/file` - Fetch file from directory CID -//! 
- `HEAD /synor1abc...xyz` - Check if content exists -//! - `POST /upload` - Upload content (returns CID) +//! +//! ## Subdomain-based (secure, recommended) +//! - `GET https://synor1abc...xyz.gateway.synor.cc/` - Origin-isolated content +//! - `GET https://synor1abc...xyz.gateway.synor.cc/path/to/file` - Subpath +//! +//! ## Trustless (CAR files) +//! - `GET /synor1abc...xyz?format=car` - Get content as CAR file +//! +//! # Origin Isolation +//! +//! Subdomain gateways provide origin isolation, preventing XSS attacks +//! between different content. Each CID gets its own web origin: +//! +//! ```text +//! https://synor1abc.gateway.synor.cc - Origin A +//! https://synor1def.gateway.synor.cc - Origin B (isolated from A) +//! ``` +//! +//! # CDN Integration +//! +//! Immutable content (identified by CID) can be cached indefinitely: +//! - `Cache-Control: public, max-age=31536000, immutable` +//! - `ETag: "synor1abc...xyz"` (CID as ETag) mod handler; mod resolver; @@ -44,6 +66,14 @@ pub struct GatewayConfig { pub storage_nodes: Vec, /// Rate limit (requests per minute per IP) pub rate_limit: u32, + /// Gateway hostname for subdomain routing (e.g., "gateway.synor.cc") + pub gateway_hostname: Option, + /// Enable subdomain-based CID routing + pub enable_subdomain_routing: bool, + /// Enable CAR file responses + pub enable_car_responses: bool, + /// CDN configuration + pub cdn_config: CdnConfig, } impl Default for GatewayConfig { @@ -57,10 +87,173 @@ impl Default for GatewayConfig { cors_origins: vec!["*".to_string()], storage_nodes: vec![], rate_limit: 100, + gateway_hostname: Some("gateway.synor.cc".to_string()), + enable_subdomain_routing: true, + enable_car_responses: true, + cdn_config: CdnConfig::default(), } } } +/// CDN configuration for edge caching +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CdnConfig { + /// Enable CDN cache headers + pub enabled: bool, + /// Max age for immutable content (CID-addressed) + pub immutable_max_age: u32, + /// Max 
age for mutable content (IPNS, DNSLink) + pub mutable_max_age: u32, + /// Enable stale-while-revalidate + pub stale_while_revalidate: u32, + /// CDN provider hints + pub provider: CdnProvider, +} + +impl Default for CdnConfig { + fn default() -> Self { + Self { + enabled: true, + immutable_max_age: 31536000, // 1 year + mutable_max_age: 300, // 5 minutes + stale_while_revalidate: 86400, // 1 day + provider: CdnProvider::Generic, + } + } +} + +/// CDN provider for optimized headers +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +pub enum CdnProvider { + /// Generic CDN headers + Generic, + /// Cloudflare-optimized + Cloudflare, + /// Fastly-optimized + Fastly, + /// AWS CloudFront + CloudFront, + /// Vercel Edge + Vercel, +} + +/// Subdomain routing result +#[derive(Debug, Clone)] +pub struct SubdomainRoute { + /// Extracted CID from subdomain + pub cid: ContentId, + /// Remaining path after CID + pub path: String, + /// Original host header + pub original_host: String, +} + +/// Parse subdomain routing from Host header +/// Format: {cid}.{gateway_hostname} +pub fn parse_subdomain_route(host: &str, gateway_hostname: &str) -> Result { + let host_lower = host.to_lowercase(); + let gateway_lower = gateway_hostname.to_lowercase(); + + // Check if this is a subdomain of our gateway + if !host_lower.ends_with(&format!(".{}", gateway_lower)) { + return Err(Error::InvalidHost(format!( + "Host '{}' is not a subdomain of '{}'", + host, gateway_hostname + ))); + } + + // Extract the subdomain (CID) + let subdomain_end = host_lower.len() - gateway_lower.len() - 1; + let cid_str = &host[..subdomain_end]; + + // Parse CID + let cid = ContentId::from_string(cid_str) + .map_err(|e| Error::InvalidCid(format!("Invalid CID in subdomain: {}", e)))?; + + Ok(SubdomainRoute { + cid, + path: "/".to_string(), + original_host: host.to_string(), + }) +} + +/// Generate gateway URL for a CID +pub fn gateway_url(cid: &ContentId, gateway_hostname: &str, use_subdomain: 
bool) -> String { + if use_subdomain { + format!("https://{}.{}/", cid.to_string_repr(), gateway_hostname) + } else { + format!("https://{}/{}", gateway_hostname, cid.to_string_repr()) + } +} + +/// HTTP headers for CDN caching +pub fn cdn_cache_headers(config: &CdnConfig, is_immutable: bool) -> HashMap { + let mut headers = HashMap::new(); + + if !config.enabled { + headers.insert("Cache-Control".to_string(), "no-store".to_string()); + return headers; + } + + if is_immutable { + // Immutable content (CID-addressed) can be cached forever + headers.insert( + "Cache-Control".to_string(), + format!( + "public, max-age={}, immutable", + config.immutable_max_age + ), + ); + } else { + // Mutable content needs shorter cache + headers.insert( + "Cache-Control".to_string(), + format!( + "public, max-age={}, stale-while-revalidate={}", + config.mutable_max_age, config.stale_while_revalidate + ), + ); + } + + // Provider-specific headers + match config.provider { + CdnProvider::Cloudflare => { + // Cloudflare-specific cache hints + if is_immutable { + headers.insert("CF-Cache-Status".to_string(), "HIT".to_string()); + } + } + CdnProvider::Fastly => { + // Fastly Surrogate-Control for edge caching + headers.insert( + "Surrogate-Control".to_string(), + format!("max-age={}", config.immutable_max_age), + ); + } + CdnProvider::CloudFront => { + // CloudFront cache behaviors + headers.insert( + "X-Cache".to_string(), + if is_immutable { "Hit from cloudfront" } else { "Miss from cloudfront" }.to_string(), + ); + } + CdnProvider::Vercel => { + // Vercel edge caching + headers.insert( + "X-Vercel-Cache".to_string(), + if is_immutable { "HIT" } else { "STALE" }.to_string(), + ); + } + CdnProvider::Generic => {} + } + + // Security headers + headers.insert("X-Content-Type-Options".to_string(), "nosniff".to_string()); + headers.insert("X-Frame-Options".to_string(), "DENY".to_string()); + + headers +} + /// HTTP Gateway service pub struct Gateway { /// Configuration @@ -160,12 +353,9 @@ 
impl Gateway { // Determine MIME type let mime_type = detect_mime_type(&content); - let response = GatewayResponse { - cid: cid.clone(), - content, - mime_type, - size: cid.size, - }; + // Create response with CDN headers + let response = GatewayResponse::standard(cid.clone(), content, mime_type) + .with_cdn_headers(&self.config.cdn_config); // Cache the response { @@ -195,6 +385,75 @@ pub struct GatewayResponse { pub mime_type: String, /// Content size pub size: u64, + /// Response format + pub format: ResponseFormat, + /// HTTP headers + pub headers: HashMap, +} + +impl GatewayResponse { + /// Create standard response + pub fn standard(cid: ContentId, content: Vec, mime_type: String) -> Self { + let size = content.len() as u64; + let mut headers = HashMap::new(); + headers.insert("Content-Type".to_string(), mime_type.clone()); + headers.insert("Content-Length".to_string(), size.to_string()); + headers.insert("ETag".to_string(), format!("\"{}\"", cid.to_string_repr())); + headers.insert("X-Ipfs-Roots".to_string(), cid.to_string_repr()); + + Self { + cid, + content, + mime_type, + size, + format: ResponseFormat::Raw, + headers, + } + } + + /// Create CAR response for trustless verification + pub fn as_car(cid: ContentId, content: Vec) -> Self { + use crate::car::CarFile; + + let car = CarFile::from_content(&content); + let car_data = car.encode(); + + let mut headers = HashMap::new(); + headers.insert("Content-Type".to_string(), "application/vnd.ipld.car".to_string()); + headers.insert("Content-Length".to_string(), car_data.len().to_string()); + headers.insert("X-Ipfs-Roots".to_string(), cid.to_string_repr()); + headers.insert("X-Content-Type-Options".to_string(), "nosniff".to_string()); + headers.insert("Cache-Control".to_string(), "public, max-age=31536000, immutable".to_string()); + + Self { + cid, + content: car_data, + mime_type: "application/vnd.ipld.car".to_string(), + size: content.len() as u64, + format: ResponseFormat::Car, + headers, + } + } + + /// Add 
CDN headers + pub fn with_cdn_headers(mut self, cdn_config: &CdnConfig) -> Self { + let cdn_headers = cdn_cache_headers(cdn_config, true); + self.headers.extend(cdn_headers); + self + } +} + +/// Response format +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ResponseFormat { + /// Raw content + Raw, + /// CAR file (trustless verification) + Car, + /// Directory listing (HTML) + DirectoryHtml, + /// Directory listing (JSON) + DirectoryJson, } /// Detect MIME type from content diff --git a/crates/synor-storage/src/lib.rs b/crates/synor-storage/src/lib.rs index 2aabe5e..d2ab8ae 100644 --- a/crates/synor-storage/src/lib.rs +++ b/crates/synor-storage/src/lib.rs @@ -32,6 +32,12 @@ pub mod proof; pub mod deal; pub mod error; +// CAR file support for trustless verification +pub mod car; + +// Multi-pin redundancy service +pub mod pinning; + // Node module - storage node implementation pub mod node; @@ -42,7 +48,16 @@ pub use cid::ContentId; pub use chunker::{Chunk, Chunker}; pub use error::{Error, Result}; pub use node::{NodeConfig, StorageNode, NodeState, NodeStats}; -pub use gateway::{Gateway, GatewayConfig, GatewayStats}; +pub use gateway::{Gateway, GatewayConfig, GatewayStats, CdnConfig, CdnProvider, SubdomainRoute}; + +// CAR exports +pub use car::{CarFile, CarBlock, CarBuilder, CarHeader, TrustlessResponse}; + +// Pinning exports +pub use pinning::{ + PinManager, PinManagerConfig, PinRecord, PinRequest, PinSummary, PinStatus, + RedundancyLevel, Region, StorageNode as PinStorageNode, +}; /// Storage layer configuration #[derive(Debug, Clone)] diff --git a/crates/synor-storage/src/pinning.rs b/crates/synor-storage/src/pinning.rs new file mode 100644 index 0000000..04dd778 --- /dev/null +++ b/crates/synor-storage/src/pinning.rs @@ -0,0 +1,710 @@ +//! Multi-pin redundancy service for content persistence. +//! +//! Ensures content survives node failures by maintaining multiple copies +//! across geographically distributed nodes. +//! +//! # Pinning Strategy +//! 
+//! ```text +//! Content CID +//! │ +//! ▼ +//! ┌──────────────┐ +//! │ Pin Manager │ ─── Monitors pin health +//! └──────────────┘ +//! │ +//! ├──────────────┬──────────────┬──────────────┐ +//! ▼ ▼ ▼ ▼ +//! ┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐ +//! │Node US │ │Node EU │ │Node AS │ │Node SA │ +//! │ East │ │ West │ │ Pacific│ │ South │ +//! └────────┘ └────────┘ └────────┘ └────────┘ +//! ``` +//! +//! # Redundancy Levels +//! +//! - **Standard (3x)**: Pin to 3 nodes minimum +//! - **Enhanced (5x)**: Pin to 5 nodes with geo-distribution +//! - **Critical (7x)**: Pin to 7 nodes across all regions + +use crate::cid::ContentId; +use crate::error::{Error, Result}; +use parking_lot::RwLock; +use serde::{Deserialize, Serialize}; +use std::collections::{HashMap, HashSet}; +use std::time::{SystemTime, UNIX_EPOCH}; + +/// Geographic region for node distribution +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub enum Region { + /// North America + NorthAmerica, + /// Europe + Europe, + /// Asia Pacific + AsiaPacific, + /// South America + SouthAmerica, + /// Africa + Africa, + /// Oceania + Oceania, +} + +impl Region { + /// All regions + pub fn all() -> &'static [Region] { + &[ + Region::NorthAmerica, + Region::Europe, + Region::AsiaPacific, + Region::SouthAmerica, + Region::Africa, + Region::Oceania, + ] + } + + /// Short code for region + pub fn code(&self) -> &'static str { + match self { + Region::NorthAmerica => "NA", + Region::Europe => "EU", + Region::AsiaPacific => "AP", + Region::SouthAmerica => "SA", + Region::Africa => "AF", + Region::Oceania => "OC", + } + } +} + +/// Redundancy level for pinning +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +pub enum RedundancyLevel { + /// Single copy (no redundancy) + None, + /// Standard: 3 copies minimum + Standard, + /// Enhanced: 5 copies with geo-distribution + Enhanced, + /// Critical: 7 copies across all available regions + Critical, +} + +impl 
RedundancyLevel { + /// Minimum number of pins required + pub fn min_copies(&self) -> usize { + match self { + RedundancyLevel::None => 1, + RedundancyLevel::Standard => 3, + RedundancyLevel::Enhanced => 5, + RedundancyLevel::Critical => 7, + } + } + + /// Minimum number of unique regions required + pub fn min_regions(&self) -> usize { + match self { + RedundancyLevel::None => 1, + RedundancyLevel::Standard => 2, + RedundancyLevel::Enhanced => 3, + RedundancyLevel::Critical => 4, + } + } + + /// Whether geo-distribution is required + pub fn requires_geo_distribution(&self) -> bool { + matches!(self, RedundancyLevel::Enhanced | RedundancyLevel::Critical) + } +} + +impl Default for RedundancyLevel { + fn default() -> Self { + RedundancyLevel::Standard + } +} + +/// Storage node information +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct StorageNode { + /// Unique node ID + pub id: String, + /// Node endpoint URL + pub endpoint: String, + /// Geographic region + pub region: Region, + /// Current health status + pub healthy: bool, + /// Storage capacity in bytes + pub capacity: u64, + /// Used storage in bytes + pub used: u64, + /// Last health check timestamp + pub last_check: u64, + /// Response latency in milliseconds + pub latency_ms: u32, +} + +impl StorageNode { + /// Create a new storage node + pub fn new(id: &str, endpoint: &str, region: Region) -> Self { + Self { + id: id.to_string(), + endpoint: endpoint.to_string(), + region, + healthy: true, + capacity: 100 * 1024 * 1024 * 1024, // 100 GB default + used: 0, + last_check: 0, + latency_ms: 0, + } + } + + /// Available capacity + pub fn available(&self) -> u64 { + self.capacity.saturating_sub(self.used) + } + + /// Check if node can store content of given size + pub fn can_store(&self, size: u64) -> bool { + self.healthy && self.available() >= size + } +} + +/// Pin status for tracking content state +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +pub enum PinStatus { + /// 
Pin request queued + Queued, + /// Pinning in progress + Pinning, + /// Successfully pinned + Pinned, + /// Pin failed + Failed, + /// Content unpinned + Unpinned, +} + +/// Pin record for a single CID-node pair +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PinRecord { + /// Content ID + pub cid: ContentId, + /// Node ID where content is pinned + pub node_id: String, + /// Pin status + pub status: PinStatus, + /// Content size in bytes + pub size: u64, + /// When the pin was created + pub created_at: u64, + /// Last verification time + pub last_verified: u64, + /// Verification count (successful checks) + pub verification_count: u32, +} + +impl PinRecord { + /// Create a new pin record + pub fn new(cid: ContentId, node_id: &str, size: u64) -> Self { + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs(); + + Self { + cid, + node_id: node_id.to_string(), + status: PinStatus::Queued, + size, + created_at: now, + last_verified: 0, + verification_count: 0, + } + } + + /// Mark as pinned + pub fn mark_pinned(&mut self) { + self.status = PinStatus::Pinned; + self.last_verified = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs(); + self.verification_count += 1; + } + + /// Mark as failed + pub fn mark_failed(&mut self) { + self.status = PinStatus::Failed; + } + + /// Update verification time + pub fn verify(&mut self) { + self.last_verified = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs(); + self.verification_count += 1; + } + + /// Check if pin is healthy (verified recently) + pub fn is_healthy(&self, max_age_secs: u64) -> bool { + if self.status != PinStatus::Pinned { + return false; + } + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs(); + now - self.last_verified < max_age_secs + } +} + +/// Pin request for content +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PinRequest { + /// Content ID to pin + pub cid: ContentId, + /// 
/// Requested redundancy level
    pub redundancy: RedundancyLevel,
    /// Preferred regions (empty = any)
    pub preferred_regions: Vec<Region>,
    /// Exclude these nodes
    // NOTE(review): element type reconstructed as `String` because node IDs key
    // the manager's node map as strings — confirm against the original source.
    pub exclude_nodes: Vec<String>,
    /// Request timestamp (seconds since the Unix epoch, see [`PinRequest::new`])
    pub requested_at: u64,
    /// Expiration (0 = never)
    pub expires_at: u64,
}

impl PinRequest {
    /// Creates a pin request for `cid` at the given redundancy level.
    ///
    /// `requested_at` is set to the current wall-clock time in seconds since
    /// the Unix epoch. The request starts with no preferred regions, no
    /// excluded nodes and no expiration (`expires_at == 0`).
    pub fn new(cid: ContentId, redundancy: RedundancyLevel) -> Self {
        // `duration_since` fails when the system clock is set before the Unix
        // epoch. A constructor should not panic on a misconfigured clock, so
        // fall back to 0 in that case.
        let requested_at = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|elapsed| elapsed.as_secs())
            .unwrap_or(0);

        Self {
            cid,
            redundancy,
            preferred_regions: Vec::new(),
            exclude_nodes: Vec::new(),
            requested_at,
            expires_at: 0,
        }
    }

    /// Replaces the preferred regions and returns the request (builder style).
    pub fn with_regions(mut self, regions: Vec<Region>) -> Self {
        self.preferred_regions = regions;
        self
    }

    /// Sets the expiration time and returns the request (builder style).
    ///
    /// `0` means the request never expires.
    pub fn with_expiration(mut self, expires_at: u64) -> Self {
        self.expires_at = expires_at;
        self
    }
}

/// Pin status summary for a CID
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PinSummary {
    /// Content ID
    pub cid: ContentId,
    /// Total pin count
    pub pin_count: usize,
    /// Healthy pin count
    pub healthy_pins: usize,
    /// Regions with pins
    pub regions: Vec<Region>,
    /// Current redundancy met
    pub redundancy_met: bool,
    /// Requested redundancy level
    pub redundancy_level: RedundancyLevel,
    /// All pin records
    pub pins: Vec<PinRecord>,
}

/// Pin manager configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PinManagerConfig {
    /// Default redundancy level
    pub default_redundancy: RedundancyLevel,
    /// Health check interval in seconds
    pub health_check_interval: u64,
    /// Maximum age for pin verification (seconds)
    pub max_verification_age: u64,
    /// Enable automatic re-pinning on failure
    pub auto_repin: bool,
    /// Maximum concurrent pin operations
    pub max_concurrent_pins: usize,
}

impl Default for PinManagerConfig {
    /// Standard redundancy, 5-minute health checks, 24-hour verification
    /// window, automatic re-pinning enabled, at most 100 concurrent pins.
    fn default() -> Self {
        Self {
            default_redundancy: RedundancyLevel::Standard,
            health_check_interval: 300,  // 5 minutes
            max_verification_age: 86400, // 24 hours
            auto_repin: true,
            max_concurrent_pins: 100,
        }
    }
}

/// Pin manager for multi-pin redundancy
///
/// Locks are always acquired in the order `pins` -> `stats` -> `nodes`; keep
/// that order when adding methods that hold more than one of them.
pub struct PinManager {
    /// Configuration
    config: PinManagerConfig,
    /// Storage nodes by ID
    nodes: RwLock<HashMap<String, StorageNode>>,
    /// Pin records by CID string
    pins: RwLock<HashMap<String, Vec<PinRecord>>>,
    /// Pending pin requests
    pending: RwLock<Vec<PinRequest>>,
    /// Statistics
    stats: RwLock<PinStats>,
}

/// Pin statistics
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct PinStats {
    /// Total pins (incremented on every pin, never decremented)
    pub total_pins: u64,
    /// Active pins
    pub active_pins: u64,
    /// Failed pins
    pub failed_pins: u64,
    /// Total bytes pinned
    pub total_bytes: u64,
    /// Pins per region, keyed by region code
    // NOTE(review): value type reconstructed as `u64` to match the other
    // counters — confirm against the original source.
    pub pins_by_region: HashMap<String, u64>,
}

impl PinManager {
    /// Create a new pin manager with no nodes, no pins and zeroed statistics.
    pub fn new(config: PinManagerConfig) -> Self {
        Self {
            config,
            nodes: RwLock::new(HashMap::new()),
            pins: RwLock::new(HashMap::new()),
            pending: RwLock::new(Vec::new()),
            stats: RwLock::new(PinStats::default()),
        }
    }

    /// Create with default configuration
    pub fn default_manager() -> Self {
        Self::new(PinManagerConfig::default())
    }

    /// Register a storage node, replacing any node already stored under the
    /// same ID.
    pub fn register_node(&self, node: StorageNode) {
        self.nodes.write().insert(node.id.clone(), node);
    }

    /// Remove a storage node; unknown IDs are ignored.
    pub fn remove_node(&self, node_id: &str) {
        self.nodes.write().remove(node_id);
    }

    /// Get all healthy nodes
    pub fn healthy_nodes(&self) -> Vec<StorageNode> {
        self.nodes
            .read()
            .values()
            .filter(|n| n.healthy)
            .cloned()
            .collect()
    }

    /// Get the healthy nodes located in `region`.
    pub fn nodes_by_region(&self, region: Region) -> Vec<StorageNode> {
        self.nodes
            .read()
            .values()
            .filter(|n| n.region == region && n.healthy)
            .cloned()
            .collect()
    }

    /// Queue a pin request.
    ///
    /// The request is only appended to the pending list; nothing in this
    /// method performs the pin. Currently always returns `Ok(())`.
    pub fn request_pin(&self, request: PinRequest) -> Result<()> {
        self.pending.write().push(request);
        Ok(())
    }

    /// Pin content with specified redundancy
    ///
    /// Selects `redundancy.min_copies()` nodes spread over at least
    /// `redundancy.min_regions()` regions, creates one pin record per node and
    /// stores the records under the CID, replacing any records that were
    /// already stored for it.
    ///
    /// # Errors
    ///
    /// Returns [`Error::InsufficientNodes`] when not enough eligible nodes or
    /// regions are registered. No state is modified in that case.
    pub async fn pin(
        &self,
        cid: &ContentId,
        size: u64,
        redundancy: RedundancyLevel,
    ) -> Result<PinSummary> {
        let required_copies = redundancy.min_copies();
        let required_regions = redundancy.min_regions();

        // Select nodes for pinning
        let selected_nodes = self.select_nodes(cid, size, required_copies, required_regions)?;

        let mut pins = Vec::with_capacity(selected_nodes.len());
        let mut regions = HashSet::new();
        for node in &selected_nodes {
            let mut record = PinRecord::new(cid.clone(), &node.id, size);

            // Simulate pin operation (in real impl, would call node API)
            record.mark_pinned();
            pins.push(record);
            regions.insert(node.region);
        }

        // Store the records and update the statistics under one critical
        // section, taking the locks in the same order as `unpin`.
        {
            let mut all_pins = self.pins.write();
            let replaced = all_pins.insert(cid.to_string_repr(), pins.clone());

            let mut stats = self.stats.write();
            // Re-pinning a CID overwrites its previous records. Release their
            // share of the gauges first, otherwise `active_pins` and
            // `total_bytes` grow on every re-pin and `unpin` can never bring
            // them back to zero.
            if let Some(previous) = replaced {
                Self::release_stats(&mut stats, &previous);
            }
            for node in &selected_nodes {
                stats.total_pins += 1;
                stats.active_pins += 1;
                stats.total_bytes += size;
                *stats
                    .pins_by_region
                    .entry(node.region.code().to_string())
                    .or_insert(0) += 1;
            }
        }

        let healthy_pins = pins
            .iter()
            .filter(|p| p.status == PinStatus::Pinned)
            .count();
        let redundancy_met =
            healthy_pins >= required_copies && regions.len() >= required_regions;

        Ok(PinSummary {
            cid: cid.clone(),
            pin_count: pins.len(),
            healthy_pins,
            regions: regions.into_iter().collect(),
            redundancy_met,
            redundancy_level: redundancy,
            pins,
        })
    }

    /// Select optimal nodes for pinning
    ///
    /// Eligible nodes are healthy and report `can_store(size)`. They are
    /// grouped by region, and regions are ordered by their fastest node
    /// (latency, then node ID), so the result does not depend on `HashMap`
    /// iteration order.
    ///
    /// The first pass takes the fastest node of each of the first
    /// `required_regions` regions; the second pass fills the remaining slots
    /// region by region, fastest node first.
    ///
    /// # Errors
    ///
    /// Returns [`Error::InsufficientNodes`] when fewer than `required_copies`
    /// eligible nodes or fewer than `required_regions` regions are available.
    fn select_nodes(
        &self,
        _cid: &ContentId, // not used by the current selection strategy
        size: u64,
        required_copies: usize,
        required_regions: usize,
    ) -> Result<Vec<StorageNode>> {
        // Group eligible nodes by region. `can_store` is defined outside this
        // block; the explicit `healthy` check guarantees an unhealthy node is
        // never selected regardless of what `can_store` verifies.
        let mut by_region: HashMap<Region, Vec<StorageNode>> = HashMap::new();
        let mut available = 0usize;
        {
            let nodes = self.nodes.read();
            for node in nodes
                .values()
                .filter(|n: &&StorageNode| n.healthy && n.can_store(size))
            {
                by_region.entry(node.region).or_default().push(node.clone());
                available += 1;
            }
        }

        if available < required_copies {
            return Err(Error::InsufficientNodes(format!(
                "Need {} nodes, only {} available",
                required_copies, available
            )));
        }

        // Check region availability
        if by_region.len() < required_regions {
            return Err(Error::InsufficientNodes(format!(
                "Need {} regions, only {} available",
                required_regions,
                by_region.len()
            )));
        }

        // Fastest first; the node ID breaks latency ties deterministically.
        let by_speed = |a: &StorageNode, b: &StorageNode| {
            a.latency_ms
                .cmp(&b.latency_ms)
                .then_with(|| a.id.cmp(&b.id))
        };

        // Sort every region once, then order the regions by their best node.
        let mut regions: Vec<Vec<StorageNode>> = by_region.into_values().collect();
        for region_nodes in &mut regions {
            region_nodes.sort_by(by_speed);
        }
        regions.sort_by(|a, b| match (a.first(), b.first()) {
            (Some(x), Some(y)) => by_speed(x, y),
            // Unreachable in practice: a region only exists once a node was
            // pushed into it.
            _ => std::cmp::Ordering::Equal,
        });

        let mut selected: Vec<StorageNode> = Vec::with_capacity(required_copies);

        // First pass: one node per region until we meet region requirement
        for region_nodes in regions.iter().take(required_regions) {
            if selected.len() >= required_copies {
                break;
            }
            if let Some(fastest) = region_nodes.first() {
                selected.push(fastest.clone());
            }
        }

        // Second pass: fill remaining slots, skipping nodes already chosen
        'fill: for region_nodes in &regions {
            for node in region_nodes {
                if selected.len() >= required_copies {
                    break 'fill;
                }
                if !selected.iter().any(|chosen| chosen.id == node.id) {
                    selected.push(node.clone());
                }
            }
        }

        Ok(selected)
    }

    /// Get pin summary for a CID
    ///
    /// Returns `None` when no records are stored for `cid`.
    ///
    /// NOTE(review): the summary is evaluated against the manager's *default*
    /// redundancy level (the level requested at pin time is not stored), and
    /// only the copy count is checked — unlike `pin`, the region requirement
    /// is not. Confirm whether that is intended.
    pub fn get_pin_status(&self, cid: &ContentId) -> Option<PinSummary> {
        let pins = self.pins.read();
        let records = pins.get(&cid.to_string_repr())?;

        // Regions are resolved through the node registry, so records whose
        // node has been removed contribute no region.
        let regions: HashSet<Region> = {
            let nodes = self.nodes.read();
            records
                .iter()
                .filter_map(|record| nodes.get(&record.node_id).map(|node| node.region))
                .collect()
        };

        let healthy_pins = self.healthy_count(records);
        let level = self.config.default_redundancy;

        Some(PinSummary {
            cid: cid.clone(),
            pin_count: records.len(),
            healthy_pins,
            regions: regions.into_iter().collect(),
            redundancy_met: healthy_pins >= level.min_copies(),
            redundancy_level: level,
            pins: records.clone(),
        })
    }

    /// Unpin content from all nodes
    ///
    /// Removes every record stored for `cid` and releases their share of
    /// `active_pins` and `total_bytes`. Unpinning an unknown CID is a no-op.
    /// Currently always returns `Ok(())`.
    pub fn unpin(&self, cid: &ContentId) -> Result<()> {
        let mut pins = self.pins.write();

        if let Some(records) = pins.remove(&cid.to_string_repr()) {
            let mut stats = self.stats.write();
            Self::release_stats(&mut stats, &records);
        }

        Ok(())
    }

    /// Get a snapshot of the overall statistics.
    pub fn stats(&self) -> PinStats {
        self.stats.read().clone()
    }

    /// Check health of all pins
    ///
    /// Returns the CID strings whose number of healthy records is below the
    /// default redundancy level's minimum copy count.
    ///
    /// NOTE(review): like `get_pin_status`, this uses the default level and
    /// does not look at region distribution.
    pub async fn health_check(&self) -> Vec<String> {
        let min_copies = self.config.default_redundancy.min_copies();

        self.pins
            .read()
            .iter()
            .filter(|(_, records)| self.healthy_count(records) < min_copies)
            .map(|(cid_str, _)| cid_str.clone())
            .collect()
    }

    /// Counts the records considered healthy under the configured
    /// `max_verification_age`.
    fn healthy_count(&self, records: &[PinRecord]) -> usize {
        records
            .iter()
            .filter(|p: &&PinRecord| p.is_healthy(self.config.max_verification_age))
            .count()
    }

    /// Subtracts `records` from the `active_pins` and `total_bytes` gauges.
    ///
    /// `total_pins` and `pins_by_region` are left untouched, matching what
    /// `unpin` has always done.
    fn release_stats(stats: &mut PinStats, records: &[PinRecord]) {
        stats.active_pins = stats.active_pins.saturating_sub(records.len() as u64);
        for record in records {
            stats.total_bytes = stats.total_bytes.saturating_sub(record.size);
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_redundancy_levels() {
        assert_eq!(RedundancyLevel::None.min_copies(), 1);
        assert_eq!(RedundancyLevel::Standard.min_copies(), 3);
        assert_eq!(RedundancyLevel::Enhanced.min_copies(), 5);
        assert_eq!(RedundancyLevel::Critical.min_copies(), 7);

        assert!(!RedundancyLevel::Standard.requires_geo_distribution());
        assert!(RedundancyLevel::Enhanced.requires_geo_distribution());
    }

    #[test]
    fn test_storage_node() {
        let node = StorageNode::new("node-1", "https://node1.example.com", Region::NorthAmerica);

        assert!(node.healthy);
        assert_eq!(node.available(), 100 * 1024 * 1024 * 1024);
        assert!(node.can_store(1024));
    }

    #[test]
    fn test_pin_record() {
        let cid = ContentId::from_content(b"test content");
        let mut record = PinRecord::new(cid, "node-1", 1024);

        assert_eq!(record.status, PinStatus::Queued);

        record.mark_pinned();
        assert_eq!(record.status, PinStatus::Pinned);
        assert_eq!(record.verification_count, 1);
    }

    #[test]
    fn test_pin_manager() {
        let manager = PinManager::default_manager();

        // Register nodes
        manager.register_node(StorageNode::new("na-1", "https://na1.example.com", Region::NorthAmerica));
        manager.register_node(StorageNode::new("eu-1", "https://eu1.example.com", Region::Europe));
        manager.register_node(StorageNode::new("ap-1", "https://ap1.example.com", Region::AsiaPacific));

        let healthy = manager.healthy_nodes();
        assert_eq!(healthy.len(), 3);

        let na_nodes = manager.nodes_by_region(Region::NorthAmerica);
        assert_eq!(na_nodes.len(), 1);
    }

    #[test]
    fn test_region_codes() {
        assert_eq!(Region::NorthAmerica.code(), "NA");
        assert_eq!(Region::Europe.code(), "EU");
        assert_eq!(Region::AsiaPacific.code(), "AP");
    }

    #[test]
    fn test_pin_request() {
        let cid = ContentId::from_content(b"pin request test");
        let request = PinRequest::new(cid.clone(), RedundancyLevel::Enhanced)
            .with_regions(vec![Region::NorthAmerica, Region::Europe]);

        assert_eq!(request.redundancy, RedundancyLevel::Enhanced);
        assert_eq!(request.preferred_regions.len(), 2);
    }

    #[tokio::test]
    async fn test_repin_does_not_inflate_stats() {
        let manager = PinManager::default_manager();
        manager.register_node(StorageNode::new("na-1", "https://na1.example.com", Region::NorthAmerica));
        manager.register_node(StorageNode::new("eu-1", "https://eu1.example.com", Region::Europe));
        manager.register_node(StorageNode::new("ap-1", "https://ap1.example.com", Region::AsiaPacific));

        let cid = ContentId::from_content(b"repin test");
        manager.pin(&cid, 1024, RedundancyLevel::Standard).await.unwrap();
        manager.pin(&cid, 1024, RedundancyLevel::Standard).await.unwrap();

        // The second pin replaces the first set of records instead of
        // stacking on top of it.
        let stats = manager.stats();
        assert_eq!(stats.active_pins, 3);
        assert_eq!(stats.total_bytes, 3 * 1024);

        manager.unpin(&cid).unwrap();
        let stats = manager.stats();
        assert_eq!(stats.active_pins, 0);
        assert_eq!(stats.total_bytes, 0);
    }
}