feat(api): add realtime stats endpoints for masters

Introduce StatsService integration to admin and master handlers,
exposing realtime metrics (requests, tokens, QPS, rate limit status)
via new endpoints:
- GET /admin/masters/:id/realtime
- GET /v1/realtime

Also embed realtime stats in the existing GET /admin/masters/:id
response and change GlobalQPS default to 0 with validation to
reject negative values.
This commit is contained in:
zenfun
2025-12-22 12:02:27 +08:00
parent fa7f92c6e3
commit 2c5ccd56ee
12 changed files with 404 additions and 27 deletions

View File

@@ -171,12 +171,13 @@ func main() {
fatal(logger, "failed to create admin service", "err", err)
}
masterService := service.NewMasterService(db)
statsService := service.NewStatsService(rdb)
healthService := service.NewHealthCheckService(db, rdb)
healthHandler := api.NewHealthHandler(healthService)
handler := api.NewHandler(db, logDB, syncService, logWriter, rdb, logPartitioner)
adminHandler := api.NewAdminHandler(db, logDB, masterService, syncService, logPartitioner)
masterHandler := api.NewMasterHandler(db, logDB, masterService, syncService, logPartitioner)
adminHandler := api.NewAdminHandler(db, logDB, masterService, syncService, statsService, logPartitioner)
masterHandler := api.NewMasterHandler(db, logDB, masterService, syncService, statsService, logPartitioner)
internalHandler := api.NewInternalHandler(db)
featureHandler := api.NewFeatureHandler(rdb)
modelRegistryService := service.NewModelRegistryService(db, rdb, service.ModelRegistryConfig{
@@ -255,6 +256,7 @@ func main() {
adminGroup.POST("/masters", adminHandler.CreateMaster)
adminGroup.GET("/masters", adminHandler.ListMasters)
adminGroup.GET("/masters/:id", adminHandler.GetMaster)
adminGroup.GET("/masters/:id/realtime", adminHandler.GetMasterRealtime)
adminGroup.PUT("/masters/:id", adminHandler.UpdateMaster)
adminGroup.DELETE("/masters/:id", adminHandler.DeleteMaster)
adminGroup.POST("/masters/batch", adminHandler.BatchMasters)
@@ -320,6 +322,7 @@ func main() {
masterGroup.DELETE("/tokens/:id", masterHandler.DeleteToken)
masterGroup.GET("/logs", masterHandler.ListSelfLogs)
masterGroup.GET("/logs/stats", masterHandler.GetSelfLogStats)
masterGroup.GET("/realtime", masterHandler.GetSelfRealtime)
masterGroup.GET("/stats", masterHandler.GetSelfStats)
}

View File

@@ -33,8 +33,9 @@ func newTestAdminHandler(t *testing.T) (*AdminHandler, *gorm.DB, *miniredis.Mini
mr := miniredis.RunT(t)
rdb := redis.NewClient(&redis.Options{Addr: mr.Addr()})
sync := service.NewSyncService(rdb)
stats := service.NewStatsService(rdb)
masterService := service.NewMasterService(db)
return NewAdminHandler(db, db, masterService, sync, nil), db, mr
return NewAdminHandler(db, db, masterService, sync, stats, nil), db, mr
}
func TestAdmin_BatchMasters_Status(t *testing.T) {

View File

@@ -17,14 +17,15 @@ type AdminHandler struct {
logDB *gorm.DB
masterService *service.MasterService
syncService *service.SyncService
statsService *service.StatsService
logPartitioner *service.LogPartitioner
}
func NewAdminHandler(db *gorm.DB, logDB *gorm.DB, masterService *service.MasterService, syncService *service.SyncService, partitioner *service.LogPartitioner) *AdminHandler {
func NewAdminHandler(db *gorm.DB, logDB *gorm.DB, masterService *service.MasterService, syncService *service.SyncService, statsService *service.StatsService, partitioner *service.LogPartitioner) *AdminHandler {
if logDB == nil {
logDB = db
}
return &AdminHandler{db: db, logDB: logDB, masterService: masterService, syncService: syncService, logPartitioner: partitioner}
return &AdminHandler{db: db, logDB: logDB, masterService: masterService, syncService: syncService, statsService: statsService, logPartitioner: partitioner}
}
func (h *AdminHandler) logDBConn() *gorm.DB {
@@ -68,8 +69,9 @@ func (h *AdminHandler) CreateMaster(c *gin.Context) {
if req.MaxChildKeys == 0 {
req.MaxChildKeys = 5
}
if req.GlobalQPS == 0 {
req.GlobalQPS = 3
if req.GlobalQPS < 0 {
c.JSON(http.StatusBadRequest, gin.H{"error": "global_qps must be >= 0"})
return
}
master, rawMasterKey, err := h.masterService.CreateMaster(req.Name, req.Group, req.MaxChildKeys, req.GlobalQPS)
@@ -105,6 +107,7 @@ type MasterView struct {
GlobalQPS int `json:"global_qps"`
CreatedAt int64 `json:"created_at"`
UpdatedAt int64 `json:"updated_at"`
Realtime *MasterRealtimeView `json:"realtime,omitempty"`
}
func toMasterView(m model.Master) MasterView {
@@ -176,7 +179,16 @@ func (h *AdminHandler) GetMaster(c *gin.Context) {
c.JSON(http.StatusNotFound, gin.H{"error": "master not found"})
return
}
c.JSON(http.StatusOK, toMasterView(m))
view := toMasterView(m)
if h.statsService != nil {
if stats, err := h.statsService.GetMasterRealtimeSnapshot(c.Request.Context(), m.ID); err == nil {
if stats.QPSLimit == 0 && m.GlobalQPS > 0 {
stats.QPSLimit = int64(m.GlobalQPS)
}
view.Realtime = toMasterRealtimeView(stats)
}
}
c.JSON(http.StatusOK, view)
}
type UpdateMasterRequest struct {
@@ -233,7 +245,11 @@ func (h *AdminHandler) UpdateMaster(c *gin.Context) {
if req.MaxChildKeys != nil && *req.MaxChildKeys > 0 {
update["max_child_keys"] = *req.MaxChildKeys
}
if req.GlobalQPS != nil && *req.GlobalQPS > 0 {
if req.GlobalQPS != nil {
if *req.GlobalQPS < 0 {
c.JSON(http.StatusBadRequest, gin.H{"error": "global_qps must be >= 0"})
return
}
update["global_qps"] = *req.GlobalQPS
}
if len(update) == 0 && !req.PropagateToKeys {

View File

@@ -34,7 +34,8 @@ func TestAdmin_IssueChildKeyForMaster_IssuedByAdminAndSynced(t *testing.T) {
syncService := service.NewSyncService(rdb)
masterService := service.NewMasterService(db)
adminHandler := NewAdminHandler(db, db, masterService, syncService, nil)
statsService := service.NewStatsService(rdb)
adminHandler := NewAdminHandler(db, db, masterService, syncService, statsService, nil)
m, _, err := masterService.CreateMaster("m1", "default", 5, 10)
if err != nil {

View File

@@ -18,14 +18,15 @@ type MasterHandler struct {
logDB *gorm.DB
masterService *service.MasterService
syncService *service.SyncService
statsService *service.StatsService
logPartitioner *service.LogPartitioner
}
func NewMasterHandler(db *gorm.DB, logDB *gorm.DB, masterService *service.MasterService, syncService *service.SyncService, partitioner *service.LogPartitioner) *MasterHandler {
func NewMasterHandler(db *gorm.DB, logDB *gorm.DB, masterService *service.MasterService, syncService *service.SyncService, statsService *service.StatsService, partitioner *service.LogPartitioner) *MasterHandler {
if logDB == nil {
logDB = db
}
return &MasterHandler{db: db, logDB: logDB, masterService: masterService, syncService: syncService, logPartitioner: partitioner}
return &MasterHandler{db: db, logDB: logDB, masterService: masterService, syncService: syncService, statsService: statsService, logPartitioner: partitioner}
}
func (h *MasterHandler) logDBConn() *gorm.DB {

View File

@@ -42,7 +42,8 @@ func TestMaster_ListTokens_AndUpdateToken(t *testing.T) {
rdb := redis.NewClient(&redis.Options{Addr: mr.Addr()})
syncSvc := service.NewSyncService(rdb)
masterSvc := service.NewMasterService(db)
h := NewMasterHandler(db, db, masterSvc, syncSvc, nil)
statsSvc := service.NewStatsService(rdb)
h := NewMasterHandler(db, db, masterSvc, syncSvc, statsSvc, nil)
withMaster := func(next gin.HandlerFunc) gin.HandlerFunc {
return func(c *gin.Context) {

View File

@@ -0,0 +1,114 @@
package api
import (
"errors"
"net/http"
"strconv"
"strings"
"github.com/ez-api/ez-api/internal/model"
"github.com/ez-api/ez-api/internal/service"
"github.com/gin-gonic/gin"
"gorm.io/gorm"
)
type MasterRealtimeView struct {
Requests int64 `json:"requests"`
Tokens int64 `json:"tokens"`
QPS int64 `json:"qps"`
QPSLimit int64 `json:"qps_limit"`
RateLimited bool `json:"rate_limited"`
UpdatedAt *int64 `json:"updated_at,omitempty"`
}
func toMasterRealtimeView(stats service.MasterRealtimeSnapshot) *MasterRealtimeView {
var updatedAt *int64
if stats.UpdatedAt != nil {
sec := stats.UpdatedAt.Unix()
updatedAt = &sec
}
return &MasterRealtimeView{
Requests: stats.Requests,
Tokens: stats.Tokens,
QPS: stats.QPS,
QPSLimit: stats.QPSLimit,
RateLimited: stats.RateLimited,
UpdatedAt: updatedAt,
}
}
// GetMasterRealtime godoc
// @Summary Master realtime stats (admin)
// @Description Return realtime counters for the specified master
// @Tags admin
// @Produce json
// @Security AdminAuth
// @Param id path int true "Master ID"
// @Success 200 {object} MasterRealtimeView
// @Failure 400 {object} gin.H
// @Failure 404 {object} gin.H
// @Failure 500 {object} gin.H
// @Router /admin/masters/{id}/realtime [get]
func (h *AdminHandler) GetMasterRealtime(c *gin.Context) {
idRaw := strings.TrimSpace(c.Param("id"))
idU64, err := strconv.ParseUint(idRaw, 10, 64)
if err != nil || idU64 == 0 {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid master id"})
return
}
if h.statsService == nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "stats service not configured"})
return
}
var m model.Master
if err := h.db.Select("id", "global_qps").First(&m, uint(idU64)).Error; err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
c.JSON(http.StatusNotFound, gin.H{"error": "master not found"})
return
}
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to load master", "details": err.Error()})
return
}
stats, err := h.statsService.GetMasterRealtimeSnapshot(c.Request.Context(), uint(idU64))
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to load realtime stats", "details": err.Error()})
return
}
if stats.QPSLimit == 0 && m.GlobalQPS > 0 {
stats.QPSLimit = int64(m.GlobalQPS)
}
c.JSON(http.StatusOK, toMasterRealtimeView(stats))
}
// GetSelfRealtime godoc
// @Summary Master realtime stats
// @Description Return realtime counters for the authenticated master
// @Tags master
// @Produce json
// @Security MasterAuth
// @Success 200 {object} MasterRealtimeView
// @Failure 401 {object} gin.H
// @Failure 500 {object} gin.H
// @Router /v1/realtime [get]
func (h *MasterHandler) GetSelfRealtime(c *gin.Context) {
master, exists := c.Get("master")
if !exists {
c.JSON(http.StatusUnauthorized, gin.H{"error": "master key not found in context"})
return
}
if h.statsService == nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "stats service not configured"})
return
}
m := master.(*model.Master)
stats, err := h.statsService.GetMasterRealtimeSnapshot(c.Request.Context(), m.ID)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to load realtime stats", "details": err.Error()})
return
}
if stats.QPSLimit == 0 && m.GlobalQPS > 0 {
stats.QPSLimit = int64(m.GlobalQPS)
}
c.JSON(http.StatusOK, toMasterRealtimeView(stats))
}

View File

@@ -0,0 +1,133 @@
package api
import (
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"testing"
"github.com/alicebob/miniredis/v2"
"github.com/ez-api/ez-api/internal/model"
"github.com/ez-api/ez-api/internal/service"
"github.com/gin-gonic/gin"
"github.com/redis/go-redis/v9"
"gorm.io/driver/sqlite"
"gorm.io/gorm"
)
func TestAdminMasterRealtimeEndpoints(t *testing.T) {
gin.SetMode(gin.TestMode)
dsn := fmt.Sprintf("file:%s?mode=memory&cache=shared", t.Name())
db, err := gorm.Open(sqlite.Open(dsn), &gorm.Config{})
if err != nil {
t.Fatalf("open sqlite: %v", err)
}
if err := db.AutoMigrate(&model.Master{}); err != nil {
t.Fatalf("migrate: %v", err)
}
m := &model.Master{Name: "m1", Group: "g", Status: "active", Epoch: 1}
if err := db.Create(m).Error; err != nil {
t.Fatalf("create master: %v", err)
}
mr := miniredis.RunT(t)
rdb := redis.NewClient(&redis.Options{Addr: mr.Addr()})
mr.Set(fmt.Sprintf("master:stats:%d:requests", m.ID), "12")
mr.Set(fmt.Sprintf("master:stats:%d:tokens", m.ID), "34")
mr.HSet(fmt.Sprintf("master:rate:%d", m.ID), "qps", "2")
mr.HSet(fmt.Sprintf("master:rate:%d", m.ID), "limit", "5")
mr.HSet(fmt.Sprintf("master:rate:%d", m.ID), "blocked", "0")
mr.HSet(fmt.Sprintf("master:rate:%d", m.ID), "updated_at", "1700000200")
syncSvc := service.NewSyncService(rdb)
masterSvc := service.NewMasterService(db)
statsSvc := service.NewStatsService(rdb)
adminHandler := NewAdminHandler(db, db, masterSvc, syncSvc, statsSvc, nil)
r := gin.New()
r.GET("/admin/masters/:id", adminHandler.GetMaster)
r.GET("/admin/masters/:id/realtime", adminHandler.GetMasterRealtime)
rr := httptest.NewRecorder()
req := httptest.NewRequest(http.MethodGet, fmt.Sprintf("/admin/masters/%d", m.ID), nil)
r.ServeHTTP(rr, req)
if rr.Code != http.StatusOK {
t.Fatalf("expected 200, got %d body=%s", rr.Code, rr.Body.String())
}
var view MasterView
if err := json.Unmarshal(rr.Body.Bytes(), &view); err != nil {
t.Fatalf("unmarshal master view: %v", err)
}
if view.Realtime == nil || view.Realtime.QPS != 2 || view.Realtime.QPSLimit != 5 {
t.Fatalf("unexpected realtime in master view: %+v", view.Realtime)
}
rr = httptest.NewRecorder()
req = httptest.NewRequest(http.MethodGet, fmt.Sprintf("/admin/masters/%d/realtime", m.ID), nil)
r.ServeHTTP(rr, req)
if rr.Code != http.StatusOK {
t.Fatalf("expected 200, got %d body=%s", rr.Code, rr.Body.String())
}
var realtime MasterRealtimeView
if err := json.Unmarshal(rr.Body.Bytes(), &realtime); err != nil {
t.Fatalf("unmarshal realtime: %v", err)
}
if realtime.Requests != 12 || realtime.Tokens != 34 || realtime.QPS != 2 {
t.Fatalf("unexpected realtime payload: %+v", realtime)
}
}
func TestMasterSelfRealtime(t *testing.T) {
gin.SetMode(gin.TestMode)
dsn := fmt.Sprintf("file:%s?mode=memory&cache=shared", t.Name())
db, err := gorm.Open(sqlite.Open(dsn), &gorm.Config{})
if err != nil {
t.Fatalf("open sqlite: %v", err)
}
if err := db.AutoMigrate(&model.Master{}); err != nil {
t.Fatalf("migrate: %v", err)
}
m := &model.Master{Name: "m1", Group: "g", Status: "active", Epoch: 1}
if err := db.Create(m).Error; err != nil {
t.Fatalf("create master: %v", err)
}
mr := miniredis.RunT(t)
rdb := redis.NewClient(&redis.Options{Addr: mr.Addr()})
mr.Set(fmt.Sprintf("master:stats:%d:requests", m.ID), "6")
mr.Set(fmt.Sprintf("master:stats:%d:tokens", m.ID), "8")
syncSvc := service.NewSyncService(rdb)
masterSvc := service.NewMasterService(db)
statsSvc := service.NewStatsService(rdb)
masterHandler := NewMasterHandler(db, db, masterSvc, syncSvc, statsSvc, nil)
withMaster := func(next gin.HandlerFunc) gin.HandlerFunc {
return func(c *gin.Context) {
c.Set("master", m)
next(c)
}
}
r := gin.New()
r.GET("/v1/realtime", withMaster(masterHandler.GetSelfRealtime))
rr := httptest.NewRecorder()
req := httptest.NewRequest(http.MethodGet, "/v1/realtime", nil)
r.ServeHTTP(rr, req)
if rr.Code != http.StatusOK {
t.Fatalf("expected 200, got %d body=%s", rr.Code, rr.Body.String())
}
var realtime MasterRealtimeView
if err := json.Unmarshal(rr.Body.Bytes(), &realtime); err != nil {
t.Fatalf("unmarshal realtime: %v", err)
}
if realtime.Requests != 6 || realtime.Tokens != 8 {
t.Fatalf("unexpected realtime payload: %+v", realtime)
}
}

View File

@@ -72,7 +72,8 @@ func TestMasterStats_AggregatesByKeyAndModel(t *testing.T) {
rdb := redis.NewClient(&redis.Options{Addr: mr.Addr()})
masterSvc := service.NewMasterService(db)
syncSvc := service.NewSyncService(rdb)
h := NewMasterHandler(db, db, masterSvc, syncSvc, nil)
statsSvc := service.NewStatsService(rdb)
h := NewMasterHandler(db, db, masterSvc, syncSvc, statsSvc, nil)
withMaster := func(next gin.HandlerFunc) gin.HandlerFunc {
return func(c *gin.Context) {
@@ -163,7 +164,8 @@ func TestAdminStats_AggregatesByProvider(t *testing.T) {
rdb := redis.NewClient(&redis.Options{Addr: mr.Addr()})
masterSvc := service.NewMasterService(db)
syncSvc := service.NewSyncService(rdb)
adminHandler := NewAdminHandler(db, db, masterSvc, syncSvc, nil)
statsSvc := service.NewStatsService(rdb)
adminHandler := NewAdminHandler(db, db, masterSvc, syncSvc, statsSvc, nil)
r := gin.New()
r.GET("/admin/stats", adminHandler.GetAdminStats)

View File

@@ -20,7 +20,7 @@ type Master struct {
Epoch int64 `gorm:"default:1" json:"epoch"` // used for revocation/rotation
Status string `gorm:"size:50;default:'active'" json:"status"` // active, suspended
MaxChildKeys int `gorm:"default:5" json:"max_child_keys"`
GlobalQPS int `gorm:"default:3" json:"global_qps"`
GlobalQPS int `gorm:"default:0" json:"global_qps"`
}
// Key represents a child access token issued by a Master.

View File

@@ -20,6 +20,15 @@ type RealtimeStats struct {
LastAccessedAt *time.Time
}
type MasterRealtimeSnapshot struct {
Requests int64
Tokens int64
QPS int64
QPSLimit int64
RateLimited bool
UpdatedAt *time.Time
}
func NewStatsService(rdb *redis.Client) *StatsService {
return &StatsService{rdb: rdb}
}
@@ -77,3 +86,70 @@ func (s *StatsService) GetMasterRealtimeStats(ctx context.Context, masterID uint
}
return RealtimeStats{Requests: reqs, Tokens: tokens}, nil
}
func (s *StatsService) GetMasterRealtimeSnapshot(ctx context.Context, masterID uint) (MasterRealtimeSnapshot, error) {
if s == nil || s.rdb == nil {
return MasterRealtimeSnapshot{}, fmt.Errorf("redis client is required")
}
if masterID == 0 {
return MasterRealtimeSnapshot{}, fmt.Errorf("master id required")
}
if ctx == nil {
ctx = context.Background()
}
statsKey := fmt.Sprintf("master:stats:%d", masterID)
rateKey := fmt.Sprintf("master:rate:%d", masterID)
pipe := s.rdb.Pipeline()
reqCmd := pipe.Get(ctx, statsKey+":requests")
tokCmd := pipe.Get(ctx, statsKey+":tokens")
qpsCmd := pipe.HGet(ctx, rateKey, "qps")
limitCmd := pipe.HGet(ctx, rateKey, "limit")
blockedCmd := pipe.HGet(ctx, rateKey, "blocked")
updatedCmd := pipe.HGet(ctx, rateKey, "updated_at")
if _, err := pipe.Exec(ctx); err != nil && err != redis.Nil {
return MasterRealtimeSnapshot{}, fmt.Errorf("read realtime stats: %w", err)
}
requests := readCmdInt64(reqCmd)
tokens := readCmdInt64(tokCmd)
qps := readCmdInt64(qpsCmd)
limit := readCmdInt64(limitCmd)
blocked := readCmdInt64(blockedCmd) == 1
updatedAt := readCmdTime(updatedCmd)
if limit < 0 {
limit = 0
}
return MasterRealtimeSnapshot{
Requests: requests,
Tokens: tokens,
QPS: qps,
QPSLimit: limit,
RateLimited: blocked,
UpdatedAt: updatedAt,
}, nil
}
func readCmdInt64(cmd *redis.StringCmd) int64 {
if cmd == nil {
return 0
}
v, err := cmd.Int64()
if err != nil {
return 0
}
return v
}
func readCmdTime(cmd *redis.StringCmd) *time.Time {
if cmd == nil {
return nil
}
v, err := cmd.Int64()
if err != nil || v <= 0 {
return nil
}
tm := time.Unix(v, 0).UTC()
return &tm
}

View File

@@ -50,3 +50,32 @@ func TestStatsService_MasterRealtimeStats(t *testing.T) {
t.Fatalf("unexpected stats: %+v", stats)
}
}
func TestStatsService_MasterRealtimeSnapshot(t *testing.T) {
t.Parallel()
mr := miniredis.RunT(t)
rdb := redis.NewClient(&redis.Options{Addr: mr.Addr()})
svc := NewStatsService(rdb)
mr.Set("master:stats:12:requests", "9")
mr.Set("master:stats:12:tokens", "18")
mr.HSet("master:rate:12", "qps", "3")
mr.HSet("master:rate:12", "limit", "10")
mr.HSet("master:rate:12", "blocked", "1")
mr.HSet("master:rate:12", "updated_at", "1700000100")
stats, err := svc.GetMasterRealtimeSnapshot(context.Background(), 12)
if err != nil {
t.Fatalf("GetMasterRealtimeSnapshot: %v", err)
}
if stats.Requests != 9 || stats.Tokens != 18 {
t.Fatalf("unexpected totals: %+v", stats)
}
if stats.QPS != 3 || stats.QPSLimit != 10 || !stats.RateLimited {
t.Fatalf("unexpected rate stats: %+v", stats)
}
if stats.UpdatedAt == nil || !stats.UpdatedAt.Equal(time.Unix(1700000100, 0).UTC()) {
t.Fatalf("unexpected updated_at: %+v", stats.UpdatedAt)
}
}