watcher.test.ts 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249
  1. import { $ } from "bun"
  2. import { afterEach, describe, expect, test } from "bun:test"
  3. import fs from "fs/promises"
  4. import path from "path"
  5. import { ConfigProvider, Deferred, Effect, Layer, ManagedRuntime, Option } from "effect"
  6. import { tmpdir } from "../fixture/fixture"
  7. import { Bus } from "../../src/bus"
  8. import { Config } from "../../src/config/config"
  9. import { FileWatcher } from "../../src/file/watcher"
  10. import { Git } from "../../src/git"
  11. import { Instance } from "../../src/project/instance"
  12. // Native @parcel/watcher bindings aren't reliably available in CI (missing on Linux, flaky on Windows)
  13. const describeWatcher = FileWatcher.hasNativeBinding() && !process.env.CI ? describe : describe.skip
  14. // ---------------------------------------------------------------------------
  15. // Helpers
  16. // ---------------------------------------------------------------------------
  17. const watcherConfigLayer = ConfigProvider.layer(
  18. ConfigProvider.fromUnknown({
  19. KILO_EXPERIMENTAL_FILEWATCHER: "true",
  20. KILO_EXPERIMENTAL_DISABLE_FILEWATCHER: "false",
  21. }),
  22. )
  23. type WatcherEvent = { file: string; event: "add" | "change" | "unlink" }
  24. /** Run `body` with a live FileWatcher service. */
  25. function withWatcher<E>(directory: string, body: Effect.Effect<void, E>) {
  26. return Instance.provide({
  27. directory,
  28. fn: async () => {
  29. const layer: Layer.Layer<FileWatcher.Service, never, never> = FileWatcher.layer.pipe(
  30. Layer.provide(Config.defaultLayer),
  31. Layer.provide(Git.defaultLayer),
  32. Layer.provide(watcherConfigLayer),
  33. )
  34. const rt = ManagedRuntime.make(layer)
  35. try {
  36. await rt.runPromise(FileWatcher.Service.use((s) => s.init()))
  37. await Effect.runPromise(ready(directory))
  38. await Effect.runPromise(body)
  39. } finally {
  40. await rt.dispose()
  41. }
  42. },
  43. })
  44. }
  45. function listen(directory: string, check: (evt: WatcherEvent) => boolean, hit: (evt: WatcherEvent) => void) {
  46. let done = false
  47. const unsub = Bus.subscribe(FileWatcher.Event.Updated, (evt) => {
  48. if (done) return
  49. if (!check(evt.properties)) return
  50. hit(evt.properties)
  51. })
  52. return () => {
  53. if (done) return
  54. done = true
  55. unsub()
  56. }
  57. }
  58. function wait(directory: string, check: (evt: WatcherEvent) => boolean) {
  59. return Effect.gen(function* () {
  60. const deferred = yield* Deferred.make<WatcherEvent>()
  61. const cleanup = yield* Effect.sync(() => {
  62. let off = () => {}
  63. off = listen(directory, check, (evt) => {
  64. off()
  65. Deferred.doneUnsafe(deferred, Effect.succeed(evt))
  66. })
  67. return off
  68. })
  69. return { cleanup, deferred }
  70. })
  71. }
  72. function nextUpdate<E>(directory: string, check: (evt: WatcherEvent) => boolean, trigger: Effect.Effect<void, E>) {
  73. return Effect.acquireUseRelease(
  74. wait(directory, check),
  75. ({ deferred }) =>
  76. Effect.gen(function* () {
  77. yield* trigger
  78. return yield* Deferred.await(deferred).pipe(Effect.timeout("5 seconds"))
  79. }),
  80. ({ cleanup }) => Effect.sync(cleanup),
  81. )
  82. }
  83. /** Effect that asserts no matching event arrives within `ms`. */
  84. function noUpdate<E>(
  85. directory: string,
  86. check: (evt: WatcherEvent) => boolean,
  87. trigger: Effect.Effect<void, E>,
  88. ms = 500,
  89. ) {
  90. return Effect.acquireUseRelease(
  91. wait(directory, check),
  92. ({ deferred }) =>
  93. Effect.gen(function* () {
  94. yield* trigger
  95. expect(yield* Deferred.await(deferred).pipe(Effect.timeoutOption(`${ms} millis`))).toEqual(Option.none())
  96. }),
  97. ({ cleanup }) => Effect.sync(cleanup),
  98. )
  99. }
  100. function ready(directory: string) {
  101. const file = path.join(directory, `.watcher-${Math.random().toString(36).slice(2)}`)
  102. const head = path.join(directory, ".git", "HEAD")
  103. return Effect.gen(function* () {
  104. yield* nextUpdate(
  105. directory,
  106. (evt) => evt.file === file && evt.event === "add",
  107. Effect.promise(() => fs.writeFile(file, "ready")),
  108. ).pipe(Effect.ensuring(Effect.promise(() => fs.rm(file, { force: true }).catch(() => undefined))), Effect.asVoid)
  109. const git = yield* Effect.promise(() =>
  110. fs
  111. .stat(head)
  112. .then(() => true)
  113. .catch(() => false),
  114. )
  115. if (!git) return
  116. const branch = `watch-${Math.random().toString(36).slice(2)}`
  117. const hash = yield* Effect.promise(() => $`git rev-parse HEAD`.cwd(directory).quiet().text())
  118. yield* nextUpdate(
  119. directory,
  120. (evt) => evt.file === head && evt.event !== "unlink",
  121. Effect.promise(async () => {
  122. await fs.writeFile(path.join(directory, ".git", "refs", "heads", branch), hash.trim() + "\n")
  123. await fs.writeFile(head, `ref: refs/heads/${branch}\n`)
  124. }),
  125. ).pipe(Effect.asVoid)
  126. })
  127. }
  128. // ---------------------------------------------------------------------------
  129. // Tests
  130. // ---------------------------------------------------------------------------
  131. describeWatcher("FileWatcher", () => {
  132. afterEach(async () => {
  133. await Instance.disposeAll()
  134. })
  135. test("publishes root create, update, and delete events", async () => {
  136. await using tmp = await tmpdir({ git: true })
  137. const file = path.join(tmp.path, "watch.txt")
  138. const dir = tmp.path
  139. const cases = [
  140. { event: "add" as const, trigger: Effect.promise(() => fs.writeFile(file, "a")) },
  141. { event: "change" as const, trigger: Effect.promise(() => fs.writeFile(file, "b")) },
  142. { event: "unlink" as const, trigger: Effect.promise(() => fs.unlink(file)) },
  143. ]
  144. await withWatcher(
  145. dir,
  146. Effect.forEach(cases, ({ event, trigger }) =>
  147. nextUpdate(dir, (evt) => evt.file === file && evt.event === event, trigger).pipe(
  148. Effect.tap((evt) => Effect.sync(() => expect(evt).toEqual({ file, event }))),
  149. ),
  150. ),
  151. )
  152. })
  153. test("watches non-git roots", async () => {
  154. await using tmp = await tmpdir()
  155. const file = path.join(tmp.path, "plain.txt")
  156. const dir = tmp.path
  157. await withWatcher(
  158. dir,
  159. nextUpdate(
  160. dir,
  161. (e) => e.file === file && e.event === "add",
  162. Effect.promise(() => fs.writeFile(file, "plain")),
  163. ).pipe(Effect.tap((evt) => Effect.sync(() => expect(evt).toEqual({ file, event: "add" })))),
  164. )
  165. })
  166. test("cleanup stops publishing events", async () => {
  167. await using tmp = await tmpdir({ git: true })
  168. const file = path.join(tmp.path, "after-dispose.txt")
  169. // Start and immediately stop the watcher (withWatcher disposes on exit)
  170. await withWatcher(tmp.path, Effect.void)
  171. // Now write a file — no watcher should be listening
  172. await Instance.provide({
  173. directory: tmp.path,
  174. fn: () =>
  175. Effect.runPromise(
  176. noUpdate(
  177. tmp.path,
  178. (e) => e.file === file,
  179. Effect.promise(() => fs.writeFile(file, "gone")),
  180. ),
  181. ),
  182. })
  183. })
  184. test("ignores .git/index changes", async () => {
  185. await using tmp = await tmpdir({ git: true })
  186. const gitIndex = path.join(tmp.path, ".git", "index")
  187. const edit = path.join(tmp.path, "tracked.txt")
  188. await withWatcher(
  189. tmp.path,
  190. noUpdate(
  191. tmp.path,
  192. (e) => e.file === gitIndex,
  193. Effect.promise(async () => {
  194. await fs.writeFile(edit, "a")
  195. await $`git add .`.cwd(tmp.path).quiet().nothrow()
  196. }),
  197. ),
  198. )
  199. })
  200. test("publishes .git/HEAD events", async () => {
  201. await using tmp = await tmpdir({ git: true })
  202. const head = path.join(tmp.path, ".git", "HEAD")
  203. const branch = `watch-${Math.random().toString(36).slice(2)}`
  204. await $`git branch ${branch}`.cwd(tmp.path).quiet()
  205. await withWatcher(
  206. tmp.path,
  207. nextUpdate(
  208. tmp.path,
  209. (evt) => evt.file === head && evt.event !== "unlink",
  210. Effect.promise(() => fs.writeFile(head, `ref: refs/heads/${branch}\n`)),
  211. ).pipe(
  212. Effect.tap((evt) =>
  213. Effect.sync(() => {
  214. expect(evt.file).toBe(head)
  215. expect(["add", "change"]).toContain(evt.event)
  216. }),
  217. ),
  218. ),
  219. )
  220. })
  221. })