mirror of
https://github.com/EZ-Api/ez-api.git
synced 2026-01-13 17:47:51 +00:00
refactor(cron): migrate cron jobs to foundation scheduler
Replace custom goroutine-based scheduling in cron jobs with centralized foundation scheduler. Each cron job now exposes a RunOnce method called by the scheduler instead of managing its own ticker loop. Changes: - Remove interval/enabled config from cron job structs - Convert Start() methods to RunOnce() for all cron jobs - Add scheduler setup in main.go with configurable intervals - Update foundation dependency to v0.6.0 for scheduler support - Update tests to validate RunOnce nil-safety
This commit is contained in:
@@ -15,8 +15,6 @@ import (
|
||||
|
||||
// AlertDetectorConfig holds configuration for alert detection
|
||||
type AlertDetectorConfig struct {
|
||||
Enabled bool
|
||||
Interval time.Duration
|
||||
ErrorSpikeThreshold float64 // Error rate threshold (0.1 = 10%)
|
||||
ErrorSpikeWindow time.Duration
|
||||
QuotaWarningThreshold float64 // Quota usage threshold (0.9 = 90%)
|
||||
@@ -27,8 +25,6 @@ type AlertDetectorConfig struct {
|
||||
// DefaultAlertDetectorConfig returns default configuration
|
||||
func DefaultAlertDetectorConfig() AlertDetectorConfig {
|
||||
return AlertDetectorConfig{
|
||||
Enabled: true,
|
||||
Interval: 1 * time.Minute,
|
||||
ErrorSpikeThreshold: 0.1, // 10% error rate
|
||||
ErrorSpikeWindow: 5 * time.Minute,
|
||||
QuotaWarningThreshold: 0.9, // 90% quota used
|
||||
@@ -65,29 +61,12 @@ func NewAlertDetector(db, logDB *gorm.DB, rdb *redis.Client, statsService *servi
|
||||
}
|
||||
}
|
||||
|
||||
// Start begins the alert detection loop
|
||||
func (d *AlertDetector) Start(ctx context.Context) {
|
||||
if d == nil || !d.config.Enabled {
|
||||
// RunOnce executes a single detection cycle. Called by scheduler.
|
||||
func (d *AlertDetector) RunOnce(ctx context.Context) {
|
||||
if d == nil || d.db == nil {
|
||||
return
|
||||
}
|
||||
if ctx == nil {
|
||||
ctx = context.Background()
|
||||
}
|
||||
|
||||
ticker := time.NewTicker(d.config.Interval)
|
||||
defer ticker.Stop()
|
||||
|
||||
d.logger.Info("alert detector started", "interval", d.config.Interval)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
d.logger.Info("alert detector stopped")
|
||||
return
|
||||
case <-ticker.C:
|
||||
d.detectOnce(ctx)
|
||||
}
|
||||
}
|
||||
d.detectOnce(ctx)
|
||||
}
|
||||
|
||||
// detectOnce runs all detection rules once
|
||||
|
||||
@@ -301,30 +301,16 @@ func TestAlertDetectorDetectOnceNilSafe(t *testing.T) {
|
||||
// Should not panic
|
||||
}
|
||||
|
||||
func TestAlertDetectorStartDisabled(t *testing.T) {
|
||||
db := setupTestDB(t)
|
||||
func TestAlertDetectorRunOnceNilSafe(t *testing.T) {
|
||||
// Test nil detector
|
||||
var nilDetector *AlertDetector
|
||||
nilDetector.RunOnce(context.Background())
|
||||
|
||||
config := DefaultAlertDetectorConfig()
|
||||
config.Enabled = false
|
||||
// Test detector with nil db
|
||||
detector := &AlertDetector{}
|
||||
detector.RunOnce(context.Background())
|
||||
|
||||
detector := NewAlertDetector(db, db, nil, nil, config, nil)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
|
||||
defer cancel()
|
||||
|
||||
// Should return immediately without blocking
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
detector.Start(ctx)
|
||||
close(done)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
// Expected: Start returned immediately because Enabled=false
|
||||
case <-time.After(200 * time.Millisecond):
|
||||
t.Error("Start did not return immediately when disabled")
|
||||
}
|
||||
// Should not panic
|
||||
}
|
||||
|
||||
func TestDetectMasterMinuteSpikesRPM(t *testing.T) {
|
||||
|
||||
@@ -25,48 +25,27 @@ type LogCleaner struct {
|
||||
rdb *redis.Client
|
||||
retentionDays int
|
||||
maxRecords int64
|
||||
interval time.Duration
|
||||
partitioner *service.LogPartitioner
|
||||
}
|
||||
|
||||
func NewLogCleaner(db *gorm.DB, rdb *redis.Client, retentionDays int, maxRecords int64, interval time.Duration, partitioner *service.LogPartitioner) *LogCleaner {
|
||||
if interval <= 0 {
|
||||
interval = time.Hour
|
||||
}
|
||||
func NewLogCleaner(db *gorm.DB, rdb *redis.Client, retentionDays int, maxRecords int64, partitioner *service.LogPartitioner) *LogCleaner {
|
||||
return &LogCleaner{
|
||||
db: db,
|
||||
rdb: rdb,
|
||||
retentionDays: retentionDays,
|
||||
maxRecords: maxRecords,
|
||||
interval: interval,
|
||||
partitioner: partitioner,
|
||||
}
|
||||
}
|
||||
|
||||
func (c *LogCleaner) Start(ctx context.Context) {
|
||||
// RunOnce executes a single log cleanup. Called by scheduler.
|
||||
func (c *LogCleaner) RunOnce(ctx context.Context) {
|
||||
if c == nil || c.db == nil {
|
||||
return
|
||||
}
|
||||
if ctx == nil {
|
||||
ctx = context.Background()
|
||||
}
|
||||
|
||||
if err := c.cleanOnce(ctx); err != nil {
|
||||
slog.Default().Warn("log cleaner run failed", "err", err)
|
||||
}
|
||||
|
||||
ticker := time.NewTicker(c.interval)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
if err := c.cleanOnce(ctx); err != nil {
|
||||
slog.Default().Warn("log cleaner run failed", "err", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *LogCleaner) cleanOnce(ctx context.Context) error {
|
||||
|
||||
@@ -34,7 +34,7 @@ func TestLogCleanerRetentionDeletesOld(t *testing.T) {
|
||||
t.Fatalf("create fresh: %v", err)
|
||||
}
|
||||
|
||||
cleaner := NewLogCleaner(db, nil, 1, 0, time.Minute, nil)
|
||||
cleaner := NewLogCleaner(db, nil, 1, 0, nil)
|
||||
if err := cleaner.cleanOnce(context.Background()); err != nil {
|
||||
t.Fatalf("clean once: %v", err)
|
||||
}
|
||||
@@ -64,7 +64,7 @@ func TestLogCleanerMaxRecordsKeepsLatest(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
cleaner := NewLogCleaner(db, nil, 0, 3, time.Minute, nil)
|
||||
cleaner := NewLogCleaner(db, nil, 0, 3, nil)
|
||||
if err := cleaner.cleanOnce(context.Background()); err != nil {
|
||||
t.Fatalf("clean once: %v", err)
|
||||
}
|
||||
@@ -85,3 +85,15 @@ func TestLogCleanerMaxRecordsKeepsLatest(t *testing.T) {
|
||||
t.Fatalf("expected min id >= 3, got %d", min.ID)
|
||||
}
|
||||
}
|
||||
|
||||
func TestLogCleanerRunOnceNilSafe(t *testing.T) {
|
||||
// Test nil cleaner
|
||||
var nilCleaner *LogCleaner
|
||||
nilCleaner.RunOnce(context.Background())
|
||||
|
||||
// Test cleaner with nil db
|
||||
cleaner := &LogCleaner{}
|
||||
cleaner.RunOnce(context.Background())
|
||||
|
||||
// Should not panic
|
||||
}
|
||||
|
||||
@@ -12,37 +12,21 @@ import (
|
||||
)
|
||||
|
||||
type QuotaResetter struct {
|
||||
db *gorm.DB
|
||||
sync *service.SyncService
|
||||
interval time.Duration
|
||||
db *gorm.DB
|
||||
sync *service.SyncService
|
||||
}
|
||||
|
||||
func NewQuotaResetter(db *gorm.DB, sync *service.SyncService, interval time.Duration) *QuotaResetter {
|
||||
if interval <= 0 {
|
||||
interval = 5 * time.Minute
|
||||
}
|
||||
return &QuotaResetter{db: db, sync: sync, interval: interval}
|
||||
func NewQuotaResetter(db *gorm.DB, sync *service.SyncService) *QuotaResetter {
|
||||
return &QuotaResetter{db: db, sync: sync}
|
||||
}
|
||||
|
||||
func (q *QuotaResetter) Start(ctx context.Context) {
|
||||
// RunOnce executes a single quota reset check. Called by scheduler.
|
||||
func (q *QuotaResetter) RunOnce(ctx context.Context) {
|
||||
if q == nil || q.db == nil {
|
||||
return
|
||||
}
|
||||
if ctx == nil {
|
||||
ctx = context.Background()
|
||||
}
|
||||
ticker := time.NewTicker(q.interval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
if err := q.resetOnce(ctx); err != nil {
|
||||
slog.Default().Warn("quota reset failed", "err", err)
|
||||
}
|
||||
}
|
||||
if err := q.resetOnce(ctx); err != nil {
|
||||
slog.Default().Warn("quota reset failed", "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -32,17 +32,13 @@ type TokenRefresher struct {
|
||||
db *gorm.DB
|
||||
rdb *redis.Client
|
||||
sync *service.SyncService
|
||||
interval time.Duration
|
||||
refreshSkew time.Duration
|
||||
batchSize int
|
||||
maxRetries int
|
||||
httpClient *http.Client
|
||||
}
|
||||
|
||||
func NewTokenRefresher(db *gorm.DB, rdb *redis.Client, sync *service.SyncService, interval, refreshSkew time.Duration, batchSize, maxRetries int) *TokenRefresher {
|
||||
if interval <= 0 {
|
||||
interval = 30 * time.Minute
|
||||
}
|
||||
func NewTokenRefresher(db *gorm.DB, rdb *redis.Client, sync *service.SyncService, refreshSkew time.Duration, batchSize, maxRetries int) *TokenRefresher {
|
||||
if refreshSkew <= 0 {
|
||||
refreshSkew = 50 * time.Minute
|
||||
}
|
||||
@@ -56,7 +52,6 @@ func NewTokenRefresher(db *gorm.DB, rdb *redis.Client, sync *service.SyncService
|
||||
db: db,
|
||||
rdb: rdb,
|
||||
sync: sync,
|
||||
interval: interval,
|
||||
refreshSkew: refreshSkew,
|
||||
batchSize: batchSize,
|
||||
maxRetries: maxRetries,
|
||||
@@ -64,25 +59,13 @@ func NewTokenRefresher(db *gorm.DB, rdb *redis.Client, sync *service.SyncService
|
||||
}
|
||||
}
|
||||
|
||||
func (t *TokenRefresher) Start(ctx context.Context) {
|
||||
// RunOnce executes a single token refresh cycle. Called by scheduler.
|
||||
func (t *TokenRefresher) RunOnce(ctx context.Context) {
|
||||
if t == nil || t.db == nil {
|
||||
return
|
||||
}
|
||||
if ctx == nil {
|
||||
ctx = context.Background()
|
||||
}
|
||||
ticker := time.NewTicker(t.interval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
if err := t.refreshOnce(ctx); err != nil {
|
||||
slog.Default().Warn("token refresh failed", "err", err)
|
||||
}
|
||||
}
|
||||
if err := t.refreshOnce(ctx); err != nil {
|
||||
slog.Default().Warn("token refresh failed", "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user