Browse Source

Handle stdin-stream control-flow errors gracefully (#11811)

Chris Estreich 1 month ago
parent
commit
52d606bab3

+ 104 - 0
apps/cli/src/commands/cli/__tests__/cancellation.test.ts

@@ -0,0 +1,104 @@
+import {
+	isCancellationLikeError,
+	isExpectedControlFlowError,
+	isNoActiveTaskLikeError,
+	isStreamTeardownLikeError,
+} from "../cancellation.js"
+
+describe("isCancellationLikeError", () => {
+	it("returns true for aborted error messages", () => {
+		expect(isCancellationLikeError(new Error("[RooCode#say] task 123 aborted"))).toBe(true)
+		expect(isCancellationLikeError("AbortError: operation aborted")).toBe(true)
+	})
+
+	it("returns true for abort/cancel error names and codes", () => {
+		expect(isCancellationLikeError({ name: "AbortError", message: "stop now" })).toBe(true)
+		expect(isCancellationLikeError({ code: "ABORT_ERR", message: "aborted" })).toBe(true)
+		expect(isCancellationLikeError({ code: "ERR_CANCELED", message: "request failed" })).toBe(true)
+	})
+
+	it("returns true for canceled/cancelled error messages", () => {
+		expect(isCancellationLikeError(new Error("Request canceled"))).toBe(true)
+		expect(isCancellationLikeError(new Error("request cancelled by user"))).toBe(true)
+	})
+
+	it("returns false for non-cancellation errors", () => {
+		expect(isCancellationLikeError(new Error("network timeout"))).toBe(false)
+		expect(isCancellationLikeError("validation failed")).toBe(false)
+	})
+})
+
+describe("isNoActiveTaskLikeError", () => {
+	it("matches task-settled cancel race messages", () => {
+		expect(isNoActiveTaskLikeError(new Error("no active task to cancel"))).toBe(true)
+		expect(isNoActiveTaskLikeError(new Error("task not found"))).toBe(true)
+		expect(isNoActiveTaskLikeError("already completed")).toBe(true)
+	})
+
+	it("does not match unrelated messages", () => {
+		expect(isNoActiveTaskLikeError("network timeout")).toBe(false)
+	})
+})
+
+describe("isStreamTeardownLikeError", () => {
+	it("matches common stream teardown errors", () => {
+		expect(isStreamTeardownLikeError({ code: "EPIPE", message: "broken pipe" })).toBe(true)
+		expect(isStreamTeardownLikeError({ code: "ERR_STREAM_DESTROYED", message: "stream destroyed" })).toBe(true)
+		expect(isStreamTeardownLikeError(new Error("write after end"))).toBe(true)
+	})
+
+	it("does not match unrelated stream errors", () => {
+		expect(isStreamTeardownLikeError(new Error("permission denied"))).toBe(false)
+	})
+})
+
+describe("isExpectedControlFlowError", () => {
+	it("returns false when not in stdin stream mode", () => {
+		expect(
+			isExpectedControlFlowError(new Error("AbortError: aborted"), {
+				stdinStreamMode: false,
+				operation: "runtime",
+			}),
+		).toBe(false)
+	})
+
+	it("accepts cancellation-like runtime errors in stdin stream mode", () => {
+		expect(
+			isExpectedControlFlowError(new Error("AbortError: aborted"), {
+				stdinStreamMode: true,
+				operation: "runtime",
+			}),
+		).toBe(true)
+	})
+
+	it("accepts no-active-task races for cancel operations", () => {
+		expect(
+			isExpectedControlFlowError(new Error("task not found"), {
+				stdinStreamMode: true,
+				operation: "cancel",
+			}),
+		).toBe(true)
+	})
+
+	it("accepts stream teardown errors during shutdown", () => {
+		expect(
+			isExpectedControlFlowError(
+				{ code: "EPIPE", message: "broken pipe" },
+				{
+					stdinStreamMode: true,
+					shuttingDown: true,
+					operation: "runtime",
+				},
+			),
+		).toBe(true)
+	})
+
+	it("rejects unrelated errors", () => {
+		expect(
+			isExpectedControlFlowError(new Error("authentication failed"), {
+				stdinStreamMode: true,
+				operation: "runtime",
+			}),
+		).toBe(false)
+	})
+})

+ 131 - 0
apps/cli/src/commands/cli/cancellation.ts

@@ -0,0 +1,131 @@
+const CANCELLATION_ERROR_PATTERNS = ["aborted", "aborterror", "cancelled", "canceled"]
+const CANCELLATION_ERROR_NAMES = new Set(["aborterror"])
+const CANCELLATION_ERROR_CODES = new Set(["ABORT_ERR", "ERR_CANCELED", "ERR_CANCELLED"])
+const NO_ACTIVE_TASK_PATTERNS = [
+	"no active task",
+	"no task to cancel",
+	"task not found",
+	"unable to find task",
+	"already completed",
+	"already cancelled",
+	"already canceled",
+]
+const STREAM_TEARDOWN_CODES = new Set(["EPIPE", "ECONNRESET", "ERR_STREAM_DESTROYED", "ERR_STREAM_PREMATURE_CLOSE"])
+const STREAM_TEARDOWN_PATTERNS = [
+	"write after end",
+	"stream destroyed",
+	"premature close",
+	"socket hang up",
+	"broken pipe",
+]
+
+export interface ExpectedControlFlowErrorContext {
+	stdinStreamMode: boolean
+	cancelRequested?: boolean
+	shuttingDown?: boolean
+	operation?: "runtime" | "client" | "cancel" | "shutdown"
+}
+
+interface ErrorMetadata {
+	message: string
+	normalizedMessage: string
+	name?: string
+	normalizedName?: string
+	code?: string
+}
+
+function getErrorMetadata(error: unknown): ErrorMetadata {
+	if (error instanceof Error) {
+		const maybeCode = (error as Error & { code?: unknown }).code
+		const code = typeof maybeCode === "string" ? maybeCode : undefined
+		return {
+			message: error.message,
+			normalizedMessage: error.message.toLowerCase(),
+			name: error.name,
+			normalizedName: error.name.toLowerCase(),
+			code,
+		}
+	}
+
+	if (typeof error === "object" && error !== null) {
+		const nameRaw = (error as { name?: unknown }).name
+		const messageRaw = (error as { message?: unknown }).message
+		const codeRaw = (error as { code?: unknown }).code
+		const message = typeof messageRaw === "string" ? messageRaw : String(error)
+		return {
+			message,
+			normalizedMessage: message.toLowerCase(),
+			name: typeof nameRaw === "string" ? nameRaw : undefined,
+			normalizedName: typeof nameRaw === "string" ? nameRaw.toLowerCase() : undefined,
+			code: typeof codeRaw === "string" ? codeRaw : undefined,
+		}
+	}
+
+	const message = String(error)
+	return {
+		message,
+		normalizedMessage: message.toLowerCase(),
+	}
+}
+
+/**
+ * Best-effort classifier for cancellation/abort failures.
+ */
+export function isCancellationLikeError(error: unknown): boolean {
+	const details = getErrorMetadata(error)
+
+	if (details.code && CANCELLATION_ERROR_CODES.has(details.code)) {
+		return true
+	}
+
+	if (details.normalizedName && CANCELLATION_ERROR_NAMES.has(details.normalizedName)) {
+		return true
+	}
+
+	return CANCELLATION_ERROR_PATTERNS.some((pattern) => details.normalizedMessage.includes(pattern))
+}
+
+export function isNoActiveTaskLikeError(error: unknown): boolean {
+	const details = getErrorMetadata(error)
+	return NO_ACTIVE_TASK_PATTERNS.some((pattern) => details.normalizedMessage.includes(pattern))
+}
+
+export function isStreamTeardownLikeError(error: unknown): boolean {
+	const details = getErrorMetadata(error)
+	if (details.code && STREAM_TEARDOWN_CODES.has(details.code)) {
+		return true
+	}
+
+	return STREAM_TEARDOWN_PATTERNS.some((pattern) => details.normalizedMessage.includes(pattern))
+}
+
+/**
+ * Classify errors that should be treated as expected control flow rather than
+ * fatal failures while handling stdin stream tasks.
+ */
+export function isExpectedControlFlowError(error: unknown, context: ExpectedControlFlowErrorContext): boolean {
+	if (!context.stdinStreamMode) {
+		return false
+	}
+
+	if (context.shuttingDown && isStreamTeardownLikeError(error)) {
+		return true
+	}
+
+	const isCancelLike = isCancellationLikeError(error)
+	if (isCancelLike && (context.cancelRequested || context.shuttingDown || context.operation === "runtime")) {
+		return true
+	}
+
+	if (
+		isNoActiveTaskLikeError(error) &&
+		(context.cancelRequested ||
+			context.shuttingDown ||
+			context.operation === "cancel" ||
+			context.operation === "shutdown")
+	) {
+		return true
+	}
+
+	return false
+}

+ 21 - 0
apps/cli/src/commands/cli/run.ts

@@ -30,6 +30,7 @@ import { getDefaultExtensionPath } from "@/lib/utils/extension.js"
 import { VERSION } from "@/lib/utils/version.js"
 
 import { ExtensionHost, ExtensionHostOptions } from "@/agent/index.js"
+import { isExpectedControlFlowError } from "./cancellation.js"
 import { runStdinStreamMode } from "./stdin-stream.js"
 
 const __dirname = path.dirname(fileURLToPath(import.meta.url))
@@ -479,6 +480,16 @@ export async function run(promptArg: string | undefined, flagOptions: FlagOption
 		}
 
 		const onUncaughtException = (error: Error) => {
+			if (
+				isExpectedControlFlowError(error, {
+					stdinStreamMode: useStdinPromptStream,
+					shuttingDown: isShuttingDown,
+					operation: "runtime",
+				})
+			) {
+				return
+			}
+
 			emitRuntimeError(error, "uncaughtException")
 
 			if (signalOnlyExit) {
@@ -489,6 +500,16 @@ export async function run(promptArg: string | undefined, flagOptions: FlagOption
 		}
 
 		const onUnhandledRejection = (reason: unknown) => {
+			if (
+				isExpectedControlFlowError(reason, {
+					stdinStreamMode: useStdinPromptStream,
+					shuttingDown: isShuttingDown,
+					operation: "runtime",
+				})
+			) {
+				return
+			}
+
 			const error = normalizeError(reason)
 			emitRuntimeError(error, "unhandledRejection")
 

+ 66 - 11
apps/cli/src/commands/cli/stdin-stream.ts

@@ -9,6 +9,7 @@ import {
 } from "@roo-code/types"
 
 import { isRecord } from "@/lib/utils/guards.js"
+import { isCancellationLikeError, isExpectedControlFlowError, isNoActiveTaskLikeError } from "./cancellation.js"
 
 import type { ExtensionHost } from "@/agent/index.js"
 import type { JsonEventEmitter } from "@/agent/json-event-emitter.js"
@@ -188,12 +189,6 @@ export interface StdinStreamModeOptions {
 	setStreamRequestId: (id: string | undefined) => void
 }
 
-function isCancellationLikeError(error: unknown): boolean {
-	const message = error instanceof Error ? error.message : String(error)
-	const normalized = message.toLowerCase()
-	return normalized.includes("aborted") || normalized.includes("cancelled") || normalized.includes("canceled")
-}
-
 const RESUME_ASKS = new Set(["resume_task", "resume_completed_task"])
 const CANCEL_RECOVERY_WAIT_TIMEOUT_MS = 8_000
 const CANCEL_RECOVERY_POLL_INTERVAL_MS = 100
@@ -313,8 +308,15 @@ export async function runStdinStreamMode({ host, jsonEmitter, setStreamRequestId
 	}
 
 	const offClientError = host.client.on("error", (error) => {
-		if (cancelRequestedForActiveTask && isCancellationLikeError(error)) {
-			if (activeTaskCommand === "start") {
+		if (
+			isExpectedControlFlowError(error, {
+				stdinStreamMode: true,
+				cancelRequested: cancelRequestedForActiveTask,
+				shuttingDown: shouldShutdown,
+				operation: "client",
+			})
+		) {
+			if (activeTaskCommand === "start" && (cancelRequestedForActiveTask || isCancellationLikeError(error))) {
 				jsonEmitter.emitControl({
 					subtype: "done",
 					requestId: activeRequestId,
@@ -329,6 +331,7 @@ export async function runStdinStreamMode({ host, jsonEmitter, setStreamRequestId
 			activeRequestId = undefined
 			setStreamRequestId(undefined)
 			cancelRequestedForActiveTask = false
+			awaitingPostCancelRecovery = false
 			return
 		}
 
@@ -443,6 +446,7 @@ export async function runStdinStreamMode({ host, jsonEmitter, setStreamRequestId
 				code: completionCode,
 				success: event.success,
 			})
+
 			activeTaskCommand = undefined
 			activeRequestId = undefined
 			setStreamRequestId(undefined)
@@ -478,6 +482,7 @@ export async function runStdinStreamMode({ host, jsonEmitter, setStreamRequestId
 							code: "task_busy",
 							success: false,
 						})
+
 						break
 					}
 
@@ -503,8 +508,18 @@ export async function runStdinStreamMode({ host, jsonEmitter, setStreamRequestId
 						.catch((error) => {
 							const message = error instanceof Error ? error.message : String(error)
 
-							if (cancelRequestedForActiveTask || isCancellationLikeError(error)) {
-								if (activeTaskCommand === "start") {
+							if (
+								isExpectedControlFlowError(error, {
+									stdinStreamMode: true,
+									cancelRequested: cancelRequestedForActiveTask,
+									shuttingDown: shouldShutdown,
+									operation: "client",
+								})
+							) {
+								if (
+									activeTaskCommand === "start" &&
+									(cancelRequestedForActiveTask || isCancellationLikeError(error))
+								) {
 									jsonEmitter.emitControl({
 										subtype: "done",
 										requestId: stdinCommand.requestId,
@@ -515,10 +530,12 @@ export async function runStdinStreamMode({ host, jsonEmitter, setStreamRequestId
 										success: false,
 									})
 								}
+
 								activeTaskCommand = undefined
 								activeRequestId = undefined
 								setStreamRequestId(undefined)
 								cancelRequestedForActiveTask = false
+								awaitingPostCancelRecovery = false
 								return
 							}
 
@@ -526,6 +543,7 @@ export async function runStdinStreamMode({ host, jsonEmitter, setStreamRequestId
 							activeTaskCommand = undefined
 							activeRequestId = undefined
 							setStreamRequestId(undefined)
+
 							jsonEmitter.emitControl({
 								subtype: "error",
 								requestId: stdinCommand.requestId,
@@ -539,6 +557,7 @@ export async function runStdinStreamMode({ host, jsonEmitter, setStreamRequestId
 						.finally(() => {
 							activeTaskPromise = null
 						})
+
 					break
 
 				case "message": {
@@ -547,6 +566,7 @@ export async function runStdinStreamMode({ host, jsonEmitter, setStreamRequestId
 					if (awaitingPostCancelRecovery) {
 						await waitForPostCancelRecovery(host)
 					}
+
 					const wasResumable = isResumableState(host)
 
 					if (!host.client.hasActiveTask()) {
@@ -559,10 +579,12 @@ export async function runStdinStreamMode({ host, jsonEmitter, setStreamRequestId
 							code: "no_active_task",
 							success: false,
 						})
+
 						break
 					}
 
 					setStreamRequestId(stdinCommand.requestId)
+
 					jsonEmitter.emitControl({
 						subtype: "ack",
 						requestId: stdinCommand.requestId,
@@ -572,7 +594,9 @@ export async function runStdinStreamMode({ host, jsonEmitter, setStreamRequestId
 						code: "accepted",
 						success: true,
 					})
+
 					host.sendToExtension({ type: "queueMessage", text: stdinCommand.prompt })
+
 					jsonEmitter.emitControl({
 						subtype: "done",
 						requestId: stdinCommand.requestId,
@@ -582,6 +606,7 @@ export async function runStdinStreamMode({ host, jsonEmitter, setStreamRequestId
 						code: wasResumable ? "resumed" : "queued",
 						success: true,
 					})
+
 					awaitingPostCancelRecovery = false
 					break
 				}
@@ -603,6 +628,7 @@ export async function runStdinStreamMode({ host, jsonEmitter, setStreamRequestId
 							code: "accepted",
 							success: true,
 						})
+
 						jsonEmitter.emitControl({
 							subtype: "done",
 							requestId: stdinCommand.requestId,
@@ -612,11 +638,13 @@ export async function runStdinStreamMode({ host, jsonEmitter, setStreamRequestId
 							code: "no_active_task",
 							success: true,
 						})
+
 						break
 					}
 
 					cancelRequestedForActiveTask = true
 					awaitingPostCancelRecovery = true
+
 					jsonEmitter.emitControl({
 						subtype: "ack",
 						requestId: stdinCommand.requestId,
@@ -626,8 +654,10 @@ export async function runStdinStreamMode({ host, jsonEmitter, setStreamRequestId
 						code: "accepted",
 						success: true,
 					})
+
 					try {
 						host.client.cancelTask()
+
 						jsonEmitter.emitControl({
 							subtype: "done",
 							requestId: stdinCommand.requestId,
@@ -638,7 +668,32 @@ export async function runStdinStreamMode({ host, jsonEmitter, setStreamRequestId
 							success: true,
 						})
 					} catch (error) {
-						if (!isCancellationLikeError(error)) {
+						if (
+							isExpectedControlFlowError(error, {
+								stdinStreamMode: true,
+								cancelRequested: true,
+								shuttingDown: shouldShutdown,
+								operation: "cancel",
+							})
+						) {
+							const noActiveTask = isNoActiveTaskLikeError(error)
+
+							jsonEmitter.emitControl({
+								subtype: "done",
+								requestId: stdinCommand.requestId,
+								command: "cancel",
+								taskId: latestTaskId,
+								content: noActiveTask ? "cancel ignored (task already settled)" : "cancel handled",
+								code: noActiveTask ? "no_active_task" : "cancel_requested",
+								success: true,
+							})
+
+							if (noActiveTask) {
+								awaitingPostCancelRecovery = false
+							}
+
+							cancelRequestedForActiveTask = false
+						} else {
 							const message = error instanceof Error ? error.message : String(error)
 							jsonEmitter.emitControl({
 								subtype: "error",