From 88289015fc1e9d3f87342f73a1625e636d545b9f Mon Sep 17 00:00:00 2001 From: zenfun Date: Fri, 19 Dec 2025 23:26:33 +0800 Subject: [PATCH] feat: add internal stats flush API --- cmd/server/main.go | 8 ++ internal/api/internal_handler.go | 96 ++++++++++++++++++++++++ internal/api/internal_handler_test.go | 104 ++++++++++++++++++++++++++ internal/config/config.go | 10 +++ internal/middleware/internal_auth.go | 26 +++++++ 5 files changed, 244 insertions(+) create mode 100644 internal/api/internal_handler.go create mode 100644 internal/api/internal_handler_test.go create mode 100644 internal/middleware/internal_auth.go diff --git a/cmd/server/main.go b/cmd/server/main.go index 1c52419..28cba31 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -118,6 +118,7 @@ func main() { handler := api.NewHandler(db, syncService, logWriter) adminHandler := api.NewAdminHandler(db, masterService, syncService) masterHandler := api.NewMasterHandler(db, masterService, syncService) + internalHandler := api.NewInternalHandler(db) featureHandler := api.NewFeatureHandler(rdb) modelRegistryService := service.NewModelRegistryService(db, rdb, service.ModelRegistryConfig{ Enabled: cfg.ModelRegistry.Enabled, @@ -169,6 +170,13 @@ func main() { r.GET("/swagger/*any", ginSwagger.WrapHandler(swaggerFiles.Handler)) // API Routes + // Internal Routes + internalGroup := r.Group("/internal") + internalGroup.Use(middleware.InternalAuthMiddleware(cfg.Internal.StatsToken)) + { + internalGroup.POST("/stats/flush", internalHandler.FlushStats) + } + // Admin Routes adminGroup := r.Group("/admin") adminGroup.Use(middleware.AdminAuthMiddleware(adminService)) diff --git a/internal/api/internal_handler.go b/internal/api/internal_handler.go new file mode 100644 index 0000000..2ac3b89 --- /dev/null +++ b/internal/api/internal_handler.go @@ -0,0 +1,96 @@ +package api + +import ( + "net/http" + "strings" + "time" + + "github.com/ez-api/ez-api/internal/model" + "github.com/gin-gonic/gin" + "gorm.io/gorm" +) + +type InternalHandler struct { + db *gorm.DB +} + +func NewInternalHandler(db *gorm.DB) *InternalHandler { + return &InternalHandler{db: db} +} + +type statsFlushRequest struct { + Keys []statsFlushEntry `json:"keys"` +} + +type statsFlushEntry struct { + TokenHash string `json:"token_hash"` + Requests int64 `json:"requests"` + Tokens int64 `json:"tokens"` + LastAccessedAt int64 `json:"last_accessed_at"` +} + +func (h *InternalHandler) FlushStats(c *gin.Context) { + if h == nil || h.db == nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "database not configured"}) + return + } + + var req statsFlushRequest + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request body"}) + return + } + if len(req.Keys) == 0 { + c.JSON(http.StatusOK, gin.H{"updated": 0}) + return + } + + updated := 0 + err := h.db.Transaction(func(tx *gorm.DB) error { + for _, entry := range req.Keys { + hash := strings.TrimSpace(entry.TokenHash) + if hash == "" { + continue + } + if entry.Requests < 0 || entry.Tokens < 0 || entry.LastAccessedAt < 0 { + return gorm.ErrInvalidData + } + + updates := map[string]any{} + if entry.Requests > 0 { + updates["request_count"] = gorm.Expr("request_count + ?", entry.Requests) + } + if entry.Tokens > 0 { + updates["used_tokens"] = gorm.Expr("used_tokens + ?", entry.Tokens) + updates["quota_used"] = gorm.Expr("CASE WHEN quota_limit >= 0 THEN quota_used + ? ELSE quota_used END", entry.Tokens) + } + if entry.LastAccessedAt > 0 { + accessTime := time.Unix(entry.LastAccessedAt, 0).UTC() + updates["last_accessed_at"] = gorm.Expr("CASE WHEN last_accessed_at IS NULL OR last_accessed_at < ? THEN ? ELSE last_accessed_at END", accessTime, accessTime) + } + + if len(updates) == 0 { + continue + } + + res := tx.Model(&model.Key{}).Where("token_hash = ?", hash).Updates(updates) + if res.Error != nil { + return res.Error + } + if res.RowsAffected > 0 { + updated++ + } + } + return nil + }) + if err != nil { + if err == gorm.ErrInvalidData { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid stats payload"}) + return + } + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to flush stats", "details": err.Error()}) + return + } + + c.JSON(http.StatusOK, gin.H{"updated": updated}) +} diff --git a/internal/api/internal_handler_test.go b/internal/api/internal_handler_test.go new file mode 100644 index 0000000..1eed227 --- /dev/null +++ b/internal/api/internal_handler_test.go @@ -0,0 +1,104 @@ +package api + +import ( + "bytes" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/ez-api/ez-api/internal/model" + "github.com/gin-gonic/gin" + "gorm.io/driver/sqlite" + "gorm.io/gorm" +) + +func TestInternalHandler_FlushStatsUpdatesCounters(t *testing.T) { + gin.SetMode(gin.TestMode) + + db, err := gorm.Open(sqlite.Open("file::memory:?cache=shared"), &gorm.Config{}) + if err != nil { + t.Fatalf("open sqlite: %v", err) + } + if err := db.AutoMigrate(&model.Key{}); err != nil { + t.Fatalf("migrate: %v", err) + } + + key1 := model.Key{ + MasterID: 1, + IssuedAtEpoch: 1, + TokenHash: "hash-1", + RequestCount: 5, + UsedTokens: 20, + QuotaLimit: 100, + QuotaUsed: 10, + } + key2 := model.Key{ + MasterID: 1, + IssuedAtEpoch: 1, + TokenHash: "hash-2", + RequestCount: 0, + UsedTokens: 0, + QuotaLimit: -1, + QuotaUsed: 7, + } + + if err := db.Create(&key1).Error; err != nil { + t.Fatalf("create key1: %v", err) + } + if err := db.Create(&key2).Error; err != nil { + t.Fatalf("create key2: %v", err) + } + + handler := NewInternalHandler(db) + r := gin.New() + r.POST("/internal/stats/flush", handler.FlushStats) + + body := []byte(`{ + "keys": [ + {"token_hash": "hash-1", "requests": 3, "tokens": 15, "last_accessed_at": 1700000000}, + {"token_hash": "hash-2", "requests": 1, "tokens": 5, "last_accessed_at": 1700000010} + ] + }`) + req := httptest.NewRequest(http.MethodPost, "/internal/stats/flush", bytes.NewReader(body)) + rec := httptest.NewRecorder() + r.ServeHTTP(rec, req) + + if rec.Code != http.StatusOK { + t.Fatalf("unexpected status: got=%d body=%s", rec.Code, rec.Body.String()) + } + + var got1 model.Key + if err := db.First(&got1, "token_hash = ?", "hash-1").Error; err != nil { + t.Fatalf("load key1: %v", err) + } + if got1.RequestCount != 8 { + t.Fatalf("key1 request_count: got=%d", got1.RequestCount) + } + if got1.UsedTokens != 35 { + t.Fatalf("key1 used_tokens: got=%d", got1.UsedTokens) + } + if got1.QuotaUsed != 25 { + t.Fatalf("key1 quota_used: got=%d", got1.QuotaUsed) + } + if got1.LastAccessedAt == nil || got1.LastAccessedAt.Unix() != 1700000000 { + t.Fatalf("key1 last_accessed_at: got=%v", got1.LastAccessedAt) + } + + var got2 model.Key + if err := db.First(&got2, "token_hash = ?", "hash-2").Error; err != nil { + t.Fatalf("load key2: %v", err) + } + if got2.RequestCount != 1 { + t.Fatalf("key2 request_count: got=%d", got2.RequestCount) + } + if got2.UsedTokens != 5 { + t.Fatalf("key2 used_tokens: got=%d", got2.UsedTokens) + } + if got2.QuotaUsed != 7 { + t.Fatalf("key2 quota_used: got=%d", got2.QuotaUsed) + } + if got2.LastAccessedAt == nil || got2.LastAccessedAt.UTC().Unix() != time.Unix(1700000010, 0).UTC().Unix() { + t.Fatalf("key2 last_accessed_at: got=%v", got2.LastAccessedAt) + } +} diff --git a/internal/config/config.go b/internal/config/config.go index 3dc4c9b..8bdb1af 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -17,6 +17,7 @@ type Config struct { Auth AuthConfig ModelRegistry ModelRegistryConfig Quota QuotaConfig + Internal InternalConfig } type ServerConfig struct { @@ -57,6 +58,10 @@ type QuotaConfig struct { ResetIntervalSeconds int } +type InternalConfig struct { + StatsToken string +} + func Load() (*Config, error) { v := viper.New() @@ -77,6 +82,7 @@ func Load() (*Config, error) { v.SetDefault("model_registry.cache_dir", "./data/model-registry") v.SetDefault("model_registry.timeout_seconds", 30) v.SetDefault("quota.reset_interval_seconds", 300) + v.SetDefault("internal.stats_token", "") v.SetEnvKeyReplacer(strings.NewReplacer(".", "_")) v.AutomaticEnv() @@ -98,6 +104,7 @@ func Load() (*Config, error) { _ = v.BindEnv("model_registry.cache_dir", "EZ_MODEL_REGISTRY_CACHE_DIR") _ = v.BindEnv("model_registry.timeout_seconds", "EZ_MODEL_REGISTRY_TIMEOUT_SECONDS") _ = v.BindEnv("quota.reset_interval_seconds", "EZ_QUOTA_RESET_INTERVAL_SECONDS") + _ = v.BindEnv("internal.stats_token", "EZ_INTERNAL_STATS_TOKEN") if configFile := os.Getenv("EZ_CONFIG_FILE"); configFile != "" { v.SetConfigFile(configFile) @@ -146,6 +153,9 @@ func Load() (*Config, error) { Quota: QuotaConfig{ ResetIntervalSeconds: v.GetInt("quota.reset_interval_seconds"), }, + Internal: InternalConfig{ + StatsToken: v.GetString("internal.stats_token"), + }, } return cfg, nil diff --git a/internal/middleware/internal_auth.go b/internal/middleware/internal_auth.go new file mode 100644 index 0000000..554e0cd --- /dev/null +++ b/internal/middleware/internal_auth.go @@ -0,0 +1,26 @@ +package middleware + +import ( + "net/http" + "strings" + + "github.com/gin-gonic/gin" +) + +func InternalAuthMiddleware(expectedToken string) gin.HandlerFunc { + expectedToken = strings.TrimSpace(expectedToken) + return func(c *gin.Context) { + if expectedToken == "" { + c.Next() + return + } + + token := strings.TrimSpace(c.GetHeader("X-Internal-Token")) + if token == "" || token != expectedToken { + c.JSON(http.StatusUnauthorized, gin.H{"error": "invalid internal token"}) + c.Abort() + return + } + c.Next() + } +}