From 4cda273f7b0f97ee7eb75cedb081fb3b68daa6d3 Mon Sep 17 00:00:00 2001 From: zenfun Date: Wed, 31 Dec 2025 18:01:09 +0800 Subject: [PATCH] feat(alerts): add MasterID to log records and improve traffic spike detection - Add MasterID field with index to LogRecord model for efficient queries - Fix threshold config loading to use fixed ID=1 with FirstOrCreate - Allow traffic spike detection to work without Redis for log-based checks - Add traffic_spike to API documentation for alert type filter - Add comprehensive tests for RPM/RPD/TPM spike detection scenarios --- internal/api/alert_handler.go | 17 +- internal/cron/alert_detector.go | 23 +- internal/cron/alert_detector_test.go | 307 +++++++++++++++++++++++++++ internal/model/log.go | 1 + 4 files changed, 324 insertions(+), 24 deletions(-) diff --git a/internal/api/alert_handler.go b/internal/api/alert_handler.go index 28452a2..1b3fcee 100644 --- a/internal/api/alert_handler.go +++ b/internal/api/alert_handler.go @@ -89,7 +89,7 @@ type ListAlertsResponse struct { // @Param offset query int false "offset" // @Param status query string false "filter by status (active, acknowledged, resolved, dismissed)" // @Param severity query string false "filter by severity (info, warning, critical)" -// @Param type query string false "filter by type (rate_limit, error_spike, quota_exceeded, key_disabled, key_expired, provider_down)" +// @Param type query string false "filter by type (rate_limit, error_spike, quota_exceeded, key_disabled, key_expired, provider_down, traffic_spike)" // @Success 200 {object} ListAlertsResponse // @Failure 500 {object} gin.H // @Router /admin/alerts [get] @@ -544,19 +544,12 @@ func (h *AlertHandler) UpdateAlertThresholds(c *gin.Context) { c.JSON(http.StatusOK, toAlertThresholdView(cfg)) } -// loadThresholdConfig loads the threshold config from DB or returns defaults +// loadThresholdConfig loads the threshold config from DB or creates default with fixed ID=1 func (h *AlertHandler) loadThresholdConfig() (model.AlertThresholdConfig, error) { - var cfg model.AlertThresholdConfig - err := h.db.First(&cfg).Error + cfg := model.DefaultAlertThresholdConfig() + cfg.ID = 1 // Fixed ID to ensure single config row + err := h.db.Where("id = ?", 1).FirstOrCreate(&cfg).Error if err != nil { - if err.Error() == "record not found" { - // Create default config - cfg = model.DefaultAlertThresholdConfig() - if createErr := h.db.Create(&cfg).Error; createErr != nil { - return cfg, createErr - } - return cfg, nil - } return cfg, err } return cfg, nil diff --git a/internal/cron/alert_detector.go b/internal/cron/alert_detector.go index b926f83..9eaadb0 100644 --- a/internal/cron/alert_detector.go +++ b/internal/cron/alert_detector.go @@ -326,11 +326,12 @@ func (d *AlertDetector) errorRateSeverity(rate float64) model.AlertSeverity { return model.AlertSeverityInfo } -// loadThresholdConfig loads the threshold config from DB or returns defaults +// loadThresholdConfig loads the threshold config from DB with fixed ID=1, or returns defaults func (d *AlertDetector) loadThresholdConfig() model.AlertThresholdConfig { - var cfg model.AlertThresholdConfig - err := d.db.First(&cfg).Error - if err != nil { + cfg := model.DefaultAlertThresholdConfig() + cfg.ID = 1 // Fixed ID to ensure single config row + if err := d.db.Where("id = ?", 1).FirstOrCreate(&cfg).Error; err != nil { + d.logger.Warn("failed to load threshold config, using defaults", "err", err) return model.DefaultAlertThresholdConfig() } return cfg @@ -360,19 +361,17 @@ func (m trafficSpikeMetadata) JSON() string { // detectTrafficSpikes checks for traffic threshold breaches func (d *AlertDetector) detectTrafficSpikes(ctx context.Context) { - if d.rdb == nil || d.statsService == nil { - return - } - cfg := d.loadThresholdConfig() - // 1. Global QPS check - d.detectGlobalQPSSpike(ctx, cfg) + // 1. Global QPS check (requires Redis) + if d.rdb != nil && d.statsService != nil { + d.detectGlobalQPSSpike(ctx, cfg) + } - // 2. Per-master RPM/TPM (1-minute window) + // 2. Per-master RPM/TPM (1-minute window) - uses logDB, works without Redis d.detectMasterMinuteSpikes(ctx, cfg) - // 3. Per-master RPD/TPD (24-hour window) + // 3. Per-master RPD/TPD (24-hour window) - uses logDB, works without Redis d.detectMasterDaySpikes(ctx, cfg) } diff --git a/internal/cron/alert_detector_test.go b/internal/cron/alert_detector_test.go index c657ef2..e31f4c5 100644 --- a/internal/cron/alert_detector_test.go +++ b/internal/cron/alert_detector_test.go @@ -326,3 +326,310 @@ func TestAlertDetectorStartDisabled(t *testing.T) { t.Error("Start did not return immediately when disabled") } } + +func TestDetectMasterMinuteSpikesRPM(t *testing.T) { + db := setupTestDB(t) + + // Create a master + master := model.Master{Name: "test-master", Status: "active"} + if err := db.Create(&master).Error; err != nil { + t.Fatalf("create master: %v", err) + } + + // Create threshold config with low thresholds for testing + cfg := model.AlertThresholdConfig{ + GlobalQPS: 100, + MasterRPM: 5, // Low threshold for testing + MasterRPD: 1000, + MasterTPM: 10_000_000, + MasterTPD: 100_000_000, + MinRPMRequests1m: 3, // Low minimum sample + MinTPMTokens1m: 1000, + } + if err := db.Create(&cfg).Error; err != nil { + t.Fatalf("create config: %v", err) + } + + // Create log records within the last minute + now := time.Now().UTC() + for i := 0; i < 10; i++ { + log := model.LogRecord{ + MasterID: master.ID, + KeyID: 1, + TokensIn: 100, + TokensOut: 200, + } + if err := db.Create(&log).Error; err != nil { + t.Fatalf("create log: %v", err) + } + // Update created_at to be within the last minute + db.Model(&log).Update("created_at", now.Add(-time.Duration(i)*time.Second)) + } + + config := DefaultAlertDetectorConfig() + detector := NewAlertDetector(db, db, nil, nil, config, nil) + + // Run detection + detector.detectMasterMinuteSpikes(context.Background(), cfg) + + // Should have created an RPM alert + var alerts []model.Alert + if err := db.Where("type = ?", model.AlertTypeTrafficSpike).Find(&alerts).Error; err != nil { + t.Fatalf("query alerts: %v", err) + } + + if len(alerts) < 1 { + t.Error("expected at least 1 traffic spike alert for RPM") + } + + // Check alert properties + var foundRPM bool + for _, a := range alerts { + if a.RelatedType == "master" && a.RelatedID == master.ID { + foundRPM = true + if a.Severity != model.AlertSeverityWarning && a.Severity != model.AlertSeverityCritical { + t.Errorf("expected warning or critical severity, got %s", a.Severity) + } + } + } + if !foundRPM { + t.Error("expected RPM alert for test master") + } +} + +func TestDetectMasterDaySpikesRPD(t *testing.T) { + db := setupTestDB(t) + + // Create a master + master := model.Master{Name: "test-master-day", Status: "active"} + if err := db.Create(&master).Error; err != nil { + t.Fatalf("create master: %v", err) + } + + // Create threshold config with low thresholds for testing + cfg := model.AlertThresholdConfig{ + GlobalQPS: 100, + MasterRPM: 1000, + MasterRPD: 5, // Low threshold for testing + MasterTPM: 10_000_000, + MasterTPD: 100_000_000, + MinRPMRequests1m: 10, + MinTPMTokens1m: 1000, + } + if err := db.Create(&cfg).Error; err != nil { + t.Fatalf("create config: %v", err) + } + + // Create log records within the last 24 hours + now := time.Now().UTC() + for i := 0; i < 10; i++ { + log := model.LogRecord{ + MasterID: master.ID, + KeyID: 1, + TokensIn: 100, + TokensOut: 200, + } + if err := db.Create(&log).Error; err != nil { + t.Fatalf("create log: %v", err) + } + // Update created_at to be within the last 24 hours + db.Model(&log).Update("created_at", now.Add(-time.Duration(i)*time.Hour)) + } + + config := DefaultAlertDetectorConfig() + detector := NewAlertDetector(db, db, nil, nil, config, nil) + + // Run detection + detector.detectMasterDaySpikes(context.Background(), cfg) + + // Should have created an RPD alert + var alerts []model.Alert + if err := db.Where("type = ?", model.AlertTypeTrafficSpike).Find(&alerts).Error; err != nil { + t.Fatalf("query alerts: %v", err) + } + + if len(alerts) < 1 { + t.Error("expected at least 1 traffic spike alert for RPD") + } + + // Check alert properties + var foundRPD bool + for _, a := range alerts { + if a.RelatedType == "master" && a.RelatedID == master.ID { + foundRPD = true + if a.Severity != model.AlertSeverityWarning && a.Severity != model.AlertSeverityCritical { + t.Errorf("expected warning or critical severity, got %s", a.Severity) + } + } + } + if !foundRPD { + t.Error("expected RPD alert for test master") + } +} + +func TestDetectMasterMinuteSpikesTPM(t *testing.T) { + db := setupTestDB(t) + + // Create a master + master := model.Master{Name: "test-master-tpm", Status: "active"} + if err := db.Create(&master).Error; err != nil { + t.Fatalf("create master: %v", err) + } + + // Create threshold config with low token thresholds for testing + cfg := model.AlertThresholdConfig{ + GlobalQPS: 100, + MasterRPM: 1000, // High so RPM won't trigger + MasterRPD: 10000, + MasterTPM: 1000, // Low token threshold for testing + MasterTPD: 100_000_000, + MinRPMRequests1m: 100, // High so RPM minimum won't be met + MinTPMTokens1m: 500, // Low minimum sample + } + if err := db.Create(&cfg).Error; err != nil { + t.Fatalf("create config: %v", err) + } + + // Create log records with high token counts within the last minute + now := time.Now().UTC() + for i := 0; i < 5; i++ { + log := model.LogRecord{ + MasterID: master.ID, + KeyID: 1, + TokensIn: 500, + TokensOut: 500, // 1000 total per record + } + if err := db.Create(&log).Error; err != nil { + t.Fatalf("create log: %v", err) + } + // Update created_at to be within the last minute + db.Model(&log).Update("created_at", now.Add(-time.Duration(i)*time.Second)) + } + + config := DefaultAlertDetectorConfig() + detector := NewAlertDetector(db, db, nil, nil, config, nil) + + // Run detection + detector.detectMasterMinuteSpikes(context.Background(), cfg) + + // Should have created a TPM alert (5 * 1000 = 5000 tokens >= 1000 threshold, and >= 500 min) + var alerts []model.Alert + if err := db.Where("type = ? AND related_id = ?", model.AlertTypeTrafficSpike, master.ID).Find(&alerts).Error; err != nil { + t.Fatalf("query alerts: %v", err) + } + + if len(alerts) < 1 { + t.Error("expected at least 1 traffic spike alert for TPM") + } +} + +func TestDetectTrafficSpikesWithoutRedis(t *testing.T) { + db := setupTestDB(t) + + // Create a master + master := model.Master{Name: "test-master-no-redis", Status: "active"} + if err := db.Create(&master).Error; err != nil { + t.Fatalf("create master: %v", err) + } + + // Create threshold config + cfg := model.AlertThresholdConfig{ + GlobalQPS: 100, + MasterRPM: 5, // Low threshold + MasterRPD: 5, // Low threshold + MasterTPM: 10_000_000, + MasterTPD: 100_000_000, + MinRPMRequests1m: 3, + MinTPMTokens1m: 1000, + } + if err := db.Create(&cfg).Error; err != nil { + t.Fatalf("create config: %v", err) + } + + // Create log records + now := time.Now().UTC() + for i := 0; i < 10; i++ { + log := model.LogRecord{ + MasterID: master.ID, + KeyID: 1, + TokensIn: 100, + TokensOut: 200, + } + if err := db.Create(&log).Error; err != nil { + t.Fatalf("create log: %v", err) + } + db.Model(&log).Update("created_at", now.Add(-time.Duration(i)*time.Second)) + } + + // Create detector WITHOUT Redis (rdb = nil) + config := DefaultAlertDetectorConfig() + detector := NewAlertDetector(db, db, nil, nil, config, nil) + + // Run full detectTrafficSpikes - should not panic and should detect log-based spikes + detector.detectTrafficSpikes(context.Background()) + + // Should have created alerts for minute spikes (log-based detection works without Redis) + var alerts []model.Alert + if err := db.Where("type = ?", model.AlertTypeTrafficSpike).Find(&alerts).Error; err != nil { + t.Fatalf("query alerts: %v", err) + } + + if len(alerts) < 1 { + t.Error("expected traffic spike alerts from log-based detection (without Redis)") + } +} + +func TestDetectMasterMinuteSpikesNoAlertBelowThreshold(t *testing.T) { + db := setupTestDB(t) + + // Create a master + master := model.Master{Name: "test-master-low", Status: "active"} + if err := db.Create(&master).Error; err != nil { + t.Fatalf("create master: %v", err) + } + + // Create threshold config with high thresholds + cfg := model.AlertThresholdConfig{ + GlobalQPS: 100, + MasterRPM: 1000, // High threshold + MasterRPD: 10000, + MasterTPM: 10_000_000, // High threshold + MasterTPD: 100_000_000, + MinRPMRequests1m: 10, + MinTPMTokens1m: 1_000_000, + } + if err := db.Create(&cfg).Error; err != nil { + t.Fatalf("create config: %v", err) + } + + // Create only a few log records (below threshold) + now := time.Now().UTC() + for i := 0; i < 3; i++ { + log := model.LogRecord{ + MasterID: master.ID, + KeyID: 1, + TokensIn: 100, + TokensOut: 200, + } + if err := db.Create(&log).Error; err != nil { + t.Fatalf("create log: %v", err) + } + db.Model(&log).Update("created_at", now.Add(-time.Duration(i)*time.Second)) + } + + config := DefaultAlertDetectorConfig() + detector := NewAlertDetector(db, db, nil, nil, config, nil) + + // Run detection + detector.detectMasterMinuteSpikes(context.Background(), cfg) + + // Should NOT have created any alerts (below threshold and minimum sample) + var count int64 + if err := db.Model(&model.Alert{}).Where("type = ?", model.AlertTypeTrafficSpike).Count(&count).Error; err != nil { + t.Fatalf("count alerts: %v", err) + } + + if count != 0 { + t.Errorf("expected 0 alerts when below threshold, got %d", count) + } +} diff --git a/internal/model/log.go b/internal/model/log.go index 552cc5c..9cafab1 100644 --- a/internal/model/log.go +++ b/internal/model/log.go @@ -6,6 +6,7 @@ import "gorm.io/gorm" type LogRecord struct { gorm.Model Group string `json:"group"` + MasterID uint `gorm:"index" json:"master_id"` KeyID uint `json:"key_id"` ModelName string `json:"model"` ProviderID uint `json:"provider_id"`