// effect-flock.test.ts (12 KB)
  1. import { describe, expect } from "bun:test"
  2. import { spawn } from "child_process"
  3. import fs from "fs/promises"
  4. import path from "path"
  5. import os from "os"
  6. import { Cause, Effect, Exit, Layer } from "effect"
  7. import { testEffect } from "../lib/effect"
  8. import { AppFileSystem } from "@opencode-ai/shared/filesystem"
  9. import { EffectFlock } from "@opencode-ai/shared/util/effect-flock"
  10. import { Global } from "@opencode-ai/shared/global"
  11. import { Hash } from "@opencode-ai/shared/util/hash"
  /**
   * Builds the path of the lock directory the tests expect for `key` under
   * `dir`: `<dir>/<Hash.fast(key)>.lock`.
   */
  function lock(dir: string, key: string) {
    const name = `${Hash.fast(key)}.lock`
    return path.join(dir, name)
  }
  /** Returns a promise that resolves (with no value) once `ms` milliseconds have elapsed. */
  function sleep(ms: number) {
    return new Promise<void>((done) => {
      setTimeout(done, ms)
    })
  }
  /**
   * Reports whether `file` can be stat'ed.
   * Any stat failure (not only "no such file") is reported as `false`.
   */
  async function exists(file: string) {
    try {
      await fs.stat(file)
      return true
    } catch {
      return false
    }
  }
  /**
   * Reads `p` as UTF-8 and parses it as JSON.
   * The result is returned as `T` without any runtime validation; rejects if
   * the file cannot be read or is not valid JSON.
   */
  async function readJson<T>(p: string): Promise<T> {
    const raw = await fs.readFile(p, "utf8")
    return JSON.parse(raw)
  }
  27. // ---------------------------------------------------------------------------
  28. // Worker subprocess helpers
  29. // ---------------------------------------------------------------------------
  /**
   * Payload handed to the worker fixture, JSON-encoded as its single CLI argument.
   * NOTE(review): field meanings below are inferred from how the tests use
   * them; confirm against the worker fixture.
   */
  interface Msg {
    /** Lock key the worker should acquire. */
    key: string
    /** Directory under which lock directories are created. */
    dir: string
    /** Presumably how long (ms) the worker keeps the lock before releasing it. */
    holdMs?: number
    /** Path of a marker file the tests wait on; presumably created once the lock is held. */
    ready?: string
    /** Path the worker presumably uses to detect overlapping lock holders. */
    active?: string
    /** Path of a log file; the tests expect one non-empty line per successful worker. */
    done?: string
  }
  /** Directory two levels above this test file; used as the working directory for spawned workers. */
  const root = path.join(import.meta.dir, "../..")

  /** Path to the worker fixture script that is executed in a subprocess. */
  const worker = path.join(import.meta.dir, "../fixture/effect-flock-worker.ts")
  /**
   * Runs the worker fixture to completion with `msg` JSON-encoded as its argument.
   *
   * @returns the exit code (1 when the process reports no exit code) together
   *   with everything written to stdout and stderr.
   * @throws (rejects) when the subprocess emits `error`, e.g. because it could
   *   not be spawned. Without a listener that event would surface as an
   *   unhandled `error` event rather than a test failure.
   */
  function run(msg: Msg) {
    return new Promise<{ code: number; stdout: Buffer; stderr: Buffer }>((resolve, reject) => {
      const proc = spawn(process.execPath, [worker, JSON.stringify(msg)], { cwd: root })
      const stdout: Buffer[] = []
      const stderr: Buffer[] = []
      proc.stdout?.on("data", (data) => stdout.push(Buffer.from(data)))
      proc.stderr?.on("data", (data) => stderr.push(Buffer.from(data)))
      // A later `close` after `error` is harmless: the promise is already settled.
      proc.once("error", reject)
      proc.on("close", (code) => {
        resolve({ code: code ?? 1, stdout: Buffer.concat(stdout), stderr: Buffer.concat(stderr) })
      })
    })
  }
  /**
   * Starts the worker fixture with `msg` JSON-encoded as its argument and
   * returns the child process immediately, without waiting for it to exit.
   * stdin is ignored; stdout and stderr are piped.
   */
  function spawnWorker(msg: Msg) {
    const args = [worker, JSON.stringify(msg)]
    return spawn(process.execPath, args, { cwd: root, stdio: ["ignore", "pipe", "pipe"] })
  }
  /**
   * Asks a worker started with `spawnWorker` to terminate.
   *
   * - Already exited (exit code or signal recorded): nothing to do.
   * - Non-Windows, or no pid: sends the default kill signal and returns right
   *   away; it does NOT wait for the process to exit.
   * - Windows: runs `taskkill /T /F` on the pid, and once taskkill finishes
   *   (or fails to start) also calls `proc.kill()`.
   *
   * The returned promise never rejects.
   */
  function stopWorker(proc: ReturnType<typeof spawnWorker>) {
    if (proc.exitCode !== null || proc.signalCode !== null) return Promise.resolve()
    if (process.platform !== "win32" || !proc.pid) {
      proc.kill()
      return Promise.resolve()
    }
    return new Promise<void>((resolve) => {
      const finish = () => {
        proc.kill()
        resolve()
      }
      const killProc = spawn("taskkill", ["/pid", String(proc.pid), "/T", "/F"])
      // If taskkill itself cannot be spawned, fall back to proc.kill() instead of
      // leaving an unhandled `error` event and a promise that never settles.
      killProc.once("error", finish)
      killProc.once("close", finish)
    })
  }
  /**
   * Polls roughly every 20 ms until `file` exists.
   *
   * The file is always checked at least once (even with `timeout <= 0`) and is
   * checked one final time after the last sleep, so a file that appears just
   * before the deadline is not misreported as a timeout.
   *
   * @param file path to wait for
   * @param timeout maximum time to keep polling, in milliseconds
   * @throws Error when the file still does not exist once the deadline has passed
   */
  async function waitForFile(file: string, timeout = 3_000) {
    const deadline = Date.now() + timeout
    for (;;) {
      if (await exists(file)) return
      if (Date.now() >= deadline) break
      await sleep(20)
    }
    throw new Error(`Timed out waiting for file: ${file}`)
  }
  80. // ---------------------------------------------------------------------------
  81. // Test layer
  82. // ---------------------------------------------------------------------------
  /**
   * Stand-in `Global.Service` for the tests: `home` is the real home directory,
   * every other location points at the OS temp directory.
   */
  const testGlobal = Layer.succeed(
    Global.Service,
    Global.Service.of({
      home: os.homedir(),
      data: os.tmpdir(),
      cache: os.tmpdir(),
      config: os.tmpdir(),
      state: os.tmpdir(),
      bin: os.tmpdir(),
      log: os.tmpdir(),
    }),
  )

  /** `EffectFlock` layer wired to the test globals and the default file-system layer. */
  const testLayer = EffectFlock.layer.pipe(
    Layer.provide(testGlobal),
    Layer.provide(AppFileSystem.defaultLayer),
  )
  96. // ---------------------------------------------------------------------------
  97. // Tests
  98. // ---------------------------------------------------------------------------
  99. describe("util.effect-flock", () => {
  100. const it = testEffect(testLayer)
  // Acquiring inside a scope and closing the scope must leave no lock directory behind.
  it.live(
    "acquire and release via scoped Effect",
    Effect.gen(function* () {
      const flock = yield* EffectFlock.Service
      const tmp = yield* Effect.promise(() => fs.mkdtemp(path.join(os.tmpdir(), "eflock-test-")))
      // Runs on success, failure, and failed assertions alike, so the temp dir never leaks.
      const cleanup = Effect.promise(() => fs.rm(tmp, { recursive: true, force: true }))
      yield* Effect.gen(function* () {
        const dir = path.join(tmp, "locks")
        const lockDir = lock(dir, "eflock:acquire")
        yield* Effect.scoped(flock.acquire("eflock:acquire", dir))
        expect(yield* Effect.promise(() => exists(lockDir))).toBe(false)
      }).pipe(Effect.ensuring(cleanup))
    }),
  )
  // Data-first call shape: withLock(effect, key, dir) must run the wrapped effect.
  it.live(
    "withLock data-first",
    Effect.gen(function* () {
      const flock = yield* EffectFlock.Service
      const tmp = yield* Effect.promise(() => fs.mkdtemp(path.join(os.tmpdir(), "eflock-test-")))
      // Runs on success, failure, and failed assertions alike, so the temp dir never leaks.
      const cleanup = Effect.promise(() => fs.rm(tmp, { recursive: true, force: true }))
      yield* Effect.gen(function* () {
        const dir = path.join(tmp, "locks")
        let hit = false
        yield* flock.withLock(
          Effect.sync(() => {
            hit = true
          }),
          "eflock:df",
          dir,
        )
        expect(hit).toBe(true)
      }).pipe(Effect.ensuring(cleanup))
    }),
  )
  // Pipeable call shape: effect.pipe(withLock(key, dir)) must run the wrapped effect.
  it.live(
    "withLock pipeable",
    Effect.gen(function* () {
      const flock = yield* EffectFlock.Service
      const tmp = yield* Effect.promise(() => fs.mkdtemp(path.join(os.tmpdir(), "eflock-test-")))
      // Runs on success, failure, and failed assertions alike, so the temp dir never leaks.
      const cleanup = Effect.promise(() => fs.rm(tmp, { recursive: true, force: true }))
      yield* Effect.gen(function* () {
        const dir = path.join(tmp, "locks")
        let hit = false
        yield* Effect.sync(() => {
          hit = true
        }).pipe(flock.withLock("eflock:pipe", dir))
        expect(hit).toBe(true)
      }).pipe(Effect.ensuring(cleanup))
    }),
  )
  // While the lock is held, <lockDir>/meta.json must carry token, pid, hostname and createdAt.
  it.live(
    "writes owner metadata",
    Effect.gen(function* () {
      const flock = yield* EffectFlock.Service
      const tmp = yield* Effect.promise(() => fs.mkdtemp(path.join(os.tmpdir(), "eflock-test-")))
      // Runs on success, failure, and failed assertions alike, so the temp dir never leaks.
      const cleanup = Effect.promise(() => fs.rm(tmp, { recursive: true, force: true }))
      yield* Effect.gen(function* () {
        const dir = path.join(tmp, "locks")
        const key = "eflock:meta"
        const file = path.join(lock(dir, key), "meta.json")
        yield* Effect.scoped(
          Effect.gen(function* () {
            yield* flock.acquire(key, dir)
            // Read the metadata inside the scope, i.e. while the lock is still held.
            const json = yield* Effect.promise(() =>
              readJson<{ token?: unknown; pid?: unknown; hostname?: unknown; createdAt?: unknown }>(file),
            )
            expect(typeof json.token).toBe("string")
            expect(typeof json.pid).toBe("number")
            expect(typeof json.hostname).toBe("string")
            expect(typeof json.createdAt).toBe("string")
          }),
        )
      }).pipe(Effect.ensuring(cleanup))
    }),
  )
  // A pre-existing lock directory with an old timestamp must not block a new acquirer.
  it.live(
    "breaks stale lock dirs",
    Effect.gen(function* () {
      const flock = yield* EffectFlock.Service
      const tmp = yield* Effect.promise(() => fs.mkdtemp(path.join(os.tmpdir(), "eflock-test-")))
      // Runs on success, failure, and failed assertions alike, so the temp dir never leaks.
      const cleanup = Effect.promise(() => fs.rm(tmp, { recursive: true, force: true }))
      yield* Effect.gen(function* () {
        const dir = path.join(tmp, "locks")
        const key = "eflock:stale"
        const lockDir = lock(dir, key)
        // Plant an empty lock directory and backdate its timestamps by two minutes.
        yield* Effect.promise(async () => {
          await fs.mkdir(lockDir, { recursive: true })
          const old = new Date(Date.now() - 120_000)
          await fs.utimes(lockDir, old, old)
        })
        let hit = false
        yield* flock.withLock(
          Effect.sync(() => {
            hit = true
          }),
          key,
          dir,
        )
        expect(hit).toBe(true)
      }).pipe(Effect.ensuring(cleanup))
    }),
  )
  // An old lock directory plus an old "<lockDir>.breaker" directory must both be
  // overcome, and the breaker must be gone afterwards.
  it.live(
    "recovers from stale breaker",
    Effect.gen(function* () {
      const flock = yield* EffectFlock.Service
      const tmp = yield* Effect.promise(() => fs.mkdtemp(path.join(os.tmpdir(), "eflock-test-")))
      // Runs on success, failure, and failed assertions alike, so the temp dir never leaks.
      const cleanup = Effect.promise(() => fs.rm(tmp, { recursive: true, force: true }))
      yield* Effect.gen(function* () {
        const dir = path.join(tmp, "locks")
        const key = "eflock:stale-breaker"
        const lockDir = lock(dir, key)
        const breaker = lockDir + ".breaker"
        // Plant both directories and backdate their timestamps by two minutes.
        yield* Effect.promise(async () => {
          await fs.mkdir(lockDir, { recursive: true })
          await fs.mkdir(breaker)
          const old = new Date(Date.now() - 120_000)
          await fs.utimes(lockDir, old, old)
          await fs.utimes(breaker, old, old)
        })
        let hit = false
        yield* flock.withLock(
          Effect.sync(() => {
            hit = true
          }),
          key,
          dir,
        )
        expect(hit).toBe(true)
        expect(yield* Effect.promise(() => exists(breaker))).toBe(false)
      }).pipe(Effect.ensuring(cleanup))
    }),
  )
  // Deleting the lock directory while the lock is held must make withLock fail
  // with a cause mentioning "missing".
  it.live(
    "detects compromise when lock dir removed",
    Effect.gen(function* () {
      const flock = yield* EffectFlock.Service
      const tmp = yield* Effect.promise(() => fs.mkdtemp(path.join(os.tmpdir(), "eflock-test-")))
      // Runs on success, failure, and failed assertions alike, so the temp dir never leaks.
      const cleanup = Effect.promise(() => fs.rm(tmp, { recursive: true, force: true }))
      yield* Effect.gen(function* () {
        const dir = path.join(tmp, "locks")
        const key = "eflock:compromised"
        const lockDir = lock(dir, key)
        // The locked effect removes its own lock directory.
        const result = yield* flock
          .withLock(
            Effect.promise(() => fs.rm(lockDir, { recursive: true, force: true })),
            key,
            dir,
          )
          .pipe(Effect.exit)
        expect(Exit.isFailure(result)).toBe(true)
        expect(Exit.isFailure(result) ? Cause.pretty(result.cause) : "").toContain("missing")
      }).pipe(Effect.ensuring(cleanup))
    }),
  )
  // Rewriting the token in meta.json while the lock is held must make withLock
  // fail with "token mismatch" and leave the (no longer owned) lock directory in place.
  it.live(
    "detects token mismatch",
    Effect.gen(function* () {
      const flock = yield* EffectFlock.Service
      const tmp = yield* Effect.promise(() => fs.mkdtemp(path.join(os.tmpdir(), "eflock-test-")))
      // Runs on success, failure, and failed assertions alike, so the temp dir never leaks.
      const cleanup = Effect.promise(() => fs.rm(tmp, { recursive: true, force: true }))
      yield* Effect.gen(function* () {
        const dir = path.join(tmp, "locks")
        const key = "eflock:token"
        const lockDir = lock(dir, key)
        const meta = path.join(lockDir, "meta.json")
        const result = yield* flock
          .withLock(
            Effect.promise(async () => {
              // Tamper with the owner token from inside the critical section.
              const json = await readJson<{ token?: string }>(meta)
              json.token = "tampered"
              await fs.writeFile(meta, JSON.stringify(json, null, 2))
            }),
            key,
            dir,
          )
          .pipe(Effect.exit)
        expect(Exit.isFailure(result)).toBe(true)
        expect(Exit.isFailure(result) ? Cause.pretty(result.cause) : "").toContain("token mismatch")
        expect(yield* Effect.promise(() => exists(lockDir))).toBe(true)
      }).pipe(Effect.ensuring(cleanup))
    }),
  )
  // A lock root without write permission must surface a PermissionDenied failure.
  it.live(
    "fails on unwritable lock roots",
    Effect.gen(function* () {
      // Permission bits are not meaningful on Windows.
      if (process.platform === "win32") return
      // The superuser bypasses directory permission bits, so the lock would
      // succeed and this test would fail spuriously (e.g. in root CI containers).
      if (process.getuid?.() === 0) return
      const flock = yield* EffectFlock.Service
      const tmp = yield* Effect.promise(() => fs.mkdtemp(path.join(os.tmpdir(), "eflock-test-")))
      const dir = path.join(tmp, "locks")
      // Restore write permission before deleting. The chmod is best-effort because
      // `dir` may not exist if setup failed. Runs even when the assertion fails,
      // so no read-only directory is left behind.
      const cleanup = Effect.promise(() =>
        fs
          .chmod(dir, 0o700)
          .catch(() => {})
          .then(() => fs.rm(tmp, { recursive: true, force: true })),
      )
      yield* Effect.gen(function* () {
        yield* Effect.promise(async () => {
          await fs.mkdir(dir, { recursive: true })
          await fs.chmod(dir, 0o500)
        })
        const result = yield* flock.withLock(Effect.void, "eflock:perm", dir).pipe(Effect.exit)
        // oxlint-disable-next-line no-base-to-string -- Exit has a useful toString for test assertions
        expect(String(result)).toContain("PermissionDenied")
      }).pipe(Effect.ensuring(cleanup))
    }),
  )
  // Launches 16 worker processes that all contend for the same key. Every worker
  // must exit 0 with empty stderr, and the shared done-log must end up with
  // exactly one non-empty line per worker.
  it.live(
    "enforces mutual exclusion under process contention",
    () =>
      Effect.promise(async () => {
        const tmp = await fs.mkdtemp(path.join(os.tmpdir(), "eflock-stress-"))
        const dir = path.join(tmp, "locks")
        const done = path.join(tmp, "done.log")
        const active = path.join(tmp, "active")
        const workers = 16
        try {
          const pending: ReturnType<typeof run>[] = []
          for (let i = 0; i < workers; i++) {
            pending.push(run({ key: "eflock:stress", dir, done, active, holdMs: 30 }))
          }
          const results = await Promise.all(pending)

          const codes = results.map((r) => r.code)
          expect(codes).toEqual(Array.from({ length: workers }, () => 0))

          const errors = results.map((r) => r.stderr.toString()).filter(Boolean)
          expect(errors).toEqual([])

          const log = await fs.readFile(done, "utf8")
          const entries = log.split("\n").filter((line) => line.trim() !== "")
          expect(entries.length).toBe(workers)
        } finally {
          await fs.rm(tmp, { recursive: true, force: true })
        }
      }),
    60_000,
  )
  // Kills a worker while it holds the lock, backdates what it left behind, and
  // checks that a fresh worker can still take the same lock.
  it.live(
    "recovers after a crashed lock owner",
    () =>
      Effect.promise(async () => {
        const tmp = await fs.mkdtemp(path.join(os.tmpdir(), "eflock-crash-"))
        const dir = path.join(tmp, "locks")
        const ready = path.join(tmp, "ready")
        const proc = spawnWorker({ key: "eflock:crash", dir, ready, holdMs: 120_000 })
        // Subscribe to `close` BEFORE stopping the worker. If the listener were
        // attached afterwards (stopWorker waits for taskkill on Windows, and the
        // worker may already have exited), the event could already have fired
        // and the await below would hang until the test times out.
        const closed = new Promise<void>((resolve) => proc.once("close", () => resolve()))
        try {
          await waitForFile(ready, 5_000)
          await stopWorker(proc)
          await closed
          // Backdate lock files so they're past STALE_MS (60s)
          const lockDir = lock(dir, "eflock:crash")
          const old = new Date(Date.now() - 120_000)
          // Best-effort: any of these paths may be absent after the kill.
          await fs.utimes(lockDir, old, old).catch(() => {})
          await fs.utimes(path.join(lockDir, "heartbeat"), old, old).catch(() => {})
          await fs.utimes(path.join(lockDir, "meta.json"), old, old).catch(() => {})
          const done = path.join(tmp, "done.log")
          const result = await run({ key: "eflock:crash", dir, done, holdMs: 10 })
          expect(result.code).toBe(0)
          expect(result.stderr.toString()).toBe("")
        } finally {
          await stopWorker(proc).catch(() => {})
          await fs.rm(tmp, { recursive: true, force: true })
        }
      }),
    30_000,
  )
  340. })