mirror of
https://github.com/EZ-Api/ez-api.git
synced 2026-01-13 17:47:51 +00:00
Add response envelope middleware to standardize JSON responses as
`{code,data,message}` with consistent business codes across endpoints.
Update Swagger annotations and tests to reflect the new response shape.
BREAKING CHANGE: API responses are now wrapped in a response envelope; clients must read payloads from `data` and handle `code`/`message` fields.
995 lines
31 KiB
Go
995 lines
31 KiB
Go
package api
|
||
|
||
import (
|
||
"net/http"
|
||
"strconv"
|
||
"strings"
|
||
"time"
|
||
|
||
"github.com/ez-api/ez-api/internal/model"
|
||
"github.com/gin-gonic/gin"
|
||
"gorm.io/gorm"
|
||
)
|
||
|
||
type LogView struct {
|
||
ID uint `json:"id"`
|
||
CreatedAt int64 `json:"created_at"`
|
||
Group string `json:"group"`
|
||
KeyID uint `json:"key_id"`
|
||
ModelName string `json:"model"`
|
||
ProviderID uint `json:"provider_id"`
|
||
ProviderType string `json:"provider_type"`
|
||
ProviderName string `json:"provider_name"`
|
||
StatusCode int `json:"status_code"`
|
||
LatencyMs int64 `json:"latency_ms"`
|
||
TokensIn int64 `json:"tokens_in"`
|
||
TokensOut int64 `json:"tokens_out"`
|
||
ErrorMessage string `json:"error_message"`
|
||
ClientIP string `json:"client_ip"`
|
||
RequestSize int64 `json:"request_size"`
|
||
ResponseSize int64 `json:"response_size"`
|
||
AuditReason string `json:"audit_reason"`
|
||
RequestBody string `json:"request_body,omitempty"`
|
||
}
|
||
|
||
func toLogView(r model.LogRecord) LogView {
|
||
return LogView{
|
||
ID: r.ID,
|
||
CreatedAt: r.CreatedAt.UTC().Unix(),
|
||
Group: r.Group,
|
||
KeyID: r.KeyID,
|
||
ModelName: r.ModelName,
|
||
ProviderID: r.ProviderID,
|
||
ProviderType: r.ProviderType,
|
||
ProviderName: r.ProviderName,
|
||
StatusCode: r.StatusCode,
|
||
LatencyMs: r.LatencyMs,
|
||
TokensIn: r.TokensIn,
|
||
TokensOut: r.TokensOut,
|
||
ErrorMessage: r.ErrorMessage,
|
||
ClientIP: r.ClientIP,
|
||
RequestSize: r.RequestSize,
|
||
ResponseSize: r.ResponseSize,
|
||
AuditReason: r.AuditReason,
|
||
RequestBody: r.RequestBody,
|
||
}
|
||
}
|
||
|
||
type MasterLogView struct {
|
||
ID uint `json:"id"`
|
||
CreatedAt int64 `json:"created_at"`
|
||
Group string `json:"group"`
|
||
KeyID uint `json:"key_id"`
|
||
ModelName string `json:"model"`
|
||
StatusCode int `json:"status_code"`
|
||
LatencyMs int64 `json:"latency_ms"`
|
||
TokensIn int64 `json:"tokens_in"`
|
||
TokensOut int64 `json:"tokens_out"`
|
||
ErrorMessage string `json:"error_message"`
|
||
RequestSize int64 `json:"request_size"`
|
||
ResponseSize int64 `json:"response_size"`
|
||
}
|
||
|
||
func toMasterLogView(r model.LogRecord) MasterLogView {
|
||
return MasterLogView{
|
||
ID: r.ID,
|
||
CreatedAt: r.CreatedAt.UTC().Unix(),
|
||
Group: r.Group,
|
||
KeyID: r.KeyID,
|
||
ModelName: r.ModelName,
|
||
StatusCode: r.StatusCode,
|
||
LatencyMs: r.LatencyMs,
|
||
TokensIn: r.TokensIn,
|
||
TokensOut: r.TokensOut,
|
||
ErrorMessage: r.ErrorMessage,
|
||
RequestSize: r.RequestSize,
|
||
ResponseSize: r.ResponseSize,
|
||
}
|
||
}
|
||
|
||
type ListLogsResponse struct {
|
||
Total int64 `json:"total"`
|
||
Limit int `json:"limit"`
|
||
Offset int `json:"offset"`
|
||
Items []LogView `json:"items"`
|
||
}
|
||
|
||
type ListMasterLogsResponse struct {
|
||
Total int64 `json:"total"`
|
||
Limit int `json:"limit"`
|
||
Offset int `json:"offset"`
|
||
Items []MasterLogView `json:"items"`
|
||
}
|
||
|
||
func parseLimitOffset(c *gin.Context) (limit, offset int) {
|
||
limit = 50
|
||
offset = 0
|
||
if raw := strings.TrimSpace(c.Query("limit")); raw != "" {
|
||
if v, err := strconv.Atoi(raw); err == nil && v > 0 {
|
||
limit = v
|
||
}
|
||
}
|
||
if limit > 200 {
|
||
limit = 200
|
||
}
|
||
if raw := strings.TrimSpace(c.Query("offset")); raw != "" {
|
||
if v, err := strconv.Atoi(raw); err == nil && v >= 0 {
|
||
offset = v
|
||
}
|
||
}
|
||
return limit, offset
|
||
}
|
||
|
||
func parseUnixSeconds(raw string) (time.Time, bool) {
|
||
raw = strings.TrimSpace(raw)
|
||
if raw == "" {
|
||
return time.Time{}, false
|
||
}
|
||
sec, err := strconv.ParseInt(raw, 10, 64)
|
||
if err != nil || sec <= 0 {
|
||
return time.Time{}, false
|
||
}
|
||
return time.Unix(sec, 0).UTC(), true
|
||
}
|
||
|
||
func (h *MasterHandler) masterLogBase(masterID uint) (*gorm.DB, error) {
|
||
logDB := h.logDBConn()
|
||
base := h.logBaseQuery()
|
||
if logDB == h.db {
|
||
return base.
|
||
Joins("JOIN keys ON keys.id = log_records.key_id").
|
||
Where("keys.master_id = ?", masterID), nil
|
||
}
|
||
var keyIDs []uint
|
||
if err := h.db.Model(&model.Key{}).
|
||
Where("master_id = ?", masterID).
|
||
Pluck("id", &keyIDs).Error; err != nil {
|
||
return nil, err
|
||
}
|
||
if len(keyIDs) == 0 {
|
||
return base.Where("1 = 0"), nil
|
||
}
|
||
return base.
|
||
Where("log_records.key_id IN ?", keyIDs), nil
|
||
}
|
||
|
||
// ListLogs godoc
|
||
// @Summary List logs (admin)
|
||
// @Description List request logs with basic filtering/pagination. Returns full log records including request_body.
|
||
// @Tags admin
|
||
// @Produce json
|
||
// @Security AdminAuth
|
||
// @Param limit query int false "limit (default 50, max 200)"
|
||
// @Param offset query int false "offset"
|
||
// @Param since query int false "unix seconds"
|
||
// @Param until query int false "unix seconds"
|
||
// @Param key_id query int false "key id"
|
||
// @Param group query string false "route group"
|
||
// @Param model query string false "model"
|
||
// @Param status_code query int false "status code"
|
||
// @Success 200 {object} ResponseEnvelope{data=ListLogsResponse}
|
||
// @Failure 500 {object} ResponseEnvelope{data=gin.H}
|
||
// @Router /admin/logs [get]
|
||
func (h *Handler) ListLogs(c *gin.Context) {
|
||
limit, offset := parseLimitOffset(c)
|
||
|
||
q := h.logBaseQuery()
|
||
|
||
if t, ok := parseUnixSeconds(c.Query("since")); ok {
|
||
q = q.Where("created_at >= ?", t)
|
||
}
|
||
if t, ok := parseUnixSeconds(c.Query("until")); ok {
|
||
q = q.Where("created_at <= ?", t)
|
||
}
|
||
if raw := strings.TrimSpace(c.Query("key_id")); raw != "" {
|
||
if v, err := strconv.ParseUint(raw, 10, 64); err == nil && v > 0 {
|
||
q = q.Where("key_id = ?", uint(v))
|
||
}
|
||
}
|
||
if raw := strings.TrimSpace(c.Query("group")); raw != "" {
|
||
q = q.Where(`"group" = ?`, raw)
|
||
}
|
||
if raw := strings.TrimSpace(c.Query("model")); raw != "" {
|
||
q = q.Where("model_name = ?", raw)
|
||
}
|
||
if raw := strings.TrimSpace(c.Query("status_code")); raw != "" {
|
||
if v, err := strconv.Atoi(raw); err == nil && v > 0 {
|
||
q = q.Where("status_code = ?", v)
|
||
}
|
||
}
|
||
|
||
var total int64
|
||
if err := q.Count(&total).Error; err != nil {
|
||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to count logs", "details": err.Error()})
|
||
return
|
||
}
|
||
|
||
var rows []model.LogRecord
|
||
if err := q.Order("id desc").Limit(limit).Offset(offset).Find(&rows).Error; err != nil {
|
||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to list logs", "details": err.Error()})
|
||
return
|
||
}
|
||
// Admin sees full LogView including request_body
|
||
out := make([]LogView, 0, len(rows))
|
||
for _, r := range rows {
|
||
out = append(out, toLogView(r))
|
||
}
|
||
c.JSON(http.StatusOK, ListLogsResponse{Total: total, Limit: limit, Offset: offset, Items: out})
|
||
}
|
||
|
||
type LogStatsResponse struct {
|
||
Total int64 `json:"total"`
|
||
TokensIn int64 `json:"tokens_in"`
|
||
TokensOut int64 `json:"tokens_out"`
|
||
AvgLatency float64 `json:"avg_latency_ms"`
|
||
ByStatus map[string]int64 `json:"by_status"`
|
||
}
|
||
|
||
// GroupedStatsItem represents a single group in grouped statistics
|
||
type GroupedStatsItem struct {
|
||
// For group_by=model
|
||
Model string `json:"model,omitempty"`
|
||
// For group_by=day
|
||
Date string `json:"date,omitempty"`
|
||
// For group_by=month
|
||
Month string `json:"month,omitempty"`
|
||
// For group_by=hour
|
||
Hour string `json:"hour,omitempty"`
|
||
// For group_by=minute
|
||
Minute string `json:"minute,omitempty"`
|
||
|
||
Count int64 `json:"count"`
|
||
TokensIn int64 `json:"tokens_in"`
|
||
TokensOut int64 `json:"tokens_out"`
|
||
AvgLatencyMs float64 `json:"avg_latency_ms,omitempty"`
|
||
}
|
||
|
||
// GroupedStatsResponse is returned when group_by is specified
|
||
type GroupedStatsResponse struct {
|
||
Items []GroupedStatsItem `json:"items"`
|
||
}
|
||
|
||
type DeleteLogsRequest struct {
|
||
Before string `json:"before"`
|
||
KeyID uint `json:"key_id"`
|
||
Model string `json:"model"`
|
||
}
|
||
|
||
type DeleteLogsResponse struct {
|
||
DeletedCount int64 `json:"deleted_count"`
|
||
}
|
||
|
||
// DeleteLogs godoc
|
||
// @Summary Delete logs (admin)
|
||
// @Description Delete logs before a given timestamp with optional filters
|
||
// @Tags admin
|
||
// @Accept json
|
||
// @Produce json
|
||
// @Security AdminAuth
|
||
// @Param request body DeleteLogsRequest true "Delete filters"
|
||
// @Success 200 {object} ResponseEnvelope{data=DeleteLogsResponse}
|
||
// @Failure 400 {object} ResponseEnvelope{data=gin.H}
|
||
// @Failure 500 {object} ResponseEnvelope{data=gin.H}
|
||
// @Router /admin/logs [delete]
|
||
func (h *Handler) DeleteLogs(c *gin.Context) {
|
||
var req DeleteLogsRequest
|
||
if err := c.ShouldBindJSON(&req); err != nil {
|
||
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request body"})
|
||
return
|
||
}
|
||
|
||
before := strings.TrimSpace(req.Before)
|
||
if before == "" {
|
||
c.JSON(http.StatusBadRequest, gin.H{"error": "before is required"})
|
||
return
|
||
}
|
||
ts, err := time.Parse(time.RFC3339, before)
|
||
if err != nil {
|
||
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid before time"})
|
||
return
|
||
}
|
||
|
||
deleted, err := h.deleteLogsBefore(ts.UTC(), req.KeyID, req.Model)
|
||
if err != nil {
|
||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to delete logs", "details": err.Error()})
|
||
return
|
||
}
|
||
c.JSON(http.StatusOK, DeleteLogsResponse{DeletedCount: deleted})
|
||
}
|
||
|
||
func (h *Handler) deleteLogsBefore(cutoff time.Time, keyID uint, modelName string) (int64, error) {
|
||
modelName = strings.TrimSpace(modelName)
|
||
if h == nil || h.logPartitioner == nil || !h.logPartitioner.Enabled() {
|
||
q := h.logBaseQuery().Unscoped().Where("created_at < ?", cutoff)
|
||
if keyID > 0 {
|
||
q = q.Where("key_id = ?", keyID)
|
||
}
|
||
if modelName != "" {
|
||
q = q.Where("model_name = ?", modelName)
|
||
}
|
||
res := q.Delete(&model.LogRecord{})
|
||
return res.RowsAffected, res.Error
|
||
}
|
||
|
||
partitions, err := h.logPartitioner.ListPartitions()
|
||
if err != nil {
|
||
return 0, err
|
||
}
|
||
if len(partitions) == 0 {
|
||
q := h.logDBConn().Table("log_records").Unscoped().Where("created_at < ?", cutoff)
|
||
if keyID > 0 {
|
||
q = q.Where("key_id = ?", keyID)
|
||
}
|
||
if modelName != "" {
|
||
q = q.Where("model_name = ?", modelName)
|
||
}
|
||
res := q.Delete(&model.LogRecord{})
|
||
return res.RowsAffected, res.Error
|
||
}
|
||
|
||
var deleted int64
|
||
for _, part := range partitions {
|
||
if !part.Start.Before(cutoff) {
|
||
continue
|
||
}
|
||
q := h.logDBConn().Table(part.Table).Unscoped().Where("created_at < ?", cutoff)
|
||
if keyID > 0 {
|
||
q = q.Where("key_id = ?", keyID)
|
||
}
|
||
if modelName != "" {
|
||
q = q.Where("model_name = ?", modelName)
|
||
}
|
||
res := q.Delete(&model.LogRecord{})
|
||
if res.Error != nil {
|
||
return deleted, res.Error
|
||
}
|
||
deleted += res.RowsAffected
|
||
}
|
||
return deleted, nil
|
||
}
|
||
|
||
// LogStats godoc
|
||
// @Summary Log stats (admin)
|
||
// @Description Aggregate log stats with basic filtering. Use group_by param for grouped statistics (model/day/month/hour/minute). Without group_by returns LogStatsResponse; with group_by returns GroupedStatsResponse. Note: minute-level aggregation is limited to 6-hour time ranges.
|
||
// @Tags admin
|
||
// @Produce json
|
||
// @Security AdminAuth
|
||
// @Param since query int false "unix seconds"
|
||
// @Param until query int false "unix seconds"
|
||
// @Param group_by query string false "group by dimension: model, day, month, hour, minute. Returns GroupedStatsResponse when specified." Enums(model, day, month, hour, minute)
|
||
// @Success 200 {object} ResponseEnvelope{data=LogStatsResponse} "Default aggregated stats (when group_by is not specified)"
|
||
// @Success 200 {object} ResponseEnvelope{data=GroupedStatsResponse} "Grouped stats (when group_by is specified)"
|
||
// @Failure 400 {object} ResponseEnvelope{data=gin.H}
|
||
// @Failure 500 {object} ResponseEnvelope{data=gin.H}
|
||
// @Router /admin/logs/stats [get]
|
||
func (h *Handler) LogStats(c *gin.Context) {
|
||
q := h.logBaseQuery()
|
||
|
||
var sinceTime, untilTime *time.Time
|
||
if t, ok := parseUnixSeconds(c.Query("since")); ok {
|
||
q = q.Where("created_at >= ?", t)
|
||
sinceTime = &t
|
||
}
|
||
if t, ok := parseUnixSeconds(c.Query("until")); ok {
|
||
q = q.Where("created_at <= ?", t)
|
||
untilTime = &t
|
||
}
|
||
|
||
groupBy := strings.TrimSpace(c.Query("group_by"))
|
||
switch groupBy {
|
||
case "model":
|
||
h.logStatsByModel(c, q)
|
||
return
|
||
case "day":
|
||
h.logStatsByDay(c, q)
|
||
return
|
||
case "month":
|
||
h.logStatsByMonth(c, q)
|
||
return
|
||
case "hour":
|
||
h.logStatsByHour(c, q)
|
||
return
|
||
case "minute":
|
||
h.logStatsByMinute(c, q, sinceTime, untilTime)
|
||
return
|
||
}
|
||
|
||
// Default: aggregated stats (backward compatible)
|
||
var total int64
|
||
if err := q.Count(&total).Error; err != nil {
|
||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to count logs", "details": err.Error()})
|
||
return
|
||
}
|
||
|
||
type sums struct {
|
||
TokensIn int64
|
||
TokensOut int64
|
||
AvgLatency float64
|
||
}
|
||
var s sums
|
||
if err := q.Select("COALESCE(SUM(tokens_in),0) as tokens_in, COALESCE(SUM(tokens_out),0) as tokens_out, COALESCE(AVG(latency_ms),0) as avg_latency").Scan(&s).Error; err != nil {
|
||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to aggregate logs", "details": err.Error()})
|
||
return
|
||
}
|
||
|
||
type bucket struct {
|
||
StatusCode int
|
||
Cnt int64
|
||
}
|
||
var buckets []bucket
|
||
if err := q.Select("status_code, COUNT(*) as cnt").Group("status_code").Scan(&buckets).Error; err != nil {
|
||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to bucket logs", "details": err.Error()})
|
||
return
|
||
}
|
||
byStatus := make(map[string]int64, len(buckets))
|
||
for _, b := range buckets {
|
||
byStatus[strconv.Itoa(b.StatusCode)] = b.Cnt
|
||
}
|
||
|
||
c.JSON(http.StatusOK, LogStatsResponse{
|
||
Total: total,
|
||
TokensIn: s.TokensIn,
|
||
TokensOut: s.TokensOut,
|
||
AvgLatency: s.AvgLatency,
|
||
ByStatus: byStatus,
|
||
})
|
||
}
|
||
|
||
// logStatsByModel handles group_by=model
|
||
func (h *Handler) logStatsByModel(c *gin.Context, q *gorm.DB) {
|
||
type modelStats struct {
|
||
ModelName string
|
||
Cnt int64
|
||
TokensIn int64
|
||
TokensOut int64
|
||
AvgLatencyMs float64
|
||
}
|
||
var rows []modelStats
|
||
if err := q.Select(`model_name, COUNT(*) as cnt, COALESCE(SUM(tokens_in),0) as tokens_in, COALESCE(SUM(tokens_out),0) as tokens_out, COALESCE(AVG(latency_ms),0) as avg_latency_ms`).
|
||
Group("model_name").
|
||
Order("cnt DESC").
|
||
Scan(&rows).Error; err != nil {
|
||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to aggregate by model", "details": err.Error()})
|
||
return
|
||
}
|
||
items := make([]GroupedStatsItem, 0, len(rows))
|
||
for _, r := range rows {
|
||
items = append(items, GroupedStatsItem{
|
||
Model: r.ModelName,
|
||
Count: r.Cnt,
|
||
TokensIn: r.TokensIn,
|
||
TokensOut: r.TokensOut,
|
||
AvgLatencyMs: r.AvgLatencyMs,
|
||
})
|
||
}
|
||
c.JSON(http.StatusOK, GroupedStatsResponse{Items: items})
|
||
}
|
||
|
||
// logStatsByDay handles group_by=day
|
||
func (h *Handler) logStatsByDay(c *gin.Context, q *gorm.DB) {
|
||
type dayStats struct {
|
||
Day string
|
||
Cnt int64
|
||
TokensIn int64
|
||
TokensOut int64
|
||
}
|
||
var rows []dayStats
|
||
// PostgreSQL DATE function; SQLite uses date()
|
||
if err := q.Select(`TO_CHAR(created_at, 'YYYY-MM-DD') as day, COUNT(*) as cnt, COALESCE(SUM(tokens_in),0) as tokens_in, COALESCE(SUM(tokens_out),0) as tokens_out`).
|
||
Group("day").
|
||
Order("day ASC").
|
||
Scan(&rows).Error; err != nil {
|
||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to aggregate by day", "details": err.Error()})
|
||
return
|
||
}
|
||
items := make([]GroupedStatsItem, 0, len(rows))
|
||
for _, r := range rows {
|
||
items = append(items, GroupedStatsItem{
|
||
Date: r.Day,
|
||
Count: r.Cnt,
|
||
TokensIn: r.TokensIn,
|
||
TokensOut: r.TokensOut,
|
||
})
|
||
}
|
||
c.JSON(http.StatusOK, GroupedStatsResponse{Items: items})
|
||
}
|
||
|
||
// logStatsByMonth handles group_by=month
|
||
func (h *Handler) logStatsByMonth(c *gin.Context, q *gorm.DB) {
|
||
type monthStats struct {
|
||
Month string
|
||
Cnt int64
|
||
TokensIn int64
|
||
TokensOut int64
|
||
}
|
||
var rows []monthStats
|
||
// PostgreSQL format
|
||
if err := q.Select(`TO_CHAR(created_at, 'YYYY-MM') as month, COUNT(*) as cnt, COALESCE(SUM(tokens_in),0) as tokens_in, COALESCE(SUM(tokens_out),0) as tokens_out`).
|
||
Group("month").
|
||
Order("month ASC").
|
||
Scan(&rows).Error; err != nil {
|
||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to aggregate by month", "details": err.Error()})
|
||
return
|
||
}
|
||
items := make([]GroupedStatsItem, 0, len(rows))
|
||
for _, r := range rows {
|
||
items = append(items, GroupedStatsItem{
|
||
Month: r.Month,
|
||
Count: r.Cnt,
|
||
TokensIn: r.TokensIn,
|
||
TokensOut: r.TokensOut,
|
||
})
|
||
}
|
||
c.JSON(http.StatusOK, GroupedStatsResponse{Items: items})
|
||
}
|
||
|
||
// logStatsByHour handles group_by=hour
|
||
func (h *Handler) logStatsByHour(c *gin.Context, q *gorm.DB) {
|
||
type hourStats struct {
|
||
Hour string
|
||
Cnt int64
|
||
TokensIn int64
|
||
TokensOut int64
|
||
AvgLatencyMs float64
|
||
}
|
||
var rows []hourStats
|
||
// PostgreSQL DATE_TRUNC for hour-level aggregation
|
||
if err := q.Select(`DATE_TRUNC('hour', created_at) as hour, COUNT(*) as cnt, COALESCE(SUM(tokens_in),0) as tokens_in, COALESCE(SUM(tokens_out),0) as tokens_out, COALESCE(AVG(latency_ms),0) as avg_latency_ms`).
|
||
Group("hour").
|
||
Order("hour ASC").
|
||
Scan(&rows).Error; err != nil {
|
||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to aggregate by hour", "details": err.Error()})
|
||
return
|
||
}
|
||
items := make([]GroupedStatsItem, 0, len(rows))
|
||
for _, r := range rows {
|
||
items = append(items, GroupedStatsItem{
|
||
Hour: r.Hour,
|
||
Count: r.Cnt,
|
||
TokensIn: r.TokensIn,
|
||
TokensOut: r.TokensOut,
|
||
AvgLatencyMs: r.AvgLatencyMs,
|
||
})
|
||
}
|
||
c.JSON(http.StatusOK, GroupedStatsResponse{Items: items})
|
||
}
|
||
|
||
// maxMinuteRangeDuration is the maximum time range allowed for minute-level aggregation (6 hours)
|
||
const maxMinuteRangeDuration = 6 * time.Hour
|
||
|
||
// logStatsByMinute handles group_by=minute with time range validation
|
||
func (h *Handler) logStatsByMinute(c *gin.Context, q *gorm.DB, sinceTime, untilTime *time.Time) {
|
||
// Validate time range - minute-level aggregation requires since/until and max 6 hours
|
||
if sinceTime == nil || untilTime == nil {
|
||
c.JSON(http.StatusBadRequest, gin.H{"error": "minute-level aggregation requires both 'since' and 'until' parameters"})
|
||
return
|
||
}
|
||
|
||
duration := untilTime.Sub(*sinceTime)
|
||
if duration > maxMinuteRangeDuration {
|
||
c.JSON(http.StatusBadRequest, gin.H{
|
||
"error": "time range too large for minute-level aggregation",
|
||
"max_hours": 6,
|
||
"actual_hours": duration.Hours(),
|
||
})
|
||
return
|
||
}
|
||
|
||
if duration < 0 {
|
||
c.JSON(http.StatusBadRequest, gin.H{"error": "'until' must be after 'since'"})
|
||
return
|
||
}
|
||
|
||
type minuteStats struct {
|
||
Minute string
|
||
Cnt int64
|
||
TokensIn int64
|
||
TokensOut int64
|
||
AvgLatencyMs float64
|
||
}
|
||
var rows []minuteStats
|
||
// PostgreSQL DATE_TRUNC for minute-level aggregation
|
||
if err := q.Select(`DATE_TRUNC('minute', created_at) as minute, COUNT(*) as cnt, COALESCE(SUM(tokens_in),0) as tokens_in, COALESCE(SUM(tokens_out),0) as tokens_out, COALESCE(AVG(latency_ms),0) as avg_latency_ms`).
|
||
Group("minute").
|
||
Order("minute ASC").
|
||
Scan(&rows).Error; err != nil {
|
||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to aggregate by minute", "details": err.Error()})
|
||
return
|
||
}
|
||
items := make([]GroupedStatsItem, 0, len(rows))
|
||
for _, r := range rows {
|
||
items = append(items, GroupedStatsItem{
|
||
Minute: r.Minute,
|
||
Count: r.Cnt,
|
||
TokensIn: r.TokensIn,
|
||
TokensOut: r.TokensOut,
|
||
AvgLatencyMs: r.AvgLatencyMs,
|
||
})
|
||
}
|
||
c.JSON(http.StatusOK, GroupedStatsResponse{Items: items})
|
||
}
|
||
|
||
// TrafficSeries contains the metrics for a model aligned to the shared time axis.
|
||
type TrafficSeries struct {
|
||
Name string `json:"name"`
|
||
Data []int64 `json:"data"`
|
||
TokensIn []int64 `json:"tokens_in"`
|
||
TokensOut []int64 `json:"tokens_out"`
|
||
}
|
||
|
||
// TrafficTotals contains aggregated totals aligned to the shared time axis.
|
||
type TrafficTotals struct {
|
||
Data []int64 `json:"data"`
|
||
TokensIn []int64 `json:"tokens_in"`
|
||
TokensOut []int64 `json:"tokens_out"`
|
||
}
|
||
|
||
// TrafficChartAxis defines the shared time axis for chart data.
|
||
type TrafficChartAxis struct {
|
||
Labels []string `json:"labels"`
|
||
Timestamps []int64 `json:"timestamps"`
|
||
Totals TrafficTotals `json:"totals"`
|
||
}
|
||
|
||
// TrafficChartResponse is the response for traffic chart API
|
||
type TrafficChartResponse struct {
|
||
Granularity string `json:"granularity"`
|
||
Since int64 `json:"since"`
|
||
Until int64 `json:"until"`
|
||
X TrafficChartAxis `json:"x"`
|
||
Series []TrafficSeries `json:"series"`
|
||
}
|
||
|
||
const (
|
||
defaultTrafficTopN = 5
|
||
maxTrafficTopN = 20
|
||
)
|
||
|
||
type trafficBucketRow struct {
|
||
Bucket time.Time
|
||
ModelName string
|
||
Cnt int64
|
||
TokensIn int64
|
||
TokensOut int64
|
||
}
|
||
|
||
func buildTrafficChartSeriesResponse(rows []trafficBucketRow, topN int, granularity string, sinceTime, untilTime time.Time) TrafficChartResponse {
|
||
bucketLabels := make(map[int64]string)
|
||
bucketOrder := make([]int64, 0)
|
||
for _, r := range rows {
|
||
ts := r.Bucket.Unix()
|
||
if _, exists := bucketLabels[ts]; !exists {
|
||
bucketLabels[ts] = r.Bucket.UTC().Format(time.RFC3339)
|
||
bucketOrder = append(bucketOrder, ts)
|
||
}
|
||
}
|
||
|
||
modelCounts := make(map[string]int64)
|
||
for _, r := range rows {
|
||
modelCounts[r.ModelName] += r.Cnt
|
||
}
|
||
|
||
type modelCount struct {
|
||
name string
|
||
count int64
|
||
}
|
||
modelList := make([]modelCount, 0, len(modelCounts))
|
||
for name, cnt := range modelCounts {
|
||
modelList = append(modelList, modelCount{name, cnt})
|
||
}
|
||
for i := 0; i < len(modelList)-1; i++ {
|
||
for j := i + 1; j < len(modelList); j++ {
|
||
if modelList[j].count > modelList[i].count {
|
||
modelList[i], modelList[j] = modelList[j], modelList[i]
|
||
}
|
||
}
|
||
}
|
||
|
||
topModels := make(map[string]bool, topN)
|
||
seriesNames := make([]string, 0, topN+1)
|
||
for i := 0; i < len(modelList) && i < topN; i++ {
|
||
topModels[modelList[i].name] = true
|
||
seriesNames = append(seriesNames, modelList[i].name)
|
||
}
|
||
if len(modelList) > topN {
|
||
seriesNames = append(seriesNames, "other")
|
||
}
|
||
|
||
bucketIndex := make(map[int64]int, len(bucketOrder))
|
||
labels := make([]string, len(bucketOrder))
|
||
timestamps := make([]int64, len(bucketOrder))
|
||
for i, ts := range bucketOrder {
|
||
bucketIndex[ts] = i
|
||
labels[i] = bucketLabels[ts]
|
||
timestamps[i] = ts
|
||
}
|
||
|
||
series := make([]TrafficSeries, len(seriesNames))
|
||
seriesIndex := make(map[string]int, len(seriesNames))
|
||
for i, name := range seriesNames {
|
||
series[i] = TrafficSeries{
|
||
Name: name,
|
||
Data: make([]int64, len(bucketOrder)),
|
||
TokensIn: make([]int64, len(bucketOrder)),
|
||
TokensOut: make([]int64, len(bucketOrder)),
|
||
}
|
||
seriesIndex[name] = i
|
||
}
|
||
|
||
totals := TrafficTotals{
|
||
Data: make([]int64, len(bucketOrder)),
|
||
TokensIn: make([]int64, len(bucketOrder)),
|
||
TokensOut: make([]int64, len(bucketOrder)),
|
||
}
|
||
|
||
for _, r := range rows {
|
||
ts := r.Bucket.Unix()
|
||
idx, ok := bucketIndex[ts]
|
||
if !ok {
|
||
continue
|
||
}
|
||
|
||
modelKey := r.ModelName
|
||
if !topModels[modelKey] {
|
||
modelKey = "other"
|
||
}
|
||
if seriesIdx, exists := seriesIndex[modelKey]; exists {
|
||
series[seriesIdx].Data[idx] += r.Cnt
|
||
series[seriesIdx].TokensIn[idx] += r.TokensIn
|
||
series[seriesIdx].TokensOut[idx] += r.TokensOut
|
||
}
|
||
|
||
totals.Data[idx] += r.Cnt
|
||
totals.TokensIn[idx] += r.TokensIn
|
||
totals.TokensOut[idx] += r.TokensOut
|
||
}
|
||
|
||
return TrafficChartResponse{
|
||
Granularity: granularity,
|
||
Since: sinceTime.Unix(),
|
||
Until: untilTime.Unix(),
|
||
X: TrafficChartAxis{
|
||
Labels: labels,
|
||
Timestamps: timestamps,
|
||
Totals: totals,
|
||
},
|
||
Series: series,
|
||
}
|
||
}
|
||
|
||
// GetTrafficChart godoc
|
||
// @Summary Traffic chart data (admin)
|
||
// @Description Get time × model aggregated data for stacked traffic charts. Returns a shared time axis under `x` and per-model series arrays aligned to that axis. Models outside top_n are aggregated under the series name "other".
|
||
// @Tags admin
|
||
// @Produce json
|
||
// @Security AdminAuth
|
||
// @Param granularity query string false "Time granularity: hour (default) or minute" Enums(hour, minute)
|
||
// @Param since query int false "Start time (unix seconds), defaults to 24h ago"
|
||
// @Param until query int false "End time (unix seconds), defaults to now"
|
||
// @Param top_n query int false "Number of top models to return (1-20), defaults to 5"
|
||
// @Success 200 {object} ResponseEnvelope{data=TrafficChartResponse}
|
||
// @Failure 400 {object} ResponseEnvelope{data=gin.H}
|
||
// @Failure 500 {object} ResponseEnvelope{data=gin.H}
|
||
// @Router /admin/logs/stats/traffic-chart [get]
|
||
func (h *Handler) GetTrafficChart(c *gin.Context) {
|
||
// Parse granularity
|
||
granularity := strings.TrimSpace(c.Query("granularity"))
|
||
if granularity == "" {
|
||
granularity = "hour"
|
||
}
|
||
if granularity != "hour" && granularity != "minute" {
|
||
c.JSON(http.StatusBadRequest, gin.H{"error": "granularity must be 'hour' or 'minute'"})
|
||
return
|
||
}
|
||
|
||
// Parse time range
|
||
now := time.Now().UTC()
|
||
var sinceTime, untilTime time.Time
|
||
|
||
if t, ok := parseUnixSeconds(c.Query("since")); ok {
|
||
sinceTime = t
|
||
} else {
|
||
sinceTime = now.Add(-24 * time.Hour)
|
||
}
|
||
|
||
if t, ok := parseUnixSeconds(c.Query("until")); ok {
|
||
untilTime = t
|
||
} else {
|
||
untilTime = now
|
||
}
|
||
|
||
// Validate time range for minute granularity
|
||
if granularity == "minute" {
|
||
if c.Query("since") == "" || c.Query("until") == "" {
|
||
c.JSON(http.StatusBadRequest, gin.H{"error": "minute-level aggregation requires both 'since' and 'until' parameters"})
|
||
return
|
||
}
|
||
duration := untilTime.Sub(sinceTime)
|
||
if duration > maxMinuteRangeDuration {
|
||
c.JSON(http.StatusBadRequest, gin.H{
|
||
"error": "time range too large for minute granularity",
|
||
"max_hours": 6,
|
||
"actual_hours": duration.Hours(),
|
||
})
|
||
return
|
||
}
|
||
if duration < 0 {
|
||
c.JSON(http.StatusBadRequest, gin.H{"error": "'until' must be after 'since'"})
|
||
return
|
||
}
|
||
}
|
||
|
||
// Parse top_n
|
||
topN := defaultTrafficTopN
|
||
if raw := strings.TrimSpace(c.Query("top_n")); raw != "" {
|
||
if v, err := strconv.Atoi(raw); err == nil && v > 0 {
|
||
topN = v
|
||
}
|
||
}
|
||
if topN > maxTrafficTopN {
|
||
c.JSON(http.StatusBadRequest, gin.H{"error": "top_n cannot exceed 20"})
|
||
return
|
||
}
|
||
|
||
// Build query
|
||
q := h.logBaseQuery().
|
||
Where("created_at >= ?", sinceTime).
|
||
Where("created_at <= ?", untilTime)
|
||
|
||
// Select with time truncation based on granularity
|
||
var truncFunc string
|
||
if granularity == "minute" {
|
||
truncFunc = "DATE_TRUNC('minute', created_at)"
|
||
} else {
|
||
truncFunc = "DATE_TRUNC('hour', created_at)"
|
||
}
|
||
|
||
var rows []trafficBucketRow
|
||
|
||
if err := q.Select(truncFunc + " as bucket, model_name, COUNT(*) as cnt, COALESCE(SUM(tokens_in),0) as tokens_in, COALESCE(SUM(tokens_out),0) as tokens_out").
|
||
Group("bucket, model_name").
|
||
Order("bucket ASC, cnt DESC").
|
||
Scan(&rows).Error; err != nil {
|
||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to aggregate traffic data", "details": err.Error()})
|
||
return
|
||
}
|
||
|
||
c.JSON(http.StatusOK, buildTrafficChartSeriesResponse(rows, topN, granularity, sinceTime, untilTime))
|
||
}
|
||
|
||
// ListSelfLogs godoc
|
||
// @Summary List logs (master)
|
||
// @Description List request logs for the authenticated master
|
||
// @Tags master
|
||
// @Produce json
|
||
// @Security MasterAuth
|
||
// @Param limit query int false "limit (default 50, max 200)"
|
||
// @Param offset query int false "offset"
|
||
// @Param since query int false "unix seconds"
|
||
// @Param until query int false "unix seconds"
|
||
// @Param model query string false "model"
|
||
// @Param status_code query int false "status code"
|
||
// @Success 200 {object} ResponseEnvelope{data=ListMasterLogsResponse}
|
||
// @Failure 401 {object} ResponseEnvelope{data=gin.H}
|
||
// @Failure 500 {object} ResponseEnvelope{data=gin.H}
|
||
// @Router /v1/logs [get]
|
||
func (h *MasterHandler) ListSelfLogs(c *gin.Context) {
|
||
master, exists := c.Get("master")
|
||
if !exists {
|
||
c.JSON(http.StatusUnauthorized, gin.H{"error": "master key not found in context"})
|
||
return
|
||
}
|
||
m := master.(*model.Master)
|
||
limit, offset := parseLimitOffset(c)
|
||
|
||
q, err := h.masterLogBase(m.ID)
|
||
if err != nil {
|
||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to build log query", "details": err.Error()})
|
||
return
|
||
}
|
||
|
||
if t, ok := parseUnixSeconds(c.Query("since")); ok {
|
||
q = q.Where("log_records.created_at >= ?", t)
|
||
}
|
||
if t, ok := parseUnixSeconds(c.Query("until")); ok {
|
||
q = q.Where("log_records.created_at <= ?", t)
|
||
}
|
||
if raw := strings.TrimSpace(c.Query("model")); raw != "" {
|
||
q = q.Where("log_records.model_name = ?", raw)
|
||
}
|
||
if raw := strings.TrimSpace(c.Query("status_code")); raw != "" {
|
||
if v, err := strconv.Atoi(raw); err == nil && v > 0 {
|
||
q = q.Where("log_records.status_code = ?", v)
|
||
}
|
||
}
|
||
|
||
var total int64
|
||
if err := q.Count(&total).Error; err != nil {
|
||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to count logs", "details": err.Error()})
|
||
return
|
||
}
|
||
var rows []model.LogRecord
|
||
if err := q.Order("log_records.id desc").Limit(limit).Offset(offset).Find(&rows).Error; err != nil {
|
||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to list logs", "details": err.Error()})
|
||
return
|
||
}
|
||
out := make([]MasterLogView, 0, len(rows))
|
||
for _, r := range rows {
|
||
out = append(out, toMasterLogView(r))
|
||
}
|
||
c.JSON(http.StatusOK, ListMasterLogsResponse{Total: total, Limit: limit, Offset: offset, Items: out})
|
||
}
|
||
|
||
// GetSelfLogStats godoc
|
||
// @Summary Log stats (master)
|
||
// @Description Aggregate request log stats for the authenticated master
|
||
// @Tags master
|
||
// @Produce json
|
||
// @Security MasterAuth
|
||
// @Param since query int false "unix seconds"
|
||
// @Param until query int false "unix seconds"
|
||
// @Success 200 {object} ResponseEnvelope{data=LogStatsResponse}
|
||
// @Failure 401 {object} ResponseEnvelope{data=gin.H}
|
||
// @Failure 500 {object} ResponseEnvelope{data=gin.H}
|
||
// @Router /v1/logs/stats [get]
|
||
func (h *MasterHandler) GetSelfLogStats(c *gin.Context) {
|
||
master, exists := c.Get("master")
|
||
if !exists {
|
||
c.JSON(http.StatusUnauthorized, gin.H{"error": "master key not found in context"})
|
||
return
|
||
}
|
||
m := master.(*model.Master)
|
||
|
||
q, err := h.masterLogBase(m.ID)
|
||
if err != nil {
|
||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to build log query", "details": err.Error()})
|
||
return
|
||
}
|
||
|
||
if t, ok := parseUnixSeconds(c.Query("since")); ok {
|
||
q = q.Where("log_records.created_at >= ?", t)
|
||
}
|
||
if t, ok := parseUnixSeconds(c.Query("until")); ok {
|
||
q = q.Where("log_records.created_at <= ?", t)
|
||
}
|
||
|
||
var total int64
|
||
if err := q.Count(&total).Error; err != nil {
|
||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to count logs", "details": err.Error()})
|
||
return
|
||
}
|
||
|
||
type sums struct {
|
||
TokensIn int64
|
||
TokensOut int64
|
||
AvgLatency float64
|
||
}
|
||
var s sums
|
||
if err := q.Select("COALESCE(SUM(log_records.tokens_in),0) as tokens_in, COALESCE(SUM(log_records.tokens_out),0) as tokens_out, COALESCE(AVG(log_records.latency_ms),0) as avg_latency").Scan(&s).Error; err != nil {
|
||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to aggregate logs", "details": err.Error()})
|
||
return
|
||
}
|
||
|
||
type bucket struct {
|
||
StatusCode int
|
||
Cnt int64
|
||
}
|
||
var buckets []bucket
|
||
if err := q.Select("log_records.status_code as status_code, COUNT(*) as cnt").Group("log_records.status_code").Scan(&buckets).Error; err != nil {
|
||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to bucket logs", "details": err.Error()})
|
||
return
|
||
}
|
||
byStatus := make(map[string]int64, len(buckets))
|
||
for _, b := range buckets {
|
||
byStatus[strconv.Itoa(b.StatusCode)] = b.Cnt
|
||
}
|
||
|
||
c.JSON(http.StatusOK, LogStatsResponse{
|
||
Total: total,
|
||
TokensIn: s.TokensIn,
|
||
TokensOut: s.TokensOut,
|
||
AvgLatency: s.AvgLatency,
|
||
ByStatus: byStatus,
|
||
})
|
||
}
|