diff --git a/internal/service/sync.go b/internal/service/sync.go index bb44ced..9900548 100644 --- a/internal/service/sync.go +++ b/internal/service/sync.go @@ -651,7 +651,9 @@ func (s *SyncService) writeBindingsSnapshot(ctx context.Context, pipe redis.Pipe snap routing.BindingSnapshot } snaps := make(map[string]*routing.BindingSnapshot) - now := time.Now().Unix() + now := time.Now() + nowUnix := now.Unix() + version := now.UnixNano() for _, b := range bindings { if strings.TrimSpace(b.Status) != "" && strings.TrimSpace(b.Status) != "active" { @@ -677,7 +679,7 @@ func (s *SyncService) writeBindingsSnapshot(ctx context.Context, pipe redis.Pipe Namespace: ns, PublicModel: pm, Status: "active", - UpdatedAt: now, + UpdatedAt: nowUnix, } snaps[key] = snap } @@ -699,7 +701,6 @@ func (s *SyncService) writeBindingsSnapshot(ctx context.Context, pipe redis.Pipe candidate.Error = "no_provider" } - nowUnix := time.Now().Unix() for _, k := range keys { if k.status != "" && k.status != "active" { continue @@ -729,8 +730,8 @@ func (s *SyncService) writeBindingsSnapshot(ctx context.Context, pipe redis.Pipe } meta := map[string]string{ - "version": fmt.Sprintf("%d", now), - "updated_at": fmt.Sprintf("%d", now), + "version": fmt.Sprintf("%d", version), + "updated_at": fmt.Sprintf("%d", nowUnix), "source": "cp_builtin", } if err := pipe.HSet(ctx, "meta:bindings_meta", meta).Err(); err != nil { diff --git a/internal/service/sync_bindings_spec_test.go b/internal/service/sync_bindings_spec_test.go index 6d528ea..37601e2 100644 --- a/internal/service/sync_bindings_spec_test.go +++ b/internal/service/sync_bindings_spec_test.go @@ -2,6 +2,7 @@ package service import ( "encoding/json" + "strconv" "testing" "github.com/alicebob/miniredis/v2" @@ -138,3 +139,61 @@ func TestSyncBindings_SelectorRegexAndNormalize(t *testing.T) { t.Fatalf("expected config_error with no upstreams, got %+v", snapNorm.Candidates[0]) } } + +func TestSyncBindings_VersionChangesWithinSameSecond(t *testing.T) { + t.Parallel() + + db, err := gorm.Open(sqlite.Open("file:"+t.Name()+"?mode=memory&cache=shared"), &gorm.Config{}) + if err != nil { + t.Fatalf("open sqlite: %v", err) + } + if err := db.AutoMigrate(&model.ProviderGroup{}, &model.APIKey{}, &model.Binding{}); err != nil { + t.Fatalf("migrate: %v", err) + } + + group := model.ProviderGroup{Name: "rg", Type: "openai", BaseURL: "https://api.openai.com/v1", Models: "m", Status: "active"} + if err := db.Create(&group).Error; err != nil { + t.Fatalf("create group: %v", err) + } + key := model.APIKey{GroupID: group.ID, APIKey: "k1", Status: "active"} + if err := db.Create(&key).Error; err != nil { + t.Fatalf("create api key: %v", err) + } + b := model.Binding{Namespace: "ns", PublicModel: "m", GroupID: group.ID, Weight: 1, SelectorType: "exact", Status: "active"} + if err := db.Create(&b).Error; err != nil { + t.Fatalf("create binding: %v", err) + } + + mr := miniredis.RunT(t) + rdb := redis.NewClient(&redis.Options{Addr: mr.Addr()}) + svc := NewSyncService(rdb) + + var prevVersion string + var prevUpdated string + for i := 0; i < 10; i++ { + if err := svc.SyncBindings(db); err != nil { + t.Fatalf("SyncBindings: %v", err) + } + version := mr.HGet("meta:bindings_meta", "version") + updated := mr.HGet("meta:bindings_meta", "updated_at") + if version == "" || updated == "" { + t.Fatalf("expected bindings meta fields, got version=%q updated_at=%q", version, updated) + } + if _, err := strconv.ParseInt(version, 10, 64); err != nil { + t.Fatalf("invalid version: %q", version) + } + if _, err := strconv.ParseInt(updated, 10, 64); err != nil { + t.Fatalf("invalid updated_at: %q", updated) + } + + if prevUpdated != "" && updated == prevUpdated { + if version == prevVersion { + t.Fatalf("expected version to change within same second, got %q", version) + } + return + } + prevVersion = version + prevUpdated = updated + } + t.Fatalf("failed to observe two syncs within the same second") +}