// event-subscription.test.ts (13 KB)
  1. import { describe, expect, test } from "bun:test"
  2. import { ACP } from "../../src/acp/agent"
  3. import type { AgentSideConnection } from "@agentclientprotocol/sdk"
  4. import type { Event } from "@opencode-ai/sdk/v2"
  5. import { Instance } from "../../src/project/instance"
  6. import { tmpdir } from "../fixture/fixture"
  /** First argument of `AgentSideConnection.sessionUpdate`. */
  type SessionUpdateParams = Parameters<AgentSideConnection["sessionUpdate"]>[0]

  /** First argument of `AgentSideConnection.requestPermission`. */
  type RequestPermissionParams = Parameters<AgentSideConnection["requestPermission"]>[0]

  /** Value that `AgentSideConnection.requestPermission` resolves to. */
  type RequestPermissionResult = Awaited<ReturnType<AgentSideConnection["requestPermission"]>>

  /** Item carried by the fake event stream used in these tests; both fields are optional. */
  type GlobalEventEnvelope = {
    directory?: string
    payload?: Event
  }

  /** Producer-side handle of the fake event stream: enqueue an envelope, or end the stream. */
  type EventController = {
    push: (event: GlobalEventEnvelope) => void
    close: () => void
  }
  /**
   * Builds an in-memory event source for tests.
   *
   * @returns `controller.push` to enqueue an envelope (delivered straight to a
   *   waiting consumer when there is one), `controller.close` to end the stream
   *   once the queue is drained, and `stream(signal?)`, an async generator that
   *   yields envelopes until the stream is closed or `signal` is aborted.
   */
  function createEventStream() {
    const queue: GlobalEventEnvelope[] = []
    const waiters: Array<(value: GlobalEventEnvelope | undefined) => void> = []
    const state = { closed: false }

    const push = (event: GlobalEventEnvelope) => {
      const waiter = waiters.shift()
      if (waiter) {
        waiter(event)
        return
      }
      queue.push(event)
    }

    const close = () => {
      state.closed = true
      // Wake every pending consumer with `undefined` so its generator returns.
      for (const waiter of waiters.splice(0)) {
        waiter(undefined)
      }
    }

    // Resolves with the next pushed envelope, or `undefined` on close/abort.
    const waitForNext = (signal?: AbortSignal) =>
      new Promise<GlobalEventEnvelope | undefined>((resolve) => {
        const onAbort = () => {
          // Drop the dead waiter so a later push() is not swallowed by it.
          const index = waiters.indexOf(waiter)
          if (index !== -1) waiters.splice(index, 1)
          resolve(undefined)
        }
        const waiter = (value: GlobalEventEnvelope | undefined) => {
          // Detach the abort listener so listeners do not pile up on the signal
          // (one would otherwise be added per awaited event).
          signal?.removeEventListener("abort", onAbort)
          resolve(value)
        }
        waiters.push(waiter)
        signal?.addEventListener("abort", onAbort, { once: true })
      })

    const stream = async function* (signal?: AbortSignal) {
      while (true) {
        if (signal?.aborted) return
        const next = queue.shift()
        if (next) {
          yield next
          continue
        }
        // Queue is empty: a closed stream has nothing more to deliver.
        if (state.closed) return
        const value = await waitForNext(signal)
        if (!value) return
        yield value
      }
    }

    return { controller: { push, close } satisfies EventController, stream }
  }
  /**
   * Assembles an `ACP.Agent` backed entirely by in-memory fakes.
   *
   * The fake connection records every `sessionUpdate` type per session in
   * `updates` and concatenates text from `agent_message_chunk` updates per
   * session in `chunks`. The fake SDK counts event subscriptions and session
   * creations in `calls` and serves events from `createEventStream()`.
   *
   * @returns the agent plus the handles tests need: `controller` (push/close
   *   events), `calls`, `updates`, `chunks`, `stop` (close the stream and abort
   *   the agent's `eventAbort`), and the raw `sdk` / `connection` fakes.
   */
  function createFakeAgent() {
    const updates = new Map<string, string[]>()
    const chunks = new Map<string, string>()

    // Append an update type to the per-session log, creating the log on first use.
    const recordUpdate = (sessionId: string, type: string) => {
      const seen = updates.get(sessionId)
      if (seen) {
        seen.push(type)
        return
      }
      updates.set(sessionId, [type])
    }

    const connection = {
      async sessionUpdate(params: SessionUpdateParams) {
        const update = params.update
        recordUpdate(params.sessionId, update?.sessionUpdate ?? "unknown")
        // Only textual agent message chunks contribute to the accumulated text.
        if (update?.sessionUpdate !== "agent_message_chunk") return
        const content = update.content
        if (content?.type !== "text" || typeof content.text !== "string") return
        const previous = chunks.get(params.sessionId) ?? ""
        chunks.set(params.sessionId, previous + content.text)
      },
      // Every permission prompt is answered with the "once" option.
      async requestPermission(_params: RequestPermissionParams): Promise<RequestPermissionResult> {
        return { outcome: { outcome: "selected", optionId: "once" } } as RequestPermissionResult
      },
    } as unknown as AgentSideConnection

    const { controller, stream } = createEventStream()
    const calls = { eventSubscribe: 0, sessionCreate: 0 }

    // Session payload shared by the `create` and `get` fakes.
    const sessionData = (id: string) => ({
      id,
      time: { created: new Date().toISOString() },
    })

    const sdk = {
      global: {
        event: async (opts?: { signal?: AbortSignal }) => {
          calls.eventSubscribe += 1
          return { stream: stream(opts?.signal) }
        },
      },
      session: {
        create: async (_params?: any) => {
          calls.sessionCreate += 1
          return { data: sessionData(`ses_${calls.sessionCreate}`) }
        },
        get: async (_params?: any) => ({ data: sessionData("ses_1") }),
        messages: async () => ({ data: [] }),
        message: async (params?: any) => {
          // Return a message with parts that can be looked up by partID
          const partId = params?.messageID ? `${params.messageID}_part` : "part_1"
          return {
            data: {
              info: { role: "assistant" },
              parts: [{ id: partId, type: "text", text: "" }],
            },
          }
        },
      },
      permission: {
        respond: async () => ({ data: true }),
      },
      config: {
        providers: async () => ({
          data: {
            providers: [
              {
                id: "opencode",
                name: "opencode",
                models: {
                  "big-pickle": { id: "big-pickle", name: "big-pickle" },
                },
              },
            ],
          },
        }),
      },
      app: {
        agents: async () => ({
          data: [{ name: "build", description: "build", mode: "agent" }],
        }),
      },
      command: {
        list: async () => ({ data: [] }),
      },
      mcp: {
        add: async () => ({ data: true }),
      },
    } as any

    const agent = new ACP.Agent(connection, {
      sdk,
      defaultModel: { providerID: "opencode", modelID: "big-pickle" },
    } as any)

    // End the fake stream first, then abort the agent's own subscription.
    const stop = () => {
      controller.close()
      ;(agent as any).eventAbort.abort()
    }

    return { agent, controller, calls, updates, chunks, stop, sdk, connection }
  }
  187. describe("acp.agent event subscription", () => {
  // A delta addressed to session B must produce an agent_message_chunk update
  // for B and none for A.
  test("routes message.part.delta by the event sessionID (no cross-session pollution)", async () => {
    await using tmp = await tmpdir()
    await Instance.provide({
      directory: tmp.path,
      fn: async () => {
        const { agent, controller, updates, stop } = createFakeAgent()
        try {
          const cwd = "/tmp/opencode-acp-test"
          const sessionA = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId)
          const sessionB = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId)

          controller.push({
            directory: cwd,
            payload: {
              type: "message.part.delta",
              properties: {
                sessionID: sessionB,
                messageID: "msg_1",
                partID: "msg_1_part",
                field: "text",
                delta: "hello",
              },
            },
          } as any)

          // Give the agent's event loop a moment to consume the pushed event.
          await new Promise((r) => setTimeout(r, 10))

          expect((updates.get(sessionA) ?? []).includes("agent_message_chunk")).toBe(false)
          expect((updates.get(sessionB) ?? []).includes("agent_message_chunk")).toBe(true)
        } finally {
          // Always close the stream and abort the subscription, even when an
          // expectation above throws.
          stop()
        }
      },
    })
  })
  // Interleaves deltas for two sessions and checks that each session's
  // accumulated text contains only its own tokens.
  test("keeps concurrent sessions isolated when message.part.delta events are interleaved", async () => {
    await using tmp = await tmpdir()
    await Instance.provide({
      directory: tmp.path,
      fn: async () => {
        const { agent, controller, chunks, stop } = createFakeAgent()
        try {
          const cwd = "/tmp/opencode-acp-test"
          const sessionA = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId)
          const sessionB = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId)

          const tokenA = ["ALPHA_", "111", "_X"]
          const tokenB = ["BETA_", "222", "_Y"]

          // Pushes one text delta for the given session/message.
          const push = (sessionId: string, messageID: string, delta: string) => {
            controller.push({
              directory: cwd,
              payload: {
                type: "message.part.delta",
                properties: {
                  sessionID: sessionId,
                  messageID,
                  partID: `${messageID}_part`,
                  field: "text",
                  delta,
                },
              },
            } as any)
          }

          // Strictly alternate A and B so the two streams are interleaved.
          push(sessionA, "msg_a", tokenA[0])
          push(sessionB, "msg_b", tokenB[0])
          push(sessionA, "msg_a", tokenA[1])
          push(sessionB, "msg_b", tokenB[1])
          push(sessionA, "msg_a", tokenA[2])
          push(sessionB, "msg_b", tokenB[2])

          // Give the agent's event loop a moment to consume the pushed events.
          await new Promise((r) => setTimeout(r, 20))

          const a = chunks.get(sessionA) ?? ""
          const b = chunks.get(sessionB) ?? ""
          expect(a).toContain(tokenA.join(""))
          expect(b).toContain(tokenB.join(""))
          for (const part of tokenB) expect(a).not.toContain(part)
          for (const part of tokenA) expect(b).not.toContain(part)
        } finally {
          // Always close the stream and abort the subscription, even when an
          // expectation above throws.
          stop()
        }
      },
    })
  })
  // Loading the same session repeatedly must not call sdk.global.event again:
  // the subscription count stays at one.
  test("does not create additional event subscriptions on repeated loadSession()", async () => {
    await using tmp = await tmpdir()
    await Instance.provide({
      directory: tmp.path,
      fn: async () => {
        const { agent, calls, stop } = createFakeAgent()
        try {
          const cwd = "/tmp/opencode-acp-test"
          const sessionId = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId)

          await agent.loadSession({ sessionId, cwd, mcpServers: [] } as any)
          await agent.loadSession({ sessionId, cwd, mcpServers: [] } as any)
          await agent.loadSession({ sessionId, cwd, mcpServers: [] } as any)
          await agent.loadSession({ sessionId, cwd, mcpServers: [] } as any)

          expect(calls.eventSubscribe).toBe(1)
        } finally {
          // Always close the stream and abort the subscription, even when the
          // expectation above throws.
          stop()
        }
      },
    })
  })
  // A permission.asked event must end with a call to sdk.permission.reply
  // carrying the request id.
  test("permission.asked events are handled and replied", async () => {
    await using tmp = await tmpdir()
    await Instance.provide({
      directory: tmp.path,
      fn: async () => {
        const permissionReplies: string[] = []
        const { agent, controller, stop, sdk } = createFakeAgent()
        try {
          // Capture the request ids the agent replies to.
          sdk.permission.reply = async (params: any) => {
            permissionReplies.push(params.requestID)
            return { data: true }
          }

          const cwd = "/tmp/opencode-acp-test"
          const sessionA = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId)

          controller.push({
            directory: cwd,
            payload: {
              type: "permission.asked",
              properties: {
                id: "perm_1",
                sessionID: sessionA,
                permission: "bash",
                patterns: ["*"],
                metadata: {},
                always: [],
              },
            },
          } as any)

          // Give the agent's event loop a moment to handle the request.
          await new Promise((r) => setTimeout(r, 20))

          expect(permissionReplies).toContain("perm_1")
        } finally {
          // Always close the stream and abort the subscription, even when the
          // expectation above throws.
          stop()
        }
      },
    })
  })
  // While session A's permission prompt is held open, a delta for session B
  // must still be delivered; A's reply is only sent once the prompt is released.
  test("permission prompt on session A does not block message updates for session B", async () => {
    await using tmp = await tmpdir()
    await Instance.provide({
      directory: tmp.path,
      fn: async () => {
        const permissionReplies: string[] = []
        // The Promise executor runs synchronously, so this placeholder is
        // replaced by the real resolver before anything can call it.
        let resolvePermissionA: () => void = () => {}
        const permissionABlocking = new Promise<void>((r) => {
          resolvePermissionA = r
        })

        const { agent, controller, chunks, stop, sdk, connection } = createFakeAgent()
        try {
          // Make permission request for session A block until we release it
          const originalRequestPermission = connection.requestPermission.bind(connection)
          connection.requestPermission = async (params: RequestPermissionParams) => {
            // Session A is the first session created, whose fake id is "ses_1".
            if (params.sessionId.endsWith("1")) {
              await permissionABlocking
            }
            return originalRequestPermission(params)
          }

          // Capture the request ids the agent replies to.
          sdk.permission.reply = async (params: any) => {
            permissionReplies.push(params.requestID)
            return { data: true }
          }

          const cwd = "/tmp/opencode-acp-test"
          const sessionA = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId)
          const sessionB = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId)

          // Push permission.asked for session A (will block)
          controller.push({
            directory: cwd,
            payload: {
              type: "permission.asked",
              properties: {
                id: "perm_a",
                sessionID: sessionA,
                permission: "bash",
                patterns: ["*"],
                metadata: {},
                always: [],
              },
            },
          } as any)

          // Give time for permission handling to start
          await new Promise((r) => setTimeout(r, 10))

          // Push message for session B while A's permission is pending
          controller.push({
            directory: cwd,
            payload: {
              type: "message.part.delta",
              properties: {
                sessionID: sessionB,
                messageID: "msg_b",
                partID: "msg_b_part",
                field: "text",
                delta: "session_b_message",
              },
            },
          } as any)

          // Wait for session B's message to be processed
          await new Promise((r) => setTimeout(r, 20))

          // Session B should have received message even though A's permission is still pending
          expect(chunks.get(sessionB) ?? "").toContain("session_b_message")
          expect(permissionReplies).not.toContain("perm_a")

          // Release session A's permission
          resolvePermissionA()
          await new Promise((r) => setTimeout(r, 20))

          // Now session A's permission should be replied
          expect(permissionReplies).toContain("perm_a")
        } finally {
          // Always close the stream and abort the subscription, even when an
          // expectation above throws.
          stop()
        }
      },
    })
  })
  383. })