mirror of
https://github.com/EZ-Api/ez-api.git
synced 2026-01-13 17:47:51 +00:00
feat(api): add log webhook notification service
Implement webhook notifications for log error threshold alerts with configurable thresholds, time windows, and cooldown periods. - Add LogWebhookService with Redis-backed configuration storage - Add admin endpoints for webhook config management (GET/PUT) - Trigger webhook notifications when error count exceeds threshold - Support status code threshold and error message detection - Include sample log record data in webhook payload
This commit is contained in:
@@ -218,6 +218,8 @@ func main() {
|
||||
adminGroup.GET("/logs", handler.ListLogs)
|
||||
adminGroup.DELETE("/logs", handler.DeleteLogs)
|
||||
adminGroup.GET("/logs/stats", handler.LogStats)
|
||||
adminGroup.GET("/logs/webhook", handler.GetLogWebhookConfig)
|
||||
adminGroup.PUT("/logs/webhook", handler.UpdateLogWebhookConfig)
|
||||
adminGroup.GET("/stats", adminHandler.GetAdminStats)
|
||||
adminGroup.POST("/bindings", handler.CreateBinding)
|
||||
adminGroup.GET("/bindings", handler.ListBindings)
|
||||
|
||||
@@ -21,10 +21,17 @@ type Handler struct {
|
||||
sync *service.SyncService
|
||||
logger *service.LogWriter
|
||||
rdb *redis.Client
|
||||
logWebhook *service.LogWebhookService
|
||||
}
|
||||
|
||||
func NewHandler(db *gorm.DB, sync *service.SyncService, logger *service.LogWriter, rdb *redis.Client) *Handler {
|
||||
return &Handler{db: db, sync: sync, logger: logger, rdb: rdb}
|
||||
return &Handler{
|
||||
db: db,
|
||||
sync: sync,
|
||||
logger: logger,
|
||||
rdb: rdb,
|
||||
logWebhook: service.NewLogWebhookService(rdb),
|
||||
}
|
||||
}
|
||||
|
||||
// CreateKey is now handled by MasterHandler
|
||||
@@ -471,7 +478,13 @@ func (h *Handler) IngestLog(c *gin.Context) {
|
||||
}
|
||||
|
||||
// By default, only metadata is expected; payload fields may be empty.
|
||||
if h.logger != nil {
|
||||
h.logger.Write(rec)
|
||||
}
|
||||
if h.logWebhook != nil {
|
||||
recCopy := rec
|
||||
go h.logWebhook.NotifyIfNeeded(context.Background(), recCopy)
|
||||
}
|
||||
c.JSON(http.StatusAccepted, gin.H{"status": "queued"})
|
||||
}
|
||||
|
||||
|
||||
60
internal/api/log_webhook_handler.go
Normal file
60
internal/api/log_webhook_handler.go
Normal file
@@ -0,0 +1,60 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
"github.com/ez-api/ez-api/internal/service"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
// GetLogWebhookConfig godoc
|
||||
// @Summary Get log webhook config
|
||||
// @Description Returns current webhook notification config
|
||||
// @Tags admin
|
||||
// @Produce json
|
||||
// @Security AdminAuth
|
||||
// @Success 200 {object} service.LogWebhookConfig
|
||||
// @Failure 500 {object} gin.H
|
||||
// @Router /admin/logs/webhook [get]
|
||||
func (h *Handler) GetLogWebhookConfig(c *gin.Context) {
|
||||
if h == nil || h.logWebhook == nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "webhook service not configured"})
|
||||
return
|
||||
}
|
||||
cfg, err := h.logWebhook.GetConfig(c.Request.Context())
|
||||
if err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to read webhook config", "details": err.Error()})
|
||||
return
|
||||
}
|
||||
c.JSON(http.StatusOK, cfg)
|
||||
}
|
||||
|
||||
// UpdateLogWebhookConfig godoc
|
||||
// @Summary Update log webhook config
|
||||
// @Description Updates webhook notification config
|
||||
// @Tags admin
|
||||
// @Accept json
|
||||
// @Produce json
|
||||
// @Security AdminAuth
|
||||
// @Param request body service.LogWebhookConfig true "Webhook config"
|
||||
// @Success 200 {object} service.LogWebhookConfig
|
||||
// @Failure 400 {object} gin.H
|
||||
// @Failure 500 {object} gin.H
|
||||
// @Router /admin/logs/webhook [put]
|
||||
func (h *Handler) UpdateLogWebhookConfig(c *gin.Context) {
|
||||
if h == nil || h.logWebhook == nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "webhook service not configured"})
|
||||
return
|
||||
}
|
||||
var req service.LogWebhookConfig
|
||||
if err := c.ShouldBindJSON(&req); err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
cfg, err := h.logWebhook.SetConfig(c.Request.Context(), req)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
c.JSON(http.StatusOK, cfg)
|
||||
}
|
||||
112
internal/api/log_webhook_handler_test.go
Normal file
112
internal/api/log_webhook_handler_test.go
Normal file
@@ -0,0 +1,112 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
|
||||
"github.com/alicebob/miniredis/v2"
|
||||
"github.com/ez-api/ez-api/internal/model"
|
||||
"github.com/ez-api/ez-api/internal/service"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/redis/go-redis/v9"
|
||||
"gorm.io/driver/sqlite"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
func newTestHandlerWithWebhook(t *testing.T) (*Handler, *miniredis.Miniredis) {
|
||||
t.Helper()
|
||||
gin.SetMode(gin.TestMode)
|
||||
|
||||
dsn := fmt.Sprintf("file:%s?mode=memory&cache=shared", t.Name())
|
||||
db, err := gorm.Open(sqlite.Open(dsn), &gorm.Config{})
|
||||
if err != nil {
|
||||
t.Fatalf("open sqlite: %v", err)
|
||||
}
|
||||
if err := db.AutoMigrate(&model.LogRecord{}); err != nil {
|
||||
t.Fatalf("migrate: %v", err)
|
||||
}
|
||||
|
||||
mr := miniredis.RunT(t)
|
||||
rdb := redis.NewClient(&redis.Options{Addr: mr.Addr()})
|
||||
sync := service.NewSyncService(rdb)
|
||||
return NewHandler(db, sync, nil, rdb), mr
|
||||
}
|
||||
|
||||
func TestLogWebhookConfigCRUD(t *testing.T) {
|
||||
h, _ := newTestHandlerWithWebhook(t)
|
||||
|
||||
r := gin.New()
|
||||
r.GET("/admin/logs/webhook", h.GetLogWebhookConfig)
|
||||
r.PUT("/admin/logs/webhook", h.UpdateLogWebhookConfig)
|
||||
|
||||
reqBody := service.LogWebhookConfig{
|
||||
Enabled: true,
|
||||
URL: "https://example.com/webhook",
|
||||
Secret: "s1",
|
||||
Threshold: 3,
|
||||
WindowSeconds: 60,
|
||||
CooldownSeconds: 120,
|
||||
StatusCodeThreshold: 500,
|
||||
}
|
||||
payload, _ := json.Marshal(reqBody)
|
||||
|
||||
req := httptest.NewRequest(http.MethodPut, "/admin/logs/webhook", bytes.NewReader(payload))
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
rr := httptest.NewRecorder()
|
||||
r.ServeHTTP(rr, req)
|
||||
if rr.Code != http.StatusOK {
|
||||
t.Fatalf("expected 200, got %d body=%s", rr.Code, rr.Body.String())
|
||||
}
|
||||
|
||||
getReq := httptest.NewRequest(http.MethodGet, "/admin/logs/webhook", nil)
|
||||
getRR := httptest.NewRecorder()
|
||||
r.ServeHTTP(getRR, getReq)
|
||||
if getRR.Code != http.StatusOK {
|
||||
t.Fatalf("expected 200, got %d body=%s", getRR.Code, getRR.Body.String())
|
||||
}
|
||||
|
||||
var got service.LogWebhookConfig
|
||||
if err := json.Unmarshal(getRR.Body.Bytes(), &got); err != nil {
|
||||
t.Fatalf("decode response: %v", err)
|
||||
}
|
||||
if !got.Enabled || got.URL == "" || got.Threshold != 3 {
|
||||
t.Fatalf("unexpected webhook config: %+v", got)
|
||||
}
|
||||
if got.Secret != "s1" {
|
||||
t.Fatalf("expected secret s1, got %q", got.Secret)
|
||||
}
|
||||
}
|
||||
|
||||
func TestLogWebhookConfigDisableClears(t *testing.T) {
|
||||
h, mr := newTestHandlerWithWebhook(t)
|
||||
|
||||
r := gin.New()
|
||||
r.PUT("/admin/logs/webhook", h.UpdateLogWebhookConfig)
|
||||
|
||||
seed := service.LogWebhookConfig{Enabled: true, URL: "https://example.com"}
|
||||
payload, _ := json.Marshal(seed)
|
||||
req := httptest.NewRequest(http.MethodPut, "/admin/logs/webhook", bytes.NewReader(payload))
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
rr := httptest.NewRecorder()
|
||||
r.ServeHTTP(rr, req)
|
||||
if rr.Code != http.StatusOK {
|
||||
t.Fatalf("expected 200, got %d body=%s", rr.Code, rr.Body.String())
|
||||
}
|
||||
|
||||
clear := service.LogWebhookConfig{Enabled: false}
|
||||
clearPayload, _ := json.Marshal(clear)
|
||||
clearReq := httptest.NewRequest(http.MethodPut, "/admin/logs/webhook", bytes.NewReader(clearPayload))
|
||||
clearReq.Header.Set("Content-Type", "application/json")
|
||||
clearRR := httptest.NewRecorder()
|
||||
r.ServeHTTP(clearRR, clearReq)
|
||||
if clearRR.Code != http.StatusOK {
|
||||
t.Fatalf("expected 200, got %d body=%s", clearRR.Code, clearRR.Body.String())
|
||||
}
|
||||
if mr.Exists("meta:log:webhook") {
|
||||
t.Fatalf("expected webhook config to be cleared")
|
||||
}
|
||||
}
|
||||
252
internal/service/log_webhook.go
Normal file
252
internal/service/log_webhook.go
Normal file
@@ -0,0 +1,252 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/ez-api/ez-api/internal/model"
|
||||
"github.com/ez-api/foundation/jsoncodec"
|
||||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
const (
|
||||
logWebhookConfigKey = "meta:log:webhook"
|
||||
logWebhookErrorsKey = "meta:log:webhook:errors"
|
||||
logWebhookCooldownKey = "meta:log:webhook:cooldown"
|
||||
defaultWebhookThreshold = 10
|
||||
defaultWebhookWindow = 60
|
||||
defaultWebhookCooldown = 300
|
||||
defaultWebhookStatusCode = 500
|
||||
)
|
||||
|
||||
type LogWebhookConfig struct {
|
||||
Enabled bool `json:"enabled"`
|
||||
URL string `json:"url"`
|
||||
Secret string `json:"secret"`
|
||||
Threshold int `json:"threshold"`
|
||||
WindowSeconds int `json:"window_seconds"`
|
||||
CooldownSeconds int `json:"cooldown_seconds"`
|
||||
StatusCodeThreshold int `json:"status_code_threshold"`
|
||||
}
|
||||
|
||||
type LogWebhookService struct {
|
||||
rdb *redis.Client
|
||||
client *http.Client
|
||||
}
|
||||
|
||||
func NewLogWebhookService(rdb *redis.Client) *LogWebhookService {
|
||||
if rdb == nil {
|
||||
return nil
|
||||
}
|
||||
return &LogWebhookService{
|
||||
rdb: rdb,
|
||||
client: &http.Client{Timeout: 3 * time.Second},
|
||||
}
|
||||
}
|
||||
|
||||
func DefaultLogWebhookConfig() LogWebhookConfig {
|
||||
return LogWebhookConfig{
|
||||
Enabled: false,
|
||||
Threshold: defaultWebhookThreshold,
|
||||
WindowSeconds: defaultWebhookWindow,
|
||||
CooldownSeconds: defaultWebhookCooldown,
|
||||
StatusCodeThreshold: defaultWebhookStatusCode,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *LogWebhookService) GetConfig(ctx context.Context) (LogWebhookConfig, error) {
|
||||
cfg := DefaultLogWebhookConfig()
|
||||
if s == nil || s.rdb == nil {
|
||||
return cfg, nil
|
||||
}
|
||||
if ctx == nil {
|
||||
ctx = context.Background()
|
||||
}
|
||||
raw, err := s.rdb.Get(ctx, logWebhookConfigKey).Result()
|
||||
if err == redis.Nil {
|
||||
return cfg, nil
|
||||
}
|
||||
if err != nil {
|
||||
return cfg, err
|
||||
}
|
||||
if err := jsoncodec.Unmarshal([]byte(raw), &cfg); err != nil {
|
||||
return cfg, err
|
||||
}
|
||||
return normalizeLogWebhookConfig(cfg), nil
|
||||
}
|
||||
|
||||
func (s *LogWebhookService) SetConfig(ctx context.Context, cfg LogWebhookConfig) (LogWebhookConfig, error) {
|
||||
if s == nil || s.rdb == nil {
|
||||
return LogWebhookConfig{}, errors.New("redis not configured")
|
||||
}
|
||||
if ctx == nil {
|
||||
ctx = context.Background()
|
||||
}
|
||||
cfg.URL = strings.TrimSpace(cfg.URL)
|
||||
cfg.Secret = strings.TrimSpace(cfg.Secret)
|
||||
if cfg.Enabled && cfg.URL == "" {
|
||||
return cfg, errors.New("url is required when enabled")
|
||||
}
|
||||
cfg = normalizeLogWebhookConfig(cfg)
|
||||
if !cfg.Enabled && cfg.URL == "" {
|
||||
if err := s.rdb.Del(ctx, logWebhookConfigKey).Err(); err != nil && err != redis.Nil {
|
||||
return cfg, err
|
||||
}
|
||||
return cfg, nil
|
||||
}
|
||||
payload, err := jsoncodec.Marshal(cfg)
|
||||
if err != nil {
|
||||
return cfg, err
|
||||
}
|
||||
if err := s.rdb.Set(ctx, logWebhookConfigKey, payload, 0).Err(); err != nil {
|
||||
return cfg, err
|
||||
}
|
||||
return cfg, nil
|
||||
}
|
||||
|
||||
func (s *LogWebhookService) NotifyIfNeeded(ctx context.Context, rec model.LogRecord) {
|
||||
if s == nil || s.rdb == nil || s.client == nil {
|
||||
return
|
||||
}
|
||||
if ctx == nil {
|
||||
ctx = context.Background()
|
||||
}
|
||||
cfg, err := s.GetConfig(ctx)
|
||||
if err != nil || !cfg.Enabled || strings.TrimSpace(cfg.URL) == "" {
|
||||
return
|
||||
}
|
||||
if !isLogWebhookError(rec, cfg) {
|
||||
return
|
||||
}
|
||||
|
||||
window := cfg.WindowSeconds
|
||||
if window <= 0 {
|
||||
window = defaultWebhookWindow
|
||||
}
|
||||
threshold := cfg.Threshold
|
||||
if threshold <= 0 {
|
||||
threshold = defaultWebhookThreshold
|
||||
}
|
||||
|
||||
now := time.Now().UTC()
|
||||
bucket := now.Unix() / int64(window)
|
||||
countKey := fmt.Sprintf("%s:%d", logWebhookErrorsKey, bucket)
|
||||
cnt, err := s.rdb.Incr(ctx, countKey).Result()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if cnt == 1 {
|
||||
_ = s.rdb.Expire(ctx, countKey, time.Duration(window+1)*time.Second).Err()
|
||||
}
|
||||
if cnt < int64(threshold) {
|
||||
return
|
||||
}
|
||||
|
||||
cooldown := cfg.CooldownSeconds
|
||||
if cooldown <= 0 {
|
||||
cooldown = defaultWebhookCooldown
|
||||
}
|
||||
ok, err := s.rdb.SetNX(ctx, logWebhookCooldownKey, now.Unix(), time.Duration(cooldown)*time.Second).Result()
|
||||
if err != nil || !ok {
|
||||
return
|
||||
}
|
||||
|
||||
_ = s.send(ctx, cfg, rec, cnt, now)
|
||||
}
|
||||
|
||||
type logWebhookPayload struct {
|
||||
Type string `json:"type"`
|
||||
SentAt string `json:"sent_at"`
|
||||
Threshold int `json:"threshold"`
|
||||
WindowSeconds int `json:"window_seconds"`
|
||||
Count int64 `json:"count"`
|
||||
Sample logWebhookSample `json:"sample"`
|
||||
}
|
||||
|
||||
type logWebhookSample struct {
|
||||
Group string `json:"group"`
|
||||
KeyID uint `json:"key_id"`
|
||||
ModelName string `json:"model"`
|
||||
ProviderName string `json:"provider_name"`
|
||||
ProviderType string `json:"provider_type"`
|
||||
StatusCode int `json:"status_code"`
|
||||
ErrorMessage string `json:"error_message"`
|
||||
ClientIP string `json:"client_ip"`
|
||||
}
|
||||
|
||||
func (s *LogWebhookService) send(ctx context.Context, cfg LogWebhookConfig, rec model.LogRecord, count int64, now time.Time) error {
|
||||
if ctx == nil {
|
||||
ctx = context.Background()
|
||||
}
|
||||
payload := logWebhookPayload{
|
||||
Type: "log_error_threshold",
|
||||
SentAt: now.UTC().Format(time.RFC3339),
|
||||
Threshold: cfg.Threshold,
|
||||
WindowSeconds: cfg.WindowSeconds,
|
||||
Count: count,
|
||||
Sample: logWebhookSample{
|
||||
Group: rec.Group,
|
||||
KeyID: rec.KeyID,
|
||||
ModelName: rec.ModelName,
|
||||
ProviderName: rec.ProviderName,
|
||||
ProviderType: rec.ProviderType,
|
||||
StatusCode: rec.StatusCode,
|
||||
ErrorMessage: rec.ErrorMessage,
|
||||
ClientIP: rec.ClientIP,
|
||||
},
|
||||
}
|
||||
body, err := jsoncodec.Marshal(payload)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPost, cfg.URL, bytes.NewReader(body))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
req.Header.Set("User-Agent", "ez-api")
|
||||
if cfg.Secret != "" {
|
||||
req.Header.Set("Authorization", "Bearer "+cfg.Secret)
|
||||
}
|
||||
resp, err := s.client.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_ = resp.Body.Close()
|
||||
if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusMultipleChoices {
|
||||
return fmt.Errorf("webhook status %d", resp.StatusCode)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func normalizeLogWebhookConfig(cfg LogWebhookConfig) LogWebhookConfig {
|
||||
if cfg.Threshold <= 0 {
|
||||
cfg.Threshold = defaultWebhookThreshold
|
||||
}
|
||||
if cfg.WindowSeconds <= 0 {
|
||||
cfg.WindowSeconds = defaultWebhookWindow
|
||||
}
|
||||
if cfg.CooldownSeconds <= 0 {
|
||||
cfg.CooldownSeconds = defaultWebhookCooldown
|
||||
}
|
||||
if cfg.StatusCodeThreshold <= 0 {
|
||||
cfg.StatusCodeThreshold = defaultWebhookStatusCode
|
||||
}
|
||||
return cfg
|
||||
}
|
||||
|
||||
func isLogWebhookError(rec model.LogRecord, cfg LogWebhookConfig) bool {
|
||||
threshold := cfg.StatusCodeThreshold
|
||||
if threshold <= 0 {
|
||||
threshold = defaultWebhookStatusCode
|
||||
}
|
||||
if rec.StatusCode >= threshold {
|
||||
return true
|
||||
}
|
||||
return strings.TrimSpace(rec.ErrorMessage) != ""
|
||||
}
|
||||
60
internal/service/log_webhook_test.go
Normal file
60
internal/service/log_webhook_test.go
Normal file
@@ -0,0 +1,60 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
|
||||
"github.com/alicebob/miniredis/v2"
|
||||
"github.com/ez-api/ez-api/internal/model"
|
||||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
func TestLogWebhookServiceNotifyThreshold(t *testing.T) {
|
||||
mr := miniredis.RunT(t)
|
||||
rdb := redis.NewClient(&redis.Options{Addr: mr.Addr()})
|
||||
svc := NewLogWebhookService(rdb)
|
||||
|
||||
var hits int64
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
atomic.AddInt64(&hits, 1)
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
cfg := LogWebhookConfig{
|
||||
Enabled: true,
|
||||
URL: srv.URL,
|
||||
Threshold: 2,
|
||||
WindowSeconds: 60,
|
||||
CooldownSeconds: 300,
|
||||
StatusCodeThreshold: 500,
|
||||
}
|
||||
if _, err := svc.SetConfig(context.Background(), cfg); err != nil {
|
||||
t.Fatalf("set config: %v", err)
|
||||
}
|
||||
|
||||
rec := model.LogRecord{StatusCode: 500, ErrorMessage: "upstream error"}
|
||||
svc.NotifyIfNeeded(context.Background(), rec)
|
||||
svc.NotifyIfNeeded(context.Background(), rec)
|
||||
|
||||
if atomic.LoadInt64(&hits) != 1 {
|
||||
t.Fatalf("expected 1 webhook hit, got %d", hits)
|
||||
}
|
||||
}
|
||||
|
||||
func TestLogWebhookServiceDefaults(t *testing.T) {
|
||||
mr := miniredis.RunT(t)
|
||||
rdb := redis.NewClient(&redis.Options{Addr: mr.Addr()})
|
||||
svc := NewLogWebhookService(rdb)
|
||||
|
||||
cfg, err := svc.GetConfig(context.Background())
|
||||
if err != nil {
|
||||
t.Fatalf("get config: %v", err)
|
||||
}
|
||||
if cfg.Threshold != defaultWebhookThreshold || cfg.WindowSeconds != defaultWebhookWindow {
|
||||
t.Fatalf("unexpected defaults: %+v", cfg)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user