feat(core): implement sync outbox mechanism and refactor provider validation

- Introduce `SyncOutboxService` and model to retry failed CP-to-Redis sync operations
- Update `SyncService` to handle sync failures by enqueuing tasks to the outbox
- Centralize provider group and API key validation logic into `ProviderGroupManager`
- Refactor API handlers to utilize the new manager and robust sync methods
- Add configuration options for sync outbox (interval, batch size, retries)
This commit is contained in:
zenfun
2025-12-25 01:24:19 +08:00
parent 44a82fa252
commit 6a16712b9d
12 changed files with 750 additions and 113 deletions

View File

@@ -0,0 +1,91 @@
package service
import (
"fmt"
"strings"
"github.com/ez-api/ez-api/internal/model"
"github.com/ez-api/foundation/provider"
)
// ProviderGroupManager centralizes ProviderGroup defaults and validation.
type ProviderGroupManager struct{}
func NewProviderGroupManager() *ProviderGroupManager {
return &ProviderGroupManager{}
}
// NormalizeGroup applies type-specific defaults and validates required fields.
func (m *ProviderGroupManager) NormalizeGroup(group model.ProviderGroup) (model.ProviderGroup, error) {
name := strings.TrimSpace(group.Name)
if name == "" {
return model.ProviderGroup{}, fmt.Errorf("name required")
}
group.Name = name
ptypeRaw := strings.TrimSpace(group.Type)
ptype := provider.NormalizeType(ptypeRaw)
if ptype == "" {
return model.ProviderGroup{}, fmt.Errorf("type required")
}
group.Type = ptypeRaw
group.BaseURL = strings.TrimSpace(group.BaseURL)
group.GoogleProject = strings.TrimSpace(group.GoogleProject)
group.GoogleLocation = strings.TrimSpace(group.GoogleLocation)
switch ptype {
case provider.TypeOpenAI:
if group.BaseURL == "" {
group.BaseURL = "https://api.openai.com/v1"
}
case provider.TypeAnthropic, provider.TypeClaude:
if group.BaseURL == "" {
group.BaseURL = "https://api.anthropic.com"
}
case provider.TypeCompatible:
if group.BaseURL == "" {
return model.ProviderGroup{}, fmt.Errorf("base_url required for compatible providers")
}
default:
if provider.IsVertexFamily(ptype) {
if group.GoogleLocation == "" {
group.GoogleLocation = provider.DefaultGoogleLocation(ptype, "")
}
} else if provider.IsGoogleFamily(ptype) {
// Google SDK (gemini/google/aistudio) ignores base_url.
group.BaseURL = strings.TrimSpace(group.BaseURL)
}
}
if group.Status == "" {
group.Status = "active"
}
return group, nil
}
// ValidateAPIKey enforces provider-type requirements for APIKey entries.
func (m *ProviderGroupManager) ValidateAPIKey(group model.ProviderGroup, key model.APIKey) error {
ptype := provider.NormalizeType(group.Type)
if ptype == "" {
return fmt.Errorf("provider group type required")
}
apiKey := strings.TrimSpace(key.APIKey)
switch {
case provider.IsVertexFamily(ptype):
// Vertex uses ADC; api_key can be empty.
return nil
case provider.IsGoogleFamily(ptype):
if apiKey == "" {
return fmt.Errorf("api_key required for gemini api providers")
}
return nil
default:
if apiKey == "" {
return fmt.Errorf("api_key required")
}
return nil
}
}

View File

@@ -0,0 +1,56 @@
package service
import (
"testing"
"github.com/ez-api/ez-api/internal/model"
)
func TestProviderGroupManager_NormalizeGroupDefaults(t *testing.T) {
mgr := NewProviderGroupManager()
group := model.ProviderGroup{
Name: "g1",
Type: "openai",
}
got, err := mgr.NormalizeGroup(group)
if err != nil {
t.Fatalf("NormalizeGroup: %v", err)
}
if got.BaseURL != "https://api.openai.com/v1" {
t.Fatalf("expected openai base_url default, got %q", got.BaseURL)
}
group = model.ProviderGroup{
Name: "g2",
Type: "vertex",
}
got, err = mgr.NormalizeGroup(group)
if err != nil {
t.Fatalf("NormalizeGroup vertex: %v", err)
}
if got.GoogleLocation == "" {
t.Fatalf("expected default google_location for vertex")
}
}
func TestProviderGroupManager_CompatibleRequiresBaseURL(t *testing.T) {
mgr := NewProviderGroupManager()
_, err := mgr.NormalizeGroup(model.ProviderGroup{Name: "g3", Type: "compatible"})
if err == nil {
t.Fatalf("expected error for compatible without base_url")
}
}
func TestProviderGroupManager_ValidateAPIKey(t *testing.T) {
mgr := NewProviderGroupManager()
group := model.ProviderGroup{Name: "g", Type: "gemini"}
if _, err := mgr.NormalizeGroup(group); err != nil {
t.Fatalf("NormalizeGroup gemini: %v", err)
}
if err := mgr.ValidateAPIKey(group, model.APIKey{}); err == nil {
t.Fatalf("expected api_key required for gemini")
}
}

View File

@@ -17,16 +17,46 @@ import (
)
type SyncService struct {
rdb *redis.Client
rdb *redis.Client
outbox *SyncOutboxService
}
func NewSyncService(rdb *redis.Client) *SyncService {
return &SyncService{rdb: rdb}
}
// SetOutbox enables sync outbox retry for this service.
func (s *SyncService) SetOutbox(outbox *SyncOutboxService) {
s.outbox = outbox
}
// SyncKey writes a single key into Redis without rebuilding the entire snapshot.
func (s *SyncService) SyncKey(key *model.Key) error {
ctx := context.Background()
if key == nil {
return fmt.Errorf("key required")
}
tokenHash := key.TokenHash
if strings.TrimSpace(tokenHash) == "" {
tokenHash = tokenhash.HashToken(key.KeySecret) // backward compatibility
}
if strings.TrimSpace(tokenHash) == "" {
return fmt.Errorf("token hash missing for key %d", key.ID)
}
return s.handleSyncError(s.SyncKeyNow(context.Background(), key), SyncOutboxEntry{
ResourceType: "key",
Action: "upsert",
ResourceID: &key.ID,
})
}
// SyncKeyNow writes key metadata to Redis without outbox handling.
func (s *SyncService) SyncKeyNow(ctx context.Context, key *model.Key) error {
if key == nil {
return fmt.Errorf("key required")
}
if ctx == nil {
ctx = context.Background()
}
tokenHash := key.TokenHash
if strings.TrimSpace(tokenHash) == "" {
tokenHash = tokenhash.HashToken(key.KeySecret) // backward compatibility
@@ -65,7 +95,24 @@ func (s *SyncService) SyncKey(key *model.Key) error {
// SyncMaster writes master metadata into Redis used by the balancer for validation.
func (s *SyncService) SyncMaster(master *model.Master) error {
ctx := context.Background()
if master == nil {
return fmt.Errorf("master required")
}
return s.handleSyncError(s.SyncMasterNow(context.Background(), master), SyncOutboxEntry{
ResourceType: "master",
Action: "upsert",
ResourceID: &master.ID,
})
}
// SyncMasterNow writes master metadata to Redis without outbox handling.
func (s *SyncService) SyncMasterNow(ctx context.Context, master *model.Master) error {
if master == nil {
return fmt.Errorf("master required")
}
if ctx == nil {
ctx = context.Background()
}
key := fmt.Sprintf("auth:master:%d", master.ID)
if err := s.rdb.HSet(ctx, key, map[string]interface{}{
"epoch": master.Epoch,
@@ -79,10 +126,45 @@ func (s *SyncService) SyncMaster(master *model.Master) error {
// SyncProviders rebuilds provider snapshots from ProviderGroup + APIKey tables.
func (s *SyncService) SyncProviders(db *gorm.DB) error {
return s.syncProviders(db, SyncOutboxEntry{
ResourceType: "snapshot",
Action: "sync_providers",
})
}
// SyncProvidersForGroup retries provider snapshot with provider_group context.
func (s *SyncService) SyncProvidersForGroup(db *gorm.DB, groupID uint) error {
return s.syncProviders(db, SyncOutboxEntry{
ResourceType: "provider_group",
Action: "sync_providers",
ResourceID: &groupID,
})
}
// SyncProvidersForAPIKey retries provider snapshot with api_key context.
func (s *SyncService) SyncProvidersForAPIKey(db *gorm.DB, apiKeyID uint) error {
return s.syncProviders(db, SyncOutboxEntry{
ResourceType: "api_key",
Action: "sync_providers",
ResourceID: &apiKeyID,
})
}
func (s *SyncService) syncProviders(db *gorm.DB, entry SyncOutboxEntry) error {
if db == nil {
return fmt.Errorf("db required")
}
ctx := context.Background()
return s.handleSyncError(s.SyncProvidersNow(context.Background(), db), entry)
}
// SyncProvidersNow rebuilds provider snapshots without outbox handling.
func (s *SyncService) SyncProvidersNow(ctx context.Context, db *gorm.DB) error {
if db == nil {
return fmt.Errorf("db required")
}
if ctx == nil {
ctx = context.Background()
}
var groups []model.ProviderGroup
if err := db.Find(&groups).Error; err != nil {
@@ -106,7 +188,30 @@ func (s *SyncService) SyncProviders(db *gorm.DB) error {
// SyncModel writes a single model metadata record.
func (s *SyncService) SyncModel(m *model.Model) error {
ctx := context.Background()
if m == nil {
return fmt.Errorf("model required")
}
if strings.TrimSpace(m.Name) == "" {
return fmt.Errorf("model name required")
}
return s.handleSyncError(s.SyncModelNow(context.Background(), m), SyncOutboxEntry{
ResourceType: "model",
Action: "upsert",
ResourceID: &m.ID,
})
}
// SyncModelNow writes a single model metadata record without outbox handling.
func (s *SyncService) SyncModelNow(ctx context.Context, m *model.Model) error {
if m == nil {
return fmt.Errorf("model required")
}
if strings.TrimSpace(m.Name) == "" {
return fmt.Errorf("model name required")
}
if ctx == nil {
ctx = context.Background()
}
snap := modelcap.Model{
Name: m.Name,
Kind: string(modelcap.NormalizeKind(m.Kind)),
@@ -136,7 +241,28 @@ func (s *SyncService) SyncModelDelete(m *model.Model) error {
if name == "" {
return fmt.Errorf("model name required")
}
ctx := context.Background()
return s.handleSyncError(s.SyncModelDeleteNow(context.Background(), m), SyncOutboxEntry{
ResourceType: "model",
Action: "delete",
ResourceID: &m.ID,
Payload: map[string]any{
"name": name,
},
})
}
// SyncModelDeleteNow removes model metadata from Redis without outbox handling.
func (s *SyncService) SyncModelDeleteNow(ctx context.Context, m *model.Model) error {
if m == nil {
return fmt.Errorf("model required")
}
name := strings.TrimSpace(m.Name)
if name == "" {
return fmt.Errorf("model name required")
}
if ctx == nil {
ctx = context.Background()
}
if err := s.rdb.HDel(ctx, "meta:models", name).Err(); err != nil {
return fmt.Errorf("delete meta:models: %w", err)
}
@@ -249,7 +375,23 @@ func (s *SyncService) writeProvidersSnapshot(ctx context.Context, pipe redis.Pip
// SyncAll rebuilds Redis hashes from the database; use for cold starts or forced refreshes.
func (s *SyncService) SyncAll(db *gorm.DB) error {
ctx := context.Background()
if db == nil {
return fmt.Errorf("db required")
}
return s.handleSyncError(s.SyncAllNow(context.Background(), db), SyncOutboxEntry{
ResourceType: "snapshot",
Action: "sync_all",
})
}
// SyncAllNow rebuilds snapshots without outbox handling.
func (s *SyncService) SyncAllNow(ctx context.Context, db *gorm.DB) error {
if db == nil {
return fmt.Errorf("db required")
}
if ctx == nil {
ctx = context.Background()
}
var groups []model.ProviderGroup
if err := db.Find(&groups).Error; err != nil {
@@ -388,7 +530,45 @@ func (s *SyncService) SyncAll(db *gorm.DB) error {
// SyncBindings rebuilds the binding snapshot for DP routing.
// This is intentionally a rebuild to avoid stale entries on deletes/updates.
func (s *SyncService) SyncBindings(db *gorm.DB) error {
ctx := context.Background()
return s.syncBindings(db, SyncOutboxEntry{
ResourceType: "snapshot",
Action: "sync_bindings",
})
}
// SyncBindingsForGroup retries binding snapshot with provider_group context.
func (s *SyncService) SyncBindingsForGroup(db *gorm.DB, groupID uint) error {
return s.syncBindings(db, SyncOutboxEntry{
ResourceType: "provider_group",
Action: "sync_bindings",
ResourceID: &groupID,
})
}
// SyncBindingsForAPIKey retries binding snapshot with api_key context.
func (s *SyncService) SyncBindingsForAPIKey(db *gorm.DB, apiKeyID uint) error {
return s.syncBindings(db, SyncOutboxEntry{
ResourceType: "api_key",
Action: "sync_bindings",
ResourceID: &apiKeyID,
})
}
func (s *SyncService) syncBindings(db *gorm.DB, entry SyncOutboxEntry) error {
if db == nil {
return fmt.Errorf("db required")
}
return s.handleSyncError(s.SyncBindingsNow(context.Background(), db), entry)
}
// SyncBindingsNow rebuilds binding snapshot without outbox handling.
func (s *SyncService) SyncBindingsNow(ctx context.Context, db *gorm.DB) error {
if db == nil {
return fmt.Errorf("db required")
}
if ctx == nil {
ctx = context.Background()
}
var groups []model.ProviderGroup
if err := db.Find(&groups).Error; err != nil {
@@ -570,6 +750,19 @@ func (s *SyncService) hsetJSON(ctx context.Context, key, field string, val inter
return nil
}
func (s *SyncService) handleSyncError(err error, entry SyncOutboxEntry) error {
if err == nil {
return nil
}
if s == nil || s.outbox == nil || !s.outbox.Enabled() {
return err
}
if enqueueErr := s.outbox.Enqueue(entry); enqueueErr != nil {
return fmt.Errorf("sync failed: %w (outbox enqueue failed: %v)", err, enqueueErr)
}
return nil
}
func normalizeWeight(weight int) int {
if weight <= 0 {
return 1

View File

@@ -0,0 +1,265 @@
package service
import (
"context"
"fmt"
"log/slog"
"strings"
"time"
"github.com/ez-api/ez-api/internal/model"
"github.com/ez-api/foundation/jsoncodec"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)
const (
outboxStatusPending = "pending"
outboxStatusFailed = "failed"
)
// SyncOutboxConfig controls retry behavior for CP -> Redis sync failures.
type SyncOutboxConfig struct {
Enabled bool
Interval time.Duration
BatchSize int
MaxRetries int
}
// SyncOutboxEntry captures a sync operation for retry.
type SyncOutboxEntry struct {
ResourceType string
Action string
ResourceID *uint
Payload any
}
// SyncOutboxService retries failed sync operations stored in the database.
type SyncOutboxService struct {
db *gorm.DB
sync *SyncService
cfg SyncOutboxConfig
logger *slog.Logger
started bool
}
func NewSyncOutboxService(db *gorm.DB, sync *SyncService, cfg SyncOutboxConfig, logger *slog.Logger) *SyncOutboxService {
if logger == nil {
logger = slog.Default()
}
if cfg.Interval <= 0 {
cfg.Interval = 5 * time.Second
}
if cfg.BatchSize <= 0 {
cfg.BatchSize = 200
}
if cfg.MaxRetries <= 0 {
cfg.MaxRetries = 10
}
return &SyncOutboxService{db: db, sync: sync, cfg: cfg, logger: logger}
}
func (s *SyncOutboxService) Enabled() bool {
return s != nil && s.cfg.Enabled
}
// Enqueue persists a failed sync operation for retry.
func (s *SyncOutboxService) Enqueue(entry SyncOutboxEntry) error {
if s == nil || !s.cfg.Enabled {
return fmt.Errorf("sync outbox disabled")
}
if s.db == nil {
return fmt.Errorf("sync outbox db missing")
}
entry.ResourceType = strings.TrimSpace(entry.ResourceType)
entry.Action = strings.TrimSpace(entry.Action)
if entry.ResourceType == "" || entry.Action == "" {
return fmt.Errorf("resource_type and action required")
}
payload := ""
if entry.Payload != nil {
if raw, err := jsoncodec.Marshal(entry.Payload); err == nil {
payload = string(raw)
}
}
next := time.Now().UTC()
outbox := model.SyncOutbox{
ResourceType: entry.ResourceType,
Action: entry.Action,
ResourceID: entry.ResourceID,
Payload: payload,
RetryCount: 0,
LastError: "",
NextRetryAt: &next,
Status: outboxStatusPending,
}
if err := s.db.Create(&outbox).Error; err != nil {
return err
}
return nil
}
// Start begins the background retry loop.
func (s *SyncOutboxService) Start(ctx context.Context) {
if s == nil || !s.cfg.Enabled || s.started {
return
}
s.started = true
ticker := time.NewTicker(s.cfg.Interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
s.processBatch(ctx)
}
}
}
func (s *SyncOutboxService) processBatch(ctx context.Context) {
if s == nil || !s.cfg.Enabled || s.db == nil || s.sync == nil {
return
}
if ctx == nil {
ctx = context.Background()
}
now := time.Now().UTC()
var items []model.SyncOutbox
q := s.db.WithContext(ctx).
Where("status = ?", outboxStatusPending).
Where("next_retry_at <= ?", now).
Order("id asc").
Limit(s.cfg.BatchSize)
if err := q.Clauses(clause.Locking{Strength: "UPDATE", Options: "SKIP LOCKED"}).Find(&items).Error; err != nil {
s.logger.Warn("sync outbox load failed", "err", err)
return
}
for i := range items {
item := items[i]
if err := s.processItem(ctx, &item); err != nil {
s.logger.Warn("sync outbox retry failed", "id", item.ID, "err", err)
}
}
}
func (s *SyncOutboxService) processItem(ctx context.Context, item *model.SyncOutbox) error {
if item == nil {
return nil
}
if err := s.applyItem(ctx, item); err != nil {
return s.recordFailure(ctx, item, err)
}
return s.db.WithContext(ctx).Delete(&model.SyncOutbox{}, item.ID).Error
}
func (s *SyncOutboxService) recordFailure(ctx context.Context, item *model.SyncOutbox, err error) error {
item.RetryCount++
item.LastError = err.Error()
if item.RetryCount >= s.cfg.MaxRetries {
item.Status = outboxStatusFailed
item.NextRetryAt = nil
s.logger.Error("sync outbox max retries reached", "id", item.ID, "resource_type", item.ResourceType, "action", item.Action, "err", err)
} else {
next := time.Now().UTC().Add(s.cfg.Interval)
item.NextRetryAt = &next
}
return s.db.WithContext(ctx).Save(item).Error
}
func (s *SyncOutboxService) applyItem(ctx context.Context, item *model.SyncOutbox) error {
resource := strings.TrimSpace(item.ResourceType)
action := strings.TrimSpace(item.Action)
if resource == "" || action == "" {
return fmt.Errorf("resource_type/action missing")
}
switch resource {
case "master":
return s.applyMaster(ctx, item)
case "key":
return s.applyKey(ctx, item)
case "model":
return s.applyModel(ctx, item)
case "provider_group", "api_key":
switch action {
case "sync_bindings":
return s.sync.SyncBindingsNow(ctx, s.db)
case "sync_providers":
return s.sync.SyncProvidersNow(ctx, s.db)
default:
return fmt.Errorf("unsupported %s action: %s", resource, action)
}
case "binding":
return s.sync.SyncBindingsNow(ctx, s.db)
case "snapshot":
switch action {
case "sync_all":
return s.sync.SyncAllNow(ctx, s.db)
case "sync_bindings":
return s.sync.SyncBindingsNow(ctx, s.db)
case "sync_providers":
return s.sync.SyncProvidersNow(ctx, s.db)
default:
return fmt.Errorf("unsupported snapshot action: %s", action)
}
default:
return fmt.Errorf("unsupported resource_type: %s", resource)
}
}
func (s *SyncOutboxService) applyMaster(ctx context.Context, item *model.SyncOutbox) error {
if item.ResourceID == nil {
return fmt.Errorf("master id missing")
}
var m model.Master
if err := s.db.WithContext(ctx).First(&m, *item.ResourceID).Error; err != nil {
return err
}
return s.sync.SyncMasterNow(ctx, &m)
}
func (s *SyncOutboxService) applyKey(ctx context.Context, item *model.SyncOutbox) error {
if item.ResourceID == nil {
return fmt.Errorf("key id missing")
}
var k model.Key
if err := s.db.WithContext(ctx).First(&k, *item.ResourceID).Error; err != nil {
return err
}
return s.sync.SyncKeyNow(ctx, &k)
}
func (s *SyncOutboxService) applyModel(ctx context.Context, item *model.SyncOutbox) error {
action := strings.TrimSpace(item.Action)
switch action {
case "delete":
var payload struct {
Name string `json:"name"`
}
if item.Payload != "" {
if err := jsoncodec.Unmarshal([]byte(item.Payload), &payload); err != nil {
return err
}
}
if strings.TrimSpace(payload.Name) == "" {
return fmt.Errorf("model name missing")
}
m := model.Model{Name: strings.TrimSpace(payload.Name)}
return s.sync.SyncModelDeleteNow(ctx, &m)
default:
if item.ResourceID == nil {
return fmt.Errorf("model id missing")
}
var m model.Model
if err := s.db.WithContext(ctx).First(&m, *item.ResourceID).Error; err != nil {
return err
}
return s.sync.SyncModelNow(ctx, &m)
}
}