Просмотр исходного кода

feat: add sse mcp support

(cherry picked from commit 549b06f46d6192c20e93188667210882f263bbbf)
aheizi 9 месяцев назад
Родитель
Сommit
a77be3feb7
3 измененных файлов с 138 добавлено и 94 удалено
  1. 8 17
      package-lock.json
  2. 2 1
      package.json
  3. 128 76
      src/services/mcp/McpHub.ts

+ 8 - 17
package-lock.json

@@ -15,7 +15,7 @@
 				"@google-cloud/vertexai": "^1.9.3",
 				"@google/generative-ai": "^0.18.0",
 				"@mistralai/mistralai": "^1.3.6",
-				"@modelcontextprotocol/sdk": "^1.0.1",
+				"@modelcontextprotocol/sdk": "^1.5.0",
 				"@types/clone-deep": "^4.0.4",
 				"@types/pdf-parse": "^1.1.4",
 				"@types/tmp": "^0.2.6",
@@ -30,6 +30,7 @@
 				"delay": "^6.0.0",
 				"diff": "^5.2.0",
 				"diff-match-patch": "^1.0.5",
+				"eventsource": "^3.0.5",
 				"fast-deep-equal": "^3.1.3",
 				"fastest-levenshtein": "^1.0.16",
 				"get-folder-size": "^5.0.0",
@@ -95,6 +96,7 @@
 			"resolved": "https://registry.npmjs.org/@ampproject/remapping/-/remapping-2.3.0.tgz",
 			"integrity": "sha512-30iZtAPgz+LTIYoeivqYo853f02jBYSd5uGnGpkFV0M3xOt9aN73erkgYAmZU43x4VfqcnLxW9Kpg3R5LC4YYw==",
 			"dev": true,
+			"license": "Apache-2.0",
 			"dependencies": {
 				"@jridgewell/gen-mapping": "^0.3.5",
 				"@jridgewell/trace-mapping": "^0.3.24"
@@ -10783,15 +10785,6 @@
 				"node": ">=8"
 			}
 		},
-		"node_modules/jest-runtime/node_modules/strip-bom": {
-			"version": "4.0.0",
-			"resolved": "https://registry.npmjs.org/strip-bom/-/strip-bom-4.0.0.tgz",
-			"integrity": "sha512-3xurFv5tEgii33Zi8Jtp55wEIILR9eh34FAW00PZf+JnSsTmV/ioewSgQl97JHvgjoRGwPShsWm+IdrxB35d0w==",
-			"dev": true,
-			"engines": {
-				"node": ">=8"
-			}
-		},
 		"node_modules/jest-simple-dot-reporter": {
 			"version": "1.0.5",
 			"resolved": "https://registry.npmjs.org/jest-simple-dot-reporter/-/jest-simple-dot-reporter-1.0.5.tgz",
@@ -14180,14 +14173,12 @@
 			}
 		},
 		"node_modules/strip-bom": {
-			"version": "5.0.0",
-			"resolved": "https://registry.npmjs.org/strip-bom/-/strip-bom-5.0.0.tgz",
-			"integrity": "sha512-p+byADHF7SzEcVnLvc/r3uognM1hUhObuHXxJcgLCfD194XAkaLbjq3Wzb0N5G2tgIjH0dgT708Z51QxMeu60A==",
+			"version": "4.0.0",
+			"resolved": "https://registry.npmjs.org/strip-bom/-/strip-bom-4.0.0.tgz",
+			"integrity": "sha512-3xurFv5tEgii33Zi8Jtp55wEIILR9eh34FAW00PZf+JnSsTmV/ioewSgQl97JHvgjoRGwPShsWm+IdrxB35d0w==",
+			"dev": true,
 			"engines": {
-				"node": ">=12"
-			},
-			"funding": {
-				"url": "https://github.com/sponsors/sindresorhus"
+				"node": ">=8"
 			}
 		},
 		"node_modules/strip-final-newline": {

+ 2 - 1
package.json

@@ -318,7 +318,7 @@
 		"@google-cloud/vertexai": "^1.9.3",
 		"@google/generative-ai": "^0.18.0",
 		"@mistralai/mistralai": "^1.3.6",
-		"@modelcontextprotocol/sdk": "^1.0.1",
+		"@modelcontextprotocol/sdk": "^1.5.0",
 		"@types/clone-deep": "^4.0.4",
 		"@types/pdf-parse": "^1.1.4",
 		"@types/tmp": "^0.2.6",
@@ -333,6 +333,7 @@
 		"delay": "^6.0.0",
 		"diff": "^5.2.0",
 		"diff-match-patch": "^1.0.5",
+		"eventsource": "^3.0.5",
 		"fast-deep-equal": "^3.1.3",
 		"fastest-levenshtein": "^1.0.16",
 		"get-folder-size": "^5.0.0",

+ 128 - 76
src/services/mcp/McpHub.ts

@@ -1,5 +1,7 @@
 import { Client } from "@modelcontextprotocol/sdk/client/index.js"
 import { StdioClientTransport, StdioServerParameters } from "@modelcontextprotocol/sdk/client/stdio.js"
+import { SSEClientTransport } from "@modelcontextprotocol/sdk/client/sse.js"
+import { EventSource } from "eventsource"
 import {
 	CallToolResultSchema,
 	ListResourcesResultSchema,
@@ -31,23 +33,40 @@ import { arePathsEqual } from "../../utils/path"
 export type McpConnection = {
 	server: McpServer
 	client: Client
-	transport: StdioClientTransport
+	transport: StdioClientTransport | SSEClientTransport
 }
 
-// StdioServerParameters
-const AlwaysAllowSchema = z.array(z.string()).default([])
+// Base configuration schema for common settings
+const BaseConfigSchema = z.object({
+	disabled: z.boolean().optional(),
+	timeout: z.number().min(1).max(3600).optional().default(60),
+	alwaysAllow: z.array(z.string()).default([]),
+})
 
-export const StdioConfigSchema = z.object({
+// Stdio specific configuration
+export const StdioConfigSchema = BaseConfigSchema.extend({
+	type: z.literal("stdio"),
 	command: z.string(),
 	args: z.array(z.string()).optional(),
 	env: z.record(z.string()).optional(),
-	alwaysAllow: AlwaysAllowSchema.optional(),
-	disabled: z.boolean().optional(),
-	timeout: z.number().min(1).max(3600).optional().default(60),
 })
 
+// SSE specific configuration with simplified format
+const SSEConfigSchema = BaseConfigSchema.extend({
+	type: z.literal("sse").optional(),
+	url: z.string().url(),
+	headers: z.record(z.string()).optional(),
+}).transform((data) => ({
+	...data,
+	type: "sse" as const,
+}))
+
+// Combined server configuration schema
+const ServerConfigSchema = z.union([StdioConfigSchema, SSEConfigSchema])
+
+// Settings schema
 const McpSettingsSchema = z.object({
-	mcpServers: z.record(StdioConfigSchema),
+	mcpServers: z.record(ServerConfigSchema),
 })
 
 export class McpHub {
@@ -55,6 +74,7 @@ export class McpHub {
 	private disposables: vscode.Disposable[] = []
 	private settingsWatcher?: vscode.FileSystemWatcher
 	private fileWatchers: Map<string, FSWatcher> = new Map()
+	private isDisposed: boolean = false
 	connections: McpConnection[] = []
 	isConnecting: boolean = false
 
@@ -147,12 +167,11 @@ export class McpHub {
 		}
 	}
 
-	private async connectToServer(name: string, config: StdioServerParameters): Promise<void> {
-		// Remove existing connection if it exists (should never happen, the connection should be deleted beforehand)
-		this.connections = this.connections.filter((conn) => conn.server.name !== name)
+	private async connectToServer(name: string, config: z.infer<typeof ServerConfigSchema>): Promise<void> {
+		// Remove existing connection if it exists
+		await this.deleteConnection(name)
 
 		try {
-			// Each MCP server requires its own transport connection and has unique capabilities, configurations, and error handling. Having separate clients also allows proper scoping of resources/tools and independent server management like reconnection.
 			const client = new Client(
 				{
 					name: "Roo Code",
@@ -163,90 +182,102 @@ export class McpHub {
 				},
 			)
 
-			const transport = new StdioClientTransport({
-				command: config.command,
-				args: config.args,
-				env: {
-					...config.env,
-					...(process.env.PATH ? { PATH: process.env.PATH } : {}),
-					// ...(process.env.NODE_PATH ? { NODE_PATH: process.env.NODE_PATH } : {}),
-				},
-				stderr: "pipe", // necessary for stderr to be available
-			})
+			let transport: StdioClientTransport | SSEClientTransport
 
-			transport.onerror = async (error) => {
-				console.error(`Transport error for "${name}":`, error)
-				const connection = this.connections.find((conn) => conn.server.name === name)
-				if (connection) {
-					connection.server.status = "disconnected"
-					this.appendErrorMessage(connection, error.message)
+			if (config.type === "stdio") {
+				transport = new StdioClientTransport({
+					command: config.command,
+					args: config.args,
+					env: {
+						...config.env,
+						...(process.env.PATH ? { PATH: process.env.PATH } : {}),
+					},
+					stderr: "pipe",
+				})
+
+				// Set up stdio specific error handling
+				transport.onerror = async (error) => {
+					console.error(`Transport error for "${name}":`, error)
+					const connection = this.connections.find((conn) => conn.server.name === name)
+					if (connection) {
+						connection.server.status = "disconnected"
+						this.appendErrorMessage(connection, error.message)
+					}
+					await this.notifyWebviewOfServerChanges()
 				}
-				await this.notifyWebviewOfServerChanges()
-			}
 
-			transport.onclose = async () => {
-				const connection = this.connections.find((conn) => conn.server.name === name)
-				if (connection) {
-					connection.server.status = "disconnected"
+				transport.onclose = async () => {
+					const connection = this.connections.find((conn) => conn.server.name === name)
+					if (connection) {
+						connection.server.status = "disconnected"
+					}
+					await this.notifyWebviewOfServerChanges()
 				}
-				await this.notifyWebviewOfServerChanges()
-			}
 
-			// If the config is invalid, show an error
-			if (!StdioConfigSchema.safeParse(config).success) {
-				console.error(`Invalid config for "${name}": missing or invalid parameters`)
-				const connection: McpConnection = {
-					server: {
-						name,
-						config: JSON.stringify(config),
-						status: "disconnected",
-						error: "Invalid config: missing or invalid parameters",
+				// transport.stderr is only available after the process has been started. However we can't start it separately from the .connect() call because it also starts the transport. And we can't place this after the connect call since we need to capture the stderr stream before the connection is established, in order to capture errors during the connection process.
+				// As a workaround, we start the transport ourselves, and then monkey-patch the start method to no-op so that .connect() doesn't try to start it again.
+				await transport.start()
+				const stderrStream = transport.stderr
+				if (stderrStream) {
+					stderrStream.on("data", async (data: Buffer) => {
+						const errorOutput = data.toString()
+						console.error(`Server "${name}" stderr:`, errorOutput)
+						const connection = this.connections.find((conn) => conn.server.name === name)
+						if (connection) {
+							// NOTE: we do not set server status to "disconnected" because stderr logs do not necessarily mean the server crashed or disconnected, it could just be informational. In fact when the server first starts up, it immediately logs "<name> server running on stdio" to stderr.
+							this.appendErrorMessage(connection, errorOutput)
+							// Only need to update webview right away if it's already disconnected
+							if (connection.server.status === "disconnected") {
+								await this.notifyWebviewOfServerChanges()
+							}
+						}
+					})
+				} else {
+					console.error(`No stderr stream for ${name}`)
+				}
+				transport.start = async () => {} // No-op now, .connect() won't fail
+			} else {
+				// SSE connection
+				const sseOptions = {
+					headers: config.headers,
+					https: {
+						rejectUnauthorized: false, // Allow self-signed certificates if needed
 					},
-					client,
-					transport,
 				}
-				this.connections.push(connection)
-				return
+				// @ts-ignore - EventSource types are not fully compatible
+				global.EventSource = EventSource
+				transport = new SSEClientTransport(new URL(config.url))
+
+				// Set up SSE specific error handling
+				transport.onerror = async (error) => {
+					console.error(`Transport error for "${name}":`, error)
+					const connection = this.connections.find((conn) => conn.server.name === name)
+					if (connection) {
+						connection.server.status = "disconnected"
+						this.appendErrorMessage(connection, error.message)
+					}
+					await this.notifyWebviewOfServerChanges()
+
+					// Attempt reconnection for SSE
+					if (!this.isDisposed) {
+						await this.handleSSEReconnection(name, config)
+					}
+				}
 			}
 
-			// valid schema
-			const parsedConfig = StdioConfigSchema.parse(config)
 			const connection: McpConnection = {
 				server: {
 					name,
 					config: JSON.stringify(config),
 					status: "connecting",
-					disabled: parsedConfig.disabled,
+					disabled: config.disabled,
 				},
 				client,
 				transport,
 			}
 			this.connections.push(connection)
 
-			// transport.stderr is only available after the process has been started. However we can't start it separately from the .connect() call because it also starts the transport. And we can't place this after the connect call since we need to capture the stderr stream before the connection is established, in order to capture errors during the connection process.
-			// As a workaround, we start the transport ourselves, and then monkey-patch the start method to no-op so that .connect() doesn't try to start it again.
-			await transport.start()
-			const stderrStream = transport.stderr
-			if (stderrStream) {
-				stderrStream.on("data", async (data: Buffer) => {
-					const errorOutput = data.toString()
-					console.error(`Server "${name}" stderr:`, errorOutput)
-					const connection = this.connections.find((conn) => conn.server.name === name)
-					if (connection) {
-						// NOTE: we do not set server status to "disconnected" because stderr logs do not necessarily mean the server crashed or disconnected, it could just be informational. In fact when the server first starts up, it immediately logs "<name> server running on stdio" to stderr.
-						this.appendErrorMessage(connection, errorOutput)
-						// Only need to update webview right away if it's already disconnected
-						if (connection.server.status === "disconnected") {
-							await this.notifyWebviewOfServerChanges()
-						}
-					}
-				})
-			} else {
-				console.error(`No stderr stream for ${name}`)
-			}
-			transport.start = async () => {} // No-op now, .connect() won't fail
-
-			// Connect
+			// Connect (this will automatically start the transport)
 			await client.connect(transport)
 			connection.server.status = "connected"
 			connection.server.error = ""
@@ -266,6 +297,26 @@ export class McpHub {
 		}
 	}
 
+	private async handleSSEReconnection(name: string, config: z.infer<typeof ServerConfigSchema>): Promise<void> {
+		const maxRetries = 3
+		let retryCount = 0
+
+		while (retryCount < maxRetries) {
+			try {
+				await delay(Math.pow(2, retryCount) * 1000) // Exponential backoff
+				await this.connectToServer(name, config)
+				return
+			} catch (error) {
+				retryCount++
+				console.error(`Retry ${retryCount} failed for ${name}:`, error)
+			}
+		}
+
+		vscode.window.showErrorMessage(
+			`Failed to reconnect to ${name} after ${maxRetries} attempts. Please check your connection.`,
+		)
+	}
+
 	private appendErrorMessage(connection: McpConnection, error: string) {
 		const newError = connection.server.error ? `${connection.server.error}\n${error}` : error
 		connection.server.error = newError //.slice(0, 800)
@@ -729,6 +780,7 @@ export class McpHub {
 	}
 
 	async dispose(): Promise<void> {
+		this.isDisposed = true
 		this.removeAllFileWatchers()
 		for (const connection of this.connections) {
 			try {