package service import ( "context" "expvar" "log/slog" "time" "github.com/ez-api/ez-api/internal/model" "gorm.io/gorm" ) var ( logBatchWriteTotal = expvar.NewInt("log_write_batch_total") logQueueDropped = expvar.NewInt("log_queue_dropped_total") ) // LogWriter batches log records to reduce IO overhead. type LogWriter struct { ch chan model.LogRecord batchSize int flushInterval time.Duration db *gorm.DB partitioner *LogPartitioner } func NewLogWriter(db *gorm.DB, queueCapacity, batchSize int, flushInterval time.Duration, partitioner *LogPartitioner) *LogWriter { if batchSize <= 0 { batchSize = 10 } if queueCapacity <= 0 { queueCapacity = 1000 } if flushInterval <= 0 { flushInterval = time.Second } return &LogWriter{ ch: make(chan model.LogRecord, queueCapacity), batchSize: batchSize, flushInterval: flushInterval, db: db, partitioner: partitioner, } } // Start begins a background writer. Should be called once at startup. func (w *LogWriter) Start(ctx context.Context) { go func() { ticker := time.NewTicker(w.flushInterval) defer ticker.Stop() buf := make([]model.LogRecord, 0, w.batchSize) flush := func() { if len(buf) == 0 { return } if w.partitioner == nil || !w.partitioner.Enabled() { if err := w.db.Create(&buf).Error; err != nil { slog.Default().Error("log batch insert failed", "err", err) } else { logBatchWriteTotal.Add(1) } buf = buf[:0] return } byTable := make(map[string][]model.LogRecord) for _, rec := range buf { t := rec.CreatedAt if t.IsZero() { t = time.Now().UTC() } table, err := w.partitioner.EnsurePartitionFor(t) if err != nil { slog.Default().Error("log partition ensure failed", "err", err) table = "log_records" } byTable[table] = append(byTable[table], rec) } for table, records := range byTable { if err := w.db.Table(table).Create(&records).Error; err != nil { slog.Default().Error("log batch insert failed", "table", table, "err", err) continue } logBatchWriteTotal.Add(1) } buf = buf[:0] } for { select { case <-ctx.Done(): flush() return case rec := <-w.ch: buf = append(buf, rec) if len(buf) >= w.batchSize { flush() } case <-ticker.C: flush() } } }() } // Write queues a log record; drops silently if buffer is full to protect performance. func (w *LogWriter) Write(rec model.LogRecord) { select { case w.ch <- rec: default: // drop to avoid blocking hot path logQueueDropped.Add(1) } }