diff --git a/cmd/server/main.go b/cmd/server/main.go
index 7b0a5b3..1a303d6 100644
--- a/cmd/server/main.go
+++ b/cmd/server/main.go
@@ -146,7 +146,7 @@ func main() {
 
 	// Auto Migrate
 	if logDB != db {
-		if err := db.AutoMigrate(&model.Master{}, &model.Key{}, &model.ProviderGroup{}, &model.APIKey{}, &model.Model{}, &model.Binding{}, &model.Namespace{}, &model.OperationLog{}, &model.SyncOutbox{}, &model.Alert{}, &model.AlertThresholdConfig{}, &model.DailyStat{}); err != nil {
+		if err := db.AutoMigrate(&model.Master{}, &model.Key{}, &model.ProviderGroup{}, &model.APIKey{}, &model.Model{}, &model.Binding{}, &model.Namespace{}, &model.OperationLog{}, &model.SyncOutbox{}, &model.Alert{}, &model.AlertThresholdConfig{}); err != nil {
 			fatal(logger, "failed to auto migrate", "err", err)
 		}
 		if err := logDB.AutoMigrate(&model.LogRecord{}); err != nil {
@@ -156,7 +156,7 @@ func main() {
 			fatal(logger, "failed to ensure log indexes", "err", err)
 		}
 	} else {
-		if err := db.AutoMigrate(&model.Master{}, &model.Key{}, &model.ProviderGroup{}, &model.APIKey{}, &model.Model{}, &model.Binding{}, &model.Namespace{}, &model.OperationLog{}, &model.LogRecord{}, &model.SyncOutbox{}, &model.Alert{}, &model.AlertThresholdConfig{}, &model.DailyStat{}); err != nil {
+		if err := db.AutoMigrate(&model.Master{}, &model.Key{}, &model.ProviderGroup{}, &model.APIKey{}, &model.Model{}, &model.Binding{}, &model.Namespace{}, &model.OperationLog{}, &model.LogRecord{}, &model.SyncOutbox{}, &model.Alert{}, &model.AlertThresholdConfig{}); err != nil {
 			fatal(logger, "failed to auto migrate", "err", err)
 		}
 		if err := service.EnsureLogIndexes(db); err != nil {
@@ -201,7 +201,6 @@ func main() {
 	)
 	alertDetectorConfig := cron.DefaultAlertDetectorConfig()
 	alertDetector := cron.NewAlertDetector(db, logDB, rdb, service.NewStatsService(rdb), alertDetectorConfig, logger)
-	dailyStatsJob := cron.NewDailyStatsJob(db, logDB)
 
 	// Setup scheduler (jobs are added incrementally, Start() called after all services initialized)
 	sched := scheduler.New(
@@ -213,7 +212,6 @@ func main() {
 	sched.Every("log-cleanup", time.Hour, logCleaner.RunOnce)
 	sched.Every("token-refresh", time.Duration(cfg.TokenRefresh.IntervalSeconds)*time.Second, tokenRefresher.RunOnce)
 	sched.Every("alert-detection", time.Minute, alertDetector.RunOnce)
-	sched.Cron("daily-stats", "5 0 * * *", dailyStatsJob.RunOnce) // 00:05 UTC daily
 	if outboxService != nil && outboxService.Enabled() {
 		sched.Every("sync-outbox", outboxService.Interval(), outboxService.RunOnce)
 	}
@@ -491,7 +489,7 @@ func runImport(logger *slog.Logger, args []string) int {
 		return 1
 	}
 
-	if err := db.AutoMigrate(&model.Master{}, &model.Key{}, &model.ProviderGroup{}, &model.APIKey{}, &model.Model{}, &model.Binding{}, &model.Namespace{}, &model.SyncOutbox{}, &model.DailyStat{}); err != nil {
+	if err := db.AutoMigrate(&model.Master{}, &model.Key{}, &model.ProviderGroup{}, &model.APIKey{}, &model.Model{}, &model.Binding{}, &model.Namespace{}, &model.SyncOutbox{}); err != nil {
 		logger.Error("failed to auto migrate", "err", err)
 		return 1
 	}
diff --git a/internal/api/dashboard_handler.go b/internal/api/dashboard_handler.go
index 5239ab9..3284afc 100644
--- a/internal/api/dashboard_handler.go
+++ b/internal/api/dashboard_handler.go
@@ -1,7 +1,6 @@
 package api
 
 import (
-	"encoding/json"
 	"math"
 	"net/http"
 	"time"
@@ -403,23 +402,6 @@ func previousPeriodWindow(period string) (start, end time.Time) {
 	}
 }
 
-// aggregateFromDailyStats sums statistics from daily_stats table for the given date range
-func (h *DashboardHandler) aggregateFromDailyStats(startDate, endDate string) (aggregatedStats, error) {
-	var stats aggregatedStats
-	err := h.db.Model(&model.DailyStat{}).
-		Select(`
-			COALESCE(SUM(requests), 0) as requests,
-			COALESCE(SUM(success), 0) as success,
-			COALESCE(SUM(failed), 0) as failed,
-			COALESCE(SUM(tokens_in), 0) as tokens_in,
-			COALESCE(SUM(tokens_out), 0) as tokens_out,
-			COALESCE(SUM(latency_sum_ms), 0) as latency_sum_ms
-		`).
-		Where("date >= ? AND date <= ?", startDate, endDate).
-		Scan(&stats).Error
-	return stats, err
-}
-
 // aggregateFromLogRecords queries log_records directly for the given time range
 func (h *DashboardHandler) aggregateFromLogRecords(start, end time.Time) (aggregatedStats, error) {
 	var stats struct {
@@ -452,48 +434,3 @@ func (h *DashboardHandler) aggregateFromLogRecords(start, end time.Time) (aggreg
 		LatencySumMs: stats.LatencySumMs,
 	}, err
 }
-
-// getTopModelsFromDailyStats aggregates top models from daily_stats JSON field
-func (h *DashboardHandler) getTopModelsFromDailyStats(startDate, endDate string) ([]TopModelStat, error) {
-	var dailyStats []model.DailyStat
-	if err := h.db.Where("date >= ? AND date <= ?", startDate, endDate).Find(&dailyStats).Error; err != nil {
-		return nil, err
-	}
-
-	// Aggregate top models from all days
-	modelMap := make(map[string]TopModelStat)
-	for _, ds := range dailyStats {
-		var topModels []model.TopModelStat
-		if err := json.Unmarshal([]byte(ds.TopModels), &topModels); err != nil {
-			continue
-		}
-		for _, tm := range topModels {
-			existing := modelMap[tm.Model]
-			existing.Model = tm.Model
-			existing.Requests += tm.Requests
-			existing.Tokens += tm.Tokens
-			modelMap[tm.Model] = existing
-		}
-	}
-
-	// Convert to slice and sort by requests
-	result := make([]TopModelStat, 0, len(modelMap))
-	for _, tm := range modelMap {
-		result = append(result, tm)
-	}
-
-	// Simple bubble sort for top 10 (small dataset)
-	for i := 0; i < len(result)-1; i++ {
-		for j := i + 1; j < len(result); j++ {
-			if result[j].Requests > result[i].Requests {
-				result[i], result[j] = result[j], result[i]
-			}
-		}
-	}
-
-	if len(result) > 10 {
-		result = result[:10]
-	}
-
-	return result, nil
-}
diff --git a/internal/cron/daily_stats_job.go b/internal/cron/daily_stats_job.go
deleted file mode 100644
index 915c03d..0000000
--- a/internal/cron/daily_stats_job.go
+++ /dev/null
@@ -1,243 +0,0 @@
-package cron
-
-import (
-	"context"
-	"encoding/json"
-	"log/slog"
-	"time"
-
-	"github.com/ez-api/ez-api/internal/model"
-	"gorm.io/gorm"
-)
-
-// DailyStatsJob aggregates log_records into daily_stats for efficient dashboard queries.
-// Runs at 00:05 UTC daily to aggregate the previous day's data.
-// Design principles:
-// - Idempotent: skip if date already exists
-// - Immutable: once generated, rows are never modified
-// - T-1 aggregation: aggregates yesterday's data (ensures completeness)
-type DailyStatsJob struct {
-	db    *gorm.DB // main database (writes daily_stats)
-	logDB *gorm.DB // log database (reads log_records)
-}
-
-// NewDailyStatsJob creates a new DailyStatsJob.
-func NewDailyStatsJob(db, logDB *gorm.DB) *DailyStatsJob {
-	return &DailyStatsJob{
-		db:    db,
-		logDB: logDB,
-	}
-}
-
-// RunOnce executes a single daily stats aggregation. Called by scheduler.
-func (j *DailyStatsJob) RunOnce(ctx context.Context) {
-	if j == nil || j.db == nil || j.logDB == nil {
-		return
-	}
-	if err := j.aggregateYesterday(ctx); err != nil {
-		slog.Default().Warn("daily stats aggregation failed", "err", err)
-	}
-}
-
-func (j *DailyStatsJob) aggregateYesterday(ctx context.Context) error {
-	yesterday := time.Now().UTC().AddDate(0, 0, -1)
-	dateStr := yesterday.Format("2006-01-02")
-
-	// Idempotent check: skip if already exists
-	var count int64
-	if err := j.db.Model(&model.DailyStat{}).Where("date = ?", dateStr).Count(&count).Error; err != nil {
-		return err
-	}
-	if count > 0 {
-		slog.Default().Debug("daily stats already exists, skipping", "date", dateStr)
-		return nil
-	}
-
-	// Calculate time window for yesterday (UTC)
-	startOfDay := time.Date(yesterday.Year(), yesterday.Month(), yesterday.Day(), 0, 0, 0, 0, time.UTC)
-	endOfDay := startOfDay.Add(24 * time.Hour)
-
-	// Aggregate basic stats from log_records
-	var stats struct {
-		Requests     int64
-		Success      int64
-		Failed       int64
-		TokensIn     int64
-		TokensOut    int64
-		LatencySumMs int64
-	}
-
-	if err := j.logDB.Model(&model.LogRecord{}).
-		Select(`
-			COUNT(*) as requests,
-			SUM(CASE WHEN status_code >= 200 AND status_code < 400 THEN 1 ELSE 0 END) as success,
-			SUM(CASE WHEN status_code >= 400 OR status_code = 0 THEN 1 ELSE 0 END) as failed,
-			COALESCE(SUM(tokens_in), 0) as tokens_in,
-			COALESCE(SUM(tokens_out), 0) as tokens_out,
-			COALESCE(SUM(latency_ms), 0) as latency_sum_ms
-		`).
-		Where("created_at >= ? AND created_at < ?", startOfDay, endOfDay).
-		Scan(&stats).Error; err != nil {
-		return err
-	}
-
-	// Skip if no data for this day
-	if stats.Requests == 0 {
-		slog.Default().Debug("no log records for date, skipping daily stats", "date", dateStr)
-		return nil
-	}
-
-	// Aggregate top models
-	topModels, err := j.aggregateTopModels(startOfDay, endOfDay)
-	if err != nil {
-		slog.Default().Warn("failed to aggregate top models", "date", dateStr, "err", err)
-		topModels = "[]"
-	}
-
-	// Create daily stat record
-	dailyStat := model.DailyStat{
-		Date:         dateStr,
-		Requests:     stats.Requests,
-		Success:      stats.Success,
-		Failed:       stats.Failed,
-		TokensIn:     stats.TokensIn,
-		TokensOut:    stats.TokensOut,
-		LatencySumMs: stats.LatencySumMs,
-		TopModels:    topModels,
-		CreatedAt:    time.Now().UTC(),
-	}
-
-	if err := j.db.Create(&dailyStat).Error; err != nil {
-		return err
-	}
-
-	slog.Default().Info("daily stats aggregated",
-		"date", dateStr,
-		"requests", stats.Requests,
-		"tokens_in", stats.TokensIn,
-		"tokens_out", stats.TokensOut,
-	)
-
-	return nil
-}
-
-func (j *DailyStatsJob) aggregateTopModels(startOfDay, endOfDay time.Time) (string, error) {
-	type modelStat struct {
-		Model    string
-		Requests int64
-		Tokens   int64
-	}
-
-	var results []modelStat
-	if err := j.logDB.Model(&model.LogRecord{}).
-		Select(`
-			model_name as model,
-			COUNT(*) as requests,
-			COALESCE(SUM(tokens_in), 0) + COALESCE(SUM(tokens_out), 0) as tokens
-		`).
-		Where("created_at >= ? AND created_at < ?", startOfDay, endOfDay).
-		Group("model_name").
-		Order("requests DESC").
-		Limit(10).
-		Scan(&results).Error; err != nil {
-		return "", err
-	}
-
-	// Convert to TopModelStat format
-	topModels := make([]model.TopModelStat, 0, len(results))
-	for _, r := range results {
-		topModels = append(topModels, model.TopModelStat{
-			Model:    r.Model,
-			Requests: r.Requests,
-			Tokens:   r.Tokens,
-		})
-	}
-
-	data, err := json.Marshal(topModels)
-	if err != nil {
-		return "", err
-	}
-
-	return string(data), nil
-}
-
-// BackfillRange aggregates daily stats for a date range (inclusive).
-// Useful for initial population or recovery. Skips dates that already exist.
-func (j *DailyStatsJob) BackfillRange(ctx context.Context, startDate, endDate time.Time) error {
-	current := startDate
-	for !current.After(endDate) {
-		select {
-		case <-ctx.Done():
-			return ctx.Err()
-		default:
-		}
-
-		if err := j.aggregateDate(ctx, current); err != nil {
-			slog.Default().Warn("backfill failed for date", "date", current.Format("2006-01-02"), "err", err)
-		}
-		current = current.AddDate(0, 0, 1)
-	}
-	return nil
-}
-
-func (j *DailyStatsJob) aggregateDate(ctx context.Context, date time.Time) error {
-	dateStr := date.Format("2006-01-02")
-
-	// Idempotent check
-	var count int64
-	if err := j.db.Model(&model.DailyStat{}).Where("date = ?", dateStr).Count(&count).Error; err != nil {
-		return err
-	}
-	if count > 0 {
-		return nil
-	}
-
-	startOfDay := time.Date(date.Year(), date.Month(), date.Day(), 0, 0, 0, 0, time.UTC)
-	endOfDay := startOfDay.Add(24 * time.Hour)
-
-	var stats struct {
-		Requests     int64
-		Success      int64
-		Failed       int64
-		TokensIn     int64
-		TokensOut    int64
-		LatencySumMs int64
-	}
-
-	if err := j.logDB.Model(&model.LogRecord{}).
-		Select(`
-			COUNT(*) as requests,
-			SUM(CASE WHEN status_code >= 200 AND status_code < 400 THEN 1 ELSE 0 END) as success,
-			SUM(CASE WHEN status_code >= 400 OR status_code = 0 THEN 1 ELSE 0 END) as failed,
-			COALESCE(SUM(tokens_in), 0) as tokens_in,
-			COALESCE(SUM(tokens_out), 0) as tokens_out,
-			COALESCE(SUM(latency_ms), 0) as latency_sum_ms
-		`).
-		Where("created_at >= ? AND created_at < ?", startOfDay, endOfDay).
-		Scan(&stats).Error; err != nil {
-		return err
-	}
-
-	if stats.Requests == 0 {
-		return nil
-	}
-
-	topModels, err := j.aggregateTopModels(startOfDay, endOfDay)
-	if err != nil {
-		topModels = "[]"
-	}
-
-	dailyStat := model.DailyStat{
-		Date:         dateStr,
-		Requests:     stats.Requests,
-		Success:      stats.Success,
-		Failed:       stats.Failed,
-		TokensIn:     stats.TokensIn,
-		TokensOut:    stats.TokensOut,
-		LatencySumMs: stats.LatencySumMs,
-		TopModels:    topModels,
-		CreatedAt:    time.Now().UTC(),
-	}
-
-	return j.db.Create(&dailyStat).Error
-}
diff --git a/internal/model/daily_stat.go b/internal/model/daily_stat.go
deleted file mode 100644
index 61e5e3b..0000000
--- a/internal/model/daily_stat.go
+++ /dev/null
@@ -1,42 +0,0 @@
-package model
-
-import "time"
-
-// TopModelStat represents a single model's statistics in the top models list
-type TopModelStat struct {
-	Model    string `json:"model"`
-	Requests int64  `json:"requests"`
-	Tokens   int64  `json:"tokens"`
-}
-
-// DailyStat stores immutable daily aggregated statistics.
-// Once generated by the cron job, a row is never modified.
-// Design principles:
-// - Immutable: once generated, never modified
-// - Store raw aggregates, compute derived values (error_rate, avg_latency) at read time
-// - One row per day, ~365 rows/year
-type DailyStat struct {
-	Date string `gorm:"primaryKey;size:10" json:"date"` // "2006-01-02" (UTC)
-
-	// Request counts
-	Requests int64 `json:"requests"`
-	Success  int64 `json:"success"`
-	Failed   int64 `json:"failed"`
-
-	// Token counts
-	TokensIn  int64 `json:"tokens_in"`
-	TokensOut int64 `json:"tokens_out"`
-
-	// Latency (for avg calculation: avg = LatencySumMs / Requests)
-	LatencySumMs int64 `json:"latency_sum_ms"`
-
-	// Top models (JSON array of TopModelStat)
-	TopModels string `gorm:"type:text" json:"top_models"`
-
-	CreatedAt time.Time `json:"created_at"`
-}
-
-// TableName specifies the table name for DailyStat
-func (DailyStat) TableName() string {
-	return "daily_stats"
-}
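
Review note: with the pre-aggregated `daily_stats` path removed, `aggregateFromLogRecords` becomes the dashboard's only aggregation path, scanning `log_records` for the requested window. A minimal sketch of that query shape, assuming GORM and the `log_records` columns referenced in the deleted job (`status_code`, `tokens_in`, `tokens_out`, `latency_ms`, `created_at`); the package, function, and struct names here are illustrative, not the repo's exact ones:

```go
package example

import (
	"time"

	"github.com/ez-api/ez-api/internal/model"
	"gorm.io/gorm"
)

// aggregated mirrors the aggregate columns selected below.
type aggregated struct {
	Requests     int64
	Success      int64
	Failed       int64
	TokensIn     int64
	TokensOut    int64
	LatencySumMs int64
}

// aggregateWindow (hypothetical helper) sums request, token, and latency
// totals over [start, end) directly from log_records — the same SQL the
// deleted DailyStatsJob ran once per day, now evaluated per query.
func aggregateWindow(logDB *gorm.DB, start, end time.Time) (aggregated, error) {
	var out aggregated
	err := logDB.Model(&model.LogRecord{}).
		Select(`
			COUNT(*) as requests,
			SUM(CASE WHEN status_code >= 200 AND status_code < 400 THEN 1 ELSE 0 END) as success,
			SUM(CASE WHEN status_code >= 400 OR status_code = 0 THEN 1 ELSE 0 END) as failed,
			COALESCE(SUM(tokens_in), 0) as tokens_in,
			COALESCE(SUM(tokens_out), 0) as tokens_out,
			COALESCE(SUM(latency_ms), 0) as latency_sum_ms
		`).
		Where("created_at >= ? AND created_at < ?", start, end).
		Scan(&out).Error
	return out, err
}
```

One migration caveat: GORM's AutoMigrate only creates and extends tables, so dropping `&model.DailyStat{}` from the migrate lists leaves the existing `daily_stats` table in place in deployed databases; cleaning it up would take a manual `DROP TABLE daily_stats;`.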