mirror of
https://github.com/EZ-Api/ez-api.git
synced 2026-01-13 17:47:51 +00:00
feat(service): add provider metadata tracking
Write 'meta:providers_meta' to Redis during provider synchronization, including version, timestamp, and configuration checksum. This aligns provider sync with model metadata handling and enables better cache invalidation.
This commit is contained in:
@@ -176,8 +176,12 @@ func (s *SyncService) SyncProvidersNow(ctx context.Context, db *gorm.DB) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pipe := s.rdb.TxPipeline()
|
pipe := s.rdb.TxPipeline()
|
||||||
pipe.Del(ctx, "config:providers")
|
pipe.Del(ctx, "config:providers", "meta:providers_meta")
|
||||||
if err := s.writeProvidersSnapshot(ctx, pipe, groups, apiKeys); err != nil {
|
payloads, err := s.writeProvidersSnapshot(ctx, pipe, groups, apiKeys)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := writeProvidersMeta(ctx, pipe, payloads); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if _, err := pipe.Exec(ctx); err != nil {
|
if _, err := pipe.Exec(ctx); err != nil {
|
||||||
@@ -296,12 +300,13 @@ type providerSnapshot struct {
|
|||||||
BanUntil int64 `json:"ban_until,omitempty"` // unix seconds
|
BanUntil int64 `json:"ban_until,omitempty"` // unix seconds
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SyncService) writeProvidersSnapshot(ctx context.Context, pipe redis.Pipeliner, groups []model.ProviderGroup, apiKeys []model.APIKey) error {
|
func (s *SyncService) writeProvidersSnapshot(ctx context.Context, pipe redis.Pipeliner, groups []model.ProviderGroup, apiKeys []model.APIKey) (map[string]string, error) {
|
||||||
groupMap := make(map[uint]model.ProviderGroup, len(groups))
|
groupMap := make(map[uint]model.ProviderGroup, len(groups))
|
||||||
for _, g := range groups {
|
for _, g := range groups {
|
||||||
groupMap[g.ID] = g
|
groupMap[g.ID] = g
|
||||||
}
|
}
|
||||||
|
|
||||||
|
payloads := make(map[string]string, len(apiKeys))
|
||||||
for _, k := range apiKeys {
|
for _, k := range apiKeys {
|
||||||
g, ok := groupMap[k.GroupID]
|
g, ok := groupMap[k.GroupID]
|
||||||
if !ok {
|
if !ok {
|
||||||
@@ -363,11 +368,13 @@ func (s *SyncService) writeProvidersSnapshot(ctx context.Context, pipe redis.Pip
|
|||||||
|
|
||||||
payload, err := jsoncodec.Marshal(snap)
|
payload, err := jsoncodec.Marshal(snap)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("marshal provider %d: %w", k.ID, err)
|
return nil, fmt.Errorf("marshal provider %d: %w", k.ID, err)
|
||||||
}
|
}
|
||||||
pipe.HSet(ctx, "config:providers", fmt.Sprintf("%d", k.ID), payload)
|
key := fmt.Sprintf("%d", k.ID)
|
||||||
|
pipe.HSet(ctx, "config:providers", key, payload)
|
||||||
|
payloads[key] = string(payload)
|
||||||
}
|
}
|
||||||
return nil
|
return payloads, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// keySnapshot is no longer needed as we write directly to auth:token:*
|
// keySnapshot is no longer needed as we write directly to auth:token:*
|
||||||
@@ -423,7 +430,7 @@ func (s *SyncService) SyncAllNow(ctx context.Context, db *gorm.DB) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pipe := s.rdb.TxPipeline()
|
pipe := s.rdb.TxPipeline()
|
||||||
pipe.Del(ctx, "config:providers", "config:keys", "meta:models", "meta:models_meta", "config:bindings", "meta:bindings_meta")
|
pipe.Del(ctx, "config:providers", "meta:providers_meta", "config:keys", "meta:models", "meta:models_meta", "config:bindings", "meta:bindings_meta")
|
||||||
// Also clear master keys
|
// Also clear master keys
|
||||||
var masterKeys []string
|
var masterKeys []string
|
||||||
iter := s.rdb.Scan(ctx, 0, "auth:master:*", 0).Iterator()
|
iter := s.rdb.Scan(ctx, 0, "auth:master:*", 0).Iterator()
|
||||||
@@ -437,7 +444,11 @@ func (s *SyncService) SyncAllNow(ctx context.Context, db *gorm.DB) error {
|
|||||||
pipe.Del(ctx, masterKeys...)
|
pipe.Del(ctx, masterKeys...)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := s.writeProvidersSnapshot(ctx, pipe, groups, apiKeys); err != nil {
|
providersPayloads, err := s.writeProvidersSnapshot(ctx, pipe, groups, apiKeys)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := writeProvidersMeta(ctx, pipe, providersPayloads); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -839,3 +850,17 @@ func writeModelsMeta(ctx context.Context, pipe redis.Pipeliner, meta modelcap.Me
|
|||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func writeProvidersMeta(ctx context.Context, pipe redis.Pipeliner, payloads map[string]string) error {
|
||||||
|
now := time.Now()
|
||||||
|
fields := map[string]string{
|
||||||
|
"version": fmt.Sprintf("%d", now.UnixNano()),
|
||||||
|
"updated_at": fmt.Sprintf("%d", now.Unix()),
|
||||||
|
"source": "db",
|
||||||
|
"checksum": modelcap.ChecksumFromPayloads(payloads),
|
||||||
|
}
|
||||||
|
if err := pipe.HSet(ctx, "meta:providers_meta", fields).Err(); err != nil {
|
||||||
|
return fmt.Errorf("write meta:providers_meta: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|||||||
@@ -61,6 +61,16 @@ func TestSyncProviders_WritesSnapshot(t *testing.T) {
|
|||||||
if snap["group"] != "default" {
|
if snap["group"] != "default" {
|
||||||
t.Fatalf("expected group default, got %#v", snap["group"])
|
t.Fatalf("expected group default, got %#v", snap["group"])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if v := mr.HGet("meta:providers_meta", "version"); v == "" {
|
||||||
|
t.Fatalf("expected meta:providers_meta.version")
|
||||||
|
}
|
||||||
|
if v := mr.HGet("meta:providers_meta", "updated_at"); v == "" {
|
||||||
|
t.Fatalf("expected meta:providers_meta.updated_at")
|
||||||
|
}
|
||||||
|
if v := mr.HGet("meta:providers_meta", "checksum"); v == "" {
|
||||||
|
t.Fatalf("expected meta:providers_meta.checksum")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestSyncKey_WritesTokenID(t *testing.T) {
|
func TestSyncKey_WritesTokenID(t *testing.T) {
|
||||||
|
|||||||
Reference in New Issue
Block a user