runner.test.ts 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495
  1. import { describe, expect, test } from "bun:test"
  2. import { Deferred, Effect, Exit, Fiber, Ref, Scope } from "effect"
  3. import { Runner } from "../../src/effect"
  4. import { it } from "../lib/effect"
  5. describe("Runner", () => {
  6. // --- ensureRunning semantics ---
  7. it.live(
  8. "ensureRunning starts work and returns result",
  9. Effect.gen(function* () {
  10. const s = yield* Scope.Scope
  11. const runner = Runner.make<string>(s)
  12. const result = yield* runner.ensureRunning(Effect.succeed("hello"))
  13. expect(result).toBe("hello")
  14. expect(runner.state._tag).toBe("Idle")
  15. expect(runner.busy).toBe(false)
  16. }),
  17. )
  18. it.live(
  19. "ensureRunning propagates work failures",
  20. Effect.gen(function* () {
  21. const s = yield* Scope.Scope
  22. const runner = Runner.make<string, string>(s)
  23. const exit = yield* runner.ensureRunning(Effect.fail("boom")).pipe(Effect.exit)
  24. expect(Exit.isFailure(exit)).toBe(true)
  25. expect(runner.state._tag).toBe("Idle")
  26. }),
  27. )
  28. it.live(
  29. "concurrent callers share the same run",
  30. Effect.gen(function* () {
  31. const s = yield* Scope.Scope
  32. const runner = Runner.make<string>(s)
  33. const calls = yield* Ref.make(0)
  34. const work = Effect.gen(function* () {
  35. yield* Ref.update(calls, (n) => n + 1)
  36. yield* Effect.sleep("10 millis")
  37. return "shared"
  38. })
  39. const [a, b] = yield* Effect.all([runner.ensureRunning(work), runner.ensureRunning(work)], {
  40. concurrency: "unbounded",
  41. })
  42. expect(a).toBe("shared")
  43. expect(b).toBe("shared")
  44. expect(yield* Ref.get(calls)).toBe(1)
  45. }),
  46. )
  47. it.live(
  48. "concurrent callers all receive same error",
  49. Effect.gen(function* () {
  50. const s = yield* Scope.Scope
  51. const runner = Runner.make<string, string>(s)
  52. const work = Effect.gen(function* () {
  53. yield* Effect.sleep("10 millis")
  54. return yield* Effect.fail("boom")
  55. })
  56. const [a, b] = yield* Effect.all(
  57. [runner.ensureRunning(work).pipe(Effect.exit), runner.ensureRunning(work).pipe(Effect.exit)],
  58. { concurrency: "unbounded" },
  59. )
  60. expect(Exit.isFailure(a)).toBe(true)
  61. expect(Exit.isFailure(b)).toBe(true)
  62. }),
  63. )
  64. it.live(
  65. "ensureRunning can be called again after previous run completes",
  66. Effect.gen(function* () {
  67. const s = yield* Scope.Scope
  68. const runner = Runner.make<string>(s)
  69. expect(yield* runner.ensureRunning(Effect.succeed("first"))).toBe("first")
  70. expect(yield* runner.ensureRunning(Effect.succeed("second"))).toBe("second")
  71. }),
  72. )
  73. it.live(
  74. "second ensureRunning ignores new work if already running",
  75. Effect.gen(function* () {
  76. const s = yield* Scope.Scope
  77. const runner = Runner.make<string>(s)
  78. const ran = yield* Ref.make<string[]>([])
  79. const first = Effect.gen(function* () {
  80. yield* Ref.update(ran, (a) => [...a, "first"])
  81. yield* Effect.sleep("50 millis")
  82. return "first-result"
  83. })
  84. const second = Effect.gen(function* () {
  85. yield* Ref.update(ran, (a) => [...a, "second"])
  86. return "second-result"
  87. })
  88. const [a, b] = yield* Effect.all([runner.ensureRunning(first), runner.ensureRunning(second)], {
  89. concurrency: "unbounded",
  90. })
  91. expect(a).toBe("first-result")
  92. expect(b).toBe("first-result")
  93. expect(yield* Ref.get(ran)).toEqual(["first"])
  94. }),
  95. )
  96. // --- cancel semantics ---
  97. it.live(
  98. "cancel interrupts running work",
  99. Effect.gen(function* () {
  100. const s = yield* Scope.Scope
  101. const runner = Runner.make<string>(s)
  102. const fiber = yield* runner.ensureRunning(Effect.never.pipe(Effect.as("never"))).pipe(Effect.forkChild)
  103. yield* Effect.sleep("10 millis")
  104. expect(runner.busy).toBe(true)
  105. expect(runner.state._tag).toBe("Running")
  106. yield* runner.cancel
  107. expect(runner.busy).toBe(false)
  108. const exit = yield* Fiber.await(fiber)
  109. expect(Exit.isFailure(exit)).toBe(true)
  110. }),
  111. )
  112. it.live(
  113. "cancel on idle is a no-op",
  114. Effect.gen(function* () {
  115. const s = yield* Scope.Scope
  116. const runner = Runner.make<string>(s)
  117. yield* runner.cancel
  118. expect(runner.busy).toBe(false)
  119. }),
  120. )
  121. it.live(
  122. "cancel with onInterrupt resolves callers gracefully",
  123. Effect.gen(function* () {
  124. const s = yield* Scope.Scope
  125. const runner = Runner.make<string>(s, { onInterrupt: Effect.succeed("fallback") })
  126. const fiber = yield* runner.ensureRunning(Effect.never.pipe(Effect.as("never"))).pipe(Effect.forkChild)
  127. yield* Effect.sleep("10 millis")
  128. yield* runner.cancel
  129. const exit = yield* Fiber.await(fiber)
  130. expect(Exit.isSuccess(exit)).toBe(true)
  131. if (Exit.isSuccess(exit)) expect(exit.value).toBe("fallback")
  132. }),
  133. )
  134. it.live(
  135. "cancel with queued callers resolves all",
  136. Effect.gen(function* () {
  137. const s = yield* Scope.Scope
  138. const runner = Runner.make<string>(s, { onInterrupt: Effect.succeed("fallback") })
  139. const a = yield* runner.ensureRunning(Effect.never.pipe(Effect.as("x"))).pipe(Effect.forkChild)
  140. yield* Effect.sleep("10 millis")
  141. const b = yield* runner.ensureRunning(Effect.succeed("y")).pipe(Effect.forkChild)
  142. yield* Effect.sleep("10 millis")
  143. yield* runner.cancel
  144. const [exitA, exitB] = yield* Effect.all([Fiber.await(a), Fiber.await(b)])
  145. expect(Exit.isSuccess(exitA)).toBe(true)
  146. expect(Exit.isSuccess(exitB)).toBe(true)
  147. if (Exit.isSuccess(exitA)) expect(exitA.value).toBe("fallback")
  148. if (Exit.isSuccess(exitB)) expect(exitB.value).toBe("fallback")
  149. }),
  150. )
  151. it.live(
  152. "work can be started after cancel",
  153. Effect.gen(function* () {
  154. const s = yield* Scope.Scope
  155. const runner = Runner.make<string>(s)
  156. const fiber = yield* runner.ensureRunning(Effect.never.pipe(Effect.as("x"))).pipe(Effect.forkChild)
  157. yield* Effect.sleep("10 millis")
  158. yield* runner.cancel
  159. yield* Fiber.await(fiber)
  160. const result = yield* runner.ensureRunning(Effect.succeed("after-cancel"))
  161. expect(result).toBe("after-cancel")
  162. }),
  163. )
  164. test("cancel does not deadlock when replacement work starts before interrupted run exits", async () => {
  165. function defer() {
  166. let resolve!: () => void
  167. const promise = new Promise<void>((done) => {
  168. resolve = done
  169. })
  170. return { promise, resolve }
  171. }
  172. function fail(ms: number, msg: string) {
  173. return new Promise<never>((_, reject) => {
  174. setTimeout(() => reject(new Error(msg)), ms)
  175. })
  176. }
  177. const s = await Effect.runPromise(Scope.make())
  178. const hit = defer()
  179. const hold = defer()
  180. const done = defer()
  181. try {
  182. const runner = Runner.make<string>(s)
  183. const first = Effect.never.pipe(
  184. Effect.onInterrupt(() => Effect.sync(() => hit.resolve())),
  185. Effect.ensuring(Effect.promise(() => hold.promise)),
  186. Effect.as("first"),
  187. )
  188. const a = Effect.runPromiseExit(runner.ensureRunning(first))
  189. await Bun.sleep(10)
  190. const stop = Effect.runPromise(runner.cancel)
  191. await Promise.race([hit.promise, fail(250, "cancel did not interrupt running work")])
  192. const b = Effect.runPromise(runner.ensureRunning(Effect.promise(() => done.promise).pipe(Effect.as("second"))))
  193. expect(runner.busy).toBe(true)
  194. hold.resolve()
  195. await Promise.race([stop, fail(250, "cancel deadlocked while replacement run was active")])
  196. expect(runner.busy).toBe(true)
  197. done.resolve()
  198. expect(await b).toBe("second")
  199. expect(runner.busy).toBe(false)
  200. const exit = await a
  201. expect(Exit.isFailure(exit)).toBe(true)
  202. } finally {
  203. hold.resolve()
  204. done.resolve()
  205. await Promise.race([Effect.runPromise(Scope.close(s, Exit.void)), fail(1000, "runner scope did not close")])
  206. }
  207. })
  208. // --- shell semantics ---
  209. it.live(
  210. "shell runs exclusively",
  211. Effect.gen(function* () {
  212. const s = yield* Scope.Scope
  213. const runner = Runner.make<string>(s)
  214. const result = yield* runner.startShell(Effect.succeed("shell-done"))
  215. expect(result).toBe("shell-done")
  216. expect(runner.busy).toBe(false)
  217. }),
  218. )
  219. it.live(
  220. "shell rejects when run is active",
  221. Effect.gen(function* () {
  222. const s = yield* Scope.Scope
  223. const runner = Runner.make<string>(s)
  224. const fiber = yield* runner.ensureRunning(Effect.never.pipe(Effect.as("x"))).pipe(Effect.forkChild)
  225. yield* Effect.sleep("10 millis")
  226. const exit = yield* runner.startShell(Effect.succeed("nope")).pipe(Effect.exit)
  227. expect(Exit.isFailure(exit)).toBe(true)
  228. yield* runner.cancel
  229. yield* Fiber.await(fiber)
  230. }),
  231. )
  232. it.live(
  233. "shell rejects when another shell is running",
  234. Effect.gen(function* () {
  235. const s = yield* Scope.Scope
  236. const runner = Runner.make<string>(s)
  237. const gate = yield* Deferred.make<void>()
  238. const sh = yield* runner.startShell(Deferred.await(gate).pipe(Effect.as("first"))).pipe(Effect.forkChild)
  239. yield* Effect.sleep("10 millis")
  240. const exit = yield* runner.startShell(Effect.succeed("second")).pipe(Effect.exit)
  241. expect(Exit.isFailure(exit)).toBe(true)
  242. yield* Deferred.succeed(gate, undefined)
  243. yield* Fiber.await(sh)
  244. }),
  245. )
  246. it.live(
  247. "shell rejects via busy callback and cancel still stops the first shell",
  248. Effect.gen(function* () {
  249. const s = yield* Scope.Scope
  250. const runner = Runner.make<string>(s, {
  251. busy: () => {
  252. throw new Error("busy")
  253. },
  254. })
  255. const sh = yield* runner.startShell(Effect.never.pipe(Effect.as("aborted"))).pipe(Effect.forkChild)
  256. yield* Effect.sleep("10 millis")
  257. const exit = yield* runner.startShell(Effect.succeed("second")).pipe(Effect.exit)
  258. expect(Exit.isFailure(exit)).toBe(true)
  259. yield* runner.cancel
  260. const done = yield* Fiber.await(sh)
  261. expect(Exit.isFailure(done)).toBe(true)
  262. }),
  263. )
  264. it.live(
  265. "cancel interrupts shell",
  266. Effect.gen(function* () {
  267. const s = yield* Scope.Scope
  268. const runner = Runner.make<string>(s)
  269. const gate = yield* Deferred.make<void>()
  270. const sh = yield* runner.startShell(Deferred.await(gate).pipe(Effect.as("ignored"))).pipe(Effect.forkChild)
  271. yield* Effect.sleep("10 millis")
  272. const stop = yield* runner.cancel.pipe(Effect.forkChild)
  273. const stopExit = yield* Fiber.await(stop).pipe(Effect.timeout("250 millis"))
  274. expect(Exit.isSuccess(stopExit)).toBe(true)
  275. expect(runner.busy).toBe(false)
  276. const shellExit = yield* Fiber.await(sh)
  277. expect(Exit.isFailure(shellExit)).toBe(true)
  278. yield* Deferred.succeed(gate, undefined).pipe(Effect.ignore)
  279. }),
  280. )
  281. // --- shell→run handoff ---
  282. it.live(
  283. "ensureRunning queues behind shell then runs after",
  284. Effect.gen(function* () {
  285. const s = yield* Scope.Scope
  286. const runner = Runner.make<string>(s)
  287. const gate = yield* Deferred.make<void>()
  288. const sh = yield* runner.startShell(Deferred.await(gate).pipe(Effect.as("shell-result"))).pipe(Effect.forkChild)
  289. yield* Effect.sleep("10 millis")
  290. expect(runner.state._tag).toBe("Shell")
  291. const run = yield* runner.ensureRunning(Effect.succeed("run-result")).pipe(Effect.forkChild)
  292. yield* Effect.sleep("10 millis")
  293. expect(runner.state._tag).toBe("ShellThenRun")
  294. yield* Deferred.succeed(gate, undefined)
  295. yield* Fiber.await(sh)
  296. const exit = yield* Fiber.await(run)
  297. expect(Exit.isSuccess(exit)).toBe(true)
  298. if (Exit.isSuccess(exit)) expect(exit.value).toBe("run-result")
  299. expect(runner.state._tag).toBe("Idle")
  300. }),
  301. )
  302. it.live(
  303. "multiple ensureRunning callers share the queued run behind shell",
  304. Effect.gen(function* () {
  305. const s = yield* Scope.Scope
  306. const runner = Runner.make<string>(s)
  307. const calls = yield* Ref.make(0)
  308. const gate = yield* Deferred.make<void>()
  309. const sh = yield* runner.startShell(Deferred.await(gate).pipe(Effect.as("shell"))).pipe(Effect.forkChild)
  310. yield* Effect.sleep("10 millis")
  311. const work = Effect.gen(function* () {
  312. yield* Ref.update(calls, (n) => n + 1)
  313. return "run"
  314. })
  315. const a = yield* runner.ensureRunning(work).pipe(Effect.forkChild)
  316. const b = yield* runner.ensureRunning(work).pipe(Effect.forkChild)
  317. yield* Effect.sleep("10 millis")
  318. yield* Deferred.succeed(gate, undefined)
  319. yield* Fiber.await(sh)
  320. const [exitA, exitB] = yield* Effect.all([Fiber.await(a), Fiber.await(b)])
  321. expect(Exit.isSuccess(exitA)).toBe(true)
  322. expect(Exit.isSuccess(exitB)).toBe(true)
  323. expect(yield* Ref.get(calls)).toBe(1)
  324. }),
  325. )
  326. it.live(
  327. "cancel during shell_then_run cancels both",
  328. Effect.gen(function* () {
  329. const s = yield* Scope.Scope
  330. const runner = Runner.make<string>(s)
  331. const gate = yield* Deferred.make<void>()
  332. const sh = yield* runner.startShell(Effect.never.pipe(Effect.as("aborted"))).pipe(Effect.forkChild)
  333. yield* Effect.sleep("10 millis")
  334. const run = yield* runner.ensureRunning(Effect.succeed("y")).pipe(Effect.forkChild)
  335. yield* Effect.sleep("10 millis")
  336. expect(runner.state._tag).toBe("ShellThenRun")
  337. yield* runner.cancel
  338. expect(runner.busy).toBe(false)
  339. yield* Fiber.await(sh)
  340. const exit = yield* Fiber.await(run)
  341. expect(Exit.isFailure(exit)).toBe(true)
  342. }),
  343. )
  344. // --- lifecycle callbacks ---
  345. it.live(
  346. "onIdle fires when returning to idle from running",
  347. Effect.gen(function* () {
  348. const s = yield* Scope.Scope
  349. const count = yield* Ref.make(0)
  350. const runner = Runner.make<string>(s, {
  351. onIdle: Ref.update(count, (n) => n + 1),
  352. })
  353. yield* runner.ensureRunning(Effect.succeed("ok"))
  354. expect(yield* Ref.get(count)).toBe(1)
  355. }),
  356. )
  357. it.live(
  358. "onIdle fires on cancel",
  359. Effect.gen(function* () {
  360. const s = yield* Scope.Scope
  361. const count = yield* Ref.make(0)
  362. const runner = Runner.make<string>(s, {
  363. onIdle: Ref.update(count, (n) => n + 1),
  364. })
  365. const fiber = yield* runner.ensureRunning(Effect.never.pipe(Effect.as("x"))).pipe(Effect.forkChild)
  366. yield* Effect.sleep("10 millis")
  367. yield* runner.cancel
  368. yield* Fiber.await(fiber)
  369. expect(yield* Ref.get(count)).toBeGreaterThanOrEqual(1)
  370. }),
  371. )
  372. it.live(
  373. "onBusy fires when shell starts",
  374. Effect.gen(function* () {
  375. const s = yield* Scope.Scope
  376. const count = yield* Ref.make(0)
  377. const runner = Runner.make<string>(s, {
  378. onBusy: Ref.update(count, (n) => n + 1),
  379. })
  380. yield* runner.startShell(Effect.succeed("done"))
  381. expect(yield* Ref.get(count)).toBe(1)
  382. }),
  383. )
  384. // --- busy flag ---
  385. it.live(
  386. "busy is true during run",
  387. Effect.gen(function* () {
  388. const s = yield* Scope.Scope
  389. const runner = Runner.make<string>(s)
  390. const gate = yield* Deferred.make<void>()
  391. const fiber = yield* runner.ensureRunning(Deferred.await(gate).pipe(Effect.as("ok"))).pipe(Effect.forkChild)
  392. yield* Effect.sleep("10 millis")
  393. expect(runner.busy).toBe(true)
  394. yield* Deferred.succeed(gate, undefined)
  395. yield* Fiber.await(fiber)
  396. expect(runner.busy).toBe(false)
  397. }),
  398. )
  399. it.live(
  400. "busy is true during shell",
  401. Effect.gen(function* () {
  402. const s = yield* Scope.Scope
  403. const runner = Runner.make<string>(s)
  404. const gate = yield* Deferred.make<void>()
  405. const fiber = yield* runner.startShell(Deferred.await(gate).pipe(Effect.as("ok"))).pipe(Effect.forkChild)
  406. yield* Effect.sleep("10 millis")
  407. expect(runner.busy).toBe(true)
  408. yield* Deferred.succeed(gate, undefined)
  409. yield* Fiber.await(fiber)
  410. expect(runner.busy).toBe(false)
  411. }),
  412. )
  413. })