create-with-session-id-resume-loads-correct-session.ts 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364
  1. import fs from "fs/promises"
  2. import os from "os"
  3. import path from "path"
  4. import readline from "readline"
  5. import { fileURLToPath } from "url"
  6. import { randomUUID } from "crypto"
  7. import { execa } from "execa"
  8. import type { TaskSessionEntry } from "@roo-code/core/cli"
  /**
   * One JSON object read from the CLI's `stream-json` output.
   *
   * Every field is optional because values come from an unchecked `JSON.parse` cast;
   * readers test the fields they care about before acting on an event.
   */
  interface StreamEvent {
    /** Event category; this script matches on "system", "control", "user", "result" and "error". */
    type?: string
    /** Refinement of `type`; this script matches on "init", "ack", "done" and "error". */
    subtype?: string
    /** Request id, compared against the ids this script attaches to the commands it sends. */
    requestId?: string
    /** Command a control event refers to ("ping", "message" or "shutdown" in this script). */
    command?: string
    /** Task id reported by the CLI; compared against the session id being resumed. */
    taskId?: string
    /** Free-form text: searched for the marker token on user events and echoed in error messages. */
    content?: string
    /** Error code, included in failure messages when present. */
    code?: string
    /** Read on the completing `result` event to decide whether the run succeeded. */
    success?: boolean
    /** Checked together with `type === "result"` to locate the completing event. */
    done?: boolean
  }
  /** Time budget for one resume run, in milliseconds (three minutes). */
  const RESUME_TIMEOUT_MS = 3 * 60 * 1_000

  /** Directory containing this module; ES modules have no built-in `__dirname`, so it is derived from `import.meta.url`. */
  const __dirname = path.dirname(fileURLToPath(new URL(import.meta.url)))
  /**
   * Tries to decode one line of CLI output as a JSON stream event.
   *
   * @param line - Raw output line; surrounding whitespace is ignored.
   * @returns The parsed value cast to `StreamEvent`, or `null` when the trimmed line does not
   *   begin with `{` or is not valid JSON. No field-level validation is performed.
   */
  function parseStreamEvent(line: string): StreamEvent | null {
    const candidate = line.trim()

    // Cheap pre-filter: anything that is not a JSON object literal is ordinary log output.
    if (candidate.charAt(0) !== "{") {
      return null
    }

    let decoded: StreamEvent | null = null
    try {
      decoded = JSON.parse(candidate) as StreamEvent
    } catch {
      // Malformed JSON is treated the same as a non-event line.
      decoded = null
    }
    return decoded
  }
  /**
   * Runs `pnpm dev list sessions --format json` for a workspace and returns the reported sessions.
   *
   * The JSON payload is taken to start at the first stdout line that begins with `{`;
   * everything from that line to the end of stdout is parsed as one JSON document.
   *
   * @param cliRoot - Directory the `pnpm` command is executed in.
   * @param workspacePath - Workspace passed to the CLI through `--workspace`.
   * @returns The `sessions` array from the parsed payload (entries are not validated individually).
   * @throws Error when the command exits non-zero, stdout contains no JSON start line, the
   *   payload is not valid JSON, or the payload has no `sessions` array.
   */
  async function listSessions(cliRoot: string, workspacePath: string): Promise<TaskSessionEntry[]> {
    const cliArgs = ["dev", "list", "sessions", "--workspace", workspacePath, "--format", "json"]
    const { exitCode, stdout, stderr } = await execa("pnpm", cliArgs, { cwd: cliRoot, reject: false })

    if (exitCode !== 0) {
      throw new Error(`list sessions failed with exit code ${exitCode}: ${stderr || stdout}`)
    }

    const outputLines = stdout.split("\n")
    const firstJsonLine = outputLines.findIndex((entry) => entry.trim().startsWith("{"))
    if (firstJsonLine < 0) {
      throw new Error(`list sessions output did not contain JSON payload: ${stdout}`)
    }

    const rawJson = outputLines.slice(firstJsonLine).join("\n").trim()
    let decoded: unknown
    try {
      decoded = JSON.parse(rawJson)
    } catch (cause) {
      const reason = cause instanceof Error ? cause.message : String(cause)
      throw new Error(`failed to parse list sessions output as JSON: ${reason}`)
    }

    // Pull out the candidate array first, then validate it in one place.
    const sessions =
      typeof decoded === "object" && decoded !== null && "sessions" in decoded
        ? (decoded as { sessions?: unknown }).sessions
        : undefined
    if (!Array.isArray(sessions)) {
      throw new Error("list sessions output missing sessions array")
    }

    return sessions as TaskSessionEntry[]
  }
  /**
   * Runs the CLI once in print mode with `--create-with-session-id` and checks that the run
   * finished successfully.
   *
   * @param cliRoot - Directory the `pnpm` command is executed in.
   * @param workspacePath - Workspace passed to the CLI through `--workspace`.
   * @param sessionId - Session id handed to `--create-with-session-id`.
   * @param prompt - Prompt passed as the final positional argument.
   * @throws Error when the command exits non-zero, any `error` event appears in the output,
   *   no `result` event with `done === true` is found, or that event's `success` is not `true`.
   */
  async function createSessionWithCustomId(
    cliRoot: string,
    workspacePath: string,
    sessionId: string,
    prompt: string,
  ): Promise<void> {
    const cliArgs = [
      "dev",
      "--print",
      "--provider",
      "roo",
      "--output-format",
      "stream-json",
      "--workspace",
      workspacePath,
      "--create-with-session-id",
      sessionId,
      prompt,
    ]
    const run = await execa("pnpm", cliArgs, { cwd: cliRoot, reject: false })

    if (run.exitCode !== 0) {
      throw new Error(
        `create-with-session-id failed for ${sessionId} with exit code ${run.exitCode}: ${run.stderr || run.stdout}`,
      )
    }

    // Keep only the stdout lines that decode to a stream event.
    const events = run.stdout.split("\n").flatMap((rawLine) => {
      const parsed = parseStreamEvent(rawLine)
      return parsed ? [parsed] : []
    })

    // An error event anywhere in the stream takes precedence over the final result.
    const failureEvent = events.find(({ type }) => type === "error")
    if (failureEvent) {
      throw new Error(
        `create-with-session-id emitted error for ${sessionId}: code=${failureEvent.code ?? "none"} content=${failureEvent.content ?? ""}`,
      )
    }

    const finalResult = events.find((event) => event.done === true && event.type === "result")
    if (!finalResult) {
      throw new Error(`create-with-session-id did not emit final result for ${sessionId}`)
    }
    if (finalResult.success !== true) {
      throw new Error(`create-with-session-id completed unsuccessfully for ${sessionId}`)
    }
  }
  /**
   * Resumes an existing session through the CLI's stdin prompt stream and checks that the
   * resumed process reports the same session id.
   *
   * The child is driven by the events it prints on stdout:
   *  1. `system` / `init`                      -> send `ping`
   *  2. `control` / `done` for the ping        -> send `message` carrying the marker token
   *  3. `user` event containing the marker     -> send `shutdown`
   *  4. `control` `ack` or `done` for shutdown -> remember the reported `taskId`
   *
   * Every stdout line is echoed to this process's stdout, and stderr is mirrored to stderr.
   *
   * @param cliRoot - Directory the `pnpm` command is executed in.
   * @param workspacePath - Workspace passed to the CLI through `--workspace`.
   * @param sessionId - Session to resume via `--session-id`; also the expected shutdown `taskId`.
   * @param messageToken - Unique token embedded in the message prompt and looked for in the user event.
   * @throws Error on timeout (`RESUME_TIMEOUT_MS`), on a control error for any request other than
   *   the shutdown request, on a non-zero exit code, when the message completion or the marker
   *   user turn was never observed, or when the shutdown `taskId` differs from `sessionId`.
   */
  async function resumeSessionAndSendMarker(
    cliRoot: string,
    workspacePath: string,
    sessionId: string,
    messageToken: string,
  ): Promise<void> {
    const pingRequestId = `ping-${Date.now()}`
    const messageRequestId = `message-${Date.now()}`
    const shutdownRequestId = `shutdown-${Date.now()}`
    const messagePrompt = `Resume marker token: ${messageToken}. Reply with exactly "ack-${messageToken}".`

    const child = execa(
      "pnpm",
      [
        "dev",
        "--print",
        "--stdin-prompt-stream",
        "--provider",
        "roo",
        "--output-format",
        "stream-json",
        "--workspace",
        workspacePath,
        "--session-id",
        sessionId,
      ],
      {
        cwd: cliRoot,
        stdin: "pipe",
        stdout: "pipe",
        stderr: "pipe",
        reject: false,
        forceKillAfterDelay: 2_000,
      },
    )

    // Mirror the child's stderr so its diagnostics remain visible in this script's output.
    child.stderr?.on("data", (chunk) => {
      process.stderr.write(chunk)
    })

    let pingSent = false
    let messageSent = false
    let shutdownSent = false
    let sawMessageControlDone = false
    let sawUserTurnWithMarker = false
    let shutdownTaskId: string | undefined
    let handlerError: Error | null = null

    // Single failure path: the first recorded error wins so a follow-up failure (for example
    // a timeout after an earlier control error) cannot mask the root cause.
    const fail = (error: Error): void => {
      if (handlerError === null) {
        handlerError = error
      }
      child.kill("SIGTERM")
    }

    const sendCommand = (command: { command: "ping" | "message" | "shutdown"; requestId: string; prompt?: string }) => {
      // Silently skip the write when stdin is gone; the missing response then surfaces
      // through the timeout or the post-exit checks below.
      if (!child.stdin || child.stdin.destroyed) {
        return
      }
      child.stdin.write(`${JSON.stringify(command)}\n`)
    }

    const timeout = setTimeout(() => {
      fail(
        new Error(
          `timed out resuming session ${sessionId} (pingSent=${pingSent}, messageSent=${messageSent}, sawMessageControlDone=${sawMessageControlDone}, sawUserTurnWithMarker=${sawUserTurnWithMarker})`,
        ),
      )
    }, RESUME_TIMEOUT_MS)

    const rl = readline.createInterface({
      // stdout is requested as "pipe" in the options above.
      input: child.stdout!,
      crlfDelay: Infinity,
    })

    rl.on("line", (line) => {
      process.stdout.write(`${line}\n`)

      const event = parseStreamEvent(line)
      if (!event) {
        return
      }

      // Step 1: the CLI announced itself; probe it with a ping.
      if (event.type === "system" && event.subtype === "init" && !pingSent) {
        pingSent = true
        sendCommand({ command: "ping", requestId: pingRequestId })
        return
      }

      // Step 2: the ping completed; send the marker message.
      if (
        event.type === "control" &&
        event.subtype === "done" &&
        event.command === "ping" &&
        event.requestId === pingRequestId &&
        !messageSent
      ) {
        messageSent = true
        sendCommand({
          command: "message",
          requestId: messageRequestId,
          prompt: messagePrompt,
        })
        return
      }

      if (
        event.type === "control" &&
        event.subtype === "error" &&
        event.command === "message" &&
        event.requestId === messageRequestId
      ) {
        fail(
          new Error(
            `message command failed while resuming ${sessionId}: code=${event.code ?? "unknown"} content=${event.content ?? ""}`,
          ),
        )
        return
      }

      if (
        event.type === "control" &&
        event.subtype === "done" &&
        event.command === "message" &&
        event.requestId === messageRequestId
      ) {
        sawMessageControlDone = true
        return
      }

      // Step 3: the marker showed up as a user turn under our request id; ask for shutdown once.
      if (event.type === "user" && event.requestId === messageRequestId && event.content?.includes(messageToken)) {
        sawUserTurnWithMarker = true
        if (!shutdownSent) {
          shutdownSent = true
          sendCommand({ command: "shutdown", requestId: shutdownRequestId })
        }
        return
      }

      // Step 4: remember which task the CLI says it is shutting down.
      if (
        event.type === "control" &&
        (event.subtype === "ack" || event.subtype === "done") &&
        event.command === "shutdown" &&
        event.requestId === shutdownRequestId &&
        typeof event.taskId === "string"
      ) {
        shutdownTaskId = event.taskId
        return
      }

      // Any other control error is fatal, except one tied to the shutdown request itself.
      if (event.type === "control" && event.subtype === "error" && event.requestId !== shutdownRequestId) {
        fail(
          new Error(
            `unexpected control error while resuming ${sessionId}: command=${event.command ?? "unknown"} code=${event.code ?? "unknown"} content=${event.content ?? ""}`,
          ),
        )
        return
      }
    })

    let exitCode: number | undefined
    try {
      exitCode = (await child).exitCode
    } finally {
      // Always release the timer and the readline interface, even if awaiting the child throws,
      // so a pending 180 s timer cannot keep the process alive.
      clearTimeout(timeout)
      rl.close()
    }

    // Covers both handler failures and the timeout, which is recorded through the same path.
    if (handlerError) {
      throw handlerError
    }
    if (exitCode !== 0) {
      throw new Error(`stream resume for ${sessionId} exited non-zero: ${exitCode}`)
    }
    if (!sawMessageControlDone) {
      throw new Error(`did not observe message control completion while resuming ${sessionId}`)
    }
    if (!sawUserTurnWithMarker) {
      throw new Error(`did not observe resumed user marker turn while resuming ${sessionId}`)
    }
    if (shutdownTaskId !== sessionId) {
      throw new Error(
        `shutdown taskId did not match resumed session (expected=${sessionId}, actual=${shutdownTaskId ?? "none"})`,
      )
    }
  }
  /**
   * End-to-end scenario: create two sessions with caller-chosen ids in a temporary workspace,
   * confirm both appear in the session list, then resume each one in turn.
   *
   * The CLI root is `ROO_CLI_ROOT` when set, otherwise three directories above this module.
   * The temporary workspace is removed whether the scenario passes or fails.
   */
  async function main() {
    const rootOverride = process.env.ROO_CLI_ROOT
    const cliRoot = rootOverride ? path.resolve(rootOverride) : path.resolve(__dirname, "../../..")

    const workspacePrefix = path.join(os.tmpdir(), "roo-cli-create-session-id-")
    const workspacePath = await fs.mkdtemp(workspacePrefix)

    const firstSessionId = randomUUID()
    const secondSessionId = randomUUID()
    const firstMarker = `FIRST-MARKER-${Date.now()}`
    const secondMarker = `SECOND-MARKER-${Date.now()}`

    try {
      const firstPrompt = `Create first session marker ${firstMarker}. Reply with exactly "ok-${firstMarker}".`
      await createSessionWithCustomId(cliRoot, workspacePath, firstSessionId, firstPrompt)

      const secondPrompt = `Create second session marker ${secondMarker}. Reply with exactly "ok-${secondMarker}".`
      await createSessionWithCustomId(cliRoot, workspacePath, secondSessionId, secondPrompt)

      const initialSessions = await listSessions(cliRoot, workspacePath)
      const isListed = (id: string) => initialSessions.some((entry) => entry.id === id)
      if (!isListed(firstSessionId)) {
        throw new Error(`session list missing first custom session id ${firstSessionId}`)
      }
      if (!isListed(secondSessionId)) {
        throw new Error(`session list missing second custom session id ${secondSessionId}`)
      }

      // Each resume marker is stamped at the moment its resume run starts.
      await resumeSessionAndSendMarker(cliRoot, workspacePath, firstSessionId, `resume-first-${Date.now()}`)
      await resumeSessionAndSendMarker(cliRoot, workspacePath, secondSessionId, `resume-second-${Date.now()}`)

      console.log(`[PASS] created and resumed custom sessions: ${firstSessionId}, ${secondSessionId}`)
    } finally {
      await fs.rm(workspacePath, { recursive: true, force: true })
    }
  }
  // Script entry point: report any failure on stderr and exit with status 1.
  main().catch((reason: unknown) => {
    const message = reason instanceof Error ? reason.message : String(reason)
    console.error(`[FAIL] ${message}`)
    process.exit(1)
  })