mirror of
https://github.com/EZ-Api/ez-api.git
synced 2026-01-13 17:47:51 +00:00
feat(arch): add log partitioning and provider delete sync
internal/service/log_partition.go (Normal file, 272 lines added)
@@ -0,0 +1,272 @@
package service

import (
	"fmt"
	"sort"
	"strings"
	"time"

	"gorm.io/gorm"
)

type LogPartitioningMode string

const (
	LogPartitioningOff     LogPartitioningMode = "off"
	LogPartitioningMonthly LogPartitioningMode = "monthly"
	LogPartitioningDaily   LogPartitioningMode = "daily"
)

type LogPartition struct {
	Table string
	Start time.Time
	End   time.Time
}

type LogPartitioner struct {
	db        *gorm.DB
	mode      LogPartitioningMode
	baseTable string
	viewTable string
}

func NewLogPartitioner(db *gorm.DB, mode string) *LogPartitioner {
	return &LogPartitioner{
		db:        db,
		mode:      normalizePartitioningMode(mode),
		baseTable: "log_records",
		viewTable: "log_records_all",
	}
}

func (p *LogPartitioner) Enabled() bool {
	if p == nil || p.db == nil {
		return false
	}
	if p.mode == LogPartitioningOff {
		return false
	}
	return p.db.Dialector.Name() == "postgres"
}

func (p *LogPartitioner) ViewName() string {
	if p == nil {
		return "log_records"
	}
	if p.Enabled() {
		return p.viewTable
	}
	return p.baseTable
}

func (p *LogPartitioner) TableForTime(t time.Time) string {
	if p == nil || !p.Enabled() {
		return "log_records"
	}
	t = t.UTC()
	switch p.mode {
	case LogPartitioningDaily:
		return fmt.Sprintf("%s_%04d%02d%02d", p.baseTable, t.Year(), int(t.Month()), t.Day())
	case LogPartitioningMonthly:
		fallthrough
	default:
		return fmt.Sprintf("%s_%04d%02d", p.baseTable, t.Year(), int(t.Month()))
	}
}

func (p *LogPartitioner) EnsurePartitionFor(t time.Time) (string, error) {
	if p == nil || !p.Enabled() {
		return "log_records", nil
	}
	table := p.TableForTime(t)
	if err := p.ensureTable(table); err != nil {
		return "", err
	}
	if err := p.ensureView(); err != nil {
		return "", err
	}
	return table, nil
}

func (p *LogPartitioner) ListPartitions() ([]LogPartition, error) {
	if p == nil || !p.Enabled() {
		return nil, nil
	}
	tables, err := p.listPartitionTables()
	if err != nil {
		return nil, err
	}
	partitions := make([]LogPartition, 0, len(tables))
	for _, table := range tables {
		start, end, ok := p.parsePartitionRange(table)
		if !ok {
			continue
		}
		partitions = append(partitions, LogPartition{Table: table, Start: start, End: end})
	}
	sort.Slice(partitions, func(i, j int) bool {
		return partitions[i].Start.Before(partitions[j].Start)
	})
	return partitions, nil
}

func (p *LogPartitioner) DropPartitionsBefore(cutoff time.Time) (int, error) {
	if p == nil || !p.Enabled() {
		return 0, nil
	}
	partitions, err := p.ListPartitions()
	if err != nil {
		return 0, err
	}
	cutoff = cutoff.UTC()
	dropped := 0
	for _, part := range partitions {
		if part.End.After(cutoff) || part.End.Equal(cutoff) {
			continue
		}
		if err := p.dropTable(part.Table); err != nil {
			return dropped, err
		}
		dropped++
	}
	if dropped > 0 {
		if err := p.ensureView(); err != nil {
			return dropped, err
		}
	}
	return dropped, nil
}

func (p *LogPartitioner) ensureTable(table string) error {
	if p == nil || !p.Enabled() {
		return nil
	}
	if table == "" || !p.validPartitionTable(table) {
		return fmt.Errorf("invalid partition table %q", table)
	}
	if p.db.Migrator().HasTable(table) {
		return nil
	}
	sql := fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (LIKE %s INCLUDING ALL)", quoteIdent(table), quoteIdent(p.baseTable))
	return p.db.Exec(sql).Error
}

func (p *LogPartitioner) ensureView() error {
	if p == nil || !p.Enabled() {
		return nil
	}
	tables, err := p.listPartitionTables()
	if err != nil {
		return err
	}
	selects := make([]string, 0, len(tables)+1)
	selects = append(selects, fmt.Sprintf("SELECT * FROM %s", quoteIdent(p.baseTable)))
	for _, table := range tables {
		if table == p.baseTable {
			continue
		}
		selects = append(selects, fmt.Sprintf("SELECT * FROM %s", quoteIdent(table)))
	}
	viewSQL := fmt.Sprintf("CREATE OR REPLACE VIEW %s AS %s", quoteIdent(p.viewTable), strings.Join(selects, " UNION ALL "))
	return p.db.Exec(viewSQL).Error
}

func (p *LogPartitioner) listPartitionTables() ([]string, error) {
	if p == nil || !p.Enabled() {
		return nil, nil
	}
	var tables []string
	err := p.db.Raw(
		`SELECT table_name FROM information_schema.tables WHERE table_schema = current_schema() AND table_type = 'BASE TABLE' AND table_name LIKE ?`,
		p.baseTable+"_%",
	).Scan(&tables).Error
	if err != nil {
		return nil, err
	}
	out := make([]string, 0, len(tables))
	for _, table := range tables {
		if p.validPartitionTable(table) {
			out = append(out, table)
		}
	}
	return out, nil
}

func (p *LogPartitioner) parsePartitionRange(table string) (time.Time, time.Time, bool) {
	if !p.validPartitionTable(table) {
		return time.Time{}, time.Time{}, false
	}
	raw := strings.TrimPrefix(table, p.baseTable+"_")
	if p.mode == LogPartitioningDaily {
		if len(raw) != 8 {
			return time.Time{}, time.Time{}, false
		}
		t, err := time.Parse("20060102", raw)
		if err != nil {
			return time.Time{}, time.Time{}, false
		}
		start := time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, time.UTC)
		end := start.AddDate(0, 0, 1)
		return start, end, true
	}
	if len(raw) != 6 {
		return time.Time{}, time.Time{}, false
	}
	t, err := time.Parse("200601", raw)
	if err != nil {
		return time.Time{}, time.Time{}, false
	}
	start := time.Date(t.Year(), t.Month(), 1, 0, 0, 0, 0, time.UTC)
	end := start.AddDate(0, 1, 0)
	return start, end, true
}

func (p *LogPartitioner) validPartitionTable(table string) bool {
	if p == nil || table == "" {
		return false
	}
	if !strings.HasPrefix(table, p.baseTable+"_") {
		return false
	}
	raw := strings.TrimPrefix(table, p.baseTable+"_")
	if p.mode == LogPartitioningDaily {
		return len(raw) == 8 && isDigits(raw)
	}
	return len(raw) == 6 && isDigits(raw)
}
func (p *LogPartitioner) dropTable(table string) error {
	if p == nil || !p.Enabled() {
		return nil
	}
	if !p.validPartitionTable(table) {
		return fmt.Errorf("invalid partition table %q", table)
	}
	// CASCADE is needed because the log_records_all view references every
	// partition, so a plain DROP TABLE would fail on the dependency.
	// The caller (DropPartitionsBefore) recreates the view afterwards.
	sql := fmt.Sprintf("DROP TABLE IF EXISTS %s CASCADE", quoteIdent(table))
	return p.db.Exec(sql).Error
}

func normalizePartitioningMode(raw string) LogPartitioningMode {
	raw = strings.ToLower(strings.TrimSpace(raw))
	switch raw {
	case string(LogPartitioningDaily):
		return LogPartitioningDaily
	case string(LogPartitioningMonthly):
		return LogPartitioningMonthly
	default:
		return LogPartitioningOff
	}
}

func quoteIdent(name string) string {
	return `"` + strings.ReplaceAll(name, `"`, `""`) + `"`
}

func isDigits(raw string) bool {
	for _, r := range raw {
		if r < '0' || r > '9' {
			return false
		}
	}
	return raw != ""
}
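As a usage sketch only (none of this is in the commit): the partitioner would typically be built once at startup and paired with a periodic retention sweep. The config struct and function name below are assumptions.

// Sketch: assumed config shape, not part of this commit.
type logMaintenanceConfig struct {
	LogPartitioning  string // "off", "monthly", or "daily"
	LogRetentionDays int
}

func startLogPartitionMaintenance(db *gorm.DB, cfg logMaintenanceConfig) *LogPartitioner {
	p := NewLogPartitioner(db, cfg.LogPartitioning)

	// Create the current partition (and the union view) up front so the
	// first batch flush does not pay the DDL cost.
	if _, err := p.EnsurePartitionFor(time.Now().UTC()); err != nil {
		slog.Default().Error("ensure log partition failed", "err", err)
	}

	// Daily retention sweep: drop partitions that ended before the cutoff.
	go func() {
		ticker := time.NewTicker(24 * time.Hour)
		defer ticker.Stop()
		for range ticker.C {
			cutoff := time.Now().UTC().AddDate(0, 0, -cfg.LogRetentionDays)
			if n, err := p.DropPartitionsBefore(cutoff); err != nil {
				slog.Default().Error("drop log partitions failed", "err", err)
			} else if n > 0 {
				slog.Default().Info("dropped old log partitions", "count", n)
			}
		}
	}()
	return p
}

Note that DropPartitionsBefore only removes partitions whose End falls strictly before the cutoff, so the partition containing the cutoff instant itself is kept.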
@@ -21,9 +21,10 @@ type LogWriter struct {
	batchSize     int
	flushInterval time.Duration
	db            *gorm.DB
	partitioner   *LogPartitioner
}

func NewLogWriter(db *gorm.DB, queueCapacity, batchSize int, flushInterval time.Duration) *LogWriter {
func NewLogWriter(db *gorm.DB, queueCapacity, batchSize int, flushInterval time.Duration, partitioner *LogPartitioner) *LogWriter {
	if batchSize <= 0 {
		batchSize = 10
	}
@@ -38,6 +39,7 @@ func NewLogWriter(db *gorm.DB, queueCapacity, batchSize int, flushInterval time.
		batchSize:     batchSize,
		flushInterval: flushInterval,
		db:            db,
		partitioner:   partitioner,
	}
}
@@ -52,9 +54,33 @@ func (w *LogWriter) Start(ctx context.Context) {
	if len(buf) == 0 {
		return
	}
	if err := w.db.Create(&buf).Error; err != nil {
		slog.Default().Error("log batch insert failed", "err", err)
	} else {
	if w.partitioner == nil || !w.partitioner.Enabled() {
		if err := w.db.Create(&buf).Error; err != nil {
			slog.Default().Error("log batch insert failed", "err", err)
		} else {
			logBatchWriteTotal.Add(1)
		}
		buf = buf[:0]
		return
	}
	byTable := make(map[string][]model.LogRecord)
	for _, rec := range buf {
		t := rec.CreatedAt
		if t.IsZero() {
			t = time.Now().UTC()
		}
		table, err := w.partitioner.EnsurePartitionFor(t)
		if err != nil {
			slog.Default().Error("log partition ensure failed", "err", err)
			table = "log_records"
		}
		byTable[table] = append(byTable[table], rec)
	}
	for table, records := range byTable {
		if err := w.db.Table(table).Create(&records).Error; err != nil {
			slog.Default().Error("log batch insert failed", "table", table, "err", err)
			continue
		}
		logBatchWriteTotal.Add(1)
	}
	buf = buf[:0]
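On the read side, queries that should span every partition can go through ViewName(), which resolves to the log_records_all union view when partitioning is active and to the plain log_records table otherwise. A minimal sketch, assuming the model's default created_at column; the function name is an assumption:

func countRecentLogs(db *gorm.DB, p *LogPartitioner) (int64, error) {
	var total int64
	since := time.Now().UTC().Add(-24 * time.Hour)
	// ViewName() returns "log_records_all" when partitioning is enabled,
	// otherwise the base "log_records" table.
	err := db.Table(p.ViewName()).
		Where("created_at >= ?", since).
		Count(&total).Error
	return total, err
}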
@@ -26,11 +26,11 @@ func TestLogWriterMetrics(t *testing.T) {
	startBatch := getExpvarInt(t, "log_write_batch_total")
	startDropped := getExpvarInt(t, "log_queue_dropped_total")

	dropWriter := NewLogWriter(db, 1, 10, time.Second)
	dropWriter := NewLogWriter(db, 1, 10, time.Second, nil)
	dropWriter.Write(model.LogRecord{ModelName: "m1", StatusCode: 200})
	dropWriter.Write(model.LogRecord{ModelName: "m2", StatusCode: 200})

	writer := NewLogWriter(db, 10, 1, 10*time.Millisecond)
	writer := NewLogWriter(db, 10, 1, 10*time.Millisecond, nil)
	ctx, cancel := context.WithCancel(context.Background())
	t.Cleanup(cancel)
	writer.Start(ctx)
@@ -128,6 +128,31 @@ func (s *SyncService) SyncProvider(provider *model.Provider) error {
	return err
}

// SyncProviderDelete removes provider snapshot and routing entries from Redis.
func (s *SyncService) SyncProviderDelete(provider *model.Provider) error {
	if provider == nil {
		return fmt.Errorf("provider required")
	}
	ctx := context.Background()
	group := groupx.Normalize(provider.Group)
	models := strings.Split(provider.Models, ",")

	pipe := s.rdb.TxPipeline()
	pipe.HDel(ctx, "config:providers", fmt.Sprintf("%d", provider.ID))
	for _, m := range models {
		m = strings.TrimSpace(m)
		if m == "" {
			continue
		}
		routeKey := fmt.Sprintf("route:group:%s:%s", group, m)
		pipe.SRem(ctx, routeKey, provider.ID)
	}
	if _, err := pipe.Exec(ctx); err != nil {
		return fmt.Errorf("delete provider snapshot: %w", err)
	}
	return nil
}

// SyncModel writes a single model metadata record.
func (s *SyncService) SyncModel(m *model.Model) error {
	ctx := context.Background()
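A sketch of how a delete flow might call the new method so Redis stays consistent with the database; the ProviderService type, its fields, and the method name are assumptions, not part of this commit:

// Sketch: delete the DB row first, then clear the Redis snapshot and routing sets.
func (s *ProviderService) Delete(provider *model.Provider) error {
	if err := s.db.Delete(&model.Provider{}, provider.ID).Error; err != nil {
		return err
	}
	if err := s.sync.SyncProviderDelete(provider); err != nil {
		// The row is already gone from the DB; log and let the next full
		// resync reconcile the Redis snapshot and routing sets.
		slog.Default().Error("provider delete sync failed", "id", provider.ID, "err", err)
	}
	return nil
}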
@@ -92,3 +92,35 @@ func TestSyncKey_WritesTokenID(t *testing.T) {
		t.Fatalf("expected auth:token:hash.id=123, got %q", got)
	}
}

func TestSyncProviderDelete_RemovesSnapshotAndRouting(t *testing.T) {
	mr := miniredis.RunT(t)
	rdb := redis.NewClient(&redis.Options{Addr: mr.Addr()})
	svc := NewSyncService(rdb)

	p := &model.Provider{
		Name:   "p1",
		Type:   "openai",
		Group:  "default",
		Models: "gpt-4o-mini,gpt-4o",
		Status: "active",
	}
	p.ID = 7

	if err := svc.SyncProvider(p); err != nil {
		t.Fatalf("SyncProvider: %v", err)
	}
	if err := svc.SyncProviderDelete(p); err != nil {
		t.Fatalf("SyncProviderDelete: %v", err)
	}

	if got := mr.HGet("config:providers", "7"); got != "" {
		t.Fatalf("expected provider snapshot removed, got %q", got)
	}
	if ok, _ := mr.SIsMember("route:group:default:gpt-4o-mini", "7"); ok {
		t.Fatalf("expected provider removed from route set")
	}
	if ok, _ := mr.SIsMember("route:group:default:gpt-4o", "7"); ok {
		t.Fatalf("expected provider removed from route set")
	}
}