| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191 |
- import { describe, test, expect, beforeEach, afterEach, afterAll } from "bun:test"
- import { tmpdir } from "../fixture/fixture"
- import z from "zod"
- import { Bus } from "../../src/bus"
- import { Instance } from "../../src/project/instance"
- import { SyncEvent } from "../../src/sync"
- import { Database } from "../../src/storage/db"
- import { EventTable } from "../../src/sync/event.sql"
- import { Identifier } from "../../src/id/id"
- import { Flag } from "../../src/flag/flag"
- import { initProjectors } from "../../src/server/projectors"
- const original = Flag.KILO_EXPERIMENTAL_WORKSPACES
- beforeEach(() => {
- Database.close()
- // @ts-expect-error don't do this normally, but it works
- Flag.KILO_EXPERIMENTAL_WORKSPACES = true
- })
- afterEach(() => {
- // @ts-expect-error don't do this normally, but it works
- Flag.KILO_EXPERIMENTAL_WORKSPACES = original
- })
- function withInstance(fn: () => void | Promise<void>) {
- return async () => {
- await using tmp = await tmpdir()
- await Instance.provide({
- directory: tmp.path,
- fn: async () => {
- await fn()
- },
- })
- }
- }
- describe("SyncEvent", () => {
- function setup() {
- SyncEvent.reset()
- const Created = SyncEvent.define({
- type: "item.created",
- version: 1,
- aggregate: "id",
- schema: z.object({ id: z.string(), name: z.string() }),
- })
- const Sent = SyncEvent.define({
- type: "item.sent",
- version: 1,
- aggregate: "item_id",
- schema: z.object({ item_id: z.string(), to: z.string() }),
- })
- SyncEvent.init({
- projectors: [SyncEvent.project(Created, () => {}), SyncEvent.project(Sent, () => {})],
- })
- return { Created, Sent }
- }
- afterAll(() => {
- SyncEvent.reset()
- initProjectors()
- })
- describe("run", () => {
- test(
- "inserts event row",
- withInstance(() => {
- const { Created } = setup()
- SyncEvent.run(Created, { id: "evt_1", name: "first" })
- const rows = Database.use((db) => db.select().from(EventTable).all())
- expect(rows).toHaveLength(1)
- expect(rows[0].type).toBe("item.created.1")
- expect(rows[0].aggregate_id).toBe("evt_1")
- }),
- )
- test(
- "increments seq per aggregate",
- withInstance(() => {
- const { Created } = setup()
- SyncEvent.run(Created, { id: "evt_1", name: "first" })
- SyncEvent.run(Created, { id: "evt_1", name: "second" })
- const rows = Database.use((db) => db.select().from(EventTable).all())
- expect(rows).toHaveLength(2)
- expect(rows[1].seq).toBe(rows[0].seq + 1)
- }),
- )
- test(
- "uses custom aggregate field from agg()",
- withInstance(() => {
- const { Sent } = setup()
- SyncEvent.run(Sent, { item_id: "evt_1", to: "james" })
- const rows = Database.use((db) => db.select().from(EventTable).all())
- expect(rows).toHaveLength(1)
- expect(rows[0].aggregate_id).toBe("evt_1")
- }),
- )
- test(
- "emits events",
- withInstance(async () => {
- const { Created } = setup()
- const events: Array<{
- type: string
- properties: { id: string; name: string }
- }> = []
- const received = new Promise<void>((resolve) => {
- Bus.subscribeAll((event) => {
- events.push(event)
- resolve()
- })
- })
- SyncEvent.run(Created, { id: "evt_1", name: "test" })
- await received
- expect(events).toHaveLength(1)
- expect(events[0]).toEqual({
- type: "item.created",
- properties: {
- id: "evt_1",
- name: "test",
- },
- })
- }),
- )
- })
- describe("replay", () => {
- test(
- "inserts event from external payload",
- withInstance(() => {
- const id = Identifier.descending("message")
- SyncEvent.replay({
- id: "evt_1",
- type: "item.created.1",
- seq: 0,
- aggregateID: id,
- data: { id, name: "replayed" },
- })
- const rows = Database.use((db) => db.select().from(EventTable).all())
- expect(rows).toHaveLength(1)
- expect(rows[0].aggregate_id).toBe(id)
- }),
- )
- test(
- "throws on sequence mismatch",
- withInstance(() => {
- const id = Identifier.descending("message")
- SyncEvent.replay({
- id: "evt_1",
- type: "item.created.1",
- seq: 0,
- aggregateID: id,
- data: { id, name: "first" },
- })
- expect(() =>
- SyncEvent.replay({
- id: "evt_1",
- type: "item.created.1",
- seq: 5,
- aggregateID: id,
- data: { id, name: "bad" },
- }),
- ).toThrow(/Sequence mismatch/)
- }),
- )
- test(
- "throws on unknown event type",
- withInstance(() => {
- expect(() =>
- SyncEvent.replay({
- id: "evt_1",
- type: "unknown.event.1",
- seq: 0,
- aggregateID: "x",
- data: {},
- }),
- ).toThrow(/Unknown event type/)
- }),
- )
- })
- })
|