فهرست منبع

fix(mcp): close transport on failed/timed-out connections (#19200)

Kit Langton 3 هفته پیش
والد
کامیت
2e6ac8ff49
2فایلهای تغییر یافته به همراه350 افزوده شده و 274 حذف شده
  1. 254 269
      packages/opencode/src/mcp/index.ts
  2. 96 5
      packages/opencode/test/mcp/lifecycle.test.ts

+ 254 - 269
packages/opencode/src/mcp/index.ts

@@ -24,7 +24,7 @@ import { BusEvent } from "../bus/bus-event"
 import { Bus } from "@/bus"
 import { TuiEvent } from "@/cli/cmd/tui/event"
 import open from "open"
-import { Effect, Layer, Option, ServiceMap, Stream } from "effect"
+import { Effect, Exit, Layer, Option, ServiceMap, Stream } from "effect"
 import { InstanceState } from "@/effect/instance-state"
 import { makeRuntime } from "@/effect/run-service"
 import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"
@@ -129,6 +129,8 @@ export namespace MCP {
     return typeof entry === "object" && entry !== null && "type" in entry
   }
 
+  const sanitize = (s: string) => s.replace(/[^a-zA-Z0-9_-]/g, "_")
+
   // Convert MCP tool definition to AI SDK Tool type
   function convertMcpTool(mcpTool: MCPToolDef, client: MCPClient, timeout?: number): Tool {
     const inputSchema = mcpTool.inputSchema
@@ -160,233 +162,48 @@ export namespace MCP {
     })
   }
 
-  async function defs(key: string, client: MCPClient, timeout?: number) {
-    const result = await withTimeout(client.listTools(), timeout ?? DEFAULT_TIMEOUT).catch((err) => {
-      log.error("failed to get tools from client", { key, error: err })
-      return undefined
-    })
-    return result?.tools
+  function defs(key: string, client: MCPClient, timeout?: number) {
+    return Effect.tryPromise({
+      try: () => withTimeout(client.listTools(), timeout ?? DEFAULT_TIMEOUT),
+      catch: (err) => err instanceof Error ? err : new Error(String(err)),
+    }).pipe(
+      Effect.map((result) => result.tools),
+      Effect.catch((err) => {
+        log.error("failed to get tools from client", { key, error: err })
+        return Effect.succeed(undefined)
+      }),
+    )
   }
 
-  async function fetchFromClient<T extends { name: string }>(
+  function fetchFromClient<T extends { name: string }>(
     clientName: string,
     client: Client,
     listFn: (c: Client) => Promise<T[]>,
     label: string,
-  ): Promise<Record<string, T & { client: string }> | undefined> {
-    const items = await listFn(client).catch((e: any) => {
-      log.error(`failed to get ${label}`, { clientName, error: e.message })
-      return undefined
-    })
-    if (!items) return undefined
-
-    const out: Record<string, T & { client: string }> = {}
-    const sanitizedClient = clientName.replace(/[^a-zA-Z0-9_-]/g, "_")
-    for (const item of items) {
-      const sanitizedName = item.name.replace(/[^a-zA-Z0-9_-]/g, "_")
-      out[sanitizedClient + ":" + sanitizedName] = { ...item, client: clientName }
-    }
-    return out
-  }
-
-  async function create(key: string, mcp: Config.Mcp) {
-    if (mcp.enabled === false) {
-      log.info("mcp server disabled", { key })
-      return {
-        mcpClient: undefined,
-        status: { status: "disabled" as const },
-      }
-    }
-
-    log.info("found", { key, type: mcp.type })
-    let mcpClient: MCPClient | undefined
-    let status: Status | undefined = undefined
-
-    if (mcp.type === "remote") {
-      // OAuth is enabled by default for remote servers unless explicitly disabled with oauth: false
-      const oauthDisabled = mcp.oauth === false
-      const oauthConfig = typeof mcp.oauth === "object" ? mcp.oauth : undefined
-      let authProvider: McpOAuthProvider | undefined
-
-      if (!oauthDisabled) {
-        authProvider = new McpOAuthProvider(
-          key,
-          mcp.url,
-          {
-            clientId: oauthConfig?.clientId,
-            clientSecret: oauthConfig?.clientSecret,
-            scope: oauthConfig?.scope,
-          },
-          {
-            onRedirect: async (url) => {
-              log.info("oauth redirect requested", { key, url: url.toString() })
-              // Store the URL - actual browser opening is handled by startAuth
-            },
-          },
-        )
-      }
-
-      const transports: Array<{ name: string; transport: TransportWithAuth }> = [
-        {
-          name: "StreamableHTTP",
-          transport: new StreamableHTTPClientTransport(new URL(mcp.url), {
-            authProvider,
-            requestInit: mcp.headers ? { headers: mcp.headers } : undefined,
-          }),
-        },
-        {
-          name: "SSE",
-          transport: new SSEClientTransport(new URL(mcp.url), {
-            authProvider,
-            requestInit: mcp.headers ? { headers: mcp.headers } : undefined,
-          }),
-        },
-      ]
-
-      let lastError: Error | undefined
-      const connectTimeout = mcp.timeout ?? DEFAULT_TIMEOUT
-      for (const { name, transport } of transports) {
-        try {
-          const client = new Client({
-            name: "opencode",
-            version: Installation.VERSION,
-          })
-          await withTimeout(client.connect(transport), connectTimeout)
-          mcpClient = client
-          log.info("connected", { key, transport: name })
-          status = { status: "connected" }
-          break
-        } catch (error) {
-          lastError = error instanceof Error ? error : new Error(String(error))
-
-          // Handle OAuth-specific errors.
-          // The SDK throws UnauthorizedError when auth() returns 'REDIRECT',
-          // but may also throw plain Errors when auth() fails internally
-          // (e.g. during discovery, registration, or state generation).
-          // When an authProvider is attached, treat both cases as auth-related.
-          const isAuthError =
-            error instanceof UnauthorizedError || (authProvider && lastError.message.includes("OAuth"))
-          if (isAuthError) {
-            log.info("mcp server requires authentication", { key, transport: name })
-
-            // Check if this is a "needs registration" error
-            if (lastError.message.includes("registration") || lastError.message.includes("client_id")) {
-              status = {
-                status: "needs_client_registration" as const,
-                error: "Server does not support dynamic client registration. Please provide clientId in config.",
-              }
-              // Show toast for needs_client_registration
-              Bus.publish(TuiEvent.ToastShow, {
-                title: "MCP Authentication Required",
-                message: `Server "${key}" requires a pre-registered client ID. Add clientId to your config.`,
-                variant: "warning",
-                duration: 8000,
-              }).catch((e) => log.debug("failed to show toast", { error: e }))
-            } else {
-              // Store transport for later finishAuth call
-              pendingOAuthTransports.set(key, transport)
-              status = { status: "needs_auth" as const }
-              // Show toast for needs_auth
-              Bus.publish(TuiEvent.ToastShow, {
-                title: "MCP Authentication Required",
-                message: `Server "${key}" requires authentication. Run: opencode mcp auth ${key}`,
-                variant: "warning",
-                duration: 8000,
-              }).catch((e) => log.debug("failed to show toast", { error: e }))
-            }
-            break
-          }
-
-          log.debug("transport connection failed", {
-            key,
-            transport: name,
-            url: mcp.url,
-            error: lastError.message,
-          })
-          status = {
-            status: "failed" as const,
-            error: lastError.message,
-          }
-        }
-      }
-    }
-
-    if (mcp.type === "local") {
-      const [cmd, ...args] = mcp.command
-      const cwd = Instance.directory
-      const transport = new StdioClientTransport({
-        stderr: "pipe",
-        command: cmd,
-        args,
-        cwd,
-        env: {
-          ...process.env,
-          ...(cmd === "opencode" ? { BUN_BE_BUN: "1" } : {}),
-          ...mcp.environment,
-        },
-      })
-      transport.stderr?.on("data", (chunk: Buffer) => {
-        log.info(`mcp stderr: ${chunk.toString()}`, { key })
-      })
-
-      const connectTimeout = mcp.timeout ?? DEFAULT_TIMEOUT
-      try {
-        const client = new Client({
-          name: "opencode",
-          version: Installation.VERSION,
-        })
-        await withTimeout(client.connect(transport), connectTimeout)
-        mcpClient = client
-        status = {
-          status: "connected",
-        }
-      } catch (error) {
-        log.error("local mcp startup failed", {
-          key,
-          command: mcp.command,
-          cwd,
-          error: error instanceof Error ? error.message : String(error),
-        })
-        status = {
-          status: "failed" as const,
-          error: error instanceof Error ? error.message : String(error),
+  ) {
+    return Effect.tryPromise({
+      try: () => listFn(client),
+      catch: (e: any) => {
+        log.error(`failed to get ${label}`, { clientName, error: e.message })
+        return e
+      },
+    }).pipe(
+      Effect.map((items) => {
+        const out: Record<string, T & { client: string }> = {}
+        const sanitizedClient = sanitize(clientName)
+        for (const item of items) {
+          out[sanitizedClient + ":" + sanitize(item.name)] = { ...item, client: clientName }
         }
-      }
-    }
-
-    if (!status) {
-      status = {
-        status: "failed" as const,
-        error: "Unknown error",
-      }
-    }
-
-    if (!mcpClient) {
-      return {
-        mcpClient: undefined,
-        status,
-      }
-    }
-
-    const listed = await defs(key, mcpClient, mcp.timeout)
-    if (!listed) {
-      await mcpClient.close().catch((error) => {
-        log.error("Failed to close MCP client", {
-          error,
-        })
-      })
-      return {
-        mcpClient: undefined,
-        status: { status: "failed" as const, error: "Failed to get tools" },
-      }
-    }
+        return out
+      }),
+      Effect.orElseSucceed(() => undefined),
+    )
+  }
 
-    log.info("create() successfully created client", { key, toolCount: listed.length })
-    return {
-      mcpClient,
-      status,
-      defs: listed,
-    }
+  interface CreateResult {
+    mcpClient?: MCPClient
+    status: Status
+    defs?: MCPToolDef[]
   }
 
   // --- Effect Service ---
@@ -431,6 +248,184 @@ export namespace MCP {
     Effect.gen(function* () {
       const spawner = yield* ChildProcessSpawner.ChildProcessSpawner
       const auth = yield* McpAuth.Service
+      const bus = yield* Bus.Service
+
+      type Transport = StdioClientTransport | StreamableHTTPClientTransport | SSEClientTransport
+
+      /**
+       * Connect a client via the given transport with resource safety:
+       * on failure the transport is closed; on success the caller owns it.
+       */
+      const connectTransport = (transport: Transport, timeout: number) =>
+        Effect.acquireUseRelease(
+          Effect.succeed(transport),
+          (t) =>
+            Effect.tryPromise({
+              try: () => {
+                const client = new Client({ name: "opencode", version: Installation.VERSION })
+                return withTimeout(client.connect(t), timeout).then(() => client)
+              },
+              catch: (e) => (e instanceof Error ? e : new Error(String(e))),
+            }),
+          (t, exit) =>
+            Exit.isFailure(exit)
+              ? Effect.tryPromise(() => t.close()).pipe(Effect.ignore)
+              : Effect.void,
+        )
+
+      const DISABLED_RESULT: CreateResult = { status: { status: "disabled" } }
+
+      const connectRemote = Effect.fn("MCP.connectRemote")(function* (key: string, mcp: Config.Mcp & { type: "remote" }) {
+        const oauthDisabled = mcp.oauth === false
+        const oauthConfig = typeof mcp.oauth === "object" ? mcp.oauth : undefined
+        let authProvider: McpOAuthProvider | undefined
+
+        if (!oauthDisabled) {
+          authProvider = new McpOAuthProvider(
+            key,
+            mcp.url,
+            {
+              clientId: oauthConfig?.clientId,
+              clientSecret: oauthConfig?.clientSecret,
+              scope: oauthConfig?.scope,
+            },
+            {
+              onRedirect: async (url) => {
+                log.info("oauth redirect requested", { key, url: url.toString() })
+              },
+            },
+          )
+        }
+
+        const transports: Array<{ name: string; transport: TransportWithAuth }> = [
+          {
+            name: "StreamableHTTP",
+            transport: new StreamableHTTPClientTransport(new URL(mcp.url), {
+              authProvider,
+              requestInit: mcp.headers ? { headers: mcp.headers } : undefined,
+            }),
+          },
+          {
+            name: "SSE",
+            transport: new SSEClientTransport(new URL(mcp.url), {
+              authProvider,
+              requestInit: mcp.headers ? { headers: mcp.headers } : undefined,
+            }),
+          },
+        ]
+
+        const connectTimeout = mcp.timeout ?? DEFAULT_TIMEOUT
+        let lastStatus: Status | undefined
+
+        for (const { name, transport } of transports) {
+          const result = yield* connectTransport(transport, connectTimeout).pipe(
+            Effect.map((client) => ({ client, transportName: name })),
+            Effect.catch((error) => {
+              const lastError = error instanceof Error ? error : new Error(String(error))
+              const isAuthError =
+                error instanceof UnauthorizedError || (authProvider && lastError.message.includes("OAuth"))
+
+              if (isAuthError) {
+                log.info("mcp server requires authentication", { key, transport: name })
+
+                if (lastError.message.includes("registration") || lastError.message.includes("client_id")) {
+                  lastStatus = {
+                    status: "needs_client_registration" as const,
+                    error: "Server does not support dynamic client registration. Please provide clientId in config.",
+                  }
+                  return bus.publish(TuiEvent.ToastShow, {
+                    title: "MCP Authentication Required",
+                    message: `Server "${key}" requires a pre-registered client ID. Add clientId to your config.`,
+                    variant: "warning",
+                    duration: 8000,
+                  }).pipe(Effect.ignore, Effect.as(undefined))
+                } else {
+                  pendingOAuthTransports.set(key, transport)
+                  lastStatus = { status: "needs_auth" as const }
+                  return bus.publish(TuiEvent.ToastShow, {
+                    title: "MCP Authentication Required",
+                    message: `Server "${key}" requires authentication. Run: opencode mcp auth ${key}`,
+                    variant: "warning",
+                    duration: 8000,
+                  }).pipe(Effect.ignore, Effect.as(undefined))
+                }
+              }
+
+              log.debug("transport connection failed", {
+                key,
+                transport: name,
+                url: mcp.url,
+                error: lastError.message,
+              })
+              lastStatus = { status: "failed" as const, error: lastError.message }
+              return Effect.succeed(undefined)
+            }),
+          )
+          if (result) {
+            log.info("connected", { key, transport: result.transportName })
+            return { client: result.client as MCPClient | undefined, status: { status: "connected" } as Status }
+          }
+          // If this was an auth error, stop trying other transports
+          if (lastStatus?.status === "needs_auth" || lastStatus?.status === "needs_client_registration") break
+        }
+
+        return { client: undefined as MCPClient | undefined, status: (lastStatus ?? { status: "failed", error: "Unknown error" }) as Status }
+      })
+
+      const connectLocal = Effect.fn("MCP.connectLocal")(function* (key: string, mcp: Config.Mcp & { type: "local" }) {
+        const [cmd, ...args] = mcp.command
+        const cwd = Instance.directory
+        const transport = new StdioClientTransport({
+          stderr: "pipe",
+          command: cmd,
+          args,
+          cwd,
+          env: {
+            ...process.env,
+            ...(cmd === "opencode" ? { BUN_BE_BUN: "1" } : {}),
+            ...mcp.environment,
+          },
+        })
+        transport.stderr?.on("data", (chunk: Buffer) => {
+          log.info(`mcp stderr: ${chunk.toString()}`, { key })
+        })
+
+        const connectTimeout = mcp.timeout ?? DEFAULT_TIMEOUT
+        return yield* connectTransport(transport, connectTimeout).pipe(
+          Effect.map((client): { client: MCPClient | undefined; status: Status } => ({ client, status: { status: "connected" } })),
+          Effect.catch((error): Effect.Effect<{ client: MCPClient | undefined; status: Status }> => {
+            const msg = error instanceof Error ? error.message : String(error)
+            log.error("local mcp startup failed", { key, command: mcp.command, cwd, error: msg })
+            return Effect.succeed({ client: undefined, status: { status: "failed", error: msg } })
+          }),
+        )
+      })
+
+      const create = Effect.fn("MCP.create")(function* (key: string, mcp: Config.Mcp) {
+        if (mcp.enabled === false) {
+          log.info("mcp server disabled", { key })
+          return DISABLED_RESULT
+        }
+
+        log.info("found", { key, type: mcp.type })
+
+        const { client: mcpClient, status } = mcp.type === "remote"
+          ? yield* connectRemote(key, mcp as Config.Mcp & { type: "remote" })
+          : yield* connectLocal(key, mcp as Config.Mcp & { type: "local" })
+
+        if (!mcpClient) {
+          return { status } satisfies CreateResult
+        }
+
+        const listed = yield* defs(key, mcpClient, mcp.timeout)
+        if (!listed) {
+          yield* Effect.tryPromise(() => mcpClient.close()).pipe(Effect.ignore)
+          return { status: { status: "failed", error: "Failed to get tools" } } satisfies CreateResult
+        }
+
+        log.info("create() successfully created client", { key, toolCount: listed.length })
+        return { mcpClient, status, defs: listed } satisfies CreateResult
+      })
 
       const descendants = Effect.fnUntraced(
         function* (pid: number) {
@@ -463,20 +458,20 @@ export namespace MCP {
           log.info("tools list changed notification received", { server: name })
           if (s.clients[name] !== client || s.status[name]?.status !== "connected") return
 
-          const listed = await defs(name, client, timeout)
+          const listed = await Effect.runPromise(defs(name, client, timeout))
           if (!listed) return
           if (s.clients[name] !== client || s.status[name]?.status !== "connected") return
 
           s.defs[name] = listed
-          await Bus.publish(ToolsChanged, { server: name }).catch((error) =>
-            log.warn("failed to publish tools changed", { server: name, error }),
-          )
+          await Effect.runPromise(bus.publish(ToolsChanged, { server: name }).pipe(Effect.ignore))
         })
       }
 
+      const getConfig = () => Effect.promise(() => Config.get())
+
       const cache = yield* InstanceState.make<State>(
         Effect.fn("MCP.state")(function* () {
-          const cfg = yield* Effect.promise(() => Config.get())
+          const cfg = yield* getConfig()
           const config = cfg.mcp ?? {}
           const s: State = {
             status: {},
@@ -498,13 +493,15 @@ export namespace MCP {
                   return
                 }
 
-                const result = yield* Effect.promise(() => create(key, mcp).catch(() => undefined))
+                const result = yield* create(key, mcp).pipe(
+                  Effect.catch(() => Effect.succeed(undefined)),
+                )
                 if (!result) return
 
                 s.status[key] = result.status
                 if (result.mcpClient) {
                   s.clients[key] = result.mcpClient
-                  s.defs[key] = result.defs
+                  s.defs[key] = result.defs!
                   watch(s, key, result.mcpClient, mcp.timeout)
                 }
               }),
@@ -542,14 +539,12 @@ export namespace MCP {
         const client = s.clients[name]
         delete s.defs[name]
         if (!client) return Effect.void
-        return Effect.promise(() =>
-          client.close().catch((error: any) => log.error("failed to close MCP client", { name, error })),
-        )
+        return Effect.tryPromise(() => client.close()).pipe(Effect.ignore)
       }
 
       const status = Effect.fn("MCP.status")(function* () {
         const s = yield* InstanceState.get(cache)
-        const cfg = yield* Effect.promise(() => Config.get())
+        const cfg = yield* getConfig()
         const config = cfg.mcp ?? {}
         const result: Record<string, Status> = {}
 
@@ -568,14 +563,7 @@ export namespace MCP {
 
       const createAndStore = Effect.fn("MCP.createAndStore")(function* (name: string, mcp: Config.Mcp) {
         const s = yield* InstanceState.get(cache)
-        const result = yield* Effect.promise(() => create(name, mcp))
-
-        if (!result) {
-          yield* closeClient(s, name)
-          delete s.clients[name]
-          s.status[name] = { status: "failed" as const, error: "unknown error" }
-          return s.status[name]
-        }
+        const result = yield* create(name, mcp)
 
         s.status[name] = result.status
         if (!result.mcpClient) {
@@ -586,7 +574,7 @@ export namespace MCP {
 
         yield* closeClient(s, name)
         s.clients[name] = result.mcpClient
-        s.defs[name] = result.defs
+        s.defs[name] = result.defs!
         watch(s, name, result.mcpClient, mcp.timeout)
         return result.status
       })
@@ -616,7 +604,7 @@ export namespace MCP {
       const tools = Effect.fn("MCP.tools")(function* () {
         const result: Record<string, Tool> = {}
         const s = yield* InstanceState.get(cache)
-        const cfg = yield* Effect.promise(() => Config.get())
+        const cfg = yield* getConfig()
         const config = cfg.mcp ?? {}
         const defaultTimeout = cfg.experimental?.mcp_timeout
 
@@ -639,9 +627,7 @@ export namespace MCP {
 
               const timeout = entry?.timeout ?? defaultTimeout
               for (const mcpTool of listed) {
-                const sanitizedClientName = clientName.replace(/[^a-zA-Z0-9_-]/g, "_")
-                const sanitizedToolName = mcpTool.name.replace(/[^a-zA-Z0-9_-]/g, "_")
-                result[sanitizedClientName + "_" + sanitizedToolName] = convertMcpTool(mcpTool, client, timeout)
+                result[sanitize(clientName) + "_" + sanitize(mcpTool.name)] = convertMcpTool(mcpTool, client, timeout)
               }
             }),
           { concurrency: "unbounded" },
@@ -649,30 +635,29 @@ export namespace MCP {
         return result
       })
 
-      function collectFromConnected<T>(
+      function collectFromConnected<T extends { name: string }>(
         s: State,
-        fetchFn: (clientName: string, client: Client) => Promise<Record<string, T> | undefined>,
+        listFn: (c: Client) => Promise<T[]>,
+        label: string,
       ) {
         return Effect.forEach(
           Object.entries(s.clients).filter(([name]) => s.status[name]?.status === "connected"),
           ([clientName, client]) =>
-            Effect.promise(async () => Object.entries((await fetchFn(clientName, client)) ?? {})),
+            fetchFromClient(clientName, client, listFn, label).pipe(
+              Effect.map((items) => Object.entries(items ?? {})),
+            ),
           { concurrency: "unbounded" },
-        ).pipe(Effect.map((results) => Object.fromEntries<T>(results.flat())))
+        ).pipe(Effect.map((results) => Object.fromEntries<T & { client: string }>(results.flat())))
       }
 
       const prompts = Effect.fn("MCP.prompts")(function* () {
         const s = yield* InstanceState.get(cache)
-        return yield* collectFromConnected(s, (name, client) =>
-          fetchFromClient(name, client, (c) => c.listPrompts().then((r) => r.prompts), "prompts"),
-        )
+        return yield* collectFromConnected(s, (c) => c.listPrompts().then((r) => r.prompts), "prompts")
       })
 
       const resources = Effect.fn("MCP.resources")(function* () {
         const s = yield* InstanceState.get(cache)
-        return yield* collectFromConnected(s, (name, client) =>
-          fetchFromClient(name, client, (c) => c.listResources().then((r) => r.resources), "resources"),
-        )
+        return yield* collectFromConnected(s, (c) => c.listResources().then((r) => r.resources), "resources")
       })
 
       const withClient = Effect.fnUntraced(function* <A>(
@@ -713,7 +698,7 @@ export namespace MCP {
       })
 
       const getMcpConfig = Effect.fnUntraced(function* (mcpName: string) {
-        const cfg = yield* Effect.promise(() => Config.get())
+        const cfg = yield* getConfig()
         const mcpConfig = cfg.mcp?.[mcpName]
         if (!mcpConfig || !isMcpConfigured(mcpConfig)) return undefined
         return mcpConfig
@@ -750,19 +735,21 @@ export namespace MCP {
 
         const transport = new StreamableHTTPClientTransport(new URL(mcpConfig.url), { authProvider })
 
-        return yield* Effect.promise(async () => {
-          try {
+        return yield* Effect.tryPromise({
+          try: () => {
             const client = new Client({ name: "opencode", version: Installation.VERSION })
-            await client.connect(transport)
-            return { authorizationUrl: "", oauthState }
-          } catch (error) {
+            return client.connect(transport).then(() => ({ authorizationUrl: "", oauthState }))
+          },
+          catch: (error) => error,
+        }).pipe(
+          Effect.catch((error) => {
             if (error instanceof UnauthorizedError && capturedUrl) {
               pendingOAuthTransports.set(mcpName, transport)
-              return { authorizationUrl: capturedUrl.toString(), oauthState }
+              return Effect.succeed({ authorizationUrl: capturedUrl.toString(), oauthState })
             }
-            throw error
-          }
-        })
+            return Effect.die(error)
+          }),
+        )
       })
 
       const authenticate = Effect.fn("MCP.authenticate")(function* (mcpName: string) {
@@ -791,7 +778,7 @@ export namespace MCP {
           ),
           Effect.catch(() => {
             log.warn("failed to open browser, user must open URL manually", { mcpName })
-            return Effect.promise(() => Bus.publish(BrowserOpenFailed, { mcpName, url: authorizationUrl }))
+            return bus.publish(BrowserOpenFailed, { mcpName, url: authorizationUrl }).pipe(Effect.ignore)
           }),
         )
 
@@ -811,10 +798,7 @@ export namespace MCP {
         if (!transport) throw new Error(`No pending OAuth flow for MCP server: ${mcpName}`)
 
         const result = yield* Effect.tryPromise({
-          try: async () => {
-            await transport.finishAuth(authorizationCode)
-            return true
-          },
+          try: () => transport.finishAuth(authorizationCode).then(() => true as const),
           catch: (error) => {
             log.error("failed to finish oauth", { mcpName, error })
             return error
@@ -887,6 +871,7 @@ export namespace MCP {
 
   const defaultLayer = layer.pipe(
     Layer.provide(McpAuth.layer),
+    Layer.provide(Bus.layer),
     Layer.provide(CrossSpawnSpawner.layer),
     Layer.provide(AppFileSystem.defaultLayer),
     Layer.provide(NodeFileSystem.layer),

+ 96 - 5
packages/opencode/test/mcp/lifecycle.test.ts

@@ -19,9 +19,12 @@ interface MockClientState {
 const clientStates = new Map<string, MockClientState>()
 let lastCreatedClientName: string | undefined
 let connectShouldFail = false
+let connectShouldHang = false
 let connectError = "Mock transport cannot connect"
 // Tracks how many Client instances were created (detects leaks)
 let clientCreateCount = 0
+// Tracks how many times transport.close() is called across all mock transports
+let transportCloseCount = 0
 
 function getOrCreateClientState(name?: string): MockClientState {
   const key = name ?? "default"
@@ -44,32 +47,41 @@ function getOrCreateClientState(name?: string): MockClientState {
   return state
 }
 
-// Mock transport that succeeds or fails based on connectShouldFail
+// Mock transport that succeeds or fails based on connectShouldFail / connectShouldHang
 class MockStdioTransport {
   stderr: null = null
   pid = 12345
   constructor(_opts: any) {}
   async start() {
+    if (connectShouldHang) return new Promise<void>(() => {}) // never resolves
     if (connectShouldFail) throw new Error(connectError)
   }
-  async close() {}
+  async close() {
+    transportCloseCount++
+  }
 }
 
 class MockStreamableHTTP {
   constructor(_url: URL, _opts?: any) {}
   async start() {
+    if (connectShouldHang) return new Promise<void>(() => {}) // never resolves
     if (connectShouldFail) throw new Error(connectError)
   }
-  async close() {}
+  async close() {
+    transportCloseCount++
+  }
   async finishAuth() {}
 }
 
 class MockSSE {
   constructor(_url: URL, _opts?: any) {}
   async start() {
-    throw new Error("SSE fallback - not used in these tests")
+    if (connectShouldHang) return new Promise<void>(() => {}) // never resolves
+    if (connectShouldFail) throw new Error(connectError)
+  }
+  async close() {
+    transportCloseCount++
   }
-  async close() {}
 }
 
 mock.module("@modelcontextprotocol/sdk/client/stdio.js", () => ({
@@ -145,8 +157,10 @@ beforeEach(() => {
   clientStates.clear()
   lastCreatedClientName = undefined
   connectShouldFail = false
+  connectShouldHang = false
   connectError = "Mock transport cannot connect"
   clientCreateCount = 0
+  transportCloseCount = 0
 })
 
 // Import after mocks
@@ -658,3 +672,80 @@ test(
     },
   ),
 )
+
+
+// ========================================================================
+// Test: transport leak — local stdio timeout (#19168)
+// ========================================================================
+
+test(
+  "local stdio transport is closed when connect times out (no process leak)",
+  withInstance({}, async () => {
+    lastCreatedClientName = "hanging-server"
+    getOrCreateClientState("hanging-server")
+    connectShouldHang = true
+
+    const addResult = await MCP.add("hanging-server", {
+      type: "local",
+      command: ["node", "fake.js"],
+      timeout: 100,
+    })
+
+    const serverStatus = (addResult.status as any)["hanging-server"] ?? addResult.status
+    expect(serverStatus.status).toBe("failed")
+    expect(serverStatus.error).toContain("timed out")
+    // Transport must be closed to avoid orphaned child process
+    expect(transportCloseCount).toBeGreaterThanOrEqual(1)
+  }),
+)
+
+// ========================================================================
+// Test: transport leak — remote timeout (#19168)
+// ========================================================================
+
+test(
+  "remote transport is closed when connect times out",
+  withInstance({}, async () => {
+    lastCreatedClientName = "hanging-remote"
+    getOrCreateClientState("hanging-remote")
+    connectShouldHang = true
+
+    const addResult = await MCP.add("hanging-remote", {
+      type: "remote",
+      url: "http://localhost:9999/mcp",
+      timeout: 100,
+      oauth: false,
+    })
+
+    const serverStatus = (addResult.status as any)["hanging-remote"] ?? addResult.status
+    expect(serverStatus.status).toBe("failed")
+    // Transport must be closed to avoid leaked HTTP connections
+    expect(transportCloseCount).toBeGreaterThanOrEqual(1)
+  }),
+)
+
+// ========================================================================
+// Test: transport leak — failed remote transports not closed (#19168)
+// ========================================================================
+
+test(
+  "failed remote transport is closed before trying next transport",
+  withInstance({}, async () => {
+    lastCreatedClientName = "fail-remote"
+    getOrCreateClientState("fail-remote")
+    connectShouldFail = true
+    connectError = "Connection refused"
+
+    const addResult = await MCP.add("fail-remote", {
+      type: "remote",
+      url: "http://localhost:9999/mcp",
+      timeout: 5000,
+      oauth: false,
+    })
+
+    const serverStatus = (addResult.status as any)["fail-remote"] ?? addResult.status
+    expect(serverStatus.status).toBe("failed")
+    // Both StreamableHTTP and SSE transports should be closed
+    expect(transportCloseCount).toBeGreaterThanOrEqual(2)
+  }),
+)