| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482 |
- import { afterEach, expect, test } from "bun:test"
- import { Cause, Deferred, Duration, Effect, Exit, Fiber, Layer, ManagedRuntime, ServiceMap } from "effect"
- import { InstanceState } from "../../src/effect/instance-state"
- import { InstanceRef } from "../../src/effect/instance-ref"
- import { Instance } from "../../src/project/instance"
- import { tmpdir } from "../fixture/fixture"
/**
 * Reads `state` while `dir` is the directory handed to `Instance.provide`.
 *
 * The lookup itself is `InstanceState.get(state)`, executed with
 * `Effect.runPromise` inside the provided callback.
 *
 * @param state - The instance state to read.
 * @param dir - Directory passed to `Instance.provide`.
 * @returns Whatever `Instance.provide` returns for the lookup callback.
 */
async function access<A, E>(state: InstanceState<A, E>, dir: string) {
  const lookup = () => Effect.runPromise(InstanceState.get(state))
  return Instance.provide({ directory: dir, fn: lookup })
}
// After every test, call Instance.disposeAll() and wait for it to finish.
afterEach(async function disposeInstances() {
  await Instance.disposeAll()
})
// Two lookups from the same directory must yield the same object, built once.
test("InstanceState caches values per directory", async () => {
  await using tmp = await tmpdir()
  let builds = 0

  const program = Effect.gen(function* () {
    const state = yield* InstanceState.make(() =>
      Effect.sync(() => {
        builds += 1
        return { n: builds }
      }),
    )
    // Re-running this effect performs a fresh `access` call each time.
    const read = Effect.promise(() => access(state, tmp.path))

    const first = yield* read
    const second = yield* read

    // Same reference, and the factory ran exactly once.
    expect(first).toBe(second)
    expect(builds).toBe(1)
  })

  await Effect.runPromise(Effect.scoped(program))
})
// Each directory gets its own cached value; revisiting a directory reuses it.
test("InstanceState isolates directories", async () => {
  await using one = await tmpdir()
  await using two = await tmpdir()
  let n = 0
  await Effect.runPromise(
    Effect.scoped(
      Effect.gen(function* () {
        // The factory receives the instance context (the other tests in this
        // file read `ctx.directory` from it), so record the directory string
        // rather than the whole context object.
        const state = yield* InstanceState.make((ctx) => Effect.sync(() => ({ dir: ctx.directory, n: ++n })))
        const a = yield* Effect.promise(() => access(state, one.path))
        const b = yield* Effect.promise(() => access(state, two.path))
        const c = yield* Effect.promise(() => access(state, one.path))
        // Revisiting `one` returns the cached object; `two` has a distinct one.
        expect(a).toBe(c)
        expect(a).not.toBe(b)
        // Each value was built for the directory it was requested from.
        expect(a.dir).toBe(one.path)
        expect(b.dir).toBe(two.path)
        // One factory run per directory.
        expect(n).toBe(2)
      }),
    ),
  )
})
// Reloading an instance must release the cached value and build a new one.
test("InstanceState invalidates on reload", async () => {
  await using tmp = await tmpdir()
  const released: string[] = []
  let counter = 0

  const program = Effect.gen(function* () {
    // Each acquisition gets the next counter value; release records it.
    const acquire = Effect.sync(() => ({ n: ++counter }))
    const release = (value: { n: number }) =>
      Effect.sync(() => {
        released.push(String(value.n))
      })
    const state = yield* InstanceState.make(() => Effect.acquireRelease(acquire, release))

    const before = yield* Effect.promise(() => access(state, tmp.path))
    yield* Effect.promise(() => Instance.reload({ directory: tmp.path }))
    const after = yield* Effect.promise(() => access(state, tmp.path))

    // A different object after the reload, and only the first one released so far.
    expect(before).not.toBe(after)
    expect(released).toEqual(["1"])
  })

  await Effect.runPromise(Effect.scoped(program))
})
// Instance.disposeAll() must run the release step for every cached directory.
test("InstanceState invalidates on disposeAll", async () => {
  await using one = await tmpdir()
  await using two = await tmpdir()
  // Directories whose release step has run, in the order they ran.
  const seen: string[] = []
  await Effect.runPromise(
    Effect.scoped(
      Effect.gen(function* () {
        const state = yield* InstanceState.make((ctx) =>
          Effect.acquireRelease(
            Effect.sync(() => ({ dir: ctx.directory })),
            (value) =>
              Effect.sync(() => {
                seen.push(value.dir)
              }),
          ),
        )
        yield* Effect.promise(() => access(state, one.path))
        yield* Effect.promise(() => access(state, two.path))
        yield* Effect.promise(() => Instance.disposeAll())
        // Release order is not asserted, so compare sorted copies; copying
        // keeps the recorded log itself untouched.
        expect([...seen].sort()).toEqual([one.path, two.path].sort())
      }),
    ),
  )
})
// An InstanceState.get effect created once must still resolve against the
// directory that is current when it is run.
test("InstanceState.get reads the current directory lazily", async () => {
  await using one = await tmpdir()
  await using two = await tmpdir()

  interface Api {
    readonly get: () => Effect.Effect<string>
  }

  class Test extends ServiceMap.Service<Test, Api>()("@test/InstanceStateLazy") {
    static readonly layer = Layer.effect(
      Test,
      Effect.gen(function* () {
        const state = yield* InstanceState.make((ctx) => Effect.sync(() => ctx.directory))
        // Created a single time while the layer is built, then re-run per call.
        const lookup = InstanceState.get(state)
        const get = Effect.fn("Test.get")(function* () {
          return yield* lookup
        })
        return Test.of({ get })
      }),
    )
  }

  const rt = ManagedRuntime.make(Test.layer)
  // Runs the service's `get` through the shared runtime inside Instance.provide.
  const readFrom = (directory: string) =>
    Instance.provide({
      directory,
      fn: () => rt.runPromise(Test.use((svc) => svc.get())),
    })

  try {
    const fromOne = await readFrom(one.path)
    const fromTwo = await readFrom(two.path)
    expect(fromOne).toBe(one.path)
    expect(fromTwo).toBe(two.path)
  } finally {
    await rt.dispose()
  }
})
// Three concurrent callers hop through sleeps, yields and promise ticks before
// reading state; each must still see its own directory, worktree and project.
test("InstanceState preserves directory across async boundaries", async () => {
  await using one = await tmpdir({ git: true })
  await using two = await tmpdir({ git: true })
  await using three = await tmpdir({ git: true })

  interface Api {
    readonly get: () => Effect.Effect<{ directory: string; worktree: string; project: string }>
  }

  // Number of back-to-back yields / promise ticks performed before the read.
  const HOPS = 100

  class Test extends ServiceMap.Service<Test, Api>()("@test/InstanceStateAsync") {
    static readonly layer = Layer.effect(
      Test,
      Effect.gen(function* () {
        const state = yield* InstanceState.make((ctx) =>
          Effect.sync(() => {
            const { directory, worktree } = ctx
            return { directory, worktree, project: ctx.project.id }
          }),
        )
        const get = Effect.fn("Test.get")(function* () {
          yield* Effect.promise(() => Bun.sleep(1))
          yield* Effect.sleep(Duration.millis(1))
          for (let hop = 0; hop < HOPS; hop += 1) {
            yield* Effect.yieldNow
          }
          for (let hop = 0; hop < HOPS; hop += 1) {
            yield* Effect.promise(() => Promise.resolve())
          }
          yield* Effect.sleep(Duration.millis(2))
          yield* Effect.promise(() => Bun.sleep(1))
          return yield* InstanceState.get(state)
        })
        return Test.of({ get })
      }),
    )
  }

  const rt = ManagedRuntime.make(Test.layer)
  // Runs the service's `get` through the shared runtime inside Instance.provide.
  const readFrom = (directory: string) =>
    Instance.provide({
      directory,
      fn: () => rt.runPromise(Test.use((svc) => svc.get())),
    })

  try {
    const [a, b, c] = await Promise.all([readFrom(one.path), readFrom(two.path), readFrom(three.path)])

    // Directory and worktree both match the caller's own tmpdir path.
    expect(a).toEqual({ directory: one.path, worktree: one.path, project: a.project })
    expect(b).toEqual({ directory: two.path, worktree: two.path, project: b.project })
    expect(c).toEqual({ directory: three.path, worktree: three.path, project: c.project })

    // Project ids are pairwise distinct.
    expect(a.project).not.toBe(b.project)
    expect(a.project).not.toBe(c.project)
    expect(b.project).not.toBe(c.project)
  } finally {
    await rt.dispose()
  }
})
// Many directories read state concurrently through one runtime, with randomised
// async hops in between; every caller must get back its own directory.
test("InstanceState survives high-contention concurrent access", async () => {
  const N = 20
  const dirs = await Promise.all(Array.from({ length: N }, () => tmpdir()))
  interface Api {
    readonly get: () => Effect.Effect<string>
  }
  class Test extends ServiceMap.Service<Test, Api>()("@test/HighContention") {
    static readonly layer = Layer.effect(
      Test,
      Effect.gen(function* () {
        const state = yield* InstanceState.make((ctx) => Effect.sync(() => ctx.directory))
        return Test.of({
          get: Effect.fn("Test.get")(function* () {
            // Interleave many async hops to maximize chance of ALS corruption
            for (let i = 0; i < 10; i++) {
              yield* Effect.promise(() => Bun.sleep(Math.random() * 3))
              yield* Effect.yieldNow
              yield* Effect.promise(() => Promise.resolve())
            }
            return yield* InstanceState.get(state)
          }),
        })
      }),
    )
  }
  const rt = ManagedRuntime.make(Test.layer)
  try {
    const results = await Promise.all(
      dirs.map((d) =>
        Instance.provide({
          directory: d.path,
          fn: () => rt.runPromise(Test.use((svc) => svc.get())),
        }),
      ),
    )
    // Promise.all preserves input order, so results line up with `dirs`.
    // Comparing whole arrays also checks the length and avoids unchecked
    // index access.
    expect(results).toEqual(dirs.map((d) => d.path))
  } finally {
    try {
      await rt.dispose()
    } finally {
      // Runs even if disposing the runtime throws. Every disposal is started
      // before any failure is reported, so one failing directory does not
      // leave the others undeleted.
      await Promise.all(dirs.map((d) => d[Symbol.asyncDispose]()))
    }
  }
})
// Reloading one directory while another is being initialised must not mix
// their values, and the reloaded directory must be readable again afterwards.
test("InstanceState correct after interleaved init and dispose", async () => {
  await using one = await tmpdir()
  await using two = await tmpdir()

  interface Api {
    readonly get: () => Effect.Effect<string>
  }

  class Test extends ServiceMap.Service<Test, Api>()("@test/InterleavedDispose") {
    static readonly layer = Layer.effect(
      Test,
      Effect.gen(function* () {
        // Initialisation waits 5ms before producing the directory.
        const state = yield* InstanceState.make((ctx) =>
          Effect.promise(() => Bun.sleep(5).then(() => ctx.directory)),
        )
        const get = Effect.fn("Test.get")(function* () {
          return yield* InstanceState.get(state)
        })
        return Test.of({ get })
      }),
    )
  }

  const rt = ManagedRuntime.make(Test.layer)
  // Runs the service's `get` through the shared runtime inside Instance.provide.
  const readFrom = (directory: string) =>
    Instance.provide({
      directory,
      fn: () => rt.runPromise(Test.use((svc) => svc.get())),
    })

  try {
    // Step 1: initialise the first directory only.
    const first = await readFrom(one.path)
    expect(first).toBe(one.path)

    // Step 2: reload the first directory while the second one initialises.
    const reloading = Instance.reload({ directory: one.path })
    const reading = readFrom(two.path)
    const [, second] = await Promise.all([reloading, reading])
    expect(second).toBe(two.path)

    // Step 3: read the reloaded directory again.
    const again = await readFrom(one.path)
    expect(again).toBe(one.path)
  } finally {
    await rt.dispose()
  }
})
// Mutating the cached object of one directory must not show up in another's.
test("InstanceState mutation in one directory does not leak to another", async () => {
  await using one = await tmpdir()
  await using two = await tmpdir()

  const program = Effect.gen(function* () {
    const state = yield* InstanceState.make(() => Effect.sync(() => ({ count: 0 })))
    const readFrom = (dir: string) => Effect.promise(() => access(state, dir))

    // Write through the object cached for directory one.
    const first = yield* readFrom(one.path)
    first.count = 42

    // Directory two still has its initial value.
    const second = yield* readFrom(two.path)
    expect(second.count).toBe(0)

    // Directory one hands back the very same, mutated object.
    const firstAgain = yield* readFrom(one.path)
    expect(firstAgain.count).toBe(42)
    expect(firstAgain).toBe(first)
  })

  await Effect.runPromise(Effect.scoped(program))
})
// Two lookups started together for one directory must share a single init.
test("InstanceState dedupes concurrent lookups", async () => {
  await using tmp = await tmpdir()
  let inits = 0

  const program = Effect.gen(function* () {
    const state = yield* InstanceState.make(() =>
      Effect.promise(async () => {
        inits += 1
        await Bun.sleep(10)
        // Read the counter after the sleep, as the value is built at this point.
        return { n: inits }
      }),
    )

    const both = () => Promise.all([access(state, tmp.path), access(state, tmp.path)])
    const [left, right] = yield* Effect.promise(both)

    // Same object for both callers, and the initialiser ran once.
    expect(left).toBe(right)
    expect(inits).toBe(1)
  })

  await Effect.runPromise(Effect.scoped(program))
})
// A fiber forked inside Instance.provide waits on a gate; the gate is opened
// from a second Instance.provide call for the same directory.
test("InstanceState survives deferred resume from the same instance context", async () => {
  await using tmp = await tmpdir({ git: true })

  interface Api {
    readonly get: (gate: Deferred.Deferred<void>) => Effect.Effect<string>
  }

  class Test extends ServiceMap.Service<Test, Api>()("@test/DeferredResume") {
    static readonly layer = Layer.effect(
      Test,
      Effect.gen(function* () {
        const state = yield* InstanceState.make((ctx) => Effect.sync(() => ctx.directory))
        // Waits for the gate first, and only then reads the state.
        const get = Effect.fn("Test.get")(function* (gate: Deferred.Deferred<void>) {
          yield* Deferred.await(gate)
          return yield* InstanceState.get(state)
        })
        return Test.of({ get })
      }),
    )
  }

  const rt = ManagedRuntime.make(Test.layer)
  try {
    const gate = await Effect.runPromise(Deferred.make<void>())

    // Fork the waiting fiber inside Instance.provide.
    const waiting = await Instance.provide({
      directory: tmp.path,
      fn: () => Promise.resolve(rt.runFork(Test.use((svc) => svc.get(gate)))),
    })

    // Open the gate from another Instance.provide call for the same directory.
    await Instance.provide({
      directory: tmp.path,
      fn: () => Effect.runPromise(Deferred.succeed(gate, undefined)),
    })

    const exit = await Effect.runPromise(Fiber.await(waiting))
    expect(Exit.isSuccess(exit)).toBe(true)
    // The guard narrows `exit` for the type checker; the `finally` below still runs.
    if (!Exit.isSuccess(exit)) return
    expect(exit.value).toBe(tmp.path)
  } finally {
    await rt.dispose()
  }
})
// Same as the same-context variant, except the gate is opened outside any
// Instance.provide call and the forked effect is given InstanceRef explicitly.
test("InstanceState survives deferred resume outside ALS when InstanceRef is set", async () => {
  await using tmp = await tmpdir({ git: true })

  interface Api {
    readonly get: (gate: Deferred.Deferred<void>) => Effect.Effect<string>
  }

  class Test extends ServiceMap.Service<Test, Api>()("@test/DeferredResumeOutside") {
    static readonly layer = Layer.effect(
      Test,
      Effect.gen(function* () {
        const state = yield* InstanceState.make((ctx) => Effect.sync(() => ctx.directory))
        // Waits for the gate first, and only then reads the state.
        const get = Effect.fn("Test.get")(function* (gate: Deferred.Deferred<void>) {
          yield* Deferred.await(gate)
          return yield* InstanceState.get(state)
        })
        return Test.of({ get })
      }),
    )
  }

  const rt = ManagedRuntime.make(Test.layer)
  try {
    const gate = await Effect.runPromise(Deferred.make<void>())

    // Provide InstanceRef so the fiber carries the context even when
    // the deferred is resolved from outside Instance.provide ALS.
    const waiting = await Instance.provide({
      directory: tmp.path,
      fn: () => {
        const program = Test.use((svc) => svc.get(gate)).pipe(Effect.provideService(InstanceRef, Instance.current))
        return Promise.resolve(rt.runFork(program))
      },
    })

    // Resume from outside any Instance.provide — ALS is NOT set here.
    await Effect.runPromise(Deferred.succeed(gate, undefined))

    const exit = await Effect.runPromise(Fiber.await(waiting))
    expect(Exit.isSuccess(exit)).toBe(true)
    // The guard narrows `exit` for the type checker; the `finally` below still runs.
    if (!Exit.isSuccess(exit)) return
    expect(exit.value).toBe(tmp.path)
  } finally {
    await rt.dispose()
  }
})
|