ソースを参照

runtime cleanup

Simon Klee 22 時間 前
コミット
a9c993f5ff
1 ファイル変更160 行追加140 行削除
  1. 160 140
      packages/opencode/src/cli/cmd/run/runtime.ts

+ 160 - 140
packages/opencode/src/cli/cmd/run/runtime.ts

@@ -18,7 +18,6 @@ import { createRunDemo } from "./demo"
 import { resolveDiffStyle, resolveFooterKeybinds, resolveModelInfo, resolveSessionInfo } from "./runtime.boot"
 import { createRuntimeLifecycle } from "./runtime.lifecycle"
 import { recordRunSpanError, setRunSpanAttributes, withRunSpan } from "./otel"
-import { reusePendingTask } from "./runtime.shared"
 import { trace } from "./trace"
 import { cycleVariant, formatModelLabel, resolveSavedVariant, resolveVariant, saveVariant } from "./variant.shared"
 import type { RunInput } from "./types"
@@ -68,6 +67,48 @@ type StreamState = {
   handle: Awaited<ReturnType<Awaited<typeof import("./stream.transport")>["createSessionTransport"]>>
 }
 
+type ResolvedSession = {
+  sessionID: string
+  sessionTitle?: string
+  agent?: string | undefined
+}
+
+type RuntimeState = {
+  shown: boolean
+  aborting: boolean
+  variants: string[]
+  limits: Record<string, number>
+  activeVariant: string | undefined
+  sessionID: string
+  sessionTitle?: string
+  agent: string | undefined
+  demo?: ReturnType<typeof createRunDemo>
+  selectSubagent?: (sessionID: string | undefined) => void
+  session?: Promise<void>
+  stream?: Promise<StreamState>
+}
+
+function hasSession(input: RunRuntimeInput, state: RuntimeState) {
+  return !input.resolveSession || !!state.sessionID
+}
+
+function eagerStream(input: RunRuntimeInput, ctx: BootContext) {
+  return ctx.resume === true || !input.resolveSession || !!input.demo
+}
+
+async function resolveExitTitle(ctx: BootContext, input: RunRuntimeInput, state: RuntimeState) {
+  if (!state.shown || !hasSession(input, state)) {
+    return
+  }
+
+  return ctx.sdk.session
+    .get({
+      sessionID: state.sessionID,
+    })
+    .then((x) => x.data?.title)
+    .catch(() => undefined)
+}
+
 // Core runtime loop. Boot resolves the SDK context, then we set up the
 // lifecycle (renderer + footer), wire the stream transport for SDK events,
 // and feed prompts through the queue until the user exits.
@@ -98,55 +139,51 @@ async function runInteractiveRuntime(input: RunRuntimeInput): Promise<void> {
               variant: undefined,
             })
       const savedTask = resolveSavedVariant(ctx.model)
-      let variants: string[] = []
-      let limits: Record<string, number> = {}
-      let aborting = false
-      let shown = false
-      let demo: ReturnType<typeof createRunDemo> | undefined
       const [keybinds, diffStyle, session, savedVariant] = await Promise.all([
         keybindTask,
         diffTask,
         sessionTask,
         savedTask,
       ])
-      shown = !session.first
-      let activeVariant = resolveVariant(ctx.variant, session.variant, savedVariant, variants)
-      let sessionID = ctx.sessionID
-      let sessionTitle = ctx.sessionTitle
-      let agent = ctx.agent
-      let hasSession = !input.resolveSession
+      const state: RuntimeState = {
+        shown: !session.first,
+        aborting: false,
+        variants: [],
+        limits: {},
+        activeVariant: resolveVariant(ctx.variant, session.variant, savedVariant, []),
+        sessionID: ctx.sessionID,
+        sessionTitle: ctx.sessionTitle,
+        agent: ctx.agent,
+      }
       setRunSpanAttributes(span, {
         "opencode.directory": ctx.directory,
         "opencode.resume": ctx.resume === true,
-        "opencode.agent.name": agent,
+        "opencode.agent.name": state.agent,
         "opencode.model.provider": ctx.model?.providerID,
         "opencode.model.id": ctx.model?.modelID,
-        "opencode.model.variant": activeVariant,
-        "session.id": sessionID || undefined,
+        "opencode.model.variant": state.activeVariant,
+        "session.id": state.sessionID || undefined,
       })
-      let resolving: Promise<void> | undefined
       const ensureSession = () => {
-        if (!input.resolveSession) {
+        if (!input.resolveSession || state.sessionID) {
           return Promise.resolve()
         }
 
-        if (resolving) {
-          return resolving
+        if (state.session) {
+          return state.session
         }
 
-        resolving = input.resolveSession(ctx).then((next) => {
-          sessionID = next.sessionID
-          sessionTitle = next.sessionTitle
-          agent = next.agent
-          hasSession = true
+        state.session = input.resolveSession(ctx).then((next) => {
+          state.sessionID = next.sessionID
+          state.sessionTitle = next.sessionTitle
+          state.agent = next.agent
           setRunSpanAttributes(span, {
-            "opencode.agent.name": agent,
-            "session.id": sessionID,
+            "opencode.agent.name": state.agent,
+            "session.id": state.sessionID,
           })
         })
-        return resolving
+        return state.session
       }
-      let selectSubagent: ((sessionID: string | undefined) => void) | undefined
 
       const shell = await createRuntimeLifecycle({
         directory: ctx.directory,
@@ -157,18 +194,18 @@ async function runInteractiveRuntime(input: RunRuntimeInput): Promise<void> {
             .catch(() => []),
         agents: [],
         resources: [],
-        sessionID,
-        sessionTitle,
-        getSessionID: () => sessionID,
+        sessionID: state.sessionID,
+        sessionTitle: state.sessionTitle,
+        getSessionID: () => state.sessionID,
         first: session.first,
         history: session.history,
-        agent,
+        agent: state.agent,
         model: ctx.model,
-        variant: activeVariant,
+        variant: state.activeVariant,
         keybinds,
         diffStyle,
         onPermissionReply: async (next) => {
-          if (demo?.permission(next)) {
+          if (state.demo?.permission(next)) {
             return
           }
 
@@ -176,57 +213,53 @@ async function runInteractiveRuntime(input: RunRuntimeInput): Promise<void> {
           await ctx.sdk.permission.reply(next)
         },
         onQuestionReply: async (next) => {
-          if (demo?.questionReply(next)) {
+          if (state.demo?.questionReply(next)) {
             return
           }
 
           await ctx.sdk.question.reply(next)
         },
         onQuestionReject: async (next) => {
-          if (demo?.questionReject(next)) {
+          if (state.demo?.questionReject(next)) {
             return
           }
 
           await ctx.sdk.question.reject(next)
         },
         onCycleVariant: () => {
-          if (!ctx.model || variants.length === 0) {
+          if (!ctx.model || state.variants.length === 0) {
             return {
               status: "no variants available",
             }
           }
 
-          activeVariant = cycleVariant(activeVariant, variants)
-          saveVariant(ctx.model, activeVariant)
+          state.activeVariant = cycleVariant(state.activeVariant, state.variants)
+          saveVariant(ctx.model, state.activeVariant)
           setRunSpanAttributes(span, {
-            "opencode.model.variant": activeVariant,
+            "opencode.model.variant": state.activeVariant,
           })
           return {
-            status: activeVariant ? `variant ${activeVariant}` : "variant default",
-            modelLabel: formatModelLabel(ctx.model, activeVariant),
+            status: state.activeVariant ? `variant ${state.activeVariant}` : "variant default",
+            modelLabel: formatModelLabel(ctx.model, state.activeVariant),
           }
         },
         onInterrupt: () => {
-          if (!hasSession) {
-            return
-          }
-
-          if (aborting) {
+          if (!hasSession(input, state) || state.aborting) {
             return
           }
 
-          aborting = true
+          state.aborting = true
           void ctx.sdk.session
             .abort({
-              sessionID,
+              sessionID: state.sessionID,
             })
             .catch(() => {})
             .finally(() => {
-              aborting = false
+              state.aborting = false
             })
         },
         onSubagentSelect: (sessionID) => {
-          selectSubagent?.(sessionID)
+          state.selectSubagent?.(sessionID)
           log?.write("subagent.select", {
             sessionID,
           })
@@ -234,38 +267,6 @@ async function runInteractiveRuntime(input: RunRuntimeInput): Promise<void> {
       })
       const footer = shell.footer
 
-      let catalogTask: Promise<void> | undefined
-      const loadCatalog = () => {
-        if (catalogTask) {
-          return catalogTask
-        }
-
-        catalogTask = Promise.all([
-          ctx.sdk.app
-            .agents({ directory: ctx.directory })
-            .then((x) => x.data ?? [])
-            .catch(() => []),
-          ctx.sdk.experimental.resource
-            .list({ directory: ctx.directory })
-            .then((x) => Object.values(x.data ?? {}))
-            .catch(() => []),
-        ])
-          .then(([agents, resources]) => {
-            if (footer.isClosed) {
-              return
-            }
-
-            footer.event({
-              type: "catalog",
-              agents,
-              resources,
-            })
-          })
-          .catch(() => {})
-
-        return catalogTask
-      }
-
       void footer
         .idle()
         .then(() => {
@@ -273,7 +274,28 @@ async function runInteractiveRuntime(input: RunRuntimeInput): Promise<void> {
             return
           }
 
-          void loadCatalog()
+          return Promise.all([
+            ctx.sdk.app
+              .agents({ directory: ctx.directory })
+              .then((x) => x.data ?? [])
+              .catch(() => []),
+            ctx.sdk.experimental.resource
+              .list({ directory: ctx.directory })
+              .then((x) => Object.values(x.data ?? {}))
+              .catch(() => []),
+          ])
+            .then(([agents, resources]) => {
+              if (footer.isClosed) {
+                return
+              }
+
+              footer.event({
+                type: "catalog",
+                agents,
+                resources,
+              })
+            })
+            .catch(() => {})
         })
         .catch(() => {})
 
@@ -288,13 +310,13 @@ async function runInteractiveRuntime(input: RunRuntimeInput): Promise<void> {
 
       if (input.demo) {
         await ensureSession()
-        demo = createRunDemo({
+        state.demo = createRunDemo({
           mode: input.demo,
           text: input.demoText,
           footer,
-          sessionID,
+          sessionID: state.sessionID,
           thinking: input.thinking,
-          limits: () => limits,
+          limits: () => state.limits,
         })
       }
 
@@ -303,17 +325,17 @@ async function runInteractiveRuntime(input: RunRuntimeInput): Promise<void> {
       }
 
       void modelTask.then((info) => {
-        variants = info.variants
-        limits = info.limits
+        state.variants = info.variants
+        state.limits = info.limits
 
-        const next = resolveVariant(ctx.variant, session.variant, savedVariant, variants)
-        if (next === activeVariant) {
+        const next = resolveVariant(ctx.variant, session.variant, savedVariant, state.variants)
+        if (next === state.activeVariant) {
           return
         }
 
-        activeVariant = next
+        state.activeVariant = next
         setRunSpanAttributes(span, {
-          "opencode.model.variant": activeVariant,
+          "opencode.model.variant": state.activeVariant,
         })
         if (!ctx.model || footer.isClosed) {
           return
@@ -321,19 +343,19 @@ async function runInteractiveRuntime(input: RunRuntimeInput): Promise<void> {
 
         footer.event({
           type: "model",
-          model: formatModelLabel(ctx.model, activeVariant),
+          model: formatModelLabel(ctx.model, state.activeVariant),
         })
       })
 
       const streamTask = import("./stream.transport")
-      let stream: StreamState | undefined
-      const loading: { current?: Promise<StreamState> } = {}
       const ensureStream = () => {
-        if (stream) {
-          return Promise.resolve(stream)
+        if (state.stream) {
+          return state.stream
         }
 
-        return reusePendingTask(loading, async () => {
+        // Share eager prewarm and first-turn boot through one in-flight promise,
+        // but clear it if transport creation fails so a later prompt can retry.
+        const next = (async () => {
           await ensureSession()
           if (footer.isClosed) {
             throw new Error("runtime closed")
@@ -346,9 +368,9 @@ async function runInteractiveRuntime(input: RunRuntimeInput): Promise<void> {
 
           const handle = await mod.createSessionTransport({
             sdk: ctx.sdk,
-            sessionID,
+            sessionID: state.sessionID,
             thinking: input.thinking,
-            limits: () => limits,
+            limits: () => state.limits,
             footer,
             trace: log,
           })
@@ -357,17 +379,22 @@ async function runInteractiveRuntime(input: RunRuntimeInput): Promise<void> {
             throw new Error("runtime closed")
           }
 
-          selectSubagent = handle.selectSubagent
-          const next = { mod, handle }
-          stream = next
-          return next
+          state.selectSubagent = handle.selectSubagent
+          return { mod, handle }
+        })()
+        state.stream = next
+        void next.catch(() => {
+          if (state.stream === next) {
+            state.stream = undefined
+          }
         })
+        return next
       }
 
       const runQueue = async () => {
         let includeFiles = true
-        if (demo) {
-          await demo.start()
+        if (state.demo) {
+          await state.demo.start()
         }
 
         const mod = await import("./runtime.queue")
@@ -376,38 +403,38 @@ async function runInteractiveRuntime(input: RunRuntimeInput): Promise<void> {
           initialInput: input.initialInput,
           trace: log,
           onPrompt: () => {
-            shown = true
+            state.shown = true
           },
           run: async (prompt, signal) => {
-            if (demo && (await demo.prompt(prompt, signal))) {
+            if (state.demo && (await state.demo.prompt(prompt, signal))) {
               return
             }
 
             return withRunSpan(
               "RunInteractive.turn",
               {
-                "opencode.agent.name": agent,
+                "opencode.agent.name": state.agent,
                 "opencode.model.provider": ctx.model?.providerID,
                 "opencode.model.id": ctx.model?.modelID,
-                "opencode.model.variant": activeVariant,
+                "opencode.model.variant": state.activeVariant,
                 "opencode.prompt.chars": prompt.text.length,
                 "opencode.prompt.parts": prompt.parts.length,
                 "opencode.prompt.include_files": includeFiles,
                 "opencode.prompt.file_parts": includeFiles ? input.files.length : 0,
-                "session.id": sessionID || undefined,
+                "session.id": state.sessionID || undefined,
               },
               async (span) => {
                 try {
                   const next = await ensureStream()
                   setRunSpanAttributes(span, {
-                    "opencode.agent.name": agent,
-                    "opencode.model.variant": activeVariant,
-                    "session.id": sessionID || undefined,
+                    "opencode.agent.name": state.agent,
+                    "opencode.model.variant": state.activeVariant,
+                    "session.id": state.sessionID || undefined,
                   })
                   await next.handle.runPromptTurn({
-                    agent,
+                    agent: state.agent,
                     model: ctx.model,
-                    variant: activeVariant,
+                    variant: state.activeVariant,
                     prompt,
                     files: input.files,
                     includeFiles,
@@ -421,7 +448,8 @@ async function runInteractiveRuntime(input: RunRuntimeInput): Promise<void> {
 
                   recordRunSpanError(span, error)
                   const text =
-                    stream?.mod.formatUnknownError(error) ?? (error instanceof Error ? error.message : String(error))
+                    (await state.stream?.then((item) => item.mod).catch(() => undefined))?.formatUnknownError(error) ??
+                    (error instanceof Error ? error.message : String(error))
                   footer.append({ kind: "error", text, phase: "start", source: "system" })
                 }
               },
@@ -431,7 +459,7 @@ async function runInteractiveRuntime(input: RunRuntimeInput): Promise<void> {
       }
 
       try {
-        const eager = ctx.resume === true || !input.resolveSession || !!input.demo
+        const eager = eagerStream(input, ctx)
         if (eager) {
           await ensureStream()
         }
@@ -449,23 +477,15 @@ async function runInteractiveRuntime(input: RunRuntimeInput): Promise<void> {
         try {
           await runQueue()
         } finally {
-          await stream?.handle.close()
+          await state.stream?.then((item) => item.handle.close()).catch(() => {})
         }
       } finally {
-        const title =
-          shown && hasSession
-            ? await ctx.sdk.session
-                .get({
-                  sessionID,
-                })
-                .then((x) => x.data?.title)
-                .catch(() => undefined)
-            : undefined
+        const title = await resolveExitTitle(ctx, input, state)
 
         await shell.close({
-          showExit: shown && hasSession,
+          showExit: state.shown && hasSession(input, state),
           sessionTitle: title,
-          sessionID,
+          sessionID: state.sessionID,
         })
       }
     },
@@ -488,7 +508,7 @@ export async function runInteractiveLocalMode(input: RunLocalInput): Promise<voi
         fetch: input.fetch,
         directory: input.directory,
       })
-      let pending: Promise<{ sessionID: string; sessionTitle?: string; agent?: string | undefined }> | undefined
+      let session: Promise<ResolvedSession> | undefined
 
       return runInteractiveRuntime({
         files: input.files,
@@ -497,23 +517,23 @@ export async function runInteractiveLocalMode(input: RunLocalInput): Promise<voi
         demo: input.demo,
         demoText: input.demoText,
         resolveSession: () => {
-          if (pending) {
-            return pending
+          if (session) {
+            return session
           }
 
-          pending = Promise.all([input.resolveAgent(), input.session(sdk)]).then(([agent, session]) => {
-            if (!session?.id) {
+          session = Promise.all([input.resolveAgent(), input.session(sdk)]).then(([agent, next]) => {
+            if (!next?.id) {
               throw new Error("Session not found")
             }
 
-            void input.share(sdk, session.id).catch(() => {})
+            void input.share(sdk, next.id).catch(() => {})
             return {
-              sessionID: session.id,
-              sessionTitle: session.title,
+              sessionID: next.id,
+              sessionTitle: next.title,
               agent,
             }
           })
-          return pending
+          return session
         },
         boot: async () => {
           return {