Files
ez-api/internal/service/sync.go

260 lines
6.9 KiB
Go

package service
import (
"context"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"fmt"
"strings"
"github.com/ez-api/ez-api/internal/model"
"github.com/redis/go-redis/v9"
"gorm.io/gorm"
)
// SyncService mirrors key, provider, and model records into Redis.
// Every method in this file writes through the single client held in rdb.
type SyncService struct {
// rdb is the Redis client used for all hash and set writes.
rdb *redis.Client
}
// NewSyncService returns a SyncService that performs all of its Redis
// operations through rdb. The client is stored as given; it is not copied
// or validated here.
func NewSyncService(rdb *redis.Client) *SyncService {
	svc := new(SyncService)
	svc.rdb = rdb
	return svc
}
// SyncKey publishes one key to Redis without rebuilding the whole snapshot.
//
// Two writes are issued, in order:
//  1. the JSON snapshot, stored in the "config:keys" hash under the token hash;
//  2. the flat "auth:token:{hash}" hash carrying status, group, weight and balance.
//
// The raw key secret is never written; only its SHA-256 hex digest is used.
// The first failing write is returned and the second is not attempted.
func (s *SyncService) SyncKey(key *model.Key) error {
	var (
		ctx    = context.Background()
		digest = hashToken(key.KeySecret)
		group  = normalizeGroup(key.Group)
	)

	record := keySnapshot{
		ID:        key.ID,
		TokenHash: digest,
		Group:     group,
		Status:    key.Status,
		Weight:    key.Weight,
		Balance:   key.Balance,
	}
	if err := s.hsetJSON(ctx, "config:keys", digest, record); err != nil {
		return err
	}

	// Same four fields as the snapshot, passed as field/value pairs.
	authKey := fmt.Sprintf("auth:token:%s", digest)
	err := s.rdb.HSet(ctx, authKey,
		"status", record.Status,
		"group", record.Group,
		"weight", record.Weight,
		"balance", record.Balance,
	).Err()
	if err != nil {
		return fmt.Errorf("write auth token: %w", err)
	}
	return nil
}
// SyncProvider writes a single provider into the "config:providers" hash and
// adds its ID to the routing set of every model it serves
// ("route:group:{group}:{model}" -> Set(provider_id)).
//
// The comma-separated provider.Models string is split once; each name is
// trimmed and empty entries are dropped, so the model list stored in the
// snapshot matches the names used in the routing keys exactly.
//
// Routing updates are additive: models removed from a provider are NOT taken
// out of their routing sets here. Removing them requires a full sync or
// smarter diffing logic.
//
// Returns the error from the config write, or a wrapped error if the routing
// pipeline fails.
func (s *SyncService) SyncProvider(provider *model.Provider) error {
	ctx := context.Background()
	group := normalizeGroup(provider.Group)

	// Normalise model names up front so the snapshot and the routes agree.
	// A non-nil slice keeps the JSON encoding as [] rather than null.
	parts := strings.Split(provider.Models, ",")
	models := make([]string, 0, len(parts))
	for _, name := range parts {
		if name = strings.TrimSpace(name); name != "" {
			models = append(models, name)
		}
	}

	snap := providerSnapshot{
		ID:      provider.ID,
		Name:    provider.Name,
		Type:    provider.Type,
		BaseURL: provider.BaseURL,
		APIKey:  provider.APIKey,
		Group:   group,
		Models:  models,
	}

	// 1. Update provider config.
	if err := s.hsetJSON(ctx, "config:providers", fmt.Sprintf("%d", provider.ID), snap); err != nil {
		return err
	}

	// 2. Update routing table. Nothing to send when the provider lists no models.
	if len(models) == 0 {
		return nil
	}
	pipe := s.rdb.Pipeline()
	for _, name := range models {
		routeKey := fmt.Sprintf("route:group:%s:%s", group, name)
		pipe.SAdd(ctx, routeKey, provider.ID)
	}
	if _, err := pipe.Exec(ctx); err != nil {
		return fmt.Errorf("write routes: %w", err)
	}
	return nil
}
// SyncModel stores the metadata of one model as JSON in the "meta:models"
// hash, using the model name as the hash field. Any marshal or write error
// from hsetJSON is returned unchanged.
func (s *SyncService) SyncModel(m *model.Model) error {
	record := modelSnapshot{
		Name:               m.Name,
		ContextWindow:      m.ContextWindow,
		CostPerToken:       m.CostPerToken,
		SupportsVision:     m.SupportsVision,
		SupportsFunction:   m.SupportsFunctions,
		SupportsToolChoice: m.SupportsToolChoice,
		SupportsFIM:        m.SupportsFIM,
		MaxOutputTokens:    m.MaxOutputTokens,
	}
	return s.hsetJSON(context.Background(), "meta:models", record.Name, record)
}
// providerSnapshot is the JSON payload stored per provider in the
// "config:providers" hash, keyed by the provider's decimal ID.
type providerSnapshot struct {
ID uint `json:"id"`
Name string `json:"name"`
Type string `json:"type"`
BaseURL string `json:"base_url"`
// APIKey is copied from the provider record as-is (it is not hashed).
APIKey string `json:"api_key"`
// Group is the provider group after normalizeGroup has been applied.
Group string `json:"group"`
// Models lists the model names taken from the provider's comma-separated
// Models string.
Models []string `json:"models"`
}
// keySnapshot is the JSON payload stored per key in the "config:keys" hash,
// keyed by TokenHash. The raw key secret is not part of the snapshot.
type keySnapshot struct {
ID uint `json:"id"`
// TokenHash is the hex-encoded SHA-256 digest of the key secret (see hashToken).
TokenHash string `json:"token_hash"`
// Group is the key group after normalizeGroup has been applied.
Group string `json:"group"`
Status string `json:"status"`
Weight int `json:"weight"`
Balance float64 `json:"balance"`
}
// modelSnapshot is the JSON payload stored per model in the "meta:models"
// hash, keyed by Name.
type modelSnapshot struct {
Name string `json:"name"`
ContextWindow int `json:"context_window"`
CostPerToken float64 `json:"cost_per_token"`
SupportsVision bool `json:"supports_vision"`
// SupportsFunction is filled from the model record's SupportsFunctions field
// and serialized as "supports_functions".
SupportsFunction bool `json:"supports_functions"`
SupportsToolChoice bool `json:"supports_tool_choice"`
SupportsFIM bool `json:"supports_fim"`
MaxOutputTokens int `json:"max_output_tokens"`
}
// SyncAll rebuilds the Redis snapshot from the database; use it for cold
// starts or forced refreshes.
//
// It loads every provider, key, and model through db and then, in one
// transactional pipeline:
//   - deletes and refills the "config:providers", "config:keys" and
//     "meta:models" hashes;
//   - deletes every existing "route:group:*" set and re-adds the routes of
//     the providers currently in the database;
//   - rewrites the status/group/weight/balance fields of each key's
//     "auth:token:{hash}" hash.
//
// The existing routing sets are discovered with SCAN before the transaction
// is queued, so routes belonging to providers or models that have since been
// removed do not survive the rebuild.
//
// Provider model lists are split on commas, trimmed, and stripped of empty
// entries once, so the stored snapshot and the routing keys use the same names.
//
// NOTE(review): "auth:token:*" hashes of keys that no longer exist in the
// database are not removed here; confirm whether they need the same cleanup.
//
// Returns a wrapped error if a database load, the route scan, JSON
// marshalling, or the pipeline execution fails.
func (s *SyncService) SyncAll(db *gorm.DB) error {
	ctx := context.Background()

	var providers []model.Provider
	if err := db.Find(&providers).Error; err != nil {
		return fmt.Errorf("load providers: %w", err)
	}
	var keys []model.Key
	if err := db.Find(&keys).Error; err != nil {
		return fmt.Errorf("load keys: %w", err)
	}
	var modelRows []model.Model
	if err := db.Find(&modelRows).Error; err != nil {
		return fmt.Errorf("load models: %w", err)
	}

	// Collect the routing sets that currently exist. SCAN cannot run inside
	// the transaction, so the names are gathered first and deleted below.
	var staleRoutes []string
	iter := s.rdb.Scan(ctx, 0, "route:group:*", 0).Iterator()
	for iter.Next(ctx) {
		staleRoutes = append(staleRoutes, iter.Val())
	}
	if err := iter.Err(); err != nil {
		return fmt.Errorf("scan routes: %w", err)
	}

	pipe := s.rdb.TxPipeline()
	pipe.Del(ctx, "config:providers", "config:keys", "meta:models")
	if len(staleRoutes) > 0 {
		// DEL rejects an empty key list, hence the guard.
		pipe.Del(ctx, staleRoutes...)
	}

	for _, p := range providers {
		group := normalizeGroup(p.Group)

		// Normalise model names once; a non-nil slice keeps the JSON as [].
		parts := strings.Split(p.Models, ",")
		names := make([]string, 0, len(parts))
		for _, name := range parts {
			if name = strings.TrimSpace(name); name != "" {
				names = append(names, name)
			}
		}

		snap := providerSnapshot{
			ID:      p.ID,
			Name:    p.Name,
			Type:    p.Type,
			BaseURL: p.BaseURL,
			APIKey:  p.APIKey,
			Group:   group,
			Models:  names,
		}
		payload, err := json.Marshal(snap)
		if err != nil {
			return fmt.Errorf("marshal provider %d: %w", p.ID, err)
		}
		pipe.HSet(ctx, "config:providers", fmt.Sprintf("%d", p.ID), payload)

		// Rebuild routing table: route:group:{group}:{model} -> Set(provider_id).
		for _, name := range names {
			routeKey := fmt.Sprintf("route:group:%s:%s", group, name)
			pipe.SAdd(ctx, routeKey, p.ID)
		}
	}

	for _, k := range keys {
		snap := keySnapshot{
			ID:        k.ID,
			TokenHash: hashToken(k.KeySecret),
			Group:     normalizeGroup(k.Group),
			Status:    k.Status,
			Weight:    k.Weight,
			Balance:   k.Balance,
		}
		payload, err := json.Marshal(snap)
		if err != nil {
			return fmt.Errorf("marshal key %d: %w", k.ID, err)
		}
		pipe.HSet(ctx, "config:keys", snap.TokenHash, payload)
		pipe.HSet(ctx, fmt.Sprintf("auth:token:%s", snap.TokenHash), map[string]interface{}{
			"status":  snap.Status,
			"group":   snap.Group,
			"weight":  snap.Weight,
			"balance": snap.Balance,
		})
	}

	for _, m := range modelRows {
		snap := modelSnapshot{
			Name:               m.Name,
			ContextWindow:      m.ContextWindow,
			CostPerToken:       m.CostPerToken,
			SupportsVision:     m.SupportsVision,
			SupportsFunction:   m.SupportsFunctions,
			SupportsToolChoice: m.SupportsToolChoice,
			SupportsFIM:        m.SupportsFIM,
			MaxOutputTokens:    m.MaxOutputTokens,
		}
		payload, err := json.Marshal(snap)
		if err != nil {
			return fmt.Errorf("marshal model %s: %w", m.Name, err)
		}
		pipe.HSet(ctx, "meta:models", snap.Name, payload)
	}

	if _, err := pipe.Exec(ctx); err != nil {
		return fmt.Errorf("write snapshots: %w", err)
	}
	return nil
}
// hsetJSON marshals val to JSON and stores it in the Redis hash key under
// field. Marshal and write failures are wrapped with the "key:field" pair so
// the failing entry can be identified.
func (s *SyncService) hsetJSON(ctx context.Context, key, field string, val interface{}) error {
	encoded, marshalErr := json.Marshal(val)
	if marshalErr != nil {
		return fmt.Errorf("marshal %s:%s: %w", key, field, marshalErr)
	}

	writeErr := s.rdb.HSet(ctx, key, field, encoded).Err()
	if writeErr == nil {
		return nil
	}
	return fmt.Errorf("write %s:%s: %w", key, field, writeErr)
}
// hashToken returns the lowercase hex encoding of the SHA-256 digest of token.
func hashToken(token string) string {
	digest := sha256.Sum256([]byte(token))
	return hex.EncodeToString(digest[:])
}
// normalizeGroup maps an empty or whitespace-only group name to "default".
// Any other value is returned exactly as given; surrounding whitespace is
// not stripped.
func normalizeGroup(group string) string {
	if len(strings.TrimSpace(group)) > 0 {
		return group
	}
	return "default"
}