diff --git a/cmd/server/main.go b/cmd/server/main.go index 357f6de..2f015c3 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -2,6 +2,7 @@ package main import ( "context" + "expvar" "log/slog" "net/http" "os" @@ -92,14 +93,45 @@ func main() { } logger.Info("connected to postgresql successfully") + logDB := db + if cfg.Log.DSN != "" { + logDB, err = gorm.Open(postgres.Open(cfg.Log.DSN), &gorm.Config{}) + if err != nil { + fatal(logger, "failed to connect to log database", "err", err) + } + sqlLogDB, err := logDB.DB() + if err != nil { + fatal(logger, "failed to get log database object", "err", err) + } + if err := sqlLogDB.Ping(); err != nil { + fatal(logger, "failed to ping log database", "err", err) + } + logger.Info("connected to log database successfully") + } + // Auto Migrate - if err := db.AutoMigrate(&model.Master{}, &model.Key{}, &model.Provider{}, &model.Model{}, &model.Binding{}, &model.LogRecord{}); err != nil { - fatal(logger, "failed to auto migrate", "err", err) + if logDB != db { + if err := db.AutoMigrate(&model.Master{}, &model.Key{}, &model.Provider{}, &model.Model{}, &model.Binding{}); err != nil { + fatal(logger, "failed to auto migrate", "err", err) + } + if err := logDB.AutoMigrate(&model.LogRecord{}); err != nil { + fatal(logger, "failed to auto migrate log tables", "err", err) + } + if err := service.EnsureLogIndexes(logDB); err != nil { + fatal(logger, "failed to ensure log indexes", "err", err) + } + } else { + if err := db.AutoMigrate(&model.Master{}, &model.Key{}, &model.Provider{}, &model.Model{}, &model.Binding{}, &model.LogRecord{}); err != nil { + fatal(logger, "failed to auto migrate", "err", err) + } + if err := service.EnsureLogIndexes(db); err != nil { + fatal(logger, "failed to ensure log indexes", "err", err) + } } // 4. Setup Services and Handlers syncService := service.NewSyncService(rdb) - logWriter := service.NewLogWriter(db, cfg.Log.QueueCapacity, cfg.Log.BatchSize, cfg.Log.FlushInterval) + logWriter := service.NewLogWriter(logDB, cfg.Log.QueueCapacity, cfg.Log.BatchSize, cfg.Log.FlushInterval) logCtx, cancelLogs := context.WithCancel(context.Background()) defer cancelLogs() logWriter.Start(logCtx) @@ -107,7 +139,7 @@ func main() { quotaCtx, cancelQuota := context.WithCancel(context.Background()) defer cancelQuota() go quotaResetter.Start(quotaCtx) - logCleaner := cron.NewLogCleaner(db, rdb, cfg.Log.RetentionDays, int64(cfg.Log.MaxRecords), time.Hour) + logCleaner := cron.NewLogCleaner(logDB, rdb, cfg.Log.RetentionDays, int64(cfg.Log.MaxRecords), time.Hour) cleanerCtx, cancelCleaner := context.WithCancel(context.Background()) defer cancelCleaner() go logCleaner.Start(cleanerCtx) @@ -119,9 +151,9 @@ func main() { masterService := service.NewMasterService(db) healthService := service.NewHealthCheckService(db, rdb) - handler := api.NewHandler(db, syncService, logWriter, rdb) - adminHandler := api.NewAdminHandler(db, masterService, syncService) - masterHandler := api.NewMasterHandler(db, masterService, syncService) + handler := api.NewHandler(db, logDB, syncService, logWriter, rdb) + adminHandler := api.NewAdminHandler(db, logDB, masterService, syncService) + masterHandler := api.NewMasterHandler(db, logDB, masterService, syncService) internalHandler := api.NewInternalHandler(db) featureHandler := api.NewFeatureHandler(rdb) modelRegistryService := service.NewModelRegistryService(db, rdb, service.ModelRegistryConfig{ @@ -179,6 +211,7 @@ func main() { internalGroup.Use(middleware.InternalAuthMiddleware(cfg.Internal.StatsToken)) { internalGroup.POST("/stats/flush", internalHandler.FlushStats) + internalGroup.GET("/metrics", gin.WrapH(expvar.Handler())) } // Admin Routes diff --git a/internal/api/admin_handler.go b/internal/api/admin_handler.go index 772c766..9eeb516 100644 --- a/internal/api/admin_handler.go +++ b/internal/api/admin_handler.go @@ -14,12 +14,23 @@ import ( type AdminHandler struct { db *gorm.DB + logDB *gorm.DB masterService *service.MasterService syncService *service.SyncService } -func NewAdminHandler(db *gorm.DB, masterService *service.MasterService, syncService *service.SyncService) *AdminHandler { - return &AdminHandler{db: db, masterService: masterService, syncService: syncService} +func NewAdminHandler(db *gorm.DB, logDB *gorm.DB, masterService *service.MasterService, syncService *service.SyncService) *AdminHandler { + if logDB == nil { + logDB = db + } + return &AdminHandler{db: db, logDB: logDB, masterService: masterService, syncService: syncService} +} + +func (h *AdminHandler) logDBConn() *gorm.DB { + if h == nil || h.logDB == nil { + return h.db + } + return h.logDB } type CreateMasterRequest struct { diff --git a/internal/api/handler.go b/internal/api/handler.go index 0ea80d7..0bc0a5d 100644 --- a/internal/api/handler.go +++ b/internal/api/handler.go @@ -18,15 +18,20 @@ import ( type Handler struct { db *gorm.DB + logDB *gorm.DB sync *service.SyncService logger *service.LogWriter rdb *redis.Client logWebhook *service.LogWebhookService } -func NewHandler(db *gorm.DB, sync *service.SyncService, logger *service.LogWriter, rdb *redis.Client) *Handler { +func NewHandler(db *gorm.DB, logDB *gorm.DB, sync *service.SyncService, logger *service.LogWriter, rdb *redis.Client) *Handler { + if logDB == nil { + logDB = db + } return &Handler{ db: db, + logDB: logDB, sync: sync, logger: logger, rdb: rdb, @@ -34,6 +39,13 @@ func NewHandler(db *gorm.DB, sync *service.SyncService, logger *service.LogWrite } } +func (h *Handler) logDBConn() *gorm.DB { + if h == nil || h.logDB == nil { + return h.db + } + return h.logDB +} + // CreateKey is now handled by MasterHandler // CreateProvider godoc diff --git a/internal/api/log_handler.go b/internal/api/log_handler.go index 2f9e307..c7e89c2 100644 --- a/internal/api/log_handler.go +++ b/internal/api/log_handler.go @@ -8,6 +8,7 @@ import ( "github.com/ez-api/ez-api/internal/model" "github.com/gin-gonic/gin" + "gorm.io/gorm" ) type LogView struct { @@ -52,6 +53,38 @@ func toLogView(r model.LogRecord) LogView { } } +type MasterLogView struct { + ID uint `json:"id"` + CreatedAt int64 `json:"created_at"` + Group string `json:"group"` + KeyID uint `json:"key_id"` + ModelName string `json:"model"` + StatusCode int `json:"status_code"` + LatencyMs int64 `json:"latency_ms"` + TokensIn int64 `json:"tokens_in"` + TokensOut int64 `json:"tokens_out"` + ErrorMessage string `json:"error_message"` + RequestSize int64 `json:"request_size"` + ResponseSize int64 `json:"response_size"` +} + +func toMasterLogView(r model.LogRecord) MasterLogView { + return MasterLogView{ + ID: r.ID, + CreatedAt: r.CreatedAt.UTC().Unix(), + Group: r.Group, + KeyID: r.KeyID, + ModelName: r.ModelName, + StatusCode: r.StatusCode, + LatencyMs: r.LatencyMs, + TokensIn: r.TokensIn, + TokensOut: r.TokensOut, + ErrorMessage: r.ErrorMessage, + RequestSize: r.RequestSize, + ResponseSize: r.ResponseSize, + } +} + type ListLogsResponse struct { Total int64 `json:"total"` Limit int `json:"limit"` @@ -59,6 +92,13 @@ type ListLogsResponse struct { Items []LogView `json:"items"` } +type ListMasterLogsResponse struct { + Total int64 `json:"total"` + Limit int `json:"limit"` + Offset int `json:"offset"` + Items []MasterLogView `json:"items"` +} + func parseLimitOffset(c *gin.Context) (limit, offset int) { limit = 50 offset = 0 @@ -90,6 +130,26 @@ func parseUnixSeconds(raw string) (time.Time, bool) { return time.Unix(sec, 0).UTC(), true } +func (h *MasterHandler) masterLogBase(masterID uint) (*gorm.DB, error) { + logDB := h.logDBConn() + if logDB == h.db { + return logDB.Model(&model.LogRecord{}). + Joins("JOIN keys ON keys.id = log_records.key_id"). + Where("keys.master_id = ?", masterID), nil + } + var keyIDs []uint + if err := h.db.Model(&model.Key{}). + Where("master_id = ?", masterID). + Pluck("id", &keyIDs).Error; err != nil { + return nil, err + } + if len(keyIDs) == 0 { + return logDB.Model(&model.LogRecord{}).Where("1 = 0"), nil + } + return logDB.Model(&model.LogRecord{}). + Where("log_records.key_id IN ?", keyIDs), nil +} + // ListLogs godoc // @Summary List logs (admin) // @Description List request logs with basic filtering/pagination @@ -104,13 +164,13 @@ func parseUnixSeconds(raw string) (time.Time, bool) { // @Param group query string false "route group" // @Param model query string false "model" // @Param status_code query int false "status code" -// @Success 200 {object} ListLogsResponse +// @Success 200 {object} ListMasterLogsResponse // @Failure 500 {object} gin.H // @Router /admin/logs [get] func (h *Handler) ListLogs(c *gin.Context) { limit, offset := parseLimitOffset(c) - q := h.db.Model(&model.LogRecord{}) + q := h.logDBConn().Model(&model.LogRecord{}) if t, ok := parseUnixSeconds(c.Query("since")); ok { q = q.Where("created_at >= ?", t) @@ -146,11 +206,11 @@ func (h *Handler) ListLogs(c *gin.Context) { c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to list logs", "details": err.Error()}) return } - out := make([]LogView, 0, len(rows)) + out := make([]MasterLogView, 0, len(rows)) for _, r := range rows { - out = append(out, toLogView(r)) + out = append(out, toMasterLogView(r)) } - c.JSON(http.StatusOK, ListLogsResponse{Total: total, Limit: limit, Offset: offset, Items: out}) + c.JSON(http.StatusOK, ListMasterLogsResponse{Total: total, Limit: limit, Offset: offset, Items: out}) } type LogStatsResponse struct { @@ -201,7 +261,7 @@ func (h *Handler) DeleteLogs(c *gin.Context) { return } - q := h.db.Unscoped().Where("created_at < ?", ts.UTC()) + q := h.logDBConn().Unscoped().Where("created_at < ?", ts.UTC()) if req.KeyID > 0 { q = q.Where("key_id = ?", req.KeyID) } @@ -229,7 +289,7 @@ func (h *Handler) DeleteLogs(c *gin.Context) { // @Failure 500 {object} gin.H // @Router /admin/logs/stats [get] func (h *Handler) LogStats(c *gin.Context) { - q := h.db.Model(&model.LogRecord{}) + q := h.logDBConn().Model(&model.LogRecord{}) if t, ok := parseUnixSeconds(c.Query("since")); ok { q = q.Where("created_at >= ?", t) } @@ -289,7 +349,7 @@ func (h *Handler) LogStats(c *gin.Context) { // @Param until query int false "unix seconds" // @Param model query string false "model" // @Param status_code query int false "status code" -// @Success 200 {object} ListLogsResponse +// @Success 200 {object} ListMasterLogsResponse // @Failure 401 {object} gin.H // @Failure 500 {object} gin.H // @Router /v1/logs [get] @@ -302,9 +362,11 @@ func (h *MasterHandler) ListSelfLogs(c *gin.Context) { m := master.(*model.Master) limit, offset := parseLimitOffset(c) - q := h.db.Model(&model.LogRecord{}). - Joins("JOIN keys ON keys.id = log_records.key_id"). - Where("keys.master_id = ?", m.ID) + q, err := h.masterLogBase(m.ID) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to build log query", "details": err.Error()}) + return + } if t, ok := parseUnixSeconds(c.Query("since")); ok { q = q.Where("log_records.created_at >= ?", t) @@ -331,11 +393,11 @@ func (h *MasterHandler) ListSelfLogs(c *gin.Context) { c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to list logs", "details": err.Error()}) return } - out := make([]LogView, 0, len(rows)) + out := make([]MasterLogView, 0, len(rows)) for _, r := range rows { - out = append(out, toLogView(r)) + out = append(out, toMasterLogView(r)) } - c.JSON(http.StatusOK, ListLogsResponse{Total: total, Limit: limit, Offset: offset, Items: out}) + c.JSON(http.StatusOK, ListMasterLogsResponse{Total: total, Limit: limit, Offset: offset, Items: out}) } // GetSelfLogStats godoc @@ -358,9 +420,11 @@ func (h *MasterHandler) GetSelfLogStats(c *gin.Context) { } m := master.(*model.Master) - q := h.db.Model(&model.LogRecord{}). - Joins("JOIN keys ON keys.id = log_records.key_id"). - Where("keys.master_id = ?", m.ID) + q, err := h.masterLogBase(m.ID) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to build log query", "details": err.Error()}) + return + } if t, ok := parseUnixSeconds(c.Query("since")); ok { q = q.Where("log_records.created_at >= ?", t) diff --git a/internal/api/master_handler.go b/internal/api/master_handler.go index dd60bf7..5f6e3b3 100644 --- a/internal/api/master_handler.go +++ b/internal/api/master_handler.go @@ -15,12 +15,23 @@ import ( type MasterHandler struct { db *gorm.DB + logDB *gorm.DB masterService *service.MasterService syncService *service.SyncService } -func NewMasterHandler(db *gorm.DB, masterService *service.MasterService, syncService *service.SyncService) *MasterHandler { - return &MasterHandler{db: db, masterService: masterService, syncService: syncService} +func NewMasterHandler(db *gorm.DB, logDB *gorm.DB, masterService *service.MasterService, syncService *service.SyncService) *MasterHandler { + if logDB == nil { + logDB = db + } + return &MasterHandler{db: db, logDB: logDB, masterService: masterService, syncService: syncService} +} + +func (h *MasterHandler) logDBConn() *gorm.DB { + if h == nil || h.logDB == nil { + return h.db + } + return h.logDB } type IssueChildKeyRequest struct { diff --git a/internal/api/stats_handler.go b/internal/api/stats_handler.go index 088f919..4bafab9 100644 --- a/internal/api/stats_handler.go +++ b/internal/api/stats_handler.go @@ -3,6 +3,7 @@ package api import ( "fmt" "net/http" + "sort" "strings" "time" @@ -59,9 +60,25 @@ func (h *MasterHandler) GetSelfStats(c *gin.Context) { return } - base := h.db.Model(&model.LogRecord{}). - Joins("JOIN keys ON keys.id = log_records.key_id"). - Where("keys.master_id = ?", m.ID) + logDB := h.logDBConn() + base := logDB.Model(&model.LogRecord{}) + if logDB == h.db { + base = base.Joins("JOIN keys ON keys.id = log_records.key_id"). + Where("keys.master_id = ?", m.ID) + } else { + var keyIDs []uint + if err := h.db.Model(&model.Key{}). + Where("master_id = ?", m.ID). + Pluck("id", &keyIDs).Error; err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to load master keys", "details": err.Error()}) + return + } + if len(keyIDs) == 0 { + base = base.Where("1 = 0") + } else { + base = base.Where("log_records.key_id IN ?", keyIDs) + } + } base = applyStatsRange(base, rng) totalRequests, totalTokens, err := aggregateTotals(base) @@ -152,7 +169,8 @@ func (h *AdminHandler) GetAdminStats(c *gin.Context) { return } - base := h.db.Model(&model.LogRecord{}) + logDB := h.logDBConn() + base := logDB.Model(&model.LogRecord{}) base = applyStatsRange(base, rng) totalRequests, totalTokens, err := aggregateTotals(base) @@ -162,13 +180,22 @@ func (h *AdminHandler) GetAdminStats(c *gin.Context) { } var byMaster []MasterUsageAgg - if err := base.Session(&gorm.Session{}). - Joins("JOIN keys ON keys.id = log_records.key_id"). - Select("keys.master_id as master_id, COUNT(*) as requests, COALESCE(SUM(log_records.tokens_in + log_records.tokens_out),0) as tokens"). - Group("keys.master_id"). - Scan(&byMaster).Error; err != nil { - c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to group by master", "details": err.Error()}) - return + if logDB == h.db { + if err := base.Session(&gorm.Session{}). + Joins("JOIN keys ON keys.id = log_records.key_id"). + Select("keys.master_id as master_id, COUNT(*) as requests, COALESCE(SUM(log_records.tokens_in + log_records.tokens_out),0) as tokens"). + Group("keys.master_id"). + Scan(&byMaster).Error; err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to group by master", "details": err.Error()}) + return + } + } else { + var err error + byMaster, err = aggregateByMasterFromLogs(base, h.db) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to group by master", "details": err.Error()}) + return + } } var byProvider []ProviderUsageAgg @@ -268,3 +295,69 @@ func aggregateTotals(q *gorm.DB) (int64, int64, error) { } return totalRequests, t.Tokens, nil } + +func aggregateByMasterFromLogs(base *gorm.DB, mainDB *gorm.DB) ([]MasterUsageAgg, error) { + if base == nil || mainDB == nil { + return nil, nil + } + var byKey []KeyUsageStat + if err := base.Session(&gorm.Session{}). + Select("log_records.key_id as key_id, COUNT(*) as requests, COALESCE(SUM(log_records.tokens_in + log_records.tokens_out),0) as tokens"). + Group("log_records.key_id"). + Scan(&byKey).Error; err != nil { + return nil, err + } + if len(byKey) == 0 { + return []MasterUsageAgg{}, nil + } + + keyIDs := make([]uint, 0, len(byKey)) + for _, row := range byKey { + if row.KeyID > 0 { + keyIDs = append(keyIDs, row.KeyID) + } + } + if len(keyIDs) == 0 { + return []MasterUsageAgg{}, nil + } + + type keyMaster struct { + ID uint + MasterID uint + } + var keyMasters []keyMaster + if err := mainDB.Model(&model.Key{}). + Select("id, master_id"). + Where("id IN ?", keyIDs). + Scan(&keyMasters).Error; err != nil { + return nil, err + } + keyToMaster := make(map[uint]uint, len(keyMasters)) + for _, row := range keyMasters { + keyToMaster[row.ID] = row.MasterID + } + + agg := make(map[uint]*MasterUsageAgg) + for _, row := range byKey { + masterID := keyToMaster[row.KeyID] + if masterID == 0 { + continue + } + entry, ok := agg[masterID] + if !ok { + entry = &MasterUsageAgg{MasterID: masterID} + agg[masterID] = entry + } + entry.Requests += row.Requests + entry.Tokens += row.Tokens + } + + out := make([]MasterUsageAgg, 0, len(agg)) + for _, row := range agg { + out = append(out, *row) + } + sort.Slice(out, func(i, j int) bool { + return out[i].MasterID < out[j].MasterID + }) + return out, nil +} diff --git a/internal/config/config.go b/internal/config/config.go index 59d333a..f2c8e69 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -44,6 +44,7 @@ type LogConfig struct { QueueCapacity int RetentionDays int MaxRecords int + DSN string } type ModelRegistryConfig struct { @@ -77,6 +78,7 @@ func Load() (*Config, error) { v.SetDefault("log.queue_capacity", 10000) v.SetDefault("log.retention_days", 30) v.SetDefault("log.max_records", 1000000) + v.SetDefault("log.dsn", "") v.SetDefault("auth.jwt_secret", "change_me_in_production") v.SetDefault("model_registry.enabled", false) v.SetDefault("model_registry.refresh_seconds", 1800) @@ -101,6 +103,7 @@ func Load() (*Config, error) { _ = v.BindEnv("log.queue_capacity", "EZ_LOG_QUEUE") _ = v.BindEnv("log.retention_days", "EZ_LOG_RETENTION_DAYS") _ = v.BindEnv("log.max_records", "EZ_LOG_MAX_RECORDS") + _ = v.BindEnv("log.dsn", "EZ_LOG_PG_DSN") _ = v.BindEnv("auth.jwt_secret", "EZ_JWT_SECRET") _ = v.BindEnv("model_registry.enabled", "EZ_MODEL_REGISTRY_ENABLED") _ = v.BindEnv("model_registry.refresh_seconds", "EZ_MODEL_REGISTRY_REFRESH_SECONDS") @@ -145,6 +148,7 @@ func Load() (*Config, error) { QueueCapacity: v.GetInt("log.queue_capacity"), RetentionDays: v.GetInt("log.retention_days"), MaxRecords: v.GetInt("log.max_records"), + DSN: strings.TrimSpace(v.GetString("log.dsn")), }, Auth: AuthConfig{ JWTSecret: v.GetString("auth.jwt_secret"), diff --git a/internal/service/log_indexes.go b/internal/service/log_indexes.go new file mode 100644 index 0000000..97cc74c --- /dev/null +++ b/internal/service/log_indexes.go @@ -0,0 +1,28 @@ +package service + +import ( + "fmt" + + "gorm.io/gorm" +) + +// EnsureLogIndexes creates log_records indexes if missing. +func EnsureLogIndexes(db *gorm.DB) error { + if db == nil { + return nil + } + stmts := []string{ + `CREATE INDEX IF NOT EXISTS idx_log_records_created_at ON log_records(created_at);`, + `CREATE INDEX IF NOT EXISTS idx_log_records_key_id ON log_records(key_id);`, + `CREATE INDEX IF NOT EXISTS idx_log_records_model_name ON log_records(model_name);`, + `CREATE INDEX IF NOT EXISTS idx_log_records_group ON log_records("group");`, + `CREATE INDEX IF NOT EXISTS idx_log_records_status_code ON log_records(status_code);`, + `CREATE INDEX IF NOT EXISTS idx_log_records_created_key ON log_records(created_at, key_id);`, + } + for _, stmt := range stmts { + if err := db.Exec(stmt).Error; err != nil { + return fmt.Errorf("ensure log indexes: %w", err) + } + } + return nil +} diff --git a/internal/service/logger.go b/internal/service/logger.go index b742d81..987f4d5 100644 --- a/internal/service/logger.go +++ b/internal/service/logger.go @@ -2,6 +2,7 @@ package service import ( "context" + "expvar" "log/slog" "time" @@ -9,6 +10,11 @@ import ( "gorm.io/gorm" ) +var ( + logBatchWriteTotal = expvar.NewInt("log_write_batch_total") + logQueueDropped = expvar.NewInt("log_queue_dropped_total") +) + // LogWriter batches log records to reduce IO overhead. type LogWriter struct { ch chan model.LogRecord @@ -48,6 +54,8 @@ func (w *LogWriter) Start(ctx context.Context) { } if err := w.db.Create(&buf).Error; err != nil { slog.Default().Error("log batch insert failed", "err", err) + } else { + logBatchWriteTotal.Add(1) } buf = buf[:0] } @@ -75,5 +83,6 @@ func (w *LogWriter) Write(rec model.LogRecord) { case w.ch <- rec: default: // drop to avoid blocking hot path + logQueueDropped.Add(1) } }