refactor(scheduler): migrate outbox and model registry to scheduler-based execution

Replace internal goroutine-based timing loops with scheduler integration
for SyncOutboxService and ModelRegistryService. Both services now expose
RunOnce() methods called by the central scheduler instead of managing
their own background loops.

- Add Interval() and RunOnce() methods to SyncOutboxService
- Add RefreshEvery() and RunOnce() methods to ModelRegistryService
- Remove started flag from SyncOutboxService struct
- Move scheduler.Start() after all services are initialized
- Ensure initial model registry refresh before scheduler starts
This commit is contained in:
zenfun
2026-01-01 00:55:51 +08:00
parent 05caed37c2
commit 31914b9ab5
3 changed files with 41 additions and 45 deletions

View File

@@ -164,6 +164,7 @@ func main() {
// 4. Setup Services and Handlers // 4. Setup Services and Handlers
syncService := service.NewSyncService(rdb) syncService := service.NewSyncService(rdb)
var outboxService *service.SyncOutboxService
if cfg.SyncOutbox.Enabled { if cfg.SyncOutbox.Enabled {
outboxCfg := service.SyncOutboxConfig{ outboxCfg := service.SyncOutboxConfig{
Enabled: cfg.SyncOutbox.Enabled, Enabled: cfg.SyncOutbox.Enabled,
@@ -171,11 +172,8 @@ func main() {
BatchSize: cfg.SyncOutbox.BatchSize, BatchSize: cfg.SyncOutbox.BatchSize,
MaxRetries: cfg.SyncOutbox.MaxRetries, MaxRetries: cfg.SyncOutbox.MaxRetries,
} }
outboxService := service.NewSyncOutboxService(db, syncService, outboxCfg, logger) outboxService = service.NewSyncOutboxService(db, syncService, outboxCfg, logger)
syncService.SetOutbox(outboxService) syncService.SetOutbox(outboxService)
outboxCtx, cancelOutbox := context.WithCancel(context.Background())
defer cancelOutbox()
go outboxService.Start(outboxCtx)
} }
logPartitioner := service.NewLogPartitioner(logDB, cfg.Log.Partitioning) logPartitioner := service.NewLogPartitioner(logDB, cfg.Log.Partitioning)
if logPartitioner.Enabled() { if logPartitioner.Enabled() {
@@ -202,13 +200,15 @@ func main() {
alertDetectorConfig := cron.DefaultAlertDetectorConfig() alertDetectorConfig := cron.DefaultAlertDetectorConfig()
alertDetector := cron.NewAlertDetector(db, logDB, rdb, service.NewStatsService(rdb), alertDetectorConfig, logger) alertDetector := cron.NewAlertDetector(db, logDB, rdb, service.NewStatsService(rdb), alertDetectorConfig, logger)
// Setup scheduler // Setup scheduler (jobs are added incrementally, Start() called after all services initialized)
sched := scheduler.New(scheduler.WithLogger(logger), scheduler.WithSkipIfRunning()) sched := scheduler.New(scheduler.WithLogger(logger), scheduler.WithSkipIfRunning())
sched.Every("quota-reset", time.Duration(cfg.Quota.ResetIntervalSeconds)*time.Second, quotaResetter.RunOnce) sched.Every("quota-reset", time.Duration(cfg.Quota.ResetIntervalSeconds)*time.Second, quotaResetter.RunOnce)
sched.Every("log-cleanup", time.Hour, logCleaner.RunOnce) sched.Every("log-cleanup", time.Hour, logCleaner.RunOnce)
sched.Every("token-refresh", time.Duration(cfg.TokenRefresh.IntervalSeconds)*time.Second, tokenRefresher.RunOnce) sched.Every("token-refresh", time.Duration(cfg.TokenRefresh.IntervalSeconds)*time.Second, tokenRefresher.RunOnce)
sched.Every("alert-detection", time.Minute, alertDetector.RunOnce) sched.Every("alert-detection", time.Minute, alertDetector.RunOnce)
sched.Start() if outboxService != nil && outboxService.Enabled() {
sched.Every("sync-outbox", outboxService.Interval(), outboxService.RunOnce)
}
defer sched.Stop() defer sched.Stop()
adminService, err := service.NewAdminService() adminService, err := service.NewAdminService()
@@ -243,7 +243,12 @@ func main() {
if err := syncService.SyncAll(db); err != nil { if err := syncService.SyncAll(db); err != nil {
logger.Warn("initial sync warning", "err", err) logger.Warn("initial sync warning", "err", err)
} }
modelRegistryService.Start(context.Background()) // Initial model registry refresh before scheduler starts
if modelRegistryService.Enabled() {
modelRegistryService.RunOnce(context.Background())
sched.Every("model-registry-refresh", modelRegistryService.RefreshEvery(), modelRegistryService.RunOnce)
}
sched.Start()
// 5. Setup Gin Router // 5. Setup Gin Router
r := gin.Default() r := gin.Default()

View File

@@ -81,26 +81,23 @@ func NewModelRegistryService(db *gorm.DB, rdb *redis.Client, cfg ModelRegistryCo
} }
} }
func (s *ModelRegistryService) Start(ctx context.Context) { func (s *ModelRegistryService) Enabled() bool {
return s != nil && s.cfg.Enabled
}
func (s *ModelRegistryService) RefreshEvery() time.Duration {
if s == nil || s.cfg.RefreshEvery <= 0 {
return 30 * time.Minute
}
return s.cfg.RefreshEvery
}
// RunOnce triggers a single model registry refresh. Called by scheduler.
func (s *ModelRegistryService) RunOnce(ctx context.Context) {
if !s.cfg.Enabled { if !s.cfg.Enabled {
return return
} }
go func() {
ticker := time.NewTicker(s.cfg.RefreshEvery)
defer ticker.Stop()
// Best-effort initial refresh.
_ = s.Refresh(ctx, "") _ = s.Refresh(ctx, "")
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
_ = s.Refresh(ctx, "")
}
}
}()
} }
type ModelRegistryStatus struct { type ModelRegistryStatus struct {

View File

@@ -40,7 +40,6 @@ type SyncOutboxService struct {
sync *SyncService sync *SyncService
cfg SyncOutboxConfig cfg SyncOutboxConfig
logger *slog.Logger logger *slog.Logger
started bool
} }
func NewSyncOutboxService(db *gorm.DB, sync *SyncService, cfg SyncOutboxConfig, logger *slog.Logger) *SyncOutboxService { func NewSyncOutboxService(db *gorm.DB, sync *SyncService, cfg SyncOutboxConfig, logger *slog.Logger) *SyncOutboxService {
@@ -63,6 +62,13 @@ func (s *SyncOutboxService) Enabled() bool {
return s != nil && s.cfg.Enabled return s != nil && s.cfg.Enabled
} }
func (s *SyncOutboxService) Interval() time.Duration {
if s == nil || s.cfg.Interval <= 0 {
return 5 * time.Second
}
return s.cfg.Interval
}
// Enqueue persists a failed sync operation for retry. // Enqueue persists a failed sync operation for retry.
func (s *SyncOutboxService) Enqueue(entry SyncOutboxEntry) error { func (s *SyncOutboxService) Enqueue(entry SyncOutboxEntry) error {
if s == nil || !s.cfg.Enabled { if s == nil || !s.cfg.Enabled {
@@ -101,25 +107,13 @@ func (s *SyncOutboxService) Enqueue(entry SyncOutboxEntry) error {
return nil return nil
} }
// Start begins the background retry loop. // RunOnce processes one batch of pending outbox items. Called by scheduler.
func (s *SyncOutboxService) Start(ctx context.Context) { func (s *SyncOutboxService) RunOnce(ctx context.Context) {
if s == nil || !s.cfg.Enabled || s.started { if s == nil || !s.cfg.Enabled {
return return
} }
s.started = true
ticker := time.NewTicker(s.cfg.Interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
s.processBatch(ctx) s.processBatch(ctx)
} }
}
}
func (s *SyncOutboxService) processBatch(ctx context.Context) { func (s *SyncOutboxService) processBatch(ctx context.Context) {
if s == nil || !s.cfg.Enabled || s.db == nil || s.sync == nil { if s == nil || !s.cfg.Enabled || s.db == nil || s.sync == nil {