package cron

import (
	"context"
	"encoding/json"
	"log/slog"
	"time"

	"github.com/ez-api/ez-api/internal/model"
	"gorm.io/gorm"
)

// DailyStatsJob aggregates log_records into daily_stats for efficient dashboard queries.
// Runs at 00:05 UTC daily to aggregate the previous day's data.
//
// Design principles:
//   - Idempotent: skip if the date already exists
//   - Immutable: once generated, rows are never modified
//   - T-1 aggregation: aggregates yesterday's data (ensures completeness)
type DailyStatsJob struct {
	db    *gorm.DB // main database (writes daily_stats)
	logDB *gorm.DB // log database (reads log_records)
}

// NewDailyStatsJob creates a new DailyStatsJob.
func NewDailyStatsJob(db, logDB *gorm.DB) *DailyStatsJob {
	return &DailyStatsJob{
		db:    db,
		logDB: logDB,
	}
}

// RunOnce executes a single daily stats aggregation. Called by the scheduler.
func (j *DailyStatsJob) RunOnce(ctx context.Context) {
	if j == nil || j.db == nil || j.logDB == nil {
		return
	}
	if err := j.aggregateYesterday(ctx); err != nil {
		slog.Default().Warn("daily stats aggregation failed", "err", err)
	}
}
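// The sketch below shows one way a scheduler could drive RunOnce at the
// 00:05 UTC boundary mentioned in the DailyStatsJob doc comment. It is an
// illustrative example only: the actual scheduler is not part of this file,
// and the function name runDailyStatsLoop is hypothetical.
func runDailyStatsLoop(ctx context.Context, job *DailyStatsJob) {
	for {
		// Compute the next 00:05 UTC boundary (today's if still ahead,
		// otherwise tomorrow's).
		now := time.Now().UTC()
		next := time.Date(now.Year(), now.Month(), now.Day(), 0, 5, 0, 0, time.UTC)
		if !next.After(now) {
			next = next.AddDate(0, 0, 1)
		}
		select {
		case <-ctx.Done():
			return
		case <-time.After(time.Until(next)):
			job.RunOnce(ctx)
		}
	}
}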
// aggregateYesterday aggregates yesterday's (UTC) log_records into a single
// daily_stats row, skipping the write if the row already exists or the day
// has no data.
func (j *DailyStatsJob) aggregateYesterday(ctx context.Context) error {
	yesterday := time.Now().UTC().AddDate(0, 0, -1)
	dateStr := yesterday.Format("2006-01-02")

	// Idempotent check: skip if the row already exists.
	var count int64
	if err := j.db.WithContext(ctx).Model(&model.DailyStat{}).Where("date = ?", dateStr).Count(&count).Error; err != nil {
		return err
	}
	if count > 0 {
		slog.Default().Debug("daily stats already exists, skipping", "date", dateStr)
		return nil
	}

	// Calculate the time window for yesterday (UTC).
	startOfDay := time.Date(yesterday.Year(), yesterday.Month(), yesterday.Day(), 0, 0, 0, 0, time.UTC)
	endOfDay := startOfDay.Add(24 * time.Hour)

	// Aggregate basic stats from log_records.
	var stats struct {
		Requests     int64
		Success      int64
		Failed       int64
		TokensIn     int64
		TokensOut    int64
		LatencySumMs int64
	}
	if err := j.logDB.WithContext(ctx).Model(&model.LogRecord{}).
		Select(`
			COUNT(*) as requests,
			SUM(CASE WHEN status_code >= 200 AND status_code < 400 THEN 1 ELSE 0 END) as success,
			SUM(CASE WHEN status_code >= 400 OR status_code = 0 THEN 1 ELSE 0 END) as failed,
			COALESCE(SUM(tokens_in), 0) as tokens_in,
			COALESCE(SUM(tokens_out), 0) as tokens_out,
			COALESCE(SUM(latency_ms), 0) as latency_sum_ms
		`).
		Where("created_at >= ? AND created_at < ?", startOfDay, endOfDay).
		Scan(&stats).Error; err != nil {
		return err
	}

	// Skip if there is no data for this day.
	if stats.Requests == 0 {
		slog.Default().Debug("no log records for date, skipping daily stats", "date", dateStr)
		return nil
	}

	// Aggregate top models; fall back to an empty list on failure so the
	// row can still be written.
	topModels, err := j.aggregateTopModels(ctx, startOfDay, endOfDay)
	if err != nil {
		slog.Default().Warn("failed to aggregate top models", "date", dateStr, "err", err)
		topModels = "[]"
	}

	// Create the daily stat record.
	dailyStat := model.DailyStat{
		Date:         dateStr,
		Requests:     stats.Requests,
		Success:      stats.Success,
		Failed:       stats.Failed,
		TokensIn:     stats.TokensIn,
		TokensOut:    stats.TokensOut,
		LatencySumMs: stats.LatencySumMs,
		TopModels:    topModels,
		CreatedAt:    time.Now().UTC(),
	}
	if err := j.db.WithContext(ctx).Create(&dailyStat).Error; err != nil {
		return err
	}

	slog.Default().Info("daily stats aggregated",
		"date", dateStr,
		"requests", stats.Requests,
		"tokens_in", stats.TokensIn,
		"tokens_out", stats.TokensOut,
	)
	return nil
}

// aggregateTopModels returns the top 10 models by request count in the given
// window, JSON-encoded as a []model.TopModelStat.
func (j *DailyStatsJob) aggregateTopModels(ctx context.Context, startOfDay, endOfDay time.Time) (string, error) {
	type modelStat struct {
		Model    string
		Requests int64
		Tokens   int64
	}
	var results []modelStat
	if err := j.logDB.WithContext(ctx).Model(&model.LogRecord{}).
		Select(`
			model_name as model,
			COUNT(*) as requests,
			COALESCE(SUM(tokens_in), 0) + COALESCE(SUM(tokens_out), 0) as tokens
		`).
		Where("created_at >= ? AND created_at < ?", startOfDay, endOfDay).
		Group("model_name").
		Order("requests DESC").
		Limit(10).
		Scan(&results).Error; err != nil {
		return "", err
	}

	// Convert to the TopModelStat format.
	topModels := make([]model.TopModelStat, 0, len(results))
	for _, r := range results {
		topModels = append(topModels, model.TopModelStat{
			Model:    r.Model,
			Requests: r.Requests,
			Tokens:   r.Tokens,
		})
	}
	data, err := json.Marshal(topModels)
	if err != nil {
		return "", err
	}
	return string(data), nil
}

// BackfillRange aggregates daily stats for a date range (inclusive).
// Useful for initial population or recovery. Dates that already exist are
// skipped; failures for individual dates are logged and do not stop the run.
func (j *DailyStatsJob) BackfillRange(ctx context.Context, startDate, endDate time.Time) error {
	current := startDate
	for !current.After(endDate) {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}
		if err := j.aggregateDate(ctx, current); err != nil {
			slog.Default().Warn("backfill failed for date", "date", current.Format("2006-01-02"), "err", err)
		}
		current = current.AddDate(0, 0, 1)
	}
	return nil
}

// aggregateDate is the quiet variant of aggregateYesterday used by
// BackfillRange: it aggregates a single (UTC) date without per-step logging.
func (j *DailyStatsJob) aggregateDate(ctx context.Context, date time.Time) error {
	dateStr := date.Format("2006-01-02")

	// Idempotent check.
	var count int64
	if err := j.db.WithContext(ctx).Model(&model.DailyStat{}).Where("date = ?", dateStr).Count(&count).Error; err != nil {
		return err
	}
	if count > 0 {
		return nil
	}

	startOfDay := time.Date(date.Year(), date.Month(), date.Day(), 0, 0, 0, 0, time.UTC)
	endOfDay := startOfDay.Add(24 * time.Hour)

	var stats struct {
		Requests     int64
		Success      int64
		Failed       int64
		TokensIn     int64
		TokensOut    int64
		LatencySumMs int64
	}
	if err := j.logDB.WithContext(ctx).Model(&model.LogRecord{}).
		Select(`
			COUNT(*) as requests,
			SUM(CASE WHEN status_code >= 200 AND status_code < 400 THEN 1 ELSE 0 END) as success,
			SUM(CASE WHEN status_code >= 400 OR status_code = 0 THEN 1 ELSE 0 END) as failed,
			COALESCE(SUM(tokens_in), 0) as tokens_in,
			COALESCE(SUM(tokens_out), 0) as tokens_out,
			COALESCE(SUM(latency_ms), 0) as latency_sum_ms
		`).
		Where("created_at >= ? AND created_at < ?", startOfDay, endOfDay).
		Scan(&stats).Error; err != nil {
		return err
	}
	if stats.Requests == 0 {
		return nil
	}

	topModels, err := j.aggregateTopModels(ctx, startOfDay, endOfDay)
	if err != nil {
		topModels = "[]"
	}

	dailyStat := model.DailyStat{
		Date:         dateStr,
		Requests:     stats.Requests,
		Success:      stats.Success,
		Failed:       stats.Failed,
		TokensIn:     stats.TokensIn,
		TokensOut:    stats.TokensOut,
		LatencySumMs: stats.LatencySumMs,
		TopModels:    topModels,
		CreatedAt:    time.Now().UTC(),
	}
	return j.db.WithContext(ctx).Create(&dailyStat).Error
}
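// Example use of BackfillRange (illustrative sketch only: the 30-day window
// and the function name backfillLastMonth are hypothetical, e.g. for initial
// population after first deploying the dashboard).
func backfillLastMonth(ctx context.Context, job *DailyStatsJob) error {
	end := time.Now().UTC().AddDate(0, 0, -1) // yesterday (T-1, matching RunOnce)
	start := end.AddDate(0, 0, -29)           // 30 days total, range is inclusive
	return job.BackfillRange(ctx, start, end)
}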