Просмотр исходного кода

fix(mcp): close transport on failed/timed-out connections

When withTimeout rejects during MCP connect, the transport (and its
child process for stdio servers) was never closed. Extract a tryConnect
helper that ensures transport.close() is always called on failure,
eliminating process/connection leaks in all connect paths.

Fixes #19168
Kit Langton 3 недель назад
Родитель
Сommit
ebdecf2ec7
2 измененных файлов с 93 добавлено и 24 удалено
  1. 21 12
      packages/opencode/src/mcp/index.ts
  2. 72 12
      packages/opencode/test/mcp/lifecycle.test.ts

+ 21 - 12
packages/opencode/src/mcp/index.ts

@@ -189,6 +189,20 @@ export namespace MCP {
     return out
     return out
   }
   }
 
 
+  type Transport = StdioClientTransport | StreamableHTTPClientTransport | SSEClientTransport
+
+  /** Try to connect a client via the given transport; closes the transport on failure. */
+  async function tryConnect(transport: Transport, timeout: number): Promise<MCPClient> {
+    const client = new Client({ name: "opencode", version: Installation.VERSION })
+    try {
+      await withTimeout(client.connect(transport), timeout)
+      return client
+    } catch (error) {
+      await transport.close().catch(() => {})
+      throw error
+    }
+  }
+
   async function create(key: string, mcp: Config.Mcp) {
   async function create(key: string, mcp: Config.Mcp) {
     if (mcp.enabled === false) {
     if (mcp.enabled === false) {
       log.info("mcp server disabled", { key })
       log.info("mcp server disabled", { key })
@@ -247,12 +261,7 @@ export namespace MCP {
       const connectTimeout = mcp.timeout ?? DEFAULT_TIMEOUT
       const connectTimeout = mcp.timeout ?? DEFAULT_TIMEOUT
       for (const { name, transport } of transports) {
       for (const { name, transport } of transports) {
         try {
         try {
-          const client = new Client({
-            name: "opencode",
-            version: Installation.VERSION,
-          })
-          await withTimeout(client.connect(transport), connectTimeout)
-          mcpClient = client
+          mcpClient = await tryConnect(transport, connectTimeout)
           log.info("connected", { key, transport: name })
           log.info("connected", { key, transport: name })
           status = { status: "connected" }
           status = { status: "connected" }
           break
           break
@@ -271,6 +280,7 @@ export namespace MCP {
 
 
             // Check if this is a "needs registration" error
             // Check if this is a "needs registration" error
             if (lastError.message.includes("registration") || lastError.message.includes("client_id")) {
             if (lastError.message.includes("registration") || lastError.message.includes("client_id")) {
+              // tryConnect already closed the transport
               status = {
               status = {
                 status: "needs_client_registration" as const,
                 status: "needs_client_registration" as const,
                 error: "Server does not support dynamic client registration. Please provide clientId in config.",
                 error: "Server does not support dynamic client registration. Please provide clientId in config.",
@@ -284,6 +294,8 @@ export namespace MCP {
               }).catch((e) => log.debug("failed to show toast", { error: e }))
               }).catch((e) => log.debug("failed to show toast", { error: e }))
             } else {
             } else {
               // Store transport for later finishAuth call
               // Store transport for later finishAuth call
+              // Note: tryConnect closed the transport, but the SDK's finishAuth
+              // only needs the authProvider reference which survives close()
               pendingOAuthTransports.set(key, transport)
               pendingOAuthTransports.set(key, transport)
               status = { status: "needs_auth" as const }
               status = { status: "needs_auth" as const }
               // Show toast for needs_auth
               // Show toast for needs_auth
@@ -297,6 +309,7 @@ export namespace MCP {
             break
             break
           }
           }
 
 
+          // tryConnect already closed the failed transport
           log.debug("transport connection failed", {
           log.debug("transport connection failed", {
             key,
             key,
             transport: name,
             transport: name,
@@ -331,16 +344,12 @@ export namespace MCP {
 
 
       const connectTimeout = mcp.timeout ?? DEFAULT_TIMEOUT
       const connectTimeout = mcp.timeout ?? DEFAULT_TIMEOUT
       try {
       try {
-        const client = new Client({
-          name: "opencode",
-          version: Installation.VERSION,
-        })
-        await withTimeout(client.connect(transport), connectTimeout)
-        mcpClient = client
+        mcpClient = await tryConnect(transport, connectTimeout)
         status = {
         status = {
           status: "connected",
           status: "connected",
         }
         }
       } catch (error) {
       } catch (error) {
+        // tryConnect already closed the transport (kills orphaned child process)
         log.error("local mcp startup failed", {
         log.error("local mcp startup failed", {
           key,
           key,
           command: mcp.command,
           command: mcp.command,

+ 72 - 12
packages/opencode/test/mcp/lifecycle.test.ts

@@ -23,6 +23,7 @@ let connectShouldHang = false
 let connectError = "Mock transport cannot connect"
 let connectError = "Mock transport cannot connect"
 // Tracks how many Client instances were created (detects leaks)
 // Tracks how many Client instances were created (detects leaks)
 let clientCreateCount = 0
 let clientCreateCount = 0
+// Tracks how many times transport.close() is called across all mock transports
 let transportCloseCount = 0
 let transportCloseCount = 0
 
 
 function getOrCreateClientState(name?: string): MockClientState {
 function getOrCreateClientState(name?: string): MockClientState {
@@ -46,12 +47,13 @@ function getOrCreateClientState(name?: string): MockClientState {
   return state
   return state
 }
 }
 
 
-// Mock transport that succeeds or fails based on connectShouldFail
+// Mock transport that succeeds or fails based on connectShouldFail / connectShouldHang
 class MockStdioTransport {
 class MockStdioTransport {
   stderr: null = null
   stderr: null = null
   pid = 12345
   pid = 12345
   constructor(_opts: any) {}
   constructor(_opts: any) {}
   async start() {
   async start() {
+    if (connectShouldHang) return new Promise<void>(() => {}) // never resolves
     if (connectShouldFail) throw new Error(connectError)
     if (connectShouldFail) throw new Error(connectError)
     if (connectShouldHang) await new Promise(() => {}) // never resolves
     if (connectShouldHang) await new Promise(() => {}) // never resolves
   }
   }
@@ -63,18 +65,24 @@ class MockStdioTransport {
 class MockStreamableHTTP {
 class MockStreamableHTTP {
   constructor(_url: URL, _opts?: any) {}
   constructor(_url: URL, _opts?: any) {}
   async start() {
   async start() {
+    if (connectShouldHang) return new Promise<void>(() => {}) // never resolves
     if (connectShouldFail) throw new Error(connectError)
     if (connectShouldFail) throw new Error(connectError)
   }
   }
-  async close() {}
+  async close() {
+    transportCloseCount++
+  }
   async finishAuth() {}
   async finishAuth() {}
 }
 }
 
 
 class MockSSE {
 class MockSSE {
   constructor(_url: URL, _opts?: any) {}
   constructor(_url: URL, _opts?: any) {}
   async start() {
   async start() {
-    throw new Error("SSE fallback - not used in these tests")
+    if (connectShouldHang) return new Promise<void>(() => {}) // never resolves
+    if (connectShouldFail) throw new Error(connectError)
+  }
+  async close() {
+    transportCloseCount++
   }
   }
-  async close() {}
 }
 }
 
 
 mock.module("@modelcontextprotocol/sdk/client/stdio.js", () => ({
 mock.module("@modelcontextprotocol/sdk/client/stdio.js", () => ({
@@ -667,25 +675,77 @@ test(
 )
 )
 
 
 // ========================================================================
 // ========================================================================
-// Test: timed-out local transport is closed (process leak regression)
+// Test: transport leak — local stdio timeout (#19168)
 // ========================================================================
 // ========================================================================
 
 
 test(
 test(
   "local stdio transport is closed when connect times out (no process leak)",
   "local stdio transport is closed when connect times out (no process leak)",
   withInstance({}, async () => {
   withInstance({}, async () => {
+    lastCreatedClientName = "hanging-server"
+    getOrCreateClientState("hanging-server")
     connectShouldHang = true
     connectShouldHang = true
 
 
-    const result = await MCP.add("hanging-server", {
+    const addResult = await MCP.add("hanging-server", {
       type: "local",
       type: "local",
-      command: ["node", "fake-server.js"],
-      timeout: 100, // 100ms timeout so test is fast
+      command: ["node", "fake.js"],
+      timeout: 100,
     })
     })
 
 
-    const status = (result.status as any)["hanging-server"] ?? result.status
-    expect(status.status).toBe("failed")
-    expect(status.error).toContain("timed out")
+    const serverStatus = (addResult.status as any)["hanging-server"] ?? addResult.status
+    expect(serverStatus.status).toBe("failed")
+    expect(serverStatus.error).toContain("timed out")
+    // Transport must be closed to avoid orphaned child process
+    expect(transportCloseCount).toBeGreaterThanOrEqual(1)
+  }),
+)
 
 
-    // The transport must be closed to kill the child process
+// ========================================================================
+// Test: transport leak — remote timeout (#19168)
+// ========================================================================
+
+test(
+  "remote transport is closed when connect times out",
+  withInstance({}, async () => {
+    lastCreatedClientName = "hanging-remote"
+    getOrCreateClientState("hanging-remote")
+    connectShouldHang = true
+
+    const addResult = await MCP.add("hanging-remote", {
+      type: "remote",
+      url: "http://localhost:9999/mcp",
+      timeout: 100,
+      oauth: false,
+    })
+
+    const serverStatus = (addResult.status as any)["hanging-remote"] ?? addResult.status
+    expect(serverStatus.status).toBe("failed")
+    // Transport must be closed to avoid leaked HTTP connections
     expect(transportCloseCount).toBeGreaterThanOrEqual(1)
     expect(transportCloseCount).toBeGreaterThanOrEqual(1)
   }),
   }),
 )
 )
+
+// ========================================================================
+// Test: transport leak — failed remote transports not closed (#19168)
+// ========================================================================
+
+test(
+  "failed remote transport is closed before trying next transport",
+  withInstance({}, async () => {
+    lastCreatedClientName = "fail-remote"
+    getOrCreateClientState("fail-remote")
+    connectShouldFail = true
+    connectError = "Connection refused"
+
+    const addResult = await MCP.add("fail-remote", {
+      type: "remote",
+      url: "http://localhost:9999/mcp",
+      timeout: 5000,
+      oauth: false,
+    })
+
+    const serverStatus = (addResult.status as any)["fail-remote"] ?? addResult.status
+    expect(serverStatus.status).toBe("failed")
+    // Both StreamableHTTP and SSE transports should be closed
+    expect(transportCloseCount).toBeGreaterThanOrEqual(2)
+  }),
+)