From e20e5cb11f2cb915dbc6d8063cfa0d194dadffd8 Mon Sep 17 00:00:00 2001
From: Gulshan Yadav
Date: Mon, 19 Jan 2026 03:01:39 +0530
Subject: [PATCH] docs(phase12): add metering service milestone

Complete Phase 12 Economics & Billing planning with detailed metering
service design:

- UsageCollector for real-time metric capture from all compute nodes
- UsageAggregator for time-windowed billing aggregation
- QuotaManager for usage limits and rate limiting
- UsageAnalytics for cost optimization insights
- Comprehensive data flow and implementation plan
---
 .../01-Milestone-02-MeteringService.md        | 355 ++++++++++++++++++
 1 file changed, 355 insertions(+)
 create mode 100644 docs/PLAN/PHASE12-EconomicsBilling/01-Milestone-02-MeteringService.md

diff --git a/docs/PLAN/PHASE12-EconomicsBilling/01-Milestone-02-MeteringService.md b/docs/PLAN/PHASE12-EconomicsBilling/01-Milestone-02-MeteringService.md
new file mode 100644
index 0000000..2065d4d
--- /dev/null
+++ b/docs/PLAN/PHASE12-EconomicsBilling/01-Milestone-02-MeteringService.md
@@ -0,0 +1,355 @@
+# Milestone 2: Metering Service
+
+> Track and aggregate resource usage across all Synor L2 services for accurate billing
+
+## Overview
+
+The Metering Service captures real-time resource consumption across Synor Compute's heterogeneous infrastructure (CPU, GPU, TPU, NPU, LPU, FPGA, WebGPU, WASM). It provides high-fidelity usage data for billing, analytics, and cost optimization.
+
+## Components
+
+### 1. UsageCollector
+
+**Purpose**: Capture granular usage metrics from compute nodes and services.
+
+**Metrics Tracked**:
+- **Compute Time**: CPU/GPU/TPU/NPU/LPU/FPGA/WebGPU/WASM seconds
+- **Memory**: Peak and average RAM/VRAM usage (GB·hours)
+- **Storage**: Read/write IOPS, bandwidth (GB transferred)
+- **Network**: Ingress/egress bandwidth (GB)
+- **Tensor Operations**: FLOPS (floating-point operations per second)
+- **Model Inference**: Requests, tokens processed, latency percentiles
+
+**Implementation**:
+```rust
+pub struct UsageCollector {
+    metrics_buffer: RingBuffer<UsageMetric>,
+    flush_interval: Duration,
+    redis_client: RedisClient,
+}
+
+pub struct UsageMetric {
+    pub timestamp: i64,
+    pub user_id: String,
+    pub resource_type: ResourceType, // CPU, GPU, TPU, etc.
+    pub operation: Operation,        // TensorOp, MatMul, Inference, etc.
+    pub quantity: f64,               // Compute seconds, FLOPS, tokens, etc.
+    pub metadata: HashMap<String, String>,
+}
+
+pub enum ResourceType {
+    CpuCompute,
+    GpuCompute(GpuType), // CUDA, ROCm, Metal, WebGPU
+    TpuCompute,
+    NpuCompute,
+    LpuCompute,
+    FpgaCompute,
+    WasmCompute,
+    Memory,
+    Storage,
+    Network,
+}
+
+impl UsageCollector {
+    pub async fn record(&mut self, metric: UsageMetric) -> Result<()> {
+        self.metrics_buffer.push(metric);
+
+        if self.should_flush() {
+            self.flush_to_redis().await?;
+        }
+
+        Ok(())
+    }
+
+    async fn flush_to_redis(&mut self) -> Result<()> {
+        let batch = self.metrics_buffer.drain();
+
+        for metric in batch {
+            let key = format!("usage:{}:{}", metric.user_id, metric.timestamp);
+            self.redis_client.zadd(key, metric.timestamp, &metric).await?;
+        }
+
+        Ok(())
+    }
+}
+```
+
+### 2. UsageAggregator
+
+**Purpose**: Aggregate raw metrics into billable line items.
+
+**Aggregation Windows**:
+- **Real-time**: 1-minute windows for live dashboards
+- **Hourly**: For usage alerts and rate limiting
+- **Daily**: For invoice generation
+- **Monthly**: For billing cycles
+
+**Implementation**:
+```rust
+pub struct UsageAggregator {
+    window_size: Duration,
+    pricing_oracle: Arc<PricingOracle>,
+}
+
+pub struct AggregatedUsage {
+    pub user_id: String,
+    pub period_start: i64,
+    pub period_end: i64,
+    pub resource_usage: HashMap<ResourceType, ResourceUsage>,
+    pub total_cost_usd: f64,
+    pub total_cost_synor: f64,
+}
+
+pub struct ResourceUsage {
+    pub quantity: f64,
+    pub unit: String,   // "compute-seconds", "GB", "TFLOPS", etc.
+    pub rate_usd: f64,  // USD per unit
+    pub cost_usd: f64,  // quantity * rate_usd
+}
+
+impl UsageAggregator {
+    pub async fn aggregate_period(
+        &self,
+        user_id: &str,
+        start: i64,
+        end: i64,
+    ) -> Result<AggregatedUsage> {
+        let raw_metrics = self.fetch_metrics(user_id, start, end).await?;
+        let mut resource_usage = HashMap::new();
+
+        for metric in raw_metrics {
+            let entry = resource_usage
+                .entry(metric.resource_type)
+                .or_insert(ResourceUsage::default());
+
+            entry.quantity += metric.quantity;
+        }
+
+        // Apply pricing
+        let mut total_cost_usd = 0.0;
+        for (resource_type, usage) in resource_usage.iter_mut() {
+            usage.rate_usd = self.pricing_oracle.get_rate(resource_type).await?;
+            usage.cost_usd = usage.quantity * usage.rate_usd;
+            total_cost_usd += usage.cost_usd;
+        }
+
+        let synor_price = self.pricing_oracle.get_synor_price_usd().await?;
+        let total_cost_synor = total_cost_usd / synor_price;
+
+        Ok(AggregatedUsage {
+            user_id: user_id.to_string(),
+            period_start: start,
+            period_end: end,
+            resource_usage,
+            total_cost_usd,
+            total_cost_synor,
+        })
+    }
+}
+```
+
+### 3. QuotaManager
+
+**Purpose**: Enforce usage limits and prevent abuse.
+
+**Features**:
+- Per-user compute quotas (e.g., 100 GPU-hours/month)
+- Rate limiting (e.g., 1000 requests/minute)
+- Burst allowances with token bucket algorithm
+- Soft limits (warnings) vs hard limits (throttling)
+
+**Implementation**:
+```rust
+pub struct QuotaManager {
+    redis_client: RedisClient,
+}
+
+pub struct Quota {
+    pub resource_type: ResourceType,
+    pub limit: f64,
+    pub period: Duration,
+    pub current_usage: f64,
+    pub reset_at: i64,
+}
+
+impl QuotaManager {
+    pub async fn check_quota(
+        &self,
+        user_id: &str,
+        resource_type: ResourceType,
+        requested_amount: f64,
+    ) -> Result<QuotaStatus> {
+        let quota = self.get_quota(user_id, &resource_type).await?;
+
+        if quota.current_usage + requested_amount > quota.limit {
+            return Ok(QuotaStatus::Exceeded {
+                limit: quota.limit,
+                current: quota.current_usage,
+                requested: requested_amount,
+                reset_at: quota.reset_at,
+            });
+        }
+
+        Ok(QuotaStatus::Available)
+    }
+
+    pub async fn consume_quota(
+        &mut self,
+        user_id: &str,
+        resource_type: ResourceType,
+        amount: f64,
+    ) -> Result<()> {
+        let key = format!("quota:{}:{:?}", user_id, resource_type);
+        self.redis_client.incr_by(key, amount).await?;
+        Ok(())
+    }
+}
+```
+
+### 4. UsageAnalytics
+
+**Purpose**: Provide insights for cost optimization and capacity planning.
+
+**Dashboards**:
+- **User Dashboard**: Real-time usage, cost trends, quota status
+- **Admin Dashboard**: Top consumers, resource utilization, anomaly detection
+- **Forecast Dashboard**: Projected costs, growth trends
+
+**Metrics**:
+```rust
+pub struct UsageAnalytics {
+    timeseries_db: TimeseriesDB,
+}
+
+pub struct CostTrend {
+    pub timestamps: Vec<i64>,
+    pub costs: Vec<f64>,
+    pub resource_breakdown: HashMap<ResourceType, Vec<f64>>,
+}
+
+impl UsageAnalytics {
+    pub async fn get_cost_trend(
+        &self,
+        user_id: &str,
+        window: Duration,
+    ) -> Result<CostTrend> {
+        // Query timeseries DB for aggregated usage
+        // Generate cost trends and resource breakdowns
+        todo!()
+    }
+
+    pub async fn detect_anomalies(
+        &self,
+        user_id: &str,
+    ) -> Result<Vec<Anomaly>> {
+        // Statistical anomaly detection (z-score, IQR)
+        // Notify on sudden spikes or unusual patterns
+        todo!()
+    }
+}
+```
+
+## Data Flow
+
+```
+┌──────────────────┐
+│  Compute Nodes   │
+│ (CPU/GPU/TPU...) │
+└────────┬─────────┘
+         │ emit metrics
+         ↓
+┌──────────────────┐
+│  UsageCollector  │
+│  (batch buffer)  │
+└────────┬─────────┘
+         │ flush every 1s
+         ↓
+┌──────────────────┐
+│   Redis Stream   │
+│  (raw metrics)   │
+└────────┬─────────┘
+         │ consume
+         ↓
+┌──────────────────┐
+│ UsageAggregator  │
+│  (time windows)  │
+└────────┬─────────┘
+         │ write
+         ↓
+┌──────────────────┐
+│  Timeseries DB   │
+│ (InfluxDB/       │
+│  VictoriaMetrics)│
+└────────┬─────────┘
+         │ query
+         ↓
+┌──────────────────┐
+│  BillingEngine   │
+│  (invoice gen)   │
+└──────────────────┘
+```
+
+## Implementation Plan
+
+### Phase 1: Core Metrics Collection (Week 1-2)
+- [ ] Implement UsageCollector in Rust
+- [ ] Integrate with Synor Compute worker nodes
+- [ ] Set up Redis streams for metric buffering
+- [ ] Add metric collection to all compute operations
+
+### Phase 2: Aggregation & Storage (Week 3-4)
+- [ ] Implement UsageAggregator with hourly/daily windows
+- [ ] Deploy InfluxDB or VictoriaMetrics for timeseries storage
+- [ ] Create aggregation jobs (cron or stream processors)
+- [ ] Build query APIs for
usage data
+
+### Phase 3: Quota Management (Week 5-6)
+- [ ] Implement QuotaManager with Redis-backed quotas
+- [ ] Add quota checks to orchestrator before job dispatch
+- [ ] Implement rate limiting with token bucket algorithm
+- [ ] Create admin UI for setting user quotas
+
+### Phase 4: Analytics & Dashboards (Week 7-8)
+- [ ] Build UsageAnalytics module
+- [ ] Create user dashboard (Next.js + Chart.js)
+- [ ] Add cost trend visualization
+- [ ] Implement anomaly detection alerts
+
+## Testing Strategy
+
+### Unit Tests
+- Metric recording and buffering
+- Aggregation window calculations
+- Quota enforcement logic
+- Anomaly detection algorithms
+
+### Integration Tests
+- End-to-end metric flow (collector → Redis → aggregator → DB)
+- Quota limits preventing over-consumption
+- Pricing oracle integration for cost calculations
+
+### Load Tests
+- 10,000 metrics/second sustained ingestion (scaling toward the 100,000/s success-metric target)
+- Aggregation performance with 1M+ metrics
+- Query latency for large time windows
+
+## Success Metrics
+
+- **Accuracy**: <1% discrepancy between raw metrics and billable amounts
+- **Latency**: <100ms p99 for metric recording
+- **Throughput**: 100,000 metrics/second ingestion
+- **Retention**: 1-year historical data for analytics
+- **Uptime**: 99.9% availability for metering service
+
+## Security Considerations
+
+- **Data Integrity**: HMAC signatures on metrics to prevent tampering
+- **Access Control**: User can only query their own usage data
+- **Audit Logs**: Track all quota changes and metric adjustments
+- **Rate Limiting**: Prevent abuse of analytics APIs
+
+## Cost Optimization
+
+- **Batch Processing**: Group metrics into 1-second batches to reduce Redis ops
+- **Compression**: Use columnar storage for timeseries data
+- **TTL Policies**: Auto-expire raw metrics after 7 days (keep aggregated data)
+- **Caching**: Cache quota values for 60 seconds to reduce Redis load