api_request.go 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323
  1. package channel
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "net/http"
  8. "strings"
  9. "sync"
  10. "time"
  11. common2 "github.com/QuantumNous/new-api/common"
  12. "github.com/QuantumNous/new-api/logger"
  13. "github.com/QuantumNous/new-api/relay/common"
  14. "github.com/QuantumNous/new-api/relay/constant"
  15. "github.com/QuantumNous/new-api/relay/helper"
  16. "github.com/QuantumNous/new-api/service"
  17. "github.com/QuantumNous/new-api/setting/operation_setting"
  18. "github.com/QuantumNous/new-api/types"
  19. "github.com/bytedance/gopkg/util/gopool"
  20. "github.com/gin-gonic/gin"
  21. "github.com/gorilla/websocket"
  22. )
  23. func SetupApiRequestHeader(info *common.RelayInfo, c *gin.Context, req *http.Header) {
  24. if info.RelayMode == constant.RelayModeAudioTranscription || info.RelayMode == constant.RelayModeAudioTranslation {
  25. // multipart/form-data
  26. } else if info.RelayMode == constant.RelayModeRealtime {
  27. // websocket
  28. } else {
  29. req.Set("Content-Type", c.Request.Header.Get("Content-Type"))
  30. req.Set("Accept", c.Request.Header.Get("Accept"))
  31. if info.IsStream && c.Request.Header.Get("Accept") == "" {
  32. req.Set("Accept", "text/event-stream")
  33. }
  34. }
  35. }
  36. // processHeaderOverride 处理请求头覆盖,支持变量替换
  37. // 支持的变量:{api_key}
  38. func processHeaderOverride(info *common.RelayInfo) (map[string]string, error) {
  39. headerOverride := make(map[string]string)
  40. for k, v := range info.HeadersOverride {
  41. str, ok := v.(string)
  42. if !ok {
  43. return nil, types.NewError(nil, types.ErrorCodeChannelHeaderOverrideInvalid)
  44. }
  45. // 替换支持的变量
  46. if strings.Contains(str, "{api_key}") {
  47. str = strings.ReplaceAll(str, "{api_key}", info.ApiKey)
  48. }
  49. headerOverride[k] = str
  50. }
  51. return headerOverride, nil
  52. }
  53. func DoApiRequest(a Adaptor, c *gin.Context, info *common.RelayInfo, requestBody io.Reader) (*http.Response, error) {
  54. fullRequestURL, err := a.GetRequestURL(info)
  55. if err != nil {
  56. return nil, fmt.Errorf("get request url failed: %w", err)
  57. }
  58. if common2.DebugEnabled {
  59. println("fullRequestURL:", fullRequestURL)
  60. }
  61. req, err := http.NewRequest(c.Request.Method, fullRequestURL, requestBody)
  62. if err != nil {
  63. return nil, fmt.Errorf("new request failed: %w", err)
  64. }
  65. headers := req.Header
  66. headerOverride, err := processHeaderOverride(info)
  67. if err != nil {
  68. return nil, err
  69. }
  70. for key, value := range headerOverride {
  71. headers.Set(key, value)
  72. }
  73. err = a.SetupRequestHeader(c, &headers, info)
  74. if err != nil {
  75. return nil, fmt.Errorf("setup request header failed: %w", err)
  76. }
  77. resp, err := doRequest(c, req, info)
  78. if err != nil {
  79. return nil, fmt.Errorf("do request failed: %w", err)
  80. }
  81. return resp, nil
  82. }
  83. func DoFormRequest(a Adaptor, c *gin.Context, info *common.RelayInfo, requestBody io.Reader) (*http.Response, error) {
  84. fullRequestURL, err := a.GetRequestURL(info)
  85. if err != nil {
  86. return nil, fmt.Errorf("get request url failed: %w", err)
  87. }
  88. if common2.DebugEnabled {
  89. println("fullRequestURL:", fullRequestURL)
  90. }
  91. req, err := http.NewRequest(c.Request.Method, fullRequestURL, requestBody)
  92. if err != nil {
  93. return nil, fmt.Errorf("new request failed: %w", err)
  94. }
  95. // set form data
  96. req.Header.Set("Content-Type", c.Request.Header.Get("Content-Type"))
  97. headers := req.Header
  98. headerOverride, err := processHeaderOverride(info)
  99. if err != nil {
  100. return nil, err
  101. }
  102. for key, value := range headerOverride {
  103. headers.Set(key, value)
  104. }
  105. err = a.SetupRequestHeader(c, &headers, info)
  106. if err != nil {
  107. return nil, fmt.Errorf("setup request header failed: %w", err)
  108. }
  109. resp, err := doRequest(c, req, info)
  110. if err != nil {
  111. return nil, fmt.Errorf("do request failed: %w", err)
  112. }
  113. return resp, nil
  114. }
  115. func DoWssRequest(a Adaptor, c *gin.Context, info *common.RelayInfo, requestBody io.Reader) (*websocket.Conn, error) {
  116. fullRequestURL, err := a.GetRequestURL(info)
  117. if err != nil {
  118. return nil, fmt.Errorf("get request url failed: %w", err)
  119. }
  120. targetHeader := http.Header{}
  121. headerOverride, err := processHeaderOverride(info)
  122. if err != nil {
  123. return nil, err
  124. }
  125. for key, value := range headerOverride {
  126. targetHeader.Set(key, value)
  127. }
  128. err = a.SetupRequestHeader(c, &targetHeader, info)
  129. if err != nil {
  130. return nil, fmt.Errorf("setup request header failed: %w", err)
  131. }
  132. targetHeader.Set("Content-Type", c.Request.Header.Get("Content-Type"))
  133. targetConn, _, err := websocket.DefaultDialer.Dial(fullRequestURL, targetHeader)
  134. if err != nil {
  135. return nil, fmt.Errorf("dial failed to %s: %w", fullRequestURL, err)
  136. }
  137. // send request body
  138. //all, err := io.ReadAll(requestBody)
  139. //err = service.WssString(c, targetConn, string(all))
  140. return targetConn, nil
  141. }
  142. func startPingKeepAlive(c *gin.Context, pingInterval time.Duration) context.CancelFunc {
  143. pingerCtx, stopPinger := context.WithCancel(context.Background())
  144. gopool.Go(func() {
  145. defer func() {
  146. // 增加panic恢复处理
  147. if r := recover(); r != nil {
  148. if common2.DebugEnabled {
  149. println("SSE ping goroutine panic recovered:", fmt.Sprintf("%v", r))
  150. }
  151. }
  152. if common2.DebugEnabled {
  153. println("SSE ping goroutine stopped.")
  154. }
  155. }()
  156. if pingInterval <= 0 {
  157. pingInterval = helper.DefaultPingInterval
  158. }
  159. ticker := time.NewTicker(pingInterval)
  160. // 确保在任何情况下都清理ticker
  161. defer func() {
  162. ticker.Stop()
  163. if common2.DebugEnabled {
  164. println("SSE ping ticker stopped")
  165. }
  166. }()
  167. var pingMutex sync.Mutex
  168. if common2.DebugEnabled {
  169. println("SSE ping goroutine started")
  170. }
  171. // 增加超时控制,防止goroutine长时间运行
  172. maxPingDuration := 120 * time.Minute // 最大ping持续时间
  173. pingTimeout := time.NewTimer(maxPingDuration)
  174. defer pingTimeout.Stop()
  175. for {
  176. select {
  177. // 发送 ping 数据
  178. case <-ticker.C:
  179. if err := sendPingData(c, &pingMutex); err != nil {
  180. if common2.DebugEnabled {
  181. println("SSE ping error, stopping goroutine:", err.Error())
  182. }
  183. return
  184. }
  185. // 收到退出信号
  186. case <-pingerCtx.Done():
  187. return
  188. // request 结束
  189. case <-c.Request.Context().Done():
  190. return
  191. // 超时保护,防止goroutine无限运行
  192. case <-pingTimeout.C:
  193. if common2.DebugEnabled {
  194. println("SSE ping goroutine timeout, stopping")
  195. }
  196. return
  197. }
  198. }
  199. })
  200. return stopPinger
  201. }
  202. func sendPingData(c *gin.Context, mutex *sync.Mutex) error {
  203. // 增加超时控制,防止锁死等待
  204. done := make(chan error, 1)
  205. go func() {
  206. mutex.Lock()
  207. defer mutex.Unlock()
  208. err := helper.PingData(c)
  209. if err != nil {
  210. logger.LogError(c, "SSE ping error: "+err.Error())
  211. done <- err
  212. return
  213. }
  214. if common2.DebugEnabled {
  215. println("SSE ping data sent.")
  216. }
  217. done <- nil
  218. }()
  219. // 设置发送ping数据的超时时间
  220. select {
  221. case err := <-done:
  222. return err
  223. case <-time.After(10 * time.Second):
  224. return errors.New("SSE ping data send timeout")
  225. case <-c.Request.Context().Done():
  226. return errors.New("request context cancelled during ping")
  227. }
  228. }
  229. func DoRequest(c *gin.Context, req *http.Request, info *common.RelayInfo) (*http.Response, error) {
  230. return doRequest(c, req, info)
  231. }
  232. func doRequest(c *gin.Context, req *http.Request, info *common.RelayInfo) (*http.Response, error) {
  233. var client *http.Client
  234. var err error
  235. if info.ChannelSetting.Proxy != "" {
  236. client, err = service.NewProxyHttpClient(info.ChannelSetting.Proxy)
  237. if err != nil {
  238. return nil, fmt.Errorf("new proxy http client failed: %w", err)
  239. }
  240. } else {
  241. client = service.GetHttpClient()
  242. }
  243. var stopPinger context.CancelFunc
  244. if info.IsStream {
  245. helper.SetEventStreamHeaders(c)
  246. // 处理流式请求的 ping 保活
  247. generalSettings := operation_setting.GetGeneralSetting()
  248. if generalSettings.PingIntervalEnabled && !info.DisablePing {
  249. pingInterval := time.Duration(generalSettings.PingIntervalSeconds) * time.Second
  250. stopPinger = startPingKeepAlive(c, pingInterval)
  251. // 使用defer确保在任何情况下都能停止ping goroutine
  252. defer func() {
  253. if stopPinger != nil {
  254. stopPinger()
  255. if common2.DebugEnabled {
  256. println("SSE ping goroutine stopped by defer")
  257. }
  258. }
  259. }()
  260. }
  261. }
  262. resp, err := client.Do(req)
  263. if err != nil {
  264. logger.LogError(c, "do request failed: "+err.Error())
  265. return nil, types.NewError(err, types.ErrorCodeDoRequestFailed, types.ErrOptionWithHideErrMsg("upstream error: do request failed"))
  266. }
  267. if resp == nil {
  268. return nil, errors.New("resp is nil")
  269. }
  270. _ = req.Body.Close()
  271. _ = c.Request.Body.Close()
  272. return resp, nil
  273. }
  274. func DoTaskApiRequest(a TaskAdaptor, c *gin.Context, info *common.RelayInfo, requestBody io.Reader) (*http.Response, error) {
  275. fullRequestURL, err := a.BuildRequestURL(info)
  276. if err != nil {
  277. return nil, err
  278. }
  279. req, err := http.NewRequest(c.Request.Method, fullRequestURL, requestBody)
  280. if err != nil {
  281. return nil, fmt.Errorf("new request failed: %w", err)
  282. }
  283. req.GetBody = func() (io.ReadCloser, error) {
  284. return io.NopCloser(requestBody), nil
  285. }
  286. err = a.BuildRequestHeader(c, req, info)
  287. if err != nil {
  288. return nil, fmt.Errorf("setup request header failed: %w", err)
  289. }
  290. resp, err := doRequest(c, req, info)
  291. if err != nil {
  292. return nil, fmt.Errorf("do request failed: %w", err)
  293. }
  294. return resp, nil
  295. }