mirror of
https://github.com/EZ-Api/ez-api.git
synced 2026-01-13 17:47:51 +00:00
Implement webhook notifications for log error threshold alerts with configurable thresholds, time windows, and cooldown periods. - Add LogWebhookService with Redis-backed configuration storage - Add admin endpoints for webhook config management (GET/PUT) - Trigger webhook notifications when error count exceeds threshold - Support status code threshold and error message detection - Include sample log record data in webhook payload
253 lines
6.5 KiB
Go
253 lines
6.5 KiB
Go
package service
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"net/http"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/ez-api/ez-api/internal/model"
|
|
"github.com/ez-api/foundation/jsoncodec"
|
|
"github.com/redis/go-redis/v9"
|
|
)
|
|
|
|
// Redis keys used by the log webhook feature.
const (
	// logWebhookConfigKey holds the JSON-encoded LogWebhookConfig.
	logWebhookConfigKey = "meta:log:webhook"
	// logWebhookErrorsKey is the prefix of the per-window error counters;
	// the window bucket number is appended after a colon.
	logWebhookErrorsKey = "meta:log:webhook:errors"
	// logWebhookCooldownKey exists while notifications are suppressed after
	// a webhook has been triggered.
	logWebhookCooldownKey = "meta:log:webhook:cooldown"
)

// Fallback values applied when a configuration field is missing or not
// positive. Window and cooldown are expressed in seconds.
const (
	defaultWebhookThreshold  = 10
	defaultWebhookWindow     = 60
	defaultWebhookCooldown   = 300
	defaultWebhookStatusCode = 500
)
|
|
|
|
// LogWebhookConfig describes when and where log error alerts are delivered.
// It is serialized as JSON using the field tags below.
type LogWebhookConfig struct {
	// Enabled switches notifications on or off.
	Enabled bool `json:"enabled"`
	// URL is the endpoint that receives the webhook POST request.
	URL string `json:"url"`
	// Secret, when non-empty, is sent as an "Authorization: Bearer" header.
	Secret string `json:"secret"`
	// Threshold is the number of error records within one window at which
	// a notification is triggered.
	Threshold int `json:"threshold"`
	// WindowSeconds is the length of one counting window, in seconds.
	WindowSeconds int `json:"window_seconds"`
	// CooldownSeconds is how long further notifications are suppressed
	// after one has been triggered, in seconds.
	CooldownSeconds int `json:"cooldown_seconds"`
	// StatusCodeThreshold is the lowest status code that is counted as an
	// error.
	StatusCodeThreshold int `json:"status_code_threshold"`
}
|
|
|
|
type LogWebhookService struct {
|
|
rdb *redis.Client
|
|
client *http.Client
|
|
}
|
|
|
|
func NewLogWebhookService(rdb *redis.Client) *LogWebhookService {
|
|
if rdb == nil {
|
|
return nil
|
|
}
|
|
return &LogWebhookService{
|
|
rdb: rdb,
|
|
client: &http.Client{Timeout: 3 * time.Second},
|
|
}
|
|
}
|
|
|
|
func DefaultLogWebhookConfig() LogWebhookConfig {
|
|
return LogWebhookConfig{
|
|
Enabled: false,
|
|
Threshold: defaultWebhookThreshold,
|
|
WindowSeconds: defaultWebhookWindow,
|
|
CooldownSeconds: defaultWebhookCooldown,
|
|
StatusCodeThreshold: defaultWebhookStatusCode,
|
|
}
|
|
}
|
|
|
|
func (s *LogWebhookService) GetConfig(ctx context.Context) (LogWebhookConfig, error) {
|
|
cfg := DefaultLogWebhookConfig()
|
|
if s == nil || s.rdb == nil {
|
|
return cfg, nil
|
|
}
|
|
if ctx == nil {
|
|
ctx = context.Background()
|
|
}
|
|
raw, err := s.rdb.Get(ctx, logWebhookConfigKey).Result()
|
|
if err == redis.Nil {
|
|
return cfg, nil
|
|
}
|
|
if err != nil {
|
|
return cfg, err
|
|
}
|
|
if err := jsoncodec.Unmarshal([]byte(raw), &cfg); err != nil {
|
|
return cfg, err
|
|
}
|
|
return normalizeLogWebhookConfig(cfg), nil
|
|
}
|
|
|
|
func (s *LogWebhookService) SetConfig(ctx context.Context, cfg LogWebhookConfig) (LogWebhookConfig, error) {
|
|
if s == nil || s.rdb == nil {
|
|
return LogWebhookConfig{}, errors.New("redis not configured")
|
|
}
|
|
if ctx == nil {
|
|
ctx = context.Background()
|
|
}
|
|
cfg.URL = strings.TrimSpace(cfg.URL)
|
|
cfg.Secret = strings.TrimSpace(cfg.Secret)
|
|
if cfg.Enabled && cfg.URL == "" {
|
|
return cfg, errors.New("url is required when enabled")
|
|
}
|
|
cfg = normalizeLogWebhookConfig(cfg)
|
|
if !cfg.Enabled && cfg.URL == "" {
|
|
if err := s.rdb.Del(ctx, logWebhookConfigKey).Err(); err != nil && err != redis.Nil {
|
|
return cfg, err
|
|
}
|
|
return cfg, nil
|
|
}
|
|
payload, err := jsoncodec.Marshal(cfg)
|
|
if err != nil {
|
|
return cfg, err
|
|
}
|
|
if err := s.rdb.Set(ctx, logWebhookConfigKey, payload, 0).Err(); err != nil {
|
|
return cfg, err
|
|
}
|
|
return cfg, nil
|
|
}
|
|
|
|
func (s *LogWebhookService) NotifyIfNeeded(ctx context.Context, rec model.LogRecord) {
|
|
if s == nil || s.rdb == nil || s.client == nil {
|
|
return
|
|
}
|
|
if ctx == nil {
|
|
ctx = context.Background()
|
|
}
|
|
cfg, err := s.GetConfig(ctx)
|
|
if err != nil || !cfg.Enabled || strings.TrimSpace(cfg.URL) == "" {
|
|
return
|
|
}
|
|
if !isLogWebhookError(rec, cfg) {
|
|
return
|
|
}
|
|
|
|
window := cfg.WindowSeconds
|
|
if window <= 0 {
|
|
window = defaultWebhookWindow
|
|
}
|
|
threshold := cfg.Threshold
|
|
if threshold <= 0 {
|
|
threshold = defaultWebhookThreshold
|
|
}
|
|
|
|
now := time.Now().UTC()
|
|
bucket := now.Unix() / int64(window)
|
|
countKey := fmt.Sprintf("%s:%d", logWebhookErrorsKey, bucket)
|
|
cnt, err := s.rdb.Incr(ctx, countKey).Result()
|
|
if err != nil {
|
|
return
|
|
}
|
|
if cnt == 1 {
|
|
_ = s.rdb.Expire(ctx, countKey, time.Duration(window+1)*time.Second).Err()
|
|
}
|
|
if cnt < int64(threshold) {
|
|
return
|
|
}
|
|
|
|
cooldown := cfg.CooldownSeconds
|
|
if cooldown <= 0 {
|
|
cooldown = defaultWebhookCooldown
|
|
}
|
|
ok, err := s.rdb.SetNX(ctx, logWebhookCooldownKey, now.Unix(), time.Duration(cooldown)*time.Second).Result()
|
|
if err != nil || !ok {
|
|
return
|
|
}
|
|
|
|
_ = s.send(ctx, cfg, rec, cnt, now)
|
|
}
|
|
|
|
type logWebhookPayload struct {
|
|
Type string `json:"type"`
|
|
SentAt string `json:"sent_at"`
|
|
Threshold int `json:"threshold"`
|
|
WindowSeconds int `json:"window_seconds"`
|
|
Count int64 `json:"count"`
|
|
Sample logWebhookSample `json:"sample"`
|
|
}
|
|
|
|
// logWebhookSample is the subset of a log record that is included in a
// webhook notification as an example of the errors being reported.
type logWebhookSample struct {
	Group        string `json:"group"`
	KeyID        uint   `json:"key_id"`
	ModelName    string `json:"model"`
	ProviderName string `json:"provider_name"`
	ProviderType string `json:"provider_type"`
	StatusCode   int    `json:"status_code"`
	ErrorMessage string `json:"error_message"`
	ClientIP     string `json:"client_ip"`
}
|
|
|
|
func (s *LogWebhookService) send(ctx context.Context, cfg LogWebhookConfig, rec model.LogRecord, count int64, now time.Time) error {
|
|
if ctx == nil {
|
|
ctx = context.Background()
|
|
}
|
|
payload := logWebhookPayload{
|
|
Type: "log_error_threshold",
|
|
SentAt: now.UTC().Format(time.RFC3339),
|
|
Threshold: cfg.Threshold,
|
|
WindowSeconds: cfg.WindowSeconds,
|
|
Count: count,
|
|
Sample: logWebhookSample{
|
|
Group: rec.Group,
|
|
KeyID: rec.KeyID,
|
|
ModelName: rec.ModelName,
|
|
ProviderName: rec.ProviderName,
|
|
ProviderType: rec.ProviderType,
|
|
StatusCode: rec.StatusCode,
|
|
ErrorMessage: rec.ErrorMessage,
|
|
ClientIP: rec.ClientIP,
|
|
},
|
|
}
|
|
body, err := jsoncodec.Marshal(payload)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
req, err := http.NewRequestWithContext(ctx, http.MethodPost, cfg.URL, bytes.NewReader(body))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
req.Header.Set("Content-Type", "application/json")
|
|
req.Header.Set("User-Agent", "ez-api")
|
|
if cfg.Secret != "" {
|
|
req.Header.Set("Authorization", "Bearer "+cfg.Secret)
|
|
}
|
|
resp, err := s.client.Do(req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
_ = resp.Body.Close()
|
|
if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusMultipleChoices {
|
|
return fmt.Errorf("webhook status %d", resp.StatusCode)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func normalizeLogWebhookConfig(cfg LogWebhookConfig) LogWebhookConfig {
|
|
if cfg.Threshold <= 0 {
|
|
cfg.Threshold = defaultWebhookThreshold
|
|
}
|
|
if cfg.WindowSeconds <= 0 {
|
|
cfg.WindowSeconds = defaultWebhookWindow
|
|
}
|
|
if cfg.CooldownSeconds <= 0 {
|
|
cfg.CooldownSeconds = defaultWebhookCooldown
|
|
}
|
|
if cfg.StatusCodeThreshold <= 0 {
|
|
cfg.StatusCodeThreshold = defaultWebhookStatusCode
|
|
}
|
|
return cfg
|
|
}
|
|
|
|
func isLogWebhookError(rec model.LogRecord, cfg LogWebhookConfig) bool {
|
|
threshold := cfg.StatusCodeThreshold
|
|
if threshold <= 0 {
|
|
threshold = defaultWebhookStatusCode
|
|
}
|
|
if rec.StatusCode >= threshold {
|
|
return true
|
|
}
|
|
return strings.TrimSpace(rec.ErrorMessage) != ""
|
|
}
|