| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495 |
- import { describe, expect, test } from "bun:test"
- import { Deferred, Effect, Exit, Fiber, Ref, Scope } from "effect"
- import { Runner } from "../../src/effect"
- import { it } from "../lib/effect"
- describe("Runner", () => {
- // --- ensureRunning semantics ---
- it.live(
- "ensureRunning starts work and returns result",
- Effect.gen(function* () {
- const s = yield* Scope.Scope
- const runner = Runner.make<string>(s)
- const result = yield* runner.ensureRunning(Effect.succeed("hello"))
- expect(result).toBe("hello")
- expect(runner.state._tag).toBe("Idle")
- expect(runner.busy).toBe(false)
- }),
- )
- it.live(
- "ensureRunning propagates work failures",
- Effect.gen(function* () {
- const s = yield* Scope.Scope
- const runner = Runner.make<string, string>(s)
- const exit = yield* runner.ensureRunning(Effect.fail("boom")).pipe(Effect.exit)
- expect(Exit.isFailure(exit)).toBe(true)
- expect(runner.state._tag).toBe("Idle")
- }),
- )
- it.live(
- "concurrent callers share the same run",
- Effect.gen(function* () {
- const s = yield* Scope.Scope
- const runner = Runner.make<string>(s)
- const calls = yield* Ref.make(0)
- const work = Effect.gen(function* () {
- yield* Ref.update(calls, (n) => n + 1)
- yield* Effect.sleep("10 millis")
- return "shared"
- })
- const [a, b] = yield* Effect.all([runner.ensureRunning(work), runner.ensureRunning(work)], {
- concurrency: "unbounded",
- })
- expect(a).toBe("shared")
- expect(b).toBe("shared")
- expect(yield* Ref.get(calls)).toBe(1)
- }),
- )
- it.live(
- "concurrent callers all receive same error",
- Effect.gen(function* () {
- const s = yield* Scope.Scope
- const runner = Runner.make<string, string>(s)
- const work = Effect.gen(function* () {
- yield* Effect.sleep("10 millis")
- return yield* Effect.fail("boom")
- })
- const [a, b] = yield* Effect.all(
- [runner.ensureRunning(work).pipe(Effect.exit), runner.ensureRunning(work).pipe(Effect.exit)],
- { concurrency: "unbounded" },
- )
- expect(Exit.isFailure(a)).toBe(true)
- expect(Exit.isFailure(b)).toBe(true)
- }),
- )
- it.live(
- "ensureRunning can be called again after previous run completes",
- Effect.gen(function* () {
- const s = yield* Scope.Scope
- const runner = Runner.make<string>(s)
- expect(yield* runner.ensureRunning(Effect.succeed("first"))).toBe("first")
- expect(yield* runner.ensureRunning(Effect.succeed("second"))).toBe("second")
- }),
- )
- it.live(
- "second ensureRunning ignores new work if already running",
- Effect.gen(function* () {
- const s = yield* Scope.Scope
- const runner = Runner.make<string>(s)
- const ran = yield* Ref.make<string[]>([])
- const first = Effect.gen(function* () {
- yield* Ref.update(ran, (a) => [...a, "first"])
- yield* Effect.sleep("50 millis")
- return "first-result"
- })
- const second = Effect.gen(function* () {
- yield* Ref.update(ran, (a) => [...a, "second"])
- return "second-result"
- })
- const [a, b] = yield* Effect.all([runner.ensureRunning(first), runner.ensureRunning(second)], {
- concurrency: "unbounded",
- })
- expect(a).toBe("first-result")
- expect(b).toBe("first-result")
- expect(yield* Ref.get(ran)).toEqual(["first"])
- }),
- )
- // --- cancel semantics ---
- it.live(
- "cancel interrupts running work",
- Effect.gen(function* () {
- const s = yield* Scope.Scope
- const runner = Runner.make<string>(s)
- const fiber = yield* runner.ensureRunning(Effect.never.pipe(Effect.as("never"))).pipe(Effect.forkChild)
- yield* Effect.sleep("10 millis")
- expect(runner.busy).toBe(true)
- expect(runner.state._tag).toBe("Running")
- yield* runner.cancel
- expect(runner.busy).toBe(false)
- const exit = yield* Fiber.await(fiber)
- expect(Exit.isFailure(exit)).toBe(true)
- }),
- )
- it.live(
- "cancel on idle is a no-op",
- Effect.gen(function* () {
- const s = yield* Scope.Scope
- const runner = Runner.make<string>(s)
- yield* runner.cancel
- expect(runner.busy).toBe(false)
- }),
- )
- it.live(
- "cancel with onInterrupt resolves callers gracefully",
- Effect.gen(function* () {
- const s = yield* Scope.Scope
- const runner = Runner.make<string>(s, { onInterrupt: Effect.succeed("fallback") })
- const fiber = yield* runner.ensureRunning(Effect.never.pipe(Effect.as("never"))).pipe(Effect.forkChild)
- yield* Effect.sleep("10 millis")
- yield* runner.cancel
- const exit = yield* Fiber.await(fiber)
- expect(Exit.isSuccess(exit)).toBe(true)
- if (Exit.isSuccess(exit)) expect(exit.value).toBe("fallback")
- }),
- )
- it.live(
- "cancel with queued callers resolves all",
- Effect.gen(function* () {
- const s = yield* Scope.Scope
- const runner = Runner.make<string>(s, { onInterrupt: Effect.succeed("fallback") })
- const a = yield* runner.ensureRunning(Effect.never.pipe(Effect.as("x"))).pipe(Effect.forkChild)
- yield* Effect.sleep("10 millis")
- const b = yield* runner.ensureRunning(Effect.succeed("y")).pipe(Effect.forkChild)
- yield* Effect.sleep("10 millis")
- yield* runner.cancel
- const [exitA, exitB] = yield* Effect.all([Fiber.await(a), Fiber.await(b)])
- expect(Exit.isSuccess(exitA)).toBe(true)
- expect(Exit.isSuccess(exitB)).toBe(true)
- if (Exit.isSuccess(exitA)) expect(exitA.value).toBe("fallback")
- if (Exit.isSuccess(exitB)) expect(exitB.value).toBe("fallback")
- }),
- )
- it.live(
- "work can be started after cancel",
- Effect.gen(function* () {
- const s = yield* Scope.Scope
- const runner = Runner.make<string>(s)
- const fiber = yield* runner.ensureRunning(Effect.never.pipe(Effect.as("x"))).pipe(Effect.forkChild)
- yield* Effect.sleep("10 millis")
- yield* runner.cancel
- yield* Fiber.await(fiber)
- const result = yield* runner.ensureRunning(Effect.succeed("after-cancel"))
- expect(result).toBe("after-cancel")
- }),
- )
- test("cancel does not deadlock when replacement work starts before interrupted run exits", async () => {
- function defer() {
- let resolve!: () => void
- const promise = new Promise<void>((done) => {
- resolve = done
- })
- return { promise, resolve }
- }
- function fail(ms: number, msg: string) {
- return new Promise<never>((_, reject) => {
- setTimeout(() => reject(new Error(msg)), ms)
- })
- }
- const s = await Effect.runPromise(Scope.make())
- const hit = defer()
- const hold = defer()
- const done = defer()
- try {
- const runner = Runner.make<string>(s)
- const first = Effect.never.pipe(
- Effect.onInterrupt(() => Effect.sync(() => hit.resolve())),
- Effect.ensuring(Effect.promise(() => hold.promise)),
- Effect.as("first"),
- )
- const a = Effect.runPromiseExit(runner.ensureRunning(first))
- await Bun.sleep(10)
- const stop = Effect.runPromise(runner.cancel)
- await Promise.race([hit.promise, fail(250, "cancel did not interrupt running work")])
- const b = Effect.runPromise(runner.ensureRunning(Effect.promise(() => done.promise).pipe(Effect.as("second"))))
- expect(runner.busy).toBe(true)
- hold.resolve()
- await Promise.race([stop, fail(250, "cancel deadlocked while replacement run was active")])
- expect(runner.busy).toBe(true)
- done.resolve()
- expect(await b).toBe("second")
- expect(runner.busy).toBe(false)
- const exit = await a
- expect(Exit.isFailure(exit)).toBe(true)
- } finally {
- hold.resolve()
- done.resolve()
- await Promise.race([Effect.runPromise(Scope.close(s, Exit.void)), fail(1000, "runner scope did not close")])
- }
- })
- // --- shell semantics ---
- it.live(
- "shell runs exclusively",
- Effect.gen(function* () {
- const s = yield* Scope.Scope
- const runner = Runner.make<string>(s)
- const result = yield* runner.startShell(Effect.succeed("shell-done"))
- expect(result).toBe("shell-done")
- expect(runner.busy).toBe(false)
- }),
- )
- it.live(
- "shell rejects when run is active",
- Effect.gen(function* () {
- const s = yield* Scope.Scope
- const runner = Runner.make<string>(s)
- const fiber = yield* runner.ensureRunning(Effect.never.pipe(Effect.as("x"))).pipe(Effect.forkChild)
- yield* Effect.sleep("10 millis")
- const exit = yield* runner.startShell(Effect.succeed("nope")).pipe(Effect.exit)
- expect(Exit.isFailure(exit)).toBe(true)
- yield* runner.cancel
- yield* Fiber.await(fiber)
- }),
- )
- it.live(
- "shell rejects when another shell is running",
- Effect.gen(function* () {
- const s = yield* Scope.Scope
- const runner = Runner.make<string>(s)
- const gate = yield* Deferred.make<void>()
- const sh = yield* runner.startShell(Deferred.await(gate).pipe(Effect.as("first"))).pipe(Effect.forkChild)
- yield* Effect.sleep("10 millis")
- const exit = yield* runner.startShell(Effect.succeed("second")).pipe(Effect.exit)
- expect(Exit.isFailure(exit)).toBe(true)
- yield* Deferred.succeed(gate, undefined)
- yield* Fiber.await(sh)
- }),
- )
- it.live(
- "shell rejects via busy callback and cancel still stops the first shell",
- Effect.gen(function* () {
- const s = yield* Scope.Scope
- const runner = Runner.make<string>(s, {
- busy: () => {
- throw new Error("busy")
- },
- })
- const sh = yield* runner.startShell(Effect.never.pipe(Effect.as("aborted"))).pipe(Effect.forkChild)
- yield* Effect.sleep("10 millis")
- const exit = yield* runner.startShell(Effect.succeed("second")).pipe(Effect.exit)
- expect(Exit.isFailure(exit)).toBe(true)
- yield* runner.cancel
- const done = yield* Fiber.await(sh)
- expect(Exit.isFailure(done)).toBe(true)
- }),
- )
- it.live(
- "cancel interrupts shell",
- Effect.gen(function* () {
- const s = yield* Scope.Scope
- const runner = Runner.make<string>(s)
- const gate = yield* Deferred.make<void>()
- const sh = yield* runner.startShell(Deferred.await(gate).pipe(Effect.as("ignored"))).pipe(Effect.forkChild)
- yield* Effect.sleep("10 millis")
- const stop = yield* runner.cancel.pipe(Effect.forkChild)
- const stopExit = yield* Fiber.await(stop).pipe(Effect.timeout("250 millis"))
- expect(Exit.isSuccess(stopExit)).toBe(true)
- expect(runner.busy).toBe(false)
- const shellExit = yield* Fiber.await(sh)
- expect(Exit.isFailure(shellExit)).toBe(true)
- yield* Deferred.succeed(gate, undefined).pipe(Effect.ignore)
- }),
- )
- // --- shell→run handoff ---
- it.live(
- "ensureRunning queues behind shell then runs after",
- Effect.gen(function* () {
- const s = yield* Scope.Scope
- const runner = Runner.make<string>(s)
- const gate = yield* Deferred.make<void>()
- const sh = yield* runner.startShell(Deferred.await(gate).pipe(Effect.as("shell-result"))).pipe(Effect.forkChild)
- yield* Effect.sleep("10 millis")
- expect(runner.state._tag).toBe("Shell")
- const run = yield* runner.ensureRunning(Effect.succeed("run-result")).pipe(Effect.forkChild)
- yield* Effect.sleep("10 millis")
- expect(runner.state._tag).toBe("ShellThenRun")
- yield* Deferred.succeed(gate, undefined)
- yield* Fiber.await(sh)
- const exit = yield* Fiber.await(run)
- expect(Exit.isSuccess(exit)).toBe(true)
- if (Exit.isSuccess(exit)) expect(exit.value).toBe("run-result")
- expect(runner.state._tag).toBe("Idle")
- }),
- )
- it.live(
- "multiple ensureRunning callers share the queued run behind shell",
- Effect.gen(function* () {
- const s = yield* Scope.Scope
- const runner = Runner.make<string>(s)
- const calls = yield* Ref.make(0)
- const gate = yield* Deferred.make<void>()
- const sh = yield* runner.startShell(Deferred.await(gate).pipe(Effect.as("shell"))).pipe(Effect.forkChild)
- yield* Effect.sleep("10 millis")
- const work = Effect.gen(function* () {
- yield* Ref.update(calls, (n) => n + 1)
- return "run"
- })
- const a = yield* runner.ensureRunning(work).pipe(Effect.forkChild)
- const b = yield* runner.ensureRunning(work).pipe(Effect.forkChild)
- yield* Effect.sleep("10 millis")
- yield* Deferred.succeed(gate, undefined)
- yield* Fiber.await(sh)
- const [exitA, exitB] = yield* Effect.all([Fiber.await(a), Fiber.await(b)])
- expect(Exit.isSuccess(exitA)).toBe(true)
- expect(Exit.isSuccess(exitB)).toBe(true)
- expect(yield* Ref.get(calls)).toBe(1)
- }),
- )
- it.live(
- "cancel during shell_then_run cancels both",
- Effect.gen(function* () {
- const s = yield* Scope.Scope
- const runner = Runner.make<string>(s)
- const gate = yield* Deferred.make<void>()
- const sh = yield* runner.startShell(Effect.never.pipe(Effect.as("aborted"))).pipe(Effect.forkChild)
- yield* Effect.sleep("10 millis")
- const run = yield* runner.ensureRunning(Effect.succeed("y")).pipe(Effect.forkChild)
- yield* Effect.sleep("10 millis")
- expect(runner.state._tag).toBe("ShellThenRun")
- yield* runner.cancel
- expect(runner.busy).toBe(false)
- yield* Fiber.await(sh)
- const exit = yield* Fiber.await(run)
- expect(Exit.isFailure(exit)).toBe(true)
- }),
- )
- // --- lifecycle callbacks ---
- it.live(
- "onIdle fires when returning to idle from running",
- Effect.gen(function* () {
- const s = yield* Scope.Scope
- const count = yield* Ref.make(0)
- const runner = Runner.make<string>(s, {
- onIdle: Ref.update(count, (n) => n + 1),
- })
- yield* runner.ensureRunning(Effect.succeed("ok"))
- expect(yield* Ref.get(count)).toBe(1)
- }),
- )
- it.live(
- "onIdle fires on cancel",
- Effect.gen(function* () {
- const s = yield* Scope.Scope
- const count = yield* Ref.make(0)
- const runner = Runner.make<string>(s, {
- onIdle: Ref.update(count, (n) => n + 1),
- })
- const fiber = yield* runner.ensureRunning(Effect.never.pipe(Effect.as("x"))).pipe(Effect.forkChild)
- yield* Effect.sleep("10 millis")
- yield* runner.cancel
- yield* Fiber.await(fiber)
- expect(yield* Ref.get(count)).toBeGreaterThanOrEqual(1)
- }),
- )
- it.live(
- "onBusy fires when shell starts",
- Effect.gen(function* () {
- const s = yield* Scope.Scope
- const count = yield* Ref.make(0)
- const runner = Runner.make<string>(s, {
- onBusy: Ref.update(count, (n) => n + 1),
- })
- yield* runner.startShell(Effect.succeed("done"))
- expect(yield* Ref.get(count)).toBe(1)
- }),
- )
- // --- busy flag ---
- it.live(
- "busy is true during run",
- Effect.gen(function* () {
- const s = yield* Scope.Scope
- const runner = Runner.make<string>(s)
- const gate = yield* Deferred.make<void>()
- const fiber = yield* runner.ensureRunning(Deferred.await(gate).pipe(Effect.as("ok"))).pipe(Effect.forkChild)
- yield* Effect.sleep("10 millis")
- expect(runner.busy).toBe(true)
- yield* Deferred.succeed(gate, undefined)
- yield* Fiber.await(fiber)
- expect(runner.busy).toBe(false)
- }),
- )
- it.live(
- "busy is true during shell",
- Effect.gen(function* () {
- const s = yield* Scope.Scope
- const runner = Runner.make<string>(s)
- const gate = yield* Deferred.make<void>()
- const fiber = yield* runner.startShell(Deferred.await(gate).pipe(Effect.as("ok"))).pipe(Effect.forkChild)
- yield* Effect.sleep("10 millis")
- expect(runner.busy).toBe(true)
- yield* Deferred.succeed(gate, undefined)
- yield* Fiber.await(fiber)
- expect(runner.busy).toBe(false)
- }),
- )
- })
|