Files
ez-api/cmd/server/main.go
zenfun 1a2cc5b798 feat(api): add API key stats flush and summary endpoints
Introduce internal endpoint for flushing accumulated APIKey statistics
from data plane to control plane database, updating both individual
API keys and their parent provider groups with request counts and
success/failure rates.

Add admin endpoint to retrieve aggregated API key statistics summary
across all provider groups, including total requests, success/failure
counts, and calculated rates.
2025-12-30 00:11:52 +08:00

488 lines
17 KiB
Go

package main
import (
"context"
"encoding/json"
"expvar"
"flag"
"fmt"
"log/slog"
"net/http"
"os"
"os/signal"
"strings"
"syscall"
"time"
"github.com/ez-api/ez-api/docs"
"github.com/ez-api/ez-api/internal/api"
"github.com/ez-api/ez-api/internal/config"
"github.com/ez-api/ez-api/internal/cron"
"github.com/ez-api/ez-api/internal/middleware"
"github.com/ez-api/ez-api/internal/migrate"
"github.com/ez-api/ez-api/internal/model"
"github.com/ez-api/ez-api/internal/service"
"github.com/ez-api/foundation/logging"
"github.com/gin-gonic/gin"
"github.com/redis/go-redis/v9"
swaggerFiles "github.com/swaggo/files"
ginSwagger "github.com/swaggo/gin-swagger"
"gorm.io/driver/postgres"
"gorm.io/gorm"
)
// @title EZ-API Control Plane
// @version 0.0.1
// @description Management API for EZ-API Gateway system.
// @termsOfService http://swagger.io/terms/
// @contact.name API Support
// @contact.url http://www.swagger.io/support
// @contact.email support@swagger.io
// @license.name Apache 2.0
// @license.url http://www.apache.org/licenses/LICENSE-2.0.html
// @host localhost:8080
// @BasePath /
// @securityDefinitions.apikey AdminAuth
// @in header
// @name Authorization
// @description Type "Bearer" followed by a space and the admin token. Example: Bearer admin123
// @securityDefinitions.apikey MasterAuth
// @in header
// @name Authorization
// @description Type "Bearer" followed by a space and the master key. Example: Bearer sk-xxx
func fatal(logger *slog.Logger, msg string, args ...any) {
logger.Error(msg, args...)
os.Exit(1)
}
func isOriginAllowed(allowed []string, origin string) bool {
if len(allowed) == 0 {
return false
}
for _, item := range allowed {
if item == "*" {
return true
}
if strings.EqualFold(strings.TrimSpace(item), strings.TrimSpace(origin)) {
return true
}
}
return false
}
func main() {
// Handle --version flag before any initialization
if len(os.Args) > 1 && (os.Args[1] == "--version" || os.Args[1] == "-v") {
fmt.Printf("ez-api %s\n", api.Version)
os.Exit(0)
}
logger, _ := logging.New(logging.Options{Service: "ez-api"})
if len(os.Args) > 1 && os.Args[1] == "import" {
code := runImport(logger, os.Args[2:])
os.Exit(code)
}
// 1. Load Configuration
cfg, err := config.Load()
if err != nil {
fatal(logger, "failed to load config", "err", err)
}
// 2. Initialize Redis Client
rdb := redis.NewClient(&redis.Options{
Addr: cfg.Redis.Addr,
Password: cfg.Redis.Password,
DB: cfg.Redis.DB,
})
// Verify Redis connection
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := rdb.Ping(ctx).Err(); err != nil {
fatal(logger, "failed to connect to redis", "err", err)
}
logger.Info("connected to redis successfully")
// 3. Initialize GORM (PostgreSQL)
db, err := gorm.Open(postgres.Open(cfg.Postgres.DSN), &gorm.Config{})
if err != nil {
fatal(logger, "failed to connect to postgresql", "err", err)
}
sqlDB, err := db.DB()
if err != nil {
fatal(logger, "failed to get generic database object", "err", err)
}
// Verify DB connection
if err := sqlDB.Ping(); err != nil {
fatal(logger, "failed to ping postgresql", "err", err)
}
logger.Info("connected to postgresql successfully")
logDB := db
if cfg.Log.DSN != "" {
logDB, err = gorm.Open(postgres.Open(cfg.Log.DSN), &gorm.Config{})
if err != nil {
fatal(logger, "failed to connect to log database", "err", err)
}
sqlLogDB, err := logDB.DB()
if err != nil {
fatal(logger, "failed to get log database object", "err", err)
}
if err := sqlLogDB.Ping(); err != nil {
fatal(logger, "failed to ping log database", "err", err)
}
logger.Info("connected to log database successfully")
}
// Auto Migrate
if logDB != db {
if err := db.AutoMigrate(&model.Master{}, &model.Key{}, &model.ProviderGroup{}, &model.APIKey{}, &model.Model{}, &model.Binding{}, &model.Namespace{}, &model.OperationLog{}, &model.SyncOutbox{}); err != nil {
fatal(logger, "failed to auto migrate", "err", err)
}
if err := logDB.AutoMigrate(&model.LogRecord{}); err != nil {
fatal(logger, "failed to auto migrate log tables", "err", err)
}
if err := service.EnsureLogIndexes(logDB); err != nil {
fatal(logger, "failed to ensure log indexes", "err", err)
}
} else {
if err := db.AutoMigrate(&model.Master{}, &model.Key{}, &model.ProviderGroup{}, &model.APIKey{}, &model.Model{}, &model.Binding{}, &model.Namespace{}, &model.OperationLog{}, &model.LogRecord{}, &model.SyncOutbox{}); err != nil {
fatal(logger, "failed to auto migrate", "err", err)
}
if err := service.EnsureLogIndexes(db); err != nil {
fatal(logger, "failed to ensure log indexes", "err", err)
}
}
// 4. Setup Services and Handlers
syncService := service.NewSyncService(rdb)
if cfg.SyncOutbox.Enabled {
outboxCfg := service.SyncOutboxConfig{
Enabled: cfg.SyncOutbox.Enabled,
Interval: time.Duration(cfg.SyncOutbox.IntervalSeconds) * time.Second,
BatchSize: cfg.SyncOutbox.BatchSize,
MaxRetries: cfg.SyncOutbox.MaxRetries,
}
outboxService := service.NewSyncOutboxService(db, syncService, outboxCfg, logger)
syncService.SetOutbox(outboxService)
outboxCtx, cancelOutbox := context.WithCancel(context.Background())
defer cancelOutbox()
go outboxService.Start(outboxCtx)
}
logPartitioner := service.NewLogPartitioner(logDB, cfg.Log.Partitioning)
if logPartitioner.Enabled() {
if _, err := logPartitioner.EnsurePartitionFor(time.Now().UTC()); err != nil {
fatal(logger, "failed to ensure log partition", "err", err)
}
}
logWriter := service.NewLogWriter(logDB, cfg.Log.QueueCapacity, cfg.Log.BatchSize, cfg.Log.FlushInterval, logPartitioner)
logCtx, cancelLogs := context.WithCancel(context.Background())
defer cancelLogs()
logWriter.Start(logCtx)
quotaResetter := cron.NewQuotaResetter(db, syncService, time.Duration(cfg.Quota.ResetIntervalSeconds)*time.Second)
quotaCtx, cancelQuota := context.WithCancel(context.Background())
defer cancelQuota()
go quotaResetter.Start(quotaCtx)
logCleaner := cron.NewLogCleaner(logDB, rdb, cfg.Log.RetentionDays, int64(cfg.Log.MaxRecords), time.Hour, logPartitioner)
cleanerCtx, cancelCleaner := context.WithCancel(context.Background())
defer cancelCleaner()
go logCleaner.Start(cleanerCtx)
tokenRefresher := cron.NewTokenRefresher(
db,
rdb,
syncService,
time.Duration(cfg.TokenRefresh.IntervalSeconds)*time.Second,
time.Duration(cfg.TokenRefresh.RefreshSkewSeconds)*time.Second,
cfg.TokenRefresh.BatchSize,
cfg.TokenRefresh.MaxRetries,
)
tokenCtx, cancelToken := context.WithCancel(context.Background())
defer cancelToken()
go tokenRefresher.Start(tokenCtx)
adminService, err := service.NewAdminService()
if err != nil {
fatal(logger, "failed to create admin service", "err", err)
}
masterService := service.NewMasterService(db)
statsService := service.NewStatsService(rdb)
healthService := service.NewHealthCheckService(db, rdb)
statusHandler := api.NewStatusHandler(healthService)
handler := api.NewHandler(db, logDB, syncService, logWriter, rdb, logPartitioner)
adminHandler := api.NewAdminHandler(db, logDB, masterService, syncService, statsService, logPartitioner)
masterHandler := api.NewMasterHandler(db, logDB, masterService, syncService, statsService, logPartitioner)
internalHandler := api.NewInternalHandler(db)
featureHandler := api.NewFeatureHandler(rdb)
authHandler := api.NewAuthHandler(db, rdb, adminService, masterService)
modelRegistryService := service.NewModelRegistryService(db, rdb, service.ModelRegistryConfig{
Enabled: cfg.ModelRegistry.Enabled,
RefreshEvery: time.Duration(cfg.ModelRegistry.RefreshSeconds) * time.Second,
ModelsDevBaseURL: cfg.ModelRegistry.ModelsDevBaseURL,
ModelsDevAPIBaseURL: cfg.ModelRegistry.ModelsDevAPIBaseURL,
ModelsDevRef: cfg.ModelRegistry.ModelsDevRef,
CacheDir: cfg.ModelRegistry.CacheDir,
Timeout: time.Duration(cfg.ModelRegistry.TimeoutSeconds) * time.Second,
})
modelRegistryHandler := api.NewModelRegistryHandler(modelRegistryService)
// 4.1 Prime Redis snapshots so DP can start with data
if err := syncService.SyncAll(db); err != nil {
logger.Warn("initial sync warning", "err", err)
}
modelRegistryService.Start(context.Background())
// 5. Setup Gin Router
r := gin.Default()
r.Use(middleware.RequestID())
allowedOrigins := cfg.CORS.AllowOrigins
allowAllOrigins := isOriginAllowed(allowedOrigins, "*")
// CORS Middleware
r.Use(func(c *gin.Context) {
origin := c.Request.Header.Get("Origin")
if allowAllOrigins {
c.Writer.Header().Set("Access-Control-Allow-Origin", "*")
} else if origin != "" && isOriginAllowed(allowedOrigins, origin) {
c.Writer.Header().Set("Access-Control-Allow-Origin", origin)
c.Writer.Header().Add("Vary", "Origin")
}
c.Writer.Header().Set("Access-Control-Allow-Credentials", "true")
c.Writer.Header().Set("Access-Control-Allow-Headers", "Content-Type, Content-Length, Accept-Encoding, X-CSRF-Token, Authorization, accept, origin, Cache-Control, X-Requested-With")
c.Writer.Header().Set("Access-Control-Allow-Methods", "POST, OPTIONS, GET, PUT, DELETE")
if c.Request.Method == "OPTIONS" {
c.AbortWithStatus(204)
return
}
c.Next()
})
// 动态设置 Swagger Host
if cfg.Server.SwaggerHost != "" {
docs.SwaggerInfo.Host = cfg.Server.SwaggerHost
} else {
docs.SwaggerInfo.Host = "" // 使用相对路径
}
// Health Check Endpoint
r.GET("/health", func(c *gin.Context) {
status := healthService.Check(c.Request.Context())
httpStatus := http.StatusOK
if status.Status == "down" {
httpStatus = http.StatusServiceUnavailable
}
c.JSON(httpStatus, status)
})
// Public Status Endpoints
r.GET("/status", statusHandler.Status)
r.GET("/about", statusHandler.About)
// Swagger Documentation
r.GET("/swagger/*any", ginSwagger.WrapHandler(swaggerFiles.Handler))
// API Routes
// Internal Routes
internalGroup := r.Group("/internal")
internalGroup.Use(middleware.InternalAuthMiddleware(cfg.Internal.StatsToken))
{
internalGroup.POST("/stats/flush", internalHandler.FlushStats)
internalGroup.POST("/apikey-stats/flush", internalHandler.FlushAPIKeyStats)
internalGroup.GET("/metrics", gin.WrapH(expvar.Handler()))
}
// Admin Routes
adminGroup := r.Group("/admin")
adminGroup.Use(middleware.AdminAuthMiddleware(adminService))
adminGroup.Use(middleware.OperationLogMiddleware(db))
{
adminGroup.POST("/masters", adminHandler.CreateMaster)
adminGroup.GET("/masters", adminHandler.ListMasters)
adminGroup.GET("/masters/:id", adminHandler.GetMaster)
adminGroup.GET("/masters/:id/realtime", adminHandler.GetMasterRealtime)
adminGroup.PUT("/masters/:id", adminHandler.UpdateMaster)
adminGroup.DELETE("/masters/:id", adminHandler.DeleteMaster)
adminGroup.POST("/masters/batch", adminHandler.BatchMasters)
adminGroup.POST("/masters/:id/manage", adminHandler.ManageMaster)
adminGroup.POST("/masters/:id/keys", adminHandler.IssueChildKeyForMaster)
adminGroup.GET("/masters/:id/access", handler.GetMasterAccess)
adminGroup.PUT("/masters/:id/access", handler.UpdateMasterAccess)
adminGroup.GET("/keys/:id/access", handler.GetKeyAccess)
adminGroup.PUT("/keys/:id/access", handler.UpdateKeyAccess)
adminGroup.GET("/operation-logs", adminHandler.ListOperationLogs)
adminGroup.POST("/namespaces", handler.CreateNamespace)
adminGroup.GET("/namespaces", handler.ListNamespaces)
adminGroup.GET("/namespaces/:id", handler.GetNamespace)
adminGroup.PUT("/namespaces/:id", handler.UpdateNamespace)
adminGroup.DELETE("/namespaces/:id", handler.DeleteNamespace)
adminGroup.GET("/features", featureHandler.ListFeatures)
adminGroup.PUT("/features", featureHandler.UpdateFeatures)
adminGroup.GET("/model-registry/status", modelRegistryHandler.GetStatus)
adminGroup.POST("/model-registry/check", modelRegistryHandler.Check)
adminGroup.POST("/model-registry/refresh", modelRegistryHandler.Refresh)
adminGroup.POST("/model-registry/rollback", modelRegistryHandler.Rollback)
// Other admin routes for managing providers, models, etc.
adminGroup.POST("/provider-groups", handler.CreateProviderGroup)
adminGroup.GET("/provider-groups", handler.ListProviderGroups)
adminGroup.GET("/provider-groups/:id", handler.GetProviderGroup)
adminGroup.PUT("/provider-groups/:id", handler.UpdateProviderGroup)
adminGroup.DELETE("/provider-groups/:id", handler.DeleteProviderGroup)
adminGroup.POST("/api-keys", handler.CreateAPIKey)
adminGroup.GET("/api-keys", handler.ListAPIKeys)
adminGroup.GET("/api-keys/:id", handler.GetAPIKey)
adminGroup.PUT("/api-keys/:id", handler.UpdateAPIKey)
adminGroup.DELETE("/api-keys/:id", handler.DeleteAPIKey)
adminGroup.POST("/api-keys/batch", handler.BatchAPIKeys)
adminGroup.POST("/models", handler.CreateModel)
adminGroup.GET("/models", handler.ListModels)
adminGroup.PUT("/models/:id", handler.UpdateModel)
adminGroup.DELETE("/models/:id", handler.DeleteModel)
adminGroup.POST("/models/batch", handler.BatchModels)
adminGroup.GET("/logs", handler.ListLogs)
adminGroup.DELETE("/logs", handler.DeleteLogs)
adminGroup.GET("/logs/stats", handler.LogStats)
adminGroup.GET("/logs/webhook", handler.GetLogWebhookConfig)
adminGroup.PUT("/logs/webhook", handler.UpdateLogWebhookConfig)
adminGroup.GET("/stats", adminHandler.GetAdminStats)
adminGroup.GET("/apikey-stats/summary", adminHandler.GetAPIKeyStatsSummary)
adminGroup.POST("/bindings", handler.CreateBinding)
adminGroup.GET("/bindings", handler.ListBindings)
adminGroup.GET("/bindings/:id", handler.GetBinding)
adminGroup.PUT("/bindings/:id", handler.UpdateBinding)
adminGroup.DELETE("/bindings/:id", handler.DeleteBinding)
adminGroup.POST("/bindings/batch", handler.BatchBindings)
adminGroup.POST("/sync/snapshot", handler.SyncSnapshot)
}
// Master Routes
masterGroup := r.Group("/v1")
masterGroup.Use(middleware.MasterAuthMiddleware(masterService))
{
masterGroup.GET("/self", masterHandler.GetSelf)
masterGroup.POST("/tokens", masterHandler.IssueChildKey)
masterGroup.GET("/tokens", masterHandler.ListTokens)
masterGroup.GET("/tokens/:id", masterHandler.GetToken)
masterGroup.PUT("/tokens/:id", masterHandler.UpdateToken)
masterGroup.DELETE("/tokens/:id", masterHandler.DeleteToken)
masterGroup.GET("/logs", masterHandler.ListSelfLogs)
masterGroup.GET("/logs/stats", masterHandler.GetSelfLogStats)
masterGroup.GET("/realtime", masterHandler.GetSelfRealtime)
masterGroup.GET("/stats", masterHandler.GetSelfStats)
}
// Auth Routes (public, no middleware - self-validates token)
authGroup := r.Group("/auth")
{
authGroup.GET("/whoami", authHandler.Whoami)
}
// Public/General Routes (if any)
r.POST("/logs", handler.IngestLog)
srv := &http.Server{
Addr: ":" + cfg.Server.Port,
Handler: r,
}
// 6. Start Server with Graceful Shutdown
go func() {
logger.Info("starting ez-api", "port", cfg.Server.Port)
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
fatal(logger, "server failed", "err", err)
}
}()
// Wait for interrupt signal
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
logger.Info("shutting down server...")
// Shutdown with timeout
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := srv.Shutdown(ctx); err != nil {
fatal(logger, "server forced to shutdown", "err", err)
}
logger.Info("server exited properly")
}
func runImport(logger *slog.Logger, args []string) int {
fs := flag.NewFlagSet("import", flag.ContinueOnError)
fs.SetOutput(os.Stderr)
var filePath string
var dryRun bool
var conflictPolicy string
var includeBindings bool
fs.StringVar(&filePath, "file", "", "Path to export JSON")
fs.BoolVar(&dryRun, "dry-run", false, "Validate only, do not write to database")
fs.StringVar(&conflictPolicy, "conflict", migrate.ConflictSkip, "Conflict policy: skip or overwrite")
fs.BoolVar(&includeBindings, "include-bindings", false, "Import bindings from payload")
if err := fs.Parse(args); err != nil {
logger.Error("failed to parse flags", "err", err)
return 2
}
if strings.TrimSpace(filePath) == "" {
fmt.Fprintln(os.Stderr, "missing --file")
return 2
}
conflictPolicy = strings.ToLower(strings.TrimSpace(conflictPolicy))
if conflictPolicy != migrate.ConflictSkip && conflictPolicy != migrate.ConflictOverwrite {
fmt.Fprintf(os.Stderr, "invalid --conflict value: %s\n", conflictPolicy)
return 2
}
cfg, err := config.Load()
if err != nil {
logger.Error("failed to load config", "err", err)
return 1
}
db, err := gorm.Open(postgres.Open(cfg.Postgres.DSN), &gorm.Config{})
if err != nil {
logger.Error("failed to connect to postgresql", "err", err)
return 1
}
if err := db.AutoMigrate(&model.Master{}, &model.Key{}, &model.ProviderGroup{}, &model.APIKey{}, &model.Model{}, &model.Binding{}, &model.Namespace{}, &model.SyncOutbox{}); err != nil {
logger.Error("failed to auto migrate", "err", err)
return 1
}
importer := migrate.NewImporter(db, migrate.ImportOptions{
DryRun: dryRun,
ConflictPolicy: conflictPolicy,
IncludeBindings: includeBindings,
})
summary, err := importer.ImportFile(filePath)
if err != nil {
logger.Error("import failed", "err", err)
return 1
}
payload, err := json.MarshalIndent(summary, "", " ")
if err != nil {
logger.Error("failed to render import summary", "err", err)
return 1
}
fmt.Fprintln(os.Stdout, string(payload))
if dryRun {
fmt.Fprintln(os.Stdout, "dry-run only: no data written")
}
return 0
}