mirror of
https://github.com/EZ-Api/ez-api.git
synced 2026-01-13 17:47:51 +00:00
Restructure the provider management system by separating the monolithic Provider model into two distinct entities:

- ProviderGroup: defines shared upstream configuration (type, base_url, google settings, models, status)
- APIKey: represents individual credentials within a group (api_key, weight, status, auto_ban, ban settings)

This change also updates:

- Binding model to reference GroupID instead of RouteGroup string
- All CRUD handlers for the new provider-group and api-key endpoints
- Sync service to rebuild provider snapshots from joined tables
- Model registry to aggregate capabilities across group/key pairs
- Access handler to validate namespace existence and subset constraints
- Migration importer to handle the new schema structure
- All related tests to use the new model relationships

BREAKING CHANGE: Provider API endpoints replaced with /provider-groups and /api-keys endpoints; Binding.RouteGroup replaced with Binding.GroupID
649 lines
19 KiB
Go
649 lines
19 KiB
Go
package service
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/ez-api/ez-api/internal/model"
|
|
groupx "github.com/ez-api/foundation/group"
|
|
"github.com/ez-api/foundation/jsoncodec"
|
|
"github.com/ez-api/foundation/modelcap"
|
|
"github.com/ez-api/foundation/routing"
|
|
"github.com/ez-api/foundation/tokenhash"
|
|
"github.com/redis/go-redis/v9"
|
|
"gorm.io/gorm"
|
|
)
|
|
|
|
// SyncService pushes control-plane state (providers, keys, masters,
// models, bindings) from the database into Redis snapshots that the
// data plane reads.
type SyncService struct {
	rdb *redis.Client // destination Redis client for all snapshot writes
}
|
|
|
|
func NewSyncService(rdb *redis.Client) *SyncService {
|
|
return &SyncService{rdb: rdb}
|
|
}
|
|
|
|
// SyncKey writes a single key into Redis without rebuilding the entire snapshot.
|
|
func (s *SyncService) SyncKey(key *model.Key) error {
|
|
ctx := context.Background()
|
|
tokenHash := key.TokenHash
|
|
if strings.TrimSpace(tokenHash) == "" {
|
|
tokenHash = tokenhash.HashToken(key.KeySecret) // backward compatibility
|
|
}
|
|
if strings.TrimSpace(tokenHash) == "" {
|
|
return fmt.Errorf("token hash missing for key %d", key.ID)
|
|
}
|
|
|
|
fields := map[string]interface{}{
|
|
"id": key.ID,
|
|
"master_id": key.MasterID,
|
|
"issued_at_epoch": key.IssuedAtEpoch,
|
|
"status": key.Status,
|
|
"group": key.Group,
|
|
"scopes": key.Scopes,
|
|
"default_namespace": key.DefaultNamespace,
|
|
"namespaces": key.Namespaces,
|
|
"model_limits": strings.TrimSpace(key.ModelLimits),
|
|
"model_limits_enabled": key.ModelLimitsEnabled,
|
|
"expires_at": unixOrZero(key.ExpiresAt),
|
|
"allow_ips": strings.TrimSpace(key.AllowIPs),
|
|
"deny_ips": strings.TrimSpace(key.DenyIPs),
|
|
"last_accessed_at": unixOrZero(key.LastAccessedAt),
|
|
"request_count": key.RequestCount,
|
|
"used_tokens": key.UsedTokens,
|
|
"quota_limit": key.QuotaLimit,
|
|
"quota_used": key.QuotaUsed,
|
|
"quota_reset_at": unixOrZero(key.QuotaResetAt),
|
|
"quota_reset_type": strings.TrimSpace(key.QuotaResetType),
|
|
}
|
|
if err := s.rdb.HSet(ctx, fmt.Sprintf("auth:token:%s", tokenHash), fields).Err(); err != nil {
|
|
return fmt.Errorf("write auth token: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// SyncMaster writes master metadata into Redis used by the balancer for validation.
|
|
func (s *SyncService) SyncMaster(master *model.Master) error {
|
|
ctx := context.Background()
|
|
key := fmt.Sprintf("auth:master:%d", master.ID)
|
|
if err := s.rdb.HSet(ctx, key, map[string]interface{}{
|
|
"epoch": master.Epoch,
|
|
"status": master.Status,
|
|
"global_qps": master.GlobalQPS,
|
|
}).Err(); err != nil {
|
|
return fmt.Errorf("write master metadata: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// SyncProviders rebuilds provider snapshots from ProviderGroup + APIKey tables.
|
|
func (s *SyncService) SyncProviders(db *gorm.DB) error {
|
|
if db == nil {
|
|
return fmt.Errorf("db required")
|
|
}
|
|
ctx := context.Background()
|
|
|
|
var groups []model.ProviderGroup
|
|
if err := db.Find(&groups).Error; err != nil {
|
|
return fmt.Errorf("load provider groups: %w", err)
|
|
}
|
|
var apiKeys []model.APIKey
|
|
if err := db.Find(&apiKeys).Error; err != nil {
|
|
return fmt.Errorf("load api keys: %w", err)
|
|
}
|
|
|
|
pipe := s.rdb.TxPipeline()
|
|
pipe.Del(ctx, "config:providers")
|
|
if err := s.writeProvidersSnapshot(ctx, pipe, groups, apiKeys); err != nil {
|
|
return err
|
|
}
|
|
if _, err := pipe.Exec(ctx); err != nil {
|
|
return fmt.Errorf("write provider snapshot: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// SyncModel writes a single model metadata record.
|
|
func (s *SyncService) SyncModel(m *model.Model) error {
|
|
ctx := context.Background()
|
|
snap := modelcap.Model{
|
|
Name: m.Name,
|
|
Kind: string(modelcap.NormalizeKind(m.Kind)),
|
|
ContextWindow: m.ContextWindow,
|
|
CostPerToken: m.CostPerToken,
|
|
SupportsVision: m.SupportsVision,
|
|
SupportsFunction: m.SupportsFunctions,
|
|
SupportsToolChoice: m.SupportsToolChoice,
|
|
SupportsFim: m.SupportsFIM,
|
|
MaxOutputTokens: m.MaxOutputTokens,
|
|
}.Normalized()
|
|
if err := s.hsetJSON(ctx, "meta:models", snap.Name, snap); err != nil {
|
|
return err
|
|
}
|
|
if err := s.refreshModelsMetaFromRedis(ctx, "db"); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// SyncModelDelete removes model metadata from Redis and refreshes meta:models_meta.
|
|
func (s *SyncService) SyncModelDelete(m *model.Model) error {
|
|
if m == nil {
|
|
return fmt.Errorf("model required")
|
|
}
|
|
name := strings.TrimSpace(m.Name)
|
|
if name == "" {
|
|
return fmt.Errorf("model name required")
|
|
}
|
|
ctx := context.Background()
|
|
if err := s.rdb.HDel(ctx, "meta:models", name).Err(); err != nil {
|
|
return fmt.Errorf("delete meta:models: %w", err)
|
|
}
|
|
if err := s.refreshModelsMetaFromRedis(ctx, "db"); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// providerSnapshot is the JSON payload written to the config:providers
// hash — one entry per (ProviderGroup, APIKey) pair, keyed by the
// APIKey row ID. The data plane deserializes it to build its upstream
// table.
type providerSnapshot struct {
	ID             uint     `json:"id"`   // APIKey row ID (doubles as the hash field)
	Name           string   `json:"name"` // "<group name>#<key id>" display name
	Type           string   `json:"type"`
	BaseURL        string   `json:"base_url"`
	APIKey         string   `json:"api_key"`
	GoogleProject  string   `json:"google_project,omitempty"`
	GoogleLocation string   `json:"google_location,omitempty"`
	GroupID        uint     `json:"group_id,omitempty"`
	Group          string   `json:"group"`  // normalized route-group name
	Models         []string `json:"models"` // models inherited from the group
	Weight         int      `json:"weight,omitempty"`
	Status         string   `json:"status"` // group status overrides key status when non-active
	AutoBan        bool     `json:"auto_ban"`
	BanReason      string   `json:"ban_reason,omitempty"`
	BanUntil       int64    `json:"ban_until,omitempty"` // unix seconds
}
|
|
|
|
func (s *SyncService) writeProvidersSnapshot(ctx context.Context, pipe redis.Pipeliner, groups []model.ProviderGroup, apiKeys []model.APIKey) error {
|
|
groupMap := make(map[uint]model.ProviderGroup, len(groups))
|
|
for _, g := range groups {
|
|
groupMap[g.ID] = g
|
|
}
|
|
|
|
for _, k := range apiKeys {
|
|
g, ok := groupMap[k.GroupID]
|
|
if !ok {
|
|
continue
|
|
}
|
|
groupName := groupx.Normalize(g.Name)
|
|
if strings.TrimSpace(groupName) == "" {
|
|
groupName = "default"
|
|
}
|
|
groupStatus := normalizeStatus(g.Status)
|
|
keyStatus := normalizeStatus(k.Status)
|
|
status := keyStatus
|
|
if groupStatus != "" && groupStatus != "active" {
|
|
status = groupStatus
|
|
}
|
|
|
|
rawModels := strings.Split(g.Models, ",")
|
|
var models []string
|
|
for _, m := range rawModels {
|
|
m = strings.TrimSpace(m)
|
|
if m != "" {
|
|
models = append(models, m)
|
|
}
|
|
}
|
|
|
|
name := strings.TrimSpace(g.Name)
|
|
if name == "" {
|
|
name = groupName
|
|
}
|
|
name = fmt.Sprintf("%s#%d", name, k.ID)
|
|
|
|
snap := providerSnapshot{
|
|
ID: k.ID,
|
|
Name: name,
|
|
Type: strings.TrimSpace(g.Type),
|
|
BaseURL: strings.TrimSpace(g.BaseURL),
|
|
APIKey: strings.TrimSpace(k.APIKey),
|
|
GoogleProject: strings.TrimSpace(g.GoogleProject),
|
|
GoogleLocation: strings.TrimSpace(g.GoogleLocation),
|
|
GroupID: g.ID,
|
|
Group: groupName,
|
|
Models: models,
|
|
Weight: k.Weight,
|
|
Status: status,
|
|
AutoBan: k.AutoBan,
|
|
BanReason: strings.TrimSpace(k.BanReason),
|
|
}
|
|
if k.BanUntil != nil {
|
|
snap.BanUntil = k.BanUntil.UTC().Unix()
|
|
}
|
|
|
|
payload, err := jsoncodec.Marshal(snap)
|
|
if err != nil {
|
|
return fmt.Errorf("marshal provider %d: %w", k.ID, err)
|
|
}
|
|
pipe.HSet(ctx, "config:providers", fmt.Sprintf("%d", k.ID), payload)
|
|
|
|
// Legacy route table maintenance for compatibility.
|
|
for _, m := range models {
|
|
if m == "" {
|
|
continue
|
|
}
|
|
if snap.Status != "active" {
|
|
continue
|
|
}
|
|
if snap.BanUntil > 0 && time.Now().Unix() < snap.BanUntil {
|
|
continue
|
|
}
|
|
routeKey := fmt.Sprintf("route:group:%s:%s", groupName, m)
|
|
pipe.SAdd(ctx, routeKey, k.ID)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// keySnapshot is no longer needed as we write directly to auth:token:*
|
|
|
|
// SyncAll rebuilds Redis hashes from the database; use for cold starts or forced refreshes.
|
|
// SyncAll rebuilds Redis hashes from the database; use for cold starts or forced refreshes.
//
// Flow: load every source table, queue deletions of the snapshot keys
// plus all scanned auth:master:* entries, then re-queue providers,
// token hashes, masters, model capabilities, model meta, and bindings
// on a single transaction pipeline that is executed once at the end.
//
// NOTE(review): stale auth:token:* entries are NOT scanned/cleared
// here (unlike auth:master:*) — confirm whether epoch checks make
// leftover token hashes safe after a rebuild.
func (s *SyncService) SyncAll(db *gorm.DB) error {
	ctx := context.Background()

	var groups []model.ProviderGroup
	if err := db.Find(&groups).Error; err != nil {
		return fmt.Errorf("load provider groups: %w", err)
	}
	var apiKeys []model.APIKey
	if err := db.Find(&apiKeys).Error; err != nil {
		return fmt.Errorf("load api keys: %w", err)
	}

	var keys []model.Key
	if err := db.Find(&keys).Error; err != nil {
		return fmt.Errorf("load keys: %w", err)
	}

	var masters []model.Master
	if err := db.Find(&masters).Error; err != nil {
		return fmt.Errorf("load masters: %w", err)
	}

	var models []model.Model
	if err := db.Find(&models).Error; err != nil {
		return fmt.Errorf("load models: %w", err)
	}
	// Lazily allocated in the model loop; feeds the deterministic
	// checksum written to meta:models_meta below.
	var modelsPayloads map[string]string

	var bindings []model.Binding
	if err := db.Find(&bindings).Error; err != nil {
		return fmt.Errorf("load bindings: %w", err)
	}

	pipe := s.rdb.TxPipeline()
	pipe.Del(ctx, "config:providers", "config:keys", "meta:models", "meta:models_meta", "config:bindings", "meta:bindings_meta")
	// Also clear master keys. The SCAN runs directly on the client
	// (not the pipeline) so the key list exists before Exec.
	var masterKeys []string
	iter := s.rdb.Scan(ctx, 0, "auth:master:*", 0).Iterator()
	for iter.Next(ctx) {
		masterKeys = append(masterKeys, iter.Val())
	}
	if err := iter.Err(); err != nil {
		return fmt.Errorf("scan master keys: %w", err)
	}
	if len(masterKeys) > 0 {
		pipe.Del(ctx, masterKeys...)
	}

	if err := s.writeProvidersSnapshot(ctx, pipe, groups, apiKeys); err != nil {
		return err
	}

	// Mirror each key row into auth:token:<hash>; the field layout
	// matches SyncKey and must stay in sync with it.
	for _, k := range keys {
		tokenHash := strings.TrimSpace(k.TokenHash)
		if tokenHash == "" {
			tokenHash = tokenhash.HashToken(k.KeySecret) // fallback for legacy rows
		}
		if tokenHash == "" {
			return fmt.Errorf("token hash missing for key %d", k.ID)
		}
		pipe.HSet(ctx, fmt.Sprintf("auth:token:%s", tokenHash), map[string]interface{}{
			"id":                   k.ID,
			"master_id":            k.MasterID,
			"issued_at_epoch":      k.IssuedAtEpoch,
			"status":               k.Status,
			"group":                k.Group,
			"scopes":               k.Scopes,
			"default_namespace":    k.DefaultNamespace,
			"namespaces":           k.Namespaces,
			"model_limits":         strings.TrimSpace(k.ModelLimits),
			"model_limits_enabled": k.ModelLimitsEnabled,
			"expires_at":           unixOrZero(k.ExpiresAt),
			"allow_ips":            strings.TrimSpace(k.AllowIPs),
			"deny_ips":             strings.TrimSpace(k.DenyIPs),
			"last_accessed_at":     unixOrZero(k.LastAccessedAt),
			"request_count":        k.RequestCount,
			"used_tokens":          k.UsedTokens,
			"quota_limit":          k.QuotaLimit,
			"quota_used":           k.QuotaUsed,
			"quota_reset_at":       unixOrZero(k.QuotaResetAt),
			"quota_reset_type":     strings.TrimSpace(k.QuotaResetType),
		})
	}

	for _, m := range masters {
		pipe.HSet(ctx, fmt.Sprintf("auth:master:%d", m.ID), map[string]interface{}{
			"epoch":      m.Epoch,
			"status":     m.Status,
			"global_qps": m.GlobalQPS,
		})
	}

	for _, m := range models {
		snap := modelcap.Model{
			Name:               m.Name,
			Kind:               string(modelcap.NormalizeKind(m.Kind)),
			ContextWindow:      m.ContextWindow,
			CostPerToken:       m.CostPerToken,
			SupportsVision:     m.SupportsVision,
			SupportsFunction:   m.SupportsFunctions,
			SupportsToolChoice: m.SupportsToolChoice,
			SupportsFim:        m.SupportsFIM,
			MaxOutputTokens:    m.MaxOutputTokens,
		}.Normalized()
		payload, err := jsoncodec.Marshal(snap)
		if err != nil {
			return fmt.Errorf("marshal model %s: %w", m.Name, err)
		}
		// Capture payloads so we can compute deterministic checksum for meta:models_meta.
		if modelsPayloads == nil {
			modelsPayloads = make(map[string]string, len(models))
		}
		modelsPayloads[snap.Name] = string(payload)
		pipe.HSet(ctx, "meta:models", snap.Name, payload)
	}

	now := time.Now().Unix()
	if err := writeModelsMeta(ctx, pipe, modelcap.Meta{
		Version:   fmt.Sprintf("%d", now),
		UpdatedAt: fmt.Sprintf("%d", now),
		Source:    "db",
		Checksum:  modelcap.ChecksumFromPayloads(modelsPayloads),
	}); err != nil {
		return err
	}

	if err := s.writeBindingsSnapshot(ctx, pipe, bindings, groups, apiKeys); err != nil {
		return err
	}

	if _, err := pipe.Exec(ctx); err != nil {
		return fmt.Errorf("write snapshots: %w", err)
	}

	return nil
}
|
|
|
|
// SyncBindings rebuilds the binding snapshot for DP routing.
|
|
// This is intentionally a rebuild to avoid stale entries on deletes/updates.
|
|
func (s *SyncService) SyncBindings(db *gorm.DB) error {
|
|
ctx := context.Background()
|
|
|
|
var groups []model.ProviderGroup
|
|
if err := db.Find(&groups).Error; err != nil {
|
|
return fmt.Errorf("load provider groups: %w", err)
|
|
}
|
|
var apiKeys []model.APIKey
|
|
if err := db.Find(&apiKeys).Error; err != nil {
|
|
return fmt.Errorf("load api keys: %w", err)
|
|
}
|
|
var bindings []model.Binding
|
|
if err := db.Find(&bindings).Error; err != nil {
|
|
return fmt.Errorf("load bindings: %w", err)
|
|
}
|
|
|
|
pipe := s.rdb.TxPipeline()
|
|
pipe.Del(ctx, "config:bindings", "meta:bindings_meta")
|
|
if err := s.writeBindingsSnapshot(ctx, pipe, bindings, groups, apiKeys); err != nil {
|
|
return err
|
|
}
|
|
if _, err := pipe.Exec(ctx); err != nil {
|
|
return fmt.Errorf("write bindings snapshot: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *SyncService) writeBindingsSnapshot(ctx context.Context, pipe redis.Pipeliner, bindings []model.Binding, groups []model.ProviderGroup, apiKeys []model.APIKey) error {
|
|
type groupLite struct {
|
|
id uint
|
|
name string
|
|
ptype string
|
|
baseURL string
|
|
googleProject string
|
|
googleLoc string
|
|
models []string
|
|
status string
|
|
}
|
|
groupsByID := make(map[uint]groupLite, len(groups))
|
|
for _, g := range groups {
|
|
rawModels := strings.Split(g.Models, ",")
|
|
var outModels []string
|
|
for _, m := range rawModels {
|
|
m = strings.TrimSpace(m)
|
|
if m != "" {
|
|
outModels = append(outModels, m)
|
|
}
|
|
}
|
|
groupsByID[g.ID] = groupLite{
|
|
id: g.ID,
|
|
name: groupx.Normalize(g.Name),
|
|
ptype: strings.TrimSpace(g.Type),
|
|
baseURL: strings.TrimSpace(g.BaseURL),
|
|
googleProject: strings.TrimSpace(g.GoogleProject),
|
|
googleLoc: strings.TrimSpace(g.GoogleLocation),
|
|
models: outModels,
|
|
status: normalizeStatus(g.Status),
|
|
}
|
|
}
|
|
|
|
type apiKeyLite struct {
|
|
id uint
|
|
groupID uint
|
|
status string
|
|
weight int
|
|
autoBan bool
|
|
banUntil *time.Time
|
|
}
|
|
keysByGroup := make(map[uint][]apiKeyLite)
|
|
for _, k := range apiKeys {
|
|
keysByGroup[k.GroupID] = append(keysByGroup[k.GroupID], apiKeyLite{
|
|
id: k.ID,
|
|
groupID: k.GroupID,
|
|
status: normalizeStatus(k.Status),
|
|
weight: k.Weight,
|
|
autoBan: k.AutoBan,
|
|
banUntil: k.BanUntil,
|
|
})
|
|
}
|
|
|
|
type bindingAgg struct {
|
|
snap routing.BindingSnapshot
|
|
}
|
|
snaps := make(map[string]*routing.BindingSnapshot)
|
|
now := time.Now().Unix()
|
|
|
|
for _, b := range bindings {
|
|
if strings.TrimSpace(b.Status) != "" && strings.TrimSpace(b.Status) != "active" {
|
|
continue
|
|
}
|
|
ns := strings.TrimSpace(b.Namespace)
|
|
pm := strings.TrimSpace(b.PublicModel)
|
|
if ns == "" || pm == "" {
|
|
continue
|
|
}
|
|
group, ok := groupsByID[b.GroupID]
|
|
if !ok {
|
|
continue
|
|
}
|
|
if group.status != "" && group.status != "active" {
|
|
continue
|
|
}
|
|
|
|
key := ns + "." + pm
|
|
snap := snaps[key]
|
|
if snap == nil {
|
|
snap = &routing.BindingSnapshot{
|
|
Namespace: ns,
|
|
PublicModel: pm,
|
|
Status: "active",
|
|
UpdatedAt: now,
|
|
}
|
|
snaps[key] = snap
|
|
}
|
|
|
|
candidate := routing.BindingCandidate{
|
|
GroupID: group.id,
|
|
RouteGroup: group.name,
|
|
Weight: normalizeWeight(b.Weight),
|
|
SelectorType: strings.TrimSpace(b.SelectorType),
|
|
SelectorValue: strings.TrimSpace(b.SelectorValue),
|
|
Status: "active",
|
|
Upstreams: make(map[string]string),
|
|
}
|
|
|
|
selectorType := strings.TrimSpace(b.SelectorType)
|
|
selectorValue := strings.TrimSpace(b.SelectorValue)
|
|
keys := keysByGroup[b.GroupID]
|
|
if len(keys) == 0 {
|
|
candidate.Error = "no_provider"
|
|
}
|
|
|
|
nowUnix := time.Now().Unix()
|
|
for _, k := range keys {
|
|
if k.status != "" && k.status != "active" {
|
|
continue
|
|
}
|
|
if k.banUntil != nil && k.banUntil.UTC().Unix() > nowUnix {
|
|
continue
|
|
}
|
|
up, err := routing.ResolveUpstreamModel(routing.SelectorType(selectorType), selectorValue, pm, group.models)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
candidate.Upstreams[fmt.Sprintf("%d", k.id)] = up
|
|
}
|
|
if len(candidate.Upstreams) == 0 && candidate.Error == "" {
|
|
candidate.Error = "config_error"
|
|
}
|
|
|
|
snap.Candidates = append(snap.Candidates, candidate)
|
|
}
|
|
|
|
for key, snap := range snaps {
|
|
payload, err := jsoncodec.Marshal(snap)
|
|
if err != nil {
|
|
return fmt.Errorf("marshal config:bindings:%s: %w", key, err)
|
|
}
|
|
pipe.HSet(ctx, "config:bindings", key, payload)
|
|
}
|
|
|
|
meta := map[string]string{
|
|
"version": fmt.Sprintf("%d", now),
|
|
"updated_at": fmt.Sprintf("%d", now),
|
|
"source": "cp_builtin",
|
|
}
|
|
if err := pipe.HSet(ctx, "meta:bindings_meta", meta).Err(); err != nil {
|
|
return fmt.Errorf("write meta:bindings_meta: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *SyncService) hsetJSON(ctx context.Context, key, field string, val interface{}) error {
|
|
payload, err := jsoncodec.Marshal(val)
|
|
if err != nil {
|
|
return fmt.Errorf("marshal %s:%s: %w", key, field, err)
|
|
}
|
|
if err := s.rdb.HSet(ctx, key, field, payload).Err(); err != nil {
|
|
return fmt.Errorf("write %s:%s: %w", key, field, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// normalizeWeight clamps a binding weight to a minimum of 1 so that
// zero or negative weights never drop a candidate out of rotation.
func normalizeWeight(weight int) int {
	if weight > 0 {
		return weight
	}
	return 1
}
|
|
|
|
// normalizeStatus lowercases and trims a status value, defaulting to
// "active" when empty. Any other value is passed through unchanged.
//
// The previous switch enumerated the known states ("active",
// "auto_disabled", "manual_disabled") but returned st from every
// branch including default, making it a no-op; it has been removed.
func normalizeStatus(status string) string {
	st := strings.ToLower(strings.TrimSpace(status))
	if st == "" {
		return "active"
	}
	return st
}
|
|
|
|
func unixOrZero(t *time.Time) int64 {
|
|
if t == nil {
|
|
return 0
|
|
}
|
|
return t.UTC().Unix()
|
|
}
|
|
|
|
func (s *SyncService) refreshModelsMetaFromRedis(ctx context.Context, source string) error {
|
|
raw, err := s.rdb.HGetAll(ctx, "meta:models").Result()
|
|
if err != nil {
|
|
return fmt.Errorf("read meta:models: %w", err)
|
|
}
|
|
now := time.Now().Unix()
|
|
meta := map[string]interface{}{
|
|
"version": fmt.Sprintf("%d", now),
|
|
"updated_at": fmt.Sprintf("%d", now),
|
|
"source": source,
|
|
"checksum": modelcap.ChecksumFromPayloads(raw),
|
|
}
|
|
if err := s.rdb.HSet(ctx, "meta:models_meta", meta).Err(); err != nil {
|
|
return fmt.Errorf("write meta:models_meta: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func writeModelsMeta(ctx context.Context, pipe redis.Pipeliner, meta modelcap.Meta) error {
|
|
fields := map[string]string{
|
|
"version": strings.TrimSpace(meta.Version),
|
|
"updated_at": strings.TrimSpace(meta.UpdatedAt),
|
|
"source": strings.TrimSpace(meta.Source),
|
|
"checksum": strings.TrimSpace(meta.Checksum),
|
|
"upstream_url": strings.TrimSpace(meta.UpstreamURL),
|
|
"upstream_ref": strings.TrimSpace(meta.UpstreamRef),
|
|
}
|
|
for k, v := range fields {
|
|
if v == "" {
|
|
delete(fields, k)
|
|
}
|
|
}
|
|
if fields["version"] == "" {
|
|
fields["version"] = fmt.Sprintf("%d", time.Now().Unix())
|
|
}
|
|
if fields["updated_at"] == "" {
|
|
fields["updated_at"] = fmt.Sprintf("%d", time.Now().Unix())
|
|
}
|
|
if fields["source"] == "" {
|
|
fields["source"] = "db"
|
|
}
|
|
if fields["checksum"] == "" {
|
|
fields["checksum"] = "unknown"
|
|
}
|
|
if err := pipe.HSet(ctx, "meta:models_meta", fields).Err(); err != nil {
|
|
return fmt.Errorf("write meta:models_meta: %w", err)
|
|
}
|
|
return nil
|
|
}
|