From 2c5ccd56ee520a2a19860736f543733cd458adf8 Mon Sep 17 00:00:00 2001 From: zenfun Date: Mon, 22 Dec 2025 12:02:27 +0800 Subject: [PATCH] feat(api): add realtime stats endpoints for masters Introduce StatsService integration to admin and master handlers, exposing realtime metrics (requests, tokens, QPS, rate limit status) via new endpoints: - GET /admin/masters/:id/realtime - GET /v1/realtime Also embed realtime stats in the existing GET /admin/masters/:id response and change GlobalQPS default to 0 with validation to reject negative values. --- cmd/server/main.go | 7 +- internal/api/admin_batch_handler_test.go | 3 +- internal/api/admin_handler.go | 50 +++++--- internal/api/admin_issue_key_test.go | 3 +- internal/api/master_handler.go | 5 +- internal/api/master_tokens_handler_test.go | 3 +- internal/api/realtime_handler.go | 114 ++++++++++++++++++ internal/api/realtime_handler_test.go | 133 +++++++++++++++++++++ internal/api/stats_handler_test.go | 6 +- internal/model/models.go | 2 +- internal/service/stats.go | 76 ++++++++++++ internal/service/stats_test.go | 29 +++++ 12 files changed, 404 insertions(+), 27 deletions(-) create mode 100644 internal/api/realtime_handler.go create mode 100644 internal/api/realtime_handler_test.go diff --git a/cmd/server/main.go b/cmd/server/main.go index 6af8a39..4dd8bf3 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -171,12 +171,13 @@ func main() { fatal(logger, "failed to create admin service", "err", err) } masterService := service.NewMasterService(db) + statsService := service.NewStatsService(rdb) healthService := service.NewHealthCheckService(db, rdb) healthHandler := api.NewHealthHandler(healthService) handler := api.NewHandler(db, logDB, syncService, logWriter, rdb, logPartitioner) - adminHandler := api.NewAdminHandler(db, logDB, masterService, syncService, logPartitioner) - masterHandler := api.NewMasterHandler(db, logDB, masterService, syncService, logPartitioner) + adminHandler := api.NewAdminHandler(db, logDB, masterService, syncService, statsService, logPartitioner) + masterHandler := api.NewMasterHandler(db, logDB, masterService, syncService, statsService, logPartitioner) internalHandler := api.NewInternalHandler(db) featureHandler := api.NewFeatureHandler(rdb) modelRegistryService := service.NewModelRegistryService(db, rdb, service.ModelRegistryConfig{ @@ -255,6 +256,7 @@ func main() { adminGroup.POST("/masters", adminHandler.CreateMaster) adminGroup.GET("/masters", adminHandler.ListMasters) adminGroup.GET("/masters/:id", adminHandler.GetMaster) + adminGroup.GET("/masters/:id/realtime", adminHandler.GetMasterRealtime) adminGroup.PUT("/masters/:id", adminHandler.UpdateMaster) adminGroup.DELETE("/masters/:id", adminHandler.DeleteMaster) adminGroup.POST("/masters/batch", adminHandler.BatchMasters) @@ -320,6 +322,7 @@ func main() { masterGroup.DELETE("/tokens/:id", masterHandler.DeleteToken) masterGroup.GET("/logs", masterHandler.ListSelfLogs) masterGroup.GET("/logs/stats", masterHandler.GetSelfLogStats) + masterGroup.GET("/realtime", masterHandler.GetSelfRealtime) masterGroup.GET("/stats", masterHandler.GetSelfStats) } diff --git a/internal/api/admin_batch_handler_test.go b/internal/api/admin_batch_handler_test.go index f048627..3a46ad4 100644 --- a/internal/api/admin_batch_handler_test.go +++ b/internal/api/admin_batch_handler_test.go @@ -33,8 +33,9 @@ func newTestAdminHandler(t *testing.T) (*AdminHandler, *gorm.DB, *miniredis.Mini mr := miniredis.RunT(t) rdb := redis.NewClient(&redis.Options{Addr: mr.Addr()}) sync := service.NewSyncService(rdb) + stats := service.NewStatsService(rdb) masterService := service.NewMasterService(db) - return NewAdminHandler(db, db, masterService, sync, nil), db, mr + return NewAdminHandler(db, db, masterService, sync, stats, nil), db, mr } func TestAdmin_BatchMasters_Status(t *testing.T) { diff --git a/internal/api/admin_handler.go b/internal/api/admin_handler.go index 493146a..d57b2b7 100644 --- a/internal/api/admin_handler.go +++ b/internal/api/admin_handler.go @@ -17,14 +17,15 @@ type AdminHandler struct { logDB *gorm.DB masterService *service.MasterService syncService *service.SyncService + statsService *service.StatsService logPartitioner *service.LogPartitioner } -func NewAdminHandler(db *gorm.DB, logDB *gorm.DB, masterService *service.MasterService, syncService *service.SyncService, partitioner *service.LogPartitioner) *AdminHandler { +func NewAdminHandler(db *gorm.DB, logDB *gorm.DB, masterService *service.MasterService, syncService *service.SyncService, statsService *service.StatsService, partitioner *service.LogPartitioner) *AdminHandler { if logDB == nil { logDB = db } - return &AdminHandler{db: db, logDB: logDB, masterService: masterService, syncService: syncService, logPartitioner: partitioner} + return &AdminHandler{db: db, logDB: logDB, masterService: masterService, syncService: syncService, statsService: statsService, logPartitioner: partitioner} } func (h *AdminHandler) logDBConn() *gorm.DB { @@ -68,8 +69,9 @@ func (h *AdminHandler) CreateMaster(c *gin.Context) { if req.MaxChildKeys == 0 { req.MaxChildKeys = 5 } - if req.GlobalQPS == 0 { - req.GlobalQPS = 3 + if req.GlobalQPS < 0 { + c.JSON(http.StatusBadRequest, gin.H{"error": "global_qps must be >= 0"}) + return } master, rawMasterKey, err := h.masterService.CreateMaster(req.Name, req.Group, req.MaxChildKeys, req.GlobalQPS) @@ -94,17 +96,18 @@ func (h *AdminHandler) CreateMaster(c *gin.Context) { } type MasterView struct { - ID uint `json:"id"` - Name string `json:"name"` - Group string `json:"group"` - DefaultNamespace string `json:"default_namespace"` - Namespaces string `json:"namespaces"` - Epoch int64 `json:"epoch"` - Status string `json:"status"` - MaxChildKeys int `json:"max_child_keys"` - GlobalQPS int `json:"global_qps"` - CreatedAt int64 `json:"created_at"` - UpdatedAt int64 `json:"updated_at"` + ID uint `json:"id"` + Name string `json:"name"` + Group string `json:"group"` + DefaultNamespace string `json:"default_namespace"` + Namespaces string `json:"namespaces"` + Epoch int64 `json:"epoch"` + Status string `json:"status"` + MaxChildKeys int `json:"max_child_keys"` + GlobalQPS int `json:"global_qps"` + CreatedAt int64 `json:"created_at"` + UpdatedAt int64 `json:"updated_at"` + Realtime *MasterRealtimeView `json:"realtime,omitempty"` } func toMasterView(m model.Master) MasterView { @@ -176,7 +179,16 @@ func (h *AdminHandler) GetMaster(c *gin.Context) { c.JSON(http.StatusNotFound, gin.H{"error": "master not found"}) return } - c.JSON(http.StatusOK, toMasterView(m)) + view := toMasterView(m) + if h.statsService != nil { + if stats, err := h.statsService.GetMasterRealtimeSnapshot(c.Request.Context(), m.ID); err == nil { + if stats.QPSLimit == 0 && m.GlobalQPS > 0 { + stats.QPSLimit = int64(m.GlobalQPS) + } + view.Realtime = toMasterRealtimeView(stats) + } + } + c.JSON(http.StatusOK, view) } type UpdateMasterRequest struct { @@ -233,7 +245,11 @@ func (h *AdminHandler) UpdateMaster(c *gin.Context) { if req.MaxChildKeys != nil && *req.MaxChildKeys > 0 { update["max_child_keys"] = *req.MaxChildKeys } - if req.GlobalQPS != nil && *req.GlobalQPS > 0 { + if req.GlobalQPS != nil { + if *req.GlobalQPS < 0 { + c.JSON(http.StatusBadRequest, gin.H{"error": "global_qps must be >= 0"}) + return + } update["global_qps"] = *req.GlobalQPS } if len(update) == 0 && !req.PropagateToKeys { diff --git a/internal/api/admin_issue_key_test.go b/internal/api/admin_issue_key_test.go index e4dd98b..69ad058 100644 --- a/internal/api/admin_issue_key_test.go +++ b/internal/api/admin_issue_key_test.go @@ -34,7 +34,8 @@ func TestAdmin_IssueChildKeyForMaster_IssuedByAdminAndSynced(t *testing.T) { syncService := service.NewSyncService(rdb) masterService := service.NewMasterService(db) - adminHandler := NewAdminHandler(db, db, masterService, syncService, nil) + statsService := service.NewStatsService(rdb) + adminHandler := NewAdminHandler(db, db, masterService, syncService, statsService, nil) m, _, err := masterService.CreateMaster("m1", "default", 5, 10) if err != nil { diff --git a/internal/api/master_handler.go b/internal/api/master_handler.go index cede56c..991d048 100644 --- a/internal/api/master_handler.go +++ b/internal/api/master_handler.go @@ -18,14 +18,15 @@ type MasterHandler struct { logDB *gorm.DB masterService *service.MasterService syncService *service.SyncService + statsService *service.StatsService logPartitioner *service.LogPartitioner } -func NewMasterHandler(db *gorm.DB, logDB *gorm.DB, masterService *service.MasterService, syncService *service.SyncService, partitioner *service.LogPartitioner) *MasterHandler { +func NewMasterHandler(db *gorm.DB, logDB *gorm.DB, masterService *service.MasterService, syncService *service.SyncService, statsService *service.StatsService, partitioner *service.LogPartitioner) *MasterHandler { if logDB == nil { logDB = db } - return &MasterHandler{db: db, logDB: logDB, masterService: masterService, syncService: syncService, logPartitioner: partitioner} + return &MasterHandler{db: db, logDB: logDB, masterService: masterService, syncService: syncService, statsService: statsService, logPartitioner: partitioner} } func (h *MasterHandler) logDBConn() *gorm.DB { diff --git a/internal/api/master_tokens_handler_test.go b/internal/api/master_tokens_handler_test.go index 8980617..77f92d3 100644 --- a/internal/api/master_tokens_handler_test.go +++ b/internal/api/master_tokens_handler_test.go @@ -42,7 +42,8 @@ func TestMaster_ListTokens_AndUpdateToken(t *testing.T) { rdb := redis.NewClient(&redis.Options{Addr: mr.Addr()}) syncSvc := service.NewSyncService(rdb) masterSvc := service.NewMasterService(db) - h := NewMasterHandler(db, db, masterSvc, syncSvc, nil) + statsSvc := service.NewStatsService(rdb) + h := NewMasterHandler(db, db, masterSvc, syncSvc, statsSvc, nil) withMaster := func(next gin.HandlerFunc) gin.HandlerFunc { return func(c *gin.Context) { diff --git a/internal/api/realtime_handler.go b/internal/api/realtime_handler.go new file mode 100644 index 0000000..a3261b8 --- /dev/null +++ b/internal/api/realtime_handler.go @@ -0,0 +1,114 @@ +package api + +import ( + "errors" + "net/http" + "strconv" + "strings" + + "github.com/ez-api/ez-api/internal/model" + "github.com/ez-api/ez-api/internal/service" + "github.com/gin-gonic/gin" + "gorm.io/gorm" +) + +type MasterRealtimeView struct { + Requests int64 `json:"requests"` + Tokens int64 `json:"tokens"` + QPS int64 `json:"qps"` + QPSLimit int64 `json:"qps_limit"` + RateLimited bool `json:"rate_limited"` + UpdatedAt *int64 `json:"updated_at,omitempty"` +} + +func toMasterRealtimeView(stats service.MasterRealtimeSnapshot) *MasterRealtimeView { + var updatedAt *int64 + if stats.UpdatedAt != nil { + sec := stats.UpdatedAt.Unix() + updatedAt = &sec + } + return &MasterRealtimeView{ + Requests: stats.Requests, + Tokens: stats.Tokens, + QPS: stats.QPS, + QPSLimit: stats.QPSLimit, + RateLimited: stats.RateLimited, + UpdatedAt: updatedAt, + } +} + +// GetMasterRealtime godoc +// @Summary Master realtime stats (admin) +// @Description Return realtime counters for the specified master +// @Tags admin +// @Produce json +// @Security AdminAuth +// @Param id path int true "Master ID" +// @Success 200 {object} MasterRealtimeView +// @Failure 400 {object} gin.H +// @Failure 404 {object} gin.H +// @Failure 500 {object} gin.H +// @Router /admin/masters/{id}/realtime [get] +func (h *AdminHandler) GetMasterRealtime(c *gin.Context) { + idRaw := strings.TrimSpace(c.Param("id")) + idU64, err := strconv.ParseUint(idRaw, 10, 64) + if err != nil || idU64 == 0 { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid master id"}) + return + } + if h.statsService == nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "stats service not configured"}) + return + } + var m model.Master + if err := h.db.Select("id", "global_qps").First(&m, uint(idU64)).Error; err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + c.JSON(http.StatusNotFound, gin.H{"error": "master not found"}) + return + } + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to load master", "details": err.Error()}) + return + } + + stats, err := h.statsService.GetMasterRealtimeSnapshot(c.Request.Context(), uint(idU64)) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to load realtime stats", "details": err.Error()}) + return + } + if stats.QPSLimit == 0 && m.GlobalQPS > 0 { + stats.QPSLimit = int64(m.GlobalQPS) + } + c.JSON(http.StatusOK, toMasterRealtimeView(stats)) +} + +// GetSelfRealtime godoc +// @Summary Master realtime stats +// @Description Return realtime counters for the authenticated master +// @Tags master +// @Produce json +// @Security MasterAuth +// @Success 200 {object} MasterRealtimeView +// @Failure 401 {object} gin.H +// @Failure 500 {object} gin.H +// @Router /v1/realtime [get] +func (h *MasterHandler) GetSelfRealtime(c *gin.Context) { + master, exists := c.Get("master") + if !exists { + c.JSON(http.StatusUnauthorized, gin.H{"error": "master key not found in context"}) + return + } + if h.statsService == nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "stats service not configured"}) + return + } + m := master.(*model.Master) + stats, err := h.statsService.GetMasterRealtimeSnapshot(c.Request.Context(), m.ID) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to load realtime stats", "details": err.Error()}) + return + } + if stats.QPSLimit == 0 && m.GlobalQPS > 0 { + stats.QPSLimit = int64(m.GlobalQPS) + } + c.JSON(http.StatusOK, toMasterRealtimeView(stats)) +} diff --git a/internal/api/realtime_handler_test.go b/internal/api/realtime_handler_test.go new file mode 100644 index 0000000..4be3789 --- /dev/null +++ b/internal/api/realtime_handler_test.go @@ -0,0 +1,133 @@ +package api + +import ( + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "testing" + + "github.com/alicebob/miniredis/v2" + "github.com/ez-api/ez-api/internal/model" + "github.com/ez-api/ez-api/internal/service" + "github.com/gin-gonic/gin" + "github.com/redis/go-redis/v9" + "gorm.io/driver/sqlite" + "gorm.io/gorm" +) + +func TestAdminMasterRealtimeEndpoints(t *testing.T) { + gin.SetMode(gin.TestMode) + + dsn := fmt.Sprintf("file:%s?mode=memory&cache=shared", t.Name()) + db, err := gorm.Open(sqlite.Open(dsn), &gorm.Config{}) + if err != nil { + t.Fatalf("open sqlite: %v", err) + } + if err := db.AutoMigrate(&model.Master{}); err != nil { + t.Fatalf("migrate: %v", err) + } + + m := &model.Master{Name: "m1", Group: "g", Status: "active", Epoch: 1} + if err := db.Create(m).Error; err != nil { + t.Fatalf("create master: %v", err) + } + + mr := miniredis.RunT(t) + rdb := redis.NewClient(&redis.Options{Addr: mr.Addr()}) + mr.Set(fmt.Sprintf("master:stats:%d:requests", m.ID), "12") + mr.Set(fmt.Sprintf("master:stats:%d:tokens", m.ID), "34") + mr.HSet(fmt.Sprintf("master:rate:%d", m.ID), "qps", "2") + mr.HSet(fmt.Sprintf("master:rate:%d", m.ID), "limit", "5") + mr.HSet(fmt.Sprintf("master:rate:%d", m.ID), "blocked", "0") + mr.HSet(fmt.Sprintf("master:rate:%d", m.ID), "updated_at", "1700000200") + + syncSvc := service.NewSyncService(rdb) + masterSvc := service.NewMasterService(db) + statsSvc := service.NewStatsService(rdb) + adminHandler := NewAdminHandler(db, db, masterSvc, syncSvc, statsSvc, nil) + + r := gin.New() + r.GET("/admin/masters/:id", adminHandler.GetMaster) + r.GET("/admin/masters/:id/realtime", adminHandler.GetMasterRealtime) + + rr := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodGet, fmt.Sprintf("/admin/masters/%d", m.ID), nil) + r.ServeHTTP(rr, req) + if rr.Code != http.StatusOK { + t.Fatalf("expected 200, got %d body=%s", rr.Code, rr.Body.String()) + } + var view MasterView + if err := json.Unmarshal(rr.Body.Bytes(), &view); err != nil { + t.Fatalf("unmarshal master view: %v", err) + } + if view.Realtime == nil || view.Realtime.QPS != 2 || view.Realtime.QPSLimit != 5 { + t.Fatalf("unexpected realtime in master view: %+v", view.Realtime) + } + + rr = httptest.NewRecorder() + req = httptest.NewRequest(http.MethodGet, fmt.Sprintf("/admin/masters/%d/realtime", m.ID), nil) + r.ServeHTTP(rr, req) + if rr.Code != http.StatusOK { + t.Fatalf("expected 200, got %d body=%s", rr.Code, rr.Body.String()) + } + var realtime MasterRealtimeView + if err := json.Unmarshal(rr.Body.Bytes(), &realtime); err != nil { + t.Fatalf("unmarshal realtime: %v", err) + } + if realtime.Requests != 12 || realtime.Tokens != 34 || realtime.QPS != 2 { + t.Fatalf("unexpected realtime payload: %+v", realtime) + } +} + +func TestMasterSelfRealtime(t *testing.T) { + gin.SetMode(gin.TestMode) + + dsn := fmt.Sprintf("file:%s?mode=memory&cache=shared", t.Name()) + db, err := gorm.Open(sqlite.Open(dsn), &gorm.Config{}) + if err != nil { + t.Fatalf("open sqlite: %v", err) + } + if err := db.AutoMigrate(&model.Master{}); err != nil { + t.Fatalf("migrate: %v", err) + } + + m := &model.Master{Name: "m1", Group: "g", Status: "active", Epoch: 1} + if err := db.Create(m).Error; err != nil { + t.Fatalf("create master: %v", err) + } + + mr := miniredis.RunT(t) + rdb := redis.NewClient(&redis.Options{Addr: mr.Addr()}) + mr.Set(fmt.Sprintf("master:stats:%d:requests", m.ID), "6") + mr.Set(fmt.Sprintf("master:stats:%d:tokens", m.ID), "8") + + syncSvc := service.NewSyncService(rdb) + masterSvc := service.NewMasterService(db) + statsSvc := service.NewStatsService(rdb) + masterHandler := NewMasterHandler(db, db, masterSvc, syncSvc, statsSvc, nil) + + withMaster := func(next gin.HandlerFunc) gin.HandlerFunc { + return func(c *gin.Context) { + c.Set("master", m) + next(c) + } + } + + r := gin.New() + r.GET("/v1/realtime", withMaster(masterHandler.GetSelfRealtime)) + + rr := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodGet, "/v1/realtime", nil) + r.ServeHTTP(rr, req) + if rr.Code != http.StatusOK { + t.Fatalf("expected 200, got %d body=%s", rr.Code, rr.Body.String()) + } + var realtime MasterRealtimeView + if err := json.Unmarshal(rr.Body.Bytes(), &realtime); err != nil { + t.Fatalf("unmarshal realtime: %v", err) + } + if realtime.Requests != 6 || realtime.Tokens != 8 { + t.Fatalf("unexpected realtime payload: %+v", realtime) + } +} diff --git a/internal/api/stats_handler_test.go b/internal/api/stats_handler_test.go index dff1313..d79bd72 100644 --- a/internal/api/stats_handler_test.go +++ b/internal/api/stats_handler_test.go @@ -72,7 +72,8 @@ func TestMasterStats_AggregatesByKeyAndModel(t *testing.T) { rdb := redis.NewClient(&redis.Options{Addr: mr.Addr()}) masterSvc := service.NewMasterService(db) syncSvc := service.NewSyncService(rdb) - h := NewMasterHandler(db, db, masterSvc, syncSvc, nil) + statsSvc := service.NewStatsService(rdb) + h := NewMasterHandler(db, db, masterSvc, syncSvc, statsSvc, nil) withMaster := func(next gin.HandlerFunc) gin.HandlerFunc { return func(c *gin.Context) { @@ -163,7 +164,8 @@ func TestAdminStats_AggregatesByProvider(t *testing.T) { rdb := redis.NewClient(&redis.Options{Addr: mr.Addr()}) masterSvc := service.NewMasterService(db) syncSvc := service.NewSyncService(rdb) - adminHandler := NewAdminHandler(db, db, masterSvc, syncSvc, nil) + statsSvc := service.NewStatsService(rdb) + adminHandler := NewAdminHandler(db, db, masterSvc, syncSvc, statsSvc, nil) r := gin.New() r.GET("/admin/stats", adminHandler.GetAdminStats) diff --git a/internal/model/models.go b/internal/model/models.go index 3fb4347..9f07020 100644 --- a/internal/model/models.go +++ b/internal/model/models.go @@ -20,7 +20,7 @@ type Master struct { Epoch int64 `gorm:"default:1" json:"epoch"` // used for revocation/rotation Status string `gorm:"size:50;default:'active'" json:"status"` // active, suspended MaxChildKeys int `gorm:"default:5" json:"max_child_keys"` - GlobalQPS int `gorm:"default:3" json:"global_qps"` + GlobalQPS int `gorm:"default:0" json:"global_qps"` } // Key represents a child access token issued by a Master. diff --git a/internal/service/stats.go b/internal/service/stats.go index 83c3ae6..c647f74 100644 --- a/internal/service/stats.go +++ b/internal/service/stats.go @@ -20,6 +20,15 @@ type RealtimeStats struct { LastAccessedAt *time.Time } +type MasterRealtimeSnapshot struct { + Requests int64 + Tokens int64 + QPS int64 + QPSLimit int64 + RateLimited bool + UpdatedAt *time.Time +} + func NewStatsService(rdb *redis.Client) *StatsService { return &StatsService{rdb: rdb} } @@ -77,3 +86,70 @@ func (s *StatsService) GetMasterRealtimeStats(ctx context.Context, masterID uint } return RealtimeStats{Requests: reqs, Tokens: tokens}, nil } + +func (s *StatsService) GetMasterRealtimeSnapshot(ctx context.Context, masterID uint) (MasterRealtimeSnapshot, error) { + if s == nil || s.rdb == nil { + return MasterRealtimeSnapshot{}, fmt.Errorf("redis client is required") + } + if masterID == 0 { + return MasterRealtimeSnapshot{}, fmt.Errorf("master id required") + } + if ctx == nil { + ctx = context.Background() + } + statsKey := fmt.Sprintf("master:stats:%d", masterID) + rateKey := fmt.Sprintf("master:rate:%d", masterID) + + pipe := s.rdb.Pipeline() + reqCmd := pipe.Get(ctx, statsKey+":requests") + tokCmd := pipe.Get(ctx, statsKey+":tokens") + qpsCmd := pipe.HGet(ctx, rateKey, "qps") + limitCmd := pipe.HGet(ctx, rateKey, "limit") + blockedCmd := pipe.HGet(ctx, rateKey, "blocked") + updatedCmd := pipe.HGet(ctx, rateKey, "updated_at") + if _, err := pipe.Exec(ctx); err != nil && err != redis.Nil { + return MasterRealtimeSnapshot{}, fmt.Errorf("read realtime stats: %w", err) + } + + requests := readCmdInt64(reqCmd) + tokens := readCmdInt64(tokCmd) + qps := readCmdInt64(qpsCmd) + limit := readCmdInt64(limitCmd) + blocked := readCmdInt64(blockedCmd) == 1 + updatedAt := readCmdTime(updatedCmd) + if limit < 0 { + limit = 0 + } + + return MasterRealtimeSnapshot{ + Requests: requests, + Tokens: tokens, + QPS: qps, + QPSLimit: limit, + RateLimited: blocked, + UpdatedAt: updatedAt, + }, nil +} + +func readCmdInt64(cmd *redis.StringCmd) int64 { + if cmd == nil { + return 0 + } + v, err := cmd.Int64() + if err != nil { + return 0 + } + return v +} + +func readCmdTime(cmd *redis.StringCmd) *time.Time { + if cmd == nil { + return nil + } + v, err := cmd.Int64() + if err != nil || v <= 0 { + return nil + } + tm := time.Unix(v, 0).UTC() + return &tm +} diff --git a/internal/service/stats_test.go b/internal/service/stats_test.go index 0b19e9f..b14e6af 100644 --- a/internal/service/stats_test.go +++ b/internal/service/stats_test.go @@ -50,3 +50,32 @@ func TestStatsService_MasterRealtimeStats(t *testing.T) { t.Fatalf("unexpected stats: %+v", stats) } } + +func TestStatsService_MasterRealtimeSnapshot(t *testing.T) { + t.Parallel() + + mr := miniredis.RunT(t) + rdb := redis.NewClient(&redis.Options{Addr: mr.Addr()}) + svc := NewStatsService(rdb) + + mr.Set("master:stats:12:requests", "9") + mr.Set("master:stats:12:tokens", "18") + mr.HSet("master:rate:12", "qps", "3") + mr.HSet("master:rate:12", "limit", "10") + mr.HSet("master:rate:12", "blocked", "1") + mr.HSet("master:rate:12", "updated_at", "1700000100") + + stats, err := svc.GetMasterRealtimeSnapshot(context.Background(), 12) + if err != nil { + t.Fatalf("GetMasterRealtimeSnapshot: %v", err) + } + if stats.Requests != 9 || stats.Tokens != 18 { + t.Fatalf("unexpected totals: %+v", stats) + } + if stats.QPS != 3 || stats.QPSLimit != 10 || !stats.RateLimited { + t.Fatalf("unexpected rate stats: %+v", stats) + } + if stats.UpdatedAt == nil || !stats.UpdatedAt.Equal(time.Unix(1700000100, 0).UTC()) { + t.Fatalf("unexpected updated_at: %+v", stats.UpdatedAt) + } +}