mirror of
https://github.com/EZ-Api/ez-api.git
synced 2026-01-13 17:47:51 +00:00
feat(server): implement management endpoints and redis sync
Enable full resource management via API and support data plane synchronization. - Add CRUD handlers for Providers, Models, and Keys using DTOs - Implement LogWriter service for asynchronous, batched audit logging - Update SyncService to snapshot full configuration state to Redis - Register new API routes and initialize background services - Add configuration options for logging performance tuning
This commit is contained in:
@@ -57,13 +57,22 @@ func main() {
|
|||||||
log.Println("Connected to PostgreSQL successfully")
|
log.Println("Connected to PostgreSQL successfully")
|
||||||
|
|
||||||
// Auto Migrate
|
// Auto Migrate
|
||||||
if err := db.AutoMigrate(&model.User{}, &model.Provider{}, &model.Key{}, &model.Model{}); err != nil {
|
if err := db.AutoMigrate(&model.User{}, &model.Provider{}, &model.Key{}, &model.Model{}, &model.LogRecord{}); err != nil {
|
||||||
log.Fatalf("Failed to auto migrate: %v", err)
|
log.Fatalf("Failed to auto migrate: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// 4. Setup Services and Handlers
|
// 4. Setup Services and Handlers
|
||||||
syncService := service.NewSyncService(rdb)
|
syncService := service.NewSyncService(rdb)
|
||||||
handler := api.NewHandler(db, syncService)
|
logWriter := service.NewLogWriter(db, cfg.Log.QueueCapacity, cfg.Log.BatchSize, cfg.Log.FlushInterval)
|
||||||
|
logCtx, cancelLogs := context.WithCancel(context.Background())
|
||||||
|
defer cancelLogs()
|
||||||
|
logWriter.Start(logCtx)
|
||||||
|
handler := api.NewHandler(db, syncService, logWriter)
|
||||||
|
|
||||||
|
// 4.1 Prime Redis snapshots so DP can start with data
|
||||||
|
if err := syncService.SyncAll(db); err != nil {
|
||||||
|
log.Printf("Initial sync warning: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
// 5. Setup Gin Router
|
// 5. Setup Gin Router
|
||||||
r := gin.Default()
|
r := gin.Default()
|
||||||
@@ -74,7 +83,12 @@ func main() {
|
|||||||
})
|
})
|
||||||
|
|
||||||
// API Routes
|
// API Routes
|
||||||
|
r.POST("/providers", handler.CreateProvider)
|
||||||
r.POST("/keys", handler.CreateKey)
|
r.POST("/keys", handler.CreateKey)
|
||||||
|
r.POST("/models", handler.CreateModel)
|
||||||
|
r.GET("/models", handler.ListModels)
|
||||||
|
r.POST("/sync/snapshot", handler.SyncSnapshot)
|
||||||
|
r.POST("/logs", handler.IngestLog)
|
||||||
|
|
||||||
srv := &http.Server{
|
srv := &http.Server{
|
||||||
Addr: ":" + cfg.Server.Port,
|
Addr: ":" + cfg.Server.Port,
|
||||||
|
|||||||
@@ -2,7 +2,9 @@ package api
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"strconv"
|
||||||
|
|
||||||
|
"github.com/ez-api/ez-api/internal/dto"
|
||||||
"github.com/ez-api/ez-api/internal/model"
|
"github.com/ez-api/ez-api/internal/model"
|
||||||
"github.com/ez-api/ez-api/internal/service"
|
"github.com/ez-api/ez-api/internal/service"
|
||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
@@ -12,31 +14,168 @@ import (
|
|||||||
type Handler struct {
|
type Handler struct {
|
||||||
db *gorm.DB
|
db *gorm.DB
|
||||||
sync *service.SyncService
|
sync *service.SyncService
|
||||||
|
logger *service.LogWriter
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewHandler(db *gorm.DB, sync *service.SyncService) *Handler {
|
func NewHandler(db *gorm.DB, sync *service.SyncService, logger *service.LogWriter) *Handler {
|
||||||
return &Handler{db: db, sync: sync}
|
return &Handler{db: db, sync: sync, logger: logger}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *Handler) CreateKey(c *gin.Context) {
|
func (h *Handler) CreateKey(c *gin.Context) {
|
||||||
var key model.Key
|
var req dto.KeyDTO
|
||||||
if err := c.ShouldBindJSON(&key); err != nil {
|
if err := c.ShouldBindJSON(&req); err != nil {
|
||||||
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
|
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Save to DB
|
key := model.Key{
|
||||||
|
ProviderID: &req.ProviderID,
|
||||||
|
KeySecret: req.KeySecret,
|
||||||
|
Balance: req.Balance,
|
||||||
|
Status: req.Status,
|
||||||
|
Weight: req.Weight,
|
||||||
|
}
|
||||||
|
|
||||||
if err := h.db.Create(&key).Error; err != nil {
|
if err := h.db.Create(&key).Error; err != nil {
|
||||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to create key", "details": err.Error()})
|
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to create key", "details": err.Error()})
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Sync to Redis
|
// Write auth hash and refresh snapshots
|
||||||
if err := h.sync.SyncKey(&key); err != nil {
|
if err := h.sync.SyncAll(h.db); err != nil {
|
||||||
// Note: In a real system, we might want to rollback DB or retry async
|
|
||||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to sync key to Redis", "details": err.Error()})
|
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to sync key to Redis", "details": err.Error()})
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
c.JSON(http.StatusCreated, key)
|
c.JSON(http.StatusCreated, key)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (h *Handler) CreateProvider(c *gin.Context) {
|
||||||
|
var req dto.ProviderDTO
|
||||||
|
if err := c.ShouldBindJSON(&req); err != nil {
|
||||||
|
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
provider := model.Provider{
|
||||||
|
Name: req.Name,
|
||||||
|
Type: req.Type,
|
||||||
|
BaseURL: req.BaseURL,
|
||||||
|
APIKey: req.APIKey,
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := h.db.Create(&provider).Error; err != nil {
|
||||||
|
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to create provider", "details": err.Error()})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := h.sync.SyncAll(h.db); err != nil {
|
||||||
|
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to sync provider", "details": err.Error()})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
c.JSON(http.StatusCreated, provider)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *Handler) CreateModel(c *gin.Context) {
|
||||||
|
var req dto.ModelDTO
|
||||||
|
if err := c.ShouldBindJSON(&req); err != nil {
|
||||||
|
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
modelReq := model.Model{
|
||||||
|
Name: req.Name,
|
||||||
|
ContextWindow: req.ContextWindow,
|
||||||
|
CostPerToken: req.CostPerToken,
|
||||||
|
SupportsVision: req.SupportsVision,
|
||||||
|
SupportsFunctions: req.SupportsFunctions,
|
||||||
|
SupportsToolChoice: req.SupportsToolChoice,
|
||||||
|
SupportsFIM: req.SupportsFIM,
|
||||||
|
MaxOutputTokens: req.MaxOutputTokens,
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := h.db.Create(&modelReq).Error; err != nil {
|
||||||
|
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to create model", "details": err.Error()})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := h.sync.SyncAll(h.db); err != nil {
|
||||||
|
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to sync model", "details": err.Error()})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
c.JSON(http.StatusCreated, modelReq)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *Handler) ListModels(c *gin.Context) {
|
||||||
|
var models []model.Model
|
||||||
|
if err := h.db.Find(&models).Error; err != nil {
|
||||||
|
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to list models", "details": err.Error()})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
c.JSON(http.StatusOK, models)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *Handler) UpdateModel(c *gin.Context) {
|
||||||
|
idParam := c.Param("id")
|
||||||
|
id, err := strconv.Atoi(idParam)
|
||||||
|
if err != nil {
|
||||||
|
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid id"})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var req dto.ModelDTO
|
||||||
|
if err := c.ShouldBindJSON(&req); err != nil {
|
||||||
|
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var existing model.Model
|
||||||
|
if err := h.db.First(&existing, id).Error; err != nil {
|
||||||
|
c.JSON(http.StatusNotFound, gin.H{"error": "model not found"})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
existing.Name = req.Name
|
||||||
|
existing.ContextWindow = req.ContextWindow
|
||||||
|
existing.CostPerToken = req.CostPerToken
|
||||||
|
existing.SupportsVision = req.SupportsVision
|
||||||
|
existing.SupportsFunctions = req.SupportsFunctions
|
||||||
|
existing.SupportsToolChoice = req.SupportsToolChoice
|
||||||
|
existing.SupportsFIM = req.SupportsFIM
|
||||||
|
existing.MaxOutputTokens = req.MaxOutputTokens
|
||||||
|
|
||||||
|
if err := h.db.Save(&existing).Error; err != nil {
|
||||||
|
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to update model", "details": err.Error()})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := h.sync.SyncAll(h.db); err != nil {
|
||||||
|
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to sync model", "details": err.Error()})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
c.JSON(http.StatusOK, existing)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *Handler) SyncSnapshot(c *gin.Context) {
|
||||||
|
if err := h.sync.SyncAll(h.db); err != nil {
|
||||||
|
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to sync snapshots", "details": err.Error()})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
c.JSON(http.StatusOK, gin.H{"status": "synced"})
|
||||||
|
}
|
||||||
|
|
||||||
|
// IngestLog accepts log records from data plane or other services.
|
||||||
|
func (h *Handler) IngestLog(c *gin.Context) {
|
||||||
|
var rec model.LogRecord
|
||||||
|
if err := c.ShouldBindJSON(&rec); err != nil {
|
||||||
|
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// By default, only metadata is expected; payload fields may be empty.
|
||||||
|
h.logger.Write(rec)
|
||||||
|
c.JSON(http.StatusAccepted, gin.H{"status": "queued"})
|
||||||
|
}
|
||||||
|
|||||||
@@ -3,12 +3,14 @@ package config
|
|||||||
import (
|
import (
|
||||||
"os"
|
"os"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Config struct {
|
type Config struct {
|
||||||
Server ServerConfig
|
Server ServerConfig
|
||||||
Postgres PostgresConfig
|
Postgres PostgresConfig
|
||||||
Redis RedisConfig
|
Redis RedisConfig
|
||||||
|
Log LogConfig
|
||||||
}
|
}
|
||||||
|
|
||||||
type ServerConfig struct {
|
type ServerConfig struct {
|
||||||
@@ -25,6 +27,12 @@ type RedisConfig struct {
|
|||||||
DB int
|
DB int
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type LogConfig struct {
|
||||||
|
BatchSize int
|
||||||
|
FlushInterval time.Duration
|
||||||
|
QueueCapacity int
|
||||||
|
}
|
||||||
|
|
||||||
func Load() (*Config, error) {
|
func Load() (*Config, error) {
|
||||||
return &Config{
|
return &Config{
|
||||||
Server: ServerConfig{
|
Server: ServerConfig{
|
||||||
@@ -38,6 +46,11 @@ func Load() (*Config, error) {
|
|||||||
Password: getEnv("EZ_REDIS_PASSWORD", ""),
|
Password: getEnv("EZ_REDIS_PASSWORD", ""),
|
||||||
DB: getEnvInt("EZ_REDIS_DB", 0),
|
DB: getEnvInt("EZ_REDIS_DB", 0),
|
||||||
},
|
},
|
||||||
|
Log: LogConfig{
|
||||||
|
BatchSize: getEnvInt("EZ_LOG_BATCH_SIZE", 10),
|
||||||
|
FlushInterval: getEnvDuration("EZ_LOG_FLUSH_MS", 1000),
|
||||||
|
QueueCapacity: getEnvInt("EZ_LOG_QUEUE", 10000),
|
||||||
|
},
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -56,3 +69,12 @@ func getEnvInt(key string, fallback int) int {
|
|||||||
}
|
}
|
||||||
return fallback
|
return fallback
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getEnvDuration(key string, fallbackMs int) time.Duration {
|
||||||
|
if value, ok := os.LookupEnv(key); ok {
|
||||||
|
if i, err := strconv.Atoi(value); err == nil {
|
||||||
|
return time.Duration(i) * time.Millisecond
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return time.Duration(fallbackMs) * time.Millisecond
|
||||||
|
}
|
||||||
|
|||||||
10
internal/dto/key.go
Normal file
10
internal/dto/key.go
Normal file
@@ -0,0 +1,10 @@
|
|||||||
|
package dto
|
||||||
|
|
||||||
|
// KeyDTO defines payload for key creation/update.
|
||||||
|
type KeyDTO struct {
|
||||||
|
ProviderID uint `json:"provider_id"`
|
||||||
|
KeySecret string `json:"key_secret"`
|
||||||
|
Balance float64 `json:"balance"`
|
||||||
|
Status string `json:"status"`
|
||||||
|
Weight int `json:"weight"`
|
||||||
|
}
|
||||||
13
internal/dto/model.go
Normal file
13
internal/dto/model.go
Normal file
@@ -0,0 +1,13 @@
|
|||||||
|
package dto
|
||||||
|
|
||||||
|
// ModelDTO is used for create/update of model capabilities.
|
||||||
|
type ModelDTO struct {
|
||||||
|
Name string `json:"name"`
|
||||||
|
ContextWindow int `json:"context_window"`
|
||||||
|
CostPerToken float64 `json:"cost_per_token"`
|
||||||
|
SupportsVision bool `json:"supports_vision"`
|
||||||
|
SupportsFunctions bool `json:"supports_functions"`
|
||||||
|
SupportsToolChoice bool `json:"supports_tool_choice"`
|
||||||
|
SupportsFIM bool `json:"supports_fim"`
|
||||||
|
MaxOutputTokens int `json:"max_output_tokens"`
|
||||||
|
}
|
||||||
9
internal/dto/provider.go
Normal file
9
internal/dto/provider.go
Normal file
@@ -0,0 +1,9 @@
|
|||||||
|
package dto
|
||||||
|
|
||||||
|
// ProviderDTO defines inbound payload for provider creation/update.
|
||||||
|
type ProviderDTO struct {
|
||||||
|
Name string `json:"name"`
|
||||||
|
Type string `json:"type"`
|
||||||
|
BaseURL string `json:"base_url"`
|
||||||
|
APIKey string `json:"api_key"`
|
||||||
|
}
|
||||||
22
internal/model/log.go
Normal file
22
internal/model/log.go
Normal file
@@ -0,0 +1,22 @@
|
|||||||
|
package model
|
||||||
|
|
||||||
|
import "gorm.io/gorm"
|
||||||
|
|
||||||
|
// LogRecord stores lightweight metadata for auditing. Avoids full payload unless audit is triggered.
|
||||||
|
type LogRecord struct {
|
||||||
|
gorm.Model
|
||||||
|
Group string `json:"group"`
|
||||||
|
KeyID uint `json:"key_id"`
|
||||||
|
ModelName string `json:"model"`
|
||||||
|
StatusCode int `json:"status_code"`
|
||||||
|
LatencyMs int64 `json:"latency_ms"`
|
||||||
|
TokensIn int64 `json:"tokens_in"`
|
||||||
|
TokensOut int64 `json:"tokens_out"`
|
||||||
|
ErrorMessage string `json:"error_message"`
|
||||||
|
ClientIP string `json:"client_ip"`
|
||||||
|
RequestSize int64 `json:"request_size"`
|
||||||
|
ResponseSize int64 `json:"response_size"`
|
||||||
|
AuditReason string `json:"audit_reason"`
|
||||||
|
RequestBody string `json:"request_body"` // optional, only when audit triggered
|
||||||
|
ResponseBody string `json:"response_body"` // optional, only when audit triggered
|
||||||
|
}
|
||||||
@@ -6,32 +6,37 @@ import (
|
|||||||
|
|
||||||
type User struct {
|
type User struct {
|
||||||
gorm.Model
|
gorm.Model
|
||||||
Username string `gorm:"uniqueIndex;not null"`
|
Username string `gorm:"uniqueIndex;not null" json:"username"`
|
||||||
Quota int64 `gorm:"default:0"`
|
Quota int64 `gorm:"default:0" json:"quota"`
|
||||||
Role string `gorm:"default:'user'"` // admin, user
|
Role string `gorm:"default:'user'" json:"role"` // admin, user
|
||||||
}
|
}
|
||||||
|
|
||||||
type Provider struct {
|
type Provider struct {
|
||||||
gorm.Model
|
gorm.Model
|
||||||
Name string `gorm:"not null"`
|
Name string `gorm:"not null" json:"name"`
|
||||||
Type string `gorm:"not null"` // openai, anthropic, etc.
|
Type string `gorm:"not null" json:"type"` // openai, anthropic, etc.
|
||||||
BaseURL string
|
BaseURL string `json:"base_url"`
|
||||||
APIKey string
|
APIKey string `json:"api_key"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type Key struct {
|
type Key struct {
|
||||||
gorm.Model
|
gorm.Model
|
||||||
ProviderID *uint
|
ProviderID *uint `json:"provider_id"`
|
||||||
Provider *Provider
|
Provider *Provider `json:"-"`
|
||||||
KeySecret string `gorm:"not null"`
|
KeySecret string `gorm:"not null" json:"key_secret"`
|
||||||
Balance float64
|
Balance float64 `json:"balance"`
|
||||||
Status string `gorm:"default:'active'"` // active, suspended
|
Status string `gorm:"default:'active'" json:"status"` // active, suspended
|
||||||
Weight int `gorm:"default:10"`
|
Weight int `gorm:"default:10" json:"weight"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type Model struct {
|
type Model struct {
|
||||||
gorm.Model
|
gorm.Model
|
||||||
Name string `gorm:"uniqueIndex;not null"`
|
Name string `gorm:"uniqueIndex;not null" json:"name"`
|
||||||
ContextWindow int
|
ContextWindow int `json:"context_window"`
|
||||||
CostPerToken float64
|
CostPerToken float64 `json:"cost_per_token"`
|
||||||
|
SupportsVision bool `json:"supports_vision"`
|
||||||
|
SupportsFunctions bool `json:"supports_functions"`
|
||||||
|
SupportsToolChoice bool `json:"supports_tool_choice"`
|
||||||
|
SupportsFIM bool `json:"supports_fim"`
|
||||||
|
MaxOutputTokens int `json:"max_output_tokens"`
|
||||||
}
|
}
|
||||||
|
|||||||
79
internal/service/logger.go
Normal file
79
internal/service/logger.go
Normal file
@@ -0,0 +1,79 @@
|
|||||||
|
package service
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"log"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/ez-api/ez-api/internal/model"
|
||||||
|
"gorm.io/gorm"
|
||||||
|
)
|
||||||
|
|
||||||
|
// LogWriter batches log records to reduce IO overhead.
|
||||||
|
type LogWriter struct {
|
||||||
|
ch chan model.LogRecord
|
||||||
|
batchSize int
|
||||||
|
flushInterval time.Duration
|
||||||
|
db *gorm.DB
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewLogWriter(db *gorm.DB, queueCapacity, batchSize int, flushInterval time.Duration) *LogWriter {
|
||||||
|
if batchSize <= 0 {
|
||||||
|
batchSize = 10
|
||||||
|
}
|
||||||
|
if queueCapacity <= 0 {
|
||||||
|
queueCapacity = 1000
|
||||||
|
}
|
||||||
|
if flushInterval <= 0 {
|
||||||
|
flushInterval = time.Second
|
||||||
|
}
|
||||||
|
return &LogWriter{
|
||||||
|
ch: make(chan model.LogRecord, queueCapacity),
|
||||||
|
batchSize: batchSize,
|
||||||
|
flushInterval: flushInterval,
|
||||||
|
db: db,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start begins a background writer. Should be called once at startup.
|
||||||
|
func (w *LogWriter) Start(ctx context.Context) {
|
||||||
|
go func() {
|
||||||
|
ticker := time.NewTicker(w.flushInterval)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
buf := make([]model.LogRecord, 0, w.batchSize)
|
||||||
|
flush := func() {
|
||||||
|
if len(buf) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err := w.db.Create(&buf).Error; err != nil {
|
||||||
|
log.Printf("log batch insert failed: %v", err)
|
||||||
|
}
|
||||||
|
buf = buf[:0]
|
||||||
|
}
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
flush()
|
||||||
|
return
|
||||||
|
case rec := <-w.ch:
|
||||||
|
buf = append(buf, rec)
|
||||||
|
if len(buf) >= w.batchSize {
|
||||||
|
flush()
|
||||||
|
}
|
||||||
|
case <-ticker.C:
|
||||||
|
flush()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write queues a log record; drops silently if buffer is full to protect performance.
|
||||||
|
func (w *LogWriter) Write(rec model.LogRecord) {
|
||||||
|
select {
|
||||||
|
case w.ch <- rec:
|
||||||
|
default:
|
||||||
|
// drop to avoid blocking hot path
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -4,10 +4,12 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"crypto/sha256"
|
"crypto/sha256"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/ez-api/ez-api/internal/model"
|
"github.com/ez-api/ez-api/internal/model"
|
||||||
"github.com/redis/go-redis/v9"
|
"github.com/redis/go-redis/v9"
|
||||||
|
"gorm.io/gorm"
|
||||||
)
|
)
|
||||||
|
|
||||||
type SyncService struct {
|
type SyncService struct {
|
||||||
@@ -19,26 +21,150 @@ func NewSyncService(rdb *redis.Client) *SyncService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *SyncService) SyncKey(key *model.Key) error {
|
func (s *SyncService) SyncKey(key *model.Key) error {
|
||||||
// Hash the token
|
tokenHash := hashToken(key.KeySecret)
|
||||||
hasher := sha256.New()
|
|
||||||
hasher.Write([]byte(key.KeySecret))
|
|
||||||
tokenHash := hex.EncodeToString(hasher.Sum(nil))
|
|
||||||
|
|
||||||
redisKey := fmt.Sprintf("auth:token:%s", tokenHash)
|
redisKey := fmt.Sprintf("auth:token:%s", tokenHash)
|
||||||
|
|
||||||
// Store in Redis
|
|
||||||
// Using HSet to store multiple fields
|
|
||||||
fields := map[string]interface{}{
|
fields := map[string]interface{}{
|
||||||
"status": key.Status,
|
"status": key.Status,
|
||||||
"balance": key.Balance,
|
"balance": key.Balance,
|
||||||
}
|
}
|
||||||
if key.ProviderID != nil {
|
if key.ProviderID != nil {
|
||||||
fields["master_id"] = *key.ProviderID
|
fields["provider_id"] = *key.ProviderID
|
||||||
} else {
|
} else {
|
||||||
fields["master_id"] = 0 // Default or handle as needed
|
fields["provider_id"] = 0
|
||||||
}
|
}
|
||||||
|
|
||||||
err := s.rdb.HSet(context.Background(), redisKey, fields).Err()
|
return s.rdb.HSet(context.Background(), redisKey, fields).Err()
|
||||||
|
}
|
||||||
return err
|
|
||||||
|
type providerSnapshot struct {
|
||||||
|
ID uint `json:"id"`
|
||||||
|
Name string `json:"name"`
|
||||||
|
Type string `json:"type"`
|
||||||
|
BaseURL string `json:"base_url"`
|
||||||
|
APIKey string `json:"api_key"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type keySnapshot struct {
|
||||||
|
ID uint `json:"id"`
|
||||||
|
ProviderID uint `json:"provider_id"`
|
||||||
|
TokenHash string `json:"token_hash"`
|
||||||
|
Status string `json:"status"`
|
||||||
|
Weight int `json:"weight"`
|
||||||
|
Balance float64 `json:"balance"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type modelSnapshot struct {
|
||||||
|
Name string `json:"name"`
|
||||||
|
ContextWindow int `json:"context_window"`
|
||||||
|
CostPerToken float64 `json:"cost_per_token"`
|
||||||
|
SupportsVision bool `json:"supports_vision"`
|
||||||
|
SupportsFunction bool `json:"supports_functions"`
|
||||||
|
SupportsToolChoice bool `json:"supports_tool_choice"`
|
||||||
|
SupportsFIM bool `json:"supports_fim"`
|
||||||
|
MaxOutputTokens int `json:"max_output_tokens"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// SyncAll writes full snapshots (providers/keys/models) into Redis for DP consumption.
|
||||||
|
func (s *SyncService) SyncAll(db *gorm.DB) error {
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
// Providers snapshot
|
||||||
|
var providers []model.Provider
|
||||||
|
if err := db.Find(&providers).Error; err != nil {
|
||||||
|
return fmt.Errorf("load providers: %w", err)
|
||||||
|
}
|
||||||
|
providerSnap := make([]providerSnapshot, 0, len(providers))
|
||||||
|
for _, p := range providers {
|
||||||
|
providerSnap = append(providerSnap, providerSnapshot{
|
||||||
|
ID: p.ID,
|
||||||
|
Name: p.Name,
|
||||||
|
Type: p.Type,
|
||||||
|
BaseURL: p.BaseURL,
|
||||||
|
APIKey: p.APIKey,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
if err := s.storeJSON(ctx, "config:providers", providerSnap); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Keys snapshot + auth hashes
|
||||||
|
var keys []model.Key
|
||||||
|
if err := db.Find(&keys).Error; err != nil {
|
||||||
|
return fmt.Errorf("load keys: %w", err)
|
||||||
|
}
|
||||||
|
keySnap := make([]keySnapshot, 0, len(keys))
|
||||||
|
for _, k := range keys {
|
||||||
|
tokenHash := hashToken(k.KeySecret)
|
||||||
|
keySnap = append(keySnap, keySnapshot{
|
||||||
|
ID: k.ID,
|
||||||
|
ProviderID: firstID(k.ProviderID),
|
||||||
|
TokenHash: tokenHash,
|
||||||
|
Status: k.Status,
|
||||||
|
Weight: k.Weight,
|
||||||
|
Balance: k.Balance,
|
||||||
|
})
|
||||||
|
|
||||||
|
// Maintain per-token auth hash for quick checks
|
||||||
|
fields := map[string]interface{}{
|
||||||
|
"status": k.Status,
|
||||||
|
"provider_id": firstID(k.ProviderID),
|
||||||
|
"weight": k.Weight,
|
||||||
|
"balance": k.Balance,
|
||||||
|
}
|
||||||
|
if err := s.rdb.HSet(ctx, fmt.Sprintf("auth:token:%s", tokenHash), fields).Err(); err != nil {
|
||||||
|
return fmt.Errorf("write auth token: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if err := s.storeJSON(ctx, "config:keys", keySnap); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Models snapshot
|
||||||
|
var models []model.Model
|
||||||
|
if err := db.Find(&models).Error; err != nil {
|
||||||
|
return fmt.Errorf("load models: %w", err)
|
||||||
|
}
|
||||||
|
modelSnap := make([]modelSnapshot, 0, len(models))
|
||||||
|
for _, m := range models {
|
||||||
|
modelSnap = append(modelSnap, modelSnapshot{
|
||||||
|
Name: m.Name,
|
||||||
|
ContextWindow: m.ContextWindow,
|
||||||
|
CostPerToken: m.CostPerToken,
|
||||||
|
SupportsVision: m.SupportsVision,
|
||||||
|
SupportsFunction: m.SupportsFunctions,
|
||||||
|
SupportsToolChoice: m.SupportsToolChoice,
|
||||||
|
SupportsFIM: m.SupportsFIM,
|
||||||
|
MaxOutputTokens: m.MaxOutputTokens,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
if err := s.storeJSON(ctx, "meta:models", modelSnap); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *SyncService) storeJSON(ctx context.Context, key string, val interface{}) error {
|
||||||
|
payload, err := json.Marshal(val)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("marshal %s: %w", key, err)
|
||||||
|
}
|
||||||
|
if err := s.rdb.Set(ctx, key, payload, 0).Err(); err != nil {
|
||||||
|
return fmt.Errorf("write %s: %w", key, err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func hashToken(token string) string {
|
||||||
|
hasher := sha256.New()
|
||||||
|
hasher.Write([]byte(token))
|
||||||
|
return hex.EncodeToString(hasher.Sum(nil))
|
||||||
|
}
|
||||||
|
|
||||||
|
func firstID(id *uint) uint {
|
||||||
|
if id == nil {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
return *id
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user