mirror of
https://github.com/EZ-Api/ez-api.git
synced 2026-01-13 17:47:51 +00:00
Removes the legacy route table maintenance logic from the sync service that populated deprecated Redis keys. Additionally, deletes the unused TokenService and KeyDTO files to reduce technical debt.
842 lines
24 KiB
Go
842 lines
24 KiB
Go
package service
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/ez-api/ez-api/internal/model"
|
|
groupx "github.com/ez-api/foundation/group"
|
|
"github.com/ez-api/foundation/jsoncodec"
|
|
"github.com/ez-api/foundation/modelcap"
|
|
"github.com/ez-api/foundation/routing"
|
|
"github.com/ez-api/foundation/tokenhash"
|
|
"github.com/redis/go-redis/v9"
|
|
"gorm.io/gorm"
|
|
)
|
|
|
|
type SyncService struct {
|
|
rdb *redis.Client
|
|
outbox *SyncOutboxService
|
|
}
|
|
|
|
func NewSyncService(rdb *redis.Client) *SyncService {
|
|
return &SyncService{rdb: rdb}
|
|
}
|
|
|
|
// SetOutbox enables sync outbox retry for this service.
|
|
func (s *SyncService) SetOutbox(outbox *SyncOutboxService) {
|
|
s.outbox = outbox
|
|
}
|
|
|
|
// SyncKey writes a single key into Redis without rebuilding the entire snapshot.
|
|
func (s *SyncService) SyncKey(key *model.Key) error {
|
|
if key == nil {
|
|
return fmt.Errorf("key required")
|
|
}
|
|
tokenHash := key.TokenHash
|
|
if strings.TrimSpace(tokenHash) == "" {
|
|
tokenHash = tokenhash.HashToken(key.KeySecret) // backward compatibility
|
|
}
|
|
if strings.TrimSpace(tokenHash) == "" {
|
|
return fmt.Errorf("token hash missing for key %d", key.ID)
|
|
}
|
|
return s.handleSyncError(s.SyncKeyNow(context.Background(), key), SyncOutboxEntry{
|
|
ResourceType: "key",
|
|
Action: "upsert",
|
|
ResourceID: &key.ID,
|
|
})
|
|
}
|
|
|
|
// SyncKeyNow writes key metadata to Redis without outbox handling.
|
|
func (s *SyncService) SyncKeyNow(ctx context.Context, key *model.Key) error {
|
|
if key == nil {
|
|
return fmt.Errorf("key required")
|
|
}
|
|
if ctx == nil {
|
|
ctx = context.Background()
|
|
}
|
|
tokenHash := key.TokenHash
|
|
if strings.TrimSpace(tokenHash) == "" {
|
|
tokenHash = tokenhash.HashToken(key.KeySecret) // backward compatibility
|
|
}
|
|
if strings.TrimSpace(tokenHash) == "" {
|
|
return fmt.Errorf("token hash missing for key %d", key.ID)
|
|
}
|
|
|
|
fields := map[string]interface{}{
|
|
"id": key.ID,
|
|
"master_id": key.MasterID,
|
|
"issued_at_epoch": key.IssuedAtEpoch,
|
|
"status": key.Status,
|
|
"group": key.Group,
|
|
"scopes": key.Scopes,
|
|
"default_namespace": key.DefaultNamespace,
|
|
"namespaces": key.Namespaces,
|
|
"model_limits": strings.TrimSpace(key.ModelLimits),
|
|
"model_limits_enabled": key.ModelLimitsEnabled,
|
|
"expires_at": unixOrZero(key.ExpiresAt),
|
|
"allow_ips": strings.TrimSpace(key.AllowIPs),
|
|
"deny_ips": strings.TrimSpace(key.DenyIPs),
|
|
"last_accessed_at": unixOrZero(key.LastAccessedAt),
|
|
"request_count": key.RequestCount,
|
|
"used_tokens": key.UsedTokens,
|
|
"quota_limit": key.QuotaLimit,
|
|
"quota_used": key.QuotaUsed,
|
|
"quota_reset_at": unixOrZero(key.QuotaResetAt),
|
|
"quota_reset_type": strings.TrimSpace(key.QuotaResetType),
|
|
}
|
|
if err := s.rdb.HSet(ctx, fmt.Sprintf("auth:token:%s", tokenHash), fields).Err(); err != nil {
|
|
return fmt.Errorf("write auth token: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// SyncMaster writes master metadata into Redis used by the balancer for validation.
|
|
func (s *SyncService) SyncMaster(master *model.Master) error {
|
|
if master == nil {
|
|
return fmt.Errorf("master required")
|
|
}
|
|
return s.handleSyncError(s.SyncMasterNow(context.Background(), master), SyncOutboxEntry{
|
|
ResourceType: "master",
|
|
Action: "upsert",
|
|
ResourceID: &master.ID,
|
|
})
|
|
}
|
|
|
|
// SyncMasterNow writes master metadata to Redis without outbox handling.
|
|
func (s *SyncService) SyncMasterNow(ctx context.Context, master *model.Master) error {
|
|
if master == nil {
|
|
return fmt.Errorf("master required")
|
|
}
|
|
if ctx == nil {
|
|
ctx = context.Background()
|
|
}
|
|
key := fmt.Sprintf("auth:master:%d", master.ID)
|
|
if err := s.rdb.HSet(ctx, key, map[string]interface{}{
|
|
"epoch": master.Epoch,
|
|
"status": master.Status,
|
|
"global_qps": master.GlobalQPS,
|
|
}).Err(); err != nil {
|
|
return fmt.Errorf("write master metadata: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// SyncProviders rebuilds provider snapshots from ProviderGroup + APIKey tables.
|
|
func (s *SyncService) SyncProviders(db *gorm.DB) error {
|
|
return s.syncProviders(db, SyncOutboxEntry{
|
|
ResourceType: "snapshot",
|
|
Action: "sync_providers",
|
|
})
|
|
}
|
|
|
|
// SyncProvidersForGroup retries provider snapshot with provider_group context.
|
|
func (s *SyncService) SyncProvidersForGroup(db *gorm.DB, groupID uint) error {
|
|
return s.syncProviders(db, SyncOutboxEntry{
|
|
ResourceType: "provider_group",
|
|
Action: "sync_providers",
|
|
ResourceID: &groupID,
|
|
})
|
|
}
|
|
|
|
// SyncProvidersForAPIKey retries provider snapshot with api_key context.
|
|
func (s *SyncService) SyncProvidersForAPIKey(db *gorm.DB, apiKeyID uint) error {
|
|
return s.syncProviders(db, SyncOutboxEntry{
|
|
ResourceType: "api_key",
|
|
Action: "sync_providers",
|
|
ResourceID: &apiKeyID,
|
|
})
|
|
}
|
|
|
|
func (s *SyncService) syncProviders(db *gorm.DB, entry SyncOutboxEntry) error {
|
|
if db == nil {
|
|
return fmt.Errorf("db required")
|
|
}
|
|
return s.handleSyncError(s.SyncProvidersNow(context.Background(), db), entry)
|
|
}
|
|
|
|
// SyncProvidersNow rebuilds provider snapshots without outbox handling.
|
|
func (s *SyncService) SyncProvidersNow(ctx context.Context, db *gorm.DB) error {
|
|
if db == nil {
|
|
return fmt.Errorf("db required")
|
|
}
|
|
if ctx == nil {
|
|
ctx = context.Background()
|
|
}
|
|
|
|
var groups []model.ProviderGroup
|
|
if err := db.Find(&groups).Error; err != nil {
|
|
return fmt.Errorf("load provider groups: %w", err)
|
|
}
|
|
var apiKeys []model.APIKey
|
|
if err := db.Find(&apiKeys).Error; err != nil {
|
|
return fmt.Errorf("load api keys: %w", err)
|
|
}
|
|
|
|
pipe := s.rdb.TxPipeline()
|
|
pipe.Del(ctx, "config:providers")
|
|
if err := s.writeProvidersSnapshot(ctx, pipe, groups, apiKeys); err != nil {
|
|
return err
|
|
}
|
|
if _, err := pipe.Exec(ctx); err != nil {
|
|
return fmt.Errorf("write provider snapshot: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// SyncModel writes a single model metadata record.
|
|
func (s *SyncService) SyncModel(m *model.Model) error {
|
|
if m == nil {
|
|
return fmt.Errorf("model required")
|
|
}
|
|
if strings.TrimSpace(m.Name) == "" {
|
|
return fmt.Errorf("model name required")
|
|
}
|
|
return s.handleSyncError(s.SyncModelNow(context.Background(), m), SyncOutboxEntry{
|
|
ResourceType: "model",
|
|
Action: "upsert",
|
|
ResourceID: &m.ID,
|
|
})
|
|
}
|
|
|
|
// SyncModelNow writes a single model metadata record without outbox handling.
|
|
func (s *SyncService) SyncModelNow(ctx context.Context, m *model.Model) error {
|
|
if m == nil {
|
|
return fmt.Errorf("model required")
|
|
}
|
|
if strings.TrimSpace(m.Name) == "" {
|
|
return fmt.Errorf("model name required")
|
|
}
|
|
if ctx == nil {
|
|
ctx = context.Background()
|
|
}
|
|
snap := modelcap.Model{
|
|
Name: m.Name,
|
|
Kind: string(modelcap.NormalizeKind(m.Kind)),
|
|
ContextWindow: m.ContextWindow,
|
|
CostPerToken: m.CostPerToken,
|
|
SupportsVision: m.SupportsVision,
|
|
SupportsFunction: m.SupportsFunctions,
|
|
SupportsToolChoice: m.SupportsToolChoice,
|
|
SupportsFim: m.SupportsFIM,
|
|
MaxOutputTokens: m.MaxOutputTokens,
|
|
}.Normalized()
|
|
if err := s.hsetJSON(ctx, "meta:models", snap.Name, snap); err != nil {
|
|
return err
|
|
}
|
|
if err := s.refreshModelsMetaFromRedis(ctx, "db"); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// SyncModelDelete removes model metadata from Redis and refreshes meta:models_meta.
|
|
func (s *SyncService) SyncModelDelete(m *model.Model) error {
|
|
if m == nil {
|
|
return fmt.Errorf("model required")
|
|
}
|
|
name := strings.TrimSpace(m.Name)
|
|
if name == "" {
|
|
return fmt.Errorf("model name required")
|
|
}
|
|
return s.handleSyncError(s.SyncModelDeleteNow(context.Background(), m), SyncOutboxEntry{
|
|
ResourceType: "model",
|
|
Action: "delete",
|
|
ResourceID: &m.ID,
|
|
Payload: map[string]any{
|
|
"name": name,
|
|
},
|
|
})
|
|
}
|
|
|
|
// SyncModelDeleteNow removes model metadata from Redis without outbox handling.
|
|
func (s *SyncService) SyncModelDeleteNow(ctx context.Context, m *model.Model) error {
|
|
if m == nil {
|
|
return fmt.Errorf("model required")
|
|
}
|
|
name := strings.TrimSpace(m.Name)
|
|
if name == "" {
|
|
return fmt.Errorf("model name required")
|
|
}
|
|
if ctx == nil {
|
|
ctx = context.Background()
|
|
}
|
|
if err := s.rdb.HDel(ctx, "meta:models", name).Err(); err != nil {
|
|
return fmt.Errorf("delete meta:models: %w", err)
|
|
}
|
|
if err := s.refreshModelsMetaFromRedis(ctx, "db"); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// providerSnapshot is the JSON document stored per API key in the
// config:providers hash. It merges fields of one APIKey with fields of the
// ProviderGroup the key belongs to.
type providerSnapshot struct {
	// Identity: ID is the API key ID; Name is "<group name>#<key id>".
	ID   uint   `json:"id"`
	Name string `json:"name"`

	// Upstream endpoint and credentials.
	Type           string `json:"type"`
	BaseURL        string `json:"base_url"`
	APIKey         string `json:"api_key"`
	AccessToken    string `json:"access_token,omitempty"`
	ExpiresAt      int64  `json:"expires_at,omitempty"`
	AccountID      string `json:"account_id,omitempty"`
	ProjectID      string `json:"project_id,omitempty"`
	GoogleProject  string `json:"google_project,omitempty"`
	GoogleLocation string `json:"google_location,omitempty"`
	StaticHeaders  string `json:"static_headers,omitempty"`
	HeadersProfile string `json:"headers_profile,omitempty"`

	// Group membership and the models configured on the group.
	GroupID uint     `json:"group_id,omitempty"`
	Group   string   `json:"group"`
	Models  []string `json:"models"`

	// Weight, effective status and ban state of the key.
	Weight    int    `json:"weight,omitempty"`
	Status    string `json:"status"`
	AutoBan   bool   `json:"auto_ban"`
	BanReason string `json:"ban_reason,omitempty"`
	BanUntil  int64  `json:"ban_until,omitempty"` // unix seconds
}
|
|
|
|
func (s *SyncService) writeProvidersSnapshot(ctx context.Context, pipe redis.Pipeliner, groups []model.ProviderGroup, apiKeys []model.APIKey) error {
|
|
groupMap := make(map[uint]model.ProviderGroup, len(groups))
|
|
for _, g := range groups {
|
|
groupMap[g.ID] = g
|
|
}
|
|
|
|
for _, k := range apiKeys {
|
|
g, ok := groupMap[k.GroupID]
|
|
if !ok {
|
|
continue
|
|
}
|
|
groupName := groupx.Normalize(g.Name)
|
|
if strings.TrimSpace(groupName) == "" {
|
|
groupName = "default"
|
|
}
|
|
groupStatus := normalizeStatus(g.Status)
|
|
keyStatus := normalizeStatus(k.Status)
|
|
status := keyStatus
|
|
if groupStatus != "" && groupStatus != "active" {
|
|
status = groupStatus
|
|
}
|
|
|
|
rawModels := strings.Split(g.Models, ",")
|
|
var models []string
|
|
for _, m := range rawModels {
|
|
m = strings.TrimSpace(m)
|
|
if m != "" {
|
|
models = append(models, m)
|
|
}
|
|
}
|
|
|
|
name := strings.TrimSpace(g.Name)
|
|
if name == "" {
|
|
name = groupName
|
|
}
|
|
name = fmt.Sprintf("%s#%d", name, k.ID)
|
|
|
|
snap := providerSnapshot{
|
|
ID: k.ID,
|
|
Name: name,
|
|
Type: strings.TrimSpace(g.Type),
|
|
BaseURL: strings.TrimSpace(g.BaseURL),
|
|
APIKey: strings.TrimSpace(k.APIKey),
|
|
AccessToken: strings.TrimSpace(k.AccessToken),
|
|
GoogleProject: strings.TrimSpace(g.GoogleProject),
|
|
GoogleLocation: strings.TrimSpace(g.GoogleLocation),
|
|
StaticHeaders: strings.TrimSpace(g.StaticHeaders),
|
|
HeadersProfile: strings.TrimSpace(g.HeadersProfile),
|
|
AccountID: strings.TrimSpace(k.AccountID),
|
|
ProjectID: strings.TrimSpace(k.ProjectID),
|
|
GroupID: g.ID,
|
|
Group: groupName,
|
|
Models: models,
|
|
Weight: k.Weight,
|
|
Status: status,
|
|
AutoBan: k.AutoBan,
|
|
BanReason: strings.TrimSpace(k.BanReason),
|
|
}
|
|
if k.ExpiresAt != nil {
|
|
snap.ExpiresAt = k.ExpiresAt.UTC().Unix()
|
|
}
|
|
if k.BanUntil != nil {
|
|
snap.BanUntil = k.BanUntil.UTC().Unix()
|
|
}
|
|
|
|
payload, err := jsoncodec.Marshal(snap)
|
|
if err != nil {
|
|
return fmt.Errorf("marshal provider %d: %w", k.ID, err)
|
|
}
|
|
pipe.HSet(ctx, "config:providers", fmt.Sprintf("%d", k.ID), payload)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// keySnapshot is no longer needed as we write directly to auth:token:*
|
|
|
|
// SyncAll rebuilds Redis hashes from the database; use for cold starts or forced refreshes.
|
|
func (s *SyncService) SyncAll(db *gorm.DB) error {
|
|
if db == nil {
|
|
return fmt.Errorf("db required")
|
|
}
|
|
return s.handleSyncError(s.SyncAllNow(context.Background(), db), SyncOutboxEntry{
|
|
ResourceType: "snapshot",
|
|
Action: "sync_all",
|
|
})
|
|
}
|
|
|
|
// SyncAllNow rebuilds snapshots without outbox handling.
|
|
func (s *SyncService) SyncAllNow(ctx context.Context, db *gorm.DB) error {
|
|
if db == nil {
|
|
return fmt.Errorf("db required")
|
|
}
|
|
if ctx == nil {
|
|
ctx = context.Background()
|
|
}
|
|
|
|
var groups []model.ProviderGroup
|
|
if err := db.Find(&groups).Error; err != nil {
|
|
return fmt.Errorf("load provider groups: %w", err)
|
|
}
|
|
var apiKeys []model.APIKey
|
|
if err := db.Find(&apiKeys).Error; err != nil {
|
|
return fmt.Errorf("load api keys: %w", err)
|
|
}
|
|
|
|
var keys []model.Key
|
|
if err := db.Find(&keys).Error; err != nil {
|
|
return fmt.Errorf("load keys: %w", err)
|
|
}
|
|
|
|
var masters []model.Master
|
|
if err := db.Find(&masters).Error; err != nil {
|
|
return fmt.Errorf("load masters: %w", err)
|
|
}
|
|
|
|
var models []model.Model
|
|
if err := db.Find(&models).Error; err != nil {
|
|
return fmt.Errorf("load models: %w", err)
|
|
}
|
|
var modelsPayloads map[string]string
|
|
|
|
var bindings []model.Binding
|
|
if err := db.Find(&bindings).Error; err != nil {
|
|
return fmt.Errorf("load bindings: %w", err)
|
|
}
|
|
|
|
pipe := s.rdb.TxPipeline()
|
|
pipe.Del(ctx, "config:providers", "config:keys", "meta:models", "meta:models_meta", "config:bindings", "meta:bindings_meta")
|
|
// Also clear master keys
|
|
var masterKeys []string
|
|
iter := s.rdb.Scan(ctx, 0, "auth:master:*", 0).Iterator()
|
|
for iter.Next(ctx) {
|
|
masterKeys = append(masterKeys, iter.Val())
|
|
}
|
|
if err := iter.Err(); err != nil {
|
|
return fmt.Errorf("scan master keys: %w", err)
|
|
}
|
|
if len(masterKeys) > 0 {
|
|
pipe.Del(ctx, masterKeys...)
|
|
}
|
|
|
|
if err := s.writeProvidersSnapshot(ctx, pipe, groups, apiKeys); err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, k := range keys {
|
|
tokenHash := strings.TrimSpace(k.TokenHash)
|
|
if tokenHash == "" {
|
|
tokenHash = tokenhash.HashToken(k.KeySecret) // fallback for legacy rows
|
|
}
|
|
if tokenHash == "" {
|
|
return fmt.Errorf("token hash missing for key %d", k.ID)
|
|
}
|
|
pipe.HSet(ctx, fmt.Sprintf("auth:token:%s", tokenHash), map[string]interface{}{
|
|
"id": k.ID,
|
|
"master_id": k.MasterID,
|
|
"issued_at_epoch": k.IssuedAtEpoch,
|
|
"status": k.Status,
|
|
"group": k.Group,
|
|
"scopes": k.Scopes,
|
|
"default_namespace": k.DefaultNamespace,
|
|
"namespaces": k.Namespaces,
|
|
"model_limits": strings.TrimSpace(k.ModelLimits),
|
|
"model_limits_enabled": k.ModelLimitsEnabled,
|
|
"expires_at": unixOrZero(k.ExpiresAt),
|
|
"allow_ips": strings.TrimSpace(k.AllowIPs),
|
|
"deny_ips": strings.TrimSpace(k.DenyIPs),
|
|
"last_accessed_at": unixOrZero(k.LastAccessedAt),
|
|
"request_count": k.RequestCount,
|
|
"used_tokens": k.UsedTokens,
|
|
"quota_limit": k.QuotaLimit,
|
|
"quota_used": k.QuotaUsed,
|
|
"quota_reset_at": unixOrZero(k.QuotaResetAt),
|
|
"quota_reset_type": strings.TrimSpace(k.QuotaResetType),
|
|
})
|
|
}
|
|
|
|
for _, m := range masters {
|
|
pipe.HSet(ctx, fmt.Sprintf("auth:master:%d", m.ID), map[string]interface{}{
|
|
"epoch": m.Epoch,
|
|
"status": m.Status,
|
|
"global_qps": m.GlobalQPS,
|
|
})
|
|
}
|
|
|
|
for _, m := range models {
|
|
snap := modelcap.Model{
|
|
Name: m.Name,
|
|
Kind: string(modelcap.NormalizeKind(m.Kind)),
|
|
ContextWindow: m.ContextWindow,
|
|
CostPerToken: m.CostPerToken,
|
|
SupportsVision: m.SupportsVision,
|
|
SupportsFunction: m.SupportsFunctions,
|
|
SupportsToolChoice: m.SupportsToolChoice,
|
|
SupportsFim: m.SupportsFIM,
|
|
MaxOutputTokens: m.MaxOutputTokens,
|
|
}.Normalized()
|
|
payload, err := jsoncodec.Marshal(snap)
|
|
if err != nil {
|
|
return fmt.Errorf("marshal model %s: %w", m.Name, err)
|
|
}
|
|
// Capture payloads so we can compute deterministic checksum for meta:models_meta.
|
|
if modelsPayloads == nil {
|
|
modelsPayloads = make(map[string]string, len(models))
|
|
}
|
|
modelsPayloads[snap.Name] = string(payload)
|
|
pipe.HSet(ctx, "meta:models", snap.Name, payload)
|
|
}
|
|
|
|
now := time.Now().Unix()
|
|
if err := writeModelsMeta(ctx, pipe, modelcap.Meta{
|
|
Version: fmt.Sprintf("%d", now),
|
|
UpdatedAt: fmt.Sprintf("%d", now),
|
|
Source: "db",
|
|
Checksum: modelcap.ChecksumFromPayloads(modelsPayloads),
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := s.writeBindingsSnapshot(ctx, pipe, bindings, groups, apiKeys); err != nil {
|
|
return err
|
|
}
|
|
|
|
if _, err := pipe.Exec(ctx); err != nil {
|
|
return fmt.Errorf("write snapshots: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// SyncBindings rebuilds the binding snapshot for DP routing.
|
|
// This is intentionally a rebuild to avoid stale entries on deletes/updates.
|
|
func (s *SyncService) SyncBindings(db *gorm.DB) error {
|
|
return s.syncBindings(db, SyncOutboxEntry{
|
|
ResourceType: "snapshot",
|
|
Action: "sync_bindings",
|
|
})
|
|
}
|
|
|
|
// SyncBindingsForGroup retries binding snapshot with provider_group context.
|
|
func (s *SyncService) SyncBindingsForGroup(db *gorm.DB, groupID uint) error {
|
|
return s.syncBindings(db, SyncOutboxEntry{
|
|
ResourceType: "provider_group",
|
|
Action: "sync_bindings",
|
|
ResourceID: &groupID,
|
|
})
|
|
}
|
|
|
|
// SyncBindingsForAPIKey retries binding snapshot with api_key context.
|
|
func (s *SyncService) SyncBindingsForAPIKey(db *gorm.DB, apiKeyID uint) error {
|
|
return s.syncBindings(db, SyncOutboxEntry{
|
|
ResourceType: "api_key",
|
|
Action: "sync_bindings",
|
|
ResourceID: &apiKeyID,
|
|
})
|
|
}
|
|
|
|
func (s *SyncService) syncBindings(db *gorm.DB, entry SyncOutboxEntry) error {
|
|
if db == nil {
|
|
return fmt.Errorf("db required")
|
|
}
|
|
return s.handleSyncError(s.SyncBindingsNow(context.Background(), db), entry)
|
|
}
|
|
|
|
// SyncBindingsNow rebuilds binding snapshot without outbox handling.
|
|
func (s *SyncService) SyncBindingsNow(ctx context.Context, db *gorm.DB) error {
|
|
if db == nil {
|
|
return fmt.Errorf("db required")
|
|
}
|
|
if ctx == nil {
|
|
ctx = context.Background()
|
|
}
|
|
|
|
var groups []model.ProviderGroup
|
|
if err := db.Find(&groups).Error; err != nil {
|
|
return fmt.Errorf("load provider groups: %w", err)
|
|
}
|
|
var apiKeys []model.APIKey
|
|
if err := db.Find(&apiKeys).Error; err != nil {
|
|
return fmt.Errorf("load api keys: %w", err)
|
|
}
|
|
var bindings []model.Binding
|
|
if err := db.Find(&bindings).Error; err != nil {
|
|
return fmt.Errorf("load bindings: %w", err)
|
|
}
|
|
|
|
pipe := s.rdb.TxPipeline()
|
|
pipe.Del(ctx, "config:bindings", "meta:bindings_meta")
|
|
if err := s.writeBindingsSnapshot(ctx, pipe, bindings, groups, apiKeys); err != nil {
|
|
return err
|
|
}
|
|
if _, err := pipe.Exec(ctx); err != nil {
|
|
return fmt.Errorf("write bindings snapshot: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *SyncService) writeBindingsSnapshot(ctx context.Context, pipe redis.Pipeliner, bindings []model.Binding, groups []model.ProviderGroup, apiKeys []model.APIKey) error {
|
|
type groupLite struct {
|
|
id uint
|
|
name string
|
|
ptype string
|
|
baseURL string
|
|
googleProject string
|
|
googleLoc string
|
|
models []string
|
|
status string
|
|
}
|
|
groupsByID := make(map[uint]groupLite, len(groups))
|
|
for _, g := range groups {
|
|
rawModels := strings.Split(g.Models, ",")
|
|
var outModels []string
|
|
for _, m := range rawModels {
|
|
m = strings.TrimSpace(m)
|
|
if m != "" {
|
|
outModels = append(outModels, m)
|
|
}
|
|
}
|
|
groupsByID[g.ID] = groupLite{
|
|
id: g.ID,
|
|
name: groupx.Normalize(g.Name),
|
|
ptype: strings.TrimSpace(g.Type),
|
|
baseURL: strings.TrimSpace(g.BaseURL),
|
|
googleProject: strings.TrimSpace(g.GoogleProject),
|
|
googleLoc: strings.TrimSpace(g.GoogleLocation),
|
|
models: outModels,
|
|
status: normalizeStatus(g.Status),
|
|
}
|
|
}
|
|
|
|
type apiKeyLite struct {
|
|
id uint
|
|
groupID uint
|
|
status string
|
|
weight int
|
|
autoBan bool
|
|
banUntil *time.Time
|
|
}
|
|
keysByGroup := make(map[uint][]apiKeyLite)
|
|
for _, k := range apiKeys {
|
|
keysByGroup[k.GroupID] = append(keysByGroup[k.GroupID], apiKeyLite{
|
|
id: k.ID,
|
|
groupID: k.GroupID,
|
|
status: normalizeStatus(k.Status),
|
|
weight: k.Weight,
|
|
autoBan: k.AutoBan,
|
|
banUntil: k.BanUntil,
|
|
})
|
|
}
|
|
|
|
type bindingAgg struct {
|
|
snap routing.BindingSnapshot
|
|
}
|
|
snaps := make(map[string]*routing.BindingSnapshot)
|
|
now := time.Now()
|
|
nowUnix := now.Unix()
|
|
version := now.UnixNano()
|
|
|
|
for _, b := range bindings {
|
|
if strings.TrimSpace(b.Status) != "" && strings.TrimSpace(b.Status) != "active" {
|
|
continue
|
|
}
|
|
ns := strings.TrimSpace(b.Namespace)
|
|
pm := strings.TrimSpace(b.PublicModel)
|
|
if ns == "" || pm == "" {
|
|
continue
|
|
}
|
|
group, ok := groupsByID[b.GroupID]
|
|
if !ok {
|
|
continue
|
|
}
|
|
if group.status != "" && group.status != "active" {
|
|
continue
|
|
}
|
|
|
|
key := ns + "." + pm
|
|
snap := snaps[key]
|
|
if snap == nil {
|
|
snap = &routing.BindingSnapshot{
|
|
Namespace: ns,
|
|
PublicModel: pm,
|
|
Status: "active",
|
|
UpdatedAt: nowUnix,
|
|
}
|
|
snaps[key] = snap
|
|
}
|
|
|
|
candidate := routing.BindingCandidate{
|
|
GroupID: group.id,
|
|
RouteGroup: group.name,
|
|
Weight: normalizeWeight(b.Weight),
|
|
SelectorType: strings.TrimSpace(b.SelectorType),
|
|
SelectorValue: strings.TrimSpace(b.SelectorValue),
|
|
Status: "active",
|
|
Upstreams: make(map[string]string),
|
|
}
|
|
|
|
selectorType := strings.TrimSpace(b.SelectorType)
|
|
selectorValue := strings.TrimSpace(b.SelectorValue)
|
|
keys := keysByGroup[b.GroupID]
|
|
if len(keys) == 0 {
|
|
candidate.Error = "no_provider"
|
|
}
|
|
|
|
for _, k := range keys {
|
|
if k.status != "" && k.status != "active" {
|
|
continue
|
|
}
|
|
if k.banUntil != nil && k.banUntil.UTC().Unix() > nowUnix {
|
|
continue
|
|
}
|
|
up, err := routing.ResolveUpstreamModel(routing.SelectorType(selectorType), selectorValue, pm, group.models)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
candidate.Upstreams[fmt.Sprintf("%d", k.id)] = up
|
|
}
|
|
if len(candidate.Upstreams) == 0 && candidate.Error == "" {
|
|
candidate.Error = "config_error"
|
|
}
|
|
|
|
snap.Candidates = append(snap.Candidates, candidate)
|
|
}
|
|
|
|
for key, snap := range snaps {
|
|
payload, err := jsoncodec.Marshal(snap)
|
|
if err != nil {
|
|
return fmt.Errorf("marshal config:bindings:%s: %w", key, err)
|
|
}
|
|
pipe.HSet(ctx, "config:bindings", key, payload)
|
|
}
|
|
|
|
meta := map[string]string{
|
|
"version": fmt.Sprintf("%d", version),
|
|
"updated_at": fmt.Sprintf("%d", nowUnix),
|
|
"source": "cp_builtin",
|
|
}
|
|
if err := pipe.HSet(ctx, "meta:bindings_meta", meta).Err(); err != nil {
|
|
return fmt.Errorf("write meta:bindings_meta: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *SyncService) hsetJSON(ctx context.Context, key, field string, val interface{}) error {
|
|
payload, err := jsoncodec.Marshal(val)
|
|
if err != nil {
|
|
return fmt.Errorf("marshal %s:%s: %w", key, field, err)
|
|
}
|
|
if err := s.rdb.HSet(ctx, key, field, payload).Err(); err != nil {
|
|
return fmt.Errorf("write %s:%s: %w", key, field, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *SyncService) handleSyncError(err error, entry SyncOutboxEntry) error {
|
|
if err == nil {
|
|
return nil
|
|
}
|
|
if s == nil || s.outbox == nil || !s.outbox.Enabled() {
|
|
return err
|
|
}
|
|
if enqueueErr := s.outbox.Enqueue(entry); enqueueErr != nil {
|
|
return fmt.Errorf("sync failed: %w (outbox enqueue failed: %v)", err, enqueueErr)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// normalizeWeight returns weight when it is positive and 1 otherwise, so zero
// or negative weights become 1.
func normalizeWeight(weight int) int {
	if weight > 0 {
		return weight
	}
	return 1
}
|
|
|
|
// normalizeStatus canonicalizes a status string: surrounding whitespace is
// removed and the result is lower-cased. A blank status is treated as
// "active". Every other value is returned as normalized, whether or not it is
// one of the known statuses "active", "auto_disabled" and "manual_disabled".
func normalizeStatus(status string) string {
	st := strings.ToLower(strings.TrimSpace(status))
	if st == "" {
		return "active"
	}
	return st
}
|
|
|
|
// unixOrZero converts an optional timestamp to unix seconds, returning 0 when
// t is nil.
func unixOrZero(t *time.Time) int64 {
	var secs int64
	if t != nil {
		secs = t.UTC().Unix()
	}
	return secs
}
|
|
|
|
func (s *SyncService) refreshModelsMetaFromRedis(ctx context.Context, source string) error {
|
|
raw, err := s.rdb.HGetAll(ctx, "meta:models").Result()
|
|
if err != nil {
|
|
return fmt.Errorf("read meta:models: %w", err)
|
|
}
|
|
now := time.Now().Unix()
|
|
meta := map[string]interface{}{
|
|
"version": fmt.Sprintf("%d", now),
|
|
"updated_at": fmt.Sprintf("%d", now),
|
|
"source": source,
|
|
"checksum": modelcap.ChecksumFromPayloads(raw),
|
|
}
|
|
if err := s.rdb.HSet(ctx, "meta:models_meta", meta).Err(); err != nil {
|
|
return fmt.Errorf("write meta:models_meta: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func writeModelsMeta(ctx context.Context, pipe redis.Pipeliner, meta modelcap.Meta) error {
|
|
fields := map[string]string{
|
|
"version": strings.TrimSpace(meta.Version),
|
|
"updated_at": strings.TrimSpace(meta.UpdatedAt),
|
|
"source": strings.TrimSpace(meta.Source),
|
|
"checksum": strings.TrimSpace(meta.Checksum),
|
|
"upstream_url": strings.TrimSpace(meta.UpstreamURL),
|
|
"upstream_ref": strings.TrimSpace(meta.UpstreamRef),
|
|
}
|
|
for k, v := range fields {
|
|
if v == "" {
|
|
delete(fields, k)
|
|
}
|
|
}
|
|
if fields["version"] == "" {
|
|
fields["version"] = fmt.Sprintf("%d", time.Now().Unix())
|
|
}
|
|
if fields["updated_at"] == "" {
|
|
fields["updated_at"] = fmt.Sprintf("%d", time.Now().Unix())
|
|
}
|
|
if fields["source"] == "" {
|
|
fields["source"] = "db"
|
|
}
|
|
if fields["checksum"] == "" {
|
|
fields["checksum"] = "unknown"
|
|
}
|
|
if err := pipe.HSet(ctx, "meta:models_meta", fields).Err(); err != nil {
|
|
return fmt.Errorf("write meta:models_meta: %w", err)
|
|
}
|
|
return nil
|
|
}
|