package service

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"
	"net/http"
	"strings"
	"time"

	"github.com/ez-api/ez-api/internal/model"
	"github.com/ez-api/foundation/jsoncodec"
	"github.com/redis/go-redis/v9"
)

const (
	// logWebhookConfigKey is the Redis key holding the JSON-encoded LogWebhookConfig.
	logWebhookConfigKey = "meta:log:webhook"
	// logWebhookErrorsKey is the prefix of the per-window error counters
	// ("<prefix>:<bucket>").
	logWebhookErrorsKey = "meta:log:webhook:errors"
	// logWebhookCooldownKey is the Redis key used as a cooldown marker between
	// two webhook deliveries.
	logWebhookCooldownKey = "meta:log:webhook:cooldown"

	// Defaults applied whenever the corresponding config field is <= 0.
	defaultWebhookThreshold  = 10  // errors per window before notifying
	defaultWebhookWindow     = 60  // window length, in seconds
	defaultWebhookCooldown   = 300 // cooldown between notifications, in seconds
	defaultWebhookStatusCode = 500 // minimum status code counted as an error

	// maxWebhookDrainBytes bounds how much of a webhook response body is read
	// before closing it, so a misbehaving endpoint cannot make us read forever.
	maxWebhookDrainBytes = 64 << 10
)

// LogWebhookConfig describes when and where an error-threshold webhook is sent.
type LogWebhookConfig struct {
	Enabled             bool   `json:"enabled"`
	URL                 string `json:"url"`
	Secret              string `json:"secret"`
	Threshold           int    `json:"threshold"`
	WindowSeconds       int    `json:"window_seconds"`
	CooldownSeconds     int    `json:"cooldown_seconds"`
	StatusCodeThreshold int    `json:"status_code_threshold"`
}

// LogWebhookService counts error log records in Redis and posts a webhook
// once the configured threshold is reached. All methods tolerate a nil
// receiver.
type LogWebhookService struct {
	rdb    *redis.Client
	client *http.Client
}

// NewLogWebhookService returns a service backed by rdb, or nil when rdb is nil.
// The embedded HTTP client uses a 3 second timeout.
func NewLogWebhookService(rdb *redis.Client) *LogWebhookService {
	if rdb == nil {
		return nil
	}
	return &LogWebhookService{
		rdb:    rdb,
		client: &http.Client{Timeout: 3 * time.Second},
	}
}

// DefaultLogWebhookConfig returns a disabled configuration populated with the
// package defaults.
func DefaultLogWebhookConfig() LogWebhookConfig {
	return LogWebhookConfig{
		Enabled:             false,
		Threshold:           defaultWebhookThreshold,
		WindowSeconds:       defaultWebhookWindow,
		CooldownSeconds:     defaultWebhookCooldown,
		StatusCodeThreshold: defaultWebhookStatusCode,
	}
}

// GetConfig loads the webhook configuration from Redis.
//
// It returns the default configuration (and a nil error) when the service or
// its Redis client is nil, or when no configuration is stored. On a Redis or
// decode error the default configuration is returned together with the error.
// A successfully decoded configuration is normalized before being returned.
func (s *LogWebhookService) GetConfig(ctx context.Context) (LogWebhookConfig, error) {
	cfg := DefaultLogWebhookConfig()
	if s == nil || s.rdb == nil {
		return cfg, nil
	}
	if ctx == nil {
		ctx = context.Background()
	}

	raw, err := s.rdb.Get(ctx, logWebhookConfigKey).Result()
	if errors.Is(err, redis.Nil) {
		return cfg, nil
	}
	if err != nil {
		return cfg, err
	}
	if err := jsoncodec.Unmarshal([]byte(raw), &cfg); err != nil {
		// Do not hand back a half-decoded value alongside the error.
		return DefaultLogWebhookConfig(), err
	}
	return normalizeLogWebhookConfig(cfg), nil
}

// SetConfig validates, normalizes and persists cfg, returning the value that
// was stored.
//
// URL and Secret are trimmed. An enabled configuration requires a non-empty
// URL. A disabled configuration with an empty URL removes the stored key
// instead of writing one.
func (s *LogWebhookService) SetConfig(ctx context.Context, cfg LogWebhookConfig) (LogWebhookConfig, error) {
	if s == nil || s.rdb == nil {
		return LogWebhookConfig{}, errors.New("redis not configured")
	}
	if ctx == nil {
		ctx = context.Background()
	}

	cfg.URL = strings.TrimSpace(cfg.URL)
	cfg.Secret = strings.TrimSpace(cfg.Secret)
	if cfg.Enabled && cfg.URL == "" {
		return cfg, errors.New("url is required when enabled")
	}
	cfg = normalizeLogWebhookConfig(cfg)

	if !cfg.Enabled && cfg.URL == "" {
		if err := s.rdb.Del(ctx, logWebhookConfigKey).Err(); err != nil && !errors.Is(err, redis.Nil) {
			return cfg, err
		}
		return cfg, nil
	}

	payload, err := jsoncodec.Marshal(cfg)
	if err != nil {
		return cfg, err
	}
	if err := s.rdb.Set(ctx, logWebhookConfigKey, payload, 0).Err(); err != nil {
		return cfg, err
	}
	return cfg, nil
}

// NotifyIfNeeded records rec if it counts as an error and sends the webhook
// when the per-window error count reaches the threshold and no cooldown is
// active.
//
// It is best-effort: configuration, Redis and delivery failures are swallowed
// so that logging callers are never affected.
func (s *LogWebhookService) NotifyIfNeeded(ctx context.Context, rec model.LogRecord) {
	if s == nil || s.rdb == nil || s.client == nil {
		return
	}
	if ctx == nil {
		ctx = context.Background()
	}

	cfg, err := s.GetConfig(ctx)
	if err != nil || !cfg.Enabled || strings.TrimSpace(cfg.URL) == "" {
		return
	}
	// Guarantees positive window/threshold/cooldown values below.
	cfg = normalizeLogWebhookConfig(cfg)
	if !isLogWebhookError(rec, cfg) {
		return
	}

	// Errors are counted in fixed buckets of WindowSeconds since the Unix epoch.
	now := time.Now().UTC()
	bucket := now.Unix() / int64(cfg.WindowSeconds)
	countKey := fmt.Sprintf("%s:%d", logWebhookErrorsKey, bucket)

	cnt, err := s.rdb.Incr(ctx, countKey).Result()
	if err != nil {
		return
	}
	if cnt == 1 {
		// First hit in this bucket: give the counter a TTL slightly longer
		// than the window. Best-effort; a failure here only delays cleanup
		// of this bucket's key and must not block notification.
		_ = s.rdb.Expire(ctx, countKey, time.Duration(cfg.WindowSeconds+1)*time.Second).Err()
	}
	if cnt < int64(cfg.Threshold) {
		return
	}

	// SetNX acts as the cooldown gate: only the caller that creates the key
	// gets to send.
	cooldown := time.Duration(cfg.CooldownSeconds) * time.Second
	acquired, err := s.rdb.SetNX(ctx, logWebhookCooldownKey, now.Unix(), cooldown).Result()
	if err != nil || !acquired {
		return
	}

	// Delivery is best-effort; the cooldown stays in place even on failure.
	_ = s.send(ctx, cfg, rec, cnt, now)
}

// logWebhookPayload is the JSON body posted to the webhook URL.
type logWebhookPayload struct {
	Type          string           `json:"type"`
	SentAt        string           `json:"sent_at"`
	Threshold     int              `json:"threshold"`
	WindowSeconds int              `json:"window_seconds"`
	Count         int64            `json:"count"`
	Sample        logWebhookSample `json:"sample"`
}

// logWebhookSample carries fields of the log record that triggered the
// notification.
type logWebhookSample struct {
	Group        string `json:"group"`
	KeyID        uint   `json:"key_id"`
	ModelName    string `json:"model"`
	ProviderName string `json:"provider_name"`
	ProviderType string `json:"provider_type"`
	StatusCode   int    `json:"status_code"`
	ErrorMessage string `json:"error_message"`
	ClientIP     string `json:"client_ip"`
}

// send posts the threshold notification to cfg.URL as JSON.
//
// When cfg.Secret is non-empty it is sent as a Bearer token in the
// Authorization header. A response status outside the 2xx range is reported
// as an error.
func (s *LogWebhookService) send(ctx context.Context, cfg LogWebhookConfig, rec model.LogRecord, count int64, now time.Time) error {
	if ctx == nil {
		ctx = context.Background()
	}

	payload := logWebhookPayload{
		Type:          "log_error_threshold",
		SentAt:        now.UTC().Format(time.RFC3339),
		Threshold:     cfg.Threshold,
		WindowSeconds: cfg.WindowSeconds,
		Count:         count,
		Sample: logWebhookSample{
			Group:        rec.Group,
			KeyID:        rec.KeyID,
			ModelName:    rec.ModelName,
			ProviderName: rec.ProviderName,
			ProviderType: rec.ProviderType,
			StatusCode:   rec.StatusCode,
			ErrorMessage: rec.ErrorMessage,
			ClientIP:     rec.ClientIP,
		},
	}
	body, err := jsoncodec.Marshal(payload)
	if err != nil {
		return err
	}

	req, err := http.NewRequestWithContext(ctx, http.MethodPost, cfg.URL, bytes.NewReader(body))
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("User-Agent", "ez-api")
	if cfg.Secret != "" {
		req.Header.Set("Authorization", "Bearer "+cfg.Secret)
	}

	resp, err := s.client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	// Drain (a bounded amount of) the body so the transport can reuse the
	// underlying connection for the next notification.
	_, _ = io.Copy(io.Discard, io.LimitReader(resp.Body, maxWebhookDrainBytes))

	if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusMultipleChoices {
		return fmt.Errorf("webhook status %d", resp.StatusCode)
	}
	return nil
}

// normalizeLogWebhookConfig replaces every non-positive numeric field of cfg
// with its package default and returns the result.
func normalizeLogWebhookConfig(cfg LogWebhookConfig) LogWebhookConfig {
	if cfg.Threshold <= 0 {
		cfg.Threshold = defaultWebhookThreshold
	}
	if cfg.WindowSeconds <= 0 {
		cfg.WindowSeconds = defaultWebhookWindow
	}
	if cfg.CooldownSeconds <= 0 {
		cfg.CooldownSeconds = defaultWebhookCooldown
	}
	if cfg.StatusCodeThreshold <= 0 {
		cfg.StatusCodeThreshold = defaultWebhookStatusCode
	}
	return cfg
}

// isLogWebhookError reports whether rec should be counted as an error: its
// status code is at or above the configured threshold (defaulted when
// non-positive), or it carries a non-blank error message.
func isLogWebhookError(rec model.LogRecord, cfg LogWebhookConfig) bool {
	threshold := cfg.StatusCodeThreshold
	if threshold <= 0 {
		threshold = defaultWebhookStatusCode
	}
	if rec.StatusCode >= threshold {
		return true
	}
	return strings.TrimSpace(rec.ErrorMessage) != ""
}