mirror of
https://github.com/EZ-Api/ez-api.git
synced 2026-01-13 17:47:51 +00:00
feat(stats): add daily statistics aggregation job and model
- Create `DailyStat` model for immutable daily metrics including request counts, tokens, latency, and top models.
- Implement `DailyStatsJob` to aggregate `log_records` from the previous day, running daily at 00:05 UTC.
- Register database migrations and schedule the job in the server.
- Add `last7d` and `last30d` period support to stats handler.
This commit is contained in:
@@ -264,6 +264,12 @@ func periodWindow(period string) (time.Time, time.Time) {
|
||||
case "month":
|
||||
start := time.Date(now.Year(), now.Month(), 1, 0, 0, 0, 0, time.UTC)
|
||||
return start, now
|
||||
case "last7d":
|
||||
start := now.AddDate(0, 0, -7)
|
||||
return start, now
|
||||
case "last30d":
|
||||
start := now.AddDate(0, 0, -30)
|
||||
return start, now
|
||||
default:
|
||||
return time.Time{}, time.Time{}
|
||||
}
|
||||
|
||||
243
internal/cron/daily_stats_job.go
Normal file
243
internal/cron/daily_stats_job.go
Normal file
@@ -0,0 +1,243 @@
|
||||
package cron
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"log/slog"
|
||||
"time"
|
||||
|
||||
"github.com/ez-api/ez-api/internal/model"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
// DailyStatsJob aggregates log_records into daily_stats for efficient dashboard queries.
// Runs at 00:05 UTC daily to aggregate the previous day's data.
// Design principles:
//   - Idempotent: a date that already has a daily_stats row is skipped, so re-runs are safe.
//   - Immutable: once generated, rows are never modified.
//   - T-1 aggregation: aggregates yesterday's (UTC) data so the day being summarized is complete.
//
// The zero value is not usable; construct with NewDailyStatsJob. Methods
// nil-check their receiver and both handles before touching the databases.
type DailyStatsJob struct {
	db    *gorm.DB // main database (writes daily_stats)
	logDB *gorm.DB // log database (reads log_records)
}
|
||||
|
||||
// NewDailyStatsJob creates a new DailyStatsJob.
|
||||
func NewDailyStatsJob(db, logDB *gorm.DB) *DailyStatsJob {
|
||||
return &DailyStatsJob{
|
||||
db: db,
|
||||
logDB: logDB,
|
||||
}
|
||||
}
|
||||
|
||||
// RunOnce executes a single daily stats aggregation. Called by scheduler.
|
||||
func (j *DailyStatsJob) RunOnce(ctx context.Context) {
|
||||
if j == nil || j.db == nil || j.logDB == nil {
|
||||
return
|
||||
}
|
||||
if err := j.aggregateYesterday(ctx); err != nil {
|
||||
slog.Default().Warn("daily stats aggregation failed", "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (j *DailyStatsJob) aggregateYesterday(ctx context.Context) error {
|
||||
yesterday := time.Now().UTC().AddDate(0, 0, -1)
|
||||
dateStr := yesterday.Format("2006-01-02")
|
||||
|
||||
// Idempotent check: skip if already exists
|
||||
var count int64
|
||||
if err := j.db.Model(&model.DailyStat{}).Where("date = ?", dateStr).Count(&count).Error; err != nil {
|
||||
return err
|
||||
}
|
||||
if count > 0 {
|
||||
slog.Default().Debug("daily stats already exists, skipping", "date", dateStr)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Calculate time window for yesterday (UTC)
|
||||
startOfDay := time.Date(yesterday.Year(), yesterday.Month(), yesterday.Day(), 0, 0, 0, 0, time.UTC)
|
||||
endOfDay := startOfDay.Add(24 * time.Hour)
|
||||
|
||||
// Aggregate basic stats from log_records
|
||||
var stats struct {
|
||||
Requests int64
|
||||
Success int64
|
||||
Failed int64
|
||||
TokensIn int64
|
||||
TokensOut int64
|
||||
LatencySumMs int64
|
||||
}
|
||||
|
||||
if err := j.logDB.Model(&model.LogRecord{}).
|
||||
Select(`
|
||||
COUNT(*) as requests,
|
||||
SUM(CASE WHEN status_code >= 200 AND status_code < 400 THEN 1 ELSE 0 END) as success,
|
||||
SUM(CASE WHEN status_code >= 400 OR status_code = 0 THEN 1 ELSE 0 END) as failed,
|
||||
COALESCE(SUM(tokens_in), 0) as tokens_in,
|
||||
COALESCE(SUM(tokens_out), 0) as tokens_out,
|
||||
COALESCE(SUM(latency_ms), 0) as latency_sum_ms
|
||||
`).
|
||||
Where("created_at >= ? AND created_at < ?", startOfDay, endOfDay).
|
||||
Scan(&stats).Error; err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Skip if no data for this day
|
||||
if stats.Requests == 0 {
|
||||
slog.Default().Debug("no log records for date, skipping daily stats", "date", dateStr)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Aggregate top models
|
||||
topModels, err := j.aggregateTopModels(startOfDay, endOfDay)
|
||||
if err != nil {
|
||||
slog.Default().Warn("failed to aggregate top models", "date", dateStr, "err", err)
|
||||
topModels = "[]"
|
||||
}
|
||||
|
||||
// Create daily stat record
|
||||
dailyStat := model.DailyStat{
|
||||
Date: dateStr,
|
||||
Requests: stats.Requests,
|
||||
Success: stats.Success,
|
||||
Failed: stats.Failed,
|
||||
TokensIn: stats.TokensIn,
|
||||
TokensOut: stats.TokensOut,
|
||||
LatencySumMs: stats.LatencySumMs,
|
||||
TopModels: topModels,
|
||||
CreatedAt: time.Now().UTC(),
|
||||
}
|
||||
|
||||
if err := j.db.Create(&dailyStat).Error; err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
slog.Default().Info("daily stats aggregated",
|
||||
"date", dateStr,
|
||||
"requests", stats.Requests,
|
||||
"tokens_in", stats.TokensIn,
|
||||
"tokens_out", stats.TokensOut,
|
||||
)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (j *DailyStatsJob) aggregateTopModels(startOfDay, endOfDay time.Time) (string, error) {
|
||||
type modelStat struct {
|
||||
Model string
|
||||
Requests int64
|
||||
Tokens int64
|
||||
}
|
||||
|
||||
var results []modelStat
|
||||
if err := j.logDB.Model(&model.LogRecord{}).
|
||||
Select(`
|
||||
model_name as model,
|
||||
COUNT(*) as requests,
|
||||
COALESCE(SUM(tokens_in), 0) + COALESCE(SUM(tokens_out), 0) as tokens
|
||||
`).
|
||||
Where("created_at >= ? AND created_at < ?", startOfDay, endOfDay).
|
||||
Group("model_name").
|
||||
Order("requests DESC").
|
||||
Limit(10).
|
||||
Scan(&results).Error; err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
// Convert to TopModelStat format
|
||||
topModels := make([]model.TopModelStat, 0, len(results))
|
||||
for _, r := range results {
|
||||
topModels = append(topModels, model.TopModelStat{
|
||||
Model: r.Model,
|
||||
Requests: r.Requests,
|
||||
Tokens: r.Tokens,
|
||||
})
|
||||
}
|
||||
|
||||
data, err := json.Marshal(topModels)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
return string(data), nil
|
||||
}
|
||||
|
||||
// BackfillRange aggregates daily stats for a date range (inclusive).
|
||||
// Useful for initial population or recovery. Skips dates that already exist.
|
||||
func (j *DailyStatsJob) BackfillRange(ctx context.Context, startDate, endDate time.Time) error {
|
||||
current := startDate
|
||||
for !current.After(endDate) {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
if err := j.aggregateDate(ctx, current); err != nil {
|
||||
slog.Default().Warn("backfill failed for date", "date", current.Format("2006-01-02"), "err", err)
|
||||
}
|
||||
current = current.AddDate(0, 0, 1)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (j *DailyStatsJob) aggregateDate(ctx context.Context, date time.Time) error {
|
||||
dateStr := date.Format("2006-01-02")
|
||||
|
||||
// Idempotent check
|
||||
var count int64
|
||||
if err := j.db.Model(&model.DailyStat{}).Where("date = ?", dateStr).Count(&count).Error; err != nil {
|
||||
return err
|
||||
}
|
||||
if count > 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
startOfDay := time.Date(date.Year(), date.Month(), date.Day(), 0, 0, 0, 0, time.UTC)
|
||||
endOfDay := startOfDay.Add(24 * time.Hour)
|
||||
|
||||
var stats struct {
|
||||
Requests int64
|
||||
Success int64
|
||||
Failed int64
|
||||
TokensIn int64
|
||||
TokensOut int64
|
||||
LatencySumMs int64
|
||||
}
|
||||
|
||||
if err := j.logDB.Model(&model.LogRecord{}).
|
||||
Select(`
|
||||
COUNT(*) as requests,
|
||||
SUM(CASE WHEN status_code >= 200 AND status_code < 400 THEN 1 ELSE 0 END) as success,
|
||||
SUM(CASE WHEN status_code >= 400 OR status_code = 0 THEN 1 ELSE 0 END) as failed,
|
||||
COALESCE(SUM(tokens_in), 0) as tokens_in,
|
||||
COALESCE(SUM(tokens_out), 0) as tokens_out,
|
||||
COALESCE(SUM(latency_ms), 0) as latency_sum_ms
|
||||
`).
|
||||
Where("created_at >= ? AND created_at < ?", startOfDay, endOfDay).
|
||||
Scan(&stats).Error; err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if stats.Requests == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
topModels, err := j.aggregateTopModels(startOfDay, endOfDay)
|
||||
if err != nil {
|
||||
topModels = "[]"
|
||||
}
|
||||
|
||||
dailyStat := model.DailyStat{
|
||||
Date: dateStr,
|
||||
Requests: stats.Requests,
|
||||
Success: stats.Success,
|
||||
Failed: stats.Failed,
|
||||
TokensIn: stats.TokensIn,
|
||||
TokensOut: stats.TokensOut,
|
||||
LatencySumMs: stats.LatencySumMs,
|
||||
TopModels: topModels,
|
||||
CreatedAt: time.Now().UTC(),
|
||||
}
|
||||
|
||||
return j.db.Create(&dailyStat).Error
|
||||
}
|
||||
42
internal/model/daily_stat.go
Normal file
42
internal/model/daily_stat.go
Normal file
@@ -0,0 +1,42 @@
|
||||
package model
|
||||
|
||||
import "time"
|
||||
|
||||
// TopModelStat represents a single model's statistics in the top models
// list, serialized as a JSON array into DailyStat.TopModels.
type TopModelStat struct {
	Model    string `json:"model"`    // model name (log_records.model_name)
	Requests int64  `json:"requests"` // request count for the day
	Tokens   int64  `json:"tokens"`   // tokens_in + tokens_out for the day
}
|
||||
|
||||
// DailyStat stores immutable daily aggregated statistics.
// Once generated by the cron job, a row is never modified.
// Design principles:
//   - Immutable: once generated, never modified
//   - Store raw aggregates, compute derived values (error_rate, avg_latency) at read time
//   - One row per day, ~365 rows/year
type DailyStat struct {
	// Date is the primary key, formatted "2006-01-02" (UTC day).
	Date string `gorm:"primaryKey;size:10" json:"date"` // "2006-01-02" (UTC)

	// Request counts. Success covers status codes [200, 400); Failed covers
	// >= 400 or 0 (the aggregation job's classification of log_records).
	Requests int64 `json:"requests"`
	Success  int64 `json:"success"`
	Failed   int64 `json:"failed"`

	// Token counts summed over the day's log_records.
	TokensIn  int64 `json:"tokens_in"`
	TokensOut int64 `json:"tokens_out"`

	// Latency (for avg calculation: avg = LatencySumMs / Requests)
	LatencySumMs int64 `json:"latency_sum_ms"`

	// Top models (JSON array of TopModelStat), at most 10 entries,
	// ordered by request count descending.
	TopModels string `gorm:"type:text" json:"top_models"`

	// CreatedAt records when the aggregation job generated this row (UTC).
	CreatedAt time.Time `json:"created_at"`
}
|
||||
|
||||
// TableName specifies the table name for DailyStat
|
||||
func (DailyStat) TableName() string {
|
||||
return "daily_stats"
|
||||
}
|
||||
Reference in New Issue
Block a user