synor/crates/synor-mining/src/miner.rs
Gulshan Yadav a7a4a7effc test: add comprehensive test suite (1,357 tests total)
Phase 1 - Test Writing (8 parallel agents):
- synord: Added 133 unit tests for node, config, services
- synor-rpc: Expanded to 207 tests for API endpoints
- synor-bridge: Added 144 tests for transfer lifecycle
- synor-zk: Added 279 tests for circuits, proofs, state
- synor-mining: Added 147 tests for kHeavyHash, miner
- synor-types: Added 146 tests for hash, amount, address
- cross-crate: Created 75 integration tests
- byzantine: Created 40+ fault scenario tests

Key additions:
- tests/cross_crate_integration.rs (new)
- apps/synord/tests/byzantine_fault_tests.rs (new)
- crates/synor-storage/src/cf.rs (new)
- src/lib.rs for workspace integration tests

Fixes during testing:
- synor-compute: Added Default impl for GpuVariant
- synor-bridge: Fixed replay protection in process_lock_event
- synor-storage: Added cf module and database exports

All 1,357 tests pass with 0 failures.
2026-01-20 06:35:28 +05:30

837 lines
25 KiB
Rust

//! Mining manager and coordination.
//!
//! The mining manager coordinates mining operations, handles template updates,
//! and manages worker threads.
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use parking_lot::{Mutex, RwLock};
use tokio::sync::{broadcast, mpsc};
use synor_types::{Address, Hash256, Network};
use crate::kheavyhash::KHeavyHash;
use crate::template::BlockTemplate;
use crate::{MiningStats, Target};
/// Settings that control how a miner runs.
#[derive(Debug, Clone)]
pub struct MinerConfig {
    /// How many mining threads to use; `0` selects the count automatically.
    pub threads: usize,
    /// Address that receives the mining reward.
    pub miner_address: Address,
    /// Whether CPU mining is enabled.
    pub cpu_mining: bool,
    /// Cap on the target hashrate; `0` means no cap.
    pub hashrate_limit: u64,
    /// Time between stats updates, in milliseconds.
    pub stats_interval_ms: u64,
    /// Extra nonce used when mining for a pool.
    pub extra_nonce: u64,
}
impl Default for MinerConfig {
    /// Baseline configuration: automatic thread count, CPU mining enabled,
    /// no hashrate limit, one-second stats interval, no extra nonce, and a
    /// mainnet address derived from an all-zero ed25519 public key.
    fn default() -> Self {
        let placeholder_address = Address::from_ed25519_pubkey(Network::Mainnet, &[0; 32]);
        Self {
            miner_address: placeholder_address,
            // Zero asks the miner to detect the thread count itself.
            threads: 0,
            cpu_mining: true,
            hashrate_limit: 0,
            stats_interval_ms: 1000,
            extra_nonce: 0,
        }
    }
}
impl MinerConfig {
/// Creates config for solo mining.
pub fn solo(miner_address: Address, threads: usize) -> Self {
MinerConfig {
threads,
miner_address,
cpu_mining: true,
hashrate_limit: 0,
stats_interval_ms: 1000,
extra_nonce: 0,
}
}
/// Creates config for pool mining.
pub fn pool(miner_address: Address, threads: usize, extra_nonce: u64) -> Self {
MinerConfig {
threads,
miner_address,
cpu_mining: true,
hashrate_limit: 0,
stats_interval_ms: 1000,
extra_nonce,
}
}
}
/// Outcome of a successful mining attempt.
#[derive(Debug, Clone)]
pub struct MiningResult {
    /// Id of the template that was solved.
    pub template_id: u64,
    /// Nonce that satisfied the target.
    pub nonce: u64,
    /// Proof-of-work hash produced by that nonce.
    pub pow_hash: Hash256,
    /// Time spent searching, in milliseconds.
    pub solve_time_ms: u64,
    /// Number of hashes computed during the search.
    pub hashes: u64,
}
/// Instructions accepted by the mining manager.
#[derive(Debug, Clone)]
pub enum MinerCommand {
    /// Begin mining the given template.
    NewTemplate(Arc<BlockTemplate>),
    /// Halt mining.
    Stop,
    /// Suspend mining until a `Resume` arrives.
    Pause,
    /// Continue mining after a `Pause`.
    Resume,
    /// Replace the active configuration.
    UpdateConfig(MinerConfig),
}
/// Notifications broadcast by the mining manager.
#[derive(Debug, Clone)]
pub enum MinerEvent {
    /// A nonce satisfying the target was found.
    BlockFound(MiningResult),
    /// Fresh mining statistics are available.
    StatsUpdate(MiningStats),
    /// Mining has begun.
    Started,
    /// Mining has ended.
    Stopped,
    /// Mining has been suspended.
    Paused,
    /// Mining has continued after a pause.
    Resumed,
    /// Something went wrong; the payload describes it.
    Error(String),
}
/// Coordinates mining of block templates and reports progress as events.
pub struct BlockMiner {
    /// Active configuration.
    config: RwLock<MinerConfig>,
    /// Template currently being worked on, if any.
    current_template: RwLock<Option<Arc<BlockTemplate>>>,
    /// Accumulated mining statistics.
    stats: Mutex<MiningStats>,
    /// Set while a mining loop is running.
    is_mining: AtomicBool,
    /// Set while mining is suspended.
    is_paused: AtomicBool,
    /// Running total of hashes computed.
    hash_counter: AtomicU64,
    /// Sending half of the command channel.
    cmd_tx: mpsc::Sender<MinerCommand>,
    /// Receiving half of the command channel; taken by `run`.
    cmd_rx: Mutex<Option<mpsc::Receiver<MinerCommand>>>,
    /// Broadcasts miner events to subscribers.
    event_tx: broadcast::Sender<MinerEvent>,
    /// Worker thread count resolved at construction time.
    num_threads: usize,
}
impl BlockMiner {
/// Creates a new block miner.
pub fn new(config: MinerConfig) -> Self {
let (cmd_tx, cmd_rx) = mpsc::channel(100);
let (event_tx, _) = broadcast::channel(100);
let num_threads = if config.threads == 0 {
std::thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(1)
} else {
config.threads
};
BlockMiner {
config: RwLock::new(config),
current_template: RwLock::new(None),
stats: Mutex::new(MiningStats::default()),
is_mining: AtomicBool::new(false),
is_paused: AtomicBool::new(false),
hash_counter: AtomicU64::new(0),
cmd_tx,
cmd_rx: Mutex::new(Some(cmd_rx)),
event_tx,
num_threads,
}
}
/// Gets the command sender.
pub fn command_sender(&self) -> mpsc::Sender<MinerCommand> {
self.cmd_tx.clone()
}
/// Subscribes to miner events.
pub fn subscribe(&self) -> broadcast::Receiver<MinerEvent> {
self.event_tx.subscribe()
}
/// Checks if mining is active.
pub fn is_mining(&self) -> bool {
self.is_mining.load(Ordering::Relaxed)
}
/// Checks if mining is paused.
pub fn is_paused(&self) -> bool {
self.is_paused.load(Ordering::Relaxed)
}
/// Gets current mining stats.
pub fn stats(&self) -> MiningStats {
self.stats.lock().clone()
}
/// Gets the current template.
pub fn current_template(&self) -> Option<Arc<BlockTemplate>> {
self.current_template.read().clone()
}
/// Updates the mining template.
pub fn set_template(&self, template: BlockTemplate) {
*self.current_template.write() = Some(Arc::new(template));
}
/// Starts the mining loop (sync version for testing).
pub fn mine_sync(&self) -> Option<MiningResult> {
let template = self.current_template.read().clone()?;
let config = self.config.read().clone();
self.is_mining.store(true, Ordering::Relaxed);
let _ = self.event_tx.send(MinerEvent::Started);
let result = self.mine_template(&template, &config);
self.is_mining.store(false, Ordering::Relaxed);
let _ = self.event_tx.send(MinerEvent::Stopped);
result
}
/// Mines a specific template.
fn mine_template(
&self,
template: &BlockTemplate,
config: &MinerConfig,
) -> Option<MiningResult> {
let hasher = KHeavyHash::new();
let header = template.header_for_mining();
let target = Target::from_bytes(template.target);
let start = Instant::now();
let mut nonce = config.extra_nonce;
let mut hashes = 0u64;
let pre_hash = hasher.pre_hash(&header);
loop {
// Check if we should stop
if !self.is_mining.load(Ordering::Relaxed) {
return None;
}
// Check if paused
while self.is_paused.load(Ordering::Relaxed) {
std::thread::sleep(Duration::from_millis(100));
if !self.is_mining.load(Ordering::Relaxed) {
return None;
}
}
let pow = hasher.finalize(&pre_hash, nonce);
hashes += 1;
if target.is_met_by(&pow.hash) {
let elapsed = start.elapsed().as_millis() as u64;
// Update stats
{
let mut stats = self.stats.lock();
stats.total_hashes += hashes;
stats.record_block(elapsed);
}
let result = MiningResult {
template_id: template.id,
nonce,
pow_hash: pow.hash,
solve_time_ms: elapsed,
hashes,
};
let _ = self.event_tx.send(MinerEvent::BlockFound(result.clone()));
return Some(result);
}
nonce = nonce.wrapping_add(1);
// Update stats periodically
if hashes % 10000 == 0 {
self.hash_counter.fetch_add(10000, Ordering::Relaxed);
}
}
}
/// Runs the mining manager (async version).
pub async fn run(self: Arc<Self>) {
let mut cmd_rx = self.cmd_rx.lock().take().expect("run called twice");
while let Some(cmd) = cmd_rx.recv().await {
match cmd {
MinerCommand::NewTemplate(template) => {
*self.current_template.write() = Some(template.clone());
// Start mining in background
if self.is_mining.load(Ordering::Relaxed) {
// Already mining, template will be picked up
} else {
let miner = Arc::clone(&self);
tokio::task::spawn_blocking(move || {
miner.is_mining.store(true, Ordering::Relaxed);
let _ = miner.event_tx.send(MinerEvent::Started);
if let Some(template) = miner.current_template.read().clone() {
let config = miner.config.read().clone();
let _ = miner.mine_template(&template, &config);
}
miner.is_mining.store(false, Ordering::Relaxed);
let _ = miner.event_tx.send(MinerEvent::Stopped);
});
}
}
MinerCommand::Stop => {
self.is_mining.store(false, Ordering::Relaxed);
}
MinerCommand::Pause => {
self.is_paused.store(true, Ordering::Relaxed);
let _ = self.event_tx.send(MinerEvent::Paused);
}
MinerCommand::Resume => {
self.is_paused.store(false, Ordering::Relaxed);
let _ = self.event_tx.send(MinerEvent::Resumed);
}
MinerCommand::UpdateConfig(new_config) => {
*self.config.write() = new_config;
}
}
}
}
/// Gets current hashrate.
pub fn hashrate(&self) -> f64 {
self.stats.lock().hashrate
}
/// Gets hash count.
pub fn hash_count(&self) -> u64 {
self.hash_counter.load(Ordering::Relaxed)
}
}
/// Multi-threaded miner that gives every thread its own fixed slice of the
/// nonce space.
pub struct ParallelBlockMiner {
    /// How many mining threads `start` spawns.
    num_threads: usize,
    /// Miner configuration.
    config: MinerConfig,
    /// Raised to ask the mining threads to exit.
    stop_flag: Arc<AtomicBool>,
    /// Template shared with the mining threads.
    template: Arc<RwLock<Option<BlockTemplate>>>,
    /// Sending half of the result channel.
    result_tx: mpsc::UnboundedSender<MiningResult>,
    /// Receiving half of the result channel, until taken.
    result_rx: Mutex<Option<mpsc::UnboundedReceiver<MiningResult>>>,
}
impl ParallelBlockMiner {
    /// Creates a new parallel miner.
    ///
    /// A `threads` value of `0` in `config` is resolved to the machine's
    /// available parallelism, falling back to `1` if it cannot be queried.
    pub fn new(config: MinerConfig) -> Self {
        let num_threads = if config.threads == 0 {
            std::thread::available_parallelism()
                .map(|n| n.get())
                .unwrap_or(1)
        } else {
            config.threads
        };
        let (result_tx, result_rx) = mpsc::unbounded_channel();
        ParallelBlockMiner {
            num_threads,
            config,
            stop_flag: Arc::new(AtomicBool::new(false)),
            template: Arc::new(RwLock::new(None)),
            result_tx,
            result_rx: Mutex::new(Some(result_rx)),
        }
    }

    /// Sets a new template.
    ///
    /// Running threads notice the replacement by comparing template ids, so
    /// a new template is expected to carry a different `id`.
    pub fn set_template(&self, template: BlockTemplate) {
        *self.template.write() = Some(template);
    }

    /// Starts mining.
    ///
    /// Spawns `num_threads` detached threads. Thread `i` searches
    /// `nonce_range` consecutive nonces starting at
    /// `extra_nonce + i * nonce_range` (wrapping around `u64::MAX`). A
    /// thread that has solved or exhausted its slice for a template idles
    /// until a template with a different id is set. Threads exit after
    /// [`Self::stop`] is called.
    ///
    /// Each call spawns a fresh set of threads; call `stop` before calling
    /// `start` again.
    pub fn start(&self) {
        /// Hashes between checks for a replaced template.
        const TEMPLATE_POLL_INTERVAL: u64 = 10_000;

        self.stop_flag.store(false, Ordering::Relaxed);
        let nonce_range = u64::MAX / self.num_threads.max(1) as u64;

        for thread_id in 0..self.num_threads {
            let stop_flag = Arc::clone(&self.stop_flag);
            let template = Arc::clone(&self.template);
            let result_tx = self.result_tx.clone();
            // `thread_id * nonce_range` never exceeds u64::MAX, but adding a
            // non-zero extra nonce can, so wrap instead of overflowing.
            let start_nonce = self
                .config
                .extra_nonce
                .wrapping_add(thread_id as u64 * nonce_range);

            std::thread::spawn(move || {
                let hasher = KHeavyHash::new();
                // Id of the template this thread has already solved or
                // exhausted; prevents re-mining it and re-sending the same
                // result over and over.
                let mut finished_id = None;

                while !stop_flag.load(Ordering::Relaxed) {
                    // Clone the template only when there is new work, and
                    // release the read lock before sleeping or hashing.
                    let next = {
                        let guard = template.read();
                        match &*guard {
                            Some(t) if finished_id != Some(t.id) => Some(t.clone()),
                            _ => None,
                        }
                    };
                    let tmpl = match next {
                        Some(tmpl) => tmpl,
                        None => {
                            std::thread::sleep(Duration::from_millis(100));
                            continue;
                        }
                    };

                    let header = tmpl.header_for_mining();
                    let target = Target::from_bytes(tmpl.target);
                    let pre_hash = hasher.pre_hash(&header);
                    let start = Instant::now();
                    let mut nonce = start_nonce;
                    let mut hashes = 0u64;
                    // Set when this thread is done with `tmpl` for good.
                    let mut finished = false;

                    while !stop_flag.load(Ordering::Relaxed) {
                        let pow = hasher.finalize(&pre_hash, nonce);
                        hashes += 1;

                        if target.is_met_by(&pow.hash) {
                            let elapsed = start.elapsed().as_millis() as u64;
                            let _ = result_tx.send(MiningResult {
                                template_id: tmpl.id,
                                nonce,
                                pow_hash: pow.hash,
                                solve_time_ms: elapsed,
                                hashes,
                            });
                            finished = true;
                            break;
                        }

                        // Don't run into another thread's range. Counting
                        // hashes instead of comparing nonces stays correct
                        // when the slice wraps past u64::MAX.
                        if hashes >= nonce_range {
                            finished = true;
                            break;
                        }
                        nonce = nonce.wrapping_add(1);

                        if hashes % TEMPLATE_POLL_INTERVAL == 0 {
                            let replaced = match &*template.read() {
                                Some(latest) => latest.id != tmpl.id,
                                None => true,
                            };
                            if replaced {
                                break;
                            }
                        }
                    }

                    if finished {
                        finished_id = Some(tmpl.id);
                    }
                }
            });
        }
    }

    /// Stops mining by asking every mining thread to exit.
    pub fn stop(&self) {
        self.stop_flag.store(true, Ordering::Relaxed);
    }

    /// Takes the result receiver.
    ///
    /// Returns `None` on every call after the first.
    pub fn take_results(&self) -> Option<mpsc::UnboundedReceiver<MiningResult>> {
        self.result_rx.lock().take()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::template::{BlockTemplateBuilder, CoinbaseBuilder};
    use synor_types::Network;

    /// Mainnet reward address shared by most tests.
    fn test_address() -> Address {
        Address::from_ed25519_pubkey(Network::Mainnet, &[0x42; 32])
    }

    /// Hash whose first byte is `n` and whose remaining bytes are zero.
    fn test_hash(n: u8) -> Hash256 {
        let mut raw = [0u8; 32];
        raw[0] = n;
        Hash256::from_bytes(raw)
    }

    /// Template with id 1 and compact target bits `0x207fffff`.
    fn easy_template() -> BlockTemplate {
        let reward = 500_00000000;
        let coinbase = CoinbaseBuilder::new(test_address(), 1)
            .reward(reward)
            .build();
        BlockTemplateBuilder::new()
            .version(1)
            .add_parent(test_hash(1))
            .timestamp(1234567890)
            .bits(0x207fffff)
            .blue_score(100)
            .coinbase(coinbase)
            .reward(reward)
            .build(1)
            .unwrap()
    }

    /// Fixed mining result used by the `MiningResult` tests.
    fn sample_result() -> MiningResult {
        MiningResult {
            template_id: 42,
            nonce: 12345,
            pow_hash: Hash256::from_bytes([1u8; 32]),
            solve_time_ms: 1000,
            hashes: 50000,
        }
    }

    /// `{:?}` rendering of `value`.
    fn debug_of(value: &impl std::fmt::Debug) -> String {
        format!("{:?}", value)
    }

    /// Block miner built from the default configuration.
    fn default_miner() -> BlockMiner {
        BlockMiner::new(MinerConfig::default())
    }

    /// Block miner built from a solo configuration with `threads` threads.
    fn solo_miner(threads: usize) -> BlockMiner {
        BlockMiner::new(MinerConfig::solo(test_address(), threads))
    }

    /// Parallel miner built from a solo configuration with `threads` threads.
    fn parallel_miner(threads: usize) -> ParallelBlockMiner {
        ParallelBlockMiner::new(MinerConfig::solo(test_address(), threads))
    }

    #[test]
    fn test_miner_config_default() {
        let cfg = MinerConfig::default();
        assert_eq!(cfg.threads, 0);
        assert!(cfg.cpu_mining);
        assert_eq!(cfg.hashrate_limit, 0);
        assert_eq!(cfg.stats_interval_ms, 1000);
        assert_eq!(cfg.extra_nonce, 0);
    }

    #[test]
    fn test_miner_config_solo() {
        let cfg = MinerConfig::solo(test_address(), 4);
        assert_eq!(cfg.threads, 4);
        assert!(cfg.cpu_mining);
        assert_eq!(cfg.hashrate_limit, 0);
        assert_eq!(cfg.extra_nonce, 0);
    }

    #[test]
    fn test_miner_config_pool() {
        let cfg = MinerConfig::pool(test_address(), 8, 12345);
        assert_eq!(cfg.threads, 8);
        assert!(cfg.cpu_mining);
        assert_eq!(cfg.extra_nonce, 12345);
    }

    #[test]
    fn test_miner_config_clone() {
        let original = MinerConfig::solo(test_address(), 4);
        let copy = original.clone();
        assert_eq!(original.threads, copy.threads);
        assert_eq!(original.cpu_mining, copy.cpu_mining);
    }

    #[test]
    fn test_miner_config_debug() {
        assert!(debug_of(&MinerConfig::default()).contains("MinerConfig"));
    }

    #[test]
    fn test_mining_result_structure() {
        let found = sample_result();
        assert_eq!(found.template_id, 42);
        assert_eq!(found.nonce, 12345);
        assert_eq!(found.solve_time_ms, 1000);
        assert_eq!(found.hashes, 50000);
    }

    #[test]
    fn test_mining_result_clone() {
        let found = sample_result();
        let copy = found.clone();
        assert_eq!(found.template_id, copy.template_id);
        assert_eq!(found.nonce, copy.nonce);
        assert_eq!(found.pow_hash, copy.pow_hash);
    }

    #[test]
    fn test_mining_result_debug() {
        assert!(debug_of(&sample_result()).contains("MiningResult"));
    }

    #[test]
    fn test_miner_command_stop() {
        assert!(debug_of(&MinerCommand::Stop).contains("Stop"));
    }

    #[test]
    fn test_miner_command_pause() {
        assert!(debug_of(&MinerCommand::Pause).contains("Pause"));
    }

    #[test]
    fn test_miner_command_resume() {
        assert!(debug_of(&MinerCommand::Resume).contains("Resume"));
    }

    #[test]
    fn test_miner_command_update_config() {
        let cmd = MinerCommand::UpdateConfig(MinerConfig::default());
        assert!(debug_of(&cmd).contains("UpdateConfig"));
    }

    #[test]
    fn test_miner_command_clone() {
        let original = MinerCommand::Stop;
        let copy = original.clone();
        assert!(matches!(copy, MinerCommand::Stop));
    }

    #[test]
    fn test_miner_event_started() {
        assert!(debug_of(&MinerEvent::Started).contains("Started"));
    }

    #[test]
    fn test_miner_event_stopped() {
        assert!(debug_of(&MinerEvent::Stopped).contains("Stopped"));
    }

    #[test]
    fn test_miner_event_paused() {
        assert!(debug_of(&MinerEvent::Paused).contains("Paused"));
    }

    #[test]
    fn test_miner_event_resumed() {
        assert!(debug_of(&MinerEvent::Resumed).contains("Resumed"));
    }

    #[test]
    fn test_miner_event_error() {
        let event = MinerEvent::Error("test error".to_string());
        assert!(debug_of(&event).contains("Error"));
    }

    #[test]
    fn test_miner_event_clone() {
        let original = MinerEvent::Started;
        let copy = original.clone();
        assert!(matches!(copy, MinerEvent::Started));
    }

    #[test]
    fn test_block_miner_creation() {
        let miner = solo_miner(2);
        assert!(!miner.is_mining());
        assert!(!miner.is_paused());
    }

    #[test]
    fn test_block_miner_initial_state() {
        let miner = default_miner();
        assert!(!miner.is_mining());
        assert!(!miner.is_paused());
        assert!(miner.current_template().is_none());
        assert_eq!(miner.hash_count(), 0);
    }

    #[test]
    fn test_block_miner_set_template() {
        let miner = default_miner();
        miner.set_template(easy_template());
        assert!(miner.current_template().is_some());
    }

    #[test]
    fn test_block_miner_stats() {
        let snapshot = default_miner().stats();
        assert_eq!(snapshot.blocks_found, 0);
        assert_eq!(snapshot.total_hashes, 0);
    }

    #[test]
    fn test_block_miner_hashrate() {
        assert_eq!(default_miner().hashrate(), 0.0);
    }

    #[test]
    fn test_block_miner_command_sender() {
        let miner = default_miner();
        let sender = miner.command_sender();
        assert!(!sender.is_closed());
    }

    #[test]
    fn test_block_miner_subscribe() {
        let miner = default_miner();
        let _first = miner.subscribe();
        let _second = miner.subscribe();
    }

    #[test]
    fn test_mine_easy_block() {
        let miner = solo_miner(1);
        miner.set_template(easy_template());
        let outcome = miner.mine_sync();
        assert!(outcome.is_some());
        let found = outcome.unwrap();
        assert!(found.hashes > 0);
    }

    #[test]
    fn test_mine_sync_returns_valid_result() {
        let miner = solo_miner(1);
        let template = easy_template();
        miner.set_template(template.clone());
        let found = miner.mine_sync().unwrap();
        assert_eq!(found.template_id, template.id);
        assert!(found.solve_time_ms > 0 || found.hashes > 0);
    }

    #[test]
    fn test_mine_sync_no_template() {
        let outcome = default_miner().mine_sync();
        assert!(outcome.is_none());
    }

    #[test]
    fn test_block_miner_auto_thread_detection() {
        let _miner = solo_miner(0);
    }

    #[test]
    fn test_parallel_block_miner_creation() {
        let _miner = parallel_miner(4);
    }

    #[test]
    fn test_parallel_block_miner_auto_threads() {
        let _miner = parallel_miner(0);
    }

    #[test]
    fn test_parallel_block_miner_set_template() {
        let miner = parallel_miner(2);
        miner.set_template(easy_template());
    }

    #[test]
    fn test_parallel_block_miner_stop() {
        let miner = parallel_miner(2);
        miner.stop();
    }

    #[test]
    fn test_parallel_block_miner_take_results() {
        let miner = parallel_miner(2);
        let first = miner.take_results();
        assert!(first.is_some());
        let second = miner.take_results();
        assert!(second.is_none());
    }

    #[test]
    fn test_parallel_block_miner_start_stop() {
        let miner = parallel_miner(2);
        miner.set_template(easy_template());
        miner.start();
        std::thread::sleep(std::time::Duration::from_millis(10));
        miner.stop();
    }

    #[test]
    fn test_full_mining_workflow() {
        let miner = solo_miner(1);
        let template = easy_template();
        miner.set_template(template.clone());
        assert!(miner.current_template().is_some());

        let found = miner.mine_sync().unwrap();
        assert_eq!(found.template_id, template.id);
        assert!(found.hashes > 0);

        let snapshot = miner.stats();
        assert!(snapshot.blocks_found >= 1 || snapshot.total_hashes > 0);
    }

    #[test]
    fn test_miner_config_with_different_addresses() {
        let first_addr = Address::from_ed25519_pubkey(Network::Mainnet, &[0x01; 32]);
        let second_addr = Address::from_ed25519_pubkey(Network::Mainnet, &[0x02; 32]);
        let _miner1 = BlockMiner::new(MinerConfig::solo(first_addr, 2));
        let _miner2 = BlockMiner::new(MinerConfig::solo(second_addr, 2));
    }

    #[test]
    fn test_miner_config_with_testnet() {
        let addr = Address::from_ed25519_pubkey(Network::Testnet, &[0x42; 32]);
        let _miner = BlockMiner::new(MinerConfig::solo(addr, 1));
    }
}