relay_task.go 12 KB


  1. package relay
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "io"
  8. "net/http"
  9. "one-api/common"
  10. "one-api/constant"
  11. "one-api/dto"
  12. "one-api/model"
  13. "one-api/relay/channel"
  14. relaycommon "one-api/relay/common"
  15. relayconstant "one-api/relay/constant"
  16. "one-api/service"
  17. "one-api/setting/ratio_setting"
  18. "strconv"
  19. "strings"
  20. "github.com/gin-gonic/gin"
  21. )
  22. /*
  23. Task 任务通过平台、Action 区分任务
  24. */
  25. func RelayTaskSubmit(c *gin.Context, info *relaycommon.RelayInfo) (taskErr *dto.TaskError) {
  26. info.InitChannelMeta(c)
  27. // ensure TaskRelayInfo is initialized to avoid nil dereference when accessing embedded fields
  28. if info.TaskRelayInfo == nil {
  29. info.TaskRelayInfo = &relaycommon.TaskRelayInfo{}
  30. }
  31. platform := constant.TaskPlatform(c.GetString("platform"))
  32. if platform == "" {
  33. platform = GetTaskPlatform(c)
  34. }
  35. info.InitChannelMeta(c)
  36. adaptor := GetTaskAdaptor(platform)
  37. if adaptor == nil {
  38. return service.TaskErrorWrapperLocal(fmt.Errorf("invalid api platform: %s", platform), "invalid_api_platform", http.StatusBadRequest)
  39. }
  40. adaptor.Init(info)
  41. // get & validate taskRequest 获取并验证文本请求
  42. taskErr = adaptor.ValidateRequestAndSetAction(c, info)
  43. if taskErr != nil {
  44. return
  45. }
  46. modelName := info.OriginModelName
  47. if modelName == "" {
  48. modelName = service.CoverTaskActionToModelName(platform, info.Action)
  49. }
  50. modelPrice, success := ratio_setting.GetModelPrice(modelName, true)
  51. if !success {
  52. defaultPrice, ok := ratio_setting.GetDefaultModelPriceMap()[modelName]
  53. if !ok {
  54. modelPrice = 0.1
  55. } else {
  56. modelPrice = defaultPrice
  57. }
  58. }
  59. // 预扣
  60. groupRatio := ratio_setting.GetGroupRatio(info.UsingGroup)
  61. var ratio float64
  62. userGroupRatio, hasUserGroupRatio := ratio_setting.GetGroupGroupRatio(info.UserGroup, info.UsingGroup)
  63. if hasUserGroupRatio {
  64. ratio = modelPrice * userGroupRatio
  65. } else {
  66. ratio = modelPrice * groupRatio
  67. }
  68. if len(info.PriceData.OtherRatios) > 0 {
  69. for _, ra := range info.PriceData.OtherRatios {
  70. if 1.0 != ra {
  71. ratio *= ra
  72. }
  73. }
  74. }
  75. println(fmt.Sprintf("model: %s, model_price: %.4f, group: %s, group_ratio: %.4f, final_ratio: %.4f", modelName, modelPrice, info.UsingGroup, groupRatio, ratio))
  76. userQuota, err := model.GetUserQuota(info.UserId, false)
  77. if err != nil {
  78. taskErr = service.TaskErrorWrapper(err, "get_user_quota_failed", http.StatusInternalServerError)
  79. return
  80. }
  81. quota := int(ratio * common.QuotaPerUnit)
  82. if userQuota-quota < 0 {
  83. taskErr = service.TaskErrorWrapperLocal(errors.New("user quota is not enough"), "quota_not_enough", http.StatusForbidden)
  84. return
  85. }
  86. if info.OriginTaskID != "" {
  87. originTask, exist, err := model.GetByTaskId(info.UserId, info.OriginTaskID)
  88. if err != nil {
  89. taskErr = service.TaskErrorWrapper(err, "get_origin_task_failed", http.StatusInternalServerError)
  90. return
  91. }
  92. if !exist {
  93. taskErr = service.TaskErrorWrapperLocal(errors.New("task_origin_not_exist"), "task_not_exist", http.StatusBadRequest)
  94. return
  95. }
  96. if originTask.ChannelId != info.ChannelId {
  97. channel, err := model.GetChannelById(originTask.ChannelId, true)
  98. if err != nil {
  99. taskErr = service.TaskErrorWrapperLocal(err, "channel_not_found", http.StatusBadRequest)
  100. return
  101. }
  102. if channel.Status != common.ChannelStatusEnabled {
  103. return service.TaskErrorWrapperLocal(errors.New("该任务所属渠道已被禁用"), "task_channel_disable", http.StatusBadRequest)
  104. }
  105. c.Set("base_url", channel.GetBaseURL())
  106. c.Set("channel_id", originTask.ChannelId)
  107. c.Request.Header.Set("Authorization", fmt.Sprintf("Bearer %s", channel.Key))
  108. info.ChannelBaseUrl = channel.GetBaseURL()
  109. info.ChannelId = originTask.ChannelId
  110. }
  111. }
  112. // build body
  113. requestBody, err := adaptor.BuildRequestBody(c, info)
  114. if err != nil {
  115. taskErr = service.TaskErrorWrapper(err, "build_request_failed", http.StatusInternalServerError)
  116. return
  117. }
  118. // do request
  119. resp, err := adaptor.DoRequest(c, info, requestBody)
  120. if err != nil {
  121. taskErr = service.TaskErrorWrapper(err, "do_request_failed", http.StatusInternalServerError)
  122. return
  123. }
  124. // handle response
  125. if resp != nil && resp.StatusCode != http.StatusOK {
  126. responseBody, _ := io.ReadAll(resp.Body)
  127. taskErr = service.TaskErrorWrapper(fmt.Errorf(string(responseBody)), "fail_to_fetch_task", resp.StatusCode)
  128. return
  129. }
  130. defer func() {
  131. // release quota
  132. if info.ConsumeQuota && taskErr == nil {
  133. err := service.PostConsumeQuota(info, quota, 0, true)
  134. if err != nil {
  135. common.SysLog("error consuming token remain quota: " + err.Error())
  136. }
  137. if quota != 0 {
  138. tokenName := c.GetString("token_name")
  139. //gRatio := groupRatio
  140. //if hasUserGroupRatio {
  141. // gRatio = userGroupRatio
  142. //}
  143. logContent := fmt.Sprintf("操作 %s", info.Action)
  144. if len(info.PriceData.OtherRatios) > 0 {
  145. var contents []string
  146. for key, ra := range info.PriceData.OtherRatios {
  147. if 1.0 != ra {
  148. contents = append(contents, fmt.Sprintf("%s: %.2f", key, ra))
  149. }
  150. }
  151. if len(contents) > 0 {
  152. logContent = fmt.Sprintf("%s, 计算参数:%s", logContent, strings.Join(contents, ", "))
  153. }
  154. }
  155. other := make(map[string]interface{})
  156. other["model_price"] = modelPrice
  157. other["group_ratio"] = groupRatio
  158. if hasUserGroupRatio {
  159. other["user_group_ratio"] = userGroupRatio
  160. }
  161. model.RecordConsumeLog(c, info.UserId, model.RecordConsumeLogParams{
  162. ChannelId: info.ChannelId,
  163. ModelName: modelName,
  164. TokenName: tokenName,
  165. Quota: quota,
  166. Content: logContent,
  167. TokenId: info.TokenId,
  168. Group: info.UsingGroup,
  169. Other: other,
  170. })
  171. model.UpdateUserUsedQuotaAndRequestCount(info.UserId, quota)
  172. model.UpdateChannelUsedQuota(info.ChannelId, quota)
  173. }
  174. }
  175. }()
  176. taskID, taskData, taskErr := adaptor.DoResponse(c, resp, info)
  177. if taskErr != nil {
  178. return
  179. }
  180. info.ConsumeQuota = true
  181. // insert task
  182. task := model.InitTask(platform, info)
  183. task.TaskID = taskID
  184. task.Quota = quota
  185. task.Data = taskData
  186. task.Action = info.Action
  187. err = task.Insert()
  188. if err != nil {
  189. taskErr = service.TaskErrorWrapper(err, "insert_task_failed", http.StatusInternalServerError)
  190. return
  191. }
  192. return nil
  193. }
  194. var fetchRespBuilders = map[int]func(c *gin.Context) (respBody []byte, taskResp *dto.TaskError){
  195. relayconstant.RelayModeSunoFetchByID: sunoFetchByIDRespBodyBuilder,
  196. relayconstant.RelayModeSunoFetch: sunoFetchRespBodyBuilder,
  197. relayconstant.RelayModeVideoFetchByID: videoFetchByIDRespBodyBuilder,
  198. }
  199. func RelayTaskFetch(c *gin.Context, relayMode int) (taskResp *dto.TaskError) {
  200. respBuilder, ok := fetchRespBuilders[relayMode]
  201. if !ok {
  202. taskResp = service.TaskErrorWrapperLocal(errors.New("invalid_relay_mode"), "invalid_relay_mode", http.StatusBadRequest)
  203. }
  204. respBody, taskErr := respBuilder(c)
  205. if taskErr != nil {
  206. return taskErr
  207. }
  208. if len(respBody) == 0 {
  209. respBody = []byte("{\"code\":\"success\",\"data\":null}")
  210. }
  211. c.Writer.Header().Set("Content-Type", "application/json")
  212. _, err := io.Copy(c.Writer, bytes.NewBuffer(respBody))
  213. if err != nil {
  214. taskResp = service.TaskErrorWrapper(err, "copy_response_body_failed", http.StatusInternalServerError)
  215. return
  216. }
  217. return
  218. }
  219. func sunoFetchRespBodyBuilder(c *gin.Context) (respBody []byte, taskResp *dto.TaskError) {
  220. userId := c.GetInt("id")
  221. var condition = struct {
  222. IDs []any `json:"ids"`
  223. Action string `json:"action"`
  224. }{}
  225. err := c.BindJSON(&condition)
  226. if err != nil {
  227. taskResp = service.TaskErrorWrapper(err, "invalid_request", http.StatusBadRequest)
  228. return
  229. }
  230. var tasks []any
  231. if len(condition.IDs) > 0 {
  232. taskModels, err := model.GetByTaskIds(userId, condition.IDs)
  233. if err != nil {
  234. taskResp = service.TaskErrorWrapper(err, "get_tasks_failed", http.StatusInternalServerError)
  235. return
  236. }
  237. for _, task := range taskModels {
  238. tasks = append(tasks, TaskModel2Dto(task))
  239. }
  240. } else {
  241. tasks = make([]any, 0)
  242. }
  243. respBody, err = json.Marshal(dto.TaskResponse[[]any]{
  244. Code: "success",
  245. Data: tasks,
  246. })
  247. return
  248. }
  249. func sunoFetchByIDRespBodyBuilder(c *gin.Context) (respBody []byte, taskResp *dto.TaskError) {
  250. taskId := c.Param("id")
  251. userId := c.GetInt("id")
  252. originTask, exist, err := model.GetByTaskId(userId, taskId)
  253. if err != nil {
  254. taskResp = service.TaskErrorWrapper(err, "get_task_failed", http.StatusInternalServerError)
  255. return
  256. }
  257. if !exist {
  258. taskResp = service.TaskErrorWrapperLocal(errors.New("task_not_exist"), "task_not_exist", http.StatusBadRequest)
  259. return
  260. }
  261. respBody, err = json.Marshal(dto.TaskResponse[any]{
  262. Code: "success",
  263. Data: TaskModel2Dto(originTask),
  264. })
  265. return
  266. }
  267. func videoFetchByIDRespBodyBuilder(c *gin.Context) (respBody []byte, taskResp *dto.TaskError) {
  268. taskId := c.Param("task_id")
  269. if taskId == "" {
  270. taskId = c.GetString("task_id")
  271. }
  272. userId := c.GetInt("id")
  273. originTask, exist, err := model.GetByTaskId(userId, taskId)
  274. if err != nil {
  275. taskResp = service.TaskErrorWrapper(err, "get_task_failed", http.StatusInternalServerError)
  276. return
  277. }
  278. if !exist {
  279. taskResp = service.TaskErrorWrapperLocal(errors.New("task_not_exist"), "task_not_exist", http.StatusBadRequest)
  280. return
  281. }
  282. func() {
  283. channelModel, err2 := model.GetChannelById(originTask.ChannelId, true)
  284. if err2 != nil {
  285. return
  286. }
  287. if channelModel.Type != constant.ChannelTypeVertexAi {
  288. return
  289. }
  290. baseURL := constant.ChannelBaseURLs[channelModel.Type]
  291. if channelModel.GetBaseURL() != "" {
  292. baseURL = channelModel.GetBaseURL()
  293. }
  294. adaptor := GetTaskAdaptor(constant.TaskPlatform(strconv.Itoa(channelModel.Type)))
  295. if adaptor == nil {
  296. return
  297. }
  298. resp, err2 := adaptor.FetchTask(baseURL, channelModel.Key, map[string]any{
  299. "task_id": originTask.TaskID,
  300. "action": originTask.Action,
  301. })
  302. if err2 != nil || resp == nil {
  303. return
  304. }
  305. defer resp.Body.Close()
  306. body, err2 := io.ReadAll(resp.Body)
  307. if err2 != nil {
  308. return
  309. }
  310. ti, err2 := adaptor.ParseTaskResult(body)
  311. if err2 == nil && ti != nil {
  312. if ti.Status != "" {
  313. originTask.Status = model.TaskStatus(ti.Status)
  314. }
  315. if ti.Progress != "" {
  316. originTask.Progress = ti.Progress
  317. }
  318. if ti.Url != "" {
  319. originTask.FailReason = ti.Url
  320. }
  321. _ = originTask.Update()
  322. var raw map[string]any
  323. _ = json.Unmarshal(body, &raw)
  324. format := "mp4"
  325. if respObj, ok := raw["response"].(map[string]any); ok {
  326. if vids, ok := respObj["videos"].([]any); ok && len(vids) > 0 {
  327. if v0, ok := vids[0].(map[string]any); ok {
  328. if mt, ok := v0["mimeType"].(string); ok && mt != "" {
  329. if strings.Contains(mt, "mp4") {
  330. format = "mp4"
  331. } else {
  332. format = mt
  333. }
  334. }
  335. }
  336. }
  337. }
  338. status := "processing"
  339. switch originTask.Status {
  340. case model.TaskStatusSuccess:
  341. status = "succeeded"
  342. case model.TaskStatusFailure:
  343. status = "failed"
  344. case model.TaskStatusQueued, model.TaskStatusSubmitted:
  345. status = "queued"
  346. }
  347. out := map[string]any{
  348. "error": nil,
  349. "format": format,
  350. "metadata": nil,
  351. "status": status,
  352. "task_id": originTask.TaskID,
  353. "url": originTask.FailReason,
  354. }
  355. respBody, _ = json.Marshal(dto.TaskResponse[any]{
  356. Code: "success",
  357. Data: out,
  358. })
  359. }
  360. }()
  361. if len(respBody) != 0 {
  362. return
  363. }
  364. if strings.HasPrefix(c.Request.RequestURI, "/v1/videos/") {
  365. adaptor := GetTaskAdaptor(originTask.Platform)
  366. if adaptor == nil {
  367. taskResp = service.TaskErrorWrapperLocal(fmt.Errorf("invalid channel id: %d", originTask.ChannelId), "invalid_channel_id", http.StatusBadRequest)
  368. return
  369. }
  370. if converter, ok := adaptor.(channel.OpenAIVideoConverter); ok {
  371. openAIVideo, err := converter.ConvertToOpenAIVideo(originTask)
  372. if err != nil {
  373. taskResp = service.TaskErrorWrapper(err, "convert_to_openai_video_failed", http.StatusInternalServerError)
  374. return
  375. }
  376. respBody, _ = json.Marshal(openAIVideo)
  377. return
  378. }
  379. taskResp = service.TaskErrorWrapperLocal(errors.New(fmt.Sprintf("not_implemented:%s", originTask.Platform)), "not_implemented", http.StatusNotImplemented)
  380. return
  381. }
  382. respBody, err = json.Marshal(dto.TaskResponse[any]{
  383. Code: "success",
  384. Data: TaskModel2Dto(originTask),
  385. })
  386. if err != nil {
  387. taskResp = service.TaskErrorWrapper(err, "marshal_response_failed", http.StatusInternalServerError)
  388. }
  389. return
  390. }
  391. func TaskModel2Dto(task *model.Task) *dto.TaskDto {
  392. return &dto.TaskDto{
  393. TaskID: task.TaskID,
  394. Action: task.Action,
  395. Status: string(task.Status),
  396. FailReason: task.FailReason,
  397. SubmitTime: task.SubmitTime,
  398. StartTime: task.StartTime,
  399. FinishTime: task.FinishTime,
  400. Progress: task.Progress,
  401. Data: task.Data,
  402. }
  403. }