Mirror of https://github.com/EZ-Api/ez-api.git (synced 2026-01-13 17:47:51 +00:00)
refactor(stats): remove daily stats aggregation
Remove the DailyStatsJob, DailyStat model, and associated database migrations. This eliminates the pre-aggregation layer and updates the dashboard handler to remove dependencies on the daily_stats table.
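For orientation, the query pattern that remains after this change: the dashboard aggregates directly from `log_records` (its `aggregateFromLogRecords` survives below, and the deleted nightly job ran the same query once per day). A minimal sketch of that pattern, assuming GORM and the `log_records` columns visible in this diff; `windowStats` and `aggregateWindow` are illustrative names, not code from the repo:

```go
package dashboard // illustrative placement; the real code lives in the api package

import (
    "time"

    "github.com/ez-api/ez-api/internal/model"
    "gorm.io/gorm"
)

// windowStats mirrors the shape the dashboard's aggregatedStats scans into.
type windowStats struct {
    Requests     int64
    Success      int64
    Failed       int64
    TokensIn     int64
    TokensOut    int64
    LatencySumMs int64
}

// aggregateWindow sums request stats from log_records over [start, end) —
// the same aggregation the removed DailyStatsJob precomputed per day.
func aggregateWindow(logDB *gorm.DB, start, end time.Time) (windowStats, error) {
    var s windowStats
    err := logDB.Model(&model.LogRecord{}).
        Select(`
            COUNT(*) as requests,
            SUM(CASE WHEN status_code >= 200 AND status_code < 400 THEN 1 ELSE 0 END) as success,
            SUM(CASE WHEN status_code >= 400 OR status_code = 0 THEN 1 ELSE 0 END) as failed,
            COALESCE(SUM(tokens_in), 0) as tokens_in,
            COALESCE(SUM(tokens_out), 0) as tokens_out,
            COALESCE(SUM(latency_ms), 0) as latency_sum_ms
        `).
        Where("created_at >= ? AND created_at < ?", start, end).
        Scan(&s).Error
    return s, err
}
```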
Entry point wiring (`func main` / `runImport`):

```diff
@@ -146,7 +146,7 @@ func main() {
 
     // Auto Migrate
     if logDB != db {
-        if err := db.AutoMigrate(&model.Master{}, &model.Key{}, &model.ProviderGroup{}, &model.APIKey{}, &model.Model{}, &model.Binding{}, &model.Namespace{}, &model.OperationLog{}, &model.SyncOutbox{}, &model.Alert{}, &model.AlertThresholdConfig{}, &model.DailyStat{}); err != nil {
+        if err := db.AutoMigrate(&model.Master{}, &model.Key{}, &model.ProviderGroup{}, &model.APIKey{}, &model.Model{}, &model.Binding{}, &model.Namespace{}, &model.OperationLog{}, &model.SyncOutbox{}, &model.Alert{}, &model.AlertThresholdConfig{}); err != nil {
             fatal(logger, "failed to auto migrate", "err", err)
         }
         if err := logDB.AutoMigrate(&model.LogRecord{}); err != nil {
@@ -156,7 +156,7 @@ func main() {
             fatal(logger, "failed to ensure log indexes", "err", err)
         }
     } else {
-        if err := db.AutoMigrate(&model.Master{}, &model.Key{}, &model.ProviderGroup{}, &model.APIKey{}, &model.Model{}, &model.Binding{}, &model.Namespace{}, &model.OperationLog{}, &model.LogRecord{}, &model.SyncOutbox{}, &model.Alert{}, &model.AlertThresholdConfig{}, &model.DailyStat{}); err != nil {
+        if err := db.AutoMigrate(&model.Master{}, &model.Key{}, &model.ProviderGroup{}, &model.APIKey{}, &model.Model{}, &model.Binding{}, &model.Namespace{}, &model.OperationLog{}, &model.LogRecord{}, &model.SyncOutbox{}, &model.Alert{}, &model.AlertThresholdConfig{}); err != nil {
             fatal(logger, "failed to auto migrate", "err", err)
         }
         if err := service.EnsureLogIndexes(db); err != nil {
@@ -201,7 +201,6 @@ func main() {
     )
     alertDetectorConfig := cron.DefaultAlertDetectorConfig()
     alertDetector := cron.NewAlertDetector(db, logDB, rdb, service.NewStatsService(rdb), alertDetectorConfig, logger)
-    dailyStatsJob := cron.NewDailyStatsJob(db, logDB)
 
     // Setup scheduler (jobs are added incrementally, Start() called after all services initialized)
     sched := scheduler.New(
@@ -213,7 +212,6 @@ func main() {
     sched.Every("log-cleanup", time.Hour, logCleaner.RunOnce)
     sched.Every("token-refresh", time.Duration(cfg.TokenRefresh.IntervalSeconds)*time.Second, tokenRefresher.RunOnce)
     sched.Every("alert-detection", time.Minute, alertDetector.RunOnce)
-    sched.Cron("daily-stats", "5 0 * * *", dailyStatsJob.RunOnce) // 00:05 UTC daily
     if outboxService != nil && outboxService.Enabled() {
         sched.Every("sync-outbox", outboxService.Interval(), outboxService.RunOnce)
     }
@@ -491,7 +489,7 @@ func runImport(logger *slog.Logger, args []string) int {
         return 1
     }
 
-    if err := db.AutoMigrate(&model.Master{}, &model.Key{}, &model.ProviderGroup{}, &model.APIKey{}, &model.Model{}, &model.Binding{}, &model.Namespace{}, &model.SyncOutbox{}, &model.DailyStat{}); err != nil {
+    if err := db.AutoMigrate(&model.Master{}, &model.Key{}, &model.ProviderGroup{}, &model.APIKey{}, &model.Model{}, &model.Binding{}, &model.Namespace{}, &model.SyncOutbox{}); err != nil {
         logger.Error("failed to auto migrate", "err", err)
         return 1
     }
```
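One operational note: dropping `&model.DailyStat{}` from `AutoMigrate` stops GORM from managing the table, but GORM never drops tables on its own, so existing deployments keep an orphaned `daily_stats` table. If it should go, that takes an explicit step; a sketch using GORM's `Migrator` API, with the `fatal`/`logger` helpers assumed from the surrounding code:

```go
// Sketch: one-time cleanup of the now-orphaned daily_stats table.
// AutoMigrate only creates and alters; dropping must be explicit.
if db.Migrator().HasTable("daily_stats") {
    if err := db.Migrator().DropTable("daily_stats"); err != nil {
        fatal(logger, "failed to drop daily_stats", "err", err)
    }
}
```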
Dashboard handler (`package api`):

```diff
@@ -1,7 +1,6 @@
 package api
 
 import (
-    "encoding/json"
     "math"
     "net/http"
     "time"
@@ -403,23 +402,6 @@ func previousPeriodWindow(period string) (start, end time.Time) {
     }
 }
 
-// aggregateFromDailyStats sums statistics from daily_stats table for the given date range
-func (h *DashboardHandler) aggregateFromDailyStats(startDate, endDate string) (aggregatedStats, error) {
-    var stats aggregatedStats
-    err := h.db.Model(&model.DailyStat{}).
-        Select(`
-            COALESCE(SUM(requests), 0) as requests,
-            COALESCE(SUM(success), 0) as success,
-            COALESCE(SUM(failed), 0) as failed,
-            COALESCE(SUM(tokens_in), 0) as tokens_in,
-            COALESCE(SUM(tokens_out), 0) as tokens_out,
-            COALESCE(SUM(latency_sum_ms), 0) as latency_sum_ms
-        `).
-        Where("date >= ? AND date <= ?", startDate, endDate).
-        Scan(&stats).Error
-    return stats, err
-}
-
 // aggregateFromLogRecords queries log_records directly for the given time range
 func (h *DashboardHandler) aggregateFromLogRecords(start, end time.Time) (aggregatedStats, error) {
     var stats struct {
@@ -452,48 +434,3 @@ func (h *DashboardHandler) aggregateFromLogRecords(start, end time.Time) (aggreg
         LatencySumMs: stats.LatencySumMs,
     }, err
 }
-
-// getTopModelsFromDailyStats aggregates top models from daily_stats JSON field
-func (h *DashboardHandler) getTopModelsFromDailyStats(startDate, endDate string) ([]TopModelStat, error) {
-    var dailyStats []model.DailyStat
-    if err := h.db.Where("date >= ? AND date <= ?", startDate, endDate).Find(&dailyStats).Error; err != nil {
-        return nil, err
-    }
-
-    // Aggregate top models from all days
-    modelMap := make(map[string]TopModelStat)
-    for _, ds := range dailyStats {
-        var topModels []model.TopModelStat
-        if err := json.Unmarshal([]byte(ds.TopModels), &topModels); err != nil {
-            continue
-        }
-        for _, tm := range topModels {
-            existing := modelMap[tm.Model]
-            existing.Model = tm.Model
-            existing.Requests += tm.Requests
-            existing.Tokens += tm.Tokens
-            modelMap[tm.Model] = existing
-        }
-    }
-
-    // Convert to slice and sort by requests
-    result := make([]TopModelStat, 0, len(modelMap))
-    for _, tm := range modelMap {
-        result = append(result, tm)
-    }
-
-    // Simple bubble sort for top 10 (small dataset)
-    for i := 0; i < len(result)-1; i++ {
-        for j := i + 1; j < len(result); j++ {
-            if result[j].Requests > result[i].Requests {
-                result[i], result[j] = result[j], result[i]
-            }
-        }
-    }
-
-    if len(result) > 10 {
-        result = result[:10]
-    }
-
-    return result, nil
-}
```
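With `getTopModelsFromDailyStats` and its JSON round-trip gone, a top-models list can be computed straight from `log_records`; the deleted job's `aggregateTopModels` (further down) ran exactly this query, with SQL `ORDER BY ... LIMIT` doing the work of the removed in-memory bubble sort. A hedged sketch; the `h.logDB` field is an assumption, not something this diff shows:

```go
// Sketch: top-10 models by request count over [start, end), adapted from
// the deleted DailyStatsJob.aggregateTopModels query. h.logDB is an assumed
// handle to the log database.
func (h *DashboardHandler) topModelsFromLogRecords(start, end time.Time) ([]TopModelStat, error) {
    var results []TopModelStat
    err := h.logDB.Model(&model.LogRecord{}).
        Select(`
            model_name as model,
            COUNT(*) as requests,
            COALESCE(SUM(tokens_in), 0) + COALESCE(SUM(tokens_out), 0) as tokens
        `).
        Where("created_at >= ? AND created_at < ?", start, end).
        Group("model_name").
        Order("requests DESC").
        Limit(10). // ORDER BY ... LIMIT replaces the removed bubble sort
        Scan(&results).Error
    return results, err
}
```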
Deleted file: the daily-stats cron job (`package cron`, 243 lines):

```go
package cron

import (
    "context"
    "encoding/json"
    "log/slog"
    "time"

    "github.com/ez-api/ez-api/internal/model"
    "gorm.io/gorm"
)

// DailyStatsJob aggregates log_records into daily_stats for efficient dashboard queries.
// Runs at 00:05 UTC daily to aggregate the previous day's data.
// Design principles:
// - Idempotent: skip if date already exists
// - Immutable: once generated, rows are never modified
// - T-1 aggregation: aggregates yesterday's data (ensures completeness)
type DailyStatsJob struct {
    db    *gorm.DB // main database (writes daily_stats)
    logDB *gorm.DB // log database (reads log_records)
}

// NewDailyStatsJob creates a new DailyStatsJob.
func NewDailyStatsJob(db, logDB *gorm.DB) *DailyStatsJob {
    return &DailyStatsJob{
        db:    db,
        logDB: logDB,
    }
}

// RunOnce executes a single daily stats aggregation. Called by scheduler.
func (j *DailyStatsJob) RunOnce(ctx context.Context) {
    if j == nil || j.db == nil || j.logDB == nil {
        return
    }
    if err := j.aggregateYesterday(ctx); err != nil {
        slog.Default().Warn("daily stats aggregation failed", "err", err)
    }
}

func (j *DailyStatsJob) aggregateYesterday(ctx context.Context) error {
    yesterday := time.Now().UTC().AddDate(0, 0, -1)
    dateStr := yesterday.Format("2006-01-02")

    // Idempotent check: skip if already exists
    var count int64
    if err := j.db.Model(&model.DailyStat{}).Where("date = ?", dateStr).Count(&count).Error; err != nil {
        return err
    }
    if count > 0 {
        slog.Default().Debug("daily stats already exists, skipping", "date", dateStr)
        return nil
    }

    // Calculate time window for yesterday (UTC)
    startOfDay := time.Date(yesterday.Year(), yesterday.Month(), yesterday.Day(), 0, 0, 0, 0, time.UTC)
    endOfDay := startOfDay.Add(24 * time.Hour)

    // Aggregate basic stats from log_records
    var stats struct {
        Requests     int64
        Success      int64
        Failed       int64
        TokensIn     int64
        TokensOut    int64
        LatencySumMs int64
    }

    if err := j.logDB.Model(&model.LogRecord{}).
        Select(`
            COUNT(*) as requests,
            SUM(CASE WHEN status_code >= 200 AND status_code < 400 THEN 1 ELSE 0 END) as success,
            SUM(CASE WHEN status_code >= 400 OR status_code = 0 THEN 1 ELSE 0 END) as failed,
            COALESCE(SUM(tokens_in), 0) as tokens_in,
            COALESCE(SUM(tokens_out), 0) as tokens_out,
            COALESCE(SUM(latency_ms), 0) as latency_sum_ms
        `).
        Where("created_at >= ? AND created_at < ?", startOfDay, endOfDay).
        Scan(&stats).Error; err != nil {
        return err
    }

    // Skip if no data for this day
    if stats.Requests == 0 {
        slog.Default().Debug("no log records for date, skipping daily stats", "date", dateStr)
        return nil
    }

    // Aggregate top models
    topModels, err := j.aggregateTopModels(startOfDay, endOfDay)
    if err != nil {
        slog.Default().Warn("failed to aggregate top models", "date", dateStr, "err", err)
        topModels = "[]"
    }

    // Create daily stat record
    dailyStat := model.DailyStat{
        Date:         dateStr,
        Requests:     stats.Requests,
        Success:      stats.Success,
        Failed:       stats.Failed,
        TokensIn:     stats.TokensIn,
        TokensOut:    stats.TokensOut,
        LatencySumMs: stats.LatencySumMs,
        TopModels:    topModels,
        CreatedAt:    time.Now().UTC(),
    }

    if err := j.db.Create(&dailyStat).Error; err != nil {
        return err
    }

    slog.Default().Info("daily stats aggregated",
        "date", dateStr,
        "requests", stats.Requests,
        "tokens_in", stats.TokensIn,
        "tokens_out", stats.TokensOut,
    )

    return nil
}

func (j *DailyStatsJob) aggregateTopModels(startOfDay, endOfDay time.Time) (string, error) {
    type modelStat struct {
        Model    string
        Requests int64
        Tokens   int64
    }

    var results []modelStat
    if err := j.logDB.Model(&model.LogRecord{}).
        Select(`
            model_name as model,
            COUNT(*) as requests,
            COALESCE(SUM(tokens_in), 0) + COALESCE(SUM(tokens_out), 0) as tokens
        `).
        Where("created_at >= ? AND created_at < ?", startOfDay, endOfDay).
        Group("model_name").
        Order("requests DESC").
        Limit(10).
        Scan(&results).Error; err != nil {
        return "", err
    }

    // Convert to TopModelStat format
    topModels := make([]model.TopModelStat, 0, len(results))
    for _, r := range results {
        topModels = append(topModels, model.TopModelStat{
            Model:    r.Model,
            Requests: r.Requests,
            Tokens:   r.Tokens,
        })
    }

    data, err := json.Marshal(topModels)
    if err != nil {
        return "", err
    }

    return string(data), nil
}

// BackfillRange aggregates daily stats for a date range (inclusive).
// Useful for initial population or recovery. Skips dates that already exist.
func (j *DailyStatsJob) BackfillRange(ctx context.Context, startDate, endDate time.Time) error {
    current := startDate
    for !current.After(endDate) {
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
        }

        if err := j.aggregateDate(ctx, current); err != nil {
            slog.Default().Warn("backfill failed for date", "date", current.Format("2006-01-02"), "err", err)
        }
        current = current.AddDate(0, 0, 1)
    }
    return nil
}

func (j *DailyStatsJob) aggregateDate(ctx context.Context, date time.Time) error {
    dateStr := date.Format("2006-01-02")

    // Idempotent check
    var count int64
    if err := j.db.Model(&model.DailyStat{}).Where("date = ?", dateStr).Count(&count).Error; err != nil {
        return err
    }
    if count > 0 {
        return nil
    }

    startOfDay := time.Date(date.Year(), date.Month(), date.Day(), 0, 0, 0, 0, time.UTC)
    endOfDay := startOfDay.Add(24 * time.Hour)

    var stats struct {
        Requests     int64
        Success      int64
        Failed       int64
        TokensIn     int64
        TokensOut    int64
        LatencySumMs int64
    }

    if err := j.logDB.Model(&model.LogRecord{}).
        Select(`
            COUNT(*) as requests,
            SUM(CASE WHEN status_code >= 200 AND status_code < 400 THEN 1 ELSE 0 END) as success,
            SUM(CASE WHEN status_code >= 400 OR status_code = 0 THEN 1 ELSE 0 END) as failed,
            COALESCE(SUM(tokens_in), 0) as tokens_in,
            COALESCE(SUM(tokens_out), 0) as tokens_out,
            COALESCE(SUM(latency_ms), 0) as latency_sum_ms
        `).
        Where("created_at >= ? AND created_at < ?", startOfDay, endOfDay).
        Scan(&stats).Error; err != nil {
        return err
    }

    if stats.Requests == 0 {
        return nil
    }

    topModels, err := j.aggregateTopModels(startOfDay, endOfDay)
    if err != nil {
        topModels = "[]"
    }

    dailyStat := model.DailyStat{
        Date:         dateStr,
        Requests:     stats.Requests,
        Success:      stats.Success,
        Failed:       stats.Failed,
        TokensIn:     stats.TokensIn,
        TokensOut:    stats.TokensOut,
        LatencySumMs: stats.LatencySumMs,
        TopModels:    topModels,
        CreatedAt:    time.Now().UTC(),
    }

    return j.db.Create(&dailyStat).Error
}
```
Deleted file: the `DailyStat` and `TopModelStat` models (`package model`, 42 lines):

```go
package model

import "time"

// TopModelStat represents a single model's statistics in the top models list
type TopModelStat struct {
    Model    string `json:"model"`
    Requests int64  `json:"requests"`
    Tokens   int64  `json:"tokens"`
}

// DailyStat stores immutable daily aggregated statistics.
// Once generated by the cron job, a row is never modified.
// Design principles:
// - Immutable: once generated, never modified
// - Store raw aggregates, compute derived values (error_rate, avg_latency) at read time
// - One row per day, ~365 rows/year
type DailyStat struct {
    Date string `gorm:"primaryKey;size:10" json:"date"` // "2006-01-02" (UTC)

    // Request counts
    Requests int64 `json:"requests"`
    Success  int64 `json:"success"`
    Failed   int64 `json:"failed"`

    // Token counts
    TokensIn  int64 `json:"tokens_in"`
    TokensOut int64 `json:"tokens_out"`

    // Latency (for avg calculation: avg = LatencySumMs / Requests)
    LatencySumMs int64 `json:"latency_sum_ms"`

    // Top models (JSON array of TopModelStat)
    TopModels string `gorm:"type:text" json:"top_models"`

    CreatedAt time.Time `json:"created_at"`
}

// TableName specifies the table name for DailyStat
func (DailyStat) TableName() string {
    return "daily_stats"
}
```
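If a per-day series is ever needed again (the one thing `daily_stats` provided that a single-window query does not), it can be computed on demand by grouping `log_records` by day. A sketch, assuming the target SQL dialect supports `DATE(created_at)`; `dayStat` and `dailySeries` are illustrative names:

```go
// Sketch: on-demand per-day rollup over log_records, reproducing what the
// daily_stats table used to precompute nightly.
type dayStat struct {
    Date      string
    Requests  int64
    TokensIn  int64
    TokensOut int64
}

func dailySeries(logDB *gorm.DB, start, end time.Time) ([]dayStat, error) {
    var rows []dayStat
    err := logDB.Model(&model.LogRecord{}).
        Select(`
            DATE(created_at) as date,
            COUNT(*) as requests,
            COALESCE(SUM(tokens_in), 0) as tokens_in,
            COALESCE(SUM(tokens_out), 0) as tokens_out
        `).
        Where("created_at >= ? AND created_at < ?", start, end).
        Group("DATE(created_at)").
        Order("date").
        Scan(&rows).Error
    return rows, err
}
```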