feat(cron): add automatic log cleanup with retention policy

Implement LogCleaner cron job to automatically clean up old log records
based on configurable retention period and maximum record count.

- Add LogCleaner with retention_days and max_records configuration
- Add EZ_LOG_RETENTION_DAYS and EZ_LOG_MAX_RECORDS environment variables
- Default to 30 days retention and 1,000,000 max records
- Include unit tests for log cleaner functionality
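
Retention can be tuned through the new environment variables; for example (illustrative values, not the defaults):

    EZ_LOG_RETENTION_DAYS=14
    EZ_LOG_MAX_RECORDS=500000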
zenfun
2025-12-21 12:01:52 +08:00
parent 369c204c16
commit 25795a79d6
4 changed files with 286 additions and 0 deletions
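
Beyond the static configuration, the cleaner re-reads two Redis keys on every run, so both limits can be adjusted at runtime without a restart. A minimal sketch, assuming a standard redis-cli and the keys defined in the new cron package (non-positive or unparsable values are ignored and the configured limits apply):

    redis-cli SET meta:log:retention_days 7
    redis-cli SET meta:log:max_records 500000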

View File

@@ -107,6 +107,10 @@ func main() {
	quotaCtx, cancelQuota := context.WithCancel(context.Background())
	defer cancelQuota()
	go quotaResetter.Start(quotaCtx)
	logCleaner := cron.NewLogCleaner(db, rdb, cfg.Log.RetentionDays, int64(cfg.Log.MaxRecords), time.Hour)
	cleanerCtx, cancelCleaner := context.WithCancel(context.Background())
	defer cancelCleaner()
	go logCleaner.Start(cleanerCtx)

	adminService, err := service.NewAdminService()
	if err != nil {

View File

@@ -42,6 +42,8 @@ type LogConfig struct {
	BatchSize     int
	FlushInterval time.Duration
	QueueCapacity int
	RetentionDays int
	MaxRecords    int
}

type ModelRegistryConfig struct {
@@ -73,6 +75,8 @@ func Load() (*Config, error) {
v.SetDefault("log.batch_size", 10) v.SetDefault("log.batch_size", 10)
v.SetDefault("log.flush_ms", 1000) v.SetDefault("log.flush_ms", 1000)
v.SetDefault("log.queue_capacity", 10000) v.SetDefault("log.queue_capacity", 10000)
v.SetDefault("log.retention_days", 30)
v.SetDefault("log.max_records", 1000000)
v.SetDefault("auth.jwt_secret", "change_me_in_production") v.SetDefault("auth.jwt_secret", "change_me_in_production")
v.SetDefault("model_registry.enabled", false) v.SetDefault("model_registry.enabled", false)
v.SetDefault("model_registry.refresh_seconds", 1800) v.SetDefault("model_registry.refresh_seconds", 1800)
@@ -95,6 +99,8 @@ func Load() (*Config, error) {
_ = v.BindEnv("log.batch_size", "EZ_LOG_BATCH_SIZE") _ = v.BindEnv("log.batch_size", "EZ_LOG_BATCH_SIZE")
_ = v.BindEnv("log.flush_ms", "EZ_LOG_FLUSH_MS") _ = v.BindEnv("log.flush_ms", "EZ_LOG_FLUSH_MS")
_ = v.BindEnv("log.queue_capacity", "EZ_LOG_QUEUE") _ = v.BindEnv("log.queue_capacity", "EZ_LOG_QUEUE")
_ = v.BindEnv("log.retention_days", "EZ_LOG_RETENTION_DAYS")
_ = v.BindEnv("log.max_records", "EZ_LOG_MAX_RECORDS")
_ = v.BindEnv("auth.jwt_secret", "EZ_JWT_SECRET") _ = v.BindEnv("auth.jwt_secret", "EZ_JWT_SECRET")
_ = v.BindEnv("model_registry.enabled", "EZ_MODEL_REGISTRY_ENABLED") _ = v.BindEnv("model_registry.enabled", "EZ_MODEL_REGISTRY_ENABLED")
_ = v.BindEnv("model_registry.refresh_seconds", "EZ_MODEL_REGISTRY_REFRESH_SECONDS") _ = v.BindEnv("model_registry.refresh_seconds", "EZ_MODEL_REGISTRY_REFRESH_SECONDS")
@@ -137,6 +143,8 @@ func Load() (*Config, error) {
			BatchSize:     v.GetInt("log.batch_size"),
			FlushInterval: time.Duration(v.GetInt("log.flush_ms")) * time.Millisecond,
			QueueCapacity: v.GetInt("log.queue_capacity"),
			RetentionDays: v.GetInt("log.retention_days"),
			MaxRecords:    v.GetInt("log.max_records"),
		},
		Auth: AuthConfig{
			JWTSecret: v.GetString("auth.jwt_secret"),

View File

@@ -0,0 +1,187 @@
package cron

import (
	"context"
	"log/slog"
	"math"
	"strconv"
	"strings"
	"time"

	"github.com/ez-api/ez-api/internal/model"
	"github.com/redis/go-redis/v9"
	"gorm.io/gorm"
)

const (
	logRetentionDaysKey = "meta:log:retention_days"
	logMaxRecordsKey    = "meta:log:max_records"
)
// LogCleaner deletes old log records using retention days and max records limits.
type LogCleaner struct {
	db            *gorm.DB
	rdb           *redis.Client
	retentionDays int
	maxRecords    int64
	interval      time.Duration
}

// NewLogCleaner builds a cleaner; a non-positive interval falls back to one hour.
func NewLogCleaner(db *gorm.DB, rdb *redis.Client, retentionDays int, maxRecords int64, interval time.Duration) *LogCleaner {
	if interval <= 0 {
		interval = time.Hour
	}
	return &LogCleaner{
		db:            db,
		rdb:           rdb,
		retentionDays: retentionDays,
		maxRecords:    maxRecords,
		interval:      interval,
	}
}
// Start runs one cleanup immediately, then repeats on the configured interval
// until the context is cancelled.
func (c *LogCleaner) Start(ctx context.Context) {
	if c == nil || c.db == nil {
		return
	}
	if ctx == nil {
		ctx = context.Background()
	}
	if err := c.cleanOnce(ctx); err != nil {
		slog.Default().Warn("log cleaner run failed", "err", err)
	}
	ticker := time.NewTicker(c.interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if err := c.cleanOnce(ctx); err != nil {
				slog.Default().Warn("log cleaner run failed", "err", err)
			}
		}
	}
}
func (c *LogCleaner) cleanOnce(ctx context.Context) error {
	retentionDays := c.resolveRetentionDays(ctx)
	maxRecords := c.resolveMaxRecords(ctx)
	if retentionDays <= 0 && maxRecords <= 0 {
		return nil
	}
	var deleted int64
	// Age-based cleanup: drop everything older than the retention window.
	if retentionDays > 0 {
		cutoff := time.Now().UTC().AddDate(0, 0, -retentionDays)
		res := c.db.Unscoped().Where("created_at < ?", cutoff).Delete(&model.LogRecord{})
		if res.Error != nil {
			return res.Error
		}
		deleted += res.RowsAffected
	}
	// Count-based cleanup: find the ID of the maxRecords-th newest row and
	// delete everything older, keeping only the latest maxRecords rows.
	if maxRecords > 0 {
		if maxRecords > int64(math.MaxInt) {
			maxRecords = int64(math.MaxInt)
		}
		var cutoff struct {
			ID uint
		}
		if err := c.db.Unscoped().
			Model(&model.LogRecord{}).
			Select("id").
			Order("id desc").
			Offset(int(maxRecords - 1)).
			Limit(1).
			Scan(&cutoff).Error; err != nil {
			return err
		}
		if cutoff.ID > 0 {
			res := c.db.Unscoped().Where("id < ?", cutoff.ID).Delete(&model.LogRecord{})
			if res.Error != nil {
				return res.Error
			}
			deleted += res.RowsAffected
		}
	}
	if deleted > 0 {
		slog.Default().Info("log cleanup completed", "deleted", deleted, "retention_days", retentionDays, "max_records", maxRecords)
	}
	return nil
}
// resolveRetentionDays prefers a positive override from Redis, falling back to
// the configured value when the key is absent, unreadable, or invalid.
func (c *LogCleaner) resolveRetentionDays(ctx context.Context) int {
	days := c.retentionDays
	if days < 0 {
		days = 0
	}
	if c.rdb == nil {
		return days
	}
	raw, err := c.rdb.Get(ctx, logRetentionDaysKey).Result()
	if err == redis.Nil {
		return days
	}
	if err != nil {
		slog.Default().Warn("log cleaner failed to read retention_days", "err", err)
		return days
	}
	if v, ok := parsePositiveInt(raw); ok {
		return v
	}
	slog.Default().Warn("log cleaner invalid retention_days", "value", raw)
	return days
}

// resolveMaxRecords mirrors resolveRetentionDays for the record-count limit.
func (c *LogCleaner) resolveMaxRecords(ctx context.Context) int64 {
	limit := c.maxRecords
	if limit < 0 {
		limit = 0
	}
	if c.rdb == nil {
		return limit
	}
	raw, err := c.rdb.Get(ctx, logMaxRecordsKey).Result()
	if err == redis.Nil {
		return limit
	}
	if err != nil {
		slog.Default().Warn("log cleaner failed to read max_records", "err", err)
		return limit
	}
	if v, ok := parsePositiveInt64(raw); ok {
		return v
	}
	slog.Default().Warn("log cleaner invalid max_records", "value", raw)
	return limit
}
// parsePositiveInt accepts only strictly positive integers; anything else is
// reported as invalid so callers can fall back to their defaults.
func parsePositiveInt(raw string) (int, bool) {
	raw = strings.TrimSpace(raw)
	if raw == "" {
		return 0, false
	}
	v, err := strconv.Atoi(raw)
	if err != nil || v <= 0 {
		return 0, false
	}
	return v, true
}

func parsePositiveInt64(raw string) (int64, bool) {
	raw = strings.TrimSpace(raw)
	if raw == "" {
		return 0, false
	}
	v, err := strconv.ParseInt(raw, 10, 64)
	if err != nil || v <= 0 {
		return 0, false
	}
	return v, true
}

View File

@@ -0,0 +1,87 @@
package cron

import (
	"context"
	"fmt"
	"testing"
	"time"

	"github.com/ez-api/ez-api/internal/model"
	"gorm.io/driver/sqlite"
	"gorm.io/gorm"
)
func TestLogCleanerRetentionDeletesOld(t *testing.T) {
	dsn := fmt.Sprintf("file:%s?mode=memory&cache=shared", t.Name())
	db, err := gorm.Open(sqlite.Open(dsn), &gorm.Config{})
	if err != nil {
		t.Fatalf("open sqlite: %v", err)
	}
	if err := db.AutoMigrate(&model.LogRecord{}); err != nil {
		t.Fatalf("migrate: %v", err)
	}
	now := time.Now().UTC()
	// One record outside the 1-day retention window, one inside it.
	old := model.LogRecord{ModelName: "m1", StatusCode: 200}
	old.CreatedAt = now.Add(-48 * time.Hour)
	fresh := model.LogRecord{ModelName: "m1", StatusCode: 200}
	fresh.CreatedAt = now.Add(-2 * time.Hour)
	if err := db.Create(&old).Error; err != nil {
		t.Fatalf("create old: %v", err)
	}
	if err := db.Create(&fresh).Error; err != nil {
		t.Fatalf("create fresh: %v", err)
	}
	cleaner := NewLogCleaner(db, nil, 1, 0, time.Minute)
	if err := cleaner.cleanOnce(context.Background()); err != nil {
		t.Fatalf("clean once: %v", err)
	}
	var remaining int64
	if err := db.Model(&model.LogRecord{}).Count(&remaining).Error; err != nil {
		t.Fatalf("count logs: %v", err)
	}
	if remaining != 1 {
		t.Fatalf("expected 1 log remaining, got %d", remaining)
	}
}
func TestLogCleanerMaxRecordsKeepsLatest(t *testing.T) {
	dsn := fmt.Sprintf("file:%s?mode=memory&cache=shared", t.Name())
	db, err := gorm.Open(sqlite.Open(dsn), &gorm.Config{})
	if err != nil {
		t.Fatalf("open sqlite: %v", err)
	}
	if err := db.AutoMigrate(&model.LogRecord{}); err != nil {
		t.Fatalf("migrate: %v", err)
	}
	for i := 0; i < 5; i++ {
		if err := db.Create(&model.LogRecord{ModelName: "m1", StatusCode: 200}).Error; err != nil {
			t.Fatalf("create log %d: %v", i, err)
		}
	}
	cleaner := NewLogCleaner(db, nil, 0, 3, time.Minute)
	if err := cleaner.cleanOnce(context.Background()); err != nil {
		t.Fatalf("clean once: %v", err)
	}
	var remaining int64
	if err := db.Model(&model.LogRecord{}).Count(&remaining).Error; err != nil {
		t.Fatalf("count logs: %v", err)
	}
	if remaining != 3 {
		t.Fatalf("expected 3 logs remaining, got %d", remaining)
	}
	// The three newest rows (ids 3..5) should survive, so the oldest
	// remaining id must be at least 3.
	var oldest model.LogRecord
	if err := db.Order("id asc").First(&oldest).Error; err != nil {
		t.Fatalf("fetch min log: %v", err)
	}
	if oldest.ID < 3 {
		t.Fatalf("expected min id >= 3, got %d", oldest.ID)
	}
}
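
The new tests can be run in isolation from the module root, for example:

    go test ./... -run TestLogCleaner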