import { describe, expect, test } from "bun:test" import { ACP } from "../../src/acp/agent" import type { AgentSideConnection } from "@agentclientprotocol/sdk" import type { Event } from "@opencode-ai/sdk/v2" import { Instance } from "../../src/project/instance" import { tmpdir } from "../fixture/fixture" type SessionUpdateParams = Parameters[0] type RequestPermissionParams = Parameters[0] type RequestPermissionResult = Awaited> type GlobalEventEnvelope = { directory?: string payload?: Event } type EventController = { push: (event: GlobalEventEnvelope) => void close: () => void } function createEventStream() { const queue: GlobalEventEnvelope[] = [] const waiters: Array<(value: GlobalEventEnvelope | undefined) => void> = [] const state = { closed: false } const push = (event: GlobalEventEnvelope) => { const waiter = waiters.shift() if (waiter) { waiter(event) return } queue.push(event) } const close = () => { state.closed = true for (const waiter of waiters.splice(0)) { waiter(undefined) } } const stream = async function* (signal?: AbortSignal) { while (true) { if (signal?.aborted) return const next = queue.shift() if (next) { yield next continue } if (state.closed) return const value = await new Promise((resolve) => { waiters.push(resolve) if (!signal) return signal.addEventListener("abort", () => resolve(undefined), { once: true }) }) if (!value) return yield value } } return { controller: { push, close } satisfies EventController, stream } } function createFakeAgent() { const updates = new Map() const chunks = new Map() const record = (sessionId: string, type: string) => { const list = updates.get(sessionId) ?? [] list.push(type) updates.set(sessionId, list) } const connection = { async sessionUpdate(params: SessionUpdateParams) { const update = params.update const type = update?.sessionUpdate ?? "unknown" record(params.sessionId, type) if (update?.sessionUpdate === "agent_message_chunk") { const content = update.content if (content?.type !== "text") return if (typeof content.text !== "string") return chunks.set(params.sessionId, (chunks.get(params.sessionId) ?? "") + content.text) } }, async requestPermission(_params: RequestPermissionParams): Promise { return { outcome: { outcome: "selected", optionId: "once" } } as RequestPermissionResult }, } as unknown as AgentSideConnection const { controller, stream } = createEventStream() const calls = { eventSubscribe: 0, sessionCreate: 0, } const sdk = { global: { event: async (opts?: { signal?: AbortSignal }) => { calls.eventSubscribe++ return { stream: stream(opts?.signal) } }, }, session: { create: async (_params?: any) => { calls.sessionCreate++ return { data: { id: `ses_${calls.sessionCreate}`, time: { created: new Date().toISOString() }, }, } }, get: async (_params?: any) => { return { data: { id: "ses_1", time: { created: new Date().toISOString() }, }, } }, messages: async () => { return { data: [] } }, message: async () => { return { data: { info: { role: "assistant", }, }, } }, }, permission: { respond: async () => { return { data: true } }, }, config: { providers: async () => { return { data: { providers: [ { id: "opencode", name: "opencode", models: { "big-pickle": { id: "big-pickle", name: "big-pickle" }, }, }, ], }, } }, }, app: { agents: async () => { return { data: [ { name: "build", description: "build", mode: "agent", }, ], } }, }, command: { list: async () => { return { data: [] } }, }, mcp: { add: async () => { return { data: true } }, }, } as any const agent = new ACP.Agent(connection, { sdk, defaultModel: { providerID: "opencode", modelID: "big-pickle" }, } as any) const stop = () => { controller.close() ;(agent as any).eventAbort.abort() } return { agent, controller, calls, updates, chunks, stop, sdk, connection } } describe("acp.agent event subscription", () => { test("routes message.part.updated by the event sessionID (no cross-session pollution)", async () => { await using tmp = await tmpdir() await Instance.provide({ directory: tmp.path, fn: async () => { const { agent, controller, updates, stop } = createFakeAgent() const cwd = "/tmp/opencode-acp-test" const sessionA = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId) const sessionB = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId) controller.push({ directory: cwd, payload: { type: "message.part.updated", properties: { part: { sessionID: sessionB, messageID: "msg_1", type: "text", synthetic: false, }, delta: "hello", }, }, } as any) await new Promise((r) => setTimeout(r, 10)) expect((updates.get(sessionA) ?? []).includes("agent_message_chunk")).toBe(false) expect((updates.get(sessionB) ?? []).includes("agent_message_chunk")).toBe(true) stop() }, }) }) test("keeps concurrent sessions isolated when message.part.updated events are interleaved", async () => { await using tmp = await tmpdir() await Instance.provide({ directory: tmp.path, fn: async () => { const { agent, controller, chunks, stop } = createFakeAgent() const cwd = "/tmp/opencode-acp-test" const sessionA = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId) const sessionB = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId) const tokenA = ["ALPHA_", "111", "_X"] const tokenB = ["BETA_", "222", "_Y"] const push = (sessionId: string, messageID: string, delta: string) => { controller.push({ directory: cwd, payload: { type: "message.part.updated", properties: { part: { sessionID: sessionId, messageID, type: "text", synthetic: false, }, delta, }, }, } as any) } push(sessionA, "msg_a", tokenA[0]) push(sessionB, "msg_b", tokenB[0]) push(sessionA, "msg_a", tokenA[1]) push(sessionB, "msg_b", tokenB[1]) push(sessionA, "msg_a", tokenA[2]) push(sessionB, "msg_b", tokenB[2]) await new Promise((r) => setTimeout(r, 20)) const a = chunks.get(sessionA) ?? "" const b = chunks.get(sessionB) ?? "" expect(a).toContain(tokenA.join("")) expect(b).toContain(tokenB.join("")) for (const part of tokenB) expect(a).not.toContain(part) for (const part of tokenA) expect(b).not.toContain(part) stop() }, }) }) test("does not create additional event subscriptions on repeated loadSession()", async () => { await using tmp = await tmpdir() await Instance.provide({ directory: tmp.path, fn: async () => { const { agent, calls, stop } = createFakeAgent() const cwd = "/tmp/opencode-acp-test" const sessionId = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId) await agent.loadSession({ sessionId, cwd, mcpServers: [] } as any) await agent.loadSession({ sessionId, cwd, mcpServers: [] } as any) await agent.loadSession({ sessionId, cwd, mcpServers: [] } as any) await agent.loadSession({ sessionId, cwd, mcpServers: [] } as any) expect(calls.eventSubscribe).toBe(1) stop() }, }) }) test("permission.asked events are handled and replied", async () => { await using tmp = await tmpdir() await Instance.provide({ directory: tmp.path, fn: async () => { const permissionReplies: string[] = [] const { agent, controller, stop, sdk } = createFakeAgent() sdk.permission.reply = async (params: any) => { permissionReplies.push(params.requestID) return { data: true } } const cwd = "/tmp/opencode-acp-test" const sessionA = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId) controller.push({ directory: cwd, payload: { type: "permission.asked", properties: { id: "perm_1", sessionID: sessionA, permission: "bash", patterns: ["*"], metadata: {}, always: [], }, }, } as any) await new Promise((r) => setTimeout(r, 20)) expect(permissionReplies).toContain("perm_1") stop() }, }) }) test("permission prompt on session A does not block message updates for session B", async () => { await using tmp = await tmpdir() await Instance.provide({ directory: tmp.path, fn: async () => { const permissionReplies: string[] = [] let resolvePermissionA: (() => void) | undefined const permissionABlocking = new Promise((r) => { resolvePermissionA = r }) const { agent, controller, chunks, stop, sdk, connection } = createFakeAgent() // Make permission request for session A block until we release it const originalRequestPermission = connection.requestPermission.bind(connection) let permissionCalls = 0 connection.requestPermission = async (params: RequestPermissionParams) => { permissionCalls++ if (params.sessionId.endsWith("1")) { await permissionABlocking } return originalRequestPermission(params) } sdk.permission.reply = async (params: any) => { permissionReplies.push(params.requestID) return { data: true } } const cwd = "/tmp/opencode-acp-test" const sessionA = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId) const sessionB = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId) // Push permission.asked for session A (will block) controller.push({ directory: cwd, payload: { type: "permission.asked", properties: { id: "perm_a", sessionID: sessionA, permission: "bash", patterns: ["*"], metadata: {}, always: [], }, }, } as any) // Give time for permission handling to start await new Promise((r) => setTimeout(r, 10)) // Push message for session B while A's permission is pending controller.push({ directory: cwd, payload: { type: "message.part.updated", properties: { part: { sessionID: sessionB, messageID: "msg_b", type: "text", synthetic: false, }, delta: "session_b_message", }, }, } as any) // Wait for session B's message to be processed await new Promise((r) => setTimeout(r, 20)) // Session B should have received message even though A's permission is still pending expect(chunks.get(sessionB) ?? "").toContain("session_b_message") expect(permissionReplies).not.toContain("perm_a") // Release session A's permission resolvePermissionA!() await new Promise((r) => setTimeout(r, 20)) // Now session A's permission should be replied expect(permissionReplies).toContain("perm_a") stop() }, }) }) })