|
|
@@ -11,12 +11,12 @@ import {
|
|
|
} from "@modelcontextprotocol/sdk/types.js"
|
|
|
import { Config } from "../config/config"
|
|
|
import { Log } from "../util/log"
|
|
|
-import { Process } from "../util/process"
|
|
|
import { NamedError } from "@opencode-ai/util/error"
|
|
|
import z from "zod/v4"
|
|
|
import { Instance } from "../project/instance"
|
|
|
import { Installation } from "../installation"
|
|
|
import { withTimeout } from "@/util/timeout"
|
|
|
+import { AppFileSystem } from "@/filesystem"
|
|
|
import { McpOAuthProvider } from "./oauth-provider"
|
|
|
import { McpOAuthCallback } from "./oauth-callback"
|
|
|
import { McpAuth } from "./auth"
|
|
|
@@ -24,6 +24,13 @@ import { BusEvent } from "../bus/bus-event"
|
|
|
import { Bus } from "@/bus"
|
|
|
import { TuiEvent } from "@/cli/cmd/tui/event"
|
|
|
import open from "open"
|
|
|
+import { Effect, Layer, Option, ServiceMap, Stream } from "effect"
|
|
|
+import { InstanceState } from "@/effect/instance-state"
|
|
|
+import { makeRuntime } from "@/effect/run-service"
|
|
|
+import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"
|
|
|
+import * as CrossSpawnSpawner from "@/effect/cross-spawn-spawner"
|
|
|
+import { NodeFileSystem } from "@effect/platform-node"
|
|
|
+import * as NodePath from "@effect/platform-node/NodePath"
|
|
|
|
|
|
export namespace MCP {
|
|
|
const log = Log.create({ service: "mcp" })
|
|
|
@@ -109,16 +116,21 @@ export namespace MCP {
|
|
|
})
|
|
|
export type Status = z.infer<typeof Status>
|
|
|
|
|
|
- // Register notification handlers for MCP client
|
|
|
- function registerNotificationHandlers(client: MCPClient, serverName: string) {
|
|
|
- client.setNotificationHandler(ToolListChangedNotificationSchema, async () => {
|
|
|
- log.info("tools list changed notification received", { server: serverName })
|
|
|
- Bus.publish(ToolsChanged, { server: serverName })
|
|
|
- })
|
|
|
+ // Store transports for OAuth servers to allow finishing auth
|
|
|
+ type TransportWithAuth = StreamableHTTPClientTransport | SSEClientTransport
|
|
|
+ const pendingOAuthTransports = new Map<string, TransportWithAuth>()
|
|
|
+
|
|
|
+ // Prompt cache types
|
|
|
+ type PromptInfo = Awaited<ReturnType<MCPClient["listPrompts"]>>["prompts"][number]
|
|
|
+ type ResourceInfo = Awaited<ReturnType<MCPClient["listResources"]>>["resources"][number]
|
|
|
+ type McpEntry = NonNullable<Config.Info["mcp"]>[string]
|
|
|
+
|
|
|
+ function isMcpConfigured(entry: McpEntry): entry is Config.Mcp {
|
|
|
+ return typeof entry === "object" && entry !== null && "type" in entry
|
|
|
}
|
|
|
|
|
|
// Convert MCP tool definition to AI SDK Tool type
|
|
|
- async function convertMcpTool(mcpTool: MCPToolDef, client: MCPClient, timeout?: number): Promise<Tool> {
|
|
|
+ function convertMcpTool(mcpTool: MCPToolDef, client: MCPClient, timeout?: number): Tool {
|
|
|
const inputSchema = mcpTool.inputSchema
|
|
|
|
|
|
// Spread first, then override type to ensure it's always "object"
|
|
|
@@ -148,178 +160,33 @@ export namespace MCP {
|
|
|
})
|
|
|
}
|
|
|
|
|
|
- // Store transports for OAuth servers to allow finishing auth
|
|
|
- type TransportWithAuth = StreamableHTTPClientTransport | SSEClientTransport
|
|
|
- const pendingOAuthTransports = new Map<string, TransportWithAuth>()
|
|
|
-
|
|
|
- // Prompt cache types
|
|
|
- type PromptInfo = Awaited<ReturnType<MCPClient["listPrompts"]>>["prompts"][number]
|
|
|
-
|
|
|
- type ResourceInfo = Awaited<ReturnType<MCPClient["listResources"]>>["resources"][number]
|
|
|
- type McpEntry = NonNullable<Config.Info["mcp"]>[string]
|
|
|
- function isMcpConfigured(entry: McpEntry): entry is Config.Mcp {
|
|
|
- return typeof entry === "object" && entry !== null && "type" in entry
|
|
|
- }
|
|
|
-
|
|
|
- async function descendants(pid: number): Promise<number[]> {
|
|
|
- if (process.platform === "win32") return []
|
|
|
- const pids: number[] = []
|
|
|
- const queue = [pid]
|
|
|
- while (queue.length > 0) {
|
|
|
- const current = queue.shift()!
|
|
|
- const lines = await Process.lines(["pgrep", "-P", String(current)], { nothrow: true })
|
|
|
- for (const tok of lines) {
|
|
|
- const cpid = parseInt(tok, 10)
|
|
|
- if (!isNaN(cpid) && !pids.includes(cpid)) {
|
|
|
- pids.push(cpid)
|
|
|
- queue.push(cpid)
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- return pids
|
|
|
- }
|
|
|
-
|
|
|
- const state = Instance.state(
|
|
|
- async () => {
|
|
|
- const cfg = await Config.get()
|
|
|
- const config = cfg.mcp ?? {}
|
|
|
- const clients: Record<string, MCPClient> = {}
|
|
|
- const status: Record<string, Status> = {}
|
|
|
-
|
|
|
- await Promise.all(
|
|
|
- Object.entries(config).map(async ([key, mcp]) => {
|
|
|
- if (!isMcpConfigured(mcp)) {
|
|
|
- log.error("Ignoring MCP config entry without type", { key })
|
|
|
- return
|
|
|
- }
|
|
|
-
|
|
|
- // If disabled by config, mark as disabled without trying to connect
|
|
|
- if (mcp.enabled === false) {
|
|
|
- status[key] = { status: "disabled" }
|
|
|
- return
|
|
|
- }
|
|
|
-
|
|
|
- const result = await create(key, mcp).catch(() => undefined)
|
|
|
- if (!result) return
|
|
|
-
|
|
|
- status[key] = result.status
|
|
|
-
|
|
|
- if (result.mcpClient) {
|
|
|
- clients[key] = result.mcpClient
|
|
|
- }
|
|
|
- }),
|
|
|
- )
|
|
|
- return {
|
|
|
- status,
|
|
|
- clients,
|
|
|
- }
|
|
|
- },
|
|
|
- async (state) => {
|
|
|
- // The MCP SDK only signals the direct child process on close.
|
|
|
- // Servers like chrome-devtools-mcp spawn grandchild processes
|
|
|
- // (e.g. Chrome) that the SDK never reaches, leaving them orphaned.
|
|
|
- // Kill the full descendant tree first so the server exits promptly
|
|
|
- // and no processes are left behind.
|
|
|
- for (const client of Object.values(state.clients)) {
|
|
|
- const pid = (client.transport as any)?.pid
|
|
|
- if (typeof pid !== "number") continue
|
|
|
- for (const dpid of await descendants(pid)) {
|
|
|
- try {
|
|
|
- process.kill(dpid, "SIGTERM")
|
|
|
- } catch {}
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- await Promise.all(
|
|
|
- Object.values(state.clients).map((client) =>
|
|
|
- client.close().catch((error) => {
|
|
|
- log.error("Failed to close MCP client", {
|
|
|
- error,
|
|
|
- })
|
|
|
- }),
|
|
|
- ),
|
|
|
- )
|
|
|
- pendingOAuthTransports.clear()
|
|
|
- },
|
|
|
- )
|
|
|
-
|
|
|
- // Helper function to fetch prompts for a specific client
|
|
|
- async function fetchPromptsForClient(clientName: string, client: Client) {
|
|
|
- const prompts = await client.listPrompts().catch((e) => {
|
|
|
- log.error("failed to get prompts", { clientName, error: e.message })
|
|
|
+ async function defs(key: string, client: MCPClient, timeout?: number) {
|
|
|
+ const result = await withTimeout(client.listTools(), timeout ?? DEFAULT_TIMEOUT).catch((err) => {
|
|
|
+ log.error("failed to get tools from client", { key, error: err })
|
|
|
return undefined
|
|
|
})
|
|
|
-
|
|
|
- if (!prompts) {
|
|
|
- return
|
|
|
- }
|
|
|
-
|
|
|
- const commands: Record<string, PromptInfo & { client: string }> = {}
|
|
|
-
|
|
|
- for (const prompt of prompts.prompts) {
|
|
|
- const sanitizedClientName = clientName.replace(/[^a-zA-Z0-9_-]/g, "_")
|
|
|
- const sanitizedPromptName = prompt.name.replace(/[^a-zA-Z0-9_-]/g, "_")
|
|
|
- const key = sanitizedClientName + ":" + sanitizedPromptName
|
|
|
-
|
|
|
- commands[key] = { ...prompt, client: clientName }
|
|
|
- }
|
|
|
- return commands
|
|
|
+ return result?.tools
|
|
|
}
|
|
|
|
|
|
- async function fetchResourcesForClient(clientName: string, client: Client) {
|
|
|
- const resources = await client.listResources().catch((e) => {
|
|
|
- log.error("failed to get prompts", { clientName, error: e.message })
|
|
|
+ async function fetchFromClient<T extends { name: string }>(
|
|
|
+ clientName: string,
|
|
|
+ client: Client,
|
|
|
+ listFn: (c: Client) => Promise<T[]>,
|
|
|
+ label: string,
|
|
|
+ ): Promise<Record<string, T & { client: string }> | undefined> {
|
|
|
+ const items = await listFn(client).catch((e: any) => {
|
|
|
+ log.error(`failed to get ${label}`, { clientName, error: e.message })
|
|
|
return undefined
|
|
|
})
|
|
|
+ if (!items) return undefined
|
|
|
|
|
|
- if (!resources) {
|
|
|
- return
|
|
|
- }
|
|
|
-
|
|
|
- const commands: Record<string, ResourceInfo & { client: string }> = {}
|
|
|
-
|
|
|
- for (const resource of resources.resources) {
|
|
|
- const sanitizedClientName = clientName.replace(/[^a-zA-Z0-9_-]/g, "_")
|
|
|
- const sanitizedResourceName = resource.name.replace(/[^a-zA-Z0-9_-]/g, "_")
|
|
|
- const key = sanitizedClientName + ":" + sanitizedResourceName
|
|
|
-
|
|
|
- commands[key] = { ...resource, client: clientName }
|
|
|
- }
|
|
|
- return commands
|
|
|
- }
|
|
|
-
|
|
|
- export async function add(name: string, mcp: Config.Mcp) {
|
|
|
- const s = await state()
|
|
|
- const result = await create(name, mcp)
|
|
|
- if (!result) {
|
|
|
- const status = {
|
|
|
- status: "failed" as const,
|
|
|
- error: "unknown error",
|
|
|
- }
|
|
|
- s.status[name] = status
|
|
|
- return {
|
|
|
- status,
|
|
|
- }
|
|
|
- }
|
|
|
- if (!result.mcpClient) {
|
|
|
- s.status[name] = result.status
|
|
|
- return {
|
|
|
- status: s.status,
|
|
|
- }
|
|
|
- }
|
|
|
- // Close existing client if present to prevent memory leaks
|
|
|
- const existingClient = s.clients[name]
|
|
|
- if (existingClient) {
|
|
|
- await existingClient.close().catch((error) => {
|
|
|
- log.error("Failed to close existing MCP client", { name, error })
|
|
|
- })
|
|
|
- }
|
|
|
- s.clients[name] = result.mcpClient
|
|
|
- s.status[name] = result.status
|
|
|
-
|
|
|
- return {
|
|
|
- status: s.status,
|
|
|
+ const out: Record<string, T & { client: string }> = {}
|
|
|
+ const sanitizedClient = clientName.replace(/[^a-zA-Z0-9_-]/g, "_")
|
|
|
+ for (const item of items) {
|
|
|
+ const sanitizedName = item.name.replace(/[^a-zA-Z0-9_-]/g, "_")
|
|
|
+ out[sanitizedClient + ":" + sanitizedName] = { ...item, client: clientName }
|
|
|
}
|
|
|
+ return out
|
|
|
}
|
|
|
|
|
|
async function create(key: string, mcp: Config.Mcp) {
|
|
|
@@ -385,7 +252,6 @@ export namespace MCP {
|
|
|
version: Installation.VERSION,
|
|
|
})
|
|
|
await withTimeout(client.connect(transport), connectTimeout)
|
|
|
- registerNotificationHandlers(client, key)
|
|
|
mcpClient = client
|
|
|
log.info("connected", { key, transport: name })
|
|
|
status = { status: "connected" }
|
|
|
@@ -470,7 +336,6 @@ export namespace MCP {
|
|
|
version: Installation.VERSION,
|
|
|
})
|
|
|
await withTimeout(client.connect(transport), connectTimeout)
|
|
|
- registerNotificationHandlers(client, key)
|
|
|
mcpClient = client
|
|
|
status = {
|
|
|
status: "connected",
|
|
|
@@ -503,475 +368,569 @@ export namespace MCP {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- const result = await withTimeout(mcpClient.listTools(), mcp.timeout ?? DEFAULT_TIMEOUT).catch((err) => {
|
|
|
- log.error("failed to get tools from client", { key, error: err })
|
|
|
- return undefined
|
|
|
- })
|
|
|
- if (!result) {
|
|
|
+ const listed = await defs(key, mcpClient, mcp.timeout)
|
|
|
+ if (!listed) {
|
|
|
await mcpClient.close().catch((error) => {
|
|
|
log.error("Failed to close MCP client", {
|
|
|
error,
|
|
|
})
|
|
|
})
|
|
|
- status = {
|
|
|
- status: "failed",
|
|
|
- error: "Failed to get tools",
|
|
|
- }
|
|
|
return {
|
|
|
mcpClient: undefined,
|
|
|
- status: {
|
|
|
- status: "failed" as const,
|
|
|
- error: "Failed to get tools",
|
|
|
- },
|
|
|
+ status: { status: "failed" as const, error: "Failed to get tools" },
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- log.info("create() successfully created client", { key, toolCount: result.tools.length })
|
|
|
+ log.info("create() successfully created client", { key, toolCount: listed.length })
|
|
|
return {
|
|
|
mcpClient,
|
|
|
status,
|
|
|
+ defs: listed,
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- export async function status() {
|
|
|
- const s = await state()
|
|
|
- const cfg = await Config.get()
|
|
|
- const config = cfg.mcp ?? {}
|
|
|
- const result: Record<string, Status> = {}
|
|
|
+ // --- Effect Service ---
|
|
|
|
|
|
- // Include all configured MCPs from config, not just connected ones
|
|
|
- for (const [key, mcp] of Object.entries(config)) {
|
|
|
- if (!isMcpConfigured(mcp)) continue
|
|
|
- result[key] = s.status[key] ?? { status: "disabled" }
|
|
|
- }
|
|
|
-
|
|
|
- return result
|
|
|
+ interface State {
|
|
|
+ status: Record<string, Status>
|
|
|
+ clients: Record<string, MCPClient>
|
|
|
+ defs: Record<string, MCPToolDef[]>
|
|
|
}
|
|
|
|
|
|
- export async function clients() {
|
|
|
- return state().then((state) => state.clients)
|
|
|
+ export interface Interface {
|
|
|
+ readonly status: () => Effect.Effect<Record<string, Status>>
|
|
|
+ readonly clients: () => Effect.Effect<Record<string, MCPClient>>
|
|
|
+ readonly tools: () => Effect.Effect<Record<string, Tool>>
|
|
|
+ readonly prompts: () => Effect.Effect<Record<string, PromptInfo & { client: string }>>
|
|
|
+ readonly resources: () => Effect.Effect<Record<string, ResourceInfo & { client: string }>>
|
|
|
+ readonly add: (name: string, mcp: Config.Mcp) => Effect.Effect<{ status: Record<string, Status> | Status }>
|
|
|
+ readonly connect: (name: string) => Effect.Effect<void>
|
|
|
+ readonly disconnect: (name: string) => Effect.Effect<void>
|
|
|
+ readonly getPrompt: (
|
|
|
+ clientName: string,
|
|
|
+ name: string,
|
|
|
+ args?: Record<string, string>,
|
|
|
+ ) => Effect.Effect<Awaited<ReturnType<MCPClient["getPrompt"]>> | undefined>
|
|
|
+ readonly readResource: (
|
|
|
+ clientName: string,
|
|
|
+ resourceUri: string,
|
|
|
+ ) => Effect.Effect<Awaited<ReturnType<MCPClient["readResource"]>> | undefined>
|
|
|
+ readonly startAuth: (mcpName: string) => Effect.Effect<{ authorizationUrl: string; oauthState: string }>
|
|
|
+ readonly authenticate: (mcpName: string) => Effect.Effect<Status>
|
|
|
+ readonly finishAuth: (mcpName: string, authorizationCode: string) => Effect.Effect<Status>
|
|
|
+ readonly removeAuth: (mcpName: string) => Effect.Effect<void>
|
|
|
+ readonly supportsOAuth: (mcpName: string) => Effect.Effect<boolean>
|
|
|
+ readonly hasStoredTokens: (mcpName: string) => Effect.Effect<boolean>
|
|
|
+ readonly getAuthStatus: (mcpName: string) => Effect.Effect<AuthStatus>
|
|
|
}
|
|
|
|
|
|
- export async function connect(name: string) {
|
|
|
- const cfg = await Config.get()
|
|
|
- const config = cfg.mcp ?? {}
|
|
|
- const mcp = config[name]
|
|
|
- if (!mcp) {
|
|
|
- log.error("MCP config not found", { name })
|
|
|
- return
|
|
|
- }
|
|
|
+ export class Service extends ServiceMap.Service<Service, Interface>()("@opencode/MCP") {}
|
|
|
+
|
|
|
+ export const layer = Layer.effect(
|
|
|
+ Service,
|
|
|
+ Effect.gen(function* () {
|
|
|
+ const spawner = yield* ChildProcessSpawner.ChildProcessSpawner
|
|
|
+ const auth = yield* McpAuth.Service
|
|
|
+
|
|
|
+ const descendants = Effect.fnUntraced(
|
|
|
+ function* (pid: number) {
|
|
|
+ if (process.platform === "win32") return [] as number[]
|
|
|
+ const pids: number[] = []
|
|
|
+ const queue = [pid]
|
|
|
+ while (queue.length > 0) {
|
|
|
+ const current = queue.shift()!
|
|
|
+ const handle = yield* spawner.spawn(
|
|
|
+ ChildProcess.make("pgrep", ["-P", String(current)], { stdin: "ignore" }),
|
|
|
+ )
|
|
|
+ const text = yield* Stream.mkString(Stream.decodeText(handle.stdout))
|
|
|
+ yield* handle.exitCode
|
|
|
+ for (const tok of text.split("\n")) {
|
|
|
+ const cpid = parseInt(tok, 10)
|
|
|
+ if (!isNaN(cpid) && !pids.includes(cpid)) {
|
|
|
+ pids.push(cpid)
|
|
|
+ queue.push(cpid)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return pids
|
|
|
+ },
|
|
|
+ Effect.scoped,
|
|
|
+ Effect.catch(() => Effect.succeed([] as number[])),
|
|
|
+ )
|
|
|
|
|
|
- if (!isMcpConfigured(mcp)) {
|
|
|
- log.error("Ignoring MCP connect request for config without type", { name })
|
|
|
- return
|
|
|
- }
|
|
|
+ function watch(s: State, name: string, client: MCPClient, timeout?: number) {
|
|
|
+ client.setNotificationHandler(ToolListChangedNotificationSchema, async () => {
|
|
|
+ log.info("tools list changed notification received", { server: name })
|
|
|
+ if (s.clients[name] !== client || s.status[name]?.status !== "connected") return
|
|
|
|
|
|
- const result = await create(name, { ...mcp, enabled: true })
|
|
|
+ const listed = await defs(name, client, timeout)
|
|
|
+ if (!listed) return
|
|
|
+ if (s.clients[name] !== client || s.status[name]?.status !== "connected") return
|
|
|
|
|
|
- if (!result) {
|
|
|
- const s = await state()
|
|
|
- s.status[name] = {
|
|
|
- status: "failed",
|
|
|
- error: "Unknown error during connection",
|
|
|
+ s.defs[name] = listed
|
|
|
+ await Bus.publish(ToolsChanged, { server: name }).catch((error) =>
|
|
|
+ log.warn("failed to publish tools changed", { server: name, error }),
|
|
|
+ )
|
|
|
+ })
|
|
|
}
|
|
|
- return
|
|
|
- }
|
|
|
|
|
|
- const s = await state()
|
|
|
- s.status[name] = result.status
|
|
|
- if (result.mcpClient) {
|
|
|
- // Close existing client if present to prevent memory leaks
|
|
|
- const existingClient = s.clients[name]
|
|
|
- if (existingClient) {
|
|
|
- await existingClient.close().catch((error) => {
|
|
|
- log.error("Failed to close existing MCP client", { name, error })
|
|
|
- })
|
|
|
+ const cache = yield* InstanceState.make<State>(
|
|
|
+ Effect.fn("MCP.state")(function* () {
|
|
|
+ const cfg = yield* Effect.promise(() => Config.get())
|
|
|
+ const config = cfg.mcp ?? {}
|
|
|
+ const s: State = {
|
|
|
+ status: {},
|
|
|
+ clients: {},
|
|
|
+ defs: {},
|
|
|
+ }
|
|
|
+
|
|
|
+ yield* Effect.forEach(
|
|
|
+ Object.entries(config),
|
|
|
+ ([key, mcp]) =>
|
|
|
+ Effect.gen(function* () {
|
|
|
+ if (!isMcpConfigured(mcp)) {
|
|
|
+ log.error("Ignoring MCP config entry without type", { key })
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ if (mcp.enabled === false) {
|
|
|
+ s.status[key] = { status: "disabled" }
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ const result = yield* Effect.promise(() => create(key, mcp).catch(() => undefined))
|
|
|
+ if (!result) return
|
|
|
+
|
|
|
+ s.status[key] = result.status
|
|
|
+ if (result.mcpClient) {
|
|
|
+ s.clients[key] = result.mcpClient
|
|
|
+ s.defs[key] = result.defs
|
|
|
+ watch(s, key, result.mcpClient, mcp.timeout)
|
|
|
+ }
|
|
|
+ }),
|
|
|
+ { concurrency: "unbounded" },
|
|
|
+ )
|
|
|
+
|
|
|
+ yield* Effect.addFinalizer(() =>
|
|
|
+ Effect.gen(function* () {
|
|
|
+ yield* Effect.forEach(
|
|
|
+ Object.values(s.clients),
|
|
|
+ (client) =>
|
|
|
+ Effect.gen(function* () {
|
|
|
+ const pid = (client.transport as any)?.pid
|
|
|
+ if (typeof pid === "number") {
|
|
|
+ const pids = yield* descendants(pid)
|
|
|
+ for (const dpid of pids) {
|
|
|
+ try {
|
|
|
+ process.kill(dpid, "SIGTERM")
|
|
|
+ } catch {}
|
|
|
+ }
|
|
|
+ }
|
|
|
+ yield* Effect.tryPromise(() => client.close()).pipe(Effect.ignore)
|
|
|
+ }),
|
|
|
+ { concurrency: "unbounded" },
|
|
|
+ )
|
|
|
+ pendingOAuthTransports.clear()
|
|
|
+ }),
|
|
|
+ )
|
|
|
+
|
|
|
+ return s
|
|
|
+ }),
|
|
|
+ )
|
|
|
+
|
|
|
+ function closeClient(s: State, name: string) {
|
|
|
+ const client = s.clients[name]
|
|
|
+ delete s.defs[name]
|
|
|
+ if (!client) return Effect.void
|
|
|
+ return Effect.promise(() =>
|
|
|
+ client.close().catch((error: any) => log.error("failed to close MCP client", { name, error })),
|
|
|
+ )
|
|
|
}
|
|
|
- s.clients[name] = result.mcpClient
|
|
|
- }
|
|
|
- }
|
|
|
|
|
|
- export async function disconnect(name: string) {
|
|
|
- const s = await state()
|
|
|
- const client = s.clients[name]
|
|
|
- if (client) {
|
|
|
- await client.close().catch((error) => {
|
|
|
- log.error("Failed to close MCP client", { name, error })
|
|
|
+ const status = Effect.fn("MCP.status")(function* () {
|
|
|
+ const s = yield* InstanceState.get(cache)
|
|
|
+ const cfg = yield* Effect.promise(() => Config.get())
|
|
|
+ const config = cfg.mcp ?? {}
|
|
|
+ const result: Record<string, Status> = {}
|
|
|
+
|
|
|
+ for (const [key, mcp] of Object.entries(config)) {
|
|
|
+ if (!isMcpConfigured(mcp)) continue
|
|
|
+ result[key] = s.status[key] ?? { status: "disabled" }
|
|
|
+ }
|
|
|
+
|
|
|
+ return result
|
|
|
})
|
|
|
- delete s.clients[name]
|
|
|
- }
|
|
|
- s.status[name] = { status: "disabled" }
|
|
|
- }
|
|
|
|
|
|
- export async function tools() {
|
|
|
- const result: Record<string, Tool> = {}
|
|
|
- const s = await state()
|
|
|
- const cfg = await Config.get()
|
|
|
- const config = cfg.mcp ?? {}
|
|
|
- const clientsSnapshot = await clients()
|
|
|
- const defaultTimeout = cfg.experimental?.mcp_timeout
|
|
|
-
|
|
|
- const connectedClients = Object.entries(clientsSnapshot).filter(
|
|
|
- ([clientName]) => s.status[clientName]?.status === "connected",
|
|
|
- )
|
|
|
-
|
|
|
- const toolsResults = await Promise.all(
|
|
|
- connectedClients.map(async ([clientName, client]) => {
|
|
|
- const toolsResult = await client.listTools().catch((e) => {
|
|
|
- log.error("failed to get tools", { clientName, error: e.message })
|
|
|
- const failedStatus = {
|
|
|
- status: "failed" as const,
|
|
|
- error: e instanceof Error ? e.message : String(e),
|
|
|
- }
|
|
|
- s.status[clientName] = failedStatus
|
|
|
- delete s.clients[clientName]
|
|
|
- return undefined
|
|
|
- })
|
|
|
- return { clientName, client, toolsResult }
|
|
|
- }),
|
|
|
- )
|
|
|
-
|
|
|
- for (const { clientName, client, toolsResult } of toolsResults) {
|
|
|
- if (!toolsResult) continue
|
|
|
- const mcpConfig = config[clientName]
|
|
|
- const entry = isMcpConfigured(mcpConfig) ? mcpConfig : undefined
|
|
|
- const timeout = entry?.timeout ?? defaultTimeout
|
|
|
- for (const mcpTool of toolsResult.tools) {
|
|
|
- const sanitizedClientName = clientName.replace(/[^a-zA-Z0-9_-]/g, "_")
|
|
|
- const sanitizedToolName = mcpTool.name.replace(/[^a-zA-Z0-9_-]/g, "_")
|
|
|
- result[sanitizedClientName + "_" + sanitizedToolName] = await convertMcpTool(mcpTool, client, timeout)
|
|
|
- }
|
|
|
- }
|
|
|
- return result
|
|
|
- }
|
|
|
+ const clients = Effect.fn("MCP.clients")(function* () {
|
|
|
+ const s = yield* InstanceState.get(cache)
|
|
|
+ return s.clients
|
|
|
+ })
|
|
|
|
|
|
- export async function prompts() {
|
|
|
- const s = await state()
|
|
|
- const clientsSnapshot = await clients()
|
|
|
+ const createAndStore = Effect.fn("MCP.createAndStore")(function* (name: string, mcp: Config.Mcp) {
|
|
|
+ const s = yield* InstanceState.get(cache)
|
|
|
+ const result = yield* Effect.promise(() => create(name, mcp))
|
|
|
|
|
|
- const prompts = Object.fromEntries<PromptInfo & { client: string }>(
|
|
|
- (
|
|
|
- await Promise.all(
|
|
|
- Object.entries(clientsSnapshot).map(async ([clientName, client]) => {
|
|
|
- if (s.status[clientName]?.status !== "connected") {
|
|
|
- return []
|
|
|
- }
|
|
|
+ if (!result) {
|
|
|
+ yield* closeClient(s, name)
|
|
|
+ delete s.clients[name]
|
|
|
+ s.status[name] = { status: "failed" as const, error: "unknown error" }
|
|
|
+ return s.status[name]
|
|
|
+ }
|
|
|
|
|
|
- return Object.entries((await fetchPromptsForClient(clientName, client)) ?? {})
|
|
|
- }),
|
|
|
- )
|
|
|
- ).flat(),
|
|
|
- )
|
|
|
+ s.status[name] = result.status
|
|
|
+ if (!result.mcpClient) {
|
|
|
+ yield* closeClient(s, name)
|
|
|
+ delete s.clients[name]
|
|
|
+ return result.status
|
|
|
+ }
|
|
|
|
|
|
- return prompts
|
|
|
- }
|
|
|
+ yield* closeClient(s, name)
|
|
|
+ s.clients[name] = result.mcpClient
|
|
|
+ s.defs[name] = result.defs
|
|
|
+ watch(s, name, result.mcpClient, mcp.timeout)
|
|
|
+ return result.status
|
|
|
+ })
|
|
|
|
|
|
- export async function resources() {
|
|
|
- const s = await state()
|
|
|
- const clientsSnapshot = await clients()
|
|
|
+ const add = Effect.fn("MCP.add")(function* (name: string, mcp: Config.Mcp) {
|
|
|
+ yield* createAndStore(name, mcp)
|
|
|
+ const s = yield* InstanceState.get(cache)
|
|
|
+ return { status: s.status }
|
|
|
+ })
|
|
|
|
|
|
- const result = Object.fromEntries<ResourceInfo & { client: string }>(
|
|
|
- (
|
|
|
- await Promise.all(
|
|
|
- Object.entries(clientsSnapshot).map(async ([clientName, client]) => {
|
|
|
- if (s.status[clientName]?.status !== "connected") {
|
|
|
- return []
|
|
|
- }
|
|
|
+ const connect = Effect.fn("MCP.connect")(function* (name: string) {
|
|
|
+ const mcp = yield* getMcpConfig(name)
|
|
|
+ if (!mcp) {
|
|
|
+ log.error("MCP config not found or invalid", { name })
|
|
|
+ return
|
|
|
+ }
|
|
|
+ yield* createAndStore(name, { ...mcp, enabled: true })
|
|
|
+ })
|
|
|
|
|
|
- return Object.entries((await fetchResourcesForClient(clientName, client)) ?? {})
|
|
|
- }),
|
|
|
+ const disconnect = Effect.fn("MCP.disconnect")(function* (name: string) {
|
|
|
+ const s = yield* InstanceState.get(cache)
|
|
|
+ yield* closeClient(s, name)
|
|
|
+ delete s.clients[name]
|
|
|
+ s.status[name] = { status: "disabled" }
|
|
|
+ })
|
|
|
+
|
|
|
+ const tools = Effect.fn("MCP.tools")(function* () {
|
|
|
+ const result: Record<string, Tool> = {}
|
|
|
+ const s = yield* InstanceState.get(cache)
|
|
|
+ const cfg = yield* Effect.promise(() => Config.get())
|
|
|
+ const config = cfg.mcp ?? {}
|
|
|
+ const defaultTimeout = cfg.experimental?.mcp_timeout
|
|
|
+
|
|
|
+ const connectedClients = Object.entries(s.clients).filter(
|
|
|
+ ([clientName]) => s.status[clientName]?.status === "connected",
|
|
|
)
|
|
|
- ).flat(),
|
|
|
- )
|
|
|
|
|
|
- return result
|
|
|
- }
|
|
|
+ yield* Effect.forEach(
|
|
|
+ connectedClients,
|
|
|
+ ([clientName, client]) =>
|
|
|
+ Effect.gen(function* () {
|
|
|
+ const mcpConfig = config[clientName]
|
|
|
+ const entry = mcpConfig && isMcpConfigured(mcpConfig) ? mcpConfig : undefined
|
|
|
+
|
|
|
+ const listed = s.defs[clientName]
|
|
|
+ if (!listed) {
|
|
|
+ log.warn("missing cached tools for connected server", { clientName })
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ const timeout = entry?.timeout ?? defaultTimeout
|
|
|
+ for (const mcpTool of listed) {
|
|
|
+ const sanitizedClientName = clientName.replace(/[^a-zA-Z0-9_-]/g, "_")
|
|
|
+ const sanitizedToolName = mcpTool.name.replace(/[^a-zA-Z0-9_-]/g, "_")
|
|
|
+ result[sanitizedClientName + "_" + sanitizedToolName] = convertMcpTool(mcpTool, client, timeout)
|
|
|
+ }
|
|
|
+ }),
|
|
|
+ { concurrency: "unbounded" },
|
|
|
+ )
|
|
|
+ return result
|
|
|
+ })
|
|
|
|
|
|
- export async function getPrompt(clientName: string, name: string, args?: Record<string, string>) {
|
|
|
- const clientsSnapshot = await clients()
|
|
|
- const client = clientsSnapshot[clientName]
|
|
|
+ function collectFromConnected<T>(
|
|
|
+ s: State,
|
|
|
+ fetchFn: (clientName: string, client: Client) => Promise<Record<string, T> | undefined>,
|
|
|
+ ) {
|
|
|
+ return Effect.forEach(
|
|
|
+ Object.entries(s.clients).filter(([name]) => s.status[name]?.status === "connected"),
|
|
|
+ ([clientName, client]) =>
|
|
|
+ Effect.promise(async () => Object.entries((await fetchFn(clientName, client)) ?? {})),
|
|
|
+ { concurrency: "unbounded" },
|
|
|
+ ).pipe(Effect.map((results) => Object.fromEntries<T>(results.flat())))
|
|
|
+ }
|
|
|
|
|
|
- if (!client) {
|
|
|
- log.warn("client not found for prompt", {
|
|
|
- clientName,
|
|
|
+ const prompts = Effect.fn("MCP.prompts")(function* () {
|
|
|
+ const s = yield* InstanceState.get(cache)
|
|
|
+ return yield* collectFromConnected(s, (name, client) =>
|
|
|
+ fetchFromClient(name, client, (c) => c.listPrompts().then((r) => r.prompts), "prompts"),
|
|
|
+ )
|
|
|
})
|
|
|
- return undefined
|
|
|
- }
|
|
|
|
|
|
- const result = await client
|
|
|
- .getPrompt({
|
|
|
- name: name,
|
|
|
- arguments: args,
|
|
|
+ const resources = Effect.fn("MCP.resources")(function* () {
|
|
|
+ const s = yield* InstanceState.get(cache)
|
|
|
+ return yield* collectFromConnected(s, (name, client) =>
|
|
|
+ fetchFromClient(name, client, (c) => c.listResources().then((r) => r.resources), "resources"),
|
|
|
+ )
|
|
|
})
|
|
|
- .catch((e) => {
|
|
|
- log.error("failed to get prompt from MCP server", {
|
|
|
- clientName,
|
|
|
+
|
|
|
+ const withClient = Effect.fnUntraced(function* <A>(
|
|
|
+ clientName: string,
|
|
|
+ fn: (client: MCPClient) => Promise<A>,
|
|
|
+ label: string,
|
|
|
+ meta?: Record<string, unknown>,
|
|
|
+ ) {
|
|
|
+ const s = yield* InstanceState.get(cache)
|
|
|
+ const client = s.clients[clientName]
|
|
|
+ if (!client) {
|
|
|
+ log.warn(`client not found for ${label}`, { clientName })
|
|
|
+ return undefined
|
|
|
+ }
|
|
|
+ return yield* Effect.tryPromise({
|
|
|
+ try: () => fn(client),
|
|
|
+ catch: (e: any) => {
|
|
|
+ log.error(`failed to ${label}`, { clientName, ...meta, error: e?.message })
|
|
|
+ return e
|
|
|
+ },
|
|
|
+ }).pipe(Effect.orElseSucceed(() => undefined))
|
|
|
+ })
|
|
|
+
|
|
|
+ const getPrompt = Effect.fn("MCP.getPrompt")(function* (
|
|
|
+ clientName: string,
|
|
|
+ name: string,
|
|
|
+ args?: Record<string, string>,
|
|
|
+ ) {
|
|
|
+ return yield* withClient(clientName, (client) => client.getPrompt({ name, arguments: args }), "getPrompt", {
|
|
|
promptName: name,
|
|
|
- error: e.message,
|
|
|
})
|
|
|
- return undefined
|
|
|
})
|
|
|
|
|
|
- return result
|
|
|
- }
|
|
|
-
|
|
|
- export async function readResource(clientName: string, resourceUri: string) {
|
|
|
- const clientsSnapshot = await clients()
|
|
|
- const client = clientsSnapshot[clientName]
|
|
|
-
|
|
|
- if (!client) {
|
|
|
- log.warn("client not found for prompt", {
|
|
|
- clientName: clientName,
|
|
|
+ const readResource = Effect.fn("MCP.readResource")(function* (clientName: string, resourceUri: string) {
|
|
|
+ return yield* withClient(clientName, (client) => client.readResource({ uri: resourceUri }), "readResource", {
|
|
|
+ resourceUri,
|
|
|
+ })
|
|
|
})
|
|
|
- return undefined
|
|
|
- }
|
|
|
|
|
|
- const result = await client
|
|
|
- .readResource({
|
|
|
- uri: resourceUri,
|
|
|
+ const getMcpConfig = Effect.fnUntraced(function* (mcpName: string) {
|
|
|
+ const cfg = yield* Effect.promise(() => Config.get())
|
|
|
+ const mcpConfig = cfg.mcp?.[mcpName]
|
|
|
+ if (!mcpConfig || !isMcpConfigured(mcpConfig)) return undefined
|
|
|
+ return mcpConfig
|
|
|
})
|
|
|
- .catch((e) => {
|
|
|
- log.error("failed to get prompt from MCP server", {
|
|
|
- clientName: clientName,
|
|
|
- resourceUri: resourceUri,
|
|
|
- error: e.message,
|
|
|
+
|
|
|
+ const startAuth = Effect.fn("MCP.startAuth")(function* (mcpName: string) {
|
|
|
+ const mcpConfig = yield* getMcpConfig(mcpName)
|
|
|
+ if (!mcpConfig) throw new Error(`MCP server ${mcpName} not found or disabled`)
|
|
|
+ if (mcpConfig.type !== "remote") throw new Error(`MCP server ${mcpName} is not a remote server`)
|
|
|
+ if (mcpConfig.oauth === false) throw new Error(`MCP server ${mcpName} has OAuth explicitly disabled`)
|
|
|
+
|
|
|
+ yield* Effect.promise(() => McpOAuthCallback.ensureRunning())
|
|
|
+
|
|
|
+ const oauthState = Array.from(crypto.getRandomValues(new Uint8Array(32)))
|
|
|
+ .map((b) => b.toString(16).padStart(2, "0"))
|
|
|
+ .join("")
|
|
|
+ yield* auth.updateOAuthState(mcpName, oauthState)
|
|
|
+ const oauthConfig = typeof mcpConfig.oauth === "object" ? mcpConfig.oauth : undefined
|
|
|
+ let capturedUrl: URL | undefined
|
|
|
+ const authProvider = new McpOAuthProvider(
|
|
|
+ mcpName,
|
|
|
+ mcpConfig.url,
|
|
|
+ {
|
|
|
+ clientId: oauthConfig?.clientId,
|
|
|
+ clientSecret: oauthConfig?.clientSecret,
|
|
|
+ scope: oauthConfig?.scope,
|
|
|
+ },
|
|
|
+ {
|
|
|
+ onRedirect: async (url) => {
|
|
|
+ capturedUrl = url
|
|
|
+ },
|
|
|
+ },
|
|
|
+ )
|
|
|
+
|
|
|
+ const transport = new StreamableHTTPClientTransport(new URL(mcpConfig.url), { authProvider })
|
|
|
+
|
|
|
+ return yield* Effect.promise(async () => {
|
|
|
+ try {
|
|
|
+ const client = new Client({ name: "opencode", version: Installation.VERSION })
|
|
|
+ await client.connect(transport)
|
|
|
+ return { authorizationUrl: "", oauthState }
|
|
|
+ } catch (error) {
|
|
|
+ if (error instanceof UnauthorizedError && capturedUrl) {
|
|
|
+ pendingOAuthTransports.set(mcpName, transport)
|
|
|
+ return { authorizationUrl: capturedUrl.toString(), oauthState }
|
|
|
+ }
|
|
|
+ throw error
|
|
|
+ }
|
|
|
})
|
|
|
- return undefined
|
|
|
})
|
|
|
|
|
|
- return result
|
|
|
- }
|
|
|
+ const authenticate = Effect.fn("MCP.authenticate")(function* (mcpName: string) {
|
|
|
+ const { authorizationUrl, oauthState } = yield* startAuth(mcpName)
|
|
|
+ if (!authorizationUrl) return { status: "connected" } as Status
|
|
|
+
|
|
|
+ log.info("opening browser for oauth", { mcpName, url: authorizationUrl, state: oauthState })
|
|
|
+
|
|
|
+ const callbackPromise = McpOAuthCallback.waitForCallback(oauthState, mcpName)
|
|
|
+
|
|
|
+ yield* Effect.tryPromise(() => open(authorizationUrl)).pipe(
|
|
|
+ Effect.flatMap((subprocess) =>
|
|
|
+ Effect.callback<void, Error>((resume) => {
|
|
|
+ const timer = setTimeout(() => resume(Effect.void), 500)
|
|
|
+ subprocess.on("error", (err) => {
|
|
|
+ clearTimeout(timer)
|
|
|
+ resume(Effect.fail(err))
|
|
|
+ })
|
|
|
+ subprocess.on("exit", (code) => {
|
|
|
+ if (code !== null && code !== 0) {
|
|
|
+ clearTimeout(timer)
|
|
|
+ resume(Effect.fail(new Error(`Browser open failed with exit code ${code}`)))
|
|
|
+ }
|
|
|
+ })
|
|
|
+ }),
|
|
|
+ ),
|
|
|
+ Effect.catch(() => {
|
|
|
+ log.warn("failed to open browser, user must open URL manually", { mcpName })
|
|
|
+ return Effect.promise(() => Bus.publish(BrowserOpenFailed, { mcpName, url: authorizationUrl }))
|
|
|
+ }),
|
|
|
+ )
|
|
|
|
|
|
- /**
|
|
|
- * Start OAuth authentication flow for an MCP server.
|
|
|
- * Returns the authorization URL that should be opened in a browser.
|
|
|
- */
|
|
|
- export async function startAuth(mcpName: string): Promise<{ authorizationUrl: string }> {
|
|
|
- const cfg = await Config.get()
|
|
|
- const mcpConfig = cfg.mcp?.[mcpName]
|
|
|
+ const code = yield* Effect.promise(() => callbackPromise)
|
|
|
|
|
|
- if (!mcpConfig) {
|
|
|
- throw new Error(`MCP server not found: ${mcpName}`)
|
|
|
- }
|
|
|
+ const storedState = yield* auth.getOAuthState(mcpName)
|
|
|
+ if (storedState !== oauthState) {
|
|
|
+ yield* auth.clearOAuthState(mcpName)
|
|
|
+ throw new Error("OAuth state mismatch - potential CSRF attack")
|
|
|
+ }
|
|
|
+ yield* auth.clearOAuthState(mcpName)
|
|
|
+ return yield* finishAuth(mcpName, code)
|
|
|
+ })
|
|
|
|
|
|
- if (!isMcpConfigured(mcpConfig)) {
|
|
|
- throw new Error(`MCP server ${mcpName} is disabled or missing configuration`)
|
|
|
- }
|
|
|
+ const finishAuth = Effect.fn("MCP.finishAuth")(function* (mcpName: string, authorizationCode: string) {
|
|
|
+ const transport = pendingOAuthTransports.get(mcpName)
|
|
|
+ if (!transport) throw new Error(`No pending OAuth flow for MCP server: ${mcpName}`)
|
|
|
|
|
|
- if (mcpConfig.type !== "remote") {
|
|
|
- throw new Error(`MCP server ${mcpName} is not a remote server`)
|
|
|
- }
|
|
|
+ const result = yield* Effect.tryPromise({
|
|
|
+ try: async () => {
|
|
|
+ await transport.finishAuth(authorizationCode)
|
|
|
+ return true
|
|
|
+ },
|
|
|
+ catch: (error) => {
|
|
|
+ log.error("failed to finish oauth", { mcpName, error })
|
|
|
+ return error
|
|
|
+ },
|
|
|
+ }).pipe(Effect.option)
|
|
|
|
|
|
- if (mcpConfig.oauth === false) {
|
|
|
- throw new Error(`MCP server ${mcpName} has OAuth explicitly disabled`)
|
|
|
- }
|
|
|
+ if (Option.isNone(result)) {
|
|
|
+ return { status: "failed", error: "OAuth completion failed" } as Status
|
|
|
+ }
|
|
|
|
|
|
- // Start the callback server
|
|
|
- await McpOAuthCallback.ensureRunning()
|
|
|
-
|
|
|
- // Generate and store a cryptographically secure state parameter BEFORE creating the provider
|
|
|
- // The SDK will call provider.state() to read this value
|
|
|
- const oauthState = Array.from(crypto.getRandomValues(new Uint8Array(32)))
|
|
|
- .map((b) => b.toString(16).padStart(2, "0"))
|
|
|
- .join("")
|
|
|
- await McpAuth.updateOAuthState(mcpName, oauthState)
|
|
|
-
|
|
|
- // Create a new auth provider for this flow
|
|
|
- // OAuth config is optional - if not provided, we'll use auto-discovery
|
|
|
- const oauthConfig = typeof mcpConfig.oauth === "object" ? mcpConfig.oauth : undefined
|
|
|
- let capturedUrl: URL | undefined
|
|
|
- const authProvider = new McpOAuthProvider(
|
|
|
- mcpName,
|
|
|
- mcpConfig.url,
|
|
|
- {
|
|
|
- clientId: oauthConfig?.clientId,
|
|
|
- clientSecret: oauthConfig?.clientSecret,
|
|
|
- scope: oauthConfig?.scope,
|
|
|
- },
|
|
|
- {
|
|
|
- onRedirect: async (url) => {
|
|
|
- capturedUrl = url
|
|
|
- },
|
|
|
- },
|
|
|
- )
|
|
|
+ yield* auth.clearCodeVerifier(mcpName)
|
|
|
+ pendingOAuthTransports.delete(mcpName)
|
|
|
|
|
|
- // Create transport with auth provider
|
|
|
- const transport = new StreamableHTTPClientTransport(new URL(mcpConfig.url), {
|
|
|
- authProvider,
|
|
|
- })
|
|
|
+ const mcpConfig = yield* getMcpConfig(mcpName)
|
|
|
+ if (!mcpConfig) return { status: "failed", error: "MCP config not found after auth" } as Status
|
|
|
|
|
|
- // Try to connect - this will trigger the OAuth flow
|
|
|
- try {
|
|
|
- const client = new Client({
|
|
|
- name: "opencode",
|
|
|
- version: Installation.VERSION,
|
|
|
+ return yield* createAndStore(mcpName, mcpConfig)
|
|
|
})
|
|
|
- await client.connect(transport)
|
|
|
- // If we get here, we're already authenticated
|
|
|
- return { authorizationUrl: "" }
|
|
|
- } catch (error) {
|
|
|
- if (error instanceof UnauthorizedError && capturedUrl) {
|
|
|
- // Store transport for finishAuth
|
|
|
- pendingOAuthTransports.set(mcpName, transport)
|
|
|
- return { authorizationUrl: capturedUrl.toString() }
|
|
|
- }
|
|
|
- throw error
|
|
|
- }
|
|
|
- }
|
|
|
|
|
|
- /**
|
|
|
- * Complete OAuth authentication after user authorizes in browser.
|
|
|
- * Opens the browser and waits for callback.
|
|
|
- */
|
|
|
- export async function authenticate(mcpName: string): Promise<Status> {
|
|
|
- const { authorizationUrl } = await startAuth(mcpName)
|
|
|
-
|
|
|
- if (!authorizationUrl) {
|
|
|
- // Already authenticated
|
|
|
- const s = await state()
|
|
|
- return s.status[mcpName] ?? { status: "connected" }
|
|
|
- }
|
|
|
+ const removeAuth = Effect.fn("MCP.removeAuth")(function* (mcpName: string) {
|
|
|
+ yield* auth.remove(mcpName)
|
|
|
+ McpOAuthCallback.cancelPending(mcpName)
|
|
|
+ pendingOAuthTransports.delete(mcpName)
|
|
|
+ log.info("removed oauth credentials", { mcpName })
|
|
|
+ })
|
|
|
|
|
|
- // Get the state that was already generated and stored in startAuth()
|
|
|
- const oauthState = await McpAuth.getOAuthState(mcpName)
|
|
|
- if (!oauthState) {
|
|
|
- throw new Error("OAuth state not found - this should not happen")
|
|
|
- }
|
|
|
+ const supportsOAuth = Effect.fn("MCP.supportsOAuth")(function* (mcpName: string) {
|
|
|
+ const mcpConfig = yield* getMcpConfig(mcpName)
|
|
|
+ if (!mcpConfig) return false
|
|
|
+ return mcpConfig.type === "remote" && mcpConfig.oauth !== false
|
|
|
+ })
|
|
|
|
|
|
- // The SDK has already added the state parameter to the authorization URL
|
|
|
- // We just need to open the browser
|
|
|
- log.info("opening browser for oauth", { mcpName, url: authorizationUrl, state: oauthState })
|
|
|
-
|
|
|
- // Register the callback BEFORE opening the browser to avoid race condition
|
|
|
- // when the IdP has an active SSO session and redirects immediately
|
|
|
- const callbackPromise = McpOAuthCallback.waitForCallback(oauthState)
|
|
|
-
|
|
|
- try {
|
|
|
- const subprocess = await open(authorizationUrl)
|
|
|
- // The open package spawns a detached process and returns immediately.
|
|
|
- // We need to listen for errors which fire asynchronously:
|
|
|
- // - "error" event: command not found (ENOENT)
|
|
|
- // - "exit" with non-zero code: command exists but failed (e.g., no display)
|
|
|
- await new Promise<void>((resolve, reject) => {
|
|
|
- // Give the process a moment to fail if it's going to
|
|
|
- const timeout = setTimeout(() => resolve(), 500)
|
|
|
- subprocess.on("error", (error) => {
|
|
|
- clearTimeout(timeout)
|
|
|
- reject(error)
|
|
|
- })
|
|
|
- subprocess.on("exit", (code) => {
|
|
|
- if (code !== null && code !== 0) {
|
|
|
- clearTimeout(timeout)
|
|
|
- reject(new Error(`Browser open failed with exit code ${code}`))
|
|
|
- }
|
|
|
- })
|
|
|
+ const hasStoredTokens = Effect.fn("MCP.hasStoredTokens")(function* (mcpName: string) {
|
|
|
+ const entry = yield* auth.get(mcpName)
|
|
|
+ return !!entry?.tokens
|
|
|
})
|
|
|
- } catch (error) {
|
|
|
- // Browser opening failed (e.g., in remote/headless sessions like SSH, devcontainers)
|
|
|
- // Emit event so CLI can display the URL for manual opening
|
|
|
- log.warn("failed to open browser, user must open URL manually", { mcpName, error })
|
|
|
- Bus.publish(BrowserOpenFailed, { mcpName, url: authorizationUrl })
|
|
|
- }
|
|
|
|
|
|
- // Wait for callback using the already-registered promise
|
|
|
- const code = await callbackPromise
|
|
|
+ const getAuthStatus = Effect.fn("MCP.getAuthStatus")(function* (mcpName: string) {
|
|
|
+ const entry = yield* auth.get(mcpName)
|
|
|
+ if (!entry?.tokens) return "not_authenticated" as AuthStatus
|
|
|
+ const expired = yield* auth.isTokenExpired(mcpName)
|
|
|
+ return (expired ? "expired" : "authenticated") as AuthStatus
|
|
|
+ })
|
|
|
|
|
|
- // Validate and clear the state
|
|
|
- const storedState = await McpAuth.getOAuthState(mcpName)
|
|
|
- if (storedState !== oauthState) {
|
|
|
- await McpAuth.clearOAuthState(mcpName)
|
|
|
- throw new Error("OAuth state mismatch - potential CSRF attack")
|
|
|
- }
|
|
|
+ return Service.of({
|
|
|
+ status,
|
|
|
+ clients,
|
|
|
+ tools,
|
|
|
+ prompts,
|
|
|
+ resources,
|
|
|
+ add,
|
|
|
+ connect,
|
|
|
+ disconnect,
|
|
|
+ getPrompt,
|
|
|
+ readResource,
|
|
|
+ startAuth,
|
|
|
+ authenticate,
|
|
|
+ finishAuth,
|
|
|
+ removeAuth,
|
|
|
+ supportsOAuth,
|
|
|
+ hasStoredTokens,
|
|
|
+ getAuthStatus,
|
|
|
+ })
|
|
|
+ }),
|
|
|
+ )
|
|
|
|
|
|
- await McpAuth.clearOAuthState(mcpName)
|
|
|
+ export type AuthStatus = "authenticated" | "expired" | "not_authenticated"
|
|
|
|
|
|
- // Finish auth
|
|
|
- return finishAuth(mcpName, code)
|
|
|
- }
|
|
|
+ // --- Per-service runtime ---
|
|
|
|
|
|
- /**
|
|
|
- * Complete OAuth authentication with the authorization code.
|
|
|
- */
|
|
|
- export async function finishAuth(mcpName: string, authorizationCode: string): Promise<Status> {
|
|
|
- const transport = pendingOAuthTransports.get(mcpName)
|
|
|
+ const defaultLayer = layer.pipe(
|
|
|
+ Layer.provide(McpAuth.layer),
|
|
|
+ Layer.provide(CrossSpawnSpawner.layer),
|
|
|
+ Layer.provide(AppFileSystem.defaultLayer),
|
|
|
+ Layer.provide(NodeFileSystem.layer),
|
|
|
+ Layer.provide(NodePath.layer),
|
|
|
+ )
|
|
|
|
|
|
- if (!transport) {
|
|
|
- throw new Error(`No pending OAuth flow for MCP server: ${mcpName}`)
|
|
|
- }
|
|
|
+ const { runPromise } = makeRuntime(Service, defaultLayer)
|
|
|
|
|
|
- try {
|
|
|
- // Call finishAuth on the transport
|
|
|
- await transport.finishAuth(authorizationCode)
|
|
|
+ // --- Async facade functions ---
|
|
|
|
|
|
- // Clear the code verifier after successful auth
|
|
|
- await McpAuth.clearCodeVerifier(mcpName)
|
|
|
+ export const status = async () => runPromise((svc) => svc.status())
|
|
|
|
|
|
- // Now try to reconnect
|
|
|
- const cfg = await Config.get()
|
|
|
- const mcpConfig = cfg.mcp?.[mcpName]
|
|
|
+ export const clients = async () => runPromise((svc) => svc.clients())
|
|
|
|
|
|
- if (!mcpConfig) {
|
|
|
- throw new Error(`MCP server not found: ${mcpName}`)
|
|
|
- }
|
|
|
+ export const tools = async () => runPromise((svc) => svc.tools())
|
|
|
|
|
|
- if (!isMcpConfigured(mcpConfig)) {
|
|
|
- throw new Error(`MCP server ${mcpName} is disabled or missing configuration`)
|
|
|
- }
|
|
|
+ export const prompts = async () => runPromise((svc) => svc.prompts())
|
|
|
|
|
|
- // Re-add the MCP server to establish connection
|
|
|
- pendingOAuthTransports.delete(mcpName)
|
|
|
- const result = await add(mcpName, mcpConfig)
|
|
|
+ export const resources = async () => runPromise((svc) => svc.resources())
|
|
|
|
|
|
- const statusRecord = result.status as Record<string, Status>
|
|
|
- return statusRecord[mcpName] ?? { status: "failed", error: "Unknown error after auth" }
|
|
|
- } catch (error) {
|
|
|
- log.error("failed to finish oauth", { mcpName, error })
|
|
|
- return {
|
|
|
- status: "failed",
|
|
|
- error: error instanceof Error ? error.message : String(error),
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
+ export const add = async (name: string, mcp: Config.Mcp) => runPromise((svc) => svc.add(name, mcp))
|
|
|
|
|
|
- /**
|
|
|
- * Remove OAuth credentials for an MCP server.
|
|
|
- */
|
|
|
- export async function removeAuth(mcpName: string): Promise<void> {
|
|
|
- await McpAuth.remove(mcpName)
|
|
|
- McpOAuthCallback.cancelPending(mcpName)
|
|
|
- pendingOAuthTransports.delete(mcpName)
|
|
|
- await McpAuth.clearOAuthState(mcpName)
|
|
|
- log.info("removed oauth credentials", { mcpName })
|
|
|
- }
|
|
|
+ export const connect = async (name: string) => runPromise((svc) => svc.connect(name))
|
|
|
|
|
|
- /**
|
|
|
- * Check if an MCP server supports OAuth (remote servers support OAuth by default unless explicitly disabled).
|
|
|
- */
|
|
|
- export async function supportsOAuth(mcpName: string): Promise<boolean> {
|
|
|
- const cfg = await Config.get()
|
|
|
- const mcpConfig = cfg.mcp?.[mcpName]
|
|
|
- if (!mcpConfig) return false
|
|
|
- if (!isMcpConfigured(mcpConfig)) return false
|
|
|
- return mcpConfig.type === "remote" && mcpConfig.oauth !== false
|
|
|
- }
|
|
|
+ export const disconnect = async (name: string) => runPromise((svc) => svc.disconnect(name))
|
|
|
|
|
|
- /**
|
|
|
- * Check if an MCP server has stored OAuth tokens.
|
|
|
- */
|
|
|
- export async function hasStoredTokens(mcpName: string): Promise<boolean> {
|
|
|
- const entry = await McpAuth.get(mcpName)
|
|
|
- return !!entry?.tokens
|
|
|
- }
|
|
|
+ export const getPrompt = async (clientName: string, name: string, args?: Record<string, string>) =>
|
|
|
+ runPromise((svc) => svc.getPrompt(clientName, name, args))
|
|
|
|
|
|
- export type AuthStatus = "authenticated" | "expired" | "not_authenticated"
|
|
|
+ export const readResource = async (clientName: string, resourceUri: string) =>
|
|
|
+ runPromise((svc) => svc.readResource(clientName, resourceUri))
|
|
|
|
|
|
- /**
|
|
|
- * Get the authentication status for an MCP server.
|
|
|
- */
|
|
|
- export async function getAuthStatus(mcpName: string): Promise<AuthStatus> {
|
|
|
- const hasTokens = await hasStoredTokens(mcpName)
|
|
|
- if (!hasTokens) return "not_authenticated"
|
|
|
- const expired = await McpAuth.isTokenExpired(mcpName)
|
|
|
- return expired ? "expired" : "authenticated"
|
|
|
- }
|
|
|
+ export const startAuth = async (mcpName: string) => runPromise((svc) => svc.startAuth(mcpName))
|
|
|
+
|
|
|
+ export const authenticate = async (mcpName: string) => runPromise((svc) => svc.authenticate(mcpName))
|
|
|
+
|
|
|
+ export const finishAuth = async (mcpName: string, authorizationCode: string) =>
|
|
|
+ runPromise((svc) => svc.finishAuth(mcpName, authorizationCode))
|
|
|
+
|
|
|
+ export const removeAuth = async (mcpName: string) => runPromise((svc) => svc.removeAuth(mcpName))
|
|
|
+
|
|
|
+ export const supportsOAuth = async (mcpName: string) => runPromise((svc) => svc.supportsOAuth(mcpName))
|
|
|
+
|
|
|
+ export const hasStoredTokens = async (mcpName: string) => runPromise((svc) => svc.hasStoredTokens(mcpName))
|
|
|
+
|
|
|
+ export const getAuthStatus = async (mcpName: string) => runPromise((svc) => svc.getAuthStatus(mcpName))
|
|
|
}
|