Files
ez-api/internal/cron/alert_detector_test.go
zenfun 05caed37c2 refactor(cron): migrate cron jobs to foundation scheduler
Replace custom goroutine-based scheduling in cron jobs with centralized
foundation scheduler. Each cron job now exposes a RunOnce method called
by the scheduler instead of managing its own ticker loop.

Changes:
- Remove interval/enabled config from cron job structs
- Convert Start() methods to RunOnce() for all cron jobs
- Add scheduler setup in main.go with configurable intervals
- Update foundation dependency to v0.6.0 for scheduler support
- Update tests to validate RunOnce nil-safety
2025-12-31 20:42:25 +08:00

622 lines
16 KiB
Go

package cron
import (
"context"
"fmt"
"testing"
"time"
"github.com/ez-api/ez-api/internal/model"
"gorm.io/driver/sqlite"
"gorm.io/gorm"
)
// setupTestDB opens a named in-memory SQLite database for the calling test and
// migrates the tables these tests touch. The DSN is derived from t.Name(), so
// each top-level test gets its own database.
//
// The connection pool is closed when the test finishes. A shared-cache
// in-memory database lives only while a connection to it is open, so closing
// the pool keeps rows from leaking into a later run of the same test in the
// same process (for example with -count=2).
func setupTestDB(t *testing.T) *gorm.DB {
	t.Helper()

	dsn := fmt.Sprintf("file:%s?mode=memory&cache=shared", t.Name())
	db, err := gorm.Open(sqlite.Open(dsn), &gorm.Config{})
	if err != nil {
		t.Fatalf("open sqlite: %v", err)
	}
	t.Cleanup(func() {
		sqlDB, err := db.DB()
		if err != nil {
			t.Errorf("get sql.DB: %v", err)
			return
		}
		if err := sqlDB.Close(); err != nil {
			t.Errorf("close sqlite: %v", err)
		}
	})

	if err := db.AutoMigrate(&model.Alert{}, &model.AlertThresholdConfig{}, &model.Master{}, &model.Key{}, &model.APIKey{}, &model.ProviderGroup{}, &model.LogRecord{}); err != nil {
		t.Fatalf("migrate: %v", err)
	}
	return db
}
// TestDefaultAlertThresholdConfig pins the seven threshold values returned by
// model.DefaultAlertThresholdConfig.
func TestDefaultAlertThresholdConfig(t *testing.T) {
	defaults := model.DefaultAlertThresholdConfig()

	checks := []struct {
		field string
		got   int64
		want  int64
	}{
		{"GlobalQPS", int64(defaults.GlobalQPS), 100},
		{"MasterRPM", int64(defaults.MasterRPM), 20},
		{"MasterRPD", int64(defaults.MasterRPD), 1000},
		{"MasterTPM", int64(defaults.MasterTPM), 10_000_000},
		{"MasterTPD", int64(defaults.MasterTPD), 100_000_000},
		{"MinRPMRequests1m", int64(defaults.MinRPMRequests1m), 10},
		{"MinTPMTokens1m", int64(defaults.MinTPMTokens1m), 1_000_000},
	}
	for _, c := range checks {
		if c.got != c.want {
			t.Errorf("expected %s=%d, got %d", c.field, c.want, c.got)
		}
	}
}
func TestAlertDetectorLoadThresholdConfigDefault(t *testing.T) {
db := setupTestDB(t)
detector := &AlertDetector{db: db}
cfg := detector.loadThresholdConfig()
// Should return defaults when no config in DB
if cfg.GlobalQPS != 100 {
t.Errorf("expected default GlobalQPS=100, got %d", cfg.GlobalQPS)
}
if cfg.MasterRPM != 20 {
t.Errorf("expected default MasterRPM=20, got %d", cfg.MasterRPM)
}
}
// TestAlertDetectorLoadThresholdConfigFromDB stores a custom threshold row and
// checks that loadThresholdConfig returns its GlobalQPS, MasterRPM and
// MasterRPD values.
func TestAlertDetectorLoadThresholdConfigFromDB(t *testing.T) {
	db := setupTestDB(t)

	// Persist a custom threshold row before the detector reads the config.
	stored := model.AlertThresholdConfig{
		GlobalQPS:        500,
		MasterRPM:        100,
		MasterRPD:        5000,
		MasterTPM:        50_000_000,
		MasterTPD:        500_000_000,
		MinRPMRequests1m: 50,
		MinTPMTokens1m:   5_000_000,
	}
	if err := db.Create(&stored).Error; err != nil {
		t.Fatalf("create config: %v", err)
	}

	d := &AlertDetector{db: db}
	loaded := d.loadThresholdConfig()

	if qps := loaded.GlobalQPS; qps != 500 {
		t.Errorf("expected GlobalQPS=500, got %d", qps)
	}
	if rpm := loaded.MasterRPM; rpm != 100 {
		t.Errorf("expected MasterRPM=100, got %d", rpm)
	}
	if rpd := loaded.MasterRPD; rpd != 5000 {
		t.Errorf("expected MasterRPD=5000, got %d", rpd)
	}
}
// TestTrafficSpikeSeverity checks the severity that trafficSpikeSeverity
// returns for values below, at, and above twice the threshold.
func TestTrafficSpikeSeverity(t *testing.T) {
	cases := []struct {
		value, threshold int64
		want             model.AlertSeverity
	}{
		// Below the threshold; production code only calls this when value >= threshold.
		{value: 50, threshold: 100, want: model.AlertSeverityWarning},
		// Exactly at the threshold.
		{value: 100, threshold: 100, want: model.AlertSeverityWarning},
		// 1.5x the threshold.
		{value: 150, threshold: 100, want: model.AlertSeverityWarning},
		// Just below 2x.
		{value: 199, threshold: 100, want: model.AlertSeverityWarning},
		// Exactly 2x the threshold.
		{value: 200, threshold: 100, want: model.AlertSeverityCritical},
		// 3x the threshold.
		{value: 300, threshold: 100, want: model.AlertSeverityCritical},
	}

	for i := range cases {
		c := cases[i]
		if got := trafficSpikeSeverity(c.value, c.threshold); got != c.want {
			t.Errorf("trafficSpikeSeverity(%d, %d) = %s, expected %s", c.value, c.threshold, got, c.want)
		}
	}
}
// TestTrafficSpikeMetadataJSON checks that trafficSpikeMetadata.JSON returns a
// non-empty string of at least ten bytes for a populated value.
func TestTrafficSpikeMetadataJSON(t *testing.T) {
	const minLen = 10

	payload := trafficSpikeMetadata{
		Metric:    "master_rpm",
		Value:     150,
		Threshold: 20,
		Window:    "1m",
	}

	encoded := payload.JSON()
	if encoded == "" {
		t.Error("expected non-empty JSON")
	}
	if len(encoded) < minLen {
		t.Errorf("JSON too short: %s", encoded)
	}
}
// TestAlertDetectorDeduplication checks that createAlertIfNew stores one alert
// for repeated calls with the same type and related entity inside the
// cooldown window, and stores a second alert when the related entity differs.
func TestAlertDetectorDeduplication(t *testing.T) {
	db := setupTestDB(t)
	config := DefaultAlertDetectorConfig()
	config.DeduplicationCooldown = 5 * time.Minute
	detector := NewAlertDetector(db, db, nil, nil, config, nil)

	// alertCount fails the test on a query error so that a failed count can
	// never be mistaken for an unchanged row count.
	alertCount := func() int64 {
		t.Helper()
		var n int64
		if err := db.Model(&model.Alert{}).Count(&n).Error; err != nil {
			t.Fatalf("count alerts: %v", err)
		}
		return n
	}

	// Create first alert
	detector.createAlertIfNew(
		model.AlertTypeRateLimit,
		model.AlertSeverityWarning,
		"Test Alert",
		"Test Message",
		1,
		"master",
		"test-master",
	)
	if count := alertCount(); count != 1 {
		t.Fatalf("expected 1 alert, got %d", count)
	}

	// Try to create duplicate (should be skipped)
	detector.createAlertIfNew(
		model.AlertTypeRateLimit,
		model.AlertSeverityWarning,
		"Test Alert Duplicate",
		"Test Message Duplicate",
		1,
		"master",
		"test-master",
	)
	if count := alertCount(); count != 1 {
		t.Fatalf("expected still 1 alert after duplicate, got %d", count)
	}

	// Different fingerprint should create new alert
	detector.createAlertIfNew(
		model.AlertTypeRateLimit,
		model.AlertSeverityWarning,
		"Different Alert",
		"Different Message",
		2, // Different related_id
		"master",
		"test-master-2",
	)
	if count := alertCount(); count != 2 {
		t.Fatalf("expected 2 alerts with different fingerprint, got %d", count)
	}
}
// TestAlertDetectorTrafficSpikeDeduplication checks that
// createTrafficSpikeAlert stores one alert for repeated calls with the same
// metric and master inside the cooldown window, and stores a second alert when
// the metric differs.
func TestAlertDetectorTrafficSpikeDeduplication(t *testing.T) {
	db := setupTestDB(t)
	config := DefaultAlertDetectorConfig()
	config.DeduplicationCooldown = 5 * time.Minute
	detector := NewAlertDetector(db, db, nil, nil, config, nil)

	// alertCount fails the test on a query error so that a failed count can
	// never be mistaken for an unchanged row count.
	alertCount := func() int64 {
		t.Helper()
		var n int64
		if err := db.Model(&model.Alert{}).Count(&n).Error; err != nil {
			t.Fatalf("count alerts: %v", err)
		}
		return n
	}

	meta := trafficSpikeMetadata{
		Metric:    "master_rpm",
		Value:     150,
		Threshold: 20,
		Window:    "1m",
	}

	// Create first traffic spike alert
	detector.createTrafficSpikeAlert(
		model.AlertSeverityWarning,
		"RPM Exceeded",
		"Master exceeded RPM",
		1,
		"master",
		"test-master",
		meta,
	)
	if count := alertCount(); count != 1 {
		t.Fatalf("expected 1 alert, got %d", count)
	}

	// Try to create duplicate (same metric, same master)
	detector.createTrafficSpikeAlert(
		model.AlertSeverityWarning,
		"RPM Exceeded Again",
		"Master exceeded RPM again",
		1,
		"master",
		"test-master",
		meta,
	)
	if count := alertCount(); count != 1 {
		t.Fatalf("expected still 1 alert after duplicate, got %d", count)
	}

	// Different metric should create new alert
	meta2 := trafficSpikeMetadata{
		Metric:    "master_tpm", // Different metric
		Value:     15000000,
		Threshold: 10000000,
		Window:    "1m",
	}
	detector.createTrafficSpikeAlert(
		model.AlertSeverityWarning,
		"TPM Exceeded",
		"Master exceeded TPM",
		1,
		"master",
		"test-master",
		meta2,
	)
	if count := alertCount(); count != 2 {
		t.Fatalf("expected 2 alerts with different metric, got %d", count)
	}
}
func TestAlertDetectorErrorRateSeverity(t *testing.T) {
detector := &AlertDetector{}
tests := []struct {
rate float64
expected model.AlertSeverity
}{
{0.05, model.AlertSeverityInfo}, // 5%
{0.10, model.AlertSeverityInfo}, // 10%
{0.24, model.AlertSeverityInfo}, // 24%
{0.25, model.AlertSeverityWarning}, // 25%
{0.40, model.AlertSeverityWarning}, // 40%
{0.49, model.AlertSeverityWarning}, // 49%
{0.50, model.AlertSeverityCritical}, // 50%
{0.75, model.AlertSeverityCritical}, // 75%
{1.00, model.AlertSeverityCritical}, // 100%
}
for _, tc := range tests {
result := detector.errorRateSeverity(tc.rate)
if result != tc.expected {
t.Errorf("errorRateSeverity(%.2f) = %s, expected %s", tc.rate, result, tc.expected)
}
}
}
// TestAlertDetectorDetectOnceNilSafe calls detectOnce on a nil detector and on
// a zero-value detector (nil db). The test passes as long as neither call
// panics.
func TestAlertDetectorDetectOnceNilSafe(t *testing.T) {
	ctx := context.Background()

	// First a nil receiver, then a detector with no database configured.
	for _, d := range []*AlertDetector{nil, {}} {
		d.detectOnce(ctx)
	}
}
// TestAlertDetectorRunOnceNilSafe calls RunOnce on a nil detector and on a
// zero-value detector (nil db). The test passes as long as neither call
// panics.
func TestAlertDetectorRunOnceNilSafe(t *testing.T) {
	ctx := context.Background()

	// First a nil receiver, then a detector with no database configured.
	for _, d := range []*AlertDetector{nil, {}} {
		d.RunOnce(ctx)
	}
}
// TestDetectMasterMinuteSpikesRPM seeds ten log records for one master within
// the last ten seconds, runs detectMasterMinuteSpikes with MasterRPM=5 and
// MinRPMRequests1m=3, and expects a warning or critical traffic spike alert
// related to that master.
func TestDetectMasterMinuteSpikesRPM(t *testing.T) {
	db := setupTestDB(t)

	// Create a master
	master := model.Master{Name: "test-master", Status: "active"}
	if err := db.Create(&master).Error; err != nil {
		t.Fatalf("create master: %v", err)
	}

	// Create threshold config with low thresholds for testing
	cfg := model.AlertThresholdConfig{
		GlobalQPS:        100,
		MasterRPM:        5, // Low threshold for testing
		MasterRPD:        1000,
		MasterTPM:        10_000_000,
		MasterTPD:        100_000_000,
		MinRPMRequests1m: 3, // Low minimum sample
		MinTPMTokens1m:   1000,
	}
	if err := db.Create(&cfg).Error; err != nil {
		t.Fatalf("create config: %v", err)
	}

	// Create log records within the last minute
	now := time.Now().UTC()
	for i := 0; i < 10; i++ {
		rec := model.LogRecord{
			MasterID:  master.ID,
			KeyID:     1,
			TokensIn:  100,
			TokensOut: 200,
		}
		if err := db.Create(&rec).Error; err != nil {
			t.Fatalf("create log: %v", err)
		}
		// Pin created_at explicitly; a failed update would otherwise leave the
		// fixture silently different from what the test describes.
		createdAt := now.Add(-time.Duration(i) * time.Second)
		if err := db.Model(&rec).Update("created_at", createdAt).Error; err != nil {
			t.Fatalf("update log created_at: %v", err)
		}
	}

	config := DefaultAlertDetectorConfig()
	detector := NewAlertDetector(db, db, nil, nil, config, nil)

	// Run detection
	detector.detectMasterMinuteSpikes(context.Background(), cfg)

	// Should have created an RPM alert
	var alerts []model.Alert
	if err := db.Where("type = ?", model.AlertTypeTrafficSpike).Find(&alerts).Error; err != nil {
		t.Fatalf("query alerts: %v", err)
	}
	if len(alerts) < 1 {
		t.Error("expected at least 1 traffic spike alert for RPM")
	}

	// Check alert properties
	var foundRPM bool
	for _, a := range alerts {
		if a.RelatedType == "master" && a.RelatedID == master.ID {
			foundRPM = true
			if a.Severity != model.AlertSeverityWarning && a.Severity != model.AlertSeverityCritical {
				t.Errorf("expected warning or critical severity, got %s", a.Severity)
			}
		}
	}
	if !foundRPM {
		t.Error("expected RPM alert for test master")
	}
}
// TestDetectMasterDaySpikesRPD seeds ten log records for one master spread
// over the last ten hours, runs detectMasterDaySpikes with MasterRPD=5, and
// expects a warning or critical traffic spike alert related to that master.
func TestDetectMasterDaySpikesRPD(t *testing.T) {
	db := setupTestDB(t)

	// Create a master
	master := model.Master{Name: "test-master-day", Status: "active"}
	if err := db.Create(&master).Error; err != nil {
		t.Fatalf("create master: %v", err)
	}

	// Create threshold config with low thresholds for testing
	cfg := model.AlertThresholdConfig{
		GlobalQPS:        100,
		MasterRPM:        1000,
		MasterRPD:        5, // Low threshold for testing
		MasterTPM:        10_000_000,
		MasterTPD:        100_000_000,
		MinRPMRequests1m: 10,
		MinTPMTokens1m:   1000,
	}
	if err := db.Create(&cfg).Error; err != nil {
		t.Fatalf("create config: %v", err)
	}

	// Create log records within the last 24 hours
	now := time.Now().UTC()
	for i := 0; i < 10; i++ {
		rec := model.LogRecord{
			MasterID:  master.ID,
			KeyID:     1,
			TokensIn:  100,
			TokensOut: 200,
		}
		if err := db.Create(&rec).Error; err != nil {
			t.Fatalf("create log: %v", err)
		}
		// Pin created_at explicitly; a failed update would otherwise leave the
		// fixture silently different from what the test describes.
		createdAt := now.Add(-time.Duration(i) * time.Hour)
		if err := db.Model(&rec).Update("created_at", createdAt).Error; err != nil {
			t.Fatalf("update log created_at: %v", err)
		}
	}

	config := DefaultAlertDetectorConfig()
	detector := NewAlertDetector(db, db, nil, nil, config, nil)

	// Run detection
	detector.detectMasterDaySpikes(context.Background(), cfg)

	// Should have created an RPD alert
	var alerts []model.Alert
	if err := db.Where("type = ?", model.AlertTypeTrafficSpike).Find(&alerts).Error; err != nil {
		t.Fatalf("query alerts: %v", err)
	}
	if len(alerts) < 1 {
		t.Error("expected at least 1 traffic spike alert for RPD")
	}

	// Check alert properties
	var foundRPD bool
	for _, a := range alerts {
		if a.RelatedType == "master" && a.RelatedID == master.ID {
			foundRPD = true
			if a.Severity != model.AlertSeverityWarning && a.Severity != model.AlertSeverityCritical {
				t.Errorf("expected warning or critical severity, got %s", a.Severity)
			}
		}
	}
	if !foundRPD {
		t.Error("expected RPD alert for test master")
	}
}
// TestDetectMasterMinuteSpikesTPM seeds five token-heavy log records for one
// master within the last five seconds, runs detectMasterMinuteSpikes with
// MasterTPM=1000 and MinTPMTokens1m=500 (and request thresholds set high), and
// expects at least one traffic spike alert related to that master.
func TestDetectMasterMinuteSpikesTPM(t *testing.T) {
	db := setupTestDB(t)

	// Create a master
	master := model.Master{Name: "test-master-tpm", Status: "active"}
	if err := db.Create(&master).Error; err != nil {
		t.Fatalf("create master: %v", err)
	}

	// Create threshold config with low token thresholds for testing
	cfg := model.AlertThresholdConfig{
		GlobalQPS:        100,
		MasterRPM:        1000, // High so RPM won't trigger
		MasterRPD:        10000,
		MasterTPM:        1000, // Low token threshold for testing
		MasterTPD:        100_000_000,
		MinRPMRequests1m: 100, // High so RPM minimum won't be met
		MinTPMTokens1m:   500, // Low minimum sample
	}
	if err := db.Create(&cfg).Error; err != nil {
		t.Fatalf("create config: %v", err)
	}

	// Create log records with high token counts within the last minute
	now := time.Now().UTC()
	for i := 0; i < 5; i++ {
		rec := model.LogRecord{
			MasterID:  master.ID,
			KeyID:     1,
			TokensIn:  500,
			TokensOut: 500, // 1000 total per record
		}
		if err := db.Create(&rec).Error; err != nil {
			t.Fatalf("create log: %v", err)
		}
		// Pin created_at explicitly; a failed update would otherwise leave the
		// fixture silently different from what the test describes.
		createdAt := now.Add(-time.Duration(i) * time.Second)
		if err := db.Model(&rec).Update("created_at", createdAt).Error; err != nil {
			t.Fatalf("update log created_at: %v", err)
		}
	}

	config := DefaultAlertDetectorConfig()
	detector := NewAlertDetector(db, db, nil, nil, config, nil)

	// Run detection
	detector.detectMasterMinuteSpikes(context.Background(), cfg)

	// Should have created a TPM alert (5 * 1000 = 5000 tokens >= 1000 threshold, and >= 500 min)
	var alerts []model.Alert
	if err := db.Where("type = ? AND related_id = ?", model.AlertTypeTrafficSpike, master.ID).Find(&alerts).Error; err != nil {
		t.Fatalf("query alerts: %v", err)
	}
	if len(alerts) < 1 {
		t.Error("expected at least 1 traffic spike alert for TPM")
	}
}
// TestDetectTrafficSpikesWithoutRedis stores a low-threshold config and ten
// recent log records, then runs the full detectTrafficSpikes pass on a
// detector built with nil optional dependencies. It expects the call not to
// panic and at least one traffic spike alert to be stored.
func TestDetectTrafficSpikesWithoutRedis(t *testing.T) {
	db := setupTestDB(t)

	// Create a master
	master := model.Master{Name: "test-master-no-redis", Status: "active"}
	if err := db.Create(&master).Error; err != nil {
		t.Fatalf("create master: %v", err)
	}

	// Create threshold config
	cfg := model.AlertThresholdConfig{
		GlobalQPS:        100,
		MasterRPM:        5, // Low threshold
		MasterRPD:        5, // Low threshold
		MasterTPM:        10_000_000,
		MasterTPD:        100_000_000,
		MinRPMRequests1m: 3,
		MinTPMTokens1m:   1000,
	}
	if err := db.Create(&cfg).Error; err != nil {
		t.Fatalf("create config: %v", err)
	}

	// Create log records
	now := time.Now().UTC()
	for i := 0; i < 10; i++ {
		rec := model.LogRecord{
			MasterID:  master.ID,
			KeyID:     1,
			TokensIn:  100,
			TokensOut: 200,
		}
		if err := db.Create(&rec).Error; err != nil {
			t.Fatalf("create log: %v", err)
		}
		// Pin created_at explicitly; a failed update would otherwise leave the
		// fixture silently different from what the test describes.
		createdAt := now.Add(-time.Duration(i) * time.Second)
		if err := db.Model(&rec).Update("created_at", createdAt).Error; err != nil {
			t.Fatalf("update log created_at: %v", err)
		}
	}

	// Create detector WITHOUT Redis (rdb = nil)
	config := DefaultAlertDetectorConfig()
	detector := NewAlertDetector(db, db, nil, nil, config, nil)

	// Run full detectTrafficSpikes - should not panic and should detect log-based spikes
	detector.detectTrafficSpikes(context.Background())

	// Should have created alerts for minute spikes (log-based detection works without Redis)
	var alerts []model.Alert
	if err := db.Where("type = ?", model.AlertTypeTrafficSpike).Find(&alerts).Error; err != nil {
		t.Fatalf("query alerts: %v", err)
	}
	if len(alerts) < 1 {
		t.Error("expected traffic spike alerts from log-based detection (without Redis)")
	}
}
// TestDetectMasterMinuteSpikesNoAlertBelowThreshold seeds three recent log
// records for one master, runs detectMasterMinuteSpikes with high request and
// token thresholds, and expects no traffic spike alert to be stored.
func TestDetectMasterMinuteSpikesNoAlertBelowThreshold(t *testing.T) {
	db := setupTestDB(t)

	// Create a master
	master := model.Master{Name: "test-master-low", Status: "active"}
	if err := db.Create(&master).Error; err != nil {
		t.Fatalf("create master: %v", err)
	}

	// Create threshold config with high thresholds
	cfg := model.AlertThresholdConfig{
		GlobalQPS:        100,
		MasterRPM:        1000, // High threshold
		MasterRPD:        10000,
		MasterTPM:        10_000_000, // High threshold
		MasterTPD:        100_000_000,
		MinRPMRequests1m: 10,
		MinTPMTokens1m:   1_000_000,
	}
	if err := db.Create(&cfg).Error; err != nil {
		t.Fatalf("create config: %v", err)
	}

	// Create only a few log records (below threshold)
	now := time.Now().UTC()
	for i := 0; i < 3; i++ {
		rec := model.LogRecord{
			MasterID:  master.ID,
			KeyID:     1,
			TokensIn:  100,
			TokensOut: 200,
		}
		if err := db.Create(&rec).Error; err != nil {
			t.Fatalf("create log: %v", err)
		}
		// Pin created_at explicitly; a failed update would otherwise leave the
		// fixture silently different from what the test describes.
		createdAt := now.Add(-time.Duration(i) * time.Second)
		if err := db.Model(&rec).Update("created_at", createdAt).Error; err != nil {
			t.Fatalf("update log created_at: %v", err)
		}
	}

	config := DefaultAlertDetectorConfig()
	detector := NewAlertDetector(db, db, nil, nil, config, nil)

	// Run detection
	detector.detectMasterMinuteSpikes(context.Background(), cfg)

	// Should NOT have created any alerts (below threshold and minimum sample)
	var count int64
	if err := db.Model(&model.Alert{}).Where("type = ?", model.AlertTypeTrafficSpike).Count(&count).Error; err != nil {
		t.Fatalf("count alerts: %v", err)
	}
	if count != 0 {
		t.Errorf("expected 0 alerts when below threshold, got %d", count)
	}
}