processTask.ts 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
  1. import { execa } from "execa"
  2. import { type TaskEvent, RooCodeEventName } from "@roo-code/types"
  3. import { findRun, findTask, updateTask } from "../db/index"
  4. import { Logger, getTag, isDockerContainer } from "./utils"
  5. import { redisClient, getPubSubKey, registerRunner, deregisterRunner } from "./redis"
  6. import { runUnitTest } from "./runUnitTest"
  7. import { runTaskWithCli } from "./runTaskInCli"
  8. import { runTaskInVscode } from "./runTaskInVscode"
  /**
   * Runs a single eval task from start to finish.
   *
   * Steps, in order:
   *  1. Load the task and its run from the database.
   *  2. Register this runner for the run/task pair.
   *  3. Execute the task via `runTaskWithCli` when the run's execution method
   *     is "cli", otherwise via `runTaskInVscode`.
   *  4. Run the unit test for the task, persist the `passed` result and publish
   *     an `EvalPass` / `EvalFail` event on the run's pub/sub key.
   *  5. Always deregister the runner, whether or not any step above threw.
   *
   * @param taskId - Id of the task to process.
   * @param jobToken - Token forwarded unchanged to the task runner; may be null.
   * @param logger - Optional logger. When omitted, a file logger is created for
   *   this run/task.
   */
  export const processTask = async ({
    taskId,
    jobToken,
    logger,
  }: {
    taskId: number
    jobToken: string | null
    logger?: Logger
  }) => {
    const task = await findTask(taskId)
    const { language, exercise } = task
    const run = await findRun(task.runId)

    // `run.timeout` is multiplied by 60 to get seconds (presumably it is stored
    // in minutes — confirm against the schema); a missing or zero value falls
    // back to 5.
    await registerRunner({ runId: run.id, taskId, timeoutSeconds: (run.timeout || 5) * 60 })

    // Everything after registration lives inside the try so that a failure
    // while setting up the logger still reaches the deregistration below.
    try {
      const containerized = isDockerContainer()

      // Use a local binding instead of reassigning the parameter.
      const taskLogger =
        logger ??
        new Logger({
          logDir: containerized ? `/var/log/evals/runs/${run.id}` : `/tmp/evals/runs/${run.id}`,
          filename: `${language}-${exercise}.log`,
          tag: getTag("runTask", { run, task }),
        })

      // Serializes an event as JSON and publishes it on this run's channel.
      const publish = async (e: TaskEvent) => {
        const redis = await redisClient()
        await redis.publish(getPubSubKey(run.id), JSON.stringify(e))
      }

      const executionMethod = run.executionMethod || "vscode"
      taskLogger.info(`running task ${task.id} (${language}/${exercise}) via ${executionMethod}...`)

      if (executionMethod === "cli") {
        await runTaskWithCli({ run, task, jobToken, publish, logger: taskLogger })
      } else {
        await runTaskInVscode({ run, task, jobToken, publish, logger: taskLogger })
      }

      taskLogger.info(`testing task ${task.id} (${language}/${exercise})...`)
      const passed = await runUnitTest({ task, logger: taskLogger })
      taskLogger.info(`task ${task.id} (${language}/${exercise}) -> ${passed}`)

      await updateTask(task.id, { passed })

      await publish({
        eventName: passed ? RooCodeEventName.EvalPass : RooCodeEventName.EvalFail,
        taskId: task.id,
      })
    } finally {
      await deregisterRunner({ runId: run.id, taskId })
    }
  }
  /**
   * Runs the eval CLI for one task inside a fresh `evals-runner` Docker
   * container, retrying with jittered exponential backoff when the container
   * process fails.
   *
   * The docker invocation is passed to execa as an argument array rather than a
   * shell string, so token and API-key values containing whitespace or shell
   * metacharacters reach docker verbatim instead of being re-parsed by a shell.
   *
   * @param taskId - Id of the task; forwarded to the CLI as `--taskId`.
   * @param jobToken - When non-empty, exported to the container as
   *   `ROO_CODE_CLOUD_TOKEN`.
   * @param logger - Receives progress and failure messages.
   * @param maxRetries - Number of retries after the first attempt (default 10),
   *   i.e. at most `maxRetries + 1` container runs.
   * @returns Resolves once an attempt succeeds, or after every attempt has
   *   failed. Exhausting all attempts is only logged, not thrown.
   */
  export const processTaskInContainer = async ({
    taskId,
    jobToken,
    logger,
    maxRetries = 10,
  }: {
    taskId: number
    jobToken: string | null
    logger: Logger
    maxRetries?: number
  }) => {
    // Each element is exactly one argv entry for `docker run`.
    const baseArgs = [
      "--rm",
      "--network",
      "evals_default",
      "-v",
      "/var/run/docker.sock:/var/run/docker.sock",
      "-v",
      "/tmp/evals:/var/log/evals",
      "-e",
      "HOST_EXECUTION_METHOD=docker",
    ]

    if (jobToken) {
      baseArgs.push("-e", `ROO_CODE_CLOUD_TOKEN=${jobToken}`)
    }

    // Pass API keys to the container so the CLI can authenticate
    const apiKeyEnvVars = [
      "OPENROUTER_API_KEY",
      "ANTHROPIC_API_KEY",
      "OPENAI_API_KEY",
      "GOOGLE_API_KEY",
      "DEEPSEEK_API_KEY",
      "MISTRAL_API_KEY",
    ]

    for (const envVar of apiKeyEnvVars) {
      const value = process.env[envVar]

      if (value) {
        baseArgs.push("-e", `${envVar}=${value}`)
      }
    }

    const command = `pnpm --filter @roo-code/evals cli --taskId ${taskId}`
    logger.info(command)

    const totalAttempts = maxRetries + 1

    for (let attempt = 0; attempt <= maxRetries; attempt++) {
      // A distinct container name per attempt, so a retry never reuses the name
      // of the previous attempt's container.
      const containerName = `evals-task-${taskId}.${attempt}`
      const isRetry = attempt > 0

      if (isRetry) {
        // Exponential backoff (1s, 2s, 4s, ...) scaled by a random factor in
        // [0.5, 1.5); rounded so the log line shows whole milliseconds.
        const delayMs = Math.round(Math.pow(2, attempt - 1) * 1000 * (0.5 + Math.random()))
        logger.info(`retrying in ${delayMs}ms (attempt ${attempt + 1}/${totalAttempts})`)
        await new Promise((resolve) => setTimeout(resolve, delayMs))
      }

      logger.info(
        `${isRetry ? "retrying" : "executing"} container command (attempt ${attempt + 1}/${totalAttempts})`,
      )

      // No `shell: true`: docker receives the arguments exactly as listed, and
      // `command` is handed to `sh -c` inside the container as a single string.
      const subprocess = execa("docker", [
        "run",
        "--name",
        containerName,
        "-e",
        `EVALS_ATTEMPT=${attempt}`,
        ...baseArgs,
        "evals-runner",
        "sh",
        "-c",
        command,
      ])

      // subprocess.stdout?.on("data", (data) => console.log(data.toString()))
      // subprocess.stderr?.on("data", (data) => console.error(data.toString()))

      try {
        const result = await subprocess
        logger.info(`container process completed with exit code: ${result.exitCode}`)
        return
      } catch (error) {
        if (error && typeof error === "object" && "exitCode" in error) {
          logger.error(
            `container process failed with exit code: ${error.exitCode} (attempt ${attempt + 1}/${totalAttempts})`,
          )
        } else {
          logger.error(`container process failed with error: ${error} (attempt ${attempt + 1}/${totalAttempts})`)
        }
        // Fall through to the next iteration; the loop condition ends the
        // retries after the final attempt.
      }
    }

    logger.error(`all ${totalAttempts} attempts failed, giving up`)

    // TODO: Mark task as failed.
  }