Explorar o código

Implement deferred task subscriptions (#7517)

Chris Estreich hai 4 meses
pai
achega
1d46bd1bbc

+ 35 - 1
packages/cloud/src/bridge/BridgeOrchestrator.ts

@@ -31,6 +31,8 @@ export interface BridgeOrchestratorOptions {
 export class BridgeOrchestrator {
 	private static instance: BridgeOrchestrator | null = null
 
+	private static pendingTask: TaskLike | null = null
+
 	// Core
 	private readonly userId: string
 	private readonly socketBridgeUrl: string
@@ -116,6 +118,22 @@ export class BridgeOrchestrator {
 		}
 	}
 
+	/**
+	 * @TODO: What if subtasks also get spawned? We'd probably want deferred
+	 * subscriptions for those too.
+	 */
+	public static async subscribeToTask(task: TaskLike): Promise<void> {
+		const instance = BridgeOrchestrator.instance
+
+		if (instance && instance.socketTransport.isConnected()) {
+			console.log(`[BridgeOrchestrator#subscribeToTask] Subscribing to task ${task.taskId}`)
+			await instance.subscribeToTask(task)
+		} else {
+			console.log(`[BridgeOrchestrator#subscribeToTask] Deferring subscription for task ${task.taskId}`)
+			BridgeOrchestrator.pendingTask = task
+		}
+	}
+
 	private constructor(options: BridgeOrchestratorOptions) {
 		this.userId = options.userId
 		this.socketBridgeUrl = options.socketBridgeUrl
@@ -180,12 +198,27 @@ export class BridgeOrchestrator {
 		const socket = this.socketTransport.getSocket()
 
 		if (!socket) {
-			console.error("[BridgeOrchestrator] Socket not available after connect")
+			console.error("[BridgeOrchestrator#handleConnect] Socket not available")
 			return
 		}
 
 		await this.extensionChannel.onConnect(socket)
 		await this.taskChannel.onConnect(socket)
+
+		if (BridgeOrchestrator.pendingTask) {
+			console.log(
+				`[BridgeOrchestrator#handleConnect] Subscribing to task ${BridgeOrchestrator.pendingTask.taskId}`,
+			)
+
+			try {
+				await this.subscribeToTask(BridgeOrchestrator.pendingTask)
+				BridgeOrchestrator.pendingTask = null
+			} catch (error) {
+				console.error(
+					`[BridgeOrchestrator#handleConnect] subscribeToTask() failed: ${error instanceof Error ? error.message : String(error)}`,
+				)
+			}
+		}
 	}
 
 	private handleDisconnect() {
@@ -261,6 +294,7 @@ export class BridgeOrchestrator {
 		await this.taskChannel.cleanup(this.socketTransport.getSocket())
 		await this.socketTransport.disconnect()
 		BridgeOrchestrator.instance = null
+		BridgeOrchestrator.pendingTask = null
 	}
 
 	public async reconnect(): Promise<void> {

+ 8 - 6
packages/cloud/src/bridge/TaskChannel.ts

@@ -163,25 +163,27 @@ export class TaskChannel extends BaseChannel<
 	public async unsubscribeFromTask(taskId: string, _socket: Socket): Promise<void> {
 		const task = this.subscribedTasks.get(taskId)
 
+		if (!task) {
+			return
+		}
+
 		await this.publish(TaskSocketEvents.LEAVE, { taskId }, (response: LeaveResponse) => {
 			if (response.success) {
-				console.log(`[TaskChannel#unsubscribeFromTask] unsubscribed from ${taskId}`, response)
+				console.log(`[TaskChannel#unsubscribeFromTask] unsubscribed from ${taskId}`)
 			} else {
 				console.error(`[TaskChannel#unsubscribeFromTask] failed to unsubscribe from ${taskId}`)
 			}
 
 			// If we failed to unsubscribe then something is probably wrong and
 			// we should still discard this task from `subscribedTasks`.
-			if (task) {
-				this.removeTaskListeners(task)
-				this.subscribedTasks.delete(taskId)
-			}
+			this.removeTaskListeners(task)
+			this.subscribedTasks.delete(taskId)
 		})
 	}
 
 	private setupTaskListeners(task: TaskLike): void {
 		if (this.taskListeners.has(task.taskId)) {
-			console.warn("[TaskChannel] Listeners already exist for task, removing old listeners:", task.taskId)
+			console.warn(`[TaskChannel] Listeners already exist for task, removing old listeners for ${task.taskId}`)
 			this.removeTaskListeners(task)
 		}
 

+ 1 - 2
packages/cloud/src/bridge/__tests__/TaskChannel.test.ts

@@ -299,8 +299,7 @@ describe("TaskChannel", () => {
 
 			// Verify warning was logged
 			expect(warnSpy).toHaveBeenCalledWith(
-				"[TaskChannel] Listeners already exist for task, removing old listeners:",
-				taskId,
+				`[TaskChannel] Listeners already exist for task, removing old listeners for ${taskId}`,
 			)
 
 			// Verify only one set of listeners exists

+ 1 - 1
packages/types/npm/package.metadata.json

@@ -1,6 +1,6 @@
 {
 	"name": "@roo-code/types",
-	"version": "1.64.0",
+	"version": "1.65.0",
 	"description": "TypeScript type definitions for Roo Code.",
 	"publishConfig": {
 		"access": "public",

+ 14 - 23
src/core/task/Task.ts

@@ -256,7 +256,6 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
 
 	// Task Bridge
 	enableBridge: boolean
-	bridge: BridgeOrchestrator | null = null
 
 	// Streaming
 	isWaitingForFirstChunk = false
@@ -1084,14 +1083,10 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
 	private async startTask(task?: string, images?: string[]): Promise<void> {
 		if (this.enableBridge) {
 			try {
-				this.bridge = this.bridge || BridgeOrchestrator.getInstance()
-
-				if (this.bridge) {
-					await this.bridge.subscribeToTask(this)
-				}
+				await BridgeOrchestrator.subscribeToTask(this)
 			} catch (error) {
 				console.error(
-					`[Task#startTask] subscribeToTask failed - ${error instanceof Error ? error.message : String(error)}`,
+					`[Task#startTask] BridgeOrchestrator.subscribeToTask() failed: ${error instanceof Error ? error.message : String(error)}`,
 				)
 			}
 		}
@@ -1156,14 +1151,10 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
 	private async resumeTaskFromHistory() {
 		if (this.enableBridge) {
 			try {
-				this.bridge = this.bridge || BridgeOrchestrator.getInstance()
-
-				if (this.bridge) {
-					await this.bridge.subscribeToTask(this)
-				}
+				await BridgeOrchestrator.subscribeToTask(this)
 			} catch (error) {
 				console.error(
-					`[Task#resumeTaskFromHistory] subscribeToTask failed - ${error instanceof Error ? error.message : String(error)}`,
+					`[Task#resumeTaskFromHistory] BridgeOrchestrator.subscribeToTask() failed: ${error instanceof Error ? error.message : String(error)}`,
 				)
 			}
 		}
@@ -1417,10 +1408,9 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
 	}
 
 	public dispose(): void {
-		// Disposing task
-		console.log(`[Task] disposing task ${this.taskId}.${this.instanceId}`)
+		console.log(`[Task#dispose] disposing task ${this.taskId}.${this.instanceId}`)
 
-		// Remove all event listeners to prevent memory leaks
+		// Remove all event listeners to prevent memory leaks.
 		try {
 			this.removeAllListeners()
 		} catch (error) {
@@ -1433,13 +1423,14 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
 			this.pauseInterval = undefined
 		}
 
-		// Unsubscribe from TaskBridge service.
-		if (this.bridge) {
-			this.bridge
-				.unsubscribeFromTask(this.taskId)
-				.catch((error: unknown) => console.error("Error unsubscribing from task bridge:", error))
-
-			this.bridge = null
+		if (this.enableBridge) {
+			BridgeOrchestrator.getInstance()
+				?.unsubscribeFromTask(this.taskId)
+				.catch((error) =>
+					console.error(
+						`[Task#dispose] BridgeOrchestrator#unsubscribeFromTask() failed: ${error instanceof Error ? error.message : String(error)}`,
+					),
+				)
 		}
 
 		// Release any terminals associated with this task.

+ 1 - 1
src/core/task/__tests__/Task.dispose.test.ts

@@ -134,7 +134,7 @@ describe("Task dispose method", () => {
 
 		// Verify dispose was called and logged
 		expect(consoleLogSpy).toHaveBeenCalledWith(
-			expect.stringContaining(`[Task] disposing task ${task.taskId}.${task.instanceId}`),
+			expect.stringContaining(`[Task#dispose] disposing task ${task.taskId}.${task.instanceId}`),
 		)
 
 		// Verify removeAllListeners was called first (before other cleanup)

+ 23 - 17
src/core/webview/ClineProvider.ts

@@ -165,6 +165,8 @@ export class ClineProvider
 
 		this.marketplaceManager = new MarketplaceManager(this.context, this.customModesManager)
 
+		// Forward <most> task events to the provider.
+		// We do something fairly similar for the IPC-based API.
 		this.taskCreationCallback = (instance: Task) => {
 			this.emit(RooCodeEventName.TaskCreated, instance)
 
@@ -346,18 +348,18 @@ export class ClineProvider
 		let task = this.clineStack.pop()
 
 		if (task) {
+			task.emit(RooCodeEventName.TaskUnfocused)
+
 			try {
 				// Abort the running task and set isAbandoned to true so
 				// all running promises will exit as well.
 				await task.abortTask(true)
 			} catch (e) {
 				this.log(
-					`[removeClineFromStack] encountered error while aborting task ${task.taskId}.${task.instanceId}: ${e.message}`,
+					`[ClineProvider#removeClineFromStack] abortTask() failed ${task.taskId}.${task.instanceId}: ${e.message}`,
 				)
 			}
 
-			task.emit(RooCodeEventName.TaskUnfocused)
-
 			// Remove event listeners before clearing the reference.
 			const cleanupFunctions = this.taskEventListeners.get(task)
 
@@ -405,12 +407,6 @@ export class ClineProvider
 		await this.getCurrentTask()?.resumePausedTask(lastMessage)
 	}
 
-	// Clear the current task without treating it as a subtask.
-	// This is used when the user cancels a task that is not a subtask.
-	async clearTask() {
-		await this.removeClineFromStack()
-	}
-
 	resumeTask(taskId: string): void {
 		// Use the existing showTaskWithId method which handles both current and historical tasks
 		this.showTaskWithId(taskId).catch((error) => {
@@ -1307,6 +1303,16 @@ export class ClineProvider
 		await this.createTaskWithHistoryItem({ ...historyItem, rootTask, parentTask })
 	}
 
+	// Clear the current task without treating it as a subtask.
+	// This is used when the user cancels a task that is not a subtask.
+	async clearTask() {
+		if (this.clineStack.length > 0) {
+			const task = this.clineStack[this.clineStack.length - 1]
+			console.log(`[clearTask] clearing task ${task.taskId}.${task.instanceId}`)
+			await this.removeClineFromStack()
+		}
+	}
+
 	async updateCustomInstructions(instructions?: string) {
 		// User may be clearing the field.
 		await this.updateGlobalState("customInstructions", instructions || undefined)
@@ -1585,6 +1591,7 @@ export class ClineProvider
 			})
 		} catch (error) {
 			console.error("Failed to fetch marketplace data:", error)
+
 			// Send empty data on error to prevent UI from hanging
 			this.postMessageToWebview({
 				type: "marketplaceData",
@@ -2213,24 +2220,23 @@ export class ClineProvider
 		if (bridge) {
 			const currentTask = this.getCurrentTask()
 
-			if (currentTask && !currentTask.bridge) {
+			if (currentTask && !currentTask.enableBridge) {
 				try {
-					currentTask.bridge = bridge
-					await currentTask.bridge.subscribeToTask(currentTask)
+					currentTask.enableBridge = true
+					await BridgeOrchestrator.subscribeToTask(currentTask)
 				} catch (error) {
-					const message = `[ClineProvider#remoteControlEnabled] subscribeToTask failed - ${error instanceof Error ? error.message : String(error)}`
+					const message = `[ClineProvider#remoteControlEnabled] BridgeOrchestrator.subscribeToTask() failed: ${error instanceof Error ? error.message : String(error)}`
 					this.log(message)
 					console.error(message)
 				}
 			}
 		} else {
 			for (const task of this.clineStack) {
-				if (task.bridge) {
+				if (task.enableBridge) {
 					try {
-						await task.bridge.unsubscribeFromTask(task.taskId)
-						task.bridge = null
+						await BridgeOrchestrator.getInstance()?.unsubscribeFromTask(task.taskId)
 					} catch (error) {
-						const message = `[ClineProvider#remoteControlEnabled] unsubscribeFromTask failed - ${error instanceof Error ? error.message : String(error)}`
+						const message = `[ClineProvider#remoteControlEnabled] BridgeOrchestrator#unsubscribeFromTask() failed: ${error instanceof Error ? error.message : String(error)}`
 						this.log(message)
 						console.error(message)
 					}

+ 23 - 5
src/extension/api.ts

@@ -253,11 +253,29 @@ export class API extends EventEmitter<RooCodeEvents> implements RooCodeAPI {
 				this.taskMap.delete(task.taskId)
 			})
 
-			// Optional:
-			// RooCodeEventName.TaskFocused
-			// RooCodeEventName.TaskUnfocused
-			// RooCodeEventName.TaskActive
-			// RooCodeEventName.TaskIdle
+			task.on(RooCodeEventName.TaskFocused, () => {
+				this.emit(RooCodeEventName.TaskFocused, task.taskId)
+			})
+
+			task.on(RooCodeEventName.TaskUnfocused, () => {
+				this.emit(RooCodeEventName.TaskUnfocused, task.taskId)
+			})
+
+			task.on(RooCodeEventName.TaskActive, () => {
+				this.emit(RooCodeEventName.TaskActive, task.taskId)
+			})
+
+			task.on(RooCodeEventName.TaskInteractive, () => {
+				this.emit(RooCodeEventName.TaskInteractive, task.taskId)
+			})
+
+			task.on(RooCodeEventName.TaskResumable, () => {
+				this.emit(RooCodeEventName.TaskResumable, task.taskId)
+			})
+
+			task.on(RooCodeEventName.TaskIdle, () => {
+				this.emit(RooCodeEventName.TaskIdle, task.taskId)
+			})
 
 			// Subtask Lifecycle