Simon Klee 2 дней назад
Родитель
Сommit
d6fca5b246

+ 57 - 66
packages/opencode/src/cli/cmd/run/footer.subagent.tsx

@@ -2,7 +2,7 @@
 import type { ScrollBoxRenderable } from "@opentui/core"
 import { useKeyboard } from "@opentui/solid"
 import "opentui-spinner/solid"
-import { For, createMemo } from "solid-js"
+import { createMemo, mapArray } from "solid-js"
 import { SPINNER_FRAMES } from "../tui/component/spinner"
 import { RunEntryContent, sameEntryGroup } from "./scrollback.writer"
 import type { FooterSubagentDetail, FooterSubagentTab, RunDiffStyle } from "./types"
@@ -35,21 +35,21 @@ function statusIcon(status: FooterSubagentTab["status"]) {
   return "◔"
 }
 
-function tabText(input: { tab: FooterSubagentTab; slot: string; count: number; width: number }) {
+function tabText(tab: FooterSubagentTab, slot: string, count: number, width: number) {
   const perTab = Math.max(
     1,
-    Math.floor((input.width - 4 - Math.max(0, input.count - 1) * 3) / Math.max(1, input.count)),
+    Math.floor((width - 4 - Math.max(0, count - 1) * 3) / Math.max(1, count)),
   )
-  if (input.count >= 8 || perTab < 12) {
-    return `[${input.slot}]`
+  if (count >= 8 || perTab < 12) {
+    return `[${slot}]`
   }
 
-  const label = `[${input.slot}] ${input.tab.label}`
-  if (input.count >= 5 || perTab < 24) {
+  const label = `[${slot}] ${tab.label}`
+  if (count >= 5 || perTab < 24) {
     return label
   }
 
-  const detail = input.tab.description || input.tab.title
+  const detail = tab.description || tab.title
   if (!detail) {
     return label
   }
@@ -63,6 +63,32 @@ export function RunFooterSubagentTabs(props: {
   theme: RunFooterTheme
   width: number
 }) {
+  const items = mapArray(
+    () => props.tabs,
+    (tab, index) => {
+      const active = () => props.selected === tab.sessionID
+      const slot = () => String(index() + 1)
+      return (
+        <box paddingRight={1}>
+          <box flexDirection="row" gap={1} width="100%">
+            {tab.status === "running" ? (
+              <box flexShrink={0}>
+                <spinner frames={SPINNER_FRAMES} interval={80} color={statusColor(props.theme, tab.status)} />
+              </box>
+            ) : (
+              <text fg={statusColor(props.theme, tab.status)} wrapMode="none" truncate flexShrink={0}>
+                {statusIcon(tab.status)}
+              </text>
+            )}
+            <text fg={active() ? props.theme.text : props.theme.muted} wrapMode="none" truncate>
+              {tabText(tab, slot(), props.tabs.length, props.width)}
+            </text>
+          </box>
+        </box>
+      )
+    },
+  )
+
   return (
     <box
       id="run-direct-footer-subagent-tabs"
@@ -74,35 +100,7 @@ export function RunFooterSubagentTabs(props: {
       flexDirection="row"
       flexShrink={0}
     >
-      <box flexDirection="row" gap={3} flexShrink={1} flexGrow={1}>
-        {props.tabs.map((tab, index) => {
-          const active = () => props.selected === tab.sessionID
-          const slot = String(index + 1)
-          return (
-            <box paddingRight={1}>
-              <box flexDirection="row" gap={1} width="100%">
-                {tab.status === "running" ? (
-                  <box flexShrink={0}>
-                    <spinner frames={SPINNER_FRAMES} interval={80} color={statusColor(props.theme, tab.status)} />
-                  </box>
-                ) : (
-                  <text fg={statusColor(props.theme, tab.status)} wrapMode="none" truncate flexShrink={0}>
-                    {statusIcon(tab.status)}
-                  </text>
-                )}
-                <text fg={active() ? props.theme.text : props.theme.muted} wrapMode="none" truncate>
-                  {tabText({
-                    tab,
-                    slot,
-                    count: props.tabs.length,
-                    width: props.width,
-                  })}
-                </text>
-              </box>
-            </box>
-          )
-        })}
-      </box>
+      <box flexDirection="row" gap={3} flexShrink={1} flexGrow={1}>{items()}</box>
     </box>
   )
 }
@@ -116,13 +114,22 @@ export function RunFooterSubagentBody(props: {
   onCycle: (dir: -1 | 1) => void
   onClose: () => void
 }) {
+  const theme = createMemo(() => props.theme())
+  const footer = createMemo(() => theme().footer)
   const commits = createMemo(() => props.detail()?.commits ?? [])
-  const entries = createMemo(() => {
-    return commits().map((commit, index, list) => ({
-      commit,
-      gap: index > 0 && !sameEntryGroup(list[index - 1], commit),
-    }))
-  })
+  const opts = createMemo(() => ({ diffStyle: props.diffStyle }))
+  const scrollbar = createMemo(() => ({
+    trackOptions: {
+      backgroundColor: footer().surface,
+      foregroundColor: footer().line,
+    },
+  }))
+  const rows = mapArray(commits, (commit, index) => (
+    <box flexDirection="column" gap={0}>
+      {index() > 0 && !sameEntryGroup(commits()[index() - 1], commit) ? <box height={1} flexShrink={0} /> : null}
+      <RunEntryContent commit={commit} theme={theme()} opts={opts()} width={props.width()} />
+    </box>
+  ))
   let scroll: ScrollBoxRenderable | undefined
 
   useKeyboard((event) => {
@@ -160,7 +167,7 @@ export function RunFooterSubagentBody(props: {
       width="100%"
       height="100%"
       flexDirection="column"
-      backgroundColor={props.theme().footer.surface}
+      backgroundColor={footer().surface}
     >
       <box paddingTop={1} paddingLeft={1} paddingRight={3} paddingBottom={1} flexDirection="column" flexGrow={1}>
         <scrollbox
@@ -168,35 +175,19 @@ export function RunFooterSubagentBody(props: {
           height="100%"
           stickyScroll={true}
           stickyStart="bottom"
-          verticalScrollbarOptions={{
-            trackOptions: {
-              backgroundColor: props.theme().footer.surface,
-              foregroundColor: props.theme().footer.line,
-            },
-          }}
+          verticalScrollbarOptions={scrollbar()}
           ref={(item) => {
             scroll = item as ScrollBoxRenderable
           }}
         >
           <box width="100%" flexDirection="column" gap={0}>
-            <For each={entries()}>
-              {(item) => (
-                <box flexDirection="column" gap={0}>
-                  {item.gap ? <box height={1} flexShrink={0} /> : null}
-                  <RunEntryContent
-                    commit={item.commit}
-                    theme={props.theme()}
-                    opts={{ diffStyle: props.diffStyle }}
-                    width={props.width()}
-                  />
-                </box>
-              )}
-            </For>
-            {entries().length === 0 ? (
-              <text fg={props.theme().footer.muted} wrapMode="word">
+            {commits().length > 0 ? (
+              rows()
+            ) : (
+              <text fg={footer().muted} wrapMode="word">
                 No subagent activity yet
               </text>
-            ) : null}
+            )}
           </box>
         </scrollbox>
       </box>

+ 10 - 4
packages/opencode/src/cli/cmd/run/footer.ts

@@ -26,6 +26,7 @@
 import { CliRenderEvents, type CliRenderer, type TreeSitterClient } from "@opentui/core"
 import { render } from "@opentui/solid"
 import { createComponent, createSignal, type Accessor, type Setter } from "solid-js"
+import { createStore, reconcile } from "solid-js/store"
 import { SUBAGENT_INSPECTOR_ROWS, SUBAGENT_TAB_ROWS } from "./footer.subagent"
 import { PROMPT_MAX_ROWS, TEXTAREA_MIN_ROWS } from "./footer.prompt"
 import { printableBinding } from "./prompt.shared"
@@ -156,7 +157,7 @@ export class RunFooter implements FooterApi {
   private view: Accessor<FooterView>
   private setView: Setter<FooterView>
   private subagent: Accessor<FooterSubagentState>
-  private setSubagent: Setter<FooterSubagentState>
+  private setSubagent: (next: FooterSubagentState) => void
   private promptRoute: FooterPromptRoute = { type: "composer" }
   private tabsVisible = false
   private interruptTimeout: NodeJS.Timeout | undefined
@@ -190,9 +191,14 @@ export class RunFooter implements FooterApi {
     const [resources, setResources] = createSignal<RunResource[]>(options.resources)
     this.resources = resources
     this.setResources = setResources
-    const [subagent, setSubagent] = createSignal<FooterSubagentState>(createEmptySubagentState())
-    this.subagent = subagent
-    this.setSubagent = setSubagent
+    const [subagent, setSubagent] = createStore<FooterSubagentState>(createEmptySubagentState())
+    this.subagent = () => subagent as FooterSubagentState
+    this.setSubagent = (next) => {
+      setSubagent("tabs", reconcile(next.tabs, { key: "sessionID" }))
+      setSubagent("details", reconcile(next.details))
+      setSubagent("permissions", reconcile(next.permissions, { key: "id" }))
+      setSubagent("questions", reconcile(next.questions, { key: "id" }))
+    }
     this.base = Math.max(1, renderer.footerHeight - TEXTAREA_MIN_ROWS)
     this.interruptHint = printableBinding(options.keybinds.interrupt, options.keybinds.leader) || "esc"
     this.scrollback = new RunScrollbackStream(renderer, options.theme, {

+ 541 - 452
packages/opencode/src/cli/cmd/run/stream.transport.ts

@@ -13,6 +13,8 @@
 // We also re-check live session status before resolving an idle event so a
 // delayed idle from an older turn cannot complete a newer busy turn.
 import type { Event, OpencodeClient } from "@opencode-ai/sdk/v2"
+import { Context, Deferred, Effect, Exit, Layer, Scope, Stream } from "effect"
+import { makeRuntime } from "@/effect/run-service"
 import {
   blockerStatus,
   bootstrapSessionData,
@@ -70,9 +72,7 @@ type Wait = {
   tick: number
   armed: boolean
   live: boolean
-  done: Promise<void>
-  resolve: () => void
-  reject: (error: unknown) => void
+  done: Deferred.Deferred<void, unknown>
 }
 
 export type SessionTurnInput = {
@@ -91,25 +91,26 @@ export type SessionTransport = {
   close(): Promise<void>
 }
 
-// Creates a deferred promise tied to a specific turn tick.
-function defer(tick: number): Wait {
-  let resolve: () => void = () => {}
-  let reject: (error: unknown) => void = () => {}
-  const done = new Promise<void>((next, fail) => {
-    resolve = next
-    reject = fail
-  })
+type State = {
+  data: SessionData
+  subagent: SubagentData
+  wait?: Wait
+  tick: number
+  fault?: unknown
+  footerView: FooterView
+  blockerTick: number
+  selectedSubagent?: string
+  blockers: Map<string, number>
+}
 
-  return {
-    tick,
-    armed: false,
-    live: false,
-    done,
-    resolve,
-    reject,
-  }
+type TransportService = {
+  readonly runPromptTurn: (input: SessionTurnInput) => Effect.Effect<void, unknown>
+  readonly selectSubagent: (sessionID: string | undefined) => Effect.Effect<void>
+  readonly close: () => Effect.Effect<void>
 }
 
+class Service extends Context.Service<Service, TransportService>()("@opencode/RunStreamTransport") {}
+
 function sid(event: Event): string | undefined {
   if (event.type === "message.updated") {
     return event.properties.sessionID
@@ -148,31 +149,25 @@ function active(event: Event, sessionID: string): boolean {
   return event.properties.status.type !== "idle"
 }
 
-// Races the turn's deferred promise against an abort signal.
-function waitTurn(done: Promise<void>, signal: AbortSignal): Promise<"idle" | "abort"> {
-  return new Promise((resolve, reject) => {
-    if (signal.aborted) {
-      resolve("abort")
-      return
-    }
-
-    const onAbort = () => {
-      signal.removeEventListener("abort", onAbort)
-      resolve("abort")
-    }
+// Races the turn's deferred completion against an abort signal.
+function waitTurn(done: Wait["done"], signal: AbortSignal) {
+  return Effect.raceAll([
+    Deferred.await(done).pipe(Effect.as("idle" as const)),
+    Effect.callback<"abort">((resume) => {
+      if (signal.aborted) {
+        resume(Effect.succeed("abort"))
+        return
+      }
 
-    signal.addEventListener("abort", onAbort, { once: true })
-    done.then(
-      () => {
-        signal.removeEventListener("abort", onAbort)
-        resolve("idle")
-      },
-      (error) => {
+      const onAbort = () => {
         signal.removeEventListener("abort", onAbort)
-        reject(error)
-      },
-    )
-  })
+        resume(Effect.succeed("abort"))
+      }
+
+      signal.addEventListener("abort", onAbort, { once: true })
+      return Effect.sync(() => signal.removeEventListener("abort", onAbort))
+    }),
+  ])
 }
 
 export function formatUnknownError(error: unknown): string {
@@ -316,452 +311,546 @@ function traceTabs(trace: Trace | undefined, prev: FooterSubagentTab[], next: Fo
   }
 }
 
-// Opens an SDK event subscription and returns a SessionTransport.
-//
-// The background `watch` loop consumes every SDK event, runs it through the
-// reducer, and writes output to the footer. When a session.status idle
-// event arrives, it resolves the current turn's Wait so runPromptTurn()
-// can return.
-//
-// The transport is single-turn: only one runPromptTurn() call can be active
-// at a time. The prompt queue enforces this from above.
-export async function createSessionTransport(input: StreamInput): Promise<SessionTransport> {
-  const abort = new AbortController()
-  const halt = () => {
-    abort.abort()
-  }
-  input.signal?.addEventListener("abort", halt, { once: true })
-
-  const events = await input.sdk.event.subscribe(undefined, {
-    signal: abort.signal,
-  })
-  input.trace?.write("recv.subscribe", {
-    sessionID: input.sessionID,
-  })
-
-  const closeStream = () => {
-    // Pass undefined explicitly so TS accepts AsyncGenerator.return().
-    void events.stream.return(undefined).catch(() => {})
-  }
-
-  let data = createSessionData()
-  let subagent = createSubagentData()
-  let wait: Wait | undefined
-  let tick = 0
-  let fault: unknown
-  let closed = false
-  let footerView: FooterView = { type: "prompt" }
-  let blockerTick = 0
-  let selectedSubagent: string | undefined
-  const blockers = new Map<string, number>()
-
-  const currentSubagentState = () => {
-    if (selectedSubagent && !subagent.tabs.has(selectedSubagent)) {
-      selectedSubagent = undefined
-    }
-
-    return snapshotSelectedSubagentData(subagent, selectedSubagent)
-  }
-
-  const seedBlocker = (id: string) => {
-    if (blockers.has(id)) {
-      return
-    }
-
-    blockerTick += 1
-    blockers.set(id, blockerTick)
-  }
-
-  const trackBlocker = (event: Event) => {
-    if (event.type !== "permission.asked" && event.type !== "question.asked") {
-      return
-    }
-
-    if (event.properties.sessionID !== input.sessionID && !subagent.tabs.has(event.properties.sessionID)) {
-      return
-    }
-
-    seedBlocker(event.properties.id)
-  }
-
-  const releaseBlocker = (event: Event) => {
-    if (
-      event.type !== "permission.replied" &&
-      event.type !== "question.replied" &&
-      event.type !== "question.rejected"
-    ) {
-      return
-    }
-
-    blockers.delete(event.properties.requestID)
-  }
-
-  const syncFooter = (commits: StreamCommit[], patch?: FooterPatch, nextSubagent?: FooterSubagentState) => {
-    const current = pickView(data, subagent, blockers)
-    const footer = composeFooter({
-      patch,
-      subagent: nextSubagent,
-      current,
-      previous: footerView,
-    })
-
-    if (commits.length === 0 && !footer) {
-      footerView = current
-      return
-    }
-
-    input.trace?.write("reduce.output", {
-      commits,
-      footer: traceFooterOutput(footer),
-    })
-    writeSessionOutput(
-      {
-        footer: input.footer,
-        trace: input.trace,
-      },
-      {
-        commits,
-        footer,
-      },
-    )
-    footerView = current
-  }
+function createLayer(input: StreamInput) {
+  return Layer.fresh(
+    Layer.effect(
+      Service,
+      Effect.gen(function* () {
+        const scope = yield* Scope.make()
+        const abort = yield* Scope.provide(scope)(
+          Effect.acquireRelease(
+            Effect.sync(() => new AbortController()),
+            (abort) => Effect.sync(() => abort.abort()),
+          ),
+        )
+        let closed = false
+        let closeStream = () => {}
+        const halt = () => {
+          abort.abort()
+        }
+        const stop = () => {
+          input.signal?.removeEventListener("abort", halt)
+          abort.abort()
+          closeStream()
+        }
+        const closeScope = () => {
+          if (closed) {
+            return Effect.void
+          }
+
+          closed = true
+          stop()
+          return Scope.close(scope, Exit.void)
+        }
 
-  const bootstrap = async () => {
-    const [messages, children, permissions, questions] = await Promise.all([
-      input.sdk.session
-        .messages({
-          sessionID: input.sessionID,
-          limit: SUBAGENT_BOOTSTRAP_LIMIT,
-        })
-        .then((x) => x.data ?? [])
-        .catch(() => []),
-      input.sdk.session
-        .children({
+        input.signal?.addEventListener("abort", halt, { once: true })
+        yield* Effect.addFinalizer(() => closeScope())
+
+        const events = yield* Scope.provide(scope)(
+          Effect.acquireRelease(
+            Effect.promise(() =>
+              input.sdk.event.subscribe(undefined, {
+                signal: abort.signal,
+              }),
+            ),
+            (events) =>
+              Effect.sync(() => {
+                void events.stream.return(undefined).catch(() => {})
+                }),
+          ),
+        )
+        closeStream = () => {
+          void events.stream.return(undefined).catch(() => {})
+        }
+        input.trace?.write("recv.subscribe", {
           sessionID: input.sessionID,
         })
-        .then((x) => x.data ?? [])
-        .catch(() => []),
-      input.sdk.permission
-        .list()
-        .then((x) => x.data ?? [])
-        .catch(() => []),
-      input.sdk.question
-        .list()
-        .then((x) => x.data ?? [])
-        .catch(() => []),
-    ])
-
-    bootstrapSessionData({
-      data,
-      messages,
-      permissions: permissions.filter((item) => item.sessionID === input.sessionID),
-      questions: questions.filter((item) => item.sessionID === input.sessionID),
-    })
-    bootstrapSubagentData({
-      data: subagent,
-      messages,
-      children,
-      permissions,
-      questions,
-    })
-
-    const callSessions = [
-      ...new Set(
-        listSubagentPermissions(subagent)
-          .filter((item) => item.tool && item.metadata?.input === undefined)
-          .map((item) => item.sessionID),
-      ),
-    ]
-    if (callSessions.length > 0) {
-      await Promise.all(
-        callSessions.map(async (sessionID) => {
-          const messages = await input.sdk.session
-            .messages({
-              sessionID,
-              limit: SUBAGENT_CALL_BOOTSTRAP_LIMIT,
-            })
-            .then((x) => x.data ?? [])
-            .catch(() => [])
-
-          bootstrapSubagentCalls({
-            data: subagent,
-            sessionID,
-            messages,
-          })
-        }),
-      )
-    }
-
-    for (const request of [
-      ...data.permissions,
-      ...listSubagentPermissions(subagent),
-      ...data.questions,
-      ...listSubagentQuestions(subagent),
-    ].sort((a, b) => a.id.localeCompare(b.id))) {
-      seedBlocker(request.id)
-    }
-
-    const snapshot = currentSubagentState()
-    traceTabs(input.trace, [], snapshot.tabs)
-    syncFooter([], undefined, snapshot)
-  }
-
-  await bootstrap()
 
-  const idle = async () => {
-    try {
-      const out = await input.sdk.session.status()
-      const state = out.data?.[input.sessionID]
-      return !state || state.type === "idle"
-    } catch {
-      return true
-    }
-  }
-
-  const fail = (error: unknown) => {
-    if (fault) {
-      return
-    }
-
-    fault = error
-    const next = wait
-    wait = undefined
-    next?.reject(error)
-  }
-
-  const touch = (event: Event) => {
-    const next = wait
-    if (!next || !active(event, input.sessionID)) {
-      return
-    }
+        const state: State = {
+          data: createSessionData(),
+          subagent: createSubagentData(),
+          tick: 0,
+          footerView: { type: "prompt" },
+          blockerTick: 0,
+          blockers: new Map(),
+        }
 
-    next.live = true
-  }
+        const currentSubagentState = () => {
+          if (state.selectedSubagent && !state.subagent.tabs.has(state.selectedSubagent)) {
+            state.selectedSubagent = undefined
+          }
 
-  const mark = async (event: Event) => {
-    if (
-      event.type !== "session.status" ||
-      event.properties.sessionID !== input.sessionID ||
-      event.properties.status.type !== "idle"
-    ) {
-      return
-    }
+          return snapshotSelectedSubagentData(state.subagent, state.selectedSubagent)
+        }
 
-    const next = wait
-    if (!next || !next.armed || !next.live) {
-      return
-    }
+        const seedBlocker = (id: string) => {
+          if (state.blockers.has(id)) {
+            return
+          }
 
-    if (!(await idle()) || wait !== next) {
-      return
-    }
+          state.blockerTick += 1
+          state.blockers.set(id, state.blockerTick)
+        }
 
-    tick = next.tick + 1
-    wait = undefined
-    next.resolve()
-  }
+        const trackBlocker = (event: Event) => {
+          if (event.type !== "permission.asked" && event.type !== "question.asked") {
+            return
+          }
 
-  const flush = (type: "turn.abort" | "turn.cancel") => {
-    const commits: StreamCommit[] = []
-    flushInterrupted(data, commits)
-    syncFooter(commits)
-    input.trace?.write(type, {
-      sessionID: input.sessionID,
-    })
-  }
+          if (
+            event.properties.sessionID !== input.sessionID &&
+            !state.subagent.tabs.has(event.properties.sessionID)
+          ) {
+            return
+          }
 
-  const watch = (async () => {
-    try {
-      for await (const item of events.stream) {
-        if (input.footer.isClosed) {
-          break
+          seedBlocker(event.properties.id)
         }
 
-        const event = item as Event
-        input.trace?.write("recv.event", event)
-        trackBlocker(event)
-        const prevTabs = event.type === "message.part.updated" ? listSubagentTabs(subagent) : undefined
-        const next = reduceSessionData({
-          data,
-          event,
-          sessionID: input.sessionID,
-          thinking: input.thinking,
-          limits: input.limits(),
-        })
-        data = next.data
+        const releaseBlocker = (event: Event) => {
+          if (
+            event.type !== "permission.replied" &&
+            event.type !== "question.replied" &&
+            event.type !== "question.rejected"
+          ) {
+            return
+          }
 
-        const subagentChanged = reduceSubagentData({
-          data: subagent,
-          event,
-          sessionID: input.sessionID,
-          thinking: input.thinking,
-          limits: input.limits(),
-        })
-        if (subagentChanged && prevTabs) {
-          traceTabs(input.trace, prevTabs, listSubagentTabs(subagent))
+          state.blockers.delete(event.properties.requestID)
         }
-        releaseBlocker(event)
 
-        syncFooter(next.commits, next.footer?.patch, subagentChanged ? currentSubagentState() : undefined)
+        const syncFooter = (commits: StreamCommit[], patch?: FooterPatch, nextSubagent?: FooterSubagentState) => {
+          const current = pickView(state.data, state.subagent, state.blockers)
+          const footer = composeFooter({
+            patch,
+            subagent: nextSubagent,
+            current,
+            previous: state.footerView,
+          })
 
-        touch(event)
-        await mark(event)
-      }
-    } catch (error) {
-      if (!abort.signal.aborted) {
-        fail(error)
-      }
-    } finally {
-      if (!abort.signal.aborted && !fault) {
-        fail(new Error("session event stream closed"))
-      }
-      closeStream()
-    }
-  })()
+          if (commits.length === 0 && !footer) {
+            state.footerView = current
+            return
+          }
 
-  const runPromptTurn = async (next: SessionTurnInput): Promise<void> => {
-    if (next.signal?.aborted || input.footer.isClosed) {
-      return
-    }
+          input.trace?.write("reduce.output", {
+            commits,
+            footer: traceFooterOutput(footer),
+          })
+          writeSessionOutput(
+            {
+              footer: input.footer,
+              trace: input.trace,
+            },
+            {
+              commits,
+              footer,
+            },
+          )
+          state.footerView = current
+        }
 
-    if (fault) {
-      throw fault
-    }
+        const messages = (sessionID: string, limit: number) =>
+          Effect.promise(() =>
+            input.sdk.session.messages({
+              sessionID,
+              limit,
+            }),
+          ).pipe(
+            Effect.map((item) => item.data ?? []),
+            Effect.orElseSucceed(() => []),
+          )
+
+        const bootstrap = Effect.fn("RunStreamTransport.bootstrap")(function* () {
+          const [messagesList, children, permissions, questions] = yield* Effect.all(
+            [
+              messages(input.sessionID, SUBAGENT_BOOTSTRAP_LIMIT),
+              Effect.promise(() =>
+                input.sdk.session.children({
+                  sessionID: input.sessionID,
+                }),
+              ).pipe(
+                Effect.map((item) => item.data ?? []),
+                Effect.orElseSucceed(() => []),
+              ),
+              Effect.promise(() => input.sdk.permission.list()).pipe(
+                Effect.map((item) => item.data ?? []),
+                Effect.orElseSucceed(() => []),
+              ),
+              Effect.promise(() => input.sdk.question.list()).pipe(
+                Effect.map((item) => item.data ?? []),
+                Effect.orElseSucceed(() => []),
+              ),
+            ],
+            {
+              concurrency: "unbounded",
+            },
+          )
+
+          bootstrapSessionData({
+            data: state.data,
+            messages: messagesList,
+            permissions: permissions.filter((item) => item.sessionID === input.sessionID),
+            questions: questions.filter((item) => item.sessionID === input.sessionID),
+          })
+          bootstrapSubagentData({
+            data: state.subagent,
+            messages: messagesList,
+            children,
+            permissions,
+            questions,
+          })
 
-    if (wait) {
-      throw new Error("prompt already running")
-    }
+          const sessions = [
+            ...new Set(
+              listSubagentPermissions(state.subagent)
+                .filter((item) => item.tool && item.metadata?.input === undefined)
+                .map((item) => item.sessionID),
+            ),
+          ]
+          yield* Effect.forEach(
+            sessions,
+            (sessionID) =>
+              messages(sessionID, SUBAGENT_CALL_BOOTSTRAP_LIMIT).pipe(
+                Effect.tap((messagesList) =>
+                  Effect.sync(() => {
+                    bootstrapSubagentCalls({
+                      data: state.subagent,
+                      sessionID,
+                      messages: messagesList,
+                    })
+                  }),
+                ),
+              ),
+            {
+              concurrency: "unbounded",
+              discard: true,
+            },
+          )
+
+          for (const request of [
+            ...state.data.permissions,
+            ...listSubagentPermissions(state.subagent),
+            ...state.data.questions,
+            ...listSubagentQuestions(state.subagent),
+          ].sort((a, b) => a.id.localeCompare(b.id))) {
+            seedBlocker(request.id)
+          }
+
+          const snapshot = currentSubagentState()
+          traceTabs(input.trace, [], snapshot.tabs)
+          syncFooter([], undefined, snapshot)
+        })
 
-    const prevTabs = listSubagentTabs(subagent)
-    if (clearFinishedSubagents(subagent)) {
-      const snapshot = currentSubagentState()
-      traceTabs(input.trace, prevTabs, snapshot.tabs)
-      syncFooter([], undefined, snapshot)
-    }
+        const idle = Effect.fn("RunStreamTransport.idle")(() =>
+          Effect.promise(() => input.sdk.session.status()).pipe(
+            Effect.map((out) => {
+              const item = out.data?.[input.sessionID]
+              return !item || item.type === "idle"
+            }),
+            Effect.orElseSucceed(() => true),
+          ),
+        )
+
+        const fail = Effect.fn("RunStreamTransport.fail")(function* (error: unknown) {
+          if (state.fault) {
+            return
+          }
+
+          state.fault = error
+          const next = state.wait
+          state.wait = undefined
+          if (!next) {
+            return
+          }
+
+          yield* Deferred.fail(next.done, error).pipe(Effect.ignore)
+        })
 
-    const item = defer(tick)
-    wait = item
-    data.announced = false
+        const touch = (event: Event) => {
+          const next = state.wait
+          if (!next || !active(event, input.sessionID)) {
+            return
+          }
 
-    const turn = new AbortController()
-    const stop = () => {
-      turn.abort()
-    }
-    next.signal?.addEventListener("abort", stop, { once: true })
-    abort.signal.addEventListener("abort", stop, { once: true })
-
-    try {
-      const req = {
-        sessionID: input.sessionID,
-        agent: next.agent,
-        model: next.model,
-        variant: next.variant,
-        parts: [
-          ...(next.includeFiles ? next.files : []),
-          { type: "text" as const, text: next.prompt.text },
-          ...next.prompt.parts,
-        ],
-      }
-      input.trace?.write("send.prompt", req)
-      await input.sdk.session.promptAsync(req, {
-        signal: turn.signal,
-      })
-      input.trace?.write("send.prompt.ok", {
-        sessionID: input.sessionID,
-      })
-
-      item.armed = true
-
-      if (turn.signal.aborted || next.signal?.aborted || input.footer.isClosed) {
-        if (wait === item) {
-          wait = undefined
+          next.live = true
         }
-        flush("turn.abort")
-        return
-      }
 
-      if (!input.footer.isClosed && !data.announced) {
-        input.trace?.write("ui.patch", {
-          phase: "running",
-          status: "waiting for assistant",
-        })
-        input.footer.event({
-          type: "turn.wait",
+        const mark = Effect.fn("RunStreamTransport.mark")(function* (event: Event) {
+          if (
+            event.type !== "session.status" ||
+            event.properties.sessionID !== input.sessionID ||
+            event.properties.status.type !== "idle"
+          ) {
+            return
+          }
+
+          const next = state.wait
+          if (!next || !next.armed || !next.live) {
+            return
+          }
+
+          if (!(yield* idle()) || state.wait !== next) {
+            return
+          }
+
+          state.tick = next.tick + 1
+          state.wait = undefined
+          yield* Deferred.succeed(next.done, undefined).pipe(Effect.ignore)
         })
-      }
 
-      if (tick > item.tick) {
-        if (wait === item) {
-          wait = undefined
+        const flush = (type: "turn.abort" | "turn.cancel") => {
+          const commits: StreamCommit[] = []
+          flushInterrupted(state.data, commits)
+          syncFooter(commits)
+          input.trace?.write(type, {
+            sessionID: input.sessionID,
+          })
         }
-        return
-      }
 
-      const state = await waitTurn(item.done, turn.signal)
-      if (wait === item) {
-        wait = undefined
-      }
-
-      if (state === "abort") {
-        flush("turn.abort")
-      }
-
-      return
-    } catch (error) {
-      if (wait === item) {
-        wait = undefined
-      }
-
-      const canceled = turn.signal.aborted || next.signal?.aborted === true || input.footer.isClosed
-      if (canceled) {
-        flush("turn.cancel")
-        return
-      }
+        const watch = Effect.fn("RunStreamTransport.watch")(() =>
+          Stream.fromAsyncIterable(events.stream as AsyncIterable<unknown>, (error) =>
+            error instanceof Error ? error : new Error(String(error)),
+          ).pipe(
+            Stream.takeUntil(() => input.footer.isClosed || abort.signal.aborted),
+            Stream.runForEach(
+              Effect.fn("RunStreamTransport.event")(function* (item: unknown) {
+                if (input.footer.isClosed) {
+                  abort.abort()
+                  return
+                }
+
+                const event = item as Event
+                input.trace?.write("recv.event", event)
+                trackBlocker(event)
+
+                const prev = event.type === "message.part.updated" ? listSubagentTabs(state.subagent) : undefined
+                const next = reduceSessionData({
+                  data: state.data,
+                  event,
+                  sessionID: input.sessionID,
+                  thinking: input.thinking,
+                  limits: input.limits(),
+                })
+                state.data = next.data
+
+                const changed = reduceSubagentData({
+                  data: state.subagent,
+                  event,
+                  sessionID: input.sessionID,
+                  thinking: input.thinking,
+                  limits: input.limits(),
+                })
+                if (changed && prev) {
+                  traceTabs(input.trace, prev, listSubagentTabs(state.subagent))
+                }
+                releaseBlocker(event)
+
+                syncFooter(next.commits, next.footer?.patch, changed ? currentSubagentState() : undefined)
+
+                touch(event)
+                yield* mark(event)
+              }),
+            ),
+            Effect.catch((error) => (abort.signal.aborted ? Effect.void : fail(error))),
+            Effect.ensuring(
+              Effect.gen(function* () {
+                if (!abort.signal.aborted && !state.fault) {
+                  yield* fail(new Error("session event stream closed"))
+                }
+                closeStream()
+              }),
+            ),
+          ),
+        )
+
+        yield* bootstrap()
+        yield* Scope.provide(scope)(watch().pipe(Effect.forkScoped))
+
+        const runPromptTurn = Effect.fn("RunStreamTransport.runPromptTurn")(function* (next: SessionTurnInput) {
+          if (closed || next.signal?.aborted || input.footer.isClosed) {
+            return
+          }
+
+          if (state.fault) {
+            return yield* Effect.fail(state.fault)
+          }
+
+          if (state.wait) {
+            return yield* Effect.fail(new Error("prompt already running"))
+          }
+
+          const prev = listSubagentTabs(state.subagent)
+          if (clearFinishedSubagents(state.subagent)) {
+            const snapshot = currentSubagentState()
+            traceTabs(input.trace, prev, snapshot.tabs)
+            syncFooter([], undefined, snapshot)
+          }
+
+          const item: Wait = {
+            tick: state.tick,
+            armed: false,
+            live: false,
+            done: yield* Deferred.make<void, unknown>(),
+          }
+          state.wait = item
+          state.data.announced = false
+
+          const turn = new AbortController()
+          const stop = () => {
+            turn.abort()
+          }
+          next.signal?.addEventListener("abort", stop, { once: true })
+          abort.signal.addEventListener("abort", stop, { once: true })
+
+          const req = {
+            sessionID: input.sessionID,
+            agent: next.agent,
+            model: next.model,
+            variant: next.variant,
+            parts: [
+              ...(next.includeFiles ? next.files : []),
+              { type: "text" as const, text: next.prompt.text },
+              ...next.prompt.parts,
+            ],
+          }
+          input.trace?.write("send.prompt", req)
+
+          const send = Effect.promise(() =>
+            input.sdk.session.promptAsync(req, {
+              signal: turn.signal,
+            }),
+          ).pipe(
+            Effect.tap(() =>
+              Effect.sync(() => {
+                input.trace?.write("send.prompt.ok", {
+                  sessionID: input.sessionID,
+                })
+                item.armed = true
+              }),
+            ),
+          )
+
+          return yield* send.pipe(
+            Effect.flatMap(() => {
+              if (turn.signal.aborted || next.signal?.aborted || input.footer.isClosed || closed) {
+                if (state.wait === item) {
+                  state.wait = undefined
+                }
+                flush("turn.abort")
+                return Effect.void
+              }
+
+              if (!input.footer.isClosed && !state.data.announced) {
+                input.trace?.write("ui.patch", {
+                  phase: "running",
+                  status: "waiting for assistant",
+                })
+                input.footer.event({
+                  type: "turn.wait",
+                })
+              }
+
+              if (state.tick > item.tick) {
+                if (state.wait === item) {
+                  state.wait = undefined
+                }
+                return Effect.void
+              }
+
+              return waitTurn(item.done, turn.signal).pipe(
+                Effect.flatMap((status) =>
+                  Effect.sync(() => {
+                    if (state.wait === item) {
+                      state.wait = undefined
+                    }
+
+                    if (status === "abort") {
+                      flush("turn.abort")
+                    }
+                  }),
+                ),
+              )
+            }),
+            Effect.catch((error) => {
+              if (state.wait === item) {
+                state.wait = undefined
+              }
+
+              const canceled = turn.signal.aborted || next.signal?.aborted === true || input.footer.isClosed || closed
+              if (canceled) {
+                flush("turn.cancel")
+                return Effect.void
+              }
+
+              if (error === state.fault) {
+                return Effect.fail(error)
+              }
+
+              input.trace?.write("send.prompt.error", {
+                sessionID: input.sessionID,
+                error: formatUnknownError(error),
+              })
+              return Effect.fail(error)
+            }),
+            Effect.ensuring(
+              Effect.sync(() => {
+                input.trace?.write("turn.end", {
+                  sessionID: input.sessionID,
+                })
+                next.signal?.removeEventListener("abort", stop)
+                abort.signal.removeEventListener("abort", stop)
+              }),
+            ),
+          )
+        })
 
-      if (error === fault) {
-        throw error
-      }
+        const selectSubagent = Effect.fn("RunStreamTransport.selectSubagent")((sessionID: string | undefined) =>
+          Effect.sync(() => {
+            if (closed) {
+              return
+            }
 
-      input.trace?.write("send.prompt.error", {
-        sessionID: input.sessionID,
-        error: formatUnknownError(error),
-      })
-      throw error
-    } finally {
-      input.trace?.write("turn.end", {
-        sessionID: input.sessionID,
-      })
-      next.signal?.removeEventListener("abort", stop)
-      abort.signal.removeEventListener("abort", stop)
-    }
-  }
+            const next = sessionID && state.subagent.tabs.has(sessionID) ? sessionID : undefined
+            if (state.selectedSubagent === next) {
+              return
+            }
 
-  const selectSubagent = (sessionID: string | undefined): void => {
-    const next = sessionID && subagent.tabs.has(sessionID) ? sessionID : undefined
-    if (selectedSubagent === next) {
-      return
-    }
+            state.selectedSubagent = next
+            syncFooter([], undefined, currentSubagentState())
+          }),
+        )
 
-    selectedSubagent = next
-    syncFooter([], undefined, currentSubagentState())
-  }
+        const close = Effect.fn("RunStreamTransport.close")(function* () {
+          yield* closeScope()
+        })
 
-  const close = async () => {
-    if (closed) {
-      return
-    }
+        return Service.of({
+          runPromptTurn,
+          selectSubagent,
+          close,
+        })
+      }),
+    ),
+  )
+}
 
-    closed = true
-    input.signal?.removeEventListener("abort", halt)
-    abort.abort()
-    closeStream()
-    await watch.catch(() => {})
-  }
+// Opens an SDK event subscription and returns a SessionTransport.
+//
+// The background `watch` loop consumes every SDK event, runs it through the
+// reducer, and writes output to the footer. When a session.status idle
+// event arrives, it resolves the current turn's Wait so runPromptTurn()
+// can return.
+//
+// The transport is single-turn: only one runPromptTurn() call can be active
+// at a time. The prompt queue enforces this from above.
+export async function createSessionTransport(input: StreamInput): Promise<SessionTransport> {
+  const runtime = makeRuntime(Service, createLayer(input))
+  await runtime.runPromise(() => Effect.void)
 
   return {
-    runPromptTurn,
-    selectSubagent,
-    close,
+    runPromptTurn: (next) => runtime.runPromise((svc) => svc.runPromptTurn(next)),
+    selectSubagent: (sessionID) => runtime.runSync((svc) => svc.selectSubagent(sessionID)),
+    close: () => runtime.runPromise((svc) => svc.close()),
   }
 }

+ 130 - 0
packages/opencode/test/cli/run/stream.transport.test.ts

@@ -102,6 +102,46 @@ function feed() {
   }
 }
 
+function blockingFeed() {
+  let done = false
+  let wake: (() => void) | undefined
+  const started = defer()
+
+  const stream: AsyncIterableIterator<unknown> = {
+    [Symbol.asyncIterator]() {
+      return this
+    },
+    next() {
+      started.resolve()
+      if (done) {
+        return Promise.resolve({ done: true, value: undefined })
+      }
+
+      return new Promise((resolve) => {
+        wake = () => {
+          done = true
+          wake = undefined
+          resolve({ done: true, value: undefined })
+        }
+      })
+    },
+    return() {
+      done = true
+      wake?.()
+      wake = undefined
+      return Promise.resolve({ done: true, value: undefined })
+    },
+    throw(error) {
+      done = true
+      wake?.()
+      wake = undefined
+      return Promise.reject(error)
+    },
+  }
+
+  return { stream, started }
+}
+
 function footer(fn?: (commit: StreamCommit) => void) {
   const commits: StreamCommit[] = []
   const events: FooterEvent[] = []
@@ -580,6 +620,96 @@ describe("run stream transport", () => {
     }
   })
 
+  test("closes an active turn without rejecting it", async () => {
+    const src = feed()
+    const ui = footer()
+    const ready = defer()
+    let aborted = false
+
+    const transport = await createSessionTransport({
+      sdk: sdk(src, {
+        promptAsync: async (_input, opt) => {
+          ready.resolve()
+          await new Promise<void>((resolve) => {
+            const onAbort = () => {
+              aborted = true
+              opt?.signal?.removeEventListener("abort", onAbort)
+              resolve()
+            }
+
+            opt?.signal?.addEventListener("abort", onAbort, { once: true })
+          })
+        },
+      }),
+      sessionID: "session-1",
+      thinking: true,
+      limits: () => ({}),
+      footer: ui.api,
+    })
+
+    try {
+      const task = transport.runPromptTurn({
+        agent: undefined,
+        model: undefined,
+        variant: undefined,
+        prompt: { text: "hello", parts: [] },
+        files: [],
+        includeFiles: false,
+      })
+
+      await ready.promise
+      await transport.close()
+      await task
+
+      expect(aborted).toBe(true)
+    } finally {
+      src.close()
+      await transport.close()
+    }
+  })
+
+  test("closes while the event stream is waiting for the next item", async () => {
+    const src = blockingFeed()
+    const ui = footer()
+    const transport = await createSessionTransport({
+      sdk: {
+        event: {
+          subscribe: async () => ({
+            stream: src.stream,
+          }),
+        },
+        session: {
+          promptAsync: async () => {},
+          status: async () => ({ data: {} }),
+          messages: async () => ({ data: [] }),
+          children: async () => ({ data: [] }),
+        },
+        permission: {
+          list: async () => ({ data: [] }),
+        },
+        question: {
+          list: async () => ({ data: [] }),
+        },
+      } as unknown as OpencodeClient,
+      sessionID: "session-1",
+      thinking: true,
+      limits: () => ({}),
+      footer: ui.api,
+    })
+
+    try {
+      await src.started.promise
+      await Promise.race([
+        transport.close(),
+        new Promise<never>((_, reject) => {
+          setTimeout(() => reject(new Error("close timed out")), 100)
+        }),
+      ])
+    } finally {
+      await transport.close()
+    }
+  })
+
   test("ignores stale idle events from an earlier turn", async () => {
     const src = feed()
     const ui = footer()