| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140 |
- package service
- import (
- "context"
- "fmt"
- "strings"
- "sync"
- "sync/atomic"
- "time"
- "github.com/QuantumNous/new-api/common"
- "github.com/QuantumNous/new-api/constant"
- "github.com/QuantumNous/new-api/logger"
- "github.com/QuantumNous/new-api/model"
- "github.com/bytedance/gopkg/util/gopool"
- )
- const (
- codexCredentialRefreshTickInterval = 10 * time.Minute
- codexCredentialRefreshThreshold = 24 * time.Hour
- codexCredentialRefreshBatchSize = 200
- codexCredentialRefreshTimeout = 15 * time.Second
- )
- var (
- codexCredentialRefreshOnce sync.Once
- codexCredentialRefreshRunning atomic.Bool
- )
- func StartCodexCredentialAutoRefreshTask() {
- codexCredentialRefreshOnce.Do(func() {
- if !common.IsMasterNode {
- return
- }
- gopool.Go(func() {
- logger.LogInfo(context.Background(), fmt.Sprintf("codex credential auto-refresh task started: tick=%s threshold=%s", codexCredentialRefreshTickInterval, codexCredentialRefreshThreshold))
- ticker := time.NewTicker(codexCredentialRefreshTickInterval)
- defer ticker.Stop()
- runCodexCredentialAutoRefreshOnce()
- for range ticker.C {
- runCodexCredentialAutoRefreshOnce()
- }
- })
- })
- }
- func runCodexCredentialAutoRefreshOnce() {
- if !codexCredentialRefreshRunning.CompareAndSwap(false, true) {
- return
- }
- defer codexCredentialRefreshRunning.Store(false)
- ctx := context.Background()
- now := time.Now()
- var refreshed int
- var scanned int
- offset := 0
- for {
- var channels []*model.Channel
- err := model.DB.
- Select("id", "name", "key", "status", "channel_info").
- Where("type = ? AND status = 1", constant.ChannelTypeCodex).
- Order("id asc").
- Limit(codexCredentialRefreshBatchSize).
- Offset(offset).
- Find(&channels).Error
- if err != nil {
- logger.LogError(ctx, fmt.Sprintf("codex credential auto-refresh: query channels failed: %v", err))
- return
- }
- if len(channels) == 0 {
- break
- }
- offset += codexCredentialRefreshBatchSize
- for _, ch := range channels {
- if ch == nil {
- continue
- }
- scanned++
- if ch.ChannelInfo.IsMultiKey {
- continue
- }
- rawKey := strings.TrimSpace(ch.Key)
- if rawKey == "" {
- continue
- }
- oauthKey, err := parseCodexOAuthKey(rawKey)
- if err != nil {
- continue
- }
- refreshToken := strings.TrimSpace(oauthKey.RefreshToken)
- if refreshToken == "" {
- continue
- }
- expiredAtRaw := strings.TrimSpace(oauthKey.Expired)
- expiredAt, err := time.Parse(time.RFC3339, expiredAtRaw)
- if err == nil && !expiredAt.IsZero() && expiredAt.Sub(now) > codexCredentialRefreshThreshold {
- continue
- }
- refreshCtx, cancel := context.WithTimeout(ctx, codexCredentialRefreshTimeout)
- newKey, _, err := RefreshCodexChannelCredential(refreshCtx, ch.Id, CodexCredentialRefreshOptions{ResetCaches: false})
- cancel()
- if err != nil {
- logger.LogWarn(ctx, fmt.Sprintf("codex credential auto-refresh: channel_id=%d name=%s refresh failed: %v", ch.Id, ch.Name, err))
- continue
- }
- refreshed++
- logger.LogInfo(ctx, fmt.Sprintf("codex credential auto-refresh: channel_id=%d name=%s refreshed, expires_at=%s", ch.Id, ch.Name, newKey.Expired))
- }
- }
- if refreshed > 0 {
- func() {
- defer func() {
- if r := recover(); r != nil {
- logger.LogWarn(ctx, fmt.Sprintf("codex credential auto-refresh: InitChannelCache panic: %v", r))
- }
- }()
- model.InitChannelCache()
- }()
- ResetProxyClientCache()
- }
- if common.DebugEnabled {
- logger.LogDebug(ctx, "codex credential auto-refresh: scanned=%d refreshed=%d", scanned, refreshed)
- }
- }
|