json-event-emitter.ts 25 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905
  1. /**
  2. * JsonEventEmitter - Handles structured JSON output for the CLI
  3. *
  4. * This class transforms internal CLI events (ClineMessage, state changes, etc.)
  5. * into structured JSON events and outputs them to stdout.
  6. *
  7. * Supports two output modes:
  8. * - "stream-json": NDJSON format (one JSON object per line) for real-time streaming
  9. * - "json": Single JSON object at the end with accumulated events
  10. *
  11. * Schema is optimized for efficiency with high message volume:
  12. * - Minimal fields per event
  13. * - No redundant wrappers
  14. * - `done` flag instead of partial:false
  15. */
  16. import type { ClineMessage } from "@roo-code/types"
  17. import type { JsonEvent, JsonEventCost, JsonEventQueueItem, JsonFinalOutput } from "@/types/json-events.js"
  18. import type { ExtensionClient } from "./extension-client.js"
  19. import type { AgentStateChangeEvent, TaskCompletedEvent } from "./events.js"
  20. import { AgentLoopState } from "./agent-state.js"
  /** Output modes understood by the emitter. */
  type JsonOutputMode = "json" | "stream-json"

  /**
   * Construction options for {@link JsonEventEmitter}.
   *
   * Only `mode` is required; every other member falls back to a default
   * chosen by the emitter's constructor.
   */
  export interface JsonEventEmitterOptions {
    /**
     * How events are written out: `"stream-json"` writes one JSON object per
     * line as events occur, `"json"` writes a single object at task completion.
     */
    mode: JsonOutputMode

    /** Destination stream; the emitter uses `process.stdout` when omitted. */
    stdout?: NodeJS.WriteStream

    /**
     * Callback consulted for each emitted event that has no `requestId` of
     * its own; a returned string is attached to the event for correlation.
     */
    requestIdProvider?: () => string | undefined

    /** Transport schema version reported in the `system:init` event. */
    schemaVersion?: number

    /** Transport protocol identifier reported in the `system:init` event. */
    protocol?: string

    /** Supported stdin protocol capabilities reported in the `system:init` event. */
    capabilities?: string[]
  }
  /**
   * Parse tool information from a ClineMessage text field.
   *
   * Tool messages are JSON objects with a `tool` field containing the tool
   * name. The whole parsed object is returned as the tool input.
   *
   * @param text - Raw message text, expected to hold a JSON object.
   * @returns The tool name and parsed payload, or `null` when the text is
   *   empty, is not valid JSON, is not a JSON object, or has no non-empty
   *   string `tool` field.
   */
  function parseToolInfo(text: string | undefined): { name: string; input: Record<string, unknown> } | null {
    if (!text) return null

    let parsed: unknown
    try {
      parsed = JSON.parse(text)
    } catch {
      return null
    }

    // JSON.parse can yield primitives, null, or arrays; only plain objects can describe a tool.
    if (typeof parsed !== "object" || parsed === null || Array.isArray(parsed)) {
      return null
    }

    const input = parsed as Record<string, unknown>
    const tool = input.tool

    // The return type promises a string name, so reject truthy non-string values such as 123 or {}.
    return typeof tool === "string" && tool.length > 0 ? { name: tool, input } : null
  }
  /**
   * Parse API request cost information from api_req_started message text.
   *
   * @param text - Raw message text, expected to hold a JSON object with a
   *   numeric `cost` plus optional `tokensIn`, `tokensOut`, `cacheWrites`
   *   and `cacheReads` fields.
   * @returns The cost summary, or `undefined` when the text is empty, is not
   *   valid JSON, or does not carry a finite numeric `cost`.
   */
  function parseApiReqCost(text: string | undefined): JsonEventCost | undefined {
    if (!text) return undefined
    try {
      const parsed = JSON.parse(text)
      // A present-but-unusable cost (e.g. `null`, which is what JSON.stringify
      // writes for NaN) must be treated the same as a missing cost.
      if (!Number.isFinite(parsed?.cost)) {
        return undefined
      }
      return {
        totalCost: parsed.cost,
        inputTokens: parsed.tokensIn,
        outputTokens: parsed.tokensOut,
        cacheWrites: parsed.cacheWrites,
        cacheReads: parsed.cacheReads,
      }
    } catch {
      return undefined
    }
  }
  /**
   * `say` kinds that are internal bookkeeping only: a message whose `say`
   * value is listed here produces no JSON event.
   */
  const SKIP_SAY_TYPES = new Set<string>([
    "api_req_finished",
    "api_req_retried",
    "api_req_retry_delayed",
    "api_req_rate_limit_wait",
    "api_req_deleted",
    "checkpoint_saved",
    "condense_context",
    "condense_context_error",
    "sliding_window_truncation",
  ])

  /**
   * Added to a message id to form the delta-tracking key for its reasoning
   * content, so it does not collide with the key used for its text content.
   */
  const REASONING_KEY_OFFSET = 1e9

  /**
   * Milliseconds to wait for the final say:command_output after a command
   * reports status:exited before closing its output stream anyway.
   */
  const COMMAND_OUTPUT_EXIT_GRACE_MS = 250
  /**
   * Turns ExtensionClient events into structured JSON events on an output stream.
   *
   * In "stream-json" mode every event is written immediately as one NDJSON
   * line, and partial (streaming) messages are reduced to deltas against the
   * previously seen content. In "json" mode partial messages are ignored,
   * events are accumulated, and a single pretty-printed object is written
   * when the task completes.
   *
   * Every emitted event is also retained in memory (see {@link getEvents})
   * until {@link clear} is called.
   */
  export class JsonEventEmitter {
    private readonly mode: "json" | "stream-json"
    private readonly stdout: NodeJS.WriteStream
    private events: JsonEvent[] = []
    private unsubscribers: (() => void)[] = []
    // Writes handed to stdout whose completion callback has not fired yet.
    private readonly pendingWrites = new Set<Promise<void>>()
    private lastCost: JsonEventCost | undefined
    private readonly requestIdProvider: () => string | undefined
    private readonly schemaVersion: number
    private readonly protocol: string
    private readonly capabilities: string[]
    // Ids (msg.ts) of complete messages already processed, used to drop duplicates.
    private readonly seenMessageIds = new Set<number>()
    // Track previous content for delta computation
    private readonly previousContent = new Map<number, string>()
    // Track previous tool-use content for structured (non-append-only) delta computation.
    private readonly previousToolUseContent = new Map<number, string>()
    // Track the currently active execute_command tool_use id for command_output correlation.
    private activeCommandToolUseId: number | undefined
    // Track command output snapshots by command tool-use id for delta computation.
    private readonly previousCommandOutputByToolUseId = new Map<number, string>()
    // Track command ids whose output is being streamed from commandExecutionStatus updates.
    private readonly statusDrivenCommandOutputIds = new Set<number>()
    // Track command ids that already emitted a terminal command_output done event.
    private readonly completedCommandOutputIds = new Set<number>()
    // Track exited commands awaiting final say:command_output completion.
    private readonly pendingCommandCompletionByToolUseId = new Map<
      number,
      { exitCode?: number; timer: NodeJS.Timeout }
    >()
    // Track the completion result content
    private completionResultContent: string | undefined
    // Track the latest assistant text as a fallback for result.content.
    private lastAssistantText: string | undefined
    // The first non-partial "say:text" per task is the echoed user prompt.
    private expectPromptEchoAsUser = true

    /**
     * @param options - Output mode plus optional stream, request-id provider
     *   and transport metadata; see {@link JsonEventEmitterOptions}.
     */
    constructor(options: JsonEventEmitterOptions) {
      this.mode = options.mode
      this.stdout = options.stdout ?? process.stdout
      this.requestIdProvider = options.requestIdProvider ?? (() => undefined)
      this.schemaVersion = options.schemaVersion ?? 1
      this.protocol = options.protocol ?? "roo-cli-stream"
      this.capabilities = options.capabilities ?? [
        "stdin:start",
        "stdin:message",
        "stdin:cancel",
        "stdin:ping",
        "stdin:shutdown",
      ]
    }

    /**
     * Attach to an ExtensionClient and subscribe to its events.
     *
     * Emits the `system:init` event immediately after subscribing.
     */
    attachToClient(client: ExtensionClient): void {
      // Subscribe to message events
      const unsubMessage = client.on("message", (msg) => this.handleMessage(msg, false))
      const unsubMessageUpdated = client.on("messageUpdated", (msg) => this.handleMessage(msg, true))
      const unsubStateChange = client.on("stateChange", (event) => this.handleStateChange(event))
      const unsubTaskCompleted = client.on("taskCompleted", (event) => this.handleTaskCompleted(event))
      const unsubError = client.on("error", (error) => this.handleError(error))

      this.unsubscribers.push(unsubMessage, unsubMessageUpdated, unsubStateChange, unsubTaskCompleted, unsubError)

      // Emit init event
      this.emitEvent({
        type: "system",
        subtype: "init",
        content: "Task started",
        schemaVersion: this.schemaVersion,
        protocol: this.protocol,
        capabilities: this.capabilities,
      })
    }

    /**
     * Emit a `control` event. `done: true` is set only for the "done" subtype.
     */
    emitControl(event: {
      subtype: "ack" | "done" | "error"
      requestId?: string
      command?: JsonEvent["command"]
      taskId?: string
      content?: string
      success?: boolean
      code?: string
    }): void {
      this.emitEvent({
        type: "control",
        subtype: event.subtype,
        requestId: event.requestId,
        command: event.command,
        taskId: event.taskId,
        content: event.content,
        success: event.success,
        code: event.code,
        done: event.subtype === "done" ? true : undefined,
      })
    }

    /**
     * Emit a `queue` event carrying the given queue depth and items.
     */
    emitQueue(event: {
      subtype: "snapshot" | "enqueued" | "dequeued" | "drained" | "updated"
      taskId?: string
      content?: string
      queueDepth: number
      queue: JsonEventQueueItem[]
    }): void {
      this.emitEvent({
        type: "queue",
        subtype: event.subtype,
        taskId: event.taskId,
        content: event.content,
        queueDepth: event.queueDepth,
        queue: event.queue,
      })
    }

    /**
     * Re-arm prompt-echo detection when the agent leaves the NO_TASK state.
     */
    private handleStateChange(event: AgentStateChangeEvent): void {
      // Only treat the next say:text as a prompt echo when a new task starts.
      if (
        event.previousState.state === AgentLoopState.NO_TASK &&
        event.currentState.state !== AgentLoopState.NO_TASK
      ) {
        this.expectPromptEchoAsUser = true
      }
    }

    /**
     * Detach from the client and clean up subscriptions.
     */
    detach(): void {
      for (const unsub of this.unsubscribers) {
        unsub()
      }
      this.unsubscribers = []
    }

    /**
     * Compute the delta (new content) for a streaming message.
     *
     * Records `fullContent` as the new baseline for `msgId`. When the content
     * is not a pure append of the previous snapshot, the full content is returned.
     *
     * @returns The new content, or null if there's no new content.
     */
    private computeDelta(msgId: number, fullContent: string | undefined): string | null {
      if (!fullContent) return null

      const previous = this.previousContent.get(msgId) ?? ""
      if (fullContent === previous) return null

      this.previousContent.set(msgId, fullContent)

      // If content is appended, return only the new part
      return fullContent.startsWith(previous) ? fullContent.slice(previous.length) : fullContent
    }

    /**
     * Compute a compact delta for structured strings (for tool_use snapshots).
     *
     * Unlike append-only text streams, tool-use payloads are often full snapshots
     * where edits happen before a stable suffix (e.g., inside JSON strings). This
     * extracts the inserted segment when possible; otherwise it falls back to the
     * full snapshot so consumers can recover.
     *
     * @returns The inserted segment or full snapshot, or null when nothing changed.
     */
    private computeStructuredDelta(msgId: number, fullContent: string | undefined): string | null {
      if (!fullContent) {
        return null
      }

      const previous = this.previousToolUseContent.get(msgId) ?? ""
      if (fullContent === previous) {
        return null
      }

      this.previousToolUseContent.set(msgId, fullContent)

      if (previous.length === 0) {
        return fullContent
      }

      if (fullContent.startsWith(previous)) {
        return fullContent.slice(previous.length)
      }

      // Length of the common prefix of both snapshots.
      let prefix = 0
      while (prefix < previous.length && prefix < fullContent.length && previous[prefix] === fullContent[prefix]) {
        prefix++
      }

      // Length of the common suffix, never overlapping the prefix in either string.
      let suffix = 0
      while (
        suffix < previous.length - prefix &&
        suffix < fullContent.length - prefix &&
        previous[previous.length - 1 - suffix] === fullContent[fullContent.length - 1 - suffix]
      ) {
        suffix++
      }

      // Pure insertion: all of `previous` is accounted for by prefix + suffix.
      const isPureInsertion = fullContent.length >= previous.length && prefix + suffix >= previous.length
      if (isPureInsertion) {
        return fullContent.slice(prefix, fullContent.length - suffix)
      }

      return fullContent
    }

    /**
     * Check if this is a streaming partial message with no new content.
     */
    private isEmptyStreamingDelta(content: string | null): boolean {
      return this.mode === "stream-json" && content === null
    }

    /**
     * Compute the new command output relative to the last snapshot for `commandId`.
     *
     * Unlike {@link computeDelta}, an undefined snapshot is treated as the
     * empty string and still updates the stored baseline.
     *
     * @returns The appended part (or the whole snapshot if it is not an
     *   append), or null when the snapshot is unchanged.
     */
    private computeCommandOutputDelta(commandId: number, fullOutput: string | undefined): string | null {
      const normalized = fullOutput ?? ""
      const previous = this.previousCommandOutputByToolUseId.get(commandId) ?? ""
      if (normalized === previous) {
        return null
      }

      this.previousCommandOutputByToolUseId.set(commandId, normalized)
      return normalized.startsWith(previous) ? normalized.slice(previous.length) : normalized
    }

    /**
     * Drop all per-command tracking state once the terminal command_output
     * event for `commandId` is being emitted, and remember it as completed.
     */
    private finalizeCommandOutput(commandId: number): void {
      this.clearPendingCommandCompletion(commandId)
      this.previousCommandOutputByToolUseId.delete(commandId)
      this.statusDrivenCommandOutputIds.delete(commandId)
      this.completedCommandOutputIds.add(commandId)
      if (this.activeCommandToolUseId === commandId) {
        this.activeCommandToolUseId = undefined
      }
    }

    /**
     * Emit a `tool_result` event for an execute_command invocation.
     *
     * In stream-json mode the output is a delta against the previous snapshot,
     * and non-final updates that carry no delta are suppressed. In json mode
     * the full output is emitted as given.
     *
     * @param commandId - Tool-use id the output belongs to.
     * @param fullOutput - Complete output snapshot so far, if any.
     * @param isDone - Whether this is the terminal event for the command.
     * @param exitCode - Exit code, attached only to the terminal event.
     */
    private emitCommandOutputEvent(
      commandId: number,
      fullOutput: string | undefined,
      isDone: boolean,
      exitCode?: number,
    ): void {
      if (this.mode === "stream-json") {
        const outputDelta = this.computeCommandOutputDelta(commandId, fullOutput)

        if (isDone) {
          this.finalizeCommandOutput(commandId)
        } else if (outputDelta === null) {
          // Suppress empty partial updates that carry no delta.
          return
        }

        this.emitEvent({
          type: "tool_result",
          id: commandId,
          subtype: "command",
          tool_result: {
            name: "execute_command",
            // Only attach output when the delta is a non-empty string.
            ...(outputDelta ? { output: outputDelta } : {}),
            ...(isDone && exitCode !== undefined ? { exitCode } : {}),
          },
          ...(isDone ? { done: true } : {}),
        })
        return
      }

      this.emitEvent({
        type: "tool_result",
        id: commandId,
        subtype: "command",
        tool_result: {
          name: "execute_command",
          output: fullOutput,
          ...(isDone && exitCode !== undefined ? { exitCode } : {}),
        },
        ...(isDone ? { done: true } : {}),
      })

      if (isDone) {
        this.finalizeCommandOutput(commandId)
      }
    }

    /**
     * Emit a non-final output update for the active command.
     * Does nothing when no command is active.
     *
     * @param outputSnapshot - Complete output of the command so far.
     */
    public emitCommandOutputChunk(outputSnapshot: string): void {
      const commandId = this.activeCommandToolUseId
      if (commandId === undefined) {
        return
      }

      this.statusDrivenCommandOutputIds.add(commandId)
      this.emitCommandOutputEvent(commandId, outputSnapshot, false)
    }

    /**
     * Record that the active command exited and start a grace timer.
     *
     * If no final say:command_output arrives within
     * COMMAND_OUTPUT_EXIT_GRACE_MS, the timer emits the terminal event itself.
     * Does nothing when no command is active.
     */
    public markCommandOutputExited(exitCode?: number): void {
      const commandId = this.activeCommandToolUseId
      if (commandId === undefined) {
        return
      }

      this.statusDrivenCommandOutputIds.add(commandId)
      this.clearPendingCommandCompletion(commandId)

      const timer = setTimeout(() => {
        // Fallback close if final say:command_output never arrives.
        if (!this.pendingCommandCompletionByToolUseId.has(commandId)) {
          return
        }
        this.pendingCommandCompletionByToolUseId.delete(commandId)
        this.emitCommandOutputEvent(commandId, undefined, true, exitCode)
      }, COMMAND_OUTPUT_EXIT_GRACE_MS)
      // Do not keep the process alive just for this fallback.
      timer.unref?.()

      this.pendingCommandCompletionByToolUseId.set(commandId, { exitCode, timer })
    }

    /**
     * Emit the terminal output event for the active command right away.
     * Does nothing when no command is active.
     */
    public emitCommandOutputDone(exitCode?: number): void {
      const commandId = this.activeCommandToolUseId
      if (commandId === undefined) {
        return
      }

      this.statusDrivenCommandOutputIds.add(commandId)
      this.emitCommandOutputEvent(commandId, undefined, true, exitCode)
    }

    /**
     * Cancel and forget the grace timer for `commandId`, if one is pending.
     */
    private clearPendingCommandCompletion(commandId: number): void {
      const pending = this.pendingCommandCompletionByToolUseId.get(commandId)
      if (!pending) {
        return
      }
      clearTimeout(pending.timer)
      this.pendingCommandCompletionByToolUseId.delete(commandId)
    }

    /**
     * Get content to send for a message (delta for streaming, full for json mode).
     */
    private getContentToSend(msgId: number, text: string | undefined, isPartial: boolean): string | null {
      if (this.mode === "stream-json" && isPartial) {
        return this.computeDelta(msgId, text)
      }
      return text ?? null
    }

    /**
     * Build a base event with optional done flag.
     *
     * `content`, `subtype` and `done` are only set when they carry a value.
     */
    private buildTextEvent(
      type: "assistant" | "thinking" | "user",
      id: number,
      content: string | null,
      isDone: boolean,
      subtype?: string,
    ): JsonEvent {
      const event: JsonEvent = { type, id }
      if (content !== null) {
        event.content = content
      }
      if (subtype) {
        event.subtype = subtype
      }
      if (isDone) {
        event.done = true
      }
      return event
    }

    /**
     * Handle a ClineMessage and emit the appropriate JSON event.
     *
     * @param msg - The message received from the client.
     * @param _isUpdate - Whether the message arrived via "messageUpdated"; currently unused.
     */
    private handleMessage(msg: ClineMessage, _isUpdate: boolean): void {
      const isDone = !msg.partial

      // In json mode, only emit complete (non-partial) messages
      if (this.mode === "json" && msg.partial) {
        return
      }

      // Skip duplicate complete messages
      if (isDone && this.seenMessageIds.has(msg.ts)) {
        return
      }

      if (isDone) {
        this.seenMessageIds.add(msg.ts)
        this.previousContent.delete(msg.ts)
        this.previousToolUseContent.delete(msg.ts)
      }

      if (msg.type === "say" && msg.say) {
        if (msg.say === "reasoning") {
          // Reasoning has its own delta tracking over `msg.reasoning || msg.text`.
          // Filtering it on the `msg.text` delta would drop partials that only
          // carry `msg.reasoning`.
          this.handleReasoningMessage(msg, isDone)
          return
        }

        const contentToSend = this.getContentToSend(msg.ts, msg.text, msg.partial ?? false)

        // Skip if no new content for streaming partial messages
        if (msg.partial && this.isEmptyStreamingDelta(contentToSend)) {
          return
        }

        this.handleSayMessage(msg, contentToSend, isDone)
      }

      if (msg.type === "ask" && msg.ask) {
        this.handleAskMessage(msg, isDone)
      }
    }

    /**
     * Handle "say" type messages.
     *
     * @param msg - The say message.
     * @param contentToSend - Delta (streaming partial) or full text, null when there is none.
     * @param isDone - Whether the message is complete.
     */
    private handleSayMessage(msg: ClineMessage, contentToSend: string | null, isDone: boolean): void {
      switch (msg.say) {
        case "text":
          if (this.expectPromptEchoAsUser) {
            this.emitEvent(this.buildTextEvent("user", msg.ts, contentToSend, isDone))
            if (isDone) {
              this.expectPromptEchoAsUser = false
            }
          } else {
            this.emitEvent(this.buildTextEvent("assistant", msg.ts, contentToSend, isDone))
            if (msg.text) {
              this.lastAssistantText = msg.text
            }
          }
          break

        case "reasoning":
          this.handleReasoningMessage(msg, isDone)
          break

        case "error":
          this.emitEvent({ type: "error", id: msg.ts, content: contentToSend ?? undefined })
          break

        case "command_output":
          this.handleCommandOutputMessage(msg, isDone)
          break

        case "user_feedback":
        case "user_feedback_diff":
          this.emitEvent(this.buildTextEvent("user", msg.ts, contentToSend, isDone))
          if (isDone) {
            this.expectPromptEchoAsUser = false
          }
          break

        case "api_req_started": {
          // Not emitted; only the most recent cost is remembered for the result event.
          const cost = parseApiReqCost(msg.text)
          if (cost) {
            this.lastCost = cost
          }
          break
        }

        case "mcp_server_response":
          this.emitEvent({
            type: "tool_result",
            subtype: "mcp",
            tool_result: { name: "mcp_server", output: msg.text },
          })
          break

        case "completion_result":
          // Not emitted here; kept as a fallback for the result event content.
          if (msg.text && !msg.partial) {
            this.completionResultContent = msg.text
          }
          break

        default:
          if (SKIP_SAY_TYPES.has(msg.say!)) {
            break
          }
          if (msg.text) {
            this.emitEvent(this.buildTextEvent("assistant", msg.ts, contentToSend, isDone, msg.say))
          }
          break
      }
    }

    /**
     * Handle reasoning/thinking messages with separate delta tracking.
     *
     * The delta key is offset from the message id so it cannot clash with the
     * key used for the same message's text content.
     */
    private handleReasoningMessage(msg: ClineMessage, isDone: boolean): void {
      const reasoningContent = msg.reasoning || msg.text
      const reasoningKey = msg.ts + REASONING_KEY_OFFSET
      const reasoningDelta = this.getContentToSend(reasoningKey, reasoningContent, msg.partial ?? false)

      if (msg.partial && this.isEmptyStreamingDelta(reasoningDelta)) {
        return
      }

      if (!msg.partial) {
        this.previousContent.delete(reasoningKey)
      }

      this.emitEvent(this.buildTextEvent("thinking", msg.ts, reasoningDelta, isDone))
    }

    /**
     * Handle "ask" type messages.
     */
    private handleAskMessage(msg: ClineMessage, isDone: boolean): void {
      switch (msg.ask) {
        case "tool":
          this.handleToolUseAsk(msg, "tool", isDone)
          break

        case "command":
          this.handleToolUseAsk(msg, "command", isDone)
          break

        case "use_mcp_server":
          this.handleToolUseAsk(msg, "mcp", isDone)
          break

        case "followup": {
          const contentToSend = this.getContentToSend(msg.ts, msg.text, msg.partial ?? false)
          // Skip if no new content for streaming partial messages
          if (msg.partial && this.isEmptyStreamingDelta(contentToSend)) {
            return
          }
          this.emitEvent(this.buildTextEvent("assistant", msg.ts, contentToSend, isDone, "followup"))
          break
        }

        case "command_output":
          // Handled in say type
          break

        case "completion_result":
          if (msg.text && !msg.partial) {
            this.completionResultContent = msg.text
          }
          break

        default:
          if (msg.text) {
            const contentToSend = this.getContentToSend(msg.ts, msg.text, msg.partial ?? false)
            // Skip if no new content for streaming partial messages
            if (msg.partial && this.isEmptyStreamingDelta(contentToSend)) {
              return
            }
            this.emitEvent(this.buildTextEvent("assistant", msg.ts, contentToSend, isDone, msg.ask))
          }
          break
      }
    }

    /**
     * Emit a `tool_use` event for a tool, command or MCP approval request.
     *
     * Streaming partials carry a structured delta in `content`; complete
     * messages carry the full input. A command ask also becomes the active
     * command that later command output is correlated with.
     */
    private handleToolUseAsk(msg: ClineMessage, subtype: "tool" | "command" | "mcp", isDone: boolean): void {
      const isStreamingPartial = this.mode === "stream-json" && msg.partial === true

      if (subtype === "command") {
        if (this.activeCommandToolUseId !== undefined && this.activeCommandToolUseId !== msg.ts) {
          // A different command is starting: close the previous one if it was
          // still waiting out its exit grace period.
          const previousCommandId = this.activeCommandToolUseId
          const pending = this.pendingCommandCompletionByToolUseId.get(previousCommandId)
          if (pending) {
            clearTimeout(pending.timer)
            this.pendingCommandCompletionByToolUseId.delete(previousCommandId)
            this.emitCommandOutputEvent(previousCommandId, undefined, true, pending.exitCode)
          }
        }

        this.activeCommandToolUseId = msg.ts
        this.completedCommandOutputIds.delete(msg.ts)
        this.clearPendingCommandCompletion(msg.ts)

        if (isStreamingPartial) {
          const commandDelta = this.computeStructuredDelta(msg.ts, msg.text)
          if (commandDelta === null) {
            return
          }

          this.emitEvent({
            type: "tool_use",
            id: msg.ts,
            subtype: "command",
            content: commandDelta,
            tool_use: { name: "execute_command", input: { command: commandDelta } },
          })
          return
        }

        this.emitEvent({
          type: "tool_use",
          id: msg.ts,
          subtype: "command",
          tool_use: { name: "execute_command", input: { command: msg.text } },
          ...(isDone ? { done: true } : {}),
        })
        return
      }

      if (subtype === "mcp") {
        if (isStreamingPartial) {
          const mcpDelta = this.computeStructuredDelta(msg.ts, msg.text)
          if (mcpDelta === null) {
            return
          }

          this.emitEvent({
            type: "tool_use",
            id: msg.ts,
            subtype: "mcp",
            content: mcpDelta,
            tool_use: { name: "mcp_server" },
          })
          return
        }

        this.emitEvent({
          type: "tool_use",
          id: msg.ts,
          subtype: "mcp",
          tool_use: { name: "mcp_server", input: { raw: msg.text } },
          ...(isDone ? { done: true } : {}),
        })
        return
      }

      // Only generic tool asks need the JSON payload parsed.
      const toolInfo = parseToolInfo(msg.text)

      if (isStreamingPartial) {
        const toolDelta = this.computeStructuredDelta(msg.ts, msg.text)
        if (toolDelta === null) {
          return
        }

        this.emitEvent({
          type: "tool_use",
          id: msg.ts,
          subtype: "tool",
          content: toolDelta,
          tool_use: { name: toolInfo?.name ?? "unknown_tool" },
        })
        return
      }

      this.emitEvent({
        type: "tool_use",
        id: msg.ts,
        subtype: "tool",
        tool_use: toolInfo ?? { name: "unknown_tool", input: { raw: msg.text } },
        ...(isDone ? { done: true } : {}),
      })
    }

    /**
     * Handle a say:command_output message.
     *
     * The output is attributed to the active command, or to the message's own
     * id when no command is active. Output for an already completed command is
     * ignored, as are non-final updates for a command whose output is driven
     * by status updates.
     */
    private handleCommandOutputMessage(msg: ClineMessage, isDone: boolean): void {
      const commandId = this.activeCommandToolUseId ?? msg.ts
      if (this.completedCommandOutputIds.has(commandId)) {
        return
      }

      const pending = this.pendingCommandCompletionByToolUseId.get(commandId)

      if (pending) {
        // The command already exited; only the final output message closes it.
        if (!isDone) {
          return
        }

        clearTimeout(pending.timer)
        this.pendingCommandCompletionByToolUseId.delete(commandId)
        this.emitCommandOutputEvent(commandId, msg.text, true, pending.exitCode)
        return
      }

      if (this.statusDrivenCommandOutputIds.has(commandId)) {
        return
      }

      this.emitCommandOutputEvent(commandId, msg.text, isDone)
    }

    /**
     * Handle task completion and emit result event.
     *
     * In json mode this also writes the final accumulated output.
     */
    private handleTaskCompleted(event: TaskCompletedEvent): void {
      // Prefer the completion payload from the current event. If it is empty,
      // fall back to the most recent tracked completion text, then assistant text.
      const resultContent = event.message?.text || this.completionResultContent || this.lastAssistantText

      this.emitEvent({
        type: "result",
        id: event.message?.ts ?? Date.now(),
        content: resultContent,
        done: true,
        success: event.success,
        cost: this.lastCost,
      })

      // Prevent stale completion content from leaking into later turns.
      this.completionResultContent = undefined
      this.lastAssistantText = undefined

      // For "json" mode, output the final accumulated result
      if (this.mode === "json") {
        this.outputFinalResult(event.success, resultContent)
      }
    }

    /**
     * Handle errors and emit error event.
     */
    private handleError(error: Error): void {
      this.emitEvent({
        type: "error",
        id: Date.now(),
        content: error.message,
      })
    }

    /**
     * Emit a JSON event.
     * For stream-json mode: immediately output to stdout
     * For json mode: accumulate for final output
     *
     * The event is recorded in memory in both modes. A request id from the
     * provider is attached when the event does not already carry one.
     */
    private emitEvent(event: JsonEvent): void {
      const requestId = event.requestId ?? this.requestIdProvider()
      const payload = requestId ? { ...event, requestId } : event

      this.events.push(payload)

      if (this.mode === "stream-json") {
        this.outputLine(payload)
      }
    }

    /**
     * Output a single JSON line (NDJSON format).
     */
    private outputLine(data: unknown): void {
      this.writeToStdout(JSON.stringify(data) + "\n")
    }

    /**
     * Output the final accumulated result (for "json" mode).
     */
    private outputFinalResult(success: boolean, content?: string): void {
      const output: JsonFinalOutput = {
        type: "result",
        success,
        content,
        cost: this.lastCost,
        events: this.events.filter((e) => e.type !== "result"), // Exclude the result event itself
      }

      this.writeToStdout(JSON.stringify(output, null, 2) + "\n")
    }

    /**
     * Write to the output stream and track the write until its callback fires.
     *
     * A failed write rejects its tracked promise, so a concurrent
     * {@link flush} observes the error. The bookkeeping chain itself swallows
     * the rejection; otherwise every failed write would surface as an
     * unhandled promise rejection.
     */
    private writeToStdout(content: string): void {
      const writePromise = new Promise<void>((resolve, reject) => {
        this.stdout.write(content, (error?: Error | null) => {
          if (error) {
            reject(error)
            return
          }
          resolve()
        })
      })

      this.pendingWrites.add(writePromise)

      void writePromise
        .finally(() => {
          this.pendingWrites.delete(writePromise)
        })
        .catch(() => {
          // Deliberately ignored here: the rejection stays observable through
          // `writePromise` for any flush() that is awaiting it.
        })
    }

    /**
     * Wait until every write issued so far has been acknowledged by the stream.
     *
     * Rejects if a write that is still pending fails.
     */
    async flush(): Promise<void> {
      // Loop because new writes may be queued while earlier ones are awaited.
      while (this.pendingWrites.size > 0) {
        await Promise.all([...this.pendingWrites])
      }
    }

    /**
     * Get accumulated events (for testing or external use).
     *
     * @returns A shallow copy of the recorded events.
     */
    getEvents(): JsonEvent[] {
      return [...this.events]
    }

    /**
     * Clear accumulated events and state.
     *
     * Pending command-exit timers are cancelled. In-flight writes are not affected.
     */
    clear(): void {
      this.events = []
      this.lastCost = undefined
      this.seenMessageIds.clear()
      this.previousContent.clear()
      this.previousToolUseContent.clear()
      this.activeCommandToolUseId = undefined
      this.previousCommandOutputByToolUseId.clear()
      this.statusDrivenCommandOutputIds.clear()
      this.completedCommandOutputIds.clear()
      for (const pending of this.pendingCommandCompletionByToolUseId.values()) {
        clearTimeout(pending.timer)
      }
      this.pendingCommandCompletionByToolUseId.clear()
      this.completionResultContent = undefined
      this.lastAssistantText = undefined
      this.expectPromptEchoAsUser = true
    }
  }