index.test.ts 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237
  1. import { describe, test, expect, beforeEach, afterEach, afterAll } from "bun:test"
  2. import { tmpdir } from "../fixture/fixture"
  3. import z from "zod"
  4. import { Bus } from "../../src/bus"
  5. import { Instance } from "../../src/project/instance"
  6. import { SyncEvent } from "../../src/sync"
  7. import { Database } from "../../src/storage"
  8. import { EventTable } from "../../src/sync/event.sql"
  9. import { Identifier } from "../../src/id/id"
  10. import { Flag } from "../../src/flag/flag"
  11. import { initProjectors } from "../../src/server/projectors"
  12. const original = Flag.KILO_EXPERIMENTAL_WORKSPACES
  13. beforeEach(() => {
  14. Database.close()
  15. Flag.KILO_EXPERIMENTAL_WORKSPACES = true
  16. })
  17. afterEach(() => {
  18. Flag.KILO_EXPERIMENTAL_WORKSPACES = original
  19. })
  20. function withInstance(fn: () => void | Promise<void>) {
  21. return async () => {
  22. await using tmp = await tmpdir()
  23. await Instance.provide({
  24. directory: tmp.path,
  25. fn: async () => {
  26. await fn()
  27. },
  28. })
  29. }
  30. }
  31. describe("SyncEvent", () => {
  32. function setup() {
  33. SyncEvent.reset()
  34. const Created = SyncEvent.define({
  35. type: "item.created",
  36. version: 1,
  37. aggregate: "id",
  38. schema: z.object({ id: z.string(), name: z.string() }),
  39. })
  40. const Sent = SyncEvent.define({
  41. type: "item.sent",
  42. version: 1,
  43. aggregate: "item_id",
  44. schema: z.object({ item_id: z.string(), to: z.string() }),
  45. })
  46. SyncEvent.init({
  47. projectors: [SyncEvent.project(Created, () => {}), SyncEvent.project(Sent, () => {})],
  48. })
  49. return { Created, Sent }
  50. }
  51. afterAll(() => {
  52. SyncEvent.reset()
  53. initProjectors()
  54. })
  55. describe("run", () => {
  56. test(
  57. "inserts event row",
  58. withInstance(() => {
  59. const { Created } = setup()
  60. SyncEvent.run(Created, { id: "evt_1", name: "first" })
  61. const rows = Database.use((db) => db.select().from(EventTable).all())
  62. expect(rows).toHaveLength(1)
  63. expect(rows[0].type).toBe("item.created.1")
  64. expect(rows[0].aggregate_id).toBe("evt_1")
  65. }),
  66. )
  67. test(
  68. "increments seq per aggregate",
  69. withInstance(() => {
  70. const { Created } = setup()
  71. SyncEvent.run(Created, { id: "evt_1", name: "first" })
  72. SyncEvent.run(Created, { id: "evt_1", name: "second" })
  73. const rows = Database.use((db) => db.select().from(EventTable).all())
  74. expect(rows).toHaveLength(2)
  75. expect(rows[1].seq).toBe(rows[0].seq + 1)
  76. }),
  77. )
  78. test(
  79. "uses custom aggregate field from agg()",
  80. withInstance(() => {
  81. const { Sent } = setup()
  82. SyncEvent.run(Sent, { item_id: "evt_1", to: "james" })
  83. const rows = Database.use((db) => db.select().from(EventTable).all())
  84. expect(rows).toHaveLength(1)
  85. expect(rows[0].aggregate_id).toBe("evt_1")
  86. }),
  87. )
  88. test(
  89. "emits events",
  90. withInstance(async () => {
  91. const { Created } = setup()
  92. const events: Array<{
  93. type: string
  94. properties: { id: string; name: string }
  95. }> = []
  96. const received = new Promise<void>((resolve) => {
  97. Bus.subscribeAll((event) => {
  98. events.push(event)
  99. resolve()
  100. })
  101. })
  102. SyncEvent.run(Created, { id: "evt_1", name: "test" })
  103. await received
  104. expect(events).toHaveLength(1)
  105. expect(events[0]).toEqual({
  106. type: "item.created",
  107. properties: {
  108. id: "evt_1",
  109. name: "test",
  110. },
  111. })
  112. }),
  113. )
  114. })
  115. describe("replay", () => {
  116. test(
  117. "inserts event from external payload",
  118. withInstance(() => {
  119. const id = Identifier.descending("message")
  120. SyncEvent.replay({
  121. id: "evt_1",
  122. type: "item.created.1",
  123. seq: 0,
  124. aggregateID: id,
  125. data: { id, name: "replayed" },
  126. })
  127. const rows = Database.use((db) => db.select().from(EventTable).all())
  128. expect(rows).toHaveLength(1)
  129. expect(rows[0].aggregate_id).toBe(id)
  130. }),
  131. )
  132. test(
  133. "throws on sequence mismatch",
  134. withInstance(() => {
  135. const id = Identifier.descending("message")
  136. SyncEvent.replay({
  137. id: "evt_1",
  138. type: "item.created.1",
  139. seq: 0,
  140. aggregateID: id,
  141. data: { id, name: "first" },
  142. })
  143. expect(() =>
  144. SyncEvent.replay({
  145. id: "evt_1",
  146. type: "item.created.1",
  147. seq: 5,
  148. aggregateID: id,
  149. data: { id, name: "bad" },
  150. }),
  151. ).toThrow(/Sequence mismatch/)
  152. }),
  153. )
  154. test(
  155. "throws on unknown event type",
  156. withInstance(() => {
  157. expect(() =>
  158. SyncEvent.replay({
  159. id: "evt_1",
  160. type: "unknown.event.1",
  161. seq: 0,
  162. aggregateID: "x",
  163. data: {},
  164. }),
  165. ).toThrow(/Unknown event type/)
  166. }),
  167. )
  168. test(
  169. "replayAll accepts later chunks after the first batch",
  170. withInstance(() => {
  171. const { Created } = setup()
  172. const id = Identifier.descending("message")
  173. const one = SyncEvent.replayAll([
  174. {
  175. id: "evt_1",
  176. type: SyncEvent.versionedType(Created.type, Created.version),
  177. seq: 0,
  178. aggregateID: id,
  179. data: { id, name: "first" },
  180. },
  181. {
  182. id: "evt_2",
  183. type: SyncEvent.versionedType(Created.type, Created.version),
  184. seq: 1,
  185. aggregateID: id,
  186. data: { id, name: "second" },
  187. },
  188. ])
  189. const two = SyncEvent.replayAll([
  190. {
  191. id: "evt_3",
  192. type: SyncEvent.versionedType(Created.type, Created.version),
  193. seq: 2,
  194. aggregateID: id,
  195. data: { id, name: "third" },
  196. },
  197. {
  198. id: "evt_4",
  199. type: SyncEvent.versionedType(Created.type, Created.version),
  200. seq: 3,
  201. aggregateID: id,
  202. data: { id, name: "fourth" },
  203. },
  204. ])
  205. expect(one).toBe(id)
  206. expect(two).toBe(id)
  207. const rows = Database.use((db) => db.select().from(EventTable).all())
  208. expect(rows.map((row) => row.seq)).toEqual([0, 1, 2, 3])
  209. }),
  210. )
  211. })
  212. })