runTaskInVscode.ts 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327
  1. import * as fs from "fs"
  2. import * as path from "path"
  3. import * as os from "node:os"
  4. import pWaitFor from "p-wait-for"
  5. import { execa } from "execa"
  6. import {
  7. type ClineSay,
  8. type ToolUsage,
  9. TaskCommandName,
  10. RooCodeEventName,
  11. IpcMessageType,
  12. EVALS_SETTINGS,
  13. } from "@roo-code/types"
  14. import { IpcClient } from "@roo-code/ipc"
  15. import { updateTask, createTaskMetrics, updateTaskMetrics, createToolError } from "../db/index.js"
  16. import { EVALS_REPO_PATH } from "../exercises/index.js"
  17. import { type RunTaskOptions } from "./types.js"
  18. import { isDockerContainer, copyConversationHistory, mergeToolUsage, waitForSubprocessWithTimeout } from "./utils.js"
  19. import { MessageLogDeduper } from "./messageLogDeduper.js"
  20. export const runTaskInVscode = async ({ run, task, publish, logger, jobToken }: RunTaskOptions) => {
  21. const { language, exercise } = task
  22. const prompt = fs.readFileSync(path.resolve(EVALS_REPO_PATH, `prompts/${language}.md`), "utf-8")
  23. const workspacePath = path.resolve(EVALS_REPO_PATH, language, exercise)
  24. const ipcSocketPath = path.resolve(os.tmpdir(), `evals-${run.id}-${task.id}.sock`)
  25. const env = { ROO_CODE_IPC_SOCKET_PATH: ipcSocketPath }
  26. const controller = new AbortController()
  27. const cancelSignal = controller.signal
  28. const containerized = isDockerContainer()
  29. const logDir = containerized ? `/var/log/evals/runs/${run.id}` : `/tmp/evals/runs/${run.id}`
  30. let codeCommand = containerized
  31. ? `xvfb-run --auto-servernum --server-num=1 code --wait --log trace --disable-workspace-trust --disable-gpu --disable-lcd-text --no-sandbox --user-data-dir /roo/.vscode --password-store="basic" -n ${workspacePath}`
  32. : `code --disable-workspace-trust -n ${workspacePath}`
  33. if (jobToken) {
  34. codeCommand = `ROO_CODE_CLOUD_TOKEN=${jobToken} ${codeCommand}`
  35. }
  36. logger.info(codeCommand)
  37. // Sleep for a random amount of time between 5 and 10 seconds, unless we're
  38. // running in a container, in which case there are no issues with flooding
  39. // VSCode with new windows.
  40. if (!containerized) {
  41. await new Promise((resolve) => setTimeout(resolve, Math.random() * 5_000 + 5_000))
  42. }
  43. const subprocess = execa({ env, shell: "/bin/bash", cancelSignal })`${codeCommand}`
  44. // If debugging, add `--verbose` to `command` and uncomment the following line.
  45. // subprocess.stdout.pipe(process.stdout)
  46. // Give VSCode some time to spawn before connecting to its unix socket.
  47. await new Promise((resolve) => setTimeout(resolve, 3_000))
  48. let client: IpcClient | undefined = undefined
  49. let attempts = 5
  50. while (true) {
  51. try {
  52. client = new IpcClient(ipcSocketPath)
  53. await pWaitFor(() => client!.isReady, { interval: 250, timeout: 1_000 })
  54. break
  55. } catch (_error) {
  56. client?.disconnect()
  57. attempts--
  58. if (attempts <= 0) {
  59. logger.error(`unable to connect to IPC socket -> ${ipcSocketPath}`)
  60. throw new Error("Unable to connect.")
  61. }
  62. }
  63. }
  64. let taskStartedAt = Date.now()
  65. let taskFinishedAt: number | undefined
  66. let taskAbortedAt: number | undefined
  67. let taskTimedOut: boolean = false
  68. let taskMetricsId: number | undefined
  69. let rooTaskId: string | undefined
  70. let isClientDisconnected = false
  71. // Track accumulated tool usage across task instances (handles rehydration after abort)
  72. const accumulatedToolUsage: ToolUsage = {}
  73. // Promise that resolves when taskMetricsId is set, preventing race conditions
  74. // where TaskTokenUsageUpdated arrives before TaskStarted handler completes
  75. let resolveTaskMetricsReady: () => void
  76. const taskMetricsReady = new Promise<void>((resolve) => {
  77. resolveTaskMetricsReady = resolve
  78. })
  79. const ignoreEvents: Record<"broadcast" | "log", RooCodeEventName[]> = {
  80. broadcast: [RooCodeEventName.Message],
  81. log: [RooCodeEventName.TaskTokenUsageUpdated, RooCodeEventName.TaskAskResponded],
  82. }
  83. const loggableSays: ClineSay[] = [
  84. "error",
  85. "command_output",
  86. "rooignore_error",
  87. "diff_error",
  88. "condense_context",
  89. "condense_context_error",
  90. "api_req_rate_limit_wait",
  91. "api_req_retry_delayed",
  92. "api_req_retried",
  93. ]
  94. let isApiUnstable = false
  95. const messageLogDeduper = new MessageLogDeduper()
  96. client.on(IpcMessageType.TaskEvent, async (taskEvent) => {
  97. const { eventName, payload } = taskEvent
  98. if (
  99. eventName === RooCodeEventName.Message &&
  100. payload[0].message.say &&
  101. ["api_req_retry_delayed", "api_req_retried"].includes(payload[0].message.say)
  102. ) {
  103. isApiUnstable = true
  104. }
  105. // Publish all events except for these to Redis.
  106. if (!ignoreEvents.broadcast.includes(eventName)) {
  107. await publish({ ...taskEvent, taskId: task.id })
  108. }
  109. // Log all events except for these.
  110. // For message events we only log non-partial messages.
  111. if (
  112. !ignoreEvents.log.includes(eventName) &&
  113. (eventName !== RooCodeEventName.Message ||
  114. (payload[0].message.say && loggableSays.includes(payload[0].message.say)) ||
  115. payload[0].message.partial !== true)
  116. ) {
  117. // Dedupe identical repeated message events (same message.ts + same payload)
  118. if (eventName === RooCodeEventName.Message) {
  119. const action = payload[0]?.action as string | undefined
  120. const message = payload[0]?.message
  121. if (!messageLogDeduper.shouldLog(action, message)) {
  122. return
  123. }
  124. }
  125. // Extract tool name for tool-related messages for clearer logging
  126. let logEventName: string = eventName
  127. if (eventName === RooCodeEventName.Message && payload[0]?.message?.ask === "tool") {
  128. try {
  129. const textJson = JSON.parse(payload[0].message.text ?? "{}")
  130. if (textJson.tool) {
  131. logEventName = `${eventName} (tool: ${textJson.tool})`
  132. }
  133. } catch {
  134. // If parsing fails, use the default event name
  135. }
  136. } else if (eventName === RooCodeEventName.Message && payload[0]?.message?.ask === "command") {
  137. logEventName = `${eventName} (command)`
  138. } else if (eventName === RooCodeEventName.Message && payload[0]?.message?.ask === "completion_result") {
  139. logEventName = `${eventName} (completion_result)`
  140. }
  141. logger.info(`${logEventName} ->`, payload)
  142. }
  143. if (eventName === RooCodeEventName.TaskStarted) {
  144. taskStartedAt = Date.now()
  145. const taskMetrics = await createTaskMetrics({
  146. cost: 0,
  147. tokensIn: 0,
  148. tokensOut: 0,
  149. tokensContext: 0,
  150. duration: 0,
  151. cacheWrites: 0,
  152. cacheReads: 0,
  153. })
  154. await updateTask(task.id, { taskMetricsId: taskMetrics.id, startedAt: new Date() })
  155. taskStartedAt = Date.now()
  156. taskMetricsId = taskMetrics.id
  157. rooTaskId = payload[0]
  158. // Signal that taskMetricsId is now ready for other handlers
  159. resolveTaskMetricsReady()
  160. }
  161. if (eventName === RooCodeEventName.TaskToolFailed) {
  162. const [_taskId, toolName, error] = payload
  163. await createToolError({ taskId: task.id, toolName, error })
  164. }
  165. if (eventName === RooCodeEventName.TaskTokenUsageUpdated || eventName === RooCodeEventName.TaskCompleted) {
  166. // Wait for taskMetricsId to be set by the TaskStarted handler.
  167. // This prevents a race condition where these events arrive before
  168. // the TaskStarted handler finishes its async database operations.
  169. // Note: taskMetricsReady is also resolved on disconnect to prevent deadlock.
  170. await taskMetricsReady
  171. // Guard: taskMetricsReady may have been resolved due to disconnect
  172. // without taskMetricsId being set. Skip metrics update in this case.
  173. if (!taskMetricsId) {
  174. logger.info(`skipping metrics update: taskMetricsId not set (event: ${eventName})`)
  175. return
  176. }
  177. const duration = Date.now() - taskStartedAt
  178. const { totalCost, totalTokensIn, totalTokensOut, contextTokens, totalCacheWrites, totalCacheReads } =
  179. payload[1]
  180. // For both TaskTokenUsageUpdated and TaskCompleted: toolUsage is payload[2]
  181. const incomingToolUsage: ToolUsage = payload[2] ?? {}
  182. mergeToolUsage(accumulatedToolUsage, incomingToolUsage)
  183. await updateTaskMetrics(taskMetricsId, {
  184. cost: totalCost,
  185. tokensIn: totalTokensIn,
  186. tokensOut: totalTokensOut,
  187. tokensContext: contextTokens,
  188. duration,
  189. cacheWrites: totalCacheWrites ?? 0,
  190. cacheReads: totalCacheReads ?? 0,
  191. toolUsage: accumulatedToolUsage,
  192. })
  193. }
  194. if (eventName === RooCodeEventName.TaskAborted) {
  195. taskAbortedAt = Date.now()
  196. }
  197. if (eventName === RooCodeEventName.TaskCompleted) {
  198. taskFinishedAt = Date.now()
  199. }
  200. })
  201. client.on(IpcMessageType.Disconnect, async () => {
  202. logger.info(`disconnected from IPC socket -> ${ipcSocketPath}`)
  203. isClientDisconnected = true
  204. // Resolve taskMetricsReady to unblock any handlers waiting on it.
  205. // This prevents deadlock if TaskStarted never fired or threw before resolving.
  206. // The handlers check for taskMetricsId being set before proceeding.
  207. resolveTaskMetricsReady()
  208. })
  209. client.sendCommand({
  210. commandName: TaskCommandName.StartNewTask,
  211. data: {
  212. configuration: {
  213. ...EVALS_SETTINGS,
  214. openRouterApiKey: process.env.OPENROUTER_API_KEY,
  215. ...run.settings, // Allow the provided settings to override `openRouterApiKey`.
  216. },
  217. text: prompt,
  218. },
  219. })
  220. try {
  221. const timeoutMs = (run.timeout || 5) * 60 * 1_000 // Convert minutes to milliseconds
  222. await pWaitFor(() => !!taskFinishedAt || !!taskAbortedAt || isClientDisconnected, {
  223. interval: 1_000,
  224. timeout: timeoutMs,
  225. })
  226. } catch (_error) {
  227. taskTimedOut = true
  228. logger.error("time limit reached")
  229. if (rooTaskId && !isClientDisconnected) {
  230. logger.info("cancelling task")
  231. client.sendCommand({ commandName: TaskCommandName.CancelTask, data: rooTaskId })
  232. await new Promise((resolve) => setTimeout(resolve, 5_000)) // Allow some time for the task to cancel.
  233. }
  234. taskFinishedAt = Date.now()
  235. }
  236. if (!taskFinishedAt && !taskTimedOut) {
  237. logger.error("client disconnected before task finished")
  238. throw new Error("Client disconnected before task completion.")
  239. }
  240. // If the task was aborted unexpectedly or the client disconnected
  241. // unexpectedly, then throw to trigger a retry.
  242. logger.info("setting task finished at")
  243. await updateTask(task.id, { finishedAt: new Date() })
  244. if (rooTaskId && !isClientDisconnected) {
  245. logger.info("closing task")
  246. client.sendCommand({ commandName: TaskCommandName.CloseTask, data: rooTaskId })
  247. await new Promise((resolve) => setTimeout(resolve, 2_000)) // Allow some time for the window to close.
  248. }
  249. if (!isClientDisconnected) {
  250. logger.info("disconnecting client")
  251. client.disconnect()
  252. }
  253. logger.info("waiting for subprocess to finish")
  254. controller.abort()
  255. await waitForSubprocessWithTimeout({ subprocess, logger })
  256. // Copy conversation history files from VS Code extension storage to the log directory
  257. // for post-mortem analysis. Only do this in containerized mode where we have a known path.
  258. if (containerized && rooTaskId) {
  259. await copyConversationHistory({
  260. rooTaskId,
  261. logDir,
  262. language,
  263. exercise,
  264. iteration: task.iteration,
  265. logger,
  266. })
  267. }
  268. logger.close()
  269. // Only throw for API instability if the task didn't complete successfully.
  270. // If taskFinishedAt is set via TaskCompleted event, the task succeeded despite
  271. // API retries, so re-running from scratch would waste resources.
  272. if (isApiUnstable && !taskFinishedAt) {
  273. throw new Error("API is unstable, throwing to trigger a retry.")
  274. }
  275. }