synor/docs/PLAN/PHASE12-EconomicsBilling/01-Milestone-02-MeteringService.md
Gulshan Yadav e20e5cb11f docs(phase12): add metering service milestone
Complete Phase 12 Economics & Billing planning with detailed metering service design:
- UsageCollector for real-time metric capture from all compute nodes
- UsageAggregator for time-windowed billing aggregation
- QuotaManager for usage limits and rate limiting
- UsageAnalytics for cost optimization insights
- Comprehensive data flow and implementation plan
2026-01-19 03:01:39 +05:30


# Milestone 2: Metering Service
> Track and aggregate resource usage across all Synor L2 services for accurate billing
## Overview
The Metering Service captures real-time resource consumption across Synor Compute's heterogeneous infrastructure (CPU, GPU, TPU, NPU, LPU, FPGA, WebGPU, WASM). It provides high-fidelity usage data for billing, analytics, and cost optimization.
## Components
### 1. UsageCollector
**Purpose**: Capture granular usage metrics from compute nodes and services.
**Metrics Tracked**:
- **Compute Time**: CPU/GPU/TPU/NPU/LPU/FPGA/WebGPU/WASM seconds
- **Memory**: Peak and average RAM/VRAM usage (GB·hours)
- **Storage**: Read/write IOPS, bandwidth (GB transferred)
- **Network**: Ingress/egress bandwidth (GB)
- **Tensor Operations**: FLOPs (total floating-point operations executed)
- **Model Inference**: Requests, tokens processed, latency percentiles
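For the duration-based units above, the billable quantity is rate multiplied by wall-clock time; a minimal sketch of the GB·hours arithmetic (`memory_gb_hours` is a hypothetical helper, not part of the service API):

```rust
/// Memory is metered in GB·hours: average resident gigabytes times
/// wall-clock hours. Hypothetical helper for illustration only.
fn memory_gb_hours(avg_gb: f64, duration_secs: f64) -> f64 {
    avg_gb * duration_secs / 3600.0
}

fn main() {
    // A job holding 4 GB on average for 30 minutes bills as 2 GB·h.
    println!("{}", memory_gb_hours(4.0, 1800.0));
}
```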
**Implementation**:
```rust
pub struct UsageCollector {
    metrics_buffer: RingBuffer<UsageMetric>,
    flush_interval: Duration,
    redis_client: RedisClient,
}

pub struct UsageMetric {
    pub timestamp: i64,
    pub user_id: String,
    pub resource_type: ResourceType, // CPU, GPU, TPU, etc.
    pub operation: Operation,        // TensorOp, MatMul, Inference, etc.
    pub quantity: f64,               // Compute seconds, FLOPs, tokens, etc.
    pub metadata: HashMap<String, String>,
}

pub enum ResourceType {
    CpuCompute,
    GpuCompute(GpuType), // CUDA, ROCm, Metal, WebGPU
    TpuCompute,
    NpuCompute,
    LpuCompute,
    FpgaCompute,
    WasmCompute,
    Memory,
    Storage,
    Network,
}

impl UsageCollector {
    pub async fn record(&mut self, metric: UsageMetric) -> Result<()> {
        self.metrics_buffer.push(metric);
        if self.should_flush() {
            self.flush_to_redis().await?;
        }
        Ok(())
    }

    async fn flush_to_redis(&mut self) -> Result<()> {
        let batch = self.metrics_buffer.drain();
        for metric in batch {
            // One sorted set per user, scored by timestamp, so the
            // aggregator can range-query a billing window (ZRANGEBYSCORE).
            let key = format!("usage:{}", metric.user_id);
            self.redis_client.zadd(key, metric.timestamp, &metric).await?;
        }
        Ok(())
    }
}
```
### 2. UsageAggregator
**Purpose**: Aggregate raw metrics into billable line items.
**Aggregation Windows**:
- **Real-time**: 1-minute windows for live dashboards
- **Hourly**: For usage alerts and rate limiting
- **Daily**: For invoice generation
- **Monthly**: For billing cycles
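Fixed-length window boundaries can be derived by truncating the metric timestamp to the window length; a minimal sketch (`window_bounds` is a hypothetical helper, not part of the service API):

```rust
/// Map a raw Unix timestamp (seconds) to its aggregation window.
/// `window` is the window length in seconds: 60 (real-time),
/// 3600 (hourly), or 86400 (daily).
fn window_bounds(timestamp: i64, window: i64) -> (i64, i64) {
    // rem_euclid keeps pre-epoch timestamps in the correct window.
    let start = timestamp - timestamp.rem_euclid(window);
    (start, start + window)
}

fn main() {
    // A metric at t = 3725 lands in the hourly window [3600, 7200).
    println!("{:?}", window_bounds(3725, 3600));
}
```

Monthly windows vary in length, so billing-cycle boundaries would be computed from calendar dates rather than a fixed divisor.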
**Implementation**:
```rust
pub struct UsageAggregator {
    window_size: Duration,
    pricing_oracle: Arc<PricingOracle>,
}

pub struct AggregatedUsage {
    pub user_id: String,
    pub period_start: i64,
    pub period_end: i64,
    pub resource_usage: HashMap<ResourceType, ResourceUsage>,
    pub total_cost_usd: f64,
    pub total_cost_synor: f64,
}

pub struct ResourceUsage {
    pub quantity: f64,
    pub unit: String,   // "compute-seconds", "GB", "TFLOPS", etc.
    pub rate_usd: f64,  // USD per unit
    pub cost_usd: f64,  // quantity * rate_usd
}

impl UsageAggregator {
    pub async fn aggregate_period(
        &self,
        user_id: &str,
        start: i64,
        end: i64,
    ) -> Result<AggregatedUsage> {
        let raw_metrics = self.fetch_metrics(user_id, start, end).await?;

        // Sum raw quantities per resource type.
        let mut resource_usage: HashMap<ResourceType, ResourceUsage> = HashMap::new();
        for metric in raw_metrics {
            let entry = resource_usage.entry(metric.resource_type).or_default();
            entry.quantity += metric.quantity;
        }

        // Apply pricing.
        let mut total_cost_usd = 0.0;
        for (resource_type, usage) in resource_usage.iter_mut() {
            usage.rate_usd = self.pricing_oracle.get_rate(resource_type).await?;
            usage.cost_usd = usage.quantity * usage.rate_usd;
            total_cost_usd += usage.cost_usd;
        }

        // Convert USD to SYNOR at the oracle's current price.
        let synor_price = self.pricing_oracle.get_synor_price_usd().await?;
        let total_cost_synor = total_cost_usd / synor_price;

        Ok(AggregatedUsage {
            user_id: user_id.to_string(),
            period_start: start,
            period_end: end,
            resource_usage,
            total_cost_usd,
            total_cost_synor,
        })
    }
}
```
### 3. QuotaManager
**Purpose**: Enforce usage limits and prevent abuse.
**Features**:
- Per-user compute quotas (e.g., 100 GPU-hours/month)
- Rate limiting (e.g., 1000 requests/minute)
- Burst allowances with token bucket algorithm
- Soft limits (warnings) vs hard limits (throttling)
**Implementation**:
```rust
pub struct QuotaManager {
    redis_client: RedisClient,
}

pub struct Quota {
    pub resource_type: ResourceType,
    pub limit: f64,
    pub period: Duration,
    pub current_usage: f64,
    pub reset_at: i64,
}

impl QuotaManager {
    pub async fn check_quota(
        &self,
        user_id: &str,
        resource_type: ResourceType,
        requested_amount: f64,
    ) -> Result<QuotaStatus> {
        let quota = self.get_quota(user_id, &resource_type).await?;
        if quota.current_usage + requested_amount > quota.limit {
            return Ok(QuotaStatus::Exceeded {
                limit: quota.limit,
                current: quota.current_usage,
                requested: requested_amount,
                reset_at: quota.reset_at,
            });
        }
        Ok(QuotaStatus::Available)
    }

    pub async fn consume_quota(
        &mut self,
        user_id: &str,
        resource_type: ResourceType,
        amount: f64,
    ) -> Result<()> {
        // Note: check_quota + consume_quota is not atomic; the production
        // path should combine both in a single Redis Lua script to avoid
        // races between concurrent jobs.
        let key = format!("quota:{}:{:?}", user_id, resource_type);
        self.redis_client.incr_by(key, amount).await?;
        Ok(())
    }
}
```
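The burst allowance above relies on a token bucket; a minimal in-process sketch (production enforcement would keep the bucket state in Redis so limits are shared across nodes):

```rust
use std::time::Instant;

/// Token bucket: sustained rate `refill_rate` tokens/sec, bursts up
/// to `capacity`. In-process sketch for illustration only.
struct TokenBucket {
    capacity: f64,
    tokens: f64,
    refill_rate: f64,
    last_refill: Instant,
}

impl TokenBucket {
    fn new(capacity: f64, refill_rate: f64) -> Self {
        Self { capacity, tokens: capacity, refill_rate, last_refill: Instant::now() }
    }

    /// Try to consume `amount` tokens; `false` means throttle the caller.
    fn try_consume(&mut self, amount: f64) -> bool {
        // Refill proportionally to elapsed time, capped at capacity.
        let elapsed = self.last_refill.elapsed().as_secs_f64();
        self.last_refill = Instant::now();
        self.tokens = (self.tokens + elapsed * self.refill_rate).min(self.capacity);
        if self.tokens >= amount {
            self.tokens -= amount;
            true
        } else {
            false
        }
    }
}

fn main() {
    // 10-token burst, ~16.7 tokens/s sustained (1000 requests/minute).
    let mut bucket = TokenBucket::new(10.0, 1000.0 / 60.0);
    let allowed = (0..12).filter(|_| bucket.try_consume(1.0)).count();
    println!("{allowed} of 12 back-to-back requests passed");
}
```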
### 4. UsageAnalytics
**Purpose**: Provide insights for cost optimization and capacity planning.
**Dashboards**:
- **User Dashboard**: Real-time usage, cost trends, quota status
- **Admin Dashboard**: Top consumers, resource utilization, anomaly detection
- **Forecast Dashboard**: Projected costs, growth trends
**Metrics**:
```rust
pub struct UsageAnalytics {
    timeseries_db: TimeseriesDB,
}

pub struct CostTrend {
    pub timestamps: Vec<i64>,
    pub costs: Vec<f64>,
    pub resource_breakdown: HashMap<ResourceType, Vec<f64>>,
}

impl UsageAnalytics {
    pub async fn get_cost_trend(
        &self,
        user_id: &str,
        window: Duration,
    ) -> Result<CostTrend> {
        // Query timeseries DB for aggregated usage
        // Generate cost trends and resource breakdowns
        todo!()
    }

    pub async fn detect_anomalies(
        &self,
        user_id: &str,
    ) -> Result<Vec<UsageAnomaly>> {
        // Statistical anomaly detection (z-score, IQR)
        // Notify on sudden spikes or unusual patterns
        todo!()
    }
}
```
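The z-score check behind `detect_anomalies` can be sketched as a standalone function (hypothetical; the real module would pull the cost series from the timeseries DB, and an IQR test would be analogous):

```rust
/// Return indices of cost values whose z-score magnitude exceeds
/// `threshold`. Population standard deviation, for simplicity.
fn zscore_anomalies(costs: &[f64], threshold: f64) -> Vec<usize> {
    let n = costs.len() as f64;
    if n < 2.0 {
        return vec![];
    }
    let mean = costs.iter().sum::<f64>() / n;
    let var = costs.iter().map(|c| (c - mean).powi(2)).sum::<f64>() / n;
    let std = var.sqrt();
    if std == 0.0 {
        return vec![]; // flat series: nothing to flag
    }
    costs
        .iter()
        .enumerate()
        .filter(|(_, &c)| ((c - mean) / std).abs() > threshold)
        .map(|(i, _)| i)
        .collect()
}

fn main() {
    // Steady ~$10/day, then a sudden spike on day 4.
    let daily_costs = [10.0, 11.0, 9.5, 10.5, 42.0];
    println!("{:?}", zscore_anomalies(&daily_costs, 1.5));
}
```

Note that with very short series a single outlier inflates the standard deviation, so the threshold must be modest; over longer windows a rolling baseline works better.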
## Data Flow
```
┌──────────────────┐
│  Compute Nodes   │
│ (CPU/GPU/TPU...) │
└────────┬─────────┘
         │ emit metrics
         ▼
┌──────────────────┐
│  UsageCollector  │
│  (batch buffer)  │
└────────┬─────────┘
         │ flush every 1s
         ▼
┌──────────────────┐
│   Redis Stream   │
│  (raw metrics)   │
└────────┬─────────┘
         │ consume
         ▼
┌──────────────────┐
│ UsageAggregator  │
│  (time windows)  │
└────────┬─────────┘
         │ write
         ▼
┌───────────────────────────┐
│       Timeseries DB       │
│ (InfluxDB/VictoriaMetrics)│
└────────┬──────────────────┘
         │ query
         ▼
┌──────────────────┐
│  BillingEngine   │
│  (invoice gen)   │
└──────────────────┘
```
## Implementation Plan
### Phase 1: Core Metrics Collection (Week 1-2)
- [ ] Implement UsageCollector in Rust
- [ ] Integrate with Synor Compute worker nodes
- [ ] Set up Redis streams for metric buffering
- [ ] Add metric collection to all compute operations
### Phase 2: Aggregation & Storage (Week 3-4)
- [ ] Implement UsageAggregator with hourly/daily windows
- [ ] Deploy InfluxDB or VictoriaMetrics for timeseries storage
- [ ] Create aggregation jobs (cron or stream processors)
- [ ] Build query APIs for usage data
### Phase 3: Quota Management (Week 5-6)
- [ ] Implement QuotaManager with Redis-backed quotas
- [ ] Add quota checks to orchestrator before job dispatch
- [ ] Implement rate limiting with token bucket algorithm
- [ ] Create admin UI for setting user quotas
### Phase 4: Analytics & Dashboards (Week 7-8)
- [ ] Build UsageAnalytics module
- [ ] Create user dashboard (Next.js + Chart.js)
- [ ] Add cost trend visualization
- [ ] Implement anomaly detection alerts
## Testing Strategy
### Unit Tests
- Metric recording and buffering
- Aggregation window calculations
- Quota enforcement logic
- Anomaly detection algorithms
### Integration Tests
- End-to-end metric flow (collector → Redis → aggregator → DB)
- Quota limits preventing over-consumption
- Pricing oracle integration for cost calculations
### Load Tests
- 10,000 metrics/second ingestion
- Aggregation performance with 1M+ metrics
- Query latency for large time windows
## Success Metrics
- **Accuracy**: <1% discrepancy between raw metrics and billable amounts
- **Latency**: <100ms p99 for metric recording
- **Throughput**: 100,000 metrics/second ingestion
- **Retention**: 1-year historical data for analytics
- **Uptime**: 99.9% availability for metering service
## Security Considerations
- **Data Integrity**: HMAC signatures on metrics to prevent tampering
- **Access Control**: User can only query their own usage data
- **Audit Logs**: Track all quota changes and metric adjustments
- **Rate Limiting**: Prevent abuse of analytics APIs
## Cost Optimization
- **Batch Processing**: Group metrics into 1-second batches to reduce Redis ops
- **Compression**: Use columnar storage for timeseries data
- **TTL Policies**: Auto-expire raw metrics after 7 days (keep aggregated data)
- **Caching**: Cache quota values for 60 seconds to reduce Redis load
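The 60-second quota cache can be sketched as a read-through map in front of Redis (hypothetical types; the `fetch` closure stands in for a Redis `GET`):

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Read-through cache that serves quota values from memory for `ttl`
/// before falling back to the backing store. Illustrative sketch only.
struct QuotaCache {
    ttl: Duration,
    entries: HashMap<String, (f64, Instant)>,
}

impl QuotaCache {
    fn new(ttl: Duration) -> Self {
        Self { ttl, entries: HashMap::new() }
    }

    /// Return a fresh cached value, or invoke `fetch` (standing in for
    /// a Redis GET) and cache its result.
    fn get_or_fetch(&mut self, key: &str, fetch: impl FnOnce() -> f64) -> f64 {
        match self.entries.get(key) {
            Some((value, cached_at)) if cached_at.elapsed() < self.ttl => *value,
            _ => {
                let value = fetch();
                self.entries.insert(key.to_string(), (value, Instant::now()));
                value
            }
        }
    }
}

fn main() {
    let mut cache = QuotaCache::new(Duration::from_secs(60));
    // First call misses and "fetches"; second is served from memory.
    let a = cache.get_or_fetch("quota:user1:GpuCompute", || 100.0);
    let b = cache.get_or_fetch("quota:user1:GpuCompute", || 999.0);
    println!("{a} {b}");
}
```

The trade-off: enforcement can lag a quota change by up to the TTL, which should be acceptable for soft limits but may matter for hard ones.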