Simon Klee před 1 dnem
rodič
revize
212aa46b3e

+ 1 - 1
packages/opencode/src/cli/cmd/run/prompt.shared.ts

@@ -105,7 +105,7 @@ export function printableBinding(binding: string, leader: string): string {
 
 
 export function isExitCommand(input: string): boolean {
 export function isExitCommand(input: string): boolean {
   const text = input.trim().toLowerCase()
   const text = input.trim().toLowerCase()
-  return text === "/exit" || text === "/quit"
+  return text === "/exit" || text === "/quit" || text === ":q"
 }
 }
 
 
 export function promptInfo(event: {
 export function promptInfo(event: {

+ 133 - 111
packages/opencode/src/cli/cmd/run/runtime.queue.ts

@@ -17,6 +17,12 @@ type Trace = {
   write(type: string, data?: unknown): void
   write(type: string, data?: unknown): void
 }
 }
 
 
+type Deferred<T = void> = {
+  promise: Promise<T>
+  resolve: (value: T | PromiseLike<T>) => void
+  reject: (error?: unknown) => void
+}
+
 export type QueueInput = {
 export type QueueInput = {
   footer: FooterApi
   footer: FooterApi
   initialInput?: string
   initialInput?: string
@@ -25,6 +31,23 @@ export type QueueInput = {
   run: (prompt: RunPrompt, signal: AbortSignal) => Promise<void>
   run: (prompt: RunPrompt, signal: AbortSignal) => Promise<void>
 }
 }
 
 
+type State = {
+  queue: RunPrompt[]
+  ctrl?: AbortController
+  closed: boolean
+}
+
+function defer<T = void>(): Deferred<T> {
+  let resolve!: (value: T | PromiseLike<T>) => void
+  let reject!: (error?: unknown) => void
+  const promise = new Promise<T>((next, fail) => {
+    resolve = next
+    reject = fail
+  })
+
+  return { promise, resolve, reject }
+}
+
 // Runs the prompt queue until the footer closes.
 // Runs the prompt queue until the footer closes.
 //
 //
 // Subscribes to footer prompt events, queues them, and drains one at a
 // Subscribes to footer prompt events, queues them, and drains one at a
@@ -32,123 +55,128 @@ export type QueueInput = {
 // a turn is running, they queue up and execute in order. The footer shows
 // a turn is running, they queue up and execute in order. The footer shows
 // the queue depth so the user knows how many are pending.
 // the queue depth so the user knows how many are pending.
 export async function runPromptQueue(input: QueueInput): Promise<void> {
 export async function runPromptQueue(input: QueueInput): Promise<void> {
-  const q: RunPrompt[] = []
-  let busy = false
-  let closed = input.footer.isClosed
-  let ctrl: AbortController | undefined
-  let stop: (() => void) | undefined
-  let err: unknown
-  let hasErr = false
-  let done: (() => void) | undefined
-  const wait = new Promise<void>((resolve) => {
-    done = resolve
-  })
-  const until = new Promise<void>((resolve) => {
-    stop = resolve
-  })
+  const stop = defer<{ type: "closed" }>()
+  const done = defer<void>()
+  const state: State = {
+    queue: [],
+    closed: input.footer.isClosed,
+  }
+  let draining: Promise<void> | undefined
 
 
-  const fail = (error: unknown) => {
-    err = error
-    hasErr = true
-    done?.()
-    done = undefined
+  const emit = (next: FooterEvent, row: Record<string, unknown>) => {
+    input.trace?.write("ui.patch", row)
+    input.footer.event(next)
   }
   }
 
 
   const finish = () => {
   const finish = () => {
-    if (!closed || busy) {
+    if (!state.closed || draining) {
       return
       return
     }
     }
 
 
-    done?.()
-    done = undefined
-  }
-
-  const emit = (next: FooterEvent, row: Record<string, unknown>) => {
-    input.trace?.write("ui.patch", row)
-    input.footer.event(next)
+    done.resolve()
   }
   }
 
 
-  const pump = async () => {
-    if (busy || closed) {
+  const close = () => {
+    if (state.closed) {
       return
       return
     }
     }
 
 
-    busy = true
+    state.closed = true
+    state.queue.length = 0
+    state.ctrl?.abort()
+    stop.resolve({ type: "closed" })
+    finish()
+  }
 
 
-    try {
-      while (!closed && q.length > 0) {
-        const prompt = q.shift()
-        if (!prompt) {
-          continue
-        }
+  const drain = () => {
+    if (draining || state.closed || state.queue.length === 0) {
+      return
+    }
 
 
-        emit(
-          {
-            type: "turn.send",
-            queue: q.length,
-          },
-          {
-            phase: "running",
-            status: "sending prompt",
-            queue: q.length,
-          },
-        )
-        const start = Date.now()
-        const next = new AbortController()
-        ctrl = next
-        try {
-          const task = input.run(prompt, next.signal).then(
-            () => ({ type: "done" as const }),
-            (error) => ({ type: "error" as const, error }),
-          )
-          await input.footer.idle()
-          const commit = { kind: "user", text: prompt.text, phase: "start", source: "system" } as const
-          input.trace?.write("ui.commit", commit)
-          input.footer.append(commit)
-          const out = await Promise.race([task, until.then(() => ({ type: "closed" as const }))])
-          if (out.type === "closed") {
-            next.abort()
-            break
+    draining = (async () => {
+      try {
+        while (!state.closed && state.queue.length > 0) {
+          const prompt = state.queue.shift()
+          if (!prompt) {
+            continue
           }
           }
 
 
-          if (out.type === "error") {
-            throw out.error
-          }
-        } finally {
-          if (ctrl === next) {
-            ctrl = undefined
-          }
-          const duration = Locale.duration(Math.max(0, Date.now() - start))
           emit(
           emit(
             {
             {
-              type: "turn.duration",
-              duration,
+              type: "turn.send",
+              queue: state.queue.length,
             },
             },
             {
             {
-              duration,
+              phase: "running",
+              status: "sending prompt",
+              queue: state.queue.length,
             },
             },
           )
           )
+          const start = Date.now()
+          const ctrl = new AbortController()
+          state.ctrl = ctrl
+
+          try {
+            const task = input.run(prompt, ctrl.signal).then(
+              () => ({ type: "done" as const }),
+              (error) => ({ type: "error" as const, error }),
+            )
+
+            await input.footer.idle()
+            const commit = { kind: "user", text: prompt.text, phase: "start", source: "system" } as const
+            input.trace?.write("ui.commit", commit)
+            input.footer.append(commit)
+
+            const next = await Promise.race([task, stop.promise])
+            if (next.type === "closed") {
+              ctrl.abort()
+              break
+            }
+
+            if (next.type === "error") {
+              throw next.error
+            }
+          } finally {
+            if (state.ctrl === ctrl) {
+              state.ctrl = undefined
+            }
+
+            const duration = Locale.duration(Math.max(0, Date.now() - start))
+            emit(
+              {
+                type: "turn.duration",
+                duration,
+              },
+              {
+                duration,
+              },
+            )
+          }
         }
         }
+      } catch (error) {
+        done.reject(error)
+        return
+      } finally {
+        draining = undefined
+        emit(
+          {
+            type: "turn.idle",
+            queue: state.queue.length,
+          },
+          {
+            phase: "idle",
+            status: "",
+            queue: state.queue.length,
+          },
+        )
       }
       }
-    } finally {
-      busy = false
-      emit(
-        {
-          type: "turn.idle",
-          queue: q.length,
-        },
-        {
-          phase: "idle",
-          status: "",
-          queue: q.length,
-        },
-      )
+
       finish()
       finish()
-    }
+    })()
   }
   }
 
 
-  const push = (prompt: RunPrompt) => {
-    if (!prompt.text.trim() || closed) {
+  const submit = (prompt: RunPrompt) => {
+    if (!prompt.text.trim() || state.closed) {
       return
       return
     }
     }
 
 
@@ -158,14 +186,14 @@ export async function runPromptQueue(input: QueueInput): Promise<void> {
     }
     }
 
 
     input.onPrompt?.()
     input.onPrompt?.()
-    q.push(prompt)
+    state.queue.push(prompt)
     emit(
     emit(
       {
       {
         type: "queue",
         type: "queue",
-        queue: q.length,
+        queue: state.queue.length,
       },
       },
       {
       {
-        queue: q.length,
+        queue: state.queue.length,
       },
       },
     )
     )
     emit(
     emit(
@@ -177,37 +205,31 @@ export async function runPromptQueue(input: QueueInput): Promise<void> {
         first: false,
         first: false,
       },
       },
     )
     )
-    void pump().catch(fail)
+    drain()
   }
   }
 
 
   const offPrompt = input.footer.onPrompt((prompt) => {
   const offPrompt = input.footer.onPrompt((prompt) => {
-    push(prompt)
+    submit(prompt)
   })
   })
   const offClose = input.footer.onClose(() => {
   const offClose = input.footer.onClose(() => {
-    closed = true
-    q.length = 0
-    ctrl?.abort()
-    stop?.()
-    finish()
+    close()
   })
   })
 
 
   try {
   try {
-    if (closed) {
+    if (state.closed) {
       return
       return
     }
     }
 
 
-    push({ text: input.initialInput ?? "", parts: [] })
-    await pump()
-
-    if (!closed) {
-      await wait
-    }
-
-    if (hasErr) {
-      throw err
-    }
+    submit({
+      text: input.initialInput ?? "",
+      parts: [],
+    })
+    finish()
+    await done.promise
   } finally {
   } finally {
     offPrompt()
     offPrompt()
     offClose()
     offClose()
+    close()
+    await draining?.catch(() => {})
   }
   }
 }
 }

+ 1 - 1
packages/opencode/src/cli/cmd/run/splash.ts

@@ -232,7 +232,7 @@ function build(input: SplashWriterInput, kind: "entry" | "exit", ctx: Scrollback
   }
   }
 
 
   if (kind === "entry") {
   if (kind === "entry") {
-    push(lines, 0, y, "Type /exit or /quit to finish.", input.theme.system.body, undefined, undefined)
+    push(lines, 0, y, "Type /exit to finish.", input.theme.system.body, undefined, undefined)
     y += 1
     y += 1
   }
   }
 
 

+ 5 - 3
packages/opencode/src/cli/cmd/run/stream.transport.ts

@@ -152,7 +152,7 @@ function active(event: Event, sessionID: string): boolean {
 // Races the turn's deferred completion against an abort signal.
 // Races the turn's deferred completion against an abort signal.
 function waitTurn(done: Wait["done"], signal: AbortSignal) {
 function waitTurn(done: Wait["done"], signal: AbortSignal) {
   return Effect.raceAll([
   return Effect.raceAll([
-    Deferred.await(done).pipe(Effect.as("idle" as const)),
+    Deferred.await(done).pipe(Effect.as("idle" as const), Effect.exit),
     Effect.callback<"abort">((resume) => {
     Effect.callback<"abort">((resume) => {
       if (signal.aborted) {
       if (signal.aborted) {
         resume(Effect.succeed("abort"))
         resume(Effect.succeed("abort"))
@@ -166,8 +166,10 @@ function waitTurn(done: Wait["done"], signal: AbortSignal) {
 
 
       signal.addEventListener("abort", onAbort, { once: true })
       signal.addEventListener("abort", onAbort, { once: true })
       return Effect.sync(() => signal.removeEventListener("abort", onAbort))
       return Effect.sync(() => signal.removeEventListener("abort", onAbort))
-    }),
-  ])
+    }).pipe(Effect.exit),
+  ]).pipe(
+    Effect.flatMap((exit) => (Exit.isFailure(exit) ? Effect.failCause(exit.cause) : Effect.succeed(exit.value))),
+  )
 }
 }
 
 
 export function formatUnknownError(error: unknown): string {
 export function formatUnknownError(error: unknown): string {

+ 33 - 0
packages/opencode/test/cli/run/runtime.queue.test.ts

@@ -161,6 +161,39 @@ describe("run runtime queue", () => {
     expect(seen).toEqual(["one", "two"])
     expect(seen).toEqual(["one", "two"])
   })
   })
 
 
+  test("drains a prompt queued during an in-flight turn", async () => {
+    const ui = footer()
+    const seen: string[] = []
+    let wake: (() => void) | undefined
+    const gate = new Promise<void>((resolve) => {
+      wake = resolve
+    })
+
+    const task = runPromptQueue({
+      footer: ui.api,
+      run: async (input) => {
+        seen.push(input.text)
+        if (seen.length === 1) {
+          await gate
+          return
+        }
+
+        ui.api.close()
+      },
+    })
+
+    ui.submit("one")
+    await Promise.resolve()
+    expect(seen).toEqual(["one"])
+
+    wake?.()
+    await Promise.resolve()
+    ui.submit("two")
+    await task
+
+    expect(seen).toEqual(["one", "two"])
+  })
+
   test("close aborts the active run and drops pending queued work", async () => {
   test("close aborts the active run and drops pending queued work", async () => {
     const ui = footer()
     const ui = footer()
     const seen: string[] = []
     const seen: string[] = []

+ 52 - 0
packages/opencode/test/cli/run/stream.transport.test.ts

@@ -668,6 +668,58 @@ describe("run stream transport", () => {
     }
     }
   })
   })
 
 
+  test("rejects the active turn when the event stream faults", async () => {
+    const ui = footer()
+    const ready = defer()
+
+    const transport = await createSessionTransport({
+      sdk: {
+        event: {
+          subscribe: async () => ({
+            stream: (async function* () {
+              await ready.promise
+              yield busy()
+              throw new Error("boom")
+            })(),
+          }),
+        },
+        session: {
+          promptAsync: async () => {
+            ready.resolve()
+          },
+          status: async () => ({ data: { "session-1": { type: "busy" } } }),
+          messages: async () => ({ data: [] }),
+          children: async () => ({ data: [] }),
+        },
+        permission: {
+          list: async () => ({ data: [] }),
+        },
+        question: {
+          list: async () => ({ data: [] }),
+        },
+      } as unknown as OpencodeClient,
+      sessionID: "session-1",
+      thinking: true,
+      limits: () => ({}),
+      footer: ui.api,
+    })
+
+    try {
+      await expect(
+        transport.runPromptTurn({
+          agent: undefined,
+          model: undefined,
+          variant: undefined,
+          prompt: { text: "hello", parts: [] },
+          files: [],
+          includeFiles: false,
+        }),
+      ).rejects.toThrow("boom")
+    } finally {
+      await transport.close()
+    }
+  })
+
   test("closes while the event stream is waiting for the next item", async () => {
   test("closes while the event stream is waiting for the next item", async () => {
     const src = blockingFeed()
     const src = blockingFeed()
     const ui = footer()
     const ui = footer()