synor/apps/synord/src/node.rs
2026-01-08 05:22:24 +05:30

367 lines
9.6 KiB
Rust

//! Synor node implementation.
//!
//! The node orchestrates all components: storage, networking, consensus, RPC, etc.
#![allow(dead_code)]
use std::sync::Arc;
use tokio::sync::{broadcast, RwLock};
use tracing::{info, warn};
use crate::config::NodeConfig;
use crate::services::{
ConsensusService, ContractService, MempoolService, MinerService, NetworkService, RpcService,
StorageService, SyncService,
};
/// Node state.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum NodeState {
/// Node is starting up.
Starting,
/// Node is syncing with the network.
Syncing,
/// Node is fully synced and running.
Running,
/// Node is shutting down.
Stopping,
/// Node has stopped.
Stopped,
}
/// Synor blockchain node.
pub struct SynorNode {
/// Configuration.
config: NodeConfig,
/// Current state.
state: RwLock<NodeState>,
/// Storage service.
storage: Arc<StorageService>,
/// Network service.
network: Arc<NetworkService>,
/// Sync service.
sync: Arc<SyncService>,
/// Consensus service.
consensus: Arc<ConsensusService>,
/// Mempool service.
mempool: Arc<MempoolService>,
/// RPC service.
rpc: Arc<RpcService>,
/// Contract service.
contract: Arc<ContractService>,
/// Miner service.
miner: Option<Arc<MinerService>>,
/// Shutdown signal sender.
shutdown_tx: broadcast::Sender<()>,
}
impl SynorNode {
/// Creates a new node.
pub async fn new(config: NodeConfig) -> anyhow::Result<Self> {
info!("Initializing Synor node...");
// Create data directories
std::fs::create_dir_all(&config.data_dir)?;
std::fs::create_dir_all(config.blocks_path())?;
std::fs::create_dir_all(config.chainstate_path())?;
std::fs::create_dir_all(config.contracts_path())?;
// Create shutdown channel
let (shutdown_tx, _) = broadcast::channel(1);
// Initialize storage
info!("Initializing storage...");
let storage = Arc::new(StorageService::new(&config).await?);
// Initialize network
info!("Initializing P2P network...");
let network = Arc::new(NetworkService::new(&config, shutdown_tx.subscribe()).await?);
// Initialize consensus (before sync, as sync depends on consensus)
info!("Initializing consensus...");
let consensus = Arc::new(ConsensusService::new(
storage.clone(),
&config,
shutdown_tx.subscribe(),
)?);
// Initialize sync (needs storage, network, and consensus)
info!("Initializing sync service...");
let sync = Arc::new(SyncService::new(
storage.clone(),
network.clone(),
consensus.clone(),
&config,
shutdown_tx.subscribe(),
)?);
// Initialize mempool
info!("Initializing mempool...");
let mempool = Arc::new(MempoolService::new(
consensus.clone(),
&config,
shutdown_tx.subscribe(),
)?);
// Initialize contract service
info!("Initializing contract service...");
let contract = Arc::new(ContractService::new(config.chain_id));
// Initialize RPC
info!("Initializing RPC server...");
let rpc = Arc::new(RpcService::new(
storage.clone(),
network.clone(),
consensus.clone(),
mempool.clone(),
contract.clone(),
&config,
)?);
// Initialize miner if enabled
let miner = if config.mining.enabled {
info!("Initializing miner...");
Some(Arc::new(
MinerService::new(
consensus.clone(),
mempool.clone(),
&config,
shutdown_tx.subscribe(),
)
.await?,
))
} else {
None
};
Ok(SynorNode {
config,
state: RwLock::new(NodeState::Starting),
storage,
network,
sync,
consensus,
mempool,
rpc,
contract,
miner,
shutdown_tx,
})
}
/// Starts all node services.
pub async fn start(&self) -> anyhow::Result<()> {
info!("Starting Synor node services...");
// Update state
*self.state.write().await = NodeState::Starting;
// Start storage
self.storage.start().await?;
info!("Storage service started");
// Start network
self.network.start().await?;
info!(
addr = %self.config.p2p.listen_addr,
"P2P network started"
);
// Start sync
self.sync.start().await?;
info!("Sync service started");
// Start consensus
self.consensus.start().await?;
info!("Consensus service started");
// Start mempool
self.mempool.start().await?;
self.mempool.spawn_cleanup_task();
info!("Mempool service started");
// Start contract service (needs database from storage)
if let Some(db) = self.storage.database().await {
self.contract.start(db).await?;
info!("Contract service started");
}
// Start RPC
self.rpc.start().await?;
info!(
http = %self.config.rpc.http_addr,
ws = %self.config.rpc.ws_addr,
"RPC server started"
);
// Start miner if enabled
if let Some(ref miner) = self.miner {
miner.start().await?;
info!(threads = self.config.mining.threads, "Miner started");
}
// Update state
*self.state.write().await = NodeState::Running;
info!("All services started successfully");
Ok(())
}
/// Stops all node services.
pub async fn stop(&self) -> anyhow::Result<()> {
info!("Stopping Synor node services...");
// Update state
*self.state.write().await = NodeState::Stopping;
// Send shutdown signal
let _ = self.shutdown_tx.send(());
// Stop miner first
if let Some(ref miner) = self.miner {
if let Err(e) = miner.stop().await {
warn!("Error stopping miner: {}", e);
}
}
// Stop RPC
if let Err(e) = self.rpc.stop().await {
warn!("Error stopping RPC: {}", e);
}
// Stop contract service
if let Err(e) = self.contract.stop().await {
warn!("Error stopping contract service: {}", e);
}
// Stop mempool
if let Err(e) = self.mempool.stop().await {
warn!("Error stopping mempool: {}", e);
}
// Stop consensus
if let Err(e) = self.consensus.stop().await {
warn!("Error stopping consensus: {}", e);
}
// Stop sync
if let Err(e) = self.sync.stop().await {
warn!("Error stopping sync: {}", e);
}
// Stop network
if let Err(e) = self.network.stop().await {
warn!("Error stopping network: {}", e);
}
// Stop storage last
if let Err(e) = self.storage.stop().await {
warn!("Error stopping storage: {}", e);
}
// Update state
*self.state.write().await = NodeState::Stopped;
info!("All services stopped");
Ok(())
}
/// Returns current node state.
pub async fn state(&self) -> NodeState {
*self.state.read().await
}
/// Returns node configuration.
pub fn config(&self) -> &NodeConfig {
&self.config
}
/// Returns storage service.
pub fn storage(&self) -> &Arc<StorageService> {
&self.storage
}
/// Returns network service.
pub fn network(&self) -> &Arc<NetworkService> {
&self.network
}
/// Returns consensus service.
pub fn consensus(&self) -> &Arc<ConsensusService> {
&self.consensus
}
/// Returns mempool service.
pub fn mempool(&self) -> &Arc<MempoolService> {
&self.mempool
}
/// Returns RPC service.
pub fn rpc(&self) -> &Arc<RpcService> {
&self.rpc
}
/// Returns sync service.
pub fn sync(&self) -> &Arc<SyncService> {
&self.sync
}
/// Returns miner service if enabled.
pub fn miner(&self) -> Option<&Arc<MinerService>> {
self.miner.as_ref()
}
/// Returns contract service.
pub fn contract(&self) -> &Arc<ContractService> {
&self.contract
}
}
/// Node info for RPC.
#[derive(Clone, Debug)]
pub struct NodeInfo {
/// Node version.
pub version: String,
/// Network name.
pub network: String,
/// Chain ID.
pub chain_id: u64,
/// Current block height.
pub block_height: u64,
/// Current blue score.
pub blue_score: u64,
/// Number of connected peers.
pub peer_count: usize,
/// Is syncing.
pub is_syncing: bool,
/// Is mining.
pub is_mining: bool,
}
impl SynorNode {
/// Gets current node info.
pub async fn info(&self) -> NodeInfo {
let state = self.state().await;
NodeInfo {
version: env!("CARGO_PKG_VERSION").to_string(),
network: self.config.network.clone(),
chain_id: self.config.chain_id,
block_height: self.consensus.current_height().await,
blue_score: self.consensus.current_blue_score().await,
peer_count: self.network.peer_count().await,
is_syncing: state == NodeState::Syncing,
is_mining: self.miner.is_some() && self.config.mining.enabled,
}
}
}