| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905 |
- /**
- * JsonEventEmitter - Handles structured JSON output for the CLI
- *
- * This class transforms internal CLI events (ClineMessage, state changes, etc.)
- * into structured JSON events and outputs them to stdout.
- *
- * Supports two output modes:
- * - "stream-json": NDJSON format (one JSON object per line) for real-time streaming
- * - "json": Single JSON object at the end with accumulated events
- *
- * Schema is optimized for efficiency with high message volume:
- * - Minimal fields per event
- * - No redundant wrappers
- * - `done` flag instead of partial:false
- */
- import type { ClineMessage } from "@roo-code/types"
- import type { JsonEvent, JsonEventCost, JsonEventQueueItem, JsonFinalOutput } from "@/types/json-events.js"
- import type { ExtensionClient } from "./extension-client.js"
- import type { AgentStateChangeEvent, TaskCompletedEvent } from "./events.js"
- import { AgentLoopState } from "./agent-state.js"
- /**
- * Options for JsonEventEmitter.
- */
- export interface JsonEventEmitterOptions {
- /** Output mode: "json" or "stream-json" */
- mode: "json" | "stream-json"
- /** Output stream (defaults to process.stdout) */
- stdout?: NodeJS.WriteStream
- /** Optional request id provider for correlating stream events */
- requestIdProvider?: () => string | undefined
- /** Transport schema version emitted in system:init */
- schemaVersion?: number
- /** Transport protocol identifier emitted in system:init */
- protocol?: string
- /** Supported stdin protocol capabilities emitted in system:init */
- capabilities?: string[]
- }
- /**
- * Parse tool information from a ClineMessage text field.
- * Tool messages are JSON with a `tool` field containing the tool name.
- */
- function parseToolInfo(text: string | undefined): { name: string; input: Record<string, unknown> } | null {
- if (!text) return null
- try {
- const parsed = JSON.parse(text)
- return parsed.tool ? { name: parsed.tool, input: parsed } : null
- } catch {
- return null
- }
- }
- /**
- * Parse API request cost information from api_req_started message text.
- */
- function parseApiReqCost(text: string | undefined): JsonEventCost | undefined {
- if (!text) return undefined
- try {
- const parsed = JSON.parse(text)
- return parsed.cost !== undefined
- ? {
- totalCost: parsed.cost,
- inputTokens: parsed.tokensIn,
- outputTokens: parsed.tokensOut,
- cacheWrites: parsed.cacheWrites,
- cacheReads: parsed.cacheReads,
- }
- : undefined
- } catch {
- return undefined
- }
- }
- /** Internal events that should not be emitted */
- const SKIP_SAY_TYPES = new Set([
- "api_req_finished",
- "api_req_retried",
- "api_req_retry_delayed",
- "api_req_rate_limit_wait",
- "api_req_deleted",
- "checkpoint_saved",
- "condense_context",
- "condense_context_error",
- "sliding_window_truncation",
- ])
- /** Key offset for reasoning content to avoid collision with text content delta tracking */
- const REASONING_KEY_OFFSET = 1_000_000_000
- /** Grace period to wait for final say:command_output after status:exited */
- const COMMAND_OUTPUT_EXIT_GRACE_MS = 250
- export class JsonEventEmitter {
- private mode: "json" | "stream-json"
- private stdout: NodeJS.WriteStream
- private events: JsonEvent[] = []
- private unsubscribers: (() => void)[] = []
- private pendingWrites = new Set<Promise<void>>()
- private lastCost: JsonEventCost | undefined
- private requestIdProvider: () => string | undefined
- private schemaVersion: number
- private protocol: string
- private capabilities: string[]
- private seenMessageIds = new Set<number>()
- // Track previous content for delta computation
- private previousContent = new Map<number, string>()
- // Track previous tool-use content for structured (non-append-only) delta computation.
- private previousToolUseContent = new Map<number, string>()
- // Track the currently active execute_command tool_use id for command_output correlation.
- private activeCommandToolUseId: number | undefined
- // Track command output snapshots by command tool-use id for delta computation.
- private previousCommandOutputByToolUseId = new Map<number, string>()
- // Track command ids whose output is being streamed from commandExecutionStatus updates.
- private statusDrivenCommandOutputIds = new Set<number>()
- // Track command ids that already emitted a terminal command_output done event.
- private completedCommandOutputIds = new Set<number>()
- // Track exited commands awaiting final say:command_output completion.
- private pendingCommandCompletionByToolUseId = new Map<number, { exitCode?: number; timer: NodeJS.Timeout }>()
- // Track the completion result content
- private completionResultContent: string | undefined
- // Track the latest assistant text as a fallback for result.content.
- private lastAssistantText: string | undefined
- // The first non-partial "say:text" per task is the echoed user prompt.
- private expectPromptEchoAsUser = true
- constructor(options: JsonEventEmitterOptions) {
- this.mode = options.mode
- this.stdout = options.stdout ?? process.stdout
- this.requestIdProvider = options.requestIdProvider ?? (() => undefined)
- this.schemaVersion = options.schemaVersion ?? 1
- this.protocol = options.protocol ?? "roo-cli-stream"
- this.capabilities = options.capabilities ?? [
- "stdin:start",
- "stdin:message",
- "stdin:cancel",
- "stdin:ping",
- "stdin:shutdown",
- ]
- }
- /**
- * Attach to an ExtensionClient and subscribe to its events.
- */
- attachToClient(client: ExtensionClient): void {
- // Subscribe to message events
- const unsubMessage = client.on("message", (msg) => this.handleMessage(msg, false))
- const unsubMessageUpdated = client.on("messageUpdated", (msg) => this.handleMessage(msg, true))
- const unsubStateChange = client.on("stateChange", (event) => this.handleStateChange(event))
- const unsubTaskCompleted = client.on("taskCompleted", (event) => this.handleTaskCompleted(event))
- const unsubError = client.on("error", (error) => this.handleError(error))
- this.unsubscribers.push(unsubMessage, unsubMessageUpdated, unsubStateChange, unsubTaskCompleted, unsubError)
- // Emit init event
- this.emitEvent({
- type: "system",
- subtype: "init",
- content: "Task started",
- schemaVersion: this.schemaVersion,
- protocol: this.protocol,
- capabilities: this.capabilities,
- })
- }
- emitControl(event: {
- subtype: "ack" | "done" | "error"
- requestId?: string
- command?: JsonEvent["command"]
- taskId?: string
- content?: string
- success?: boolean
- code?: string
- }): void {
- this.emitEvent({
- type: "control",
- subtype: event.subtype,
- requestId: event.requestId,
- command: event.command,
- taskId: event.taskId,
- content: event.content,
- success: event.success,
- code: event.code,
- done: event.subtype === "done" ? true : undefined,
- })
- }
- emitQueue(event: {
- subtype: "snapshot" | "enqueued" | "dequeued" | "drained" | "updated"
- taskId?: string
- content?: string
- queueDepth: number
- queue: JsonEventQueueItem[]
- }): void {
- this.emitEvent({
- type: "queue",
- subtype: event.subtype,
- taskId: event.taskId,
- content: event.content,
- queueDepth: event.queueDepth,
- queue: event.queue,
- })
- }
- private handleStateChange(event: AgentStateChangeEvent): void {
- // Only treat the next say:text as a prompt echo when a new task starts.
- if (
- event.previousState.state === AgentLoopState.NO_TASK &&
- event.currentState.state !== AgentLoopState.NO_TASK
- ) {
- this.expectPromptEchoAsUser = true
- }
- }
- /**
- * Detach from the client and clean up subscriptions.
- */
- detach(): void {
- for (const unsub of this.unsubscribers) {
- unsub()
- }
- this.unsubscribers = []
- }
- /**
- * Compute the delta (new content) for a streaming message.
- * Returns null if there's no new content.
- */
- private computeDelta(msgId: number, fullContent: string | undefined): string | null {
- if (!fullContent) return null
- const previous = this.previousContent.get(msgId) || ""
- if (fullContent === previous) return null
- this.previousContent.set(msgId, fullContent)
- // If content is appended, return only the new part
- return fullContent.startsWith(previous) ? fullContent.slice(previous.length) : fullContent
- }
- /**
- * Compute a compact delta for structured strings (for tool_use snapshots).
- *
- * Unlike append-only text streams, tool-use payloads are often full snapshots
- * where edits happen before a stable suffix (e.g., inside JSON strings). This
- * extracts the inserted segment when possible; otherwise it falls back to the
- * full snapshot so consumers can recover.
- */
- private computeStructuredDelta(msgId: number, fullContent: string | undefined): string | null {
- if (!fullContent) {
- return null
- }
- const previous = this.previousToolUseContent.get(msgId) || ""
- if (fullContent === previous) {
- return null
- }
- this.previousToolUseContent.set(msgId, fullContent)
- if (previous.length === 0) {
- return fullContent
- }
- if (fullContent.startsWith(previous)) {
- return fullContent.slice(previous.length)
- }
- let prefix = 0
- while (prefix < previous.length && prefix < fullContent.length && previous[prefix] === fullContent[prefix]) {
- prefix++
- }
- let suffix = 0
- while (
- suffix < previous.length - prefix &&
- suffix < fullContent.length - prefix &&
- previous[previous.length - 1 - suffix] === fullContent[fullContent.length - 1 - suffix]
- ) {
- suffix++
- }
- const isPureInsertion = fullContent.length >= previous.length && prefix + suffix >= previous.length
- if (isPureInsertion) {
- return fullContent.slice(prefix, fullContent.length - suffix)
- }
- return fullContent
- }
- /**
- * Check if this is a streaming partial message with no new content.
- */
- private isEmptyStreamingDelta(content: string | null): boolean {
- return this.mode === "stream-json" && content === null
- }
- private computeCommandOutputDelta(commandId: number, fullOutput: string | undefined): string | null {
- const normalized = fullOutput ?? ""
- const previous = this.previousCommandOutputByToolUseId.get(commandId) || ""
- if (normalized === previous) {
- return null
- }
- this.previousCommandOutputByToolUseId.set(commandId, normalized)
- return normalized.startsWith(previous) ? normalized.slice(previous.length) : normalized
- }
- private emitCommandOutputEvent(
- commandId: number,
- fullOutput: string | undefined,
- isDone: boolean,
- exitCode?: number,
- ): void {
- if (this.mode === "stream-json") {
- const outputDelta = this.computeCommandOutputDelta(commandId, fullOutput)
- const event: JsonEvent = {
- type: "tool_result",
- id: commandId,
- subtype: "command",
- tool_result: { name: "execute_command" },
- }
- if (outputDelta !== null && outputDelta.length > 0) {
- event.tool_result = { name: "execute_command", output: outputDelta }
- }
- if (isDone && exitCode !== undefined) {
- event.tool_result = {
- ...(event.tool_result ?? { name: "execute_command" }),
- exitCode,
- }
- }
- if (isDone) {
- event.done = true
- this.clearPendingCommandCompletion(commandId)
- this.previousCommandOutputByToolUseId.delete(commandId)
- this.statusDrivenCommandOutputIds.delete(commandId)
- this.completedCommandOutputIds.add(commandId)
- if (this.activeCommandToolUseId === commandId) {
- this.activeCommandToolUseId = undefined
- }
- }
- // Suppress empty partial updates that carry no delta.
- if (!isDone && outputDelta === null) {
- return
- }
- this.emitEvent(event)
- return
- }
- this.emitEvent({
- type: "tool_result",
- id: commandId,
- subtype: "command",
- tool_result: {
- name: "execute_command",
- output: fullOutput,
- ...(isDone && exitCode !== undefined ? { exitCode } : {}),
- },
- ...(isDone ? { done: true } : {}),
- })
- if (isDone) {
- this.clearPendingCommandCompletion(commandId)
- this.previousCommandOutputByToolUseId.delete(commandId)
- this.statusDrivenCommandOutputIds.delete(commandId)
- this.completedCommandOutputIds.add(commandId)
- if (this.activeCommandToolUseId === commandId) {
- this.activeCommandToolUseId = undefined
- }
- }
- }
- public emitCommandOutputChunk(outputSnapshot: string): void {
- const commandId = this.activeCommandToolUseId
- if (commandId === undefined) {
- return
- }
- this.statusDrivenCommandOutputIds.add(commandId)
- this.emitCommandOutputEvent(commandId, outputSnapshot, false)
- }
- public markCommandOutputExited(exitCode?: number): void {
- const commandId = this.activeCommandToolUseId
- if (commandId === undefined) {
- return
- }
- this.statusDrivenCommandOutputIds.add(commandId)
- this.clearPendingCommandCompletion(commandId)
- const timer = setTimeout(() => {
- // Fallback close if final say:command_output never arrives.
- if (!this.pendingCommandCompletionByToolUseId.has(commandId)) {
- return
- }
- this.pendingCommandCompletionByToolUseId.delete(commandId)
- this.emitCommandOutputEvent(commandId, undefined, true, exitCode)
- }, COMMAND_OUTPUT_EXIT_GRACE_MS)
- timer.unref?.()
- this.pendingCommandCompletionByToolUseId.set(commandId, { exitCode, timer })
- }
- public emitCommandOutputDone(exitCode?: number): void {
- const commandId = this.activeCommandToolUseId
- if (commandId === undefined) {
- return
- }
- this.statusDrivenCommandOutputIds.add(commandId)
- this.emitCommandOutputEvent(commandId, undefined, true, exitCode)
- }
- private clearPendingCommandCompletion(commandId: number): void {
- const pending = this.pendingCommandCompletionByToolUseId.get(commandId)
- if (!pending) {
- return
- }
- clearTimeout(pending.timer)
- this.pendingCommandCompletionByToolUseId.delete(commandId)
- }
- /**
- * Get content to send for a message (delta for streaming, full for json mode).
- */
- private getContentToSend(msgId: number, text: string | undefined, isPartial: boolean): string | null {
- if (this.mode === "stream-json" && isPartial) {
- return this.computeDelta(msgId, text)
- }
- return text ?? null
- }
- /**
- * Build a base event with optional done flag.
- */
- private buildTextEvent(
- type: "assistant" | "thinking" | "user",
- id: number,
- content: string | null,
- isDone: boolean,
- subtype?: string,
- ): JsonEvent {
- const event: JsonEvent = { type, id }
- if (content !== null) {
- event.content = content
- }
- if (subtype) {
- event.subtype = subtype
- }
- if (isDone) {
- event.done = true
- }
- return event
- }
- /**
- * Handle a ClineMessage and emit the appropriate JSON event.
- */
- private handleMessage(msg: ClineMessage, _isUpdate: boolean): void {
- const isDone = !msg.partial
- // In json mode, only emit complete (non-partial) messages
- if (this.mode === "json" && msg.partial) {
- return
- }
- // Skip duplicate complete messages
- if (isDone && this.seenMessageIds.has(msg.ts)) {
- return
- }
- if (isDone) {
- this.seenMessageIds.add(msg.ts)
- this.previousContent.delete(msg.ts)
- this.previousToolUseContent.delete(msg.ts)
- }
- if (msg.type === "say" && msg.say) {
- const contentToSend = this.getContentToSend(msg.ts, msg.text, msg.partial ?? false)
- // Skip if no new content for streaming partial messages
- if (msg.partial && this.isEmptyStreamingDelta(contentToSend)) {
- return
- }
- this.handleSayMessage(msg, contentToSend, isDone)
- }
- if (msg.type === "ask" && msg.ask) {
- this.handleAskMessage(msg, isDone)
- }
- }
- /**
- * Handle "say" type messages.
- */
- private handleSayMessage(msg: ClineMessage, contentToSend: string | null, isDone: boolean): void {
- switch (msg.say) {
- case "text":
- if (this.expectPromptEchoAsUser) {
- this.emitEvent(this.buildTextEvent("user", msg.ts, contentToSend, isDone))
- if (isDone) {
- this.expectPromptEchoAsUser = false
- }
- } else {
- this.emitEvent(this.buildTextEvent("assistant", msg.ts, contentToSend, isDone))
- if (msg.text) {
- this.lastAssistantText = msg.text
- }
- }
- break
- case "reasoning":
- this.handleReasoningMessage(msg, isDone)
- break
- case "error":
- this.emitEvent({ type: "error", id: msg.ts, content: contentToSend ?? undefined })
- break
- case "command_output":
- this.handleCommandOutputMessage(msg, isDone)
- break
- case "user_feedback":
- case "user_feedback_diff":
- this.emitEvent(this.buildTextEvent("user", msg.ts, contentToSend, isDone))
- if (isDone) {
- this.expectPromptEchoAsUser = false
- }
- break
- case "api_req_started": {
- const cost = parseApiReqCost(msg.text)
- if (cost) {
- this.lastCost = cost
- }
- break
- }
- case "mcp_server_response":
- this.emitEvent({
- type: "tool_result",
- subtype: "mcp",
- tool_result: { name: "mcp_server", output: msg.text },
- })
- break
- case "completion_result":
- if (msg.text && !msg.partial) {
- this.completionResultContent = msg.text
- }
- break
- default:
- if (SKIP_SAY_TYPES.has(msg.say!)) {
- break
- }
- if (msg.text) {
- this.emitEvent(this.buildTextEvent("assistant", msg.ts, contentToSend, isDone, msg.say))
- }
- break
- }
- }
- /**
- * Handle reasoning/thinking messages with separate delta tracking.
- */
- private handleReasoningMessage(msg: ClineMessage, isDone: boolean): void {
- const reasoningContent = msg.reasoning || msg.text
- const reasoningKey = msg.ts + REASONING_KEY_OFFSET
- const reasoningDelta = this.getContentToSend(reasoningKey, reasoningContent, msg.partial ?? false)
- if (msg.partial && this.isEmptyStreamingDelta(reasoningDelta)) {
- return
- }
- if (!msg.partial) {
- this.previousContent.delete(reasoningKey)
- }
- this.emitEvent(this.buildTextEvent("thinking", msg.ts, reasoningDelta, isDone))
- }
- /**
- * Handle "ask" type messages.
- */
- private handleAskMessage(msg: ClineMessage, isDone: boolean): void {
- switch (msg.ask) {
- case "tool":
- this.handleToolUseAsk(msg, "tool", isDone)
- break
- case "command":
- this.handleToolUseAsk(msg, "command", isDone)
- break
- case "use_mcp_server":
- this.handleToolUseAsk(msg, "mcp", isDone)
- break
- case "followup": {
- const contentToSend = this.getContentToSend(msg.ts, msg.text, msg.partial ?? false)
- // Skip if no new content for streaming partial messages
- if (msg.partial && this.isEmptyStreamingDelta(contentToSend)) {
- return
- }
- this.emitEvent(this.buildTextEvent("assistant", msg.ts, contentToSend, isDone, "followup"))
- break
- }
- case "command_output":
- // Handled in say type
- break
- case "completion_result":
- if (msg.text && !msg.partial) {
- this.completionResultContent = msg.text
- }
- break
- default:
- if (msg.text) {
- const contentToSend = this.getContentToSend(msg.ts, msg.text, msg.partial ?? false)
- // Skip if no new content for streaming partial messages
- if (msg.partial && this.isEmptyStreamingDelta(contentToSend)) {
- return
- }
- this.emitEvent(this.buildTextEvent("assistant", msg.ts, contentToSend, isDone, msg.ask))
- }
- break
- }
- }
- private handleToolUseAsk(msg: ClineMessage, subtype: "tool" | "command" | "mcp", isDone: boolean): void {
- const isStreamingPartial = this.mode === "stream-json" && msg.partial === true
- const toolInfo = parseToolInfo(msg.text)
- if (subtype === "command") {
- if (this.activeCommandToolUseId !== undefined && this.activeCommandToolUseId !== msg.ts) {
- const previousCommandId = this.activeCommandToolUseId
- const pending = this.pendingCommandCompletionByToolUseId.get(previousCommandId)
- if (pending) {
- clearTimeout(pending.timer)
- this.pendingCommandCompletionByToolUseId.delete(previousCommandId)
- this.emitCommandOutputEvent(previousCommandId, undefined, true, pending.exitCode)
- }
- }
- this.activeCommandToolUseId = msg.ts
- this.completedCommandOutputIds.delete(msg.ts)
- this.clearPendingCommandCompletion(msg.ts)
- if (isStreamingPartial) {
- const commandDelta = this.computeStructuredDelta(msg.ts, msg.text)
- if (commandDelta === null) {
- return
- }
- this.emitEvent({
- type: "tool_use",
- id: msg.ts,
- subtype: "command",
- content: commandDelta,
- tool_use: { name: "execute_command", input: { command: commandDelta } },
- })
- return
- }
- this.emitEvent({
- type: "tool_use",
- id: msg.ts,
- subtype: "command",
- tool_use: { name: "execute_command", input: { command: msg.text } },
- ...(isDone ? { done: true } : {}),
- })
- return
- }
- if (subtype === "mcp") {
- if (isStreamingPartial) {
- const mcpDelta = this.computeStructuredDelta(msg.ts, msg.text)
- if (mcpDelta === null) {
- return
- }
- this.emitEvent({
- type: "tool_use",
- id: msg.ts,
- subtype: "mcp",
- content: mcpDelta,
- tool_use: { name: "mcp_server" },
- })
- return
- }
- this.emitEvent({
- type: "tool_use",
- id: msg.ts,
- subtype: "mcp",
- tool_use: { name: "mcp_server", input: { raw: msg.text } },
- ...(isDone ? { done: true } : {}),
- })
- return
- }
- if (isStreamingPartial) {
- const toolDelta = this.computeStructuredDelta(msg.ts, msg.text)
- if (toolDelta === null) {
- return
- }
- this.emitEvent({
- type: "tool_use",
- id: msg.ts,
- subtype: "tool",
- content: toolDelta,
- tool_use: { name: toolInfo?.name ?? "unknown_tool" },
- })
- return
- }
- this.emitEvent({
- type: "tool_use",
- id: msg.ts,
- subtype: "tool",
- tool_use: toolInfo ?? { name: "unknown_tool", input: { raw: msg.text } },
- ...(isDone ? { done: true } : {}),
- })
- }
- private handleCommandOutputMessage(msg: ClineMessage, isDone: boolean): void {
- const commandId = this.activeCommandToolUseId ?? msg.ts
- if (this.completedCommandOutputIds.has(commandId)) {
- return
- }
- const pending = this.pendingCommandCompletionByToolUseId.get(commandId)
- if (pending) {
- if (!isDone) {
- return
- }
- clearTimeout(pending.timer)
- this.pendingCommandCompletionByToolUseId.delete(commandId)
- this.emitCommandOutputEvent(commandId, msg.text, true, pending.exitCode)
- return
- }
- if (this.statusDrivenCommandOutputIds.has(commandId)) {
- return
- }
- this.emitCommandOutputEvent(commandId, msg.text, isDone)
- }
- /**
- * Handle task completion and emit result event.
- */
- private handleTaskCompleted(event: TaskCompletedEvent): void {
- // Prefer the completion payload from the current event. If it is empty,
- // fall back to the most recent tracked completion text, then assistant text.
- const resultContent = event.message?.text || this.completionResultContent || this.lastAssistantText
- this.emitEvent({
- type: "result",
- id: event.message?.ts ?? Date.now(),
- content: resultContent,
- done: true,
- success: event.success,
- cost: this.lastCost,
- })
- // Prevent stale completion content from leaking into later turns.
- this.completionResultContent = undefined
- this.lastAssistantText = undefined
- // For "json" mode, output the final accumulated result
- if (this.mode === "json") {
- this.outputFinalResult(event.success, resultContent)
- }
- }
- /**
- * Handle errors and emit error event.
- */
- private handleError(error: Error): void {
- this.emitEvent({
- type: "error",
- id: Date.now(),
- content: error.message,
- })
- }
- /**
- * Emit a JSON event.
- * For stream-json mode: immediately output to stdout
- * For json mode: accumulate for final output
- */
- private emitEvent(event: JsonEvent): void {
- const requestId = event.requestId ?? this.requestIdProvider()
- const payload = requestId ? { ...event, requestId } : event
- this.events.push(payload)
- if (this.mode === "stream-json") {
- this.outputLine(payload)
- }
- }
- /**
- * Output a single JSON line (NDJSON format).
- */
- private outputLine(data: unknown): void {
- this.writeToStdout(JSON.stringify(data) + "\n")
- }
- /**
- * Output the final accumulated result (for "json" mode).
- */
- private outputFinalResult(success: boolean, content?: string): void {
- const output: JsonFinalOutput = {
- type: "result",
- success,
- content,
- cost: this.lastCost,
- events: this.events.filter((e) => e.type !== "result"), // Exclude the result event itself
- }
- this.writeToStdout(JSON.stringify(output, null, 2) + "\n")
- }
- private writeToStdout(content: string): void {
- const writePromise = new Promise<void>((resolve, reject) => {
- this.stdout.write(content, (error?: Error | null) => {
- if (error) {
- reject(error)
- return
- }
- resolve()
- })
- })
- this.pendingWrites.add(writePromise)
- void writePromise.finally(() => {
- this.pendingWrites.delete(writePromise)
- })
- }
- async flush(): Promise<void> {
- while (this.pendingWrites.size > 0) {
- await Promise.all([...this.pendingWrites])
- }
- }
- /**
- * Get accumulated events (for testing or external use).
- */
- getEvents(): JsonEvent[] {
- return [...this.events]
- }
- /**
- * Clear accumulated events and state.
- */
- clear(): void {
- this.events = []
- this.lastCost = undefined
- this.seenMessageIds.clear()
- this.previousContent.clear()
- this.previousToolUseContent.clear()
- this.activeCommandToolUseId = undefined
- this.previousCommandOutputByToolUseId.clear()
- this.statusDrivenCommandOutputIds.clear()
- this.completedCommandOutputIds.clear()
- for (const pending of this.pendingCommandCompletionByToolUseId.values()) {
- clearTimeout(pending.timer)
- }
- this.pendingCommandCompletionByToolUseId.clear()
- this.completionResultContent = undefined
- this.lastAssistantText = undefined
- this.expectPromptEchoAsUser = true
- }
- }
|