Files
ez-api/internal/cron/log_cleaner.go
zenfun 05caed37c2 refactor(cron): migrate cron jobs to foundation scheduler
Replace custom goroutine-based scheduling in cron jobs with centralized
foundation scheduler. Each cron job now exposes a RunOnce method called
by the scheduler instead of managing its own ticker loop.

Changes:
- Remove interval/enabled config from cron job structs
- Convert Start() methods to RunOnce() for all cron jobs
- Add scheduler setup in main.go with configurable intervals
- Update foundation dependency to v0.6.0 for scheduler support
- Update tests to validate RunOnce nil-safety
2025-12-31 20:42:25 +08:00

232 lines
5.2 KiB
Go

package cron
import (
"context"
"log/slog"
"math"
"strconv"
"strings"
"time"
"github.com/ez-api/ez-api/internal/model"
"github.com/ez-api/ez-api/internal/service"
"github.com/redis/go-redis/v9"
"gorm.io/gorm"
)
const (
logRetentionDaysKey = "meta:log:retention_days"
logMaxRecordsKey = "meta:log:max_records"
)
// LogCleaner deletes old log records using retention days and max records limits.
type LogCleaner struct {
db *gorm.DB
rdb *redis.Client
retentionDays int
maxRecords int64
partitioner *service.LogPartitioner
}
func NewLogCleaner(db *gorm.DB, rdb *redis.Client, retentionDays int, maxRecords int64, partitioner *service.LogPartitioner) *LogCleaner {
return &LogCleaner{
db: db,
rdb: rdb,
retentionDays: retentionDays,
maxRecords: maxRecords,
partitioner: partitioner,
}
}
// RunOnce executes a single log cleanup. Called by scheduler.
func (c *LogCleaner) RunOnce(ctx context.Context) {
if c == nil || c.db == nil {
return
}
if err := c.cleanOnce(ctx); err != nil {
slog.Default().Warn("log cleaner run failed", "err", err)
}
}
func (c *LogCleaner) cleanOnce(ctx context.Context) error {
retentionDays := c.resolveRetentionDays(ctx)
maxRecords := c.resolveMaxRecords(ctx)
if retentionDays <= 0 && maxRecords <= 0 {
return nil
}
if c.partitioner != nil && c.partitioner.Enabled() {
return c.cleanPartitioned(retentionDays, maxRecords)
}
var deleted int64
if retentionDays > 0 {
cutoff := time.Now().UTC().AddDate(0, 0, -retentionDays)
res := c.db.Unscoped().Where("created_at < ?", cutoff).Delete(&model.LogRecord{})
if res.Error != nil {
return res.Error
}
deleted += res.RowsAffected
}
if maxRecords > 0 {
if maxRecords > int64(math.MaxInt) {
maxRecords = int64(math.MaxInt)
}
var cutoff struct {
ID uint
}
if err := c.db.Unscoped().
Model(&model.LogRecord{}).
Select("id").
Order("id desc").
Offset(int(maxRecords - 1)).
Limit(1).
Scan(&cutoff).Error; err != nil {
return err
}
if cutoff.ID > 0 {
res := c.db.Unscoped().Where("id < ?", cutoff.ID).Delete(&model.LogRecord{})
if res.Error != nil {
return res.Error
}
deleted += res.RowsAffected
}
}
if deleted > 0 {
slog.Default().Info("log cleanup completed", "deleted", deleted, "retention_days", retentionDays, "max_records", maxRecords)
}
return nil
}
func (c *LogCleaner) cleanPartitioned(retentionDays int, maxRecords int64) error {
var deleted int64
if retentionDays > 0 {
cutoff := time.Now().UTC().AddDate(0, 0, -retentionDays)
dropped, err := c.partitioner.DropPartitionsBefore(cutoff)
if err != nil {
return err
}
if dropped > 0 {
slog.Default().Info("log partition cleanup completed", "dropped_tables", dropped, "retention_days", retentionDays)
}
table, err := c.partitioner.EnsurePartitionFor(time.Now().UTC())
if err != nil {
return err
}
res := c.db.Table(table).Unscoped().Where("created_at < ?", cutoff).Delete(&model.LogRecord{})
if res.Error != nil {
return res.Error
}
deleted += res.RowsAffected
}
if maxRecords > 0 {
if maxRecords > int64(math.MaxInt) {
maxRecords = int64(math.MaxInt)
}
table, err := c.partitioner.EnsurePartitionFor(time.Now().UTC())
if err != nil {
return err
}
var cutoff struct {
ID uint
}
if err := c.db.Unscoped().
Table(table).
Select("id").
Order("id desc").
Offset(int(maxRecords - 1)).
Limit(1).
Scan(&cutoff).Error; err != nil {
return err
}
if cutoff.ID > 0 {
res := c.db.Table(table).Unscoped().Where("id < ?", cutoff.ID).Delete(&model.LogRecord{})
if res.Error != nil {
return res.Error
}
deleted += res.RowsAffected
}
}
if deleted > 0 {
slog.Default().Info("log cleanup completed", "deleted", deleted, "retention_days", retentionDays, "max_records", maxRecords)
}
return nil
}
func (c *LogCleaner) resolveRetentionDays(ctx context.Context) int {
days := c.retentionDays
if days < 0 {
days = 0
}
if c.rdb == nil {
return days
}
raw, err := c.rdb.Get(ctx, logRetentionDaysKey).Result()
if err == redis.Nil {
return days
}
if err != nil {
slog.Default().Warn("log cleaner failed to read retention_days", "err", err)
return days
}
if v, ok := parsePositiveInt(raw); ok {
return v
}
slog.Default().Warn("log cleaner invalid retention_days", "value", raw)
return days
}
func (c *LogCleaner) resolveMaxRecords(ctx context.Context) int64 {
max := c.maxRecords
if max < 0 {
max = 0
}
if c.rdb == nil {
return max
}
raw, err := c.rdb.Get(ctx, logMaxRecordsKey).Result()
if err == redis.Nil {
return max
}
if err != nil {
slog.Default().Warn("log cleaner failed to read max_records", "err", err)
return max
}
if v, ok := parsePositiveInt64(raw); ok {
return v
}
slog.Default().Warn("log cleaner invalid max_records", "value", raw)
return max
}
func parsePositiveInt(raw string) (int, bool) {
raw = strings.TrimSpace(raw)
if raw == "" {
return 0, false
}
v, err := strconv.Atoi(raw)
if err != nil || v <= 0 {
return 0, false
}
return v, true
}
func parsePositiveInt64(raw string) (int64, bool) {
raw = strings.TrimSpace(raw)
if raw == "" {
return 0, false
}
v, err := strconv.ParseInt(raw, 10, 64)
if err != nil || v <= 0 {
return 0, false
}
return v, true
}