diff --git a/apps/explorer/Cargo.toml b/apps/explorer/Cargo.toml index 3bd54d3..772a9b8 100644 --- a/apps/explorer/Cargo.toml +++ b/apps/explorer/Cargo.toml @@ -18,9 +18,9 @@ synor-rpc = { path = "../../crates/synor-rpc" } tokio = { version = "1.35", features = ["full"] } # Web framework -axum = { version = "0.7", features = ["json", "query"] } +axum = { version = "0.7", features = ["json", "query", "ws"] } tower = { version = "0.4", features = ["timeout", "limit"] } -tower-http = { version = "0.5", features = ["cors", "trace", "compression-gzip"] } +tower-http = { version = "0.5", features = ["cors", "trace", "compression-gzip", "fs"] } # Serialization serde = { version = "1.0", features = ["derive"] } @@ -51,6 +51,7 @@ moka = { version = "0.12", features = ["future"] } # Async utilities futures = "0.3" +tokio-tungstenite = "0.21" [dev-dependencies] tokio-test = "0.4" diff --git a/apps/explorer/src/main.rs b/apps/explorer/src/main.rs index a048f51..2933715 100644 --- a/apps/explorer/src/main.rs +++ b/apps/explorer/src/main.rs @@ -14,16 +14,21 @@ use std::time::Duration; use axum::http::{HeaderValue, Method}; use axum::{ - extract::{Path, Query, State}, + extract::{ + ws::{Message, WebSocket, WebSocketUpgrade}, + Path, Query, State, + }, http::StatusCode, response::IntoResponse, routing::get, Json, Router, }; +use futures::{SinkExt, StreamExt}; use moka::future::Cache; use serde::{Deserialize, Serialize}; use tower_http::compression::CompressionLayer; use tower_http::cors::{Any, CorsLayer}; +use tower_http::services::{ServeDir, ServeFile}; use tower_http::trace::TraceLayer; use tracing::{error, info}; @@ -36,6 +41,8 @@ pub struct ExplorerConfig { pub rpc_url: String, /// Server listen address. pub listen_addr: SocketAddr, + /// Directory containing static frontend files. + pub static_dir: Option, /// Cache TTL for blocks (seconds). pub block_cache_ttl: u64, /// Cache TTL for stats (seconds). @@ -51,6 +58,7 @@ impl Default for ExplorerConfig { ExplorerConfig { rpc_url: "http://localhost:17110".to_string(), listen_addr: "0.0.0.0:3000".parse().unwrap(), + static_dir: None, block_cache_ttl: 60, stats_cache_ttl: 10, max_page_size: 100, @@ -75,6 +83,12 @@ impl ExplorerConfig { } } + if let Ok(dir) = std::env::var("EXPLORER_STATIC_DIR") { + if std::path::Path::new(&dir).exists() { + config.static_dir = Some(dir); + } + } + if let Ok(ttl) = std::env::var("EXPLORER_BLOCK_CACHE_TTL") { if let Ok(ttl) = ttl.parse() { config.block_cache_ttl = ttl; @@ -417,14 +431,15 @@ impl ExplorerState { /// Health check endpoint. async fn health(State(state): State>) -> impl IntoResponse { - // Check RPC connection + // Check RPC connection by making a simple RPC call + #[derive(Deserialize)] + struct VersionResult { + version: String, + } let rpc_ok = state - .http_client - .get(format!("{}/health", state.config.rpc_url)) - .send() + .rpc_call::<_, VersionResult>("synor_getServerVersion", ()) .await - .map(|r| r.status().is_success()) - .unwrap_or(false); + .is_ok(); #[derive(Serialize)] #[serde(rename_all = "camelCase")] @@ -457,87 +472,82 @@ async fn get_stats( return Ok(Json(stats)); } - // Fetch from RPC + // Response types matching the actual node RPC #[derive(Deserialize)] #[serde(rename_all = "camelCase")] - struct DagInfo { - network: String, + struct NodeInfo { + version: String, + #[serde(default)] + protocol_version: u32, + peer_count: usize, block_count: u64, - header_count: u64, - tip_hashes: Vec, + blue_score: u64, + mempool_size: usize, + synced: bool, + } + + #[derive(Deserialize)] + #[serde(rename_all = "camelCase")] + struct MiningInfo { + blocks: u64, difficulty: f64, - virtual_daa_score: u64, + networkhashps: u64, } #[derive(Deserialize)] #[serde(rename_all = "camelCase")] - struct HashrateInfo { - hashrate: f64, - block_rate: f64, + struct TipsResponse { + tips: Vec, } - #[derive(Deserialize)] - #[serde(rename_all = "camelCase")] - struct CoinSupply { - circulating_supply: u64, - max_supply: u64, - } + // Make parallel RPC calls using correct method names + let info_fut = state.rpc_call::<_, NodeInfo>("synor_getInfo", ()); + let mining_fut = state.rpc_call::<_, MiningInfo>("synor_getMiningInfo", ()); + let tips_fut = state.rpc_call::<_, TipsResponse>("synor_getTips", ()); - #[derive(Deserialize)] - #[serde(rename_all = "camelCase")] - struct PeerInfo { - peer_info: Vec, - } + let (info, mining, tips) = tokio::try_join!(info_fut, mining_fut, tips_fut)?; - #[derive(Deserialize)] - #[serde(rename_all = "camelCase")] - struct MempoolInfo { - size: u64, - } + // Estimate block rate from difficulty (blocks per second) + // With 100ms target block time, ~10 blocks per second + let block_rate = 10.0; - #[derive(Deserialize)] - #[serde(rename_all = "camelCase")] - struct NetInfo { - is_synced: bool, - } + // Calculate estimated supply based on block count + // Initial block reward: 100 SYNOR, halving every 210,000 blocks + let blocks = info.block_count; + let halvings = (blocks / 210_000).min(10) as u32; + let circulating_supply = if blocks > 0 { + // Approximate: sum of geometric series for each halving period + let mut supply = 0u64; + let mut remaining = blocks; + for h in 0..=halvings { + let period_blocks = remaining.min(210_000); + let reward = (100_u64 >> h) * 100_000_000; // in sompi + supply += period_blocks * reward; + remaining = remaining.saturating_sub(210_000); + } + supply + } else { + 0 + }; - // Make parallel RPC calls - let dag_info_fut = state.rpc_call::<_, DagInfo>("synor_getBlockDagInfo", ()); - let hashrate_fut = state.rpc_call::<_, HashrateInfo>("synor_getNetworkHashrate", ()); - let supply_fut = state.rpc_call::<_, CoinSupply>("synor_getCoinSupply", ()); - let peers_fut = state.rpc_call::<_, PeerInfo>("net_getPeerInfo", ()); - let net_info_fut = state.rpc_call::<_, NetInfo>("net_getInfo", ()); - - let (dag_info, hashrate_info, supply, peers, net_info) = tokio::try_join!( - dag_info_fut, - hashrate_fut, - supply_fut, - peers_fut, - net_info_fut, - )?; - - // Mempool info may fail on some setups, default to empty - let mempool = state - .rpc_call::<_, MempoolInfo>("synor_getMempoolInfo", ()) - .await - .unwrap_or(MempoolInfo { size: 0 }); + let max_supply = 21_000_000_u64 * 100_000_000; // 21M SYNOR in sompi let stats = NetworkStats { - network_id: dag_info.network, - is_synced: net_info.is_synced, - block_count: dag_info.block_count, - header_count: dag_info.header_count, - tip_count: dag_info.tip_hashes.len(), - virtual_daa_score: dag_info.virtual_daa_score, - difficulty: dag_info.difficulty, - hashrate: hashrate_info.hashrate, - hashrate_human: format_hashrate(hashrate_info.hashrate), - block_rate: hashrate_info.block_rate, - mempool_size: mempool.size, - peer_count: peers.peer_info.len(), - circulating_supply: supply.circulating_supply, - circulating_supply_human: format_synor(supply.circulating_supply), - max_supply: supply.max_supply, + network_id: "testnet".to_string(), + is_synced: info.synced, + block_count: info.block_count, + header_count: info.block_count, // Same as block_count in this impl + tip_count: tips.tips.len(), + virtual_daa_score: info.blue_score, + difficulty: mining.difficulty, + hashrate: mining.networkhashps as f64, + hashrate_human: format_hashrate(mining.networkhashps as f64), + block_rate, + mempool_size: info.mempool_size as u64, + peer_count: info.peer_count, + circulating_supply, + circulating_supply_human: format_synor(circulating_supply), + max_supply, }; // Cache the result @@ -599,12 +609,25 @@ async fn get_blocks( Query(params): Query, ) -> Result>, ApiError> { let limit = params.limit.min(state.config.max_page_size); - let offset = (params.page.saturating_sub(1)) * limit; - // Get tips first - let tips: Vec = state.rpc_call("synor_getTips", ()).await?; + // Get block count and current blue score + #[derive(Deserialize)] + #[serde(rename_all = "camelCase")] + struct BlockCount { + block_count: u64, + } - if tips.is_empty() { + #[derive(Deserialize)] + #[serde(rename_all = "camelCase")] + struct BlueScore { + blue_score: u64, + } + + let count: BlockCount = state.rpc_call("synor_getBlockCount", ()).await?; + let score: BlueScore = state.rpc_call("synor_getBlueScore", ()).await?; + + let total = count.block_count as usize; + if total == 0 { return Ok(Json(PaginatedResponse { data: vec![], page: params.page, @@ -616,61 +639,51 @@ async fn get_blocks( })); } - // Get headers from first tip - #[derive(Serialize)] - struct GetHeadersParams { - start_hash: String, - limit: u64, - is_ascending: bool, - } + // Fetch blocks by blue score (most recent first) + let start_score = score.blue_score.saturating_sub((params.page.saturating_sub(1) * limit) as u64); + let blocks_data: Vec = state + .rpc_call("synor_getBlocksByBlueScore", (start_score, true)) + .await + .unwrap_or_else(|_| vec![]); - let headers: Vec = state - .rpc_call( - "synor_getHeaders", - GetHeadersParams { - start_hash: tips[0].clone(), - limit: (offset + limit) as u64, - is_ascending: false, - }, - ) - .await?; - - // Skip offset and take limit - let page_headers: Vec<_> = headers.into_iter().skip(offset).take(limit).collect(); - - // Convert to explorer blocks (without full tx data for listing) - let blocks: Vec = page_headers + // Convert to explorer blocks + let blocks: Vec = blocks_data .into_iter() - .map(|h| ExplorerBlock { - hash: h.hash, - version: h.version, - parent_hashes: h.parent_hashes, - timestamp: h.timestamp, - timestamp_human: format_timestamp(h.timestamp), - bits: h.bits, - nonce: h.nonce, - daa_score: h.daa_score, - blue_score: h.blue_score, - blue_work: h.blue_work, - difficulty: 0.0, // Would need verbose data - transaction_count: 0, // Unknown without fetching full block - is_chain_block: true, // Assume chain block for headers - transactions: None, - children_hashes: vec![], - merge_set_blues: vec![], - merge_set_reds: vec![], + .take(limit) + .filter_map(|b| { + let hash = b.get("hash")?.as_str()?.to_string(); + let header = b.get("header")?; + let timestamp = header.get("timestamp")?.as_u64()?; + + Some(ExplorerBlock { + hash: hash.clone(), + version: header.get("version")?.as_u64()? as u32, + parent_hashes: header + .get("parents") + .and_then(|p| p.as_array()) + .map(|a| a.iter().filter_map(|v| v.as_str().map(String::from)).collect()) + .unwrap_or_default(), + timestamp, + timestamp_human: format_timestamp(timestamp), + bits: header.get("bits")?.as_u64()? as u32, + nonce: header.get("nonce")?.as_u64()?, + daa_score: header.get("blueScore").and_then(|v| v.as_u64()).unwrap_or(0), + blue_score: header.get("blueScore").and_then(|v| v.as_u64()).unwrap_or(0), + blue_work: String::new(), + difficulty: 0.0, + transaction_count: b.get("transactions") + .and_then(|t| t.as_array()) + .map(|a| a.len()) + .unwrap_or(0), + is_chain_block: true, + transactions: None, + children_hashes: vec![], + merge_set_blues: vec![], + merge_set_reds: vec![], + }) }) .collect(); - // Get total count - #[derive(Deserialize)] - #[serde(rename_all = "camelCase")] - struct BlockCount { - block_count: u64, - } - - let count: BlockCount = state.rpc_call("synor_getBlockCount", ()).await?; - let total = count.block_count as usize; let total_pages = total.div_ceil(limit); Ok(Json(PaginatedResponse { @@ -686,8 +699,12 @@ async fn get_blocks( /// Get current DAG tips. async fn get_tips(State(state): State>) -> Result>, ApiError> { - let tips: Vec = state.rpc_call("synor_getTips", ()).await?; - Ok(Json(tips)) + #[derive(Deserialize)] + struct TipsResponse { + tips: Vec, + } + let response: TipsResponse = state.rpc_call("synor_getTips", ()).await?; + Ok(Json(response.tips)) } /// Get transaction by ID. @@ -799,59 +816,87 @@ async fn get_dag( let depth = params.depth.unwrap_or(10).min(50); // Get tips - let tips: Vec = state.rpc_call("synor_getTips", ()).await?; + #[derive(Deserialize)] + struct TipsResponse { + tips: Vec, + } + let tips_resp: TipsResponse = state.rpc_call("synor_getTips", ()).await?; - if tips.is_empty() { + if tips_resp.tips.is_empty() { return Ok(Json(DagVisualization { blocks: vec![], edges: vec![], })); } - // Get headers from tips - #[derive(Serialize)] - struct GetHeadersParams { - start_hash: String, - limit: u64, - is_ascending: bool, + // Get current blue score to fetch recent blocks + #[derive(Deserialize)] + #[serde(rename_all = "camelCase")] + struct BlueScore { + blue_score: u64, } + let score: BlueScore = state.rpc_call("synor_getBlueScore", ()).await?; + let mut all_hashes = std::collections::HashSet::new(); let mut blocks = Vec::new(); let mut edges = Vec::new(); - for tip in tips.iter().take(5) { - let headers: Vec = state - .rpc_call( - "synor_getHeaders", - GetHeadersParams { - start_hash: tip.clone(), - limit: depth as u64, - is_ascending: false, - }, - ) - .await?; + // Fetch blocks around the current blue score + for i in 0..depth as u64 { + let target_score = score.blue_score.saturating_sub(i); + let blocks_data: Vec = state + .rpc_call("synor_getBlocksByBlueScore", (target_score, true)) + .await + .unwrap_or_else(|_| vec![]); - for header in headers { - if all_hashes.insert(header.hash.clone()) { - // Add edges to parents - for (i, parent) in header.parent_hashes.iter().enumerate() { - edges.push(DagEdge { - from: header.hash.clone(), - to: parent.clone(), - is_selected_parent: i == 0, + for b in blocks_data { + if let Some(hash) = b.get("hash").and_then(|h| h.as_str()) { + if all_hashes.insert(hash.to_string()) { + let header = b.get("header"); + + // Add edges to parents + if let Some(parents) = header + .and_then(|h| h.get("parents")) + .and_then(|p| p.as_array()) + { + for (i, parent) in parents.iter().enumerate() { + if let Some(parent_hash) = parent.as_str() { + edges.push(DagEdge { + from: hash.to_string(), + to: parent_hash.to_string(), + is_selected_parent: i == 0, + }); + } + } + } + + let timestamp = header + .and_then(|h| h.get("timestamp")) + .and_then(|t| t.as_u64()) + .unwrap_or(0); + + let blue_score_val = header + .and_then(|h| h.get("blueScore")) + .and_then(|s| s.as_u64()) + .unwrap_or(target_score); + + let tx_count = b + .get("transactions") + .and_then(|t| t.as_array()) + .map(|a| a.len()) + .unwrap_or(0); + + blocks.push(DagBlock { + hash: hash.to_string(), + short_hash: hash.chars().take(8).collect(), + blue_score: blue_score_val, + is_blue: true, + is_chain_block: tips_resp.tips.contains(&hash.to_string()), + timestamp, + tx_count, }); } - - blocks.push(DagBlock { - hash: header.hash.clone(), - short_hash: header.hash.chars().take(8).collect(), - blue_score: header.blue_score, - is_blue: true, // Would need verbose data - is_chain_block: true, // Would need verbose data - timestamp: header.timestamp, - tx_count: 0, // Unknown from header - }); } } } @@ -1130,6 +1175,156 @@ fn format_timestamp(ts: u64) -> String { .unwrap_or_else(|| "Unknown".to_string()) } +// ==================== WebSocket ==================== + +/// WebSocket upgrade handler. +async fn ws_handler( + ws: WebSocketUpgrade, + State(state): State>, +) -> impl IntoResponse { + ws.on_upgrade(move |socket| handle_websocket(socket, state)) +} + +/// Handle WebSocket connection with real-time updates. +async fn handle_websocket(socket: WebSocket, state: Arc) { + let (mut sender, mut receiver) = socket.split(); + + // Track last known values for change detection + let mut last_block_count: u64 = 0; + let mut last_blue_score: u64 = 0; + + // Spawn a task to send periodic updates + let state_clone = state.clone(); + let send_task = tokio::spawn(async move { + let mut interval = tokio::time::interval(Duration::from_secs(1)); + + loop { + interval.tick().await; + + // Get current stats + #[derive(Deserialize)] + #[serde(rename_all = "camelCase")] + struct NodeInfo { + block_count: u64, + blue_score: u64, + mempool_size: usize, + peer_count: usize, + synced: bool, + } + + #[derive(Deserialize)] + #[serde(rename_all = "camelCase")] + struct MiningInfo { + difficulty: f64, + networkhashps: u64, + } + + // Fetch current state + let info_result = state_clone + .rpc_call::<_, NodeInfo>("synor_getInfo", ()) + .await; + let mining_result = state_clone + .rpc_call::<_, MiningInfo>("synor_getMiningInfo", ()) + .await; + + if let (Ok(info), Ok(mining)) = (info_result, mining_result) { + // Send stats update + let stats_event = serde_json::json!({ + "type": "stats_update", + "blockCount": info.block_count, + "virtualDaaScore": info.blue_score, + "difficulty": mining.difficulty, + "mempoolSize": info.mempool_size, + "hashrate": mining.networkhashps as f64, + "hashrateHuman": format_hashrate(mining.networkhashps as f64), + }); + + if sender + .send(Message::Text(stats_event.to_string().into())) + .await + .is_err() + { + break; + } + + // Check for new blocks + if info.block_count > last_block_count { + // New block detected + let block_event = serde_json::json!({ + "type": "new_block", + "hash": format!("{:064x}", info.blue_score), // Placeholder hash + "blueScore": info.blue_score, + "timestamp": chrono::Utc::now().timestamp_millis(), + "txCount": 1, + "isChainBlock": true, + }); + + if sender + .send(Message::Text(block_event.to_string().into())) + .await + .is_err() + { + break; + } + + last_block_count = info.block_count; + } + + // Check for tip updates + if info.blue_score != last_blue_score { + #[derive(Deserialize)] + struct TipsResponse { + tips: Vec, + } + + if let Ok(tips_resp) = state_clone + .rpc_call::<_, TipsResponse>("synor_getTips", ()) + .await + { + let tip_event = serde_json::json!({ + "type": "tip_update", + "tips": tips_resp.tips, + "tipCount": tips_resp.tips.len(), + }); + + if sender + .send(Message::Text(tip_event.to_string().into())) + .await + .is_err() + { + break; + } + } + + last_blue_score = info.blue_score; + } + } + } + }); + + // Handle incoming messages (for potential future subscriptions) + let recv_task = tokio::spawn(async move { + while let Some(msg) = receiver.next().await { + match msg { + Ok(Message::Close(_)) => break, + Ok(Message::Ping(data)) => { + // Pong is handled automatically by axum + let _ = data; + } + _ => {} + } + } + }); + + // Wait for either task to complete + tokio::select! { + _ = send_task => {}, + _ = recv_task => {}, + } + + info!("WebSocket connection closed"); +} + // ==================== Main ==================== #[tokio::main] @@ -1150,15 +1345,20 @@ async fn main() -> anyhow::Result<()> { info!("Starting Synor Block Explorer Backend..."); info!("RPC URL: {}", config.rpc_url); info!("Listen address: {}", config.listen_addr); + if let Some(ref dir) = config.static_dir { + info!("Static files: {}", dir); + } // Create application state let state = Arc::new(ExplorerState::new(config.clone())); - // Build router - let app = Router::new() + // Build API router + let api_router = Router::new() // Health & Info .route("/health", get(health)) .route("/api/v1/stats", get(get_stats)) + // WebSocket for real-time updates + .route("/ws", get(ws_handler)) // Blocks .route("/api/v1/blocks", get(get_blocks)) .route("/api/v1/blocks/:hash", get(get_block)) @@ -1173,10 +1373,27 @@ async fn main() -> anyhow::Result<()> { .route("/api/v1/dag", get(get_dag)) // Search .route("/api/v1/search", get(search)) - .with_state(state) - .layer(TraceLayer::new_for_http()) - .layer(CompressionLayer::new()) - .layer(config.cors_layer()); + .with_state(state); + + // Build full app with optional static file serving + let app = if let Some(ref static_dir) = config.static_dir { + // Serve static files with SPA fallback (index.html for client-side routing) + let index_path = format!("{}/index.html", static_dir); + let serve_dir = ServeDir::new(static_dir) + .not_found_service(ServeFile::new(&index_path)); + + api_router + .fallback_service(serve_dir) + .layer(TraceLayer::new_for_http()) + .layer(CompressionLayer::new()) + .layer(config.cors_layer()) + } else { + // API-only mode + api_router + .layer(TraceLayer::new_for_http()) + .layer(CompressionLayer::new()) + .layer(config.cors_layer()) + }; // Start server let listener = tokio::net::TcpListener::bind(&config.listen_addr).await?; diff --git a/docker-compose.testnet.yml b/docker-compose.testnet.yml index 47cfa35..ad41743 100644 --- a/docker-compose.testnet.yml +++ b/docker-compose.testnet.yml @@ -151,6 +151,8 @@ services: - SYNOR_RPC_URL=http://seed1:17110 - SYNOR_WS_URL=ws://seed1:17111 - DATABASE_URL=postgres://synor:synor@postgres:5432/explorer + - EXPLORER_STATIC_DIR=/var/www/explorer + - EXPLORER_CORS_ORIGINS=* - RUST_LOG=info networks: - synor-testnet