From 31914b9ab5fc23f417916089b89865101a9ea3ef Mon Sep 17 00:00:00 2001 From: zenfun Date: Thu, 1 Jan 2026 00:55:51 +0800 Subject: [PATCH] refactor(scheduler): migrate outbox and model registry to scheduler-based execution Replace internal goroutine-based timing loops with scheduler integration for SyncOutboxService and ModelRegistryService. Both services now expose RunOnce() methods called by the central scheduler instead of managing their own background loops. - Add Interval() and RunOnce() methods to SyncOutboxService - Add RefreshEvery() and RunOnce() methods to ModelRegistryService - Remove started flag from SyncOutboxService struct - Move scheduler.Start() after all services are initialized - Ensure initial model registry refresh before scheduler starts --- cmd/server/main.go | 19 ++++++++++------ internal/service/model_registry.go | 31 ++++++++++++------------- internal/service/sync_outbox.go | 36 +++++++++++++----------------- 3 files changed, 41 insertions(+), 45 deletions(-) diff --git a/cmd/server/main.go b/cmd/server/main.go index 0cec043..fee4de7 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -164,6 +164,7 @@ func main() { // 4. Setup Services and Handlers syncService := service.NewSyncService(rdb) + var outboxService *service.SyncOutboxService if cfg.SyncOutbox.Enabled { outboxCfg := service.SyncOutboxConfig{ Enabled: cfg.SyncOutbox.Enabled, @@ -171,11 +172,8 @@ func main() { BatchSize: cfg.SyncOutbox.BatchSize, MaxRetries: cfg.SyncOutbox.MaxRetries, } - outboxService := service.NewSyncOutboxService(db, syncService, outboxCfg, logger) + outboxService = service.NewSyncOutboxService(db, syncService, outboxCfg, logger) syncService.SetOutbox(outboxService) - outboxCtx, cancelOutbox := context.WithCancel(context.Background()) - defer cancelOutbox() - go outboxService.Start(outboxCtx) } logPartitioner := service.NewLogPartitioner(logDB, cfg.Log.Partitioning) if logPartitioner.Enabled() { @@ -202,13 +200,15 @@ func main() { alertDetectorConfig := cron.DefaultAlertDetectorConfig() alertDetector := cron.NewAlertDetector(db, logDB, rdb, service.NewStatsService(rdb), alertDetectorConfig, logger) - // Setup scheduler + // Setup scheduler (jobs are added incrementally, Start() called after all services initialized) sched := scheduler.New(scheduler.WithLogger(logger), scheduler.WithSkipIfRunning()) sched.Every("quota-reset", time.Duration(cfg.Quota.ResetIntervalSeconds)*time.Second, quotaResetter.RunOnce) sched.Every("log-cleanup", time.Hour, logCleaner.RunOnce) sched.Every("token-refresh", time.Duration(cfg.TokenRefresh.IntervalSeconds)*time.Second, tokenRefresher.RunOnce) sched.Every("alert-detection", time.Minute, alertDetector.RunOnce) - sched.Start() + if outboxService != nil && outboxService.Enabled() { + sched.Every("sync-outbox", outboxService.Interval(), outboxService.RunOnce) + } defer sched.Stop() adminService, err := service.NewAdminService() @@ -243,7 +243,12 @@ func main() { if err := syncService.SyncAll(db); err != nil { logger.Warn("initial sync warning", "err", err) } - modelRegistryService.Start(context.Background()) + // Initial model registry refresh before scheduler starts + if modelRegistryService.Enabled() { + modelRegistryService.RunOnce(context.Background()) + sched.Every("model-registry-refresh", modelRegistryService.RefreshEvery(), modelRegistryService.RunOnce) + } + sched.Start() // 5. Setup Gin Router r := gin.Default() diff --git a/internal/service/model_registry.go b/internal/service/model_registry.go index e8951fb..63f1975 100644 --- a/internal/service/model_registry.go +++ b/internal/service/model_registry.go @@ -81,26 +81,23 @@ func NewModelRegistryService(db *gorm.DB, rdb *redis.Client, cfg ModelRegistryCo } } -func (s *ModelRegistryService) Start(ctx context.Context) { +func (s *ModelRegistryService) Enabled() bool { + return s != nil && s.cfg.Enabled +} + +func (s *ModelRegistryService) RefreshEvery() time.Duration { + if s == nil || s.cfg.RefreshEvery <= 0 { + return 30 * time.Minute + } + return s.cfg.RefreshEvery +} + +// RunOnce triggers a single model registry refresh. Called by scheduler. +func (s *ModelRegistryService) RunOnce(ctx context.Context) { if !s.cfg.Enabled { return } - go func() { - ticker := time.NewTicker(s.cfg.RefreshEvery) - defer ticker.Stop() - - // Best-effort initial refresh. - _ = s.Refresh(ctx, "") - - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - _ = s.Refresh(ctx, "") - } - } - }() + _ = s.Refresh(ctx, "") } type ModelRegistryStatus struct { diff --git a/internal/service/sync_outbox.go b/internal/service/sync_outbox.go index a12006e..a8df583 100644 --- a/internal/service/sync_outbox.go +++ b/internal/service/sync_outbox.go @@ -36,11 +36,10 @@ type SyncOutboxEntry struct { // SyncOutboxService retries failed sync operations stored in the database. type SyncOutboxService struct { - db *gorm.DB - sync *SyncService - cfg SyncOutboxConfig - logger *slog.Logger - started bool + db *gorm.DB + sync *SyncService + cfg SyncOutboxConfig + logger *slog.Logger } func NewSyncOutboxService(db *gorm.DB, sync *SyncService, cfg SyncOutboxConfig, logger *slog.Logger) *SyncOutboxService { @@ -63,6 +62,13 @@ func (s *SyncOutboxService) Enabled() bool { return s != nil && s.cfg.Enabled } +func (s *SyncOutboxService) Interval() time.Duration { + if s == nil || s.cfg.Interval <= 0 { + return 5 * time.Second + } + return s.cfg.Interval +} + // Enqueue persists a failed sync operation for retry. func (s *SyncOutboxService) Enqueue(entry SyncOutboxEntry) error { if s == nil || !s.cfg.Enabled { @@ -101,24 +107,12 @@ func (s *SyncOutboxService) Enqueue(entry SyncOutboxEntry) error { return nil } -// Start begins the background retry loop. -func (s *SyncOutboxService) Start(ctx context.Context) { - if s == nil || !s.cfg.Enabled || s.started { +// RunOnce processes one batch of pending outbox items. Called by scheduler. +func (s *SyncOutboxService) RunOnce(ctx context.Context) { + if s == nil || !s.cfg.Enabled { return } - s.started = true - - ticker := time.NewTicker(s.cfg.Interval) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - s.processBatch(ctx) - } - } + s.processBatch(ctx) } func (s *SyncOutboxService) processBatch(ctx context.Context) {