diff --git a/cmd/server/main.go b/cmd/server/main.go index ba765f5..bd949f4 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -92,7 +92,7 @@ func main() { logger.Info("connected to postgresql successfully") // Auto Migrate - if err := db.AutoMigrate(&model.Master{}, &model.Key{}, &model.Provider{}, &model.Model{}, &model.LogRecord{}); err != nil { + if err := db.AutoMigrate(&model.Master{}, &model.Key{}, &model.Provider{}, &model.Model{}, &model.Binding{}, &model.LogRecord{}); err != nil { fatal(logger, "failed to auto migrate", "err", err) } diff --git a/internal/model/models.go b/internal/model/models.go index 92814e2..3d6c880 100644 --- a/internal/model/models.go +++ b/internal/model/models.go @@ -15,6 +15,8 @@ type Master struct { MasterKey string `gorm:"size:255" json:"-"` // bcrypt hash of master key MasterKeyDigest string `gorm:"size:64;uniqueIndex" json:"-"` // sha256 digest for lookup Group string `gorm:"size:100;default:'default'" json:"group"` // routing group + DefaultNamespace string `gorm:"size:100;default:'default'" json:"default_namespace"` + Namespaces string `gorm:"size:1024;default:'default'" json:"namespaces"` // Comma-separated namespaces Epoch int64 `gorm:"default:1" json:"epoch"` // used for revocation/rotation Status string `gorm:"size:50;default:'active'" json:"status"` // active, suspended MaxChildKeys int `gorm:"default:5" json:"max_child_keys"` @@ -29,6 +31,8 @@ type Key struct { TokenHash string `gorm:"size:64;uniqueIndex" json:"token_hash"` // sha256 digest of child key Group string `gorm:"size:100;default:'default'" json:"group"` // routing group Scopes string `gorm:"size:1024" json:"scopes"` // Comma-separated scopes + DefaultNamespace string `gorm:"size:100;default:'default'" json:"default_namespace"` + Namespaces string `gorm:"size:1024;default:'default'" json:"namespaces"` // Comma-separated namespaces IssuedAtEpoch int64 `gorm:"not null" json:"issued_at_epoch"` // copy of master epoch at issuance Status string `gorm:"size:50;default:'active'" json:"status"` // active, suspended IssuedBy string `gorm:"size:20;default:'master'" json:"issued_by"` @@ -63,3 +67,15 @@ type Model struct { SupportsFIM bool `json:"supports_fim"` MaxOutputTokens int `json:"max_output_tokens"` } + +// Binding defines a stable "namespace.public_model" routing key and its target RouteGroup + selector. +// RouteGroup currently reuses Provider.Group. +type Binding struct { + gorm.Model + Namespace string `gorm:"size:100;not null;index:idx_binding_key,unique" json:"namespace"` + PublicModel string `gorm:"size:255;not null;index:idx_binding_key,unique" json:"public_model"` + RouteGroup string `gorm:"size:100;not null" json:"route_group"` + SelectorType string `gorm:"size:50;default:'exact'" json:"selector_type"` + SelectorValue string `gorm:"size:255" json:"selector_value"` + Status string `gorm:"size:50;default:'active'" json:"status"` +} diff --git a/internal/service/master.go b/internal/service/master.go index 922ffb1..8ee125f 100644 --- a/internal/service/master.go +++ b/internal/service/master.go @@ -46,6 +46,8 @@ func (s *MasterService) CreateMaster(name, group string, maxChildKeys, globalQPS MasterKey: string(hashedMasterKey), MasterKeyDigest: masterKeyDigest, Group: group, + DefaultNamespace: "default", + Namespaces: "default", MaxChildKeys: maxChildKeys, GlobalQPS: globalQPS, Status: "active", @@ -150,10 +152,18 @@ func (s *MasterService) issueChildKey(masterID uint, group string, scopes string TokenHash: tokenHash, Group: group, Scopes: scopes, + DefaultNamespace: strings.TrimSpace(master.DefaultNamespace), + Namespaces: strings.TrimSpace(master.Namespaces), IssuedAtEpoch: master.Epoch, Status: "active", IssuedBy: strings.TrimSpace(issuedBy), } + if strings.TrimSpace(key.DefaultNamespace) == "" { + key.DefaultNamespace = "default" + } + if strings.TrimSpace(key.Namespaces) == "" { + key.Namespaces = key.DefaultNamespace + } if key.IssuedBy == "" { key.IssuedBy = "master" } diff --git a/internal/service/sync.go b/internal/service/sync.go index 4bc4c3f..dd5f102 100644 --- a/internal/service/sync.go +++ b/internal/service/sync.go @@ -39,6 +39,8 @@ func (s *SyncService) SyncKey(key *model.Key) error { "status": key.Status, "group": key.Group, "scopes": key.Scopes, + "default_namespace": key.DefaultNamespace, + "namespaces": key.Namespaces, } if err := s.rdb.HSet(ctx, fmt.Sprintf("auth:token:%s", tokenHash), fields).Err(); err != nil { return fmt.Errorf("write auth token: %w", err) @@ -179,8 +181,13 @@ func (s *SyncService) SyncAll(db *gorm.DB) error { return fmt.Errorf("load models: %w", err) } + var bindings []model.Binding + if err := db.Find(&bindings).Error; err != nil { + return fmt.Errorf("load bindings: %w", err) + } + pipe := s.rdb.TxPipeline() - pipe.Del(ctx, "config:providers", "config:keys", "meta:models") + pipe.Del(ctx, "config:providers", "config:keys", "meta:models", "config:bindings", "meta:bindings_meta") // Also clear master keys var masterKeys []string iter := s.rdb.Scan(ctx, 0, "auth:master:*", 0).Iterator() @@ -256,6 +263,8 @@ func (s *SyncService) SyncAll(db *gorm.DB) error { "status": k.Status, "group": k.Group, "scopes": k.Scopes, + "default_namespace": k.DefaultNamespace, + "namespaces": k.Namespaces, }) } @@ -285,6 +294,10 @@ func (s *SyncService) SyncAll(db *gorm.DB) error { pipe.HSet(ctx, "meta:models", snap.Name, payload) } + if err := s.writeBindingsSnapshot(ctx, pipe, bindings, providers); err != nil { + return err + } + if _, err := pipe.Exec(ctx); err != nil { return fmt.Errorf("write snapshots: %w", err) } @@ -292,6 +305,124 @@ func (s *SyncService) SyncAll(db *gorm.DB) error { return nil } +// SyncBindings rebuilds the binding snapshot for DP routing. +// This is intentionally a rebuild to avoid stale entries on deletes/updates. +func (s *SyncService) SyncBindings(db *gorm.DB) error { + ctx := context.Background() + + var providers []model.Provider + if err := db.Find(&providers).Error; err != nil { + return fmt.Errorf("load providers: %w", err) + } + var bindings []model.Binding + if err := db.Find(&bindings).Error; err != nil { + return fmt.Errorf("load bindings: %w", err) + } + + pipe := s.rdb.TxPipeline() + pipe.Del(ctx, "config:bindings", "meta:bindings_meta") + if err := s.writeBindingsSnapshot(ctx, pipe, bindings, providers); err != nil { + return err + } + if _, err := pipe.Exec(ctx); err != nil { + return fmt.Errorf("write bindings snapshot: %w", err) + } + return nil +} + +func (s *SyncService) writeBindingsSnapshot(ctx context.Context, pipe redis.Pipeliner, bindings []model.Binding, providers []model.Provider) error { + // Group providers by route group for selector resolution. + type providerLite struct { + id uint + group string + models []string + } + providersByGroup := make(map[string][]providerLite) + for _, p := range providers { + group := groupx.Normalize(p.Group) + models := strings.Split(p.Models, ",") + var outModels []string + for _, m := range models { + m = strings.TrimSpace(m) + if m != "" { + outModels = append(outModels, m) + } + } + providersByGroup[group] = append(providersByGroup[group], providerLite{ + id: p.ID, + group: group, + models: outModels, + }) + } + + now := time.Now().Unix() + for _, b := range bindings { + if strings.TrimSpace(b.Status) != "" && strings.TrimSpace(b.Status) != "active" { + continue + } + ns := strings.TrimSpace(b.Namespace) + pm := strings.TrimSpace(b.PublicModel) + if ns == "" || pm == "" { + continue + } + rg := groupx.Normalize(b.RouteGroup) + if rg == "" { + continue + } + + snap := struct { + Namespace string `json:"namespace"` + PublicModel string `json:"public_model"` + RouteGroup string `json:"route_group"` + SelectorType string `json:"selector_type,omitempty"` + SelectorValue string `json:"selector_value,omitempty"` + Status string `json:"status,omitempty"` + UpdatedAt int64 `json:"updated_at,omitempty"` + Upstreams map[string]string `json:"upstreams"` + }{ + Namespace: ns, + PublicModel: pm, + RouteGroup: rg, + SelectorType: strings.TrimSpace(b.SelectorType), + SelectorValue: strings.TrimSpace(b.SelectorValue), + Status: "active", + UpdatedAt: now, + Upstreams: make(map[string]string), + } + + selectorType := strings.TrimSpace(b.SelectorType) + selectorValue := strings.TrimSpace(b.SelectorValue) + + for _, p := range providersByGroup[rg] { + up, err := routing.ResolveUpstreamModel(routing.SelectorType(selectorType), selectorValue, pm, p.models) + if err != nil { + continue + } + snap.Upstreams[fmt.Sprintf("%d", p.id)] = up + } + + // Only write bindings that have at least one usable upstream mapping. + if len(snap.Upstreams) == 0 { + continue + } + + key := ns + "." + pm + if err := s.hsetJSON(ctx, "config:bindings", key, snap); err != nil { + return err + } + } + + meta := map[string]string{ + "version": fmt.Sprintf("%d", now), + "updated_at": fmt.Sprintf("%d", now), + "source": "cp_builtin", + } + if err := pipe.HSet(ctx, "meta:bindings_meta", meta).Err(); err != nil { + return fmt.Errorf("write meta:bindings_meta: %w", err) + } + return nil +} + func (s *SyncService) hsetJSON(ctx context.Context, key, field string, val interface{}) error { payload, err := jsoncodec.Marshal(val) if err != nil {