bus.test.ts 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219
  1. import { afterEach, describe, expect, test } from "bun:test"
  2. import z from "zod"
  3. import { Bus } from "../../src/bus"
  4. import { BusEvent } from "../../src/bus/bus-event"
  5. import { Instance } from "../../src/project/instance"
  6. import { tmpdir } from "../fixture/fixture"
  7. const TestEvent = {
  8. Ping: BusEvent.define("test.ping", z.object({ value: z.number() })),
  9. Pong: BusEvent.define("test.pong", z.object({ message: z.string() })),
  10. }
  11. function withInstance(directory: string, fn: () => Promise<void>) {
  12. return Instance.provide({ directory, fn })
  13. }
  14. describe("Bus", () => {
  15. afterEach(() => Instance.disposeAll())
  16. describe("publish + subscribe", () => {
  17. test("subscriber is live immediately after subscribe returns", async () => {
  18. await using tmp = await tmpdir()
  19. const received: number[] = []
  20. await withInstance(tmp.path, async () => {
  21. Bus.subscribe(TestEvent.Ping, (evt) => {
  22. received.push(evt.properties.value)
  23. })
  24. await Bus.publish(TestEvent.Ping, { value: 42 })
  25. await Bun.sleep(10)
  26. })
  27. expect(received).toEqual([42])
  28. })
  29. test("subscriber receives matching events", async () => {
  30. await using tmp = await tmpdir()
  31. const received: number[] = []
  32. await withInstance(tmp.path, async () => {
  33. Bus.subscribe(TestEvent.Ping, (evt) => {
  34. received.push(evt.properties.value)
  35. })
  36. // Give the subscriber fiber time to start consuming
  37. await Bun.sleep(10)
  38. await Bus.publish(TestEvent.Ping, { value: 42 })
  39. await Bus.publish(TestEvent.Ping, { value: 99 })
  40. // Give subscriber time to process
  41. await Bun.sleep(10)
  42. })
  43. expect(received).toEqual([42, 99])
  44. })
  45. test("subscriber does not receive events of other types", async () => {
  46. await using tmp = await tmpdir()
  47. const pings: number[] = []
  48. await withInstance(tmp.path, async () => {
  49. Bus.subscribe(TestEvent.Ping, (evt) => {
  50. pings.push(evt.properties.value)
  51. })
  52. await Bun.sleep(10)
  53. await Bus.publish(TestEvent.Pong, { message: "hello" })
  54. await Bus.publish(TestEvent.Ping, { value: 1 })
  55. await Bun.sleep(10)
  56. })
  57. expect(pings).toEqual([1])
  58. })
  59. test("publish with no subscribers does not throw", async () => {
  60. await using tmp = await tmpdir()
  61. await withInstance(tmp.path, async () => {
  62. await Bus.publish(TestEvent.Ping, { value: 1 })
  63. })
  64. })
  65. })
  66. describe("unsubscribe", () => {
  67. test("unsubscribe stops delivery", async () => {
  68. await using tmp = await tmpdir()
  69. const received: number[] = []
  70. await withInstance(tmp.path, async () => {
  71. const unsub = Bus.subscribe(TestEvent.Ping, (evt) => {
  72. received.push(evt.properties.value)
  73. })
  74. await Bun.sleep(10)
  75. await Bus.publish(TestEvent.Ping, { value: 1 })
  76. await Bun.sleep(10)
  77. unsub()
  78. await Bun.sleep(10)
  79. await Bus.publish(TestEvent.Ping, { value: 2 })
  80. await Bun.sleep(10)
  81. })
  82. expect(received).toEqual([1])
  83. })
  84. })
  85. describe("subscribeAll", () => {
  86. test("subscribeAll is live immediately after subscribe returns", async () => {
  87. await using tmp = await tmpdir()
  88. const received: string[] = []
  89. await withInstance(tmp.path, async () => {
  90. Bus.subscribeAll((evt) => {
  91. received.push(evt.type)
  92. })
  93. await Bus.publish(TestEvent.Ping, { value: 1 })
  94. await Bun.sleep(10)
  95. })
  96. expect(received).toEqual(["test.ping"])
  97. })
  98. test("receives all event types", async () => {
  99. await using tmp = await tmpdir()
  100. const received: string[] = []
  101. await withInstance(tmp.path, async () => {
  102. Bus.subscribeAll((evt) => {
  103. received.push(evt.type)
  104. })
  105. await Bun.sleep(10)
  106. await Bus.publish(TestEvent.Ping, { value: 1 })
  107. await Bus.publish(TestEvent.Pong, { message: "hi" })
  108. await Bun.sleep(10)
  109. })
  110. expect(received).toContain("test.ping")
  111. expect(received).toContain("test.pong")
  112. })
  113. })
  114. describe("multiple subscribers", () => {
  115. test("all subscribers for same event type are called", async () => {
  116. await using tmp = await tmpdir()
  117. const a: number[] = []
  118. const b: number[] = []
  119. await withInstance(tmp.path, async () => {
  120. Bus.subscribe(TestEvent.Ping, (evt) => {
  121. a.push(evt.properties.value)
  122. })
  123. Bus.subscribe(TestEvent.Ping, (evt) => {
  124. b.push(evt.properties.value)
  125. })
  126. await Bun.sleep(10)
  127. await Bus.publish(TestEvent.Ping, { value: 7 })
  128. await Bun.sleep(10)
  129. })
  130. expect(a).toEqual([7])
  131. expect(b).toEqual([7])
  132. })
  133. })
  134. describe("instance isolation", () => {
  135. test("events in one directory do not reach subscribers in another", async () => {
  136. await using tmpA = await tmpdir()
  137. await using tmpB = await tmpdir()
  138. const receivedA: number[] = []
  139. const receivedB: number[] = []
  140. await withInstance(tmpA.path, async () => {
  141. Bus.subscribe(TestEvent.Ping, (evt) => {
  142. receivedA.push(evt.properties.value)
  143. })
  144. await Bun.sleep(10)
  145. })
  146. await withInstance(tmpB.path, async () => {
  147. Bus.subscribe(TestEvent.Ping, (evt) => {
  148. receivedB.push(evt.properties.value)
  149. })
  150. await Bun.sleep(10)
  151. })
  152. await withInstance(tmpA.path, async () => {
  153. await Bus.publish(TestEvent.Ping, { value: 1 })
  154. await Bun.sleep(10)
  155. })
  156. await withInstance(tmpB.path, async () => {
  157. await Bus.publish(TestEvent.Ping, { value: 2 })
  158. await Bun.sleep(10)
  159. })
  160. expect(receivedA).toEqual([1])
  161. expect(receivedB).toEqual([2])
  162. })
  163. })
  164. describe("instance disposal", () => {
  165. test("InstanceDisposed is delivered to wildcard subscribers before stream ends", async () => {
  166. await using tmp = await tmpdir()
  167. const received: string[] = []
  168. await withInstance(tmp.path, async () => {
  169. Bus.subscribeAll((evt) => {
  170. received.push(evt.type)
  171. })
  172. await Bun.sleep(10)
  173. await Bus.publish(TestEvent.Ping, { value: 1 })
  174. await Bun.sleep(10)
  175. })
  176. // Instance.disposeAll triggers the finalizer which publishes InstanceDisposed
  177. await Instance.disposeAll()
  178. await Bun.sleep(50)
  179. expect(received).toContain("test.ping")
  180. expect(received).toContain(Bus.InstanceDisposed.type)
  181. })
  182. })
  183. })