stream-harness.ts 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. import path from "path"
  2. import { fileURLToPath } from "url"
  3. import readline from "readline"
  4. import { execa } from "execa"
  5. export type StreamEvent = {
  6. type?: string
  7. subtype?: string
  8. requestId?: string
  9. command?: string
  10. content?: string
  11. code?: string
  12. success?: boolean
  13. done?: boolean
  14. id?: number
  15. queueDepth?: number
  16. queue?: Array<{ id?: string; text?: string; imageCount?: number; timestamp?: number }>
  17. tool_use?: {
  18. name?: string
  19. input?: Record<string, unknown>
  20. }
  21. tool_result?: {
  22. name?: string
  23. output?: string
  24. }
  25. }
  26. export type StreamCommand = {
  27. command: "start" | "message" | "cancel" | "ping" | "shutdown"
  28. requestId: string
  29. prompt?: string
  30. }
  31. export interface StreamCaseContext {
  32. readonly cliRoot: string
  33. readonly timeoutMs: number
  34. nextRequestId(prefix: string): string
  35. sendCommand(command: StreamCommand): void
  36. }
  37. export interface RunStreamCaseOptions {
  38. timeoutMs?: number
  39. onEvent: (event: StreamEvent, context: StreamCaseContext) => void
  40. onTimeoutMessage?: (context: StreamCaseContext) => string
  41. }
  42. const __dirname = path.dirname(fileURLToPath(import.meta.url))
  43. const defaultCliRoot = path.resolve(__dirname, "../../..")
  44. function parseEvent(line: string): StreamEvent | null {
  45. const trimmed = line.trim()
  46. if (!trimmed.startsWith("{")) {
  47. return null
  48. }
  49. try {
  50. return JSON.parse(trimmed) as StreamEvent
  51. } catch {
  52. return null
  53. }
  54. }
  55. export async function runStreamCase(options: RunStreamCaseOptions): Promise<void> {
  56. const cliRoot = process.env.ROO_CLI_ROOT ? path.resolve(process.env.ROO_CLI_ROOT) : defaultCliRoot
  57. const timeoutMs = options.timeoutMs ?? 120_000
  58. const child = execa(
  59. "pnpm",
  60. ["dev", "--print", "--stdin-prompt-stream", "--provider", "roo", "--output-format", "stream-json"],
  61. {
  62. cwd: cliRoot,
  63. stdin: "pipe",
  64. stdout: "pipe",
  65. stderr: "pipe",
  66. reject: false,
  67. forceKillAfterDelay: 2_000,
  68. },
  69. )
  70. child.stderr?.on("data", (chunk) => {
  71. process.stderr.write(chunk)
  72. })
  73. let requestCounter = 0
  74. const context: StreamCaseContext = {
  75. cliRoot,
  76. timeoutMs,
  77. nextRequestId(prefix: string): string {
  78. requestCounter += 1
  79. return `${prefix}-${Date.now()}-${requestCounter}`
  80. },
  81. sendCommand(command: StreamCommand): void {
  82. if (child.stdin?.destroyed) {
  83. return
  84. }
  85. child.stdin.write(`${JSON.stringify(command)}\n`)
  86. },
  87. }
  88. let handlerError: Error | null = null
  89. let timedOut = false
  90. const timeout = setTimeout(() => {
  91. timedOut = true
  92. const message = options.onTimeoutMessage?.(context) ?? "timed out waiting for stream scenario completion"
  93. handlerError = new Error(message)
  94. child.kill("SIGTERM")
  95. }, timeoutMs)
  96. const rl = readline.createInterface({
  97. input: child.stdout!,
  98. crlfDelay: Infinity,
  99. })
  100. rl.on("line", (line) => {
  101. process.stdout.write(`${line}\n`)
  102. const event = parseEvent(line)
  103. if (!event) {
  104. return
  105. }
  106. try {
  107. options.onEvent(event, context)
  108. } catch (error) {
  109. handlerError = error instanceof Error ? error : new Error(String(error))
  110. child.kill("SIGTERM")
  111. }
  112. })
  113. const result = await child
  114. clearTimeout(timeout)
  115. rl.close()
  116. if (handlerError) {
  117. throw handlerError
  118. }
  119. if (timedOut) {
  120. throw new Error("stream scenario timed out")
  121. }
  122. if (result.exitCode !== 0) {
  123. throw new Error(`CLI exited with non-zero code: ${result.exitCode}`)
  124. }
  125. }