index.test.ts 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. import { describe, test, expect, beforeEach, afterEach, afterAll } from "bun:test"
  2. import { tmpdir } from "../fixture/fixture"
  3. import z from "zod"
  4. import { Bus } from "../../src/bus"
  5. import { Instance } from "../../src/project/instance"
  6. import { SyncEvent } from "../../src/sync"
  7. import { Database } from "../../src/storage/db"
  8. import { EventTable } from "../../src/sync/event.sql"
  9. import { Identifier } from "../../src/id/id"
  10. import { Flag } from "../../src/flag/flag"
  11. import { initProjectors } from "../../src/server/projectors"
  12. const original = Flag.KILO_EXPERIMENTAL_WORKSPACES
  13. beforeEach(() => {
  14. Database.close()
  15. // @ts-expect-error don't do this normally, but it works
  16. Flag.KILO_EXPERIMENTAL_WORKSPACES = true
  17. })
  18. afterEach(() => {
  19. // @ts-expect-error don't do this normally, but it works
  20. Flag.KILO_EXPERIMENTAL_WORKSPACES = original
  21. })
  22. function withInstance(fn: () => void | Promise<void>) {
  23. return async () => {
  24. await using tmp = await tmpdir()
  25. await Instance.provide({
  26. directory: tmp.path,
  27. fn: async () => {
  28. await fn()
  29. },
  30. })
  31. }
  32. }
  33. describe("SyncEvent", () => {
  34. function setup() {
  35. SyncEvent.reset()
  36. const Created = SyncEvent.define({
  37. type: "item.created",
  38. version: 1,
  39. aggregate: "id",
  40. schema: z.object({ id: z.string(), name: z.string() }),
  41. })
  42. const Sent = SyncEvent.define({
  43. type: "item.sent",
  44. version: 1,
  45. aggregate: "item_id",
  46. schema: z.object({ item_id: z.string(), to: z.string() }),
  47. })
  48. SyncEvent.init({
  49. projectors: [SyncEvent.project(Created, () => {}), SyncEvent.project(Sent, () => {})],
  50. })
  51. return { Created, Sent }
  52. }
  53. afterAll(() => {
  54. SyncEvent.reset()
  55. initProjectors()
  56. })
  57. describe("run", () => {
  58. test(
  59. "inserts event row",
  60. withInstance(() => {
  61. const { Created } = setup()
  62. SyncEvent.run(Created, { id: "evt_1", name: "first" })
  63. const rows = Database.use((db) => db.select().from(EventTable).all())
  64. expect(rows).toHaveLength(1)
  65. expect(rows[0].type).toBe("item.created.1")
  66. expect(rows[0].aggregate_id).toBe("evt_1")
  67. }),
  68. )
  69. test(
  70. "increments seq per aggregate",
  71. withInstance(() => {
  72. const { Created } = setup()
  73. SyncEvent.run(Created, { id: "evt_1", name: "first" })
  74. SyncEvent.run(Created, { id: "evt_1", name: "second" })
  75. const rows = Database.use((db) => db.select().from(EventTable).all())
  76. expect(rows).toHaveLength(2)
  77. expect(rows[1].seq).toBe(rows[0].seq + 1)
  78. }),
  79. )
  80. test(
  81. "uses custom aggregate field from agg()",
  82. withInstance(() => {
  83. const { Sent } = setup()
  84. SyncEvent.run(Sent, { item_id: "evt_1", to: "james" })
  85. const rows = Database.use((db) => db.select().from(EventTable).all())
  86. expect(rows).toHaveLength(1)
  87. expect(rows[0].aggregate_id).toBe("evt_1")
  88. }),
  89. )
  90. test(
  91. "emits events",
  92. withInstance(async () => {
  93. const { Created } = setup()
  94. const events: Array<{
  95. type: string
  96. properties: { id: string; name: string }
  97. }> = []
  98. const received = new Promise<void>((resolve) => {
  99. Bus.subscribeAll((event) => {
  100. events.push(event)
  101. resolve()
  102. })
  103. })
  104. SyncEvent.run(Created, { id: "evt_1", name: "test" })
  105. await received
  106. expect(events).toHaveLength(1)
  107. expect(events[0]).toEqual({
  108. type: "item.created",
  109. properties: {
  110. id: "evt_1",
  111. name: "test",
  112. },
  113. })
  114. }),
  115. )
  116. })
  117. describe("replay", () => {
  118. test(
  119. "inserts event from external payload",
  120. withInstance(() => {
  121. const id = Identifier.descending("message")
  122. SyncEvent.replay({
  123. id: "evt_1",
  124. type: "item.created.1",
  125. seq: 0,
  126. aggregateID: id,
  127. data: { id, name: "replayed" },
  128. })
  129. const rows = Database.use((db) => db.select().from(EventTable).all())
  130. expect(rows).toHaveLength(1)
  131. expect(rows[0].aggregate_id).toBe(id)
  132. }),
  133. )
  134. test(
  135. "throws on sequence mismatch",
  136. withInstance(() => {
  137. const id = Identifier.descending("message")
  138. SyncEvent.replay({
  139. id: "evt_1",
  140. type: "item.created.1",
  141. seq: 0,
  142. aggregateID: id,
  143. data: { id, name: "first" },
  144. })
  145. expect(() =>
  146. SyncEvent.replay({
  147. id: "evt_1",
  148. type: "item.created.1",
  149. seq: 5,
  150. aggregateID: id,
  151. data: { id, name: "bad" },
  152. }),
  153. ).toThrow(/Sequence mismatch/)
  154. }),
  155. )
  156. test(
  157. "throws on unknown event type",
  158. withInstance(() => {
  159. expect(() =>
  160. SyncEvent.replay({
  161. id: "evt_1",
  162. type: "unknown.event.1",
  163. seq: 0,
  164. aggregateID: "x",
  165. data: {},
  166. }),
  167. ).toThrow(/Unknown event type/)
  168. }),
  169. )
  170. })
  171. })