Files
ez-api/internal/service/sync.go
zenfun 05cba292d4 refactor(service): remove legacy routing and unused code
Removes the legacy route table maintenance logic from the sync service
that populated deprecated Redis keys. Additionally, deletes the unused
TokenService and KeyDTO files to reduce technical debt.
2026-01-04 12:07:37 +08:00

842 lines
24 KiB
Go

package service
import (
"context"
"fmt"
"strings"
"time"
"github.com/ez-api/ez-api/internal/model"
groupx "github.com/ez-api/foundation/group"
"github.com/ez-api/foundation/jsoncodec"
"github.com/ez-api/foundation/modelcap"
"github.com/ez-api/foundation/routing"
"github.com/ez-api/foundation/tokenhash"
"github.com/redis/go-redis/v9"
"gorm.io/gorm"
)
// SyncService mirrors control-plane records into Redis hashes
// (auth:token:*, auth:master:*, config:providers, config:bindings, meta:models).
type SyncService struct {
// rdb is the Redis client every sync method writes through.
rdb *redis.Client
// outbox is optional; when set and enabled, handleSyncError enqueues failed
// syncs to it instead of returning the error.
outbox *SyncOutboxService
}
// NewSyncService returns a SyncService that writes through rdb.
// No outbox is attached; call SetOutbox to enable retry queuing.
func NewSyncService(rdb *redis.Client) *SyncService {
	svc := new(SyncService)
	svc.rdb = rdb
	return svc
}
// SetOutbox attaches the outbox that handleSyncError enqueues failed syncs to.
// Passing nil detaches it, so sync errors are returned to the caller again.
func (s *SyncService) SetOutbox(outbox *SyncOutboxService) {
	s.outbox = outbox
}
// SyncKey pushes one key's metadata to Redis without rebuilding the snapshot.
//
// A nil key, or a key for which no token hash can be derived, is rejected
// up front and never reaches the outbox. A failure from SyncKeyNow is handed
// to handleSyncError together with a "key"/"upsert" outbox entry.
func (s *SyncService) SyncKey(key *model.Key) error {
	if key == nil {
		return fmt.Errorf("key required")
	}

	hash := strings.TrimSpace(key.TokenHash)
	if hash == "" {
		// Backward compatibility: rows without a stored hash derive it from the secret.
		hash = strings.TrimSpace(tokenhash.HashToken(key.KeySecret))
	}
	if hash == "" {
		return fmt.Errorf("token hash missing for key %d", key.ID)
	}

	writeErr := s.SyncKeyNow(context.Background(), key)
	retry := SyncOutboxEntry{
		ResourceType: "key",
		Action:       "upsert",
		ResourceID:   &key.ID,
	}
	return s.handleSyncError(writeErr, retry)
}
// SyncKeyNow writes key metadata to the auth:token:<hash> Redis hash without
// outbox handling.
//
// The token hash is key.TokenHash with surrounding whitespace removed; when
// that is empty it is derived from key.KeySecret (backward compatibility).
// Trimming matches how SyncAllNow builds the same Redis key, so both paths
// always address the same hash. A nil ctx is replaced by context.Background().
//
// Returns an error when key is nil, when no token hash can be derived, or
// when the Redis write fails.
func (s *SyncService) SyncKeyNow(ctx context.Context, key *model.Key) error {
	if key == nil {
		return fmt.Errorf("key required")
	}
	if ctx == nil {
		ctx = context.Background()
	}

	tokenHash := strings.TrimSpace(key.TokenHash)
	if tokenHash == "" {
		tokenHash = strings.TrimSpace(tokenhash.HashToken(key.KeySecret)) // backward compatibility
	}
	if tokenHash == "" {
		return fmt.Errorf("token hash missing for key %d", key.ID)
	}

	fields := map[string]any{
		"id":                   key.ID,
		"master_id":            key.MasterID,
		"issued_at_epoch":      key.IssuedAtEpoch,
		"status":               key.Status,
		"group":                key.Group,
		"scopes":               key.Scopes,
		"default_namespace":    key.DefaultNamespace,
		"namespaces":           key.Namespaces,
		"model_limits":         strings.TrimSpace(key.ModelLimits),
		"model_limits_enabled": key.ModelLimitsEnabled,
		"expires_at":           unixOrZero(key.ExpiresAt),
		"allow_ips":            strings.TrimSpace(key.AllowIPs),
		"deny_ips":             strings.TrimSpace(key.DenyIPs),
		"last_accessed_at":     unixOrZero(key.LastAccessedAt),
		"request_count":        key.RequestCount,
		"used_tokens":          key.UsedTokens,
		"quota_limit":          key.QuotaLimit,
		"quota_used":           key.QuotaUsed,
		"quota_reset_at":       unixOrZero(key.QuotaResetAt),
		"quota_reset_type":     strings.TrimSpace(key.QuotaResetType),
	}
	if err := s.rdb.HSet(ctx, fmt.Sprintf("auth:token:%s", tokenHash), fields).Err(); err != nil {
		return fmt.Errorf("write auth token: %w", err)
	}
	return nil
}
// SyncMaster publishes a master's metadata to Redis, where the balancer reads
// it for validation. A failed write is passed to handleSyncError with a
// "master"/"upsert" outbox entry; a nil master is rejected immediately.
func (s *SyncService) SyncMaster(master *model.Master) error {
	if master == nil {
		return fmt.Errorf("master required")
	}

	writeErr := s.SyncMasterNow(context.Background(), master)
	retry := SyncOutboxEntry{
		ResourceType: "master",
		Action:       "upsert",
		ResourceID:   &master.ID,
	}
	return s.handleSyncError(writeErr, retry)
}
// SyncMasterNow stores epoch, status and global_qps in the auth:master:<id>
// Redis hash, with no outbox handling. A nil ctx falls back to
// context.Background(); a nil master is an error.
func (s *SyncService) SyncMasterNow(ctx context.Context, master *model.Master) error {
	if master == nil {
		return fmt.Errorf("master required")
	}
	if ctx == nil {
		ctx = context.Background()
	}

	fields := map[string]interface{}{
		"epoch":      master.Epoch,
		"status":     master.Status,
		"global_qps": master.GlobalQPS,
	}
	redisKey := fmt.Sprintf("auth:master:%d", master.ID)

	err := s.rdb.HSet(ctx, redisKey, fields).Err()
	if err == nil {
		return nil
	}
	return fmt.Errorf("write master metadata: %w", err)
}
// SyncProviders rebuilds the provider snapshot from the ProviderGroup and
// APIKey tables. On failure the outbox entry is tagged "snapshot"/"sync_providers".
func (s *SyncService) SyncProviders(db *gorm.DB) error {
	retry := SyncOutboxEntry{
		ResourceType: "snapshot",
		Action:       "sync_providers",
	}
	return s.syncProviders(db, retry)
}
// SyncProvidersForGroup rebuilds the provider snapshot; on failure the outbox
// entry carries the provider group ID as context ("provider_group"/"sync_providers").
func (s *SyncService) SyncProvidersForGroup(db *gorm.DB, groupID uint) error {
	retry := SyncOutboxEntry{
		ResourceType: "provider_group",
		Action:       "sync_providers",
		ResourceID:   &groupID,
	}
	return s.syncProviders(db, retry)
}
// SyncProvidersForAPIKey rebuilds the provider snapshot; on failure the outbox
// entry carries the API key ID as context ("api_key"/"sync_providers").
func (s *SyncService) SyncProvidersForAPIKey(db *gorm.DB, apiKeyID uint) error {
	retry := SyncOutboxEntry{
		ResourceType: "api_key",
		Action:       "sync_providers",
		ResourceID:   &apiKeyID,
	}
	return s.syncProviders(db, retry)
}
// syncProviders runs SyncProvidersNow with a background context and routes any
// failure through handleSyncError using the supplied outbox entry.
// A nil db is rejected directly and is never enqueued.
func (s *SyncService) syncProviders(db *gorm.DB, entry SyncOutboxEntry) error {
	if db == nil {
		return fmt.Errorf("db required")
	}
	rebuildErr := s.SyncProvidersNow(context.Background(), db)
	return s.handleSyncError(rebuildErr, entry)
}
// SyncProvidersNow rebuilds the config:providers hash without outbox handling.
//
// It loads every provider group and API key using ctx, then queues a delete
// of config:providers followed by the fresh entries on a single TxPipeline,
// so the replacement is sent by one Exec call. A nil ctx is replaced by
// context.Background().
//
// Returns an error when db is nil, a load fails, a snapshot cannot be
// marshalled, or the pipeline fails to execute.
func (s *SyncService) SyncProvidersNow(ctx context.Context, db *gorm.DB) error {
	if db == nil {
		return fmt.Errorf("db required")
	}
	if ctx == nil {
		ctx = context.Background()
	}
	// Bind ctx to the queries so cancellation and deadlines also cover the
	// database loads, not only the Redis writes.
	dbc := db.WithContext(ctx)

	var groups []model.ProviderGroup
	if err := dbc.Find(&groups).Error; err != nil {
		return fmt.Errorf("load provider groups: %w", err)
	}
	var apiKeys []model.APIKey
	if err := dbc.Find(&apiKeys).Error; err != nil {
		return fmt.Errorf("load api keys: %w", err)
	}

	pipe := s.rdb.TxPipeline()
	pipe.Del(ctx, "config:providers")
	if err := s.writeProvidersSnapshot(ctx, pipe, groups, apiKeys); err != nil {
		return err
	}
	if _, err := pipe.Exec(ctx); err != nil {
		return fmt.Errorf("write provider snapshot: %w", err)
	}
	return nil
}
// SyncModel publishes a single model metadata record. A nil model or a blank
// name is rejected immediately; a failure from SyncModelNow goes through
// handleSyncError with a "model"/"upsert" outbox entry.
func (s *SyncService) SyncModel(m *model.Model) error {
	switch {
	case m == nil:
		return fmt.Errorf("model required")
	case strings.TrimSpace(m.Name) == "":
		return fmt.Errorf("model name required")
	}

	writeErr := s.SyncModelNow(context.Background(), m)
	retry := SyncOutboxEntry{
		ResourceType: "model",
		Action:       "upsert",
		ResourceID:   &m.ID,
	}
	return s.handleSyncError(writeErr, retry)
}
// SyncModelNow stores one normalized model record in the meta:models hash and
// then recomputes meta:models_meta from Redis, with no outbox handling.
// A nil ctx falls back to context.Background().
func (s *SyncService) SyncModelNow(ctx context.Context, m *model.Model) error {
	switch {
	case m == nil:
		return fmt.Errorf("model required")
	case strings.TrimSpace(m.Name) == "":
		return fmt.Errorf("model name required")
	}
	if ctx == nil {
		ctx = context.Background()
	}

	raw := modelcap.Model{
		Name:               m.Name,
		Kind:               string(modelcap.NormalizeKind(m.Kind)),
		ContextWindow:      m.ContextWindow,
		CostPerToken:       m.CostPerToken,
		SupportsVision:     m.SupportsVision,
		SupportsFunction:   m.SupportsFunctions,
		SupportsToolChoice: m.SupportsToolChoice,
		SupportsFim:        m.SupportsFIM,
		MaxOutputTokens:    m.MaxOutputTokens,
	}
	snap := raw.Normalized()

	// The hash field is the normalized name, not the raw m.Name.
	if err := s.hsetJSON(ctx, "meta:models", snap.Name, snap); err != nil {
		return err
	}
	return s.refreshModelsMetaFromRedis(ctx, "db")
}
// SyncModelDelete removes a model's metadata from Redis and refreshes
// meta:models_meta. On failure the outbox entry ("model"/"delete") carries the
// trimmed model name in its payload.
func (s *SyncService) SyncModelDelete(m *model.Model) error {
	if m == nil {
		return fmt.Errorf("model required")
	}
	trimmed := strings.TrimSpace(m.Name)
	if trimmed == "" {
		return fmt.Errorf("model name required")
	}

	deleteErr := s.SyncModelDeleteNow(context.Background(), m)
	retry := SyncOutboxEntry{
		ResourceType: "model",
		Action:       "delete",
		ResourceID:   &m.ID,
		Payload: map[string]any{
			"name": trimmed,
		},
	}
	return s.handleSyncError(deleteErr, retry)
}
// SyncModelDeleteNow deletes the model's field (its trimmed name) from the
// meta:models hash and then recomputes meta:models_meta, with no outbox
// handling. A nil ctx falls back to context.Background().
func (s *SyncService) SyncModelDeleteNow(ctx context.Context, m *model.Model) error {
	if m == nil {
		return fmt.Errorf("model required")
	}
	field := strings.TrimSpace(m.Name)
	if field == "" {
		return fmt.Errorf("model name required")
	}
	if ctx == nil {
		ctx = context.Background()
	}

	delErr := s.rdb.HDel(ctx, "meta:models", field).Err()
	if delErr != nil {
		return fmt.Errorf("delete meta:models: %w", delErr)
	}
	return s.refreshModelsMetaFromRedis(ctx, "db")
}
// providerSnapshot is the JSON shape written to the "config:providers" Redis
// hash by writeProvidersSnapshot: one entry per API key, combining the key's
// own fields with those of its provider group.
type providerSnapshot struct {
// ID is the API key ID; the same value is used as the hash field name.
ID uint `json:"id"`
// Name is "<group name>#<api key id>".
Name string `json:"name"`
// Type and BaseURL are copied (trimmed) from the provider group.
Type string `json:"type"`
BaseURL string `json:"base_url"`
// APIKey, AccessToken, ExpiresAt, AccountID and ProjectID come from the API key row.
APIKey string `json:"api_key"`
AccessToken string `json:"access_token,omitempty"`
// ExpiresAt is unix seconds; left at 0 when the key has no expiry set.
ExpiresAt int64 `json:"expires_at,omitempty"`
AccountID string `json:"account_id,omitempty"`
ProjectID string `json:"project_id,omitempty"`
// GoogleProject, GoogleLocation, StaticHeaders and HeadersProfile come from the group.
GoogleProject string `json:"google_project,omitempty"`
GoogleLocation string `json:"google_location,omitempty"`
StaticHeaders string `json:"static_headers,omitempty"`
HeadersProfile string `json:"headers_profile,omitempty"`
// GroupID is the provider group's ID; Group is its normalized name ("default" when blank).
GroupID uint `json:"group_id,omitempty"`
Group string `json:"group"`
// Models is the group's comma-separated model list, split and trimmed.
Models []string `json:"models"`
Weight int `json:"weight,omitempty"`
// Status is the key's normalized status unless the group is not active,
// in which case the group's status is used.
Status string `json:"status"`
AutoBan bool `json:"auto_ban"`
BanReason string `json:"ban_reason,omitempty"`
BanUntil int64 `json:"ban_until,omitempty"` // unix seconds
}
// writeProvidersSnapshot queues one config:providers hash entry per API key on
// pipe. Keys whose group is not in groups are skipped. Nothing is sent here;
// the caller executes the pipeline.
//
// Returns an error only when a snapshot cannot be marshalled.
func (s *SyncService) writeProvidersSnapshot(ctx context.Context, pipe redis.Pipeliner, groups []model.ProviderGroup, apiKeys []model.APIKey) error {
	byID := make(map[uint]model.ProviderGroup, len(groups))
	for i := range groups {
		byID[groups[i].ID] = groups[i]
	}

	for i := range apiKeys {
		ak := &apiKeys[i]
		grp, known := byID[ak.GroupID]
		if !known {
			continue
		}

		routeGroup := groupx.Normalize(grp.Name)
		if strings.TrimSpace(routeGroup) == "" {
			routeGroup = "default"
		}

		// A non-active group overrides whatever status the key itself has.
		status := normalizeStatus(ak.Status)
		if gs := normalizeStatus(grp.Status); gs != "" && gs != "active" {
			status = gs
		}

		// Left nil (not an empty slice) when the group lists no models.
		var modelNames []string
		for _, part := range strings.Split(grp.Models, ",") {
			if trimmed := strings.TrimSpace(part); trimmed != "" {
				modelNames = append(modelNames, trimmed)
			}
		}

		label := strings.TrimSpace(grp.Name)
		if label == "" {
			label = routeGroup
		}

		snap := providerSnapshot{
			ID:             ak.ID,
			Name:           fmt.Sprintf("%s#%d", label, ak.ID),
			Type:           strings.TrimSpace(grp.Type),
			BaseURL:        strings.TrimSpace(grp.BaseURL),
			APIKey:         strings.TrimSpace(ak.APIKey),
			AccessToken:    strings.TrimSpace(ak.AccessToken),
			ExpiresAt:      unixOrZero(ak.ExpiresAt),
			AccountID:      strings.TrimSpace(ak.AccountID),
			ProjectID:      strings.TrimSpace(ak.ProjectID),
			GoogleProject:  strings.TrimSpace(grp.GoogleProject),
			GoogleLocation: strings.TrimSpace(grp.GoogleLocation),
			StaticHeaders:  strings.TrimSpace(grp.StaticHeaders),
			HeadersProfile: strings.TrimSpace(grp.HeadersProfile),
			GroupID:        grp.ID,
			Group:          routeGroup,
			Models:         modelNames,
			Weight:         ak.Weight,
			Status:         status,
			AutoBan:        ak.AutoBan,
			BanReason:      strings.TrimSpace(ak.BanReason),
			BanUntil:       unixOrZero(ak.BanUntil),
		}

		encoded, err := jsoncodec.Marshal(snap)
		if err != nil {
			return fmt.Errorf("marshal provider %d: %w", ak.ID, err)
		}
		pipe.HSet(ctx, "config:providers", fmt.Sprintf("%d", ak.ID), encoded)
	}
	return nil
}
// keySnapshot is no longer needed as we write directly to auth:token:*
// SyncAll rebuilds the Redis hashes from the database; intended for cold
// starts or forced refreshes. A failure from SyncAllNow goes through
// handleSyncError with a "snapshot"/"sync_all" outbox entry; a nil db is
// rejected directly.
func (s *SyncService) SyncAll(db *gorm.DB) error {
	if db == nil {
		return fmt.Errorf("db required")
	}
	rebuildErr := s.SyncAllNow(context.Background(), db)
	retry := SyncOutboxEntry{
		ResourceType: "snapshot",
		Action:       "sync_all",
	}
	return s.handleSyncError(rebuildErr, retry)
}
// SyncAllNow rebuilds every snapshot in Redis from the database without
// outbox handling.
//
// It loads provider groups, API keys, keys, masters, models and bindings
// using ctx, then queues on a single TxPipeline: deletion of the snapshot
// hashes and of all existing auth:master:* keys, followed by fresh provider,
// token, master, model, models-meta and binding entries. Everything is sent
// by one Exec call. A nil ctx is replaced by context.Background().
//
// Returns an error when db is nil, any load or the master-key scan fails, a
// key has no derivable token hash, a payload cannot be marshalled, or the
// pipeline fails to execute.
//
// NOTE(review): existing auth:token:* hashes are overwritten but never
// deleted here, so tokens for rows no longer in the database remain in
// Redis after a rebuild — confirm this is intended.
func (s *SyncService) SyncAllNow(ctx context.Context, db *gorm.DB) error {
	if db == nil {
		return fmt.Errorf("db required")
	}
	if ctx == nil {
		ctx = context.Background()
	}
	// Bind ctx to the queries so cancellation and deadlines also cover the
	// database loads, not only the Redis writes.
	dbc := db.WithContext(ctx)

	var groups []model.ProviderGroup
	if err := dbc.Find(&groups).Error; err != nil {
		return fmt.Errorf("load provider groups: %w", err)
	}
	var apiKeys []model.APIKey
	if err := dbc.Find(&apiKeys).Error; err != nil {
		return fmt.Errorf("load api keys: %w", err)
	}
	var keys []model.Key
	if err := dbc.Find(&keys).Error; err != nil {
		return fmt.Errorf("load keys: %w", err)
	}
	var masters []model.Master
	if err := dbc.Find(&masters).Error; err != nil {
		return fmt.Errorf("load masters: %w", err)
	}
	var models []model.Model
	if err := dbc.Find(&models).Error; err != nil {
		return fmt.Errorf("load models: %w", err)
	}
	var bindings []model.Binding
	if err := dbc.Find(&bindings).Error; err != nil {
		return fmt.Errorf("load bindings: %w", err)
	}

	pipe := s.rdb.TxPipeline()
	pipe.Del(ctx, "config:providers", "config:keys", "meta:models", "meta:models_meta", "config:bindings", "meta:bindings_meta")

	// Also clear master keys. The scan runs directly on the client, i.e.
	// outside the queued transaction; only the resulting delete is queued.
	var masterKeys []string
	iter := s.rdb.Scan(ctx, 0, "auth:master:*", 0).Iterator()
	for iter.Next(ctx) {
		masterKeys = append(masterKeys, iter.Val())
	}
	if err := iter.Err(); err != nil {
		return fmt.Errorf("scan master keys: %w", err)
	}
	if len(masterKeys) > 0 {
		pipe.Del(ctx, masterKeys...)
	}

	if err := s.writeProvidersSnapshot(ctx, pipe, groups, apiKeys); err != nil {
		return err
	}

	for _, k := range keys {
		tokenHash := strings.TrimSpace(k.TokenHash)
		if tokenHash == "" {
			tokenHash = tokenhash.HashToken(k.KeySecret) // fallback for legacy rows
		}
		if tokenHash == "" {
			return fmt.Errorf("token hash missing for key %d", k.ID)
		}
		pipe.HSet(ctx, fmt.Sprintf("auth:token:%s", tokenHash), map[string]any{
			"id":                   k.ID,
			"master_id":            k.MasterID,
			"issued_at_epoch":      k.IssuedAtEpoch,
			"status":               k.Status,
			"group":                k.Group,
			"scopes":               k.Scopes,
			"default_namespace":    k.DefaultNamespace,
			"namespaces":           k.Namespaces,
			"model_limits":         strings.TrimSpace(k.ModelLimits),
			"model_limits_enabled": k.ModelLimitsEnabled,
			"expires_at":           unixOrZero(k.ExpiresAt),
			"allow_ips":            strings.TrimSpace(k.AllowIPs),
			"deny_ips":             strings.TrimSpace(k.DenyIPs),
			"last_accessed_at":     unixOrZero(k.LastAccessedAt),
			"request_count":        k.RequestCount,
			"used_tokens":          k.UsedTokens,
			"quota_limit":          k.QuotaLimit,
			"quota_used":           k.QuotaUsed,
			"quota_reset_at":       unixOrZero(k.QuotaResetAt),
			"quota_reset_type":     strings.TrimSpace(k.QuotaResetType),
		})
	}

	for _, m := range masters {
		pipe.HSet(ctx, fmt.Sprintf("auth:master:%d", m.ID), map[string]any{
			"epoch":      m.Epoch,
			"status":     m.Status,
			"global_qps": m.GlobalQPS,
		})
	}

	// modelsPayloads stays nil when there are no models, so the checksum is
	// computed over a nil map in that case.
	var modelsPayloads map[string]string
	for _, m := range models {
		snap := modelcap.Model{
			Name:               m.Name,
			Kind:               string(modelcap.NormalizeKind(m.Kind)),
			ContextWindow:      m.ContextWindow,
			CostPerToken:       m.CostPerToken,
			SupportsVision:     m.SupportsVision,
			SupportsFunction:   m.SupportsFunctions,
			SupportsToolChoice: m.SupportsToolChoice,
			SupportsFim:        m.SupportsFIM,
			MaxOutputTokens:    m.MaxOutputTokens,
		}.Normalized()
		payload, err := jsoncodec.Marshal(snap)
		if err != nil {
			return fmt.Errorf("marshal model %s: %w", m.Name, err)
		}
		// Capture payloads so we can compute deterministic checksum for meta:models_meta.
		if modelsPayloads == nil {
			modelsPayloads = make(map[string]string, len(models))
		}
		modelsPayloads[snap.Name] = string(payload)
		pipe.HSet(ctx, "meta:models", snap.Name, payload)
	}

	now := time.Now().Unix()
	if err := writeModelsMeta(ctx, pipe, modelcap.Meta{
		Version:   fmt.Sprintf("%d", now),
		UpdatedAt: fmt.Sprintf("%d", now),
		Source:    "db",
		Checksum:  modelcap.ChecksumFromPayloads(modelsPayloads),
	}); err != nil {
		return err
	}

	if err := s.writeBindingsSnapshot(ctx, pipe, bindings, groups, apiKeys); err != nil {
		return err
	}

	if _, err := pipe.Exec(ctx); err != nil {
		return fmt.Errorf("write snapshots: %w", err)
	}
	return nil
}
// SyncBindings rebuilds the binding snapshot used for DP routing.
// It is deliberately a full rebuild so deletes and updates leave no stale
// entries. On failure the outbox entry is tagged "snapshot"/"sync_bindings".
func (s *SyncService) SyncBindings(db *gorm.DB) error {
	retry := SyncOutboxEntry{
		ResourceType: "snapshot",
		Action:       "sync_bindings",
	}
	return s.syncBindings(db, retry)
}
// SyncBindingsForGroup rebuilds the binding snapshot; on failure the outbox
// entry carries the provider group ID as context ("provider_group"/"sync_bindings").
func (s *SyncService) SyncBindingsForGroup(db *gorm.DB, groupID uint) error {
	retry := SyncOutboxEntry{
		ResourceType: "provider_group",
		Action:       "sync_bindings",
		ResourceID:   &groupID,
	}
	return s.syncBindings(db, retry)
}
// SyncBindingsForAPIKey rebuilds the binding snapshot; on failure the outbox
// entry carries the API key ID as context ("api_key"/"sync_bindings").
func (s *SyncService) SyncBindingsForAPIKey(db *gorm.DB, apiKeyID uint) error {
	retry := SyncOutboxEntry{
		ResourceType: "api_key",
		Action:       "sync_bindings",
		ResourceID:   &apiKeyID,
	}
	return s.syncBindings(db, retry)
}
// syncBindings runs SyncBindingsNow with a background context and routes any
// failure through handleSyncError using the supplied outbox entry.
// A nil db is rejected directly and is never enqueued.
func (s *SyncService) syncBindings(db *gorm.DB, entry SyncOutboxEntry) error {
	if db == nil {
		return fmt.Errorf("db required")
	}
	rebuildErr := s.SyncBindingsNow(context.Background(), db)
	return s.handleSyncError(rebuildErr, entry)
}
// SyncBindingsNow rebuilds config:bindings and meta:bindings_meta without
// outbox handling.
//
// It loads provider groups, API keys and bindings using ctx, then queues the
// delete of both hashes and the fresh snapshot on one TxPipeline, sent by a
// single Exec call. A nil ctx is replaced by context.Background().
//
// Returns an error when db is nil, a load fails, a snapshot cannot be
// marshalled, or the pipeline fails to execute.
func (s *SyncService) SyncBindingsNow(ctx context.Context, db *gorm.DB) error {
	if db == nil {
		return fmt.Errorf("db required")
	}
	if ctx == nil {
		ctx = context.Background()
	}
	// Bind ctx to the queries so cancellation and deadlines also cover the
	// database loads, not only the Redis writes.
	dbc := db.WithContext(ctx)

	var groups []model.ProviderGroup
	if err := dbc.Find(&groups).Error; err != nil {
		return fmt.Errorf("load provider groups: %w", err)
	}
	var apiKeys []model.APIKey
	if err := dbc.Find(&apiKeys).Error; err != nil {
		return fmt.Errorf("load api keys: %w", err)
	}
	var bindings []model.Binding
	if err := dbc.Find(&bindings).Error; err != nil {
		return fmt.Errorf("load bindings: %w", err)
	}

	pipe := s.rdb.TxPipeline()
	pipe.Del(ctx, "config:bindings", "meta:bindings_meta")
	if err := s.writeBindingsSnapshot(ctx, pipe, bindings, groups, apiKeys); err != nil {
		return err
	}
	if _, err := pipe.Exec(ctx); err != nil {
		return fmt.Errorf("write bindings snapshot: %w", err)
	}
	return nil
}
// writeBindingsSnapshot queues the routing snapshot on pipe: one
// config:bindings field per "<namespace>.<public model>" pair, plus the
// meta:bindings_meta hash (version, updated_at, source). Nothing is sent
// here; the caller executes the pipeline.
//
// A binding is skipped when its status is not active, when its namespace or
// public model is blank, or when its group is unknown or not active. Binding
// status goes through normalizeStatus, the same as group and key status, so
// the comparison is case-insensitive and blank means active.
//
// Each remaining binding becomes one candidate. Its Upstreams map holds an
// entry per API key of the group that is active and not currently banned.
// Candidate.Error is "no_provider" when the group has no API keys at all and
// "config_error" when keys exist but none produced an upstream.
//
// Returns an error when a snapshot cannot be marshalled.
func (s *SyncService) writeBindingsSnapshot(ctx context.Context, pipe redis.Pipeliner, bindings []model.Binding, groups []model.ProviderGroup, apiKeys []model.APIKey) error {
	// groupLite keeps only the provider-group fields read below.
	type groupLite struct {
		id     uint
		name   string
		models []string
		status string
	}
	groupsByID := make(map[uint]groupLite, len(groups))
	for _, g := range groups {
		var outModels []string
		for _, m := range strings.Split(g.Models, ",") {
			if m = strings.TrimSpace(m); m != "" {
				outModels = append(outModels, m)
			}
		}
		groupsByID[g.ID] = groupLite{
			id:     g.ID,
			name:   groupx.Normalize(g.Name),
			models: outModels,
			status: normalizeStatus(g.Status),
		}
	}

	// apiKeyLite keeps only the API-key fields read below.
	type apiKeyLite struct {
		id       uint
		status   string
		banUntil *time.Time
	}
	keysByGroup := make(map[uint][]apiKeyLite)
	for _, k := range apiKeys {
		keysByGroup[k.GroupID] = append(keysByGroup[k.GroupID], apiKeyLite{
			id:       k.ID,
			status:   normalizeStatus(k.Status),
			banUntil: k.BanUntil,
		})
	}

	now := time.Now()
	nowUnix := now.Unix()
	version := now.UnixNano()

	snaps := make(map[string]*routing.BindingSnapshot)
	for _, b := range bindings {
		if normalizeStatus(b.Status) != "active" {
			continue
		}
		ns := strings.TrimSpace(b.Namespace)
		pm := strings.TrimSpace(b.PublicModel)
		if ns == "" || pm == "" {
			continue
		}
		group, ok := groupsByID[b.GroupID]
		if !ok || group.status != "active" {
			continue
		}

		key := ns + "." + pm
		snap := snaps[key]
		if snap == nil {
			snap = &routing.BindingSnapshot{
				Namespace:   ns,
				PublicModel: pm,
				Status:      "active",
				UpdatedAt:   nowUnix,
			}
			snaps[key] = snap
		}

		selectorType := strings.TrimSpace(b.SelectorType)
		selectorValue := strings.TrimSpace(b.SelectorValue)
		candidate := routing.BindingCandidate{
			GroupID:       group.id,
			RouteGroup:    group.name,
			Weight:        normalizeWeight(b.Weight),
			SelectorType:  selectorType,
			SelectorValue: selectorValue,
			Status:        "active",
			Upstreams:     make(map[string]string),
		}

		keys := keysByGroup[b.GroupID]
		if len(keys) == 0 {
			candidate.Error = "no_provider"
		}

		// Collect the keys that may serve traffic: active and not inside a ban window.
		var usable []uint
		for _, k := range keys {
			if k.status != "" && k.status != "active" {
				continue
			}
			if k.banUntil != nil && k.banUntil.UTC().Unix() > nowUnix {
				continue
			}
			usable = append(usable, k.id)
		}

		// The upstream model depends only on the binding and its group, not on
		// the individual key, so resolve it once and share the result.
		if len(usable) > 0 {
			up, err := routing.ResolveUpstreamModel(routing.SelectorType(selectorType), selectorValue, pm, group.models)
			if err == nil {
				for _, id := range usable {
					candidate.Upstreams[fmt.Sprintf("%d", id)] = up
				}
			}
		}

		if len(candidate.Upstreams) == 0 && candidate.Error == "" {
			candidate.Error = "config_error"
		}
		snap.Candidates = append(snap.Candidates, candidate)
	}

	for key, snap := range snaps {
		payload, err := jsoncodec.Marshal(snap)
		if err != nil {
			return fmt.Errorf("marshal config:bindings:%s: %w", key, err)
		}
		pipe.HSet(ctx, "config:bindings", key, payload)
	}

	meta := map[string]string{
		"version":    fmt.Sprintf("%d", version),
		"updated_at": fmt.Sprintf("%d", nowUnix),
		"source":     "cp_builtin",
	}
	// NOTE(review): this command is only queued on the pipeline; write
	// failures are expected to surface from the caller's Exec — confirm.
	if err := pipe.HSet(ctx, "meta:bindings_meta", meta).Err(); err != nil {
		return fmt.Errorf("write meta:bindings_meta: %w", err)
	}
	return nil
}
// hsetJSON marshals val with jsoncodec and stores the result in field of the
// Redis hash key. Marshal and write failures are wrapped with "key:field".
func (s *SyncService) hsetJSON(ctx context.Context, key, field string, val interface{}) error {
	encoded, marshalErr := jsoncodec.Marshal(val)
	if marshalErr != nil {
		return fmt.Errorf("marshal %s:%s: %w", key, field, marshalErr)
	}

	writeErr := s.rdb.HSet(ctx, key, field, encoded).Err()
	if writeErr != nil {
		return fmt.Errorf("write %s:%s: %w", key, field, writeErr)
	}
	return nil
}
// handleSyncError decides what a caller sees after a sync attempt.
//
//   - err == nil: returns nil.
//   - no usable outbox (nil receiver, nil outbox, or outbox disabled): returns err.
//   - outbox usable: enqueues entry and returns nil, so the original failure is
//     swallowed in favour of a later retry; if enqueuing also fails, both
//     errors are reported, with err wrapped.
func (s *SyncService) handleSyncError(err error, entry SyncOutboxEntry) error {
	if err == nil {
		return nil
	}

	canQueue := s != nil && s.outbox != nil && s.outbox.Enabled()
	if !canQueue {
		return err
	}

	enqueueErr := s.outbox.Enqueue(entry)
	if enqueueErr == nil {
		return nil
	}
	return fmt.Errorf("sync failed: %w (outbox enqueue failed: %v)", err, enqueueErr)
}
// normalizeWeight returns weight unchanged when it is positive and 1 otherwise,
// so zero or negative weights are treated as the minimum weight.
func normalizeWeight(weight int) int {
	if weight > 0 {
		return weight
	}
	return 1
}
// normalizeStatus lower-cases and trims status. A blank status is treated as
// "active"; every other value — including ones outside the usual set
// "active", "auto_disabled", "manual_disabled" — is returned as normalized.
// The result is therefore never empty.
func normalizeStatus(status string) string {
	st := strings.ToLower(strings.TrimSpace(status))
	if st == "" {
		return "active"
	}
	return st
}
// unixOrZero converts an optional timestamp to unix seconds, mapping nil to 0.
func unixOrZero(t *time.Time) int64 {
	if t != nil {
		return t.UTC().Unix()
	}
	return 0
}
// refreshModelsMetaFromRedis rewrites meta:models_meta from the current
// contents of the meta:models hash: version and updated_at are set to the
// current unix second, source to the given value, and checksum to the
// checksum of the stored payloads. The read and the write are two separate
// Redis calls.
func (s *SyncService) refreshModelsMetaFromRedis(ctx context.Context, source string) error {
	payloads, readErr := s.rdb.HGetAll(ctx, "meta:models").Result()
	if readErr != nil {
		return fmt.Errorf("read meta:models: %w", readErr)
	}

	stamp := fmt.Sprintf("%d", time.Now().Unix())
	meta := map[string]interface{}{
		"version":    stamp,
		"updated_at": stamp,
		"source":     source,
		"checksum":   modelcap.ChecksumFromPayloads(payloads),
	}

	writeErr := s.rdb.HSet(ctx, "meta:models_meta", meta).Err()
	if writeErr != nil {
		return fmt.Errorf("write meta:models_meta: %w", writeErr)
	}
	return nil
}
// writeModelsMeta queues the meta:models_meta hash on pipe.
//
// Every value is trimmed. Blank required fields fall back to defaults:
// version and updated_at to the current unix second (one shared clock read,
// so the two defaults always agree), source to "db", and checksum to
// "unknown". The optional upstream_url and upstream_ref fields are written
// only when non-blank.
func writeModelsMeta(ctx context.Context, pipe redis.Pipeliner, meta modelcap.Meta) error {
	now := fmt.Sprintf("%d", time.Now().Unix())

	fields := make(map[string]string, 6)
	// put stores the trimmed value, or the fallback when the value is blank;
	// an empty fallback means the field is optional and is left out.
	put := func(name, value, fallback string) {
		v := strings.TrimSpace(value)
		if v == "" {
			v = fallback
		}
		if v != "" {
			fields[name] = v
		}
	}
	put("version", meta.Version, now)
	put("updated_at", meta.UpdatedAt, now)
	put("source", meta.Source, "db")
	put("checksum", meta.Checksum, "unknown")
	put("upstream_url", meta.UpstreamURL, "")
	put("upstream_ref", meta.UpstreamRef, "")

	// NOTE(review): this command is only queued on the pipeline; write
	// failures are expected to surface from the caller's Exec — confirm.
	if err := pipe.HSet(ctx, "meta:models_meta", fields).Err(); err != nil {
		return fmt.Errorf("write meta:models_meta: %w", err)
	}
	return nil
}