Просмотр исходного кода

acp: replay conversation history in session/load (#5385)

xu0o0 2 месяцев назад
Родитель
Сommit
a9f27371cf
2 измененных файлов с 269 добавлено и 2 удалено
  1. 238 2
      packages/opencode/src/acp/agent.ts
  2. 31 0
      packages/opencode/src/acp/session.ts

+ 238 - 2
packages/opencode/src/acp/agent.ts

@@ -28,7 +28,7 @@ import { Config } from "@/config/config"
 import { Todo } from "@/session/todo"
 import { z } from "zod"
 import { LoadAPIKeyError } from "ai"
-import type { OpencodeClient } from "@opencode-ai/sdk/v2"
+import type { OpencodeClient, SessionMessageResponse } from "@opencode-ai/sdk/v2"
 
 export namespace ACP {
   const log = Log.create({ service: "acp-agent" })
@@ -386,7 +386,7 @@ export namespace ACP {
 
         log.info("creating_session", { sessionId, mcpServers: params.mcpServers.length })
 
-        const load = await this.loadSession({
+        const load = await this.loadSessionMode({
           cwd: directory,
           mcpServers: params.mcpServers,
           sessionId,
@@ -412,6 +412,242 @@ export namespace ACP {
     }
 
     async loadSession(params: LoadSessionRequest) {
+      const directory = params.cwd
+      const sessionId = params.sessionId
+
+      try {
+        const model = await defaultModel(this.config, directory)
+
+        // Store ACP session state
+        const state = await this.sessionManager.load(sessionId, params.cwd, params.mcpServers, model)
+
+        log.info("load_session", { sessionId, mcpServers: params.mcpServers.length })
+
+        const mode = await this.loadSessionMode({
+          cwd: directory,
+          mcpServers: params.mcpServers,
+          sessionId,
+        })
+
+        this.setupEventSubscriptions(state)
+
+        // Replay session history
+        const messages = await this.sdk.session
+          .messages(
+            {
+              sessionID: sessionId,
+              directory,
+            },
+            { throwOnError: true },
+          )
+          .then((x) => x.data)
+          .catch((err) => {
+            log.error("unexpected error when fetching message", { error: err })
+            return undefined
+          })
+
+        for (const msg of messages ?? []) {
+          log.debug("replay message", msg)
+          await this.processMessage(msg)
+        }
+
+        return mode
+      } catch (e) {
+        const error = MessageV2.fromError(e, {
+          providerID: this.config.defaultModel?.providerID ?? "unknown",
+        })
+        if (LoadAPIKeyError.isInstance(error)) {
+          throw RequestError.authRequired()
+        }
+        throw e
+      }
+    }
+
+    private async processMessage(message: SessionMessageResponse) {
+      log.debug("process message", message)
+      if (message.info.role !== "assistant" && message.info.role !== "user") return
+      const sessionId = message.info.sessionID
+
+      for (const part of message.parts) {
+        if (part.type === "tool") {
+          switch (part.state.status) {
+            case "pending":
+              await this.connection
+                .sessionUpdate({
+                  sessionId,
+                  update: {
+                    sessionUpdate: "tool_call",
+                    toolCallId: part.callID,
+                    title: part.tool,
+                    kind: toToolKind(part.tool),
+                    status: "pending",
+                    locations: [],
+                    rawInput: {},
+                  },
+                })
+                .catch((err) => {
+                  log.error("failed to send tool pending to ACP", { error: err })
+                })
+              break
+            case "running":
+              await this.connection
+                .sessionUpdate({
+                  sessionId,
+                  update: {
+                    sessionUpdate: "tool_call_update",
+                    toolCallId: part.callID,
+                    status: "in_progress",
+                    locations: toLocations(part.tool, part.state.input),
+                    rawInput: part.state.input,
+                  },
+                })
+                .catch((err) => {
+                  log.error("failed to send tool in_progress to ACP", { error: err })
+                })
+              break
+            case "completed":
+              const kind = toToolKind(part.tool)
+              const content: ToolCallContent[] = [
+                {
+                  type: "content",
+                  content: {
+                    type: "text",
+                    text: part.state.output,
+                  },
+                },
+              ]
+
+              if (kind === "edit") {
+                const input = part.state.input
+                const filePath = typeof input["filePath"] === "string" ? input["filePath"] : ""
+                const oldText = typeof input["oldString"] === "string" ? input["oldString"] : ""
+                const newText =
+                  typeof input["newString"] === "string"
+                    ? input["newString"]
+                    : typeof input["content"] === "string"
+                      ? input["content"]
+                      : ""
+                content.push({
+                  type: "diff",
+                  path: filePath,
+                  oldText,
+                  newText,
+                })
+              }
+
+              if (part.tool === "todowrite") {
+                const parsedTodos = z.array(Todo.Info).safeParse(JSON.parse(part.state.output))
+                if (parsedTodos.success) {
+                  await this.connection
+                    .sessionUpdate({
+                      sessionId,
+                      update: {
+                        sessionUpdate: "plan",
+                        entries: parsedTodos.data.map((todo) => {
+                          const status: PlanEntry["status"] =
+                            todo.status === "cancelled" ? "completed" : (todo.status as PlanEntry["status"])
+                          return {
+                            priority: "medium",
+                            status,
+                            content: todo.content,
+                          }
+                        }),
+                      },
+                    })
+                    .catch((err) => {
+                      log.error("failed to send session update for todo", { error: err })
+                    })
+                } else {
+                  log.error("failed to parse todo output", { error: parsedTodos.error })
+                }
+              }
+
+              await this.connection
+                .sessionUpdate({
+                  sessionId,
+                  update: {
+                    sessionUpdate: "tool_call_update",
+                    toolCallId: part.callID,
+                    status: "completed",
+                    kind,
+                    content,
+                    title: part.state.title,
+                    rawOutput: {
+                      output: part.state.output,
+                      metadata: part.state.metadata,
+                    },
+                  },
+                })
+                .catch((err) => {
+                  log.error("failed to send tool completed to ACP", { error: err })
+                })
+              break
+            case "error":
+              await this.connection
+                .sessionUpdate({
+                  sessionId,
+                  update: {
+                    sessionUpdate: "tool_call_update",
+                    toolCallId: part.callID,
+                    status: "failed",
+                    content: [
+                      {
+                        type: "content",
+                        content: {
+                          type: "text",
+                          text: part.state.error,
+                        },
+                      },
+                    ],
+                    rawOutput: {
+                      error: part.state.error,
+                    },
+                  },
+                })
+                .catch((err) => {
+                  log.error("failed to send tool error to ACP", { error: err })
+                })
+              break
+          }
+        } else if (part.type === "text") {
+          if (part.text) {
+            await this.connection
+              .sessionUpdate({
+                sessionId,
+                update: {
+                  sessionUpdate: message.info.role === "user" ? "user_message_chunk" : "agent_message_chunk",
+                  content: {
+                    type: "text",
+                    text: part.text,
+                  },
+                },
+              })
+              .catch((err) => {
+                log.error("failed to send text to ACP", { error: err })
+              })
+          }
+        } else if (part.type === "reasoning") {
+          if (part.text) {
+            await this.connection
+              .sessionUpdate({
+                sessionId,
+                update: {
+                  sessionUpdate: "agent_thought_chunk",
+                  content: {
+                    type: "text",
+                    text: part.text,
+                  },
+                },
+              })
+              .catch((err) => {
+                log.error("failed to send reasoning to ACP", { error: err })
+              })
+          }
+        }
+      }
+    }
+
+    private async loadSessionMode(params: LoadSessionRequest) {
       const directory = params.cwd
       const model = await defaultModel(this.config, directory)
       const sessionId = params.sessionId

+ 31 - 0
packages/opencode/src/acp/session.ts

@@ -40,6 +40,37 @@ export class ACPSessionManager {
     return state
   }
 
+  async load(
+    sessionId: string,
+    cwd: string,
+    mcpServers: McpServer[],
+    model?: ACPSessionState["model"],
+  ): Promise<ACPSessionState> {
+    const session = await this.sdk.session
+      .get(
+        {
+          sessionID: sessionId,
+          directory: cwd,
+        },
+        { throwOnError: true },
+      )
+      .then((x) => x.data!)
+
+    const resolvedModel = model
+
+    const state: ACPSessionState = {
+      id: sessionId,
+      cwd,
+      mcpServers,
+      createdAt: new Date(session.time.created),
+      model: resolvedModel,
+    }
+    log.info("loading_session", { state })
+
+    this.sessions.set(sessionId, state)
+    return state
+  }
+
   get(sessionId: string): ACPSessionState {
     const session = this.sessions.get(sessionId)
     if (!session) {