2
0

stream-harness.ts 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. import path from "path"
  2. import { fileURLToPath } from "url"
  3. import readline from "readline"
  4. import { execa } from "execa"
  /** Shape of one element of the `queue` array on a {@link StreamEvent}. */
  type StreamQueueEntry = {
    id?: string
    text?: string
    imageCount?: number
    timestamp?: number
  }

  /**
   * A single JSON object read from the CLI's stdout stream.
   *
   * Every field is optional: `parseEvent` casts the parsed JSON to this type
   * without validating it, so consumers must check for the fields they need.
   */
  export type StreamEvent = {
    type?: string
    subtype?: string
    requestId?: string
    command?: string
    content?: string
    code?: string
    success?: boolean
    done?: boolean
    id?: number
    queueDepth?: number
    queue?: StreamQueueEntry[]
    tool_use?: { name?: string; input?: Record<string, unknown> }
    tool_result?: { name?: string; output?: string }
  }
  /** Values allowed in the `command` field of a {@link StreamCommand}. */
  type StreamCommandName = "start" | "message" | "cancel" | "ping" | "shutdown"

  /**
   * A command sent to the CLI. `sendCommand` serializes it with
   * `JSON.stringify` and writes it to the child's stdin as one line.
   */
  export type StreamCommand = {
    command: StreamCommandName
    requestId: string
    prompt?: string
    images?: Array<string>
  }
  /**
   * Handle passed to scenario callbacks by `runStreamCase`, exposing the run's
   * settings and the means to talk to the spawned CLI.
   */
  export interface StreamCaseContext {
    /** Directory used as the working directory of the spawned CLI process. */
    readonly cliRoot: string

    /** Time budget for the scenario, in milliseconds. */
    readonly timeoutMs: number

    /** Writes `command` to the CLI's stdin as a single JSON line. */
    sendCommand(command: StreamCommand): void

    /** Returns a new request id that starts with `prefix`. */
    nextRequestId(prefix: string): string
  }
  /** Options accepted by `runStreamCase`. */
  export interface RunStreamCaseOptions {
    /**
     * Called for every stdout line that parses as a JSON object. Throwing from
     * this callback fails the scenario.
     */
    onEvent: (event: StreamEvent, context: StreamCaseContext) => void

    /** Scenario time budget in milliseconds; `runStreamCase` defaults it to 120_000. */
    timeoutMs?: number

    /** Builds the error message used when the scenario times out. */
    onTimeoutMessage?: (context: StreamCaseContext) => string
  }
  // ES modules have no built-in __dirname, so derive it from this module's URL.
  const moduleFilePath = fileURLToPath(import.meta.url)
  const __dirname = path.dirname(moduleFilePath)

  // CLI root used when ROO_CLI_ROOT is not set: three directories above this file.
  const defaultCliRoot = path.resolve(__dirname, "..", "..", "..")
  /**
   * Attempts to interpret one line of CLI output as a stream event.
   *
   * @param line - Raw output line; surrounding whitespace is ignored.
   * @returns The parsed object, or `null` when the trimmed line does not start
   *   with `{` or is not valid JSON. The result is cast, not validated.
   */
  function parseEvent(line: string): StreamEvent | null {
    const candidate = line.trim()
    const looksLikeJsonObject = candidate.startsWith("{")

    if (looksLikeJsonObject) {
      try {
        return JSON.parse(candidate) as StreamEvent
      } catch {
        // Malformed JSON is treated like any other non-event line.
      }
    }

    return null
  }
  /**
   * Runs one streaming scenario against the CLI.
   *
   * Spawns `pnpm dev --print --stdin-prompt-stream --provider roo
   * --output-format stream-json` in the CLI root (`ROO_CLI_ROOT` if set,
   * otherwise `defaultCliRoot`), forwards the child's stderr to this process's
   * stderr, echoes every stdout line to this process's stdout, and passes each
   * line that parses as a JSON object to `options.onEvent`.
   *
   * The scenario fails, and the child is sent SIGTERM, when `onEvent` throws or
   * when `timeoutMs` elapses. If several failures occur, the first one is the
   * one reported.
   *
   * @param options - Event handler, optional timeout (default 120_000 ms) and
   *   optional timeout-message builder.
   * @returns A promise that resolves once the CLI has exited with code 0 and no
   *   failure was recorded.
   * @throws The first error thrown by `onEvent`, a timeout error, or an error
   *   reporting a non-zero exit code.
   */
  export async function runStreamCase(options: RunStreamCaseOptions): Promise<void> {
    const cliRoot = process.env.ROO_CLI_ROOT ? path.resolve(process.env.ROO_CLI_ROOT) : defaultCliRoot
    const timeoutMs = options.timeoutMs ?? 120_000

    const child = execa(
      "pnpm",
      ["dev", "--print", "--stdin-prompt-stream", "--provider", "roo", "--output-format", "stream-json"],
      {
        cwd: cliRoot,
        stdin: "pipe",
        stdout: "pipe",
        stderr: "pipe",
        // Do not reject on failure; the exit code is inspected below instead.
        reject: false,
        forceKillAfterDelay: 2_000,
      },
    )

    child.stderr?.on("data", (chunk) => {
      process.stderr.write(chunk)
    })

    let requestCounter = 0

    const context: StreamCaseContext = {
      cliRoot,
      timeoutMs,
      nextRequestId(prefix: string): string {
        requestCounter += 1
        return `${prefix}-${Date.now()}-${requestCounter}`
      },
      sendCommand(command: StreamCommand): void {
        // Guard against a missing stream as well as a destroyed one; the
        // command is dropped in both cases instead of throwing.
        const stdin = child.stdin
        if (!stdin || stdin.destroyed) {
          return
        }
        stdin.write(`${JSON.stringify(command)}\n`)
      },
    }

    let failure: Error | null = null

    // Records the first failure only, so that a later timeout or handler error
    // cannot mask the root cause, and asks the child to stop.
    const failScenario = (error: Error): void => {
      if (failure === null) {
        failure = error
      }
      child.kill("SIGTERM")
    }

    const timeout = setTimeout(() => {
      let message = "timed out waiting for stream scenario completion"
      try {
        message = options.onTimeoutMessage?.(context) ?? message
      } catch (error) {
        // A throwing message builder must not escape the timer callback as an
        // uncaught exception or prevent the child from being killed.
        const detail = error instanceof Error ? error.message : String(error)
        message = `${message} (onTimeoutMessage threw: ${detail})`
      }
      failScenario(new Error(message))
    }, timeoutMs)

    const rl = readline.createInterface({
      // stdout is configured as "pipe" above.
      input: child.stdout!,
      crlfDelay: Infinity,
    })

    rl.on("line", (line) => {
      process.stdout.write(`${line}\n`)

      const event = parseEvent(line)
      if (!event) {
        return
      }

      try {
        options.onEvent(event, context)
      } catch (error) {
        failScenario(error instanceof Error ? error : new Error(String(error)))
      }
    })

    try {
      const result = await child

      if (failure) {
        throw failure
      }

      if (result.exitCode !== 0) {
        throw new Error(`CLI exited with non-zero code: ${result.exitCode}`)
      }
    } finally {
      // Release the timer and the line reader on every exit path.
      clearTimeout(timeout)
      rl.close()
    }
  }
  126. }