Procházet zdrojové kódy

fix(acp): use single global event subscription and route by sessionID (#5628)

Co-authored-by: noamzbr <[email protected]>
Co-authored-by: noam-v <[email protected]>
Noam Bressler před 1 měsícem
rodič
revize
bef1f66281

+ 314 - 268
packages/opencode/src/acp/agent.ts

@@ -20,7 +20,7 @@ import {
 } from "@agentclientprotocol/sdk"
 import { Log } from "../util/log"
 import { ACPSessionManager } from "./session"
-import type { ACPConfig, ACPSessionState } from "./types"
+import type { ACPConfig } from "./types"
 import { Provider } from "../provider/provider"
 import { Agent as AgentModule } from "../agent/agent"
 import { Installation } from "@/installation"
@@ -29,7 +29,7 @@ import { Config } from "@/config/config"
 import { Todo } from "@/session/todo"
 import { z } from "zod"
 import { LoadAPIKeyError } from "ai"
-import type { OpencodeClient, SessionMessageResponse } from "@opencode-ai/sdk/v2"
+import type { Event, OpencodeClient, SessionMessageResponse } from "@opencode-ai/sdk/v2"
 import { applyPatch } from "diff"
 
 export namespace ACP {
@@ -47,304 +47,354 @@ export namespace ACP {
     private connection: AgentSideConnection
     private config: ACPConfig
     private sdk: OpencodeClient
-    private sessionManager
+    private sessionManager: ACPSessionManager
+    private eventAbort = new AbortController()
+    private eventStarted = false
+    private permissionQueues = new Map<string, Promise<void>>()
+    private permissionOptions: PermissionOption[] = [
+      { optionId: "once", kind: "allow_once", name: "Allow once" },
+      { optionId: "always", kind: "allow_always", name: "Always allow" },
+      { optionId: "reject", kind: "reject_once", name: "Reject" },
+    ]
 
     constructor(connection: AgentSideConnection, config: ACPConfig) {
       this.connection = connection
       this.config = config
       this.sdk = config.sdk
       this.sessionManager = new ACPSessionManager(this.sdk)
+      this.startEventSubscription()
     }
 
-    private setupEventSubscriptions(session: ACPSessionState) {
-      const sessionId = session.id
-      const directory = session.cwd
+    private startEventSubscription() {
+      if (this.eventStarted) return
+      this.eventStarted = true
+      this.runEventSubscription().catch((error) => {
+        if (this.eventAbort.signal.aborted) return
+        log.error("event subscription failed", { error })
+      })
+    }
 
-      const options: PermissionOption[] = [
-        { optionId: "once", kind: "allow_once", name: "Allow once" },
-        { optionId: "always", kind: "allow_always", name: "Always allow" },
-        { optionId: "reject", kind: "reject_once", name: "Reject" },
-      ]
-      this.config.sdk.event.subscribe({ directory }).then(async (events) => {
+    private async runEventSubscription() {
+      while (true) {
+        if (this.eventAbort.signal.aborted) return
+        const events = await this.sdk.global.event({
+          signal: this.eventAbort.signal,
+        })
         for await (const event of events.stream) {
-          switch (event.type) {
-            case "permission.asked":
-              try {
-                const permission = event.properties
-                const res = await this.connection
-                  .requestPermission({
-                    sessionId,
-                    toolCall: {
-                      toolCallId: permission.tool?.callID ?? permission.id,
-                      status: "pending",
-                      title: permission.permission,
-                      rawInput: permission.metadata,
-                      kind: toToolKind(permission.permission),
-                      locations: toLocations(permission.permission, permission.metadata),
-                    },
-                    options,
-                  })
-                  .catch(async (error) => {
-                    log.error("failed to request permission from ACP", {
-                      error,
-                      permissionID: permission.id,
-                      sessionID: permission.sessionID,
-                    })
-                    await this.config.sdk.permission.reply({
-                      requestID: permission.id,
-                      reply: "reject",
-                      directory,
-                    })
-                    return
+          if (this.eventAbort.signal.aborted) return
+          const payload = (event as any)?.payload
+          if (!payload) continue
+          await this.handleEvent(payload as Event).catch((error) => {
+            log.error("failed to handle event", { error, type: payload.type })
+          })
+        }
+      }
+    }
+
+    private async handleEvent(event: Event) {
+      switch (event.type) {
+        case "permission.asked": {
+          const permission = event.properties
+          const session = this.sessionManager.tryGet(permission.sessionID)
+          if (!session) return
+
+          const prev = this.permissionQueues.get(permission.sessionID) ?? Promise.resolve()
+          const next = prev
+            .then(async () => {
+              const directory = session.cwd
+
+              const res = await this.connection
+                .requestPermission({
+                  sessionId: permission.sessionID,
+                  toolCall: {
+                    toolCallId: permission.tool?.callID ?? permission.id,
+                    status: "pending",
+                    title: permission.permission,
+                    rawInput: permission.metadata,
+                    kind: toToolKind(permission.permission),
+                    locations: toLocations(permission.permission, permission.metadata),
+                  },
+                  options: this.permissionOptions,
+                })
+                .catch(async (error) => {
+                  log.error("failed to request permission from ACP", {
+                    error,
+                    permissionID: permission.id,
+                    sessionID: permission.sessionID,
                   })
-                if (!res) return
-                if (res.outcome.outcome !== "selected") {
-                  await this.config.sdk.permission.reply({
+                  await this.sdk.permission.reply({
                     requestID: permission.id,
                     reply: "reject",
                     directory,
                   })
-                  return
-                }
-                if (res.outcome.optionId !== "reject" && permission.permission == "edit") {
-                  const metadata = permission.metadata || {}
-                  const filepath = typeof metadata["filepath"] === "string" ? metadata["filepath"] : ""
-                  const diff = typeof metadata["diff"] === "string" ? metadata["diff"] : ""
-
-                  const content = await Bun.file(filepath).text()
-                  const newContent = getNewContent(content, diff)
-
-                  if (newContent) {
-                    this.connection.writeTextFile({
-                      sessionId: sessionId,
-                      path: filepath,
-                      content: newContent,
-                    })
-                  }
-                }
-                await this.config.sdk.permission.reply({
+                  return undefined
+                })
+
+              if (!res) return
+              if (res.outcome.outcome !== "selected") {
+                await this.sdk.permission.reply({
                   requestID: permission.id,
-                  reply: res.outcome.optionId as "once" | "always" | "reject",
+                  reply: "reject",
                   directory,
                 })
-              } catch (err) {
-                log.error("unexpected error when handling permission", { error: err })
-              } finally {
-                break
+                return
               }
 
-            case "message.part.updated":
-              log.info("message part updated", { event: event.properties })
-              try {
-                const props = event.properties
-                const { part } = props
-
-                const message = await this.config.sdk.session
-                  .message(
-                    {
-                      sessionID: part.sessionID,
-                      messageID: part.messageID,
-                      directory,
+              if (res.outcome.optionId !== "reject" && permission.permission == "edit") {
+                const metadata = permission.metadata || {}
+                const filepath = typeof metadata["filepath"] === "string" ? metadata["filepath"] : ""
+                const diff = typeof metadata["diff"] === "string" ? metadata["diff"] : ""
+
+                const content = await Bun.file(filepath).text()
+                const newContent = getNewContent(content, diff)
+
+                if (newContent) {
+                  this.connection.writeTextFile({
+                    sessionId: session.id,
+                    path: filepath,
+                    content: newContent,
+                  })
+                }
+              }
+
+              await this.sdk.permission.reply({
+                requestID: permission.id,
+                reply: res.outcome.optionId as "once" | "always" | "reject",
+                directory,
+              })
+            })
+            .catch((error) => {
+              log.error("failed to handle permission", { error, permissionID: permission.id })
+            })
+            .finally(() => {
+              if (this.permissionQueues.get(permission.sessionID) === next) {
+                this.permissionQueues.delete(permission.sessionID)
+              }
+            })
+          this.permissionQueues.set(permission.sessionID, next)
+          return
+        }
+
+        case "message.part.updated": {
+          log.info("message part updated", { event: event.properties })
+          const props = event.properties
+          const part = props.part
+          const session = this.sessionManager.tryGet(part.sessionID)
+          if (!session) return
+          const sessionId = session.id
+          const directory = session.cwd
+
+          const message = await this.sdk.session
+            .message(
+              {
+                sessionID: part.sessionID,
+                messageID: part.messageID,
+                directory,
+              },
+              { throwOnError: true },
+            )
+            .then((x) => x.data)
+            .catch((error) => {
+              log.error("unexpected error when fetching message", { error })
+              return undefined
+            })
+
+          if (!message || message.info.role !== "assistant") return
+
+          if (part.type === "tool") {
+            switch (part.state.status) {
+              case "pending":
+                await this.connection
+                  .sessionUpdate({
+                    sessionId,
+                    update: {
+                      sessionUpdate: "tool_call",
+                      toolCallId: part.callID,
+                      title: part.tool,
+                      kind: toToolKind(part.tool),
+                      status: "pending",
+                      locations: [],
+                      rawInput: {},
                     },
-                    { throwOnError: true },
-                  )
-                  .then((x) => x.data)
-                  .catch((err) => {
-                    log.error("unexpected error when fetching message", { error: err })
-                    return undefined
                   })
+                  .catch((error) => {
+                    log.error("failed to send tool pending to ACP", { error })
+                  })
+                return
 
-                if (!message || message.info.role !== "assistant") return
-
-                if (part.type === "tool") {
-                  switch (part.state.status) {
-                    case "pending":
-                      await this.connection
-                        .sessionUpdate({
-                          sessionId,
-                          update: {
-                            sessionUpdate: "tool_call",
-                            toolCallId: part.callID,
-                            title: part.tool,
-                            kind: toToolKind(part.tool),
-                            status: "pending",
-                            locations: [],
-                            rawInput: {},
-                          },
-                        })
-                        .catch((err) => {
-                          log.error("failed to send tool pending to ACP", { error: err })
-                        })
-                      break
-                    case "running":
-                      await this.connection
-                        .sessionUpdate({
-                          sessionId,
-                          update: {
-                            sessionUpdate: "tool_call_update",
-                            toolCallId: part.callID,
-                            status: "in_progress",
-                            kind: toToolKind(part.tool),
-                            title: part.tool,
-                            locations: toLocations(part.tool, part.state.input),
-                            rawInput: part.state.input,
-                          },
-                        })
-                        .catch((err) => {
-                          log.error("failed to send tool in_progress to ACP", { error: err })
-                        })
-                      break
-                    case "completed":
-                      const kind = toToolKind(part.tool)
-                      const content: ToolCallContent[] = [
-                        {
-                          type: "content",
-                          content: {
-                            type: "text",
-                            text: part.state.output,
-                          },
-                        },
-                      ]
-
-                      if (kind === "edit") {
-                        const input = part.state.input
-                        const filePath = typeof input["filePath"] === "string" ? input["filePath"] : ""
-                        const oldText = typeof input["oldString"] === "string" ? input["oldString"] : ""
-                        const newText =
-                          typeof input["newString"] === "string"
-                            ? input["newString"]
-                            : typeof input["content"] === "string"
-                              ? input["content"]
-                              : ""
-                        content.push({
-                          type: "diff",
-                          path: filePath,
-                          oldText,
-                          newText,
-                        })
-                      }
-
-                      if (part.tool === "todowrite") {
-                        const parsedTodos = z.array(Todo.Info).safeParse(JSON.parse(part.state.output))
-                        if (parsedTodos.success) {
-                          await this.connection
-                            .sessionUpdate({
-                              sessionId,
-                              update: {
-                                sessionUpdate: "plan",
-                                entries: parsedTodos.data.map((todo) => {
-                                  const status: PlanEntry["status"] =
-                                    todo.status === "cancelled" ? "completed" : (todo.status as PlanEntry["status"])
-                                  return {
-                                    priority: "medium",
-                                    status,
-                                    content: todo.content,
-                                  }
-                                }),
-                              },
-                            })
-                            .catch((err) => {
-                              log.error("failed to send session update for todo", { error: err })
-                            })
-                        } else {
-                          log.error("failed to parse todo output", { error: parsedTodos.error })
-                        }
-                      }
-
-                      await this.connection
-                        .sessionUpdate({
-                          sessionId,
-                          update: {
-                            sessionUpdate: "tool_call_update",
-                            toolCallId: part.callID,
-                            status: "completed",
-                            kind,
-                            content,
-                            title: part.state.title,
-                            rawInput: part.state.input,
-                            rawOutput: {
-                              output: part.state.output,
-                              metadata: part.state.metadata,
-                            },
-                          },
-                        })
-                        .catch((err) => {
-                          log.error("failed to send tool completed to ACP", { error: err })
-                        })
-                      break
-                    case "error":
-                      await this.connection
-                        .sessionUpdate({
-                          sessionId,
-                          update: {
-                            sessionUpdate: "tool_call_update",
-                            toolCallId: part.callID,
-                            status: "failed",
-                            kind: toToolKind(part.tool),
-                            title: part.tool,
-                            rawInput: part.state.input,
-                            content: [
-                              {
-                                type: "content",
-                                content: {
-                                  type: "text",
-                                  text: part.state.error,
-                                },
-                              },
-                            ],
-                            rawOutput: {
-                              error: part.state.error,
-                            },
-                          },
-                        })
-                        .catch((err) => {
-                          log.error("failed to send tool error to ACP", { error: err })
-                        })
-                      break
-                  }
-                } else if (part.type === "text") {
-                  const delta = props.delta
-                  if (delta && part.synthetic !== true) {
+              case "running":
+                await this.connection
+                  .sessionUpdate({
+                    sessionId,
+                    update: {
+                      sessionUpdate: "tool_call_update",
+                      toolCallId: part.callID,
+                      status: "in_progress",
+                      kind: toToolKind(part.tool),
+                      title: part.tool,
+                      locations: toLocations(part.tool, part.state.input),
+                      rawInput: part.state.input,
+                    },
+                  })
+                  .catch((error) => {
+                    log.error("failed to send tool in_progress to ACP", { error })
+                  })
+                return
+
+              case "completed": {
+                const kind = toToolKind(part.tool)
+                const content: ToolCallContent[] = [
+                  {
+                    type: "content",
+                    content: {
+                      type: "text",
+                      text: part.state.output,
+                    },
+                  },
+                ]
+
+                if (kind === "edit") {
+                  const input = part.state.input
+                  const filePath = typeof input["filePath"] === "string" ? input["filePath"] : ""
+                  const oldText = typeof input["oldString"] === "string" ? input["oldString"] : ""
+                  const newText =
+                    typeof input["newString"] === "string"
+                      ? input["newString"]
+                      : typeof input["content"] === "string"
+                        ? input["content"]
+                        : ""
+                  content.push({
+                    type: "diff",
+                    path: filePath,
+                    oldText,
+                    newText,
+                  })
+                }
+
+                if (part.tool === "todowrite") {
+                  const parsedTodos = z.array(Todo.Info).safeParse(JSON.parse(part.state.output))
+                  if (parsedTodos.success) {
                     await this.connection
                       .sessionUpdate({
                         sessionId,
                         update: {
-                          sessionUpdate: "agent_message_chunk",
-                          content: {
-                            type: "text",
-                            text: delta,
-                          },
+                          sessionUpdate: "plan",
+                          entries: parsedTodos.data.map((todo) => {
+                            const status: PlanEntry["status"] =
+                              todo.status === "cancelled" ? "completed" : (todo.status as PlanEntry["status"])
+                            return {
+                              priority: "medium",
+                              status,
+                              content: todo.content,
+                            }
+                          }),
                         },
                       })
-                      .catch((err) => {
-                        log.error("failed to send text to ACP", { error: err })
+                      .catch((error) => {
+                        log.error("failed to send session update for todo", { error })
                       })
+                  } else {
+                    log.error("failed to parse todo output", { error: parsedTodos.error })
                   }
-                } else if (part.type === "reasoning") {
-                  const delta = props.delta
-                  if (delta) {
-                    await this.connection
-                      .sessionUpdate({
-                        sessionId,
-                        update: {
-                          sessionUpdate: "agent_thought_chunk",
+                }
+
+                await this.connection
+                  .sessionUpdate({
+                    sessionId,
+                    update: {
+                      sessionUpdate: "tool_call_update",
+                      toolCallId: part.callID,
+                      status: "completed",
+                      kind,
+                      content,
+                      title: part.state.title,
+                      rawInput: part.state.input,
+                      rawOutput: {
+                        output: part.state.output,
+                        metadata: part.state.metadata,
+                      },
+                    },
+                  })
+                  .catch((error) => {
+                    log.error("failed to send tool completed to ACP", { error })
+                  })
+                return
+              }
+              case "error":
+                await this.connection
+                  .sessionUpdate({
+                    sessionId,
+                    update: {
+                      sessionUpdate: "tool_call_update",
+                      toolCallId: part.callID,
+                      status: "failed",
+                      kind: toToolKind(part.tool),
+                      title: part.tool,
+                      rawInput: part.state.input,
+                      content: [
+                        {
+                          type: "content",
                           content: {
                             type: "text",
-                            text: delta,
+                            text: part.state.error,
                           },
                         },
-                      })
-                      .catch((err) => {
-                        log.error("failed to send reasoning to ACP", { error: err })
-                      })
-                  }
-                }
-              } finally {
-                break
-              }
+                      ],
+                      rawOutput: {
+                        error: part.state.error,
+                      },
+                    },
+                  })
+                  .catch((error) => {
+                    log.error("failed to send tool error to ACP", { error })
+                  })
+                return
+            }
+          }
+
+          if (part.type === "text") {
+            const delta = props.delta
+            if (delta && part.synthetic !== true) {
+              await this.connection
+                .sessionUpdate({
+                  sessionId,
+                  update: {
+                    sessionUpdate: "agent_message_chunk",
+                    content: {
+                      type: "text",
+                      text: delta,
+                    },
+                  },
+                })
+                .catch((error) => {
+                  log.error("failed to send text to ACP", { error })
+                })
+            }
+            return
           }
+
+          if (part.type === "reasoning") {
+            const delta = props.delta
+            if (delta) {
+              await this.connection
+                .sessionUpdate({
+                  sessionId,
+                  update: {
+                    sessionUpdate: "agent_thought_chunk",
+                    content: {
+                      type: "text",
+                      text: delta,
+                    },
+                  },
+                })
+                .catch((error) => {
+                  log.error("failed to send reasoning to ACP", { error })
+                })
+            }
+          }
+          return
         }
-      })
+      }
     }
 
     async initialize(params: InitializeRequest): Promise<InitializeResponse> {
@@ -409,8 +459,6 @@ export namespace ACP {
           sessionId,
         })
 
-        this.setupEventSubscriptions(state)
-
         return {
           sessionId,
           models: load.models,
@@ -436,7 +484,7 @@ export namespace ACP {
         const model = await defaultModel(this.config, directory)
 
         // Store ACP session state
-        const state = await this.sessionManager.load(sessionId, params.cwd, params.mcpServers, model)
+        await this.sessionManager.load(sessionId, params.cwd, params.mcpServers, model)
 
         log.info("load_session", { sessionId, mcpServers: params.mcpServers.length })
 
@@ -446,8 +494,6 @@ export namespace ACP {
           sessionId,
         })
 
-        this.setupEventSubscriptions(state)
-
         // Replay session history
         const messages = await this.sdk.session
           .messages(

+ 4 - 0
packages/opencode/src/acp/session.ts

@@ -13,6 +13,10 @@ export class ACPSessionManager {
     this.sdk = sdk
   }
 
+  tryGet(sessionId: string): ACPSessionState | undefined {
+    return this.sessions.get(sessionId)
+  }
+
   async create(cwd: string, mcpServers: McpServer[], model?: ACPSessionState["model"]): Promise<ACPSessionState> {
     const session = await this.sdk.session
       .create(

+ 436 - 0
packages/opencode/test/acp/event-subscription.test.ts

@@ -0,0 +1,436 @@
+import { describe, expect, test } from "bun:test"
+import { ACP } from "../../src/acp/agent"
+import type { AgentSideConnection } from "@agentclientprotocol/sdk"
+import type { Event } from "@opencode-ai/sdk/v2"
+import { Instance } from "../../src/project/instance"
+import { tmpdir } from "../fixture/fixture"
+
+type SessionUpdateParams = Parameters<AgentSideConnection["sessionUpdate"]>[0]
+type RequestPermissionParams = Parameters<AgentSideConnection["requestPermission"]>[0]
+type RequestPermissionResult = Awaited<ReturnType<AgentSideConnection["requestPermission"]>>
+
+type GlobalEventEnvelope = {
+  directory?: string
+  payload?: Event
+}
+
+type EventController = {
+  push: (event: GlobalEventEnvelope) => void
+  close: () => void
+}
+
+function createEventStream() {
+  const queue: GlobalEventEnvelope[] = []
+  const waiters: Array<(value: GlobalEventEnvelope | undefined) => void> = []
+  const state = { closed: false }
+
+  const push = (event: GlobalEventEnvelope) => {
+    const waiter = waiters.shift()
+    if (waiter) {
+      waiter(event)
+      return
+    }
+    queue.push(event)
+  }
+
+  const close = () => {
+    state.closed = true
+    for (const waiter of waiters.splice(0)) {
+      waiter(undefined)
+    }
+  }
+
+  const stream = async function* (signal?: AbortSignal) {
+    while (true) {
+      if (signal?.aborted) return
+      const next = queue.shift()
+      if (next) {
+        yield next
+        continue
+      }
+      if (state.closed) return
+      const value = await new Promise<GlobalEventEnvelope | undefined>((resolve) => {
+        waiters.push(resolve)
+        if (!signal) return
+        signal.addEventListener("abort", () => resolve(undefined), { once: true })
+      })
+      if (!value) return
+      yield value
+    }
+  }
+
+  return { controller: { push, close } satisfies EventController, stream }
+}
+
+function createFakeAgent() {
+  const updates = new Map<string, string[]>()
+  const chunks = new Map<string, string>()
+  const record = (sessionId: string, type: string) => {
+    const list = updates.get(sessionId) ?? []
+    list.push(type)
+    updates.set(sessionId, list)
+  }
+
+  const connection = {
+    async sessionUpdate(params: SessionUpdateParams) {
+      const update = params.update
+      const type = update?.sessionUpdate ?? "unknown"
+      record(params.sessionId, type)
+      if (update?.sessionUpdate === "agent_message_chunk") {
+        const content = update.content
+        if (content?.type !== "text") return
+        if (typeof content.text !== "string") return
+        chunks.set(params.sessionId, (chunks.get(params.sessionId) ?? "") + content.text)
+      }
+    },
+    async requestPermission(_params: RequestPermissionParams): Promise<RequestPermissionResult> {
+      return { outcome: { outcome: "selected", optionId: "once" } } as RequestPermissionResult
+    },
+  } as unknown as AgentSideConnection
+
+  const { controller, stream } = createEventStream()
+  const calls = {
+    eventSubscribe: 0,
+    sessionCreate: 0,
+  }
+
+  const sdk = {
+    global: {
+      event: async (opts?: { signal?: AbortSignal }) => {
+        calls.eventSubscribe++
+        return { stream: stream(opts?.signal) }
+      },
+    },
+    session: {
+      create: async (_params?: any) => {
+        calls.sessionCreate++
+        return {
+          data: {
+            id: `ses_${calls.sessionCreate}`,
+            time: { created: new Date().toISOString() },
+          },
+        }
+      },
+      get: async (_params?: any) => {
+        return {
+          data: {
+            id: "ses_1",
+            time: { created: new Date().toISOString() },
+          },
+        }
+      },
+      messages: async () => {
+        return { data: [] }
+      },
+      message: async () => {
+        return {
+          data: {
+            info: {
+              role: "assistant",
+            },
+          },
+        }
+      },
+    },
+    permission: {
+      respond: async () => {
+        return { data: true }
+      },
+    },
+    config: {
+      providers: async () => {
+        return {
+          data: {
+            providers: [
+              {
+                id: "opencode",
+                name: "opencode",
+                models: {
+                  "big-pickle": { id: "big-pickle", name: "big-pickle" },
+                },
+              },
+            ],
+          },
+        }
+      },
+    },
+    app: {
+      agents: async () => {
+        return {
+          data: [
+            {
+              name: "build",
+              description: "build",
+              mode: "agent",
+            },
+          ],
+        }
+      },
+    },
+    command: {
+      list: async () => {
+        return { data: [] }
+      },
+    },
+    mcp: {
+      add: async () => {
+        return { data: true }
+      },
+    },
+  } as any
+
+  const agent = new ACP.Agent(connection, {
+    sdk,
+    defaultModel: { providerID: "opencode", modelID: "big-pickle" },
+  } as any)
+
+  const stop = () => {
+    controller.close()
+    ;(agent as any).eventAbort.abort()
+  }
+
+  return { agent, controller, calls, updates, chunks, stop, sdk, connection }
+}
+
+describe("acp.agent event subscription", () => {
+  test("routes message.part.updated by the event sessionID (no cross-session pollution)", async () => {
+    await using tmp = await tmpdir()
+    await Instance.provide({
+      directory: tmp.path,
+      fn: async () => {
+        const { agent, controller, updates, stop } = createFakeAgent()
+        const cwd = "/tmp/opencode-acp-test"
+
+        const sessionA = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId)
+        const sessionB = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId)
+
+        controller.push({
+          directory: cwd,
+          payload: {
+            type: "message.part.updated",
+            properties: {
+              part: {
+                sessionID: sessionB,
+                messageID: "msg_1",
+                type: "text",
+                synthetic: false,
+              },
+              delta: "hello",
+            },
+          },
+        } as any)
+
+        await new Promise((r) => setTimeout(r, 10))
+
+        expect((updates.get(sessionA) ?? []).includes("agent_message_chunk")).toBe(false)
+        expect((updates.get(sessionB) ?? []).includes("agent_message_chunk")).toBe(true)
+
+        stop()
+      },
+    })
+  })
+
+  test("keeps concurrent sessions isolated when message.part.updated events are interleaved", async () => {
+    await using tmp = await tmpdir()
+    await Instance.provide({
+      directory: tmp.path,
+      fn: async () => {
+        const { agent, controller, chunks, stop } = createFakeAgent()
+        const cwd = "/tmp/opencode-acp-test"
+
+        const sessionA = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId)
+        const sessionB = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId)
+
+        const tokenA = ["ALPHA_", "111", "_X"]
+        const tokenB = ["BETA_", "222", "_Y"]
+
+        const push = (sessionId: string, messageID: string, delta: string) => {
+          controller.push({
+            directory: cwd,
+            payload: {
+              type: "message.part.updated",
+              properties: {
+                part: {
+                  sessionID: sessionId,
+                  messageID,
+                  type: "text",
+                  synthetic: false,
+                },
+                delta,
+              },
+            },
+          } as any)
+        }
+
+        push(sessionA, "msg_a", tokenA[0])
+        push(sessionB, "msg_b", tokenB[0])
+        push(sessionA, "msg_a", tokenA[1])
+        push(sessionB, "msg_b", tokenB[1])
+        push(sessionA, "msg_a", tokenA[2])
+        push(sessionB, "msg_b", tokenB[2])
+
+        await new Promise((r) => setTimeout(r, 20))
+
+        const a = chunks.get(sessionA) ?? ""
+        const b = chunks.get(sessionB) ?? ""
+
+        expect(a).toContain(tokenA.join(""))
+        expect(b).toContain(tokenB.join(""))
+        for (const part of tokenB) expect(a).not.toContain(part)
+        for (const part of tokenA) expect(b).not.toContain(part)
+
+        stop()
+      },
+    })
+  })
+
+  test("does not create additional event subscriptions on repeated loadSession()", async () => {
+    await using tmp = await tmpdir()
+    await Instance.provide({
+      directory: tmp.path,
+      fn: async () => {
+        const { agent, calls, stop } = createFakeAgent()
+        const cwd = "/tmp/opencode-acp-test"
+
+        const sessionId = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId)
+
+        await agent.loadSession({ sessionId, cwd, mcpServers: [] } as any)
+        await agent.loadSession({ sessionId, cwd, mcpServers: [] } as any)
+        await agent.loadSession({ sessionId, cwd, mcpServers: [] } as any)
+        await agent.loadSession({ sessionId, cwd, mcpServers: [] } as any)
+
+        expect(calls.eventSubscribe).toBe(1)
+
+        stop()
+      },
+    })
+  })
+
+  test("permission.asked events are handled and replied", async () => {
+    await using tmp = await tmpdir()
+    await Instance.provide({
+      directory: tmp.path,
+      fn: async () => {
+        const permissionReplies: string[] = []
+        const { agent, controller, stop, sdk } = createFakeAgent()
+        sdk.permission.reply = async (params: any) => {
+          permissionReplies.push(params.requestID)
+          return { data: true }
+        }
+        const cwd = "/tmp/opencode-acp-test"
+
+        const sessionA = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId)
+
+        controller.push({
+          directory: cwd,
+          payload: {
+            type: "permission.asked",
+            properties: {
+              id: "perm_1",
+              sessionID: sessionA,
+              permission: "bash",
+              patterns: ["*"],
+              metadata: {},
+              always: [],
+            },
+          },
+        } as any)
+
+        await new Promise((r) => setTimeout(r, 20))
+
+        expect(permissionReplies).toContain("perm_1")
+
+        stop()
+      },
+    })
+  })
+
+  test("permission prompt on session A does not block message updates for session B", async () => {
+    await using tmp = await tmpdir()
+    await Instance.provide({
+      directory: tmp.path,
+      fn: async () => {
+        const permissionReplies: string[] = []
+        let resolvePermissionA: (() => void) | undefined
+        const permissionABlocking = new Promise<void>((r) => {
+          resolvePermissionA = r
+        })
+
+        const { agent, controller, chunks, stop, sdk, connection } = createFakeAgent()
+
+        // Make permission request for session A block until we release it
+        const originalRequestPermission = connection.requestPermission.bind(connection)
+        let permissionCalls = 0
+        connection.requestPermission = async (params: RequestPermissionParams) => {
+          permissionCalls++
+          if (params.sessionId.endsWith("1")) {
+            await permissionABlocking
+          }
+          return originalRequestPermission(params)
+        }
+
+        sdk.permission.reply = async (params: any) => {
+          permissionReplies.push(params.requestID)
+          return { data: true }
+        }
+
+        const cwd = "/tmp/opencode-acp-test"
+
+        const sessionA = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId)
+        const sessionB = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId)
+
+        // Push permission.asked for session A (will block)
+        controller.push({
+          directory: cwd,
+          payload: {
+            type: "permission.asked",
+            properties: {
+              id: "perm_a",
+              sessionID: sessionA,
+              permission: "bash",
+              patterns: ["*"],
+              metadata: {},
+              always: [],
+            },
+          },
+        } as any)
+
+        // Give time for permission handling to start
+        await new Promise((r) => setTimeout(r, 10))
+
+        // Push message for session B while A's permission is pending
+        controller.push({
+          directory: cwd,
+          payload: {
+            type: "message.part.updated",
+            properties: {
+              part: {
+                sessionID: sessionB,
+                messageID: "msg_b",
+                type: "text",
+                synthetic: false,
+              },
+              delta: "session_b_message",
+            },
+          },
+        } as any)
+
+        // Wait for session B's message to be processed
+        await new Promise((r) => setTimeout(r, 20))
+
+        // Session B should have received message even though A's permission is still pending
+        expect(chunks.get(sessionB) ?? "").toContain("session_b_message")
+        expect(permissionReplies).not.toContain("perm_a")
+
+        // Release session A's permission
+        resolvePermissionA!()
+        await new Promise((r) => setTimeout(r, 20))
+
+        // Now session A's permission should be replied
+        expect(permissionReplies).toContain("perm_a")
+
+        stop()
+      },
+    })
+  })
+})