mirror of
https://github.com/EZ-Api/ez-api.git
synced 2026-01-13 17:47:51 +00:00
Add endpoints for master and key access management to configure default and allowed namespaces, including propagation options. Implement GET and DELETE operations for individual bindings. Update sync service to persist bindings snapshots even when no upstreams are available.
447 lines
13 KiB
Go
447 lines
13 KiB
Go
package service
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/ez-api/ez-api/internal/model"
|
|
groupx "github.com/ez-api/foundation/group"
|
|
"github.com/ez-api/foundation/jsoncodec"
|
|
"github.com/ez-api/foundation/routing"
|
|
"github.com/ez-api/foundation/tokenhash"
|
|
"github.com/redis/go-redis/v9"
|
|
"gorm.io/gorm"
|
|
)
|
|
|
|
type SyncService struct {
|
|
rdb *redis.Client
|
|
}
|
|
|
|
func NewSyncService(rdb *redis.Client) *SyncService {
|
|
return &SyncService{rdb: rdb}
|
|
}
|
|
|
|
// SyncKey writes a single key into Redis without rebuilding the entire snapshot.
|
|
func (s *SyncService) SyncKey(key *model.Key) error {
|
|
ctx := context.Background()
|
|
tokenHash := key.TokenHash
|
|
if strings.TrimSpace(tokenHash) == "" {
|
|
tokenHash = tokenhash.HashToken(key.KeySecret) // backward compatibility
|
|
}
|
|
if strings.TrimSpace(tokenHash) == "" {
|
|
return fmt.Errorf("token hash missing for key %d", key.ID)
|
|
}
|
|
|
|
fields := map[string]interface{}{
|
|
"master_id": key.MasterID,
|
|
"issued_at_epoch": key.IssuedAtEpoch,
|
|
"status": key.Status,
|
|
"group": key.Group,
|
|
"scopes": key.Scopes,
|
|
"default_namespace": key.DefaultNamespace,
|
|
"namespaces": key.Namespaces,
|
|
}
|
|
if err := s.rdb.HSet(ctx, fmt.Sprintf("auth:token:%s", tokenHash), fields).Err(); err != nil {
|
|
return fmt.Errorf("write auth token: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// SyncMaster writes master metadata into Redis used by the balancer for validation.
|
|
func (s *SyncService) SyncMaster(master *model.Master) error {
|
|
ctx := context.Background()
|
|
key := fmt.Sprintf("auth:master:%d", master.ID)
|
|
if err := s.rdb.HSet(ctx, key, map[string]interface{}{
|
|
"epoch": master.Epoch,
|
|
"status": master.Status,
|
|
"global_qps": master.GlobalQPS,
|
|
}).Err(); err != nil {
|
|
return fmt.Errorf("write master metadata: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// SyncProvider writes a single provider into Redis hash storage and updates routing tables.
|
|
func (s *SyncService) SyncProvider(provider *model.Provider) error {
|
|
ctx := context.Background()
|
|
group := groupx.Normalize(provider.Group)
|
|
models := strings.Split(provider.Models, ",")
|
|
|
|
snap := providerSnapshot{
|
|
ID: provider.ID,
|
|
Name: provider.Name,
|
|
Type: provider.Type,
|
|
BaseURL: provider.BaseURL,
|
|
APIKey: provider.APIKey,
|
|
GoogleProject: provider.GoogleProject,
|
|
GoogleLocation: provider.GoogleLocation,
|
|
Group: group,
|
|
Models: models,
|
|
Status: normalizeStatus(provider.Status),
|
|
AutoBan: provider.AutoBan,
|
|
BanReason: provider.BanReason,
|
|
}
|
|
if provider.BanUntil != nil {
|
|
snap.BanUntil = provider.BanUntil.UTC().Unix()
|
|
}
|
|
|
|
// 1. Update Provider Config
|
|
if err := s.hsetJSON(ctx, "config:providers", fmt.Sprintf("%d", provider.ID), snap); err != nil {
|
|
return err
|
|
}
|
|
|
|
// 2. Update Routing Table: route:group:{group}:{model} -> Set(provider_id)
|
|
// Note: This is an additive operation. Removing models requires full sync or smarter logic.
|
|
pipe := s.rdb.Pipeline()
|
|
for _, m := range models {
|
|
m = strings.TrimSpace(m)
|
|
if m == "" {
|
|
continue
|
|
}
|
|
if snap.Status != "active" {
|
|
continue
|
|
}
|
|
if snap.BanUntil > 0 && time.Now().Unix() < snap.BanUntil {
|
|
continue
|
|
}
|
|
routeKey := fmt.Sprintf("route:group:%s:%s", group, m)
|
|
pipe.SAdd(ctx, routeKey, provider.ID)
|
|
}
|
|
_, err := pipe.Exec(ctx)
|
|
return err
|
|
}
|
|
|
|
// SyncModel writes a single model metadata record.
|
|
func (s *SyncService) SyncModel(m *model.Model) error {
|
|
ctx := context.Background()
|
|
snap := modelSnapshot{
|
|
Name: m.Name,
|
|
ContextWindow: m.ContextWindow,
|
|
CostPerToken: m.CostPerToken,
|
|
SupportsVision: m.SupportsVision,
|
|
SupportsFunction: m.SupportsFunctions,
|
|
SupportsToolChoice: m.SupportsToolChoice,
|
|
SupportsFIM: m.SupportsFIM,
|
|
MaxOutputTokens: m.MaxOutputTokens,
|
|
}
|
|
return s.hsetJSON(ctx, "meta:models", snap.Name, snap)
|
|
}
|
|
|
|
// providerSnapshot is the JSON document stored per provider in the
// "config:providers" hash. Field order and tags define the serialized form.
type providerSnapshot struct {
	// Identity and connection settings copied from the provider row.
	ID      uint   `json:"id"`
	Name    string `json:"name"`
	Type    string `json:"type"`
	BaseURL string `json:"base_url"`
	APIKey  string `json:"api_key"`

	// Google-specific settings; left out of the JSON when empty.
	GoogleProject  string `json:"google_project,omitempty"`
	GoogleLocation string `json:"google_location,omitempty"`

	// Group is the normalized route group; Models is the provider's
	// comma-separated model list after splitting.
	Group  string   `json:"group"`
	Models []string `json:"models"`

	// Status is the normalized status string.
	Status string `json:"status"`

	// Ban bookkeeping. BanUntil is in unix seconds; zero means no ban
	// deadline and is omitted from the JSON.
	AutoBan   bool   `json:"auto_ban"`
	BanReason string `json:"ban_reason,omitempty"`
	BanUntil  int64  `json:"ban_until,omitempty"`
}
|
|
|
|
// keySnapshot is no longer needed as we write directly to auth:token:*
|
|
|
|
// modelSnapshot is the JSON document stored per model in the "meta:models"
// hash, keyed by Name. Field order and tags define the serialized form.
type modelSnapshot struct {
	Name          string  `json:"name"`
	ContextWindow int     `json:"context_window"`
	CostPerToken  float64 `json:"cost_per_token"`

	// Capability flags copied from the model row. Note that the Go field
	// is SupportsFunction while the JSON key is "supports_functions".
	SupportsVision     bool `json:"supports_vision"`
	SupportsFunction   bool `json:"supports_functions"`
	SupportsToolChoice bool `json:"supports_tool_choice"`
	SupportsFIM        bool `json:"supports_fim"`

	MaxOutputTokens int `json:"max_output_tokens"`
}
|
|
|
|
// SyncAll rebuilds Redis hashes from the database; use for cold starts or forced refreshes.
|
|
func (s *SyncService) SyncAll(db *gorm.DB) error {
|
|
ctx := context.Background()
|
|
|
|
var providers []model.Provider
|
|
if err := db.Find(&providers).Error; err != nil {
|
|
return fmt.Errorf("load providers: %w", err)
|
|
}
|
|
|
|
var keys []model.Key
|
|
if err := db.Find(&keys).Error; err != nil {
|
|
return fmt.Errorf("load keys: %w", err)
|
|
}
|
|
|
|
var masters []model.Master
|
|
if err := db.Find(&masters).Error; err != nil {
|
|
return fmt.Errorf("load masters: %w", err)
|
|
}
|
|
|
|
var models []model.Model
|
|
if err := db.Find(&models).Error; err != nil {
|
|
return fmt.Errorf("load models: %w", err)
|
|
}
|
|
|
|
var bindings []model.Binding
|
|
if err := db.Find(&bindings).Error; err != nil {
|
|
return fmt.Errorf("load bindings: %w", err)
|
|
}
|
|
|
|
pipe := s.rdb.TxPipeline()
|
|
pipe.Del(ctx, "config:providers", "config:keys", "meta:models", "config:bindings", "meta:bindings_meta")
|
|
// Also clear master keys
|
|
var masterKeys []string
|
|
iter := s.rdb.Scan(ctx, 0, "auth:master:*", 0).Iterator()
|
|
for iter.Next(ctx) {
|
|
masterKeys = append(masterKeys, iter.Val())
|
|
}
|
|
if err := iter.Err(); err != nil {
|
|
return fmt.Errorf("scan master keys: %w", err)
|
|
}
|
|
if len(masterKeys) > 0 {
|
|
pipe.Del(ctx, masterKeys...)
|
|
}
|
|
|
|
// Clear old routing tables (pattern scan would be better in prod, but keys are predictable if we knew them)
|
|
// For MVP, we rely on the fact that we are rebuilding.
|
|
// Ideally, we should scan "route:group:*" and del, but let's just rebuild.
|
|
|
|
for _, p := range providers {
|
|
group := groupx.Normalize(p.Group)
|
|
models := strings.Split(p.Models, ",")
|
|
|
|
snap := providerSnapshot{
|
|
ID: p.ID,
|
|
Name: p.Name,
|
|
Type: p.Type,
|
|
BaseURL: p.BaseURL,
|
|
APIKey: p.APIKey,
|
|
GoogleProject: p.GoogleProject,
|
|
GoogleLocation: p.GoogleLocation,
|
|
Group: group,
|
|
Models: models,
|
|
Status: normalizeStatus(p.Status),
|
|
AutoBan: p.AutoBan,
|
|
BanReason: p.BanReason,
|
|
}
|
|
if p.BanUntil != nil {
|
|
snap.BanUntil = p.BanUntil.UTC().Unix()
|
|
}
|
|
payload, err := jsoncodec.Marshal(snap)
|
|
if err != nil {
|
|
return fmt.Errorf("marshal provider %d: %w", p.ID, err)
|
|
}
|
|
pipe.HSet(ctx, "config:providers", fmt.Sprintf("%d", p.ID), payload)
|
|
|
|
// Rebuild Routing Table
|
|
for _, m := range models {
|
|
m = strings.TrimSpace(m)
|
|
if m == "" {
|
|
continue
|
|
}
|
|
if snap.Status != "active" {
|
|
continue
|
|
}
|
|
if snap.BanUntil > 0 && time.Now().Unix() < snap.BanUntil {
|
|
continue
|
|
}
|
|
routeKey := fmt.Sprintf("route:group:%s:%s", group, m)
|
|
pipe.SAdd(ctx, routeKey, p.ID)
|
|
}
|
|
}
|
|
|
|
for _, k := range keys {
|
|
tokenHash := strings.TrimSpace(k.TokenHash)
|
|
if tokenHash == "" {
|
|
tokenHash = tokenhash.HashToken(k.KeySecret) // fallback for legacy rows
|
|
}
|
|
if tokenHash == "" {
|
|
return fmt.Errorf("token hash missing for key %d", k.ID)
|
|
}
|
|
pipe.HSet(ctx, fmt.Sprintf("auth:token:%s", tokenHash), map[string]interface{}{
|
|
"master_id": k.MasterID,
|
|
"issued_at_epoch": k.IssuedAtEpoch,
|
|
"status": k.Status,
|
|
"group": k.Group,
|
|
"scopes": k.Scopes,
|
|
"default_namespace": k.DefaultNamespace,
|
|
"namespaces": k.Namespaces,
|
|
})
|
|
}
|
|
|
|
for _, m := range masters {
|
|
pipe.HSet(ctx, fmt.Sprintf("auth:master:%d", m.ID), map[string]interface{}{
|
|
"epoch": m.Epoch,
|
|
"status": m.Status,
|
|
"global_qps": m.GlobalQPS,
|
|
})
|
|
}
|
|
|
|
for _, m := range models {
|
|
snap := modelSnapshot{
|
|
Name: m.Name,
|
|
ContextWindow: m.ContextWindow,
|
|
CostPerToken: m.CostPerToken,
|
|
SupportsVision: m.SupportsVision,
|
|
SupportsFunction: m.SupportsFunctions,
|
|
SupportsToolChoice: m.SupportsToolChoice,
|
|
SupportsFIM: m.SupportsFIM,
|
|
MaxOutputTokens: m.MaxOutputTokens,
|
|
}
|
|
payload, err := jsoncodec.Marshal(snap)
|
|
if err != nil {
|
|
return fmt.Errorf("marshal model %s: %w", m.Name, err)
|
|
}
|
|
pipe.HSet(ctx, "meta:models", snap.Name, payload)
|
|
}
|
|
|
|
if err := s.writeBindingsSnapshot(ctx, pipe, bindings, providers); err != nil {
|
|
return err
|
|
}
|
|
|
|
if _, err := pipe.Exec(ctx); err != nil {
|
|
return fmt.Errorf("write snapshots: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// SyncBindings rebuilds the binding snapshot for DP routing.
|
|
// This is intentionally a rebuild to avoid stale entries on deletes/updates.
|
|
func (s *SyncService) SyncBindings(db *gorm.DB) error {
|
|
ctx := context.Background()
|
|
|
|
var providers []model.Provider
|
|
if err := db.Find(&providers).Error; err != nil {
|
|
return fmt.Errorf("load providers: %w", err)
|
|
}
|
|
var bindings []model.Binding
|
|
if err := db.Find(&bindings).Error; err != nil {
|
|
return fmt.Errorf("load bindings: %w", err)
|
|
}
|
|
|
|
pipe := s.rdb.TxPipeline()
|
|
pipe.Del(ctx, "config:bindings", "meta:bindings_meta")
|
|
if err := s.writeBindingsSnapshot(ctx, pipe, bindings, providers); err != nil {
|
|
return err
|
|
}
|
|
if _, err := pipe.Exec(ctx); err != nil {
|
|
return fmt.Errorf("write bindings snapshot: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *SyncService) writeBindingsSnapshot(ctx context.Context, pipe redis.Pipeliner, bindings []model.Binding, providers []model.Provider) error {
|
|
// Group providers by route group for selector resolution.
|
|
type providerLite struct {
|
|
id uint
|
|
group string
|
|
models []string
|
|
}
|
|
providersByGroup := make(map[string][]providerLite)
|
|
for _, p := range providers {
|
|
group := groupx.Normalize(p.Group)
|
|
models := strings.Split(p.Models, ",")
|
|
var outModels []string
|
|
for _, m := range models {
|
|
m = strings.TrimSpace(m)
|
|
if m != "" {
|
|
outModels = append(outModels, m)
|
|
}
|
|
}
|
|
providersByGroup[group] = append(providersByGroup[group], providerLite{
|
|
id: p.ID,
|
|
group: group,
|
|
models: outModels,
|
|
})
|
|
}
|
|
|
|
now := time.Now().Unix()
|
|
for _, b := range bindings {
|
|
if strings.TrimSpace(b.Status) != "" && strings.TrimSpace(b.Status) != "active" {
|
|
continue
|
|
}
|
|
ns := strings.TrimSpace(b.Namespace)
|
|
pm := strings.TrimSpace(b.PublicModel)
|
|
if ns == "" || pm == "" {
|
|
continue
|
|
}
|
|
rg := groupx.Normalize(b.RouteGroup)
|
|
if rg == "" {
|
|
continue
|
|
}
|
|
|
|
snap := struct {
|
|
Namespace string `json:"namespace"`
|
|
PublicModel string `json:"public_model"`
|
|
RouteGroup string `json:"route_group"`
|
|
SelectorType string `json:"selector_type,omitempty"`
|
|
SelectorValue string `json:"selector_value,omitempty"`
|
|
Status string `json:"status,omitempty"`
|
|
UpdatedAt int64 `json:"updated_at,omitempty"`
|
|
Upstreams map[string]string `json:"upstreams"`
|
|
}{
|
|
Namespace: ns,
|
|
PublicModel: pm,
|
|
RouteGroup: rg,
|
|
SelectorType: strings.TrimSpace(b.SelectorType),
|
|
SelectorValue: strings.TrimSpace(b.SelectorValue),
|
|
Status: "active",
|
|
UpdatedAt: now,
|
|
Upstreams: make(map[string]string),
|
|
}
|
|
|
|
selectorType := strings.TrimSpace(b.SelectorType)
|
|
selectorValue := strings.TrimSpace(b.SelectorValue)
|
|
|
|
for _, p := range providersByGroup[rg] {
|
|
up, err := routing.ResolveUpstreamModel(routing.SelectorType(selectorType), selectorValue, pm, p.models)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
snap.Upstreams[fmt.Sprintf("%d", p.id)] = up
|
|
}
|
|
|
|
key := ns + "." + pm
|
|
payload, err := jsoncodec.Marshal(snap)
|
|
if err != nil {
|
|
return fmt.Errorf("marshal config:bindings:%s: %w", key, err)
|
|
}
|
|
pipe.HSet(ctx, "config:bindings", key, payload)
|
|
}
|
|
|
|
meta := map[string]string{
|
|
"version": fmt.Sprintf("%d", now),
|
|
"updated_at": fmt.Sprintf("%d", now),
|
|
"source": "cp_builtin",
|
|
}
|
|
if err := pipe.HSet(ctx, "meta:bindings_meta", meta).Err(); err != nil {
|
|
return fmt.Errorf("write meta:bindings_meta: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *SyncService) hsetJSON(ctx context.Context, key, field string, val interface{}) error {
|
|
payload, err := jsoncodec.Marshal(val)
|
|
if err != nil {
|
|
return fmt.Errorf("marshal %s:%s: %w", key, field, err)
|
|
}
|
|
if err := s.rdb.HSet(ctx, key, field, payload).Err(); err != nil {
|
|
return fmt.Errorf("write %s:%s: %w", key, field, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// normalizeStatus trims surrounding whitespace from status and lower-cases it.
// An empty result is reported as "active"; every other value, whether or not
// it is one of the known statuses ("active", "auto_disabled",
// "manual_disabled"), is returned as normalized.
func normalizeStatus(status string) string {
	if cleaned := strings.ToLower(strings.TrimSpace(status)); cleaned != "" {
		return cleaned
	}
	return "active"
}
|