synor/crates/synor-database/src/replication/rpc.rs
Gulshan Yadav 8da34bc73d feat(database): add SQL, Graph, and Raft Replication modules
- SQL store with SQLite-compatible subset (sqlparser 0.43)
  - CREATE TABLE, INSERT, SELECT, UPDATE, DELETE
  - WHERE clauses, ORDER BY, LIMIT
  - Aggregates (COUNT, SUM, AVG, MIN, MAX)
  - UNIQUE and NOT NULL constraints
  - BTreeMap-based indexes

- Graph store for relationship-based queries
  - Nodes with labels and properties
  - Edges with types and weights
  - BFS/DFS traversal
  - Dijkstra shortest path
  - Cypher-like query parser (MATCH, CREATE, DELETE, SET)

- Raft consensus replication for high availability
  - Leader election with randomized timeouts
  - Log replication with AppendEntries RPC
  - Snapshot management for log compaction
  - Cluster configuration and joint consensus
  - Full RPC message serialization

All 159 tests pass.
2026-01-10 19:32:14 +05:30

318 lines
8.5 KiB
Rust

//! RPC messages for Raft consensus.
use super::log::LogEntry;
use serde::{Deserialize, Serialize};
/// All RPC message types in Raft.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum RpcMessage {
/// Request vote from candidate.
RequestVote(RequestVote),
/// Response to vote request.
RequestVoteResponse(RequestVoteResponse),
/// Append entries from leader.
AppendEntries(AppendEntries),
/// Response to append entries.
AppendEntriesResponse(AppendEntriesResponse),
/// Install snapshot from leader.
InstallSnapshot(InstallSnapshot),
/// Response to install snapshot.
InstallSnapshotResponse(InstallSnapshotResponse),
}
impl RpcMessage {
/// Returns the term of this message.
pub fn term(&self) -> u64 {
match self {
RpcMessage::RequestVote(r) => r.term,
RpcMessage::RequestVoteResponse(r) => r.term,
RpcMessage::AppendEntries(r) => r.term,
RpcMessage::AppendEntriesResponse(r) => r.term,
RpcMessage::InstallSnapshot(r) => r.term,
RpcMessage::InstallSnapshotResponse(r) => r.term,
}
}
/// Serializes to bytes.
pub fn to_bytes(&self) -> Vec<u8> {
bincode::serialize(self).unwrap_or_default()
}
/// Deserializes from bytes.
pub fn from_bytes(bytes: &[u8]) -> Option<Self> {
bincode::deserialize(bytes).ok()
}
}
/// RequestVote RPC (sent by candidates to gather votes).
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RequestVote {
/// Candidate's term.
pub term: u64,
/// Candidate requesting vote.
pub candidate_id: u64,
/// Index of candidate's last log entry.
pub last_log_index: u64,
/// Term of candidate's last log entry.
pub last_log_term: u64,
}
impl RequestVote {
/// Creates a new RequestVote message.
pub fn new(term: u64, candidate_id: u64, last_log_index: u64, last_log_term: u64) -> Self {
Self {
term,
candidate_id,
last_log_index,
last_log_term,
}
}
}
/// Response to RequestVote.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RequestVoteResponse {
/// Current term (for candidate to update).
pub term: u64,
/// True if candidate received vote.
pub vote_granted: bool,
}
impl RequestVoteResponse {
/// Creates a positive response.
pub fn grant(term: u64) -> Self {
Self {
term,
vote_granted: true,
}
}
/// Creates a negative response.
pub fn deny(term: u64) -> Self {
Self {
term,
vote_granted: false,
}
}
}
/// AppendEntries RPC (sent by leader for replication and heartbeat).
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct AppendEntries {
/// Leader's term.
pub term: u64,
/// Leader ID (so follower can redirect clients).
pub leader_id: u64,
/// Index of log entry immediately preceding new ones.
pub prev_log_index: u64,
/// Term of prev_log_index entry.
pub prev_log_term: u64,
/// Log entries to store (empty for heartbeat).
pub entries: Vec<LogEntry>,
/// Leader's commit index.
pub leader_commit: u64,
}
impl AppendEntries {
/// Creates a heartbeat (empty entries).
pub fn heartbeat(
term: u64,
leader_id: u64,
prev_log_index: u64,
prev_log_term: u64,
leader_commit: u64,
) -> Self {
Self {
term,
leader_id,
prev_log_index,
prev_log_term,
entries: Vec::new(),
leader_commit,
}
}
/// Creates an append entries request with entries.
pub fn with_entries(
term: u64,
leader_id: u64,
prev_log_index: u64,
prev_log_term: u64,
entries: Vec<LogEntry>,
leader_commit: u64,
) -> Self {
Self {
term,
leader_id,
prev_log_index,
prev_log_term,
entries,
leader_commit,
}
}
/// Returns true if this is a heartbeat (no entries).
pub fn is_heartbeat(&self) -> bool {
self.entries.is_empty()
}
}
/// Response to AppendEntries.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct AppendEntriesResponse {
/// Current term (for leader to update).
pub term: u64,
/// True if follower contained entry matching prev_log_index and prev_log_term.
pub success: bool,
/// Index of last entry appended (for quick catch-up).
pub match_index: u64,
/// If false, the conflicting term (for optimization).
pub conflict_term: Option<u64>,
/// First index of conflicting term.
pub conflict_index: Option<u64>,
}
impl AppendEntriesResponse {
/// Creates a success response.
pub fn success(term: u64, match_index: u64) -> Self {
Self {
term,
success: true,
match_index,
conflict_term: None,
conflict_index: None,
}
}
/// Creates a failure response.
pub fn failure(term: u64) -> Self {
Self {
term,
success: false,
match_index: 0,
conflict_term: None,
conflict_index: None,
}
}
/// Creates a failure response with conflict info.
pub fn conflict(term: u64, conflict_term: u64, conflict_index: u64) -> Self {
Self {
term,
success: false,
match_index: 0,
conflict_term: Some(conflict_term),
conflict_index: Some(conflict_index),
}
}
}
/// InstallSnapshot RPC (sent by leader when follower is too far behind).
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct InstallSnapshot {
/// Leader's term.
pub term: u64,
/// Leader ID.
pub leader_id: u64,
/// Index of last entry included in snapshot.
pub last_included_index: u64,
/// Term of last entry included in snapshot.
pub last_included_term: u64,
/// Byte offset where chunk is positioned.
pub offset: u64,
/// Raw bytes of snapshot chunk.
pub data: Vec<u8>,
/// True if this is the last chunk.
pub done: bool,
}
impl InstallSnapshot {
/// Creates a new snapshot installation request.
pub fn new(
term: u64,
leader_id: u64,
last_included_index: u64,
last_included_term: u64,
offset: u64,
data: Vec<u8>,
done: bool,
) -> Self {
Self {
term,
leader_id,
last_included_index,
last_included_term,
offset,
data,
done,
}
}
}
/// Response to InstallSnapshot.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct InstallSnapshotResponse {
/// Current term (for leader to update).
pub term: u64,
}
impl InstallSnapshotResponse {
/// Creates a new response.
pub fn new(term: u64) -> Self {
Self { term }
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::replication::state::Command;
#[test]
fn test_request_vote() {
let request = RequestVote::new(1, 1, 10, 1);
assert_eq!(request.term, 1);
assert_eq!(request.candidate_id, 1);
let grant = RequestVoteResponse::grant(1);
assert!(grant.vote_granted);
let deny = RequestVoteResponse::deny(2);
assert!(!deny.vote_granted);
assert_eq!(deny.term, 2);
}
#[test]
fn test_append_entries() {
let heartbeat = AppendEntries::heartbeat(1, 1, 0, 0, 0);
assert!(heartbeat.is_heartbeat());
let entries = vec![LogEntry::new(1, 1, Command::Noop)];
let append = AppendEntries::with_entries(1, 1, 0, 0, entries, 0);
assert!(!append.is_heartbeat());
}
#[test]
fn test_rpc_message_serialization() {
let request = RpcMessage::RequestVote(RequestVote::new(1, 1, 10, 1));
let bytes = request.to_bytes();
let decoded = RpcMessage::from_bytes(&bytes).unwrap();
assert_eq!(decoded.term(), 1);
}
#[test]
fn test_append_entries_response() {
let success = AppendEntriesResponse::success(1, 10);
assert!(success.success);
assert_eq!(success.match_index, 10);
let failure = AppendEntriesResponse::failure(2);
assert!(!failure.success);
let conflict = AppendEntriesResponse::conflict(2, 1, 5);
assert!(!conflict.success);
assert_eq!(conflict.conflict_term, Some(1));
assert_eq!(conflict.conflict_index, Some(5));
}
}