|
|
@@ -1,5 +1,7 @@
|
|
|
import { Client } from "@modelcontextprotocol/sdk/client/index.js"
|
|
|
import { StdioClientTransport, StdioServerParameters } from "@modelcontextprotocol/sdk/client/stdio.js"
|
|
|
+import { SSEClientTransport } from "@modelcontextprotocol/sdk/client/sse.js"
|
|
|
+import ReconnectingEventSource from "reconnecting-eventsource"
|
|
|
import {
|
|
|
CallToolResultSchema,
|
|
|
ListResourcesResultSchema,
|
|
|
@@ -31,23 +33,40 @@ import { arePathsEqual } from "../../utils/path"
|
|
|
export type McpConnection = {
|
|
|
server: McpServer
|
|
|
client: Client
|
|
|
- transport: StdioClientTransport
|
|
|
+ transport: StdioClientTransport | SSEClientTransport
|
|
|
}
|
|
|
|
|
|
-// StdioServerParameters
|
|
|
-const AlwaysAllowSchema = z.array(z.string()).default([])
|
|
|
-
|
|
|
-export const StdioConfigSchema = z.object({
|
|
|
- command: z.string(),
|
|
|
- args: z.array(z.string()).optional(),
|
|
|
- env: z.record(z.string()).optional(),
|
|
|
- alwaysAllow: AlwaysAllowSchema.optional(),
|
|
|
+// Base configuration schema for common settings
|
|
|
+const BaseConfigSchema = z.object({
|
|
|
disabled: z.boolean().optional(),
|
|
|
timeout: z.number().min(1).max(3600).optional().default(60),
|
|
|
+ alwaysAllow: z.array(z.string()).default([]),
|
|
|
})
|
|
|
|
|
|
+// Server configuration schema with automatic type inference
|
|
|
+export const ServerConfigSchema = z.union([
|
|
|
+ // Stdio config (has command field)
|
|
|
+ BaseConfigSchema.extend({
|
|
|
+ command: z.string(),
|
|
|
+ args: z.array(z.string()).optional(),
|
|
|
+ env: z.record(z.string()).optional(),
|
|
|
+ }).transform((data) => ({
|
|
|
+ ...data,
|
|
|
+ type: "stdio" as const,
|
|
|
+ })),
|
|
|
+ // SSE config (has url field)
|
|
|
+ BaseConfigSchema.extend({
|
|
|
+ url: z.string().url(),
|
|
|
+ headers: z.record(z.string()).optional(),
|
|
|
+ }).transform((data) => ({
|
|
|
+ ...data,
|
|
|
+ type: "sse" as const,
|
|
|
+ })),
|
|
|
+])
|
|
|
+
|
|
|
+// Settings schema
|
|
|
const McpSettingsSchema = z.object({
|
|
|
- mcpServers: z.record(StdioConfigSchema),
|
|
|
+ mcpServers: z.record(ServerConfigSchema),
|
|
|
})
|
|
|
|
|
|
export class McpHub {
|
|
|
@@ -55,6 +74,7 @@ export class McpHub {
|
|
|
private disposables: vscode.Disposable[] = []
|
|
|
private settingsWatcher?: vscode.FileSystemWatcher
|
|
|
private fileWatchers: Map<string, FSWatcher> = new Map()
|
|
|
+ private isDisposed: boolean = false
|
|
|
connections: McpConnection[] = []
|
|
|
isConnecting: boolean = false
|
|
|
|
|
|
@@ -147,12 +167,11 @@ export class McpHub {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private async connectToServer(name: string, config: StdioServerParameters): Promise<void> {
|
|
|
- // Remove existing connection if it exists (should never happen, the connection should be deleted beforehand)
|
|
|
- this.connections = this.connections.filter((conn) => conn.server.name !== name)
|
|
|
+ private async connectToServer(name: string, config: z.infer<typeof ServerConfigSchema>): Promise<void> {
|
|
|
+ // Remove existing connection if it exists
|
|
|
+ await this.deleteConnection(name)
|
|
|
|
|
|
try {
|
|
|
- // Each MCP server requires its own transport connection and has unique capabilities, configurations, and error handling. Having separate clients also allows proper scoping of resources/tools and independent server management like reconnection.
|
|
|
const client = new Client(
|
|
|
{
|
|
|
name: "Roo Code",
|
|
|
@@ -163,90 +182,103 @@ export class McpHub {
|
|
|
},
|
|
|
)
|
|
|
|
|
|
- const transport = new StdioClientTransport({
|
|
|
- command: config.command,
|
|
|
- args: config.args,
|
|
|
- env: {
|
|
|
- ...config.env,
|
|
|
- ...(process.env.PATH ? { PATH: process.env.PATH } : {}),
|
|
|
- // ...(process.env.NODE_PATH ? { NODE_PATH: process.env.NODE_PATH } : {}),
|
|
|
- },
|
|
|
- stderr: "pipe", // necessary for stderr to be available
|
|
|
- })
|
|
|
+ let transport: StdioClientTransport | SSEClientTransport
|
|
|
|
|
|
- transport.onerror = async (error) => {
|
|
|
- console.error(`Transport error for "${name}":`, error)
|
|
|
- const connection = this.connections.find((conn) => conn.server.name === name)
|
|
|
- if (connection) {
|
|
|
- connection.server.status = "disconnected"
|
|
|
- this.appendErrorMessage(connection, error.message)
|
|
|
+ if (config.type === "stdio") {
|
|
|
+ transport = new StdioClientTransport({
|
|
|
+ command: config.command,
|
|
|
+ args: config.args,
|
|
|
+ env: {
|
|
|
+ ...config.env,
|
|
|
+ ...(process.env.PATH ? { PATH: process.env.PATH } : {}),
|
|
|
+ },
|
|
|
+ stderr: "pipe",
|
|
|
+ })
|
|
|
+
|
|
|
+ // Set up stdio specific error handling
|
|
|
+ transport.onerror = async (error) => {
|
|
|
+ console.error(`Transport error for "${name}":`, error)
|
|
|
+ const connection = this.connections.find((conn) => conn.server.name === name)
|
|
|
+ if (connection) {
|
|
|
+ connection.server.status = "disconnected"
|
|
|
+ this.appendErrorMessage(connection, error.message)
|
|
|
+ }
|
|
|
+ await this.notifyWebviewOfServerChanges()
|
|
|
}
|
|
|
- await this.notifyWebviewOfServerChanges()
|
|
|
- }
|
|
|
|
|
|
- transport.onclose = async () => {
|
|
|
- const connection = this.connections.find((conn) => conn.server.name === name)
|
|
|
- if (connection) {
|
|
|
- connection.server.status = "disconnected"
|
|
|
+ transport.onclose = async () => {
|
|
|
+ const connection = this.connections.find((conn) => conn.server.name === name)
|
|
|
+ if (connection) {
|
|
|
+ connection.server.status = "disconnected"
|
|
|
+ }
|
|
|
+ await this.notifyWebviewOfServerChanges()
|
|
|
}
|
|
|
- await this.notifyWebviewOfServerChanges()
|
|
|
- }
|
|
|
|
|
|
- // If the config is invalid, show an error
|
|
|
- if (!StdioConfigSchema.safeParse(config).success) {
|
|
|
- console.error(`Invalid config for "${name}": missing or invalid parameters`)
|
|
|
- const connection: McpConnection = {
|
|
|
- server: {
|
|
|
- name,
|
|
|
- config: JSON.stringify(config),
|
|
|
- status: "disconnected",
|
|
|
- error: "Invalid config: missing or invalid parameters",
|
|
|
+ // transport.stderr is only available after the process has been started. However we can't start it separately from the .connect() call because it also starts the transport. And we can't place this after the connect call since we need to capture the stderr stream before the connection is established, in order to capture errors during the connection process.
|
|
|
+ // As a workaround, we start the transport ourselves, and then monkey-patch the start method to no-op so that .connect() doesn't try to start it again.
|
|
|
+ await transport.start()
|
|
|
+ const stderrStream = transport.stderr
|
|
|
+ if (stderrStream) {
|
|
|
+ stderrStream.on("data", async (data: Buffer) => {
|
|
|
+ const errorOutput = data.toString()
|
|
|
+ console.error(`Server "${name}" stderr:`, errorOutput)
|
|
|
+ const connection = this.connections.find((conn) => conn.server.name === name)
|
|
|
+ if (connection) {
|
|
|
+ // NOTE: we do not set server status to "disconnected" because stderr logs do not necessarily mean the server crashed or disconnected, it could just be informational. In fact when the server first starts up, it immediately logs "<name> server running on stdio" to stderr.
|
|
|
+ this.appendErrorMessage(connection, errorOutput)
|
|
|
+ // Only need to update webview right away if it's already disconnected
|
|
|
+ if (connection.server.status === "disconnected") {
|
|
|
+ await this.notifyWebviewOfServerChanges()
|
|
|
+ }
|
|
|
+ }
|
|
|
+ })
|
|
|
+ } else {
|
|
|
+ console.error(`No stderr stream for ${name}`)
|
|
|
+ }
|
|
|
+ transport.start = async () => {} // No-op now, .connect() won't fail
|
|
|
+ } else {
|
|
|
+ // SSE connection
|
|
|
+ const sseOptions = {
|
|
|
+ requestInit: {
|
|
|
+ headers: config.headers,
|
|
|
},
|
|
|
- client,
|
|
|
- transport,
|
|
|
}
|
|
|
- this.connections.push(connection)
|
|
|
- return
|
|
|
+ // Configure ReconnectingEventSource options
|
|
|
+ const reconnectingEventSourceOptions = {
|
|
|
+ max_retry_time: 5000, // Maximum retry time in milliseconds
|
|
|
+ withCredentials: config.headers?.["Authorization"] ? true : false, // Enable credentials if Authorization header exists
|
|
|
+ }
|
|
|
+ global.EventSource = ReconnectingEventSource
|
|
|
+ transport = new SSEClientTransport(new URL(config.url), {
|
|
|
+ ...sseOptions,
|
|
|
+ eventSourceInit: reconnectingEventSourceOptions,
|
|
|
+ })
|
|
|
+
|
|
|
+ // Set up SSE specific error handling
|
|
|
+ transport.onerror = async (error) => {
|
|
|
+ console.error(`Transport error for "${name}":`, error)
|
|
|
+ const connection = this.connections.find((conn) => conn.server.name === name)
|
|
|
+ if (connection) {
|
|
|
+ connection.server.status = "disconnected"
|
|
|
+ this.appendErrorMessage(connection, error.message)
|
|
|
+ }
|
|
|
+ await this.notifyWebviewOfServerChanges()
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- // valid schema
|
|
|
- const parsedConfig = StdioConfigSchema.parse(config)
|
|
|
const connection: McpConnection = {
|
|
|
server: {
|
|
|
name,
|
|
|
config: JSON.stringify(config),
|
|
|
status: "connecting",
|
|
|
- disabled: parsedConfig.disabled,
|
|
|
+ disabled: config.disabled,
|
|
|
},
|
|
|
client,
|
|
|
transport,
|
|
|
}
|
|
|
this.connections.push(connection)
|
|
|
|
|
|
- // transport.stderr is only available after the process has been started. However we can't start it separately from the .connect() call because it also starts the transport. And we can't place this after the connect call since we need to capture the stderr stream before the connection is established, in order to capture errors during the connection process.
|
|
|
- // As a workaround, we start the transport ourselves, and then monkey-patch the start method to no-op so that .connect() doesn't try to start it again.
|
|
|
- await transport.start()
|
|
|
- const stderrStream = transport.stderr
|
|
|
- if (stderrStream) {
|
|
|
- stderrStream.on("data", async (data: Buffer) => {
|
|
|
- const errorOutput = data.toString()
|
|
|
- console.error(`Server "${name}" stderr:`, errorOutput)
|
|
|
- const connection = this.connections.find((conn) => conn.server.name === name)
|
|
|
- if (connection) {
|
|
|
- // NOTE: we do not set server status to "disconnected" because stderr logs do not necessarily mean the server crashed or disconnected, it could just be informational. In fact when the server first starts up, it immediately logs "<name> server running on stdio" to stderr.
|
|
|
- this.appendErrorMessage(connection, errorOutput)
|
|
|
- // Only need to update webview right away if it's already disconnected
|
|
|
- if (connection.server.status === "disconnected") {
|
|
|
- await this.notifyWebviewOfServerChanges()
|
|
|
- }
|
|
|
- }
|
|
|
- })
|
|
|
- } else {
|
|
|
- console.error(`No stderr stream for ${name}`)
|
|
|
- }
|
|
|
- transport.start = async () => {} // No-op now, .connect() won't fail
|
|
|
-
|
|
|
- // Connect
|
|
|
+ // Connect (this will automatically start the transport)
|
|
|
await client.connect(transport)
|
|
|
connection.server.status = "connected"
|
|
|
connection.server.error = ""
|
|
|
@@ -667,7 +699,7 @@ export class McpHub {
|
|
|
|
|
|
let timeout: number
|
|
|
try {
|
|
|
- const parsedConfig = StdioConfigSchema.parse(JSON.parse(connection.server.config))
|
|
|
+ const parsedConfig = ServerConfigSchema.parse(JSON.parse(connection.server.config))
|
|
|
timeout = (parsedConfig.timeout ?? 60) * 1000
|
|
|
} catch (error) {
|
|
|
console.error("Failed to parse server config for timeout:", error)
|
|
|
@@ -729,6 +761,7 @@ export class McpHub {
|
|
|
}
|
|
|
|
|
|
async dispose(): Promise<void> {
|
|
|
+ this.isDisposed = true
|
|
|
this.removeAllFileWatchers()
|
|
|
for (const connection of this.connections) {
|
|
|
try {
|