event-subscription.test.ts 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727
  1. import { describe, expect, test } from "bun:test"
  2. import { ACP } from "../../src/acp/agent"
  3. import type { AgentSideConnection } from "@agentclientprotocol/sdk"
  4. import type { Event, EventMessagePartUpdated, ToolStatePending, ToolStateRunning } from "@kilocode/sdk/v2"
  5. import { Instance } from "../../src/project/instance"
  6. import { tmpdir } from "../fixture/fixture"
  7. type SessionUpdateParams = Parameters<AgentSideConnection["sessionUpdate"]>[0]
  8. type RequestPermissionParams = Parameters<AgentSideConnection["requestPermission"]>[0]
  9. type RequestPermissionResult = Awaited<ReturnType<AgentSideConnection["requestPermission"]>>
  10. type GlobalEventEnvelope = {
  11. directory?: string
  12. payload?: Event
  13. }
  14. type EventController = {
  15. push: (event: GlobalEventEnvelope) => void
  16. close: () => void
  17. }
  18. function inProgressText(update: SessionUpdateParams["update"]) {
  19. if (update.sessionUpdate !== "tool_call_update") return undefined
  20. if (update.status !== "in_progress") return undefined
  21. if (!update.content || !Array.isArray(update.content)) return undefined
  22. const first = update.content[0]
  23. if (!first || first.type !== "content") return undefined
  24. if (first.content.type !== "text") return undefined
  25. return first.content.text
  26. }
  27. function isToolCallUpdate(
  28. update: SessionUpdateParams["update"],
  29. ): update is Extract<SessionUpdateParams["update"], { sessionUpdate: "tool_call_update" }> {
  30. return update.sessionUpdate === "tool_call_update"
  31. }
  32. function toolEvent(
  33. sessionId: string,
  34. cwd: string,
  35. opts: {
  36. callID: string
  37. tool: string
  38. input: Record<string, unknown>
  39. } & ({ status: "running"; metadata?: Record<string, unknown> } | { status: "pending"; raw: string }),
  40. ): GlobalEventEnvelope {
  41. const state: ToolStatePending | ToolStateRunning =
  42. opts.status === "running"
  43. ? {
  44. status: "running",
  45. input: opts.input,
  46. ...(opts.metadata && { metadata: opts.metadata }),
  47. time: { start: Date.now() },
  48. }
  49. : {
  50. status: "pending",
  51. input: opts.input,
  52. raw: opts.raw,
  53. }
  54. const payload: EventMessagePartUpdated = {
  55. type: "message.part.updated",
  56. properties: {
  57. sessionID: sessionId,
  58. time: Date.now(),
  59. part: {
  60. id: `part_${opts.callID}`,
  61. sessionID: sessionId,
  62. messageID: `msg_${opts.callID}`,
  63. type: "tool",
  64. callID: opts.callID,
  65. tool: opts.tool,
  66. state,
  67. },
  68. },
  69. }
  70. return { directory: cwd, payload }
  71. }
  72. function createEventStream() {
  73. const queue: GlobalEventEnvelope[] = []
  74. const waiters: Array<(value: GlobalEventEnvelope | undefined) => void> = []
  75. const state = { closed: false }
  76. const push = (event: GlobalEventEnvelope) => {
  77. const waiter = waiters.shift()
  78. if (waiter) {
  79. waiter(event)
  80. return
  81. }
  82. queue.push(event)
  83. }
  84. const close = () => {
  85. state.closed = true
  86. for (const waiter of waiters.splice(0)) {
  87. waiter(undefined)
  88. }
  89. }
  90. const stream = async function* (signal?: AbortSignal) {
  91. while (true) {
  92. if (signal?.aborted) return
  93. const next = queue.shift()
  94. if (next) {
  95. yield next
  96. continue
  97. }
  98. if (state.closed) return
  99. const value = await new Promise<GlobalEventEnvelope | undefined>((resolve) => {
  100. waiters.push(resolve)
  101. if (!signal) return
  102. signal.addEventListener("abort", () => resolve(undefined), { once: true })
  103. })
  104. if (!value) return
  105. yield value
  106. }
  107. }
  108. return { controller: { push, close } satisfies EventController, stream }
  109. }
  110. function createFakeAgent() {
  111. const updates = new Map<string, string[]>()
  112. const chunks = new Map<string, string>()
  113. const sessionUpdates: SessionUpdateParams[] = []
  114. const record = (sessionId: string, type: string) => {
  115. const list = updates.get(sessionId) ?? []
  116. list.push(type)
  117. updates.set(sessionId, list)
  118. }
  119. const connection = {
  120. async sessionUpdate(params: SessionUpdateParams) {
  121. sessionUpdates.push(params)
  122. const update = params.update
  123. const type = update?.sessionUpdate ?? "unknown"
  124. record(params.sessionId, type)
  125. if (update?.sessionUpdate === "agent_message_chunk") {
  126. const content = update.content
  127. if (content?.type !== "text") return
  128. if (typeof content.text !== "string") return
  129. chunks.set(params.sessionId, (chunks.get(params.sessionId) ?? "") + content.text)
  130. }
  131. },
  132. async requestPermission(_params: RequestPermissionParams): Promise<RequestPermissionResult> {
  133. return { outcome: { outcome: "selected", optionId: "once" } } as RequestPermissionResult
  134. },
  135. } as unknown as AgentSideConnection
  136. const { controller, stream } = createEventStream()
  137. const calls = {
  138. eventSubscribe: 0,
  139. sessionCreate: 0,
  140. }
  141. const sdk = {
  142. global: {
  143. event: async (opts?: { signal?: AbortSignal }) => {
  144. calls.eventSubscribe++
  145. return { stream: stream(opts?.signal) }
  146. },
  147. },
  148. session: {
  149. create: async (_params?: any) => {
  150. calls.sessionCreate++
  151. return {
  152. data: {
  153. id: `ses_${calls.sessionCreate}`,
  154. time: { created: new Date().toISOString() },
  155. },
  156. }
  157. },
  158. get: async (_params?: any) => {
  159. return {
  160. data: {
  161. id: "ses_1",
  162. time: { created: new Date().toISOString() },
  163. },
  164. }
  165. },
  166. messages: async () => {
  167. return { data: [] }
  168. },
  169. message: async (params?: any) => {
  170. // Return a message with parts that can be looked up by partID
  171. return {
  172. data: {
  173. info: {
  174. role: "assistant",
  175. },
  176. parts: [
  177. {
  178. id: params?.messageID ? `${params.messageID}_part` : "part_1",
  179. type: "text",
  180. text: "",
  181. },
  182. ],
  183. },
  184. }
  185. },
  186. },
  187. permission: {
  188. respond: async () => {
  189. return { data: true }
  190. },
  191. },
  192. config: {
  193. providers: async () => {
  194. return {
  195. data: {
  196. providers: [
  197. {
  198. id: "opencode",
  199. name: "opencode",
  200. models: {
  201. "big-pickle": { id: "big-pickle", name: "big-pickle" },
  202. },
  203. },
  204. ],
  205. },
  206. }
  207. },
  208. },
  209. app: {
  210. agents: async () => {
  211. return {
  212. data: [
  213. {
  214. // kilocode_change start - renamed from "build" to "code"
  215. name: "code",
  216. description: "code",
  217. // kilocode_change end
  218. mode: "agent",
  219. },
  220. ],
  221. }
  222. },
  223. },
  224. command: {
  225. list: async () => {
  226. return { data: [] }
  227. },
  228. },
  229. mcp: {
  230. add: async () => {
  231. return { data: true }
  232. },
  233. },
  234. } as any
  235. const agent = new ACP.Agent(connection, {
  236. sdk,
  237. defaultModel: { providerID: "opencode", modelID: "big-pickle" },
  238. } as any)
  239. const stop = () => {
  240. controller.close()
  241. ;(agent as any).eventAbort.abort()
  242. }
  243. return { agent, controller, calls, updates, chunks, sessionUpdates, stop, sdk, connection }
  244. }
  245. describe("acp.agent event subscription", () => {
  246. test("routes message.part.delta by the event sessionID (no cross-session pollution)", async () => {
  247. await using tmp = await tmpdir()
  248. await Instance.provide({
  249. directory: tmp.path,
  250. fn: async () => {
  251. const { agent, controller, updates, stop } = createFakeAgent()
  252. const cwd = "/tmp/opencode-acp-test"
  253. const sessionA = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId)
  254. const sessionB = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId)
  255. controller.push({
  256. directory: cwd,
  257. payload: {
  258. type: "message.part.delta",
  259. properties: {
  260. sessionID: sessionB,
  261. messageID: "msg_1",
  262. partID: "msg_1_part",
  263. field: "text",
  264. delta: "hello",
  265. },
  266. },
  267. } as any)
  268. await new Promise((r) => setTimeout(r, 10))
  269. expect((updates.get(sessionA) ?? []).includes("agent_message_chunk")).toBe(false)
  270. expect((updates.get(sessionB) ?? []).includes("agent_message_chunk")).toBe(true)
  271. stop()
  272. },
  273. })
  274. })
  275. test("does not emit user_message_chunk for live prompt parts", async () => {
  276. await using tmp = await tmpdir()
  277. await Instance.provide({
  278. directory: tmp.path,
  279. fn: async () => {
  280. const { agent, controller, sessionUpdates, stop } = createFakeAgent()
  281. const cwd = "/tmp/opencode-acp-test"
  282. const sessionId = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId)
  283. controller.push({
  284. directory: cwd,
  285. payload: {
  286. type: "message.part.updated",
  287. properties: {
  288. sessionID: sessionId,
  289. time: Date.now(),
  290. part: {
  291. id: "part_1",
  292. sessionID: sessionId,
  293. messageID: "msg_user",
  294. type: "text",
  295. text: "hello",
  296. },
  297. },
  298. },
  299. } as any)
  300. await new Promise((r) => setTimeout(r, 20))
  301. expect(
  302. sessionUpdates
  303. .filter((u) => u.sessionId === sessionId)
  304. .some((u) => u.update.sessionUpdate === "user_message_chunk"),
  305. ).toBe(false)
  306. stop()
  307. },
  308. })
  309. })
  310. test("keeps concurrent sessions isolated when message.part.delta events are interleaved", async () => {
  311. await using tmp = await tmpdir()
  312. await Instance.provide({
  313. directory: tmp.path,
  314. fn: async () => {
  315. const { agent, controller, chunks, stop } = createFakeAgent()
  316. const cwd = "/tmp/opencode-acp-test"
  317. const sessionA = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId)
  318. const sessionB = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId)
  319. const tokenA = ["ALPHA_", "111", "_X"]
  320. const tokenB = ["BETA_", "222", "_Y"]
  321. const push = (sessionId: string, messageID: string, delta: string) => {
  322. controller.push({
  323. directory: cwd,
  324. payload: {
  325. type: "message.part.delta",
  326. properties: {
  327. sessionID: sessionId,
  328. messageID,
  329. partID: `${messageID}_part`,
  330. field: "text",
  331. delta,
  332. },
  333. },
  334. } as any)
  335. }
  336. push(sessionA, "msg_a", tokenA[0])
  337. push(sessionB, "msg_b", tokenB[0])
  338. push(sessionA, "msg_a", tokenA[1])
  339. push(sessionB, "msg_b", tokenB[1])
  340. push(sessionA, "msg_a", tokenA[2])
  341. push(sessionB, "msg_b", tokenB[2])
  342. await new Promise((r) => setTimeout(r, 20))
  343. const a = chunks.get(sessionA) ?? ""
  344. const b = chunks.get(sessionB) ?? ""
  345. expect(a).toContain(tokenA.join(""))
  346. expect(b).toContain(tokenB.join(""))
  347. for (const part of tokenB) expect(a).not.toContain(part)
  348. for (const part of tokenA) expect(b).not.toContain(part)
  349. stop()
  350. },
  351. })
  352. })
  353. test("does not create additional event subscriptions on repeated loadSession()", async () => {
  354. await using tmp = await tmpdir()
  355. await Instance.provide({
  356. directory: tmp.path,
  357. fn: async () => {
  358. const { agent, calls, stop } = createFakeAgent()
  359. const cwd = "/tmp/opencode-acp-test"
  360. const sessionId = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId)
  361. await agent.loadSession({ sessionId, cwd, mcpServers: [] } as any)
  362. await agent.loadSession({ sessionId, cwd, mcpServers: [] } as any)
  363. await agent.loadSession({ sessionId, cwd, mcpServers: [] } as any)
  364. await agent.loadSession({ sessionId, cwd, mcpServers: [] } as any)
  365. expect(calls.eventSubscribe).toBe(1)
  366. stop()
  367. },
  368. })
  369. })
  370. test("permission.asked events are handled and replied", async () => {
  371. await using tmp = await tmpdir()
  372. await Instance.provide({
  373. directory: tmp.path,
  374. fn: async () => {
  375. const permissionReplies: string[] = []
  376. const { agent, controller, stop, sdk } = createFakeAgent()
  377. sdk.permission.reply = async (params: any) => {
  378. permissionReplies.push(params.requestID)
  379. return { data: true }
  380. }
  381. const cwd = "/tmp/opencode-acp-test"
  382. const sessionA = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId)
  383. controller.push({
  384. directory: cwd,
  385. payload: {
  386. type: "permission.asked",
  387. properties: {
  388. id: "perm_1",
  389. sessionID: sessionA,
  390. permission: "bash",
  391. patterns: ["*"],
  392. metadata: {},
  393. always: [],
  394. },
  395. },
  396. } as any)
  397. await new Promise((r) => setTimeout(r, 20))
  398. expect(permissionReplies).toContain("perm_1")
  399. stop()
  400. },
  401. })
  402. })
  403. test("permission prompt on session A does not block message updates for session B", async () => {
  404. await using tmp = await tmpdir()
  405. await Instance.provide({
  406. directory: tmp.path,
  407. fn: async () => {
  408. const permissionReplies: string[] = []
  409. let resolvePermissionA: (() => void) | undefined
  410. const permissionABlocking = new Promise<void>((r) => {
  411. resolvePermissionA = r
  412. })
  413. const { agent, controller, chunks, stop, sdk, connection } = createFakeAgent()
  414. // Make permission request for session A block until we release it
  415. const originalRequestPermission = connection.requestPermission.bind(connection)
  416. let permissionCalls = 0
  417. connection.requestPermission = async (params: RequestPermissionParams) => {
  418. permissionCalls++
  419. if (params.sessionId.endsWith("1")) {
  420. await permissionABlocking
  421. }
  422. return originalRequestPermission(params)
  423. }
  424. sdk.permission.reply = async (params: any) => {
  425. permissionReplies.push(params.requestID)
  426. return { data: true }
  427. }
  428. const cwd = "/tmp/opencode-acp-test"
  429. const sessionA = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId)
  430. const sessionB = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId)
  431. // Push permission.asked for session A (will block)
  432. controller.push({
  433. directory: cwd,
  434. payload: {
  435. type: "permission.asked",
  436. properties: {
  437. id: "perm_a",
  438. sessionID: sessionA,
  439. permission: "bash",
  440. patterns: ["*"],
  441. metadata: {},
  442. always: [],
  443. },
  444. },
  445. } as any)
  446. // Give time for permission handling to start
  447. await new Promise((r) => setTimeout(r, 10))
  448. // Push message for session B while A's permission is pending
  449. controller.push({
  450. directory: cwd,
  451. payload: {
  452. type: "message.part.delta",
  453. properties: {
  454. sessionID: sessionB,
  455. messageID: "msg_b",
  456. partID: "msg_b_part",
  457. field: "text",
  458. delta: "session_b_message",
  459. },
  460. },
  461. } as any)
  462. // Wait for session B's message to be processed
  463. await new Promise((r) => setTimeout(r, 20))
  464. // Session B should have received message even though A's permission is still pending
  465. expect(chunks.get(sessionB) ?? "").toContain("session_b_message")
  466. expect(permissionReplies).not.toContain("perm_a")
  467. // Release session A's permission
  468. resolvePermissionA!()
  469. await new Promise((r) => setTimeout(r, 20))
  470. // Now session A's permission should be replied
  471. expect(permissionReplies).toContain("perm_a")
  472. stop()
  473. },
  474. })
  475. })
  476. test("streams running bash output snapshots and de-dupes identical snapshots", async () => {
  477. await using tmp = await tmpdir()
  478. await Instance.provide({
  479. directory: tmp.path,
  480. fn: async () => {
  481. const { agent, controller, sessionUpdates, stop } = createFakeAgent()
  482. const cwd = "/tmp/opencode-acp-test"
  483. const sessionId = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId)
  484. const input = { command: "echo hello", description: "run command" }
  485. for (const output of ["a", "a", "ab"]) {
  486. controller.push(
  487. toolEvent(sessionId, cwd, {
  488. callID: "call_1",
  489. tool: "bash",
  490. status: "running",
  491. input,
  492. metadata: { output },
  493. }),
  494. )
  495. }
  496. await new Promise((r) => setTimeout(r, 20))
  497. const snapshots = sessionUpdates
  498. .filter((u) => u.sessionId === sessionId)
  499. .filter((u) => isToolCallUpdate(u.update))
  500. .map((u) => inProgressText(u.update))
  501. expect(snapshots).toEqual(["a", undefined, "ab"])
  502. stop()
  503. },
  504. })
  505. })
  506. test("emits synthetic pending before first running update for any tool", async () => {
  507. await using tmp = await tmpdir()
  508. await Instance.provide({
  509. directory: tmp.path,
  510. fn: async () => {
  511. const { agent, controller, sessionUpdates, stop } = createFakeAgent()
  512. const cwd = "/tmp/opencode-acp-test"
  513. const sessionId = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId)
  514. controller.push(
  515. toolEvent(sessionId, cwd, {
  516. callID: "call_bash",
  517. tool: "bash",
  518. status: "running",
  519. input: { command: "echo hi", description: "run command" },
  520. metadata: { output: "hi\n" },
  521. }),
  522. )
  523. controller.push(
  524. toolEvent(sessionId, cwd, {
  525. callID: "call_read",
  526. tool: "read",
  527. status: "running",
  528. input: { filePath: "/tmp/example.txt" },
  529. }),
  530. )
  531. await new Promise((r) => setTimeout(r, 20))
  532. const types = sessionUpdates
  533. .filter((u) => u.sessionId === sessionId)
  534. .map((u) => u.update.sessionUpdate)
  535. .filter((u) => u === "tool_call" || u === "tool_call_update")
  536. expect(types).toEqual(["tool_call", "tool_call_update", "tool_call", "tool_call_update"])
  537. const pendings = sessionUpdates.filter(
  538. (u) => u.sessionId === sessionId && u.update.sessionUpdate === "tool_call",
  539. )
  540. expect(pendings.every((p) => p.update.sessionUpdate === "tool_call" && p.update.status === "pending")).toBe(
  541. true,
  542. )
  543. stop()
  544. },
  545. })
  546. })
  547. test("does not emit duplicate synthetic pending after replayed running tool", async () => {
  548. await using tmp = await tmpdir()
  549. await Instance.provide({
  550. directory: tmp.path,
  551. fn: async () => {
  552. const { agent, controller, sessionUpdates, stop, sdk } = createFakeAgent()
  553. const cwd = "/tmp/opencode-acp-test"
  554. const sessionId = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId)
  555. const input = { command: "echo hi", description: "run command" }
  556. sdk.session.messages = async () => ({
  557. data: [
  558. {
  559. info: {
  560. role: "assistant",
  561. sessionID: sessionId,
  562. },
  563. parts: [
  564. {
  565. type: "tool",
  566. callID: "call_1",
  567. tool: "bash",
  568. state: {
  569. status: "running",
  570. input,
  571. metadata: { output: "hi\n" },
  572. time: { start: Date.now() },
  573. },
  574. },
  575. ],
  576. },
  577. ],
  578. })
  579. await agent.loadSession({ sessionId, cwd, mcpServers: [] } as any)
  580. controller.push(
  581. toolEvent(sessionId, cwd, {
  582. callID: "call_1",
  583. tool: "bash",
  584. status: "running",
  585. input,
  586. metadata: { output: "hi\nthere\n" },
  587. }),
  588. )
  589. await new Promise((r) => setTimeout(r, 20))
  590. const types = sessionUpdates
  591. .filter((u) => u.sessionId === sessionId)
  592. .map((u) => u.update)
  593. .filter((u) => "toolCallId" in u && u.toolCallId === "call_1")
  594. .map((u) => u.sessionUpdate)
  595. .filter((u) => u === "tool_call" || u === "tool_call_update")
  596. expect(types).toEqual(["tool_call", "tool_call_update", "tool_call_update"])
  597. stop()
  598. },
  599. })
  600. })
  601. test("clears bash snapshot marker on pending state", async () => {
  602. await using tmp = await tmpdir()
  603. await Instance.provide({
  604. directory: tmp.path,
  605. fn: async () => {
  606. const { agent, controller, sessionUpdates, stop } = createFakeAgent()
  607. const cwd = "/tmp/opencode-acp-test"
  608. const sessionId = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId)
  609. const input = { command: "echo hello", description: "run command" }
  610. controller.push(
  611. toolEvent(sessionId, cwd, {
  612. callID: "call_1",
  613. tool: "bash",
  614. status: "running",
  615. input,
  616. metadata: { output: "a" },
  617. }),
  618. )
  619. controller.push(
  620. toolEvent(sessionId, cwd, {
  621. callID: "call_1",
  622. tool: "bash",
  623. status: "pending",
  624. input,
  625. raw: '{"command":"echo hello"}',
  626. }),
  627. )
  628. controller.push(
  629. toolEvent(sessionId, cwd, {
  630. callID: "call_1",
  631. tool: "bash",
  632. status: "running",
  633. input,
  634. metadata: { output: "a" },
  635. }),
  636. )
  637. await new Promise((r) => setTimeout(r, 20))
  638. const snapshots = sessionUpdates
  639. .filter((u) => u.sessionId === sessionId)
  640. .filter((u) => isToolCallUpdate(u.update))
  641. .map((u) => inProgressText(u.update))
  642. expect(snapshots).toEqual(["a", "a"])
  643. stop()
  644. },
  645. })
  646. })
  647. })