Преглед изворни кода

refactor(session): move llm stream into layer (#22358)

Kit Langton пре 4 дана
родитељ
комит
e8471256f2
2 измењених фајлова са 355 додато и 439 уклоњено
  1. 342 333
      packages/opencode/src/session/llm.ts
  2. 13 106
      packages/opencode/test/session/llm.test.ts

+ 342 - 333
packages/opencode/src/session/llm.ts

@@ -1,7 +1,6 @@
 import { Provider } from "@/provider/provider"
 import { Log } from "@/util/log"
-import { Cause, Effect, Layer, Record, Context } from "effect"
-import * as Queue from "effect/Queue"
+import { Context, Effect, Layer, Record } from "effect"
 import * as Stream from "effect/Stream"
 import { streamText, wrapLanguageModel, type ModelMessage, type Tool, tool, jsonSchema } from "ai"
 import { mergeDeep, pipe } from "remeda"
@@ -21,11 +20,13 @@ import { Wildcard } from "@/util/wildcard"
 import { SessionID } from "@/session/schema"
 import { Auth } from "@/auth"
 import { Installation } from "@/installation"
-import { AppRuntime } from "@/effect/app-runtime"
+import { makeRuntime } from "@/effect/run-service"
 
 export namespace LLM {
   const log = Log.create({ service: "llm" })
+  const perms = makeRuntime(Permission.Service, Permission.defaultLayer)
   export const OUTPUT_TOKEN_MAX = ProviderTransform.OUTPUT_TOKEN_MAX
+  type Result = Awaited<ReturnType<typeof streamText>>
 
   export type StreamInput = {
     user: MessageV2.User
@@ -46,7 +47,7 @@ export namespace LLM {
     abort: AbortSignal
   }
 
-  export type Event = Awaited<ReturnType<typeof stream>>["fullStream"] extends AsyncIterable<infer T> ? T : never
+  export type Event = Result["fullStream"] extends AsyncIterable<infer T> ? T : never
 
   export interface Interface {
     readonly stream: (input: StreamInput) => Stream.Stream<Event, unknown>
@@ -54,360 +55,368 @@ export namespace LLM {
 
   export class Service extends Context.Service<Service, Interface>()("@opencode/LLM") {}
 
-  export const layer = Layer.effect(
-    Service,
-    Effect.gen(function* () {
-      return Service.of({
-        stream(input) {
-          return Stream.scoped(
-            Stream.unwrap(
-              Effect.gen(function* () {
-                const ctrl = yield* Effect.acquireRelease(
-                  Effect.sync(() => new AbortController()),
-                  (ctrl) => Effect.sync(() => ctrl.abort()),
-                )
+  export const layer: Layer.Layer<Service, never, Auth.Service | Config.Service | Provider.Service | Plugin.Service> =
+    Layer.effect(
+      Service,
+      Effect.gen(function* () {
+        const auth = yield* Auth.Service
+        const config = yield* Config.Service
+        const provider = yield* Provider.Service
+        const plugin = yield* Plugin.Service
 
-                const result = yield* Effect.promise(() => LLM.stream({ ...input, abort: ctrl.signal }))
+        const run = Effect.fn("LLM.run")(function* (input: StreamRequest) {
+          const l = log
+            .clone()
+            .tag("providerID", input.model.providerID)
+            .tag("modelID", input.model.id)
+            .tag("sessionID", input.sessionID)
+            .tag("small", (input.small ?? false).toString())
+            .tag("agent", input.agent.name)
+            .tag("mode", input.agent.mode)
+          l.info("stream", {
+            modelID: input.model.id,
+            providerID: input.model.providerID,
+          })
 
-                return Stream.fromAsyncIterable(result.fullStream, (e) =>
-                  e instanceof Error ? e : new Error(String(e)),
-                )
-              }),
-            ),
+          const [language, cfg, item, info] = yield* Effect.all(
+            [
+              provider.getLanguage(input.model),
+              config.get(),
+              provider.getProvider(input.model.providerID),
+              auth.get(input.model.providerID),
+            ],
+            { concurrency: "unbounded" },
           )
-        },
-      })
-    }),
-  )
 
-  export const defaultLayer = layer
+          // TODO: move this to a proper hook
+          const isOpenaiOauth = item.id === "openai" && info?.type === "oauth"
 
-  export async function stream(input: StreamRequest) {
-    const l = log
-      .clone()
-      .tag("providerID", input.model.providerID)
-      .tag("modelID", input.model.id)
-      .tag("sessionID", input.sessionID)
-      .tag("small", (input.small ?? false).toString())
-      .tag("agent", input.agent.name)
-      .tag("mode", input.agent.mode)
-    l.info("stream", {
-      modelID: input.model.id,
-      providerID: input.model.providerID,
-    })
-    const [language, cfg, provider, info] = await Effect.runPromise(
-      Effect.gen(function* () {
-        const auth = yield* Auth.Service
-        const cfg = yield* Config.Service
-        const provider = yield* Provider.Service
-        return yield* Effect.all(
-          [
-            provider.getLanguage(input.model),
-            cfg.get(),
-            provider.getProvider(input.model.providerID),
-            auth.get(input.model.providerID),
-          ],
-          { concurrency: "unbounded" },
-        )
-      }).pipe(Effect.provide(Layer.mergeAll(Auth.defaultLayer, Config.defaultLayer, Provider.defaultLayer))),
-    )
-    // TODO: move this to a proper hook
-    const isOpenaiOauth = provider.id === "openai" && info?.type === "oauth"
+          const system: string[] = []
+          system.push(
+            [
+              // use agent prompt otherwise provider prompt
+              ...(input.agent.prompt ? [input.agent.prompt] : SystemPrompt.provider(input.model)),
+              // any custom prompt passed into this call
+              ...input.system,
+              // any custom prompt from last user message
+              ...(input.user.system ? [input.user.system] : []),
+            ]
+              .filter((x) => x)
+              .join("\n"),
+          )
 
-    const system: string[] = []
-    system.push(
-      [
-        // use agent prompt otherwise provider prompt
-        ...(input.agent.prompt ? [input.agent.prompt] : SystemPrompt.provider(input.model)),
-        // any custom prompt passed into this call
-        ...input.system,
-        // any custom prompt from last user message
-        ...(input.user.system ? [input.user.system] : []),
-      ]
-        .filter((x) => x)
-        .join("\n"),
-    )
+          const header = system[0]
+          yield* plugin.trigger(
+            "experimental.chat.system.transform",
+            { sessionID: input.sessionID, model: input.model },
+            { system },
+          )
+          // rejoin to maintain 2-part structure for caching if header unchanged
+          if (system.length > 2 && system[0] === header) {
+            const rest = system.slice(1)
+            system.length = 0
+            system.push(header, rest.join("\n"))
+          }
 
-    const header = system[0]
-    await Plugin.trigger(
-      "experimental.chat.system.transform",
-      { sessionID: input.sessionID, model: input.model },
-      { system },
-    )
-    // rejoin to maintain 2-part structure for caching if header unchanged
-    if (system.length > 2 && system[0] === header) {
-      const rest = system.slice(1)
-      system.length = 0
-      system.push(header, rest.join("\n"))
-    }
+          const variant =
+            !input.small && input.model.variants && input.user.model.variant
+              ? input.model.variants[input.user.model.variant]
+              : {}
+          const base = input.small
+            ? ProviderTransform.smallOptions(input.model)
+            : ProviderTransform.options({
+                model: input.model,
+                sessionID: input.sessionID,
+                providerOptions: item.options,
+              })
+          const options: Record<string, any> = pipe(
+            base,
+            mergeDeep(input.model.options),
+            mergeDeep(input.agent.options),
+            mergeDeep(variant),
+          )
+          if (isOpenaiOauth) {
+            options.instructions = system.join("\n")
+          }
 
-    const variant =
-      !input.small && input.model.variants && input.user.model.variant
-        ? input.model.variants[input.user.model.variant]
-        : {}
-    const base = input.small
-      ? ProviderTransform.smallOptions(input.model)
-      : ProviderTransform.options({
-          model: input.model,
-          sessionID: input.sessionID,
-          providerOptions: provider.options,
-        })
-    const options: Record<string, any> = pipe(
-      base,
-      mergeDeep(input.model.options),
-      mergeDeep(input.agent.options),
-      mergeDeep(variant),
-    )
-    if (isOpenaiOauth) {
-      options.instructions = system.join("\n")
-    }
+          const isWorkflow = language instanceof GitLabWorkflowLanguageModel
+          const messages = isOpenaiOauth
+            ? input.messages
+            : isWorkflow
+              ? input.messages
+              : [
+                  ...system.map(
+                    (x): ModelMessage => ({
+                      role: "system",
+                      content: x,
+                    }),
+                  ),
+                  ...input.messages,
+                ]
 
-    const isWorkflow = language instanceof GitLabWorkflowLanguageModel
-    const messages = isOpenaiOauth
-      ? input.messages
-      : isWorkflow
-        ? input.messages
-        : [
-            ...system.map(
-              (x): ModelMessage => ({
-                role: "system",
-                content: x,
-              }),
-            ),
-            ...input.messages,
-          ]
+          const params = yield* plugin.trigger(
+            "chat.params",
+            {
+              sessionID: input.sessionID,
+              agent: input.agent.name,
+              model: input.model,
+              provider: item,
+              message: input.user,
+            },
+            {
+              temperature: input.model.capabilities.temperature
+                ? (input.agent.temperature ?? ProviderTransform.temperature(input.model))
+                : undefined,
+              topP: input.agent.topP ?? ProviderTransform.topP(input.model),
+              topK: ProviderTransform.topK(input.model),
+              maxOutputTokens: ProviderTransform.maxOutputTokens(input.model),
+              options,
+            },
+          )
 
-    const params = await Plugin.trigger(
-      "chat.params",
-      {
-        sessionID: input.sessionID,
-        agent: input.agent.name,
-        model: input.model,
-        provider,
-        message: input.user,
-      },
-      {
-        temperature: input.model.capabilities.temperature
-          ? (input.agent.temperature ?? ProviderTransform.temperature(input.model))
-          : undefined,
-        topP: input.agent.topP ?? ProviderTransform.topP(input.model),
-        topK: ProviderTransform.topK(input.model),
-        maxOutputTokens: ProviderTransform.maxOutputTokens(input.model),
-        options,
-      },
-    )
+          const { headers } = yield* plugin.trigger(
+            "chat.headers",
+            {
+              sessionID: input.sessionID,
+              agent: input.agent.name,
+              model: input.model,
+              provider: item,
+              message: input.user,
+            },
+            {
+              headers: {},
+            },
+          )
 
-    const { headers } = await Plugin.trigger(
-      "chat.headers",
-      {
-        sessionID: input.sessionID,
-        agent: input.agent.name,
-        model: input.model,
-        provider,
-        message: input.user,
-      },
-      {
-        headers: {},
-      },
-    )
+          const tools = resolveTools(input)
 
-    const tools = resolveTools(input)
+          // LiteLLM and some Anthropic proxies require the tools parameter to be present
+          // when message history contains tool calls, even if no tools are being used.
+          // Add a dummy tool that is never called to satisfy this validation.
+          // This is enabled for:
+          // 1. Providers with "litellm" in their ID or API ID (auto-detected)
+          // 2. Providers with explicit "litellmProxy: true" option (opt-in for custom gateways)
+          const isLiteLLMProxy =
+            item.options?.["litellmProxy"] === true ||
+            input.model.providerID.toLowerCase().includes("litellm") ||
+            input.model.api.id.toLowerCase().includes("litellm")
 
-    // LiteLLM and some Anthropic proxies require the tools parameter to be present
-    // when message history contains tool calls, even if no tools are being used.
-    // Add a dummy tool that is never called to satisfy this validation.
-    // This is enabled for:
-    // 1. Providers with "litellm" in their ID or API ID (auto-detected)
-    // 2. Providers with explicit "litellmProxy: true" option (opt-in for custom gateways)
-    const isLiteLLMProxy =
-      provider.options?.["litellmProxy"] === true ||
-      input.model.providerID.toLowerCase().includes("litellm") ||
-      input.model.api.id.toLowerCase().includes("litellm")
+          // LiteLLM/Bedrock rejects requests where the message history contains tool
+          // calls but no tools param is present. When there are no active tools (e.g.
+          // during compaction), inject a stub tool to satisfy the validation requirement.
+          // The stub description explicitly tells the model not to call it.
+          if (isLiteLLMProxy && Object.keys(tools).length === 0 && hasToolCalls(input.messages)) {
+            tools["_noop"] = tool({
+              description: "Do not call this tool. It exists only for API compatibility and must never be invoked.",
+              inputSchema: jsonSchema({
+                type: "object",
+                properties: {
+                  reason: { type: "string", description: "Unused" },
+                },
+              }),
+              execute: async () => ({ output: "", title: "", metadata: {} }),
+            })
+          }
 
-    // LiteLLM/Bedrock rejects requests where the message history contains tool
-    // calls but no tools param is present. When there are no active tools (e.g.
-    // during compaction), inject a stub tool to satisfy the validation requirement.
-    // The stub description explicitly tells the model not to call it.
-    if (isLiteLLMProxy && Object.keys(tools).length === 0 && hasToolCalls(input.messages)) {
-      tools["_noop"] = tool({
-        description: "Do not call this tool. It exists only for API compatibility and must never be invoked.",
-        inputSchema: jsonSchema({
-          type: "object",
-          properties: {
-            reason: { type: "string", description: "Unused" },
-          },
-        }),
-        execute: async () => ({ output: "", title: "", metadata: {} }),
-      })
-    }
+          // Wire up toolExecutor for DWS workflow models so that tool calls
+          // from the workflow service are executed via opencode's tool system
+          // and results sent back over the WebSocket.
+          if (language instanceof GitLabWorkflowLanguageModel) {
+            const workflowModel = language as GitLabWorkflowLanguageModel & {
+              sessionID?: string
+              sessionPreapprovedTools?: string[]
+              approvalHandler?: (approvalTools: { name: string; args: string }[]) => Promise<{ approved: boolean }>
+            }
+            workflowModel.sessionID = input.sessionID
+            workflowModel.systemPrompt = system.join("\n")
+            workflowModel.toolExecutor = async (toolName, argsJson, _requestID) => {
+              const t = tools[toolName]
+              if (!t || !t.execute) {
+                return { result: "", error: `Unknown tool: ${toolName}` }
+              }
+              try {
+                const result = await t.execute!(JSON.parse(argsJson), {
+                  toolCallId: _requestID,
+                  messages: input.messages,
+                  abortSignal: input.abort,
+                })
+                const output = typeof result === "string" ? result : (result?.output ?? JSON.stringify(result))
+                return {
+                  result: output,
+                  metadata: typeof result === "object" ? result?.metadata : undefined,
+                  title: typeof result === "object" ? result?.title : undefined,
+                }
+              } catch (e: any) {
+                return { result: "", error: e.message ?? String(e) }
+              }
+            }
 
-    // Wire up toolExecutor for DWS workflow models so that tool calls
-    // from the workflow service are executed via opencode's tool system
-    // and results sent back over the WebSocket.
-    if (language instanceof GitLabWorkflowLanguageModel) {
-      const workflowModel = language as GitLabWorkflowLanguageModel & {
-        sessionID?: string
-        sessionPreapprovedTools?: string[]
-        approvalHandler?: (approvalTools: { name: string; args: string }[]) => Promise<{ approved: boolean }>
-      }
-      workflowModel.sessionID = input.sessionID
-      workflowModel.systemPrompt = system.join("\n")
-      workflowModel.toolExecutor = async (toolName, argsJson, _requestID) => {
-        const t = tools[toolName]
-        if (!t || !t.execute) {
-          return { result: "", error: `Unknown tool: ${toolName}` }
-        }
-        try {
-          const result = await t.execute!(JSON.parse(argsJson), {
-            toolCallId: _requestID,
-            messages: input.messages,
+            const ruleset = Permission.merge(input.agent.permission ?? [], input.permission ?? [])
+            workflowModel.sessionPreapprovedTools = Object.keys(tools).filter((name) => {
+              const match = ruleset.findLast((rule) => Wildcard.match(name, rule.permission))
+              return !match || match.action !== "ask"
+            })
+
+            const approvedToolsForSession = new Set<string>()
+            workflowModel.approvalHandler = Instance.bind(async (approvalTools) => {
+              const uniqueNames = [...new Set(approvalTools.map((t: { name: string }) => t.name))] as string[]
+              // Auto-approve tools that were already approved in this session
+              // (prevents infinite approval loops for server-side MCP tools)
+              if (uniqueNames.every((name) => approvedToolsForSession.has(name))) {
+                return { approved: true }
+              }
+
+              const id = PermissionID.ascending()
+              let reply: Permission.Reply | undefined
+              let unsub: (() => void) | undefined
+              try {
+                unsub = Bus.subscribe(Permission.Event.Replied, (evt) => {
+                  if (evt.properties.requestID === id) reply = evt.properties.reply
+                })
+                const toolPatterns = approvalTools.map((t: { name: string; args: string }) => {
+                  try {
+                    const parsed = JSON.parse(t.args) as Record<string, unknown>
+                    const title = (parsed?.title ?? parsed?.name ?? "") as string
+                    return title ? `${t.name}: ${title}` : t.name
+                  } catch {
+                    return t.name
+                  }
+                })
+                const uniquePatterns = [...new Set(toolPatterns)] as string[]
+                await perms.runPromise((svc) =>
+                  svc.ask({
+                    id,
+                    sessionID: SessionID.make(input.sessionID),
+                    permission: "workflow_tool_approval",
+                    patterns: uniquePatterns,
+                    metadata: { tools: approvalTools },
+                    always: uniquePatterns,
+                    ruleset: [],
+                  }),
+                )
+                for (const name of uniqueNames) approvedToolsForSession.add(name)
+                workflowModel.sessionPreapprovedTools = [
+                  ...(workflowModel.sessionPreapprovedTools ?? []),
+                  ...uniqueNames,
+                ]
+                return { approved: true }
+              } catch {
+                return { approved: false }
+              } finally {
+                unsub?.()
+              }
+            })
+          }
+
+          return streamText({
+            onError(error) {
+              l.error("stream error", {
+                error,
+              })
+            },
+            async experimental_repairToolCall(failed) {
+              const lower = failed.toolCall.toolName.toLowerCase()
+              if (lower !== failed.toolCall.toolName && tools[lower]) {
+                l.info("repairing tool call", {
+                  tool: failed.toolCall.toolName,
+                  repaired: lower,
+                })
+                return {
+                  ...failed.toolCall,
+                  toolName: lower,
+                }
+              }
+              return {
+                ...failed.toolCall,
+                input: JSON.stringify({
+                  tool: failed.toolCall.toolName,
+                  error: failed.error.message,
+                }),
+                toolName: "invalid",
+              }
+            },
+            temperature: params.temperature,
+            topP: params.topP,
+            topK: params.topK,
+            providerOptions: ProviderTransform.providerOptions(input.model, params.options),
+            activeTools: Object.keys(tools).filter((x) => x !== "invalid"),
+            tools,
+            toolChoice: input.toolChoice,
+            maxOutputTokens: params.maxOutputTokens,
             abortSignal: input.abort,
+            headers: {
+              ...(input.model.providerID.startsWith("opencode")
+                ? {
+                    "x-opencode-project": Instance.project.id,
+                    "x-opencode-session": input.sessionID,
+                    "x-opencode-request": input.user.id,
+                    "x-opencode-client": Flag.OPENCODE_CLIENT,
+                  }
+                : {
+                    "x-session-affinity": input.sessionID,
+                    ...(input.parentSessionID ? { "x-parent-session-id": input.parentSessionID } : {}),
+                    "User-Agent": `opencode/${Installation.VERSION}`,
+                  }),
+              ...input.model.headers,
+              ...headers,
+            },
+            maxRetries: input.retries ?? 0,
+            messages,
+            model: wrapLanguageModel({
+              model: language,
+              middleware: [
+                {
+                  specificationVersion: "v3" as const,
+                  async transformParams(args) {
+                    if (args.type === "stream") {
+                      // @ts-expect-error
+                      args.params.prompt = ProviderTransform.message(args.params.prompt, input.model, options)
+                    }
+                    return args.params
+                  },
+                },
+              ],
+            }),
+            experimental_telemetry: {
+              isEnabled: cfg.experimental?.openTelemetry,
+              metadata: {
+                userId: cfg.username ?? "unknown",
+                sessionId: input.sessionID,
+              },
+            },
           })
-          const output = typeof result === "string" ? result : (result?.output ?? JSON.stringify(result))
-          return {
-            result: output,
-            metadata: typeof result === "object" ? result?.metadata : undefined,
-            title: typeof result === "object" ? result?.title : undefined,
-          }
-        } catch (e: any) {
-          return { result: "", error: e.message ?? String(e) }
-        }
-      }
+        })
 
-      const ruleset = Permission.merge(input.agent.permission ?? [], input.permission ?? [])
-      workflowModel.sessionPreapprovedTools = Object.keys(tools).filter((name) => {
-        const match = ruleset.findLast((rule) => Wildcard.match(name, rule.permission))
-        return !match || match.action !== "ask"
-      })
+        const stream: Interface["stream"] = (input) =>
+          Stream.scoped(
+            Stream.unwrap(
+              Effect.gen(function* () {
+                const ctrl = yield* Effect.acquireRelease(
+                  Effect.sync(() => new AbortController()),
+                  (ctrl) => Effect.sync(() => ctrl.abort()),
+                )
 
-      const approvedToolsForSession = new Set<string>()
-      workflowModel.approvalHandler = Instance.bind(async (approvalTools) => {
-        const uniqueNames = [...new Set(approvalTools.map((t: { name: string }) => t.name))] as string[]
-        // Auto-approve tools that were already approved in this session
-        // (prevents infinite approval loops for server-side MCP tools)
-        if (uniqueNames.every((name) => approvedToolsForSession.has(name))) {
-          return { approved: true }
-        }
+                const result = yield* run({ ...input, abort: ctrl.signal })
 
-        const id = PermissionID.ascending()
-        let reply: Permission.Reply | undefined
-        let unsub: (() => void) | undefined
-        try {
-          unsub = Bus.subscribe(Permission.Event.Replied, (evt) => {
-            if (evt.properties.requestID === id) reply = evt.properties.reply
-          })
-          const toolPatterns = approvalTools.map((t: { name: string; args: string }) => {
-            try {
-              const parsed = JSON.parse(t.args) as Record<string, unknown>
-              const title = (parsed?.title ?? parsed?.name ?? "") as string
-              return title ? `${t.name}: ${title}` : t.name
-            } catch {
-              return t.name
-            }
-          })
-          const uniquePatterns = [...new Set(toolPatterns)] as string[]
-          await AppRuntime.runPromise(
-            Permission.Service.use((svc) =>
-              svc.ask({
-                id,
-                sessionID: SessionID.make(input.sessionID),
-                permission: "workflow_tool_approval",
-                patterns: uniquePatterns,
-                metadata: { tools: approvalTools },
-                always: uniquePatterns,
-                ruleset: [],
+                return Stream.fromAsyncIterable(result.fullStream, (e) =>
+                  e instanceof Error ? e : new Error(String(e)),
+                )
               }),
             ),
           )
-          for (const name of uniqueNames) approvedToolsForSession.add(name)
-          workflowModel.sessionPreapprovedTools = [...(workflowModel.sessionPreapprovedTools ?? []), ...uniqueNames]
-          return { approved: true }
-        } catch {
-          return { approved: false }
-        } finally {
-          unsub?.()
-        }
-      })
-    }
 
-    return streamText({
-      onError(error) {
-        l.error("stream error", {
-          error,
-        })
-      },
-      async experimental_repairToolCall(failed) {
-        const lower = failed.toolCall.toolName.toLowerCase()
-        if (lower !== failed.toolCall.toolName && tools[lower]) {
-          l.info("repairing tool call", {
-            tool: failed.toolCall.toolName,
-            repaired: lower,
-          })
-          return {
-            ...failed.toolCall,
-            toolName: lower,
-          }
-        }
-        return {
-          ...failed.toolCall,
-          input: JSON.stringify({
-            tool: failed.toolCall.toolName,
-            error: failed.error.message,
-          }),
-          toolName: "invalid",
-        }
-      },
-      temperature: params.temperature,
-      topP: params.topP,
-      topK: params.topK,
-      providerOptions: ProviderTransform.providerOptions(input.model, params.options),
-      activeTools: Object.keys(tools).filter((x) => x !== "invalid"),
-      tools,
-      toolChoice: input.toolChoice,
-      maxOutputTokens: params.maxOutputTokens,
-      abortSignal: input.abort,
-      headers: {
-        ...(input.model.providerID.startsWith("opencode")
-          ? {
-              "x-opencode-project": Instance.project.id,
-              "x-opencode-session": input.sessionID,
-              "x-opencode-request": input.user.id,
-              "x-opencode-client": Flag.OPENCODE_CLIENT,
-            }
-          : {
-              "x-session-affinity": input.sessionID,
-              ...(input.parentSessionID ? { "x-parent-session-id": input.parentSessionID } : {}),
-              "User-Agent": `opencode/${Installation.VERSION}`,
-            }),
-        ...input.model.headers,
-        ...headers,
-      },
-      maxRetries: input.retries ?? 0,
-      messages,
-      model: wrapLanguageModel({
-        model: language,
-        middleware: [
-          {
-            specificationVersion: "v3" as const,
-            async transformParams(args) {
-              if (args.type === "stream") {
-                // @ts-expect-error
-                args.params.prompt = ProviderTransform.message(args.params.prompt, input.model, options)
-              }
-              return args.params
-            },
-          },
-        ],
+        return Service.of({ stream })
       }),
-      experimental_telemetry: {
-        isEnabled: cfg.experimental?.openTelemetry,
-        metadata: {
-          userId: cfg.username ?? "unknown",
-          sessionId: input.sessionID,
-        },
-      },
-    })
-  }
+    )
+
+  export const defaultLayer = Layer.suspend(() =>
+    layer.pipe(
+      Layer.provide(Auth.defaultLayer),
+      Layer.provide(Config.defaultLayer),
+      Layer.provide(Provider.defaultLayer),
+      Layer.provide(Plugin.defaultLayer),
+    ),
+  )
 
   function resolveTools(input: Pick<StreamInput, "tools" | "agent" | "permission" | "user">) {
     const disabled = Permission.disabled(

+ 13 - 106
packages/opencode/test/session/llm.test.ts

@@ -26,6 +26,12 @@ async function getModel(providerID: ProviderID, modelID: ModelID) {
   )
 }
 
+const llm = makeRuntime(LLM.Service, LLM.defaultLayer)
+
+async function drain(input: LLM.StreamInput) {
+  return llm.runPromise((svc) => svc.stream(input).pipe(Stream.runDrain))
+}
+
 describe("session.llm.hasToolCalls", () => {
   test("returns false for empty messages array", () => {
     expect(LLM.hasToolCalls([])).toBe(false)
@@ -355,20 +361,16 @@ describe("session.llm.stream", () => {
           model: { providerID: ProviderID.make(providerID), modelID: resolved.id, variant: "high" },
         } satisfies MessageV2.User
 
-        const stream = await LLM.stream({
+        await drain({
           user,
           sessionID,
           model: resolved,
           agent,
           system: ["You are a helpful assistant."],
-          abort: new AbortController().signal,
           messages: [{ role: "user", content: "Hello" }],
           tools: {},
         })
 
-        for await (const _ of stream.fullStream) {
-        }
-
         const capture = await request
         const body = capture.body
         const headers = capture.headers
@@ -393,80 +395,6 @@ describe("session.llm.stream", () => {
     })
   })
 
-  test("raw stream abort signal cancels provider response body promptly", async () => {
-    const server = state.server
-    if (!server) throw new Error("Server not initialized")
-
-    const providerID = "alibaba"
-    const modelID = "qwen-plus"
-    const fixture = await loadFixture(providerID, modelID)
-    const model = fixture.model
-    const pending = waitStreamingRequest("/chat/completions")
-
-    await using tmp = await tmpdir({
-      init: async (dir) => {
-        await Bun.write(
-          path.join(dir, "opencode.json"),
-          JSON.stringify({
-            $schema: "https://opencode.ai/config.json",
-            enabled_providers: [providerID],
-            provider: {
-              [providerID]: {
-                options: {
-                  apiKey: "test-key",
-                  baseURL: `${server.url.origin}/v1`,
-                },
-              },
-            },
-          }),
-        )
-      },
-    })
-
-    await Instance.provide({
-      directory: tmp.path,
-      fn: async () => {
-        const resolved = await getModel(ProviderID.make(providerID), ModelID.make(model.id))
-        const sessionID = SessionID.make("session-test-raw-abort")
-        const agent = {
-          name: "test",
-          mode: "primary",
-          options: {},
-          permission: [{ permission: "*", pattern: "*", action: "allow" }],
-        } satisfies Agent.Info
-        const user = {
-          id: MessageID.make("user-raw-abort"),
-          sessionID,
-          role: "user",
-          time: { created: Date.now() },
-          agent: agent.name,
-          model: { providerID: ProviderID.make(providerID), modelID: resolved.id },
-        } satisfies MessageV2.User
-
-        const ctrl = new AbortController()
-        const result = await LLM.stream({
-          user,
-          sessionID,
-          model: resolved,
-          agent,
-          system: ["You are a helpful assistant."],
-          abort: ctrl.signal,
-          messages: [{ role: "user", content: "Hello" }],
-          tools: {},
-        })
-
-        const iter = result.fullStream[Symbol.asyncIterator]()
-        await pending.request
-        await iter.next()
-        ctrl.abort()
-
-        await Promise.race([pending.responseCanceled, timeout(500)])
-        await Promise.race([pending.requestAborted, timeout(500)]).catch(() => undefined)
-        await iter.return?.()
-      },
-    })
-  })
-
   test("service stream cancellation cancels provider response body promptly", async () => {
     const server = state.server
     if (!server) throw new Error("Server not initialized")
@@ -518,8 +446,7 @@ describe("session.llm.stream", () => {
         } satisfies MessageV2.User
 
         const ctrl = new AbortController()
-        const { runPromiseExit } = makeRuntime(LLM.Service, LLM.defaultLayer)
-        const run = runPromiseExit(
+        const run = llm.runPromiseExit(
           (svc) =>
             svc
               .stream({
@@ -610,14 +537,13 @@ describe("session.llm.stream", () => {
           tools: { question: true },
         } satisfies MessageV2.User
 
-        const stream = await LLM.stream({
+        await drain({
           user,
           sessionID,
           model: resolved,
           agent,
           permission: [{ permission: "question", pattern: "*", action: "allow" }],
           system: ["You are a helpful assistant."],
-          abort: new AbortController().signal,
           messages: [{ role: "user", content: "Hello" }],
           tools: {
             question: tool({
@@ -628,9 +554,6 @@ describe("session.llm.stream", () => {
           },
         })
 
-        for await (const _ of stream.fullStream) {
-        }
-
         const capture = await request
         const tools = capture.body.tools as Array<{ function?: { name?: string } }> | undefined
         expect(tools?.some((item) => item.function?.name === "question")).toBe(true)
@@ -728,20 +651,16 @@ describe("session.llm.stream", () => {
           model: { providerID: ProviderID.make("openai"), modelID: resolved.id, variant: "high" },
         } satisfies MessageV2.User
 
-        const stream = await LLM.stream({
+        await drain({
           user,
           sessionID,
           model: resolved,
           agent,
           system: ["You are a helpful assistant."],
-          abort: new AbortController().signal,
           messages: [{ role: "user", content: "Hello" }],
           tools: {},
         })
 
-        for await (const _ of stream.fullStream) {
-        }
-
         const capture = await request
         const body = capture.body
 
@@ -847,13 +766,12 @@ describe("session.llm.stream", () => {
           model: { providerID: ProviderID.make("openai"), modelID: resolved.id },
         } satisfies MessageV2.User
 
-        const stream = await LLM.stream({
+        await drain({
           user,
           sessionID,
           model: resolved,
           agent,
           system: ["You are a helpful assistant."],
-          abort: new AbortController().signal,
           messages: [
             {
               role: "user",
@@ -871,9 +789,6 @@ describe("session.llm.stream", () => {
           tools: {},
         })
 
-        for await (const _ of stream.fullStream) {
-        }
-
         const capture = await request
         expect(capture.url.pathname.endsWith("/responses")).toBe(true)
       },
@@ -972,20 +887,16 @@ describe("session.llm.stream", () => {
           model: { providerID: ProviderID.make("minimax"), modelID: ModelID.make("MiniMax-M2.5") },
         } satisfies MessageV2.User
 
-        const stream = await LLM.stream({
+        await drain({
           user,
           sessionID,
           model: resolved,
           agent,
           system: ["You are a helpful assistant."],
-          abort: new AbortController().signal,
           messages: [{ role: "user", content: "Hello" }],
           tools: {},
         })
 
-        for await (const _ of stream.fullStream) {
-        }
-
         const capture = await request
         const body = capture.body
 
@@ -1073,20 +984,16 @@ describe("session.llm.stream", () => {
           model: { providerID: ProviderID.make(providerID), modelID: resolved.id },
         } satisfies MessageV2.User
 
-        const stream = await LLM.stream({
+        await drain({
           user,
           sessionID,
           model: resolved,
           agent,
           system: ["You are a helpful assistant."],
-          abort: new AbortController().signal,
           messages: [{ role: "user", content: "Hello" }],
           tools: {},
         })
 
-        for await (const _ of stream.fullStream) {
-        }
-
         const capture = await request
         const body = capture.body
         const config = body.generationConfig as