|
|
@@ -15,11 +15,21 @@ import {
|
|
|
} from "@roo-code/types"
|
|
|
import { IpcClient } from "@roo-code/ipc"
|
|
|
|
|
|
-import { type Run, type Task, updateTask, createTaskMetrics, updateTaskMetrics, createToolError } from "../db/index.js"
|
|
|
-import { exercisesPath } from "../exercises/index.js"
|
|
|
-
|
|
|
-import { isDockerContainer } from "./utils.js"
|
|
|
-import { FileLogger } from "./FileLogger.js"
|
|
|
+import {
|
|
|
+ type Run,
|
|
|
+ type Task,
|
|
|
+ findRun,
|
|
|
+ findTask,
|
|
|
+ updateTask,
|
|
|
+ createTaskMetrics,
|
|
|
+ updateTaskMetrics,
|
|
|
+ createToolError,
|
|
|
+} from "../db/index.js"
|
|
|
+import { EVALS_REPO_PATH } from "../exercises/index.js"
|
|
|
+
|
|
|
+import { Logger, getTag, isDockerContainer } from "./utils.js"
|
|
|
+import { redisClient, getPubSubKey, registerRunner, deregisterRunner } from "./redis.js"
|
|
|
+import { runUnitTest } from "./runUnitTest.js"
|
|
|
|
|
|
class SubprocessTimeoutError extends Error {
|
|
|
constructor(timeout: number) {
|
|
|
@@ -28,17 +38,118 @@ class SubprocessTimeoutError extends Error {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+export const processTask = async ({ taskId, logger }: { taskId: number; logger?: Logger }) => {
|
|
|
+ const task = await findTask(taskId)
|
|
|
+ const { language, exercise } = task
|
|
|
+ const run = await findRun(task.runId)
|
|
|
+ await registerRunner({ runId: run.id, taskId })
|
|
|
+
|
|
|
+ logger =
|
|
|
+ logger ||
|
|
|
+ new Logger({
|
|
|
+ logDir: `/var/log/evals/runs/${run.id}`,
|
|
|
+ filename: `${language}-${exercise}.log`,
|
|
|
+ tag: getTag("runTask", { run, task }),
|
|
|
+ })
|
|
|
+
|
|
|
+ try {
|
|
|
+ const publish = async (e: TaskEvent) => {
|
|
|
+ const redis = await redisClient()
|
|
|
+ await redis.publish(getPubSubKey(run.id), JSON.stringify(e))
|
|
|
+ }
|
|
|
+
|
|
|
+ logger.info(`running task ${task.id} (${language}/${exercise})...`)
|
|
|
+ await runTask({ run, task, publish, logger })
|
|
|
+
|
|
|
+ logger.info(`testing task ${task.id} (${language}/${exercise})...`)
|
|
|
+ const passed = await runUnitTest({ task, logger })
|
|
|
+
|
|
|
+ logger.info(`task ${task.id} (${language}/${exercise}) -> ${passed}`)
|
|
|
+ await updateTask(task.id, { passed })
|
|
|
+
|
|
|
+ await publish({
|
|
|
+ eventName: passed ? RooCodeEventName.EvalPass : RooCodeEventName.EvalFail,
|
|
|
+ taskId: task.id,
|
|
|
+ })
|
|
|
+ } finally {
|
|
|
+ await deregisterRunner({ runId: run.id, taskId })
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+export const processTaskInContainer = async ({
|
|
|
+ taskId,
|
|
|
+ logger,
|
|
|
+ maxRetries = 10,
|
|
|
+}: {
|
|
|
+ taskId: number
|
|
|
+ logger: Logger
|
|
|
+ maxRetries?: number
|
|
|
+}) => {
|
|
|
+ const baseArgs = [
|
|
|
+ "--rm",
|
|
|
+ "--network evals_default",
|
|
|
+ "-v /var/run/docker.sock:/var/run/docker.sock",
|
|
|
+ "-v /tmp/evals:/var/log/evals",
|
|
|
+ "-e HOST_EXECUTION_METHOD=docker",
|
|
|
+ ]
|
|
|
+
|
|
|
+ const command = `pnpm --filter @roo-code/evals cli --taskId ${taskId}`
|
|
|
+ logger.info(command)
|
|
|
+
|
|
|
+ for (let attempt = 0; attempt <= maxRetries; attempt++) {
|
|
|
+ const containerName = `evals-task-${taskId}.${attempt}`
|
|
|
+ const args = [`--name ${containerName}`, ...baseArgs]
|
|
|
+ const isRetry = attempt > 0
|
|
|
+
|
|
|
+ if (isRetry) {
|
|
|
+ const delayMs = Math.pow(2, attempt - 1) * 1000 * (0.5 + Math.random())
|
|
|
+ logger.info(`retrying in ${delayMs}ms (attempt ${attempt + 1}/${maxRetries + 1})`)
|
|
|
+ await new Promise((resolve) => setTimeout(resolve, delayMs))
|
|
|
+ }
|
|
|
+
|
|
|
+ logger.info(
|
|
|
+ `${isRetry ? "retrying" : "executing"} container command (attempt ${attempt + 1}/${maxRetries + 1})`,
|
|
|
+ )
|
|
|
+
|
|
|
+ const subprocess = execa(`docker run ${args.join(" ")} evals-runner sh -c "${command}"`, { shell: true })
|
|
|
+ // subprocess.stdout?.on("data", (data) => console.log(data.toString()))
|
|
|
+ // subprocess.stderr?.on("data", (data) => console.error(data.toString()))
|
|
|
+
|
|
|
+ try {
|
|
|
+ const result = await subprocess
|
|
|
+ logger.info(`container process completed with exit code: ${result.exitCode}`)
|
|
|
+ return
|
|
|
+ } catch (error) {
|
|
|
+ if (error && typeof error === "object" && "exitCode" in error) {
|
|
|
+ logger.error(
|
|
|
+ `container process failed with exit code: ${error.exitCode} (attempt ${attempt + 1}/${maxRetries + 1})`,
|
|
|
+ )
|
|
|
+ } else {
|
|
|
+ logger.error(`container process failed with error: ${error} (attempt ${attempt + 1}/${maxRetries + 1})`)
|
|
|
+ }
|
|
|
+
|
|
|
+ if (attempt === maxRetries) {
|
|
|
+ break
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ logger.error(`all ${maxRetries + 1} attempts failed, giving up`)
|
|
|
+
|
|
|
+ // TODO: Mark task as failed.
|
|
|
+}
|
|
|
+
|
|
|
type RunTaskOptions = {
|
|
|
run: Run
|
|
|
task: Task
|
|
|
publish: (taskEvent: TaskEvent) => Promise<void>
|
|
|
- logger: FileLogger
|
|
|
+ logger: Logger
|
|
|
}
|
|
|
|
|
|
export const runTask = async ({ run, task, publish, logger }: RunTaskOptions) => {
|
|
|
const { language, exercise } = task
|
|
|
- const prompt = fs.readFileSync(path.resolve(exercisesPath, `prompts/${language}.md`), "utf-8")
|
|
|
- const workspacePath = path.resolve(exercisesPath, language, exercise)
|
|
|
+ const prompt = fs.readFileSync(path.resolve(EVALS_REPO_PATH, `prompts/${language}.md`), "utf-8")
|
|
|
+ const workspacePath = path.resolve(EVALS_REPO_PATH, language, exercise)
|
|
|
const ipcSocketPath = path.resolve(os.tmpdir(), `evals-${run.id}-${task.id}.sock`)
|
|
|
const env = { ROO_CODE_IPC_SOCKET_PATH: ipcSocketPath }
|
|
|
const controller = new AbortController()
|
|
|
@@ -87,6 +198,7 @@ export const runTask = async ({ run, task, publish, logger }: RunTaskOptions) =>
|
|
|
let taskStartedAt = Date.now()
|
|
|
let taskFinishedAt: number | undefined
|
|
|
let taskAbortedAt: number | undefined
|
|
|
+ let taskTimedOut: boolean = false
|
|
|
let taskMetricsId: number | undefined
|
|
|
let rooTaskId: string | undefined
|
|
|
let isClientDisconnected = false
|
|
|
@@ -196,6 +308,7 @@ export const runTask = async ({ run, task, publish, logger }: RunTaskOptions) =>
|
|
|
timeout: EVALS_TIMEOUT,
|
|
|
})
|
|
|
} catch (_error) {
|
|
|
+ taskTimedOut = true
|
|
|
logger.error("time limit reached")
|
|
|
|
|
|
if (rooTaskId && !isClientDisconnected) {
|
|
|
@@ -207,16 +320,16 @@ export const runTask = async ({ run, task, publish, logger }: RunTaskOptions) =>
|
|
|
taskFinishedAt = Date.now()
|
|
|
}
|
|
|
|
|
|
- if (taskFinishedAt) {
|
|
|
- logger.info("setting task finished at")
|
|
|
- await updateTask(task.id, { finishedAt: new Date() })
|
|
|
- }
|
|
|
-
|
|
|
- if (!taskFinishedAt && isClientDisconnected) {
|
|
|
+ if (!taskFinishedAt && !taskTimedOut) {
|
|
|
logger.error("client disconnected before task finished")
|
|
|
throw new Error("Client disconnected before task completion.")
|
|
|
}
|
|
|
|
|
|
+ // If the task was aborted unexpectedly or the client disconnected
|
|
|
+ // unexpectedly, then throw to trigger a retry.
|
|
|
+ logger.info("setting task finished at")
|
|
|
+ await updateTask(task.id, { finishedAt: new Date() })
|
|
|
+
|
|
|
if (rooTaskId && !isClientDisconnected) {
|
|
|
logger.info("closing task")
|
|
|
client.sendCommand({ commandName: TaskCommandName.CloseTask, data: rooTaskId })
|