Files
ez-api/internal/service/stats.go
zenfun 53c18c3867 feat(api): add dashboard summary and system realtime endpoints
Add new admin API endpoints for dashboard metrics and system-wide
realtime statistics:

- Add /admin/dashboard/summary endpoint with aggregated metrics
  including requests, tokens, latency, masters, keys, and provider
  keys statistics with time period filtering
- Add /admin/realtime endpoint for system-level realtime stats
  aggregated across all masters
- Add status filter parameter to ListAPIKeys endpoint
- Add hour grouping option to log stats aggregation
- Update OpenAPI documentation with new endpoints and schemas
2025-12-31 13:17:23 +08:00

222 lines
6.1 KiB
Go

package service
import (
"context"
"fmt"
"strconv"
"strings"
"time"
"github.com/redis/go-redis/v9"
)
type StatsService struct {
rdb *redis.Client
}
type RealtimeStats struct {
Requests int64
Tokens int64
LastAccessedAt *time.Time
}
type MasterRealtimeSnapshot struct {
Requests int64
Tokens int64
QPS int64
QPSLimit int64
RateLimited bool
UpdatedAt *time.Time
}
func NewStatsService(rdb *redis.Client) *StatsService {
return &StatsService{rdb: rdb}
}
func (s *StatsService) GetKeyRealtimeStats(ctx context.Context, tokenHash string) (RealtimeStats, error) {
if s == nil || s.rdb == nil {
return RealtimeStats{}, fmt.Errorf("redis client is required")
}
tokenHash = strings.TrimSpace(tokenHash)
if tokenHash == "" {
return RealtimeStats{}, fmt.Errorf("token hash required")
}
if ctx == nil {
ctx = context.Background()
}
reqs, err := s.rdb.Get(ctx, fmt.Sprintf("key:stats:%s:requests", tokenHash)).Int64()
if err != nil && err != redis.Nil {
return RealtimeStats{}, fmt.Errorf("read key requests: %w", err)
}
tokens, err := s.rdb.Get(ctx, fmt.Sprintf("key:stats:%s:tokens", tokenHash)).Int64()
if err != nil && err != redis.Nil {
return RealtimeStats{}, fmt.Errorf("read key tokens: %w", err)
}
lastRaw, err := s.rdb.Get(ctx, fmt.Sprintf("key:stats:%s:last_access", tokenHash)).Result()
if err != nil && err != redis.Nil {
return RealtimeStats{}, fmt.Errorf("read key last access: %w", err)
}
var lastAt *time.Time
if lastRaw != "" {
if sec, err := strconv.ParseInt(lastRaw, 10, 64); err == nil && sec > 0 {
t := time.Unix(sec, 0).UTC()
lastAt = &t
}
}
return RealtimeStats{Requests: reqs, Tokens: tokens, LastAccessedAt: lastAt}, nil
}
func (s *StatsService) GetMasterRealtimeStats(ctx context.Context, masterID uint) (RealtimeStats, error) {
if s == nil || s.rdb == nil {
return RealtimeStats{}, fmt.Errorf("redis client is required")
}
if masterID == 0 {
return RealtimeStats{}, fmt.Errorf("master id required")
}
if ctx == nil {
ctx = context.Background()
}
reqs, err := s.rdb.Get(ctx, fmt.Sprintf("master:stats:%d:requests", masterID)).Int64()
if err != nil && err != redis.Nil {
return RealtimeStats{}, fmt.Errorf("read master requests: %w", err)
}
tokens, err := s.rdb.Get(ctx, fmt.Sprintf("master:stats:%d:tokens", masterID)).Int64()
if err != nil && err != redis.Nil {
return RealtimeStats{}, fmt.Errorf("read master tokens: %w", err)
}
return RealtimeStats{Requests: reqs, Tokens: tokens}, nil
}
func (s *StatsService) GetMasterRealtimeSnapshot(ctx context.Context, masterID uint) (MasterRealtimeSnapshot, error) {
if s == nil || s.rdb == nil {
return MasterRealtimeSnapshot{}, fmt.Errorf("redis client is required")
}
if masterID == 0 {
return MasterRealtimeSnapshot{}, fmt.Errorf("master id required")
}
if ctx == nil {
ctx = context.Background()
}
statsKey := fmt.Sprintf("master:stats:%d", masterID)
rateKey := fmt.Sprintf("master:rate:%d", masterID)
pipe := s.rdb.Pipeline()
reqCmd := pipe.Get(ctx, statsKey+":requests")
tokCmd := pipe.Get(ctx, statsKey+":tokens")
qpsCmd := pipe.HGet(ctx, rateKey, "qps")
limitCmd := pipe.HGet(ctx, rateKey, "limit")
blockedCmd := pipe.HGet(ctx, rateKey, "blocked")
updatedCmd := pipe.HGet(ctx, rateKey, "updated_at")
if _, err := pipe.Exec(ctx); err != nil && err != redis.Nil {
return MasterRealtimeSnapshot{}, fmt.Errorf("read realtime stats: %w", err)
}
requests := readCmdInt64(reqCmd)
tokens := readCmdInt64(tokCmd)
qps := readCmdInt64(qpsCmd)
limit := readCmdInt64(limitCmd)
blocked := readCmdInt64(blockedCmd) == 1
updatedAt := readCmdTime(updatedCmd)
if limit < 0 {
limit = 0
}
return MasterRealtimeSnapshot{
Requests: requests,
Tokens: tokens,
QPS: qps,
QPSLimit: limit,
RateLimited: blocked,
UpdatedAt: updatedAt,
}, nil
}
func readCmdInt64(cmd *redis.StringCmd) int64 {
if cmd == nil {
return 0
}
v, err := cmd.Int64()
if err != nil {
return 0
}
return v
}
func readCmdTime(cmd *redis.StringCmd) *time.Time {
if cmd == nil {
return nil
}
v, err := cmd.Int64()
if err != nil || v <= 0 {
return nil
}
tm := time.Unix(v, 0).UTC()
return &tm
}
// SystemRealtimeSnapshot contains aggregated realtime stats across all masters
type SystemRealtimeSnapshot struct {
TotalQPS int64 `json:"qps"`
TotalRPM int64 `json:"rpm"`
RateLimitedCount int64 `json:"rate_limited_count"`
ByMaster []MasterRealtimeSummary `json:"by_master"`
UpdatedAt *time.Time `json:"updated_at,omitempty"`
}
// MasterRealtimeSummary is a brief summary of a master's realtime stats
type MasterRealtimeSummary struct {
MasterID uint `json:"master_id"`
QPS int64 `json:"qps"`
RateLimited bool `json:"rate_limited"`
}
// GetSystemRealtimeSnapshot aggregates realtime stats across all masters
func (s *StatsService) GetSystemRealtimeSnapshot(ctx context.Context, masterIDs []uint) (SystemRealtimeSnapshot, error) {
if s == nil || s.rdb == nil {
return SystemRealtimeSnapshot{}, fmt.Errorf("redis client is required")
}
if ctx == nil {
ctx = context.Background()
}
var totalQPS int64
var rateLimitedCount int64
byMaster := make([]MasterRealtimeSummary, 0, len(masterIDs))
var latestUpdatedAt *time.Time
for _, masterID := range masterIDs {
snapshot, err := s.GetMasterRealtimeSnapshot(ctx, masterID)
if err != nil {
continue // Skip masters with errors
}
totalQPS += snapshot.QPS
if snapshot.RateLimited {
rateLimitedCount++
}
byMaster = append(byMaster, MasterRealtimeSummary{
MasterID: masterID,
QPS: snapshot.QPS,
RateLimited: snapshot.RateLimited,
})
if snapshot.UpdatedAt != nil {
if latestUpdatedAt == nil || snapshot.UpdatedAt.After(*latestUpdatedAt) {
latestUpdatedAt = snapshot.UpdatedAt
}
}
}
// Estimate RPM as QPS * 60 (since we're measuring requests per second)
totalRPM := totalQPS * 60
return SystemRealtimeSnapshot{
TotalQPS: totalQPS,
TotalRPM: totalRPM,
RateLimitedCount: rateLimitedCount,
ByMaster: byMaster,
UpdatedAt: latestUpdatedAt,
}, nil
}