// bus-effect.test.ts (5.4 KB)
  1. import { NodeChildProcessSpawner, NodeFileSystem, NodePath } from "@effect/platform-node"
  2. import { describe, expect } from "bun:test"
  3. import { Deferred, Effect, Layer, Stream } from "effect"
  4. import z from "zod"
  5. import { Bus } from "../../src/bus"
  6. import { BusEvent } from "../../src/bus/bus-event"
  7. import { Instance } from "../../src/project/instance"
  8. import { provideInstance, provideTmpdirInstance, tmpdirScoped } from "../fixture/fixture"
  9. import { testEffect } from "../lib/effect"
  // Payload schemas for the two events published by the tests in this file.
  const pingPayload = z.object({ value: z.number() })
  const pongPayload = z.object({ message: z.string() })

  // Test-only bus events, each created through BusEvent.define with its type
  // string and payload schema.
  const TestEvent = {
    Ping: BusEvent.define("test.effect.ping", pingPayload),
    Pong: BusEvent.define("test.effect.pong", pongPayload),
  }
  // Node file-system and path layers, merged so they can be supplied together.
  const platform = Layer.mergeAll(NodeFileSystem.layer, NodePath.layer)

  // Child-process spawner layer with the platform layers provided and merged in.
  const node = NodeChildProcessSpawner.layer.pipe(Layer.provideMerge(platform))

  // Layer handed to every test: the bus layer plus the Node layers above.
  const live = Layer.mergeAll(Bus.layer, node)

  // Test helper bound to `live`; the tests below are declared via `it.live`.
  const it = testEffect(live)
  19. describe("Bus (Effect-native)", () => {
  // Publishes two Ping events and expects a forked subscribe(TestEvent.Ping)
  // consumer to have recorded both values, in publish order.
  it.live("publish + subscribe stream delivers events", () =>
    provideTmpdirInstance(() =>
      Effect.gen(function* () {
        const bus = yield* Bus.Service
        const values: number[] = []
        const gate = yield* Deferred.make<void>()

        // Consumer: record each Ping value and open the gate on the second one.
        const consumer = Stream.runForEach(bus.subscribe(TestEvent.Ping), (event) =>
          Effect.sync(() => {
            values.push(event.properties.value)
            if (values.length === 2) Deferred.doneUnsafe(gate, Effect.void)
          }),
        )
        yield* Effect.forkScoped(consumer)

        // Short pause before publishing — presumably so the forked consumer
        // has attached to the bus; confirm against the Bus implementation.
        yield* Effect.sleep("10 millis")

        yield* bus.publish(TestEvent.Ping, { value: 1 })
        yield* bus.publish(TestEvent.Ping, { value: 2 })

        yield* Deferred.await(gate)
        expect(values).toEqual([1, 2])
      }),
    ),
  )
  // Publishes a Pong followed by a Ping and expects a subscribe(TestEvent.Ping)
  // consumer to have recorded only the Ping value.
  it.live("subscribe filters by event type", () =>
    provideTmpdirInstance(() =>
      Effect.gen(function* () {
        const bus = yield* Bus.Service
        const seenValues: number[] = []
        const gate = yield* Deferred.make<void>()

        // Consumer: record the value of every delivered event and open the gate.
        const consumer = Stream.runForEach(bus.subscribe(TestEvent.Ping), (event) =>
          Effect.sync(() => {
            seenValues.push(event.properties.value)
            Deferred.doneUnsafe(gate, Effect.void)
          }),
        )
        yield* Effect.forkScoped(consumer)

        // Short pause before publishing — presumably so the forked consumer
        // has attached to the bus; confirm against the Bus implementation.
        yield* Effect.sleep("10 millis")

        // The Pong goes first so that, if it were delivered, it would land
        // in `seenValues` ahead of the Ping and fail the assertion below.
        yield* bus.publish(TestEvent.Pong, { message: "ignored" })
        yield* bus.publish(TestEvent.Ping, { value: 42 })

        yield* Deferred.await(gate)
        expect(seenValues).toEqual([42])
      }),
    ),
  )
  // Publishes one Ping and one Pong and expects a subscribeAll() consumer to
  // have observed both event types.
  it.live("subscribeAll receives all types", () =>
    provideTmpdirInstance(() =>
      Effect.gen(function* () {
        const bus = yield* Bus.Service
        const types: string[] = []
        const done = yield* Deferred.make<void>()

        yield* Stream.runForEach(bus.subscribeAll(), (evt) =>
          Effect.sync(() => {
            types.push(evt.type)
            // subscribeAll() is not filtered, and the bus also carries events
            // that this test does not publish (e.g. Bus.InstanceDisposed).
            // Gate on having seen BOTH expected types rather than on a raw
            // count of two, so an unrelated event cannot release the test
            // before Ping and Pong have arrived.
            const sawPing = types.includes(TestEvent.Ping.type)
            const sawPong = types.includes(TestEvent.Pong.type)
            if (sawPing && sawPong) Deferred.doneUnsafe(done, Effect.void)
          }),
        ).pipe(Effect.forkScoped)

        // Short pause before publishing — presumably so the forked consumer
        // has attached to the bus; confirm against the Bus implementation.
        yield* Effect.sleep("10 millis")

        yield* bus.publish(TestEvent.Ping, { value: 1 })
        yield* bus.publish(TestEvent.Pong, { message: "hi" })

        yield* Deferred.await(done)
        expect(types).toContain("test.effect.ping")
        expect(types).toContain("test.effect.pong")
      }),
    ),
  )
  // Attaches two independent subscribe(TestEvent.Ping) consumers, publishes a
  // single Ping, and expects each consumer to have recorded it exactly once.
  it.live("multiple subscribers each receive the event", () =>
    provideTmpdirInstance(() =>
      Effect.gen(function* () {
        const bus = yield* Bus.Service
        const a: number[] = []
        const b: number[] = []
        const doneA = yield* Deferred.make<void>()
        const doneB = yield* Deferred.make<void>()

        // Builds a consumer that appends every Ping value to `sink` and
        // completes `gate` on each delivery.
        const collect = (sink: number[], gate: Deferred.Deferred<void>) =>
          Stream.runForEach(bus.subscribe(TestEvent.Ping), (event) =>
            Effect.sync(() => {
              sink.push(event.properties.value)
              Deferred.doneUnsafe(gate, Effect.void)
            }),
          )

        yield* Effect.forkScoped(collect(a, doneA))
        yield* Effect.forkScoped(collect(b, doneB))

        // Short pause before publishing — presumably so both forked consumers
        // have attached to the bus; confirm against the Bus implementation.
        yield* Effect.sleep("10 millis")

        yield* bus.publish(TestEvent.Ping, { value: 99 })

        yield* Deferred.await(doneA)
        yield* Deferred.await(doneB)
        expect(a).toEqual([99])
        expect(b).toEqual([99])
      }),
    ),
  )
  // Subscribes with subscribeAll() inside an instance, disposes all instances
  // from outside it, and expects the consumer to have observed both the Ping
  // it published and Bus.InstanceDisposed.
  it.live("subscribeAll stream sees InstanceDisposed on disposal", () =>
    Effect.gen(function* () {
      const dir = yield* tmpdirScoped()
      const observed: string[] = []
      const pingSeen = yield* Deferred.make<void>()
      const disposalSeen = yield* Deferred.make<void>()

      // Runs with the instance for `dir` provided: fork the wildcard
      // consumer, publish one Ping, and wait until the consumer has seen it.
      const subscribeAndPing = Effect.gen(function* () {
        const bus = yield* Bus.Service

        const consumer = Stream.runForEach(bus.subscribeAll(), (event) =>
          Effect.sync(() => {
            observed.push(event.type)
            if (event.type === TestEvent.Ping.type) Deferred.doneUnsafe(pingSeen, Effect.void)
            if (event.type === Bus.InstanceDisposed.type) Deferred.doneUnsafe(disposalSeen, Effect.void)
          }),
        )
        yield* Effect.forkScoped(consumer)

        // Short pause before publishing — presumably so the forked consumer
        // has attached to the bus; confirm against the Bus implementation.
        yield* Effect.sleep("10 millis")

        yield* bus.publish(TestEvent.Ping, { value: 1 })
        yield* Deferred.await(pingSeen)
      })
      yield* subscribeAndPing.pipe(provideInstance(dir))

      // Disposal is triggered here, outside the effect that was given the instance.
      yield* Effect.promise(() => Instance.disposeAll())

      // Bounded wait for the disposal event.
      yield* Deferred.await(disposalSeen).pipe(Effect.timeout("2 seconds"))

      expect(observed).toContain("test.effect.ping")
      expect(observed).toContain(Bus.InstanceDisposed.type)
    }),
  )
  137. })