Files
ez-api/internal/api/internal_handler.go
zenfun bfba16bbd4 feat(api): add internal alerts reporting endpoint with deduplication
Add ReportAlerts endpoint for Data Plane to report alerts to Control Plane
with fingerprint-based deduplication using a 5-minute cooldown period.

Changes:
- Add POST /internal/alerts/report endpoint with validation
- Add Fingerprint field to Alert model for deduplication
- Extend GetAPIKeyStatsSummary with optional time range filtering
  using since/until query parameters to query from log records
2025-12-31 14:18:09 +08:00

383 lines
11 KiB
Go

package api
import (
"fmt"
"net/http"
"strings"
"time"
"github.com/ez-api/ez-api/internal/model"
"github.com/gin-gonic/gin"
"gorm.io/gorm"
)
type InternalHandler struct {
db *gorm.DB
}
func NewInternalHandler(db *gorm.DB) *InternalHandler {
return &InternalHandler{db: db}
}
type statsFlushRequest struct {
Keys []statsFlushEntry `json:"keys"`
}
type statsFlushEntry struct {
TokenHash string `json:"token_hash"`
Requests int64 `json:"requests"`
Tokens int64 `json:"tokens"`
LastAccessedAt int64 `json:"last_accessed_at"`
}
type apiKeyStatsFlushRequest struct {
Keys []apiKeyStatsFlushEntry `json:"keys"`
}
type apiKeyStatsFlushEntry struct {
APIKeyID uint `json:"api_key_id"`
Requests int64 `json:"requests"`
SuccessRequests int64 `json:"success_requests"`
}
// FlushStats godoc
// @Summary Flush key stats
// @Description Internal endpoint for flushing accumulated key usage stats from DP to CP database
// @Tags internal
// @Accept json
// @Produce json
// @Param request body statsFlushRequest true "Stats to flush"
// @Success 200 {object} gin.H
// @Failure 400 {object} gin.H
// @Failure 500 {object} gin.H
// @Router /internal/stats/flush [post]
func (h *InternalHandler) FlushStats(c *gin.Context) {
if h == nil || h.db == nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "database not configured"})
return
}
var req statsFlushRequest
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request body"})
return
}
if len(req.Keys) == 0 {
c.JSON(http.StatusOK, gin.H{"updated": 0})
return
}
updated := 0
err := h.db.Transaction(func(tx *gorm.DB) error {
for _, entry := range req.Keys {
hash := strings.TrimSpace(entry.TokenHash)
if hash == "" {
continue
}
if entry.Requests < 0 || entry.Tokens < 0 || entry.LastAccessedAt < 0 {
return gorm.ErrInvalidData
}
updates := map[string]any{}
if entry.Requests > 0 {
updates["request_count"] = gorm.Expr("request_count + ?", entry.Requests)
}
if entry.Tokens > 0 {
updates["used_tokens"] = gorm.Expr("used_tokens + ?", entry.Tokens)
updates["quota_used"] = gorm.Expr("CASE WHEN quota_limit >= 0 THEN quota_used + ? ELSE quota_used END", entry.Tokens)
}
if entry.LastAccessedAt > 0 {
accessTime := time.Unix(entry.LastAccessedAt, 0).UTC()
updates["last_accessed_at"] = gorm.Expr("CASE WHEN last_accessed_at IS NULL OR last_accessed_at < ? THEN ? ELSE last_accessed_at END", accessTime, accessTime)
}
if len(updates) == 0 {
continue
}
res := tx.Model(&model.Key{}).Where("token_hash = ?", hash).Updates(updates)
if res.Error != nil {
return res.Error
}
if res.RowsAffected > 0 {
updated++
}
}
return nil
})
if err != nil {
if err == gorm.ErrInvalidData {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid stats payload"})
return
}
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to flush stats", "details": err.Error()})
return
}
c.JSON(http.StatusOK, gin.H{"updated": updated})
}
// FlushAPIKeyStats godoc
// @Summary Flush API key stats
// @Description Internal endpoint for flushing accumulated APIKey stats from DP to CP database
// @Tags internal
// @Accept json
// @Produce json
// @Param request body apiKeyStatsFlushRequest true "Stats to flush"
// @Success 200 {object} gin.H
// @Failure 400 {object} gin.H
// @Failure 500 {object} gin.H
// @Router /internal/apikey-stats/flush [post]
func (h *InternalHandler) FlushAPIKeyStats(c *gin.Context) {
if h == nil || h.db == nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "database not configured"})
return
}
var req apiKeyStatsFlushRequest
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request body"})
return
}
if len(req.Keys) == 0 {
c.JSON(http.StatusOK, gin.H{"updated": 0, "groups_updated": 0})
return
}
type apiKeyDelta struct {
requests int64
success int64
}
deltas := make(map[uint]apiKeyDelta, len(req.Keys))
for _, entry := range req.Keys {
if entry.APIKeyID == 0 {
continue
}
if entry.Requests < 0 || entry.SuccessRequests < 0 || entry.SuccessRequests > entry.Requests {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid stats payload"})
return
}
if entry.Requests == 0 && entry.SuccessRequests == 0 {
continue
}
delta := deltas[entry.APIKeyID]
delta.requests += entry.Requests
delta.success += entry.SuccessRequests
deltas[entry.APIKeyID] = delta
}
if len(deltas) == 0 {
c.JSON(http.StatusOK, gin.H{"updated": 0, "groups_updated": 0})
return
}
ids := make([]uint, 0, len(deltas))
for id := range deltas {
ids = append(ids, id)
}
var apiKeys []model.APIKey
if err := h.db.Model(&model.APIKey{}).Select("id, group_id").Where("id IN ?", ids).Find(&apiKeys).Error; err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to load api keys", "details": err.Error()})
return
}
groupByKey := make(map[uint]uint, len(apiKeys))
for _, key := range apiKeys {
groupByKey[key.ID] = key.GroupID
}
statsUpdates := func(requests, success int64) map[string]any {
return map[string]any{
"total_requests": gorm.Expr("total_requests + ?", requests),
"success_requests": gorm.Expr("success_requests + ?", success),
"failure_requests": gorm.Expr("(total_requests + ?) - (success_requests + ?)", requests, success),
"success_rate": gorm.Expr(
"CASE WHEN (total_requests + ?) > 0 THEN (success_requests + ?) * 1.0 / (total_requests + ?) ELSE 0 END",
requests, success, requests,
),
"failure_rate": gorm.Expr(
"CASE WHEN (total_requests + ?) > 0 THEN ((total_requests + ?) - (success_requests + ?)) * 1.0 / (total_requests + ?) ELSE 0 END",
requests, requests, success, requests,
),
}
}
updated := 0
groupsUpdated := 0
groupDeltas := make(map[uint]apiKeyDelta)
err := h.db.Transaction(func(tx *gorm.DB) error {
for id, delta := range deltas {
groupID, ok := groupByKey[id]
if !ok {
continue
}
if delta.requests == 0 && delta.success == 0 {
continue
}
res := tx.Model(&model.APIKey{}).Where("id = ?", id).Updates(statsUpdates(delta.requests, delta.success))
if res.Error != nil {
return res.Error
}
if res.RowsAffected > 0 {
updated++
}
if groupID > 0 {
groupDelta := groupDeltas[groupID]
groupDelta.requests += delta.requests
groupDelta.success += delta.success
groupDeltas[groupID] = groupDelta
}
}
for groupID, delta := range groupDeltas {
if delta.requests == 0 && delta.success == 0 {
continue
}
res := tx.Model(&model.ProviderGroup{}).Where("id = ?", groupID).Updates(statsUpdates(delta.requests, delta.success))
if res.Error != nil {
return res.Error
}
if res.RowsAffected > 0 {
groupsUpdated++
}
}
return nil
})
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to flush api key stats", "details": err.Error()})
return
}
c.JSON(http.StatusOK, gin.H{"updated": updated, "groups_updated": groupsUpdated})
}
// Alert reporting types
type reportAlertEntry struct {
Type string `json:"type" binding:"required"`
Severity string `json:"severity" binding:"required"`
Title string `json:"title" binding:"required"`
Message string `json:"message"`
RelatedID uint `json:"related_id"`
RelatedType string `json:"related_type"`
RelatedName string `json:"related_name"`
Fingerprint string `json:"fingerprint"`
Metadata string `json:"metadata"`
}
type reportAlertsRequest struct {
Alerts []reportAlertEntry `json:"alerts" binding:"required"`
}
type reportAlertsResponse struct {
Accepted int `json:"accepted"`
Deduplicated int `json:"deduplicated"`
Errors []string `json:"errors"`
}
// Default cooldown period for alert deduplication
const alertDeduplicationCooldown = 5 * time.Minute
// ReportAlerts godoc
// @Summary Report alerts from DP
// @Description Internal endpoint for Data Plane to report alerts to Control Plane
// @Tags internal
// @Accept json
// @Produce json
// @Param request body reportAlertsRequest true "Alerts to report"
// @Success 200 {object} reportAlertsResponse
// @Failure 400 {object} gin.H
// @Failure 500 {object} gin.H
// @Router /internal/alerts/report [post]
func (h *InternalHandler) ReportAlerts(c *gin.Context) {
if h == nil || h.db == nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "database not configured"})
return
}
var req reportAlertsRequest
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request body", "details": err.Error()})
return
}
if len(req.Alerts) == 0 {
c.JSON(http.StatusOK, reportAlertsResponse{Accepted: 0, Deduplicated: 0, Errors: []string{}})
return
}
// Valid alert types and severities
validTypes := map[string]bool{
"rate_limit": true, "error_spike": true, "quota_exceeded": true,
"key_disabled": true, "key_expired": true, "provider_down": true,
}
validSeverities := map[string]bool{"info": true, "warning": true, "critical": true}
accepted := 0
deduplicated := 0
var errors []string
cooldownTime := time.Now().UTC().Add(-alertDeduplicationCooldown)
for i, entry := range req.Alerts {
// Validate type
if !validTypes[entry.Type] {
errors = append(errors, fmt.Sprintf("alert %d: invalid type '%s'", i, entry.Type))
continue
}
// Validate severity
if !validSeverities[entry.Severity] {
errors = append(errors, fmt.Sprintf("alert %d: invalid severity '%s'", i, entry.Severity))
continue
}
// Generate fingerprint if not provided
fingerprint := strings.TrimSpace(entry.Fingerprint)
if fingerprint == "" && entry.RelatedType != "" {
fingerprint = fmt.Sprintf("%s:%s:%d", entry.Type, entry.RelatedType, entry.RelatedID)
}
// Check for deduplication if fingerprint is present
if fingerprint != "" {
var existingCount int64
h.db.Model(&model.Alert{}).
Where("fingerprint = ? AND status = ? AND created_at >= ?", fingerprint, model.AlertStatusActive, cooldownTime).
Count(&existingCount)
if existingCount > 0 {
deduplicated++
continue
}
}
// Create the alert
alert := model.Alert{
Type: model.AlertType(entry.Type),
Severity: model.AlertSeverity(entry.Severity),
Status: model.AlertStatusActive,
Title: strings.TrimSpace(entry.Title),
Message: strings.TrimSpace(entry.Message),
RelatedID: entry.RelatedID,
RelatedType: strings.TrimSpace(entry.RelatedType),
RelatedName: strings.TrimSpace(entry.RelatedName),
Fingerprint: fingerprint,
Metadata: entry.Metadata,
}
if err := h.db.Create(&alert).Error; err != nil {
errors = append(errors, fmt.Sprintf("alert %d: failed to create - %s", i, err.Error()))
continue
}
accepted++
}
c.JSON(http.StatusOK, reportAlertsResponse{
Accepted: accepted,
Deduplicated: deduplicated,
Errors: errors,
})
}