|
@@ -24,13 +24,11 @@ import { BusEvent } from "../bus/bus-event"
|
|
|
import { Bus } from "@/bus"
|
|
import { Bus } from "@/bus"
|
|
|
import { TuiEvent } from "@/cli/cmd/tui/event"
|
|
import { TuiEvent } from "@/cli/cmd/tui/event"
|
|
|
import open from "open"
|
|
import open from "open"
|
|
|
-import { Effect, Layer, Option, ServiceMap, Stream } from "effect"
|
|
|
|
|
|
|
+import { Effect, Exit, Layer, Option, ServiceMap, Stream } from "effect"
|
|
|
import { InstanceState } from "@/effect/instance-state"
|
|
import { InstanceState } from "@/effect/instance-state"
|
|
|
import { makeRuntime } from "@/effect/run-service"
|
|
import { makeRuntime } from "@/effect/run-service"
|
|
|
import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"
|
|
import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"
|
|
|
import * as CrossSpawnSpawner from "@/effect/cross-spawn-spawner"
|
|
import * as CrossSpawnSpawner from "@/effect/cross-spawn-spawner"
|
|
|
-import { NodeFileSystem } from "@effect/platform-node"
|
|
|
|
|
-import * as NodePath from "@effect/platform-node/NodePath"
|
|
|
|
|
|
|
|
|
|
export namespace MCP {
|
|
export namespace MCP {
|
|
|
const log = Log.create({ service: "mcp" })
|
|
const log = Log.create({ service: "mcp" })
|
|
@@ -129,6 +127,8 @@ export namespace MCP {
|
|
|
return typeof entry === "object" && entry !== null && "type" in entry
|
|
return typeof entry === "object" && entry !== null && "type" in entry
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ const sanitize = (s: string) => s.replace(/[^a-zA-Z0-9_-]/g, "_")
|
|
|
|
|
+
|
|
|
// Convert MCP tool definition to AI SDK Tool type
|
|
// Convert MCP tool definition to AI SDK Tool type
|
|
|
function convertMcpTool(mcpTool: MCPToolDef, client: MCPClient, timeout?: number): Tool {
|
|
function convertMcpTool(mcpTool: MCPToolDef, client: MCPClient, timeout?: number): Tool {
|
|
|
const inputSchema = mcpTool.inputSchema
|
|
const inputSchema = mcpTool.inputSchema
|
|
@@ -160,233 +160,48 @@ export namespace MCP {
|
|
|
})
|
|
})
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- async function defs(key: string, client: MCPClient, timeout?: number) {
|
|
|
|
|
- const result = await withTimeout(client.listTools(), timeout ?? DEFAULT_TIMEOUT).catch((err) => {
|
|
|
|
|
- log.error("failed to get tools from client", { key, error: err })
|
|
|
|
|
- return undefined
|
|
|
|
|
- })
|
|
|
|
|
- return result?.tools
|
|
|
|
|
|
|
+ function defs(key: string, client: MCPClient, timeout?: number) {
|
|
|
|
|
+ return Effect.tryPromise({
|
|
|
|
|
+ try: () => withTimeout(client.listTools(), timeout ?? DEFAULT_TIMEOUT),
|
|
|
|
|
+ catch: (err) => (err instanceof Error ? err : new Error(String(err))),
|
|
|
|
|
+ }).pipe(
|
|
|
|
|
+ Effect.map((result) => result.tools),
|
|
|
|
|
+ Effect.catch((err) => {
|
|
|
|
|
+ log.error("failed to get tools from client", { key, error: err })
|
|
|
|
|
+ return Effect.succeed(undefined)
|
|
|
|
|
+ }),
|
|
|
|
|
+ )
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- async function fetchFromClient<T extends { name: string }>(
|
|
|
|
|
|
|
+ function fetchFromClient<T extends { name: string }>(
|
|
|
clientName: string,
|
|
clientName: string,
|
|
|
client: Client,
|
|
client: Client,
|
|
|
listFn: (c: Client) => Promise<T[]>,
|
|
listFn: (c: Client) => Promise<T[]>,
|
|
|
label: string,
|
|
label: string,
|
|
|
- ): Promise<Record<string, T & { client: string }> | undefined> {
|
|
|
|
|
- const items = await listFn(client).catch((e: any) => {
|
|
|
|
|
- log.error(`failed to get ${label}`, { clientName, error: e.message })
|
|
|
|
|
- return undefined
|
|
|
|
|
- })
|
|
|
|
|
- if (!items) return undefined
|
|
|
|
|
-
|
|
|
|
|
- const out: Record<string, T & { client: string }> = {}
|
|
|
|
|
- const sanitizedClient = clientName.replace(/[^a-zA-Z0-9_-]/g, "_")
|
|
|
|
|
- for (const item of items) {
|
|
|
|
|
- const sanitizedName = item.name.replace(/[^a-zA-Z0-9_-]/g, "_")
|
|
|
|
|
- out[sanitizedClient + ":" + sanitizedName] = { ...item, client: clientName }
|
|
|
|
|
- }
|
|
|
|
|
- return out
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- async function create(key: string, mcp: Config.Mcp) {
|
|
|
|
|
- if (mcp.enabled === false) {
|
|
|
|
|
- log.info("mcp server disabled", { key })
|
|
|
|
|
- return {
|
|
|
|
|
- mcpClient: undefined,
|
|
|
|
|
- status: { status: "disabled" as const },
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- log.info("found", { key, type: mcp.type })
|
|
|
|
|
- let mcpClient: MCPClient | undefined
|
|
|
|
|
- let status: Status | undefined = undefined
|
|
|
|
|
-
|
|
|
|
|
- if (mcp.type === "remote") {
|
|
|
|
|
- // OAuth is enabled by default for remote servers unless explicitly disabled with oauth: false
|
|
|
|
|
- const oauthDisabled = mcp.oauth === false
|
|
|
|
|
- const oauthConfig = typeof mcp.oauth === "object" ? mcp.oauth : undefined
|
|
|
|
|
- let authProvider: McpOAuthProvider | undefined
|
|
|
|
|
-
|
|
|
|
|
- if (!oauthDisabled) {
|
|
|
|
|
- authProvider = new McpOAuthProvider(
|
|
|
|
|
- key,
|
|
|
|
|
- mcp.url,
|
|
|
|
|
- {
|
|
|
|
|
- clientId: oauthConfig?.clientId,
|
|
|
|
|
- clientSecret: oauthConfig?.clientSecret,
|
|
|
|
|
- scope: oauthConfig?.scope,
|
|
|
|
|
- },
|
|
|
|
|
- {
|
|
|
|
|
- onRedirect: async (url) => {
|
|
|
|
|
- log.info("oauth redirect requested", { key, url: url.toString() })
|
|
|
|
|
- // Store the URL - actual browser opening is handled by startAuth
|
|
|
|
|
- },
|
|
|
|
|
- },
|
|
|
|
|
- )
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- const transports: Array<{ name: string; transport: TransportWithAuth }> = [
|
|
|
|
|
- {
|
|
|
|
|
- name: "StreamableHTTP",
|
|
|
|
|
- transport: new StreamableHTTPClientTransport(new URL(mcp.url), {
|
|
|
|
|
- authProvider,
|
|
|
|
|
- requestInit: mcp.headers ? { headers: mcp.headers } : undefined,
|
|
|
|
|
- }),
|
|
|
|
|
- },
|
|
|
|
|
- {
|
|
|
|
|
- name: "SSE",
|
|
|
|
|
- transport: new SSEClientTransport(new URL(mcp.url), {
|
|
|
|
|
- authProvider,
|
|
|
|
|
- requestInit: mcp.headers ? { headers: mcp.headers } : undefined,
|
|
|
|
|
- }),
|
|
|
|
|
- },
|
|
|
|
|
- ]
|
|
|
|
|
-
|
|
|
|
|
- let lastError: Error | undefined
|
|
|
|
|
- const connectTimeout = mcp.timeout ?? DEFAULT_TIMEOUT
|
|
|
|
|
- for (const { name, transport } of transports) {
|
|
|
|
|
- try {
|
|
|
|
|
- const client = new Client({
|
|
|
|
|
- name: "opencode",
|
|
|
|
|
- version: Installation.VERSION,
|
|
|
|
|
- })
|
|
|
|
|
- await withTimeout(client.connect(transport), connectTimeout)
|
|
|
|
|
- mcpClient = client
|
|
|
|
|
- log.info("connected", { key, transport: name })
|
|
|
|
|
- status = { status: "connected" }
|
|
|
|
|
- break
|
|
|
|
|
- } catch (error) {
|
|
|
|
|
- lastError = error instanceof Error ? error : new Error(String(error))
|
|
|
|
|
-
|
|
|
|
|
- // Handle OAuth-specific errors.
|
|
|
|
|
- // The SDK throws UnauthorizedError when auth() returns 'REDIRECT',
|
|
|
|
|
- // but may also throw plain Errors when auth() fails internally
|
|
|
|
|
- // (e.g. during discovery, registration, or state generation).
|
|
|
|
|
- // When an authProvider is attached, treat both cases as auth-related.
|
|
|
|
|
- const isAuthError =
|
|
|
|
|
- error instanceof UnauthorizedError || (authProvider && lastError.message.includes("OAuth"))
|
|
|
|
|
- if (isAuthError) {
|
|
|
|
|
- log.info("mcp server requires authentication", { key, transport: name })
|
|
|
|
|
-
|
|
|
|
|
- // Check if this is a "needs registration" error
|
|
|
|
|
- if (lastError.message.includes("registration") || lastError.message.includes("client_id")) {
|
|
|
|
|
- status = {
|
|
|
|
|
- status: "needs_client_registration" as const,
|
|
|
|
|
- error: "Server does not support dynamic client registration. Please provide clientId in config.",
|
|
|
|
|
- }
|
|
|
|
|
- // Show toast for needs_client_registration
|
|
|
|
|
- Bus.publish(TuiEvent.ToastShow, {
|
|
|
|
|
- title: "MCP Authentication Required",
|
|
|
|
|
- message: `Server "${key}" requires a pre-registered client ID. Add clientId to your config.`,
|
|
|
|
|
- variant: "warning",
|
|
|
|
|
- duration: 8000,
|
|
|
|
|
- }).catch((e) => log.debug("failed to show toast", { error: e }))
|
|
|
|
|
- } else {
|
|
|
|
|
- // Store transport for later finishAuth call
|
|
|
|
|
- pendingOAuthTransports.set(key, transport)
|
|
|
|
|
- status = { status: "needs_auth" as const }
|
|
|
|
|
- // Show toast for needs_auth
|
|
|
|
|
- Bus.publish(TuiEvent.ToastShow, {
|
|
|
|
|
- title: "MCP Authentication Required",
|
|
|
|
|
- message: `Server "${key}" requires authentication. Run: opencode mcp auth ${key}`,
|
|
|
|
|
- variant: "warning",
|
|
|
|
|
- duration: 8000,
|
|
|
|
|
- }).catch((e) => log.debug("failed to show toast", { error: e }))
|
|
|
|
|
- }
|
|
|
|
|
- break
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- log.debug("transport connection failed", {
|
|
|
|
|
- key,
|
|
|
|
|
- transport: name,
|
|
|
|
|
- url: mcp.url,
|
|
|
|
|
- error: lastError.message,
|
|
|
|
|
- })
|
|
|
|
|
- status = {
|
|
|
|
|
- status: "failed" as const,
|
|
|
|
|
- error: lastError.message,
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- if (mcp.type === "local") {
|
|
|
|
|
- const [cmd, ...args] = mcp.command
|
|
|
|
|
- const cwd = Instance.directory
|
|
|
|
|
- const transport = new StdioClientTransport({
|
|
|
|
|
- stderr: "pipe",
|
|
|
|
|
- command: cmd,
|
|
|
|
|
- args,
|
|
|
|
|
- cwd,
|
|
|
|
|
- env: {
|
|
|
|
|
- ...process.env,
|
|
|
|
|
- ...(cmd === "opencode" ? { BUN_BE_BUN: "1" } : {}),
|
|
|
|
|
- ...mcp.environment,
|
|
|
|
|
- },
|
|
|
|
|
- })
|
|
|
|
|
- transport.stderr?.on("data", (chunk: Buffer) => {
|
|
|
|
|
- log.info(`mcp stderr: ${chunk.toString()}`, { key })
|
|
|
|
|
- })
|
|
|
|
|
-
|
|
|
|
|
- const connectTimeout = mcp.timeout ?? DEFAULT_TIMEOUT
|
|
|
|
|
- try {
|
|
|
|
|
- const client = new Client({
|
|
|
|
|
- name: "opencode",
|
|
|
|
|
- version: Installation.VERSION,
|
|
|
|
|
- })
|
|
|
|
|
- await withTimeout(client.connect(transport), connectTimeout)
|
|
|
|
|
- mcpClient = client
|
|
|
|
|
- status = {
|
|
|
|
|
- status: "connected",
|
|
|
|
|
- }
|
|
|
|
|
- } catch (error) {
|
|
|
|
|
- log.error("local mcp startup failed", {
|
|
|
|
|
- key,
|
|
|
|
|
- command: mcp.command,
|
|
|
|
|
- cwd,
|
|
|
|
|
- error: error instanceof Error ? error.message : String(error),
|
|
|
|
|
- })
|
|
|
|
|
- status = {
|
|
|
|
|
- status: "failed" as const,
|
|
|
|
|
- error: error instanceof Error ? error.message : String(error),
|
|
|
|
|
|
|
+ ) {
|
|
|
|
|
+ return Effect.tryPromise({
|
|
|
|
|
+ try: () => listFn(client),
|
|
|
|
|
+ catch: (e: any) => {
|
|
|
|
|
+ log.error(`failed to get ${label}`, { clientName, error: e.message })
|
|
|
|
|
+ return e
|
|
|
|
|
+ },
|
|
|
|
|
+ }).pipe(
|
|
|
|
|
+ Effect.map((items) => {
|
|
|
|
|
+ const out: Record<string, T & { client: string }> = {}
|
|
|
|
|
+ const sanitizedClient = sanitize(clientName)
|
|
|
|
|
+ for (const item of items) {
|
|
|
|
|
+ out[sanitizedClient + ":" + sanitize(item.name)] = { ...item, client: clientName }
|
|
|
}
|
|
}
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- if (!status) {
|
|
|
|
|
- status = {
|
|
|
|
|
- status: "failed" as const,
|
|
|
|
|
- error: "Unknown error",
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- if (!mcpClient) {
|
|
|
|
|
- return {
|
|
|
|
|
- mcpClient: undefined,
|
|
|
|
|
- status,
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- const listed = await defs(key, mcpClient, mcp.timeout)
|
|
|
|
|
- if (!listed) {
|
|
|
|
|
- await mcpClient.close().catch((error) => {
|
|
|
|
|
- log.error("Failed to close MCP client", {
|
|
|
|
|
- error,
|
|
|
|
|
- })
|
|
|
|
|
- })
|
|
|
|
|
- return {
|
|
|
|
|
- mcpClient: undefined,
|
|
|
|
|
- status: { status: "failed" as const, error: "Failed to get tools" },
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ return out
|
|
|
|
|
+ }),
|
|
|
|
|
+ Effect.orElseSucceed(() => undefined),
|
|
|
|
|
+ )
|
|
|
|
|
+ }
|
|
|
|
|
|
|
|
- log.info("create() successfully created client", { key, toolCount: listed.length })
|
|
|
|
|
- return {
|
|
|
|
|
- mcpClient,
|
|
|
|
|
- status,
|
|
|
|
|
- defs: listed,
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ interface CreateResult {
|
|
|
|
|
+ mcpClient?: MCPClient
|
|
|
|
|
+ status: Status
|
|
|
|
|
+ defs?: MCPToolDef[]
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// --- Effect Service ---
|
|
// --- Effect Service ---
|
|
@@ -431,6 +246,196 @@ export namespace MCP {
|
|
|
Effect.gen(function* () {
|
|
Effect.gen(function* () {
|
|
|
const spawner = yield* ChildProcessSpawner.ChildProcessSpawner
|
|
const spawner = yield* ChildProcessSpawner.ChildProcessSpawner
|
|
|
const auth = yield* McpAuth.Service
|
|
const auth = yield* McpAuth.Service
|
|
|
|
|
+ const bus = yield* Bus.Service
|
|
|
|
|
+
|
|
|
|
|
+ type Transport = StdioClientTransport | StreamableHTTPClientTransport | SSEClientTransport
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * Connect a client via the given transport with resource safety:
|
|
|
|
|
+ * on failure the transport is closed; on success the caller owns it.
|
|
|
|
|
+ */
|
|
|
|
|
+ const connectTransport = (transport: Transport, timeout: number) =>
|
|
|
|
|
+ Effect.acquireUseRelease(
|
|
|
|
|
+ Effect.succeed(transport),
|
|
|
|
|
+ (t) =>
|
|
|
|
|
+ Effect.tryPromise({
|
|
|
|
|
+ try: () => {
|
|
|
|
|
+ const client = new Client({ name: "opencode", version: Installation.VERSION })
|
|
|
|
|
+ return withTimeout(client.connect(t), timeout).then(() => client)
|
|
|
|
|
+ },
|
|
|
|
|
+ catch: (e) => (e instanceof Error ? e : new Error(String(e))),
|
|
|
|
|
+ }),
|
|
|
|
|
+ (t, exit) => (Exit.isFailure(exit) ? Effect.tryPromise(() => t.close()).pipe(Effect.ignore) : Effect.void),
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ const DISABLED_RESULT: CreateResult = { status: { status: "disabled" } }
|
|
|
|
|
+
|
|
|
|
|
+ const connectRemote = Effect.fn("MCP.connectRemote")(function* (
|
|
|
|
|
+ key: string,
|
|
|
|
|
+ mcp: Config.Mcp & { type: "remote" },
|
|
|
|
|
+ ) {
|
|
|
|
|
+ const oauthDisabled = mcp.oauth === false
|
|
|
|
|
+ const oauthConfig = typeof mcp.oauth === "object" ? mcp.oauth : undefined
|
|
|
|
|
+ let authProvider: McpOAuthProvider | undefined
|
|
|
|
|
+
|
|
|
|
|
+ if (!oauthDisabled) {
|
|
|
|
|
+ authProvider = new McpOAuthProvider(
|
|
|
|
|
+ key,
|
|
|
|
|
+ mcp.url,
|
|
|
|
|
+ {
|
|
|
|
|
+ clientId: oauthConfig?.clientId,
|
|
|
|
|
+ clientSecret: oauthConfig?.clientSecret,
|
|
|
|
|
+ scope: oauthConfig?.scope,
|
|
|
|
|
+ },
|
|
|
|
|
+ {
|
|
|
|
|
+ onRedirect: async (url) => {
|
|
|
|
|
+ log.info("oauth redirect requested", { key, url: url.toString() })
|
|
|
|
|
+ },
|
|
|
|
|
+ },
|
|
|
|
|
+ )
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ const transports: Array<{ name: string; transport: TransportWithAuth }> = [
|
|
|
|
|
+ {
|
|
|
|
|
+ name: "StreamableHTTP",
|
|
|
|
|
+ transport: new StreamableHTTPClientTransport(new URL(mcp.url), {
|
|
|
|
|
+ authProvider,
|
|
|
|
|
+ requestInit: mcp.headers ? { headers: mcp.headers } : undefined,
|
|
|
|
|
+ }),
|
|
|
|
|
+ },
|
|
|
|
|
+ {
|
|
|
|
|
+ name: "SSE",
|
|
|
|
|
+ transport: new SSEClientTransport(new URL(mcp.url), {
|
|
|
|
|
+ authProvider,
|
|
|
|
|
+ requestInit: mcp.headers ? { headers: mcp.headers } : undefined,
|
|
|
|
|
+ }),
|
|
|
|
|
+ },
|
|
|
|
|
+ ]
|
|
|
|
|
+
|
|
|
|
|
+ const connectTimeout = mcp.timeout ?? DEFAULT_TIMEOUT
|
|
|
|
|
+ let lastStatus: Status | undefined
|
|
|
|
|
+
|
|
|
|
|
+ for (const { name, transport } of transports) {
|
|
|
|
|
+ const result = yield* connectTransport(transport, connectTimeout).pipe(
|
|
|
|
|
+ Effect.map((client) => ({ client, transportName: name })),
|
|
|
|
|
+ Effect.catch((error) => {
|
|
|
|
|
+ const lastError = error instanceof Error ? error : new Error(String(error))
|
|
|
|
|
+ const isAuthError =
|
|
|
|
|
+ error instanceof UnauthorizedError || (authProvider && lastError.message.includes("OAuth"))
|
|
|
|
|
+
|
|
|
|
|
+ if (isAuthError) {
|
|
|
|
|
+ log.info("mcp server requires authentication", { key, transport: name })
|
|
|
|
|
+
|
|
|
|
|
+ if (lastError.message.includes("registration") || lastError.message.includes("client_id")) {
|
|
|
|
|
+ lastStatus = {
|
|
|
|
|
+ status: "needs_client_registration" as const,
|
|
|
|
|
+ error: "Server does not support dynamic client registration. Please provide clientId in config.",
|
|
|
|
|
+ }
|
|
|
|
|
+ return bus
|
|
|
|
|
+ .publish(TuiEvent.ToastShow, {
|
|
|
|
|
+ title: "MCP Authentication Required",
|
|
|
|
|
+ message: `Server "${key}" requires a pre-registered client ID. Add clientId to your config.`,
|
|
|
|
|
+ variant: "warning",
|
|
|
|
|
+ duration: 8000,
|
|
|
|
|
+ })
|
|
|
|
|
+ .pipe(Effect.ignore, Effect.as(undefined))
|
|
|
|
|
+ } else {
|
|
|
|
|
+ pendingOAuthTransports.set(key, transport)
|
|
|
|
|
+ lastStatus = { status: "needs_auth" as const }
|
|
|
|
|
+ return bus
|
|
|
|
|
+ .publish(TuiEvent.ToastShow, {
|
|
|
|
|
+ title: "MCP Authentication Required",
|
|
|
|
|
+ message: `Server "${key}" requires authentication. Run: opencode mcp auth ${key}`,
|
|
|
|
|
+ variant: "warning",
|
|
|
|
|
+ duration: 8000,
|
|
|
|
|
+ })
|
|
|
|
|
+ .pipe(Effect.ignore, Effect.as(undefined))
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ log.debug("transport connection failed", {
|
|
|
|
|
+ key,
|
|
|
|
|
+ transport: name,
|
|
|
|
|
+ url: mcp.url,
|
|
|
|
|
+ error: lastError.message,
|
|
|
|
|
+ })
|
|
|
|
|
+ lastStatus = { status: "failed" as const, error: lastError.message }
|
|
|
|
|
+ return Effect.succeed(undefined)
|
|
|
|
|
+ }),
|
|
|
|
|
+ )
|
|
|
|
|
+ if (result) {
|
|
|
|
|
+ log.info("connected", { key, transport: result.transportName })
|
|
|
|
|
+ return { client: result.client as MCPClient | undefined, status: { status: "connected" } as Status }
|
|
|
|
|
+ }
|
|
|
|
|
+ // If this was an auth error, stop trying other transports
|
|
|
|
|
+ if (lastStatus?.status === "needs_auth" || lastStatus?.status === "needs_client_registration") break
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ return {
|
|
|
|
|
+ client: undefined as MCPClient | undefined,
|
|
|
|
|
+ status: (lastStatus ?? { status: "failed", error: "Unknown error" }) as Status,
|
|
|
|
|
+ }
|
|
|
|
|
+ })
|
|
|
|
|
+
|
|
|
|
|
+ const connectLocal = Effect.fn("MCP.connectLocal")(function* (key: string, mcp: Config.Mcp & { type: "local" }) {
|
|
|
|
|
+ const [cmd, ...args] = mcp.command
|
|
|
|
|
+ const cwd = Instance.directory
|
|
|
|
|
+ const transport = new StdioClientTransport({
|
|
|
|
|
+ stderr: "pipe",
|
|
|
|
|
+ command: cmd,
|
|
|
|
|
+ args,
|
|
|
|
|
+ cwd,
|
|
|
|
|
+ env: {
|
|
|
|
|
+ ...process.env,
|
|
|
|
|
+ ...(cmd === "opencode" ? { BUN_BE_BUN: "1" } : {}),
|
|
|
|
|
+ ...mcp.environment,
|
|
|
|
|
+ },
|
|
|
|
|
+ })
|
|
|
|
|
+ transport.stderr?.on("data", (chunk: Buffer) => {
|
|
|
|
|
+ log.info(`mcp stderr: ${chunk.toString()}`, { key })
|
|
|
|
|
+ })
|
|
|
|
|
+
|
|
|
|
|
+ const connectTimeout = mcp.timeout ?? DEFAULT_TIMEOUT
|
|
|
|
|
+ return yield* connectTransport(transport, connectTimeout).pipe(
|
|
|
|
|
+ Effect.map((client): { client: MCPClient | undefined; status: Status } => ({
|
|
|
|
|
+ client,
|
|
|
|
|
+ status: { status: "connected" },
|
|
|
|
|
+ })),
|
|
|
|
|
+ Effect.catch((error): Effect.Effect<{ client: MCPClient | undefined; status: Status }> => {
|
|
|
|
|
+ const msg = error instanceof Error ? error.message : String(error)
|
|
|
|
|
+ log.error("local mcp startup failed", { key, command: mcp.command, cwd, error: msg })
|
|
|
|
|
+ return Effect.succeed({ client: undefined, status: { status: "failed", error: msg } })
|
|
|
|
|
+ }),
|
|
|
|
|
+ )
|
|
|
|
|
+ })
|
|
|
|
|
+
|
|
|
|
|
+ const create = Effect.fn("MCP.create")(function* (key: string, mcp: Config.Mcp) {
|
|
|
|
|
+ if (mcp.enabled === false) {
|
|
|
|
|
+ log.info("mcp server disabled", { key })
|
|
|
|
|
+ return DISABLED_RESULT
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ log.info("found", { key, type: mcp.type })
|
|
|
|
|
+
|
|
|
|
|
+ const { client: mcpClient, status } =
|
|
|
|
|
+ mcp.type === "remote"
|
|
|
|
|
+ ? yield* connectRemote(key, mcp as Config.Mcp & { type: "remote" })
|
|
|
|
|
+ : yield* connectLocal(key, mcp as Config.Mcp & { type: "local" })
|
|
|
|
|
+
|
|
|
|
|
+ if (!mcpClient) {
|
|
|
|
|
+ return { status } satisfies CreateResult
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ const listed = yield* defs(key, mcpClient, mcp.timeout)
|
|
|
|
|
+ if (!listed) {
|
|
|
|
|
+ yield* Effect.tryPromise(() => mcpClient.close()).pipe(Effect.ignore)
|
|
|
|
|
+ return { status: { status: "failed", error: "Failed to get tools" } } satisfies CreateResult
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ log.info("create() successfully created client", { key, toolCount: listed.length })
|
|
|
|
|
+ return { mcpClient, status, defs: listed } satisfies CreateResult
|
|
|
|
|
+ })
|
|
|
|
|
+ const cfgSvc = yield* Config.Service
|
|
|
|
|
|
|
|
const descendants = Effect.fnUntraced(
|
|
const descendants = Effect.fnUntraced(
|
|
|
function* (pid: number) {
|
|
function* (pid: number) {
|
|
@@ -463,20 +468,18 @@ export namespace MCP {
|
|
|
log.info("tools list changed notification received", { server: name })
|
|
log.info("tools list changed notification received", { server: name })
|
|
|
if (s.clients[name] !== client || s.status[name]?.status !== "connected") return
|
|
if (s.clients[name] !== client || s.status[name]?.status !== "connected") return
|
|
|
|
|
|
|
|
- const listed = await defs(name, client, timeout)
|
|
|
|
|
|
|
+ const listed = await Effect.runPromise(defs(name, client, timeout))
|
|
|
if (!listed) return
|
|
if (!listed) return
|
|
|
if (s.clients[name] !== client || s.status[name]?.status !== "connected") return
|
|
if (s.clients[name] !== client || s.status[name]?.status !== "connected") return
|
|
|
|
|
|
|
|
s.defs[name] = listed
|
|
s.defs[name] = listed
|
|
|
- await Bus.publish(ToolsChanged, { server: name }).catch((error) =>
|
|
|
|
|
- log.warn("failed to publish tools changed", { server: name, error }),
|
|
|
|
|
- )
|
|
|
|
|
|
|
+ await Effect.runPromise(bus.publish(ToolsChanged, { server: name }).pipe(Effect.ignore))
|
|
|
})
|
|
})
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
const cache = yield* InstanceState.make<State>(
|
|
const cache = yield* InstanceState.make<State>(
|
|
|
Effect.fn("MCP.state")(function* () {
|
|
Effect.fn("MCP.state")(function* () {
|
|
|
- const cfg = yield* Effect.promise(() => Config.get())
|
|
|
|
|
|
|
+ const cfg = yield* cfgSvc.get()
|
|
|
const config = cfg.mcp ?? {}
|
|
const config = cfg.mcp ?? {}
|
|
|
const s: State = {
|
|
const s: State = {
|
|
|
status: {},
|
|
status: {},
|
|
@@ -498,13 +501,13 @@ export namespace MCP {
|
|
|
return
|
|
return
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- const result = yield* Effect.promise(() => create(key, mcp).catch(() => undefined))
|
|
|
|
|
|
|
+ const result = yield* create(key, mcp).pipe(Effect.catch(() => Effect.succeed(undefined)))
|
|
|
if (!result) return
|
|
if (!result) return
|
|
|
|
|
|
|
|
s.status[key] = result.status
|
|
s.status[key] = result.status
|
|
|
if (result.mcpClient) {
|
|
if (result.mcpClient) {
|
|
|
s.clients[key] = result.mcpClient
|
|
s.clients[key] = result.mcpClient
|
|
|
- s.defs[key] = result.defs
|
|
|
|
|
|
|
+ s.defs[key] = result.defs!
|
|
|
watch(s, key, result.mcpClient, mcp.timeout)
|
|
watch(s, key, result.mcpClient, mcp.timeout)
|
|
|
}
|
|
}
|
|
|
}),
|
|
}),
|
|
@@ -542,14 +545,13 @@ export namespace MCP {
|
|
|
const client = s.clients[name]
|
|
const client = s.clients[name]
|
|
|
delete s.defs[name]
|
|
delete s.defs[name]
|
|
|
if (!client) return Effect.void
|
|
if (!client) return Effect.void
|
|
|
- return Effect.promise(() =>
|
|
|
|
|
- client.close().catch((error: any) => log.error("failed to close MCP client", { name, error })),
|
|
|
|
|
- )
|
|
|
|
|
|
|
+ return Effect.tryPromise(() => client.close()).pipe(Effect.ignore)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
const status = Effect.fn("MCP.status")(function* () {
|
|
const status = Effect.fn("MCP.status")(function* () {
|
|
|
const s = yield* InstanceState.get(cache)
|
|
const s = yield* InstanceState.get(cache)
|
|
|
- const cfg = yield* Effect.promise(() => Config.get())
|
|
|
|
|
|
|
+
|
|
|
|
|
+ const cfg = yield* cfgSvc.get()
|
|
|
const config = cfg.mcp ?? {}
|
|
const config = cfg.mcp ?? {}
|
|
|
const result: Record<string, Status> = {}
|
|
const result: Record<string, Status> = {}
|
|
|
|
|
|
|
@@ -568,14 +570,7 @@ export namespace MCP {
|
|
|
|
|
|
|
|
const createAndStore = Effect.fn("MCP.createAndStore")(function* (name: string, mcp: Config.Mcp) {
|
|
const createAndStore = Effect.fn("MCP.createAndStore")(function* (name: string, mcp: Config.Mcp) {
|
|
|
const s = yield* InstanceState.get(cache)
|
|
const s = yield* InstanceState.get(cache)
|
|
|
- const result = yield* Effect.promise(() => create(name, mcp))
|
|
|
|
|
-
|
|
|
|
|
- if (!result) {
|
|
|
|
|
- yield* closeClient(s, name)
|
|
|
|
|
- delete s.clients[name]
|
|
|
|
|
- s.status[name] = { status: "failed" as const, error: "unknown error" }
|
|
|
|
|
- return s.status[name]
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ const result = yield* create(name, mcp)
|
|
|
|
|
|
|
|
s.status[name] = result.status
|
|
s.status[name] = result.status
|
|
|
if (!result.mcpClient) {
|
|
if (!result.mcpClient) {
|
|
@@ -586,7 +581,7 @@ export namespace MCP {
|
|
|
|
|
|
|
|
yield* closeClient(s, name)
|
|
yield* closeClient(s, name)
|
|
|
s.clients[name] = result.mcpClient
|
|
s.clients[name] = result.mcpClient
|
|
|
- s.defs[name] = result.defs
|
|
|
|
|
|
|
+ s.defs[name] = result.defs!
|
|
|
watch(s, name, result.mcpClient, mcp.timeout)
|
|
watch(s, name, result.mcpClient, mcp.timeout)
|
|
|
return result.status
|
|
return result.status
|
|
|
})
|
|
})
|
|
@@ -616,7 +611,8 @@ export namespace MCP {
|
|
|
const tools = Effect.fn("MCP.tools")(function* () {
|
|
const tools = Effect.fn("MCP.tools")(function* () {
|
|
|
const result: Record<string, Tool> = {}
|
|
const result: Record<string, Tool> = {}
|
|
|
const s = yield* InstanceState.get(cache)
|
|
const s = yield* InstanceState.get(cache)
|
|
|
- const cfg = yield* Effect.promise(() => Config.get())
|
|
|
|
|
|
|
+
|
|
|
|
|
+ const cfg = yield* cfgSvc.get()
|
|
|
const config = cfg.mcp ?? {}
|
|
const config = cfg.mcp ?? {}
|
|
|
const defaultTimeout = cfg.experimental?.mcp_timeout
|
|
const defaultTimeout = cfg.experimental?.mcp_timeout
|
|
|
|
|
|
|
@@ -639,9 +635,7 @@ export namespace MCP {
|
|
|
|
|
|
|
|
const timeout = entry?.timeout ?? defaultTimeout
|
|
const timeout = entry?.timeout ?? defaultTimeout
|
|
|
for (const mcpTool of listed) {
|
|
for (const mcpTool of listed) {
|
|
|
- const sanitizedClientName = clientName.replace(/[^a-zA-Z0-9_-]/g, "_")
|
|
|
|
|
- const sanitizedToolName = mcpTool.name.replace(/[^a-zA-Z0-9_-]/g, "_")
|
|
|
|
|
- result[sanitizedClientName + "_" + sanitizedToolName] = convertMcpTool(mcpTool, client, timeout)
|
|
|
|
|
|
|
+ result[sanitize(clientName) + "_" + sanitize(mcpTool.name)] = convertMcpTool(mcpTool, client, timeout)
|
|
|
}
|
|
}
|
|
|
}),
|
|
}),
|
|
|
{ concurrency: "unbounded" },
|
|
{ concurrency: "unbounded" },
|
|
@@ -649,30 +643,27 @@ export namespace MCP {
|
|
|
return result
|
|
return result
|
|
|
})
|
|
})
|
|
|
|
|
|
|
|
- function collectFromConnected<T>(
|
|
|
|
|
|
|
+ function collectFromConnected<T extends { name: string }>(
|
|
|
s: State,
|
|
s: State,
|
|
|
- fetchFn: (clientName: string, client: Client) => Promise<Record<string, T> | undefined>,
|
|
|
|
|
|
|
+ listFn: (c: Client) => Promise<T[]>,
|
|
|
|
|
+ label: string,
|
|
|
) {
|
|
) {
|
|
|
return Effect.forEach(
|
|
return Effect.forEach(
|
|
|
Object.entries(s.clients).filter(([name]) => s.status[name]?.status === "connected"),
|
|
Object.entries(s.clients).filter(([name]) => s.status[name]?.status === "connected"),
|
|
|
([clientName, client]) =>
|
|
([clientName, client]) =>
|
|
|
- Effect.promise(async () => Object.entries((await fetchFn(clientName, client)) ?? {})),
|
|
|
|
|
|
|
+ fetchFromClient(clientName, client, listFn, label).pipe(Effect.map((items) => Object.entries(items ?? {}))),
|
|
|
{ concurrency: "unbounded" },
|
|
{ concurrency: "unbounded" },
|
|
|
- ).pipe(Effect.map((results) => Object.fromEntries<T>(results.flat())))
|
|
|
|
|
|
|
+ ).pipe(Effect.map((results) => Object.fromEntries<T & { client: string }>(results.flat())))
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
const prompts = Effect.fn("MCP.prompts")(function* () {
|
|
const prompts = Effect.fn("MCP.prompts")(function* () {
|
|
|
const s = yield* InstanceState.get(cache)
|
|
const s = yield* InstanceState.get(cache)
|
|
|
- return yield* collectFromConnected(s, (name, client) =>
|
|
|
|
|
- fetchFromClient(name, client, (c) => c.listPrompts().then((r) => r.prompts), "prompts"),
|
|
|
|
|
- )
|
|
|
|
|
|
|
+ return yield* collectFromConnected(s, (c) => c.listPrompts().then((r) => r.prompts), "prompts")
|
|
|
})
|
|
})
|
|
|
|
|
|
|
|
const resources = Effect.fn("MCP.resources")(function* () {
|
|
const resources = Effect.fn("MCP.resources")(function* () {
|
|
|
const s = yield* InstanceState.get(cache)
|
|
const s = yield* InstanceState.get(cache)
|
|
|
- return yield* collectFromConnected(s, (name, client) =>
|
|
|
|
|
- fetchFromClient(name, client, (c) => c.listResources().then((r) => r.resources), "resources"),
|
|
|
|
|
- )
|
|
|
|
|
|
|
+ return yield* collectFromConnected(s, (c) => c.listResources().then((r) => r.resources), "resources")
|
|
|
})
|
|
})
|
|
|
|
|
|
|
|
const withClient = Effect.fnUntraced(function* <A>(
|
|
const withClient = Effect.fnUntraced(function* <A>(
|
|
@@ -713,7 +704,7 @@ export namespace MCP {
|
|
|
})
|
|
})
|
|
|
|
|
|
|
|
const getMcpConfig = Effect.fnUntraced(function* (mcpName: string) {
|
|
const getMcpConfig = Effect.fnUntraced(function* (mcpName: string) {
|
|
|
- const cfg = yield* Effect.promise(() => Config.get())
|
|
|
|
|
|
|
+ const cfg = yield* cfgSvc.get()
|
|
|
const mcpConfig = cfg.mcp?.[mcpName]
|
|
const mcpConfig = cfg.mcp?.[mcpName]
|
|
|
if (!mcpConfig || !isMcpConfigured(mcpConfig)) return undefined
|
|
if (!mcpConfig || !isMcpConfigured(mcpConfig)) return undefined
|
|
|
return mcpConfig
|
|
return mcpConfig
|
|
@@ -750,19 +741,21 @@ export namespace MCP {
|
|
|
|
|
|
|
|
const transport = new StreamableHTTPClientTransport(new URL(mcpConfig.url), { authProvider })
|
|
const transport = new StreamableHTTPClientTransport(new URL(mcpConfig.url), { authProvider })
|
|
|
|
|
|
|
|
- return yield* Effect.promise(async () => {
|
|
|
|
|
- try {
|
|
|
|
|
|
|
+ return yield* Effect.tryPromise({
|
|
|
|
|
+ try: () => {
|
|
|
const client = new Client({ name: "opencode", version: Installation.VERSION })
|
|
const client = new Client({ name: "opencode", version: Installation.VERSION })
|
|
|
- await client.connect(transport)
|
|
|
|
|
- return { authorizationUrl: "", oauthState }
|
|
|
|
|
- } catch (error) {
|
|
|
|
|
|
|
+ return client.connect(transport).then(() => ({ authorizationUrl: "", oauthState }))
|
|
|
|
|
+ },
|
|
|
|
|
+ catch: (error) => error,
|
|
|
|
|
+ }).pipe(
|
|
|
|
|
+ Effect.catch((error) => {
|
|
|
if (error instanceof UnauthorizedError && capturedUrl) {
|
|
if (error instanceof UnauthorizedError && capturedUrl) {
|
|
|
pendingOAuthTransports.set(mcpName, transport)
|
|
pendingOAuthTransports.set(mcpName, transport)
|
|
|
- return { authorizationUrl: capturedUrl.toString(), oauthState }
|
|
|
|
|
|
|
+ return Effect.succeed({ authorizationUrl: capturedUrl.toString(), oauthState })
|
|
|
}
|
|
}
|
|
|
- throw error
|
|
|
|
|
- }
|
|
|
|
|
- })
|
|
|
|
|
|
|
+ return Effect.die(error)
|
|
|
|
|
+ }),
|
|
|
|
|
+ )
|
|
|
})
|
|
})
|
|
|
|
|
|
|
|
const authenticate = Effect.fn("MCP.authenticate")(function* (mcpName: string) {
|
|
const authenticate = Effect.fn("MCP.authenticate")(function* (mcpName: string) {
|
|
@@ -791,7 +784,7 @@ export namespace MCP {
|
|
|
),
|
|
),
|
|
|
Effect.catch(() => {
|
|
Effect.catch(() => {
|
|
|
log.warn("failed to open browser, user must open URL manually", { mcpName })
|
|
log.warn("failed to open browser, user must open URL manually", { mcpName })
|
|
|
- return Effect.promise(() => Bus.publish(BrowserOpenFailed, { mcpName, url: authorizationUrl }))
|
|
|
|
|
|
|
+ return bus.publish(BrowserOpenFailed, { mcpName, url: authorizationUrl }).pipe(Effect.ignore)
|
|
|
}),
|
|
}),
|
|
|
)
|
|
)
|
|
|
|
|
|
|
@@ -811,10 +804,7 @@ export namespace MCP {
|
|
|
if (!transport) throw new Error(`No pending OAuth flow for MCP server: ${mcpName}`)
|
|
if (!transport) throw new Error(`No pending OAuth flow for MCP server: ${mcpName}`)
|
|
|
|
|
|
|
|
const result = yield* Effect.tryPromise({
|
|
const result = yield* Effect.tryPromise({
|
|
|
- try: async () => {
|
|
|
|
|
- await transport.finishAuth(authorizationCode)
|
|
|
|
|
- return true
|
|
|
|
|
- },
|
|
|
|
|
|
|
+ try: () => transport.finishAuth(authorizationCode).then(() => true as const),
|
|
|
catch: (error) => {
|
|
catch: (error) => {
|
|
|
log.error("failed to finish oauth", { mcpName, error })
|
|
log.error("failed to finish oauth", { mcpName, error })
|
|
|
return error
|
|
return error
|
|
@@ -885,12 +875,12 @@ export namespace MCP {
|
|
|
|
|
|
|
|
// --- Per-service runtime ---
|
|
// --- Per-service runtime ---
|
|
|
|
|
|
|
|
- const defaultLayer = layer.pipe(
|
|
|
|
|
|
|
+ export const defaultLayer = layer.pipe(
|
|
|
Layer.provide(McpAuth.layer),
|
|
Layer.provide(McpAuth.layer),
|
|
|
- Layer.provide(CrossSpawnSpawner.layer),
|
|
|
|
|
|
|
+ Layer.provide(Bus.layer),
|
|
|
|
|
+ Layer.provide(Config.defaultLayer),
|
|
|
|
|
+ Layer.provide(CrossSpawnSpawner.defaultLayer),
|
|
|
Layer.provide(AppFileSystem.defaultLayer),
|
|
Layer.provide(AppFileSystem.defaultLayer),
|
|
|
- Layer.provide(NodeFileSystem.layer),
|
|
|
|
|
- Layer.provide(NodePath.layer),
|
|
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
const { runPromise } = makeRuntime(Service, defaultLayer)
|
|
const { runPromise } = makeRuntime(Service, defaultLayer)
|