mirror of
https://github.com/EZ-Api/ez-api.git
synced 2026-01-13 17:47:51 +00:00
fix(sync): use nanosecond precision for bindings version to ensure uniqueness
Use UnixNano for version field while keeping Unix seconds for updated_at timestamp. This ensures version changes are detected even when multiple syncs occur within the same second.
This commit is contained in:
@@ -651,7 +651,9 @@ func (s *SyncService) writeBindingsSnapshot(ctx context.Context, pipe redis.Pipe
|
|||||||
snap routing.BindingSnapshot
|
snap routing.BindingSnapshot
|
||||||
}
|
}
|
||||||
snaps := make(map[string]*routing.BindingSnapshot)
|
snaps := make(map[string]*routing.BindingSnapshot)
|
||||||
now := time.Now().Unix()
|
now := time.Now()
|
||||||
|
nowUnix := now.Unix()
|
||||||
|
version := now.UnixNano()
|
||||||
|
|
||||||
for _, b := range bindings {
|
for _, b := range bindings {
|
||||||
if strings.TrimSpace(b.Status) != "" && strings.TrimSpace(b.Status) != "active" {
|
if strings.TrimSpace(b.Status) != "" && strings.TrimSpace(b.Status) != "active" {
|
||||||
@@ -677,7 +679,7 @@ func (s *SyncService) writeBindingsSnapshot(ctx context.Context, pipe redis.Pipe
|
|||||||
Namespace: ns,
|
Namespace: ns,
|
||||||
PublicModel: pm,
|
PublicModel: pm,
|
||||||
Status: "active",
|
Status: "active",
|
||||||
UpdatedAt: now,
|
UpdatedAt: nowUnix,
|
||||||
}
|
}
|
||||||
snaps[key] = snap
|
snaps[key] = snap
|
||||||
}
|
}
|
||||||
@@ -699,7 +701,6 @@ func (s *SyncService) writeBindingsSnapshot(ctx context.Context, pipe redis.Pipe
|
|||||||
candidate.Error = "no_provider"
|
candidate.Error = "no_provider"
|
||||||
}
|
}
|
||||||
|
|
||||||
nowUnix := time.Now().Unix()
|
|
||||||
for _, k := range keys {
|
for _, k := range keys {
|
||||||
if k.status != "" && k.status != "active" {
|
if k.status != "" && k.status != "active" {
|
||||||
continue
|
continue
|
||||||
@@ -729,8 +730,8 @@ func (s *SyncService) writeBindingsSnapshot(ctx context.Context, pipe redis.Pipe
|
|||||||
}
|
}
|
||||||
|
|
||||||
meta := map[string]string{
|
meta := map[string]string{
|
||||||
"version": fmt.Sprintf("%d", now),
|
"version": fmt.Sprintf("%d", version),
|
||||||
"updated_at": fmt.Sprintf("%d", now),
|
"updated_at": fmt.Sprintf("%d", nowUnix),
|
||||||
"source": "cp_builtin",
|
"source": "cp_builtin",
|
||||||
}
|
}
|
||||||
if err := pipe.HSet(ctx, "meta:bindings_meta", meta).Err(); err != nil {
|
if err := pipe.HSet(ctx, "meta:bindings_meta", meta).Err(); err != nil {
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ package service
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"strconv"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/alicebob/miniredis/v2"
|
"github.com/alicebob/miniredis/v2"
|
||||||
@@ -138,3 +139,61 @@ func TestSyncBindings_SelectorRegexAndNormalize(t *testing.T) {
|
|||||||
t.Fatalf("expected config_error with no upstreams, got %+v", snapNorm.Candidates[0])
|
t.Fatalf("expected config_error with no upstreams, got %+v", snapNorm.Candidates[0])
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSyncBindings_VersionChangesWithinSameSecond(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
db, err := gorm.Open(sqlite.Open("file:"+t.Name()+"?mode=memory&cache=shared"), &gorm.Config{})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("open sqlite: %v", err)
|
||||||
|
}
|
||||||
|
if err := db.AutoMigrate(&model.ProviderGroup{}, &model.APIKey{}, &model.Binding{}); err != nil {
|
||||||
|
t.Fatalf("migrate: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
group := model.ProviderGroup{Name: "rg", Type: "openai", BaseURL: "https://api.openai.com/v1", Models: "m", Status: "active"}
|
||||||
|
if err := db.Create(&group).Error; err != nil {
|
||||||
|
t.Fatalf("create group: %v", err)
|
||||||
|
}
|
||||||
|
key := model.APIKey{GroupID: group.ID, APIKey: "k1", Status: "active"}
|
||||||
|
if err := db.Create(&key).Error; err != nil {
|
||||||
|
t.Fatalf("create api key: %v", err)
|
||||||
|
}
|
||||||
|
b := model.Binding{Namespace: "ns", PublicModel: "m", GroupID: group.ID, Weight: 1, SelectorType: "exact", Status: "active"}
|
||||||
|
if err := db.Create(&b).Error; err != nil {
|
||||||
|
t.Fatalf("create binding: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
mr := miniredis.RunT(t)
|
||||||
|
rdb := redis.NewClient(&redis.Options{Addr: mr.Addr()})
|
||||||
|
svc := NewSyncService(rdb)
|
||||||
|
|
||||||
|
var prevVersion string
|
||||||
|
var prevUpdated string
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
if err := svc.SyncBindings(db); err != nil {
|
||||||
|
t.Fatalf("SyncBindings: %v", err)
|
||||||
|
}
|
||||||
|
version := mr.HGet("meta:bindings_meta", "version")
|
||||||
|
updated := mr.HGet("meta:bindings_meta", "updated_at")
|
||||||
|
if version == "" || updated == "" {
|
||||||
|
t.Fatalf("expected bindings meta fields, got version=%q updated_at=%q", version, updated)
|
||||||
|
}
|
||||||
|
if _, err := strconv.ParseInt(version, 10, 64); err != nil {
|
||||||
|
t.Fatalf("invalid version: %q", version)
|
||||||
|
}
|
||||||
|
if _, err := strconv.ParseInt(updated, 10, 64); err != nil {
|
||||||
|
t.Fatalf("invalid updated_at: %q", updated)
|
||||||
|
}
|
||||||
|
|
||||||
|
if prevUpdated != "" && updated == prevUpdated {
|
||||||
|
if version == prevVersion {
|
||||||
|
t.Fatalf("expected version to change within same second, got %q", version)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
prevVersion = version
|
||||||
|
prevUpdated = updated
|
||||||
|
}
|
||||||
|
t.Fatalf("failed to observe two syncs within the same second")
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user