diff --git a/cmd/server/main.go b/cmd/server/main.go
index 1ab3934..92000d4 100644
--- a/cmd/server/main.go
+++ b/cmd/server/main.go
@@ -207,6 +207,13 @@ func main() {
 	defer cancelToken()
 	go tokenRefresher.Start(tokenCtx)
 
+	// Alert Detector
+	alertDetectorConfig := cron.DefaultAlertDetectorConfig()
+	alertDetector := cron.NewAlertDetector(db, logDB, rdb, service.NewStatsService(rdb), alertDetectorConfig, logger)
+	alertDetectorCtx, cancelAlertDetector := context.WithCancel(context.Background())
+	defer cancelAlertDetector()
+	go alertDetector.Start(alertDetectorCtx)
+
 	adminService, err := service.NewAdminService()
 	if err != nil {
 		fatal(logger, "failed to create admin service", "err", err)
diff --git a/docs/api.md b/docs/api.md
index af82444..b92f6f3 100644
--- a/docs/api.md
+++ b/docs/api.md
@@ -524,6 +524,46 @@ curl -X POST http://localhost:8080/internal/alerts/report \
 }
 ```
 
+### 6.6 Automatic Alert Detection (AlertDetector)
+
+The CP side runs a background task that checks for anomalies once per minute (by default) and creates alerts automatically.
+
+**Detection rules**:
+
+| Rule | Type | Severity | Description |
+| :--- | :--- | :--- | :--- |
+| Rate limit | `rate_limit` | warning | Detects Masters that are currently rate limited in Redis |
+| Error spike | `error_spike` | info/warning/critical | Error rate >= 10% over the last 5 minutes (>= 25% warning, >= 50% critical) |
+| Quota exceeded | `quota_exceeded` | warning/critical | Key quota usage >= 90% (critical at 100%) |
+| Provider down | `provider_down` | critical | API key failure rate >= 50% with at least 10 failures |
+
+**Deduplication**:
+- Alerts are deduplicated by `fingerprint` (`type:related_type:related_id`, e.g. `rate_limit:master:12`)
+- An active alert with the same fingerprint created within the last 5 minutes suppresses a new one
+
+**Default configuration** (`DefaultAlertDetectorConfig()`):
+```go
+Interval:              1 * time.Minute  // detection interval
+ErrorSpikeThreshold:   0.1              // error-rate threshold (10%)
+ErrorSpikeWindow:      5 * time.Minute  // error statistics window
+QuotaWarningThreshold: 0.9              // quota warning threshold (90%)
+ProviderFailThreshold: 10               // upstream failure-count threshold
+DeduplicationCooldown: 5 * time.Minute  // deduplication cooldown
+```
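+
+A minimal sketch of overriding these defaults before starting the detector, mirroring the wiring in `cmd/server/main.go` (the `cfg` and `detector` names here are illustrative):
+
+```go
+cfg := cron.DefaultAlertDetectorConfig()
+cfg.ErrorSpikeThreshold = 0.2                // alert at a 20% error rate instead of 10%
+cfg.DeduplicationCooldown = 10 * time.Minute // suppress duplicate alerts for 10 minutes
+
+detector := cron.NewAlertDetector(db, logDB, rdb, service.NewStatsService(rdb), cfg, logger)
+ctx, cancel := context.WithCancel(context.Background())
+defer cancel()
+go detector.Start(ctx)
+```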
+
 ---
 
 ## 7. Notes
diff --git a/internal/cron/alert_detector.go b/internal/cron/alert_detector.go
new file mode 100644
index 0000000..94ac9cf
--- /dev/null
+++ b/internal/cron/alert_detector.go
@@ -0,0 +1,325 @@
+package cron
+
+import (
+	"context"
+	"fmt"
+	"log/slog"
+	"time"
+
+	"github.com/ez-api/ez-api/internal/model"
+	"github.com/ez-api/ez-api/internal/service"
+	"github.com/redis/go-redis/v9"
+	"gorm.io/gorm"
+)
+
+// AlertDetectorConfig holds configuration for alert detection
+type AlertDetectorConfig struct {
+	Enabled               bool
+	Interval              time.Duration
+	ErrorSpikeThreshold   float64 // Error rate threshold (0.1 = 10%)
+	ErrorSpikeWindow      time.Duration
+	QuotaWarningThreshold float64 // Quota usage threshold (0.9 = 90%)
+	ProviderFailThreshold int     // Minimum failure count before alerting
+	DeduplicationCooldown time.Duration
+}
+
+// DefaultAlertDetectorConfig returns default configuration
+func DefaultAlertDetectorConfig() AlertDetectorConfig {
+	return AlertDetectorConfig{
+		Enabled:               true,
+		Interval:              1 * time.Minute,
+		ErrorSpikeThreshold:   0.1, // 10% error rate
+		ErrorSpikeWindow:      5 * time.Minute,
+		QuotaWarningThreshold: 0.9, // 90% quota used
+		ProviderFailThreshold: 10,
+		DeduplicationCooldown: 5 * time.Minute,
+	}
+}
+
+// AlertDetector detects anomalies and creates alerts
+type AlertDetector struct {
+	db           *gorm.DB
+	logDB        *gorm.DB
+	rdb          *redis.Client
+	statsService *service.StatsService
+	config       AlertDetectorConfig
+	logger       *slog.Logger
+}
+
+// NewAlertDetector creates a new AlertDetector
+func NewAlertDetector(db, logDB *gorm.DB, rdb *redis.Client, statsService *service.StatsService, config AlertDetectorConfig, logger *slog.Logger) *AlertDetector {
+	if logDB == nil {
+		logDB = db
+	}
+	if logger == nil {
+		logger = slog.Default()
+	}
+	return &AlertDetector{
+		db:           db,
+		logDB:        logDB,
+		rdb:          rdb,
+		statsService: statsService,
+		config:       config,
+		logger:       logger,
+	}
+}
+
+// Start begins the alert detection loop
+func (d *AlertDetector) Start(ctx context.Context) {
+	if d == nil || !d.config.Enabled {
+		return
+	}
+	if ctx == nil {
+		ctx = context.Background()
+	}
+
+	ticker := time.NewTicker(d.config.Interval)
+	defer ticker.Stop()
+
+	d.logger.Info("alert detector started", "interval", d.config.Interval)
+
+	for {
+		select {
+		case <-ctx.Done():
+			d.logger.Info("alert detector stopped")
+			return
+		case <-ticker.C:
+			d.detectOnce(ctx)
+		}
+	}
+}
+
+// detectOnce runs all detection rules once
+func (d *AlertDetector) detectOnce(ctx context.Context) {
+	if d == nil || d.db == nil {
+		return
+	}
+
+	// Run each detection rule
+	d.detectRateLimits(ctx)
+	d.detectErrorSpikes(ctx)
+	d.detectQuotaExceeded(ctx)
+	d.detectProviderDown(ctx)
+}
+
+// detectRateLimits checks for masters hitting rate limits
+func (d *AlertDetector) detectRateLimits(ctx context.Context) {
+	if d.rdb == nil || d.statsService == nil {
+		return
+	}
+
+	// Get all active masters
+	var masters []model.Master
+	if err := d.db.Where("status = ?", "active").Find(&masters).Error; err != nil {
+		d.logger.Warn("failed to load masters for rate limit detection", "err", err)
+		return
+	}
+
+	for _, master := range masters {
+		snapshot, err := d.statsService.GetMasterRealtimeSnapshot(ctx, master.ID)
+		if err != nil {
+			continue
+		}
+
+		if snapshot.RateLimited {
+			d.createAlertIfNew(
+				model.AlertTypeRateLimit,
+				model.AlertSeverityWarning,
+				fmt.Sprintf("Master '%s' is rate limited", master.Name),
+				fmt.Sprintf("Master '%s' (ID: %d) is currently being rate limited. Current QPS: %.2f", master.Name, master.ID, snapshot.QPS),
+				master.ID,
+				"master",
+				master.Name,
+			)
+		}
+	}
+}
+
+// detectErrorSpikes checks for high error rates in recent logs
+func (d *AlertDetector) detectErrorSpikes(ctx context.Context) {
+	if d.logDB == nil {
+		return
+	}
+
+	since := time.Now().UTC().Add(-d.config.ErrorSpikeWindow)
+
+	// Query error stats grouped by key
+	type errorStat struct {
+		KeyID  uint
+		Total  int64
+		Errors int64
+	}
+
+	var stats []errorStat
+	err := d.logDB.Model(&model.LogRecord{}).
+		Select("key_id, COUNT(*) as total, SUM(CASE WHEN status_code >= 400 THEN 1 ELSE 0 END) as errors").
+		Where("created_at >= ?", since).
+		Where("key_id > 0").
+		Group("key_id").
+		Having("COUNT(*) >= 10"). // Minimum requests threshold
+		Scan(&stats).Error
+
+	if err != nil {
+		d.logger.Warn("failed to query error stats", "err", err)
+		return
+	}
+
+	for _, stat := range stats {
+		if stat.Total == 0 {
+			continue
+		}
+		errRate := float64(stat.Errors) / float64(stat.Total)
+		if errRate >= d.config.ErrorSpikeThreshold {
+			// Make sure the key still exists before alerting
+			var key model.Key
+			if err := d.db.Select("id, master_id").First(&key, stat.KeyID).Error; err != nil {
+				continue
+			}
+
+			d.createAlertIfNew(
+				model.AlertTypeErrorSpike,
+				d.errorRateSeverity(errRate),
+				fmt.Sprintf("High error rate detected (%.1f%%)", errRate*100),
+				fmt.Sprintf("Key ID %d has %.1f%% error rate (%d/%d requests) in the last %v",
+					stat.KeyID, errRate*100, stat.Errors, stat.Total, d.config.ErrorSpikeWindow),
+				stat.KeyID,
+				"key",
+				"",
+			)
+		}
+	}
+}
+
+// detectQuotaExceeded checks for keys approaching or exceeding quota
+func (d *AlertDetector) detectQuotaExceeded(ctx context.Context) {
+	var keys []model.Key
+	// Find keys with quota enabled and usage >= threshold
+	err := d.db.Where("quota_limit > 0 AND quota_used >= quota_limit * ?", d.config.QuotaWarningThreshold).
+		Find(&keys).Error
+
+	if err != nil {
+		d.logger.Warn("failed to query quota usage", "err", err)
+		return
+	}
+
+	for _, key := range keys {
+		usagePercent := float64(key.QuotaUsed) / float64(key.QuotaLimit) * 100
+		exceeded := key.QuotaUsed >= key.QuotaLimit
+
+		severity := model.AlertSeverityWarning
+		title := fmt.Sprintf("Quota at %.0f%%", usagePercent)
+		if exceeded {
+			severity = model.AlertSeverityCritical
+			title = "Quota exceeded"
+		}
+
+		d.createAlertIfNew(
+			model.AlertTypeQuotaExceeded,
+			severity,
+			title,
+			fmt.Sprintf("Key ID %d (Master %d) has used %d/%d tokens (%.1f%%)",
+				key.ID, key.MasterID, key.QuotaUsed, key.QuotaLimit, usagePercent),
+			key.ID,
+			"key",
+			"",
+		)
+	}
+}
+
+// detectProviderDown checks for API keys with a high failure rate
+func (d *AlertDetector) detectProviderDown(ctx context.Context) {
+	// Find API keys with high failure rate
+	var apiKeys []model.APIKey
+	err := d.db.Where("status = ? AND total_requests > 0", "active").
+		Where("(total_requests - success_requests) >= ?", d.config.ProviderFailThreshold).
+		Find(&apiKeys).Error
+
+	if err != nil {
+		d.logger.Warn("failed to query api key failures", "err", err)
+		return
+	}
+
+	for _, apiKey := range apiKeys {
+		failures := apiKey.TotalRequests - apiKey.SuccessRequests
+		if failures < int64(d.config.ProviderFailThreshold) {
+			continue
+		}
+
+		// Check overall failure rate
+		failureRate := float64(failures) / float64(apiKey.TotalRequests)
+		if failureRate < 0.5 { // At least 50% failure rate
+			continue
+		}
+
+		// Get provider group name
+		var group model.ProviderGroup
+		groupName := ""
+		if err := d.db.Select("name").First(&group, apiKey.GroupID).Error; err == nil {
+			groupName = group.Name
+		}
+
+		d.createAlertIfNew(
+			model.AlertTypeProviderDown,
+			model.AlertSeverityCritical,
+			fmt.Sprintf("Provider API key failing (%d failures)", failures),
+			fmt.Sprintf("API Key ID %d in group '%s' has %d failures out of %d requests (%.1f%% failure rate)",
+				apiKey.ID, groupName, failures, apiKey.TotalRequests, failureRate*100),
+			apiKey.ID,
+			"apikey",
+			groupName,
+		)
+	}
+}
+
+// createAlertIfNew creates an alert if no duplicate exists within the cooldown period
+func (d *AlertDetector) createAlertIfNew(
+	alertType model.AlertType,
+	severity model.AlertSeverity,
+	title, message string,
+	relatedID uint,
+	relatedType, relatedName string,
+) {
+	fingerprint := fmt.Sprintf("%s:%s:%d", alertType, relatedType, relatedID)
+	cooldownTime := time.Now().UTC().Add(-d.config.DeduplicationCooldown)
+
+	// Check for existing active alert with same fingerprint
+	var count int64
+	d.db.Model(&model.Alert{}).
+		Where("fingerprint = ? AND status = ? AND created_at >= ?", fingerprint, model.AlertStatusActive, cooldownTime).
+		Count(&count)
+
+	if count > 0 {
+		return // Duplicate within cooldown
+	}
+
+	alert := model.Alert{
+		Type:        alertType,
+		Severity:    severity,
+		Status:      model.AlertStatusActive,
+		Title:       title,
+		Message:     message,
+		RelatedID:   relatedID,
+		RelatedType: relatedType,
+		RelatedName: relatedName,
+		Fingerprint: fingerprint,
+	}
+
+	if err := d.db.Create(&alert).Error; err != nil {
+		d.logger.Warn("failed to create alert", "type", alertType, "err", err)
+		return
+	}
+
+	d.logger.Info("alert created", "type", alertType, "severity", severity, "title", title)
+}
+
+// errorRateSeverity determines severity based on error rate
+func (d *AlertDetector) errorRateSeverity(rate float64) model.AlertSeverity {
+	if rate >= 0.5 {
+		return model.AlertSeverityCritical
+	}
+	if rate >= 0.25 {
+		return model.AlertSeverityWarning
+	}
+	return model.AlertSeverityInfo
+}