diff --git a/cmd/server/main.go b/cmd/server/main.go
index 1a303d6..7b0a5b3 100644
--- a/cmd/server/main.go
+++ b/cmd/server/main.go
@@ -146,7 +146,7 @@ func main() {
 
 	// Auto Migrate
 	if logDB != db {
-		if err := db.AutoMigrate(&model.Master{}, &model.Key{}, &model.ProviderGroup{}, &model.APIKey{}, &model.Model{}, &model.Binding{}, &model.Namespace{}, &model.OperationLog{}, &model.SyncOutbox{}, &model.Alert{}, &model.AlertThresholdConfig{}); err != nil {
+		if err := db.AutoMigrate(&model.Master{}, &model.Key{}, &model.ProviderGroup{}, &model.APIKey{}, &model.Model{}, &model.Binding{}, &model.Namespace{}, &model.OperationLog{}, &model.SyncOutbox{}, &model.Alert{}, &model.AlertThresholdConfig{}, &model.DailyStat{}); err != nil {
 			fatal(logger, "failed to auto migrate", "err", err)
 		}
 		if err := logDB.AutoMigrate(&model.LogRecord{}); err != nil {
@@ -156,7 +156,7 @@ func main() {
 			fatal(logger, "failed to ensure log indexes", "err", err)
 		}
 	} else {
-		if err := db.AutoMigrate(&model.Master{}, &model.Key{}, &model.ProviderGroup{}, &model.APIKey{}, &model.Model{}, &model.Binding{}, &model.Namespace{}, &model.OperationLog{}, &model.LogRecord{}, &model.SyncOutbox{}, &model.Alert{}, &model.AlertThresholdConfig{}); err != nil {
+		if err := db.AutoMigrate(&model.Master{}, &model.Key{}, &model.ProviderGroup{}, &model.APIKey{}, &model.Model{}, &model.Binding{}, &model.Namespace{}, &model.OperationLog{}, &model.LogRecord{}, &model.SyncOutbox{}, &model.Alert{}, &model.AlertThresholdConfig{}, &model.DailyStat{}); err != nil {
 			fatal(logger, "failed to auto migrate", "err", err)
 		}
 		if err := service.EnsureLogIndexes(db); err != nil {
@@ -201,6 +201,7 @@ func main() {
 	)
 	alertDetectorConfig := cron.DefaultAlertDetectorConfig()
 	alertDetector := cron.NewAlertDetector(db, logDB, rdb, service.NewStatsService(rdb), alertDetectorConfig, logger)
+	dailyStatsJob := cron.NewDailyStatsJob(db, logDB)
 
 	// Setup scheduler (jobs are added incrementally, Start() called after all services initialized)
 	sched := scheduler.New(
@@ -212,6 +213,7 @@ func main() {
 	sched.Every("log-cleanup", time.Hour, logCleaner.RunOnce)
 	sched.Every("token-refresh", time.Duration(cfg.TokenRefresh.IntervalSeconds)*time.Second, tokenRefresher.RunOnce)
 	sched.Every("alert-detection", time.Minute, alertDetector.RunOnce)
+	sched.Cron("daily-stats", "5 0 * * *", dailyStatsJob.RunOnce) // 00:05 UTC daily
 	if outboxService != nil && outboxService.Enabled() {
 		sched.Every("sync-outbox", outboxService.Interval(), outboxService.RunOnce)
 	}
@@ -489,7 +491,7 @@ func runImport(logger *slog.Logger, args []string) int {
 		return 1
 	}
 
-	if err := db.AutoMigrate(&model.Master{}, &model.Key{}, &model.ProviderGroup{}, &model.APIKey{}, &model.Model{}, &model.Binding{}, &model.Namespace{}, &model.SyncOutbox{}); err != nil {
+	if err := db.AutoMigrate(&model.Master{}, &model.Key{}, &model.ProviderGroup{}, &model.APIKey{}, &model.Model{}, &model.Binding{}, &model.Namespace{}, &model.SyncOutbox{}, &model.DailyStat{}); err != nil {
 		logger.Error("failed to auto migrate", "err", err)
 		return 1
 	}
diff --git a/internal/api/stats_handler.go b/internal/api/stats_handler.go
index ca9ba84..59ec29a 100644
--- a/internal/api/stats_handler.go
+++ b/internal/api/stats_handler.go
@@ -264,6 +264,12 @@ func periodWindow(period string) (time.Time, time.Time) {
 	case "month":
 		start := time.Date(now.Year(), now.Month(), 1, 0, 0, 0, 0, time.UTC)
 		return start, now
+	case "last7d":
+		start := now.AddDate(0, 0, -7)
+		return start, now
+	case "last30d":
+		start := now.AddDate(0, 0, -30)
+		return start, now
 	default:
 		return time.Time{}, time.Time{}
 	}
diff --git a/internal/cron/daily_stats_job.go b/internal/cron/daily_stats_job.go
new file mode 100644
index 0000000..915c03d
--- /dev/null
+++ b/internal/cron/daily_stats_job.go
@@ -0,0 +1,243 @@
+package cron
+
+import (
+	"context"
+	"encoding/json"
+	"log/slog"
+	"time"
+
+	"github.com/ez-api/ez-api/internal/model"
+	"gorm.io/gorm"
+)
+
+// DailyStatsJob aggregates log_records into daily_stats for efficient dashboard queries.
+// Runs at 00:05 UTC daily to aggregate the previous day's data.
+// Design principles:
+// - Idempotent: skip if date already exists
+// - Immutable: once generated, rows are never modified
+// - T-1 aggregation: aggregates yesterday's data (ensures completeness)
+type DailyStatsJob struct {
+	db    *gorm.DB // main database (writes daily_stats)
+	logDB *gorm.DB // log database (reads log_records)
+}
+
+// NewDailyStatsJob creates a new DailyStatsJob.
+func NewDailyStatsJob(db, logDB *gorm.DB) *DailyStatsJob {
+	return &DailyStatsJob{
+		db:    db,
+		logDB: logDB,
+	}
+}
+
+// RunOnce executes a single daily stats aggregation. Called by scheduler.
+func (j *DailyStatsJob) RunOnce(ctx context.Context) {
+	if j == nil || j.db == nil || j.logDB == nil {
+		return
+	}
+	if err := j.aggregateYesterday(ctx); err != nil {
+		slog.Default().Warn("daily stats aggregation failed", "err", err)
+	}
+}
+
+func (j *DailyStatsJob) aggregateYesterday(ctx context.Context) error {
+	yesterday := time.Now().UTC().AddDate(0, 0, -1)
+	dateStr := yesterday.Format("2006-01-02")
+
+	// Idempotent check: skip if already exists
+	var count int64
+	if err := j.db.Model(&model.DailyStat{}).Where("date = ?", dateStr).Count(&count).Error; err != nil {
+		return err
+	}
+	if count > 0 {
+		slog.Default().Debug("daily stats already exists, skipping", "date", dateStr)
+		return nil
+	}
+
+	// Calculate time window for yesterday (UTC)
+	startOfDay := time.Date(yesterday.Year(), yesterday.Month(), yesterday.Day(), 0, 0, 0, 0, time.UTC)
+	endOfDay := startOfDay.Add(24 * time.Hour)
+
+	// Aggregate basic stats from log_records
+	var stats struct {
+		Requests     int64
+		Success      int64
+		Failed       int64
+		TokensIn     int64
+		TokensOut    int64
+		LatencySumMs int64
+	}
+
+	if err := j.logDB.Model(&model.LogRecord{}).
+		Select(`
+			COUNT(*) as requests,
+			SUM(CASE WHEN status_code >= 200 AND status_code < 400 THEN 1 ELSE 0 END) as success,
+			SUM(CASE WHEN status_code >= 400 OR status_code = 0 THEN 1 ELSE 0 END) as failed,
+			COALESCE(SUM(tokens_in), 0) as tokens_in,
+			COALESCE(SUM(tokens_out), 0) as tokens_out,
+			COALESCE(SUM(latency_ms), 0) as latency_sum_ms
+		`).
+		Where("created_at >= ? AND created_at < ?", startOfDay, endOfDay).
+		Scan(&stats).Error; err != nil {
+		return err
+	}
+
+	// Skip if no data for this day
+	if stats.Requests == 0 {
+		slog.Default().Debug("no log records for date, skipping daily stats", "date", dateStr)
+		return nil
+	}
+
+	// Aggregate top models
+	topModels, err := j.aggregateTopModels(startOfDay, endOfDay)
+	if err != nil {
+		slog.Default().Warn("failed to aggregate top models", "date", dateStr, "err", err)
+		topModels = "[]"
+	}
+
+	// Create daily stat record
+	dailyStat := model.DailyStat{
+		Date:         dateStr,
+		Requests:     stats.Requests,
+		Success:      stats.Success,
+		Failed:       stats.Failed,
+		TokensIn:     stats.TokensIn,
+		TokensOut:    stats.TokensOut,
+		LatencySumMs: stats.LatencySumMs,
+		TopModels:    topModels,
+		CreatedAt:    time.Now().UTC(),
+	}
+
+	if err := j.db.Create(&dailyStat).Error; err != nil {
+		return err
+	}
+
+	slog.Default().Info("daily stats aggregated",
+		"date", dateStr,
+		"requests", stats.Requests,
+		"tokens_in", stats.TokensIn,
+		"tokens_out", stats.TokensOut,
+	)
+
+	return nil
+}
+
+func (j *DailyStatsJob) aggregateTopModels(startOfDay, endOfDay time.Time) (string, error) {
+	type modelStat struct {
+		Model    string
+		Requests int64
+		Tokens   int64
+	}
+
+	var results []modelStat
+	if err := j.logDB.Model(&model.LogRecord{}).
+		Select(`
+			model_name as model,
+			COUNT(*) as requests,
+			COALESCE(SUM(tokens_in), 0) + COALESCE(SUM(tokens_out), 0) as tokens
+		`).
+		Where("created_at >= ? AND created_at < ?", startOfDay, endOfDay).
+		Group("model_name").
+		Order("requests DESC").
+		Limit(10).
+		Scan(&results).Error; err != nil {
+		return "", err
+	}
+
+	// Convert to TopModelStat format
+	topModels := make([]model.TopModelStat, 0, len(results))
+	for _, r := range results {
+		topModels = append(topModels, model.TopModelStat{
+			Model:    r.Model,
+			Requests: r.Requests,
+			Tokens:   r.Tokens,
+		})
+	}
+
+	data, err := json.Marshal(topModels)
+	if err != nil {
+		return "", err
+	}
+
+	return string(data), nil
+}
+
+// BackfillRange aggregates daily stats for a date range (inclusive).
+// Useful for initial population or recovery. Skips dates that already exist.
+func (j *DailyStatsJob) BackfillRange(ctx context.Context, startDate, endDate time.Time) error {
+	current := startDate
+	for !current.After(endDate) {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		default:
+		}
+
+		if err := j.aggregateDate(ctx, current); err != nil {
+			slog.Default().Warn("backfill failed for date", "date", current.Format("2006-01-02"), "err", err)
+		}
+		current = current.AddDate(0, 0, 1)
+	}
+	return nil
+}
+
+func (j *DailyStatsJob) aggregateDate(ctx context.Context, date time.Time) error {
+	dateStr := date.Format("2006-01-02")
+
+	// Idempotent check
+	var count int64
+	if err := j.db.Model(&model.DailyStat{}).Where("date = ?", dateStr).Count(&count).Error; err != nil {
+		return err
+	}
+	if count > 0 {
+		return nil
+	}
+
+	startOfDay := time.Date(date.Year(), date.Month(), date.Day(), 0, 0, 0, 0, time.UTC)
+	endOfDay := startOfDay.Add(24 * time.Hour)
+
+	var stats struct {
+		Requests     int64
+		Success      int64
+		Failed       int64
+		TokensIn     int64
+		TokensOut    int64
+		LatencySumMs int64
+	}
+
+	if err := j.logDB.Model(&model.LogRecord{}).
+		Select(`
+			COUNT(*) as requests,
+			SUM(CASE WHEN status_code >= 200 AND status_code < 400 THEN 1 ELSE 0 END) as success,
+			SUM(CASE WHEN status_code >= 400 OR status_code = 0 THEN 1 ELSE 0 END) as failed,
+			COALESCE(SUM(tokens_in), 0) as tokens_in,
+			COALESCE(SUM(tokens_out), 0) as tokens_out,
+			COALESCE(SUM(latency_ms), 0) as latency_sum_ms
+		`).
+		Where("created_at >= ? AND created_at < ?", startOfDay, endOfDay).
+		Scan(&stats).Error; err != nil {
+		return err
+	}
+
+	if stats.Requests == 0 {
+		return nil
+	}
+
+	topModels, err := j.aggregateTopModels(startOfDay, endOfDay)
+	if err != nil {
+		topModels = "[]"
+	}
+
+	dailyStat := model.DailyStat{
+		Date:         dateStr,
+		Requests:     stats.Requests,
+		Success:      stats.Success,
+		Failed:       stats.Failed,
+		TokensIn:     stats.TokensIn,
+		TokensOut:    stats.TokensOut,
+		LatencySumMs: stats.LatencySumMs,
+		TopModels:    topModels,
+		CreatedAt:    time.Now().UTC(),
+	}
+
+	return j.db.Create(&dailyStat).Error
+}
diff --git a/internal/model/daily_stat.go b/internal/model/daily_stat.go
new file mode 100644
index 0000000..61e5e3b
--- /dev/null
+++ b/internal/model/daily_stat.go
@@ -0,0 +1,42 @@
+package model
+
+import "time"
+
+// TopModelStat represents a single model's statistics in the top models list
+type TopModelStat struct {
+	Model    string `json:"model"`
+	Requests int64  `json:"requests"`
+	Tokens   int64  `json:"tokens"`
+}
+
+// DailyStat stores immutable daily aggregated statistics.
+// Once generated by the cron job, a row is never modified.
+// Design principles:
+// - Immutable: once generated, never modified
+// - Store raw aggregates, compute derived values (error_rate, avg_latency) at read time
+// - One row per day, ~365 rows/year
+type DailyStat struct {
+	Date string `gorm:"primaryKey;size:10" json:"date"` // "2006-01-02" (UTC)
+
+	// Request counts
+	Requests int64 `json:"requests"`
+	Success  int64 `json:"success"`
+	Failed   int64 `json:"failed"`
+
+	// Token counts
+	TokensIn  int64 `json:"tokens_in"`
+	TokensOut int64 `json:"tokens_out"`
+
+	// Latency (for avg calculation: avg = LatencySumMs / Requests)
+	LatencySumMs int64 `json:"latency_sum_ms"`
+
+	// Top models (JSON array of TopModelStat)
+	TopModels string `gorm:"type:text" json:"top_models"`
+
+	CreatedAt time.Time `json:"created_at"`
+}
+
+// TableName specifies the table name for DailyStat
+func (DailyStat) TableName() string {
+	return "daily_stats"
+}
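
Note (not part of the patch): DailyStat stores raw aggregates, and its comments say error_rate and avg_latency are computed at read time. A minimal sketch of that read-side derivation; deriveDaily is a hypothetical helper name, and only the DailyStat fields themselves come from the patch:

// deriveDaily computes dashboard-facing values from one immutable daily_stats row.
// Hypothetical helper for illustration; not part of this diff.
func deriveDaily(s model.DailyStat) (errorRate, avgLatencyMs float64) {
	if s.Requests == 0 {
		return 0, 0 // no traffic that day; avoid division by zero
	}
	errorRate = float64(s.Failed) / float64(s.Requests)
	avgLatencyMs = float64(s.LatencySumMs) / float64(s.Requests) // avg = LatencySumMs / Requests
	return errorRate, avgLatencyMs
}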
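
Note (not part of the patch): BackfillRange is documented as useful for initial population or recovery, but nothing in this diff calls it yet. A one-off wiring sketch under assumptions: db, logDB, and ctx are the handles main() already holds, and the start date is wherever retained log_records begin. The end date stops at yesterday to match the job's T-1 guarantee, since today's data is still accumulating:

// Hypothetical one-off backfill after deploying this change.
// Re-running is safe: already-aggregated dates are skipped inside aggregateDate.
job := cron.NewDailyStatsJob(db, logDB)
start := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC) // assumption: earliest retained logs
end := time.Now().UTC().AddDate(0, 0, -1)            // T-1: exclude today
if err := job.BackfillRange(ctx, start, end); err != nil {
	// BackfillRange only returns an error on context cancellation;
	// per-date failures are logged and skipped.
	slog.Default().Warn("daily stats backfill aborted", "err", err)
}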