mirror of
https://github.com/EZ-Api/ez-api.git
synced 2026-01-13 09:37:53 +00:00
Rename endpoint from /auth/whoami to /users/info to align with ez-contract schema. Simplify WhoamiResponse by removing: - Realtime stats (requests, tokens, qps, rate_limited) - Key-specific fields (allow_ips, deny_ips, expires_at, model_limits, quota fields, usage stats) The endpoint now returns only essential identity information as defined in ez-contract/schemas/auth/auth.yaml. BREAKING CHANGE: /auth/whoami endpoint moved to /users/info with reduced response fields
538 lines
20 KiB
Go
538 lines
20 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"expvar"
|
|
"flag"
|
|
"fmt"
|
|
"log/slog"
|
|
"net/http"
|
|
"os"
|
|
"os/signal"
|
|
"strings"
|
|
"syscall"
|
|
"time"
|
|
|
|
"github.com/ez-api/ez-api/docs"
|
|
"github.com/ez-api/ez-api/internal/api"
|
|
"github.com/ez-api/ez-api/internal/config"
|
|
"github.com/ez-api/ez-api/internal/cron"
|
|
"github.com/ez-api/ez-api/internal/middleware"
|
|
"github.com/ez-api/ez-api/internal/migrate"
|
|
"github.com/ez-api/ez-api/internal/model"
|
|
"github.com/ez-api/ez-api/internal/service"
|
|
"github.com/ez-api/foundation/logging"
|
|
"github.com/ez-api/foundation/scheduler"
|
|
"github.com/gin-gonic/gin"
|
|
"github.com/redis/go-redis/v9"
|
|
swaggerFiles "github.com/swaggo/files"
|
|
ginSwagger "github.com/swaggo/gin-swagger"
|
|
"gorm.io/driver/postgres"
|
|
"gorm.io/gorm"
|
|
)
|
|
|
|
// @title EZ-API Control Plane
|
|
// @version 0.0.1
|
|
// @description Management API for EZ-API Gateway system.
|
|
// @termsOfService http://swagger.io/terms/
|
|
|
|
// @contact.name API Support
|
|
// @contact.url http://www.swagger.io/support
|
|
// @contact.email support@swagger.io
|
|
|
|
// @license.name Apache 2.0
|
|
// @license.url http://www.apache.org/licenses/LICENSE-2.0.html
|
|
|
|
// @host localhost:8080
|
|
// @BasePath /
|
|
|
|
// @securityDefinitions.apikey AdminAuth
|
|
// @in header
|
|
// @name Authorization
|
|
// @description Type "Bearer" followed by a space and the admin token. Example: Bearer admin123
|
|
|
|
// @securityDefinitions.apikey MasterAuth
|
|
// @in header
|
|
// @name Authorization
|
|
// @description Type "Bearer" followed by a space and the master key. Example: Bearer sk-xxx
|
|
|
|
func fatal(logger *slog.Logger, msg string, args ...any) {
|
|
logger.Error(msg, args...)
|
|
os.Exit(1)
|
|
}
|
|
|
|
func isOriginAllowed(allowed []string, origin string) bool {
|
|
if len(allowed) == 0 {
|
|
return false
|
|
}
|
|
for _, item := range allowed {
|
|
if item == "*" {
|
|
return true
|
|
}
|
|
if strings.EqualFold(strings.TrimSpace(item), strings.TrimSpace(origin)) {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func main() {
|
|
// Handle --version flag before any initialization
|
|
if len(os.Args) > 1 && (os.Args[1] == "--version" || os.Args[1] == "-v") {
|
|
fmt.Printf("ez-api %s\n", api.Version)
|
|
os.Exit(0)
|
|
}
|
|
|
|
logger, _ := logging.New(logging.Options{Service: "ez-api"})
|
|
appCtx, appCancel := context.WithCancel(context.Background())
|
|
defer appCancel()
|
|
if len(os.Args) > 1 && os.Args[1] == "import" {
|
|
code := runImport(logger, os.Args[2:])
|
|
os.Exit(code)
|
|
}
|
|
|
|
// 1. Load Configuration
|
|
cfg, err := config.Load()
|
|
if err != nil {
|
|
fatal(logger, "failed to load config", "err", err)
|
|
}
|
|
|
|
// 2. Initialize Redis Client
|
|
rdb := redis.NewClient(&redis.Options{
|
|
Addr: cfg.Redis.Addr,
|
|
Password: cfg.Redis.Password,
|
|
DB: cfg.Redis.DB,
|
|
})
|
|
|
|
// Verify Redis connection
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancel()
|
|
if err := rdb.Ping(ctx).Err(); err != nil {
|
|
fatal(logger, "failed to connect to redis", "err", err)
|
|
}
|
|
logger.Info("connected to redis successfully")
|
|
|
|
// 3. Initialize GORM (PostgreSQL)
|
|
db, err := gorm.Open(postgres.Open(cfg.Postgres.DSN), &gorm.Config{})
|
|
if err != nil {
|
|
fatal(logger, "failed to connect to postgresql", "err", err)
|
|
}
|
|
sqlDB, err := db.DB()
|
|
if err != nil {
|
|
fatal(logger, "failed to get generic database object", "err", err)
|
|
}
|
|
// Verify DB connection
|
|
if err := sqlDB.Ping(); err != nil {
|
|
fatal(logger, "failed to ping postgresql", "err", err)
|
|
}
|
|
logger.Info("connected to postgresql successfully")
|
|
|
|
logDB := db
|
|
if cfg.Log.DSN != "" {
|
|
logDB, err = gorm.Open(postgres.Open(cfg.Log.DSN), &gorm.Config{})
|
|
if err != nil {
|
|
fatal(logger, "failed to connect to log database", "err", err)
|
|
}
|
|
sqlLogDB, err := logDB.DB()
|
|
if err != nil {
|
|
fatal(logger, "failed to get log database object", "err", err)
|
|
}
|
|
if err := sqlLogDB.Ping(); err != nil {
|
|
fatal(logger, "failed to ping log database", "err", err)
|
|
}
|
|
logger.Info("connected to log database successfully")
|
|
}
|
|
|
|
// Auto Migrate
|
|
if logDB != db {
|
|
if err := db.AutoMigrate(&model.Master{}, &model.Key{}, &model.ProviderGroup{}, &model.APIKey{}, &model.Model{}, &model.Binding{}, &model.Namespace{}, &model.OperationLog{}, &model.SyncOutbox{}, &model.Alert{}, &model.AlertThresholdConfig{}, &model.IPBan{}); err != nil {
|
|
fatal(logger, "failed to auto migrate", "err", err)
|
|
}
|
|
if err := logDB.AutoMigrate(&model.LogRecord{}); err != nil {
|
|
fatal(logger, "failed to auto migrate log tables", "err", err)
|
|
}
|
|
if err := service.EnsureLogIndexes(logDB); err != nil {
|
|
fatal(logger, "failed to ensure log indexes", "err", err)
|
|
}
|
|
} else {
|
|
if err := db.AutoMigrate(&model.Master{}, &model.Key{}, &model.ProviderGroup{}, &model.APIKey{}, &model.Model{}, &model.Binding{}, &model.Namespace{}, &model.OperationLog{}, &model.LogRecord{}, &model.SyncOutbox{}, &model.Alert{}, &model.AlertThresholdConfig{}, &model.IPBan{}); err != nil {
|
|
fatal(logger, "failed to auto migrate", "err", err)
|
|
}
|
|
if err := service.EnsureLogIndexes(db); err != nil {
|
|
fatal(logger, "failed to ensure log indexes", "err", err)
|
|
}
|
|
}
|
|
|
|
// 4. Setup Services and Handlers
|
|
syncService := service.NewSyncService(rdb)
|
|
var outboxService *service.SyncOutboxService
|
|
if cfg.SyncOutbox.Enabled {
|
|
outboxCfg := service.SyncOutboxConfig{
|
|
Enabled: cfg.SyncOutbox.Enabled,
|
|
Interval: time.Duration(cfg.SyncOutbox.IntervalSeconds) * time.Second,
|
|
BatchSize: cfg.SyncOutbox.BatchSize,
|
|
MaxRetries: cfg.SyncOutbox.MaxRetries,
|
|
}
|
|
outboxService = service.NewSyncOutboxService(db, syncService, outboxCfg, logger)
|
|
syncService.SetOutbox(outboxService)
|
|
}
|
|
logPartitioner := service.NewLogPartitioner(logDB, cfg.Log.Partitioning)
|
|
if logPartitioner.Enabled() {
|
|
if _, err := logPartitioner.EnsurePartitionFor(time.Now().UTC()); err != nil {
|
|
fatal(logger, "failed to ensure log partition", "err", err)
|
|
}
|
|
}
|
|
logWriter := service.NewLogWriter(logDB, cfg.Log.QueueCapacity, cfg.Log.BatchSize, cfg.Log.FlushInterval, logPartitioner)
|
|
logCtx, cancelLogs := context.WithCancel(context.Background())
|
|
defer cancelLogs()
|
|
logWriter.Start(logCtx)
|
|
|
|
// Initialize cron jobs
|
|
quotaResetter := cron.NewQuotaResetter(db, syncService)
|
|
logCleaner := cron.NewLogCleaner(logDB, rdb, cfg.Log.RetentionDays, int64(cfg.Log.MaxRecords), logPartitioner)
|
|
tokenRefresher := cron.NewTokenRefresher(
|
|
db,
|
|
rdb,
|
|
syncService,
|
|
time.Duration(cfg.TokenRefresh.RefreshSkewSeconds)*time.Second,
|
|
cfg.TokenRefresh.BatchSize,
|
|
cfg.TokenRefresh.MaxRetries,
|
|
)
|
|
alertDetectorConfig := cron.DefaultAlertDetectorConfig()
|
|
alertDetector := cron.NewAlertDetector(db, logDB, rdb, service.NewStatsService(rdb), alertDetectorConfig, logger)
|
|
|
|
// Setup scheduler (jobs are added incrementally, Start() called after all services initialized)
|
|
sched := scheduler.New(
|
|
scheduler.WithLogger(logger),
|
|
scheduler.WithSkipIfRunning(),
|
|
scheduler.WithBaseContext(appCtx),
|
|
)
|
|
sched.Every("quota-reset", time.Duration(cfg.Quota.ResetIntervalSeconds)*time.Second, quotaResetter.RunOnce)
|
|
sched.Every("log-cleanup", time.Hour, logCleaner.RunOnce)
|
|
sched.Every("token-refresh", time.Duration(cfg.TokenRefresh.IntervalSeconds)*time.Second, tokenRefresher.RunOnce)
|
|
sched.Every("alert-detection", time.Minute, alertDetector.RunOnce)
|
|
if outboxService != nil && outboxService.Enabled() {
|
|
sched.Every("sync-outbox", outboxService.Interval(), outboxService.RunOnce)
|
|
}
|
|
|
|
adminService, err := service.NewAdminService()
|
|
if err != nil {
|
|
fatal(logger, "failed to create admin service", "err", err)
|
|
}
|
|
masterService := service.NewMasterService(db)
|
|
statsService := service.NewStatsService(rdb)
|
|
healthService := service.NewHealthCheckService(db, rdb)
|
|
statusHandler := api.NewStatusHandler(healthService)
|
|
|
|
handler := api.NewHandler(db, logDB, syncService, logWriter, rdb, logPartitioner)
|
|
adminHandler := api.NewAdminHandler(db, logDB, masterService, syncService, statsService, logPartitioner)
|
|
masterHandler := api.NewMasterHandler(db, logDB, masterService, syncService, statsService, logPartitioner)
|
|
dashboardHandler := api.NewDashboardHandler(db, logDB, statsService, logPartitioner)
|
|
alertHandler := api.NewAlertHandler(db)
|
|
internalHandler := api.NewInternalHandler(db)
|
|
featureHandler := api.NewFeatureHandler(rdb)
|
|
authHandler := api.NewAuthHandler(db, rdb, adminService, masterService, statsService)
|
|
ipBanService := service.NewIPBanService(db, rdb)
|
|
ipBanHandler := api.NewIPBanHandler(ipBanService)
|
|
ipBanManager := cron.NewIPBanManager(ipBanService)
|
|
modelRegistryService := service.NewModelRegistryService(db, rdb, service.ModelRegistryConfig{
|
|
Enabled: cfg.ModelRegistry.Enabled,
|
|
RefreshEvery: time.Duration(cfg.ModelRegistry.RefreshSeconds) * time.Second,
|
|
ModelsDevBaseURL: cfg.ModelRegistry.ModelsDevBaseURL,
|
|
ModelsDevAPIBaseURL: cfg.ModelRegistry.ModelsDevAPIBaseURL,
|
|
ModelsDevRef: cfg.ModelRegistry.ModelsDevRef,
|
|
CacheDir: cfg.ModelRegistry.CacheDir,
|
|
Timeout: time.Duration(cfg.ModelRegistry.TimeoutSeconds) * time.Second,
|
|
})
|
|
modelRegistryHandler := api.NewModelRegistryHandler(modelRegistryService)
|
|
|
|
// 4.1 Prime Redis snapshots so DP can start with data
|
|
if err := syncService.SyncAll(db); err != nil {
|
|
logger.Warn("initial sync warning", "err", err)
|
|
}
|
|
if err := ipBanService.SyncAllToRedis(context.Background()); err != nil {
|
|
logger.Warn("initial IP ban sync warning", "err", err)
|
|
}
|
|
// Initial model registry refresh before scheduler starts
|
|
if modelRegistryService.Enabled() {
|
|
modelRegistryService.RunOnce(context.Background())
|
|
sched.Every("model-registry-refresh", modelRegistryService.RefreshEvery(), modelRegistryService.RunOnce)
|
|
}
|
|
sched.Every("ip-ban-expire", time.Minute, ipBanManager.ExpireRunOnce)
|
|
sched.Every("ip-ban-hit-sync", 5*time.Minute, ipBanManager.HitSyncRunOnce)
|
|
sched.Every("ip-ban-full-sync", 5*time.Minute, ipBanManager.FullSyncRunOnce)
|
|
sched.Start()
|
|
|
|
// 5. Setup Gin Router
|
|
r := gin.Default()
|
|
r.Use(middleware.RequestID())
|
|
|
|
allowedOrigins := cfg.CORS.AllowOrigins
|
|
allowAllOrigins := isOriginAllowed(allowedOrigins, "*")
|
|
|
|
// CORS Middleware
|
|
r.Use(func(c *gin.Context) {
|
|
origin := c.Request.Header.Get("Origin")
|
|
if allowAllOrigins {
|
|
c.Writer.Header().Set("Access-Control-Allow-Origin", "*")
|
|
} else if origin != "" && isOriginAllowed(allowedOrigins, origin) {
|
|
c.Writer.Header().Set("Access-Control-Allow-Origin", origin)
|
|
c.Writer.Header().Add("Vary", "Origin")
|
|
}
|
|
c.Writer.Header().Set("Access-Control-Allow-Credentials", "true")
|
|
c.Writer.Header().Set("Access-Control-Allow-Headers", "Content-Type, Content-Length, Accept-Encoding, X-CSRF-Token, Authorization, accept, origin, Cache-Control, X-Requested-With")
|
|
c.Writer.Header().Set("Access-Control-Allow-Methods", "POST, OPTIONS, GET, PUT, DELETE")
|
|
|
|
if c.Request.Method == "OPTIONS" {
|
|
c.AbortWithStatus(204)
|
|
return
|
|
}
|
|
|
|
c.Next()
|
|
})
|
|
|
|
r.Use(middleware.ResponseEnvelope())
|
|
|
|
// 动态设置 Swagger Host
|
|
if cfg.Server.SwaggerHost != "" {
|
|
docs.SwaggerInfo.Host = cfg.Server.SwaggerHost
|
|
} else {
|
|
docs.SwaggerInfo.Host = "" // 使用相对路径
|
|
}
|
|
|
|
// Health Check Endpoint
|
|
r.GET("/health", func(c *gin.Context) {
|
|
status := healthService.Check(c.Request.Context())
|
|
httpStatus := http.StatusOK
|
|
if status.Status == "down" {
|
|
httpStatus = http.StatusServiceUnavailable
|
|
}
|
|
c.JSON(httpStatus, status)
|
|
})
|
|
|
|
// Public Status Endpoints
|
|
r.GET("/status", statusHandler.Status)
|
|
r.GET("/about", statusHandler.About)
|
|
|
|
// Swagger Documentation
|
|
r.GET("/swagger/*any", ginSwagger.WrapHandler(swaggerFiles.Handler))
|
|
|
|
// API Routes
|
|
// Internal Routes
|
|
internalGroup := r.Group("/internal")
|
|
internalGroup.Use(middleware.InternalAuthMiddleware(cfg.Internal.StatsToken, cfg.Internal.AllowAnonymous))
|
|
{
|
|
internalGroup.POST("/stats/flush", internalHandler.FlushStats)
|
|
internalGroup.POST("/apikey-stats/flush", internalHandler.FlushAPIKeyStats)
|
|
internalGroup.POST("/alerts/report", internalHandler.ReportAlerts)
|
|
internalGroup.GET("/metrics", gin.WrapH(expvar.Handler()))
|
|
}
|
|
|
|
// Admin Routes
|
|
adminGroup := r.Group("/admin")
|
|
adminGroup.Use(middleware.AdminAuthMiddleware(adminService))
|
|
adminGroup.Use(middleware.OperationLogMiddleware(db))
|
|
{
|
|
adminGroup.POST("/masters", adminHandler.CreateMaster)
|
|
adminGroup.GET("/masters", adminHandler.ListMasters)
|
|
adminGroup.GET("/masters/:id", adminHandler.GetMaster)
|
|
adminGroup.GET("/masters/:id/realtime", adminHandler.GetMasterRealtime)
|
|
adminGroup.PUT("/masters/:id", adminHandler.UpdateMaster)
|
|
adminGroup.DELETE("/masters/:id", adminHandler.DeleteMaster)
|
|
adminGroup.POST("/masters/batch", adminHandler.BatchMasters)
|
|
adminGroup.POST("/masters/:id/manage", adminHandler.ManageMaster)
|
|
adminGroup.POST("/masters/:id/keys", adminHandler.IssueChildKeyForMaster)
|
|
adminGroup.GET("/masters/:id/keys", adminHandler.ListKeysForMaster)
|
|
adminGroup.DELETE("/masters/:id/keys/:key_id", adminHandler.DeleteKeyForMaster)
|
|
adminGroup.GET("/masters/:id/access", handler.GetMasterAccess)
|
|
adminGroup.PUT("/masters/:id/access", handler.UpdateMasterAccess)
|
|
adminGroup.GET("/keys/:id/access", handler.GetKeyAccess)
|
|
adminGroup.PUT("/keys/:id/access", handler.UpdateKeyAccess)
|
|
adminGroup.GET("/operation-logs", adminHandler.ListOperationLogs)
|
|
adminGroup.POST("/namespaces", handler.CreateNamespace)
|
|
adminGroup.GET("/namespaces", handler.ListNamespaces)
|
|
adminGroup.GET("/namespaces/:id", handler.GetNamespace)
|
|
adminGroup.PUT("/namespaces/:id", handler.UpdateNamespace)
|
|
adminGroup.DELETE("/namespaces/:id", handler.DeleteNamespace)
|
|
adminGroup.GET("/features", featureHandler.ListFeatures)
|
|
adminGroup.PUT("/features", featureHandler.UpdateFeatures)
|
|
adminGroup.GET("/model-registry/status", modelRegistryHandler.GetStatus)
|
|
adminGroup.POST("/model-registry/check", modelRegistryHandler.Check)
|
|
adminGroup.POST("/model-registry/refresh", modelRegistryHandler.Refresh)
|
|
adminGroup.POST("/model-registry/rollback", modelRegistryHandler.Rollback)
|
|
// Other admin routes for managing providers, models, etc.
|
|
adminGroup.POST("/provider-groups", handler.CreateProviderGroup)
|
|
adminGroup.GET("/provider-groups", handler.ListProviderGroups)
|
|
adminGroup.GET("/provider-groups/:id", handler.GetProviderGroup)
|
|
adminGroup.PUT("/provider-groups/:id", handler.UpdateProviderGroup)
|
|
adminGroup.DELETE("/provider-groups/:id", handler.DeleteProviderGroup)
|
|
adminGroup.POST("/api-keys", handler.CreateAPIKey)
|
|
adminGroup.GET("/api-keys", handler.ListAPIKeys)
|
|
adminGroup.GET("/api-keys/:id", handler.GetAPIKey)
|
|
adminGroup.PUT("/api-keys/:id", handler.UpdateAPIKey)
|
|
adminGroup.DELETE("/api-keys/:id", handler.DeleteAPIKey)
|
|
adminGroup.POST("/api-keys/batch", handler.BatchAPIKeys)
|
|
adminGroup.POST("/models", handler.CreateModel)
|
|
adminGroup.GET("/models", handler.ListModels)
|
|
adminGroup.PUT("/models/:id", handler.UpdateModel)
|
|
adminGroup.DELETE("/models/:id", handler.DeleteModel)
|
|
adminGroup.POST("/models/batch", handler.BatchModels)
|
|
adminGroup.GET("/logs", handler.ListLogs)
|
|
adminGroup.DELETE("/logs", handler.DeleteLogs)
|
|
adminGroup.GET("/logs/stats", handler.LogStats)
|
|
adminGroup.GET("/logs/stats/traffic-chart", handler.GetTrafficChart)
|
|
adminGroup.GET("/logs/webhook", handler.GetLogWebhookConfig)
|
|
adminGroup.PUT("/logs/webhook", handler.UpdateLogWebhookConfig)
|
|
adminGroup.GET("/stats", adminHandler.GetAdminStats)
|
|
adminGroup.GET("/realtime", adminHandler.GetAdminRealtime)
|
|
adminGroup.GET("/dashboard/summary", dashboardHandler.GetSummary)
|
|
adminGroup.GET("/apikey-stats/summary", adminHandler.GetAPIKeyStatsSummary)
|
|
adminGroup.GET("/alerts", alertHandler.ListAlerts)
|
|
adminGroup.POST("/alerts", alertHandler.CreateAlert)
|
|
adminGroup.GET("/alerts/stats", alertHandler.GetAlertStats)
|
|
adminGroup.GET("/alerts/thresholds", alertHandler.GetAlertThresholds)
|
|
adminGroup.PUT("/alerts/thresholds", alertHandler.UpdateAlertThresholds)
|
|
adminGroup.GET("/alerts/:id", alertHandler.GetAlert)
|
|
adminGroup.POST("/alerts/:id/ack", alertHandler.AcknowledgeAlert)
|
|
adminGroup.POST("/alerts/:id/resolve", alertHandler.ResolveAlert)
|
|
adminGroup.DELETE("/alerts/:id", alertHandler.DismissAlert)
|
|
adminGroup.POST("/bindings", handler.CreateBinding)
|
|
adminGroup.GET("/bindings", handler.ListBindings)
|
|
adminGroup.GET("/bindings/:id", handler.GetBinding)
|
|
adminGroup.PUT("/bindings/:id", handler.UpdateBinding)
|
|
adminGroup.DELETE("/bindings/:id", handler.DeleteBinding)
|
|
adminGroup.POST("/bindings/batch", handler.BatchBindings)
|
|
adminGroup.POST("/sync/snapshot", handler.SyncSnapshot)
|
|
// IP Ban routes
|
|
adminGroup.POST("/ip-bans", ipBanHandler.Create)
|
|
adminGroup.GET("/ip-bans", ipBanHandler.List)
|
|
adminGroup.GET("/ip-bans/:id", ipBanHandler.Get)
|
|
adminGroup.PUT("/ip-bans/:id", ipBanHandler.Update)
|
|
adminGroup.DELETE("/ip-bans/:id", ipBanHandler.Delete)
|
|
}
|
|
|
|
// Master Routes
|
|
masterGroup := r.Group("/v1")
|
|
masterGroup.Use(middleware.MasterAuthMiddleware(masterService))
|
|
{
|
|
masterGroup.GET("/self", masterHandler.GetSelf)
|
|
masterGroup.POST("/tokens", masterHandler.IssueChildKey)
|
|
masterGroup.GET("/tokens", masterHandler.ListTokens)
|
|
masterGroup.GET("/tokens/:id", masterHandler.GetToken)
|
|
masterGroup.PUT("/tokens/:id", masterHandler.UpdateToken)
|
|
masterGroup.DELETE("/tokens/:id", masterHandler.DeleteToken)
|
|
masterGroup.GET("/logs", masterHandler.ListSelfLogs)
|
|
masterGroup.GET("/logs/stats", masterHandler.GetSelfLogStats)
|
|
masterGroup.GET("/realtime", masterHandler.GetSelfRealtime)
|
|
masterGroup.GET("/stats", masterHandler.GetSelfStats)
|
|
}
|
|
|
|
// Users Routes (public, no middleware - self-validates token)
|
|
usersGroup := r.Group("/users")
|
|
{
|
|
usersGroup.GET("/info", authHandler.Whoami)
|
|
}
|
|
|
|
// Public/General Routes (if any)
|
|
r.POST("/logs", handler.IngestLog)
|
|
|
|
srv := &http.Server{
|
|
Addr: ":" + cfg.Server.Port,
|
|
Handler: r,
|
|
}
|
|
|
|
// 6. Start Server with Graceful Shutdown
|
|
go func() {
|
|
logger.Info("starting ez-api", "port", cfg.Server.Port)
|
|
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
|
|
fatal(logger, "server failed", "err", err)
|
|
}
|
|
}()
|
|
|
|
// Wait for interrupt signal
|
|
quit := make(chan os.Signal, 1)
|
|
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
|
|
<-quit
|
|
logger.Info("shutting down server...")
|
|
appCancel()
|
|
sched.Stop()
|
|
|
|
// Shutdown with timeout
|
|
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancel()
|
|
if err := srv.Shutdown(ctx); err != nil {
|
|
fatal(logger, "server forced to shutdown", "err", err)
|
|
}
|
|
|
|
logger.Info("server exited properly")
|
|
}
|
|
|
|
func runImport(logger *slog.Logger, args []string) int {
|
|
fs := flag.NewFlagSet("import", flag.ContinueOnError)
|
|
fs.SetOutput(os.Stderr)
|
|
|
|
var filePath string
|
|
var dryRun bool
|
|
var conflictPolicy string
|
|
var includeBindings bool
|
|
|
|
fs.StringVar(&filePath, "file", "", "Path to export JSON")
|
|
fs.BoolVar(&dryRun, "dry-run", false, "Validate only, do not write to database")
|
|
fs.StringVar(&conflictPolicy, "conflict", migrate.ConflictSkip, "Conflict policy: skip or overwrite")
|
|
fs.BoolVar(&includeBindings, "include-bindings", false, "Import bindings from payload")
|
|
|
|
if err := fs.Parse(args); err != nil {
|
|
logger.Error("failed to parse flags", "err", err)
|
|
return 2
|
|
}
|
|
if strings.TrimSpace(filePath) == "" {
|
|
fmt.Fprintln(os.Stderr, "missing --file")
|
|
return 2
|
|
}
|
|
conflictPolicy = strings.ToLower(strings.TrimSpace(conflictPolicy))
|
|
if conflictPolicy != migrate.ConflictSkip && conflictPolicy != migrate.ConflictOverwrite {
|
|
fmt.Fprintf(os.Stderr, "invalid --conflict value: %s\n", conflictPolicy)
|
|
return 2
|
|
}
|
|
|
|
cfg, err := config.Load()
|
|
if err != nil {
|
|
logger.Error("failed to load config", "err", err)
|
|
return 1
|
|
}
|
|
|
|
db, err := gorm.Open(postgres.Open(cfg.Postgres.DSN), &gorm.Config{})
|
|
if err != nil {
|
|
logger.Error("failed to connect to postgresql", "err", err)
|
|
return 1
|
|
}
|
|
|
|
if err := db.AutoMigrate(&model.Master{}, &model.Key{}, &model.ProviderGroup{}, &model.APIKey{}, &model.Model{}, &model.Binding{}, &model.Namespace{}, &model.SyncOutbox{}); err != nil {
|
|
logger.Error("failed to auto migrate", "err", err)
|
|
return 1
|
|
}
|
|
|
|
importer := migrate.NewImporter(db, migrate.ImportOptions{
|
|
DryRun: dryRun,
|
|
ConflictPolicy: conflictPolicy,
|
|
IncludeBindings: includeBindings,
|
|
})
|
|
summary, err := importer.ImportFile(filePath)
|
|
if err != nil {
|
|
logger.Error("import failed", "err", err)
|
|
return 1
|
|
}
|
|
|
|
payload, err := json.MarshalIndent(summary, "", " ")
|
|
if err != nil {
|
|
logger.Error("failed to render import summary", "err", err)
|
|
return 1
|
|
}
|
|
fmt.Fprintln(os.Stdout, string(payload))
|
|
if dryRun {
|
|
fmt.Fprintln(os.Stdout, "dry-run only: no data written")
|
|
}
|
|
return 0
|
|
}
|