From 25795a79d6880189370886bcf1b2661b59389520 Mon Sep 17 00:00:00 2001 From: zenfun Date: Sun, 21 Dec 2025 12:01:52 +0800 Subject: [PATCH] feat(cron): add automatic log cleanup with retention policy Implement LogCleaner cron job to automatically clean up old log records based on configurable retention period and maximum record count. - Add LogCleaner with retention_days and max_records configuration - Add EZ_LOG_RETENTION_DAYS and EZ_LOG_MAX_RECORDS environment variables - Default to 30 days retention and 1,000,000 max records - Include unit tests for log cleaner functionality --- cmd/server/main.go | 4 + internal/config/config.go | 8 ++ internal/cron/log_cleaner.go | 187 ++++++++++++++++++++++++++++++ internal/cron/log_cleaner_test.go | 87 ++++++++++++++ 4 files changed, 286 insertions(+) create mode 100644 internal/cron/log_cleaner.go create mode 100644 internal/cron/log_cleaner_test.go diff --git a/cmd/server/main.go b/cmd/server/main.go index f456bc7..0c3282b 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -107,6 +107,10 @@ func main() { quotaCtx, cancelQuota := context.WithCancel(context.Background()) defer cancelQuota() go quotaResetter.Start(quotaCtx) + logCleaner := cron.NewLogCleaner(db, rdb, cfg.Log.RetentionDays, int64(cfg.Log.MaxRecords), time.Hour) + cleanerCtx, cancelCleaner := context.WithCancel(context.Background()) + defer cancelCleaner() + go logCleaner.Start(cleanerCtx) adminService, err := service.NewAdminService() if err != nil { diff --git a/internal/config/config.go b/internal/config/config.go index 8bdb1af..59d333a 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -42,6 +42,8 @@ type LogConfig struct { BatchSize int FlushInterval time.Duration QueueCapacity int + RetentionDays int + MaxRecords int } type ModelRegistryConfig struct { @@ -73,6 +75,8 @@ func Load() (*Config, error) { v.SetDefault("log.batch_size", 10) v.SetDefault("log.flush_ms", 1000) v.SetDefault("log.queue_capacity", 10000) + v.SetDefault("log.retention_days", 30) + v.SetDefault("log.max_records", 1000000) v.SetDefault("auth.jwt_secret", "change_me_in_production") v.SetDefault("model_registry.enabled", false) v.SetDefault("model_registry.refresh_seconds", 1800) @@ -95,6 +99,8 @@ func Load() (*Config, error) { _ = v.BindEnv("log.batch_size", "EZ_LOG_BATCH_SIZE") _ = v.BindEnv("log.flush_ms", "EZ_LOG_FLUSH_MS") _ = v.BindEnv("log.queue_capacity", "EZ_LOG_QUEUE") + _ = v.BindEnv("log.retention_days", "EZ_LOG_RETENTION_DAYS") + _ = v.BindEnv("log.max_records", "EZ_LOG_MAX_RECORDS") _ = v.BindEnv("auth.jwt_secret", "EZ_JWT_SECRET") _ = v.BindEnv("model_registry.enabled", "EZ_MODEL_REGISTRY_ENABLED") _ = v.BindEnv("model_registry.refresh_seconds", "EZ_MODEL_REGISTRY_REFRESH_SECONDS") @@ -137,6 +143,8 @@ func Load() (*Config, error) { BatchSize: v.GetInt("log.batch_size"), FlushInterval: time.Duration(v.GetInt("log.flush_ms")) * time.Millisecond, QueueCapacity: v.GetInt("log.queue_capacity"), + RetentionDays: v.GetInt("log.retention_days"), + MaxRecords: v.GetInt("log.max_records"), }, Auth: AuthConfig{ JWTSecret: v.GetString("auth.jwt_secret"), diff --git a/internal/cron/log_cleaner.go b/internal/cron/log_cleaner.go new file mode 100644 index 0000000..d6b20c8 --- /dev/null +++ b/internal/cron/log_cleaner.go @@ -0,0 +1,187 @@ +package cron + +import ( + "context" + "log/slog" + "math" + "strconv" + "strings" + "time" + + "github.com/ez-api/ez-api/internal/model" + "github.com/redis/go-redis/v9" + "gorm.io/gorm" +) + +const ( + logRetentionDaysKey = "meta:log:retention_days" + logMaxRecordsKey = "meta:log:max_records" +) + +// LogCleaner deletes old log records using retention days and max records limits. +type LogCleaner struct { + db *gorm.DB + rdb *redis.Client + retentionDays int + maxRecords int64 + interval time.Duration +} + +func NewLogCleaner(db *gorm.DB, rdb *redis.Client, retentionDays int, maxRecords int64, interval time.Duration) *LogCleaner { + if interval <= 0 { + interval = time.Hour + } + return &LogCleaner{ + db: db, + rdb: rdb, + retentionDays: retentionDays, + maxRecords: maxRecords, + interval: interval, + } +} + +func (c *LogCleaner) Start(ctx context.Context) { + if c == nil || c.db == nil { + return + } + if ctx == nil { + ctx = context.Background() + } + + if err := c.cleanOnce(ctx); err != nil { + slog.Default().Warn("log cleaner run failed", "err", err) + } + + ticker := time.NewTicker(c.interval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if err := c.cleanOnce(ctx); err != nil { + slog.Default().Warn("log cleaner run failed", "err", err) + } + } + } +} + +func (c *LogCleaner) cleanOnce(ctx context.Context) error { + retentionDays := c.resolveRetentionDays(ctx) + maxRecords := c.resolveMaxRecords(ctx) + + if retentionDays <= 0 && maxRecords <= 0 { + return nil + } + + var deleted int64 + if retentionDays > 0 { + cutoff := time.Now().UTC().AddDate(0, 0, -retentionDays) + res := c.db.Unscoped().Where("created_at < ?", cutoff).Delete(&model.LogRecord{}) + if res.Error != nil { + return res.Error + } + deleted += res.RowsAffected + } + + if maxRecords > 0 { + if maxRecords > int64(math.MaxInt) { + maxRecords = int64(math.MaxInt) + } + var cutoff struct { + ID uint + } + if err := c.db.Unscoped(). + Model(&model.LogRecord{}). + Select("id"). + Order("id desc"). + Offset(int(maxRecords - 1)). + Limit(1). + Scan(&cutoff).Error; err != nil { + return err + } + if cutoff.ID > 0 { + res := c.db.Unscoped().Where("id < ?", cutoff.ID).Delete(&model.LogRecord{}) + if res.Error != nil { + return res.Error + } + deleted += res.RowsAffected + } + } + + if deleted > 0 { + slog.Default().Info("log cleanup completed", "deleted", deleted, "retention_days", retentionDays, "max_records", maxRecords) + } + + return nil +} + +func (c *LogCleaner) resolveRetentionDays(ctx context.Context) int { + days := c.retentionDays + if days < 0 { + days = 0 + } + if c.rdb == nil { + return days + } + raw, err := c.rdb.Get(ctx, logRetentionDaysKey).Result() + if err == redis.Nil { + return days + } + if err != nil { + slog.Default().Warn("log cleaner failed to read retention_days", "err", err) + return days + } + if v, ok := parsePositiveInt(raw); ok { + return v + } + slog.Default().Warn("log cleaner invalid retention_days", "value", raw) + return days +} + +func (c *LogCleaner) resolveMaxRecords(ctx context.Context) int64 { + max := c.maxRecords + if max < 0 { + max = 0 + } + if c.rdb == nil { + return max + } + raw, err := c.rdb.Get(ctx, logMaxRecordsKey).Result() + if err == redis.Nil { + return max + } + if err != nil { + slog.Default().Warn("log cleaner failed to read max_records", "err", err) + return max + } + if v, ok := parsePositiveInt64(raw); ok { + return v + } + slog.Default().Warn("log cleaner invalid max_records", "value", raw) + return max +} + +func parsePositiveInt(raw string) (int, bool) { + raw = strings.TrimSpace(raw) + if raw == "" { + return 0, false + } + v, err := strconv.Atoi(raw) + if err != nil || v <= 0 { + return 0, false + } + return v, true +} + +func parsePositiveInt64(raw string) (int64, bool) { + raw = strings.TrimSpace(raw) + if raw == "" { + return 0, false + } + v, err := strconv.ParseInt(raw, 10, 64) + if err != nil || v <= 0 { + return 0, false + } + return v, true +} diff --git a/internal/cron/log_cleaner_test.go b/internal/cron/log_cleaner_test.go new file mode 100644 index 0000000..ff37868 --- /dev/null +++ b/internal/cron/log_cleaner_test.go @@ -0,0 +1,87 @@ +package cron + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/ez-api/ez-api/internal/model" + "gorm.io/driver/sqlite" + "gorm.io/gorm" +) + +func TestLogCleanerRetentionDeletesOld(t *testing.T) { + dsn := fmt.Sprintf("file:%s?mode=memory&cache=shared", t.Name()) + db, err := gorm.Open(sqlite.Open(dsn), &gorm.Config{}) + if err != nil { + t.Fatalf("open sqlite: %v", err) + } + if err := db.AutoMigrate(&model.LogRecord{}); err != nil { + t.Fatalf("migrate: %v", err) + } + + now := time.Now().UTC() + old := model.LogRecord{ModelName: "m1", StatusCode: 200} + old.CreatedAt = now.Add(-48 * time.Hour) + fresh := model.LogRecord{ModelName: "m1", StatusCode: 200} + fresh.CreatedAt = now.Add(-2 * time.Hour) + + if err := db.Create(&old).Error; err != nil { + t.Fatalf("create old: %v", err) + } + if err := db.Create(&fresh).Error; err != nil { + t.Fatalf("create fresh: %v", err) + } + + cleaner := NewLogCleaner(db, nil, 1, 0, time.Minute) + if err := cleaner.cleanOnce(context.Background()); err != nil { + t.Fatalf("clean once: %v", err) + } + + var remaining int64 + if err := db.Model(&model.LogRecord{}).Count(&remaining).Error; err != nil { + t.Fatalf("count logs: %v", err) + } + if remaining != 1 { + t.Fatalf("expected 1 log remaining, got %d", remaining) + } +} + +func TestLogCleanerMaxRecordsKeepsLatest(t *testing.T) { + dsn := fmt.Sprintf("file:%s?mode=memory&cache=shared", t.Name()) + db, err := gorm.Open(sqlite.Open(dsn), &gorm.Config{}) + if err != nil { + t.Fatalf("open sqlite: %v", err) + } + if err := db.AutoMigrate(&model.LogRecord{}); err != nil { + t.Fatalf("migrate: %v", err) + } + + for i := 0; i < 5; i++ { + if err := db.Create(&model.LogRecord{ModelName: "m1", StatusCode: 200}).Error; err != nil { + t.Fatalf("create log %d: %v", i, err) + } + } + + cleaner := NewLogCleaner(db, nil, 0, 3, time.Minute) + if err := cleaner.cleanOnce(context.Background()); err != nil { + t.Fatalf("clean once: %v", err) + } + + var remaining int64 + if err := db.Model(&model.LogRecord{}).Count(&remaining).Error; err != nil { + t.Fatalf("count logs: %v", err) + } + if remaining != 3 { + t.Fatalf("expected 3 logs remaining, got %d", remaining) + } + + var min model.LogRecord + if err := db.Order("id asc").First(&min).Error; err != nil { + t.Fatalf("fetch min log: %v", err) + } + if min.ID < 3 { + t.Fatalf("expected min id >= 3, got %d", min.ID) + } +}