TaskManager.ts 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279
  1. import type { Socket } from "socket.io-client"
  2. import {
  3. type ClineMessage,
  4. type TaskEvents,
  5. type TaskLike,
  6. type TaskBridgeCommand,
  7. type TaskBridgeEvent,
  8. RooCodeEventName,
  9. TaskBridgeEventName,
  10. TaskBridgeCommandName,
  11. TaskSocketEvents,
  12. } from "@roo-code/types"
  13. type TaskEventListener = {
  14. [K in keyof TaskEvents]: (...args: TaskEvents[K]) => void | Promise<void>
  15. }[keyof TaskEvents]
  16. const TASK_EVENT_MAPPING: Record<TaskBridgeEventName, keyof TaskEvents> = {
  17. [TaskBridgeEventName.Message]: RooCodeEventName.Message,
  18. [TaskBridgeEventName.TaskModeSwitched]: RooCodeEventName.TaskModeSwitched,
  19. [TaskBridgeEventName.TaskInteractive]: RooCodeEventName.TaskInteractive,
  20. }
  21. export class TaskManager {
  22. private subscribedTasks: Map<string, TaskLike> = new Map()
  23. private pendingTasks: Map<string, TaskLike> = new Map()
  24. private socket: Socket | null = null
  25. private taskListeners: Map<string, Map<TaskBridgeEventName, TaskEventListener>> = new Map()
  26. constructor() {}
  27. public async onConnect(socket: Socket): Promise<void> {
  28. this.socket = socket
  29. // Rejoin all subscribed tasks.
  30. for (const taskId of this.subscribedTasks.keys()) {
  31. try {
  32. socket.emit(TaskSocketEvents.JOIN, { taskId })
  33. console.log(`[TaskManager] emit() -> ${TaskSocketEvents.JOIN} ${taskId}`)
  34. } catch (error) {
  35. console.error(
  36. `[TaskManager] emit() failed -> ${TaskSocketEvents.JOIN}: ${
  37. error instanceof Error ? error.message : String(error)
  38. }`,
  39. )
  40. }
  41. }
  42. // Subscribe to any pending tasks.
  43. for (const task of this.pendingTasks.values()) {
  44. await this.subscribeToTask(task, socket)
  45. }
  46. this.pendingTasks.clear()
  47. }
  48. public onDisconnect(): void {
  49. this.socket = null
  50. }
  51. public async onReconnect(socket: Socket): Promise<void> {
  52. this.socket = socket
  53. // Rejoin all subscribed tasks.
  54. for (const taskId of this.subscribedTasks.keys()) {
  55. try {
  56. socket.emit(TaskSocketEvents.JOIN, { taskId })
  57. console.log(`[TaskManager] emit() -> ${TaskSocketEvents.JOIN} ${taskId}`)
  58. } catch (error) {
  59. console.error(
  60. `[TaskManager] emit() failed -> ${TaskSocketEvents.JOIN}: ${
  61. error instanceof Error ? error.message : String(error)
  62. }`,
  63. )
  64. }
  65. }
  66. }
  67. public async cleanup(socket: Socket | null): Promise<void> {
  68. if (!socket) {
  69. return
  70. }
  71. const unsubscribePromises = []
  72. for (const taskId of this.subscribedTasks.keys()) {
  73. unsubscribePromises.push(this.unsubscribeFromTask(taskId, socket))
  74. }
  75. await Promise.allSettled(unsubscribePromises)
  76. this.subscribedTasks.clear()
  77. this.taskListeners.clear()
  78. this.pendingTasks.clear()
  79. this.socket = null
  80. }
  81. public addPendingTask(task: TaskLike): void {
  82. this.pendingTasks.set(task.taskId, task)
  83. }
  84. public async subscribeToTask(task: TaskLike, socket: Socket): Promise<void> {
  85. const taskId = task.taskId
  86. this.subscribedTasks.set(taskId, task)
  87. this.setupListeners(task)
  88. try {
  89. socket.emit(TaskSocketEvents.JOIN, { taskId })
  90. console.log(`[TaskManager] emit() -> ${TaskSocketEvents.JOIN} ${taskId}`)
  91. } catch (error) {
  92. console.error(
  93. `[TaskManager] emit() failed -> ${TaskSocketEvents.JOIN}: ${
  94. error instanceof Error ? error.message : String(error)
  95. }`,
  96. )
  97. }
  98. }
  99. public async unsubscribeFromTask(taskId: string, socket: Socket): Promise<void> {
  100. const task = this.subscribedTasks.get(taskId)
  101. if (task) {
  102. this.removeListeners(task)
  103. this.subscribedTasks.delete(taskId)
  104. }
  105. try {
  106. socket.emit(TaskSocketEvents.LEAVE, { taskId })
  107. console.log(`[TaskManager] emit() -> ${TaskSocketEvents.LEAVE} ${taskId}`)
  108. } catch (error) {
  109. console.error(
  110. `[TaskManager] emit() failed -> ${TaskSocketEvents.LEAVE}: ${
  111. error instanceof Error ? error.message : String(error)
  112. }`,
  113. )
  114. }
  115. }
  116. public handleTaskCommand(message: TaskBridgeCommand): void {
  117. const task = this.subscribedTasks.get(message.taskId)
  118. if (!task) {
  119. console.error(`[TaskManager#handleTaskCommand] Unable to find task ${message.taskId}`)
  120. return
  121. }
  122. switch (message.type) {
  123. case TaskBridgeCommandName.Message:
  124. console.log(
  125. `[TaskManager#handleTaskCommand] ${TaskBridgeCommandName.Message} ${message.taskId} -> submitUserMessage()`,
  126. message,
  127. )
  128. task.submitUserMessage(message.payload.text, message.payload.images)
  129. break
  130. case TaskBridgeCommandName.ApproveAsk:
  131. console.log(
  132. `[TaskManager#handleTaskCommand] ${TaskBridgeCommandName.ApproveAsk} ${message.taskId} -> approveAsk()`,
  133. message,
  134. )
  135. task.approveAsk(message.payload)
  136. break
  137. case TaskBridgeCommandName.DenyAsk:
  138. console.log(
  139. `[TaskManager#handleTaskCommand] ${TaskBridgeCommandName.DenyAsk} ${message.taskId} -> denyAsk()`,
  140. message,
  141. )
  142. task.denyAsk(message.payload)
  143. break
  144. }
  145. }
  146. private setupListeners(task: TaskLike): void {
  147. if (this.taskListeners.has(task.taskId)) {
  148. console.warn("[TaskManager] Listeners already exist for task, removing old listeners:", task.taskId)
  149. this.removeListeners(task)
  150. }
  151. const listeners = new Map<TaskBridgeEventName, TaskEventListener>()
  152. const onMessage = ({ action, message }: { action: string; message: ClineMessage }) => {
  153. this.publishEvent({
  154. type: TaskBridgeEventName.Message,
  155. taskId: task.taskId,
  156. action,
  157. message,
  158. })
  159. }
  160. task.on(RooCodeEventName.Message, onMessage)
  161. listeners.set(TaskBridgeEventName.Message, onMessage)
  162. const onTaskModeSwitched = (mode: string) => {
  163. this.publishEvent({
  164. type: TaskBridgeEventName.TaskModeSwitched,
  165. taskId: task.taskId,
  166. mode,
  167. })
  168. }
  169. task.on(RooCodeEventName.TaskModeSwitched, onTaskModeSwitched)
  170. listeners.set(TaskBridgeEventName.TaskModeSwitched, onTaskModeSwitched)
  171. const onTaskInteractive = (_taskId: string) => {
  172. this.publishEvent({
  173. type: TaskBridgeEventName.TaskInteractive,
  174. taskId: task.taskId,
  175. })
  176. }
  177. task.on(RooCodeEventName.TaskInteractive, onTaskInteractive)
  178. listeners.set(TaskBridgeEventName.TaskInteractive, onTaskInteractive)
  179. this.taskListeners.set(task.taskId, listeners)
  180. console.log("[TaskManager] Task listeners setup complete for:", task.taskId)
  181. }
  182. private removeListeners(task: TaskLike): void {
  183. const listeners = this.taskListeners.get(task.taskId)
  184. if (!listeners) {
  185. return
  186. }
  187. console.log("[TaskManager] Removing task listeners for:", task.taskId)
  188. listeners.forEach((listener, eventName) => {
  189. try {
  190. // eslint-disable-next-line @typescript-eslint/no-explicit-any
  191. task.off(TASK_EVENT_MAPPING[eventName], listener as any)
  192. } catch (error) {
  193. console.error(
  194. `[TaskManager] Error removing listener for ${String(eventName)} on task ${task.taskId}:`,
  195. error,
  196. )
  197. }
  198. })
  199. this.taskListeners.delete(task.taskId)
  200. }
  201. private async publishEvent(message: TaskBridgeEvent): Promise<boolean> {
  202. if (!this.socket) {
  203. console.error("[TaskManager] publishEvent -> socket not available")
  204. return false
  205. }
  206. try {
  207. this.socket.emit(TaskSocketEvents.EVENT, message)
  208. if (message.type !== TaskBridgeEventName.Message) {
  209. console.log(
  210. `[TaskManager] emit() -> ${TaskSocketEvents.EVENT} ${message.taskId} ${message.type}`,
  211. message,
  212. )
  213. }
  214. return true
  215. } catch (error) {
  216. console.error(
  217. `[TaskManager] emit() failed -> ${TaskSocketEvents.EVENT}: ${
  218. error instanceof Error ? error.message : String(error)
  219. }`,
  220. )
  221. return false
  222. }
  223. }
  224. }