package api

import (
	"fmt"
	"net/http"
	"strings"
	"time"

	"github.com/ez-api/ez-api/internal/model"
	"github.com/gin-gonic/gin"
	"gorm.io/gorm"
)

// InternalHandler serves the internal endpoints defined in this file
// (stats flushing and alert reporting). All of them write through db.
type InternalHandler struct {
	db *gorm.DB
}

// NewInternalHandler returns an InternalHandler that uses db for all of its
// queries. A nil db is accepted; the handlers then answer with HTTP 500.
func NewInternalHandler(db *gorm.DB) *InternalHandler {
	return &InternalHandler{db: db}
}

// statsFlushRequest is the JSON body accepted by FlushStats.
type statsFlushRequest struct {
	Keys []statsFlushEntry `json:"keys"`
}

// statsFlushEntry carries the usage deltas for one key, identified by its
// token hash. LastAccessedAt is interpreted as Unix seconds (see FlushStats).
type statsFlushEntry struct {
	TokenHash      string `json:"token_hash"`
	Requests       int64  `json:"requests"`
	Tokens         int64  `json:"tokens"`
	LastAccessedAt int64  `json:"last_accessed_at"`
}

// apiKeyStatsFlushRequest is the JSON body accepted by FlushAPIKeyStats.
type apiKeyStatsFlushRequest struct {
	Keys []apiKeyStatsFlushEntry `json:"keys"`
}

// apiKeyStatsFlushEntry carries the request-count deltas for one API key.
// SuccessRequests must not exceed Requests (enforced by FlushAPIKeyStats).
type apiKeyStatsFlushEntry struct {
	APIKeyID        uint  `json:"api_key_id"`
	Requests        int64 `json:"requests"`
	SuccessRequests int64 `json:"success_requests"`
}

// FlushStats godoc
// @Summary Flush key stats
// @Description Internal endpoint for flushing accumulated key usage stats from DP to CP database
// @Tags internal
// @Accept json
// @Produce json
// @Param request body statsFlushRequest true "Stats to flush"
// @Success 200 {object} ResponseEnvelope{data=MapData}
// @Failure 400 {object} ResponseEnvelope{data=MapData}
// @Failure 500 {object} ResponseEnvelope{data=MapData}
// @Router /internal/stats/flush [post]
func (h *InternalHandler) FlushStats(c *gin.Context) {
	// Responses:
	//   200 {"updated": n}  n counts entries whose UPDATE touched at least one row
	//   400                 malformed JSON, or any negative counter/timestamp
	//   500                 handler not configured, or the transaction failed
	if h == nil || h.db == nil {
		c.JSON(http.StatusInternalServerError, gin.H{"error": "database not configured"})
		return
	}

	var req statsFlushRequest
	if err := c.ShouldBindJSON(&req); err != nil {
		c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request body"})
		return
	}
	if len(req.Keys) == 0 {
		c.JSON(http.StatusOK, gin.H{"updated": 0})
		return
	}

	// Validate the whole payload before touching the database. This keeps
	// client errors (400) strictly separate from database errors (500) and
	// avoids opening a transaction that is known to be rolled back.
	// Entries without a token hash are skipped, not rejected.
	for _, entry := range req.Keys {
		if strings.TrimSpace(entry.TokenHash) == "" {
			continue
		}
		if entry.Requests < 0 || entry.Tokens < 0 || entry.LastAccessedAt < 0 {
			c.JSON(http.StatusBadRequest, gin.H{"error": "invalid stats payload"})
			return
		}
	}

	updated := 0
	err := h.db.Transaction(func(tx *gorm.DB) error {
		for _, entry := range req.Keys {
			hash := strings.TrimSpace(entry.TokenHash)
			if hash == "" {
				continue
			}

			// Only columns with a positive delta are written, all as
			// in-database increments rather than read-modify-write.
			updates := map[string]any{}
			if entry.Requests > 0 {
				updates["request_count"] = gorm.Expr("request_count + ?", entry.Requests)
			}
			if entry.Tokens > 0 {
				updates["used_tokens"] = gorm.Expr("used_tokens + ?", entry.Tokens)
				// quota_used only accumulates while quota_limit >= 0.
				// NOTE(review): presumably a negative quota_limit means "unlimited" — confirm.
				updates["quota_used"] = gorm.Expr("CASE WHEN quota_limit >= 0 THEN quota_used + ? ELSE quota_used END", entry.Tokens)
			}
			if entry.LastAccessedAt > 0 {
				accessTime := time.Unix(entry.LastAccessedAt, 0).UTC()
				// Never move last_accessed_at backwards: keep the stored
				// value unless it is NULL or older than the reported one.
				updates["last_accessed_at"] = gorm.Expr("CASE WHEN last_accessed_at IS NULL OR last_accessed_at < ? THEN ? ELSE last_accessed_at END", accessTime, accessTime)
			}
			if len(updates) == 0 {
				continue
			}

			res := tx.Model(&model.Key{}).Where("token_hash = ?", hash).Updates(updates)
			if res.Error != nil {
				return res.Error
			}
			if res.RowsAffected > 0 {
				updated++
			}
		}
		return nil
	})
	if err != nil {
		c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to flush stats", "details": err.Error()})
		return
	}

	c.JSON(http.StatusOK, gin.H{"updated": updated})
}

// FlushAPIKeyStats godoc
// @Summary Flush API key stats
// @Description Internal endpoint for flushing accumulated APIKey stats from DP to CP database
// @Tags internal
// @Accept json
// @Produce json
// @Param request body apiKeyStatsFlushRequest true "Stats to flush"
// @Success 200 {object} ResponseEnvelope{data=MapData}
// @Failure 400 {object} ResponseEnvelope{data=MapData}
// @Failure 500 {object} ResponseEnvelope{data=MapData}
// @Router /internal/apikey-stats/flush [post]
func (h *InternalHandler) FlushAPIKeyStats(c *gin.Context) {
	// Responses:
	//   200 {"updated": n, "groups_updated": m}
	//   400 malformed JSON, negative counters, or success_requests > requests
	//   500 handler not configured, or a database operation failed
	if h == nil || h.db == nil {
		c.JSON(http.StatusInternalServerError, gin.H{"error": "database not configured"})
		return
	}

	var req apiKeyStatsFlushRequest
	if err := c.ShouldBindJSON(&req); err != nil {
		c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request body"})
		return
	}
	if len(req.Keys) == 0 {
		c.JSON(http.StatusOK, gin.H{"updated": 0, "groups_updated": 0})
		return
	}

	type apiKeyDelta struct {
		requests int64
		success  int64
	}

	// Merge duplicate entries so each API key is updated once.
	// Entries with a zero ID or with nothing to add are ignored.
	deltas := make(map[uint]apiKeyDelta, len(req.Keys))
	for _, entry := range req.Keys {
		if entry.APIKeyID == 0 {
			continue
		}
		if entry.Requests < 0 || entry.SuccessRequests < 0 || entry.SuccessRequests > entry.Requests {
			c.JSON(http.StatusBadRequest, gin.H{"error": "invalid stats payload"})
			return
		}
		if entry.Requests == 0 && entry.SuccessRequests == 0 {
			continue
		}
		delta := deltas[entry.APIKeyID]
		delta.requests += entry.Requests
		delta.success += entry.SuccessRequests
		deltas[entry.APIKeyID] = delta
	}
	if len(deltas) == 0 {
		c.JSON(http.StatusOK, gin.H{"updated": 0, "groups_updated": 0})
		return
	}

	ids := make([]uint, 0, len(deltas))
	for id := range deltas {
		ids = append(ids, id)
	}

	// Resolve each key's group so the same deltas can be rolled up per
	// group. IDs that are not found are silently dropped further down.
	var apiKeys []model.APIKey
	if err := h.db.Model(&model.APIKey{}).Select("id, group_id").Where("id IN ?", ids).Find(&apiKeys).Error; err != nil {
		c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to load api keys", "details": err.Error()})
		return
	}
	groupByKey := make(map[uint]uint, len(apiKeys))
	for _, key := range apiKeys {
		groupByKey[key.ID] = key.GroupID
	}

	// statsUpdates builds the column expressions shared by the API key and
	// provider group updates. Each derived column (failure count and both
	// rates) is spelled out in terms of "current value + delta" instead of
	// referring to another column's new value.
	statsUpdates := func(requests, success int64) map[string]any {
		return map[string]any{
			"total_requests":   gorm.Expr("total_requests + ?", requests),
			"success_requests": gorm.Expr("success_requests + ?", success),
			"failure_requests": gorm.Expr("(total_requests + ?) - (success_requests + ?)", requests, success),
			"success_rate": gorm.Expr(
				"CASE WHEN (total_requests + ?) > 0 THEN (success_requests + ?) * 1.0 / (total_requests + ?) ELSE 0 END",
				requests, success, requests,
			),
			"failure_rate": gorm.Expr(
				"CASE WHEN (total_requests + ?) > 0 THEN ((total_requests + ?) - (success_requests + ?)) * 1.0 / (total_requests + ?) ELSE 0 END",
				requests, requests, success, requests,
			),
		}
	}

	updated := 0
	groupsUpdated := 0
	groupDeltas := make(map[uint]apiKeyDelta)
	err := h.db.Transaction(func(tx *gorm.DB) error {
		for id, delta := range deltas {
			groupID, ok := groupByKey[id]
			if !ok {
				// Unknown API key: nothing to update.
				continue
			}
			if delta.requests == 0 && delta.success == 0 {
				continue
			}
			res := tx.Model(&model.APIKey{}).Where("id = ?", id).Updates(statsUpdates(delta.requests, delta.success))
			if res.Error != nil {
				return res.Error
			}
			if res.RowsAffected > 0 {
				updated++
			}
			// A zero group ID is treated as "no group" and is not rolled up.
			if groupID > 0 {
				groupDelta := groupDeltas[groupID]
				groupDelta.requests += delta.requests
				groupDelta.success += delta.success
				groupDeltas[groupID] = groupDelta
			}
		}

		for groupID, delta := range groupDeltas {
			if delta.requests == 0 && delta.success == 0 {
				continue
			}
			res := tx.Model(&model.ProviderGroup{}).Where("id = ?", groupID).Updates(statsUpdates(delta.requests, delta.success))
			if res.Error != nil {
				return res.Error
			}
			if res.RowsAffected > 0 {
				groupsUpdated++
			}
		}
		return nil
	})
	if err != nil {
		c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to flush api key stats", "details": err.Error()})
		return
	}

	c.JSON(http.StatusOK, gin.H{"updated": updated, "groups_updated": groupsUpdated})
}

// Alert reporting types

// reportAlertEntry is a single alert in a ReportAlerts request. Type,
// Severity and Title are required by the binding tags; Type and Severity
// are additionally checked against fixed allow-lists in ReportAlerts.
type reportAlertEntry struct {
	Type        string `json:"type" binding:"required"`
	Severity    string `json:"severity" binding:"required"`
	Title       string `json:"title" binding:"required"`
	Message     string `json:"message"`
	RelatedID   uint   `json:"related_id"`
	RelatedType string `json:"related_type"`
	RelatedName string `json:"related_name"`
	Fingerprint string `json:"fingerprint"`
	Metadata    string `json:"metadata"`
}

// reportAlertsRequest is the JSON body accepted by ReportAlerts.
type reportAlertsRequest struct {
	Alerts []reportAlertEntry `json:"alerts" binding:"required"`
}

// reportAlertsResponse summarizes how each submitted alert was handled:
// stored (Accepted), suppressed as a recent duplicate (Deduplicated), or
// rejected with a message in Errors.
type reportAlertsResponse struct {
	Accepted     int      `json:"accepted"`
	Deduplicated int      `json:"deduplicated"`
	Errors       []string `json:"errors"`
}

// Default cooldown period for alert deduplication
const alertDeduplicationCooldown = 5 * time.Minute

// ReportAlerts godoc
// @Summary Report alerts from DP
// @Description Internal endpoint for Data Plane to report alerts to Control Plane
// @Tags internal
// @Accept json
// @Produce json
// @Param request body reportAlertsRequest true "Alerts to report"
// @Success 200 {object} ResponseEnvelope{data=reportAlertsResponse}
// @Failure 400 {object} ResponseEnvelope{data=MapData}
// @Failure 500 {object} ResponseEnvelope{data=MapData}
// @Router /internal/alerts/report [post]
func (h *InternalHandler) ReportAlerts(c *gin.Context) {
	// Alerts are processed independently: a failure on one alert is
	// recorded in the response's errors list and does not abort the rest,
	// so the endpoint answers 200 whenever the body itself is well-formed.
	if h == nil || h.db == nil {
		c.JSON(http.StatusInternalServerError, gin.H{"error": "database not configured"})
		return
	}

	var req reportAlertsRequest
	if err := c.ShouldBindJSON(&req); err != nil {
		c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request body", "details": err.Error()})
		return
	}
	if len(req.Alerts) == 0 {
		c.JSON(http.StatusOK, reportAlertsResponse{Accepted: 0, Deduplicated: 0, Errors: []string{}})
		return
	}

	// Valid alert types and severities
	validTypes := map[string]bool{
		"rate_limit":     true,
		"error_spike":    true,
		"quota_exceeded": true,
		"key_disabled":   true,
		"key_expired":    true,
		"provider_down":  true,
	}
	validSeverities := map[string]bool{"info": true, "warning": true, "critical": true}

	accepted := 0
	deduplicated := 0
	// Start from an empty (non-nil) slice so "errors" is always encoded as
	// a JSON array, matching the empty-request response above.
	errs := []string{}
	cooldownTime := time.Now().UTC().Add(-alertDeduplicationCooldown)

	for i, entry := range req.Alerts {
		// Validate type
		if !validTypes[entry.Type] {
			errs = append(errs, fmt.Sprintf("alert %d: invalid type '%s'", i, entry.Type))
			continue
		}
		// Validate severity
		if !validSeverities[entry.Severity] {
			errs = append(errs, fmt.Sprintf("alert %d: invalid severity '%s'", i, entry.Severity))
			continue
		}

		// Generate fingerprint if not provided
		fingerprint := strings.TrimSpace(entry.Fingerprint)
		if fingerprint == "" && entry.RelatedType != "" {
			fingerprint = fmt.Sprintf("%s:%s:%d", entry.Type, entry.RelatedType, entry.RelatedID)
		}

		// Check for deduplication if fingerprint is present: an active
		// alert with the same fingerprint created within the cooldown
		// window suppresses this one.
		if fingerprint != "" {
			var existingCount int64
			err := h.db.Model(&model.Alert{}).
				Where("fingerprint = ? AND status = ? AND created_at >= ?", fingerprint, model.AlertStatusActive, cooldownTime).
				Count(&existingCount).Error
			if err != nil {
				// Without a reliable count we cannot tell whether this is
				// a duplicate; report the failure instead of inserting.
				errs = append(errs, fmt.Sprintf("alert %d: failed to check for duplicates - %s", i, err.Error()))
				continue
			}
			if existingCount > 0 {
				deduplicated++
				continue
			}
		}

		// Create the alert
		alert := model.Alert{
			Type:        model.AlertType(entry.Type),
			Severity:    model.AlertSeverity(entry.Severity),
			Status:      model.AlertStatusActive,
			Title:       strings.TrimSpace(entry.Title),
			Message:     strings.TrimSpace(entry.Message),
			RelatedID:   entry.RelatedID,
			RelatedType: strings.TrimSpace(entry.RelatedType),
			RelatedName: strings.TrimSpace(entry.RelatedName),
			Fingerprint: fingerprint,
			Metadata:    entry.Metadata,
		}
		if err := h.db.Create(&alert).Error; err != nil {
			errs = append(errs, fmt.Sprintf("alert %d: failed to create - %s", i, err.Error()))
			continue
		}
		accepted++
	}

	c.JSON(http.StatusOK, reportAlertsResponse{
		Accepted:     accepted,
		Deduplicated: deduplicated,
		Errors:       errs,
	})
}