feat(alerts): add traffic spike detection with configurable thresholds
Introduce traffic_spike alert type for monitoring system-wide and per-master traffic levels against configurable thresholds stored in the database.

- Add AlertThresholdConfig model for persistent threshold configuration
- Implement GET/PUT /admin/alerts/thresholds endpoints for threshold management
- Add traffic spike detection to the alert detector cron job:
  - Global QPS monitoring across all masters
  - Per-master RPM/TPM checks with minimum sample thresholds
  - Per-master RPD/TPD checks for daily limits
- Use warning severity at threshold, critical at 2x threshold
- Include metric metadata (value, threshold, window) in alert details
- Update API documentation with the new endpoints and alert type
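The AlertThresholdConfig model itself is not part of the diff below. A minimal sketch of what the detector relies on, inferred from the fields it reads; the JSON tags, field comments, and every default value here are illustrative assumptions, not taken from the commit:

    package model

    // AlertThresholdConfig holds the configurable traffic thresholds
    // read by the alert detector (single persisted row).
    type AlertThresholdConfig struct {
        ID uint `gorm:"primaryKey" json:"id"`

        GlobalQPS int64 `json:"global_qps"` // system-wide QPS ceiling

        MasterRPM int64 `json:"master_rpm"` // per-master requests per minute
        MasterTPM int64 `json:"master_tpm"` // per-master tokens per minute
        MasterRPD int64 `json:"master_rpd"` // per-master requests per day
        MasterTPD int64 `json:"master_tpd"` // per-master tokens per day

        // Minimum sample sizes before the 1-minute checks may fire,
        // guarding against noise from very low-traffic windows.
        MinRPMRequests1m int64 `json:"min_rpm_requests_1m"`
        MinTPMTokens1m   int64 `json:"min_tpm_tokens_1m"`
    }

    // DefaultAlertThresholdConfig returns the fallback used when no row
    // exists. The numbers below are placeholders for illustration only.
    func DefaultAlertThresholdConfig() AlertThresholdConfig {
        return AlertThresholdConfig{
            GlobalQPS:        1000,
            MasterRPM:        600,
            MasterTPM:        2_000_000,
            MasterRPD:        100_000,
            MasterTPD:        50_000_000,
            MinRPMRequests1m: 30,
            MinTPMTokens1m:   10_000,
        }
    }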
@@ -2,6 +2,7 @@ package cron

import (
    "context"
    "encoding/json"
    "fmt"
    "log/slog"
    "time"
@@ -100,6 +101,7 @@ func (d *AlertDetector) detectOnce(ctx context.Context) {
    d.detectErrorSpikes(ctx)
    d.detectQuotaExceeded(ctx)
    d.detectProviderDown(ctx)
    d.detectTrafficSpikes(ctx)
}

// detectRateLimits checks for masters hitting rate limits
@@ -323,3 +325,284 @@ func (d *AlertDetector) errorRateSeverity(rate float64) model.AlertSeverity {
    }
    return model.AlertSeverityInfo
}

// loadThresholdConfig loads the threshold config from the DB or returns defaults
func (d *AlertDetector) loadThresholdConfig() model.AlertThresholdConfig {
    var cfg model.AlertThresholdConfig
    err := d.db.First(&cfg).Error
    if err != nil {
        return model.DefaultAlertThresholdConfig()
    }
    return cfg
}

// trafficSpikeSeverity determines severity based on value vs threshold:
// warning when >= threshold, critical when >= 2x threshold
func trafficSpikeSeverity(value, threshold int64) model.AlertSeverity {
    if value >= threshold*2 {
        return model.AlertSeverityCritical
    }
    return model.AlertSeverityWarning
}
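
// Example: with a threshold of 100, values 100-199 produce a warning alert
// and values of 200 or more produce a critical alert.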

// trafficSpikeMetadata is the JSON metadata attached to traffic spike alerts
type trafficSpikeMetadata struct {
    Metric    string `json:"metric"`
    Value     int64  `json:"value"`
    Threshold int64  `json:"threshold"`
    Window    string `json:"window"`
}

func (m trafficSpikeMetadata) JSON() string {
    b, _ := json.Marshal(m)
    return string(b)
}
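
// A global QPS breach, for example, serializes to:
//   {"metric":"global_qps","value":1250,"threshold":1000,"window":"realtime"}
// (values illustrative)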

// detectTrafficSpikes checks for traffic threshold breaches
func (d *AlertDetector) detectTrafficSpikes(ctx context.Context) {
    if d.rdb == nil || d.statsService == nil {
        return
    }

    cfg := d.loadThresholdConfig()

    // 1. Global QPS check
    d.detectGlobalQPSSpike(ctx, cfg)

    // 2. Per-master RPM/TPM (1-minute window)
    d.detectMasterMinuteSpikes(ctx, cfg)

    // 3. Per-master RPD/TPD (24-hour window)
    d.detectMasterDaySpikes(ctx, cfg)
}

// detectGlobalQPSSpike checks global QPS against threshold
func (d *AlertDetector) detectGlobalQPSSpike(ctx context.Context, cfg model.AlertThresholdConfig) {
    // Sum QPS from all active masters
    var masters []model.Master
    if err := d.db.Where("status = ?", "active").Find(&masters).Error; err != nil {
        d.logger.Warn("failed to load masters for global QPS check", "err", err)
        return
    }

    var totalQPS int64
    for _, master := range masters {
        snapshot, err := d.statsService.GetMasterRealtimeSnapshot(ctx, master.ID)
        if err != nil {
            continue
        }
        totalQPS += snapshot.QPS
    }

    if totalQPS >= cfg.GlobalQPS {
        meta := trafficSpikeMetadata{
            Metric:    "global_qps",
            Value:     totalQPS,
            Threshold: cfg.GlobalQPS,
            Window:    "realtime",
        }
        d.createTrafficSpikeAlert(
            trafficSpikeSeverity(totalQPS, cfg.GlobalQPS),
            fmt.Sprintf("Global QPS threshold exceeded (%d >= %d)", totalQPS, cfg.GlobalQPS),
            fmt.Sprintf("System-wide QPS is %d, threshold is %d", totalQPS, cfg.GlobalQPS),
            0, "system", "global",
            meta,
        )
    }
}

// detectMasterMinuteSpikes checks per-master RPM/TPM in 1-minute window
func (d *AlertDetector) detectMasterMinuteSpikes(ctx context.Context, cfg model.AlertThresholdConfig) {
    since := time.Now().UTC().Add(-1 * time.Minute)

    // Query aggregated stats per master for 1-minute window
    type masterStat struct {
        MasterID  uint
        Requests  int64
        TokensIn  int64
        TokensOut int64
    }

    var stats []masterStat
    err := d.logDB.Model(&model.LogRecord{}).
        Select("master_id, COUNT(*) as requests, COALESCE(SUM(tokens_in), 0) as tokens_in, COALESCE(SUM(tokens_out), 0) as tokens_out").
        Where("created_at >= ?", since).
        Where("master_id > 0").
        Group("master_id").
        Scan(&stats).Error

    if err != nil {
        d.logger.Warn("failed to query master minute stats", "err", err)
        return
    }

    // Load master names for alerts
    masterNames := make(map[uint]string)
    var masters []model.Master
    if err := d.db.Select("id, name").Find(&masters).Error; err == nil {
        for _, m := range masters {
            masterNames[m.ID] = m.Name
        }
    }

    for _, stat := range stats {
        masterName := masterNames[stat.MasterID]
        if masterName == "" {
            masterName = fmt.Sprintf("Master#%d", stat.MasterID)
        }

        // RPM check (with minimum sample threshold)
        if stat.Requests >= cfg.MinRPMRequests1m && stat.Requests >= cfg.MasterRPM {
            meta := trafficSpikeMetadata{
                Metric:    "master_rpm",
                Value:     stat.Requests,
                Threshold: cfg.MasterRPM,
                Window:    "1m",
            }
            d.createTrafficSpikeAlert(
                trafficSpikeSeverity(stat.Requests, cfg.MasterRPM),
                fmt.Sprintf("Master '%s' RPM threshold exceeded (%d >= %d)", masterName, stat.Requests, cfg.MasterRPM),
                fmt.Sprintf("Master '%s' (ID: %d) has %d requests in the last minute, threshold is %d", masterName, stat.MasterID, stat.Requests, cfg.MasterRPM),
                stat.MasterID, "master", masterName,
                meta,
            )
        }

        // TPM check (with minimum sample threshold)
        totalTokens := stat.TokensIn + stat.TokensOut
        if totalTokens >= cfg.MinTPMTokens1m && totalTokens >= cfg.MasterTPM {
            meta := trafficSpikeMetadata{
                Metric:    "master_tpm",
                Value:     totalTokens,
                Threshold: cfg.MasterTPM,
                Window:    "1m",
            }
            d.createTrafficSpikeAlert(
                trafficSpikeSeverity(totalTokens, cfg.MasterTPM),
                fmt.Sprintf("Master '%s' TPM threshold exceeded (%d >= %d)", masterName, totalTokens, cfg.MasterTPM),
                fmt.Sprintf("Master '%s' (ID: %d) used %d tokens in the last minute, threshold is %d", masterName, stat.MasterID, totalTokens, cfg.MasterTPM),
                stat.MasterID, "master", masterName,
                meta,
            )
        }
    }
}

// detectMasterDaySpikes checks per-master RPD/TPD in 24-hour window
func (d *AlertDetector) detectMasterDaySpikes(ctx context.Context, cfg model.AlertThresholdConfig) {
    since := time.Now().UTC().Add(-24 * time.Hour)

    // Query aggregated stats per master for 24-hour window
    type masterStat struct {
        MasterID  uint
        Requests  int64
        TokensIn  int64
        TokensOut int64
    }

    var stats []masterStat
    err := d.logDB.Model(&model.LogRecord{}).
        Select("master_id, COUNT(*) as requests, COALESCE(SUM(tokens_in), 0) as tokens_in, COALESCE(SUM(tokens_out), 0) as tokens_out").
        Where("created_at >= ?", since).
        Where("master_id > 0").
        Group("master_id").
        Scan(&stats).Error

    if err != nil {
        d.logger.Warn("failed to query master day stats", "err", err)
        return
    }

    // Load master names for alerts
    masterNames := make(map[uint]string)
    var masters []model.Master
    if err := d.db.Select("id, name").Find(&masters).Error; err == nil {
        for _, m := range masters {
            masterNames[m.ID] = m.Name
        }
    }

    for _, stat := range stats {
        masterName := masterNames[stat.MasterID]
        if masterName == "" {
            masterName = fmt.Sprintf("Master#%d", stat.MasterID)
        }

        // RPD check
        if stat.Requests >= cfg.MasterRPD {
            meta := trafficSpikeMetadata{
                Metric:    "master_rpd",
                Value:     stat.Requests,
                Threshold: cfg.MasterRPD,
                Window:    "24h",
            }
            d.createTrafficSpikeAlert(
                trafficSpikeSeverity(stat.Requests, cfg.MasterRPD),
                fmt.Sprintf("Master '%s' RPD threshold exceeded (%d >= %d)", masterName, stat.Requests, cfg.MasterRPD),
                fmt.Sprintf("Master '%s' (ID: %d) has %d requests in the last 24 hours, threshold is %d", masterName, stat.MasterID, stat.Requests, cfg.MasterRPD),
                stat.MasterID, "master", masterName,
                meta,
            )
        }

        // TPD check
        totalTokens := stat.TokensIn + stat.TokensOut
        if totalTokens >= cfg.MasterTPD {
            meta := trafficSpikeMetadata{
                Metric:    "master_tpd",
                Value:     totalTokens,
                Threshold: cfg.MasterTPD,
                Window:    "24h",
            }
            d.createTrafficSpikeAlert(
                trafficSpikeSeverity(totalTokens, cfg.MasterTPD),
                fmt.Sprintf("Master '%s' TPD threshold exceeded (%d >= %d)", masterName, totalTokens, cfg.MasterTPD),
                fmt.Sprintf("Master '%s' (ID: %d) used %d tokens in the last 24 hours, threshold is %d", masterName, stat.MasterID, totalTokens, cfg.MasterTPD),
                stat.MasterID, "master", masterName,
                meta,
            )
        }
    }
}

// createTrafficSpikeAlert creates a traffic_spike alert with metadata
func (d *AlertDetector) createTrafficSpikeAlert(
    severity model.AlertSeverity,
    title, message string,
    relatedID uint,
    relatedType, relatedName string,
    meta trafficSpikeMetadata,
) {
    fingerprint := fmt.Sprintf("%s:%s:%s:%d", model.AlertTypeTrafficSpike, meta.Metric, relatedType, relatedID)
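    // e.g. "traffic_spike:master_rpm:master:42"; the dedup check below
    // allows at most one active alert per metric/target within the cooldown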
    cooldownTime := time.Now().UTC().Add(-d.config.DeduplicationCooldown)

    // Check for existing active alert with same fingerprint
    var count int64
    d.db.Model(&model.Alert{}).
        Where("fingerprint = ? AND status = ? AND created_at >= ?", fingerprint, model.AlertStatusActive, cooldownTime).
        Count(&count)

    if count > 0 {
        return // Duplicate within cooldown
    }

    alert := model.Alert{
        Type:        model.AlertTypeTrafficSpike,
        Severity:    severity,
        Status:      model.AlertStatusActive,
        Title:       title,
        Message:     message,
        RelatedID:   relatedID,
        RelatedType: relatedType,
        RelatedName: relatedName,
        Fingerprint: fingerprint,
        Metadata:    meta.JSON(),
    }

    if err := d.db.Create(&alert).Error; err != nil {
        d.logger.Warn("failed to create traffic spike alert", "metric", meta.Metric, "err", err)
        return
    }

    d.logger.Info("traffic spike alert created", "metric", meta.Metric, "severity", severity, "title", title)
}
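
The GET/PUT /admin/alerts/thresholds handlers named in the commit message live in another file and are not part of this diff. A minimal sketch of what they might look like, assuming a Gin-style router and a GORM handle; the AdminHandler type, route registration, and error bodies are illustrative assumptions:

    package admin

    import (
        "net/http"

        "github.com/gin-gonic/gin"
        "gorm.io/gorm"
        // model package import path omitted (project-internal)
    )

    type AdminHandler struct {
        db *gorm.DB
    }

    func (h *AdminHandler) Register(r *gin.RouterGroup) {
        r.GET("/admin/alerts/thresholds", h.getAlertThresholds)
        r.PUT("/admin/alerts/thresholds", h.putAlertThresholds)
    }

    func (h *AdminHandler) getAlertThresholds(c *gin.Context) {
        var cfg model.AlertThresholdConfig
        if err := h.db.First(&cfg).Error; err != nil {
            // No stored row yet: same fallback the cron detector uses
            cfg = model.DefaultAlertThresholdConfig()
        }
        c.JSON(http.StatusOK, cfg)
    }

    func (h *AdminHandler) putAlertThresholds(c *gin.Context) {
        var cfg model.AlertThresholdConfig
        if err := c.ShouldBindJSON(&cfg); err != nil {
            c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
            return
        }
        // Upsert the single config row (Save inserts when the key is zero)
        if err := h.db.Save(&cfg).Error; err != nil {
            c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to save thresholds"})
            return
        }
        c.JSON(http.StatusOK, cfg)
    }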