A complete blockchain implementation featuring: - synord: Full node with GHOSTDAG consensus - explorer-web: Modern React blockchain explorer with 3D DAG visualization - CLI wallet and tools - Smart contract SDK and example contracts (DEX, NFT, token) - WASM crypto library for browser/mobile
888 lines
32 KiB
Rust
888 lines
32 KiB
Rust
//! Network service for Synor.
|
|
|
|
use crate::behaviour::SynorBehaviour;
|
|
use crate::config::NetworkConfig;
|
|
use crate::message::{BlockAnnouncement, TransactionAnnouncement};
|
|
use crate::peer::{Direction, PeerInfo, PeerManager, reputation};
|
|
use crate::protocol::{SynorRequest, SynorResponse};
|
|
use crate::rate_limit::{PerPeerLimiter, RateLimitConfig as TokenBucketConfig};
|
|
use crate::ratelimit::{RateLimitResult, RateLimiters};
|
|
use crate::reputation::{ReputationManager, ViolationType};
|
|
use crate::sync::{SyncManager, SyncStatus};
|
|
use crate::topics;
|
|
use futures::StreamExt;
|
|
use libp2p::{
|
|
gossipsub,
|
|
identify,
|
|
kad,
|
|
mdns,
|
|
ping,
|
|
request_response,
|
|
swarm::SwarmEvent,
|
|
Multiaddr, PeerId, Swarm, SwarmBuilder,
|
|
};
|
|
use parking_lot::RwLock;
|
|
use std::sync::Arc;
|
|
use synor_types::{Block, BlockHeader, Hash256};
|
|
use thiserror::Error;
|
|
use tokio::sync::{broadcast, mpsc, oneshot};
|
|
use tracing::{debug, info, trace, warn};
|
|
|
|
/// Network events.
|
|
#[derive(Clone, Debug)]
|
|
pub enum NetworkEvent {
|
|
/// New block received from network.
|
|
NewBlock(BlockAnnouncement),
|
|
/// New transaction received from network.
|
|
NewTransaction(TransactionAnnouncement),
|
|
/// Peer connected.
|
|
PeerConnected(PeerId),
|
|
/// Peer disconnected.
|
|
PeerDisconnected(PeerId),
|
|
/// Sync status changed.
|
|
SyncStatusChanged(SyncStatus),
|
|
/// Request blocks from peer.
|
|
BlocksReceived(Vec<Block>),
|
|
/// Headers received.
|
|
HeadersReceived(Vec<BlockHeader>),
|
|
}
|
|
|
|
/// Commands to the network service.
|
|
pub enum NetworkCommand {
|
|
/// Dial a peer.
|
|
Dial(Multiaddr),
|
|
/// Disconnect a peer.
|
|
Disconnect(PeerId),
|
|
/// Ban a peer.
|
|
Ban(PeerId),
|
|
/// Unban a peer.
|
|
Unban(PeerId),
|
|
/// Broadcast a block.
|
|
BroadcastBlock(BlockAnnouncement),
|
|
/// Broadcast a transaction.
|
|
BroadcastTransaction(TransactionAnnouncement),
|
|
/// Request blocks from a peer.
|
|
RequestBlocks {
|
|
peer: PeerId,
|
|
block_ids: Vec<synor_types::BlockId>,
|
|
response: oneshot::Sender<Result<Vec<Block>, NetworkError>>,
|
|
},
|
|
/// Request headers from a peer.
|
|
RequestHeaders {
|
|
peer: PeerId,
|
|
start: synor_types::BlockId,
|
|
max_count: u32,
|
|
response: oneshot::Sender<Result<Vec<BlockHeader>, NetworkError>>,
|
|
},
|
|
/// Get peer count.
|
|
GetPeerCount(oneshot::Sender<usize>),
|
|
/// Get sync status.
|
|
GetSyncStatus(oneshot::Sender<SyncStatus>),
|
|
/// Start sync.
|
|
StartSync,
|
|
/// Stop the service.
|
|
Shutdown,
|
|
}
|
|
|
|
/// Handle for interacting with the network service.
|
|
#[derive(Clone)]
|
|
pub struct NetworkHandle {
|
|
command_tx: mpsc::Sender<NetworkCommand>,
|
|
event_rx: broadcast::Sender<NetworkEvent>,
|
|
}
|
|
|
|
impl NetworkHandle {
|
|
/// Dials a peer at the given address.
|
|
pub async fn dial(&self, addr: Multiaddr) -> Result<(), NetworkError> {
|
|
self.command_tx
|
|
.send(NetworkCommand::Dial(addr))
|
|
.await
|
|
.map_err(|_| NetworkError::ServiceStopped)
|
|
}
|
|
|
|
/// Disconnects a peer.
|
|
pub async fn disconnect(&self, peer_id: PeerId) -> Result<(), NetworkError> {
|
|
self.command_tx
|
|
.send(NetworkCommand::Disconnect(peer_id))
|
|
.await
|
|
.map_err(|_| NetworkError::ServiceStopped)
|
|
}
|
|
|
|
/// Bans a peer.
|
|
pub async fn ban(&self, peer_id: PeerId) -> Result<(), NetworkError> {
|
|
self.command_tx
|
|
.send(NetworkCommand::Ban(peer_id))
|
|
.await
|
|
.map_err(|_| NetworkError::ServiceStopped)
|
|
}
|
|
|
|
/// Unbans a peer.
|
|
pub async fn unban(&self, peer_id: PeerId) -> Result<(), NetworkError> {
|
|
self.command_tx
|
|
.send(NetworkCommand::Unban(peer_id))
|
|
.await
|
|
.map_err(|_| NetworkError::ServiceStopped)
|
|
}
|
|
|
|
/// Broadcasts a block to the network.
|
|
pub async fn broadcast_block(&self, announcement: BlockAnnouncement) -> Result<(), NetworkError> {
|
|
self.command_tx
|
|
.send(NetworkCommand::BroadcastBlock(announcement))
|
|
.await
|
|
.map_err(|_| NetworkError::ServiceStopped)
|
|
}
|
|
|
|
/// Broadcasts a transaction to the network.
|
|
pub async fn broadcast_transaction(&self, announcement: TransactionAnnouncement) -> Result<(), NetworkError> {
|
|
self.command_tx
|
|
.send(NetworkCommand::BroadcastTransaction(announcement))
|
|
.await
|
|
.map_err(|_| NetworkError::ServiceStopped)
|
|
}
|
|
|
|
/// Requests blocks from a peer.
|
|
pub async fn request_blocks(
|
|
&self,
|
|
peer: PeerId,
|
|
block_ids: Vec<synor_types::BlockId>,
|
|
) -> Result<Vec<Block>, NetworkError> {
|
|
let (tx, rx) = oneshot::channel();
|
|
self.command_tx
|
|
.send(NetworkCommand::RequestBlocks {
|
|
peer,
|
|
block_ids,
|
|
response: tx,
|
|
})
|
|
.await
|
|
.map_err(|_| NetworkError::ServiceStopped)?;
|
|
rx.await.map_err(|_| NetworkError::ServiceStopped)?
|
|
}
|
|
|
|
/// Requests headers from a peer.
|
|
pub async fn request_headers(
|
|
&self,
|
|
peer: PeerId,
|
|
start: synor_types::BlockId,
|
|
max_count: u32,
|
|
) -> Result<Vec<BlockHeader>, NetworkError> {
|
|
let (tx, rx) = oneshot::channel();
|
|
self.command_tx
|
|
.send(NetworkCommand::RequestHeaders {
|
|
peer,
|
|
start,
|
|
max_count,
|
|
response: tx,
|
|
})
|
|
.await
|
|
.map_err(|_| NetworkError::ServiceStopped)?;
|
|
rx.await.map_err(|_| NetworkError::ServiceStopped)?
|
|
}
|
|
|
|
/// Gets the current peer count.
|
|
pub async fn peer_count(&self) -> Result<usize, NetworkError> {
|
|
let (tx, rx) = oneshot::channel();
|
|
self.command_tx
|
|
.send(NetworkCommand::GetPeerCount(tx))
|
|
.await
|
|
.map_err(|_| NetworkError::ServiceStopped)?;
|
|
rx.await.map_err(|_| NetworkError::ServiceStopped)
|
|
}
|
|
|
|
/// Gets the current sync status.
|
|
pub async fn sync_status(&self) -> Result<SyncStatus, NetworkError> {
|
|
let (tx, rx) = oneshot::channel();
|
|
self.command_tx
|
|
.send(NetworkCommand::GetSyncStatus(tx))
|
|
.await
|
|
.map_err(|_| NetworkError::ServiceStopped)?;
|
|
rx.await.map_err(|_| NetworkError::ServiceStopped)
|
|
}
|
|
|
|
/// Starts synchronization.
|
|
pub async fn start_sync(&self) -> Result<(), NetworkError> {
|
|
self.command_tx
|
|
.send(NetworkCommand::StartSync)
|
|
.await
|
|
.map_err(|_| NetworkError::ServiceStopped)
|
|
}
|
|
|
|
/// Subscribes to network events.
|
|
pub fn subscribe(&self) -> broadcast::Receiver<NetworkEvent> {
|
|
self.event_rx.subscribe()
|
|
}
|
|
|
|
/// Shuts down the network service.
|
|
pub async fn shutdown(&self) -> Result<(), NetworkError> {
|
|
self.command_tx
|
|
.send(NetworkCommand::Shutdown)
|
|
.await
|
|
.map_err(|_| NetworkError::ServiceStopped)
|
|
}
|
|
}
|
|
|
|
/// Network service errors.
|
|
#[derive(Debug, Error)]
|
|
pub enum NetworkError {
|
|
#[error("Service has stopped")]
|
|
ServiceStopped,
|
|
|
|
#[error("Transport error: {0}")]
|
|
Transport(String),
|
|
|
|
#[error("Dial error: {0}")]
|
|
DialError(String),
|
|
|
|
#[error("Request timeout")]
|
|
Timeout,
|
|
|
|
#[error("Request failed: {0}")]
|
|
RequestFailed(String),
|
|
|
|
#[error("Peer not found")]
|
|
PeerNotFound,
|
|
|
|
#[error("Invalid response")]
|
|
InvalidResponse,
|
|
|
|
#[error("Serialization error: {0}")]
|
|
Serialization(String),
|
|
|
|
#[error("Rate limited: retry after {0} seconds")]
|
|
RateLimited(u64),
|
|
|
|
#[error("Peer in cooldown: wait {0} seconds")]
|
|
PeerCooldown(u64),
|
|
|
|
#[error("Peer is banned")]
|
|
PeerBanned,
|
|
}
|
|
|
|
/// The network service.
|
|
pub struct NetworkService {
|
|
/// The libp2p swarm.
|
|
swarm: Swarm<SynorBehaviour>,
|
|
/// Peer manager.
|
|
peer_manager: Arc<PeerManager>,
|
|
/// Sync manager.
|
|
sync_manager: Arc<SyncManager>,
|
|
/// Reputation manager for peer banning.
|
|
reputation_manager: Arc<ReputationManager>,
|
|
/// Rate limiters for DoS protection (sliding window).
|
|
rate_limiters: RateLimiters,
|
|
/// Token bucket rate limiter for incoming messages.
|
|
token_bucket_limiter: PerPeerLimiter,
|
|
/// Configuration.
|
|
config: NetworkConfig,
|
|
/// Command receiver.
|
|
command_rx: mpsc::Receiver<NetworkCommand>,
|
|
/// Event sender.
|
|
event_tx: broadcast::Sender<NetworkEvent>,
|
|
/// Pending requests.
|
|
pending_requests: RwLock<std::collections::HashMap<
|
|
request_response::OutboundRequestId,
|
|
PendingRequest,
|
|
>>,
|
|
}
|
|
|
|
struct PendingRequest {
|
|
response_tx: Option<oneshot::Sender<Result<SynorResponse, NetworkError>>>,
|
|
}
|
|
|
|
impl NetworkService {
|
|
/// Creates a new network service.
|
|
pub async fn new(config: NetworkConfig) -> Result<(Self, NetworkHandle), Box<dyn std::error::Error>> {
|
|
// Generate keypair
|
|
let local_keypair = libp2p::identity::Keypair::generate_ed25519();
|
|
let local_peer_id = PeerId::from(local_keypair.public());
|
|
|
|
info!("Local peer ID: {}", local_peer_id);
|
|
|
|
// Build swarm
|
|
let swarm = SwarmBuilder::with_existing_identity(local_keypair)
|
|
.with_tokio()
|
|
.with_tcp(
|
|
libp2p::tcp::Config::default(),
|
|
libp2p::noise::Config::new,
|
|
libp2p::yamux::Config::default,
|
|
)?
|
|
.with_quic()
|
|
.with_dns()?
|
|
.with_behaviour(|key| {
|
|
SynorBehaviour::new(key, &config).expect("Behaviour creation failed")
|
|
})?
|
|
.with_swarm_config(|cfg| {
|
|
cfg.with_idle_connection_timeout(config.idle_timeout)
|
|
})
|
|
.build();
|
|
|
|
// Create peer, sync, and reputation managers
|
|
let peer_manager = Arc::new(PeerManager::new(config.max_inbound, config.max_outbound));
|
|
let sync_manager = Arc::new(SyncManager::new(
|
|
config.sync.clone(),
|
|
Arc::clone(&peer_manager),
|
|
Hash256::from_bytes([0u8; 32]), // Will be set properly when genesis is known
|
|
));
|
|
let reputation_manager = Arc::new(ReputationManager::new());
|
|
|
|
// Create channels
|
|
let (command_tx, command_rx) = mpsc::channel(256);
|
|
let (event_tx, _) = broadcast::channel(1024);
|
|
|
|
let handle = NetworkHandle {
|
|
command_tx,
|
|
event_rx: event_tx.clone(),
|
|
};
|
|
|
|
// Create token bucket limiter with default config: 100 req/s, burst of 200
|
|
let token_bucket_config = TokenBucketConfig::default();
|
|
let token_bucket_limiter = PerPeerLimiter::new(token_bucket_config);
|
|
|
|
let service = NetworkService {
|
|
swarm,
|
|
peer_manager,
|
|
sync_manager,
|
|
reputation_manager,
|
|
rate_limiters: RateLimiters::default(),
|
|
token_bucket_limiter,
|
|
config,
|
|
command_rx,
|
|
event_tx,
|
|
pending_requests: RwLock::new(std::collections::HashMap::new()),
|
|
};
|
|
|
|
Ok((service, handle))
|
|
}
|
|
|
|
/// Starts the network service.
|
|
pub async fn run(mut self) -> Result<(), Box<dyn std::error::Error>> {
|
|
// Listen on configured addresses
|
|
for addr in &self.config.listen_addresses {
|
|
match self.swarm.listen_on(addr.clone()) {
|
|
Ok(_) => info!("Listening on {}", addr),
|
|
Err(e) => warn!("Failed to listen on {}: {}", addr, e),
|
|
}
|
|
}
|
|
|
|
// Subscribe to topics
|
|
if let Err(e) = self.swarm.behaviour_mut().subscribe_topics() {
|
|
warn!("Failed to subscribe to topics: {}", e);
|
|
}
|
|
|
|
// Connect to bootstrap peers
|
|
for addr in &self.config.bootstrap_peers {
|
|
if let Err(e) = self.swarm.dial(addr.clone()) {
|
|
warn!("Failed to dial bootstrap peer {}: {}", addr, e);
|
|
}
|
|
}
|
|
|
|
// Main event loop
|
|
loop {
|
|
tokio::select! {
|
|
// Handle swarm events
|
|
event = self.swarm.select_next_some() => {
|
|
self.handle_swarm_event(event).await;
|
|
}
|
|
|
|
// Handle commands
|
|
Some(cmd) = self.command_rx.recv() => {
|
|
if !self.handle_command(cmd).await {
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
info!("Network service shutting down");
|
|
Ok(())
|
|
}
|
|
|
|
/// Handles a swarm event.
|
|
async fn handle_swarm_event(&mut self, event: SwarmEvent<crate::behaviour::SynorBehaviourEvent>) {
|
|
match event {
|
|
SwarmEvent::NewListenAddr { address, .. } => {
|
|
info!("Listening on {}", address);
|
|
}
|
|
|
|
SwarmEvent::ConnectionEstablished { peer_id, endpoint, .. } => {
|
|
// Check if peer is banned via reputation system
|
|
if self.reputation_manager.is_banned(&peer_id) {
|
|
warn!("Rejecting connection from banned peer: {}", peer_id);
|
|
let _ = self.swarm.disconnect_peer_id(peer_id);
|
|
return;
|
|
}
|
|
|
|
let direction = if endpoint.is_dialer() {
|
|
Direction::Outbound
|
|
} else {
|
|
Direction::Inbound
|
|
};
|
|
|
|
let addr = endpoint.get_remote_address().clone();
|
|
let info = if direction == Direction::Inbound {
|
|
PeerInfo::new_inbound(peer_id)
|
|
} else {
|
|
PeerInfo::new_outbound(peer_id, addr.clone())
|
|
};
|
|
|
|
if self.peer_manager.add_peer(peer_id, info) {
|
|
info!("Connected to peer: {} ({:?})", peer_id, direction);
|
|
self.swarm.behaviour_mut().add_address(&peer_id, addr);
|
|
let _ = self.event_tx.send(NetworkEvent::PeerConnected(peer_id));
|
|
}
|
|
}
|
|
|
|
SwarmEvent::ConnectionClosed { peer_id, .. } => {
|
|
self.peer_manager.remove_peer(&peer_id);
|
|
// Clean up rate limiter state for disconnected peer
|
|
self.rate_limiters.reset(&peer_id);
|
|
self.token_bucket_limiter.reset_peer(&peer_id);
|
|
info!("Disconnected from peer: {}", peer_id);
|
|
let _ = self.event_tx.send(NetworkEvent::PeerDisconnected(peer_id));
|
|
}
|
|
|
|
SwarmEvent::Behaviour(behaviour_event) => {
|
|
self.handle_behaviour_event(behaviour_event).await;
|
|
}
|
|
|
|
_ => {}
|
|
}
|
|
}
|
|
|
|
/// Handles a behaviour event.
|
|
async fn handle_behaviour_event(&mut self, event: crate::behaviour::SynorBehaviourEvent) {
|
|
use crate::behaviour::SynorBehaviourEvent;
|
|
|
|
match event {
|
|
SynorBehaviourEvent::Gossipsub(gossipsub::Event::Message {
|
|
propagation_source,
|
|
message_id: _,
|
|
message,
|
|
}) => {
|
|
self.handle_gossip_message(propagation_source, message).await;
|
|
}
|
|
|
|
SynorBehaviourEvent::RequestResponse(request_response::Event::Message {
|
|
peer,
|
|
message,
|
|
}) => {
|
|
match message {
|
|
request_response::Message::Request { request, channel, .. } => {
|
|
self.handle_request(peer, request, channel).await;
|
|
}
|
|
request_response::Message::Response { request_id, response } => {
|
|
self.handle_response(peer, request_id, response).await;
|
|
}
|
|
}
|
|
}
|
|
|
|
SynorBehaviourEvent::RequestResponse(request_response::Event::OutboundFailure {
|
|
peer,
|
|
request_id,
|
|
error,
|
|
}) => {
|
|
warn!("Request to {} failed: {:?}", peer, error);
|
|
self.sync_manager.on_request_failed(request_id);
|
|
self.peer_manager.update_peer(&peer, |p| p.record_failure());
|
|
// Record violation in reputation system (no response from peer)
|
|
let auto_banned = self.reputation_manager.record_violation(&peer, ViolationType::NoResponse);
|
|
if auto_banned {
|
|
warn!("Auto-banned peer {} for repeated failures", peer);
|
|
let _ = self.swarm.disconnect_peer_id(peer);
|
|
}
|
|
}
|
|
|
|
SynorBehaviourEvent::Identify(identify::Event::Received { peer_id, info, .. }) => {
|
|
debug!("Identified peer {}: {}", peer_id, info.agent_version);
|
|
self.peer_manager.update_peer(&peer_id, |p| {
|
|
p.user_agent = Some(info.agent_version.clone());
|
|
for addr in &info.listen_addrs {
|
|
if !p.addresses.contains(addr) {
|
|
p.addresses.push(addr.clone());
|
|
}
|
|
}
|
|
});
|
|
|
|
// Add addresses to Kademlia
|
|
for addr in info.listen_addrs {
|
|
self.swarm.behaviour_mut().add_address(&peer_id, addr);
|
|
}
|
|
}
|
|
|
|
SynorBehaviourEvent::Ping(ping::Event { peer, result, .. }) => {
|
|
match result {
|
|
Ok(rtt) => {
|
|
trace!("Ping to {} succeeded: {:?}", peer, rtt);
|
|
self.peer_manager.update_peer(&peer, |p| {
|
|
p.latency = Some(rtt);
|
|
});
|
|
}
|
|
Err(e) => {
|
|
debug!("Ping to {} failed: {}", peer, e);
|
|
}
|
|
}
|
|
}
|
|
|
|
SynorBehaviourEvent::Mdns(mdns::Event::Discovered(peers)) => {
|
|
for (peer_id, addr) in peers {
|
|
debug!("mDNS discovered peer: {} at {}", peer_id, addr);
|
|
if self.peer_manager.can_connect_outbound() {
|
|
self.swarm.behaviour_mut().add_address(&peer_id, addr.clone());
|
|
if let Err(e) = self.swarm.dial(addr) {
|
|
debug!("Failed to dial discovered peer: {}", e);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
SynorBehaviourEvent::Mdns(mdns::Event::Expired(peers)) => {
|
|
for (peer_id, _) in peers {
|
|
debug!("mDNS peer expired: {}", peer_id);
|
|
}
|
|
}
|
|
|
|
SynorBehaviourEvent::Kademlia(kad::Event::OutboundQueryProgressed { result, .. }) => {
|
|
match result {
|
|
kad::QueryResult::GetClosestPeers(Ok(ok)) => {
|
|
for peer in ok.peers {
|
|
debug!("Kademlia found peer: {:?}", peer);
|
|
}
|
|
}
|
|
kad::QueryResult::Bootstrap(Ok(_)) => {
|
|
info!("Kademlia bootstrap complete");
|
|
}
|
|
_ => {}
|
|
}
|
|
}
|
|
|
|
_ => {}
|
|
}
|
|
}
|
|
|
|
/// Handles a gossip message.
|
|
async fn handle_gossip_message(&mut self, source: PeerId, message: gossipsub::Message) {
|
|
let topic = message.topic.as_str();
|
|
|
|
match topic {
|
|
t if t == topics::BLOCKS => {
|
|
if let Ok(announcement) = borsh::from_slice::<BlockAnnouncement>(&message.data) {
|
|
debug!("Received block announcement from {}: {}", source, announcement.hash);
|
|
let _ = self.event_tx.send(NetworkEvent::NewBlock(announcement));
|
|
}
|
|
}
|
|
t if t == topics::TRANSACTIONS => {
|
|
if let Ok(announcement) = borsh::from_slice::<TransactionAnnouncement>(&message.data) {
|
|
debug!("Received tx announcement from {}: {}", source, announcement.txid);
|
|
let _ = self.event_tx.send(NetworkEvent::NewTransaction(announcement));
|
|
}
|
|
}
|
|
_ => {
|
|
trace!("Received message on unknown topic: {}", topic);
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Handles a request from a peer.
|
|
async fn handle_request(
|
|
&mut self,
|
|
peer: PeerId,
|
|
request: SynorRequest,
|
|
channel: request_response::ResponseChannel<SynorResponse>,
|
|
) {
|
|
debug!("Received {} request from {}", request.request_type(), peer);
|
|
|
|
// Check if peer is banned via reputation system
|
|
if self.reputation_manager.is_banned(&peer) {
|
|
warn!("Ignoring request from banned peer: {}", peer);
|
|
let response = SynorResponse::Error("You are banned".to_string());
|
|
if let Err(e) = self.swarm.behaviour_mut().send_response(channel, response) {
|
|
warn!("Failed to send banned response to {}: {:?}", peer, e);
|
|
}
|
|
return;
|
|
}
|
|
|
|
// First layer: Token bucket rate limiting (100 req/s with burst of 200)
|
|
if !self.token_bucket_limiter.check_rate_limit(&peer) {
|
|
// Check if peer is in cooldown
|
|
if let Some(remaining) = self.token_bucket_limiter.cooldown_remaining(&peer) {
|
|
warn!(
|
|
"Token bucket: Peer {} in cooldown for {} request ({:?} remaining)",
|
|
peer,
|
|
request.request_type(),
|
|
remaining
|
|
);
|
|
self.peer_manager.update_peer(&peer, |p| {
|
|
p.add_reputation(reputation::INVALID_DATA);
|
|
});
|
|
// Record spam violation in reputation system (may auto-ban)
|
|
let auto_banned = self.reputation_manager.record_violation(&peer, ViolationType::Spam);
|
|
if auto_banned {
|
|
warn!("Auto-banned peer {} for repeated spam violations", peer);
|
|
let _ = self.swarm.disconnect_peer_id(peer);
|
|
}
|
|
let response = SynorResponse::Error(format!(
|
|
"Rate limited (cooldown). Wait {} seconds.",
|
|
remaining.as_secs()
|
|
));
|
|
if let Err(e) = self.swarm.behaviour_mut().send_response(channel, response) {
|
|
warn!("Failed to send cooldown response to {}: {:?}", peer, e);
|
|
}
|
|
return;
|
|
}
|
|
|
|
// Rate limited but not in cooldown
|
|
warn!(
|
|
"Token bucket: Rate limited {} request from {} (violations: {})",
|
|
request.request_type(),
|
|
peer,
|
|
self.token_bucket_limiter.violations(&peer)
|
|
);
|
|
self.peer_manager.update_peer(&peer, |p| {
|
|
p.add_reputation(reputation::TIMEOUT);
|
|
});
|
|
// Record spam violation in reputation system
|
|
let auto_banned = self.reputation_manager.record_violation(&peer, ViolationType::Spam);
|
|
if auto_banned {
|
|
warn!("Auto-banned peer {} for spam violations", peer);
|
|
let _ = self.swarm.disconnect_peer_id(peer);
|
|
}
|
|
let response = SynorResponse::Error(
|
|
"Rate limited. Too many requests per second.".to_string()
|
|
);
|
|
if let Err(e) = self.swarm.behaviour_mut().send_response(channel, response) {
|
|
warn!("Failed to send rate limit response to {}: {:?}", peer, e);
|
|
}
|
|
return;
|
|
}
|
|
|
|
// Record the request in token bucket limiter
|
|
self.token_bucket_limiter.record_request(&peer);
|
|
|
|
// Second layer: Apply type-specific rate limiting (sliding window)
|
|
let rate_limit_result = match &request {
|
|
SynorRequest::GetBlock(_) | SynorRequest::GetBlocks(_) => {
|
|
self.rate_limiters.check_block_request(&peer)
|
|
}
|
|
SynorRequest::GetHeader(_) | SynorRequest::GetHeaders { .. } => {
|
|
self.rate_limiters.check_header_request(&peer)
|
|
}
|
|
SynorRequest::GetTransaction(_) | SynorRequest::GetTransactions(_) => {
|
|
self.rate_limiters.check_transaction_request(&peer)
|
|
}
|
|
_ => {
|
|
// For other requests (peers, status, tips), use global limiter
|
|
self.rate_limiters.global.check(&peer)
|
|
}
|
|
};
|
|
|
|
// Handle rate limit violations
|
|
match rate_limit_result {
|
|
RateLimitResult::Denied { retry_after } => {
|
|
warn!(
|
|
"Rate limited {} request from {} (retry after {:?})",
|
|
request.request_type(),
|
|
peer,
|
|
retry_after
|
|
);
|
|
// Penalize reputation for hitting rate limit
|
|
self.peer_manager.update_peer(&peer, |p| {
|
|
p.add_reputation(reputation::TIMEOUT);
|
|
});
|
|
// Record slow response violation
|
|
self.reputation_manager.record_violation(&peer, ViolationType::SlowResponse);
|
|
let response = SynorResponse::Error(format!(
|
|
"Rate limited. Retry after {} seconds.",
|
|
retry_after.as_secs()
|
|
));
|
|
if let Err(e) = self.swarm.behaviour_mut().send_response(channel, response) {
|
|
warn!("Failed to send rate limit response to {}: {:?}", peer, e);
|
|
}
|
|
return;
|
|
}
|
|
RateLimitResult::Cooldown { remaining } => {
|
|
warn!(
|
|
"Peer {} in cooldown for {} request ({:?} remaining)",
|
|
peer,
|
|
request.request_type(),
|
|
remaining
|
|
);
|
|
// Stronger reputation penalty for repeated violations
|
|
self.peer_manager.update_peer(&peer, |p| {
|
|
p.add_reputation(reputation::INVALID_DATA);
|
|
});
|
|
// Record spam violation (may auto-ban)
|
|
let auto_banned = self.reputation_manager.record_violation(&peer, ViolationType::Spam);
|
|
if auto_banned {
|
|
warn!("Auto-banned peer {} for repeated rate limit violations", peer);
|
|
let _ = self.swarm.disconnect_peer_id(peer);
|
|
}
|
|
let response = SynorResponse::Error(format!(
|
|
"In cooldown. Wait {} seconds.",
|
|
remaining.as_secs()
|
|
));
|
|
if let Err(e) = self.swarm.behaviour_mut().send_response(channel, response) {
|
|
warn!("Failed to send cooldown response to {}: {:?}", peer, e);
|
|
}
|
|
return;
|
|
}
|
|
RateLimitResult::Allowed => {
|
|
// Record the request for rate limiting
|
|
match &request {
|
|
SynorRequest::GetBlock(_) | SynorRequest::GetBlocks(_) => {
|
|
self.rate_limiters.record_block_request(&peer);
|
|
}
|
|
SynorRequest::GetHeader(_) | SynorRequest::GetHeaders { .. } => {
|
|
self.rate_limiters.record_header_request(&peer);
|
|
}
|
|
SynorRequest::GetTransaction(_) | SynorRequest::GetTransactions(_) => {
|
|
self.rate_limiters.record_transaction_request(&peer);
|
|
}
|
|
_ => {
|
|
self.rate_limiters.global.record(&peer);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Process the request - actual implementation would query storage
|
|
let response = match request {
|
|
SynorRequest::GetBlock(_) => SynorResponse::Block(None),
|
|
SynorRequest::GetBlocks(_) => SynorResponse::Blocks(vec![]),
|
|
SynorRequest::GetHeader(_) => SynorResponse::Header(None),
|
|
SynorRequest::GetHeaders { .. } => SynorResponse::Headers(vec![]),
|
|
SynorRequest::GetTransaction(_) => SynorResponse::Transaction(None),
|
|
SynorRequest::GetTransactions(_) => SynorResponse::Transactions(vec![]),
|
|
SynorRequest::GetPeers => {
|
|
let addrs: Vec<String> = self
|
|
.peer_manager
|
|
.active_peers()
|
|
.iter()
|
|
.flat_map(|p| p.addresses.iter().map(|a| a.to_string()))
|
|
.take(20)
|
|
.collect();
|
|
SynorResponse::Peers(addrs)
|
|
}
|
|
SynorRequest::GetStatus => {
|
|
// Would need actual chain state
|
|
SynorResponse::Error("Not implemented".to_string())
|
|
}
|
|
SynorRequest::GetTips => SynorResponse::Tips(vec![]),
|
|
SynorRequest::GetPruningPoint => SynorResponse::PruningPoint(None),
|
|
_ => SynorResponse::Error("Unknown request".to_string()),
|
|
};
|
|
|
|
if let Err(e) = self.swarm.behaviour_mut().send_response(channel, response) {
|
|
warn!("Failed to send response to {}: {:?}", peer, e);
|
|
}
|
|
}
|
|
|
|
/// Handles a response from a peer.
|
|
async fn handle_response(
|
|
&mut self,
|
|
peer: PeerId,
|
|
request_id: request_response::OutboundRequestId,
|
|
response: SynorResponse,
|
|
) {
|
|
debug!("Received response from {}", peer);
|
|
|
|
// Record success in both peer manager and reputation system
|
|
self.peer_manager.update_peer(&peer, |p| p.record_success());
|
|
self.reputation_manager.record_success(&peer);
|
|
|
|
match response {
|
|
SynorResponse::Headers(headers) => {
|
|
self.sync_manager.on_headers_response(headers.clone());
|
|
let _ = self.event_tx.send(NetworkEvent::HeadersReceived(headers));
|
|
}
|
|
SynorResponse::Blocks(blocks) => {
|
|
self.sync_manager.on_blocks_response(request_id, blocks.clone());
|
|
let _ = self.event_tx.send(NetworkEvent::BlocksReceived(blocks));
|
|
}
|
|
_ => {}
|
|
}
|
|
}
|
|
|
|
/// Handles a command.
|
|
async fn handle_command(&mut self, cmd: NetworkCommand) -> bool {
|
|
match cmd {
|
|
NetworkCommand::Dial(addr) => {
|
|
if let Err(e) = self.swarm.dial(addr.clone()) {
|
|
warn!("Failed to dial {}: {}", addr, e);
|
|
}
|
|
}
|
|
|
|
NetworkCommand::Disconnect(peer_id) => {
|
|
let _ = self.swarm.disconnect_peer_id(peer_id);
|
|
}
|
|
|
|
NetworkCommand::Ban(peer_id) => {
|
|
self.peer_manager.ban(&peer_id);
|
|
let _ = self.swarm.disconnect_peer_id(peer_id);
|
|
}
|
|
|
|
NetworkCommand::BroadcastBlock(announcement) => {
|
|
if let Ok(data) = borsh::to_vec(&announcement) {
|
|
if let Err(e) = self.swarm.behaviour_mut().publish_block(data) {
|
|
warn!("Failed to publish block: {:?}", e);
|
|
}
|
|
}
|
|
}
|
|
|
|
NetworkCommand::BroadcastTransaction(announcement) => {
|
|
if let Ok(data) = borsh::to_vec(&announcement) {
|
|
if let Err(e) = self.swarm.behaviour_mut().publish_transaction(data) {
|
|
warn!("Failed to publish transaction: {:?}", e);
|
|
}
|
|
}
|
|
}
|
|
|
|
NetworkCommand::RequestBlocks { peer, block_ids, response } => {
|
|
let request = SynorRequest::GetBlocks(block_ids);
|
|
let _request_id = self.swarm.behaviour_mut().send_request(&peer, request);
|
|
// Would need to track the response sender
|
|
let _ = response.send(Err(NetworkError::RequestFailed("Not implemented".to_string())));
|
|
}
|
|
|
|
NetworkCommand::RequestHeaders { peer, start, max_count, response } => {
|
|
let request = SynorRequest::GetHeaders { start, max_count };
|
|
let _request_id = self.swarm.behaviour_mut().send_request(&peer, request);
|
|
let _ = response.send(Err(NetworkError::RequestFailed("Not implemented".to_string())));
|
|
}
|
|
|
|
NetworkCommand::GetPeerCount(response) => {
|
|
let _ = response.send(self.peer_manager.connected_count());
|
|
}
|
|
|
|
NetworkCommand::GetSyncStatus(response) => {
|
|
let _ = response.send(self.sync_manager.status());
|
|
}
|
|
|
|
NetworkCommand::StartSync => {
|
|
self.sync_manager.start_sync();
|
|
}
|
|
|
|
NetworkCommand::Shutdown => {
|
|
return false;
|
|
}
|
|
}
|
|
|
|
true
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
mod tests {
    use super::*;

    /// A handle assembled from raw channel halves can be cloned.
    #[tokio::test]
    async fn test_network_handle_clone() {
        let (command_tx, _command_rx) = mpsc::channel(1);
        let (events, _) = broadcast::channel(1);

        let original = NetworkHandle {
            command_tx,
            event_rx: events,
        };

        let _duplicate = original.clone();
    }
}
|