From 64d71953a68c0112fbfbe0c261f7520bdc871601 Mon Sep 17 00:00:00 2001 From: zenfun Date: Tue, 2 Dec 2025 14:26:16 +0800 Subject: [PATCH] feat(server): implement management endpoints and redis sync Enable full resource management via API and support data plane synchronization. - Add CRUD handlers for Providers, Models, and Keys using DTOs - Implement LogWriter service for asynchronous, batched audit logging - Update SyncService to snapshot full configuration state to Redis - Register new API routes and initialize background services - Add configuration options for logging performance tuning --- cmd/server/main.go | 18 ++++- internal/api/handler.go | 159 ++++++++++++++++++++++++++++++++++--- internal/config/config.go | 22 +++++ internal/dto/key.go | 10 +++ internal/dto/model.go | 13 +++ internal/dto/provider.go | 9 +++ internal/model/log.go | 22 +++++ internal/model/models.go | 37 +++++---- internal/service/logger.go | 79 ++++++++++++++++++ internal/service/sync.go | 150 +++++++++++++++++++++++++++++++--- 10 files changed, 479 insertions(+), 40 deletions(-) create mode 100644 internal/dto/key.go create mode 100644 internal/dto/model.go create mode 100644 internal/dto/provider.go create mode 100644 internal/model/log.go create mode 100644 internal/service/logger.go diff --git a/cmd/server/main.go b/cmd/server/main.go index 3092df9..f884e8a 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -57,13 +57,22 @@ func main() { log.Println("Connected to PostgreSQL successfully") // Auto Migrate - if err := db.AutoMigrate(&model.User{}, &model.Provider{}, &model.Key{}, &model.Model{}); err != nil { + if err := db.AutoMigrate(&model.User{}, &model.Provider{}, &model.Key{}, &model.Model{}, &model.LogRecord{}); err != nil { log.Fatalf("Failed to auto migrate: %v", err) } // 4. Setup Services and Handlers syncService := service.NewSyncService(rdb) - handler := api.NewHandler(db, syncService) + logWriter := service.NewLogWriter(db, cfg.Log.QueueCapacity, cfg.Log.BatchSize, cfg.Log.FlushInterval) + logCtx, cancelLogs := context.WithCancel(context.Background()) + defer cancelLogs() + logWriter.Start(logCtx) + handler := api.NewHandler(db, syncService, logWriter) + + // 4.1 Prime Redis snapshots so DP can start with data + if err := syncService.SyncAll(db); err != nil { + log.Printf("Initial sync warning: %v", err) + } // 5. Setup Gin Router r := gin.Default() @@ -74,7 +83,12 @@ func main() { }) // API Routes + r.POST("/providers", handler.CreateProvider) r.POST("/keys", handler.CreateKey) + r.POST("/models", handler.CreateModel) + r.GET("/models", handler.ListModels) + r.POST("/sync/snapshot", handler.SyncSnapshot) + r.POST("/logs", handler.IngestLog) srv := &http.Server{ Addr: ":" + cfg.Server.Port, diff --git a/internal/api/handler.go b/internal/api/handler.go index 83748b6..596415e 100644 --- a/internal/api/handler.go +++ b/internal/api/handler.go @@ -2,7 +2,9 @@ package api import ( "net/http" + "strconv" + "github.com/ez-api/ez-api/internal/dto" "github.com/ez-api/ez-api/internal/model" "github.com/ez-api/ez-api/internal/service" "github.com/gin-gonic/gin" @@ -10,33 +12,170 @@ import ( ) type Handler struct { - db *gorm.DB - sync *service.SyncService + db *gorm.DB + sync *service.SyncService + logger *service.LogWriter } -func NewHandler(db *gorm.DB, sync *service.SyncService) *Handler { - return &Handler{db: db, sync: sync} +func NewHandler(db *gorm.DB, sync *service.SyncService, logger *service.LogWriter) *Handler { + return &Handler{db: db, sync: sync, logger: logger} } func (h *Handler) CreateKey(c *gin.Context) { - var key model.Key - if err := c.ShouldBindJSON(&key); err != nil { + var req dto.KeyDTO + if err := c.ShouldBindJSON(&req); err != nil { c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) return } - // Save to DB + key := model.Key{ + ProviderID: &req.ProviderID, + KeySecret: req.KeySecret, + Balance: req.Balance, + Status: req.Status, + Weight: req.Weight, + } + if err := h.db.Create(&key).Error; err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to create key", "details": err.Error()}) return } - // Sync to Redis - if err := h.sync.SyncKey(&key); err != nil { - // Note: In a real system, we might want to rollback DB or retry async + // Write auth hash and refresh snapshots + if err := h.sync.SyncAll(h.db); err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to sync key to Redis", "details": err.Error()}) return } c.JSON(http.StatusCreated, key) } + +func (h *Handler) CreateProvider(c *gin.Context) { + var req dto.ProviderDTO + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + + provider := model.Provider{ + Name: req.Name, + Type: req.Type, + BaseURL: req.BaseURL, + APIKey: req.APIKey, + } + + if err := h.db.Create(&provider).Error; err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to create provider", "details": err.Error()}) + return + } + + if err := h.sync.SyncAll(h.db); err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to sync provider", "details": err.Error()}) + return + } + + c.JSON(http.StatusCreated, provider) +} + +func (h *Handler) CreateModel(c *gin.Context) { + var req dto.ModelDTO + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + + modelReq := model.Model{ + Name: req.Name, + ContextWindow: req.ContextWindow, + CostPerToken: req.CostPerToken, + SupportsVision: req.SupportsVision, + SupportsFunctions: req.SupportsFunctions, + SupportsToolChoice: req.SupportsToolChoice, + SupportsFIM: req.SupportsFIM, + MaxOutputTokens: req.MaxOutputTokens, + } + + if err := h.db.Create(&modelReq).Error; err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to create model", "details": err.Error()}) + return + } + + if err := h.sync.SyncAll(h.db); err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to sync model", "details": err.Error()}) + return + } + + c.JSON(http.StatusCreated, modelReq) +} + +func (h *Handler) ListModels(c *gin.Context) { + var models []model.Model + if err := h.db.Find(&models).Error; err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to list models", "details": err.Error()}) + return + } + c.JSON(http.StatusOK, models) +} + +func (h *Handler) UpdateModel(c *gin.Context) { + idParam := c.Param("id") + id, err := strconv.Atoi(idParam) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid id"}) + return + } + + var req dto.ModelDTO + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + + var existing model.Model + if err := h.db.First(&existing, id).Error; err != nil { + c.JSON(http.StatusNotFound, gin.H{"error": "model not found"}) + return + } + + existing.Name = req.Name + existing.ContextWindow = req.ContextWindow + existing.CostPerToken = req.CostPerToken + existing.SupportsVision = req.SupportsVision + existing.SupportsFunctions = req.SupportsFunctions + existing.SupportsToolChoice = req.SupportsToolChoice + existing.SupportsFIM = req.SupportsFIM + existing.MaxOutputTokens = req.MaxOutputTokens + + if err := h.db.Save(&existing).Error; err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to update model", "details": err.Error()}) + return + } + + if err := h.sync.SyncAll(h.db); err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to sync model", "details": err.Error()}) + return + } + + c.JSON(http.StatusOK, existing) +} + +func (h *Handler) SyncSnapshot(c *gin.Context) { + if err := h.sync.SyncAll(h.db); err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to sync snapshots", "details": err.Error()}) + return + } + c.JSON(http.StatusOK, gin.H{"status": "synced"}) +} + +// IngestLog accepts log records from data plane or other services. +func (h *Handler) IngestLog(c *gin.Context) { + var rec model.LogRecord + if err := c.ShouldBindJSON(&rec); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + + // By default, only metadata is expected; payload fields may be empty. + h.logger.Write(rec) + c.JSON(http.StatusAccepted, gin.H{"status": "queued"}) +} diff --git a/internal/config/config.go b/internal/config/config.go index ece5183..9262aaf 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -3,12 +3,14 @@ package config import ( "os" "strconv" + "time" ) type Config struct { Server ServerConfig Postgres PostgresConfig Redis RedisConfig + Log LogConfig } type ServerConfig struct { @@ -25,6 +27,12 @@ type RedisConfig struct { DB int } +type LogConfig struct { + BatchSize int + FlushInterval time.Duration + QueueCapacity int +} + func Load() (*Config, error) { return &Config{ Server: ServerConfig{ @@ -38,6 +46,11 @@ func Load() (*Config, error) { Password: getEnv("EZ_REDIS_PASSWORD", ""), DB: getEnvInt("EZ_REDIS_DB", 0), }, + Log: LogConfig{ + BatchSize: getEnvInt("EZ_LOG_BATCH_SIZE", 10), + FlushInterval: getEnvDuration("EZ_LOG_FLUSH_MS", 1000), + QueueCapacity: getEnvInt("EZ_LOG_QUEUE", 10000), + }, }, nil } @@ -56,3 +69,12 @@ func getEnvInt(key string, fallback int) int { } return fallback } + +func getEnvDuration(key string, fallbackMs int) time.Duration { + if value, ok := os.LookupEnv(key); ok { + if i, err := strconv.Atoi(value); err == nil { + return time.Duration(i) * time.Millisecond + } + } + return time.Duration(fallbackMs) * time.Millisecond +} diff --git a/internal/dto/key.go b/internal/dto/key.go new file mode 100644 index 0000000..976096a --- /dev/null +++ b/internal/dto/key.go @@ -0,0 +1,10 @@ +package dto + +// KeyDTO defines payload for key creation/update. +type KeyDTO struct { + ProviderID uint `json:"provider_id"` + KeySecret string `json:"key_secret"` + Balance float64 `json:"balance"` + Status string `json:"status"` + Weight int `json:"weight"` +} diff --git a/internal/dto/model.go b/internal/dto/model.go new file mode 100644 index 0000000..3845a4f --- /dev/null +++ b/internal/dto/model.go @@ -0,0 +1,13 @@ +package dto + +// ModelDTO is used for create/update of model capabilities. +type ModelDTO struct { + Name string `json:"name"` + ContextWindow int `json:"context_window"` + CostPerToken float64 `json:"cost_per_token"` + SupportsVision bool `json:"supports_vision"` + SupportsFunctions bool `json:"supports_functions"` + SupportsToolChoice bool `json:"supports_tool_choice"` + SupportsFIM bool `json:"supports_fim"` + MaxOutputTokens int `json:"max_output_tokens"` +} diff --git a/internal/dto/provider.go b/internal/dto/provider.go new file mode 100644 index 0000000..373693b --- /dev/null +++ b/internal/dto/provider.go @@ -0,0 +1,9 @@ +package dto + +// ProviderDTO defines inbound payload for provider creation/update. +type ProviderDTO struct { + Name string `json:"name"` + Type string `json:"type"` + BaseURL string `json:"base_url"` + APIKey string `json:"api_key"` +} diff --git a/internal/model/log.go b/internal/model/log.go new file mode 100644 index 0000000..cc1aef5 --- /dev/null +++ b/internal/model/log.go @@ -0,0 +1,22 @@ +package model + +import "gorm.io/gorm" + +// LogRecord stores lightweight metadata for auditing. Avoids full payload unless audit is triggered. +type LogRecord struct { + gorm.Model + Group string `json:"group"` + KeyID uint `json:"key_id"` + ModelName string `json:"model"` + StatusCode int `json:"status_code"` + LatencyMs int64 `json:"latency_ms"` + TokensIn int64 `json:"tokens_in"` + TokensOut int64 `json:"tokens_out"` + ErrorMessage string `json:"error_message"` + ClientIP string `json:"client_ip"` + RequestSize int64 `json:"request_size"` + ResponseSize int64 `json:"response_size"` + AuditReason string `json:"audit_reason"` + RequestBody string `json:"request_body"` // optional, only when audit triggered + ResponseBody string `json:"response_body"` // optional, only when audit triggered +} diff --git a/internal/model/models.go b/internal/model/models.go index ac51f97..1273332 100644 --- a/internal/model/models.go +++ b/internal/model/models.go @@ -6,32 +6,37 @@ import ( type User struct { gorm.Model - Username string `gorm:"uniqueIndex;not null"` - Quota int64 `gorm:"default:0"` - Role string `gorm:"default:'user'"` // admin, user + Username string `gorm:"uniqueIndex;not null" json:"username"` + Quota int64 `gorm:"default:0" json:"quota"` + Role string `gorm:"default:'user'" json:"role"` // admin, user } type Provider struct { gorm.Model - Name string `gorm:"not null"` - Type string `gorm:"not null"` // openai, anthropic, etc. - BaseURL string - APIKey string + Name string `gorm:"not null" json:"name"` + Type string `gorm:"not null" json:"type"` // openai, anthropic, etc. + BaseURL string `json:"base_url"` + APIKey string `json:"api_key"` } type Key struct { gorm.Model - ProviderID *uint - Provider *Provider - KeySecret string `gorm:"not null"` - Balance float64 - Status string `gorm:"default:'active'"` // active, suspended - Weight int `gorm:"default:10"` + ProviderID *uint `json:"provider_id"` + Provider *Provider `json:"-"` + KeySecret string `gorm:"not null" json:"key_secret"` + Balance float64 `json:"balance"` + Status string `gorm:"default:'active'" json:"status"` // active, suspended + Weight int `gorm:"default:10" json:"weight"` } type Model struct { gorm.Model - Name string `gorm:"uniqueIndex;not null"` - ContextWindow int - CostPerToken float64 + Name string `gorm:"uniqueIndex;not null" json:"name"` + ContextWindow int `json:"context_window"` + CostPerToken float64 `json:"cost_per_token"` + SupportsVision bool `json:"supports_vision"` + SupportsFunctions bool `json:"supports_functions"` + SupportsToolChoice bool `json:"supports_tool_choice"` + SupportsFIM bool `json:"supports_fim"` + MaxOutputTokens int `json:"max_output_tokens"` } diff --git a/internal/service/logger.go b/internal/service/logger.go new file mode 100644 index 0000000..f1b7bec --- /dev/null +++ b/internal/service/logger.go @@ -0,0 +1,79 @@ +package service + +import ( + "context" + "log" + "time" + + "github.com/ez-api/ez-api/internal/model" + "gorm.io/gorm" +) + +// LogWriter batches log records to reduce IO overhead. +type LogWriter struct { + ch chan model.LogRecord + batchSize int + flushInterval time.Duration + db *gorm.DB +} + +func NewLogWriter(db *gorm.DB, queueCapacity, batchSize int, flushInterval time.Duration) *LogWriter { + if batchSize <= 0 { + batchSize = 10 + } + if queueCapacity <= 0 { + queueCapacity = 1000 + } + if flushInterval <= 0 { + flushInterval = time.Second + } + return &LogWriter{ + ch: make(chan model.LogRecord, queueCapacity), + batchSize: batchSize, + flushInterval: flushInterval, + db: db, + } +} + +// Start begins a background writer. Should be called once at startup. +func (w *LogWriter) Start(ctx context.Context) { + go func() { + ticker := time.NewTicker(w.flushInterval) + defer ticker.Stop() + + buf := make([]model.LogRecord, 0, w.batchSize) + flush := func() { + if len(buf) == 0 { + return + } + if err := w.db.Create(&buf).Error; err != nil { + log.Printf("log batch insert failed: %v", err) + } + buf = buf[:0] + } + + for { + select { + case <-ctx.Done(): + flush() + return + case rec := <-w.ch: + buf = append(buf, rec) + if len(buf) >= w.batchSize { + flush() + } + case <-ticker.C: + flush() + } + } + }() +} + +// Write queues a log record; drops silently if buffer is full to protect performance. +func (w *LogWriter) Write(rec model.LogRecord) { + select { + case w.ch <- rec: + default: + // drop to avoid blocking hot path + } +} diff --git a/internal/service/sync.go b/internal/service/sync.go index 605f9a1..c2df3ab 100644 --- a/internal/service/sync.go +++ b/internal/service/sync.go @@ -4,10 +4,12 @@ import ( "context" "crypto/sha256" "encoding/hex" + "encoding/json" "fmt" "github.com/ez-api/ez-api/internal/model" "github.com/redis/go-redis/v9" + "gorm.io/gorm" ) type SyncService struct { @@ -19,26 +21,150 @@ func NewSyncService(rdb *redis.Client) *SyncService { } func (s *SyncService) SyncKey(key *model.Key) error { - // Hash the token - hasher := sha256.New() - hasher.Write([]byte(key.KeySecret)) - tokenHash := hex.EncodeToString(hasher.Sum(nil)) - + tokenHash := hashToken(key.KeySecret) redisKey := fmt.Sprintf("auth:token:%s", tokenHash) - // Store in Redis - // Using HSet to store multiple fields fields := map[string]interface{}{ "status": key.Status, "balance": key.Balance, } if key.ProviderID != nil { - fields["master_id"] = *key.ProviderID + fields["provider_id"] = *key.ProviderID } else { - fields["master_id"] = 0 // Default or handle as needed + fields["provider_id"] = 0 } - err := s.rdb.HSet(context.Background(), redisKey, fields).Err() - - return err + return s.rdb.HSet(context.Background(), redisKey, fields).Err() +} + +type providerSnapshot struct { + ID uint `json:"id"` + Name string `json:"name"` + Type string `json:"type"` + BaseURL string `json:"base_url"` + APIKey string `json:"api_key"` +} + +type keySnapshot struct { + ID uint `json:"id"` + ProviderID uint `json:"provider_id"` + TokenHash string `json:"token_hash"` + Status string `json:"status"` + Weight int `json:"weight"` + Balance float64 `json:"balance"` +} + +type modelSnapshot struct { + Name string `json:"name"` + ContextWindow int `json:"context_window"` + CostPerToken float64 `json:"cost_per_token"` + SupportsVision bool `json:"supports_vision"` + SupportsFunction bool `json:"supports_functions"` + SupportsToolChoice bool `json:"supports_tool_choice"` + SupportsFIM bool `json:"supports_fim"` + MaxOutputTokens int `json:"max_output_tokens"` +} + +// SyncAll writes full snapshots (providers/keys/models) into Redis for DP consumption. +func (s *SyncService) SyncAll(db *gorm.DB) error { + ctx := context.Background() + + // Providers snapshot + var providers []model.Provider + if err := db.Find(&providers).Error; err != nil { + return fmt.Errorf("load providers: %w", err) + } + providerSnap := make([]providerSnapshot, 0, len(providers)) + for _, p := range providers { + providerSnap = append(providerSnap, providerSnapshot{ + ID: p.ID, + Name: p.Name, + Type: p.Type, + BaseURL: p.BaseURL, + APIKey: p.APIKey, + }) + } + if err := s.storeJSON(ctx, "config:providers", providerSnap); err != nil { + return err + } + + // Keys snapshot + auth hashes + var keys []model.Key + if err := db.Find(&keys).Error; err != nil { + return fmt.Errorf("load keys: %w", err) + } + keySnap := make([]keySnapshot, 0, len(keys)) + for _, k := range keys { + tokenHash := hashToken(k.KeySecret) + keySnap = append(keySnap, keySnapshot{ + ID: k.ID, + ProviderID: firstID(k.ProviderID), + TokenHash: tokenHash, + Status: k.Status, + Weight: k.Weight, + Balance: k.Balance, + }) + + // Maintain per-token auth hash for quick checks + fields := map[string]interface{}{ + "status": k.Status, + "provider_id": firstID(k.ProviderID), + "weight": k.Weight, + "balance": k.Balance, + } + if err := s.rdb.HSet(ctx, fmt.Sprintf("auth:token:%s", tokenHash), fields).Err(); err != nil { + return fmt.Errorf("write auth token: %w", err) + } + } + if err := s.storeJSON(ctx, "config:keys", keySnap); err != nil { + return err + } + + // Models snapshot + var models []model.Model + if err := db.Find(&models).Error; err != nil { + return fmt.Errorf("load models: %w", err) + } + modelSnap := make([]modelSnapshot, 0, len(models)) + for _, m := range models { + modelSnap = append(modelSnap, modelSnapshot{ + Name: m.Name, + ContextWindow: m.ContextWindow, + CostPerToken: m.CostPerToken, + SupportsVision: m.SupportsVision, + SupportsFunction: m.SupportsFunctions, + SupportsToolChoice: m.SupportsToolChoice, + SupportsFIM: m.SupportsFIM, + MaxOutputTokens: m.MaxOutputTokens, + }) + } + if err := s.storeJSON(ctx, "meta:models", modelSnap); err != nil { + return err + } + + return nil +} + +func (s *SyncService) storeJSON(ctx context.Context, key string, val interface{}) error { + payload, err := json.Marshal(val) + if err != nil { + return fmt.Errorf("marshal %s: %w", key, err) + } + if err := s.rdb.Set(ctx, key, payload, 0).Err(); err != nil { + return fmt.Errorf("write %s: %w", key, err) + } + return nil +} + +func hashToken(token string) string { + hasher := sha256.New() + hasher.Write([]byte(token)) + return hex.EncodeToString(hasher.Sum(nil)) +} + +func firstID(id *uint) uint { + if id == nil { + return 0 + } + return *id }