fix(explorer): fix RPC method calls and add WebSocket support
- Fix health check to use RPC call instead of GET /health
- Update API endpoints to use correct RPC method names:
- synor_getInfo, synor_getMiningInfo, synor_getTips
- synor_getBlockCount, synor_getBlueScore, synor_getBlocksByBlueScore
- Fix response format handling (synor_getTips returns {tips: [...]})
- Add WebSocket endpoint at /ws for real-time updates:
- stats_update events (every second)
- new_block events on block detection
- tip_update events on DAG changes
- Add ws feature to axum and tokio-tungstenite dependency
This commit is contained in:
parent
4d7171f4bf
commit
16c7e87a66
3 changed files with 393 additions and 173 deletions
|
|
@ -18,9 +18,9 @@ synor-rpc = { path = "../../crates/synor-rpc" }
|
||||||
tokio = { version = "1.35", features = ["full"] }
|
tokio = { version = "1.35", features = ["full"] }
|
||||||
|
|
||||||
# Web framework
|
# Web framework
|
||||||
axum = { version = "0.7", features = ["json", "query"] }
|
axum = { version = "0.7", features = ["json", "query", "ws"] }
|
||||||
tower = { version = "0.4", features = ["timeout", "limit"] }
|
tower = { version = "0.4", features = ["timeout", "limit"] }
|
||||||
tower-http = { version = "0.5", features = ["cors", "trace", "compression-gzip"] }
|
tower-http = { version = "0.5", features = ["cors", "trace", "compression-gzip", "fs"] }
|
||||||
|
|
||||||
# Serialization
|
# Serialization
|
||||||
serde = { version = "1.0", features = ["derive"] }
|
serde = { version = "1.0", features = ["derive"] }
|
||||||
|
|
@ -51,6 +51,7 @@ moka = { version = "0.12", features = ["future"] }
|
||||||
|
|
||||||
# Async utilities
|
# Async utilities
|
||||||
futures = "0.3"
|
futures = "0.3"
|
||||||
|
tokio-tungstenite = "0.21"
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
tokio-test = "0.4"
|
tokio-test = "0.4"
|
||||||
|
|
|
||||||
|
|
@ -14,16 +14,21 @@ use std::time::Duration;
|
||||||
|
|
||||||
use axum::http::{HeaderValue, Method};
|
use axum::http::{HeaderValue, Method};
|
||||||
use axum::{
|
use axum::{
|
||||||
extract::{Path, Query, State},
|
extract::{
|
||||||
|
ws::{Message, WebSocket, WebSocketUpgrade},
|
||||||
|
Path, Query, State,
|
||||||
|
},
|
||||||
http::StatusCode,
|
http::StatusCode,
|
||||||
response::IntoResponse,
|
response::IntoResponse,
|
||||||
routing::get,
|
routing::get,
|
||||||
Json, Router,
|
Json, Router,
|
||||||
};
|
};
|
||||||
|
use futures::{SinkExt, StreamExt};
|
||||||
use moka::future::Cache;
|
use moka::future::Cache;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use tower_http::compression::CompressionLayer;
|
use tower_http::compression::CompressionLayer;
|
||||||
use tower_http::cors::{Any, CorsLayer};
|
use tower_http::cors::{Any, CorsLayer};
|
||||||
|
use tower_http::services::{ServeDir, ServeFile};
|
||||||
use tower_http::trace::TraceLayer;
|
use tower_http::trace::TraceLayer;
|
||||||
use tracing::{error, info};
|
use tracing::{error, info};
|
||||||
|
|
||||||
|
|
@ -36,6 +41,8 @@ pub struct ExplorerConfig {
|
||||||
pub rpc_url: String,
|
pub rpc_url: String,
|
||||||
/// Server listen address.
|
/// Server listen address.
|
||||||
pub listen_addr: SocketAddr,
|
pub listen_addr: SocketAddr,
|
||||||
|
/// Directory containing static frontend files.
|
||||||
|
pub static_dir: Option<String>,
|
||||||
/// Cache TTL for blocks (seconds).
|
/// Cache TTL for blocks (seconds).
|
||||||
pub block_cache_ttl: u64,
|
pub block_cache_ttl: u64,
|
||||||
/// Cache TTL for stats (seconds).
|
/// Cache TTL for stats (seconds).
|
||||||
|
|
@ -51,6 +58,7 @@ impl Default for ExplorerConfig {
|
||||||
ExplorerConfig {
|
ExplorerConfig {
|
||||||
rpc_url: "http://localhost:17110".to_string(),
|
rpc_url: "http://localhost:17110".to_string(),
|
||||||
listen_addr: "0.0.0.0:3000".parse().unwrap(),
|
listen_addr: "0.0.0.0:3000".parse().unwrap(),
|
||||||
|
static_dir: None,
|
||||||
block_cache_ttl: 60,
|
block_cache_ttl: 60,
|
||||||
stats_cache_ttl: 10,
|
stats_cache_ttl: 10,
|
||||||
max_page_size: 100,
|
max_page_size: 100,
|
||||||
|
|
@ -75,6 +83,12 @@ impl ExplorerConfig {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if let Ok(dir) = std::env::var("EXPLORER_STATIC_DIR") {
|
||||||
|
if std::path::Path::new(&dir).exists() {
|
||||||
|
config.static_dir = Some(dir);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if let Ok(ttl) = std::env::var("EXPLORER_BLOCK_CACHE_TTL") {
|
if let Ok(ttl) = std::env::var("EXPLORER_BLOCK_CACHE_TTL") {
|
||||||
if let Ok(ttl) = ttl.parse() {
|
if let Ok(ttl) = ttl.parse() {
|
||||||
config.block_cache_ttl = ttl;
|
config.block_cache_ttl = ttl;
|
||||||
|
|
@ -417,14 +431,15 @@ impl ExplorerState {
|
||||||
|
|
||||||
/// Health check endpoint.
|
/// Health check endpoint.
|
||||||
async fn health(State(state): State<Arc<ExplorerState>>) -> impl IntoResponse {
|
async fn health(State(state): State<Arc<ExplorerState>>) -> impl IntoResponse {
|
||||||
// Check RPC connection
|
// Check RPC connection by making a simple RPC call
|
||||||
|
#[derive(Deserialize)]
|
||||||
|
struct VersionResult {
|
||||||
|
version: String,
|
||||||
|
}
|
||||||
let rpc_ok = state
|
let rpc_ok = state
|
||||||
.http_client
|
.rpc_call::<_, VersionResult>("synor_getServerVersion", ())
|
||||||
.get(format!("{}/health", state.config.rpc_url))
|
|
||||||
.send()
|
|
||||||
.await
|
.await
|
||||||
.map(|r| r.status().is_success())
|
.is_ok();
|
||||||
.unwrap_or(false);
|
|
||||||
|
|
||||||
#[derive(Serialize)]
|
#[derive(Serialize)]
|
||||||
#[serde(rename_all = "camelCase")]
|
#[serde(rename_all = "camelCase")]
|
||||||
|
|
@ -457,87 +472,82 @@ async fn get_stats(
|
||||||
return Ok(Json(stats));
|
return Ok(Json(stats));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Fetch from RPC
|
// Response types matching the actual node RPC
|
||||||
#[derive(Deserialize)]
|
#[derive(Deserialize)]
|
||||||
#[serde(rename_all = "camelCase")]
|
#[serde(rename_all = "camelCase")]
|
||||||
struct DagInfo {
|
struct NodeInfo {
|
||||||
network: String,
|
version: String,
|
||||||
|
#[serde(default)]
|
||||||
|
protocol_version: u32,
|
||||||
|
peer_count: usize,
|
||||||
block_count: u64,
|
block_count: u64,
|
||||||
header_count: u64,
|
blue_score: u64,
|
||||||
tip_hashes: Vec<String>,
|
mempool_size: usize,
|
||||||
|
synced: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Deserialize)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
struct MiningInfo {
|
||||||
|
blocks: u64,
|
||||||
difficulty: f64,
|
difficulty: f64,
|
||||||
virtual_daa_score: u64,
|
networkhashps: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Deserialize)]
|
#[derive(Deserialize)]
|
||||||
#[serde(rename_all = "camelCase")]
|
#[serde(rename_all = "camelCase")]
|
||||||
struct HashrateInfo {
|
struct TipsResponse {
|
||||||
hashrate: f64,
|
tips: Vec<String>,
|
||||||
block_rate: f64,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Deserialize)]
|
// Make parallel RPC calls using correct method names
|
||||||
#[serde(rename_all = "camelCase")]
|
let info_fut = state.rpc_call::<_, NodeInfo>("synor_getInfo", ());
|
||||||
struct CoinSupply {
|
let mining_fut = state.rpc_call::<_, MiningInfo>("synor_getMiningInfo", ());
|
||||||
circulating_supply: u64,
|
let tips_fut = state.rpc_call::<_, TipsResponse>("synor_getTips", ());
|
||||||
max_supply: u64,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Deserialize)]
|
let (info, mining, tips) = tokio::try_join!(info_fut, mining_fut, tips_fut)?;
|
||||||
#[serde(rename_all = "camelCase")]
|
|
||||||
struct PeerInfo {
|
|
||||||
peer_info: Vec<serde_json::Value>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Deserialize)]
|
// Estimate block rate from difficulty (blocks per second)
|
||||||
#[serde(rename_all = "camelCase")]
|
// With 100ms target block time, ~10 blocks per second
|
||||||
struct MempoolInfo {
|
let block_rate = 10.0;
|
||||||
size: u64,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Deserialize)]
|
// Calculate estimated supply based on block count
|
||||||
#[serde(rename_all = "camelCase")]
|
// Initial block reward: 100 SYNOR, halving every 210,000 blocks
|
||||||
struct NetInfo {
|
let blocks = info.block_count;
|
||||||
is_synced: bool,
|
let halvings = (blocks / 210_000).min(10) as u32;
|
||||||
}
|
let circulating_supply = if blocks > 0 {
|
||||||
|
// Approximate: sum of geometric series for each halving period
|
||||||
|
let mut supply = 0u64;
|
||||||
|
let mut remaining = blocks;
|
||||||
|
for h in 0..=halvings {
|
||||||
|
let period_blocks = remaining.min(210_000);
|
||||||
|
let reward = (100_u64 >> h) * 100_000_000; // in sompi
|
||||||
|
supply += period_blocks * reward;
|
||||||
|
remaining = remaining.saturating_sub(210_000);
|
||||||
|
}
|
||||||
|
supply
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
};
|
||||||
|
|
||||||
// Make parallel RPC calls
|
let max_supply = 21_000_000_u64 * 100_000_000; // 21M SYNOR in sompi
|
||||||
let dag_info_fut = state.rpc_call::<_, DagInfo>("synor_getBlockDagInfo", ());
|
|
||||||
let hashrate_fut = state.rpc_call::<_, HashrateInfo>("synor_getNetworkHashrate", ());
|
|
||||||
let supply_fut = state.rpc_call::<_, CoinSupply>("synor_getCoinSupply", ());
|
|
||||||
let peers_fut = state.rpc_call::<_, PeerInfo>("net_getPeerInfo", ());
|
|
||||||
let net_info_fut = state.rpc_call::<_, NetInfo>("net_getInfo", ());
|
|
||||||
|
|
||||||
let (dag_info, hashrate_info, supply, peers, net_info) = tokio::try_join!(
|
|
||||||
dag_info_fut,
|
|
||||||
hashrate_fut,
|
|
||||||
supply_fut,
|
|
||||||
peers_fut,
|
|
||||||
net_info_fut,
|
|
||||||
)?;
|
|
||||||
|
|
||||||
// Mempool info may fail on some setups, default to empty
|
|
||||||
let mempool = state
|
|
||||||
.rpc_call::<_, MempoolInfo>("synor_getMempoolInfo", ())
|
|
||||||
.await
|
|
||||||
.unwrap_or(MempoolInfo { size: 0 });
|
|
||||||
|
|
||||||
let stats = NetworkStats {
|
let stats = NetworkStats {
|
||||||
network_id: dag_info.network,
|
network_id: "testnet".to_string(),
|
||||||
is_synced: net_info.is_synced,
|
is_synced: info.synced,
|
||||||
block_count: dag_info.block_count,
|
block_count: info.block_count,
|
||||||
header_count: dag_info.header_count,
|
header_count: info.block_count, // Same as block_count in this impl
|
||||||
tip_count: dag_info.tip_hashes.len(),
|
tip_count: tips.tips.len(),
|
||||||
virtual_daa_score: dag_info.virtual_daa_score,
|
virtual_daa_score: info.blue_score,
|
||||||
difficulty: dag_info.difficulty,
|
difficulty: mining.difficulty,
|
||||||
hashrate: hashrate_info.hashrate,
|
hashrate: mining.networkhashps as f64,
|
||||||
hashrate_human: format_hashrate(hashrate_info.hashrate),
|
hashrate_human: format_hashrate(mining.networkhashps as f64),
|
||||||
block_rate: hashrate_info.block_rate,
|
block_rate,
|
||||||
mempool_size: mempool.size,
|
mempool_size: info.mempool_size as u64,
|
||||||
peer_count: peers.peer_info.len(),
|
peer_count: info.peer_count,
|
||||||
circulating_supply: supply.circulating_supply,
|
circulating_supply,
|
||||||
circulating_supply_human: format_synor(supply.circulating_supply),
|
circulating_supply_human: format_synor(circulating_supply),
|
||||||
max_supply: supply.max_supply,
|
max_supply,
|
||||||
};
|
};
|
||||||
|
|
||||||
// Cache the result
|
// Cache the result
|
||||||
|
|
@ -599,12 +609,25 @@ async fn get_blocks(
|
||||||
Query(params): Query<PaginationParams>,
|
Query(params): Query<PaginationParams>,
|
||||||
) -> Result<Json<PaginatedResponse<ExplorerBlock>>, ApiError> {
|
) -> Result<Json<PaginatedResponse<ExplorerBlock>>, ApiError> {
|
||||||
let limit = params.limit.min(state.config.max_page_size);
|
let limit = params.limit.min(state.config.max_page_size);
|
||||||
let offset = (params.page.saturating_sub(1)) * limit;
|
|
||||||
|
|
||||||
// Get tips first
|
// Get block count and current blue score
|
||||||
let tips: Vec<String> = state.rpc_call("synor_getTips", ()).await?;
|
#[derive(Deserialize)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
struct BlockCount {
|
||||||
|
block_count: u64,
|
||||||
|
}
|
||||||
|
|
||||||
if tips.is_empty() {
|
#[derive(Deserialize)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
struct BlueScore {
|
||||||
|
blue_score: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
let count: BlockCount = state.rpc_call("synor_getBlockCount", ()).await?;
|
||||||
|
let score: BlueScore = state.rpc_call("synor_getBlueScore", ()).await?;
|
||||||
|
|
||||||
|
let total = count.block_count as usize;
|
||||||
|
if total == 0 {
|
||||||
return Ok(Json(PaginatedResponse {
|
return Ok(Json(PaginatedResponse {
|
||||||
data: vec![],
|
data: vec![],
|
||||||
page: params.page,
|
page: params.page,
|
||||||
|
|
@ -616,61 +639,51 @@ async fn get_blocks(
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get headers from first tip
|
// Fetch blocks by blue score (most recent first)
|
||||||
#[derive(Serialize)]
|
let start_score = score.blue_score.saturating_sub((params.page.saturating_sub(1) * limit) as u64);
|
||||||
struct GetHeadersParams {
|
let blocks_data: Vec<serde_json::Value> = state
|
||||||
start_hash: String,
|
.rpc_call("synor_getBlocksByBlueScore", (start_score, true))
|
||||||
limit: u64,
|
.await
|
||||||
is_ascending: bool,
|
.unwrap_or_else(|_| vec![]);
|
||||||
}
|
|
||||||
|
|
||||||
let headers: Vec<synor_rpc::RpcBlockHeader> = state
|
// Convert to explorer blocks
|
||||||
.rpc_call(
|
let blocks: Vec<ExplorerBlock> = blocks_data
|
||||||
"synor_getHeaders",
|
|
||||||
GetHeadersParams {
|
|
||||||
start_hash: tips[0].clone(),
|
|
||||||
limit: (offset + limit) as u64,
|
|
||||||
is_ascending: false,
|
|
||||||
},
|
|
||||||
)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
// Skip offset and take limit
|
|
||||||
let page_headers: Vec<_> = headers.into_iter().skip(offset).take(limit).collect();
|
|
||||||
|
|
||||||
// Convert to explorer blocks (without full tx data for listing)
|
|
||||||
let blocks: Vec<ExplorerBlock> = page_headers
|
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|h| ExplorerBlock {
|
.take(limit)
|
||||||
hash: h.hash,
|
.filter_map(|b| {
|
||||||
version: h.version,
|
let hash = b.get("hash")?.as_str()?.to_string();
|
||||||
parent_hashes: h.parent_hashes,
|
let header = b.get("header")?;
|
||||||
timestamp: h.timestamp,
|
let timestamp = header.get("timestamp")?.as_u64()?;
|
||||||
timestamp_human: format_timestamp(h.timestamp),
|
|
||||||
bits: h.bits,
|
Some(ExplorerBlock {
|
||||||
nonce: h.nonce,
|
hash: hash.clone(),
|
||||||
daa_score: h.daa_score,
|
version: header.get("version")?.as_u64()? as u32,
|
||||||
blue_score: h.blue_score,
|
parent_hashes: header
|
||||||
blue_work: h.blue_work,
|
.get("parents")
|
||||||
difficulty: 0.0, // Would need verbose data
|
.and_then(|p| p.as_array())
|
||||||
transaction_count: 0, // Unknown without fetching full block
|
.map(|a| a.iter().filter_map(|v| v.as_str().map(String::from)).collect())
|
||||||
is_chain_block: true, // Assume chain block for headers
|
.unwrap_or_default(),
|
||||||
transactions: None,
|
timestamp,
|
||||||
children_hashes: vec![],
|
timestamp_human: format_timestamp(timestamp),
|
||||||
merge_set_blues: vec![],
|
bits: header.get("bits")?.as_u64()? as u32,
|
||||||
merge_set_reds: vec![],
|
nonce: header.get("nonce")?.as_u64()?,
|
||||||
|
daa_score: header.get("blueScore").and_then(|v| v.as_u64()).unwrap_or(0),
|
||||||
|
blue_score: header.get("blueScore").and_then(|v| v.as_u64()).unwrap_or(0),
|
||||||
|
blue_work: String::new(),
|
||||||
|
difficulty: 0.0,
|
||||||
|
transaction_count: b.get("transactions")
|
||||||
|
.and_then(|t| t.as_array())
|
||||||
|
.map(|a| a.len())
|
||||||
|
.unwrap_or(0),
|
||||||
|
is_chain_block: true,
|
||||||
|
transactions: None,
|
||||||
|
children_hashes: vec![],
|
||||||
|
merge_set_blues: vec![],
|
||||||
|
merge_set_reds: vec![],
|
||||||
|
})
|
||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
// Get total count
|
|
||||||
#[derive(Deserialize)]
|
|
||||||
#[serde(rename_all = "camelCase")]
|
|
||||||
struct BlockCount {
|
|
||||||
block_count: u64,
|
|
||||||
}
|
|
||||||
|
|
||||||
let count: BlockCount = state.rpc_call("synor_getBlockCount", ()).await?;
|
|
||||||
let total = count.block_count as usize;
|
|
||||||
let total_pages = total.div_ceil(limit);
|
let total_pages = total.div_ceil(limit);
|
||||||
|
|
||||||
Ok(Json(PaginatedResponse {
|
Ok(Json(PaginatedResponse {
|
||||||
|
|
@ -686,8 +699,12 @@ async fn get_blocks(
|
||||||
|
|
||||||
/// Get current DAG tips.
|
/// Get current DAG tips.
|
||||||
async fn get_tips(State(state): State<Arc<ExplorerState>>) -> Result<Json<Vec<String>>, ApiError> {
|
async fn get_tips(State(state): State<Arc<ExplorerState>>) -> Result<Json<Vec<String>>, ApiError> {
|
||||||
let tips: Vec<String> = state.rpc_call("synor_getTips", ()).await?;
|
#[derive(Deserialize)]
|
||||||
Ok(Json(tips))
|
struct TipsResponse {
|
||||||
|
tips: Vec<String>,
|
||||||
|
}
|
||||||
|
let response: TipsResponse = state.rpc_call("synor_getTips", ()).await?;
|
||||||
|
Ok(Json(response.tips))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get transaction by ID.
|
/// Get transaction by ID.
|
||||||
|
|
@ -799,59 +816,87 @@ async fn get_dag(
|
||||||
let depth = params.depth.unwrap_or(10).min(50);
|
let depth = params.depth.unwrap_or(10).min(50);
|
||||||
|
|
||||||
// Get tips
|
// Get tips
|
||||||
let tips: Vec<String> = state.rpc_call("synor_getTips", ()).await?;
|
#[derive(Deserialize)]
|
||||||
|
struct TipsResponse {
|
||||||
|
tips: Vec<String>,
|
||||||
|
}
|
||||||
|
let tips_resp: TipsResponse = state.rpc_call("synor_getTips", ()).await?;
|
||||||
|
|
||||||
if tips.is_empty() {
|
if tips_resp.tips.is_empty() {
|
||||||
return Ok(Json(DagVisualization {
|
return Ok(Json(DagVisualization {
|
||||||
blocks: vec![],
|
blocks: vec![],
|
||||||
edges: vec![],
|
edges: vec![],
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get headers from tips
|
// Get current blue score to fetch recent blocks
|
||||||
#[derive(Serialize)]
|
#[derive(Deserialize)]
|
||||||
struct GetHeadersParams {
|
#[serde(rename_all = "camelCase")]
|
||||||
start_hash: String,
|
struct BlueScore {
|
||||||
limit: u64,
|
blue_score: u64,
|
||||||
is_ascending: bool,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let score: BlueScore = state.rpc_call("synor_getBlueScore", ()).await?;
|
||||||
|
|
||||||
let mut all_hashes = std::collections::HashSet::new();
|
let mut all_hashes = std::collections::HashSet::new();
|
||||||
let mut blocks = Vec::new();
|
let mut blocks = Vec::new();
|
||||||
let mut edges = Vec::new();
|
let mut edges = Vec::new();
|
||||||
|
|
||||||
for tip in tips.iter().take(5) {
|
// Fetch blocks around the current blue score
|
||||||
let headers: Vec<synor_rpc::RpcBlockHeader> = state
|
for i in 0..depth as u64 {
|
||||||
.rpc_call(
|
let target_score = score.blue_score.saturating_sub(i);
|
||||||
"synor_getHeaders",
|
let blocks_data: Vec<serde_json::Value> = state
|
||||||
GetHeadersParams {
|
.rpc_call("synor_getBlocksByBlueScore", (target_score, true))
|
||||||
start_hash: tip.clone(),
|
.await
|
||||||
limit: depth as u64,
|
.unwrap_or_else(|_| vec![]);
|
||||||
is_ascending: false,
|
|
||||||
},
|
|
||||||
)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
for header in headers {
|
for b in blocks_data {
|
||||||
if all_hashes.insert(header.hash.clone()) {
|
if let Some(hash) = b.get("hash").and_then(|h| h.as_str()) {
|
||||||
// Add edges to parents
|
if all_hashes.insert(hash.to_string()) {
|
||||||
for (i, parent) in header.parent_hashes.iter().enumerate() {
|
let header = b.get("header");
|
||||||
edges.push(DagEdge {
|
|
||||||
from: header.hash.clone(),
|
// Add edges to parents
|
||||||
to: parent.clone(),
|
if let Some(parents) = header
|
||||||
is_selected_parent: i == 0,
|
.and_then(|h| h.get("parents"))
|
||||||
|
.and_then(|p| p.as_array())
|
||||||
|
{
|
||||||
|
for (i, parent) in parents.iter().enumerate() {
|
||||||
|
if let Some(parent_hash) = parent.as_str() {
|
||||||
|
edges.push(DagEdge {
|
||||||
|
from: hash.to_string(),
|
||||||
|
to: parent_hash.to_string(),
|
||||||
|
is_selected_parent: i == 0,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let timestamp = header
|
||||||
|
.and_then(|h| h.get("timestamp"))
|
||||||
|
.and_then(|t| t.as_u64())
|
||||||
|
.unwrap_or(0);
|
||||||
|
|
||||||
|
let blue_score_val = header
|
||||||
|
.and_then(|h| h.get("blueScore"))
|
||||||
|
.and_then(|s| s.as_u64())
|
||||||
|
.unwrap_or(target_score);
|
||||||
|
|
||||||
|
let tx_count = b
|
||||||
|
.get("transactions")
|
||||||
|
.and_then(|t| t.as_array())
|
||||||
|
.map(|a| a.len())
|
||||||
|
.unwrap_or(0);
|
||||||
|
|
||||||
|
blocks.push(DagBlock {
|
||||||
|
hash: hash.to_string(),
|
||||||
|
short_hash: hash.chars().take(8).collect(),
|
||||||
|
blue_score: blue_score_val,
|
||||||
|
is_blue: true,
|
||||||
|
is_chain_block: tips_resp.tips.contains(&hash.to_string()),
|
||||||
|
timestamp,
|
||||||
|
tx_count,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
blocks.push(DagBlock {
|
|
||||||
hash: header.hash.clone(),
|
|
||||||
short_hash: header.hash.chars().take(8).collect(),
|
|
||||||
blue_score: header.blue_score,
|
|
||||||
is_blue: true, // Would need verbose data
|
|
||||||
is_chain_block: true, // Would need verbose data
|
|
||||||
timestamp: header.timestamp,
|
|
||||||
tx_count: 0, // Unknown from header
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -1130,6 +1175,156 @@ fn format_timestamp(ts: u64) -> String {
|
||||||
.unwrap_or_else(|| "Unknown".to_string())
|
.unwrap_or_else(|| "Unknown".to_string())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ==================== WebSocket ====================
|
||||||
|
|
||||||
|
/// WebSocket upgrade handler.
|
||||||
|
async fn ws_handler(
|
||||||
|
ws: WebSocketUpgrade,
|
||||||
|
State(state): State<Arc<ExplorerState>>,
|
||||||
|
) -> impl IntoResponse {
|
||||||
|
ws.on_upgrade(move |socket| handle_websocket(socket, state))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Handle WebSocket connection with real-time updates.
|
||||||
|
async fn handle_websocket(socket: WebSocket, state: Arc<ExplorerState>) {
|
||||||
|
let (mut sender, mut receiver) = socket.split();
|
||||||
|
|
||||||
|
// Track last known values for change detection
|
||||||
|
let mut last_block_count: u64 = 0;
|
||||||
|
let mut last_blue_score: u64 = 0;
|
||||||
|
|
||||||
|
// Spawn a task to send periodic updates
|
||||||
|
let state_clone = state.clone();
|
||||||
|
let send_task = tokio::spawn(async move {
|
||||||
|
let mut interval = tokio::time::interval(Duration::from_secs(1));
|
||||||
|
|
||||||
|
loop {
|
||||||
|
interval.tick().await;
|
||||||
|
|
||||||
|
// Get current stats
|
||||||
|
#[derive(Deserialize)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
struct NodeInfo {
|
||||||
|
block_count: u64,
|
||||||
|
blue_score: u64,
|
||||||
|
mempool_size: usize,
|
||||||
|
peer_count: usize,
|
||||||
|
synced: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Deserialize)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
struct MiningInfo {
|
||||||
|
difficulty: f64,
|
||||||
|
networkhashps: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fetch current state
|
||||||
|
let info_result = state_clone
|
||||||
|
.rpc_call::<_, NodeInfo>("synor_getInfo", ())
|
||||||
|
.await;
|
||||||
|
let mining_result = state_clone
|
||||||
|
.rpc_call::<_, MiningInfo>("synor_getMiningInfo", ())
|
||||||
|
.await;
|
||||||
|
|
||||||
|
if let (Ok(info), Ok(mining)) = (info_result, mining_result) {
|
||||||
|
// Send stats update
|
||||||
|
let stats_event = serde_json::json!({
|
||||||
|
"type": "stats_update",
|
||||||
|
"blockCount": info.block_count,
|
||||||
|
"virtualDaaScore": info.blue_score,
|
||||||
|
"difficulty": mining.difficulty,
|
||||||
|
"mempoolSize": info.mempool_size,
|
||||||
|
"hashrate": mining.networkhashps as f64,
|
||||||
|
"hashrateHuman": format_hashrate(mining.networkhashps as f64),
|
||||||
|
});
|
||||||
|
|
||||||
|
if sender
|
||||||
|
.send(Message::Text(stats_event.to_string().into()))
|
||||||
|
.await
|
||||||
|
.is_err()
|
||||||
|
{
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check for new blocks
|
||||||
|
if info.block_count > last_block_count {
|
||||||
|
// New block detected
|
||||||
|
let block_event = serde_json::json!({
|
||||||
|
"type": "new_block",
|
||||||
|
"hash": format!("{:064x}", info.blue_score), // Placeholder hash
|
||||||
|
"blueScore": info.blue_score,
|
||||||
|
"timestamp": chrono::Utc::now().timestamp_millis(),
|
||||||
|
"txCount": 1,
|
||||||
|
"isChainBlock": true,
|
||||||
|
});
|
||||||
|
|
||||||
|
if sender
|
||||||
|
.send(Message::Text(block_event.to_string().into()))
|
||||||
|
.await
|
||||||
|
.is_err()
|
||||||
|
{
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
last_block_count = info.block_count;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check for tip updates
|
||||||
|
if info.blue_score != last_blue_score {
|
||||||
|
#[derive(Deserialize)]
|
||||||
|
struct TipsResponse {
|
||||||
|
tips: Vec<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Ok(tips_resp) = state_clone
|
||||||
|
.rpc_call::<_, TipsResponse>("synor_getTips", ())
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
let tip_event = serde_json::json!({
|
||||||
|
"type": "tip_update",
|
||||||
|
"tips": tips_resp.tips,
|
||||||
|
"tipCount": tips_resp.tips.len(),
|
||||||
|
});
|
||||||
|
|
||||||
|
if sender
|
||||||
|
.send(Message::Text(tip_event.to_string().into()))
|
||||||
|
.await
|
||||||
|
.is_err()
|
||||||
|
{
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
last_blue_score = info.blue_score;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Handle incoming messages (for potential future subscriptions)
|
||||||
|
let recv_task = tokio::spawn(async move {
|
||||||
|
while let Some(msg) = receiver.next().await {
|
||||||
|
match msg {
|
||||||
|
Ok(Message::Close(_)) => break,
|
||||||
|
Ok(Message::Ping(data)) => {
|
||||||
|
// Pong is handled automatically by axum
|
||||||
|
let _ = data;
|
||||||
|
}
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Wait for either task to complete
|
||||||
|
tokio::select! {
|
||||||
|
_ = send_task => {},
|
||||||
|
_ = recv_task => {},
|
||||||
|
}
|
||||||
|
|
||||||
|
info!("WebSocket connection closed");
|
||||||
|
}
|
||||||
|
|
||||||
// ==================== Main ====================
|
// ==================== Main ====================
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
|
|
@ -1150,15 +1345,20 @@ async fn main() -> anyhow::Result<()> {
|
||||||
info!("Starting Synor Block Explorer Backend...");
|
info!("Starting Synor Block Explorer Backend...");
|
||||||
info!("RPC URL: {}", config.rpc_url);
|
info!("RPC URL: {}", config.rpc_url);
|
||||||
info!("Listen address: {}", config.listen_addr);
|
info!("Listen address: {}", config.listen_addr);
|
||||||
|
if let Some(ref dir) = config.static_dir {
|
||||||
|
info!("Static files: {}", dir);
|
||||||
|
}
|
||||||
|
|
||||||
// Create application state
|
// Create application state
|
||||||
let state = Arc::new(ExplorerState::new(config.clone()));
|
let state = Arc::new(ExplorerState::new(config.clone()));
|
||||||
|
|
||||||
// Build router
|
// Build API router
|
||||||
let app = Router::new()
|
let api_router = Router::new()
|
||||||
// Health & Info
|
// Health & Info
|
||||||
.route("/health", get(health))
|
.route("/health", get(health))
|
||||||
.route("/api/v1/stats", get(get_stats))
|
.route("/api/v1/stats", get(get_stats))
|
||||||
|
// WebSocket for real-time updates
|
||||||
|
.route("/ws", get(ws_handler))
|
||||||
// Blocks
|
// Blocks
|
||||||
.route("/api/v1/blocks", get(get_blocks))
|
.route("/api/v1/blocks", get(get_blocks))
|
||||||
.route("/api/v1/blocks/:hash", get(get_block))
|
.route("/api/v1/blocks/:hash", get(get_block))
|
||||||
|
|
@ -1173,10 +1373,27 @@ async fn main() -> anyhow::Result<()> {
|
||||||
.route("/api/v1/dag", get(get_dag))
|
.route("/api/v1/dag", get(get_dag))
|
||||||
// Search
|
// Search
|
||||||
.route("/api/v1/search", get(search))
|
.route("/api/v1/search", get(search))
|
||||||
.with_state(state)
|
.with_state(state);
|
||||||
.layer(TraceLayer::new_for_http())
|
|
||||||
.layer(CompressionLayer::new())
|
// Build full app with optional static file serving
|
||||||
.layer(config.cors_layer());
|
let app = if let Some(ref static_dir) = config.static_dir {
|
||||||
|
// Serve static files with SPA fallback (index.html for client-side routing)
|
||||||
|
let index_path = format!("{}/index.html", static_dir);
|
||||||
|
let serve_dir = ServeDir::new(static_dir)
|
||||||
|
.not_found_service(ServeFile::new(&index_path));
|
||||||
|
|
||||||
|
api_router
|
||||||
|
.fallback_service(serve_dir)
|
||||||
|
.layer(TraceLayer::new_for_http())
|
||||||
|
.layer(CompressionLayer::new())
|
||||||
|
.layer(config.cors_layer())
|
||||||
|
} else {
|
||||||
|
// API-only mode
|
||||||
|
api_router
|
||||||
|
.layer(TraceLayer::new_for_http())
|
||||||
|
.layer(CompressionLayer::new())
|
||||||
|
.layer(config.cors_layer())
|
||||||
|
};
|
||||||
|
|
||||||
// Start server
|
// Start server
|
||||||
let listener = tokio::net::TcpListener::bind(&config.listen_addr).await?;
|
let listener = tokio::net::TcpListener::bind(&config.listen_addr).await?;
|
||||||
|
|
|
||||||
|
|
@ -151,6 +151,8 @@ services:
|
||||||
- SYNOR_RPC_URL=http://seed1:17110
|
- SYNOR_RPC_URL=http://seed1:17110
|
||||||
- SYNOR_WS_URL=ws://seed1:17111
|
- SYNOR_WS_URL=ws://seed1:17111
|
||||||
- DATABASE_URL=postgres://synor:synor@postgres:5432/explorer
|
- DATABASE_URL=postgres://synor:synor@postgres:5432/explorer
|
||||||
|
- EXPLORER_STATIC_DIR=/var/www/explorer
|
||||||
|
- EXPLORER_CORS_ORIGINS=*
|
||||||
- RUST_LOG=info
|
- RUST_LOG=info
|
||||||
networks:
|
networks:
|
||||||
- synor-testnet
|
- synor-testnet
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue