feat(cron): add automatic alert detector for anomaly monitoring
Implement AlertDetector, a background task that runs every minute to detect anomalies and create alerts:

- Rate limit detection: monitors masters hitting rate limits
- Error spike detection: flags keys with an error rate >= 10%
- Quota exceeded: warns when a key's quota usage reaches >= 90%
- Provider down: alerts when an API key's failure rate reaches >= 50%

Includes fingerprint-based deduplication with a 5-minute cooldown to prevent duplicate alerts for the same issue.
@@ -207,6 +207,13 @@ func main() {
	defer cancelToken()
	go tokenRefresher.Start(tokenCtx)

	// Alert Detector
	alertDetectorConfig := cron.DefaultAlertDetectorConfig()
	alertDetector := cron.NewAlertDetector(db, logDB, rdb, service.NewStatsService(rdb), alertDetectorConfig, logger)
	alertDetectorCtx, cancelAlertDetector := context.WithCancel(context.Background())
	defer cancelAlertDetector()
	go alertDetector.Start(alertDetectorCtx)

	adminService, err := service.NewAdminService()
	if err != nil {
		fatal(logger, "failed to create admin service", "err", err)
docs/api.md (+27)
@@ -524,6 +524,33 @@ curl -X POST http://localhost:8080/internal/alerts/report \
}
```

### 6.6 Automatic Alert Detection (AlertDetector)

The CP side runs a background task that checks for anomalies every minute and creates alerts automatically.

**Detection rules**:

| Rule | Type | Severity | Description |
| :--- | :--- | :--- | :--- |
| Rate limit | `rate_limit` | warning | Detects masters that are currently rate limited in Redis |
| Error spike | `error_spike` | info/warning/critical | Error rate >= 10% over the last 5 minutes (escalates to warning at >= 25%, critical at >= 50%) |
| Quota exceeded | `quota_exceeded` | warning/critical | Key quota usage >= 90% (critical once it reaches 100%) |
| Provider down | `provider_down` | critical | API key failure rate >= 50% with at least 10 failures |

**Deduplication**:

- Alerts are deduplicated by `fingerprint` (`type:related_type:related_id`)
- Within the 5-minute cooldown, no new alert is created while an active alert with the same fingerprint exists (see the sketch below)
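
A minimal sketch of the check performed by `createAlertIfNew` in `internal/cron/alert_detector.go`; `cfg` and `db` here stand in for the detector's config and GORM handle:

```go
// The fingerprint identifies "this kind of alert about this object", e.g. "rate_limit:master:42".
fingerprint := fmt.Sprintf("%s:%s:%d", alertType, relatedType, relatedID)
cooldownTime := time.Now().UTC().Add(-cfg.DeduplicationCooldown)

// Skip creation if an active alert with the same fingerprint exists within the cooldown.
var count int64
db.Model(&model.Alert{}).
	Where("fingerprint = ? AND status = ? AND created_at >= ?", fingerprint, model.AlertStatusActive, cooldownTime).
	Count(&count)
if count > 0 {
	return // duplicate within cooldown, do not create a new alert
}
```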

**Default configuration**:
```go
Enabled:               true             // detector enabled
Interval:              1 * time.Minute  // detection interval
ErrorSpikeThreshold:   0.1              // error-rate threshold (10%)
ErrorSpikeWindow:      5 * time.Minute  // error statistics window
QuotaWarningThreshold: 0.9              // quota warning threshold (90%)
ProviderFailThreshold: 10               // provider failure-count threshold
DeduplicationCooldown: 5 * time.Minute  // deduplication cooldown
```
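
To change any of these defaults, adjust the struct returned by `cron.DefaultAlertDetectorConfig()` before constructing the detector. A minimal sketch, assuming `db`, `logDB`, `rdb`, and `logger` are already initialized as in `main.go`:

```go
cfg := cron.DefaultAlertDetectorConfig()
cfg.Interval = 30 * time.Second // detect twice per minute
cfg.ErrorSpikeThreshold = 0.2   // only flag error rates >= 20%

detector := cron.NewAlertDetector(db, logDB, rdb, service.NewStatsService(rdb), cfg, logger)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go detector.Start(ctx) // runs until ctx is cancelled; returns immediately if cfg.Enabled is false
```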

---

## 7. Notes
internal/cron/alert_detector.go (+325, new file)
@@ -0,0 +1,325 @@
package cron

import (
	"context"
	"fmt"
	"log/slog"
	"time"

	"github.com/ez-api/ez-api/internal/model"
	"github.com/ez-api/ez-api/internal/service"
	"github.com/redis/go-redis/v9"
	"gorm.io/gorm"
)

// AlertDetectorConfig holds configuration for alert detection
type AlertDetectorConfig struct {
	Enabled               bool
	Interval              time.Duration
	ErrorSpikeThreshold   float64 // Error rate threshold (0.1 = 10%)
	ErrorSpikeWindow      time.Duration
	QuotaWarningThreshold float64 // Quota usage threshold (0.9 = 90%)
	ProviderFailThreshold int     // Minimum failure count before alert
	DeduplicationCooldown time.Duration
}

// DefaultAlertDetectorConfig returns default configuration
func DefaultAlertDetectorConfig() AlertDetectorConfig {
	return AlertDetectorConfig{
		Enabled:               true,
		Interval:              1 * time.Minute,
		ErrorSpikeThreshold:   0.1, // 10% error rate
		ErrorSpikeWindow:      5 * time.Minute,
		QuotaWarningThreshold: 0.9, // 90% quota used
		ProviderFailThreshold: 10,
		DeduplicationCooldown: 5 * time.Minute,
	}
}
// AlertDetector detects anomalies and creates alerts
type AlertDetector struct {
	db           *gorm.DB
	logDB        *gorm.DB
	rdb          *redis.Client
	statsService *service.StatsService
	config       AlertDetectorConfig
	logger       *slog.Logger
}

// NewAlertDetector creates a new AlertDetector
func NewAlertDetector(db, logDB *gorm.DB, rdb *redis.Client, statsService *service.StatsService, config AlertDetectorConfig, logger *slog.Logger) *AlertDetector {
	if logDB == nil {
		logDB = db
	}
	if logger == nil {
		logger = slog.Default()
	}
	return &AlertDetector{
		db:           db,
		logDB:        logDB,
		rdb:          rdb,
		statsService: statsService,
		config:       config,
		logger:       logger,
	}
}
// Start begins the alert detection loop
func (d *AlertDetector) Start(ctx context.Context) {
	if d == nil || !d.config.Enabled {
		return
	}
	if ctx == nil {
		ctx = context.Background()
	}

	ticker := time.NewTicker(d.config.Interval)
	defer ticker.Stop()

	d.logger.Info("alert detector started", "interval", d.config.Interval)

	for {
		select {
		case <-ctx.Done():
			d.logger.Info("alert detector stopped")
			return
		case <-ticker.C:
			d.detectOnce(ctx)
		}
	}
}
// detectOnce runs all detection rules once
func (d *AlertDetector) detectOnce(ctx context.Context) {
	if d == nil || d.db == nil {
		return
	}

	// Run each detection rule
	d.detectRateLimits(ctx)
	d.detectErrorSpikes(ctx)
	d.detectQuotaExceeded(ctx)
	d.detectProviderDown(ctx)
}
// detectRateLimits checks for masters hitting rate limits
func (d *AlertDetector) detectRateLimits(ctx context.Context) {
	if d.rdb == nil || d.statsService == nil {
		return
	}

	// Get all active masters
	var masters []model.Master
	if err := d.db.Where("status = ?", "active").Find(&masters).Error; err != nil {
		d.logger.Warn("failed to load masters for rate limit detection", "err", err)
		return
	}

	for _, master := range masters {
		snapshot, err := d.statsService.GetMasterRealtimeSnapshot(ctx, master.ID)
		if err != nil {
			continue
		}

		if snapshot.RateLimited {
			d.createAlertIfNew(
				model.AlertTypeRateLimit,
				model.AlertSeverityWarning,
				fmt.Sprintf("Master '%s' is rate limited", master.Name),
				fmt.Sprintf("Master '%s' (ID: %d) is currently being rate limited. Current QPS: %.2f", master.Name, master.ID, snapshot.QPS),
				master.ID,
				"master",
				master.Name,
			)
		}
	}
}
// detectErrorSpikes checks for high error rates in recent logs
func (d *AlertDetector) detectErrorSpikes(ctx context.Context) {
	if d.logDB == nil {
		return
	}

	since := time.Now().UTC().Add(-d.config.ErrorSpikeWindow)

	// Query error stats grouped by key
	type errorStat struct {
		KeyID  uint
		Total  int64
		Errors int64
	}

	var stats []errorStat
	err := d.logDB.Model(&model.LogRecord{}).
		Select("key_id, COUNT(*) as total, SUM(CASE WHEN status_code >= 400 THEN 1 ELSE 0 END) as errors").
		Where("created_at >= ?", since).
		Where("key_id > 0").
		Group("key_id").
		Having("COUNT(*) >= 10"). // Minimum requests threshold
		Scan(&stats).Error

	if err != nil {
		d.logger.Warn("failed to query error stats", "err", err)
		return
	}

	for _, stat := range stats {
		if stat.Total == 0 {
			continue
		}
		errRate := float64(stat.Errors) / float64(stat.Total)
		if errRate >= d.config.ErrorSpikeThreshold {
			// Make sure the key still exists before alerting
			var key model.Key
			if err := d.db.Select("id, master_id").First(&key, stat.KeyID).Error; err != nil {
				continue
			}

			d.createAlertIfNew(
				model.AlertTypeErrorSpike,
				d.errorRateSeverity(errRate),
				fmt.Sprintf("High error rate detected (%.1f%%)", errRate*100),
				fmt.Sprintf("Key ID %d has %.1f%% error rate (%d/%d requests) in the last %v",
					stat.KeyID, errRate*100, stat.Errors, stat.Total, d.config.ErrorSpikeWindow),
				stat.KeyID,
				"key",
				"",
			)
		}
	}
}
// detectQuotaExceeded checks for keys approaching or exceeding quota
func (d *AlertDetector) detectQuotaExceeded(ctx context.Context) {
	var keys []model.Key
	// Find keys with quota enabled and usage >= threshold
	err := d.db.Where("quota_limit > 0 AND quota_used >= quota_limit * ?", d.config.QuotaWarningThreshold).
		Find(&keys).Error

	if err != nil {
		d.logger.Warn("failed to query quota usage", "err", err)
		return
	}

	for _, key := range keys {
		usagePercent := float64(key.QuotaUsed) / float64(key.QuotaLimit) * 100
		exceeded := key.QuotaUsed >= key.QuotaLimit

		severity := model.AlertSeverityWarning
		title := fmt.Sprintf("Quota at %.0f%%", usagePercent)
		if exceeded {
			severity = model.AlertSeverityCritical
			title = "Quota exceeded"
		}

		d.createAlertIfNew(
			model.AlertTypeQuotaExceeded,
			severity,
			title,
			fmt.Sprintf("Key ID %d (Master %d) has used %d/%d tokens (%.1f%%)",
				key.ID, key.MasterID, key.QuotaUsed, key.QuotaLimit, usagePercent),
			key.ID,
			"key",
			"",
		)
	}
}
// detectProviderDown checks for API keys with a high failure rate
func (d *AlertDetector) detectProviderDown(ctx context.Context) {
	// Find API keys with at least ProviderFailThreshold failed requests
	var apiKeys []model.APIKey
	err := d.db.Where("status = ? AND total_requests > 0", "active").
		Where("(total_requests - success_requests) >= ?", d.config.ProviderFailThreshold).
		Find(&apiKeys).Error

	if err != nil {
		d.logger.Warn("failed to query api key failures", "err", err)
		return
	}

	for _, apiKey := range apiKeys {
		failures := apiKey.TotalRequests - apiKey.SuccessRequests
		if failures < int64(d.config.ProviderFailThreshold) {
			continue
		}

		// Check overall failure rate
		failureRate := float64(failures) / float64(apiKey.TotalRequests)
		if failureRate < 0.5 { // At least 50% failure rate
			continue
		}

		// Get provider group name
		var group model.ProviderGroup
		groupName := ""
		if err := d.db.Select("name").First(&group, apiKey.GroupID).Error; err == nil {
			groupName = group.Name
		}

		d.createAlertIfNew(
			model.AlertTypeProviderDown,
			model.AlertSeverityCritical,
			fmt.Sprintf("Provider API key failing (%d failures)", failures),
			fmt.Sprintf("API Key ID %d in group '%s' has %d failures out of %d requests (%.1f%% failure rate)",
				apiKey.ID, groupName, failures, apiKey.TotalRequests, failureRate*100),
			apiKey.ID,
			"apikey",
			groupName,
		)
	}
}
// createAlertIfNew creates an alert if no duplicate exists within cooldown period
func (d *AlertDetector) createAlertIfNew(
	alertType model.AlertType,
	severity model.AlertSeverity,
	title, message string,
	relatedID uint,
	relatedType, relatedName string,
) {
	fingerprint := fmt.Sprintf("%s:%s:%d", alertType, relatedType, relatedID)
	cooldownTime := time.Now().UTC().Add(-d.config.DeduplicationCooldown)

	// Check for existing active alert with same fingerprint
	var count int64
	d.db.Model(&model.Alert{}).
		Where("fingerprint = ? AND status = ? AND created_at >= ?", fingerprint, model.AlertStatusActive, cooldownTime).
		Count(&count)

	if count > 0 {
		return // Duplicate within cooldown
	}

	alert := model.Alert{
		Type:        alertType,
		Severity:    severity,
		Status:      model.AlertStatusActive,
		Title:       title,
		Message:     message,
		RelatedID:   relatedID,
		RelatedType: relatedType,
		RelatedName: relatedName,
		Fingerprint: fingerprint,
	}

	if err := d.db.Create(&alert).Error; err != nil {
		d.logger.Warn("failed to create alert", "type", alertType, "err", err)
		return
	}

	d.logger.Info("alert created", "type", alertType, "severity", severity, "title", title)
}
// errorRateSeverity determines severity based on error rate
func (d *AlertDetector) errorRateSeverity(rate float64) model.AlertSeverity {
	if rate >= 0.5 {
		return model.AlertSeverityCritical
	}
	if rate >= 0.25 {
		return model.AlertSeverityWarning
	}
	return model.AlertSeverityInfo
}