Просмотр исходного кода

fix(core): add historical sync on workspace connect (#23121)

James Long 2 дней назад
Родитель
Сommit
a8c78fc005

+ 27 - 1
packages/opencode/src/cli/cmd/tui/context/sdk.tsx

@@ -2,6 +2,7 @@ import { createOpencodeClient } from "@opencode-ai/sdk/v2"
 import type { GlobalEvent } from "@opencode-ai/sdk/v2"
 import { createSimpleContext } from "./helper"
 import { createGlobalEmitter } from "@solid-primitives/event-bus"
+import { Flag } from "@/flag/flag"
 import { batch, onCleanup, onMount } from "solid-js"
 
 export type EventSource = {
@@ -39,6 +40,8 @@ export const { use: useSDK, provider: SDKProvider } = createSimpleContext({
     let queue: GlobalEvent[] = []
     let timer: Timer | undefined
     let last = 0
+    const retryDelay = 1000
+    const maxRetryDelay = 30000
 
     const flush = () => {
       if (queue.length === 0) return
@@ -73,9 +76,20 @@ export const { use: useSDK, provider: SDKProvider } = createSimpleContext({
       const ctrl = new AbortController()
       sse = ctrl
       ;(async () => {
+        let attempt = 0
         while (true) {
           if (abort.signal.aborted || ctrl.signal.aborted) break
-          const events = await sdk.global.event({ signal: ctrl.signal })
+
+          const events = await sdk.global.event({
+            signal: ctrl.signal,
+            sseMaxRetryAttempts: 0,
+          })
+
+          if (Flag.OPENCODE_EXPERIMENTAL_WORKSPACES) {
+            // Start syncing workspaces, it's important to do this after
+            // we've started listening to events
+            await sdk.sync.start().catch(() => {})
+          }
 
           for await (const event of events.stream) {
             if (ctrl.signal.aborted) break
@@ -84,6 +98,12 @@ export const { use: useSDK, provider: SDKProvider } = createSimpleContext({
 
           if (timer) clearTimeout(timer)
           if (queue.length > 0) flush()
+          attempt += 1
+          if (abort.signal.aborted || ctrl.signal.aborted) break
+
+          // Exponential backoff
+          const backoff = Math.min(retryDelay * 2 ** (attempt - 1), maxRetryDelay)
+          await new Promise((resolve) => setTimeout(resolve, backoff))
         }
       })().catch(() => {})
     }
@@ -92,6 +112,12 @@ export const { use: useSDK, provider: SDKProvider } = createSimpleContext({
       if (props.events) {
         const unsub = await props.events.subscribe(handleEvent)
         onCleanup(unsub)
+
+        if (Flag.OPENCODE_EXPERIMENTAL_WORKSPACES) {
+          // Start syncing workspaces, it's important to do this after
+          // we've started listening to events
+          await sdk.sync.start().catch(() => {})
+        }
       } else {
         startSSE()
       }

+ 85 - 12
packages/opencode/src/control-plane/workspace.ts

@@ -7,7 +7,7 @@ import { BusEvent } from "@/bus/bus-event"
 import { GlobalBus } from "@/bus/global"
 import { Auth } from "@/auth"
 import { SyncEvent } from "@/sync"
-import { EventTable } from "@/sync/event.sql"
+import { EventSequenceTable, EventTable } from "@/sync/event.sql"
 import { Flag } from "@/flag/flag"
 import { Log } from "@/util"
 import { Filesystem } from "@/util"
@@ -23,8 +23,8 @@ import { SessionTable } from "@/session/session.sql"
 import { SessionID } from "@/session/schema"
 import { errorData } from "@/util/error"
 import { AppRuntime } from "@/effect/app-runtime"
-import { EventSequenceTable } from "@/sync/event.sql"
 import { waitEvent } from "./util"
+import { WorkspaceContext } from "./workspace-context"
 
 export const Info = WorkspaceInfo.meta({
   ref: "Workspace",
@@ -297,22 +297,13 @@ export function list(project: Project.Info) {
     db.select().from(WorkspaceTable).where(eq(WorkspaceTable.project_id, project.id)).all(),
   )
   const spaces = rows.map(fromRow).sort((a, b) => a.id.localeCompare(b.id))
-
-  for (const space of spaces) startSync(space)
   return spaces
 }
 
-function lookup(id: WorkspaceID) {
+export const get = fn(WorkspaceID.zod, async (id) => {
   const row = Database.use((db) => db.select().from(WorkspaceTable).where(eq(WorkspaceTable.id, id)).get())
   if (!row) return
   return fromRow(row)
-}
-
-export const get = fn(WorkspaceID.zod, async (id) => {
-  const space = lookup(id)
-  if (!space) return
-  startSync(space)
-  return space
 })
 
 export const remove = fn(WorkspaceID.zod, async (id) => {
@@ -437,6 +428,70 @@ async function connectSSE(url: URL | string, headers: HeadersInit | undefined, s
   return res.body
 }
 
+async function syncHistory(space: Info, url: URL | string, headers: HeadersInit | undefined, signal: AbortSignal) {
+  const sessionIDs = Database.use((db) =>
+    db
+      .select({ id: SessionTable.id })
+      .from(SessionTable)
+      .where(eq(SessionTable.workspace_id, space.id))
+      .all()
+      .map((row) => row.id),
+  )
+  const state = sessionIDs.length
+    ? Object.fromEntries(
+        Database.use((db) =>
+          db.select().from(EventSequenceTable).where(inArray(EventSequenceTable.aggregate_id, sessionIDs)).all(),
+        ).map((row) => [row.aggregate_id, row.seq]),
+      )
+    : {}
+
+  log.info("syncing workspace history", {
+    workspaceID: space.id,
+    sessions: sessionIDs.length,
+    known: Object.keys(state).length,
+  })
+
+  const requestHeaders = new Headers(headers)
+  requestHeaders.set("content-type", "application/json")
+
+  const res = await fetch(route(url, "/sync/history"), {
+    method: "POST",
+    headers: requestHeaders,
+    body: JSON.stringify(state),
+    signal,
+  })
+
+  if (!res.ok) {
+    const body = await res.text()
+    throw new Error(`Workspace history HTTP failure: ${res.status} ${body}`)
+  }
+
+  const events = await res.json()
+
+  return WorkspaceContext.provide({
+    workspaceID: space.id,
+    fn: () => {
+      for (const event of events) {
+        SyncEvent.replay(
+          {
+            id: event.id,
+            aggregateID: event.aggregate_id,
+            seq: event.seq,
+            type: event.type,
+            data: event.data,
+          },
+          { publish: true },
+        )
+      }
+    },
+  })
+
+  log.info("workspace history synced", {
+    workspaceID: space.id,
+    events: events.length,
+  })
+}
+
 async function syncWorkspaceLoop(space: Info, signal: AbortSignal) {
   const adaptor = await getAdaptor(space.projectID, space.type)
   const target = await adaptor.target(space)
@@ -452,7 +507,9 @@ async function syncWorkspaceLoop(space: Info, signal: AbortSignal) {
     let stream
     try {
       stream = await connectSSE(target.url, target.headers, signal)
+      await syncHistory(space, target.url, target.headers, signal)
     } catch (err) {
+      stream = null
       setStatus(space.id, "error")
       log.info("failed to connect to global sync", {
         workspace: space.name,
@@ -469,6 +526,7 @@ async function syncWorkspaceLoop(space: Info, signal: AbortSignal) {
       await parseSSE(stream, signal, (evt: any) => {
         try {
           if (!("payload" in evt)) return
+          if (evt.payload.type === "server.heartbeat") return
 
           if (evt.payload.type === "sync") {
             SyncEvent.replay(evt.payload.syncEvent as SyncEvent.SerializedEvent)
@@ -536,4 +594,19 @@ function stopSync(id: WorkspaceID) {
   connections.delete(id)
 }
 
+export function startWorkspaceSyncing(projectID: ProjectID) {
+  const spaces = Database.use((db) =>
+    db
+      .select({ workspace: WorkspaceTable })
+      .from(WorkspaceTable)
+      .innerJoin(SessionTable, eq(SessionTable.workspace_id, WorkspaceTable.id))
+      .where(eq(WorkspaceTable.project_id, projectID))
+      .all(),
+  )
+
+  for (const row of new Map(spaces.map((row) => [row.workspace.id, row.workspace])).values()) {
+    void startSync(fromRow(row))
+  }
+}
+
 export * as Workspace from "./workspace"

+ 0 - 7
packages/opencode/src/server/proxy.ts

@@ -130,13 +130,6 @@ export async function http(url: string | URL, extra: HeadersInit | undefined, re
     const done = sync ? Fence.wait(workspaceID, sync, req.signal) : Promise.resolve()
 
     return done.then(async () => {
-      console.log("proxy http response", {
-        method: req.method,
-        request: req.url,
-        url: String(url),
-        status: res.status,
-        statusText: res.statusText,
-      })
       return new Response(res.body, {
         status: res.status,
         statusText: res.statusText,

+ 25 - 1
packages/opencode/src/server/routes/instance/sync.ts

@@ -6,6 +6,8 @@ import { Database, asc, and, not, or, lte, eq } from "@/storage"
 import { EventTable } from "@/sync/event.sql"
 import { lazy } from "@/util/lazy"
 import { Log } from "@/util"
+import { startWorkspaceSyncing } from "@/control-plane/workspace"
+import { Instance } from "@/project/instance"
 import { errors } from "../../error"
 
 const ReplayEvent = z.object({
@@ -20,6 +22,28 @@ const log = Log.create({ service: "server.sync" })
 
 export const SyncRoutes = lazy(() =>
   new Hono()
+    .post(
+      "/start",
+      describeRoute({
+        summary: "Start workspace sync",
+        description: "Start sync loops for workspaces in the current project that have active sessions.",
+        operationId: "sync.start",
+        responses: {
+          200: {
+            description: "Workspace sync started",
+            content: {
+              "application/json": {
+                schema: resolver(z.boolean()),
+              },
+            },
+          },
+        },
+      }),
+      async (c) => {
+        startWorkspaceSyncing(Instance.project.id)
+        return c.json(true)
+      },
+    )
     .post(
       "/replay",
       describeRoute({
@@ -75,7 +99,7 @@ export const SyncRoutes = lazy(() =>
         })
       },
     )
-    .get(
+    .post(
       "/history",
       describeRoute({
         summary: "List sync events",

+ 4 - 1
packages/opencode/test/workspace/workspace-restore.test.ts

@@ -141,9 +141,12 @@ describe("Workspace.sessionRestore", () => {
       Object.assign(
         async (input: URL | RequestInfo, init?: BunFetchRequestInit | RequestInit) => {
           const url = new URL(typeof input === "string" || input instanceof URL ? input : input.url)
-          if (url.pathname !== "/base/sync/replay") {
+          if (url.pathname === "/base/global/event") {
             return eventStreamResponse()
           }
+          if (url.pathname === "/base/sync/history") {
+            return Response.json([])
+          }
           const body = JSON.parse(String(init?.body))
           posts.push({
             path: url.pathname,

+ 32 - 1
packages/sdk/js/src/v2/gen/sdk.gen.ts

@@ -163,6 +163,7 @@ import type {
   SyncHistoryListResponses,
   SyncReplayErrors,
   SyncReplayResponses,
+  SyncStartResponses,
   TextPartInput,
   ToolIdsErrors,
   ToolIdsResponses,
@@ -3038,7 +3039,7 @@ export class History extends HeyApiClient {
         },
       ],
     )
-    return (options?.client ?? this.client).get<SyncHistoryListResponses, SyncHistoryListErrors, ThrowOnError>({
+    return (options?.client ?? this.client).post<SyncHistoryListResponses, SyncHistoryListErrors, ThrowOnError>({
       url: "/sync/history",
       ...options,
       ...params,
@@ -3052,6 +3053,36 @@ export class History extends HeyApiClient {
 }
 
 export class Sync extends HeyApiClient {
+  /**
+   * Start workspace sync
+   *
+   * Start sync loops for workspaces in the current project that have active sessions.
+   */
+  public start<ThrowOnError extends boolean = false>(
+    parameters?: {
+      directory?: string
+      workspace?: string
+    },
+    options?: Options<never, ThrowOnError>,
+  ) {
+    const params = buildClientParams(
+      [parameters],
+      [
+        {
+          args: [
+            { in: "query", key: "directory" },
+            { in: "query", key: "workspace" },
+          ],
+        },
+      ],
+    )
+    return (options?.client ?? this.client).post<SyncStartResponses, unknown, ThrowOnError>({
+      url: "/sync/start",
+      ...options,
+      ...params,
+    })
+  }
+
   /**
    * Replay sync events
    *

+ 19 - 0
packages/sdk/js/src/v2/gen/types.gen.ts

@@ -4502,6 +4502,25 @@ export type ProviderOauthCallbackResponses = {
 
 export type ProviderOauthCallbackResponse = ProviderOauthCallbackResponses[keyof ProviderOauthCallbackResponses]
 
+export type SyncStartData = {
+  body?: never
+  path?: never
+  query?: {
+    directory?: string
+    workspace?: string
+  }
+  url: "/sync/start"
+}
+
+export type SyncStartResponses = {
+  /**
+   * Workspace sync started
+   */
+  200: boolean
+}
+
+export type SyncStartResponse = SyncStartResponses[keyof SyncStartResponses]
+
 export type SyncReplayData = {
   body?: {
     directory: string

+ 42 - 1
packages/sdk/openapi.json

@@ -5224,6 +5224,47 @@
         ]
       }
     },
+    "/sync/start": {
+      "post": {
+        "operationId": "sync.start",
+        "parameters": [
+          {
+            "in": "query",
+            "name": "directory",
+            "schema": {
+              "type": "string"
+            }
+          },
+          {
+            "in": "query",
+            "name": "workspace",
+            "schema": {
+              "type": "string"
+            }
+          }
+        ],
+        "summary": "Start workspace sync",
+        "description": "Start sync loops for workspaces in the current project that have active sessions.",
+        "responses": {
+          "200": {
+            "description": "Workspace sync started",
+            "content": {
+              "application/json": {
+                "schema": {
+                  "type": "boolean"
+                }
+              }
+            }
+          }
+        },
+        "x-codeSamples": [
+          {
+            "lang": "js",
+            "source": "import { createOpencodeClient } from \"@opencode-ai/sdk\n\nconst client = createOpencodeClient()\nawait client.sync.start({\n  ...\n})"
+          }
+        ]
+      }
+    },
     "/sync/replay": {
       "post": {
         "operationId": "sync.replay",
@@ -5328,7 +5369,7 @@
       }
     },
     "/sync/history": {
-      "get": {
+      "post": {
         "operationId": "sync.history.list",
         "parameters": [
           {