Browse Source

fix: Windows e2e stability (CrossSpawnSpawner, snapshot isolation, session race guards) (#19163)

Kit Langton 3 weeks ago
parent
commit
8864fdce2f

+ 2 - 1
packages/app/playwright.config.ts

@@ -6,7 +6,8 @@ const serverHost = process.env.PLAYWRIGHT_SERVER_HOST ?? "127.0.0.1"
 const serverPort = process.env.PLAYWRIGHT_SERVER_PORT ?? "4096"
 const command = `bun run dev -- --host 0.0.0.0 --port ${port}`
 const reuse = !process.env.CI
-const workers = Number(process.env.PLAYWRIGHT_WORKERS ?? (process.env.CI ? 5 : 0)) || undefined
+const workers =
+  Number(process.env.PLAYWRIGHT_WORKERS ?? (process.env.CI ? (process.platform === "win32" ? 2 : 5) : 0)) || undefined
 
 export default defineConfig({
   testDir: "./e2e",

+ 3 - 2
packages/opencode/src/git/index.ts

@@ -1,4 +1,5 @@
-import { NodeChildProcessSpawner, NodeFileSystem, NodePath } from "@effect/platform-node"
+import { NodeFileSystem, NodePath } from "@effect/platform-node"
+import * as CrossSpawnSpawner from "@/effect/cross-spawn-spawner"
 import { Effect, Layer, ServiceMap, Stream } from "effect"
 import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"
 import { makeRunPromise } from "@/effect/run-service"
@@ -258,7 +259,7 @@ export namespace Git {
   )
 
   export const defaultLayer = layer.pipe(
-    Layer.provide(NodeChildProcessSpawner.layer),
+    Layer.provide(CrossSpawnSpawner.layer),
     Layer.provide(NodeFileSystem.layer),
     Layer.provide(NodePath.layer),
   )

+ 6 - 1
packages/opencode/src/session/compaction.ts

@@ -13,6 +13,7 @@ import { fn } from "@/util/fn"
 import { Agent } from "@/agent/agent"
 import { Plugin } from "@/plugin"
 import { Config } from "@/config/config"
+import { NotFoundError } from "@/storage/db"
 import { ProviderTransform } from "@/provider/transform"
 import { ModelID, ProviderID } from "@/provider/schema"
 
@@ -60,7 +61,11 @@ export namespace SessionCompaction {
     const config = await Config.get()
     if (config.compaction?.prune === false) return
     log.info("pruning")
-    const msgs = await Session.messages({ sessionID: input.sessionID })
+    const msgs = await Session.messages({ sessionID: input.sessionID }).catch((err) => {
+      if (NotFoundError.isInstance(err)) return undefined
+      throw err
+    })
+    if (!msgs) return
     let total = 0
     let pruned = 0
     const toPrune = []

+ 38 - 19
packages/opencode/src/session/projectors.ts

@@ -4,6 +4,15 @@ import { Session } from "./index"
 import { MessageV2 } from "./message-v2"
 import { SessionTable, MessageTable, PartTable } from "./session.sql"
 import { ProjectTable } from "../project/project.sql"
+import { Log } from "../util/log"
+
+const log = Log.create({ service: "session.projector" })
+
+function foreign(err: unknown) {
+  if (typeof err !== "object" || err === null) return false
+  if ("code" in err && err.code === "SQLITE_CONSTRAINT_FOREIGNKEY") return true
+  return "message" in err && typeof err.message === "string" && err.message.includes("FOREIGN KEY constraint failed")
+}
 
 export type DeepPartial<T> = T extends object ? { [K in keyof T]?: DeepPartial<T[K]> | null } : T
 
@@ -76,15 +85,20 @@ export default [
     const time_created = data.info.time.created
     const { id, sessionID, ...rest } = data.info
 
-    db.insert(MessageTable)
-      .values({
-        id,
-        session_id: sessionID,
-        time_created,
-        data: rest,
-      })
-      .onConflictDoUpdate({ target: MessageTable.id, set: { data: rest } })
-      .run()
+    try {
+      db.insert(MessageTable)
+        .values({
+          id,
+          session_id: sessionID,
+          time_created,
+          data: rest,
+        })
+        .onConflictDoUpdate({ target: MessageTable.id, set: { data: rest } })
+        .run()
+    } catch (err) {
+      if (!foreign(err)) throw err
+      log.warn("ignored late message update", { messageID: id, sessionID })
+    }
   }),
 
   SyncEvent.project(MessageV2.Event.Removed, (db, data) => {
@@ -102,15 +116,20 @@ export default [
   SyncEvent.project(MessageV2.Event.PartUpdated, (db, data) => {
     const { id, messageID, sessionID, ...rest } = data.part
 
-    db.insert(PartTable)
-      .values({
-        id,
-        message_id: messageID,
-        session_id: sessionID,
-        time_created: data.time,
-        data: rest,
-      })
-      .onConflictDoUpdate({ target: PartTable.id, set: { data: rest } })
-      .run()
+    try {
+      db.insert(PartTable)
+        .values({
+          id,
+          message_id: messageID,
+          session_id: sessionID,
+          time_created: data.time,
+          data: rest,
+        })
+        .onConflictDoUpdate({ target: PartTable.id, set: { data: rest } })
+        .run()
+    } catch (err) {
+      if (!foreign(err)) throw err
+      log.warn("ignored late part update", { partID: id, messageID, sessionID })
+    }
   }),
 ]

+ 196 - 148
packages/opencode/src/snapshot/index.ts

@@ -1,5 +1,5 @@
 import { NodeFileSystem, NodePath } from "@effect/platform-node"
-import { Cause, Duration, Effect, Layer, Schedule, ServiceMap, Stream } from "effect"
+import { Cause, Duration, Effect, Layer, Schedule, Semaphore, ServiceMap, Stream } from "effect"
 import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"
 import path from "path"
 import z from "zod"
@@ -7,6 +7,7 @@ import * as CrossSpawnSpawner from "@/effect/cross-spawn-spawner"
 import { InstanceState } from "@/effect/instance-state"
 import { makeRunPromise } from "@/effect/run-service"
 import { AppFileSystem } from "@/filesystem"
+import { Hash } from "@/util/hash"
 import { Config } from "../config/config"
 import { Global } from "../global"
 import { Log } from "../util/log"
@@ -38,7 +39,6 @@ export namespace Snapshot {
   const core = ["-c", "core.longpaths=true", "-c", "core.symlinks=true"]
   const cfg = ["-c", "core.autocrlf=false", ...core]
   const quote = [...cfg, "-c", "core.quotepath=false"]
-
   interface GitResult {
     readonly code: ChildProcessSpawner.ExitCode
     readonly text: string
@@ -66,12 +66,23 @@ export namespace Snapshot {
       Effect.gen(function* () {
         const fs = yield* AppFileSystem.Service
         const spawner = yield* ChildProcessSpawner.ChildProcessSpawner
+        const locks = new Map<string, Semaphore.Semaphore>()
+
+        const lock = (key: string) => {
+          const hit = locks.get(key)
+          if (hit) return hit
+
+          const next = Semaphore.makeUnsafe(1)
+          locks.set(key, next)
+          return next
+        }
+
         const state = yield* InstanceState.make<State>(
           Effect.fn("Snapshot.state")(function* (ctx) {
             const state = {
               directory: ctx.directory,
               worktree: ctx.worktree,
-              gitdir: path.join(Global.Path.data, "snapshot", ctx.project.id),
+              gitdir: path.join(Global.Path.data, "snapshot", ctx.project.id, Hash.fast(ctx.worktree)),
               vcs: ctx.project.vcs,
             }
 
@@ -108,6 +119,7 @@ export namespace Snapshot {
             const exists = (file: string) => fs.exists(file).pipe(Effect.orDie)
             const read = (file: string) => fs.readFileString(file).pipe(Effect.catch(() => Effect.succeed("")))
             const remove = (file: string) => fs.remove(file).pipe(Effect.catch(() => Effect.void))
+            const locked = <A, E, R>(fx: Effect.Effect<A, E, R>) => lock(state.gitdir).withPermits(1)(fx)
 
             const enabled = Effect.fnUntraced(function* () {
               if (state.vcs !== "git") return false
@@ -190,175 +202,211 @@ export namespace Snapshot {
             })
 
             const cleanup = Effect.fnUntraced(function* () {
-              if (!(yield* enabled())) return
-              if (!(yield* exists(state.gitdir))) return
-              const result = yield* git(args(["gc", `--prune=${prune}`]), { cwd: state.directory })
-              if (result.code !== 0) {
-                log.warn("cleanup failed", {
-                  exitCode: result.code,
-                  stderr: result.stderr,
-                })
-                return
-              }
-              log.info("cleanup", { prune })
+              return yield* locked(
+                Effect.gen(function* () {
+                  if (!(yield* enabled())) return
+                  if (!(yield* exists(state.gitdir))) return
+                  const result = yield* git(args(["gc", `--prune=${prune}`]), { cwd: state.directory })
+                  if (result.code !== 0) {
+                    log.warn("cleanup failed", {
+                      exitCode: result.code,
+                      stderr: result.stderr,
+                    })
+                    return
+                  }
+                  log.info("cleanup", { prune })
+                }),
+              )
             })
 
             const track = Effect.fnUntraced(function* () {
-              if (!(yield* enabled())) return
-              const existed = yield* exists(state.gitdir)
-              yield* fs.ensureDir(state.gitdir).pipe(Effect.orDie)
-              if (!existed) {
-                yield* git(["init"], {
-                  env: { GIT_DIR: state.gitdir, GIT_WORK_TREE: state.worktree },
-                })
-                yield* git(["--git-dir", state.gitdir, "config", "core.autocrlf", "false"])
-                yield* git(["--git-dir", state.gitdir, "config", "core.longpaths", "true"])
-                yield* git(["--git-dir", state.gitdir, "config", "core.symlinks", "true"])
-                yield* git(["--git-dir", state.gitdir, "config", "core.fsmonitor", "false"])
-                log.info("initialized")
-              }
-              yield* add()
-              const result = yield* git(args(["write-tree"]), { cwd: state.directory })
-              const hash = result.text.trim()
-              log.info("tracking", { hash, cwd: state.directory, git: state.gitdir })
-              return hash
+              return yield* locked(
+                Effect.gen(function* () {
+                  if (!(yield* enabled())) return
+                  const existed = yield* exists(state.gitdir)
+                  yield* fs.ensureDir(state.gitdir).pipe(Effect.orDie)
+                  if (!existed) {
+                    yield* git(["init"], {
+                      env: { GIT_DIR: state.gitdir, GIT_WORK_TREE: state.worktree },
+                    })
+                    yield* git(["--git-dir", state.gitdir, "config", "core.autocrlf", "false"])
+                    yield* git(["--git-dir", state.gitdir, "config", "core.longpaths", "true"])
+                    yield* git(["--git-dir", state.gitdir, "config", "core.symlinks", "true"])
+                    yield* git(["--git-dir", state.gitdir, "config", "core.fsmonitor", "false"])
+                    log.info("initialized")
+                  }
+                  yield* add()
+                  const result = yield* git(args(["write-tree"]), { cwd: state.directory })
+                  const hash = result.text.trim()
+                  log.info("tracking", { hash, cwd: state.directory, git: state.gitdir })
+                  return hash
+                }),
+              )
             })
 
             const patch = Effect.fnUntraced(function* (hash: string) {
-              yield* add()
-              const result = yield* git(
-                [...quote, ...args(["diff", "--cached", "--no-ext-diff", "--name-only", hash, "--", "."])],
-                {
-                  cwd: state.directory,
-                },
+              return yield* locked(
+                Effect.gen(function* () {
+                  yield* add()
+                  const result = yield* git(
+                    [...quote, ...args(["diff", "--cached", "--no-ext-diff", "--name-only", hash, "--", "."])],
+                    {
+                      cwd: state.directory,
+                    },
+                  )
+                  if (result.code !== 0) {
+                    log.warn("failed to get diff", { hash, exitCode: result.code })
+                    return { hash, files: [] }
+                  }
+                  return {
+                    hash,
+                    files: result.text
+                      .trim()
+                      .split("\n")
+                      .map((x) => x.trim())
+                      .filter(Boolean)
+                      .map((x) => path.join(state.worktree, x).replaceAll("\\", "/")),
+                  }
+                }),
               )
-              if (result.code !== 0) {
-                log.warn("failed to get diff", { hash, exitCode: result.code })
-                return { hash, files: [] }
-              }
-              return {
-                hash,
-                files: result.text
-                  .trim()
-                  .split("\n")
-                  .map((x) => x.trim())
-                  .filter(Boolean)
-                  .map((x) => path.join(state.worktree, x).replaceAll("\\", "/")),
-              }
             })
 
             const restore = Effect.fnUntraced(function* (snapshot: string) {
-              log.info("restore", { commit: snapshot })
-              const result = yield* git([...core, ...args(["read-tree", snapshot])], { cwd: state.worktree })
-              if (result.code === 0) {
-                const checkout = yield* git([...core, ...args(["checkout-index", "-a", "-f"])], { cwd: state.worktree })
-                if (checkout.code === 0) return
-                log.error("failed to restore snapshot", {
-                  snapshot,
-                  exitCode: checkout.code,
-                  stderr: checkout.stderr,
-                })
-                return
-              }
-              log.error("failed to restore snapshot", {
-                snapshot,
-                exitCode: result.code,
-                stderr: result.stderr,
-              })
+              return yield* locked(
+                Effect.gen(function* () {
+                  log.info("restore", { commit: snapshot })
+                  const result = yield* git([...core, ...args(["read-tree", snapshot])], { cwd: state.worktree })
+                  if (result.code === 0) {
+                    const checkout = yield* git([...core, ...args(["checkout-index", "-a", "-f"])], {
+                      cwd: state.worktree,
+                    })
+                    if (checkout.code === 0) return
+                    log.error("failed to restore snapshot", {
+                      snapshot,
+                      exitCode: checkout.code,
+                      stderr: checkout.stderr,
+                    })
+                    return
+                  }
+                  log.error("failed to restore snapshot", {
+                    snapshot,
+                    exitCode: result.code,
+                    stderr: result.stderr,
+                  })
+                }),
+              )
             })
 
             const revert = Effect.fnUntraced(function* (patches: Snapshot.Patch[]) {
-              const seen = new Set<string>()
-              for (const item of patches) {
-                for (const file of item.files) {
-                  if (seen.has(file)) continue
-                  seen.add(file)
-                  log.info("reverting", { file, hash: item.hash })
-                  const result = yield* git([...core, ...args(["checkout", item.hash, "--", file])], {
-                    cwd: state.worktree,
-                  })
-                  if (result.code !== 0) {
-                    const rel = path.relative(state.worktree, file)
-                    const tree = yield* git([...core, ...args(["ls-tree", item.hash, "--", rel])], {
-                      cwd: state.worktree,
-                    })
-                    if (tree.code === 0 && tree.text.trim()) {
-                      log.info("file existed in snapshot but checkout failed, keeping", { file })
-                    } else {
-                      log.info("file did not exist in snapshot, deleting", { file })
-                      yield* remove(file)
+              return yield* locked(
+                Effect.gen(function* () {
+                  const seen = new Set<string>()
+                  for (const item of patches) {
+                    for (const file of item.files) {
+                      if (seen.has(file)) continue
+                      seen.add(file)
+                      log.info("reverting", { file, hash: item.hash })
+                      const result = yield* git([...core, ...args(["checkout", item.hash, "--", file])], {
+                        cwd: state.worktree,
+                      })
+                      if (result.code !== 0) {
+                        const rel = path.relative(state.worktree, file)
+                        const tree = yield* git([...core, ...args(["ls-tree", item.hash, "--", rel])], {
+                          cwd: state.worktree,
+                        })
+                        if (tree.code === 0 && tree.text.trim()) {
+                          log.info("file existed in snapshot but checkout failed, keeping", { file })
+                        } else {
+                          log.info("file did not exist in snapshot, deleting", { file })
+                          yield* remove(file)
+                        }
+                      }
                     }
                   }
-                }
-              }
+                }),
+              )
             })
 
             const diff = Effect.fnUntraced(function* (hash: string) {
-              yield* add()
-              const result = yield* git([...quote, ...args(["diff", "--cached", "--no-ext-diff", hash, "--", "."])], {
-                cwd: state.worktree,
-              })
-              if (result.code !== 0) {
-                log.warn("failed to get diff", {
-                  hash,
-                  exitCode: result.code,
-                  stderr: result.stderr,
-                })
-                return ""
-              }
-              return result.text.trim()
+              return yield* locked(
+                Effect.gen(function* () {
+                  yield* add()
+                  const result = yield* git(
+                    [...quote, ...args(["diff", "--cached", "--no-ext-diff", hash, "--", "."])],
+                    {
+                      cwd: state.worktree,
+                    },
+                  )
+                  if (result.code !== 0) {
+                    log.warn("failed to get diff", {
+                      hash,
+                      exitCode: result.code,
+                      stderr: result.stderr,
+                    })
+                    return ""
+                  }
+                  return result.text.trim()
+                }),
+              )
             })
 
             const diffFull = Effect.fnUntraced(function* (from: string, to: string) {
-              const result: Snapshot.FileDiff[] = []
-              const status = new Map<string, "added" | "deleted" | "modified">()
-
-              const statuses = yield* git(
-                [...quote, ...args(["diff", "--no-ext-diff", "--name-status", "--no-renames", from, to, "--", "."])],
-                { cwd: state.directory },
-              )
+              return yield* locked(
+                Effect.gen(function* () {
+                  const result: Snapshot.FileDiff[] = []
+                  const status = new Map<string, "added" | "deleted" | "modified">()
+
+                  const statuses = yield* git(
+                    [
+                      ...quote,
+                      ...args(["diff", "--no-ext-diff", "--name-status", "--no-renames", from, to, "--", "."]),
+                    ],
+                    { cwd: state.directory },
+                  )
+
+                  for (const line of statuses.text.trim().split("\n")) {
+                    if (!line) continue
+                    const [code, file] = line.split("\t")
+                    if (!code || !file) continue
+                    status.set(file, code.startsWith("A") ? "added" : code.startsWith("D") ? "deleted" : "modified")
+                  }
 
-              for (const line of statuses.text.trim().split("\n")) {
-                if (!line) continue
-                const [code, file] = line.split("\t")
-                if (!code || !file) continue
-                status.set(file, code.startsWith("A") ? "added" : code.startsWith("D") ? "deleted" : "modified")
-              }
+                  const numstat = yield* git(
+                    [...quote, ...args(["diff", "--no-ext-diff", "--no-renames", "--numstat", from, to, "--", "."])],
+                    {
+                      cwd: state.directory,
+                    },
+                  )
+
+                  for (const line of numstat.text.trim().split("\n")) {
+                    if (!line) continue
+                    const [adds, dels, file] = line.split("\t")
+                    if (!file) continue
+                    const binary = adds === "-" && dels === "-"
+                    const [before, after] = binary
+                      ? ["", ""]
+                      : yield* Effect.all(
+                          [
+                            git([...cfg, ...args(["show", `${from}:${file}`])]).pipe(Effect.map((item) => item.text)),
+                            git([...cfg, ...args(["show", `${to}:${file}`])]).pipe(Effect.map((item) => item.text)),
+                          ],
+                          { concurrency: 2 },
+                        )
+                    const additions = binary ? 0 : parseInt(adds)
+                    const deletions = binary ? 0 : parseInt(dels)
+                    result.push({
+                      file,
+                      before,
+                      after,
+                      additions: Number.isFinite(additions) ? additions : 0,
+                      deletions: Number.isFinite(deletions) ? deletions : 0,
+                      status: status.get(file) ?? "modified",
+                    })
+                  }
 
-              const numstat = yield* git(
-                [...quote, ...args(["diff", "--no-ext-diff", "--no-renames", "--numstat", from, to, "--", "."])],
-                {
-                  cwd: state.directory,
-                },
+                  return result
+                }),
               )
-
-              for (const line of numstat.text.trim().split("\n")) {
-                if (!line) continue
-                const [adds, dels, file] = line.split("\t")
-                if (!file) continue
-                const binary = adds === "-" && dels === "-"
-                const [before, after] = binary
-                  ? ["", ""]
-                  : yield* Effect.all(
-                      [
-                        git([...cfg, ...args(["show", `${from}:${file}`])]).pipe(Effect.map((item) => item.text)),
-                        git([...cfg, ...args(["show", `${to}:${file}`])]).pipe(Effect.map((item) => item.text)),
-                      ],
-                      { concurrency: 2 },
-                    )
-                const additions = binary ? 0 : parseInt(adds)
-                const deletions = binary ? 0 : parseInt(dels)
-                result.push({
-                  file,
-                  before,
-                  after,
-                  additions: Number.isFinite(additions) ? additions : 0,
-                  deletions: Number.isFinite(deletions) ? deletions : 0,
-                  status: status.get(file) ?? "modified",
-                })
-              }
-
-              return result
             })
 
             yield* cleanup().pipe(