From 4c1e03f83dc5d7374665458fd5c3d45c131d18dd Mon Sep 17 00:00:00 2001 From: zenfun Date: Sun, 21 Dec 2025 14:13:35 +0800 Subject: [PATCH] feat(api): add log webhook notification service Implement webhook notifications for log error threshold alerts with configurable thresholds, time windows, and cooldown periods. - Add LogWebhookService with Redis-backed configuration storage - Add admin endpoints for webhook config management (GET/PUT) - Trigger webhook notifications when error count exceeds threshold - Support status code threshold and error message detection - Include sample log record data in webhook payload --- cmd/server/main.go | 2 + internal/api/handler.go | 25 ++- internal/api/log_webhook_handler.go | 60 ++++++ internal/api/log_webhook_handler_test.go | 112 ++++++++++ internal/service/log_webhook.go | 252 +++++++++++++++++++++++ internal/service/log_webhook_test.go | 60 ++++++ 6 files changed, 505 insertions(+), 6 deletions(-) create mode 100644 internal/api/log_webhook_handler.go create mode 100644 internal/api/log_webhook_handler_test.go create mode 100644 internal/service/log_webhook.go create mode 100644 internal/service/log_webhook_test.go diff --git a/cmd/server/main.go b/cmd/server/main.go index 9be89bd..357f6de 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -218,6 +218,8 @@ func main() { adminGroup.GET("/logs", handler.ListLogs) adminGroup.DELETE("/logs", handler.DeleteLogs) adminGroup.GET("/logs/stats", handler.LogStats) + adminGroup.GET("/logs/webhook", handler.GetLogWebhookConfig) + adminGroup.PUT("/logs/webhook", handler.UpdateLogWebhookConfig) adminGroup.GET("/stats", adminHandler.GetAdminStats) adminGroup.POST("/bindings", handler.CreateBinding) adminGroup.GET("/bindings", handler.ListBindings) diff --git a/internal/api/handler.go b/internal/api/handler.go index 98f94f7..0ea80d7 100644 --- a/internal/api/handler.go +++ b/internal/api/handler.go @@ -17,14 +17,21 @@ import ( ) type Handler struct { - db *gorm.DB - sync *service.SyncService - logger *service.LogWriter - rdb *redis.Client + db *gorm.DB + sync *service.SyncService + logger *service.LogWriter + rdb *redis.Client + logWebhook *service.LogWebhookService } func NewHandler(db *gorm.DB, sync *service.SyncService, logger *service.LogWriter, rdb *redis.Client) *Handler { - return &Handler{db: db, sync: sync, logger: logger, rdb: rdb} + return &Handler{ + db: db, + sync: sync, + logger: logger, + rdb: rdb, + logWebhook: service.NewLogWebhookService(rdb), + } } // CreateKey is now handled by MasterHandler @@ -471,7 +478,13 @@ func (h *Handler) IngestLog(c *gin.Context) { } // By default, only metadata is expected; payload fields may be empty. - h.logger.Write(rec) + if h.logger != nil { + h.logger.Write(rec) + } + if h.logWebhook != nil { + recCopy := rec + go h.logWebhook.NotifyIfNeeded(context.Background(), recCopy) + } c.JSON(http.StatusAccepted, gin.H{"status": "queued"}) } diff --git a/internal/api/log_webhook_handler.go b/internal/api/log_webhook_handler.go new file mode 100644 index 0000000..91449a6 --- /dev/null +++ b/internal/api/log_webhook_handler.go @@ -0,0 +1,60 @@ +package api + +import ( + "net/http" + + "github.com/ez-api/ez-api/internal/service" + "github.com/gin-gonic/gin" +) + +// GetLogWebhookConfig godoc +// @Summary Get log webhook config +// @Description Returns current webhook notification config +// @Tags admin +// @Produce json +// @Security AdminAuth +// @Success 200 {object} service.LogWebhookConfig +// @Failure 500 {object} gin.H +// @Router /admin/logs/webhook [get] +func (h *Handler) GetLogWebhookConfig(c *gin.Context) { + if h == nil || h.logWebhook == nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "webhook service not configured"}) + return + } + cfg, err := h.logWebhook.GetConfig(c.Request.Context()) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to read webhook config", "details": err.Error()}) + return + } + c.JSON(http.StatusOK, cfg) +} + +// UpdateLogWebhookConfig godoc +// @Summary Update log webhook config +// @Description Updates webhook notification config +// @Tags admin +// @Accept json +// @Produce json +// @Security AdminAuth +// @Param request body service.LogWebhookConfig true "Webhook config" +// @Success 200 {object} service.LogWebhookConfig +// @Failure 400 {object} gin.H +// @Failure 500 {object} gin.H +// @Router /admin/logs/webhook [put] +func (h *Handler) UpdateLogWebhookConfig(c *gin.Context) { + if h == nil || h.logWebhook == nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "webhook service not configured"}) + return + } + var req service.LogWebhookConfig + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + cfg, err := h.logWebhook.SetConfig(c.Request.Context(), req) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + c.JSON(http.StatusOK, cfg) +} diff --git a/internal/api/log_webhook_handler_test.go b/internal/api/log_webhook_handler_test.go new file mode 100644 index 0000000..8702b18 --- /dev/null +++ b/internal/api/log_webhook_handler_test.go @@ -0,0 +1,112 @@ +package api + +import ( + "bytes" + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "testing" + + "github.com/alicebob/miniredis/v2" + "github.com/ez-api/ez-api/internal/model" + "github.com/ez-api/ez-api/internal/service" + "github.com/gin-gonic/gin" + "github.com/redis/go-redis/v9" + "gorm.io/driver/sqlite" + "gorm.io/gorm" +) + +func newTestHandlerWithWebhook(t *testing.T) (*Handler, *miniredis.Miniredis) { + t.Helper() + gin.SetMode(gin.TestMode) + + dsn := fmt.Sprintf("file:%s?mode=memory&cache=shared", t.Name()) + db, err := gorm.Open(sqlite.Open(dsn), &gorm.Config{}) + if err != nil { + t.Fatalf("open sqlite: %v", err) + } + if err := db.AutoMigrate(&model.LogRecord{}); err != nil { + t.Fatalf("migrate: %v", err) + } + + mr := miniredis.RunT(t) + rdb := redis.NewClient(&redis.Options{Addr: mr.Addr()}) + sync := service.NewSyncService(rdb) + return NewHandler(db, sync, nil, rdb), mr +} + +func TestLogWebhookConfigCRUD(t *testing.T) { + h, _ := newTestHandlerWithWebhook(t) + + r := gin.New() + r.GET("/admin/logs/webhook", h.GetLogWebhookConfig) + r.PUT("/admin/logs/webhook", h.UpdateLogWebhookConfig) + + reqBody := service.LogWebhookConfig{ + Enabled: true, + URL: "https://example.com/webhook", + Secret: "s1", + Threshold: 3, + WindowSeconds: 60, + CooldownSeconds: 120, + StatusCodeThreshold: 500, + } + payload, _ := json.Marshal(reqBody) + + req := httptest.NewRequest(http.MethodPut, "/admin/logs/webhook", bytes.NewReader(payload)) + req.Header.Set("Content-Type", "application/json") + rr := httptest.NewRecorder() + r.ServeHTTP(rr, req) + if rr.Code != http.StatusOK { + t.Fatalf("expected 200, got %d body=%s", rr.Code, rr.Body.String()) + } + + getReq := httptest.NewRequest(http.MethodGet, "/admin/logs/webhook", nil) + getRR := httptest.NewRecorder() + r.ServeHTTP(getRR, getReq) + if getRR.Code != http.StatusOK { + t.Fatalf("expected 200, got %d body=%s", getRR.Code, getRR.Body.String()) + } + + var got service.LogWebhookConfig + if err := json.Unmarshal(getRR.Body.Bytes(), &got); err != nil { + t.Fatalf("decode response: %v", err) + } + if !got.Enabled || got.URL == "" || got.Threshold != 3 { + t.Fatalf("unexpected webhook config: %+v", got) + } + if got.Secret != "s1" { + t.Fatalf("expected secret s1, got %q", got.Secret) + } +} + +func TestLogWebhookConfigDisableClears(t *testing.T) { + h, mr := newTestHandlerWithWebhook(t) + + r := gin.New() + r.PUT("/admin/logs/webhook", h.UpdateLogWebhookConfig) + + seed := service.LogWebhookConfig{Enabled: true, URL: "https://example.com"} + payload, _ := json.Marshal(seed) + req := httptest.NewRequest(http.MethodPut, "/admin/logs/webhook", bytes.NewReader(payload)) + req.Header.Set("Content-Type", "application/json") + rr := httptest.NewRecorder() + r.ServeHTTP(rr, req) + if rr.Code != http.StatusOK { + t.Fatalf("expected 200, got %d body=%s", rr.Code, rr.Body.String()) + } + + clear := service.LogWebhookConfig{Enabled: false} + clearPayload, _ := json.Marshal(clear) + clearReq := httptest.NewRequest(http.MethodPut, "/admin/logs/webhook", bytes.NewReader(clearPayload)) + clearReq.Header.Set("Content-Type", "application/json") + clearRR := httptest.NewRecorder() + r.ServeHTTP(clearRR, clearReq) + if clearRR.Code != http.StatusOK { + t.Fatalf("expected 200, got %d body=%s", clearRR.Code, clearRR.Body.String()) + } + if mr.Exists("meta:log:webhook") { + t.Fatalf("expected webhook config to be cleared") + } +} diff --git a/internal/service/log_webhook.go b/internal/service/log_webhook.go new file mode 100644 index 0000000..9224f17 --- /dev/null +++ b/internal/service/log_webhook.go @@ -0,0 +1,252 @@ +package service + +import ( + "bytes" + "context" + "errors" + "fmt" + "net/http" + "strings" + "time" + + "github.com/ez-api/ez-api/internal/model" + "github.com/ez-api/foundation/jsoncodec" + "github.com/redis/go-redis/v9" +) + +const ( + logWebhookConfigKey = "meta:log:webhook" + logWebhookErrorsKey = "meta:log:webhook:errors" + logWebhookCooldownKey = "meta:log:webhook:cooldown" + defaultWebhookThreshold = 10 + defaultWebhookWindow = 60 + defaultWebhookCooldown = 300 + defaultWebhookStatusCode = 500 +) + +type LogWebhookConfig struct { + Enabled bool `json:"enabled"` + URL string `json:"url"` + Secret string `json:"secret"` + Threshold int `json:"threshold"` + WindowSeconds int `json:"window_seconds"` + CooldownSeconds int `json:"cooldown_seconds"` + StatusCodeThreshold int `json:"status_code_threshold"` +} + +type LogWebhookService struct { + rdb *redis.Client + client *http.Client +} + +func NewLogWebhookService(rdb *redis.Client) *LogWebhookService { + if rdb == nil { + return nil + } + return &LogWebhookService{ + rdb: rdb, + client: &http.Client{Timeout: 3 * time.Second}, + } +} + +func DefaultLogWebhookConfig() LogWebhookConfig { + return LogWebhookConfig{ + Enabled: false, + Threshold: defaultWebhookThreshold, + WindowSeconds: defaultWebhookWindow, + CooldownSeconds: defaultWebhookCooldown, + StatusCodeThreshold: defaultWebhookStatusCode, + } +} + +func (s *LogWebhookService) GetConfig(ctx context.Context) (LogWebhookConfig, error) { + cfg := DefaultLogWebhookConfig() + if s == nil || s.rdb == nil { + return cfg, nil + } + if ctx == nil { + ctx = context.Background() + } + raw, err := s.rdb.Get(ctx, logWebhookConfigKey).Result() + if err == redis.Nil { + return cfg, nil + } + if err != nil { + return cfg, err + } + if err := jsoncodec.Unmarshal([]byte(raw), &cfg); err != nil { + return cfg, err + } + return normalizeLogWebhookConfig(cfg), nil +} + +func (s *LogWebhookService) SetConfig(ctx context.Context, cfg LogWebhookConfig) (LogWebhookConfig, error) { + if s == nil || s.rdb == nil { + return LogWebhookConfig{}, errors.New("redis not configured") + } + if ctx == nil { + ctx = context.Background() + } + cfg.URL = strings.TrimSpace(cfg.URL) + cfg.Secret = strings.TrimSpace(cfg.Secret) + if cfg.Enabled && cfg.URL == "" { + return cfg, errors.New("url is required when enabled") + } + cfg = normalizeLogWebhookConfig(cfg) + if !cfg.Enabled && cfg.URL == "" { + if err := s.rdb.Del(ctx, logWebhookConfigKey).Err(); err != nil && err != redis.Nil { + return cfg, err + } + return cfg, nil + } + payload, err := jsoncodec.Marshal(cfg) + if err != nil { + return cfg, err + } + if err := s.rdb.Set(ctx, logWebhookConfigKey, payload, 0).Err(); err != nil { + return cfg, err + } + return cfg, nil +} + +func (s *LogWebhookService) NotifyIfNeeded(ctx context.Context, rec model.LogRecord) { + if s == nil || s.rdb == nil || s.client == nil { + return + } + if ctx == nil { + ctx = context.Background() + } + cfg, err := s.GetConfig(ctx) + if err != nil || !cfg.Enabled || strings.TrimSpace(cfg.URL) == "" { + return + } + if !isLogWebhookError(rec, cfg) { + return + } + + window := cfg.WindowSeconds + if window <= 0 { + window = defaultWebhookWindow + } + threshold := cfg.Threshold + if threshold <= 0 { + threshold = defaultWebhookThreshold + } + + now := time.Now().UTC() + bucket := now.Unix() / int64(window) + countKey := fmt.Sprintf("%s:%d", logWebhookErrorsKey, bucket) + cnt, err := s.rdb.Incr(ctx, countKey).Result() + if err != nil { + return + } + if cnt == 1 { + _ = s.rdb.Expire(ctx, countKey, time.Duration(window+1)*time.Second).Err() + } + if cnt < int64(threshold) { + return + } + + cooldown := cfg.CooldownSeconds + if cooldown <= 0 { + cooldown = defaultWebhookCooldown + } + ok, err := s.rdb.SetNX(ctx, logWebhookCooldownKey, now.Unix(), time.Duration(cooldown)*time.Second).Result() + if err != nil || !ok { + return + } + + _ = s.send(ctx, cfg, rec, cnt, now) +} + +type logWebhookPayload struct { + Type string `json:"type"` + SentAt string `json:"sent_at"` + Threshold int `json:"threshold"` + WindowSeconds int `json:"window_seconds"` + Count int64 `json:"count"` + Sample logWebhookSample `json:"sample"` +} + +type logWebhookSample struct { + Group string `json:"group"` + KeyID uint `json:"key_id"` + ModelName string `json:"model"` + ProviderName string `json:"provider_name"` + ProviderType string `json:"provider_type"` + StatusCode int `json:"status_code"` + ErrorMessage string `json:"error_message"` + ClientIP string `json:"client_ip"` +} + +func (s *LogWebhookService) send(ctx context.Context, cfg LogWebhookConfig, rec model.LogRecord, count int64, now time.Time) error { + if ctx == nil { + ctx = context.Background() + } + payload := logWebhookPayload{ + Type: "log_error_threshold", + SentAt: now.UTC().Format(time.RFC3339), + Threshold: cfg.Threshold, + WindowSeconds: cfg.WindowSeconds, + Count: count, + Sample: logWebhookSample{ + Group: rec.Group, + KeyID: rec.KeyID, + ModelName: rec.ModelName, + ProviderName: rec.ProviderName, + ProviderType: rec.ProviderType, + StatusCode: rec.StatusCode, + ErrorMessage: rec.ErrorMessage, + ClientIP: rec.ClientIP, + }, + } + body, err := jsoncodec.Marshal(payload) + if err != nil { + return err + } + req, err := http.NewRequestWithContext(ctx, http.MethodPost, cfg.URL, bytes.NewReader(body)) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("User-Agent", "ez-api") + if cfg.Secret != "" { + req.Header.Set("Authorization", "Bearer "+cfg.Secret) + } + resp, err := s.client.Do(req) + if err != nil { + return err + } + _ = resp.Body.Close() + if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusMultipleChoices { + return fmt.Errorf("webhook status %d", resp.StatusCode) + } + return nil +} + +func normalizeLogWebhookConfig(cfg LogWebhookConfig) LogWebhookConfig { + if cfg.Threshold <= 0 { + cfg.Threshold = defaultWebhookThreshold + } + if cfg.WindowSeconds <= 0 { + cfg.WindowSeconds = defaultWebhookWindow + } + if cfg.CooldownSeconds <= 0 { + cfg.CooldownSeconds = defaultWebhookCooldown + } + if cfg.StatusCodeThreshold <= 0 { + cfg.StatusCodeThreshold = defaultWebhookStatusCode + } + return cfg +} + +func isLogWebhookError(rec model.LogRecord, cfg LogWebhookConfig) bool { + threshold := cfg.StatusCodeThreshold + if threshold <= 0 { + threshold = defaultWebhookStatusCode + } + if rec.StatusCode >= threshold { + return true + } + return strings.TrimSpace(rec.ErrorMessage) != "" +} diff --git a/internal/service/log_webhook_test.go b/internal/service/log_webhook_test.go new file mode 100644 index 0000000..4acb26b --- /dev/null +++ b/internal/service/log_webhook_test.go @@ -0,0 +1,60 @@ +package service + +import ( + "context" + "net/http" + "net/http/httptest" + "sync/atomic" + "testing" + + "github.com/alicebob/miniredis/v2" + "github.com/ez-api/ez-api/internal/model" + "github.com/redis/go-redis/v9" +) + +func TestLogWebhookServiceNotifyThreshold(t *testing.T) { + mr := miniredis.RunT(t) + rdb := redis.NewClient(&redis.Options{Addr: mr.Addr()}) + svc := NewLogWebhookService(rdb) + + var hits int64 + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + atomic.AddInt64(&hits, 1) + w.WriteHeader(http.StatusOK) + })) + defer srv.Close() + + cfg := LogWebhookConfig{ + Enabled: true, + URL: srv.URL, + Threshold: 2, + WindowSeconds: 60, + CooldownSeconds: 300, + StatusCodeThreshold: 500, + } + if _, err := svc.SetConfig(context.Background(), cfg); err != nil { + t.Fatalf("set config: %v", err) + } + + rec := model.LogRecord{StatusCode: 500, ErrorMessage: "upstream error"} + svc.NotifyIfNeeded(context.Background(), rec) + svc.NotifyIfNeeded(context.Background(), rec) + + if atomic.LoadInt64(&hits) != 1 { + t.Fatalf("expected 1 webhook hit, got %d", hits) + } +} + +func TestLogWebhookServiceDefaults(t *testing.T) { + mr := miniredis.RunT(t) + rdb := redis.NewClient(&redis.Options{Addr: mr.Addr()}) + svc := NewLogWebhookService(rdb) + + cfg, err := svc.GetConfig(context.Background()) + if err != nil { + t.Fatalf("get config: %v", err) + } + if cfg.Threshold != defaultWebhookThreshold || cfg.WindowSeconds != defaultWebhookWindow { + t.Fatalf("unexpected defaults: %+v", cfg) + } +}