mirror of
https://github.com/EZ-Api/ez-api.git
synced 2026-01-13 17:47:51 +00:00
feat: add internal stats flush API
This commit is contained in:
@@ -118,6 +118,7 @@ func main() {
|
|||||||
handler := api.NewHandler(db, syncService, logWriter)
|
handler := api.NewHandler(db, syncService, logWriter)
|
||||||
adminHandler := api.NewAdminHandler(db, masterService, syncService)
|
adminHandler := api.NewAdminHandler(db, masterService, syncService)
|
||||||
masterHandler := api.NewMasterHandler(db, masterService, syncService)
|
masterHandler := api.NewMasterHandler(db, masterService, syncService)
|
||||||
|
internalHandler := api.NewInternalHandler(db)
|
||||||
featureHandler := api.NewFeatureHandler(rdb)
|
featureHandler := api.NewFeatureHandler(rdb)
|
||||||
modelRegistryService := service.NewModelRegistryService(db, rdb, service.ModelRegistryConfig{
|
modelRegistryService := service.NewModelRegistryService(db, rdb, service.ModelRegistryConfig{
|
||||||
Enabled: cfg.ModelRegistry.Enabled,
|
Enabled: cfg.ModelRegistry.Enabled,
|
||||||
@@ -169,6 +170,13 @@ func main() {
|
|||||||
r.GET("/swagger/*any", ginSwagger.WrapHandler(swaggerFiles.Handler))
|
r.GET("/swagger/*any", ginSwagger.WrapHandler(swaggerFiles.Handler))
|
||||||
|
|
||||||
// API Routes
|
// API Routes
|
||||||
|
// Internal Routes
|
||||||
|
internalGroup := r.Group("/internal")
|
||||||
|
internalGroup.Use(middleware.InternalAuthMiddleware(cfg.Internal.StatsToken))
|
||||||
|
{
|
||||||
|
internalGroup.POST("/stats/flush", internalHandler.FlushStats)
|
||||||
|
}
|
||||||
|
|
||||||
// Admin Routes
|
// Admin Routes
|
||||||
adminGroup := r.Group("/admin")
|
adminGroup := r.Group("/admin")
|
||||||
adminGroup.Use(middleware.AdminAuthMiddleware(adminService))
|
adminGroup.Use(middleware.AdminAuthMiddleware(adminService))
|
||||||
|
|||||||
96
internal/api/internal_handler.go
Normal file
96
internal/api/internal_handler.go
Normal file
@@ -0,0 +1,96 @@
|
|||||||
|
package api
|
||||||
|
|
||||||
|
import (
|
||||||
|
"net/http"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/ez-api/ez-api/internal/model"
|
||||||
|
"github.com/gin-gonic/gin"
|
||||||
|
"gorm.io/gorm"
|
||||||
|
)
|
||||||
|
|
||||||
|
type InternalHandler struct {
|
||||||
|
db *gorm.DB
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewInternalHandler(db *gorm.DB) *InternalHandler {
|
||||||
|
return &InternalHandler{db: db}
|
||||||
|
}
|
||||||
|
|
||||||
|
type statsFlushRequest struct {
|
||||||
|
Keys []statsFlushEntry `json:"keys"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type statsFlushEntry struct {
|
||||||
|
TokenHash string `json:"token_hash"`
|
||||||
|
Requests int64 `json:"requests"`
|
||||||
|
Tokens int64 `json:"tokens"`
|
||||||
|
LastAccessedAt int64 `json:"last_accessed_at"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *InternalHandler) FlushStats(c *gin.Context) {
|
||||||
|
if h == nil || h.db == nil {
|
||||||
|
c.JSON(http.StatusInternalServerError, gin.H{"error": "database not configured"})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var req statsFlushRequest
|
||||||
|
if err := c.ShouldBindJSON(&req); err != nil {
|
||||||
|
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request body"})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if len(req.Keys) == 0 {
|
||||||
|
c.JSON(http.StatusOK, gin.H{"updated": 0})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
updated := 0
|
||||||
|
err := h.db.Transaction(func(tx *gorm.DB) error {
|
||||||
|
for _, entry := range req.Keys {
|
||||||
|
hash := strings.TrimSpace(entry.TokenHash)
|
||||||
|
if hash == "" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if entry.Requests < 0 || entry.Tokens < 0 || entry.LastAccessedAt < 0 {
|
||||||
|
return gorm.ErrInvalidData
|
||||||
|
}
|
||||||
|
|
||||||
|
updates := map[string]any{}
|
||||||
|
if entry.Requests > 0 {
|
||||||
|
updates["request_count"] = gorm.Expr("request_count + ?", entry.Requests)
|
||||||
|
}
|
||||||
|
if entry.Tokens > 0 {
|
||||||
|
updates["used_tokens"] = gorm.Expr("used_tokens + ?", entry.Tokens)
|
||||||
|
updates["quota_used"] = gorm.Expr("CASE WHEN quota_limit >= 0 THEN quota_used + ? ELSE quota_used END", entry.Tokens)
|
||||||
|
}
|
||||||
|
if entry.LastAccessedAt > 0 {
|
||||||
|
accessTime := time.Unix(entry.LastAccessedAt, 0).UTC()
|
||||||
|
updates["last_accessed_at"] = gorm.Expr("CASE WHEN last_accessed_at IS NULL OR last_accessed_at < ? THEN ? ELSE last_accessed_at END", accessTime, accessTime)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(updates) == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
res := tx.Model(&model.Key{}).Where("token_hash = ?", hash).Updates(updates)
|
||||||
|
if res.Error != nil {
|
||||||
|
return res.Error
|
||||||
|
}
|
||||||
|
if res.RowsAffected > 0 {
|
||||||
|
updated++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
if err == gorm.ErrInvalidData {
|
||||||
|
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid stats payload"})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to flush stats", "details": err.Error()})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
c.JSON(http.StatusOK, gin.H{"updated": updated})
|
||||||
|
}
|
||||||
104
internal/api/internal_handler_test.go
Normal file
104
internal/api/internal_handler_test.go
Normal file
@@ -0,0 +1,104 @@
|
|||||||
|
package api
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/ez-api/ez-api/internal/model"
|
||||||
|
"github.com/gin-gonic/gin"
|
||||||
|
"gorm.io/driver/sqlite"
|
||||||
|
"gorm.io/gorm"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestInternalHandler_FlushStatsUpdatesCounters(t *testing.T) {
|
||||||
|
gin.SetMode(gin.TestMode)
|
||||||
|
|
||||||
|
db, err := gorm.Open(sqlite.Open("file::memory:?cache=shared"), &gorm.Config{})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("open sqlite: %v", err)
|
||||||
|
}
|
||||||
|
if err := db.AutoMigrate(&model.Key{}); err != nil {
|
||||||
|
t.Fatalf("migrate: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
key1 := model.Key{
|
||||||
|
MasterID: 1,
|
||||||
|
IssuedAtEpoch: 1,
|
||||||
|
TokenHash: "hash-1",
|
||||||
|
RequestCount: 5,
|
||||||
|
UsedTokens: 20,
|
||||||
|
QuotaLimit: 100,
|
||||||
|
QuotaUsed: 10,
|
||||||
|
}
|
||||||
|
key2 := model.Key{
|
||||||
|
MasterID: 1,
|
||||||
|
IssuedAtEpoch: 1,
|
||||||
|
TokenHash: "hash-2",
|
||||||
|
RequestCount: 0,
|
||||||
|
UsedTokens: 0,
|
||||||
|
QuotaLimit: -1,
|
||||||
|
QuotaUsed: 7,
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := db.Create(&key1).Error; err != nil {
|
||||||
|
t.Fatalf("create key1: %v", err)
|
||||||
|
}
|
||||||
|
if err := db.Create(&key2).Error; err != nil {
|
||||||
|
t.Fatalf("create key2: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
handler := NewInternalHandler(db)
|
||||||
|
r := gin.New()
|
||||||
|
r.POST("/internal/stats/flush", handler.FlushStats)
|
||||||
|
|
||||||
|
body := []byte(`{
|
||||||
|
"keys": [
|
||||||
|
{"token_hash": "hash-1", "requests": 3, "tokens": 15, "last_accessed_at": 1700000000},
|
||||||
|
{"token_hash": "hash-2", "requests": 1, "tokens": 5, "last_accessed_at": 1700000010}
|
||||||
|
]
|
||||||
|
}`)
|
||||||
|
req := httptest.NewRequest(http.MethodPost, "/internal/stats/flush", bytes.NewReader(body))
|
||||||
|
rec := httptest.NewRecorder()
|
||||||
|
r.ServeHTTP(rec, req)
|
||||||
|
|
||||||
|
if rec.Code != http.StatusOK {
|
||||||
|
t.Fatalf("unexpected status: got=%d body=%s", rec.Code, rec.Body.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
var got1 model.Key
|
||||||
|
if err := db.First(&got1, "token_hash = ?", "hash-1").Error; err != nil {
|
||||||
|
t.Fatalf("load key1: %v", err)
|
||||||
|
}
|
||||||
|
if got1.RequestCount != 8 {
|
||||||
|
t.Fatalf("key1 request_count: got=%d", got1.RequestCount)
|
||||||
|
}
|
||||||
|
if got1.UsedTokens != 35 {
|
||||||
|
t.Fatalf("key1 used_tokens: got=%d", got1.UsedTokens)
|
||||||
|
}
|
||||||
|
if got1.QuotaUsed != 25 {
|
||||||
|
t.Fatalf("key1 quota_used: got=%d", got1.QuotaUsed)
|
||||||
|
}
|
||||||
|
if got1.LastAccessedAt == nil || got1.LastAccessedAt.Unix() != 1700000000 {
|
||||||
|
t.Fatalf("key1 last_accessed_at: got=%v", got1.LastAccessedAt)
|
||||||
|
}
|
||||||
|
|
||||||
|
var got2 model.Key
|
||||||
|
if err := db.First(&got2, "token_hash = ?", "hash-2").Error; err != nil {
|
||||||
|
t.Fatalf("load key2: %v", err)
|
||||||
|
}
|
||||||
|
if got2.RequestCount != 1 {
|
||||||
|
t.Fatalf("key2 request_count: got=%d", got2.RequestCount)
|
||||||
|
}
|
||||||
|
if got2.UsedTokens != 5 {
|
||||||
|
t.Fatalf("key2 used_tokens: got=%d", got2.UsedTokens)
|
||||||
|
}
|
||||||
|
if got2.QuotaUsed != 7 {
|
||||||
|
t.Fatalf("key2 quota_used: got=%d", got2.QuotaUsed)
|
||||||
|
}
|
||||||
|
if got2.LastAccessedAt == nil || got2.LastAccessedAt.UTC().Unix() != time.Unix(1700000010, 0).UTC().Unix() {
|
||||||
|
t.Fatalf("key2 last_accessed_at: got=%v", got2.LastAccessedAt)
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -17,6 +17,7 @@ type Config struct {
|
|||||||
Auth AuthConfig
|
Auth AuthConfig
|
||||||
ModelRegistry ModelRegistryConfig
|
ModelRegistry ModelRegistryConfig
|
||||||
Quota QuotaConfig
|
Quota QuotaConfig
|
||||||
|
Internal InternalConfig
|
||||||
}
|
}
|
||||||
|
|
||||||
type ServerConfig struct {
|
type ServerConfig struct {
|
||||||
@@ -57,6 +58,10 @@ type QuotaConfig struct {
|
|||||||
ResetIntervalSeconds int
|
ResetIntervalSeconds int
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type InternalConfig struct {
|
||||||
|
StatsToken string
|
||||||
|
}
|
||||||
|
|
||||||
func Load() (*Config, error) {
|
func Load() (*Config, error) {
|
||||||
v := viper.New()
|
v := viper.New()
|
||||||
|
|
||||||
@@ -77,6 +82,7 @@ func Load() (*Config, error) {
|
|||||||
v.SetDefault("model_registry.cache_dir", "./data/model-registry")
|
v.SetDefault("model_registry.cache_dir", "./data/model-registry")
|
||||||
v.SetDefault("model_registry.timeout_seconds", 30)
|
v.SetDefault("model_registry.timeout_seconds", 30)
|
||||||
v.SetDefault("quota.reset_interval_seconds", 300)
|
v.SetDefault("quota.reset_interval_seconds", 300)
|
||||||
|
v.SetDefault("internal.stats_token", "")
|
||||||
|
|
||||||
v.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
|
v.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
|
||||||
v.AutomaticEnv()
|
v.AutomaticEnv()
|
||||||
@@ -98,6 +104,7 @@ func Load() (*Config, error) {
|
|||||||
_ = v.BindEnv("model_registry.cache_dir", "EZ_MODEL_REGISTRY_CACHE_DIR")
|
_ = v.BindEnv("model_registry.cache_dir", "EZ_MODEL_REGISTRY_CACHE_DIR")
|
||||||
_ = v.BindEnv("model_registry.timeout_seconds", "EZ_MODEL_REGISTRY_TIMEOUT_SECONDS")
|
_ = v.BindEnv("model_registry.timeout_seconds", "EZ_MODEL_REGISTRY_TIMEOUT_SECONDS")
|
||||||
_ = v.BindEnv("quota.reset_interval_seconds", "EZ_QUOTA_RESET_INTERVAL_SECONDS")
|
_ = v.BindEnv("quota.reset_interval_seconds", "EZ_QUOTA_RESET_INTERVAL_SECONDS")
|
||||||
|
_ = v.BindEnv("internal.stats_token", "EZ_INTERNAL_STATS_TOKEN")
|
||||||
|
|
||||||
if configFile := os.Getenv("EZ_CONFIG_FILE"); configFile != "" {
|
if configFile := os.Getenv("EZ_CONFIG_FILE"); configFile != "" {
|
||||||
v.SetConfigFile(configFile)
|
v.SetConfigFile(configFile)
|
||||||
@@ -146,6 +153,9 @@ func Load() (*Config, error) {
|
|||||||
Quota: QuotaConfig{
|
Quota: QuotaConfig{
|
||||||
ResetIntervalSeconds: v.GetInt("quota.reset_interval_seconds"),
|
ResetIntervalSeconds: v.GetInt("quota.reset_interval_seconds"),
|
||||||
},
|
},
|
||||||
|
Internal: InternalConfig{
|
||||||
|
StatsToken: v.GetString("internal.stats_token"),
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
return cfg, nil
|
return cfg, nil
|
||||||
|
|||||||
26
internal/middleware/internal_auth.go
Normal file
26
internal/middleware/internal_auth.go
Normal file
@@ -0,0 +1,26 @@
|
|||||||
|
package middleware
|
||||||
|
|
||||||
|
import (
|
||||||
|
"net/http"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/gin-gonic/gin"
|
||||||
|
)
|
||||||
|
|
||||||
|
func InternalAuthMiddleware(expectedToken string) gin.HandlerFunc {
|
||||||
|
expectedToken = strings.TrimSpace(expectedToken)
|
||||||
|
return func(c *gin.Context) {
|
||||||
|
if expectedToken == "" {
|
||||||
|
c.Next()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
token := strings.TrimSpace(c.GetHeader("X-Internal-Token"))
|
||||||
|
if token == "" || token != expectedToken {
|
||||||
|
c.JSON(http.StatusUnauthorized, gin.H{"error": "invalid internal token"})
|
||||||
|
c.Abort()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
c.Next()
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user