| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164 |
- import { NodeChildProcessSpawner, NodeFileSystem, NodePath } from "@effect/platform-node"
- import { describe, expect } from "bun:test"
- import { Deferred, Effect, Layer, Stream } from "effect"
- import z from "zod"
- import { Bus } from "../../src/bus"
- import { BusEvent } from "../../src/bus/bus-event"
- import { Instance } from "../../src/project/instance"
- import { provideInstance, provideTmpdirInstance, tmpdirScoped } from "../fixture/fixture"
- import { testEffect } from "../lib/effect"
- const TestEvent = {
- Ping: BusEvent.define("test.effect.ping", z.object({ value: z.number() })),
- Pong: BusEvent.define("test.effect.pong", z.object({ message: z.string() })),
- }
- const node = NodeChildProcessSpawner.layer.pipe(
- Layer.provideMerge(Layer.mergeAll(NodeFileSystem.layer, NodePath.layer)),
- )
- const live = Layer.mergeAll(Bus.layer, node)
- const it = testEffect(live)
- describe("Bus (Effect-native)", () => {
- it.live("publish + subscribe stream delivers events", () =>
- provideTmpdirInstance(() =>
- Effect.gen(function* () {
- const bus = yield* Bus.Service
- const received: number[] = []
- const done = yield* Deferred.make<void>()
- yield* Stream.runForEach(bus.subscribe(TestEvent.Ping), (evt) =>
- Effect.sync(() => {
- received.push(evt.properties.value)
- if (received.length === 2) Deferred.doneUnsafe(done, Effect.void)
- }),
- ).pipe(Effect.forkScoped)
- yield* Effect.sleep("10 millis")
- yield* bus.publish(TestEvent.Ping, { value: 1 })
- yield* bus.publish(TestEvent.Ping, { value: 2 })
- yield* Deferred.await(done)
- expect(received).toEqual([1, 2])
- }),
- ),
- )
- it.live("subscribe filters by event type", () =>
- provideTmpdirInstance(() =>
- Effect.gen(function* () {
- const bus = yield* Bus.Service
- const pings: number[] = []
- const done = yield* Deferred.make<void>()
- yield* Stream.runForEach(bus.subscribe(TestEvent.Ping), (evt) =>
- Effect.sync(() => {
- pings.push(evt.properties.value)
- Deferred.doneUnsafe(done, Effect.void)
- }),
- ).pipe(Effect.forkScoped)
- yield* Effect.sleep("10 millis")
- yield* bus.publish(TestEvent.Pong, { message: "ignored" })
- yield* bus.publish(TestEvent.Ping, { value: 42 })
- yield* Deferred.await(done)
- expect(pings).toEqual([42])
- }),
- ),
- )
- it.live("subscribeAll receives all types", () =>
- provideTmpdirInstance(() =>
- Effect.gen(function* () {
- const bus = yield* Bus.Service
- const types: string[] = []
- const done = yield* Deferred.make<void>()
- yield* Stream.runForEach(bus.subscribeAll(), (evt) =>
- Effect.sync(() => {
- types.push(evt.type)
- if (types.length === 2) Deferred.doneUnsafe(done, Effect.void)
- }),
- ).pipe(Effect.forkScoped)
- yield* Effect.sleep("10 millis")
- yield* bus.publish(TestEvent.Ping, { value: 1 })
- yield* bus.publish(TestEvent.Pong, { message: "hi" })
- yield* Deferred.await(done)
- expect(types).toContain("test.effect.ping")
- expect(types).toContain("test.effect.pong")
- }),
- ),
- )
- it.live("multiple subscribers each receive the event", () =>
- provideTmpdirInstance(() =>
- Effect.gen(function* () {
- const bus = yield* Bus.Service
- const a: number[] = []
- const b: number[] = []
- const doneA = yield* Deferred.make<void>()
- const doneB = yield* Deferred.make<void>()
- yield* Stream.runForEach(bus.subscribe(TestEvent.Ping), (evt) =>
- Effect.sync(() => {
- a.push(evt.properties.value)
- Deferred.doneUnsafe(doneA, Effect.void)
- }),
- ).pipe(Effect.forkScoped)
- yield* Stream.runForEach(bus.subscribe(TestEvent.Ping), (evt) =>
- Effect.sync(() => {
- b.push(evt.properties.value)
- Deferred.doneUnsafe(doneB, Effect.void)
- }),
- ).pipe(Effect.forkScoped)
- yield* Effect.sleep("10 millis")
- yield* bus.publish(TestEvent.Ping, { value: 99 })
- yield* Deferred.await(doneA)
- yield* Deferred.await(doneB)
- expect(a).toEqual([99])
- expect(b).toEqual([99])
- }),
- ),
- )
- it.live("subscribeAll stream sees InstanceDisposed on disposal", () =>
- Effect.gen(function* () {
- const dir = yield* tmpdirScoped()
- const types: string[] = []
- const seen = yield* Deferred.make<void>()
- const disposed = yield* Deferred.make<void>()
- // Set up subscriber inside the instance
- yield* Effect.gen(function* () {
- const bus = yield* Bus.Service
- yield* Stream.runForEach(bus.subscribeAll(), (evt) =>
- Effect.sync(() => {
- types.push(evt.type)
- if (evt.type === TestEvent.Ping.type) Deferred.doneUnsafe(seen, Effect.void)
- if (evt.type === Bus.InstanceDisposed.type) Deferred.doneUnsafe(disposed, Effect.void)
- }),
- ).pipe(Effect.forkScoped)
- yield* Effect.sleep("10 millis")
- yield* bus.publish(TestEvent.Ping, { value: 1 })
- yield* Deferred.await(seen)
- }).pipe(provideInstance(dir))
- // Dispose from OUTSIDE the instance scope
- yield* Effect.promise(() => Instance.disposeAll())
- yield* Deferred.await(disposed).pipe(Effect.timeout("2 seconds"))
- expect(types).toContain("test.effect.ping")
- expect(types).toContain(Bus.InstanceDisposed.type)
- }),
- )
- })
|