Jelajahi Sumber

refactor(file-time): effectify FileTimeService with Semaphore locks (#17835)

Kit Langton 1 bulan lalu
induk
melakukan
2cbdf04ec9

+ 4 - 1
packages/opencode/src/effect/instances.ts

@@ -6,6 +6,7 @@ import { QuestionService } from "@/question/service"
 import { PermissionService } from "@/permission/service"
 import { FileWatcherService } from "@/file/watcher"
 import { VcsService } from "@/project/vcs"
+import { FileTimeService } from "@/file/time"
 import { Instance } from "@/project/instance"
 
 export { InstanceContext } from "./instance-context"
@@ -16,6 +17,7 @@ export type InstanceServices =
   | ProviderAuthService
   | FileWatcherService
   | VcsService
+  | FileTimeService
 
 function lookup(directory: string) {
   const project = Instance.project
@@ -24,8 +26,9 @@ function lookup(directory: string) {
     Layer.fresh(QuestionService.layer),
     Layer.fresh(PermissionService.layer),
     Layer.fresh(ProviderAuthService.layer),
-    Layer.fresh(FileWatcherService.layer),
+    Layer.fresh(FileWatcherService.layer).pipe(Layer.orDie),
     Layer.fresh(VcsService.layer),
+    Layer.fresh(FileTimeService.layer).pipe(Layer.orDie),
   ).pipe(Layer.provide(ctx))
 }
 

+ 103 - 59
packages/opencode/src/file/time.ts

@@ -1,71 +1,115 @@
-import { Instance } from "../project/instance"
 import { Log } from "../util/log"
-import { Flag } from "../flag/flag"
+import { Flag } from "@/flag/flag"
 import { Filesystem } from "../util/filesystem"
+import { Effect, Layer, ServiceMap, Semaphore } from "effect"
+import { runPromiseInstance } from "@/effect/runtime"
+import type { SessionID } from "@/session/schema"
 
-export namespace FileTime {
-  const log = Log.create({ service: "file.time" })
-  // Per-session read times plus per-file write locks.
-  // All tools that overwrite existing files should run their
-  // assert/read/write/update sequence inside withLock(filepath, ...)
-  // so concurrent writes to the same file are serialized.
-  export const state = Instance.state(() => {
-    const read: {
-      [sessionID: string]: {
-        [path: string]: Date | undefined
-      }
-    } = {}
-    const locks = new Map<string, Promise<void>>()
-    return {
-      read,
-      locks,
-    }
-  })
-
-  export function read(sessionID: string, file: string) {
-    log.info("read", { sessionID, file })
-    const { read } = state()
-    read[sessionID] = read[sessionID] || {}
-    read[sessionID][file] = new Date()
+const log = Log.create({ service: "file.time" })
+
+export namespace FileTimeService {
+  export interface Service {
+    readonly read: (sessionID: SessionID, file: string) => Effect.Effect<void>
+    readonly get: (sessionID: SessionID, file: string) => Effect.Effect<Date | undefined>
+    readonly assert: (sessionID: SessionID, filepath: string) => Effect.Effect<void>
+    readonly withLock: <T>(filepath: string, fn: () => Promise<T>) => Effect.Effect<T>
   }
+}
 
-  export function get(sessionID: string, file: string) {
-    return state().read[sessionID]?.[file]
+type Stamp = {
+  readonly read: Date
+  readonly mtime: number | undefined
+  readonly ctime: number | undefined
+  readonly size: number | undefined
+}
+
+function stamp(file: string): Stamp {
+  const stat = Filesystem.stat(file)
+  const size = typeof stat?.size === "bigint" ? Number(stat.size) : stat?.size
+  return {
+    read: new Date(),
+    mtime: stat?.mtime?.getTime(),
+    ctime: stat?.ctime?.getTime(),
+    size,
   }
+}
 
-  export async function withLock<T>(filepath: string, fn: () => Promise<T>): Promise<T> {
-    const current = state()
-    const currentLock = current.locks.get(filepath) ?? Promise.resolve()
-    let release: () => void = () => {}
-    const nextLock = new Promise<void>((resolve) => {
-      release = resolve
-    })
-    const chained = currentLock.then(() => nextLock)
-    current.locks.set(filepath, chained)
-    await currentLock
-    try {
-      return await fn()
-    } finally {
-      release()
-      if (current.locks.get(filepath) === chained) {
-        current.locks.delete(filepath)
+function session(reads: Map<SessionID, Map<string, Stamp>>, sessionID: SessionID) {
+  let value = reads.get(sessionID)
+  if (!value) {
+    value = new Map<string, Stamp>()
+    reads.set(sessionID, value)
+  }
+  return value
+}
+
+export class FileTimeService extends ServiceMap.Service<FileTimeService, FileTimeService.Service>()(
+  "@opencode/FileTime",
+) {
+  static readonly layer = Layer.effect(
+    FileTimeService,
+    Effect.gen(function* () {
+      const disableCheck = yield* Flag.OPENCODE_DISABLE_FILETIME_CHECK
+      const reads = new Map<SessionID, Map<string, Stamp>>()
+      const locks = new Map<string, Semaphore.Semaphore>()
+
+      function getLock(filepath: string) {
+        let lock = locks.get(filepath)
+        if (!lock) {
+          lock = Semaphore.makeUnsafe(1)
+          locks.set(filepath, lock)
+        }
+        return lock
       }
-    }
+
+      return FileTimeService.of({
+        read: Effect.fn("FileTimeService.read")(function* (sessionID: SessionID, file: string) {
+          log.info("read", { sessionID, file })
+          session(reads, sessionID).set(file, stamp(file))
+        }),
+
+        get: Effect.fn("FileTimeService.get")(function* (sessionID: SessionID, file: string) {
+          return reads.get(sessionID)?.get(file)?.read
+        }),
+
+        assert: Effect.fn("FileTimeService.assert")(function* (sessionID: SessionID, filepath: string) {
+          if (disableCheck) return
+
+          const time = reads.get(sessionID)?.get(filepath)
+          if (!time) throw new Error(`You must read file ${filepath} before overwriting it. Use the Read tool first`)
+          const next = stamp(filepath)
+          const changed = next.mtime !== time.mtime || next.ctime !== time.ctime || next.size !== time.size
+
+          if (changed) {
+            throw new Error(
+              `File ${filepath} has been modified since it was last read.\nLast modification: ${new Date(next.mtime ?? next.read.getTime()).toISOString()}\nLast read: ${time.read.toISOString()}\n\nPlease read the file again before modifying it.`,
+            )
+          }
+        }),
+
+        withLock: Effect.fn("FileTimeService.withLock")(function* <T>(filepath: string, fn: () => Promise<T>) {
+          const lock = getLock(filepath)
+          return yield* Effect.promise(fn).pipe(lock.withPermits(1))
+        }),
+      })
+    }),
+  )
+}
+
+export namespace FileTime {
+  export function read(sessionID: SessionID, file: string) {
+    return runPromiseInstance(FileTimeService.use((s) => s.read(sessionID, file)))
+  }
+
+  export function get(sessionID: SessionID, file: string) {
+    return runPromiseInstance(FileTimeService.use((s) => s.get(sessionID, file)))
   }
 
-  export async function assert(sessionID: string, filepath: string) {
-    if (Flag.OPENCODE_DISABLE_FILETIME_CHECK === true) {
-      return
-    }
-
-    const time = get(sessionID, filepath)
-    if (!time) throw new Error(`You must read file ${filepath} before overwriting it. Use the Read tool first`)
-    const mtime = Filesystem.stat(filepath)?.mtime
-    // Allow a 50ms tolerance for Windows NTFS timestamp fuzziness / async flushing
-    if (mtime && mtime.getTime() > time.getTime() + 50) {
-      throw new Error(
-        `File ${filepath} has been modified since it was last read.\nLast modification: ${mtime.toISOString()}\nLast read: ${time.toISOString()}\n\nPlease read the file again before modifying it.`,
-      )
-    }
+  export async function assert(sessionID: SessionID, filepath: string) {
+    return runPromiseInstance(FileTimeService.use((s) => s.assert(sessionID, filepath)))
+  }
+
+  export async function withLock<T>(filepath: string, fn: () => Promise<T>): Promise<T> {
+    return runPromiseInstance(FileTimeService.use((s) => s.withLock(filepath, fn)))
   }
 }

+ 2 - 1
packages/opencode/src/file/watcher.ts

@@ -72,7 +72,8 @@ export class FileWatcherService extends ServiceMap.Service<FileWatcherService, F
     FileWatcherService,
     Effect.gen(function* () {
       const instance = yield* InstanceContext
-      if (yield* Flag.OPENCODE_EXPERIMENTAL_DISABLE_FILEWATCHER) return FileWatcherService.of({ init })
+      if (yield* Flag.OPENCODE_EXPERIMENTAL_DISABLE_FILEWATCHER)
+        return FileWatcherService.of({ init })
 
       log.info("init", { directory: instance.directory })
 

+ 3 - 1
packages/opencode/src/flag/flag.ts

@@ -61,7 +61,9 @@ export namespace Flag {
   export const OPENCODE_EXPERIMENTAL_OXFMT = OPENCODE_EXPERIMENTAL || truthy("OPENCODE_EXPERIMENTAL_OXFMT")
   export const OPENCODE_EXPERIMENTAL_LSP_TY = truthy("OPENCODE_EXPERIMENTAL_LSP_TY")
   export const OPENCODE_EXPERIMENTAL_LSP_TOOL = OPENCODE_EXPERIMENTAL || truthy("OPENCODE_EXPERIMENTAL_LSP_TOOL")
-  export const OPENCODE_DISABLE_FILETIME_CHECK = truthy("OPENCODE_DISABLE_FILETIME_CHECK")
+  export const OPENCODE_DISABLE_FILETIME_CHECK = Config.boolean("OPENCODE_DISABLE_FILETIME_CHECK").pipe(
+    Config.withDefault(false),
+  )
   export const OPENCODE_EXPERIMENTAL_PLAN_MODE = OPENCODE_EXPERIMENTAL || truthy("OPENCODE_EXPERIMENTAL_PLAN_MODE")
   export const OPENCODE_EXPERIMENTAL_WORKSPACES = OPENCODE_EXPERIMENTAL || truthy("OPENCODE_EXPERIMENTAL_WORKSPACES")
   export const OPENCODE_EXPERIMENTAL_MARKDOWN = !falsy("OPENCODE_EXPERIMENTAL_MARKDOWN")

+ 1 - 1
packages/opencode/src/session/prompt.ts

@@ -1245,7 +1245,7 @@ export namespace SessionPrompt {
                 ]
               }
 
-              FileTime.read(input.sessionID, filepath)
+              await FileTime.read(input.sessionID, filepath)
               return [
                 {
                   messageID: info.id,

+ 2 - 2
packages/opencode/src/tool/edit.ts

@@ -78,7 +78,7 @@ export const EditTool = Tool.define("edit", {
           file: filePath,
           event: existed ? "change" : "add",
         })
-        FileTime.read(ctx.sessionID, filePath)
+        await FileTime.read(ctx.sessionID, filePath)
         return
       }
 
@@ -119,7 +119,7 @@ export const EditTool = Tool.define("edit", {
       diff = trimDiff(
         createTwoFilesPatch(filePath, filePath, normalizeLineEndings(contentOld), normalizeLineEndings(contentNew)),
       )
-      FileTime.read(ctx.sessionID, filePath)
+      await FileTime.read(ctx.sessionID, filePath)
     })
 
     const filediff: Snapshot.FileDiff = {

+ 1 - 1
packages/opencode/src/tool/read.ts

@@ -214,7 +214,7 @@ export const ReadTool = Tool.define("read", {
 
     // just warms the lsp client
     LSP.touchFile(filepath, false)
-    FileTime.read(ctx.sessionID, filepath)
+    await FileTime.read(ctx.sessionID, filepath)
 
     if (instructions.length > 0) {
       output += `\n\n<system-reminder>\n${instructions.map((i) => i.content).join("\n\n")}\n</system-reminder>`

+ 1 - 1
packages/opencode/src/tool/write.ts

@@ -49,7 +49,7 @@ export const WriteTool = Tool.define("write", {
       file: filepath,
       event: exists ? "change" : "add",
     })
-    FileTime.read(ctx.sessionID, filepath)
+    await FileTime.read(ctx.sessionID, filepath)
 
     let output = "Wrote file successfully."
     await LSP.touchFile(filepath, true)

+ 30 - 83
packages/opencode/test/file/time.test.ts

@@ -1,13 +1,16 @@
-import { describe, test, expect, beforeEach } from "bun:test"
+import { describe, test, expect, afterEach } from "bun:test"
 import path from "path"
 import fs from "fs/promises"
 import { FileTime } from "../../src/file/time"
 import { Instance } from "../../src/project/instance"
+import { SessionID } from "../../src/session/schema"
 import { Filesystem } from "../../src/util/filesystem"
 import { tmpdir } from "../fixture/fixture"
 
+afterEach(() => Instance.disposeAll())
+
 describe("file/time", () => {
-  const sessionID = "test-session-123"
+  const sessionID = SessionID.make("ses_00000000000000000000000001")
 
   describe("read() and get()", () => {
     test("stores read timestamp", async () => {
@@ -18,12 +21,13 @@ describe("file/time", () => {
       await Instance.provide({
         directory: tmp.path,
         fn: async () => {
-          const before = FileTime.get(sessionID, filepath)
+          const before = await FileTime.get(sessionID, filepath)
           expect(before).toBeUndefined()
 
-          FileTime.read(sessionID, filepath)
+          await FileTime.read(sessionID, filepath)
+          await Bun.sleep(10)
 
-          const after = FileTime.get(sessionID, filepath)
+          const after = await FileTime.get(sessionID, filepath)
           expect(after).toBeInstanceOf(Date)
           expect(after!.getTime()).toBeGreaterThan(0)
         },
@@ -38,11 +42,12 @@ describe("file/time", () => {
       await Instance.provide({
         directory: tmp.path,
         fn: async () => {
-          FileTime.read("session1", filepath)
-          FileTime.read("session2", filepath)
+          await FileTime.read(SessionID.make("ses_00000000000000000000000002"), filepath)
+          await FileTime.read(SessionID.make("ses_00000000000000000000000003"), filepath)
+          await Bun.sleep(10)
 
-          const time1 = FileTime.get("session1", filepath)
-          const time2 = FileTime.get("session2", filepath)
+          const time1 = await FileTime.get(SessionID.make("ses_00000000000000000000000002"), filepath)
+          const time2 = await FileTime.get(SessionID.make("ses_00000000000000000000000003"), filepath)
 
           expect(time1).toBeDefined()
           expect(time2).toBeDefined()
@@ -59,14 +64,16 @@ describe("file/time", () => {
         directory: tmp.path,
         fn: async () => {
           FileTime.read(sessionID, filepath)
-          const first = FileTime.get(sessionID, filepath)!
+          await Bun.sleep(10)
+          const first = await FileTime.get(sessionID, filepath)
 
-          await new Promise((resolve) => setTimeout(resolve, 10))
+          await Bun.sleep(10)
 
           FileTime.read(sessionID, filepath)
-          const second = FileTime.get(sessionID, filepath)!
+          await Bun.sleep(10)
+          const second = await FileTime.get(sessionID, filepath)
 
-          expect(second.getTime()).toBeGreaterThanOrEqual(first.getTime())
+          expect(second!.getTime()).toBeGreaterThanOrEqual(first!.getTime())
         },
       })
     })
@@ -82,8 +89,7 @@ describe("file/time", () => {
         directory: tmp.path,
         fn: async () => {
           FileTime.read(sessionID, filepath)
-
-          // Should not throw
+          await Bun.sleep(10)
           await FileTime.assert(sessionID, filepath)
         },
       })
@@ -111,13 +117,8 @@ describe("file/time", () => {
         directory: tmp.path,
         fn: async () => {
           FileTime.read(sessionID, filepath)
-
-          // Wait to ensure different timestamps
-          await new Promise((resolve) => setTimeout(resolve, 100))
-
-          // Modify file after reading
+          await Bun.sleep(100)
           await fs.writeFile(filepath, "modified content", "utf-8")
-
           await expect(FileTime.assert(sessionID, filepath)).rejects.toThrow("modified since it was last read")
         },
       })
@@ -132,7 +133,7 @@ describe("file/time", () => {
         directory: tmp.path,
         fn: async () => {
           FileTime.read(sessionID, filepath)
-          await new Promise((resolve) => setTimeout(resolve, 100))
+          await Bun.sleep(100)
           await fs.writeFile(filepath, "modified", "utf-8")
 
           let error: Error | undefined
@@ -147,28 +148,6 @@ describe("file/time", () => {
         },
       })
     })
-
-    test("skips check when OPENCODE_DISABLE_FILETIME_CHECK is true", async () => {
-      await using tmp = await tmpdir()
-      const filepath = path.join(tmp.path, "file.txt")
-      await fs.writeFile(filepath, "content", "utf-8")
-
-      await Instance.provide({
-        directory: tmp.path,
-        fn: async () => {
-          const { Flag } = await import("../../src/flag/flag")
-          const original = Flag.OPENCODE_DISABLE_FILETIME_CHECK
-          ;(Flag as { OPENCODE_DISABLE_FILETIME_CHECK: boolean }).OPENCODE_DISABLE_FILETIME_CHECK = true
-
-          try {
-            // Should not throw even though file wasn't read
-            await FileTime.assert(sessionID, filepath)
-          } finally {
-            ;(Flag as { OPENCODE_DISABLE_FILETIME_CHECK: boolean }).OPENCODE_DISABLE_FILETIME_CHECK = original
-          }
-        },
-      })
-    })
   })
 
   describe("withLock()", () => {
@@ -215,7 +194,7 @@ describe("file/time", () => {
 
           const op1 = FileTime.withLock(filepath, async () => {
             order.push(1)
-            await new Promise((resolve) => setTimeout(resolve, 10))
+            await Bun.sleep(50)
             order.push(2)
           })
 
@@ -225,12 +204,7 @@ describe("file/time", () => {
           })
 
           await Promise.all([op1, op2])
-
-          // Operations should be serialized
-          expect(order).toContain(1)
-          expect(order).toContain(2)
-          expect(order).toContain(3)
-          expect(order).toContain(4)
+          expect(order).toEqual([1, 2, 3, 4])
         },
       })
     })
@@ -248,8 +222,8 @@ describe("file/time", () => {
 
           const op1 = FileTime.withLock(filepath1, async () => {
             started1 = true
-            await new Promise((resolve) => setTimeout(resolve, 50))
-            expect(started2).toBe(true) // op2 should have started while op1 is running
+            await Bun.sleep(50)
+            expect(started2).toBe(true)
           })
 
           const op2 = FileTime.withLock(filepath2, async () => {
@@ -257,7 +231,6 @@ describe("file/time", () => {
           })
 
           await Promise.all([op1, op2])
-
           expect(started1).toBe(true)
           expect(started2).toBe(true)
         },
@@ -277,7 +250,6 @@ describe("file/time", () => {
             }),
           ).rejects.toThrow("Test error")
 
-          // Lock should be released, subsequent operations should work
           let executed = false
           await FileTime.withLock(filepath, async () => {
             executed = true
@@ -286,31 +258,6 @@ describe("file/time", () => {
         },
       })
     })
-
-    test("deadlocks on nested locks (expected behavior)", async () => {
-      await using tmp = await tmpdir()
-      const filepath = path.join(tmp.path, "file.txt")
-
-      await Instance.provide({
-        directory: tmp.path,
-        fn: async () => {
-          // Nested locks on same file cause deadlock - this is expected
-          // The outer lock waits for inner to complete, but inner waits for outer to release
-          const timeout = new Promise<never>((_, reject) =>
-            setTimeout(() => reject(new Error("Deadlock detected")), 100),
-          )
-
-          const nestedLock = FileTime.withLock(filepath, async () => {
-            return FileTime.withLock(filepath, async () => {
-              return "inner"
-            })
-          })
-
-          // Should timeout due to deadlock
-          await expect(Promise.race([nestedLock, timeout])).rejects.toThrow("Deadlock detected")
-        },
-      })
-    })
   })
 
   describe("stat() Filesystem.stat pattern", () => {
@@ -323,12 +270,12 @@ describe("file/time", () => {
         directory: tmp.path,
         fn: async () => {
           FileTime.read(sessionID, filepath)
+          await Bun.sleep(10)
 
           const stats = Filesystem.stat(filepath)
           expect(stats?.mtime).toBeInstanceOf(Date)
           expect(stats!.mtime.getTime()).toBeGreaterThan(0)
 
-          // FileTime.assert uses this stat internally
           await FileTime.assert(sessionID, filepath)
         },
       })
@@ -343,11 +290,11 @@ describe("file/time", () => {
         directory: tmp.path,
         fn: async () => {
           FileTime.read(sessionID, filepath)
+          await Bun.sleep(10)
 
           const originalStat = Filesystem.stat(filepath)
 
-          // Wait and modify
-          await new Promise((resolve) => setTimeout(resolve, 100))
+          await Bun.sleep(100)
           await fs.writeFile(filepath, "modified", "utf-8")
 
           const newStat = Filesystem.stat(filepath)