Jelajahi Sumber

feat(core): add fence to make all methods strongly consistent when syncing

James Long 4 hari lalu
induk
melakukan
7242893f80

+ 37 - 0
packages/opencode/src/control-plane/util.ts

@@ -0,0 +1,37 @@
+import { GlobalBus } from "@/bus/global"
+
+export function waitEvent(input: { timeout: number; signal?: AbortSignal; fn: (event: GlobalEvent) => boolean }) {
+  if (input.signal?.aborted) return Promise.reject(input.signal.reason ?? new Error("Request aborted"))
+
+  return new Promise<void>((resolve, reject) => {
+    const abort = () => {
+      cleanup()
+      reject(input.signal?.reason ?? new Error("Request aborted"))
+    }
+
+    const handler = (event: GlobalEvent) => {
+      try {
+        if (!input.fn(event)) return
+        cleanup()
+        resolve()
+      } catch (error) {
+        cleanup()
+        reject(error)
+      }
+    }
+
+    const cleanup = () => {
+      clearTimeout(timeout)
+      GlobalBus.off("event", handler)
+      input.signal?.removeEventListener("abort", abort)
+    }
+
+    const timeout = setTimeout(() => {
+      cleanup()
+      reject(new Error("Timed out waiting for global event"))
+    }, input.timeout)
+
+    GlobalBus.on("event", handler)
+    input.signal?.addEventListener("abort", abort, { once: true })
+  })
+}

+ 84 - 8
packages/opencode/src/control-plane/workspace.ts

@@ -1,7 +1,7 @@
 import z from "zod"
 import { setTimeout as sleep } from "node:timers/promises"
 import { fn } from "@/util/fn"
-import { Database, asc, eq } from "@/storage/db"
+import { Database, asc, eq, inArray } from "@/storage/db"
 import { Project } from "@/project/project"
 import { BusEvent } from "@/bus/bus-event"
 import { GlobalBus } from "@/bus/global"
@@ -22,6 +22,8 @@ import { SessionTable } from "@/session/session.sql"
 import { SessionID } from "@/session/schema"
 import { errorData } from "@/util/error"
 import { AppRuntime } from "@/effect/app-runtime"
+import { EventSequenceTable } from "@/sync/event.sql"
+import { waitEvent } from "./util"
 
 export namespace Workspace {
   export const Info = WorkspaceInfo.meta({
@@ -114,6 +116,17 @@ export namespace Workspace {
 
     startSync(info)
 
+    await waitEvent({
+      timeout: TIMEOUT,
+      fn(event) {
+        if (event.workspace === info.id && event.payload.type === Event.Status.type) {
+          const { status } = event.payload.properties
+          return status === "error" || status === "connected"
+        }
+        return false
+      },
+    })
+
     return info
   })
 
@@ -285,10 +298,15 @@ export namespace Workspace {
     return spaces
   }
 
-  export const get = fn(WorkspaceID.zod, async (id) => {
+  function lookup(id: WorkspaceID) {
     const row = Database.use((db) => db.select().from(WorkspaceTable).where(eq(WorkspaceTable.id, id)).get())
     if (!row) return
-    const space = fromRow(row)
+    return fromRow(row)
+  }
+
+  export const get = fn(WorkspaceID.zod, async (id) => {
+    const space = lookup(id)
+    if (!space) return
     startSync(space)
     return space
   })
@@ -320,12 +338,18 @@ export namespace Workspace {
 
   const connections = new Map<WorkspaceID, ConnectionStatus>()
   const aborts = new Map<WorkspaceID, AbortController>()
+  const TIMEOUT = 5000
 
   function setStatus(id: WorkspaceID, status: ConnectionStatus["status"], error?: string) {
     const prev = connections.get(id)
     if (prev?.status === status && prev?.error === error) return
     const next = { workspaceID: id, status, error }
     connections.set(id, next)
+
+    if (status === "error") {
+      aborts.delete(id)
+    }
+
     GlobalBus.emit("event", {
       directory: "global",
       workspace: id,
@@ -340,6 +364,52 @@ export namespace Workspace {
     return [...connections.values()]
   }
 
+  function synced(state: Record<string, number>) {
+    const ids = Object.keys(state)
+    if (ids.length === 0) return true
+
+    const done = Object.fromEntries(
+      Database.use((db) =>
+        db
+          .select({
+            id: EventSequenceTable.aggregate_id,
+            seq: EventSequenceTable.seq,
+          })
+          .from(EventSequenceTable)
+          .where(inArray(EventSequenceTable.aggregate_id, ids))
+          .all(),
+      ).map((row) => [row.id, row.seq]),
+    ) as Record<string, number>
+
+    return ids.every((id) => {
+      return (done[id] ?? -1) >= state[id]
+    })
+  }
+
+  export async function isSyncing(workspaceID: WorkspaceID) {
+    return aborts.has(workspaceID)
+  }
+
+  export async function waitForSync(workspaceID: WorkspaceID, state: Record<string, number>, signal?: AbortSignal) {
+    if (synced(state)) return
+
+    try {
+      await waitEvent({
+        timeout: TIMEOUT,
+        signal,
+        fn(event) {
+          if (event.workspace !== workspaceID && event.payload.type !== "sync") {
+            return false
+          }
+          return synced(state)
+        },
+      })
+    } catch (error) {
+      if (signal?.aborted) throw signal.reason ?? new Error("Request aborted")
+      throw new Error(`Timed out waiting for sync fence: ${JSON.stringify(state)}`)
+    }
+  }
+
   const log = Log.create({ service: "workspace-sync" })
 
   function route(url: string | URL, path: string) {
@@ -353,6 +423,7 @@ export namespace Workspace {
   async function syncWorkspace(space: Info, signal: AbortSignal) {
     while (!signal.aborted) {
       log.info("connecting to global sync", { workspace: space.name })
+      setStatus(space.id, "connecting")
 
       const adaptor = await getAdaptor(space.projectID, space.type)
       const target = await adaptor.target(space)
@@ -364,7 +435,7 @@ export namespace Workspace {
         headers: target.headers,
         signal,
       }).catch((err: unknown) => {
-        setStatus(space.id, "error")
+        setStatus(space.id, "error", err instanceof Error ? err.message : String(err))
 
         log.info("failed to connect to global sync", {
           workspace: space.name,
@@ -374,8 +445,9 @@ export namespace Workspace {
       })
 
       if (!res || !res.ok || !res.body) {
-        log.info("failed to connect to global sync", { workspace: space.name })
-        setStatus(space.id, "error")
+        const error = !res ? "No response from global sync" : `Global sync HTTP ${res.status}`
+        log.info("failed to connect to global sync", { workspace: space.name, error })
+        setStatus(space.id, "error", error)
         await sleep(1000)
         continue
       }
@@ -424,12 +496,16 @@ export namespace Workspace {
       return
     }
 
-    if (aborts.has(space.id)) return
+    if (aborts.has(space.id)) return true
+
+    setStatus(space.id, "disconnected")
+
     const abort = new AbortController()
     aborts.set(space.id, abort)
-    setStatus(space.id, "disconnected")
 
     void syncWorkspace(space, abort.signal).catch((error) => {
+      aborts.delete(space.id)
+
       setStatus(space.id, "error", String(error))
       log.warn("workspace listener failed", {
         workspaceID: space.id,

+ 3 - 1
packages/opencode/src/flag/flag.ts

@@ -74,7 +74,6 @@ export namespace Flag {
     Config.withDefault(false),
   )
   export const OPENCODE_EXPERIMENTAL_PLAN_MODE = OPENCODE_EXPERIMENTAL || truthy("OPENCODE_EXPERIMENTAL_PLAN_MODE")
-  export const OPENCODE_EXPERIMENTAL_WORKSPACES = OPENCODE_EXPERIMENTAL || truthy("OPENCODE_EXPERIMENTAL_WORKSPACES")
   export const OPENCODE_EXPERIMENTAL_MARKDOWN = !falsy("OPENCODE_EXPERIMENTAL_MARKDOWN")
   export const OPENCODE_MODELS_URL = process.env["OPENCODE_MODELS_URL"]
   export const OPENCODE_MODELS_PATH = process.env["OPENCODE_MODELS_PATH"]
@@ -84,6 +83,9 @@ export namespace Flag {
   export const OPENCODE_SKIP_MIGRATIONS = truthy("OPENCODE_SKIP_MIGRATIONS")
   export const OPENCODE_STRICT_CONFIG_DEPS = truthy("OPENCODE_STRICT_CONFIG_DEPS")
 
+  export const OPENCODE_WORKSPACE_ID = process.env["OPENCODE_WORKSPACE_ID"]
+  export const OPENCODE_EXPERIMENTAL_WORKSPACES = OPENCODE_EXPERIMENTAL || truthy("OPENCODE_EXPERIMENTAL_WORKSPACES")
+
   function number(key: string) {
     const value = process.env[key]
     if (!value) return undefined

+ 84 - 0
packages/opencode/src/server/fence.ts

@@ -0,0 +1,84 @@
+import type { MiddlewareHandler } from "hono"
+import { Database, inArray } from "@/storage/db"
+import { EventSequenceTable } from "@/sync/event.sql"
+import { Workspace } from "@/control-plane/workspace"
+import type { WorkspaceID } from "@/control-plane/schema"
+import { Log } from "@/util/log"
+
+const HEADER = "x-opencode-sync"
+type State = Record<string, number>
+const log = Log.create({ service: "fence" })
+
+export function load(ids?: string[]) {
+  const rows = Database.use((db) => {
+    if (!ids?.length) {
+      return db.select().from(EventSequenceTable).all()
+    }
+
+    return db.select().from(EventSequenceTable).where(inArray(EventSequenceTable.aggregate_id, ids)).all()
+  })
+
+  return Object.fromEntries(rows.map((row) => [row.aggregate_id, row.seq])) as State
+}
+
+export function diff(prev: State, next: State) {
+  const ids = new Set([...Object.keys(prev), ...Object.keys(next)])
+  return Object.fromEntries(
+    [...ids]
+      .map((id) => [id, next[id] ?? -1] as const)
+      .filter(([id, seq]) => {
+        return (prev[id] ?? -1) !== seq
+      }),
+  ) as State
+}
+
+export function parse(headers: Headers) {
+  const raw = headers.get(HEADER)
+  if (!raw) return
+
+  let data
+
+  try {
+    data = JSON.parse(raw)
+  } catch (err) {
+    return
+  }
+
+  if (!data || typeof data !== "object") return
+
+  return Object.fromEntries(
+    Object.entries(data).filter(([id, seq]) => {
+      return typeof id === "string" && Number.isInteger(seq)
+    }),
+  ) as State
+}
+
+export async function wait(workspaceID: WorkspaceID, state: State, signal?: AbortSignal) {
+  log.info("waiting for state", {
+    workspaceID,
+    state,
+  })
+  await Workspace.waitForSync(workspaceID, state, signal)
+  log.info("state fully synced", {
+    workspaceID,
+    state,
+  })
+}
+
+export const FenceMiddleware: MiddlewareHandler = async (c, next) => {
+  if (c.req.method === "GET" || c.req.method === "HEAD" || c.req.method === "OPTIONS") return next()
+
+  const prev = load()
+  await next()
+
+  const current = diff(prev, load())
+
+  console.log("FENCE DIFF", current, load())
+
+  if (Object.keys(current).length > 0) {
+    log.info("header", {
+      diff: current,
+    })
+    c.res.headers.set(HEADER, JSON.stringify(current))
+  }
+}

+ 5 - 4
packages/opencode/src/server/instance/middleware.ts

@@ -7,6 +7,7 @@ import { ServerProxy } from "../proxy"
 import { Filesystem } from "@/util/filesystem"
 import { Instance } from "@/project/instance"
 import { InstanceBootstrap } from "@/project/bootstrap"
+import { Flag } from "@/flag/flag"
 import { Session } from "@/session"
 import { SessionID } from "@/session/schema"
 import { WorkspaceContext } from "@/control-plane/workspace-context"
@@ -68,10 +69,10 @@ export function WorkspaceRouterMiddleware(upgrade: UpgradeWebSocket): Middleware
     const sessionWorkspaceID = await getSessionWorkspace(url)
     const workspaceID = sessionWorkspaceID || url.searchParams.get("workspace")
 
-    if (!workspaceID || url.pathname.startsWith("/console") || OPENCODE_WORKSPACE) {
-      if (OPENCODE_WORKSPACE) {
+    if (!workspaceID || url.pathname.startsWith("/console") || Flag.OPENCODE_WORKSPACE_ID) {
+      if (Flag.OPENCODE_WORKSPACE_ID) {
         return WorkspaceContext.provide({
-          workspaceID: WorkspaceID.make(OPENCODE_WORKSPACE),
+          workspaceID: WorkspaceID.make(Flag.OPENCODE_WORKSPACE_ID),
           async fn() {
             return Instance.provide({
               directory,
@@ -148,6 +149,6 @@ export function WorkspaceRouterMiddleware(upgrade: UpgradeWebSocket): Middleware
     headers.delete("x-opencode-workspace")
 
     const req = new Request(c.req.raw, { headers })
-    return ServerProxy.http(proxyURL, target.headers, req)
+    return ServerProxy.http(proxyURL, target.headers, req, workspace.id)
   }
 }

+ 35 - 12
packages/opencode/src/server/proxy.ts

@@ -1,6 +1,9 @@
 import { Hono } from "hono"
 import type { UpgradeWebSocket } from "hono/ws"
 import { Log } from "@/util/log"
+import * as Fence from "./fence"
+import type { WorkspaceID } from "@/control-plane/schema"
+import { Workspace } from "@/control-plane/workspace"
 
 const hop = new Set([
   "connection",
@@ -101,12 +104,27 @@ const app = (upgrade: UpgradeWebSocket) =>
 export namespace ServerProxy {
   const log = Log.Default.clone().tag("service", "server-proxy")
 
-  export function http(url: string | URL, extra: HeadersInit | undefined, req: Request) {
+  export async function http(
+    url: string | URL,
+    extra: HeadersInit | undefined,
+    req: Request,
+    workspaceID: WorkspaceID,
+  ) {
     console.log("proxy http request", {
       method: req.method,
       request: req.url,
       url: String(url),
     })
+
+    if (!Workspace.isSyncing(workspaceID)) {
+      return new Response(`broken sync connection for workspace: ${workspaceID}`, {
+        status: 503,
+        headers: {
+          "content-type": "text/plain; charset=utf-8",
+        },
+      })
+    }
+
     return fetch(
       new Request(url, {
         method: req.method,
@@ -116,21 +134,26 @@ export namespace ServerProxy {
         signal: req.signal,
       }),
     ).then((res) => {
+      const sync = Fence.parse(res.headers)
       const next = new Headers(res.headers)
       next.delete("content-encoding")
       next.delete("content-length")
 
-      console.log("proxy http response", {
-        method: req.method,
-        request: req.url,
-        url: String(url),
-        status: res.status,
-        statusText: res.statusText,
-      })
-      return new Response(res.body, {
-        status: res.status,
-        statusText: res.statusText,
-        headers: next,
+      const done = sync ? Fence.wait(workspaceID, sync, req.signal) : Promise.resolve()
+
+      return done.then(async () => {
+        console.log("proxy http response", {
+          method: req.method,
+          request: req.url,
+          url: String(url),
+          status: res.status,
+          statusText: res.statusText,
+        })
+        return new Response(res.body, {
+          status: res.status,
+          statusText: res.statusText,
+          headers: next,
+        })
       })
     })
   }

+ 18 - 0
packages/opencode/src/server/server.ts

@@ -4,9 +4,11 @@ import { adapter } from "#hono"
 import { MDNS } from "./mdns"
 import { lazy } from "@/util/lazy"
 import { AuthMiddleware, CompressionMiddleware, CorsMiddleware, ErrorMiddleware, LoggerMiddleware } from "./middleware"
+import { FenceMiddleware } from "./fence"
 import { InstanceRoutes } from "./instance"
 import { initProjectors } from "./projectors"
 import { Log } from "@/util/log"
+import { Flag } from "@/flag/flag"
 import { ControlPlaneRoutes } from "./control"
 import { UIRoutes } from "./ui"
 
@@ -30,6 +32,22 @@ export namespace Server {
   function create(opts: { cors?: string[] }) {
     const app = new Hono()
     const runtime = adapter.create(app)
+
+    if (Flag.OPENCODE_WORKSPACE_ID) {
+      return {
+        app: app
+          .onError(ErrorMiddleware)
+          .use(AuthMiddleware)
+          .use(LoggerMiddleware)
+          .use(CompressionMiddleware)
+          .use(CorsMiddleware(opts))
+          .use(FenceMiddleware)
+          .route("/", ControlPlaneRoutes())
+          .route("/", InstanceRoutes(runtime.upgradeWebSocket)),
+        runtime,
+      }
+    }
+
     return {
       app: app
         .onError(ErrorMiddleware)