api_request.go 8.2 KB


  1. package channel
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "net/http"
  8. common2 "one-api/common"
  9. "one-api/logger"
  10. "one-api/relay/common"
  11. "one-api/relay/constant"
  12. "one-api/relay/helper"
  13. "one-api/service"
  14. "one-api/setting/operation_setting"
  15. "one-api/types"
  16. "sync"
  17. "time"
  18. "github.com/bytedance/gopkg/util/gopool"
  19. "github.com/gin-gonic/gin"
  20. "github.com/gorilla/websocket"
  21. )
  22. func SetupApiRequestHeader(info *common.RelayInfo, c *gin.Context, req *http.Header) {
  23. if info.RelayMode == constant.RelayModeAudioTranscription || info.RelayMode == constant.RelayModeAudioTranslation {
  24. // multipart/form-data
  25. } else if info.RelayMode == constant.RelayModeRealtime {
  26. // websocket
  27. } else {
  28. req.Set("Content-Type", c.Request.Header.Get("Content-Type"))
  29. req.Set("Accept", c.Request.Header.Get("Accept"))
  30. if info.IsStream && c.Request.Header.Get("Accept") == "" {
  31. req.Set("Accept", "text/event-stream")
  32. }
  33. }
  34. }
  35. func DoApiRequest(a Adaptor, c *gin.Context, info *common.RelayInfo, requestBody io.Reader) (*http.Response, error) {
  36. fullRequestURL, err := a.GetRequestURL(info)
  37. if err != nil {
  38. return nil, fmt.Errorf("get request url failed: %w", err)
  39. }
  40. if common2.DebugEnabled {
  41. println("fullRequestURL:", fullRequestURL)
  42. }
  43. req, err := http.NewRequest(c.Request.Method, fullRequestURL, requestBody)
  44. if err != nil {
  45. return nil, fmt.Errorf("new request failed: %w", err)
  46. }
  47. headers := req.Header
  48. headerOverride := make(map[string]string)
  49. for k, v := range info.HeadersOverride {
  50. if str, ok := v.(string); ok {
  51. headerOverride[k] = str
  52. } else {
  53. return nil, types.NewError(err, types.ErrorCodeChannelHeaderOverrideInvalid)
  54. }
  55. }
  56. for key, value := range headerOverride {
  57. headers.Set(key, value)
  58. }
  59. err = a.SetupRequestHeader(c, &headers, info)
  60. if err != nil {
  61. return nil, fmt.Errorf("setup request header failed: %w", err)
  62. }
  63. resp, err := doRequest(c, req, info)
  64. if err != nil {
  65. return nil, fmt.Errorf("do request failed: %w", err)
  66. }
  67. return resp, nil
  68. }
  69. func DoFormRequest(a Adaptor, c *gin.Context, info *common.RelayInfo, requestBody io.Reader) (*http.Response, error) {
  70. fullRequestURL, err := a.GetRequestURL(info)
  71. if err != nil {
  72. return nil, fmt.Errorf("get request url failed: %w", err)
  73. }
  74. if common2.DebugEnabled {
  75. println("fullRequestURL:", fullRequestURL)
  76. }
  77. req, err := http.NewRequest(c.Request.Method, fullRequestURL, requestBody)
  78. if err != nil {
  79. return nil, fmt.Errorf("new request failed: %w", err)
  80. }
  81. // set form data
  82. req.Header.Set("Content-Type", c.Request.Header.Get("Content-Type"))
  83. headers := req.Header
  84. headerOverride := make(map[string]string)
  85. for k, v := range info.HeadersOverride {
  86. if str, ok := v.(string); ok {
  87. headerOverride[k] = str
  88. } else {
  89. return nil, types.NewError(err, types.ErrorCodeChannelHeaderOverrideInvalid)
  90. }
  91. }
  92. for key, value := range headerOverride {
  93. headers.Set(key, value)
  94. }
  95. err = a.SetupRequestHeader(c, &headers, info)
  96. if err != nil {
  97. return nil, fmt.Errorf("setup request header failed: %w", err)
  98. }
  99. resp, err := doRequest(c, req, info)
  100. if err != nil {
  101. return nil, fmt.Errorf("do request failed: %w", err)
  102. }
  103. return resp, nil
  104. }
  105. func DoWssRequest(a Adaptor, c *gin.Context, info *common.RelayInfo, requestBody io.Reader) (*websocket.Conn, error) {
  106. fullRequestURL, err := a.GetRequestURL(info)
  107. if err != nil {
  108. return nil, fmt.Errorf("get request url failed: %w", err)
  109. }
  110. targetHeader := http.Header{}
  111. err = a.SetupRequestHeader(c, &targetHeader, info)
  112. if err != nil {
  113. return nil, fmt.Errorf("setup request header failed: %w", err)
  114. }
  115. targetHeader.Set("Content-Type", c.Request.Header.Get("Content-Type"))
  116. targetConn, _, err := websocket.DefaultDialer.Dial(fullRequestURL, targetHeader)
  117. if err != nil {
  118. return nil, fmt.Errorf("dial failed to %s: %w", fullRequestURL, err)
  119. }
  120. // send request body
  121. //all, err := io.ReadAll(requestBody)
  122. //err = service.WssString(c, targetConn, string(all))
  123. return targetConn, nil
  124. }
  125. func startPingKeepAlive(c *gin.Context, pingInterval time.Duration) context.CancelFunc {
  126. pingerCtx, stopPinger := context.WithCancel(context.Background())
  127. gopool.Go(func() {
  128. defer func() {
  129. // 增加panic恢复处理
  130. if r := recover(); r != nil {
  131. if common2.DebugEnabled {
  132. println("SSE ping goroutine panic recovered:", fmt.Sprintf("%v", r))
  133. }
  134. }
  135. if common2.DebugEnabled {
  136. println("SSE ping goroutine stopped.")
  137. }
  138. }()
  139. if pingInterval <= 0 {
  140. pingInterval = helper.DefaultPingInterval
  141. }
  142. ticker := time.NewTicker(pingInterval)
  143. // 确保在任何情况下都清理ticker
  144. defer func() {
  145. ticker.Stop()
  146. if common2.DebugEnabled {
  147. println("SSE ping ticker stopped")
  148. }
  149. }()
  150. var pingMutex sync.Mutex
  151. if common2.DebugEnabled {
  152. println("SSE ping goroutine started")
  153. }
  154. // 增加超时控制,防止goroutine长时间运行
  155. maxPingDuration := 120 * time.Minute // 最大ping持续时间
  156. pingTimeout := time.NewTimer(maxPingDuration)
  157. defer pingTimeout.Stop()
  158. for {
  159. select {
  160. // 发送 ping 数据
  161. case <-ticker.C:
  162. if err := sendPingData(c, &pingMutex); err != nil {
  163. if common2.DebugEnabled {
  164. println("SSE ping error, stopping goroutine:", err.Error())
  165. }
  166. return
  167. }
  168. // 收到退出信号
  169. case <-pingerCtx.Done():
  170. return
  171. // request 结束
  172. case <-c.Request.Context().Done():
  173. return
  174. // 超时保护,防止goroutine无限运行
  175. case <-pingTimeout.C:
  176. if common2.DebugEnabled {
  177. println("SSE ping goroutine timeout, stopping")
  178. }
  179. return
  180. }
  181. }
  182. })
  183. return stopPinger
  184. }
  185. func sendPingData(c *gin.Context, mutex *sync.Mutex) error {
  186. // 增加超时控制,防止锁死等待
  187. done := make(chan error, 1)
  188. go func() {
  189. mutex.Lock()
  190. defer mutex.Unlock()
  191. err := helper.PingData(c)
  192. if err != nil {
  193. logger.LogError(c, "SSE ping error: "+err.Error())
  194. done <- err
  195. return
  196. }
  197. if common2.DebugEnabled {
  198. println("SSE ping data sent.")
  199. }
  200. done <- nil
  201. }()
  202. // 设置发送ping数据的超时时间
  203. select {
  204. case err := <-done:
  205. return err
  206. case <-time.After(10 * time.Second):
  207. return errors.New("SSE ping data send timeout")
  208. case <-c.Request.Context().Done():
  209. return errors.New("request context cancelled during ping")
  210. }
  211. }
  212. func DoRequest(c *gin.Context, req *http.Request, info *common.RelayInfo) (*http.Response, error) {
  213. return doRequest(c, req, info)
  214. }
  215. func doRequest(c *gin.Context, req *http.Request, info *common.RelayInfo) (*http.Response, error) {
  216. var client *http.Client
  217. var err error
  218. if info.ChannelSetting.Proxy != "" {
  219. client, err = service.NewProxyHttpClient(info.ChannelSetting.Proxy)
  220. if err != nil {
  221. return nil, fmt.Errorf("new proxy http client failed: %w", err)
  222. }
  223. } else {
  224. client = service.GetHttpClient()
  225. }
  226. var stopPinger context.CancelFunc
  227. if info.IsStream {
  228. helper.SetEventStreamHeaders(c)
  229. // 处理流式请求的 ping 保活
  230. generalSettings := operation_setting.GetGeneralSetting()
  231. if generalSettings.PingIntervalEnabled && !info.DisablePing {
  232. pingInterval := time.Duration(generalSettings.PingIntervalSeconds) * time.Second
  233. stopPinger = startPingKeepAlive(c, pingInterval)
  234. // 使用defer确保在任何情况下都能停止ping goroutine
  235. defer func() {
  236. if stopPinger != nil {
  237. stopPinger()
  238. if common2.DebugEnabled {
  239. println("SSE ping goroutine stopped by defer")
  240. }
  241. }
  242. }()
  243. }
  244. }
  245. resp, err := client.Do(req)
  246. if err != nil {
  247. return nil, types.NewError(err, types.ErrorCodeDoRequestFailed, types.ErrOptionWithHideErrMsg("upstream error: do request failed"))
  248. }
  249. if resp == nil {
  250. return nil, errors.New("resp is nil")
  251. }
  252. _ = req.Body.Close()
  253. _ = c.Request.Body.Close()
  254. return resp, nil
  255. }
  256. func DoTaskApiRequest(a TaskAdaptor, c *gin.Context, info *common.RelayInfo, requestBody io.Reader) (*http.Response, error) {
  257. fullRequestURL, err := a.BuildRequestURL(info)
  258. if err != nil {
  259. return nil, err
  260. }
  261. req, err := http.NewRequest(c.Request.Method, fullRequestURL, requestBody)
  262. if err != nil {
  263. return nil, fmt.Errorf("new request failed: %w", err)
  264. }
  265. req.GetBody = func() (io.ReadCloser, error) {
  266. return io.NopCloser(requestBody), nil
  267. }
  268. err = a.BuildRequestHeader(c, req, info)
  269. if err != nil {
  270. return nil, fmt.Errorf("setup request header failed: %w", err)
  271. }
  272. resp, err := doRequest(c, req, info)
  273. if err != nil {
  274. return nil, fmt.Errorf("do request failed: %w", err)
  275. }
  276. return resp, nil
  277. }