mirror of
https://github.com/EZ-Api/ez-api.git
synced 2026-01-13 17:47:51 +00:00
feat(api): add internal alerts reporting endpoint with deduplication
Add ReportAlerts endpoint for Data Plane to report alerts to Control Plane with fingerprint-based deduplication using a 5-minute cooldown period. Changes: - Add POST /internal/alerts/report endpoint with validation - Add Fingerprint field to Alert model for deduplication - Extend GetAPIKeyStatsSummary with optional time range filtering using since/until query parameters to query from log records
This commit is contained in:
@@ -300,6 +300,7 @@ func main() {
|
|||||||
{
|
{
|
||||||
internalGroup.POST("/stats/flush", internalHandler.FlushStats)
|
internalGroup.POST("/stats/flush", internalHandler.FlushStats)
|
||||||
internalGroup.POST("/apikey-stats/flush", internalHandler.FlushAPIKeyStats)
|
internalGroup.POST("/apikey-stats/flush", internalHandler.FlushAPIKeyStats)
|
||||||
|
internalGroup.POST("/alerts/report", internalHandler.ReportAlerts)
|
||||||
internalGroup.GET("/metrics", gin.WrapH(expvar.Handler()))
|
internalGroup.GET("/metrics", gin.WrapH(expvar.Handler()))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -2,6 +2,8 @@ package api
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"strconv"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/ez-api/ez-api/internal/model"
|
"github.com/ez-api/ez-api/internal/model"
|
||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
@@ -21,6 +23,8 @@ type APIKeyStatsSummaryResponse struct {
|
|||||||
// @Tags admin
|
// @Tags admin
|
||||||
// @Produce json
|
// @Produce json
|
||||||
// @Security AdminAuth
|
// @Security AdminAuth
|
||||||
|
// @Param since query int false "Start time (unix seconds)"
|
||||||
|
// @Param until query int false "End time (unix seconds)"
|
||||||
// @Success 200 {object} APIKeyStatsSummaryResponse
|
// @Success 200 {object} APIKeyStatsSummaryResponse
|
||||||
// @Failure 500 {object} gin.H
|
// @Failure 500 {object} gin.H
|
||||||
// @Router /admin/apikey-stats/summary [get]
|
// @Router /admin/apikey-stats/summary [get]
|
||||||
@@ -30,6 +34,28 @@ func (h *AdminHandler) GetAPIKeyStatsSummary(c *gin.Context) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Parse optional time range parameters
|
||||||
|
var sinceTime, untilTime *time.Time
|
||||||
|
if sinceStr := c.Query("since"); sinceStr != "" {
|
||||||
|
if ts, err := strconv.ParseInt(sinceStr, 10, 64); err == nil {
|
||||||
|
t := time.Unix(ts, 0).UTC()
|
||||||
|
sinceTime = &t
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if untilStr := c.Query("until"); untilStr != "" {
|
||||||
|
if ts, err := strconv.ParseInt(untilStr, 10, 64); err == nil {
|
||||||
|
t := time.Unix(ts, 0).UTC()
|
||||||
|
untilTime = &t
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// If time range specified, query from LogRecord table
|
||||||
|
if sinceTime != nil || untilTime != nil {
|
||||||
|
h.getAPIKeyStatsFromLogs(c, sinceTime, untilTime)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Default: use aggregated stats from APIKey table (all-time)
|
||||||
var totals struct {
|
var totals struct {
|
||||||
TotalRequests int64 `json:"total_requests"`
|
TotalRequests int64 `json:"total_requests"`
|
||||||
SuccessRequests int64 `json:"success_requests"`
|
SuccessRequests int64 `json:"success_requests"`
|
||||||
@@ -65,3 +91,53 @@ func (h *AdminHandler) GetAPIKeyStatsSummary(c *gin.Context) {
|
|||||||
FailureRate: failureRate,
|
FailureRate: failureRate,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// getAPIKeyStatsFromLogs calculates stats from log records for a specific time range
|
||||||
|
func (h *AdminHandler) getAPIKeyStatsFromLogs(c *gin.Context, sinceTime, untilTime *time.Time) {
|
||||||
|
q := h.logBaseQuery()
|
||||||
|
|
||||||
|
if sinceTime != nil {
|
||||||
|
q = q.Where("created_at >= ?", *sinceTime)
|
||||||
|
}
|
||||||
|
if untilTime != nil {
|
||||||
|
q = q.Where("created_at <= ?", *untilTime)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Only count requests that went through a provider (provider_id > 0)
|
||||||
|
q = q.Where("provider_id > 0")
|
||||||
|
|
||||||
|
var result struct {
|
||||||
|
TotalRequests int64
|
||||||
|
SuccessRequests int64
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := q.Select(`
|
||||||
|
COUNT(*) as total_requests,
|
||||||
|
SUM(CASE WHEN status_code >= 200 AND status_code < 400 THEN 1 ELSE 0 END) as success_requests
|
||||||
|
`).Scan(&result).Error; err != nil {
|
||||||
|
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to aggregate api key stats from logs", "details": err.Error()})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
total := result.TotalRequests
|
||||||
|
success := result.SuccessRequests
|
||||||
|
failure := total - success
|
||||||
|
if failure < 0 {
|
||||||
|
failure = 0
|
||||||
|
}
|
||||||
|
|
||||||
|
var successRate float64
|
||||||
|
var failureRate float64
|
||||||
|
if total > 0 {
|
||||||
|
successRate = float64(success) / float64(total)
|
||||||
|
failureRate = float64(failure) / float64(total)
|
||||||
|
}
|
||||||
|
|
||||||
|
c.JSON(http.StatusOK, APIKeyStatsSummaryResponse{
|
||||||
|
TotalRequests: total,
|
||||||
|
SuccessRequests: success,
|
||||||
|
FailureRequests: failure,
|
||||||
|
SuccessRate: successRate,
|
||||||
|
FailureRate: failureRate,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
package api
|
package api
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
@@ -251,3 +252,131 @@ func (h *InternalHandler) FlushAPIKeyStats(c *gin.Context) {
|
|||||||
|
|
||||||
c.JSON(http.StatusOK, gin.H{"updated": updated, "groups_updated": groupsUpdated})
|
c.JSON(http.StatusOK, gin.H{"updated": updated, "groups_updated": groupsUpdated})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Alert reporting types
|
||||||
|
|
||||||
|
type reportAlertEntry struct {
|
||||||
|
Type string `json:"type" binding:"required"`
|
||||||
|
Severity string `json:"severity" binding:"required"`
|
||||||
|
Title string `json:"title" binding:"required"`
|
||||||
|
Message string `json:"message"`
|
||||||
|
RelatedID uint `json:"related_id"`
|
||||||
|
RelatedType string `json:"related_type"`
|
||||||
|
RelatedName string `json:"related_name"`
|
||||||
|
Fingerprint string `json:"fingerprint"`
|
||||||
|
Metadata string `json:"metadata"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type reportAlertsRequest struct {
|
||||||
|
Alerts []reportAlertEntry `json:"alerts" binding:"required"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type reportAlertsResponse struct {
|
||||||
|
Accepted int `json:"accepted"`
|
||||||
|
Deduplicated int `json:"deduplicated"`
|
||||||
|
Errors []string `json:"errors"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Default cooldown period for alert deduplication
|
||||||
|
const alertDeduplicationCooldown = 5 * time.Minute
|
||||||
|
|
||||||
|
// ReportAlerts godoc
|
||||||
|
// @Summary Report alerts from DP
|
||||||
|
// @Description Internal endpoint for Data Plane to report alerts to Control Plane
|
||||||
|
// @Tags internal
|
||||||
|
// @Accept json
|
||||||
|
// @Produce json
|
||||||
|
// @Param request body reportAlertsRequest true "Alerts to report"
|
||||||
|
// @Success 200 {object} reportAlertsResponse
|
||||||
|
// @Failure 400 {object} gin.H
|
||||||
|
// @Failure 500 {object} gin.H
|
||||||
|
// @Router /internal/alerts/report [post]
|
||||||
|
func (h *InternalHandler) ReportAlerts(c *gin.Context) {
|
||||||
|
if h == nil || h.db == nil {
|
||||||
|
c.JSON(http.StatusInternalServerError, gin.H{"error": "database not configured"})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var req reportAlertsRequest
|
||||||
|
if err := c.ShouldBindJSON(&req); err != nil {
|
||||||
|
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request body", "details": err.Error()})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(req.Alerts) == 0 {
|
||||||
|
c.JSON(http.StatusOK, reportAlertsResponse{Accepted: 0, Deduplicated: 0, Errors: []string{}})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Valid alert types and severities
|
||||||
|
validTypes := map[string]bool{
|
||||||
|
"rate_limit": true, "error_spike": true, "quota_exceeded": true,
|
||||||
|
"key_disabled": true, "key_expired": true, "provider_down": true,
|
||||||
|
}
|
||||||
|
validSeverities := map[string]bool{"info": true, "warning": true, "critical": true}
|
||||||
|
|
||||||
|
accepted := 0
|
||||||
|
deduplicated := 0
|
||||||
|
var errors []string
|
||||||
|
|
||||||
|
cooldownTime := time.Now().UTC().Add(-alertDeduplicationCooldown)
|
||||||
|
|
||||||
|
for i, entry := range req.Alerts {
|
||||||
|
// Validate type
|
||||||
|
if !validTypes[entry.Type] {
|
||||||
|
errors = append(errors, fmt.Sprintf("alert %d: invalid type '%s'", i, entry.Type))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Validate severity
|
||||||
|
if !validSeverities[entry.Severity] {
|
||||||
|
errors = append(errors, fmt.Sprintf("alert %d: invalid severity '%s'", i, entry.Severity))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Generate fingerprint if not provided
|
||||||
|
fingerprint := strings.TrimSpace(entry.Fingerprint)
|
||||||
|
if fingerprint == "" && entry.RelatedType != "" {
|
||||||
|
fingerprint = fmt.Sprintf("%s:%s:%d", entry.Type, entry.RelatedType, entry.RelatedID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check for deduplication if fingerprint is present
|
||||||
|
if fingerprint != "" {
|
||||||
|
var existingCount int64
|
||||||
|
h.db.Model(&model.Alert{}).
|
||||||
|
Where("fingerprint = ? AND status = ? AND created_at >= ?", fingerprint, model.AlertStatusActive, cooldownTime).
|
||||||
|
Count(&existingCount)
|
||||||
|
if existingCount > 0 {
|
||||||
|
deduplicated++
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create the alert
|
||||||
|
alert := model.Alert{
|
||||||
|
Type: model.AlertType(entry.Type),
|
||||||
|
Severity: model.AlertSeverity(entry.Severity),
|
||||||
|
Status: model.AlertStatusActive,
|
||||||
|
Title: strings.TrimSpace(entry.Title),
|
||||||
|
Message: strings.TrimSpace(entry.Message),
|
||||||
|
RelatedID: entry.RelatedID,
|
||||||
|
RelatedType: strings.TrimSpace(entry.RelatedType),
|
||||||
|
RelatedName: strings.TrimSpace(entry.RelatedName),
|
||||||
|
Fingerprint: fingerprint,
|
||||||
|
Metadata: entry.Metadata,
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := h.db.Create(&alert).Error; err != nil {
|
||||||
|
errors = append(errors, fmt.Sprintf("alert %d: failed to create - %s", i, err.Error()))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
accepted++
|
||||||
|
}
|
||||||
|
|
||||||
|
c.JSON(http.StatusOK, reportAlertsResponse{
|
||||||
|
Accepted: accepted,
|
||||||
|
Deduplicated: deduplicated,
|
||||||
|
Errors: errors,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|||||||
@@ -48,7 +48,8 @@ type Alert struct {
|
|||||||
RelatedID uint `gorm:"index" json:"related_id,omitempty"`
|
RelatedID uint `gorm:"index" json:"related_id,omitempty"`
|
||||||
RelatedType string `gorm:"size:50" json:"related_type,omitempty"` // master, key, apikey, provider_group
|
RelatedType string `gorm:"size:50" json:"related_type,omitempty"` // master, key, apikey, provider_group
|
||||||
RelatedName string `gorm:"size:255" json:"related_name,omitempty"`
|
RelatedName string `gorm:"size:255" json:"related_name,omitempty"`
|
||||||
Metadata string `gorm:"type:text" json:"metadata,omitempty"` // JSON encoded additional data
|
Fingerprint string `gorm:"size:255;index" json:"fingerprint,omitempty"` // For deduplication: type:related_type:related_id
|
||||||
|
Metadata string `gorm:"type:text" json:"metadata,omitempty"` // JSON encoded additional data
|
||||||
AckedAt *time.Time `json:"acked_at,omitempty"`
|
AckedAt *time.Time `json:"acked_at,omitempty"`
|
||||||
AckedBy string `gorm:"size:100" json:"acked_by,omitempty"`
|
AckedBy string `gorm:"size:100" json:"acked_by,omitempty"`
|
||||||
ResolvedAt *time.Time `json:"resolved_at,omitempty"`
|
ResolvedAt *time.Time `json:"resolved_at,omitempty"`
|
||||||
|
|||||||
Reference in New Issue
Block a user