diff --git a/cmd/server/main.go b/cmd/server/main.go index c734208..0cec043 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -23,6 +23,7 @@ import ( "github.com/ez-api/ez-api/internal/model" "github.com/ez-api/ez-api/internal/service" "github.com/ez-api/foundation/logging" + "github.com/ez-api/foundation/scheduler" "github.com/gin-gonic/gin" "github.com/redis/go-redis/v9" swaggerFiles "github.com/swaggo/files" @@ -186,33 +187,29 @@ func main() { logCtx, cancelLogs := context.WithCancel(context.Background()) defer cancelLogs() logWriter.Start(logCtx) - quotaResetter := cron.NewQuotaResetter(db, syncService, time.Duration(cfg.Quota.ResetIntervalSeconds)*time.Second) - quotaCtx, cancelQuota := context.WithCancel(context.Background()) - defer cancelQuota() - go quotaResetter.Start(quotaCtx) - logCleaner := cron.NewLogCleaner(logDB, rdb, cfg.Log.RetentionDays, int64(cfg.Log.MaxRecords), time.Hour, logPartitioner) - cleanerCtx, cancelCleaner := context.WithCancel(context.Background()) - defer cancelCleaner() - go logCleaner.Start(cleanerCtx) + + // Initialize cron jobs + quotaResetter := cron.NewQuotaResetter(db, syncService) + logCleaner := cron.NewLogCleaner(logDB, rdb, cfg.Log.RetentionDays, int64(cfg.Log.MaxRecords), logPartitioner) tokenRefresher := cron.NewTokenRefresher( db, rdb, syncService, - time.Duration(cfg.TokenRefresh.IntervalSeconds)*time.Second, time.Duration(cfg.TokenRefresh.RefreshSkewSeconds)*time.Second, cfg.TokenRefresh.BatchSize, cfg.TokenRefresh.MaxRetries, ) - tokenCtx, cancelToken := context.WithCancel(context.Background()) - defer cancelToken() - go tokenRefresher.Start(tokenCtx) - - // Alert Detector alertDetectorConfig := cron.DefaultAlertDetectorConfig() alertDetector := cron.NewAlertDetector(db, logDB, rdb, service.NewStatsService(rdb), alertDetectorConfig, logger) - alertDetectorCtx, cancelAlertDetector := context.WithCancel(context.Background()) - defer cancelAlertDetector() - go alertDetector.Start(alertDetectorCtx) + + // Setup scheduler + sched := scheduler.New(scheduler.WithLogger(logger), scheduler.WithSkipIfRunning()) + sched.Every("quota-reset", time.Duration(cfg.Quota.ResetIntervalSeconds)*time.Second, quotaResetter.RunOnce) + sched.Every("log-cleanup", time.Hour, logCleaner.RunOnce) + sched.Every("token-refresh", time.Duration(cfg.TokenRefresh.IntervalSeconds)*time.Second, tokenRefresher.RunOnce) + sched.Every("alert-detection", time.Minute, alertDetector.RunOnce) + sched.Start() + defer sched.Stop() adminService, err := service.NewAdminService() if err != nil { diff --git a/go.mod b/go.mod index 63d3847..3e2e362 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.24.5 require ( github.com/alicebob/miniredis/v2 v2.35.0 - github.com/ez-api/foundation v0.5.0 + github.com/ez-api/foundation v0.6.0 github.com/gin-gonic/gin v1.11.0 github.com/pelletier/go-toml/v2 v2.2.4 github.com/redis/go-redis/v9 v9.17.2 @@ -60,6 +60,7 @@ require ( github.com/modern-go/reflect2 v1.0.2 // indirect github.com/quic-go/qpack v0.5.1 // indirect github.com/quic-go/quic-go v0.54.0 // indirect + github.com/robfig/cron/v3 v3.0.1 // indirect github.com/rs/zerolog v1.34.0 // indirect github.com/sagikazarmark/locafero v0.11.0 // indirect github.com/sourcegraph/conc v0.3.1-0.20240121214520-5f936abd7ae8 // indirect diff --git a/go.sum b/go.sum index d936386..6f7e04f 100644 --- a/go.sum +++ b/go.sum @@ -20,8 +20,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= -github.com/ez-api/foundation v0.5.0 h1:+7FYziVt9eZN/Npwn/rKj2LOkMWpngT2br8ym4LMAC4= -github.com/ez-api/foundation v0.5.0/go.mod h1:bTh1LA42TW4CXi1SebDEUE+fhEssFUphzcGEzyAFFZI= +github.com/ez-api/foundation v0.6.0 h1:rRC3D6KreTuRGSUppPCvEIgqu697NLSHuk+SDuc4lGk= +github.com/ez-api/foundation v0.6.0/go.mod h1:Ds1imonA0hbjh1vY5YPwjdAG1XhkckDSe/pR5aY2mJQ= github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k= @@ -125,6 +125,8 @@ github.com/quic-go/quic-go v0.54.0 h1:6s1YB9QotYI6Ospeiguknbp2Znb/jZYjZLRXn9kMQB github.com/quic-go/quic-go v0.54.0/go.mod h1:e68ZEaCdyviluZmy44P6Iey98v/Wfz6HCjQEm+l8zTY= github.com/redis/go-redis/v9 v9.17.2 h1:P2EGsA4qVIM3Pp+aPocCJ7DguDHhqrXNhVcEp4ViluI= github.com/redis/go-redis/v9 v9.17.2/go.mod h1:u410H11HMLoB+TP67dz8rL9s6QW2j76l0//kSOd3370= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= github.com/rs/xid v1.6.0/go.mod h1:7XoLgs4eV+QndskICGsho+ADou8ySMSjJKDIan90Nz0= diff --git a/internal/cron/alert_detector.go b/internal/cron/alert_detector.go index 9eaadb0..02fcb12 100644 --- a/internal/cron/alert_detector.go +++ b/internal/cron/alert_detector.go @@ -15,8 +15,6 @@ import ( // AlertDetectorConfig holds configuration for alert detection type AlertDetectorConfig struct { - Enabled bool - Interval time.Duration ErrorSpikeThreshold float64 // Error rate threshold (0.1 = 10%) ErrorSpikeWindow time.Duration QuotaWarningThreshold float64 // Quota usage threshold (0.9 = 90%) @@ -27,8 +25,6 @@ type AlertDetectorConfig struct { // DefaultAlertDetectorConfig returns default configuration func DefaultAlertDetectorConfig() AlertDetectorConfig { return AlertDetectorConfig{ - Enabled: true, - Interval: 1 * time.Minute, ErrorSpikeThreshold: 0.1, // 10% error rate ErrorSpikeWindow: 5 * time.Minute, QuotaWarningThreshold: 0.9, // 90% quota used @@ -65,29 +61,12 @@ func NewAlertDetector(db, logDB *gorm.DB, rdb *redis.Client, statsService *servi } } -// Start begins the alert detection loop -func (d *AlertDetector) Start(ctx context.Context) { - if d == nil || !d.config.Enabled { +// RunOnce executes a single detection cycle. Called by scheduler. +func (d *AlertDetector) RunOnce(ctx context.Context) { + if d == nil || d.db == nil { return } - if ctx == nil { - ctx = context.Background() - } - - ticker := time.NewTicker(d.config.Interval) - defer ticker.Stop() - - d.logger.Info("alert detector started", "interval", d.config.Interval) - - for { - select { - case <-ctx.Done(): - d.logger.Info("alert detector stopped") - return - case <-ticker.C: - d.detectOnce(ctx) - } - } + d.detectOnce(ctx) } // detectOnce runs all detection rules once diff --git a/internal/cron/alert_detector_test.go b/internal/cron/alert_detector_test.go index e31f4c5..0663f87 100644 --- a/internal/cron/alert_detector_test.go +++ b/internal/cron/alert_detector_test.go @@ -301,30 +301,16 @@ func TestAlertDetectorDetectOnceNilSafe(t *testing.T) { // Should not panic } -func TestAlertDetectorStartDisabled(t *testing.T) { - db := setupTestDB(t) +func TestAlertDetectorRunOnceNilSafe(t *testing.T) { + // Test nil detector + var nilDetector *AlertDetector + nilDetector.RunOnce(context.Background()) - config := DefaultAlertDetectorConfig() - config.Enabled = false + // Test detector with nil db + detector := &AlertDetector{} + detector.RunOnce(context.Background()) - detector := NewAlertDetector(db, db, nil, nil, config, nil) - - ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) - defer cancel() - - // Should return immediately without blocking - done := make(chan struct{}) - go func() { - detector.Start(ctx) - close(done) - }() - - select { - case <-done: - // Expected: Start returned immediately because Enabled=false - case <-time.After(200 * time.Millisecond): - t.Error("Start did not return immediately when disabled") - } + // Should not panic } func TestDetectMasterMinuteSpikesRPM(t *testing.T) { diff --git a/internal/cron/log_cleaner.go b/internal/cron/log_cleaner.go index 2286d10..b1dcd2e 100644 --- a/internal/cron/log_cleaner.go +++ b/internal/cron/log_cleaner.go @@ -25,48 +25,27 @@ type LogCleaner struct { rdb *redis.Client retentionDays int maxRecords int64 - interval time.Duration partitioner *service.LogPartitioner } -func NewLogCleaner(db *gorm.DB, rdb *redis.Client, retentionDays int, maxRecords int64, interval time.Duration, partitioner *service.LogPartitioner) *LogCleaner { - if interval <= 0 { - interval = time.Hour - } +func NewLogCleaner(db *gorm.DB, rdb *redis.Client, retentionDays int, maxRecords int64, partitioner *service.LogPartitioner) *LogCleaner { return &LogCleaner{ db: db, rdb: rdb, retentionDays: retentionDays, maxRecords: maxRecords, - interval: interval, partitioner: partitioner, } } -func (c *LogCleaner) Start(ctx context.Context) { +// RunOnce executes a single log cleanup. Called by scheduler. +func (c *LogCleaner) RunOnce(ctx context.Context) { if c == nil || c.db == nil { return } - if ctx == nil { - ctx = context.Background() - } - if err := c.cleanOnce(ctx); err != nil { slog.Default().Warn("log cleaner run failed", "err", err) } - - ticker := time.NewTicker(c.interval) - defer ticker.Stop() - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - if err := c.cleanOnce(ctx); err != nil { - slog.Default().Warn("log cleaner run failed", "err", err) - } - } - } } func (c *LogCleaner) cleanOnce(ctx context.Context) error { diff --git a/internal/cron/log_cleaner_test.go b/internal/cron/log_cleaner_test.go index ac66dc0..9169160 100644 --- a/internal/cron/log_cleaner_test.go +++ b/internal/cron/log_cleaner_test.go @@ -34,7 +34,7 @@ func TestLogCleanerRetentionDeletesOld(t *testing.T) { t.Fatalf("create fresh: %v", err) } - cleaner := NewLogCleaner(db, nil, 1, 0, time.Minute, nil) + cleaner := NewLogCleaner(db, nil, 1, 0, nil) if err := cleaner.cleanOnce(context.Background()); err != nil { t.Fatalf("clean once: %v", err) } @@ -64,7 +64,7 @@ func TestLogCleanerMaxRecordsKeepsLatest(t *testing.T) { } } - cleaner := NewLogCleaner(db, nil, 0, 3, time.Minute, nil) + cleaner := NewLogCleaner(db, nil, 0, 3, nil) if err := cleaner.cleanOnce(context.Background()); err != nil { t.Fatalf("clean once: %v", err) } @@ -85,3 +85,15 @@ func TestLogCleanerMaxRecordsKeepsLatest(t *testing.T) { t.Fatalf("expected min id >= 3, got %d", min.ID) } } + +func TestLogCleanerRunOnceNilSafe(t *testing.T) { + // Test nil cleaner + var nilCleaner *LogCleaner + nilCleaner.RunOnce(context.Background()) + + // Test cleaner with nil db + cleaner := &LogCleaner{} + cleaner.RunOnce(context.Background()) + + // Should not panic +} diff --git a/internal/cron/quota_reset.go b/internal/cron/quota_reset.go index a6e2a68..b9ab0a3 100644 --- a/internal/cron/quota_reset.go +++ b/internal/cron/quota_reset.go @@ -12,37 +12,21 @@ import ( ) type QuotaResetter struct { - db *gorm.DB - sync *service.SyncService - interval time.Duration + db *gorm.DB + sync *service.SyncService } -func NewQuotaResetter(db *gorm.DB, sync *service.SyncService, interval time.Duration) *QuotaResetter { - if interval <= 0 { - interval = 5 * time.Minute - } - return &QuotaResetter{db: db, sync: sync, interval: interval} +func NewQuotaResetter(db *gorm.DB, sync *service.SyncService) *QuotaResetter { + return &QuotaResetter{db: db, sync: sync} } -func (q *QuotaResetter) Start(ctx context.Context) { +// RunOnce executes a single quota reset check. Called by scheduler. +func (q *QuotaResetter) RunOnce(ctx context.Context) { if q == nil || q.db == nil { return } - if ctx == nil { - ctx = context.Background() - } - ticker := time.NewTicker(q.interval) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - if err := q.resetOnce(ctx); err != nil { - slog.Default().Warn("quota reset failed", "err", err) - } - } + if err := q.resetOnce(ctx); err != nil { + slog.Default().Warn("quota reset failed", "err", err) } } diff --git a/internal/cron/token_refresh.go b/internal/cron/token_refresh.go index c2742f4..adc6e3a 100644 --- a/internal/cron/token_refresh.go +++ b/internal/cron/token_refresh.go @@ -32,17 +32,13 @@ type TokenRefresher struct { db *gorm.DB rdb *redis.Client sync *service.SyncService - interval time.Duration refreshSkew time.Duration batchSize int maxRetries int httpClient *http.Client } -func NewTokenRefresher(db *gorm.DB, rdb *redis.Client, sync *service.SyncService, interval, refreshSkew time.Duration, batchSize, maxRetries int) *TokenRefresher { - if interval <= 0 { - interval = 30 * time.Minute - } +func NewTokenRefresher(db *gorm.DB, rdb *redis.Client, sync *service.SyncService, refreshSkew time.Duration, batchSize, maxRetries int) *TokenRefresher { if refreshSkew <= 0 { refreshSkew = 50 * time.Minute } @@ -56,7 +52,6 @@ func NewTokenRefresher(db *gorm.DB, rdb *redis.Client, sync *service.SyncService db: db, rdb: rdb, sync: sync, - interval: interval, refreshSkew: refreshSkew, batchSize: batchSize, maxRetries: maxRetries, @@ -64,25 +59,13 @@ func NewTokenRefresher(db *gorm.DB, rdb *redis.Client, sync *service.SyncService } } -func (t *TokenRefresher) Start(ctx context.Context) { +// RunOnce executes a single token refresh cycle. Called by scheduler. +func (t *TokenRefresher) RunOnce(ctx context.Context) { if t == nil || t.db == nil { return } - if ctx == nil { - ctx = context.Background() - } - ticker := time.NewTicker(t.interval) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - if err := t.refreshOnce(ctx); err != nil { - slog.Default().Warn("token refresh failed", "err", err) - } - } + if err := t.refreshOnce(ctx); err != nil { + slog.Default().Warn("token refresh failed", "err", err) } }