synor/crates/synor-network/src/ratelimit.rs
2026-01-08 05:22:24 +05:30

419 lines
12 KiB
Rust

//! Rate limiting for P2P requests.
//!
//! Implements token bucket rate limiting to prevent DoS attacks and
//! ensure fair resource allocation among peers.
use hashbrown::HashMap;
use libp2p::PeerId;
use parking_lot::RwLock;
use std::time::{Duration, Instant};
/// Rate limiter configuration.
#[derive(Clone, Debug)]
pub struct RateLimitConfig {
/// Maximum requests per window.
pub max_requests: u32,
/// Time window duration.
pub window: Duration,
/// Burst allowance (extra requests allowed in short bursts).
pub burst: u32,
/// Cooldown period after limit exceeded.
pub cooldown: Duration,
}
impl Default for RateLimitConfig {
fn default() -> Self {
RateLimitConfig {
max_requests: 100,
window: Duration::from_secs(60),
burst: 20,
cooldown: Duration::from_secs(30),
}
}
}
impl RateLimitConfig {
/// Creates a strict rate limit (fewer requests allowed).
pub fn strict() -> Self {
RateLimitConfig {
max_requests: 30,
window: Duration::from_secs(60),
burst: 5,
cooldown: Duration::from_secs(60),
}
}
/// Creates a relaxed rate limit (more requests allowed).
pub fn relaxed() -> Self {
RateLimitConfig {
max_requests: 500,
window: Duration::from_secs(60),
burst: 100,
cooldown: Duration::from_secs(10),
}
}
/// Configuration for block requests.
pub fn blocks() -> Self {
RateLimitConfig {
max_requests: 50,
window: Duration::from_secs(60),
burst: 10,
cooldown: Duration::from_secs(30),
}
}
/// Configuration for header requests.
pub fn headers() -> Self {
RateLimitConfig {
max_requests: 100,
window: Duration::from_secs(60),
burst: 20,
cooldown: Duration::from_secs(20),
}
}
/// Configuration for transaction requests.
pub fn transactions() -> Self {
RateLimitConfig {
max_requests: 200,
window: Duration::from_secs(60),
burst: 50,
cooldown: Duration::from_secs(15),
}
}
}
/// Tracks rate limit state for a single peer.
#[derive(Debug)]
struct PeerRateState {
/// Request timestamps in the current window.
requests: Vec<Instant>,
/// When the cooldown ends (if in cooldown).
cooldown_until: Option<Instant>,
/// Number of times limit was exceeded.
violations: u32,
}
impl PeerRateState {
fn new() -> Self {
PeerRateState {
requests: Vec::new(),
cooldown_until: None,
violations: 0,
}
}
/// Cleans up old request timestamps outside the window.
fn cleanup(&mut self, window: Duration) {
let cutoff = Instant::now() - window;
self.requests.retain(|t| *t > cutoff);
}
}
/// Result of a rate limit check.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum RateLimitResult {
/// Request is allowed.
Allowed,
/// Request is denied due to rate limit.
Denied {
/// Time until the peer can make another request.
retry_after: Duration,
},
/// Peer is in cooldown due to repeated violations.
Cooldown {
/// Time remaining in cooldown.
remaining: Duration,
},
}
impl RateLimitResult {
/// Returns true if the request is allowed.
pub fn is_allowed(&self) -> bool {
matches!(self, RateLimitResult::Allowed)
}
}
/// Rate limiter for P2P requests.
pub struct RateLimiter {
/// Configuration.
config: RateLimitConfig,
/// Per-peer rate state.
peers: RwLock<HashMap<PeerId, PeerRateState>>,
}
impl RateLimiter {
/// Creates a new rate limiter with the given configuration.
pub fn new(config: RateLimitConfig) -> Self {
RateLimiter {
config,
peers: RwLock::new(HashMap::new()),
}
}
/// Checks if a request from a peer should be allowed.
pub fn check(&self, peer_id: &PeerId) -> RateLimitResult {
let mut peers = self.peers.write();
let state = peers.entry(*peer_id).or_insert_with(PeerRateState::new);
let now = Instant::now();
// Check cooldown
if let Some(cooldown_until) = state.cooldown_until {
if now < cooldown_until {
return RateLimitResult::Cooldown {
remaining: cooldown_until - now,
};
}
state.cooldown_until = None;
}
// Clean up old requests
state.cleanup(self.config.window);
// Check rate limit
let total_allowed = self.config.max_requests + self.config.burst;
if state.requests.len() as u32 >= total_allowed {
state.violations += 1;
// Enter cooldown if too many violations
if state.violations >= 3 {
state.cooldown_until = Some(now + self.config.cooldown);
return RateLimitResult::Cooldown {
remaining: self.config.cooldown,
};
}
// Calculate retry time
let oldest = state.requests.first().copied().unwrap_or(now);
let retry_after = self.config.window - (now - oldest);
return RateLimitResult::Denied { retry_after };
}
RateLimitResult::Allowed
}
/// Records a request from a peer.
pub fn record(&self, peer_id: &PeerId) {
let mut peers = self.peers.write();
let state = peers.entry(*peer_id).or_insert_with(PeerRateState::new);
state.requests.push(Instant::now());
}
/// Checks and records a request in one operation.
pub fn check_and_record(&self, peer_id: &PeerId) -> RateLimitResult {
let result = self.check(peer_id);
if result.is_allowed() {
self.record(peer_id);
}
result
}
/// Resets the rate limit state for a peer.
pub fn reset(&self, peer_id: &PeerId) {
self.peers.write().remove(peer_id);
}
/// Cleans up state for disconnected peers.
pub fn cleanup(&self, connected_peers: &[PeerId]) {
let connected: std::collections::HashSet<_> = connected_peers.iter().collect();
self.peers.write().retain(|id, _| connected.contains(id));
}
/// Returns the number of requests made by a peer in the current window.
pub fn request_count(&self, peer_id: &PeerId) -> u32 {
let peers = self.peers.read();
peers
.get(peer_id)
.map(|s| s.requests.len() as u32)
.unwrap_or(0)
}
/// Returns the number of violations for a peer.
pub fn violation_count(&self, peer_id: &PeerId) -> u32 {
let peers = self.peers.read();
peers.get(peer_id).map(|s| s.violations).unwrap_or(0)
}
}
/// Collection of rate limiters for different request types.
pub struct RateLimiters {
/// Rate limiter for block requests.
pub blocks: RateLimiter,
/// Rate limiter for header requests.
pub headers: RateLimiter,
/// Rate limiter for transaction requests.
pub transactions: RateLimiter,
/// Global rate limiter (all request types).
pub global: RateLimiter,
}
impl Default for RateLimiters {
fn default() -> Self {
RateLimiters {
blocks: RateLimiter::new(RateLimitConfig::blocks()),
headers: RateLimiter::new(RateLimitConfig::headers()),
transactions: RateLimiter::new(RateLimitConfig::transactions()),
global: RateLimiter::new(RateLimitConfig::default()),
}
}
}
impl RateLimiters {
/// Creates rate limiters with custom configurations.
pub fn new(
blocks: RateLimitConfig,
headers: RateLimitConfig,
transactions: RateLimitConfig,
global: RateLimitConfig,
) -> Self {
RateLimiters {
blocks: RateLimiter::new(blocks),
headers: RateLimiter::new(headers),
transactions: RateLimiter::new(transactions),
global: RateLimiter::new(global),
}
}
/// Checks if a block request from a peer should be allowed.
pub fn check_block_request(&self, peer_id: &PeerId) -> RateLimitResult {
// Check global first
let global_result = self.global.check(peer_id);
if !global_result.is_allowed() {
return global_result;
}
self.blocks.check(peer_id)
}
/// Records a block request from a peer.
pub fn record_block_request(&self, peer_id: &PeerId) {
self.global.record(peer_id);
self.blocks.record(peer_id);
}
/// Checks if a header request from a peer should be allowed.
pub fn check_header_request(&self, peer_id: &PeerId) -> RateLimitResult {
let global_result = self.global.check(peer_id);
if !global_result.is_allowed() {
return global_result;
}
self.headers.check(peer_id)
}
/// Records a header request from a peer.
pub fn record_header_request(&self, peer_id: &PeerId) {
self.global.record(peer_id);
self.headers.record(peer_id);
}
/// Checks if a transaction request from a peer should be allowed.
pub fn check_transaction_request(&self, peer_id: &PeerId) -> RateLimitResult {
let global_result = self.global.check(peer_id);
if !global_result.is_allowed() {
return global_result;
}
self.transactions.check(peer_id)
}
/// Records a transaction request from a peer.
pub fn record_transaction_request(&self, peer_id: &PeerId) {
self.global.record(peer_id);
self.transactions.record(peer_id);
}
/// Resets all rate limit state for a peer.
pub fn reset(&self, peer_id: &PeerId) {
self.blocks.reset(peer_id);
self.headers.reset(peer_id);
self.transactions.reset(peer_id);
self.global.reset(peer_id);
}
/// Cleans up state for disconnected peers.
pub fn cleanup(&self, connected_peers: &[PeerId]) {
self.blocks.cleanup(connected_peers);
self.headers.cleanup(connected_peers);
self.transactions.cleanup(connected_peers);
self.global.cleanup(connected_peers);
}
}
#[cfg(test)]
mod tests {
use super::*;
fn random_peer_id() -> PeerId {
PeerId::random()
}
#[test]
fn test_rate_limit_allows_initial_requests() {
let limiter = RateLimiter::new(RateLimitConfig {
max_requests: 10,
window: Duration::from_secs(60),
burst: 5,
cooldown: Duration::from_secs(30),
});
let peer = random_peer_id();
// Should allow first 15 requests (10 + 5 burst)
for _ in 0..15 {
assert!(limiter.check_and_record(&peer).is_allowed());
}
// 16th should be denied
assert!(!limiter.check(&peer).is_allowed());
}
#[test]
fn test_rate_limit_cooldown() {
let limiter = RateLimiter::new(RateLimitConfig {
max_requests: 5,
window: Duration::from_secs(60),
burst: 0,
cooldown: Duration::from_secs(30),
});
let peer = random_peer_id();
// Exhaust limit and trigger violations
for _ in 0..5 {
limiter.record(&peer);
}
// 3 violations should trigger cooldown
for _ in 0..3 {
let result = limiter.check(&peer);
assert!(!result.is_allowed());
}
// Should now be in cooldown
match limiter.check(&peer) {
RateLimitResult::Cooldown { .. } => {}
_ => panic!("Expected cooldown"),
}
}
#[test]
fn test_rate_limiters_collection() {
let limiters = RateLimiters::default();
let peer = random_peer_id();
// Block requests should work
assert!(limiters.check_block_request(&peer).is_allowed());
limiters.record_block_request(&peer);
// Header requests should work
assert!(limiters.check_header_request(&peer).is_allowed());
limiters.record_header_request(&peer);
// Transaction requests should work
assert!(limiters.check_transaction_request(&peer).is_allowed());
limiters.record_transaction_request(&peer);
}
}