| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683 |
- import { describe, expect, test } from "bun:test"
- import { ACP } from "../../src/acp/agent"
- import type { AgentSideConnection } from "@agentclientprotocol/sdk"
- import type { Event, EventMessagePartUpdated, ToolStatePending, ToolStateRunning } from "@opencode-ai/sdk/v2"
- import { Instance } from "../../src/project/instance"
- import { tmpdir } from "../fixture/fixture"
- type SessionUpdateParams = Parameters<AgentSideConnection["sessionUpdate"]>[0]
- type RequestPermissionParams = Parameters<AgentSideConnection["requestPermission"]>[0]
- type RequestPermissionResult = Awaited<ReturnType<AgentSideConnection["requestPermission"]>>
- type GlobalEventEnvelope = {
- directory?: string
- payload?: Event
- }
- type EventController = {
- push: (event: GlobalEventEnvelope) => void
- close: () => void
- }
- function inProgressText(update: SessionUpdateParams["update"]) {
- if (update.sessionUpdate !== "tool_call_update") return undefined
- if (update.status !== "in_progress") return undefined
- if (!update.content || !Array.isArray(update.content)) return undefined
- const first = update.content[0]
- if (!first || first.type !== "content") return undefined
- if (first.content.type !== "text") return undefined
- return first.content.text
- }
- function isToolCallUpdate(
- update: SessionUpdateParams["update"],
- ): update is Extract<SessionUpdateParams["update"], { sessionUpdate: "tool_call_update" }> {
- return update.sessionUpdate === "tool_call_update"
- }
- function toolEvent(
- sessionId: string,
- cwd: string,
- opts: {
- callID: string
- tool: string
- input: Record<string, unknown>
- } & ({ status: "running"; metadata?: Record<string, unknown> } | { status: "pending"; raw: string }),
- ): GlobalEventEnvelope {
- const state: ToolStatePending | ToolStateRunning =
- opts.status === "running"
- ? {
- status: "running",
- input: opts.input,
- ...(opts.metadata && { metadata: opts.metadata }),
- time: { start: Date.now() },
- }
- : {
- status: "pending",
- input: opts.input,
- raw: opts.raw,
- }
- const payload: EventMessagePartUpdated = {
- type: "message.part.updated",
- properties: {
- part: {
- id: `part_${opts.callID}`,
- sessionID: sessionId,
- messageID: `msg_${opts.callID}`,
- type: "tool",
- callID: opts.callID,
- tool: opts.tool,
- state,
- },
- },
- }
- return { directory: cwd, payload }
- }
- function createEventStream() {
- const queue: GlobalEventEnvelope[] = []
- const waiters: Array<(value: GlobalEventEnvelope | undefined) => void> = []
- const state = { closed: false }
- const push = (event: GlobalEventEnvelope) => {
- const waiter = waiters.shift()
- if (waiter) {
- waiter(event)
- return
- }
- queue.push(event)
- }
- const close = () => {
- state.closed = true
- for (const waiter of waiters.splice(0)) {
- waiter(undefined)
- }
- }
- const stream = async function* (signal?: AbortSignal) {
- while (true) {
- if (signal?.aborted) return
- const next = queue.shift()
- if (next) {
- yield next
- continue
- }
- if (state.closed) return
- const value = await new Promise<GlobalEventEnvelope | undefined>((resolve) => {
- waiters.push(resolve)
- if (!signal) return
- signal.addEventListener("abort", () => resolve(undefined), { once: true })
- })
- if (!value) return
- yield value
- }
- }
- return { controller: { push, close } satisfies EventController, stream }
- }
- function createFakeAgent() {
- const updates = new Map<string, string[]>()
- const chunks = new Map<string, string>()
- const sessionUpdates: SessionUpdateParams[] = []
- const record = (sessionId: string, type: string) => {
- const list = updates.get(sessionId) ?? []
- list.push(type)
- updates.set(sessionId, list)
- }
- const connection = {
- async sessionUpdate(params: SessionUpdateParams) {
- sessionUpdates.push(params)
- const update = params.update
- const type = update?.sessionUpdate ?? "unknown"
- record(params.sessionId, type)
- if (update?.sessionUpdate === "agent_message_chunk") {
- const content = update.content
- if (content?.type !== "text") return
- if (typeof content.text !== "string") return
- chunks.set(params.sessionId, (chunks.get(params.sessionId) ?? "") + content.text)
- }
- },
- async requestPermission(_params: RequestPermissionParams): Promise<RequestPermissionResult> {
- return { outcome: { outcome: "selected", optionId: "once" } } as RequestPermissionResult
- },
- } as unknown as AgentSideConnection
- const { controller, stream } = createEventStream()
- const calls = {
- eventSubscribe: 0,
- sessionCreate: 0,
- }
- const sdk = {
- global: {
- event: async (opts?: { signal?: AbortSignal }) => {
- calls.eventSubscribe++
- return { stream: stream(opts?.signal) }
- },
- },
- session: {
- create: async (_params?: any) => {
- calls.sessionCreate++
- return {
- data: {
- id: `ses_${calls.sessionCreate}`,
- time: { created: new Date().toISOString() },
- },
- }
- },
- get: async (_params?: any) => {
- return {
- data: {
- id: "ses_1",
- time: { created: new Date().toISOString() },
- },
- }
- },
- messages: async () => {
- return { data: [] }
- },
- message: async (params?: any) => {
- // Return a message with parts that can be looked up by partID
- return {
- data: {
- info: {
- role: "assistant",
- },
- parts: [
- {
- id: params?.messageID ? `${params.messageID}_part` : "part_1",
- type: "text",
- text: "",
- },
- ],
- },
- }
- },
- },
- permission: {
- respond: async () => {
- return { data: true }
- },
- },
- config: {
- providers: async () => {
- return {
- data: {
- providers: [
- {
- id: "opencode",
- name: "opencode",
- models: {
- "big-pickle": { id: "big-pickle", name: "big-pickle" },
- },
- },
- ],
- },
- }
- },
- },
- app: {
- agents: async () => {
- return {
- data: [
- {
- name: "build",
- description: "build",
- mode: "agent",
- },
- ],
- }
- },
- },
- command: {
- list: async () => {
- return { data: [] }
- },
- },
- mcp: {
- add: async () => {
- return { data: true }
- },
- },
- } as any
- const agent = new ACP.Agent(connection, {
- sdk,
- defaultModel: { providerID: "opencode", modelID: "big-pickle" },
- } as any)
- const stop = () => {
- controller.close()
- ;(agent as any).eventAbort.abort()
- }
- return { agent, controller, calls, updates, chunks, sessionUpdates, stop, sdk, connection }
- }
- describe("acp.agent event subscription", () => {
- test("routes message.part.delta by the event sessionID (no cross-session pollution)", async () => {
- await using tmp = await tmpdir()
- await Instance.provide({
- directory: tmp.path,
- fn: async () => {
- const { agent, controller, updates, stop } = createFakeAgent()
- const cwd = "/tmp/opencode-acp-test"
- const sessionA = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId)
- const sessionB = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId)
- controller.push({
- directory: cwd,
- payload: {
- type: "message.part.delta",
- properties: {
- sessionID: sessionB,
- messageID: "msg_1",
- partID: "msg_1_part",
- field: "text",
- delta: "hello",
- },
- },
- } as any)
- await new Promise((r) => setTimeout(r, 10))
- expect((updates.get(sessionA) ?? []).includes("agent_message_chunk")).toBe(false)
- expect((updates.get(sessionB) ?? []).includes("agent_message_chunk")).toBe(true)
- stop()
- },
- })
- })
- test("keeps concurrent sessions isolated when message.part.delta events are interleaved", async () => {
- await using tmp = await tmpdir()
- await Instance.provide({
- directory: tmp.path,
- fn: async () => {
- const { agent, controller, chunks, stop } = createFakeAgent()
- const cwd = "/tmp/opencode-acp-test"
- const sessionA = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId)
- const sessionB = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId)
- const tokenA = ["ALPHA_", "111", "_X"]
- const tokenB = ["BETA_", "222", "_Y"]
- const push = (sessionId: string, messageID: string, delta: string) => {
- controller.push({
- directory: cwd,
- payload: {
- type: "message.part.delta",
- properties: {
- sessionID: sessionId,
- messageID,
- partID: `${messageID}_part`,
- field: "text",
- delta,
- },
- },
- } as any)
- }
- push(sessionA, "msg_a", tokenA[0])
- push(sessionB, "msg_b", tokenB[0])
- push(sessionA, "msg_a", tokenA[1])
- push(sessionB, "msg_b", tokenB[1])
- push(sessionA, "msg_a", tokenA[2])
- push(sessionB, "msg_b", tokenB[2])
- await new Promise((r) => setTimeout(r, 20))
- const a = chunks.get(sessionA) ?? ""
- const b = chunks.get(sessionB) ?? ""
- expect(a).toContain(tokenA.join(""))
- expect(b).toContain(tokenB.join(""))
- for (const part of tokenB) expect(a).not.toContain(part)
- for (const part of tokenA) expect(b).not.toContain(part)
- stop()
- },
- })
- })
- test("does not create additional event subscriptions on repeated loadSession()", async () => {
- await using tmp = await tmpdir()
- await Instance.provide({
- directory: tmp.path,
- fn: async () => {
- const { agent, calls, stop } = createFakeAgent()
- const cwd = "/tmp/opencode-acp-test"
- const sessionId = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId)
- await agent.loadSession({ sessionId, cwd, mcpServers: [] } as any)
- await agent.loadSession({ sessionId, cwd, mcpServers: [] } as any)
- await agent.loadSession({ sessionId, cwd, mcpServers: [] } as any)
- await agent.loadSession({ sessionId, cwd, mcpServers: [] } as any)
- expect(calls.eventSubscribe).toBe(1)
- stop()
- },
- })
- })
- test("permission.asked events are handled and replied", async () => {
- await using tmp = await tmpdir()
- await Instance.provide({
- directory: tmp.path,
- fn: async () => {
- const permissionReplies: string[] = []
- const { agent, controller, stop, sdk } = createFakeAgent()
- sdk.permission.reply = async (params: any) => {
- permissionReplies.push(params.requestID)
- return { data: true }
- }
- const cwd = "/tmp/opencode-acp-test"
- const sessionA = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId)
- controller.push({
- directory: cwd,
- payload: {
- type: "permission.asked",
- properties: {
- id: "perm_1",
- sessionID: sessionA,
- permission: "bash",
- patterns: ["*"],
- metadata: {},
- always: [],
- },
- },
- } as any)
- await new Promise((r) => setTimeout(r, 20))
- expect(permissionReplies).toContain("perm_1")
- stop()
- },
- })
- })
- test("permission prompt on session A does not block message updates for session B", async () => {
- await using tmp = await tmpdir()
- await Instance.provide({
- directory: tmp.path,
- fn: async () => {
- const permissionReplies: string[] = []
- let resolvePermissionA: (() => void) | undefined
- const permissionABlocking = new Promise<void>((r) => {
- resolvePermissionA = r
- })
- const { agent, controller, chunks, stop, sdk, connection } = createFakeAgent()
- // Make permission request for session A block until we release it
- const originalRequestPermission = connection.requestPermission.bind(connection)
- let permissionCalls = 0
- connection.requestPermission = async (params: RequestPermissionParams) => {
- permissionCalls++
- if (params.sessionId.endsWith("1")) {
- await permissionABlocking
- }
- return originalRequestPermission(params)
- }
- sdk.permission.reply = async (params: any) => {
- permissionReplies.push(params.requestID)
- return { data: true }
- }
- const cwd = "/tmp/opencode-acp-test"
- const sessionA = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId)
- const sessionB = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId)
- // Push permission.asked for session A (will block)
- controller.push({
- directory: cwd,
- payload: {
- type: "permission.asked",
- properties: {
- id: "perm_a",
- sessionID: sessionA,
- permission: "bash",
- patterns: ["*"],
- metadata: {},
- always: [],
- },
- },
- } as any)
- // Give time for permission handling to start
- await new Promise((r) => setTimeout(r, 10))
- // Push message for session B while A's permission is pending
- controller.push({
- directory: cwd,
- payload: {
- type: "message.part.delta",
- properties: {
- sessionID: sessionB,
- messageID: "msg_b",
- partID: "msg_b_part",
- field: "text",
- delta: "session_b_message",
- },
- },
- } as any)
- // Wait for session B's message to be processed
- await new Promise((r) => setTimeout(r, 20))
- // Session B should have received message even though A's permission is still pending
- expect(chunks.get(sessionB) ?? "").toContain("session_b_message")
- expect(permissionReplies).not.toContain("perm_a")
- // Release session A's permission
- resolvePermissionA!()
- await new Promise((r) => setTimeout(r, 20))
- // Now session A's permission should be replied
- expect(permissionReplies).toContain("perm_a")
- stop()
- },
- })
- })
- test("streams running bash output snapshots and de-dupes identical snapshots", async () => {
- await using tmp = await tmpdir()
- await Instance.provide({
- directory: tmp.path,
- fn: async () => {
- const { agent, controller, sessionUpdates, stop } = createFakeAgent()
- const cwd = "/tmp/opencode-acp-test"
- const sessionId = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId)
- const input = { command: "echo hello", description: "run command" }
- for (const output of ["a", "a", "ab"]) {
- controller.push(
- toolEvent(sessionId, cwd, {
- callID: "call_1",
- tool: "bash",
- status: "running",
- input,
- metadata: { output },
- }),
- )
- }
- await new Promise((r) => setTimeout(r, 20))
- const snapshots = sessionUpdates
- .filter((u) => u.sessionId === sessionId)
- .filter((u) => isToolCallUpdate(u.update))
- .map((u) => inProgressText(u.update))
- expect(snapshots).toEqual(["a", undefined, "ab"])
- stop()
- },
- })
- })
- test("emits synthetic pending before first running update for any tool", async () => {
- await using tmp = await tmpdir()
- await Instance.provide({
- directory: tmp.path,
- fn: async () => {
- const { agent, controller, sessionUpdates, stop } = createFakeAgent()
- const cwd = "/tmp/opencode-acp-test"
- const sessionId = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId)
- controller.push(
- toolEvent(sessionId, cwd, {
- callID: "call_bash",
- tool: "bash",
- status: "running",
- input: { command: "echo hi", description: "run command" },
- metadata: { output: "hi\n" },
- }),
- )
- controller.push(
- toolEvent(sessionId, cwd, {
- callID: "call_read",
- tool: "read",
- status: "running",
- input: { filePath: "/tmp/example.txt" },
- }),
- )
- await new Promise((r) => setTimeout(r, 20))
- const types = sessionUpdates
- .filter((u) => u.sessionId === sessionId)
- .map((u) => u.update.sessionUpdate)
- .filter((u) => u === "tool_call" || u === "tool_call_update")
- expect(types).toEqual(["tool_call", "tool_call_update", "tool_call", "tool_call_update"])
- const pendings = sessionUpdates.filter(
- (u) => u.sessionId === sessionId && u.update.sessionUpdate === "tool_call",
- )
- expect(pendings.every((p) => p.update.sessionUpdate === "tool_call" && p.update.status === "pending")).toBe(
- true,
- )
- stop()
- },
- })
- })
- test("does not emit duplicate synthetic pending after replayed running tool", async () => {
- await using tmp = await tmpdir()
- await Instance.provide({
- directory: tmp.path,
- fn: async () => {
- const { agent, controller, sessionUpdates, stop, sdk } = createFakeAgent()
- const cwd = "/tmp/opencode-acp-test"
- const sessionId = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId)
- const input = { command: "echo hi", description: "run command" }
- sdk.session.messages = async () => ({
- data: [
- {
- info: {
- role: "assistant",
- sessionID: sessionId,
- },
- parts: [
- {
- type: "tool",
- callID: "call_1",
- tool: "bash",
- state: {
- status: "running",
- input,
- metadata: { output: "hi\n" },
- time: { start: Date.now() },
- },
- },
- ],
- },
- ],
- })
- await agent.loadSession({ sessionId, cwd, mcpServers: [] } as any)
- controller.push(
- toolEvent(sessionId, cwd, {
- callID: "call_1",
- tool: "bash",
- status: "running",
- input,
- metadata: { output: "hi\nthere\n" },
- }),
- )
- await new Promise((r) => setTimeout(r, 20))
- const types = sessionUpdates
- .filter((u) => u.sessionId === sessionId)
- .map((u) => u.update)
- .filter((u) => "toolCallId" in u && u.toolCallId === "call_1")
- .map((u) => u.sessionUpdate)
- .filter((u) => u === "tool_call" || u === "tool_call_update")
- expect(types).toEqual(["tool_call", "tool_call_update", "tool_call_update"])
- stop()
- },
- })
- })
- test("clears bash snapshot marker on pending state", async () => {
- await using tmp = await tmpdir()
- await Instance.provide({
- directory: tmp.path,
- fn: async () => {
- const { agent, controller, sessionUpdates, stop } = createFakeAgent()
- const cwd = "/tmp/opencode-acp-test"
- const sessionId = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId)
- const input = { command: "echo hello", description: "run command" }
- controller.push(
- toolEvent(sessionId, cwd, {
- callID: "call_1",
- tool: "bash",
- status: "running",
- input,
- metadata: { output: "a" },
- }),
- )
- controller.push(
- toolEvent(sessionId, cwd, {
- callID: "call_1",
- tool: "bash",
- status: "pending",
- input,
- raw: '{"command":"echo hello"}',
- }),
- )
- controller.push(
- toolEvent(sessionId, cwd, {
- callID: "call_1",
- tool: "bash",
- status: "running",
- input,
- metadata: { output: "a" },
- }),
- )
- await new Promise((r) => setTimeout(r, 20))
- const snapshots = sessionUpdates
- .filter((u) => u.sessionId === sessionId)
- .filter((u) => isToolCallUpdate(u.update))
- .map((u) => inProgressText(u.update))
- expect(snapshots).toEqual(["a", "a"])
- stop()
- },
- })
- })
- })
|