diff --git a/cmd/server/main.go b/cmd/server/main.go index e735583..86e43cf 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -131,7 +131,13 @@ func main() { // 4. Setup Services and Handlers syncService := service.NewSyncService(rdb) - logWriter := service.NewLogWriter(logDB, cfg.Log.QueueCapacity, cfg.Log.BatchSize, cfg.Log.FlushInterval) + logPartitioner := service.NewLogPartitioner(logDB, cfg.Log.Partitioning) + if logPartitioner.Enabled() { + if _, err := logPartitioner.EnsurePartitionFor(time.Now().UTC()); err != nil { + fatal(logger, "failed to ensure log partition", "err", err) + } + } + logWriter := service.NewLogWriter(logDB, cfg.Log.QueueCapacity, cfg.Log.BatchSize, cfg.Log.FlushInterval, logPartitioner) logCtx, cancelLogs := context.WithCancel(context.Background()) defer cancelLogs() logWriter.Start(logCtx) @@ -139,7 +145,7 @@ func main() { quotaCtx, cancelQuota := context.WithCancel(context.Background()) defer cancelQuota() go quotaResetter.Start(quotaCtx) - logCleaner := cron.NewLogCleaner(logDB, rdb, cfg.Log.RetentionDays, int64(cfg.Log.MaxRecords), time.Hour) + logCleaner := cron.NewLogCleaner(logDB, rdb, cfg.Log.RetentionDays, int64(cfg.Log.MaxRecords), time.Hour, logPartitioner) cleanerCtx, cancelCleaner := context.WithCancel(context.Background()) defer cancelCleaner() go logCleaner.Start(cleanerCtx) @@ -152,9 +158,9 @@ func main() { healthService := service.NewHealthCheckService(db, rdb) healthHandler := api.NewHealthHandler(healthService) - handler := api.NewHandler(db, logDB, syncService, logWriter, rdb) - adminHandler := api.NewAdminHandler(db, logDB, masterService, syncService) - masterHandler := api.NewMasterHandler(db, logDB, masterService, syncService) + handler := api.NewHandler(db, logDB, syncService, logWriter, rdb, logPartitioner) + adminHandler := api.NewAdminHandler(db, logDB, masterService, syncService, logPartitioner) + masterHandler := api.NewMasterHandler(db, logDB, masterService, syncService, logPartitioner) 
internalHandler := api.NewInternalHandler(db) featureHandler := api.NewFeatureHandler(rdb) modelRegistryService := service.NewModelRegistryService(db, rdb, service.ModelRegistryConfig{ diff --git a/internal/api/admin_handler.go b/internal/api/admin_handler.go index 9eeb516..cc30a4b 100644 --- a/internal/api/admin_handler.go +++ b/internal/api/admin_handler.go @@ -13,17 +13,18 @@ import ( ) type AdminHandler struct { - db *gorm.DB - logDB *gorm.DB - masterService *service.MasterService - syncService *service.SyncService + db *gorm.DB + logDB *gorm.DB + masterService *service.MasterService + syncService *service.SyncService + logPartitioner *service.LogPartitioner } -func NewAdminHandler(db *gorm.DB, logDB *gorm.DB, masterService *service.MasterService, syncService *service.SyncService) *AdminHandler { +func NewAdminHandler(db *gorm.DB, logDB *gorm.DB, masterService *service.MasterService, syncService *service.SyncService, partitioner *service.LogPartitioner) *AdminHandler { if logDB == nil { logDB = db } - return &AdminHandler{db: db, logDB: logDB, masterService: masterService, syncService: syncService} + return &AdminHandler{db: db, logDB: logDB, masterService: masterService, syncService: syncService, logPartitioner: partitioner} } func (h *AdminHandler) logDBConn() *gorm.DB { @@ -33,6 +34,10 @@ func (h *AdminHandler) logDBConn() *gorm.DB { return h.logDB } +func (h *AdminHandler) logBaseQuery() *gorm.DB { + return logBaseQuery(h.logDBConn(), h.logPartitioner) +} + type CreateMasterRequest struct { Name string `json:"name" binding:"required"` Group string `json:"group" binding:"required"` diff --git a/internal/api/admin_issue_key_test.go b/internal/api/admin_issue_key_test.go index 92d4700..e4dd98b 100644 --- a/internal/api/admin_issue_key_test.go +++ b/internal/api/admin_issue_key_test.go @@ -34,7 +34,7 @@ func TestAdmin_IssueChildKeyForMaster_IssuedByAdminAndSynced(t *testing.T) { syncService := service.NewSyncService(rdb) masterService := 
service.NewMasterService(db) - adminHandler := NewAdminHandler(db, db, masterService, syncService) + adminHandler := NewAdminHandler(db, db, masterService, syncService, nil) m, _, err := masterService.CreateMaster("m1", "default", 5, 10) if err != nil { diff --git a/internal/api/handler.go b/internal/api/handler.go index 0bc0a5d..b5609aa 100644 --- a/internal/api/handler.go +++ b/internal/api/handler.go @@ -17,25 +17,27 @@ import ( ) type Handler struct { - db *gorm.DB - logDB *gorm.DB - sync *service.SyncService - logger *service.LogWriter - rdb *redis.Client - logWebhook *service.LogWebhookService + db *gorm.DB + logDB *gorm.DB + sync *service.SyncService + logger *service.LogWriter + rdb *redis.Client + logWebhook *service.LogWebhookService + logPartitioner *service.LogPartitioner } -func NewHandler(db *gorm.DB, logDB *gorm.DB, sync *service.SyncService, logger *service.LogWriter, rdb *redis.Client) *Handler { +func NewHandler(db *gorm.DB, logDB *gorm.DB, sync *service.SyncService, logger *service.LogWriter, rdb *redis.Client, partitioner *service.LogPartitioner) *Handler { if logDB == nil { logDB = db } return &Handler{ - db: db, - logDB: logDB, - sync: sync, - logger: logger, - rdb: rdb, - logWebhook: service.NewLogWebhookService(rdb), + db: db, + logDB: logDB, + sync: sync, + logger: logger, + rdb: rdb, + logWebhook: service.NewLogWebhookService(rdb), + logPartitioner: partitioner, } } @@ -46,6 +48,10 @@ func (h *Handler) logDBConn() *gorm.DB { return h.logDB } +func (h *Handler) logBaseQuery() *gorm.DB { + return logBaseQuery(h.logDBConn(), h.logPartitioner) +} + // CreateKey is now handled by MasterHandler // CreateProvider godoc diff --git a/internal/api/log_handler.go b/internal/api/log_handler.go index c7e89c2..133314e 100644 --- a/internal/api/log_handler.go +++ b/internal/api/log_handler.go @@ -132,8 +132,9 @@ func parseUnixSeconds(raw string) (time.Time, bool) { func (h *MasterHandler) masterLogBase(masterID uint) (*gorm.DB, error) { logDB := 
h.logDBConn() + base := h.logBaseQuery() if logDB == h.db { - return logDB.Model(&model.LogRecord{}). + return base. Joins("JOIN keys ON keys.id = log_records.key_id"). Where("keys.master_id = ?", masterID), nil } @@ -144,9 +145,9 @@ func (h *MasterHandler) masterLogBase(masterID uint) (*gorm.DB, error) { return nil, err } if len(keyIDs) == 0 { - return logDB.Model(&model.LogRecord{}).Where("1 = 0"), nil + return base.Where("1 = 0"), nil } - return logDB.Model(&model.LogRecord{}). + return base. Where("log_records.key_id IN ?", keyIDs), nil } @@ -170,7 +171,7 @@ func (h *MasterHandler) masterLogBase(masterID uint) (*gorm.DB, error) { func (h *Handler) ListLogs(c *gin.Context) { limit, offset := parseLimitOffset(c) - q := h.logDBConn().Model(&model.LogRecord{}) + q := h.logBaseQuery() if t, ok := parseUnixSeconds(c.Query("since")); ok { q = q.Where("created_at >= ?", t) @@ -261,20 +262,63 @@ func (h *Handler) DeleteLogs(c *gin.Context) { return } - q := h.logDBConn().Unscoped().Where("created_at < ?", ts.UTC()) - if req.KeyID > 0 { - q = q.Where("key_id = ?", req.KeyID) - } - if model := strings.TrimSpace(req.Model); model != "" { - q = q.Where("model_name = ?", model) - } - - res := q.Delete(&model.LogRecord{}) - if res.Error != nil { - c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to delete logs", "details": res.Error.Error()}) + deleted, err := h.deleteLogsBefore(ts.UTC(), req.KeyID, req.Model) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to delete logs", "details": err.Error()}) return } - c.JSON(http.StatusOK, DeleteLogsResponse{DeletedCount: res.RowsAffected}) + c.JSON(http.StatusOK, DeleteLogsResponse{DeletedCount: deleted}) +} + +func (h *Handler) deleteLogsBefore(cutoff time.Time, keyID uint, modelName string) (int64, error) { + modelName = strings.TrimSpace(modelName) + if h == nil || h.logPartitioner == nil || !h.logPartitioner.Enabled() { + q := h.logBaseQuery().Unscoped().Where("created_at < ?", 
cutoff) + if keyID > 0 { + q = q.Where("key_id = ?", keyID) + } + if modelName != "" { + q = q.Where("model_name = ?", modelName) + } + res := q.Delete(&model.LogRecord{}) + return res.RowsAffected, res.Error + } + + partitions, err := h.logPartitioner.ListPartitions() + if err != nil { + return 0, err + } + if len(partitions) == 0 { + q := h.logDBConn().Table("log_records").Unscoped().Where("created_at < ?", cutoff) + if keyID > 0 { + q = q.Where("key_id = ?", keyID) + } + if modelName != "" { + q = q.Where("model_name = ?", modelName) + } + res := q.Delete(&model.LogRecord{}) + return res.RowsAffected, res.Error + } + + var deleted int64 + for _, part := range partitions { + if !part.Start.Before(cutoff) { + continue + } + q := h.logDBConn().Table(part.Table).Unscoped().Where("created_at < ?", cutoff) + if keyID > 0 { + q = q.Where("key_id = ?", keyID) + } + if modelName != "" { + q = q.Where("model_name = ?", modelName) + } + res := q.Delete(&model.LogRecord{}) + if res.Error != nil { + return deleted, res.Error + } + deleted += res.RowsAffected + } + return deleted, nil } // LogStats godoc @@ -289,7 +333,7 @@ func (h *Handler) DeleteLogs(c *gin.Context) { // @Failure 500 {object} gin.H // @Router /admin/logs/stats [get] func (h *Handler) LogStats(c *gin.Context) { - q := h.logDBConn().Model(&model.LogRecord{}) + q := h.logBaseQuery() if t, ok := parseUnixSeconds(c.Query("since")); ok { q = q.Where("created_at >= ?", t) } diff --git a/internal/api/log_query.go b/internal/api/log_query.go new file mode 100644 index 0000000..e39d913 --- /dev/null +++ b/internal/api/log_query.go @@ -0,0 +1,17 @@ +package api + +import ( + "github.com/ez-api/ez-api/internal/model" + "github.com/ez-api/ez-api/internal/service" + "gorm.io/gorm" +) + +func logBaseQuery(db *gorm.DB, partitioner *service.LogPartitioner) *gorm.DB { + if db == nil { + return db + } + if partitioner != nil && partitioner.Enabled() { + return db.Table(partitioner.ViewName() + " as log_records") + } + return 
db.Model(&model.LogRecord{}) +} diff --git a/internal/api/log_webhook_handler_test.go b/internal/api/log_webhook_handler_test.go index fec97d4..bdc3a8c 100644 --- a/internal/api/log_webhook_handler_test.go +++ b/internal/api/log_webhook_handler_test.go @@ -33,7 +33,7 @@ func newTestHandlerWithWebhook(t *testing.T) (*Handler, *miniredis.Miniredis) { mr := miniredis.RunT(t) rdb := redis.NewClient(&redis.Options{Addr: mr.Addr()}) sync := service.NewSyncService(rdb) - return NewHandler(db, db, sync, nil, rdb), mr + return NewHandler(db, db, sync, nil, rdb, nil), mr } func TestLogWebhookConfigCRUD(t *testing.T) { diff --git a/internal/api/master_handler.go b/internal/api/master_handler.go index 5f6e3b3..287c75f 100644 --- a/internal/api/master_handler.go +++ b/internal/api/master_handler.go @@ -14,17 +14,18 @@ import ( ) type MasterHandler struct { - db *gorm.DB - logDB *gorm.DB - masterService *service.MasterService - syncService *service.SyncService + db *gorm.DB + logDB *gorm.DB + masterService *service.MasterService + syncService *service.SyncService + logPartitioner *service.LogPartitioner } -func NewMasterHandler(db *gorm.DB, logDB *gorm.DB, masterService *service.MasterService, syncService *service.SyncService) *MasterHandler { +func NewMasterHandler(db *gorm.DB, logDB *gorm.DB, masterService *service.MasterService, syncService *service.SyncService, partitioner *service.LogPartitioner) *MasterHandler { if logDB == nil { logDB = db } - return &MasterHandler{db: db, logDB: logDB, masterService: masterService, syncService: syncService} + return &MasterHandler{db: db, logDB: logDB, masterService: masterService, syncService: syncService, logPartitioner: partitioner} } func (h *MasterHandler) logDBConn() *gorm.DB { @@ -34,6 +35,10 @@ func (h *MasterHandler) logDBConn() *gorm.DB { return h.logDB } +func (h *MasterHandler) logBaseQuery() *gorm.DB { + return logBaseQuery(h.logDBConn(), h.logPartitioner) +} + type IssueChildKeyRequest struct { Group string `json:"group"` 
Scopes string `json:"scopes"` diff --git a/internal/api/master_tokens_handler_test.go b/internal/api/master_tokens_handler_test.go index 624763b..8980617 100644 --- a/internal/api/master_tokens_handler_test.go +++ b/internal/api/master_tokens_handler_test.go @@ -42,7 +42,7 @@ func TestMaster_ListTokens_AndUpdateToken(t *testing.T) { rdb := redis.NewClient(&redis.Options{Addr: mr.Addr()}) syncSvc := service.NewSyncService(rdb) masterSvc := service.NewMasterService(db) - h := NewMasterHandler(db, db, masterSvc, syncSvc) + h := NewMasterHandler(db, db, masterSvc, syncSvc, nil) withMaster := func(next gin.HandlerFunc) gin.HandlerFunc { return func(c *gin.Context) { diff --git a/internal/api/model_handler_test.go b/internal/api/model_handler_test.go index aab5bcb..9fa4003 100644 --- a/internal/api/model_handler_test.go +++ b/internal/api/model_handler_test.go @@ -33,7 +33,7 @@ func newTestHandlerWithRedis(t *testing.T) (*Handler, *gorm.DB, *miniredis.Minir mr := miniredis.RunT(t) rdb := redis.NewClient(&redis.Options{Addr: mr.Addr()}) sync := service.NewSyncService(rdb) - return NewHandler(db, db, sync, nil, rdb), db, mr + return NewHandler(db, db, sync, nil, rdb, nil), db, mr } func TestCreateModel_DefaultsKindChat_AndWritesModelsMeta(t *testing.T) { diff --git a/internal/api/provider_admin_handler.go b/internal/api/provider_admin_handler.go index b68d4ad..408aa99 100644 --- a/internal/api/provider_admin_handler.go +++ b/internal/api/provider_admin_handler.go @@ -86,10 +86,12 @@ func (h *Handler) DeleteProvider(c *gin.Context) { return } - // Full sync is the simplest safe option because provider deletion needs to remove - // stale entries in Redis snapshots (config:providers) and refresh bindings. 
- if err := h.sync.SyncAll(h.db); err != nil { - c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to sync snapshots", "details": err.Error()}) + if err := h.sync.SyncProviderDelete(&p); err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to sync provider delete", "details": err.Error()}) + return + } + if err := h.sync.SyncBindings(h.db); err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to sync bindings", "details": err.Error()}) return } diff --git a/internal/api/provider_handler_test.go b/internal/api/provider_handler_test.go index 08a7da2..b0508e5 100644 --- a/internal/api/provider_handler_test.go +++ b/internal/api/provider_handler_test.go @@ -36,7 +36,7 @@ func newTestHandler(t *testing.T) (*Handler, *gorm.DB) { rdb := redis.NewClient(&redis.Options{Addr: mr.Addr()}) sync := service.NewSyncService(rdb) - return NewHandler(db, db, sync, nil, rdb), db + return NewHandler(db, db, sync, nil, rdb, nil), db } func TestCreateProvider_DefaultsVertexLocationGlobal(t *testing.T) { diff --git a/internal/api/stats_handler.go b/internal/api/stats_handler.go index 4bafab9..ca9ba84 100644 --- a/internal/api/stats_handler.go +++ b/internal/api/stats_handler.go @@ -61,7 +61,7 @@ func (h *MasterHandler) GetSelfStats(c *gin.Context) { } logDB := h.logDBConn() - base := logDB.Model(&model.LogRecord{}) + base := h.logBaseQuery() if logDB == h.db { base = base.Joins("JOIN keys ON keys.id = log_records.key_id"). 
Where("keys.master_id = ?", m.ID) @@ -170,7 +170,7 @@ func (h *AdminHandler) GetAdminStats(c *gin.Context) { } logDB := h.logDBConn() - base := logDB.Model(&model.LogRecord{}) + base := h.logBaseQuery() base = applyStatsRange(base, rng) totalRequests, totalTokens, err := aggregateTotals(base) diff --git a/internal/api/stats_handler_test.go b/internal/api/stats_handler_test.go index 1c7d78f..dff1313 100644 --- a/internal/api/stats_handler_test.go +++ b/internal/api/stats_handler_test.go @@ -72,7 +72,7 @@ func TestMasterStats_AggregatesByKeyAndModel(t *testing.T) { rdb := redis.NewClient(&redis.Options{Addr: mr.Addr()}) masterSvc := service.NewMasterService(db) syncSvc := service.NewSyncService(rdb) - h := NewMasterHandler(db, db, masterSvc, syncSvc) + h := NewMasterHandler(db, db, masterSvc, syncSvc, nil) withMaster := func(next gin.HandlerFunc) gin.HandlerFunc { return func(c *gin.Context) { @@ -163,7 +163,7 @@ func TestAdminStats_AggregatesByProvider(t *testing.T) { rdb := redis.NewClient(&redis.Options{Addr: mr.Addr()}) masterSvc := service.NewMasterService(db) syncSvc := service.NewSyncService(rdb) - adminHandler := NewAdminHandler(db, db, masterSvc, syncSvc) + adminHandler := NewAdminHandler(db, db, masterSvc, syncSvc, nil) r := gin.New() r.GET("/admin/stats", adminHandler.GetAdminStats) diff --git a/internal/config/config.go b/internal/config/config.go index f2c8e69..7782401 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -45,6 +45,7 @@ type LogConfig struct { RetentionDays int MaxRecords int DSN string + Partitioning string } type ModelRegistryConfig struct { @@ -79,6 +80,7 @@ func Load() (*Config, error) { v.SetDefault("log.retention_days", 30) v.SetDefault("log.max_records", 1000000) v.SetDefault("log.dsn", "") + v.SetDefault("log.partitioning", "off") v.SetDefault("auth.jwt_secret", "change_me_in_production") v.SetDefault("model_registry.enabled", false) v.SetDefault("model_registry.refresh_seconds", 1800) @@ -104,6 +106,7 @@ func 
Load() (*Config, error) { _ = v.BindEnv("log.retention_days", "EZ_LOG_RETENTION_DAYS") _ = v.BindEnv("log.max_records", "EZ_LOG_MAX_RECORDS") _ = v.BindEnv("log.dsn", "EZ_LOG_PG_DSN") + _ = v.BindEnv("log.partitioning", "EZ_LOG_PARTITIONING") _ = v.BindEnv("auth.jwt_secret", "EZ_JWT_SECRET") _ = v.BindEnv("model_registry.enabled", "EZ_MODEL_REGISTRY_ENABLED") _ = v.BindEnv("model_registry.refresh_seconds", "EZ_MODEL_REGISTRY_REFRESH_SECONDS") @@ -149,6 +152,7 @@ func Load() (*Config, error) { RetentionDays: v.GetInt("log.retention_days"), MaxRecords: v.GetInt("log.max_records"), DSN: strings.TrimSpace(v.GetString("log.dsn")), + Partitioning: strings.TrimSpace(v.GetString("log.partitioning")), }, Auth: AuthConfig{ JWTSecret: v.GetString("auth.jwt_secret"), diff --git a/internal/config/config_test.go b/internal/config/config_test.go index 5b4386f..be1c2b8 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -4,6 +4,7 @@ import "testing" func TestLoad_LogDSNOverride(t *testing.T) { t.Setenv("EZ_LOG_PG_DSN", "host=log-db user=postgres dbname=logs") + t.Setenv("EZ_LOG_PARTITIONING", "monthly") cfg, err := Load() if err != nil { t.Fatalf("load config: %v", err) @@ -11,4 +12,7 @@ func TestLoad_LogDSNOverride(t *testing.T) { if cfg.Log.DSN != "host=log-db user=postgres dbname=logs" { t.Fatalf("expected log dsn to be set, got %q", cfg.Log.DSN) } + if cfg.Log.Partitioning != "monthly" { + t.Fatalf("expected log partitioning to be set, got %q", cfg.Log.Partitioning) + } } diff --git a/internal/cron/log_cleaner.go b/internal/cron/log_cleaner.go index d6b20c8..2286d10 100644 --- a/internal/cron/log_cleaner.go +++ b/internal/cron/log_cleaner.go @@ -9,6 +9,7 @@ import ( "time" "github.com/ez-api/ez-api/internal/model" + "github.com/ez-api/ez-api/internal/service" "github.com/redis/go-redis/v9" "gorm.io/gorm" ) @@ -25,9 +26,10 @@ type LogCleaner struct { retentionDays int maxRecords int64 interval time.Duration + partitioner *service.LogPartitioner } 
-func NewLogCleaner(db *gorm.DB, rdb *redis.Client, retentionDays int, maxRecords int64, interval time.Duration) *LogCleaner { +func NewLogCleaner(db *gorm.DB, rdb *redis.Client, retentionDays int, maxRecords int64, interval time.Duration, partitioner *service.LogPartitioner) *LogCleaner { if interval <= 0 { interval = time.Hour } @@ -37,6 +39,7 @@ func NewLogCleaner(db *gorm.DB, rdb *redis.Client, retentionDays int, maxRecords retentionDays: retentionDays, maxRecords: maxRecords, interval: interval, + partitioner: partitioner, } } @@ -74,6 +77,10 @@ func (c *LogCleaner) cleanOnce(ctx context.Context) error { return nil } + if c.partitioner != nil && c.partitioner.Enabled() { + return c.cleanPartitioned(retentionDays, maxRecords) + } + var deleted int64 if retentionDays > 0 { cutoff := time.Now().UTC().AddDate(0, 0, -retentionDays) @@ -116,6 +123,64 @@ func (c *LogCleaner) cleanOnce(ctx context.Context) error { return nil } +func (c *LogCleaner) cleanPartitioned(retentionDays int, maxRecords int64) error { + var deleted int64 + + if retentionDays > 0 { + cutoff := time.Now().UTC().AddDate(0, 0, -retentionDays) + dropped, err := c.partitioner.DropPartitionsBefore(cutoff) + if err != nil { + return err + } + if dropped > 0 { + slog.Default().Info("log partition cleanup completed", "dropped_tables", dropped, "retention_days", retentionDays) + } + table, err := c.partitioner.EnsurePartitionFor(time.Now().UTC()) + if err != nil { + return err + } + res := c.db.Table(table).Unscoped().Where("created_at < ?", cutoff).Delete(&model.LogRecord{}) + if res.Error != nil { + return res.Error + } + deleted += res.RowsAffected + } + + if maxRecords > 0 { + if maxRecords > int64(math.MaxInt) { + maxRecords = int64(math.MaxInt) + } + table, err := c.partitioner.EnsurePartitionFor(time.Now().UTC()) + if err != nil { + return err + } + var cutoff struct { + ID uint + } + if err := c.db.Unscoped(). + Table(table). + Select("id"). + Order("id desc"). + Offset(int(maxRecords - 1)). 
+ Limit(1). + Scan(&cutoff).Error; err != nil { + return err + } + if cutoff.ID > 0 { + res := c.db.Table(table).Unscoped().Where("id < ?", cutoff.ID).Delete(&model.LogRecord{}) + if res.Error != nil { + return res.Error + } + deleted += res.RowsAffected + } + } + + if deleted > 0 { + slog.Default().Info("log cleanup completed", "deleted", deleted, "retention_days", retentionDays, "max_records", maxRecords) + } + return nil +} + func (c *LogCleaner) resolveRetentionDays(ctx context.Context) int { days := c.retentionDays if days < 0 { diff --git a/internal/cron/log_cleaner_test.go b/internal/cron/log_cleaner_test.go index ff37868..ac66dc0 100644 --- a/internal/cron/log_cleaner_test.go +++ b/internal/cron/log_cleaner_test.go @@ -34,7 +34,7 @@ func TestLogCleanerRetentionDeletesOld(t *testing.T) { t.Fatalf("create fresh: %v", err) } - cleaner := NewLogCleaner(db, nil, 1, 0, time.Minute) + cleaner := NewLogCleaner(db, nil, 1, 0, time.Minute, nil) if err := cleaner.cleanOnce(context.Background()); err != nil { t.Fatalf("clean once: %v", err) } @@ -64,7 +64,7 @@ func TestLogCleanerMaxRecordsKeepsLatest(t *testing.T) { } } - cleaner := NewLogCleaner(db, nil, 0, 3, time.Minute) + cleaner := NewLogCleaner(db, nil, 0, 3, time.Minute, nil) if err := cleaner.cleanOnce(context.Background()); err != nil { t.Fatalf("clean once: %v", err) } diff --git a/internal/service/log_partition.go b/internal/service/log_partition.go new file mode 100644 index 0000000..aa2807c --- /dev/null +++ b/internal/service/log_partition.go @@ -0,0 +1,272 @@ +package service + +import ( + "fmt" + "sort" + "strings" + "time" + + "gorm.io/gorm" +) + +type LogPartitioningMode string + +const ( + LogPartitioningOff LogPartitioningMode = "off" + LogPartitioningMonthly LogPartitioningMode = "monthly" + LogPartitioningDaily LogPartitioningMode = "daily" +) + +type LogPartition struct { + Table string + Start time.Time + End time.Time +} + +type LogPartitioner struct { + db *gorm.DB + mode LogPartitioningMode + 
baseTable string + viewTable string +} + +func NewLogPartitioner(db *gorm.DB, mode string) *LogPartitioner { + return &LogPartitioner{ + db: db, + mode: normalizePartitioningMode(mode), + baseTable: "log_records", + viewTable: "log_records_all", + } +} + +func (p *LogPartitioner) Enabled() bool { + if p == nil || p.db == nil { + return false + } + if p.mode == LogPartitioningOff { + return false + } + return p.db.Dialector.Name() == "postgres" +} + +func (p *LogPartitioner) ViewName() string { + if p == nil { + return "log_records" + } + if p.Enabled() { + return p.viewTable + } + return p.baseTable +} + +func (p *LogPartitioner) TableForTime(t time.Time) string { + if p == nil || !p.Enabled() { + return "log_records" + } + t = t.UTC() + switch p.mode { + case LogPartitioningDaily: + return fmt.Sprintf("%s_%04d%02d%02d", p.baseTable, t.Year(), int(t.Month()), t.Day()) + case LogPartitioningMonthly: + fallthrough + default: + return fmt.Sprintf("%s_%04d%02d", p.baseTable, t.Year(), int(t.Month())) + } +} + +func (p *LogPartitioner) EnsurePartitionFor(t time.Time) (string, error) { + if p == nil || !p.Enabled() { + return "log_records", nil + } + table := p.TableForTime(t) + if err := p.ensureTable(table); err != nil { + return "", err + } + if err := p.ensureView(); err != nil { + return "", err + } + return table, nil +} + +func (p *LogPartitioner) ListPartitions() ([]LogPartition, error) { + if p == nil || !p.Enabled() { + return nil, nil + } + tables, err := p.listPartitionTables() + if err != nil { + return nil, err + } + partitions := make([]LogPartition, 0, len(tables)) + for _, table := range tables { + start, end, ok := p.parsePartitionRange(table) + if !ok { + continue + } + partitions = append(partitions, LogPartition{Table: table, Start: start, End: end}) + } + sort.Slice(partitions, func(i, j int) bool { + return partitions[i].Start.Before(partitions[j].Start) + }) + return partitions, nil +} + +func (p *LogPartitioner) DropPartitionsBefore(cutoff 
time.Time) (int, error) { + if p == nil || !p.Enabled() { + return 0, nil + } + partitions, err := p.ListPartitions() + if err != nil { + return 0, err + } + cutoff = cutoff.UTC() + dropped := 0 + for _, part := range partitions { + if part.End.After(cutoff) { + continue + } + if err := p.dropTable(part.Table); err != nil { + return dropped, err + } + dropped++ + } + if dropped > 0 { + if err := p.ensureView(); err != nil { + return dropped, err + } + } + return dropped, nil +} + +func (p *LogPartitioner) ensureTable(table string) error { + if p == nil || !p.Enabled() { + return nil + } + if table == "" || !p.validPartitionTable(table) { + return fmt.Errorf("invalid partition table %q", table) + } + if p.db.Migrator().HasTable(table) { + return nil + } + sql := fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (LIKE %s INCLUDING ALL)", quoteIdent(table), quoteIdent(p.baseTable)) + return p.db.Exec(sql).Error +} + +func (p *LogPartitioner) ensureView() error { + if p == nil || !p.Enabled() { + return nil + } + tables, err := p.listPartitionTables() + if err != nil { + return err + } + selects := make([]string, 0, len(tables)+1) + selects = append(selects, fmt.Sprintf("SELECT * FROM %s", quoteIdent(p.baseTable))) + for _, table := range tables { + if table == p.baseTable { + continue + } + selects = append(selects, fmt.Sprintf("SELECT * FROM %s", quoteIdent(table))) + } + viewSQL := fmt.Sprintf("CREATE OR REPLACE VIEW %s AS %s", quoteIdent(p.viewTable), strings.Join(selects, " UNION ALL ")) + return p.db.Exec(viewSQL).Error +} + +func (p *LogPartitioner) listPartitionTables() ([]string, error) { + if p == nil || !p.Enabled() { + return nil, nil + } + var tables []string + err := p.db.Raw( + `SELECT table_name FROM information_schema.tables WHERE table_schema = current_schema() AND table_type = 'BASE TABLE' AND table_name LIKE ?`, + p.baseTable+"_%", + ).Scan(&tables).Error + if err != nil { + return nil, err + } + out := make([]string, 0, 
len(tables)) + for _, table := range tables { + if p.validPartitionTable(table) { + out = append(out, table) + } + } + return out, nil +} + +func (p *LogPartitioner) parsePartitionRange(table string) (time.Time, time.Time, bool) { + if !p.validPartitionTable(table) { + return time.Time{}, time.Time{}, false + } + raw := strings.TrimPrefix(table, p.baseTable+"_") + if p.mode == LogPartitioningDaily { + if len(raw) != 8 { + return time.Time{}, time.Time{}, false + } + t, err := time.Parse("20060102", raw) + if err != nil { + return time.Time{}, time.Time{}, false + } + start := time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, time.UTC) + end := start.AddDate(0, 0, 1) + return start, end, true + } + if len(raw) != 6 { + return time.Time{}, time.Time{}, false + } + t, err := time.Parse("200601", raw) + if err != nil { + return time.Time{}, time.Time{}, false + } + start := time.Date(t.Year(), t.Month(), 1, 0, 0, 0, 0, time.UTC) + end := start.AddDate(0, 1, 0) + return start, end, true +} + +func (p *LogPartitioner) validPartitionTable(table string) bool { + if p == nil || table == "" { + return false + } + if !strings.HasPrefix(table, p.baseTable+"_") { + return false + } + raw := strings.TrimPrefix(table, p.baseTable+"_") + if p.mode == LogPartitioningDaily { + return len(raw) == 8 && isDigits(raw) + } + return len(raw) == 6 && isDigits(raw) +} + +func (p *LogPartitioner) dropTable(table string) error { + if p == nil || !p.Enabled() { + return nil + } + if !p.validPartitionTable(table) { + return fmt.Errorf("invalid partition table %q", table) + } + sql := fmt.Sprintf("DROP TABLE IF EXISTS %s CASCADE", quoteIdent(table)) + return p.db.Exec(sql).Error +} + +func normalizePartitioningMode(raw string) LogPartitioningMode { + raw = strings.ToLower(strings.TrimSpace(raw)) + switch raw { + case string(LogPartitioningDaily): + return LogPartitioningDaily + case string(LogPartitioningMonthly): + return LogPartitioningMonthly + default: + return LogPartitioningOff + } +} + +func 
quoteIdent(name string) string { + return `"` + strings.ReplaceAll(name, `"`, `""`) + `"` +} + +func isDigits(raw string) bool { + for _, r := range raw { + if r < '0' || r > '9' { + return false + } + } + return raw != "" +} diff --git a/internal/service/logger.go b/internal/service/logger.go index 987f4d5..5840d96 100644 --- a/internal/service/logger.go +++ b/internal/service/logger.go @@ -21,9 +21,10 @@ type LogWriter struct { batchSize int flushInterval time.Duration db *gorm.DB + partitioner *LogPartitioner } -func NewLogWriter(db *gorm.DB, queueCapacity, batchSize int, flushInterval time.Duration) *LogWriter { +func NewLogWriter(db *gorm.DB, queueCapacity, batchSize int, flushInterval time.Duration, partitioner *LogPartitioner) *LogWriter { if batchSize <= 0 { batchSize = 10 } @@ -38,6 +39,7 @@ func NewLogWriter(db *gorm.DB, queueCapacity, batchSize int, flushInterval time. batchSize: batchSize, flushInterval: flushInterval, db: db, + partitioner: partitioner, } } @@ -52,9 +54,33 @@ func (w *LogWriter) Start(ctx context.Context) { if len(buf) == 0 { return } - if err := w.db.Create(&buf).Error; err != nil { - slog.Default().Error("log batch insert failed", "err", err) - } else { + if w.partitioner == nil || !w.partitioner.Enabled() { + if err := w.db.Create(&buf).Error; err != nil { + slog.Default().Error("log batch insert failed", "err", err) + } else { + logBatchWriteTotal.Add(1) + } + buf = buf[:0] + return + } + byTable := make(map[string][]model.LogRecord) + for _, rec := range buf { + t := rec.CreatedAt + if t.IsZero() { + t = time.Now().UTC() + } + table, err := w.partitioner.EnsurePartitionFor(t) + if err != nil { + slog.Default().Error("log partition ensure failed", "err", err) + table = "log_records" + } + byTable[table] = append(byTable[table], rec) + } + for table, records := range byTable { + if err := w.db.Table(table).Create(&records).Error; err != nil { + slog.Default().Error("log batch insert failed", "table", table, "err", err) + continue + 
} logBatchWriteTotal.Add(1) } buf = buf[:0] diff --git a/internal/service/logger_test.go b/internal/service/logger_test.go index bd42ce5..ced7cc1 100644 --- a/internal/service/logger_test.go +++ b/internal/service/logger_test.go @@ -26,11 +26,11 @@ func TestLogWriterMetrics(t *testing.T) { startBatch := getExpvarInt(t, "log_write_batch_total") startDropped := getExpvarInt(t, "log_queue_dropped_total") - dropWriter := NewLogWriter(db, 1, 10, time.Second) + dropWriter := NewLogWriter(db, 1, 10, time.Second, nil) dropWriter.Write(model.LogRecord{ModelName: "m1", StatusCode: 200}) dropWriter.Write(model.LogRecord{ModelName: "m2", StatusCode: 200}) - writer := NewLogWriter(db, 10, 1, 10*time.Millisecond) + writer := NewLogWriter(db, 10, 1, 10*time.Millisecond, nil) ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) writer.Start(ctx) diff --git a/internal/service/sync.go b/internal/service/sync.go index 41b43c1..7b4fefe 100644 --- a/internal/service/sync.go +++ b/internal/service/sync.go @@ -128,6 +128,31 @@ func (s *SyncService) SyncProvider(provider *model.Provider) error { return err } +// SyncProviderDelete removes provider snapshot and routing entries from Redis. +func (s *SyncService) SyncProviderDelete(provider *model.Provider) error { + if provider == nil { + return fmt.Errorf("provider required") + } + ctx := context.Background() + group := groupx.Normalize(provider.Group) + models := strings.Split(provider.Models, ",") + + pipe := s.rdb.TxPipeline() + pipe.HDel(ctx, "config:providers", fmt.Sprintf("%d", provider.ID)) + for _, m := range models { + m = strings.TrimSpace(m) + if m == "" { + continue + } + routeKey := fmt.Sprintf("route:group:%s:%s", group, m) + pipe.SRem(ctx, routeKey, provider.ID) + } + if _, err := pipe.Exec(ctx); err != nil { + return fmt.Errorf("delete provider snapshot: %w", err) + } + return nil +} + // SyncModel writes a single model metadata record. 
func (s *SyncService) SyncModel(m *model.Model) error { ctx := context.Background() diff --git a/internal/service/sync_test.go b/internal/service/sync_test.go index b4f29c1..3b1460e 100644 --- a/internal/service/sync_test.go +++ b/internal/service/sync_test.go @@ -92,3 +92,35 @@ func TestSyncKey_WritesTokenID(t *testing.T) { t.Fatalf("expected auth:token:hash.id=123, got %q", got) } } + +func TestSyncProviderDelete_RemovesSnapshotAndRouting(t *testing.T) { + mr := miniredis.RunT(t) + rdb := redis.NewClient(&redis.Options{Addr: mr.Addr()}) + svc := NewSyncService(rdb) + + p := &model.Provider{ + Name: "p1", + Type: "openai", + Group: "default", + Models: "gpt-4o-mini,gpt-4o", + Status: "active", + } + p.ID = 7 + + if err := svc.SyncProvider(p); err != nil { + t.Fatalf("SyncProvider: %v", err) + } + if err := svc.SyncProviderDelete(p); err != nil { + t.Fatalf("SyncProviderDelete: %v", err) + } + + if got := mr.HGet("config:providers", "7"); got != "" { + t.Fatalf("expected provider snapshot removed, got %q", got) + } + if ok, _ := mr.SIsMember("route:group:default:gpt-4o-mini", "7"); ok { + t.Fatalf("expected provider removed from route set") + } + if ok, _ := mr.SIsMember("route:group:default:gpt-4o", "7"); ok { + t.Fatalf("expected provider removed from route set") + } +}