diff --git a/.env.example b/.env.example index 253d566..d4ea798 100644 --- a/.env.example +++ b/.env.example @@ -14,6 +14,11 @@ EZ_LOG_RETENTION_DAYS=30 EZ_LOG_MAX_RECORDS=1000000 # 日志分区(off/month/day) EZ_LOG_PARTITIONING=off +# 同步可靠性(CP -> Redis outbox) +EZ_SYNC_OUTBOX_ENABLED=true +EZ_SYNC_OUTBOX_INTERVAL_SECONDS=5 +EZ_SYNC_OUTBOX_BATCH_SIZE=200 +EZ_SYNC_OUTBOX_MAX_RETRIES=10 # Log DB (docker-compose log-postgres,可选;默认不启用独立日志库) LOG_POSTGRES_USER=postgres @@ -31,3 +36,5 @@ EZ_BALANCER_STATS_FLUSH_TOKEN=internal123 EZ_BALANCER_STATS_FLUSH_INTERVAL_SECONDS=300 EZ_BALANCER_STATS_FLUSH_BATCH_SIZE=200 EZ_BALANCER_STATS_FLUSH_TIMEOUT_SECONDS=5 +# 按 provider type 配置非流式请求超时(秒,JSON) +EZ_BALANCER_TIMEOUT_BY_PROVIDER={"default":30,"openai":40,"compatible":40,"anthropic":50,"claude":50,"google":60} diff --git a/README.md b/README.md index 4718e55..908b4cd 100644 --- a/README.md +++ b/README.md @@ -70,6 +70,10 @@ EZ-API 是“控制平面”,负责管理事实来源 (Source of Truth)。 | `EZ_LOG_RETENTION_DAYS` | `30` | 日志保留天数。 | | `EZ_LOG_MAX_RECORDS` | `1000000` | 日志最大记录数。 | | `EZ_LOG_PARTITIONING` | `off` | 日志分区(off/month/day)。 | +| `EZ_SYNC_OUTBOX_ENABLED` | `true` | CP->Redis 同步失败时启用 outbox 重试。 | +| `EZ_SYNC_OUTBOX_INTERVAL_SECONDS` | `5` | outbox 重试间隔(秒)。 | +| `EZ_SYNC_OUTBOX_BATCH_SIZE` | `200` | outbox 单次处理数量。 | +| `EZ_SYNC_OUTBOX_MAX_RETRIES` | `10` | outbox 最大重试次数。 | | `EZ_MODEL_REGISTRY_ENABLED` | `false` | 模型注册表开关。 | | `EZ_MODEL_REGISTRY_REFRESH_SECONDS` | `1800` | 模型注册表刷新间隔。 | | `EZ_MODEL_REGISTRY_CACHE_DIR` | `./data/model-registry` | 模型注册表缓存目录。 | diff --git a/cmd/server/main.go b/cmd/server/main.go index 4a3ac62..56d8132 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -135,7 +135,7 @@ func main() { // Auto Migrate if logDB != db { - if err := db.AutoMigrate(&model.Master{}, &model.Key{}, &model.ProviderGroup{}, &model.APIKey{}, &model.Model{}, &model.Binding{}, &model.Namespace{}, &model.OperationLog{}); err != nil { + if err := db.AutoMigrate(&model.Master{}, &model.Key{}, &model.ProviderGroup{}, &model.APIKey{}, &model.Model{}, &model.Binding{}, &model.Namespace{}, &model.OperationLog{}, &model.SyncOutbox{}); err != nil { fatal(logger, "failed to auto migrate", "err", err) } if err := logDB.AutoMigrate(&model.LogRecord{}); err != nil { @@ -145,7 +145,7 @@ func main() { fatal(logger, "failed to ensure log indexes", "err", err) } } else { - if err := db.AutoMigrate(&model.Master{}, &model.Key{}, &model.ProviderGroup{}, &model.APIKey{}, &model.Model{}, &model.Binding{}, &model.Namespace{}, &model.OperationLog{}, &model.LogRecord{}); err != nil { + if err := db.AutoMigrate(&model.Master{}, &model.Key{}, &model.ProviderGroup{}, &model.APIKey{}, &model.Model{}, &model.Binding{}, &model.Namespace{}, &model.OperationLog{}, &model.LogRecord{}, &model.SyncOutbox{}); err != nil { fatal(logger, "failed to auto migrate", "err", err) } if err := service.EnsureLogIndexes(db); err != nil { @@ -155,6 +155,19 @@ func main() { // 4. Setup Services and Handlers syncService := service.NewSyncService(rdb) + if cfg.SyncOutbox.Enabled { + outboxCfg := service.SyncOutboxConfig{ + Enabled: cfg.SyncOutbox.Enabled, + Interval: time.Duration(cfg.SyncOutbox.IntervalSeconds) * time.Second, + BatchSize: cfg.SyncOutbox.BatchSize, + MaxRetries: cfg.SyncOutbox.MaxRetries, + } + outboxService := service.NewSyncOutboxService(db, syncService, outboxCfg, logger) + syncService.SetOutbox(outboxService) + outboxCtx, cancelOutbox := context.WithCancel(context.Background()) + defer cancelOutbox() + go outboxService.Start(outboxCtx) + } logPartitioner := service.NewLogPartitioner(logDB, cfg.Log.Partitioning) if logPartitioner.Enabled() { if _, err := logPartitioner.EnsurePartitionFor(time.Now().UTC()); err != nil { @@ -406,7 +419,7 @@ func runImport(logger *slog.Logger, args []string) int { return 1 } - if err := db.AutoMigrate(&model.Master{}, &model.Key{}, &model.ProviderGroup{}, &model.APIKey{}, &model.Model{}, &model.Binding{}, &model.Namespace{}); err != nil { + if err := db.AutoMigrate(&model.Master{}, &model.Key{}, &model.ProviderGroup{}, &model.APIKey{}, &model.Model{}, &model.Binding{}, &model.Namespace{}, &model.SyncOutbox{}); err != nil { logger.Error("failed to auto migrate", "err", err) return 1 } diff --git a/internal/api/api_key_handler.go b/internal/api/api_key_handler.go index c92ab5d..102958f 100644 --- a/internal/api/api_key_handler.go +++ b/internal/api/api_key_handler.go @@ -6,7 +6,6 @@ import ( "github.com/ez-api/ez-api/internal/dto" "github.com/ez-api/ez-api/internal/model" - "github.com/ez-api/foundation/provider" "github.com/gin-gonic/gin" ) @@ -39,11 +38,6 @@ func (h *Handler) CreateAPIKey(c *gin.Context) { } apiKey := strings.TrimSpace(req.APIKey) - ptype := provider.NormalizeType(group.Type) - if provider.IsGoogleFamily(ptype) && !provider.IsVertexFamily(ptype) && apiKey == "" { - c.JSON(http.StatusBadRequest, gin.H{"error": "api_key required for gemini api providers"}) - return - } status := strings.TrimSpace(req.Status) if status == "" { @@ -66,17 +60,21 @@ func (h *Handler) CreateAPIKey(c *gin.Context) { tu := req.BanUntil.UTC() key.BanUntil = &tu } + if err := h.groupManager.ValidateAPIKey(group, key); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } if err := h.db.Create(&key).Error; err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to create api key", "details": err.Error()}) return } - if err := h.sync.SyncProviders(h.db); err != nil { + if err := h.sync.SyncProvidersForAPIKey(h.db, key.ID); err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to sync providers", "details": err.Error()}) return } - if err := h.sync.SyncBindings(h.db); err != nil { + if err := h.sync.SyncBindingsForAPIKey(h.db, key.ID); err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to sync bindings", "details": err.Error()}) return } @@ -167,12 +165,16 @@ func (h *Handler) UpdateAPIKey(c *gin.Context) { } update := map[string]any{} + groupID := key.GroupID + if req.GroupID != 0 { + groupID = req.GroupID + } + var group model.ProviderGroup + if err := h.db.First(&group, groupID).Error; err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "provider group not found"}) + return + } if req.GroupID != 0 { - var group model.ProviderGroup - if err := h.db.First(&group, req.GroupID).Error; err != nil { - c.JSON(http.StatusBadRequest, gin.H{"error": "provider group not found"}) - return - } update["group_id"] = req.GroupID } if strings.TrimSpace(req.APIKey) != "" { @@ -197,6 +199,19 @@ func (h *Handler) UpdateAPIKey(c *gin.Context) { if req.BanUntil.IsZero() && strings.TrimSpace(req.Status) == "active" { update["ban_until"] = nil } + if req.GroupID != 0 || strings.TrimSpace(req.APIKey) != "" { + nextKey := key + if v, ok := update["api_key"].(string); ok { + nextKey.APIKey = v + } + if req.GroupID != 0 { + nextKey.GroupID = req.GroupID + } + if err := h.groupManager.ValidateAPIKey(group, nextKey); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + } if err := h.db.Model(&key).Updates(update).Error; err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to update api key", "details": err.Error()}) @@ -207,11 +222,11 @@ func (h *Handler) UpdateAPIKey(c *gin.Context) { return } - if err := h.sync.SyncProviders(h.db); err != nil { + if err := h.sync.SyncProvidersForAPIKey(h.db, key.ID); err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to sync providers", "details": err.Error()}) return } - if err := h.sync.SyncBindings(h.db); err != nil { + if err := h.sync.SyncBindingsForAPIKey(h.db, key.ID); err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to sync bindings", "details": err.Error()}) return } @@ -246,11 +261,11 @@ func (h *Handler) DeleteAPIKey(c *gin.Context) { return } - if err := h.sync.SyncProviders(h.db); err != nil { + if err := h.sync.SyncProvidersForAPIKey(h.db, key.ID); err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to sync providers", "details": err.Error()}) return } - if err := h.sync.SyncBindings(h.db); err != nil { + if err := h.sync.SyncBindingsForAPIKey(h.db, key.ID); err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to sync bindings", "details": err.Error()}) return } diff --git a/internal/api/handler.go b/internal/api/handler.go index 3756d6c..a9226b0 100644 --- a/internal/api/handler.go +++ b/internal/api/handler.go @@ -22,6 +22,7 @@ type Handler struct { rdb *redis.Client logWebhook *service.LogWebhookService logPartitioner *service.LogPartitioner + groupManager *service.ProviderGroupManager } func NewHandler(db *gorm.DB, logDB *gorm.DB, sync *service.SyncService, logger *service.LogWriter, rdb *redis.Client, partitioner *service.LogPartitioner) *Handler { @@ -36,6 +37,7 @@ func NewHandler(db *gorm.DB, logDB *gorm.DB, sync *service.SyncService, logger * rdb: rdb, logWebhook: service.NewLogWebhookService(rdb), logPartitioner: partitioner, + groupManager: service.NewProviderGroupManager(), } } diff --git a/internal/api/provider_group_handler.go b/internal/api/provider_group_handler.go index 5936f50..302048d 100644 --- a/internal/api/provider_group_handler.go +++ b/internal/api/provider_group_handler.go @@ -7,7 +7,6 @@ import ( "github.com/ez-api/ez-api/internal/dto" "github.com/ez-api/ez-api/internal/model" - "github.com/ez-api/foundation/provider" "github.com/gin-gonic/gin" "gorm.io/gorm" ) @@ -37,64 +36,35 @@ func (h *Handler) CreateProviderGroup(c *gin.Context) { return } - ptype := provider.NormalizeType(req.Type) - if ptype == "" { - c.JSON(http.StatusBadRequest, gin.H{"error": "type required"}) - return - } - - baseURL := strings.TrimSpace(req.BaseURL) - googleLocation := provider.DefaultGoogleLocation(ptype, req.GoogleLocation) - - switch ptype { - case provider.TypeOpenAI: - if baseURL == "" { - baseURL = "https://api.openai.com/v1" - } - case provider.TypeAnthropic, provider.TypeClaude: - if baseURL == "" { - baseURL = "https://api.anthropic.com" - } - case provider.TypeCompatible: - if baseURL == "" { - c.JSON(http.StatusBadRequest, gin.H{"error": "base_url required for compatible providers"}) - return - } - default: - if provider.IsVertexFamily(ptype) && strings.TrimSpace(googleLocation) == "" { - googleLocation = provider.DefaultGoogleLocation(ptype, "") - } - } - - status := strings.TrimSpace(req.Status) - if status == "" { - status = "active" - } - group := model.ProviderGroup{ Name: name, Type: strings.TrimSpace(req.Type), - BaseURL: baseURL, + BaseURL: strings.TrimSpace(req.BaseURL), GoogleProject: strings.TrimSpace(req.GoogleProject), - GoogleLocation: googleLocation, + GoogleLocation: strings.TrimSpace(req.GoogleLocation), Models: strings.Join(req.Models, ","), - Status: status, + Status: strings.TrimSpace(req.Status), } - if err := h.db.Create(&group).Error; err != nil { + normalized, err := h.groupManager.NormalizeGroup(group) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + if err := h.db.Create(&normalized).Error; err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to create provider group", "details": err.Error()}) return } - if err := h.sync.SyncProviders(h.db); err != nil { + if err := h.sync.SyncProvidersForGroup(h.db, normalized.ID); err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to sync providers", "details": err.Error()}) return } - if err := h.sync.SyncBindings(h.db); err != nil { + if err := h.sync.SyncBindingsForGroup(h.db, normalized.ID); err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to sync bindings", "details": err.Error()}) return } - c.JSON(http.StatusCreated, group) + c.JSON(http.StatusCreated, normalized) } // ListProviderGroups godoc @@ -181,71 +151,52 @@ func (h *Handler) UpdateProviderGroup(c *gin.Context) { return } - nextType := strings.TrimSpace(group.Type) - if t := strings.TrimSpace(req.Type); t != "" { - nextType = t - } - nextTypeLower := provider.NormalizeType(nextType) - nextBaseURL := strings.TrimSpace(group.BaseURL) - if strings.TrimSpace(req.BaseURL) != "" { - nextBaseURL = strings.TrimSpace(req.BaseURL) - } - - update := map[string]any{} + next := group if strings.TrimSpace(req.Name) != "" { - update["name"] = strings.TrimSpace(req.Name) + next.Name = strings.TrimSpace(req.Name) } if strings.TrimSpace(req.Type) != "" { - update["type"] = strings.TrimSpace(req.Type) + next.Type = strings.TrimSpace(req.Type) } if strings.TrimSpace(req.BaseURL) != "" { - update["base_url"] = strings.TrimSpace(req.BaseURL) + next.BaseURL = strings.TrimSpace(req.BaseURL) } if strings.TrimSpace(req.GoogleProject) != "" { - update["google_project"] = strings.TrimSpace(req.GoogleProject) + next.GoogleProject = strings.TrimSpace(req.GoogleProject) } if strings.TrimSpace(req.GoogleLocation) != "" { - update["google_location"] = strings.TrimSpace(req.GoogleLocation) - } else if provider.IsVertexFamily(nextTypeLower) && strings.TrimSpace(group.GoogleLocation) == "" { - update["google_location"] = provider.DefaultGoogleLocation(nextTypeLower, "") + next.GoogleLocation = strings.TrimSpace(req.GoogleLocation) } if req.Models != nil { - update["models"] = strings.Join(req.Models, ",") + next.Models = strings.Join(req.Models, ",") } if strings.TrimSpace(req.Status) != "" { - update["status"] = strings.TrimSpace(req.Status) + next.Status = strings.TrimSpace(req.Status) } - switch nextTypeLower { - case provider.TypeOpenAI: - if nextBaseURL == "" { - update["base_url"] = "https://api.openai.com/v1" - } - case provider.TypeAnthropic, provider.TypeClaude: - if nextBaseURL == "" { - update["base_url"] = "https://api.anthropic.com" - } - case provider.TypeCompatible: - if nextBaseURL == "" { - c.JSON(http.StatusBadRequest, gin.H{"error": "base_url required for compatible providers"}) - return - } + normalized, err := h.groupManager.NormalizeGroup(next) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return } + group.Name = normalized.Name + group.Type = normalized.Type + group.BaseURL = normalized.BaseURL + group.GoogleProject = normalized.GoogleProject + group.GoogleLocation = normalized.GoogleLocation + group.Models = normalized.Models + group.Status = normalized.Status - if err := h.db.Model(&group).Updates(update).Error; err != nil { + if err := h.db.Save(&group).Error; err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to update provider group", "details": err.Error()}) return } - if err := h.db.First(&group, id).Error; err != nil { - c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to reload provider group", "details": err.Error()}) - return - } - if err := h.sync.SyncProviders(h.db); err != nil { + if err := h.sync.SyncProvidersForGroup(h.db, group.ID); err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to sync providers", "details": err.Error()}) return } - if err := h.sync.SyncBindings(h.db); err != nil { + if err := h.sync.SyncBindingsForGroup(h.db, group.ID); err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to sync bindings", "details": err.Error()}) return } @@ -289,11 +240,11 @@ func (h *Handler) DeleteProviderGroup(c *gin.Context) { return } - if err := h.sync.SyncProviders(h.db); err != nil { + if err := h.sync.SyncProvidersForGroup(h.db, group.ID); err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to sync providers", "details": err.Error()}) return } - if err := h.sync.SyncBindings(h.db); err != nil { + if err := h.sync.SyncBindingsForGroup(h.db, group.ID); err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to sync bindings", "details": err.Error()}) return } diff --git a/internal/config/config.go b/internal/config/config.go index ee3b200..bb9404d 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -19,6 +19,7 @@ type Config struct { ModelRegistry ModelRegistryConfig Quota QuotaConfig Internal InternalConfig + SyncOutbox SyncOutboxConfig } type ServerConfig struct { @@ -71,6 +72,13 @@ type InternalConfig struct { StatsToken string } +type SyncOutboxConfig struct { + Enabled bool + IntervalSeconds int + BatchSize int + MaxRetries int +} + func Load() (*Config, error) { v := viper.New() @@ -97,6 +105,10 @@ func Load() (*Config, error) { v.SetDefault("model_registry.timeout_seconds", 30) v.SetDefault("quota.reset_interval_seconds", 300) v.SetDefault("internal.stats_token", "") + v.SetDefault("sync_outbox.enabled", true) + v.SetDefault("sync_outbox.interval_seconds", 5) + v.SetDefault("sync_outbox.batch_size", 200) + v.SetDefault("sync_outbox.max_retries", 10) v.SetEnvKeyReplacer(strings.NewReplacer(".", "_")) v.AutomaticEnv() @@ -124,6 +136,10 @@ func Load() (*Config, error) { _ = v.BindEnv("model_registry.timeout_seconds", "EZ_MODEL_REGISTRY_TIMEOUT_SECONDS") _ = v.BindEnv("quota.reset_interval_seconds", "EZ_QUOTA_RESET_INTERVAL_SECONDS") _ = v.BindEnv("internal.stats_token", "EZ_INTERNAL_STATS_TOKEN") + _ = v.BindEnv("sync_outbox.enabled", "EZ_SYNC_OUTBOX_ENABLED") + _ = v.BindEnv("sync_outbox.interval_seconds", "EZ_SYNC_OUTBOX_INTERVAL_SECONDS") + _ = v.BindEnv("sync_outbox.batch_size", "EZ_SYNC_OUTBOX_BATCH_SIZE") + _ = v.BindEnv("sync_outbox.max_retries", "EZ_SYNC_OUTBOX_MAX_RETRIES") if configFile := os.Getenv("EZ_CONFIG_FILE"); configFile != "" { v.SetConfigFile(configFile) @@ -182,6 +198,12 @@ func Load() (*Config, error) { Internal: InternalConfig{ StatsToken: v.GetString("internal.stats_token"), }, + SyncOutbox: SyncOutboxConfig{ + Enabled: v.GetBool("sync_outbox.enabled"), + IntervalSeconds: v.GetInt("sync_outbox.interval_seconds"), + BatchSize: v.GetInt("sync_outbox.batch_size"), + MaxRetries: v.GetInt("sync_outbox.max_retries"), + }, } return cfg, nil diff --git a/internal/model/sync_outbox.go b/internal/model/sync_outbox.go new file mode 100644 index 0000000..50d922d --- /dev/null +++ b/internal/model/sync_outbox.go @@ -0,0 +1,18 @@ +package model + +import "time" + +// SyncOutbox stores failed CP -> Redis sync operations for retry. +type SyncOutbox struct { + ID uint `gorm:"primaryKey" json:"id"` + ResourceType string `gorm:"size:50;index" json:"resource_type"` + Action string `gorm:"size:50" json:"action"` + ResourceID *uint `gorm:"index" json:"resource_id,omitempty"` + Payload string `gorm:"type:text" json:"payload,omitempty"` + RetryCount int `gorm:"default:0" json:"retry_count"` + LastError string `gorm:"type:text" json:"last_error,omitempty"` + NextRetryAt *time.Time `gorm:"index" json:"next_retry_at,omitempty"` + Status string `gorm:"size:20;default:'pending'" json:"status"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` +} diff --git a/internal/service/provider_group_manager.go b/internal/service/provider_group_manager.go new file mode 100644 index 0000000..82b01fc --- /dev/null +++ b/internal/service/provider_group_manager.go @@ -0,0 +1,91 @@ +package service + +import ( + "fmt" + "strings" + + "github.com/ez-api/ez-api/internal/model" + "github.com/ez-api/foundation/provider" +) + +// ProviderGroupManager centralizes ProviderGroup defaults and validation. +type ProviderGroupManager struct{} + +func NewProviderGroupManager() *ProviderGroupManager { + return &ProviderGroupManager{} +} + +// NormalizeGroup applies type-specific defaults and validates required fields. +func (m *ProviderGroupManager) NormalizeGroup(group model.ProviderGroup) (model.ProviderGroup, error) { + name := strings.TrimSpace(group.Name) + if name == "" { + return model.ProviderGroup{}, fmt.Errorf("name required") + } + group.Name = name + + ptypeRaw := strings.TrimSpace(group.Type) + ptype := provider.NormalizeType(ptypeRaw) + if ptype == "" { + return model.ProviderGroup{}, fmt.Errorf("type required") + } + group.Type = ptypeRaw + + group.BaseURL = strings.TrimSpace(group.BaseURL) + group.GoogleProject = strings.TrimSpace(group.GoogleProject) + group.GoogleLocation = strings.TrimSpace(group.GoogleLocation) + + switch ptype { + case provider.TypeOpenAI: + if group.BaseURL == "" { + group.BaseURL = "https://api.openai.com/v1" + } + case provider.TypeAnthropic, provider.TypeClaude: + if group.BaseURL == "" { + group.BaseURL = "https://api.anthropic.com" + } + case provider.TypeCompatible: + if group.BaseURL == "" { + return model.ProviderGroup{}, fmt.Errorf("base_url required for compatible providers") + } + default: + if provider.IsVertexFamily(ptype) { + if group.GoogleLocation == "" { + group.GoogleLocation = provider.DefaultGoogleLocation(ptype, "") + } + } else if provider.IsGoogleFamily(ptype) { + // Google SDK (gemini/google/aistudio) ignores base_url. + group.BaseURL = strings.TrimSpace(group.BaseURL) + } + } + + if group.Status == "" { + group.Status = "active" + } + + return group, nil +} + +// ValidateAPIKey enforces provider-type requirements for APIKey entries. +func (m *ProviderGroupManager) ValidateAPIKey(group model.ProviderGroup, key model.APIKey) error { + ptype := provider.NormalizeType(group.Type) + if ptype == "" { + return fmt.Errorf("provider group type required") + } + apiKey := strings.TrimSpace(key.APIKey) + + switch { + case provider.IsVertexFamily(ptype): + // Vertex uses ADC; api_key can be empty. + return nil + case provider.IsGoogleFamily(ptype): + if apiKey == "" { + return fmt.Errorf("api_key required for gemini api providers") + } + return nil + default: + if apiKey == "" { + return fmt.Errorf("api_key required") + } + return nil + } +} diff --git a/internal/service/provider_group_manager_test.go b/internal/service/provider_group_manager_test.go new file mode 100644 index 0000000..76bc20b --- /dev/null +++ b/internal/service/provider_group_manager_test.go @@ -0,0 +1,56 @@ +package service + +import ( + "testing" + + "github.com/ez-api/ez-api/internal/model" +) + +func TestProviderGroupManager_NormalizeGroupDefaults(t *testing.T) { + mgr := NewProviderGroupManager() + + group := model.ProviderGroup{ + Name: "g1", + Type: "openai", + } + got, err := mgr.NormalizeGroup(group) + if err != nil { + t.Fatalf("NormalizeGroup: %v", err) + } + if got.BaseURL != "https://api.openai.com/v1" { + t.Fatalf("expected openai base_url default, got %q", got.BaseURL) + } + + group = model.ProviderGroup{ + Name: "g2", + Type: "vertex", + } + got, err = mgr.NormalizeGroup(group) + if err != nil { + t.Fatalf("NormalizeGroup vertex: %v", err) + } + if got.GoogleLocation == "" { + t.Fatalf("expected default google_location for vertex") + } +} + +func TestProviderGroupManager_CompatibleRequiresBaseURL(t *testing.T) { + mgr := NewProviderGroupManager() + + _, err := mgr.NormalizeGroup(model.ProviderGroup{Name: "g3", Type: "compatible"}) + if err == nil { + t.Fatalf("expected error for compatible without base_url") + } +} + +func TestProviderGroupManager_ValidateAPIKey(t *testing.T) { + mgr := NewProviderGroupManager() + + group := model.ProviderGroup{Name: "g", Type: "gemini"} + if _, err := mgr.NormalizeGroup(group); err != nil { + t.Fatalf("NormalizeGroup gemini: %v", err) + } + if err := mgr.ValidateAPIKey(group, model.APIKey{}); err == nil { + t.Fatalf("expected api_key required for gemini") + } +} diff --git a/internal/service/sync.go b/internal/service/sync.go index f0f5f20..bb44ced 100644 --- a/internal/service/sync.go +++ b/internal/service/sync.go @@ -17,16 +17,46 @@ import ( ) type SyncService struct { - rdb *redis.Client + rdb *redis.Client + outbox *SyncOutboxService } func NewSyncService(rdb *redis.Client) *SyncService { return &SyncService{rdb: rdb} } +// SetOutbox enables sync outbox retry for this service. +func (s *SyncService) SetOutbox(outbox *SyncOutboxService) { + s.outbox = outbox +} + // SyncKey writes a single key into Redis without rebuilding the entire snapshot. func (s *SyncService) SyncKey(key *model.Key) error { - ctx := context.Background() + if key == nil { + return fmt.Errorf("key required") + } + tokenHash := key.TokenHash + if strings.TrimSpace(tokenHash) == "" { + tokenHash = tokenhash.HashToken(key.KeySecret) // backward compatibility + } + if strings.TrimSpace(tokenHash) == "" { + return fmt.Errorf("token hash missing for key %d", key.ID) + } + return s.handleSyncError(s.SyncKeyNow(context.Background(), key), SyncOutboxEntry{ + ResourceType: "key", + Action: "upsert", + ResourceID: &key.ID, + }) +} + +// SyncKeyNow writes key metadata to Redis without outbox handling. +func (s *SyncService) SyncKeyNow(ctx context.Context, key *model.Key) error { + if key == nil { + return fmt.Errorf("key required") + } + if ctx == nil { + ctx = context.Background() + } tokenHash := key.TokenHash if strings.TrimSpace(tokenHash) == "" { tokenHash = tokenhash.HashToken(key.KeySecret) // backward compatibility @@ -65,7 +95,24 @@ func (s *SyncService) SyncKey(key *model.Key) error { // SyncMaster writes master metadata into Redis used by the balancer for validation. func (s *SyncService) SyncMaster(master *model.Master) error { - ctx := context.Background() + if master == nil { + return fmt.Errorf("master required") + } + return s.handleSyncError(s.SyncMasterNow(context.Background(), master), SyncOutboxEntry{ + ResourceType: "master", + Action: "upsert", + ResourceID: &master.ID, + }) +} + +// SyncMasterNow writes master metadata to Redis without outbox handling. +func (s *SyncService) SyncMasterNow(ctx context.Context, master *model.Master) error { + if master == nil { + return fmt.Errorf("master required") + } + if ctx == nil { + ctx = context.Background() + } key := fmt.Sprintf("auth:master:%d", master.ID) if err := s.rdb.HSet(ctx, key, map[string]interface{}{ "epoch": master.Epoch, @@ -79,10 +126,45 @@ func (s *SyncService) SyncMaster(master *model.Master) error { // SyncProviders rebuilds provider snapshots from ProviderGroup + APIKey tables. func (s *SyncService) SyncProviders(db *gorm.DB) error { + return s.syncProviders(db, SyncOutboxEntry{ + ResourceType: "snapshot", + Action: "sync_providers", + }) +} + +// SyncProvidersForGroup retries provider snapshot with provider_group context. +func (s *SyncService) SyncProvidersForGroup(db *gorm.DB, groupID uint) error { + return s.syncProviders(db, SyncOutboxEntry{ + ResourceType: "provider_group", + Action: "sync_providers", + ResourceID: &groupID, + }) +} + +// SyncProvidersForAPIKey retries provider snapshot with api_key context. +func (s *SyncService) SyncProvidersForAPIKey(db *gorm.DB, apiKeyID uint) error { + return s.syncProviders(db, SyncOutboxEntry{ + ResourceType: "api_key", + Action: "sync_providers", + ResourceID: &apiKeyID, + }) +} + +func (s *SyncService) syncProviders(db *gorm.DB, entry SyncOutboxEntry) error { if db == nil { return fmt.Errorf("db required") } - ctx := context.Background() + return s.handleSyncError(s.SyncProvidersNow(context.Background(), db), entry) +} + +// SyncProvidersNow rebuilds provider snapshots without outbox handling. +func (s *SyncService) SyncProvidersNow(ctx context.Context, db *gorm.DB) error { + if db == nil { + return fmt.Errorf("db required") + } + if ctx == nil { + ctx = context.Background() + } var groups []model.ProviderGroup if err := db.Find(&groups).Error; err != nil { @@ -106,7 +188,30 @@ func (s *SyncService) SyncProviders(db *gorm.DB) error { // SyncModel writes a single model metadata record. func (s *SyncService) SyncModel(m *model.Model) error { - ctx := context.Background() + if m == nil { + return fmt.Errorf("model required") + } + if strings.TrimSpace(m.Name) == "" { + return fmt.Errorf("model name required") + } + return s.handleSyncError(s.SyncModelNow(context.Background(), m), SyncOutboxEntry{ + ResourceType: "model", + Action: "upsert", + ResourceID: &m.ID, + }) +} + +// SyncModelNow writes a single model metadata record without outbox handling. +func (s *SyncService) SyncModelNow(ctx context.Context, m *model.Model) error { + if m == nil { + return fmt.Errorf("model required") + } + if strings.TrimSpace(m.Name) == "" { + return fmt.Errorf("model name required") + } + if ctx == nil { + ctx = context.Background() + } snap := modelcap.Model{ Name: m.Name, Kind: string(modelcap.NormalizeKind(m.Kind)), @@ -136,7 +241,28 @@ func (s *SyncService) SyncModelDelete(m *model.Model) error { if name == "" { return fmt.Errorf("model name required") } - ctx := context.Background() + return s.handleSyncError(s.SyncModelDeleteNow(context.Background(), m), SyncOutboxEntry{ + ResourceType: "model", + Action: "delete", + ResourceID: &m.ID, + Payload: map[string]any{ + "name": name, + }, + }) +} + +// SyncModelDeleteNow removes model metadata from Redis without outbox handling. +func (s *SyncService) SyncModelDeleteNow(ctx context.Context, m *model.Model) error { + if m == nil { + return fmt.Errorf("model required") + } + name := strings.TrimSpace(m.Name) + if name == "" { + return fmt.Errorf("model name required") + } + if ctx == nil { + ctx = context.Background() + } if err := s.rdb.HDel(ctx, "meta:models", name).Err(); err != nil { return fmt.Errorf("delete meta:models: %w", err) } @@ -249,7 +375,23 @@ func (s *SyncService) writeProvidersSnapshot(ctx context.Context, pipe redis.Pip // SyncAll rebuilds Redis hashes from the database; use for cold starts or forced refreshes. func (s *SyncService) SyncAll(db *gorm.DB) error { - ctx := context.Background() + if db == nil { + return fmt.Errorf("db required") + } + return s.handleSyncError(s.SyncAllNow(context.Background(), db), SyncOutboxEntry{ + ResourceType: "snapshot", + Action: "sync_all", + }) +} + +// SyncAllNow rebuilds snapshots without outbox handling. +func (s *SyncService) SyncAllNow(ctx context.Context, db *gorm.DB) error { + if db == nil { + return fmt.Errorf("db required") + } + if ctx == nil { + ctx = context.Background() + } var groups []model.ProviderGroup if err := db.Find(&groups).Error; err != nil { @@ -388,7 +530,45 @@ func (s *SyncService) SyncAll(db *gorm.DB) error { // SyncBindings rebuilds the binding snapshot for DP routing. // This is intentionally a rebuild to avoid stale entries on deletes/updates. func (s *SyncService) SyncBindings(db *gorm.DB) error { - ctx := context.Background() + return s.syncBindings(db, SyncOutboxEntry{ + ResourceType: "snapshot", + Action: "sync_bindings", + }) +} + +// SyncBindingsForGroup retries binding snapshot with provider_group context. +func (s *SyncService) SyncBindingsForGroup(db *gorm.DB, groupID uint) error { + return s.syncBindings(db, SyncOutboxEntry{ + ResourceType: "provider_group", + Action: "sync_bindings", + ResourceID: &groupID, + }) +} + +// SyncBindingsForAPIKey retries binding snapshot with api_key context. +func (s *SyncService) SyncBindingsForAPIKey(db *gorm.DB, apiKeyID uint) error { + return s.syncBindings(db, SyncOutboxEntry{ + ResourceType: "api_key", + Action: "sync_bindings", + ResourceID: &apiKeyID, + }) +} + +func (s *SyncService) syncBindings(db *gorm.DB, entry SyncOutboxEntry) error { + if db == nil { + return fmt.Errorf("db required") + } + return s.handleSyncError(s.SyncBindingsNow(context.Background(), db), entry) +} + +// SyncBindingsNow rebuilds binding snapshot without outbox handling. +func (s *SyncService) SyncBindingsNow(ctx context.Context, db *gorm.DB) error { + if db == nil { + return fmt.Errorf("db required") + } + if ctx == nil { + ctx = context.Background() + } var groups []model.ProviderGroup if err := db.Find(&groups).Error; err != nil { @@ -570,6 +750,19 @@ func (s *SyncService) hsetJSON(ctx context.Context, key, field string, val inter return nil } +func (s *SyncService) handleSyncError(err error, entry SyncOutboxEntry) error { + if err == nil { + return nil + } + if s == nil || s.outbox == nil || !s.outbox.Enabled() { + return err + } + if enqueueErr := s.outbox.Enqueue(entry); enqueueErr != nil { + return fmt.Errorf("sync failed: %w (outbox enqueue failed: %v)", err, enqueueErr) + } + return nil +} + func normalizeWeight(weight int) int { if weight <= 0 { return 1 diff --git a/internal/service/sync_outbox.go b/internal/service/sync_outbox.go new file mode 100644 index 0000000..a12006e --- /dev/null +++ b/internal/service/sync_outbox.go @@ -0,0 +1,265 @@ +package service + +import ( + "context" + "fmt" + "log/slog" + "strings" + "time" + + "github.com/ez-api/ez-api/internal/model" + "github.com/ez-api/foundation/jsoncodec" + "gorm.io/gorm" + "gorm.io/gorm/clause" +) + +const ( + outboxStatusPending = "pending" + outboxStatusFailed = "failed" +) + +// SyncOutboxConfig controls retry behavior for CP -> Redis sync failures. +type SyncOutboxConfig struct { + Enabled bool + Interval time.Duration + BatchSize int + MaxRetries int +} + +// SyncOutboxEntry captures a sync operation for retry. +type SyncOutboxEntry struct { + ResourceType string + Action string + ResourceID *uint + Payload any +} + +// SyncOutboxService retries failed sync operations stored in the database. +type SyncOutboxService struct { + db *gorm.DB + sync *SyncService + cfg SyncOutboxConfig + logger *slog.Logger + started bool +} + +func NewSyncOutboxService(db *gorm.DB, sync *SyncService, cfg SyncOutboxConfig, logger *slog.Logger) *SyncOutboxService { + if logger == nil { + logger = slog.Default() + } + if cfg.Interval <= 0 { + cfg.Interval = 5 * time.Second + } + if cfg.BatchSize <= 0 { + cfg.BatchSize = 200 + } + if cfg.MaxRetries <= 0 { + cfg.MaxRetries = 10 + } + return &SyncOutboxService{db: db, sync: sync, cfg: cfg, logger: logger} +} + +func (s *SyncOutboxService) Enabled() bool { + return s != nil && s.cfg.Enabled +} + +// Enqueue persists a failed sync operation for retry. +func (s *SyncOutboxService) Enqueue(entry SyncOutboxEntry) error { + if s == nil || !s.cfg.Enabled { + return fmt.Errorf("sync outbox disabled") + } + if s.db == nil { + return fmt.Errorf("sync outbox db missing") + } + entry.ResourceType = strings.TrimSpace(entry.ResourceType) + entry.Action = strings.TrimSpace(entry.Action) + if entry.ResourceType == "" || entry.Action == "" { + return fmt.Errorf("resource_type and action required") + } + + payload := "" + if entry.Payload != nil { + if raw, err := jsoncodec.Marshal(entry.Payload); err == nil { + payload = string(raw) + } + } + + next := time.Now().UTC() + outbox := model.SyncOutbox{ + ResourceType: entry.ResourceType, + Action: entry.Action, + ResourceID: entry.ResourceID, + Payload: payload, + RetryCount: 0, + LastError: "", + NextRetryAt: &next, + Status: outboxStatusPending, + } + if err := s.db.Create(&outbox).Error; err != nil { + return err + } + return nil +} + +// Start begins the background retry loop. +func (s *SyncOutboxService) Start(ctx context.Context) { + if s == nil || !s.cfg.Enabled || s.started { + return + } + s.started = true + + ticker := time.NewTicker(s.cfg.Interval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + s.processBatch(ctx) + } + } +} + +func (s *SyncOutboxService) processBatch(ctx context.Context) { + if s == nil || !s.cfg.Enabled || s.db == nil || s.sync == nil { + return + } + if ctx == nil { + ctx = context.Background() + } + + now := time.Now().UTC() + var items []model.SyncOutbox + q := s.db.WithContext(ctx). + Where("status = ?", outboxStatusPending). + Where("next_retry_at <= ?", now). + Order("id asc"). + Limit(s.cfg.BatchSize) + if err := q.Clauses(clause.Locking{Strength: "UPDATE", Options: "SKIP LOCKED"}).Find(&items).Error; err != nil { + s.logger.Warn("sync outbox load failed", "err", err) + return + } + for i := range items { + item := items[i] + if err := s.processItem(ctx, &item); err != nil { + s.logger.Warn("sync outbox retry failed", "id", item.ID, "err", err) + } + } +} + +func (s *SyncOutboxService) processItem(ctx context.Context, item *model.SyncOutbox) error { + if item == nil { + return nil + } + if err := s.applyItem(ctx, item); err != nil { + return s.recordFailure(ctx, item, err) + } + return s.db.WithContext(ctx).Delete(&model.SyncOutbox{}, item.ID).Error +} + +func (s *SyncOutboxService) recordFailure(ctx context.Context, item *model.SyncOutbox, err error) error { + item.RetryCount++ + item.LastError = err.Error() + if item.RetryCount >= s.cfg.MaxRetries { + item.Status = outboxStatusFailed + item.NextRetryAt = nil + s.logger.Error("sync outbox max retries reached", "id", item.ID, "resource_type", item.ResourceType, "action", item.Action, "err", err) + } else { + next := time.Now().UTC().Add(s.cfg.Interval) + item.NextRetryAt = &next + } + return s.db.WithContext(ctx).Save(item).Error +} + +func (s *SyncOutboxService) applyItem(ctx context.Context, item *model.SyncOutbox) error { + resource := strings.TrimSpace(item.ResourceType) + action := strings.TrimSpace(item.Action) + if resource == "" || action == "" { + return fmt.Errorf("resource_type/action missing") + } + + switch resource { + case "master": + return s.applyMaster(ctx, item) + case "key": + return s.applyKey(ctx, item) + case "model": + return s.applyModel(ctx, item) + case "provider_group", "api_key": + switch action { + case "sync_bindings": + return s.sync.SyncBindingsNow(ctx, s.db) + case "sync_providers": + return s.sync.SyncProvidersNow(ctx, s.db) + default: + return fmt.Errorf("unsupported %s action: %s", resource, action) + } + case "binding": + return s.sync.SyncBindingsNow(ctx, s.db) + case "snapshot": + switch action { + case "sync_all": + return s.sync.SyncAllNow(ctx, s.db) + case "sync_bindings": + return s.sync.SyncBindingsNow(ctx, s.db) + case "sync_providers": + return s.sync.SyncProvidersNow(ctx, s.db) + default: + return fmt.Errorf("unsupported snapshot action: %s", action) + } + default: + return fmt.Errorf("unsupported resource_type: %s", resource) + } +} + +func (s *SyncOutboxService) applyMaster(ctx context.Context, item *model.SyncOutbox) error { + if item.ResourceID == nil { + return fmt.Errorf("master id missing") + } + var m model.Master + if err := s.db.WithContext(ctx).First(&m, *item.ResourceID).Error; err != nil { + return err + } + return s.sync.SyncMasterNow(ctx, &m) +} + +func (s *SyncOutboxService) applyKey(ctx context.Context, item *model.SyncOutbox) error { + if item.ResourceID == nil { + return fmt.Errorf("key id missing") + } + var k model.Key + if err := s.db.WithContext(ctx).First(&k, *item.ResourceID).Error; err != nil { + return err + } + return s.sync.SyncKeyNow(ctx, &k) +} + +func (s *SyncOutboxService) applyModel(ctx context.Context, item *model.SyncOutbox) error { + action := strings.TrimSpace(item.Action) + switch action { + case "delete": + var payload struct { + Name string `json:"name"` + } + if item.Payload != "" { + if err := jsoncodec.Unmarshal([]byte(item.Payload), &payload); err != nil { + return err + } + } + if strings.TrimSpace(payload.Name) == "" { + return fmt.Errorf("model name missing") + } + m := model.Model{Name: strings.TrimSpace(payload.Name)} + return s.sync.SyncModelDeleteNow(ctx, &m) + default: + if item.ResourceID == nil { + return fmt.Errorf("model id missing") + } + var m model.Model + if err := s.db.WithContext(ctx).First(&m, *item.ResourceID).Error; err != nil { + return err + } + return s.sync.SyncModelNow(ctx, &m) + } +}