event-subscription.test.ts 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683
  1. import { describe, expect, test } from "bun:test"
  2. import { ACP } from "../../src/acp/agent"
  3. import type { AgentSideConnection } from "@agentclientprotocol/sdk"
  4. import type { Event, EventMessagePartUpdated, ToolStatePending, ToolStateRunning } from "@opencode-ai/sdk/v2"
  5. import { Instance } from "../../src/project/instance"
  6. import { tmpdir } from "../fixture/fixture"
  /** First argument of `AgentSideConnection.sessionUpdate`. */
  type SessionUpdateParams = Parameters<AgentSideConnection["sessionUpdate"]>[0]

  /** First argument of `AgentSideConnection.requestPermission`. */
  type RequestPermissionParams = Parameters<AgentSideConnection["requestPermission"]>[0]

  /** Resolved value of the promise returned by `AgentSideConnection.requestPermission`. */
  type RequestPermissionResult = Awaited<ReturnType<AgentSideConnection["requestPermission"]>>

  /** Item carried by the fake global event stream: an optional directory plus an optional SDK event. */
  type GlobalEventEnvelope = {
    directory?: string
    payload?: Event
  }

  /** Producer-side handle of the fake event stream. */
  type EventController = {
    /** Delivers one envelope to the stream. */
    push: (event: GlobalEventEnvelope) => void
    /** Marks the stream as closed. */
    close: () => void
  }
  /**
   * Extracts the text of the first content block from an in-progress tool call update.
   *
   * @param update - The `update` member of a session update notification.
   * @returns The text of the first content block, or `undefined` when the update is not a
   *   `tool_call_update` with status `in_progress`, has no content array, or its first
   *   block is not a text `content` block.
   */
  function inProgressText(update: SessionUpdateParams["update"]) {
    if (update.sessionUpdate !== "tool_call_update" || update.status !== "in_progress") return undefined

    const blocks = update.content
    // Covers both the missing and the non-array case in one check.
    if (!Array.isArray(blocks)) return undefined

    const head = blocks[0]
    if (head?.type !== "content") return undefined

    const inner = head.content
    return inner.type === "text" ? inner.text : undefined
  }
  /**
   * Type guard that narrows a session update to its `tool_call_update` variant.
   *
   * @param update - The `update` member of a session update notification.
   * @returns `true` when `update.sessionUpdate` is `"tool_call_update"`.
   */
  function isToolCallUpdate(
    update: SessionUpdateParams["update"],
  ): update is Extract<SessionUpdateParams["update"], { sessionUpdate: "tool_call_update" }> {
    const kind = update.sessionUpdate
    return kind === "tool_call_update"
  }
  /**
   * Builds a `message.part.updated` envelope describing a tool part in either the
   * `running` or the `pending` state.
   *
   * @param sessionId - Session the tool part belongs to.
   * @param cwd - Directory recorded on the envelope.
   * @param opts - Tool call description. `running` may carry `metadata`; `pending` carries `raw`.
   * @returns An envelope whose part id is `part_<callID>` and message id is `msg_<callID>`.
   */
  function toolEvent(
    sessionId: string,
    cwd: string,
    opts: {
      callID: string
      tool: string
      input: Record<string, unknown>
    } & ({ status: "running"; metadata?: Record<string, unknown> } | { status: "pending"; raw: string }),
  ): GlobalEventEnvelope {
    const { callID, tool, input } = opts

    let state: ToolStatePending | ToolStateRunning
    if (opts.status === "running") {
      state = {
        status: "running",
        input,
        // `metadata` is only present on the state when the caller supplied it.
        ...(opts.metadata ? { metadata: opts.metadata } : {}),
        time: { start: Date.now() },
      }
    } else {
      state = { status: "pending", input, raw: opts.raw }
    }

    const part: EventMessagePartUpdated["properties"]["part"] = {
      id: `part_${callID}`,
      sessionID: sessionId,
      messageID: `msg_${callID}`,
      type: "tool",
      callID,
      tool,
      state,
    }
    const payload: EventMessagePartUpdated = {
      type: "message.part.updated",
      properties: { part },
    }
    return { directory: cwd, payload }
  }
  /**
   * Creates an in-memory event stream for tests: a producer-side controller plus an async
   * generator factory that yields pushed envelopes.
   *
   * Events pushed while no consumer is waiting are queued; events pushed while a consumer
   * is waiting are handed to the longest-waiting consumer directly.
   *
   * @returns `controller` (`push` / `close`) and `stream(signal?)`, an async generator that
   *   ends when the stream is closed and drained, or when `signal` is aborted.
   */
  function createEventStream() {
    const queue: GlobalEventEnvelope[] = []
    const waiters: Array<(value: GlobalEventEnvelope | undefined) => void> = []
    const state = { closed: false }

    const push = (event: GlobalEventEnvelope) => {
      const waiter = waiters.shift()
      if (waiter) {
        waiter(event)
        return
      }
      queue.push(event)
    }

    const close = () => {
      state.closed = true
      // Wake every pending consumer with `undefined` so its generator returns.
      for (const waiter of waiters.splice(0)) {
        waiter(undefined)
      }
    }

    const stream = async function* (signal?: AbortSignal) {
      while (true) {
        if (signal?.aborted) return
        const next = queue.shift()
        if (next) {
          yield next
          continue
        }
        if (state.closed) return
        const value = await new Promise<GlobalEventEnvelope | undefined>((resolve) => {
          const onAbort = () => {
            // Drop the aborted consumer's waiter; otherwise the next push would be handed
            // to this already-settled promise and the event would be silently lost.
            const index = waiters.indexOf(waiter)
            if (index !== -1) waiters.splice(index, 1)
            resolve(undefined)
          }
          const waiter = (delivered: GlobalEventEnvelope | undefined) => {
            // Detach the abort listener once the wait is over so listeners do not
            // accumulate on the signal across loop iterations.
            signal?.removeEventListener("abort", onAbort)
            resolve(delivered)
          }
          waiters.push(waiter)
          signal?.addEventListener("abort", onAbort, { once: true })
        })
        if (!value) return
        yield value
      }
    }

    return { controller: { push, close } satisfies EventController, stream }
  }
  /**
   * Builds an `ACP.Agent` wired to an in-memory connection and a stubbed SDK.
   *
   * The fake connection records every session update, the update types seen per session,
   * and the concatenated text of `agent_message_chunk` updates per session. The stubbed
   * SDK serves events from a stream created by `createEventStream()` and counts calls to
   * `global.event` and `session.create`.
   *
   * @returns The agent together with the recording structures, the stream controller, the
   *   call counters, the stubbed `sdk` and `connection`, and `stop()`, which closes the
   *   stream and aborts the agent's `eventAbort` controller.
   */
  function createFakeAgent() {
    const updates = new Map<string, string[]>()
    const chunks = new Map<string, string>()
    const sessionUpdates: SessionUpdateParams[] = []

    // Appends an update type to the per-session history, creating the list on first use.
    const noteUpdate = (sessionId: string, kind: string) => {
      const seen = updates.get(sessionId)
      if (seen) {
        seen.push(kind)
        return
      }
      updates.set(sessionId, [kind])
    }

    const connection = {
      async sessionUpdate(params: SessionUpdateParams) {
        sessionUpdates.push(params)
        const update = params.update
        noteUpdate(params.sessionId, update?.sessionUpdate ?? "unknown")

        // Only text chunks of agent messages are accumulated.
        if (update?.sessionUpdate !== "agent_message_chunk") return
        const content = update.content
        if (content?.type !== "text") return
        if (typeof content.text !== "string") return
        const previous = chunks.get(params.sessionId) ?? ""
        chunks.set(params.sessionId, previous + content.text)
      },
      // Every permission request is answered by selecting the "once" option.
      async requestPermission(_params: RequestPermissionParams): Promise<RequestPermissionResult> {
        return { outcome: { outcome: "selected", optionId: "once" } } as RequestPermissionResult
      },
    } as unknown as AgentSideConnection

    const { controller, stream } = createEventStream()
    const calls = {
      eventSubscribe: 0,
      sessionCreate: 0,
    }

    // Shape shared by the `session.create` and `session.get` stubs.
    const sessionRecord = (id: string) => ({
      data: {
        id,
        time: { created: new Date().toISOString() },
      },
    })

    const sdk = {
      global: {
        event: async (opts?: { signal?: AbortSignal }) => {
          calls.eventSubscribe += 1
          return { stream: stream(opts?.signal) }
        },
      },
      session: {
        // Ids are sequential: `ses_1`, `ses_2`, ...
        create: async (_params?: any) => {
          calls.sessionCreate += 1
          return sessionRecord(`ses_${calls.sessionCreate}`)
        },
        // Always answers with `ses_1`, regardless of the requested session.
        get: async (_params?: any) => sessionRecord("ses_1"),
        messages: async () => ({ data: [] }),
        // Returns an assistant message with a single empty text part whose id is derived
        // from the requested message id, so parts can be looked up by part id.
        message: async (params?: any) => ({
          data: {
            info: {
              role: "assistant",
            },
            parts: [
              {
                id: params?.messageID ? `${params.messageID}_part` : "part_1",
                type: "text",
                text: "",
              },
            ],
          },
        }),
      },
      permission: {
        respond: async () => ({ data: true }),
      },
      config: {
        providers: async () => ({
          data: {
            providers: [
              {
                id: "opencode",
                name: "opencode",
                models: {
                  "big-pickle": { id: "big-pickle", name: "big-pickle" },
                },
              },
            ],
          },
        }),
      },
      app: {
        agents: async () => ({
          data: [
            {
              name: "build",
              description: "build",
              mode: "agent",
            },
          ],
        }),
      },
      command: {
        list: async () => ({ data: [] }),
      },
      mcp: {
        add: async () => ({ data: true }),
      },
    } as any

    const agent = new ACP.Agent(connection, {
      sdk,
      defaultModel: { providerID: "opencode", modelID: "big-pickle" },
    } as any)

    const stop = () => {
      controller.close()
      ;(agent as any).eventAbort.abort()
    }

    return { agent, controller, calls, updates, chunks, sessionUpdates, stop, sdk, connection }
  }
  241. describe("acp.agent event subscription", () => {
  // A `message.part.delta` event addressed to session B must produce an
  // `agent_message_chunk` update for B only, even though A shares the same directory.
  test("routes message.part.delta by the event sessionID (no cross-session pollution)", async () => {
    await using tmp = await tmpdir()
    await Instance.provide({
      directory: tmp.path,
      fn: async () => {
        const { agent, controller, updates, stop } = createFakeAgent()
        // try/finally: close the fake stream and abort the subscription even when an
        // expectation throws, so a failing test does not leave the agent listening.
        try {
          const cwd = "/tmp/opencode-acp-test"
          const sessionA = (await agent.newSession({ cwd, mcpServers: [] } as any)).sessionId
          const sessionB = (await agent.newSession({ cwd, mcpServers: [] } as any)).sessionId

          controller.push({
            directory: cwd,
            payload: {
              type: "message.part.delta",
              properties: {
                sessionID: sessionB,
                messageID: "msg_1",
                partID: "msg_1_part",
                field: "text",
                delta: "hello",
              },
            },
          } as any)

          // Allow time for the pushed event to be processed.
          await new Promise((r) => setTimeout(r, 10))

          expect(updates.get(sessionA) ?? []).not.toContain("agent_message_chunk")
          expect(updates.get(sessionB) ?? []).toContain("agent_message_chunk")
        } finally {
          stop()
        }
      },
    })
  })
  // Interleaves text deltas for two sessions and checks that each session's accumulated
  // text contains its own tokens in order and none of the other session's tokens.
  test("keeps concurrent sessions isolated when message.part.delta events are interleaved", async () => {
    await using tmp = await tmpdir()
    await Instance.provide({
      directory: tmp.path,
      fn: async () => {
        const { agent, controller, chunks, stop } = createFakeAgent()
        // try/finally: close the fake stream and abort the subscription even when an
        // expectation throws.
        try {
          const cwd = "/tmp/opencode-acp-test"
          const sessionA = (await agent.newSession({ cwd, mcpServers: [] } as any)).sessionId
          const sessionB = (await agent.newSession({ cwd, mcpServers: [] } as any)).sessionId

          const tokenA = ["ALPHA_", "111", "_X"]
          const tokenB = ["BETA_", "222", "_Y"]

          const push = (sessionId: string, messageID: string, delta: string) => {
            controller.push({
              directory: cwd,
              payload: {
                type: "message.part.delta",
                properties: {
                  sessionID: sessionId,
                  messageID,
                  partID: `${messageID}_part`,
                  field: "text",
                  delta,
                },
              },
            } as any)
          }

          // Alternate A, B, A, B, ... so the two sessions' deltas are interleaved.
          for (const [index, deltaA] of tokenA.entries()) {
            push(sessionA, "msg_a", deltaA)
            push(sessionB, "msg_b", tokenB[index])
          }

          // Allow time for the pushed events to be processed.
          await new Promise((r) => setTimeout(r, 20))

          const a = chunks.get(sessionA) ?? ""
          const b = chunks.get(sessionB) ?? ""

          expect(a).toContain(tokenA.join(""))
          expect(b).toContain(tokenB.join(""))
          for (const part of tokenB) expect(a).not.toContain(part)
          for (const part of tokenA) expect(b).not.toContain(part)
        } finally {
          stop()
        }
      },
    })
  })
  // Loading the same session repeatedly must not call the SDK's event subscription again:
  // the fake's `eventSubscribe` counter has to stay at 1.
  test("does not create additional event subscriptions on repeated loadSession()", async () => {
    await using tmp = await tmpdir()
    await Instance.provide({
      directory: tmp.path,
      fn: async () => {
        const { agent, calls, stop } = createFakeAgent()
        // try/finally: close the fake stream and abort the subscription even when an
        // expectation throws.
        try {
          const cwd = "/tmp/opencode-acp-test"
          const sessionId = (await agent.newSession({ cwd, mcpServers: [] } as any)).sessionId

          // Four sequential loads of the same session.
          for (let attempt = 0; attempt < 4; attempt++) {
            await agent.loadSession({ sessionId, cwd, mcpServers: [] } as any)
          }

          expect(calls.eventSubscribe).toBe(1)
        } finally {
          stop()
        }
      },
    })
  })
  // A `permission.asked` event must result in a call to `sdk.permission.reply` carrying
  // the id of the permission request.
  test("permission.asked events are handled and replied", async () => {
    await using tmp = await tmpdir()
    await Instance.provide({
      directory: tmp.path,
      fn: async () => {
        const permissionReplies: string[] = []
        const { agent, controller, stop, sdk } = createFakeAgent()
        // try/finally: close the fake stream and abort the subscription even when an
        // expectation throws.
        try {
          // Record the request id of every reply sent through the stubbed SDK.
          sdk.permission.reply = async (params: any) => {
            permissionReplies.push(params.requestID)
            return { data: true }
          }

          const cwd = "/tmp/opencode-acp-test"
          const sessionA = (await agent.newSession({ cwd, mcpServers: [] } as any)).sessionId

          controller.push({
            directory: cwd,
            payload: {
              type: "permission.asked",
              properties: {
                id: "perm_1",
                sessionID: sessionA,
                permission: "bash",
                patterns: ["*"],
                metadata: {},
                always: [],
              },
            },
          } as any)

          // Allow time for the pushed event to be processed.
          await new Promise((r) => setTimeout(r, 20))

          expect(permissionReplies).toContain("perm_1")
        } finally {
          stop()
        }
      },
    })
  })
  // While the permission prompt for session A is held open, a text delta for session B
  // must still be delivered; A's reply is only sent once the prompt is released.
  test("permission prompt on session A does not block message updates for session B", async () => {
    await using tmp = await tmpdir()
    await Instance.provide({
      directory: tmp.path,
      fn: async () => {
        const permissionReplies: string[] = []

        // The executor runs synchronously, so the no-op default is replaced immediately.
        let releasePermissionA: () => void = () => {}
        const permissionABlocking = new Promise<void>((resolve) => {
          releasePermissionA = resolve
        })

        const { agent, controller, chunks, stop, sdk, connection } = createFakeAgent()
        // try/finally: release the held prompt, close the fake stream and abort the
        // subscription even when an expectation throws.
        try {
          // Make the permission request for session A block until we release it. Session A
          // is the first session created below; the fake SDK names it `ses_1`.
          const originalRequestPermission = connection.requestPermission.bind(connection)
          connection.requestPermission = async (params: RequestPermissionParams) => {
            if (params.sessionId.endsWith("1")) {
              await permissionABlocking
            }
            return originalRequestPermission(params)
          }

          // Record the request id of every reply sent through the stubbed SDK.
          sdk.permission.reply = async (params: any) => {
            permissionReplies.push(params.requestID)
            return { data: true }
          }

          const cwd = "/tmp/opencode-acp-test"
          const sessionA = (await agent.newSession({ cwd, mcpServers: [] } as any)).sessionId
          const sessionB = (await agent.newSession({ cwd, mcpServers: [] } as any)).sessionId

          // Push permission.asked for session A (will block)
          controller.push({
            directory: cwd,
            payload: {
              type: "permission.asked",
              properties: {
                id: "perm_a",
                sessionID: sessionA,
                permission: "bash",
                patterns: ["*"],
                metadata: {},
                always: [],
              },
            },
          } as any)

          // Give time for permission handling to start
          await new Promise((r) => setTimeout(r, 10))

          // Push message for session B while A's permission is pending
          controller.push({
            directory: cwd,
            payload: {
              type: "message.part.delta",
              properties: {
                sessionID: sessionB,
                messageID: "msg_b",
                partID: "msg_b_part",
                field: "text",
                delta: "session_b_message",
              },
            },
          } as any)

          // Wait for session B's message to be processed
          await new Promise((r) => setTimeout(r, 20))

          // Session B should have received message even though A's permission is still pending
          expect(chunks.get(sessionB) ?? "").toContain("session_b_message")
          expect(permissionReplies).not.toContain("perm_a")

          // Release session A's permission
          releasePermissionA()
          await new Promise((r) => setTimeout(r, 20))

          // Now session A's permission should be replied
          expect(permissionReplies).toContain("perm_a")
        } finally {
          // Resolving an already-resolved promise is a no-op, so this is safe on success too.
          releasePermissionA()
          stop()
        }
      },
    })
  })
  // Pushes three running bash updates with outputs "a", "a", "ab" and expects the second,
  // identical snapshot to carry no in-progress text.
  test("streams running bash output snapshots and de-dupes identical snapshots", async () => {
    await using tmp = await tmpdir()
    await Instance.provide({
      directory: tmp.path,
      fn: async () => {
        const { agent, controller, sessionUpdates, stop } = createFakeAgent()
        // try/finally: close the fake stream and abort the subscription even when an
        // expectation throws.
        try {
          const cwd = "/tmp/opencode-acp-test"
          const sessionId = (await agent.newSession({ cwd, mcpServers: [] } as any)).sessionId
          const input = { command: "echo hello", description: "run command" }

          for (const output of ["a", "a", "ab"]) {
            controller.push(
              toolEvent(sessionId, cwd, {
                callID: "call_1",
                tool: "bash",
                status: "running",
                input,
                metadata: { output },
              }),
            )
          }

          // Allow time for the pushed events to be processed.
          await new Promise((r) => setTimeout(r, 20))

          const snapshots = sessionUpdates
            .filter((u) => u.sessionId === sessionId && isToolCallUpdate(u.update))
            .map((u) => inProgressText(u.update))

          expect(snapshots).toEqual(["a", undefined, "ab"])
        } finally {
          stop()
        }
      },
    })
  })
  // For two different tools whose first observed state is `running`, the agent must emit
  // a `tool_call` (with status `pending`) before each `tool_call_update`.
  test("emits synthetic pending before first running update for any tool", async () => {
    await using tmp = await tmpdir()
    await Instance.provide({
      directory: tmp.path,
      fn: async () => {
        const { agent, controller, sessionUpdates, stop } = createFakeAgent()
        // try/finally: close the fake stream and abort the subscription even when an
        // expectation throws.
        try {
          const cwd = "/tmp/opencode-acp-test"
          const sessionId = (await agent.newSession({ cwd, mcpServers: [] } as any)).sessionId

          controller.push(
            toolEvent(sessionId, cwd, {
              callID: "call_bash",
              tool: "bash",
              status: "running",
              input: { command: "echo hi", description: "run command" },
              metadata: { output: "hi\n" },
            }),
          )
          controller.push(
            toolEvent(sessionId, cwd, {
              callID: "call_read",
              tool: "read",
              status: "running",
              input: { filePath: "/tmp/example.txt" },
            }),
          )

          // Allow time for the pushed events to be processed.
          await new Promise((r) => setTimeout(r, 20))

          const forSession = sessionUpdates.filter((u) => u.sessionId === sessionId)

          const types = forSession
            .map((u) => u.update.sessionUpdate)
            .filter((u) => u === "tool_call" || u === "tool_call_update")
          expect(types).toEqual(["tool_call", "tool_call_update", "tool_call", "tool_call_update"])

          const pendings = forSession.filter((u) => u.update.sessionUpdate === "tool_call")
          expect(pendings.every((p) => p.update.sessionUpdate === "tool_call" && p.update.status === "pending")).toBe(
            true,
          )
        } finally {
          stop()
        }
      },
    })
  })
  // After `loadSession()` replays a message containing a running tool part, a later live
  // running update for the same call id must not produce a second `tool_call`.
  test("does not emit duplicate synthetic pending after replayed running tool", async () => {
    await using tmp = await tmpdir()
    await Instance.provide({
      directory: tmp.path,
      fn: async () => {
        const { agent, controller, sessionUpdates, stop, sdk } = createFakeAgent()
        // try/finally: close the fake stream and abort the subscription even when an
        // expectation throws.
        try {
          const cwd = "/tmp/opencode-acp-test"
          const sessionId = (await agent.newSession({ cwd, mcpServers: [] } as any)).sessionId
          const input = { command: "echo hi", description: "run command" }

          // Session history returned by the stubbed SDK: one assistant message whose only
          // part is a bash tool call in the running state.
          sdk.session.messages = async () => ({
            data: [
              {
                info: {
                  role: "assistant",
                  sessionID: sessionId,
                },
                parts: [
                  {
                    type: "tool",
                    callID: "call_1",
                    tool: "bash",
                    state: {
                      status: "running",
                      input,
                      metadata: { output: "hi\n" },
                      time: { start: Date.now() },
                    },
                  },
                ],
              },
            ],
          })

          await agent.loadSession({ sessionId, cwd, mcpServers: [] } as any)

          // Live update for the same call, with more output than the replayed snapshot.
          controller.push(
            toolEvent(sessionId, cwd, {
              callID: "call_1",
              tool: "bash",
              status: "running",
              input,
              metadata: { output: "hi\nthere\n" },
            }),
          )

          // Allow time for the pushed event to be processed.
          await new Promise((r) => setTimeout(r, 20))

          const types = sessionUpdates
            .filter((u) => u.sessionId === sessionId)
            .map((u) => u.update)
            .filter((u) => "toolCallId" in u && u.toolCallId === "call_1")
            .map((u) => u.sessionUpdate)
            .filter((u) => u === "tool_call" || u === "tool_call_update")

          expect(types).toEqual(["tool_call", "tool_call_update", "tool_call_update"])
        } finally {
          stop()
        }
      },
    })
  })
  // Sends running("a"), pending, running("a") for one call id and expects both running
  // updates to carry the text "a": the pending state in between must reset de-duplication.
  test("clears bash snapshot marker on pending state", async () => {
    await using tmp = await tmpdir()
    await Instance.provide({
      directory: tmp.path,
      fn: async () => {
        const { agent, controller, sessionUpdates, stop } = createFakeAgent()
        // try/finally: close the fake stream and abort the subscription even when an
        // expectation throws.
        try {
          const cwd = "/tmp/opencode-acp-test"
          const sessionId = (await agent.newSession({ cwd, mcpServers: [] } as any)).sessionId
          const input = { command: "echo hello", description: "run command" }

          controller.push(
            toolEvent(sessionId, cwd, {
              callID: "call_1",
              tool: "bash",
              status: "running",
              input,
              metadata: { output: "a" },
            }),
          )
          controller.push(
            toolEvent(sessionId, cwd, {
              callID: "call_1",
              tool: "bash",
              status: "pending",
              input,
              raw: '{"command":"echo hello"}',
            }),
          )
          controller.push(
            toolEvent(sessionId, cwd, {
              callID: "call_1",
              tool: "bash",
              status: "running",
              input,
              metadata: { output: "a" },
            }),
          )

          // Allow time for the pushed events to be processed.
          await new Promise((r) => setTimeout(r, 20))

          const snapshots = sessionUpdates
            .filter((u) => u.sessionId === sessionId && isToolCallUpdate(u.update))
            .map((u) => inProgressText(u.update))

          expect(snapshots).toEqual(["a", "a"])
        } finally {
          stop()
        }
      },
    })
  })
  608. })