synor/docs/PLAN/PHASE12-EconomicsBilling/01-Milestone-02-MeteringService.md
Gulshan Yadav e20e5cb11f docs(phase12): add metering service milestone
Complete Phase 12 Economics & Billing planning with detailed metering service design:
- UsageCollector for real-time metric capture from all compute nodes
- UsageAggregator for time-windowed billing aggregation
- QuotaManager for usage limits and rate limiting
- UsageAnalytics for cost optimization insights
- Comprehensive data flow and implementation plan
2026-01-19 03:01:39 +05:30

10 KiB

Milestone 2: Metering Service

Track and aggregate resource usage across all Synor L2 services for accurate billing

Overview

The Metering Service captures real-time resource consumption across Synor Compute's heterogeneous infrastructure (CPU, GPU, TPU, NPU, LPU, FPGA, WebGPU, WASM). It provides high-fidelity usage data for billing, analytics, and cost optimization.

Components

1. UsageCollector

Purpose: Capture granular usage metrics from compute nodes and services.

Metrics Tracked:

  • Compute Time: CPU/GPU/TPU/NPU/LPU/FPGA/WebGPU/WASM seconds
  • Memory: Peak and average RAM/VRAM usage (GB·hours)
  • Storage: Read/write IOPS, bandwidth (GB transferred)
  • Network: Ingress/egress bandwidth (GB)
  • Tensor Operations: FLOPS (floating-point operations per second)
  • Model Inference: Requests, tokens processed, latency percentiles

Implementation:

pub struct UsageCollector {
    metrics_buffer: RingBuffer<UsageMetric>,
    flush_interval: Duration,
    redis_client: RedisClient,
}

pub struct UsageMetric {
    pub timestamp: i64,
    pub user_id: String,
    pub resource_type: ResourceType, // CPU, GPU, TPU, etc.
    pub operation: Operation,        // TensorOp, MatMul, Inference, etc.
    pub quantity: f64,               // Compute seconds, FLOPS, tokens, etc.
    pub metadata: HashMap<String, String>,
}

pub enum ResourceType {
    CpuCompute,
    GpuCompute(GpuType),  // CUDA, ROCm, Metal, WebGPU
    TpuCompute,
    NpuCompute,
    LpuCompute,
    FpgaCompute,
    WasmCompute,
    Memory,
    Storage,
    Network,
}

impl UsageCollector {
    pub async fn record(&mut self, metric: UsageMetric) -> Result<()> {
        self.metrics_buffer.push(metric);

        if self.should_flush() {
            self.flush_to_redis().await?;
        }

        Ok(())
    }

    async fn flush_to_redis(&mut self) -> Result<()> {
        let batch = self.metrics_buffer.drain();

        for metric in batch {
            let key = format!("usage:{}:{}", metric.user_id, metric.timestamp);
            self.redis_client.zadd(key, metric.timestamp, &metric).await?;
        }

        Ok(())
    }
}

2. UsageAggregator

Purpose: Aggregate raw metrics into billable line items.

Aggregation Windows:

  • Real-time: 1-minute windows for live dashboards
  • Hourly: For usage alerts and rate limiting
  • Daily: For invoice generation
  • Monthly: For billing cycles

Implementation:

pub struct UsageAggregator {
    window_size: Duration,
    pricing_oracle: Arc<PricingOracle>,
}

pub struct AggregatedUsage {
    pub user_id: String,
    pub period_start: i64,
    pub period_end: i64,
    pub resource_usage: HashMap<ResourceType, ResourceUsage>,
    pub total_cost_usd: f64,
    pub total_cost_synor: f64,
}

pub struct ResourceUsage {
    pub quantity: f64,
    pub unit: String,          // "compute-seconds", "GB", "TFLOPS", etc.
    pub rate_usd: f64,         // USD per unit
    pub cost_usd: f64,         // quantity * rate_usd
}

impl UsageAggregator {
    pub async fn aggregate_period(
        &self,
        user_id: &str,
        start: i64,
        end: i64,
    ) -> Result<AggregatedUsage> {
        let raw_metrics = self.fetch_metrics(user_id, start, end).await?;
        let mut resource_usage = HashMap::new();

        for metric in raw_metrics {
            let entry = resource_usage
                .entry(metric.resource_type)
                .or_insert(ResourceUsage::default());

            entry.quantity += metric.quantity;
        }

        // Apply pricing
        let mut total_cost_usd = 0.0;
        for (resource_type, usage) in resource_usage.iter_mut() {
            usage.rate_usd = self.pricing_oracle.get_rate(resource_type).await?;
            usage.cost_usd = usage.quantity * usage.rate_usd;
            total_cost_usd += usage.cost_usd;
        }

        let synor_price = self.pricing_oracle.get_synor_price_usd().await?;
        let total_cost_synor = total_cost_usd / synor_price;

        Ok(AggregatedUsage {
            user_id: user_id.to_string(),
            period_start: start,
            period_end: end,
            resource_usage,
            total_cost_usd,
            total_cost_synor,
        })
    }
}

3. UsageQuotaManager

Purpose: Enforce usage limits and prevent abuse.

Features:

  • Per-user compute quotas (e.g., 100 GPU-hours/month)
  • Rate limiting (e.g., 1000 requests/minute)
  • Burst allowances with token bucket algorithm
  • Soft limits (warnings) vs hard limits (throttling)

Implementation:

pub struct QuotaManager {
    redis_client: RedisClient,
}

pub struct Quota {
    pub resource_type: ResourceType,
    pub limit: f64,
    pub period: Duration,
    pub current_usage: f64,
    pub reset_at: i64,
}

impl QuotaManager {
    pub async fn check_quota(
        &self,
        user_id: &str,
        resource_type: ResourceType,
        requested_amount: f64,
    ) -> Result<QuotaStatus> {
        let quota = self.get_quota(user_id, &resource_type).await?;

        if quota.current_usage + requested_amount > quota.limit {
            return Ok(QuotaStatus::Exceeded {
                limit: quota.limit,
                current: quota.current_usage,
                requested: requested_amount,
                reset_at: quota.reset_at,
            });
        }

        Ok(QuotaStatus::Available)
    }

    pub async fn consume_quota(
        &mut self,
        user_id: &str,
        resource_type: ResourceType,
        amount: f64,
    ) -> Result<()> {
        let key = format!("quota:{}:{:?}", user_id, resource_type);
        self.redis_client.incr_by(key, amount).await?;
        Ok(())
    }
}

4. UsageAnalytics

Purpose: Provide insights for cost optimization and capacity planning.

Dashboards:

  • User Dashboard: Real-time usage, cost trends, quota status
  • Admin Dashboard: Top consumers, resource utilization, anomaly detection
  • Forecast Dashboard: Projected costs, growth trends

Metrics:

pub struct UsageAnalytics {
    timeseries_db: TimeseriesDB,
}

pub struct CostTrend {
    pub timestamps: Vec<i64>,
    pub costs: Vec<f64>,
    pub resource_breakdown: HashMap<ResourceType, Vec<f64>>,
}

impl UsageAnalytics {
    pub async fn get_cost_trend(
        &self,
        user_id: &str,
        window: Duration,
    ) -> Result<CostTrend> {
        // Query timeseries DB for aggregated usage
        // Generate cost trends and resource breakdowns
        todo!()
    }

    pub async fn detect_anomalies(
        &self,
        user_id: &str,
    ) -> Result<Vec<UsageAnomaly>> {
        // Statistical anomaly detection (z-score, IQR)
        // Notify on sudden spikes or unusual patterns
        todo!()
    }
}

Data Flow

┌──────────────────┐
│  Compute Nodes   │
│ (CPU/GPU/TPU...) │
└────────┬─────────┘
         │ emit metrics
         ↓
┌──────────────────┐
│ UsageCollector   │
│  (batch buffer)  │
└────────┬─────────┘
         │ flush every 1s
         ↓
┌──────────────────┐
│  Redis Stream    │
│ (raw metrics)    │
└────────┬─────────┘
         │ consume
         ↓
┌──────────────────┐
│ UsageAggregator  │
│ (time windows)   │
└────────┬─────────┘
         │ write
         ↓
┌──────────────────┐
│  Timeseries DB   │
│ (InfluxDB/VictoriaMetrics)
└────────┬─────────┘
         │ query
         ↓
┌──────────────────┐
│ BillingEngine    │
│ (invoice gen)    │
└──────────────────┘

Implementation Plan

Phase 1: Core Metrics Collection (Week 1-2)

  • Implement UsageCollector in Rust
  • Integrate with Synor Compute worker nodes
  • Set up Redis streams for metric buffering
  • Add metric collection to all compute operations

Phase 2: Aggregation & Storage (Week 3-4)

  • Implement UsageAggregator with hourly/daily windows
  • Deploy InfluxDB or VictoriaMetrics for timeseries storage
  • Create aggregation jobs (cron or stream processors)
  • Build query APIs for usage data

Phase 3: Quota Management (Week 5-6)

  • Implement QuotaManager with Redis-backed quotas
  • Add quota checks to orchestrator before job dispatch
  • Implement rate limiting with token bucket algorithm
  • Create admin UI for setting user quotas

Phase 4: Analytics & Dashboards (Week 7-8)

  • Build UsageAnalytics module
  • Create user dashboard (Next.js + Chart.js)
  • Add cost trend visualization
  • Implement anomaly detection alerts

Testing Strategy

Unit Tests

  • Metric recording and buffering
  • Aggregation window calculations
  • Quota enforcement logic
  • Anomaly detection algorithms

Integration Tests

  • End-to-end metric flow (collector → Redis → aggregator → DB)
  • Quota limits preventing over-consumption
  • Pricing oracle integration for cost calculations

Load Tests

  • 10,000 metrics/second ingestion
  • Aggregation performance with 1M+ metrics
  • Query latency for large time windows

Success Metrics

  • Accuracy: <1% discrepancy between raw metrics and billable amounts
  • Latency: <100ms p99 for metric recording
  • Throughput: 100,000 metrics/second ingestion
  • Retention: 1-year historical data for analytics
  • Uptime: 99.9% availability for metering service

Security Considerations

  • Data Integrity: HMAC signatures on metrics to prevent tampering
  • Access Control: User can only query their own usage data
  • Audit Logs: Track all quota changes and metric adjustments
  • Rate Limiting: Prevent abuse of analytics APIs

Cost Optimization

  • Batch Processing: Group metrics into 1-second batches to reduce Redis ops
  • Compression: Use columnar storage for timeseries data
  • TTL Policies: Auto-expire raw metrics after 7 days (keep aggregated data)
  • Caching: Cache quota values for 60 seconds to reduce Redis load