// event-subscription.test.ts (13 KB)
  1. import { describe, expect, test } from "bun:test"
  2. import { ACP } from "../../src/acp/agent"
  3. import type { AgentSideConnection } from "@agentclientprotocol/sdk"
  4. import type { Event } from "@opencode-ai/sdk/v2"
  5. import { Instance } from "../../src/project/instance"
  6. import { tmpdir } from "../fixture/fixture"
  7. type SessionUpdateParams = Parameters<AgentSideConnection["sessionUpdate"]>[0]
  8. type RequestPermissionParams = Parameters<AgentSideConnection["requestPermission"]>[0]
  9. type RequestPermissionResult = Awaited<ReturnType<AgentSideConnection["requestPermission"]>>
  10. type GlobalEventEnvelope = {
  11. directory?: string
  12. payload?: Event
  13. }
  14. type EventController = {
  15. push: (event: GlobalEventEnvelope) => void
  16. close: () => void
  17. }
  18. function createEventStream() {
  19. const queue: GlobalEventEnvelope[] = []
  20. const waiters: Array<(value: GlobalEventEnvelope | undefined) => void> = []
  21. const state = { closed: false }
  22. const push = (event: GlobalEventEnvelope) => {
  23. const waiter = waiters.shift()
  24. if (waiter) {
  25. waiter(event)
  26. return
  27. }
  28. queue.push(event)
  29. }
  30. const close = () => {
  31. state.closed = true
  32. for (const waiter of waiters.splice(0)) {
  33. waiter(undefined)
  34. }
  35. }
  36. const stream = async function* (signal?: AbortSignal) {
  37. while (true) {
  38. if (signal?.aborted) return
  39. const next = queue.shift()
  40. if (next) {
  41. yield next
  42. continue
  43. }
  44. if (state.closed) return
  45. const value = await new Promise<GlobalEventEnvelope | undefined>((resolve) => {
  46. waiters.push(resolve)
  47. if (!signal) return
  48. signal.addEventListener("abort", () => resolve(undefined), { once: true })
  49. })
  50. if (!value) return
  51. yield value
  52. }
  53. }
  54. return { controller: { push, close } satisfies EventController, stream }
  55. }
  56. function createFakeAgent() {
  57. const updates = new Map<string, string[]>()
  58. const chunks = new Map<string, string>()
  59. const record = (sessionId: string, type: string) => {
  60. const list = updates.get(sessionId) ?? []
  61. list.push(type)
  62. updates.set(sessionId, list)
  63. }
  64. const connection = {
  65. async sessionUpdate(params: SessionUpdateParams) {
  66. const update = params.update
  67. const type = update?.sessionUpdate ?? "unknown"
  68. record(params.sessionId, type)
  69. if (update?.sessionUpdate === "agent_message_chunk") {
  70. const content = update.content
  71. if (content?.type !== "text") return
  72. if (typeof content.text !== "string") return
  73. chunks.set(params.sessionId, (chunks.get(params.sessionId) ?? "") + content.text)
  74. }
  75. },
  76. async requestPermission(_params: RequestPermissionParams): Promise<RequestPermissionResult> {
  77. return { outcome: { outcome: "selected", optionId: "once" } } as RequestPermissionResult
  78. },
  79. } as unknown as AgentSideConnection
  80. const { controller, stream } = createEventStream()
  81. const calls = {
  82. eventSubscribe: 0,
  83. sessionCreate: 0,
  84. }
  85. const sdk = {
  86. global: {
  87. event: async (opts?: { signal?: AbortSignal }) => {
  88. calls.eventSubscribe++
  89. return { stream: stream(opts?.signal) }
  90. },
  91. },
  92. session: {
  93. create: async (_params?: any) => {
  94. calls.sessionCreate++
  95. return {
  96. data: {
  97. id: `ses_${calls.sessionCreate}`,
  98. time: { created: new Date().toISOString() },
  99. },
  100. }
  101. },
  102. get: async (_params?: any) => {
  103. return {
  104. data: {
  105. id: "ses_1",
  106. time: { created: new Date().toISOString() },
  107. },
  108. }
  109. },
  110. messages: async () => {
  111. return { data: [] }
  112. },
  113. message: async () => {
  114. return {
  115. data: {
  116. info: {
  117. role: "assistant",
  118. },
  119. },
  120. }
  121. },
  122. },
  123. permission: {
  124. respond: async () => {
  125. return { data: true }
  126. },
  127. },
  128. config: {
  129. providers: async () => {
  130. return {
  131. data: {
  132. providers: [
  133. {
  134. id: "opencode",
  135. name: "opencode",
  136. models: {
  137. "big-pickle": { id: "big-pickle", name: "big-pickle" },
  138. },
  139. },
  140. ],
  141. },
  142. }
  143. },
  144. },
  145. app: {
  146. agents: async () => {
  147. return {
  148. data: [
  149. {
  150. name: "build",
  151. description: "build",
  152. mode: "agent",
  153. },
  154. ],
  155. }
  156. },
  157. },
  158. command: {
  159. list: async () => {
  160. return { data: [] }
  161. },
  162. },
  163. mcp: {
  164. add: async () => {
  165. return { data: true }
  166. },
  167. },
  168. } as any
  169. const agent = new ACP.Agent(connection, {
  170. sdk,
  171. defaultModel: { providerID: "opencode", modelID: "big-pickle" },
  172. } as any)
  173. const stop = () => {
  174. controller.close()
  175. ;(agent as any).eventAbort.abort()
  176. }
  177. return { agent, controller, calls, updates, chunks, stop, sdk, connection }
  178. }
  179. describe("acp.agent event subscription", () => {
  180. test("routes message.part.updated by the event sessionID (no cross-session pollution)", async () => {
  181. await using tmp = await tmpdir()
  182. await Instance.provide({
  183. directory: tmp.path,
  184. fn: async () => {
  185. const { agent, controller, updates, stop } = createFakeAgent()
  186. const cwd = "/tmp/opencode-acp-test"
  187. const sessionA = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId)
  188. const sessionB = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId)
  189. controller.push({
  190. directory: cwd,
  191. payload: {
  192. type: "message.part.updated",
  193. properties: {
  194. part: {
  195. sessionID: sessionB,
  196. messageID: "msg_1",
  197. type: "text",
  198. synthetic: false,
  199. },
  200. delta: "hello",
  201. },
  202. },
  203. } as any)
  204. await new Promise((r) => setTimeout(r, 10))
  205. expect((updates.get(sessionA) ?? []).includes("agent_message_chunk")).toBe(false)
  206. expect((updates.get(sessionB) ?? []).includes("agent_message_chunk")).toBe(true)
  207. stop()
  208. },
  209. })
  210. })
  211. test("keeps concurrent sessions isolated when message.part.updated events are interleaved", async () => {
  212. await using tmp = await tmpdir()
  213. await Instance.provide({
  214. directory: tmp.path,
  215. fn: async () => {
  216. const { agent, controller, chunks, stop } = createFakeAgent()
  217. const cwd = "/tmp/opencode-acp-test"
  218. const sessionA = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId)
  219. const sessionB = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId)
  220. const tokenA = ["ALPHA_", "111", "_X"]
  221. const tokenB = ["BETA_", "222", "_Y"]
  222. const push = (sessionId: string, messageID: string, delta: string) => {
  223. controller.push({
  224. directory: cwd,
  225. payload: {
  226. type: "message.part.updated",
  227. properties: {
  228. part: {
  229. sessionID: sessionId,
  230. messageID,
  231. type: "text",
  232. synthetic: false,
  233. },
  234. delta,
  235. },
  236. },
  237. } as any)
  238. }
  239. push(sessionA, "msg_a", tokenA[0])
  240. push(sessionB, "msg_b", tokenB[0])
  241. push(sessionA, "msg_a", tokenA[1])
  242. push(sessionB, "msg_b", tokenB[1])
  243. push(sessionA, "msg_a", tokenA[2])
  244. push(sessionB, "msg_b", tokenB[2])
  245. await new Promise((r) => setTimeout(r, 20))
  246. const a = chunks.get(sessionA) ?? ""
  247. const b = chunks.get(sessionB) ?? ""
  248. expect(a).toContain(tokenA.join(""))
  249. expect(b).toContain(tokenB.join(""))
  250. for (const part of tokenB) expect(a).not.toContain(part)
  251. for (const part of tokenA) expect(b).not.toContain(part)
  252. stop()
  253. },
  254. })
  255. })
  256. test("does not create additional event subscriptions on repeated loadSession()", async () => {
  257. await using tmp = await tmpdir()
  258. await Instance.provide({
  259. directory: tmp.path,
  260. fn: async () => {
  261. const { agent, calls, stop } = createFakeAgent()
  262. const cwd = "/tmp/opencode-acp-test"
  263. const sessionId = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId)
  264. await agent.loadSession({ sessionId, cwd, mcpServers: [] } as any)
  265. await agent.loadSession({ sessionId, cwd, mcpServers: [] } as any)
  266. await agent.loadSession({ sessionId, cwd, mcpServers: [] } as any)
  267. await agent.loadSession({ sessionId, cwd, mcpServers: [] } as any)
  268. expect(calls.eventSubscribe).toBe(1)
  269. stop()
  270. },
  271. })
  272. })
  273. test("permission.asked events are handled and replied", async () => {
  274. await using tmp = await tmpdir()
  275. await Instance.provide({
  276. directory: tmp.path,
  277. fn: async () => {
  278. const permissionReplies: string[] = []
  279. const { agent, controller, stop, sdk } = createFakeAgent()
  280. sdk.permission.reply = async (params: any) => {
  281. permissionReplies.push(params.requestID)
  282. return { data: true }
  283. }
  284. const cwd = "/tmp/opencode-acp-test"
  285. const sessionA = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId)
  286. controller.push({
  287. directory: cwd,
  288. payload: {
  289. type: "permission.asked",
  290. properties: {
  291. id: "perm_1",
  292. sessionID: sessionA,
  293. permission: "bash",
  294. patterns: ["*"],
  295. metadata: {},
  296. always: [],
  297. },
  298. },
  299. } as any)
  300. await new Promise((r) => setTimeout(r, 20))
  301. expect(permissionReplies).toContain("perm_1")
  302. stop()
  303. },
  304. })
  305. })
  306. test("permission prompt on session A does not block message updates for session B", async () => {
  307. await using tmp = await tmpdir()
  308. await Instance.provide({
  309. directory: tmp.path,
  310. fn: async () => {
  311. const permissionReplies: string[] = []
  312. let resolvePermissionA: (() => void) | undefined
  313. const permissionABlocking = new Promise<void>((r) => {
  314. resolvePermissionA = r
  315. })
  316. const { agent, controller, chunks, stop, sdk, connection } = createFakeAgent()
  317. // Make permission request for session A block until we release it
  318. const originalRequestPermission = connection.requestPermission.bind(connection)
  319. let permissionCalls = 0
  320. connection.requestPermission = async (params: RequestPermissionParams) => {
  321. permissionCalls++
  322. if (params.sessionId.endsWith("1")) {
  323. await permissionABlocking
  324. }
  325. return originalRequestPermission(params)
  326. }
  327. sdk.permission.reply = async (params: any) => {
  328. permissionReplies.push(params.requestID)
  329. return { data: true }
  330. }
  331. const cwd = "/tmp/opencode-acp-test"
  332. const sessionA = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId)
  333. const sessionB = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId)
  334. // Push permission.asked for session A (will block)
  335. controller.push({
  336. directory: cwd,
  337. payload: {
  338. type: "permission.asked",
  339. properties: {
  340. id: "perm_a",
  341. sessionID: sessionA,
  342. permission: "bash",
  343. patterns: ["*"],
  344. metadata: {},
  345. always: [],
  346. },
  347. },
  348. } as any)
  349. // Give time for permission handling to start
  350. await new Promise((r) => setTimeout(r, 10))
  351. // Push message for session B while A's permission is pending
  352. controller.push({
  353. directory: cwd,
  354. payload: {
  355. type: "message.part.updated",
  356. properties: {
  357. part: {
  358. sessionID: sessionB,
  359. messageID: "msg_b",
  360. type: "text",
  361. synthetic: false,
  362. },
  363. delta: "session_b_message",
  364. },
  365. },
  366. } as any)
  367. // Wait for session B's message to be processed
  368. await new Promise((r) => setTimeout(r, 20))
  369. // Session B should have received message even though A's permission is still pending
  370. expect(chunks.get(sessionB) ?? "").toContain("session_b_message")
  371. expect(permissionReplies).not.toContain("perm_a")
  372. // Release session A's permission
  373. resolvePermissionA!()
  374. await new Promise((r) => setTimeout(r, 20))
  375. // Now session A's permission should be replied
  376. expect(permissionReplies).toContain("perm_a")
  377. stop()
  378. },
  379. })
  380. })
  381. })