package service import ( "context" "fmt" "strings" "time" "github.com/ez-api/ez-api/internal/model" groupx "github.com/ez-api/foundation/group" "github.com/ez-api/foundation/jsoncodec" "github.com/ez-api/foundation/tokenhash" "github.com/redis/go-redis/v9" "gorm.io/gorm" ) type SyncService struct { rdb *redis.Client } func NewSyncService(rdb *redis.Client) *SyncService { return &SyncService{rdb: rdb} } // SyncKey writes a single key into Redis without rebuilding the entire snapshot. func (s *SyncService) SyncKey(key *model.Key) error { ctx := context.Background() tokenHash := key.TokenHash if strings.TrimSpace(tokenHash) == "" { tokenHash = tokenhash.HashToken(key.KeySecret) // backward compatibility } if strings.TrimSpace(tokenHash) == "" { return fmt.Errorf("token hash missing for key %d", key.ID) } fields := map[string]interface{}{ "master_id": key.MasterID, "issued_at_epoch": key.IssuedAtEpoch, "status": key.Status, "group": key.Group, "scopes": key.Scopes, "default_namespace": key.DefaultNamespace, "namespaces": key.Namespaces, } if err := s.rdb.HSet(ctx, fmt.Sprintf("auth:token:%s", tokenHash), fields).Err(); err != nil { return fmt.Errorf("write auth token: %w", err) } return nil } // SyncMaster writes master metadata into Redis used by the balancer for validation. func (s *SyncService) SyncMaster(master *model.Master) error { ctx := context.Background() key := fmt.Sprintf("auth:master:%d", master.ID) if err := s.rdb.HSet(ctx, key, map[string]interface{}{ "epoch": master.Epoch, "status": master.Status, "global_qps": master.GlobalQPS, }).Err(); err != nil { return fmt.Errorf("write master metadata: %w", err) } return nil } // SyncProvider writes a single provider into Redis hash storage and updates routing tables. func (s *SyncService) SyncProvider(provider *model.Provider) error { ctx := context.Background() group := groupx.Normalize(provider.Group) models := strings.Split(provider.Models, ",") snap := providerSnapshot{ ID: provider.ID, Name: provider.Name, Type: provider.Type, BaseURL: provider.BaseURL, APIKey: provider.APIKey, GoogleProject: provider.GoogleProject, GoogleLocation: provider.GoogleLocation, Group: group, Models: models, Status: normalizeStatus(provider.Status), AutoBan: provider.AutoBan, BanReason: provider.BanReason, } if provider.BanUntil != nil { snap.BanUntil = provider.BanUntil.UTC().Unix() } // 1. Update Provider Config if err := s.hsetJSON(ctx, "config:providers", fmt.Sprintf("%d", provider.ID), snap); err != nil { return err } // 2. Update Routing Table: route:group:{group}:{model} -> Set(provider_id) // Note: This is an additive operation. Removing models requires full sync or smarter logic. pipe := s.rdb.Pipeline() for _, m := range models { m = strings.TrimSpace(m) if m == "" { continue } if snap.Status != "active" { continue } if snap.BanUntil > 0 && time.Now().Unix() < snap.BanUntil { continue } routeKey := fmt.Sprintf("route:group:%s:%s", group, m) pipe.SAdd(ctx, routeKey, provider.ID) } _, err := pipe.Exec(ctx) return err } // SyncModel writes a single model metadata record. func (s *SyncService) SyncModel(m *model.Model) error { ctx := context.Background() snap := modelSnapshot{ Name: m.Name, ContextWindow: m.ContextWindow, CostPerToken: m.CostPerToken, SupportsVision: m.SupportsVision, SupportsFunction: m.SupportsFunctions, SupportsToolChoice: m.SupportsToolChoice, SupportsFIM: m.SupportsFIM, MaxOutputTokens: m.MaxOutputTokens, } return s.hsetJSON(ctx, "meta:models", snap.Name, snap) } type providerSnapshot struct { ID uint `json:"id"` Name string `json:"name"` Type string `json:"type"` BaseURL string `json:"base_url"` APIKey string `json:"api_key"` GoogleProject string `json:"google_project,omitempty"` GoogleLocation string `json:"google_location,omitempty"` Group string `json:"group"` Models []string `json:"models"` Status string `json:"status"` AutoBan bool `json:"auto_ban"` BanReason string `json:"ban_reason,omitempty"` BanUntil int64 `json:"ban_until,omitempty"` // unix seconds } // keySnapshot is no longer needed as we write directly to auth:token:* type modelSnapshot struct { Name string `json:"name"` ContextWindow int `json:"context_window"` CostPerToken float64 `json:"cost_per_token"` SupportsVision bool `json:"supports_vision"` SupportsFunction bool `json:"supports_functions"` SupportsToolChoice bool `json:"supports_tool_choice"` SupportsFIM bool `json:"supports_fim"` MaxOutputTokens int `json:"max_output_tokens"` } // SyncAll rebuilds Redis hashes from the database; use for cold starts or forced refreshes. func (s *SyncService) SyncAll(db *gorm.DB) error { ctx := context.Background() var providers []model.Provider if err := db.Find(&providers).Error; err != nil { return fmt.Errorf("load providers: %w", err) } var keys []model.Key if err := db.Find(&keys).Error; err != nil { return fmt.Errorf("load keys: %w", err) } var masters []model.Master if err := db.Find(&masters).Error; err != nil { return fmt.Errorf("load masters: %w", err) } var models []model.Model if err := db.Find(&models).Error; err != nil { return fmt.Errorf("load models: %w", err) } var bindings []model.Binding if err := db.Find(&bindings).Error; err != nil { return fmt.Errorf("load bindings: %w", err) } pipe := s.rdb.TxPipeline() pipe.Del(ctx, "config:providers", "config:keys", "meta:models", "config:bindings", "meta:bindings_meta") // Also clear master keys var masterKeys []string iter := s.rdb.Scan(ctx, 0, "auth:master:*", 0).Iterator() for iter.Next(ctx) { masterKeys = append(masterKeys, iter.Val()) } if err := iter.Err(); err != nil { return fmt.Errorf("scan master keys: %w", err) } if len(masterKeys) > 0 { pipe.Del(ctx, masterKeys...) } // Clear old routing tables (pattern scan would be better in prod, but keys are predictable if we knew them) // For MVP, we rely on the fact that we are rebuilding. // Ideally, we should scan "route:group:*" and del, but let's just rebuild. for _, p := range providers { group := groupx.Normalize(p.Group) models := strings.Split(p.Models, ",") snap := providerSnapshot{ ID: p.ID, Name: p.Name, Type: p.Type, BaseURL: p.BaseURL, APIKey: p.APIKey, GoogleProject: p.GoogleProject, GoogleLocation: p.GoogleLocation, Group: group, Models: models, Status: normalizeStatus(p.Status), AutoBan: p.AutoBan, BanReason: p.BanReason, } if p.BanUntil != nil { snap.BanUntil = p.BanUntil.UTC().Unix() } payload, err := jsoncodec.Marshal(snap) if err != nil { return fmt.Errorf("marshal provider %d: %w", p.ID, err) } pipe.HSet(ctx, "config:providers", fmt.Sprintf("%d", p.ID), payload) // Rebuild Routing Table for _, m := range models { m = strings.TrimSpace(m) if m == "" { continue } if snap.Status != "active" { continue } if snap.BanUntil > 0 && time.Now().Unix() < snap.BanUntil { continue } routeKey := fmt.Sprintf("route:group:%s:%s", group, m) pipe.SAdd(ctx, routeKey, p.ID) } } for _, k := range keys { tokenHash := strings.TrimSpace(k.TokenHash) if tokenHash == "" { tokenHash = tokenhash.HashToken(k.KeySecret) // fallback for legacy rows } if tokenHash == "" { return fmt.Errorf("token hash missing for key %d", k.ID) } pipe.HSet(ctx, fmt.Sprintf("auth:token:%s", tokenHash), map[string]interface{}{ "master_id": k.MasterID, "issued_at_epoch": k.IssuedAtEpoch, "status": k.Status, "group": k.Group, "scopes": k.Scopes, "default_namespace": k.DefaultNamespace, "namespaces": k.Namespaces, }) } for _, m := range masters { pipe.HSet(ctx, fmt.Sprintf("auth:master:%d", m.ID), map[string]interface{}{ "epoch": m.Epoch, "status": m.Status, "global_qps": m.GlobalQPS, }) } for _, m := range models { snap := modelSnapshot{ Name: m.Name, ContextWindow: m.ContextWindow, CostPerToken: m.CostPerToken, SupportsVision: m.SupportsVision, SupportsFunction: m.SupportsFunctions, SupportsToolChoice: m.SupportsToolChoice, SupportsFIM: m.SupportsFIM, MaxOutputTokens: m.MaxOutputTokens, } payload, err := jsoncodec.Marshal(snap) if err != nil { return fmt.Errorf("marshal model %s: %w", m.Name, err) } pipe.HSet(ctx, "meta:models", snap.Name, payload) } if err := s.writeBindingsSnapshot(ctx, pipe, bindings, providers); err != nil { return err } if _, err := pipe.Exec(ctx); err != nil { return fmt.Errorf("write snapshots: %w", err) } return nil } // SyncBindings rebuilds the binding snapshot for DP routing. // This is intentionally a rebuild to avoid stale entries on deletes/updates. func (s *SyncService) SyncBindings(db *gorm.DB) error { ctx := context.Background() var providers []model.Provider if err := db.Find(&providers).Error; err != nil { return fmt.Errorf("load providers: %w", err) } var bindings []model.Binding if err := db.Find(&bindings).Error; err != nil { return fmt.Errorf("load bindings: %w", err) } pipe := s.rdb.TxPipeline() pipe.Del(ctx, "config:bindings", "meta:bindings_meta") if err := s.writeBindingsSnapshot(ctx, pipe, bindings, providers); err != nil { return err } if _, err := pipe.Exec(ctx); err != nil { return fmt.Errorf("write bindings snapshot: %w", err) } return nil } func (s *SyncService) writeBindingsSnapshot(ctx context.Context, pipe redis.Pipeliner, bindings []model.Binding, providers []model.Provider) error { // Group providers by route group for selector resolution. type providerLite struct { id uint group string models []string } providersByGroup := make(map[string][]providerLite) for _, p := range providers { group := groupx.Normalize(p.Group) models := strings.Split(p.Models, ",") var outModels []string for _, m := range models { m = strings.TrimSpace(m) if m != "" { outModels = append(outModels, m) } } providersByGroup[group] = append(providersByGroup[group], providerLite{ id: p.ID, group: group, models: outModels, }) } now := time.Now().Unix() for _, b := range bindings { if strings.TrimSpace(b.Status) != "" && strings.TrimSpace(b.Status) != "active" { continue } ns := strings.TrimSpace(b.Namespace) pm := strings.TrimSpace(b.PublicModel) if ns == "" || pm == "" { continue } rg := groupx.Normalize(b.RouteGroup) if rg == "" { continue } snap := struct { Namespace string `json:"namespace"` PublicModel string `json:"public_model"` RouteGroup string `json:"route_group"` SelectorType string `json:"selector_type,omitempty"` SelectorValue string `json:"selector_value,omitempty"` Status string `json:"status,omitempty"` UpdatedAt int64 `json:"updated_at,omitempty"` Upstreams map[string]string `json:"upstreams"` }{ Namespace: ns, PublicModel: pm, RouteGroup: rg, SelectorType: strings.TrimSpace(b.SelectorType), SelectorValue: strings.TrimSpace(b.SelectorValue), Status: "active", UpdatedAt: now, Upstreams: make(map[string]string), } selectorType := strings.TrimSpace(b.SelectorType) selectorValue := strings.TrimSpace(b.SelectorValue) for _, p := range providersByGroup[rg] { up, err := routing.ResolveUpstreamModel(routing.SelectorType(selectorType), selectorValue, pm, p.models) if err != nil { continue } snap.Upstreams[fmt.Sprintf("%d", p.id)] = up } // Only write bindings that have at least one usable upstream mapping. if len(snap.Upstreams) == 0 { continue } key := ns + "." + pm if err := s.hsetJSON(ctx, "config:bindings", key, snap); err != nil { return err } } meta := map[string]string{ "version": fmt.Sprintf("%d", now), "updated_at": fmt.Sprintf("%d", now), "source": "cp_builtin", } if err := pipe.HSet(ctx, "meta:bindings_meta", meta).Err(); err != nil { return fmt.Errorf("write meta:bindings_meta: %w", err) } return nil } func (s *SyncService) hsetJSON(ctx context.Context, key, field string, val interface{}) error { payload, err := jsoncodec.Marshal(val) if err != nil { return fmt.Errorf("marshal %s:%s: %w", key, field, err) } if err := s.rdb.HSet(ctx, key, field, payload).Err(); err != nil { return fmt.Errorf("write %s:%s: %w", key, field, err) } return nil } func normalizeStatus(status string) string { st := strings.ToLower(strings.TrimSpace(status)) if st == "" { return "active" } switch st { case "active", "auto_disabled", "manual_disabled": return st default: return st } }