feat(log): wire log db, metrics, and body toggle

This commit is contained in:
zenfun
2025-12-21 16:18:22 +08:00
parent 4c1e03f83d
commit c2c65e774b
9 changed files with 305 additions and 40 deletions

View File

@@ -2,6 +2,7 @@ package main
import (
"context"
"expvar"
"log/slog"
"net/http"
"os"
@@ -92,14 +93,45 @@ func main() {
}
logger.Info("connected to postgresql successfully")
logDB := db
if cfg.Log.DSN != "" {
logDB, err = gorm.Open(postgres.Open(cfg.Log.DSN), &gorm.Config{})
if err != nil {
fatal(logger, "failed to connect to log database", "err", err)
}
sqlLogDB, err := logDB.DB()
if err != nil {
fatal(logger, "failed to get log database object", "err", err)
}
if err := sqlLogDB.Ping(); err != nil {
fatal(logger, "failed to ping log database", "err", err)
}
logger.Info("connected to log database successfully")
}
// Auto Migrate
if logDB != db {
if err := db.AutoMigrate(&model.Master{}, &model.Key{}, &model.Provider{}, &model.Model{}, &model.Binding{}); err != nil {
fatal(logger, "failed to auto migrate", "err", err)
}
if err := logDB.AutoMigrate(&model.LogRecord{}); err != nil {
fatal(logger, "failed to auto migrate log tables", "err", err)
}
if err := service.EnsureLogIndexes(logDB); err != nil {
fatal(logger, "failed to ensure log indexes", "err", err)
}
} else {
if err := db.AutoMigrate(&model.Master{}, &model.Key{}, &model.Provider{}, &model.Model{}, &model.Binding{}, &model.LogRecord{}); err != nil {
fatal(logger, "failed to auto migrate", "err", err)
}
if err := service.EnsureLogIndexes(db); err != nil {
fatal(logger, "failed to ensure log indexes", "err", err)
}
}
// 4. Setup Services and Handlers
syncService := service.NewSyncService(rdb)
logWriter := service.NewLogWriter(db, cfg.Log.QueueCapacity, cfg.Log.BatchSize, cfg.Log.FlushInterval)
logWriter := service.NewLogWriter(logDB, cfg.Log.QueueCapacity, cfg.Log.BatchSize, cfg.Log.FlushInterval)
logCtx, cancelLogs := context.WithCancel(context.Background())
defer cancelLogs()
logWriter.Start(logCtx)
@@ -107,7 +139,7 @@ func main() {
quotaCtx, cancelQuota := context.WithCancel(context.Background())
defer cancelQuota()
go quotaResetter.Start(quotaCtx)
logCleaner := cron.NewLogCleaner(db, rdb, cfg.Log.RetentionDays, int64(cfg.Log.MaxRecords), time.Hour)
logCleaner := cron.NewLogCleaner(logDB, rdb, cfg.Log.RetentionDays, int64(cfg.Log.MaxRecords), time.Hour)
cleanerCtx, cancelCleaner := context.WithCancel(context.Background())
defer cancelCleaner()
go logCleaner.Start(cleanerCtx)
@@ -119,9 +151,9 @@ func main() {
masterService := service.NewMasterService(db)
healthService := service.NewHealthCheckService(db, rdb)
handler := api.NewHandler(db, syncService, logWriter, rdb)
adminHandler := api.NewAdminHandler(db, masterService, syncService)
masterHandler := api.NewMasterHandler(db, masterService, syncService)
handler := api.NewHandler(db, logDB, syncService, logWriter, rdb)
adminHandler := api.NewAdminHandler(db, logDB, masterService, syncService)
masterHandler := api.NewMasterHandler(db, logDB, masterService, syncService)
internalHandler := api.NewInternalHandler(db)
featureHandler := api.NewFeatureHandler(rdb)
modelRegistryService := service.NewModelRegistryService(db, rdb, service.ModelRegistryConfig{
@@ -179,6 +211,7 @@ func main() {
internalGroup.Use(middleware.InternalAuthMiddleware(cfg.Internal.StatsToken))
{
internalGroup.POST("/stats/flush", internalHandler.FlushStats)
internalGroup.GET("/metrics", gin.WrapH(expvar.Handler()))
}
// Admin Routes

View File

@@ -14,12 +14,23 @@ import (
// AdminHandler serves admin-scoped HTTP endpoints. It keeps separate handles
// for the primary database and the log database so log queries can be routed
// to a dedicated log store when one is configured.
type AdminHandler struct {
db *gorm.DB
logDB *gorm.DB // set to db when no dedicated log database is configured
masterService *service.MasterService
syncService *service.SyncService
}
func NewAdminHandler(db *gorm.DB, masterService *service.MasterService, syncService *service.SyncService) *AdminHandler {
return &AdminHandler{db: db, masterService: masterService, syncService: syncService}
// NewAdminHandler constructs an AdminHandler. When logDB is nil the primary
// database is used for log queries as well.
func NewAdminHandler(db *gorm.DB, logDB *gorm.DB, masterService *service.MasterService, syncService *service.SyncService) *AdminHandler {
	h := &AdminHandler{db: db, logDB: logDB, masterService: masterService, syncService: syncService}
	if h.logDB == nil {
		h.logDB = db
	}
	return h
}
// logDBConn returns the connection to use for log queries, falling back to
// the primary database when no dedicated log database was configured.
func (h *AdminHandler) logDBConn() *gorm.DB {
	if h == nil {
		// Bug fix: the original guarded h == nil but then dereferenced h.db,
		// which panics on a nil receiver. Report the absence explicitly.
		return nil
	}
	if h.logDB == nil {
		return h.db
	}
	return h.logDB
}
type CreateMasterRequest struct {

View File

@@ -18,15 +18,20 @@ import (
// Handler serves the core API endpoints. Log reads/writes go through logDB,
// which may point at a dedicated log database.
type Handler struct {
db *gorm.DB
logDB *gorm.DB // set to db when no dedicated log database is configured
sync *service.SyncService
logger *service.LogWriter // asynchronous batched log record writer
rdb *redis.Client
logWebhook *service.LogWebhookService
}
func NewHandler(db *gorm.DB, sync *service.SyncService, logger *service.LogWriter, rdb *redis.Client) *Handler {
func NewHandler(db *gorm.DB, logDB *gorm.DB, sync *service.SyncService, logger *service.LogWriter, rdb *redis.Client) *Handler {
if logDB == nil {
logDB = db
}
return &Handler{
db: db,
logDB: logDB,
sync: sync,
logger: logger,
rdb: rdb,
@@ -34,6 +39,13 @@ func NewHandler(db *gorm.DB, sync *service.SyncService, logger *service.LogWrite
}
}
// logDBConn returns the connection to use for log queries, falling back to
// the primary database when no dedicated log database was configured.
func (h *Handler) logDBConn() *gorm.DB {
	if h == nil {
		// Bug fix: the original guarded h == nil but then dereferenced h.db,
		// which panics on a nil receiver. Report the absence explicitly.
		return nil
	}
	if h.logDB == nil {
		return h.db
	}
	return h.logDB
}
// CreateKey is now handled by MasterHandler
// CreateProvider godoc

View File

@@ -8,6 +8,7 @@ import (
"github.com/ez-api/ez-api/internal/model"
"github.com/gin-gonic/gin"
"gorm.io/gorm"
)
type LogView struct {
@@ -52,6 +53,38 @@ func toLogView(r model.LogRecord) LogView {
}
}
// MasterLogView is the summary log representation returned by the master and
// admin log-listing endpoints. CreatedAt is a UTC unix timestamp in seconds.
type MasterLogView struct {
ID uint `json:"id"`
CreatedAt int64 `json:"created_at"`
Group string `json:"group"`
KeyID uint `json:"key_id"`
ModelName string `json:"model"`
StatusCode int `json:"status_code"`
LatencyMs int64 `json:"latency_ms"`
TokensIn int64 `json:"tokens_in"`
TokensOut int64 `json:"tokens_out"`
ErrorMessage string `json:"error_message"`
RequestSize int64 `json:"request_size"`
ResponseSize int64 `json:"response_size"`
}
// toMasterLogView projects a stored LogRecord into the summary view used by
// master/admin log listings, normalizing the timestamp to UTC unix seconds.
func toMasterLogView(r model.LogRecord) MasterLogView {
	view := MasterLogView{
		ID:        r.ID,
		CreatedAt: r.CreatedAt.UTC().Unix(),
		Group:     r.Group,
		KeyID:     r.KeyID,
		ModelName: r.ModelName,
	}
	view.StatusCode = r.StatusCode
	view.LatencyMs = r.LatencyMs
	view.TokensIn = r.TokensIn
	view.TokensOut = r.TokensOut
	view.ErrorMessage = r.ErrorMessage
	view.RequestSize = r.RequestSize
	view.ResponseSize = r.ResponseSize
	return view
}
type ListLogsResponse struct {
Total int64 `json:"total"`
Limit int `json:"limit"`
@@ -59,6 +92,13 @@ type ListLogsResponse struct {
Items []LogView `json:"items"`
}
// ListMasterLogsResponse is the paginated envelope for log listings that
// return MasterLogView items.
type ListMasterLogsResponse struct {
Total int64 `json:"total"`
Limit int `json:"limit"`
Offset int `json:"offset"`
Items []MasterLogView `json:"items"`
}
func parseLimitOffset(c *gin.Context) (limit, offset int) {
limit = 50
offset = 0
@@ -90,6 +130,26 @@ func parseUnixSeconds(raw string) (time.Time, bool) {
return time.Unix(sec, 0).UTC(), true
}
// masterLogBase builds a LogRecord query scoped to the keys owned by the
// given master. When logs are stored in the primary database the scoping is
// done with a JOIN on the keys table; with a dedicated log database the key
// IDs are resolved first and applied as an IN filter, since a cross-database
// join is not possible.
func (h *MasterHandler) masterLogBase(masterID uint) (*gorm.DB, error) {
	logDB := h.logDBConn()
	if logDB == h.db {
		// Same connection: keys and log_records are co-located, join directly.
		q := logDB.Model(&model.LogRecord{}).
			Joins("JOIN keys ON keys.id = log_records.key_id").
			Where("keys.master_id = ?", masterID)
		return q, nil
	}
	var ids []uint
	err := h.db.Model(&model.Key{}).
		Where("master_id = ?", masterID).
		Pluck("id", &ids).Error
	if err != nil {
		return nil, err
	}
	if len(ids) == 0 {
		// The master owns no keys: return a query that matches nothing.
		return logDB.Model(&model.LogRecord{}).Where("1 = 0"), nil
	}
	return logDB.Model(&model.LogRecord{}).Where("log_records.key_id IN ?", ids), nil
}
// ListLogs godoc
// @Summary List logs (admin)
// @Description List request logs with basic filtering/pagination
@@ -104,13 +164,13 @@ func parseUnixSeconds(raw string) (time.Time, bool) {
// @Param group query string false "route group"
// @Param model query string false "model"
// @Param status_code query int false "status code"
// @Success 200 {object} ListLogsResponse
// @Success 200 {object} ListMasterLogsResponse
// @Failure 500 {object} gin.H
// @Router /admin/logs [get]
func (h *Handler) ListLogs(c *gin.Context) {
limit, offset := parseLimitOffset(c)
q := h.db.Model(&model.LogRecord{})
q := h.logDBConn().Model(&model.LogRecord{})
if t, ok := parseUnixSeconds(c.Query("since")); ok {
q = q.Where("created_at >= ?", t)
@@ -146,11 +206,11 @@ func (h *Handler) ListLogs(c *gin.Context) {
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to list logs", "details": err.Error()})
return
}
out := make([]LogView, 0, len(rows))
out := make([]MasterLogView, 0, len(rows))
for _, r := range rows {
out = append(out, toLogView(r))
out = append(out, toMasterLogView(r))
}
c.JSON(http.StatusOK, ListLogsResponse{Total: total, Limit: limit, Offset: offset, Items: out})
c.JSON(http.StatusOK, ListMasterLogsResponse{Total: total, Limit: limit, Offset: offset, Items: out})
}
type LogStatsResponse struct {
@@ -201,7 +261,7 @@ func (h *Handler) DeleteLogs(c *gin.Context) {
return
}
q := h.db.Unscoped().Where("created_at < ?", ts.UTC())
q := h.logDBConn().Unscoped().Where("created_at < ?", ts.UTC())
if req.KeyID > 0 {
q = q.Where("key_id = ?", req.KeyID)
}
@@ -229,7 +289,7 @@ func (h *Handler) DeleteLogs(c *gin.Context) {
// @Failure 500 {object} gin.H
// @Router /admin/logs/stats [get]
func (h *Handler) LogStats(c *gin.Context) {
q := h.db.Model(&model.LogRecord{})
q := h.logDBConn().Model(&model.LogRecord{})
if t, ok := parseUnixSeconds(c.Query("since")); ok {
q = q.Where("created_at >= ?", t)
}
@@ -289,7 +349,7 @@ func (h *Handler) LogStats(c *gin.Context) {
// @Param until query int false "unix seconds"
// @Param model query string false "model"
// @Param status_code query int false "status code"
// @Success 200 {object} ListLogsResponse
// @Success 200 {object} ListMasterLogsResponse
// @Failure 401 {object} gin.H
// @Failure 500 {object} gin.H
// @Router /v1/logs [get]
@@ -302,9 +362,11 @@ func (h *MasterHandler) ListSelfLogs(c *gin.Context) {
m := master.(*model.Master)
limit, offset := parseLimitOffset(c)
q := h.db.Model(&model.LogRecord{}).
Joins("JOIN keys ON keys.id = log_records.key_id").
Where("keys.master_id = ?", m.ID)
q, err := h.masterLogBase(m.ID)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to build log query", "details": err.Error()})
return
}
if t, ok := parseUnixSeconds(c.Query("since")); ok {
q = q.Where("log_records.created_at >= ?", t)
@@ -331,11 +393,11 @@ func (h *MasterHandler) ListSelfLogs(c *gin.Context) {
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to list logs", "details": err.Error()})
return
}
out := make([]LogView, 0, len(rows))
out := make([]MasterLogView, 0, len(rows))
for _, r := range rows {
out = append(out, toLogView(r))
out = append(out, toMasterLogView(r))
}
c.JSON(http.StatusOK, ListLogsResponse{Total: total, Limit: limit, Offset: offset, Items: out})
c.JSON(http.StatusOK, ListMasterLogsResponse{Total: total, Limit: limit, Offset: offset, Items: out})
}
// GetSelfLogStats godoc
@@ -358,9 +420,11 @@ func (h *MasterHandler) GetSelfLogStats(c *gin.Context) {
}
m := master.(*model.Master)
q := h.db.Model(&model.LogRecord{}).
Joins("JOIN keys ON keys.id = log_records.key_id").
Where("keys.master_id = ?", m.ID)
q, err := h.masterLogBase(m.ID)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to build log query", "details": err.Error()})
return
}
if t, ok := parseUnixSeconds(c.Query("since")); ok {
q = q.Where("log_records.created_at >= ?", t)

View File

@@ -15,12 +15,23 @@ import (
// MasterHandler serves master-scoped HTTP endpoints. Log queries go through
// logDB, which may point at a dedicated log database.
type MasterHandler struct {
db *gorm.DB
logDB *gorm.DB // set to db when no dedicated log database is configured
masterService *service.MasterService
syncService *service.SyncService
}
func NewMasterHandler(db *gorm.DB, masterService *service.MasterService, syncService *service.SyncService) *MasterHandler {
return &MasterHandler{db: db, masterService: masterService, syncService: syncService}
// NewMasterHandler constructs a MasterHandler. When logDB is nil the primary
// database is used for log queries as well.
func NewMasterHandler(db *gorm.DB, logDB *gorm.DB, masterService *service.MasterService, syncService *service.SyncService) *MasterHandler {
	h := &MasterHandler{db: db, logDB: logDB, masterService: masterService, syncService: syncService}
	if h.logDB == nil {
		h.logDB = db
	}
	return h
}
// logDBConn returns the connection to use for log queries, falling back to
// the primary database when no dedicated log database was configured.
func (h *MasterHandler) logDBConn() *gorm.DB {
	if h == nil {
		// Bug fix: the original guarded h == nil but then dereferenced h.db,
		// which panics on a nil receiver. Report the absence explicitly.
		return nil
	}
	if h.logDB == nil {
		return h.db
	}
	return h.logDB
}
type IssueChildKeyRequest struct {

View File

@@ -3,6 +3,7 @@ package api
import (
"fmt"
"net/http"
"sort"
"strings"
"time"
@@ -59,9 +60,25 @@ func (h *MasterHandler) GetSelfStats(c *gin.Context) {
return
}
base := h.db.Model(&model.LogRecord{}).
Joins("JOIN keys ON keys.id = log_records.key_id").
logDB := h.logDBConn()
base := logDB.Model(&model.LogRecord{})
if logDB == h.db {
base = base.Joins("JOIN keys ON keys.id = log_records.key_id").
Where("keys.master_id = ?", m.ID)
} else {
var keyIDs []uint
if err := h.db.Model(&model.Key{}).
Where("master_id = ?", m.ID).
Pluck("id", &keyIDs).Error; err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to load master keys", "details": err.Error()})
return
}
if len(keyIDs) == 0 {
base = base.Where("1 = 0")
} else {
base = base.Where("log_records.key_id IN ?", keyIDs)
}
}
base = applyStatsRange(base, rng)
totalRequests, totalTokens, err := aggregateTotals(base)
@@ -152,7 +169,8 @@ func (h *AdminHandler) GetAdminStats(c *gin.Context) {
return
}
base := h.db.Model(&model.LogRecord{})
logDB := h.logDBConn()
base := logDB.Model(&model.LogRecord{})
base = applyStatsRange(base, rng)
totalRequests, totalTokens, err := aggregateTotals(base)
@@ -162,6 +180,7 @@ func (h *AdminHandler) GetAdminStats(c *gin.Context) {
}
var byMaster []MasterUsageAgg
if logDB == h.db {
if err := base.Session(&gorm.Session{}).
Joins("JOIN keys ON keys.id = log_records.key_id").
Select("keys.master_id as master_id, COUNT(*) as requests, COALESCE(SUM(log_records.tokens_in + log_records.tokens_out),0) as tokens").
@@ -170,6 +189,14 @@ func (h *AdminHandler) GetAdminStats(c *gin.Context) {
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to group by master", "details": err.Error()})
return
}
} else {
var err error
byMaster, err = aggregateByMasterFromLogs(base, h.db)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to group by master", "details": err.Error()})
return
}
}
var byProvider []ProviderUsageAgg
if err := base.Session(&gorm.Session{}).
@@ -268,3 +295,69 @@ func aggregateTotals(q *gorm.DB) (int64, int64, error) {
}
return totalRequests, t.Tokens, nil
}
// aggregateByMasterFromLogs groups per-key usage (queried from the log
// database via base) by master, resolving key ownership through mainDB. It is
// used when logs live in a separate database and a SQL JOIN against the keys
// table is not possible. Keys whose owner cannot be resolved are skipped.
// The result is sorted by master ID for deterministic output.
func aggregateByMasterFromLogs(base *gorm.DB, mainDB *gorm.DB) ([]MasterUsageAgg, error) {
	if base == nil || mainDB == nil {
		return nil, nil
	}
	// Step 1: aggregate request/token counts per key inside the log database.
	var perKey []KeyUsageStat
	err := base.Session(&gorm.Session{}).
		Select("log_records.key_id as key_id, COUNT(*) as requests, COALESCE(SUM(log_records.tokens_in + log_records.tokens_out),0) as tokens").
		Group("log_records.key_id").
		Scan(&perKey).Error
	if err != nil {
		return nil, err
	}
	if len(perKey) == 0 {
		return []MasterUsageAgg{}, nil
	}
	ids := make([]uint, 0, len(perKey))
	for _, stat := range perKey {
		if stat.KeyID > 0 {
			ids = append(ids, stat.KeyID)
		}
	}
	if len(ids) == 0 {
		return []MasterUsageAgg{}, nil
	}
	// Step 2: resolve key -> master ownership from the primary database.
	type keyMaster struct {
		ID       uint
		MasterID uint
	}
	var owners []keyMaster
	if err := mainDB.Model(&model.Key{}).
		Select("id, master_id").
		Where("id IN ?", ids).
		Scan(&owners).Error; err != nil {
		return nil, err
	}
	ownerOf := make(map[uint]uint, len(owners))
	for _, o := range owners {
		ownerOf[o.ID] = o.MasterID
	}
	// Step 3: fold per-key rows into per-master totals.
	totals := make(map[uint]*MasterUsageAgg)
	for _, stat := range perKey {
		masterID := ownerOf[stat.KeyID]
		if masterID == 0 {
			// Orphaned or unknown key: excluded from per-master totals.
			continue
		}
		agg := totals[masterID]
		if agg == nil {
			agg = &MasterUsageAgg{MasterID: masterID}
			totals[masterID] = agg
		}
		agg.Requests += stat.Requests
		agg.Tokens += stat.Tokens
	}
	out := make([]MasterUsageAgg, 0, len(totals))
	for _, agg := range totals {
		out = append(out, *agg)
	}
	sort.Slice(out, func(i, j int) bool { return out[i].MasterID < out[j].MasterID })
	return out, nil
}

View File

@@ -44,6 +44,7 @@ type LogConfig struct {
QueueCapacity int
RetentionDays int
MaxRecords int
DSN string
}
type ModelRegistryConfig struct {
@@ -77,6 +78,7 @@ func Load() (*Config, error) {
v.SetDefault("log.queue_capacity", 10000)
v.SetDefault("log.retention_days", 30)
v.SetDefault("log.max_records", 1000000)
v.SetDefault("log.dsn", "")
v.SetDefault("auth.jwt_secret", "change_me_in_production")
v.SetDefault("model_registry.enabled", false)
v.SetDefault("model_registry.refresh_seconds", 1800)
@@ -101,6 +103,7 @@ func Load() (*Config, error) {
_ = v.BindEnv("log.queue_capacity", "EZ_LOG_QUEUE")
_ = v.BindEnv("log.retention_days", "EZ_LOG_RETENTION_DAYS")
_ = v.BindEnv("log.max_records", "EZ_LOG_MAX_RECORDS")
_ = v.BindEnv("log.dsn", "EZ_LOG_PG_DSN")
_ = v.BindEnv("auth.jwt_secret", "EZ_JWT_SECRET")
_ = v.BindEnv("model_registry.enabled", "EZ_MODEL_REGISTRY_ENABLED")
_ = v.BindEnv("model_registry.refresh_seconds", "EZ_MODEL_REGISTRY_REFRESH_SECONDS")
@@ -145,6 +148,7 @@ func Load() (*Config, error) {
QueueCapacity: v.GetInt("log.queue_capacity"),
RetentionDays: v.GetInt("log.retention_days"),
MaxRecords: v.GetInt("log.max_records"),
DSN: strings.TrimSpace(v.GetString("log.dsn")),
},
Auth: AuthConfig{
JWTSecret: v.GetString("auth.jwt_secret"),

View File

@@ -0,0 +1,28 @@
package service
import (
"fmt"
"gorm.io/gorm"
)
// EnsureLogIndexes creates the log_records indexes used by the log query and
// cleanup paths, if they do not already exist. It is safe to call on every
// startup; a nil db is treated as a no-op.
func EnsureLogIndexes(db *gorm.DB) error {
	if db == nil {
		return nil
	}
	indexes := []string{
		`CREATE INDEX IF NOT EXISTS idx_log_records_created_at ON log_records(created_at);`,
		`CREATE INDEX IF NOT EXISTS idx_log_records_key_id ON log_records(key_id);`,
		`CREATE INDEX IF NOT EXISTS idx_log_records_model_name ON log_records(model_name);`,
		`CREATE INDEX IF NOT EXISTS idx_log_records_group ON log_records("group");`,
		`CREATE INDEX IF NOT EXISTS idx_log_records_status_code ON log_records(status_code);`,
		`CREATE INDEX IF NOT EXISTS idx_log_records_created_key ON log_records(created_at, key_id);`,
	}
	for _, ddl := range indexes {
		if err := db.Exec(ddl).Error; err != nil {
			return fmt.Errorf("ensure log indexes: %w", err)
		}
	}
	return nil
}

View File

@@ -2,6 +2,7 @@ package service
import (
"context"
"expvar"
"log/slog"
"time"
@@ -9,6 +10,11 @@ import (
"gorm.io/gorm"
)
var (
// logBatchWriteTotal counts successful batch inserts of log records;
// published through expvar (exposed on the internal /metrics endpoint).
logBatchWriteTotal = expvar.NewInt("log_write_batch_total")
// logQueueDropped counts records dropped because the writer queue was full.
logQueueDropped = expvar.NewInt("log_queue_dropped_total")
)
// LogWriter batches log records to reduce IO overhead.
type LogWriter struct {
ch chan model.LogRecord
@@ -48,6 +54,8 @@ func (w *LogWriter) Start(ctx context.Context) {
}
if err := w.db.Create(&buf).Error; err != nil {
slog.Default().Error("log batch insert failed", "err", err)
} else {
logBatchWriteTotal.Add(1)
}
buf = buf[:0]
}
@@ -75,5 +83,6 @@ func (w *LogWriter) Write(rec model.LogRecord) {
case w.ch <- rec:
default:
// drop to avoid blocking hot path
logQueueDropped.Add(1)
}
}