Jelajahi Sumber

remove makeRuntime facade for Permission.Service from llm.ts

Yield Permission.Service directly from the LLM layer context instead of
creating a separate ManagedRuntime via makeRuntime. The approvalHandler
callback now uses Effect.runPromise(perm.ask(...)) on the captured
implementation, avoiding the extra runtime and its duplicated memoMap.
Kit Langton 2 hari lalu
induk
melakukan
d57adec496
1 mengubah file dengan 338 tambahan dan 339 penghapusan
  1. 338 339
      packages/opencode/src/session/llm.ts

+ 338 - 339
packages/opencode/src/session/llm.ts

@@ -20,13 +20,11 @@ import { Wildcard } from "@/util/wildcard"
 import { SessionID } from "@/session/schema"
 import { Auth } from "@/auth"
 import { Installation } from "@/installation"
-import { makeRuntime } from "@/effect/run-service"
 import * as Option from "effect/Option"
 import * as OtelTracer from "@effect/opentelemetry/Tracer"
 
 export namespace LLM {
   const log = Log.create({ service: "llm" })
-  const perms = makeRuntime(Permission.Service, Permission.defaultLayer)
   export const OUTPUT_TOKEN_MAX = ProviderTransform.OUTPUT_TOKEN_MAX
   type Result = Awaited<ReturnType<typeof streamText>>
 
@@ -57,369 +55,370 @@ export namespace LLM {
 
   export class Service extends Context.Service<Service, Interface>()("@opencode/LLM") {}
 
-  export const layer: Layer.Layer<Service, never, Auth.Service | Config.Service | Provider.Service | Plugin.Service> =
-    Layer.effect(
-      Service,
-      Effect.gen(function* () {
-        const auth = yield* Auth.Service
-        const config = yield* Config.Service
-        const provider = yield* Provider.Service
-        const plugin = yield* Plugin.Service
-
-        const run = Effect.fn("LLM.run")(function* (input: StreamRequest) {
-          const l = log
-            .clone()
-            .tag("providerID", input.model.providerID)
-            .tag("modelID", input.model.id)
-            .tag("sessionID", input.sessionID)
-            .tag("small", (input.small ?? false).toString())
-            .tag("agent", input.agent.name)
-            .tag("mode", input.agent.mode)
-          l.info("stream", {
-            modelID: input.model.id,
-            providerID: input.model.providerID,
-          })
-
-          const [language, cfg, item, info] = yield* Effect.all(
-            [
-              provider.getLanguage(input.model),
-              config.get(),
-              provider.getProvider(input.model.providerID),
-              auth.get(input.model.providerID),
-            ],
-            { concurrency: "unbounded" },
-          )
-
-          // TODO: move this to a proper hook
-          const isOpenaiOauth = item.id === "openai" && info?.type === "oauth"
-
-          const system: string[] = []
-          system.push(
-            [
-              // use agent prompt otherwise provider prompt
-              ...(input.agent.prompt ? [input.agent.prompt] : SystemPrompt.provider(input.model)),
-              // any custom prompt passed into this call
-              ...input.system,
-              // any custom prompt from last user message
-              ...(input.user.system ? [input.user.system] : []),
-            ]
-              .filter((x) => x)
-              .join("\n"),
-          )
-
-          const header = system[0]
-          yield* plugin.trigger(
-            "experimental.chat.system.transform",
-            { sessionID: input.sessionID, model: input.model },
-            { system },
-          )
-          // rejoin to maintain 2-part structure for caching if header unchanged
-          if (system.length > 2 && system[0] === header) {
-            const rest = system.slice(1)
-            system.length = 0
-            system.push(header, rest.join("\n"))
-          }
-
-          const variant =
-            !input.small && input.model.variants && input.user.model.variant
-              ? input.model.variants[input.user.model.variant]
-              : {}
-          const base = input.small
-            ? ProviderTransform.smallOptions(input.model)
-            : ProviderTransform.options({
-                model: input.model,
-                sessionID: input.sessionID,
-                providerOptions: item.options,
-              })
-          const options: Record<string, any> = pipe(
-            base,
-            mergeDeep(input.model.options),
-            mergeDeep(input.agent.options),
-            mergeDeep(variant),
-          )
-          if (isOpenaiOauth) {
-            options.instructions = system.join("\n")
-          }
+  const live: Layer.Layer<
+    Service,
+    never,
+    Auth.Service | Config.Service | Provider.Service | Plugin.Service | Permission.Service
+  > = Layer.effect(
+    Service,
+    Effect.gen(function* () {
+      const auth = yield* Auth.Service
+      const config = yield* Config.Service
+      const provider = yield* Provider.Service
+      const plugin = yield* Plugin.Service
+      const perm = yield* Permission.Service
+
+      const run = Effect.fn("LLM.run")(function* (input: StreamRequest) {
+        const l = log
+          .clone()
+          .tag("providerID", input.model.providerID)
+          .tag("modelID", input.model.id)
+          .tag("sessionID", input.sessionID)
+          .tag("small", (input.small ?? false).toString())
+          .tag("agent", input.agent.name)
+          .tag("mode", input.agent.mode)
+        l.info("stream", {
+          modelID: input.model.id,
+          providerID: input.model.providerID,
+        })
 
-          const isWorkflow = language instanceof GitLabWorkflowLanguageModel
-          const messages = isOpenaiOauth
-            ? input.messages
-            : isWorkflow
-              ? input.messages
-              : [
-                  ...system.map(
-                    (x): ModelMessage => ({
-                      role: "system",
-                      content: x,
-                    }),
-                  ),
-                  ...input.messages,
-                ]
-
-          const params = yield* plugin.trigger(
-            "chat.params",
-            {
-              sessionID: input.sessionID,
-              agent: input.agent.name,
+        const [language, cfg, item, info] = yield* Effect.all(
+          [
+            provider.getLanguage(input.model),
+            config.get(),
+            provider.getProvider(input.model.providerID),
+            auth.get(input.model.providerID),
+          ],
+          { concurrency: "unbounded" },
+        )
+
+        // TODO: move this to a proper hook
+        const isOpenaiOauth = item.id === "openai" && info?.type === "oauth"
+
+        const system: string[] = []
+        system.push(
+          [
+            // use agent prompt otherwise provider prompt
+            ...(input.agent.prompt ? [input.agent.prompt] : SystemPrompt.provider(input.model)),
+            // any custom prompt passed into this call
+            ...input.system,
+            // any custom prompt from last user message
+            ...(input.user.system ? [input.user.system] : []),
+          ]
+            .filter((x) => x)
+            .join("\n"),
+        )
+
+        const header = system[0]
+        yield* plugin.trigger(
+          "experimental.chat.system.transform",
+          { sessionID: input.sessionID, model: input.model },
+          { system },
+        )
+        // rejoin to maintain 2-part structure for caching if header unchanged
+        if (system.length > 2 && system[0] === header) {
+          const rest = system.slice(1)
+          system.length = 0
+          system.push(header, rest.join("\n"))
+        }
+
+        const variant =
+          !input.small && input.model.variants && input.user.model.variant
+            ? input.model.variants[input.user.model.variant]
+            : {}
+        const base = input.small
+          ? ProviderTransform.smallOptions(input.model)
+          : ProviderTransform.options({
               model: input.model,
-              provider: item,
-              message: input.user,
-            },
-            {
-              temperature: input.model.capabilities.temperature
-                ? (input.agent.temperature ?? ProviderTransform.temperature(input.model))
-                : undefined,
-              topP: input.agent.topP ?? ProviderTransform.topP(input.model),
-              topK: ProviderTransform.topK(input.model),
-              maxOutputTokens: ProviderTransform.maxOutputTokens(input.model),
-              options,
-            },
-          )
-
-          const { headers } = yield* plugin.trigger(
-            "chat.headers",
-            {
               sessionID: input.sessionID,
-              agent: input.agent.name,
-              model: input.model,
-              provider: item,
-              message: input.user,
-            },
-            {
-              headers: {},
-            },
-          )
-
-          const tools = resolveTools(input)
-
-          // LiteLLM and some Anthropic proxies require the tools parameter to be present
-          // when message history contains tool calls, even if no tools are being used.
-          // Add a dummy tool that is never called to satisfy this validation.
-          // This is enabled for:
-          // 1. Providers with "litellm" in their ID or API ID (auto-detected)
-          // 2. Providers with explicit "litellmProxy: true" option (opt-in for custom gateways)
-          const isLiteLLMProxy =
-            item.options?.["litellmProxy"] === true ||
-            input.model.providerID.toLowerCase().includes("litellm") ||
-            input.model.api.id.toLowerCase().includes("litellm")
-
-          // LiteLLM/Bedrock rejects requests where the message history contains tool
-          // calls but no tools param is present. When there are no active tools (e.g.
-          // during compaction), inject a stub tool to satisfy the validation requirement.
-          // The stub description explicitly tells the model not to call it.
-          if (
-            (isLiteLLMProxy || input.model.providerID.includes("github-copilot")) &&
-            Object.keys(tools).length === 0 &&
-            hasToolCalls(input.messages)
-          ) {
-            tools["_noop"] = tool({
-              description: "Do not call this tool. It exists only for API compatibility and must never be invoked.",
-              inputSchema: jsonSchema({
-                type: "object",
-                properties: {
-                  reason: { type: "string", description: "Unused" },
-                },
-              }),
-              execute: async () => ({ output: "", title: "", metadata: {} }),
+              providerOptions: item.options,
             })
+        const options: Record<string, any> = pipe(
+          base,
+          mergeDeep(input.model.options),
+          mergeDeep(input.agent.options),
+          mergeDeep(variant),
+        )
+        if (isOpenaiOauth) {
+          options.instructions = system.join("\n")
+        }
+
+        const isWorkflow = language instanceof GitLabWorkflowLanguageModel
+        const messages = isOpenaiOauth
+          ? input.messages
+          : isWorkflow
+            ? input.messages
+            : [
+                ...system.map(
+                  (x): ModelMessage => ({
+                    role: "system",
+                    content: x,
+                  }),
+                ),
+                ...input.messages,
+              ]
+
+        const params = yield* plugin.trigger(
+          "chat.params",
+          {
+            sessionID: input.sessionID,
+            agent: input.agent.name,
+            model: input.model,
+            provider: item,
+            message: input.user,
+          },
+          {
+            temperature: input.model.capabilities.temperature
+              ? (input.agent.temperature ?? ProviderTransform.temperature(input.model))
+              : undefined,
+            topP: input.agent.topP ?? ProviderTransform.topP(input.model),
+            topK: ProviderTransform.topK(input.model),
+            maxOutputTokens: ProviderTransform.maxOutputTokens(input.model),
+            options,
+          },
+        )
+
+        const { headers } = yield* plugin.trigger(
+          "chat.headers",
+          {
+            sessionID: input.sessionID,
+            agent: input.agent.name,
+            model: input.model,
+            provider: item,
+            message: input.user,
+          },
+          {
+            headers: {},
+          },
+        )
+
+        const tools = resolveTools(input)
+
+        // LiteLLM and some Anthropic proxies require the tools parameter to be present
+        // when message history contains tool calls, even if no tools are being used.
+        // Add a dummy tool that is never called to satisfy this validation.
+        // This is enabled for:
+        // 1. Providers with "litellm" in their ID or API ID (auto-detected)
+        // 2. Providers with explicit "litellmProxy: true" option (opt-in for custom gateways)
+        const isLiteLLMProxy =
+          item.options?.["litellmProxy"] === true ||
+          input.model.providerID.toLowerCase().includes("litellm") ||
+          input.model.api.id.toLowerCase().includes("litellm")
+
+        // LiteLLM/Bedrock rejects requests where the message history contains tool
+        // calls but no tools param is present. When there are no active tools (e.g.
+        // during compaction), inject a stub tool to satisfy the validation requirement.
+        // The stub description explicitly tells the model not to call it.
+        if (
+          (isLiteLLMProxy || input.model.providerID.includes("github-copilot")) &&
+          Object.keys(tools).length === 0 &&
+          hasToolCalls(input.messages)
+        ) {
+          tools["_noop"] = tool({
+            description: "Do not call this tool. It exists only for API compatibility and must never be invoked.",
+            inputSchema: jsonSchema({
+              type: "object",
+              properties: {
+                reason: { type: "string", description: "Unused" },
+              },
+            }),
+            execute: async () => ({ output: "", title: "", metadata: {} }),
+          })
+        }
+
+        // Wire up toolExecutor for DWS workflow models so that tool calls
+        // from the workflow service are executed via opencode's tool system
+        // and results sent back over the WebSocket.
+        if (language instanceof GitLabWorkflowLanguageModel) {
+          const workflowModel = language as GitLabWorkflowLanguageModel & {
+            sessionID?: string
+            sessionPreapprovedTools?: string[]
+            approvalHandler?: (approvalTools: { name: string; args: string }[]) => Promise<{ approved: boolean }>
           }
-
-          // Wire up toolExecutor for DWS workflow models so that tool calls
-          // from the workflow service are executed via opencode's tool system
-          // and results sent back over the WebSocket.
-          if (language instanceof GitLabWorkflowLanguageModel) {
-            const workflowModel = language as GitLabWorkflowLanguageModel & {
-              sessionID?: string
-              sessionPreapprovedTools?: string[]
-              approvalHandler?: (approvalTools: { name: string; args: string }[]) => Promise<{ approved: boolean }>
+          workflowModel.sessionID = input.sessionID
+          workflowModel.systemPrompt = system.join("\n")
+          workflowModel.toolExecutor = async (toolName, argsJson, _requestID) => {
+            const t = tools[toolName]
+            if (!t || !t.execute) {
+              return { result: "", error: `Unknown tool: ${toolName}` }
             }
-            workflowModel.sessionID = input.sessionID
-            workflowModel.systemPrompt = system.join("\n")
-            workflowModel.toolExecutor = async (toolName, argsJson, _requestID) => {
-              const t = tools[toolName]
-              if (!t || !t.execute) {
-                return { result: "", error: `Unknown tool: ${toolName}` }
-              }
-              try {
-                const result = await t.execute!(JSON.parse(argsJson), {
-                  toolCallId: _requestID,
-                  messages: input.messages,
-                  abortSignal: input.abort,
-                })
-                const output = typeof result === "string" ? result : (result?.output ?? JSON.stringify(result))
-                return {
-                  result: output,
-                  metadata: typeof result === "object" ? result?.metadata : undefined,
-                  title: typeof result === "object" ? result?.title : undefined,
-                }
-              } catch (e: any) {
-                return { result: "", error: e.message ?? String(e) }
+            try {
+              const result = await t.execute!(JSON.parse(argsJson), {
+                toolCallId: _requestID,
+                messages: input.messages,
+                abortSignal: input.abort,
+              })
+              const output = typeof result === "string" ? result : (result?.output ?? JSON.stringify(result))
+              return {
+                result: output,
+                metadata: typeof result === "object" ? result?.metadata : undefined,
+                title: typeof result === "object" ? result?.title : undefined,
               }
+            } catch (e: any) {
+              return { result: "", error: e.message ?? String(e) }
             }
+          }
 
-            const ruleset = Permission.merge(input.agent.permission ?? [], input.permission ?? [])
-            workflowModel.sessionPreapprovedTools = Object.keys(tools).filter((name) => {
-              const match = ruleset.findLast((rule) => Wildcard.match(name, rule.permission))
-              return !match || match.action !== "ask"
-            })
+          const ruleset = Permission.merge(input.agent.permission ?? [], input.permission ?? [])
+          workflowModel.sessionPreapprovedTools = Object.keys(tools).filter((name) => {
+            const match = ruleset.findLast((rule) => Wildcard.match(name, rule.permission))
+            return !match || match.action !== "ask"
+          })
 
-            const approvedToolsForSession = new Set<string>()
-            workflowModel.approvalHandler = Instance.bind(async (approvalTools) => {
-              const uniqueNames = [...new Set(approvalTools.map((t: { name: string }) => t.name))] as string[]
-              // Auto-approve tools that were already approved in this session
-              // (prevents infinite approval loops for server-side MCP tools)
-              if (uniqueNames.every((name) => approvedToolsForSession.has(name))) {
-                return { approved: true }
-              }
+          const approvedToolsForSession = new Set<string>()
+          workflowModel.approvalHandler = Instance.bind(async (approvalTools) => {
+            const uniqueNames = [...new Set(approvalTools.map((t: { name: string }) => t.name))] as string[]
+            // Auto-approve tools that were already approved in this session
+            // (prevents infinite approval loops for server-side MCP tools)
+            if (uniqueNames.every((name) => approvedToolsForSession.has(name))) {
+              return { approved: true }
+            }
 
-              const id = PermissionID.ascending()
-              let reply: Permission.Reply | undefined
-              let unsub: (() => void) | undefined
-              try {
-                unsub = Bus.subscribe(Permission.Event.Replied, (evt) => {
-                  if (evt.properties.requestID === id) reply = evt.properties.reply
-                })
-                const toolPatterns = approvalTools.map((t: { name: string; args: string }) => {
-                  try {
-                    const parsed = JSON.parse(t.args) as Record<string, unknown>
-                    const title = (parsed?.title ?? parsed?.name ?? "") as string
-                    return title ? `${t.name}: ${title}` : t.name
-                  } catch {
-                    return t.name
-                  }
-                })
-                const uniquePatterns = [...new Set(toolPatterns)] as string[]
-                await perms.runPromise((svc) =>
-                  svc.ask({
-                    id,
-                    sessionID: SessionID.make(input.sessionID),
-                    permission: "workflow_tool_approval",
-                    patterns: uniquePatterns,
-                    metadata: { tools: approvalTools },
-                    always: uniquePatterns,
-                    ruleset: [],
-                  }),
-                )
-                for (const name of uniqueNames) approvedToolsForSession.add(name)
-                workflowModel.sessionPreapprovedTools = [
-                  ...(workflowModel.sessionPreapprovedTools ?? []),
-                  ...uniqueNames,
-                ]
-                return { approved: true }
-              } catch {
-                return { approved: false }
-              } finally {
-                unsub?.()
-              }
-            })
-          }
+            const id = PermissionID.ascending()
+            let reply: Permission.Reply | undefined
+            let unsub: (() => void) | undefined
+            try {
+              unsub = Bus.subscribe(Permission.Event.Replied, (evt) => {
+                if (evt.properties.requestID === id) reply = evt.properties.reply
+              })
+              const toolPatterns = approvalTools.map((t: { name: string; args: string }) => {
+                try {
+                  const parsed = JSON.parse(t.args) as Record<string, unknown>
+                  const title = (parsed?.title ?? parsed?.name ?? "") as string
+                  return title ? `${t.name}: ${title}` : t.name
+                } catch {
+                  return t.name
+                }
+              })
+              const uniquePatterns = [...new Set(toolPatterns)] as string[]
+              await Effect.runPromise(
+                perm.ask({
+                  id,
+                  sessionID: SessionID.make(input.sessionID),
+                  permission: "workflow_tool_approval",
+                  patterns: uniquePatterns,
+                  metadata: { tools: approvalTools },
+                  always: uniquePatterns,
+                  ruleset: [],
+                }),
+              )
+              for (const name of uniqueNames) approvedToolsForSession.add(name)
+              workflowModel.sessionPreapprovedTools = [...(workflowModel.sessionPreapprovedTools ?? []), ...uniqueNames]
+              return { approved: true }
+            } catch {
+              return { approved: false }
+            } finally {
+              unsub?.()
+            }
+          })
+        }
 
-          const tracer = cfg.experimental?.openTelemetry
-            ? Option.getOrUndefined(yield* Effect.serviceOption(OtelTracer.OtelTracer))
-            : undefined
+        const tracer = cfg.experimental?.openTelemetry
+          ? Option.getOrUndefined(yield* Effect.serviceOption(OtelTracer.OtelTracer))
+          : undefined
 
-          return streamText({
-            onError(error) {
-              l.error("stream error", {
-                error,
+        return streamText({
+          onError(error) {
+            l.error("stream error", {
+              error,
+            })
+          },
+          async experimental_repairToolCall(failed) {
+            const lower = failed.toolCall.toolName.toLowerCase()
+            if (lower !== failed.toolCall.toolName && tools[lower]) {
+              l.info("repairing tool call", {
+                tool: failed.toolCall.toolName,
+                repaired: lower,
               })
-            },
-            async experimental_repairToolCall(failed) {
-              const lower = failed.toolCall.toolName.toLowerCase()
-              if (lower !== failed.toolCall.toolName && tools[lower]) {
-                l.info("repairing tool call", {
-                  tool: failed.toolCall.toolName,
-                  repaired: lower,
-                })
-                return {
-                  ...failed.toolCall,
-                  toolName: lower,
-                }
-              }
               return {
                 ...failed.toolCall,
-                input: JSON.stringify({
-                  tool: failed.toolCall.toolName,
-                  error: failed.error.message,
-                }),
-                toolName: "invalid",
+                toolName: lower,
               }
-            },
-            temperature: params.temperature,
-            topP: params.topP,
-            topK: params.topK,
-            providerOptions: ProviderTransform.providerOptions(input.model, params.options),
-            activeTools: Object.keys(tools).filter((x) => x !== "invalid"),
-            tools,
-            toolChoice: input.toolChoice,
-            maxOutputTokens: params.maxOutputTokens,
-            abortSignal: input.abort,
-            headers: {
-              ...(input.model.providerID.startsWith("opencode")
-                ? {
-                    "x-opencode-project": Instance.project.id,
-                    "x-opencode-session": input.sessionID,
-                    "x-opencode-request": input.user.id,
-                    "x-opencode-client": Flag.OPENCODE_CLIENT,
+            }
+            return {
+              ...failed.toolCall,
+              input: JSON.stringify({
+                tool: failed.toolCall.toolName,
+                error: failed.error.message,
+              }),
+              toolName: "invalid",
+            }
+          },
+          temperature: params.temperature,
+          topP: params.topP,
+          topK: params.topK,
+          providerOptions: ProviderTransform.providerOptions(input.model, params.options),
+          activeTools: Object.keys(tools).filter((x) => x !== "invalid"),
+          tools,
+          toolChoice: input.toolChoice,
+          maxOutputTokens: params.maxOutputTokens,
+          abortSignal: input.abort,
+          headers: {
+            ...(input.model.providerID.startsWith("opencode")
+              ? {
+                  "x-opencode-project": Instance.project.id,
+                  "x-opencode-session": input.sessionID,
+                  "x-opencode-request": input.user.id,
+                  "x-opencode-client": Flag.OPENCODE_CLIENT,
+                }
+              : {
+                  "x-session-affinity": input.sessionID,
+                  ...(input.parentSessionID ? { "x-parent-session-id": input.parentSessionID } : {}),
+                  "User-Agent": `opencode/${Installation.VERSION}`,
+                }),
+            ...input.model.headers,
+            ...headers,
+          },
+          maxRetries: input.retries ?? 0,
+          messages,
+          model: wrapLanguageModel({
+            model: language,
+            middleware: [
+              {
+                specificationVersion: "v3" as const,
+                async transformParams(args) {
+                  if (args.type === "stream") {
+                    // @ts-expect-error
+                    args.params.prompt = ProviderTransform.message(args.params.prompt, input.model, options)
                   }
-                : {
-                    "x-session-affinity": input.sessionID,
-                    ...(input.parentSessionID ? { "x-parent-session-id": input.parentSessionID } : {}),
-                    "User-Agent": `opencode/${Installation.VERSION}`,
-                  }),
-              ...input.model.headers,
-              ...headers,
-            },
-            maxRetries: input.retries ?? 0,
-            messages,
-            model: wrapLanguageModel({
-              model: language,
-              middleware: [
-                {
-                  specificationVersion: "v3" as const,
-                  async transformParams(args) {
-                    if (args.type === "stream") {
-                      // @ts-expect-error
-                      args.params.prompt = ProviderTransform.message(args.params.prompt, input.model, options)
-                    }
-                    return args.params
-                  },
+                  return args.params
                 },
-              ],
-            }),
-            experimental_telemetry: {
-              isEnabled: cfg.experimental?.openTelemetry,
-              functionId: "session.llm",
-              tracer,
-              metadata: {
-                userId: cfg.username ?? "unknown",
-                sessionId: input.sessionID,
               },
+            ],
+          }),
+          experimental_telemetry: {
+            isEnabled: cfg.experimental?.openTelemetry,
+            functionId: "session.llm",
+            tracer,
+            metadata: {
+              userId: cfg.username ?? "unknown",
+              sessionId: input.sessionID,
             },
-          })
+          },
         })
+      })
 
-        const stream: Interface["stream"] = (input) =>
-          Stream.scoped(
-            Stream.unwrap(
-              Effect.gen(function* () {
-                const ctrl = yield* Effect.acquireRelease(
-                  Effect.sync(() => new AbortController()),
-                  (ctrl) => Effect.sync(() => ctrl.abort()),
-                )
+      const stream: Interface["stream"] = (input) =>
+        Stream.scoped(
+          Stream.unwrap(
+            Effect.gen(function* () {
+              const ctrl = yield* Effect.acquireRelease(
+                Effect.sync(() => new AbortController()),
+                (ctrl) => Effect.sync(() => ctrl.abort()),
+              )
 
-                const result = yield* run({ ...input, abort: ctrl.signal })
+              const result = yield* run({ ...input, abort: ctrl.signal })
 
-                return Stream.fromAsyncIterable(result.fullStream, (e) =>
-                  e instanceof Error ? e : new Error(String(e)),
-                )
-              }),
-            ),
-          )
+              return Stream.fromAsyncIterable(result.fullStream, (e) => (e instanceof Error ? e : new Error(String(e))))
+            }),
+          ),
+        )
 
-        return Service.of({ stream })
-      }),
-    )
+      return Service.of({ stream })
+    }),
+  )
+
+  export const layer = live.pipe(Layer.provide(Permission.defaultLayer))
 
   export const defaultLayer = Layer.suspend(() =>
     layer.pipe(