mirror of
https://github.com/EZ-Api/ez-api.git
synced 2026-01-13 17:47:51 +00:00
feat(alerts): add MasterID to log records and improve traffic spike detection
- Add MasterID field with index to LogRecord model for efficient queries - Fix threshold config loading to use fixed ID=1 with FirstOrCreate - Allow traffic spike detection to work without Redis for log-based checks - Add traffic_spike to API documentation for alert type filter - Add comprehensive tests for RPM/RPD/TPM spike detection scenarios
This commit is contained in:
@@ -89,7 +89,7 @@ type ListAlertsResponse struct {
|
|||||||
// @Param offset query int false "offset"
|
// @Param offset query int false "offset"
|
||||||
// @Param status query string false "filter by status (active, acknowledged, resolved, dismissed)"
|
// @Param status query string false "filter by status (active, acknowledged, resolved, dismissed)"
|
||||||
// @Param severity query string false "filter by severity (info, warning, critical)"
|
// @Param severity query string false "filter by severity (info, warning, critical)"
|
||||||
// @Param type query string false "filter by type (rate_limit, error_spike, quota_exceeded, key_disabled, key_expired, provider_down)"
|
// @Param type query string false "filter by type (rate_limit, error_spike, quota_exceeded, key_disabled, key_expired, provider_down, traffic_spike)"
|
||||||
// @Success 200 {object} ListAlertsResponse
|
// @Success 200 {object} ListAlertsResponse
|
||||||
// @Failure 500 {object} gin.H
|
// @Failure 500 {object} gin.H
|
||||||
// @Router /admin/alerts [get]
|
// @Router /admin/alerts [get]
|
||||||
@@ -544,19 +544,12 @@ func (h *AlertHandler) UpdateAlertThresholds(c *gin.Context) {
|
|||||||
c.JSON(http.StatusOK, toAlertThresholdView(cfg))
|
c.JSON(http.StatusOK, toAlertThresholdView(cfg))
|
||||||
}
|
}
|
||||||
|
|
||||||
// loadThresholdConfig loads the threshold config from DB or returns defaults
|
// loadThresholdConfig loads the threshold config from DB or creates default with fixed ID=1
|
||||||
func (h *AlertHandler) loadThresholdConfig() (model.AlertThresholdConfig, error) {
|
func (h *AlertHandler) loadThresholdConfig() (model.AlertThresholdConfig, error) {
|
||||||
var cfg model.AlertThresholdConfig
|
cfg := model.DefaultAlertThresholdConfig()
|
||||||
err := h.db.First(&cfg).Error
|
cfg.ID = 1 // Fixed ID to ensure single config row
|
||||||
|
err := h.db.Where("id = ?", 1).FirstOrCreate(&cfg).Error
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err.Error() == "record not found" {
|
|
||||||
// Create default config
|
|
||||||
cfg = model.DefaultAlertThresholdConfig()
|
|
||||||
if createErr := h.db.Create(&cfg).Error; createErr != nil {
|
|
||||||
return cfg, createErr
|
|
||||||
}
|
|
||||||
return cfg, nil
|
|
||||||
}
|
|
||||||
return cfg, err
|
return cfg, err
|
||||||
}
|
}
|
||||||
return cfg, nil
|
return cfg, nil
|
||||||
|
|||||||
@@ -326,11 +326,12 @@ func (d *AlertDetector) errorRateSeverity(rate float64) model.AlertSeverity {
|
|||||||
return model.AlertSeverityInfo
|
return model.AlertSeverityInfo
|
||||||
}
|
}
|
||||||
|
|
||||||
// loadThresholdConfig loads the threshold config from DB or returns defaults
|
// loadThresholdConfig loads the threshold config from DB with fixed ID=1, or returns defaults
|
||||||
func (d *AlertDetector) loadThresholdConfig() model.AlertThresholdConfig {
|
func (d *AlertDetector) loadThresholdConfig() model.AlertThresholdConfig {
|
||||||
var cfg model.AlertThresholdConfig
|
cfg := model.DefaultAlertThresholdConfig()
|
||||||
err := d.db.First(&cfg).Error
|
cfg.ID = 1 // Fixed ID to ensure single config row
|
||||||
if err != nil {
|
if err := d.db.Where("id = ?", 1).FirstOrCreate(&cfg).Error; err != nil {
|
||||||
|
d.logger.Warn("failed to load threshold config, using defaults", "err", err)
|
||||||
return model.DefaultAlertThresholdConfig()
|
return model.DefaultAlertThresholdConfig()
|
||||||
}
|
}
|
||||||
return cfg
|
return cfg
|
||||||
@@ -360,19 +361,17 @@ func (m trafficSpikeMetadata) JSON() string {
|
|||||||
|
|
||||||
// detectTrafficSpikes checks for traffic threshold breaches
|
// detectTrafficSpikes checks for traffic threshold breaches
|
||||||
func (d *AlertDetector) detectTrafficSpikes(ctx context.Context) {
|
func (d *AlertDetector) detectTrafficSpikes(ctx context.Context) {
|
||||||
if d.rdb == nil || d.statsService == nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
cfg := d.loadThresholdConfig()
|
cfg := d.loadThresholdConfig()
|
||||||
|
|
||||||
// 1. Global QPS check
|
// 1. Global QPS check (requires Redis)
|
||||||
|
if d.rdb != nil && d.statsService != nil {
|
||||||
d.detectGlobalQPSSpike(ctx, cfg)
|
d.detectGlobalQPSSpike(ctx, cfg)
|
||||||
|
}
|
||||||
|
|
||||||
// 2. Per-master RPM/TPM (1-minute window)
|
// 2. Per-master RPM/TPM (1-minute window) - uses logDB, works without Redis
|
||||||
d.detectMasterMinuteSpikes(ctx, cfg)
|
d.detectMasterMinuteSpikes(ctx, cfg)
|
||||||
|
|
||||||
// 3. Per-master RPD/TPD (24-hour window)
|
// 3. Per-master RPD/TPD (24-hour window) - uses logDB, works without Redis
|
||||||
d.detectMasterDaySpikes(ctx, cfg)
|
d.detectMasterDaySpikes(ctx, cfg)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -326,3 +326,310 @@ func TestAlertDetectorStartDisabled(t *testing.T) {
|
|||||||
t.Error("Start did not return immediately when disabled")
|
t.Error("Start did not return immediately when disabled")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestDetectMasterMinuteSpikesRPM(t *testing.T) {
|
||||||
|
db := setupTestDB(t)
|
||||||
|
|
||||||
|
// Create a master
|
||||||
|
master := model.Master{Name: "test-master", Status: "active"}
|
||||||
|
if err := db.Create(&master).Error; err != nil {
|
||||||
|
t.Fatalf("create master: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create threshold config with low thresholds for testing
|
||||||
|
cfg := model.AlertThresholdConfig{
|
||||||
|
GlobalQPS: 100,
|
||||||
|
MasterRPM: 5, // Low threshold for testing
|
||||||
|
MasterRPD: 1000,
|
||||||
|
MasterTPM: 10_000_000,
|
||||||
|
MasterTPD: 100_000_000,
|
||||||
|
MinRPMRequests1m: 3, // Low minimum sample
|
||||||
|
MinTPMTokens1m: 1000,
|
||||||
|
}
|
||||||
|
if err := db.Create(&cfg).Error; err != nil {
|
||||||
|
t.Fatalf("create config: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create log records within the last minute
|
||||||
|
now := time.Now().UTC()
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
log := model.LogRecord{
|
||||||
|
MasterID: master.ID,
|
||||||
|
KeyID: 1,
|
||||||
|
TokensIn: 100,
|
||||||
|
TokensOut: 200,
|
||||||
|
}
|
||||||
|
if err := db.Create(&log).Error; err != nil {
|
||||||
|
t.Fatalf("create log: %v", err)
|
||||||
|
}
|
||||||
|
// Update created_at to be within the last minute
|
||||||
|
db.Model(&log).Update("created_at", now.Add(-time.Duration(i)*time.Second))
|
||||||
|
}
|
||||||
|
|
||||||
|
config := DefaultAlertDetectorConfig()
|
||||||
|
detector := NewAlertDetector(db, db, nil, nil, config, nil)
|
||||||
|
|
||||||
|
// Run detection
|
||||||
|
detector.detectMasterMinuteSpikes(context.Background(), cfg)
|
||||||
|
|
||||||
|
// Should have created an RPM alert
|
||||||
|
var alerts []model.Alert
|
||||||
|
if err := db.Where("type = ?", model.AlertTypeTrafficSpike).Find(&alerts).Error; err != nil {
|
||||||
|
t.Fatalf("query alerts: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(alerts) < 1 {
|
||||||
|
t.Error("expected at least 1 traffic spike alert for RPM")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check alert properties
|
||||||
|
var foundRPM bool
|
||||||
|
for _, a := range alerts {
|
||||||
|
if a.RelatedType == "master" && a.RelatedID == master.ID {
|
||||||
|
foundRPM = true
|
||||||
|
if a.Severity != model.AlertSeverityWarning && a.Severity != model.AlertSeverityCritical {
|
||||||
|
t.Errorf("expected warning or critical severity, got %s", a.Severity)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !foundRPM {
|
||||||
|
t.Error("expected RPM alert for test master")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestDetectMasterDaySpikesRPD(t *testing.T) {
|
||||||
|
db := setupTestDB(t)
|
||||||
|
|
||||||
|
// Create a master
|
||||||
|
master := model.Master{Name: "test-master-day", Status: "active"}
|
||||||
|
if err := db.Create(&master).Error; err != nil {
|
||||||
|
t.Fatalf("create master: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create threshold config with low thresholds for testing
|
||||||
|
cfg := model.AlertThresholdConfig{
|
||||||
|
GlobalQPS: 100,
|
||||||
|
MasterRPM: 1000,
|
||||||
|
MasterRPD: 5, // Low threshold for testing
|
||||||
|
MasterTPM: 10_000_000,
|
||||||
|
MasterTPD: 100_000_000,
|
||||||
|
MinRPMRequests1m: 10,
|
||||||
|
MinTPMTokens1m: 1000,
|
||||||
|
}
|
||||||
|
if err := db.Create(&cfg).Error; err != nil {
|
||||||
|
t.Fatalf("create config: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create log records within the last 24 hours
|
||||||
|
now := time.Now().UTC()
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
log := model.LogRecord{
|
||||||
|
MasterID: master.ID,
|
||||||
|
KeyID: 1,
|
||||||
|
TokensIn: 100,
|
||||||
|
TokensOut: 200,
|
||||||
|
}
|
||||||
|
if err := db.Create(&log).Error; err != nil {
|
||||||
|
t.Fatalf("create log: %v", err)
|
||||||
|
}
|
||||||
|
// Update created_at to be within the last 24 hours
|
||||||
|
db.Model(&log).Update("created_at", now.Add(-time.Duration(i)*time.Hour))
|
||||||
|
}
|
||||||
|
|
||||||
|
config := DefaultAlertDetectorConfig()
|
||||||
|
detector := NewAlertDetector(db, db, nil, nil, config, nil)
|
||||||
|
|
||||||
|
// Run detection
|
||||||
|
detector.detectMasterDaySpikes(context.Background(), cfg)
|
||||||
|
|
||||||
|
// Should have created an RPD alert
|
||||||
|
var alerts []model.Alert
|
||||||
|
if err := db.Where("type = ?", model.AlertTypeTrafficSpike).Find(&alerts).Error; err != nil {
|
||||||
|
t.Fatalf("query alerts: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(alerts) < 1 {
|
||||||
|
t.Error("expected at least 1 traffic spike alert for RPD")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check alert properties
|
||||||
|
var foundRPD bool
|
||||||
|
for _, a := range alerts {
|
||||||
|
if a.RelatedType == "master" && a.RelatedID == master.ID {
|
||||||
|
foundRPD = true
|
||||||
|
if a.Severity != model.AlertSeverityWarning && a.Severity != model.AlertSeverityCritical {
|
||||||
|
t.Errorf("expected warning or critical severity, got %s", a.Severity)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !foundRPD {
|
||||||
|
t.Error("expected RPD alert for test master")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestDetectMasterMinuteSpikesTPM(t *testing.T) {
|
||||||
|
db := setupTestDB(t)
|
||||||
|
|
||||||
|
// Create a master
|
||||||
|
master := model.Master{Name: "test-master-tpm", Status: "active"}
|
||||||
|
if err := db.Create(&master).Error; err != nil {
|
||||||
|
t.Fatalf("create master: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create threshold config with low token thresholds for testing
|
||||||
|
cfg := model.AlertThresholdConfig{
|
||||||
|
GlobalQPS: 100,
|
||||||
|
MasterRPM: 1000, // High so RPM won't trigger
|
||||||
|
MasterRPD: 10000,
|
||||||
|
MasterTPM: 1000, // Low token threshold for testing
|
||||||
|
MasterTPD: 100_000_000,
|
||||||
|
MinRPMRequests1m: 100, // High so RPM minimum won't be met
|
||||||
|
MinTPMTokens1m: 500, // Low minimum sample
|
||||||
|
}
|
||||||
|
if err := db.Create(&cfg).Error; err != nil {
|
||||||
|
t.Fatalf("create config: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create log records with high token counts within the last minute
|
||||||
|
now := time.Now().UTC()
|
||||||
|
for i := 0; i < 5; i++ {
|
||||||
|
log := model.LogRecord{
|
||||||
|
MasterID: master.ID,
|
||||||
|
KeyID: 1,
|
||||||
|
TokensIn: 500,
|
||||||
|
TokensOut: 500, // 1000 total per record
|
||||||
|
}
|
||||||
|
if err := db.Create(&log).Error; err != nil {
|
||||||
|
t.Fatalf("create log: %v", err)
|
||||||
|
}
|
||||||
|
// Update created_at to be within the last minute
|
||||||
|
db.Model(&log).Update("created_at", now.Add(-time.Duration(i)*time.Second))
|
||||||
|
}
|
||||||
|
|
||||||
|
config := DefaultAlertDetectorConfig()
|
||||||
|
detector := NewAlertDetector(db, db, nil, nil, config, nil)
|
||||||
|
|
||||||
|
// Run detection
|
||||||
|
detector.detectMasterMinuteSpikes(context.Background(), cfg)
|
||||||
|
|
||||||
|
// Should have created a TPM alert (5 * 1000 = 5000 tokens >= 1000 threshold, and >= 500 min)
|
||||||
|
var alerts []model.Alert
|
||||||
|
if err := db.Where("type = ? AND related_id = ?", model.AlertTypeTrafficSpike, master.ID).Find(&alerts).Error; err != nil {
|
||||||
|
t.Fatalf("query alerts: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(alerts) < 1 {
|
||||||
|
t.Error("expected at least 1 traffic spike alert for TPM")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestDetectTrafficSpikesWithoutRedis(t *testing.T) {
|
||||||
|
db := setupTestDB(t)
|
||||||
|
|
||||||
|
// Create a master
|
||||||
|
master := model.Master{Name: "test-master-no-redis", Status: "active"}
|
||||||
|
if err := db.Create(&master).Error; err != nil {
|
||||||
|
t.Fatalf("create master: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create threshold config
|
||||||
|
cfg := model.AlertThresholdConfig{
|
||||||
|
GlobalQPS: 100,
|
||||||
|
MasterRPM: 5, // Low threshold
|
||||||
|
MasterRPD: 5, // Low threshold
|
||||||
|
MasterTPM: 10_000_000,
|
||||||
|
MasterTPD: 100_000_000,
|
||||||
|
MinRPMRequests1m: 3,
|
||||||
|
MinTPMTokens1m: 1000,
|
||||||
|
}
|
||||||
|
if err := db.Create(&cfg).Error; err != nil {
|
||||||
|
t.Fatalf("create config: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create log records
|
||||||
|
now := time.Now().UTC()
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
log := model.LogRecord{
|
||||||
|
MasterID: master.ID,
|
||||||
|
KeyID: 1,
|
||||||
|
TokensIn: 100,
|
||||||
|
TokensOut: 200,
|
||||||
|
}
|
||||||
|
if err := db.Create(&log).Error; err != nil {
|
||||||
|
t.Fatalf("create log: %v", err)
|
||||||
|
}
|
||||||
|
db.Model(&log).Update("created_at", now.Add(-time.Duration(i)*time.Second))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create detector WITHOUT Redis (rdb = nil)
|
||||||
|
config := DefaultAlertDetectorConfig()
|
||||||
|
detector := NewAlertDetector(db, db, nil, nil, config, nil)
|
||||||
|
|
||||||
|
// Run full detectTrafficSpikes - should not panic and should detect log-based spikes
|
||||||
|
detector.detectTrafficSpikes(context.Background())
|
||||||
|
|
||||||
|
// Should have created alerts for minute spikes (log-based detection works without Redis)
|
||||||
|
var alerts []model.Alert
|
||||||
|
if err := db.Where("type = ?", model.AlertTypeTrafficSpike).Find(&alerts).Error; err != nil {
|
||||||
|
t.Fatalf("query alerts: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(alerts) < 1 {
|
||||||
|
t.Error("expected traffic spike alerts from log-based detection (without Redis)")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestDetectMasterMinuteSpikesNoAlertBelowThreshold(t *testing.T) {
|
||||||
|
db := setupTestDB(t)
|
||||||
|
|
||||||
|
// Create a master
|
||||||
|
master := model.Master{Name: "test-master-low", Status: "active"}
|
||||||
|
if err := db.Create(&master).Error; err != nil {
|
||||||
|
t.Fatalf("create master: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create threshold config with high thresholds
|
||||||
|
cfg := model.AlertThresholdConfig{
|
||||||
|
GlobalQPS: 100,
|
||||||
|
MasterRPM: 1000, // High threshold
|
||||||
|
MasterRPD: 10000,
|
||||||
|
MasterTPM: 10_000_000, // High threshold
|
||||||
|
MasterTPD: 100_000_000,
|
||||||
|
MinRPMRequests1m: 10,
|
||||||
|
MinTPMTokens1m: 1_000_000,
|
||||||
|
}
|
||||||
|
if err := db.Create(&cfg).Error; err != nil {
|
||||||
|
t.Fatalf("create config: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create only a few log records (below threshold)
|
||||||
|
now := time.Now().UTC()
|
||||||
|
for i := 0; i < 3; i++ {
|
||||||
|
log := model.LogRecord{
|
||||||
|
MasterID: master.ID,
|
||||||
|
KeyID: 1,
|
||||||
|
TokensIn: 100,
|
||||||
|
TokensOut: 200,
|
||||||
|
}
|
||||||
|
if err := db.Create(&log).Error; err != nil {
|
||||||
|
t.Fatalf("create log: %v", err)
|
||||||
|
}
|
||||||
|
db.Model(&log).Update("created_at", now.Add(-time.Duration(i)*time.Second))
|
||||||
|
}
|
||||||
|
|
||||||
|
config := DefaultAlertDetectorConfig()
|
||||||
|
detector := NewAlertDetector(db, db, nil, nil, config, nil)
|
||||||
|
|
||||||
|
// Run detection
|
||||||
|
detector.detectMasterMinuteSpikes(context.Background(), cfg)
|
||||||
|
|
||||||
|
// Should NOT have created any alerts (below threshold and minimum sample)
|
||||||
|
var count int64
|
||||||
|
if err := db.Model(&model.Alert{}).Where("type = ?", model.AlertTypeTrafficSpike).Count(&count).Error; err != nil {
|
||||||
|
t.Fatalf("count alerts: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if count != 0 {
|
||||||
|
t.Errorf("expected 0 alerts when below threshold, got %d", count)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ import "gorm.io/gorm"
|
|||||||
type LogRecord struct {
|
type LogRecord struct {
|
||||||
gorm.Model
|
gorm.Model
|
||||||
Group string `json:"group"`
|
Group string `json:"group"`
|
||||||
|
MasterID uint `gorm:"index" json:"master_id"`
|
||||||
KeyID uint `json:"key_id"`
|
KeyID uint `json:"key_id"`
|
||||||
ModelName string `json:"model"`
|
ModelName string `json:"model"`
|
||||||
ProviderID uint `json:"provider_id"`
|
ProviderID uint `json:"provider_id"`
|
||||||
|
|||||||
Reference in New Issue
Block a user