package service import ( "context" "fmt" "log/slog" "strings" "time" "github.com/ez-api/ez-api/internal/model" "github.com/ez-api/foundation/jsoncodec" "gorm.io/gorm" "gorm.io/gorm/clause" ) const ( outboxStatusPending = "pending" outboxStatusFailed = "failed" ) // SyncOutboxConfig controls retry behavior for CP -> Redis sync failures. type SyncOutboxConfig struct { Enabled bool Interval time.Duration BatchSize int MaxRetries int } // SyncOutboxEntry captures a sync operation for retry. type SyncOutboxEntry struct { ResourceType string Action string ResourceID *uint Payload any } // SyncOutboxService retries failed sync operations stored in the database. type SyncOutboxService struct { db *gorm.DB sync *SyncService cfg SyncOutboxConfig logger *slog.Logger started bool } func NewSyncOutboxService(db *gorm.DB, sync *SyncService, cfg SyncOutboxConfig, logger *slog.Logger) *SyncOutboxService { if logger == nil { logger = slog.Default() } if cfg.Interval <= 0 { cfg.Interval = 5 * time.Second } if cfg.BatchSize <= 0 { cfg.BatchSize = 200 } if cfg.MaxRetries <= 0 { cfg.MaxRetries = 10 } return &SyncOutboxService{db: db, sync: sync, cfg: cfg, logger: logger} } func (s *SyncOutboxService) Enabled() bool { return s != nil && s.cfg.Enabled } // Enqueue persists a failed sync operation for retry. func (s *SyncOutboxService) Enqueue(entry SyncOutboxEntry) error { if s == nil || !s.cfg.Enabled { return fmt.Errorf("sync outbox disabled") } if s.db == nil { return fmt.Errorf("sync outbox db missing") } entry.ResourceType = strings.TrimSpace(entry.ResourceType) entry.Action = strings.TrimSpace(entry.Action) if entry.ResourceType == "" || entry.Action == "" { return fmt.Errorf("resource_type and action required") } payload := "" if entry.Payload != nil { if raw, err := jsoncodec.Marshal(entry.Payload); err == nil { payload = string(raw) } } next := time.Now().UTC() outbox := model.SyncOutbox{ ResourceType: entry.ResourceType, Action: entry.Action, ResourceID: entry.ResourceID, Payload: payload, RetryCount: 0, LastError: "", NextRetryAt: &next, Status: outboxStatusPending, } if err := s.db.Create(&outbox).Error; err != nil { return err } return nil } // Start begins the background retry loop. func (s *SyncOutboxService) Start(ctx context.Context) { if s == nil || !s.cfg.Enabled || s.started { return } s.started = true ticker := time.NewTicker(s.cfg.Interval) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: s.processBatch(ctx) } } } func (s *SyncOutboxService) processBatch(ctx context.Context) { if s == nil || !s.cfg.Enabled || s.db == nil || s.sync == nil { return } if ctx == nil { ctx = context.Background() } now := time.Now().UTC() var items []model.SyncOutbox q := s.db.WithContext(ctx). Where("status = ?", outboxStatusPending). Where("next_retry_at <= ?", now). Order("id asc"). Limit(s.cfg.BatchSize) if err := q.Clauses(clause.Locking{Strength: "UPDATE", Options: "SKIP LOCKED"}).Find(&items).Error; err != nil { s.logger.Warn("sync outbox load failed", "err", err) return } for i := range items { item := items[i] if err := s.processItem(ctx, &item); err != nil { s.logger.Warn("sync outbox retry failed", "id", item.ID, "err", err) } } } func (s *SyncOutboxService) processItem(ctx context.Context, item *model.SyncOutbox) error { if item == nil { return nil } if err := s.applyItem(ctx, item); err != nil { return s.recordFailure(ctx, item, err) } return s.db.WithContext(ctx).Delete(&model.SyncOutbox{}, item.ID).Error } func (s *SyncOutboxService) recordFailure(ctx context.Context, item *model.SyncOutbox, err error) error { item.RetryCount++ item.LastError = err.Error() if item.RetryCount >= s.cfg.MaxRetries { item.Status = outboxStatusFailed item.NextRetryAt = nil s.logger.Error("sync outbox max retries reached", "id", item.ID, "resource_type", item.ResourceType, "action", item.Action, "err", err) } else { next := time.Now().UTC().Add(s.cfg.Interval) item.NextRetryAt = &next } return s.db.WithContext(ctx).Save(item).Error } func (s *SyncOutboxService) applyItem(ctx context.Context, item *model.SyncOutbox) error { resource := strings.TrimSpace(item.ResourceType) action := strings.TrimSpace(item.Action) if resource == "" || action == "" { return fmt.Errorf("resource_type/action missing") } switch resource { case "master": return s.applyMaster(ctx, item) case "key": return s.applyKey(ctx, item) case "model": return s.applyModel(ctx, item) case "provider_group", "api_key": switch action { case "sync_bindings": return s.sync.SyncBindingsNow(ctx, s.db) case "sync_providers": return s.sync.SyncProvidersNow(ctx, s.db) default: return fmt.Errorf("unsupported %s action: %s", resource, action) } case "binding": return s.sync.SyncBindingsNow(ctx, s.db) case "snapshot": switch action { case "sync_all": return s.sync.SyncAllNow(ctx, s.db) case "sync_bindings": return s.sync.SyncBindingsNow(ctx, s.db) case "sync_providers": return s.sync.SyncProvidersNow(ctx, s.db) default: return fmt.Errorf("unsupported snapshot action: %s", action) } default: return fmt.Errorf("unsupported resource_type: %s", resource) } } func (s *SyncOutboxService) applyMaster(ctx context.Context, item *model.SyncOutbox) error { if item.ResourceID == nil { return fmt.Errorf("master id missing") } var m model.Master if err := s.db.WithContext(ctx).First(&m, *item.ResourceID).Error; err != nil { return err } return s.sync.SyncMasterNow(ctx, &m) } func (s *SyncOutboxService) applyKey(ctx context.Context, item *model.SyncOutbox) error { if item.ResourceID == nil { return fmt.Errorf("key id missing") } var k model.Key if err := s.db.WithContext(ctx).First(&k, *item.ResourceID).Error; err != nil { return err } return s.sync.SyncKeyNow(ctx, &k) } func (s *SyncOutboxService) applyModel(ctx context.Context, item *model.SyncOutbox) error { action := strings.TrimSpace(item.Action) switch action { case "delete": var payload struct { Name string `json:"name"` } if item.Payload != "" { if err := jsoncodec.Unmarshal([]byte(item.Payload), &payload); err != nil { return err } } if strings.TrimSpace(payload.Name) == "" { return fmt.Errorf("model name missing") } m := model.Model{Name: strings.TrimSpace(payload.Name)} return s.sync.SyncModelDeleteNow(ctx, &m) default: if item.ResourceID == nil { return fmt.Errorf("model id missing") } var m model.Model if err := s.db.WithContext(ctx).First(&m, *item.ResourceID).Error; err != nil { return err } return s.sync.SyncModelNow(ctx, &m) } }