# Milestone 2: Metering Service

> Track and aggregate resource usage across all Synor L2 services for accurate billing

## Overview

The Metering Service captures real-time resource consumption across Synor Compute's heterogeneous infrastructure (CPU, GPU, TPU, NPU, LPU, FPGA, WebGPU, WASM). It provides high-fidelity usage data for billing, analytics, and cost optimization.

## Components

### 1. UsageCollector

**Purpose**: Capture granular usage metrics from compute nodes and services.

**Metrics Tracked**:

- **Compute Time**: CPU/GPU/TPU/NPU/LPU/FPGA/WebGPU/WASM seconds
- **Memory**: Peak and average RAM/VRAM usage (GB·hours)
- **Storage**: Read/write IOPS, bandwidth (GB transferred)
- **Network**: Ingress/egress bandwidth (GB)
- **Tensor Operations**: FLOPs (total floating-point operations executed)
- **Model Inference**: Requests, tokens processed, latency percentiles

**Implementation**:

```rust
pub struct UsageCollector {
    metrics_buffer: RingBuffer<UsageMetric>,
    flush_interval: Duration,
    redis_client: RedisClient,
}

pub struct UsageMetric {
    pub timestamp: i64,
    pub user_id: String,
    pub resource_type: ResourceType, // CPU, GPU, TPU, etc.
    pub operation: Operation,        // TensorOp, MatMul, Inference, etc.
    pub quantity: f64,               // Compute seconds, FLOPs, tokens, etc.
    pub metadata: HashMap<String, String>,
}

pub enum ResourceType {
    CpuCompute,
    GpuCompute(GpuType), // CUDA, ROCm, Metal, WebGPU
    TpuCompute,
    NpuCompute,
    LpuCompute,
    FpgaCompute,
    WasmCompute,
    Memory,
    Storage,
    Network,
}

impl UsageCollector {
    pub async fn record(&mut self, metric: UsageMetric) -> Result<()> {
        self.metrics_buffer.push(metric);
        if self.should_flush() {
            self.flush_to_redis().await?;
        }
        Ok(())
    }

    async fn flush_to_redis(&mut self) -> Result<()> {
        let batch = self.metrics_buffer.drain();
        for metric in batch {
            // Append to the per-user Redis stream of raw metrics
            // (the "Redis Stream" stage in the data flow below).
            let key = format!("usage:{}", metric.user_id);
            self.redis_client
                .xadd(&key, metric.timestamp, &metric)
                .await?;
        }
        Ok(())
    }
}
```

### 2. UsageAggregator

**Purpose**: Aggregate raw metrics into billable line items.
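Stripped of the async plumbing and the pricing oracle, the aggregation described above reduces to summing quantities per resource type and multiplying each sum by a per-unit rate. A minimal std-only sketch (the string resource keys and hard-coded rates are illustrative assumptions):

```rust
use std::collections::HashMap;

/// Sum raw (resource, quantity) metrics per resource type, then price
/// each total with the supplied per-unit USD rates. Resources with no
/// configured rate contribute zero cost.
pub fn aggregate(
    metrics: &[(&str, f64)],
    rates: &HashMap<&str, f64>,
) -> (HashMap<String, f64>, f64) {
    // Per-resource totals, mirroring the entry-or-insert accumulation loop.
    let mut usage: HashMap<String, f64> = HashMap::new();
    for (resource, qty) in metrics {
        *usage.entry((*resource).to_string()).or_insert(0.0) += qty;
    }
    // Apply pricing: total cost = sum over resources of quantity * rate.
    let total_cost_usd: f64 = usage
        .iter()
        .map(|(r, q)| q * rates.get(r.as_str()).copied().unwrap_or(0.0))
        .sum();
    (usage, total_cost_usd)
}
```

The full aggregator sketched below layers time windows, async pricing-oracle lookups, and token-denominated cost conversion on top of this core loop.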
**Aggregation Windows**:

- **Real-time**: 1-minute windows for live dashboards
- **Hourly**: For usage alerts and rate limiting
- **Daily**: For invoice generation
- **Monthly**: For billing cycles

**Implementation**:

```rust
pub struct UsageAggregator {
    window_size: Duration,
    pricing_oracle: Arc<PricingOracle>,
}

pub struct AggregatedUsage {
    pub user_id: String,
    pub period_start: i64,
    pub period_end: i64,
    pub resource_usage: HashMap<ResourceType, ResourceUsage>,
    pub total_cost_usd: f64,
    pub total_cost_synor: f64,
}

pub struct ResourceUsage {
    pub quantity: f64,
    pub unit: String,  // "compute-seconds", "GB", "TFLOPs", etc.
    pub rate_usd: f64, // USD per unit
    pub cost_usd: f64, // quantity * rate_usd
}

impl UsageAggregator {
    pub async fn aggregate_period(
        &self,
        user_id: &str,
        start: i64,
        end: i64,
    ) -> Result<AggregatedUsage> {
        let raw_metrics = self.fetch_metrics(user_id, start, end).await?;

        let mut resource_usage = HashMap::new();
        for metric in raw_metrics {
            let entry = resource_usage
                .entry(metric.resource_type)
                .or_insert_with(ResourceUsage::default);
            entry.quantity += metric.quantity;
        }

        // Apply pricing
        let mut total_cost_usd = 0.0;
        for (resource_type, usage) in resource_usage.iter_mut() {
            usage.rate_usd = self.pricing_oracle.get_rate(resource_type).await?;
            usage.cost_usd = usage.quantity * usage.rate_usd;
            total_cost_usd += usage.cost_usd;
        }

        let synor_price = self.pricing_oracle.get_synor_price_usd().await?;
        let total_cost_synor = total_cost_usd / synor_price;

        Ok(AggregatedUsage {
            user_id: user_id.to_string(),
            period_start: start,
            period_end: end,
            resource_usage,
            total_cost_usd,
            total_cost_synor,
        })
    }
}
```

### 3. UsageQuotaManager

**Purpose**: Enforce usage limits and prevent abuse.
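At its core, quota enforcement is a compare-then-increment against an accumulated counter. A minimal sketch with an in-memory map standing in for the Redis-backed store (the map, the key format, and the default-unlimited behavior are assumptions of this sketch):

```rust
use std::collections::HashMap;

/// In-memory stand-in for the Redis-backed quota store.
#[derive(Default)]
pub struct Quotas {
    limits: HashMap<String, f64>,
    used: HashMap<String, f64>,
}

impl Quotas {
    /// Set the quota limit for a key such as "user1:gpu".
    pub fn set_limit(&mut self, key: &str, limit: f64) {
        self.limits.insert(key.to_string(), limit);
    }

    /// Record the usage and return true if the request fits under the
    /// limit; otherwise leave usage unchanged and return false. Keys
    /// with no configured limit are treated as unlimited.
    pub fn try_consume(&mut self, key: &str, amount: f64) -> bool {
        let limit = self.limits.get(key).copied().unwrap_or(f64::INFINITY);
        let used = self.used.entry(key.to_string()).or_insert(0.0);
        if *used + amount > limit {
            return false;
        }
        *used += amount;
        true
    }
}
```

The production manager sketched below splits this into separate check and consume calls against Redis; note that a separate check-then-consume is racy under concurrency, which is one reason to keep the counter increment atomic (e.g., Redis `INCRBYFLOAT`).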
**Features**:

- Per-user compute quotas (e.g., 100 GPU-hours/month)
- Rate limiting (e.g., 1,000 requests/minute)
- Burst allowances with a token bucket algorithm
- Soft limits (warnings) vs. hard limits (throttling)

**Implementation**:

```rust
pub struct QuotaManager {
    redis_client: RedisClient,
}

pub struct Quota {
    pub resource_type: ResourceType,
    pub limit: f64,
    pub period: Duration,
    pub current_usage: f64,
    pub reset_at: i64,
}

pub enum QuotaStatus {
    Available,
    Exceeded {
        limit: f64,
        current: f64,
        requested: f64,
        reset_at: i64,
    },
}

impl QuotaManager {
    pub async fn check_quota(
        &self,
        user_id: &str,
        resource_type: ResourceType,
        requested_amount: f64,
    ) -> Result<QuotaStatus> {
        let quota = self.get_quota(user_id, &resource_type).await?;

        if quota.current_usage + requested_amount > quota.limit {
            return Ok(QuotaStatus::Exceeded {
                limit: quota.limit,
                current: quota.current_usage,
                requested: requested_amount,
                reset_at: quota.reset_at,
            });
        }

        Ok(QuotaStatus::Available)
    }

    pub async fn consume_quota(
        &mut self,
        user_id: &str,
        resource_type: ResourceType,
        amount: f64,
    ) -> Result<()> {
        let key = format!("quota:{}:{:?}", user_id, resource_type);
        self.redis_client.incr_by(key, amount).await?;
        Ok(())
    }
}
```

### 4. UsageAnalytics

**Purpose**: Provide insights for cost optimization and capacity planning.
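One common way to surface such insights is flagging usage spikes by z-score, the statistical approach the analytics module's anomaly detection names below. A std-only sketch (the 2.5 threshold is an illustrative choice):

```rust
/// Return the indices of values lying more than `threshold` standard
/// deviations from the mean of the series (population standard deviation).
pub fn zscore_anomalies(series: &[f64], threshold: f64) -> Vec<usize> {
    let n = series.len() as f64;
    if n < 2.0 {
        return Vec::new();
    }
    let mean = series.iter().sum::<f64>() / n;
    let var = series.iter().map(|x| (x - mean).powi(2)).sum::<f64>() / n;
    let std = var.sqrt();
    if std == 0.0 {
        // A constant series has no spikes to flag.
        return Vec::new();
    }
    series
        .iter()
        .enumerate()
        .filter(|(_, x)| ((**x - mean) / std).abs() > threshold)
        .map(|(i, _)| i)
        .collect()
}
```

In practice this would run over per-window cost totals from the timeseries DB; IQR-based detection (also mentioned below) is a more outlier-robust alternative since the outlier itself inflates the standard deviation.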
**Dashboards**:

- **User Dashboard**: Real-time usage, cost trends, quota status
- **Admin Dashboard**: Top consumers, resource utilization, anomaly detection
- **Forecast Dashboard**: Projected costs, growth trends

**Metrics**:

```rust
pub struct UsageAnalytics {
    timeseries_db: TimeseriesDB,
}

pub struct CostTrend {
    pub timestamps: Vec<i64>,
    pub costs: Vec<f64>,
    pub resource_breakdown: HashMap<ResourceType, Vec<f64>>,
}

impl UsageAnalytics {
    pub async fn get_cost_trend(
        &self,
        user_id: &str,
        window: Duration,
    ) -> Result<CostTrend> {
        // Query timeseries DB for aggregated usage
        // Generate cost trends and resource breakdowns
        todo!()
    }

    pub async fn detect_anomalies(
        &self,
        user_id: &str,
    ) -> Result<Vec<Anomaly>> {
        // Statistical anomaly detection (z-score, IQR)
        // Notify on sudden spikes or unusual patterns
        todo!()
    }
}
```

## Data Flow

```
┌──────────────────┐
│  Compute Nodes   │
│ (CPU/GPU/TPU...) │
└────────┬─────────┘
         │ emit metrics
         ↓
┌──────────────────┐
│  UsageCollector  │
│  (batch buffer)  │
└────────┬─────────┘
         │ flush every 1s
         ↓
┌──────────────────┐
│   Redis Stream   │
│  (raw metrics)   │
└────────┬─────────┘
         │ consume
         ↓
┌──────────────────┐
│  UsageAggregator │
│  (time windows)  │
└────────┬─────────┘
         │ write
         ↓
┌──────────────────────────────┐
│        Timeseries DB         │
│  (InfluxDB/VictoriaMetrics)  │
└────────┬─────────────────────┘
         │ query
         ↓
┌──────────────────┐
│  BillingEngine   │
│  (invoice gen)   │
└──────────────────┘
```

## Implementation Plan

### Phase 1: Core Metrics Collection (Weeks 1-2)

- [ ] Implement UsageCollector in Rust
- [ ] Integrate with Synor Compute worker nodes
- [ ] Set up Redis streams for metric buffering
- [ ] Add metric collection to all compute operations

### Phase 2: Aggregation & Storage (Weeks 3-4)

- [ ] Implement UsageAggregator with hourly/daily windows
- [ ] Deploy InfluxDB or VictoriaMetrics for timeseries storage
- [ ] Create aggregation jobs (cron or stream processors)
- [ ] Build query APIs for usage data

### Phase 3: Quota Management (Weeks 5-6)

- [ ] Implement QuotaManager with Redis-backed quotas
- [ ] Add quota checks to the orchestrator before job dispatch
- [ ] Implement rate limiting with a token bucket algorithm
- [ ] Create an admin UI for setting user quotas

### Phase 4: Analytics & Dashboards (Weeks 7-8)

- [ ] Build the UsageAnalytics module
- [ ] Create the user dashboard (Next.js + Chart.js)
- [ ] Add cost trend visualization
- [ ] Implement anomaly detection alerts

## Testing Strategy

### Unit Tests

- Metric recording and buffering
- Aggregation window calculations
- Quota enforcement logic
- Anomaly detection algorithms

### Integration Tests

- End-to-end metric flow (collector → Redis → aggregator → DB)
- Quota limits preventing over-consumption
- Pricing oracle integration for cost calculations

### Load Tests

- 10,000 metrics/second ingestion
- Aggregation performance with 1M+ metrics
- Query latency for large time windows

## Success Metrics

- **Accuracy**: <1% discrepancy between raw metrics and billable amounts
- **Latency**: <100ms p99 for metric recording
- **Throughput**: 100,000 metrics/second ingestion
- **Retention**: 1 year of historical data for analytics
- **Uptime**: 99.9% availability for the metering service

## Security Considerations

- **Data Integrity**: HMAC signatures on metrics to prevent tampering
- **Access Control**: Users can only query their own usage data
- **Audit Logs**: Track all quota changes and metric adjustments
- **Rate Limiting**: Prevent abuse of the analytics APIs

## Cost Optimization

- **Batch Processing**: Group metrics into 1-second batches to reduce Redis ops
- **Compression**: Use columnar storage for timeseries data
- **TTL Policies**: Auto-expire raw metrics after 7 days (keep aggregated data)
- **Caching**: Cache quota values for 60 seconds to reduce Redis load
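The token-bucket algorithm referenced under the quota features and in Phase 3 can be sketched in a few lines of std Rust. The explicit `now` parameter (instead of reading a clock inside the limiter) and the one-token-per-request policy are illustrative choices that keep the sketch deterministic and testable:

```rust
/// Token bucket: tokens refill continuously at `rate` per second up to
/// `capacity`; each admitted request consumes one token. A full bucket
/// at start-up allows an initial burst of `capacity` requests.
pub struct TokenBucket {
    capacity: f64,
    tokens: f64,
    rate: f64,        // tokens added per second
    last_refill: f64, // monotonic timestamp (seconds) of the last refill
}

impl TokenBucket {
    pub fn new(capacity: f64, rate: f64) -> Self {
        Self { capacity, tokens: capacity, rate, last_refill: 0.0 }
    }

    /// `now` is a monotonic timestamp in seconds (e.g., derived from
    /// `Instant::elapsed`). Returns true if the request is admitted.
    pub fn try_acquire(&mut self, now: f64) -> bool {
        // Credit tokens for the time elapsed since the last refill.
        let elapsed = (now - self.last_refill).max(0.0);
        self.last_refill = now;
        self.tokens = (self.tokens + elapsed * self.rate).min(self.capacity);
        if self.tokens >= 1.0 {
            self.tokens -= 1.0;
            true
        } else {
            false
        }
    }
}
```

In the service itself the bucket state would live in Redis alongside the quotas (a per-user key holding `tokens` and `last_refill`), so that rate limits hold across orchestrator instances.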