Explorar o código

Harden evals with retry logic + centralize logs on Docker host (#4440)

* Harden evals with retry logic + centralize logs on Docker host

* Don't delete controller container on exit

* Add randomness

* More tweaks

* More tweaks
Chris Estreich hai 6 meses
pai
achega
52673b3721

+ 2 - 1
apps/web-evals/src/actions/runs.ts

@@ -52,9 +52,10 @@ export async function createRun({ suite, exercises = [], systemPrompt, ...values
 
 		const dockerArgs = [
 			`--name evals-controller-${run.id}`,
-			"--rm",
+			// "--rm",
 			"--network evals_default",
 			"-v /var/run/docker.sock:/var/run/docker.sock",
+			"-v /tmp/evals:/var/log/evals",
 			"-e HOST_EXECUTION_METHOD=docker",
 		]
 

+ 1 - 0
packages/evals/docker-compose.yml

@@ -72,6 +72,7 @@ services:
             - HOST_EXECUTION_METHOD=docker
         volumes:
             - /var/run/docker.sock:/var/run/docker.sock
+            - /tmp/evals:/var/log/evals
         stdin_open: true
         tty: true
         profiles:

+ 86 - 0
packages/evals/src/cli/FileLogger.ts

@@ -0,0 +1,86 @@
+import * as fs from "fs"
+import * as path from "path"
+
+/** Severity labels; the string value is written verbatim into each log line. */
+export enum LogLevel {
+	INFO = "INFO",
+	ERROR = "ERROR",
+	WARN = "WARN",
+	DEBUG = "DEBUG",
+}
+
+/** Constructor options for `FileLogger`. */
+export interface LoggerOptions {
+	// Directory that holds the log file; the logger creates it recursively.
+	logDir: string
+	// File name inside `logDir`; joined with `logDir` to form the log path.
+	filename: string
+	// Label embedded in the prefix of every log line.
+	tag: string
+}
+
+/**
+ * Logger that prints every message to the console and appends it to a file.
+ *
+ * File logging is best-effort: if the log directory or file cannot be
+ * created, or the stream later reports an error, the failure is printed with
+ * `console.error` and the logger keeps writing to the console only.
+ */
+export class FileLogger {
+	private logStream: fs.WriteStream | undefined
+	private readonly logFilePath: string
+	private readonly tag: string
+
+	/**
+	 * @param options Log directory, file name and tag; see `LoggerOptions`.
+	 */
+	constructor({ logDir, filename, tag }: LoggerOptions) {
+		this.tag = tag
+		this.logFilePath = path.join(logDir, filename)
+		this.initializeLogger(logDir)
+	}
+
+	/** Creates the log directory (recursively) and opens the file in append mode. */
+	private initializeLogger(logDir: string): void {
+		try {
+			fs.mkdirSync(logDir, { recursive: true })
+		} catch (error) {
+			console.error(`Failed to create log directory ${logDir}:`, error)
+		}
+
+		try {
+			const stream = fs.createWriteStream(this.logFilePath, { flags: "a" })
+
+			// Open/write failures (ENOENT, EACCES, ...) are reported through an
+			// asynchronous "error" event, not a synchronous throw. Without a
+			// listener that event would crash the process, so handle it here and
+			// fall back to console-only logging.
+			stream.on("error", (error) => {
+				console.error(`Failed to write to log file ${this.logFilePath}:`, error)
+
+				if (this.logStream === stream) {
+					this.logStream = undefined
+				}
+			})
+
+			this.logStream = stream
+		} catch (error) {
+			console.error(`Failed to create log file ${this.logFilePath}:`, error)
+		}
+	}
+
+	/**
+	 * Renders the extra arguments as a JSON array.
+	 *
+	 * `Error` instances are expanded to name/message/stack (plain
+	 * `JSON.stringify` would emit `{}`), and bigint values are stringified. If
+	 * serialization still fails (e.g. circular references) each argument is
+	 * converted with `String()` instead so the log line is never dropped.
+	 *
+	 * @returns The serialized arguments, or an empty string when there are none.
+	 */
+	private static serializeArgs(args: unknown[]): string {
+		if (args.length === 0) {
+			return ""
+		}
+
+		try {
+			return JSON.stringify(args, (_key: string, value: unknown) => {
+				if (value instanceof Error) {
+					return { name: value.name, message: value.message, stack: value.stack }
+				}
+
+				return typeof value === "bigint" ? value.toString() : value
+			})
+		} catch {
+			return args
+				.map((arg) => {
+					try {
+						return String(arg)
+					} catch {
+						return "[unserializable]"
+					}
+				})
+				.join(" ")
+		}
+	}
+
+	/**
+	 * Formats one line as `[ISO timestamp | level | tag] message [args]`, prints
+	 * it to the console and appends it to the log file if the stream is open.
+	 */
+	private writeToLog(level: LogLevel, message: string, ...args: unknown[]) {
+		try {
+			const timestamp = new Date().toISOString()
+			const details = FileLogger.serializeArgs(args)
+
+			// Only add the separator when there is something to append, so lines
+			// without extra arguments carry no trailing space.
+			const logLine = `[${timestamp} | ${level} | ${this.tag}] ${message}${details ? ` ${details}` : ""}\n`
+
+			console.log(logLine.trim())
+
+			if (this.logStream) {
+				this.logStream.write(logLine)
+			}
+		} catch (error) {
+			console.error(`Failed to write to log file ${this.logFilePath}:`, error)
+		}
+	}
+
+	/** Logs `message` (plus optional serialized `args`) at INFO level. */
+	public info(message: string, ...args: unknown[]): void {
+		this.writeToLog(LogLevel.INFO, message, ...args)
+	}
+
+	/** Logs `message` (plus optional serialized `args`) at ERROR level. */
+	public error(message: string, ...args: unknown[]): void {
+		this.writeToLog(LogLevel.ERROR, message, ...args)
+	}
+
+	/** Logs `message` (plus optional serialized `args`) at WARN level. */
+	public warn(message: string, ...args: unknown[]): void {
+		this.writeToLog(LogLevel.WARN, message, ...args)
+	}
+
+	/** Logs `message` (plus optional serialized `args`) at DEBUG level. */
+	public debug(message: string, ...args: unknown[]): void {
+		this.writeToLog(LogLevel.DEBUG, message, ...args)
+	}
+
+	/** Alias for `info`. */
+	public log(message: string, ...args: unknown[]): void {
+		this.info(message, ...args)
+	}
+
+	/**
+	 * Ends the file stream. Safe to call more than once; later log calls still
+	 * print to the console but no longer write to the file.
+	 */
+	public close(): void {
+		if (this.logStream) {
+			this.logStream.end()
+			this.logStream = undefined
+		}
+	}
+}

+ 1 - 1
packages/evals/src/cli/index.ts

@@ -32,7 +32,7 @@ const main = async () => {
 					if (runId !== -1) {
 						await runEvals(runId)
 					} else {
-						await processTask(taskId)
+						await processTask({ taskId })
 					}
 				} catch (error) {
 					console.error(error)

+ 71 - 15
packages/evals/src/cli/processTask.ts

@@ -1,33 +1,42 @@
+import { execa } from "execa"
+
 import { RooCodeEventName, type TaskEvent } from "@roo-code/types"
 
 import { findTask, updateTask, findRun } from "../db/index.js"
 
 import { getTag } from "./utils.js"
+import { FileLogger } from "./FileLogger.js"
 import { redisClient, getPubSubKey, registerRunner, deregisterRunner } from "./redis.js"
 import { runTask } from "./runTask.js"
 import { runUnitTest } from "./runUnitTest.js"
-import { execa } from "execa"
 
-export const processTask = async (taskId: number) => {
+export const processTask = async ({ taskId, logger }: { taskId: number; logger?: FileLogger }) => {
 	const task = await findTask(taskId)
+	const { language, exercise } = task
 	const run = await findRun(task.runId)
 	await registerRunner({ runId: run.id, taskId })
 
-	try {
-		const tag = getTag("processTask", { run, task })
+	logger =
+		logger ||
+		new FileLogger({
+			logDir: `/var/log/evals/runs/${run.id}`,
+			filename: `${language}-${exercise}.log`,
+			tag: getTag("runTask", { run, task }),
+		})
 
+	try {
 		const publish = async (e: TaskEvent) => {
 			const redis = await redisClient()
 			await redis.publish(getPubSubKey(run.id), JSON.stringify(e))
 		}
 
-		console.log(`[${Date.now()} | ${tag}] running task ${task.id} (${task.language}/${task.exercise})...`)
-		await runTask({ run, task, publish })
+		logger.info(`running task ${task.id} (${language}/${exercise})...`)
+		await runTask({ run, task, publish, logger })
 
-		console.log(`[${Date.now()} | ${tag}] testing task ${task.id} (${task.language}/${task.exercise})...`)
+		logger.info(`testing task ${task.id} (${language}/${exercise})...`)
 		const passed = await runUnitTest({ run, task })
 
-		console.log(`[${Date.now()} | ${tag}] task ${task.id} (${task.language}/${task.exercise}) -> ${passed}`)
+		logger.info(`task ${task.id} (${language}/${exercise}) -> ${passed}`)
 		await updateTask(task.id, { passed })
 
 		await publish({
@@ -39,18 +48,65 @@ export const processTask = async (taskId: number) => {
 	}
 }
 
-export const processTaskInContainer = async (taskId: number) => {
-	const args = [
-		`--name evals-task-${taskId}`,
+export const processTaskInContainer = async ({
+	taskId,
+	logger,
+	maxRetries = 10,
+}: {
+	taskId: number
+	logger: FileLogger
+	maxRetries?: number
+}) => {
+	const baseArgs = [
 		"--rm",
 		"--network evals_default",
 		"-v /var/run/docker.sock:/var/run/docker.sock",
+		"-v /tmp/evals:/var/log/evals",
 		"-e HOST_EXECUTION_METHOD=docker",
 	]
 
 	const command = `pnpm --filter @roo-code/evals cli --taskId ${taskId}`
-	const subprocess = execa(`docker run ${args.join(" ")} evals-runner sh -c "${command}"`, { shell: true })
-	// subprocess.stdout?.on("data", (data) => console.log(data.toString()))
-	// subprocess.stderr?.on("data", (data) => console.error(data.toString()))
-	await subprocess
+	logger.info(command)
+
+	for (let attempt = 0; attempt <= maxRetries; attempt++) {
+		const containerName = `evals-task-${taskId}.${attempt}`
+		const args = [`--name ${containerName}`, ...baseArgs]
+		const isRetry = attempt > 0
+
+		if (isRetry) {
+			const delayMs = Math.pow(2, attempt - 1) * 1000 * (0.5 + Math.random())
+			logger.info(`retrying in ${delayMs}ms (attempt ${attempt + 1}/${maxRetries + 1})`)
+			await new Promise((resolve) => setTimeout(resolve, delayMs))
+		}
+
+		logger.info(
+			`${isRetry ? "retrying" : "executing"} container command (attempt ${attempt + 1}/${maxRetries + 1})`,
+		)
+
+		const subprocess = execa(`docker run ${args.join(" ")} evals-runner sh -c "${command}"`, { shell: true })
+		// subprocess.stdout?.on("data", (data) => console.log(data.toString()))
+		// subprocess.stderr?.on("data", (data) => console.error(data.toString()))
+
+		try {
+			const result = await subprocess
+			logger.info(`container process completed with exit code: ${result.exitCode}`)
+			return
+		} catch (error) {
+			if (error && typeof error === "object" && "exitCode" in error) {
+				logger.error(
+					`container process failed with exit code: ${error.exitCode} (attempt ${attempt + 1}/${maxRetries + 1})`,
+				)
+			} else {
+				logger.error(`container process failed with error: ${error} (attempt ${attempt + 1}/${maxRetries + 1})`)
+			}
+
+			if (attempt === maxRetries) {
+				break
+			}
+		}
+	}
+
+	logger.error(`all ${maxRetries + 1} attempts failed, giving up`)
+
+	// TODO: Mark task as failed.
 }

+ 23 - 6
packages/evals/src/cli/runEvals.ts

@@ -6,6 +6,7 @@ import { exercisesPath } from "../exercises/index.js"
 import { getTag, isDockerContainer, resetEvalsRepo, commitEvalsRepoChanges } from "./utils.js"
 import { processTask, processTaskInContainer } from "./processTask.js"
 import { startHeartbeat, stopHeartbeat } from "./redis.js"
+import { FileLogger } from "./FileLogger.js"
 
 export const runEvals = async (runId: number) => {
 	const run = await findRun(runId)
@@ -20,8 +21,13 @@ export const runEvals = async (runId: number) => {
 		throw new Error(`Run ${run.id} has no tasks.`)
 	}
 
-	const tag = getTag("runEvals", { run })
-	console.log(`[${Date.now()} | ${tag}] running ${tasks.length} task(s)`)
+	const logger = new FileLogger({
+		logDir: `/var/log/evals/runs/${run.id}`,
+		filename: `controller.log`,
+		tag: getTag("runEvals", { run }),
+	})
+
+	logger.info(`running ${tasks.length} task(s)`)
 
 	const containerized = isDockerContainer()
 
@@ -36,12 +42,22 @@ export const runEvals = async (runId: number) => {
 		await queue.addAll(
 			tasks
 				.filter((task) => task.finishedAt === null)
-				.map((task) => () => (containerized ? processTaskInContainer(task.id) : processTask(task.id))),
+				.map((task) => async () => {
+					try {
+						if (containerized) {
+							await processTaskInContainer({ taskId: task.id, logger })
+						} else {
+							await processTask({ taskId: task.id, logger })
+						}
+					} catch (error) {
+						logger.error("error processing task", error)
+					}
+				}),
 		)
 
-		console.log(`[${Date.now()} | ${tag}] finishRun`)
+		logger.info("finishRun")
 		const result = await finishRun(run.id)
-		console.log(`[${Date.now()} | ${tag}] result ->`, result)
+		logger.info("result ->", result)
 
 		// There's no need to commit the changes in the container since they
 		// will be lost when the container is destroyed. I think we should
@@ -50,7 +66,8 @@ export const runEvals = async (runId: number) => {
 			await commitEvalsRepoChanges({ run, cwd: exercisesPath })
 		}
 	} finally {
-		console.log(`[${Date.now()} | ${tag}] cleaning up`)
+		logger.info("cleaning up")
 		stopHeartbeat(run.id, heartbeat)
+		logger.close()
 	}
 }

+ 21 - 22
packages/evals/src/cli/runTask.ts

@@ -18,7 +18,8 @@ import { IpcClient } from "@roo-code/ipc"
 import { type Run, type Task, updateTask, createTaskMetrics, updateTaskMetrics, createToolError } from "../db/index.js"
 import { exercisesPath } from "../exercises/index.js"
 
-import { getTag, isDockerContainer } from "./utils.js"
+import { isDockerContainer } from "./utils.js"
+import { FileLogger } from "./FileLogger.js"
 
 class SubprocessTimeoutError extends Error {
 	constructor(timeout: number) {
@@ -31,14 +32,10 @@ type RunTaskOptions = {
 	run: Run
 	task: Task
 	publish: (taskEvent: TaskEvent) => Promise<void>
+	logger: FileLogger
 }
 
-export const runTask = async ({ run, task, publish }: RunTaskOptions) => {
-	const tag = getTag("runTask", { run, task })
-	const log = (message: string, ...args: unknown[]) => console.log(`[${Date.now()} | ${tag}] ${message}`, ...args)
-	const logError = (message: string, ...args: unknown[]) =>
-		console.error(`[${Date.now()} | ${tag}] ${message}`, ...args)
-
+export const runTask = async ({ run, task, publish, logger }: RunTaskOptions) => {
 	const { language, exercise } = task
 	const prompt = fs.readFileSync(path.resolve(exercisesPath, `prompts/${language}.md`), "utf-8")
 	const workspacePath = path.resolve(exercisesPath, language, exercise)
@@ -52,7 +49,7 @@ export const runTask = async ({ run, task, publish }: RunTaskOptions) => {
 		? `xvfb-run --auto-servernum --server-num=1 code --wait --log trace --disable-workspace-trust --disable-gpu --disable-lcd-text --no-sandbox --user-data-dir /roo/.vscode --password-store="basic" -n ${workspacePath}`
 		: `code --disable-workspace-trust -n ${workspacePath}`
 
-	log(codeCommand)
+	logger.info(codeCommand)
 
 	// Sleep for a random amount of time between 5 and 10 seconds, unless we're
 	// running in a container, in which case there are no issues with flooding
@@ -81,8 +78,8 @@ export const runTask = async ({ run, task, publish }: RunTaskOptions) => {
 			attempts--
 
 			if (attempts <= 0) {
-				logError(`unable to connect to IPC socket -> ${ipcSocketPath}`)
-				return
+				logger.error(`unable to connect to IPC socket -> ${ipcSocketPath}`)
+				throw new Error("Unable to connect.")
 			}
 		}
 	}
@@ -112,7 +109,7 @@ export const runTask = async ({ run, task, publish }: RunTaskOptions) => {
 			!ignoreEvents.log.includes(eventName) &&
 			(eventName !== RooCodeEventName.Message || payload[0].message.partial !== true)
 		) {
-			log(`${eventName} ->`, payload)
+			logger.info(`${eventName} ->`, payload)
 		}
 
 		if (eventName === RooCodeEventName.TaskStarted) {
@@ -172,7 +169,7 @@ export const runTask = async ({ run, task, publish }: RunTaskOptions) => {
 	})
 
 	client.on(IpcMessageType.Disconnect, async () => {
-		log(`disconnected from IPC socket -> ${ipcSocketPath}`)
+		logger.info(`disconnected from IPC socket -> ${ipcSocketPath}`)
 		isClientDisconnected = true
 	})
 
@@ -192,10 +189,10 @@ export const runTask = async ({ run, task, publish }: RunTaskOptions) => {
 	try {
 		await pWaitFor(() => !!taskFinishedAt || isClientDisconnected, { interval: 1_000, timeout: EVALS_TIMEOUT })
 	} catch (_error) {
-		logError("time limit reached")
+		logger.error("time limit reached")
 
 		if (rooTaskId && !isClientDisconnected) {
-			log("cancelling task")
+			logger.info("cancelling task")
 			client.sendCommand({ commandName: TaskCommandName.CancelTask, data: rooTaskId })
 			await new Promise((resolve) => setTimeout(resolve, 5_000)) // Allow some time for the task to cancel.
 		}
@@ -204,10 +201,10 @@ export const runTask = async ({ run, task, publish }: RunTaskOptions) => {
 	}
 
 	if (isClientDisconnected) {
-		logError("client disconnected before task finished")
+		logger.error("client disconnected before task finished")
 	} else {
 		if (rooTaskId) {
-			log("closing task")
+			logger.info("closing task")
 			client.sendCommand({ commandName: TaskCommandName.CloseTask, data: rooTaskId })
 			await new Promise((resolve) => setTimeout(resolve, 2_000)) // Allow some time for the window to close.
 		}
@@ -215,7 +212,7 @@ export const runTask = async ({ run, task, publish }: RunTaskOptions) => {
 		client.disconnect()
 	}
 
-	log("waiting for subprocess to finish")
+	logger.info("waiting for subprocess to finish")
 	controller.abort()
 
 	// Wait for subprocess to finish gracefully, with a timeout.
@@ -229,22 +226,24 @@ export const runTask = async ({ run, task, publish }: RunTaskOptions) => {
 			),
 		])
 
-		log("subprocess finished gracefully")
+		logger.info("subprocess finished gracefully")
 	} catch (error) {
 		if (error instanceof SubprocessTimeoutError) {
-			logError("subprocess did not finish within timeout, force killing")
+			logger.error("subprocess did not finish within timeout, force killing")
 
 			try {
 				if (subprocess.kill("SIGKILL")) {
-					log("SIGKILL sent to subprocess")
+					logger.info("SIGKILL sent to subprocess")
 				} else {
-					logError("failed to send SIGKILL to subprocess")
+					logger.error("failed to send SIGKILL to subprocess")
 				}
 			} catch (killError) {
-				logError("subprocess.kill(SIGKILL) failed:", killError)
+				logger.error("subprocess.kill(SIGKILL) failed:", killError)
 			}
 		} else {
 			throw error
 		}
 	}
+
+	logger.close()
 }