codex_credential_refresh_task.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. package service
  2. import (
  3. "context"
  4. "fmt"
  5. "strings"
  6. "sync"
  7. "sync/atomic"
  8. "time"
  9. "github.com/QuantumNous/new-api/common"
  10. "github.com/QuantumNous/new-api/constant"
  11. "github.com/QuantumNous/new-api/logger"
  12. "github.com/QuantumNous/new-api/model"
  13. "github.com/bytedance/gopkg/util/gopool"
  14. )
  // Tuning values for the codex credential auto-refresh task.
  const (
  	// codexCredentialRefreshBatchSize is the maximum number of channel rows
  	// loaded by a single query while scanning.
  	codexCredentialRefreshBatchSize = 200

  	// codexCredentialRefreshTickInterval is the period of the ticker that
  	// triggers a refresh pass.
  	codexCredentialRefreshTickInterval = 10 * time.Minute

  	// codexCredentialRefreshThreshold is the remaining-lifetime cutoff: a
  	// credential whose expiry is further away than this is skipped.
  	codexCredentialRefreshThreshold = 24 * time.Hour

  	// codexCredentialRefreshTimeout bounds the context handed to each
  	// per-channel refresh call.
  	codexCredentialRefreshTimeout = 15 * time.Second
  )
  // codexCredentialRefreshOnce ensures the start-up logic of the auto-refresh
  // task is executed at most once per process.
  var codexCredentialRefreshOnce sync.Once

  // codexCredentialRefreshRunning is set while a refresh pass is in progress so
  // that a second pass cannot overlap with it.
  var codexCredentialRefreshRunning atomic.Bool
  25. func StartCodexCredentialAutoRefreshTask() {
  26. codexCredentialRefreshOnce.Do(func() {
  27. if !common.IsMasterNode {
  28. return
  29. }
  30. gopool.Go(func() {
  31. logger.LogInfo(context.Background(), fmt.Sprintf("codex credential auto-refresh task started: tick=%s threshold=%s", codexCredentialRefreshTickInterval, codexCredentialRefreshThreshold))
  32. ticker := time.NewTicker(codexCredentialRefreshTickInterval)
  33. defer ticker.Stop()
  34. runCodexCredentialAutoRefreshOnce()
  35. for range ticker.C {
  36. runCodexCredentialAutoRefreshOnce()
  37. }
  38. })
  39. })
  40. }
  41. func runCodexCredentialAutoRefreshOnce() {
  42. if !codexCredentialRefreshRunning.CompareAndSwap(false, true) {
  43. return
  44. }
  45. defer codexCredentialRefreshRunning.Store(false)
  46. ctx := context.Background()
  47. now := time.Now()
  48. var refreshed int
  49. var scanned int
  50. offset := 0
  51. for {
  52. var channels []*model.Channel
  53. err := model.DB.
  54. Select("id", "name", "key", "status", "channel_info").
  55. Where("type = ? AND status = 1", constant.ChannelTypeCodex).
  56. Order("id asc").
  57. Limit(codexCredentialRefreshBatchSize).
  58. Offset(offset).
  59. Find(&channels).Error
  60. if err != nil {
  61. logger.LogError(ctx, fmt.Sprintf("codex credential auto-refresh: query channels failed: %v", err))
  62. return
  63. }
  64. if len(channels) == 0 {
  65. break
  66. }
  67. offset += codexCredentialRefreshBatchSize
  68. for _, ch := range channels {
  69. if ch == nil {
  70. continue
  71. }
  72. scanned++
  73. if ch.ChannelInfo.IsMultiKey {
  74. continue
  75. }
  76. rawKey := strings.TrimSpace(ch.Key)
  77. if rawKey == "" {
  78. continue
  79. }
  80. oauthKey, err := parseCodexOAuthKey(rawKey)
  81. if err != nil {
  82. continue
  83. }
  84. refreshToken := strings.TrimSpace(oauthKey.RefreshToken)
  85. if refreshToken == "" {
  86. continue
  87. }
  88. expiredAtRaw := strings.TrimSpace(oauthKey.Expired)
  89. expiredAt, err := time.Parse(time.RFC3339, expiredAtRaw)
  90. if err == nil && !expiredAt.IsZero() && expiredAt.Sub(now) > codexCredentialRefreshThreshold {
  91. continue
  92. }
  93. refreshCtx, cancel := context.WithTimeout(ctx, codexCredentialRefreshTimeout)
  94. newKey, _, err := RefreshCodexChannelCredential(refreshCtx, ch.Id, CodexCredentialRefreshOptions{ResetCaches: false})
  95. cancel()
  96. if err != nil {
  97. logger.LogWarn(ctx, fmt.Sprintf("codex credential auto-refresh: channel_id=%d name=%s refresh failed: %v", ch.Id, ch.Name, err))
  98. continue
  99. }
  100. refreshed++
  101. logger.LogInfo(ctx, fmt.Sprintf("codex credential auto-refresh: channel_id=%d name=%s refreshed, expires_at=%s", ch.Id, ch.Name, newKey.Expired))
  102. }
  103. }
  104. if refreshed > 0 {
  105. func() {
  106. defer func() {
  107. if r := recover(); r != nil {
  108. logger.LogWarn(ctx, fmt.Sprintf("codex credential auto-refresh: InitChannelCache panic: %v", r))
  109. }
  110. }()
  111. model.InitChannelCache()
  112. }()
  113. ResetProxyClientCache()
  114. }
  115. if common.DebugEnabled {
  116. logger.LogDebug(ctx, "codex credential auto-refresh: scanned=%d refreshed=%d", scanned, refreshed)
  117. }
  118. }