2
0
Эх сурвалжийг харах

refactor(sync): effectify sync event

Kit Langton 2 долоо хоног өмнө
parent
commit
dc719269b6

+ 206 - 137
packages/opencode/src/sync/index.ts

@@ -1,6 +1,8 @@
 import z from "zod"
 import type { ZodObject } from "zod"
 import { EventEmitter } from "events"
+import { Effect, Layer, ServiceMap } from "effect"
+import { makeRuntime } from "@/effect/run-service"
 import { Database, eq } from "@/storage/db"
 import { Bus as ProjectBus } from "@/bus"
 import { BusEvent } from "@/bus/bus-event"
@@ -31,37 +33,18 @@ export namespace SyncEvent {
 
   type ProjectorFunc = (db: Database.TxOrDb, data: unknown) => void
 
+  type State = {
+    projectors: Map<Definition, ProjectorFunc> | undefined
+    convert: (type: string, event: Event["data"]) => Promise<Record<string, unknown>> | Record<string, unknown>
+    bus: EventEmitter<{ event: [{ def: Definition; event: Event }] }>
+  }
+
   export const registry = new Map<string, Definition>()
-  let projectors: Map<Definition, ProjectorFunc> | undefined
   const versions = new Map<string, number>()
   let frozen = false
-  let convertEvent: (type: string, event: Event["data"]) => Promise<Record<string, unknown>> | Record<string, unknown>
-
-  const Bus = new EventEmitter<{ event: [{ def: Definition; event: Event }] }>()
-
-  export function reset() {
-    frozen = false
-    projectors = undefined
-    convertEvent = (_, data) => data
-  }
-
-  export function init(input: { projectors: Array<[Definition, ProjectorFunc]>; convertEvent?: typeof convertEvent }) {
-    projectors = new Map(input.projectors)
-
-    // Install all the latest event defs to the bus. We only ever emit
-    // latest versions from code, and keep around old versions for
-    // replaying. Replaying does not go through the bus, and it
-    // simplifies the bus to only use unversioned latest events
-    for (let [type, version] of versions.entries()) {
-      let def = registry.get(versionedType(type, version))!
-
-      BusEvent.define(def.type, def.properties || def.schema)
-    }
 
-    // Freeze the system so it clearly errors if events are defined
-    // after `init` which would cause bugs
-    frozen = true
-    convertEvent = input.convertEvent || ((_, data) => data)
+  function noop(_: string, data: Event["data"]) {
+    return data
   }
 
   export function versionedType<A extends string>(type: A): A
@@ -102,140 +85,226 @@ export namespace SyncEvent {
     return [def, func as ProjectorFunc]
   }
 
-  function process<Def extends Definition>(def: Def, event: Event<Def>, options: { publish: boolean }) {
-    if (projectors == null) {
-      throw new Error("No projectors available. Call `SyncEvent.init` to install projectors")
-    }
+  export interface Interface {
+    readonly reset: () => Effect.Effect<void>
+    readonly init: (input: {
+      projectors: Array<[Definition, ProjectorFunc]>
+      convertEvent?: State["convert"]
+    }) => Effect.Effect<void>
+    readonly replay: (event: SerializedEvent, options?: { republish: boolean }) => Effect.Effect<void>
+    readonly run: <Def extends Definition>(def: Def, data: Event<Def>["data"]) => Effect.Effect<void>
+    readonly remove: (aggregateID: string) => Effect.Effect<void>
+    readonly subscribeAll: (handler: (event: { def: Definition; event: Event }) => void) => Effect.Effect<() => void>
+  }
 
-    const projector = projectors.get(def)
-    if (!projector) {
-      throw new Error(`Projector not found for event: ${def.type}`)
-    }
+  export class Service extends ServiceMap.Service<Service, Interface>()("@opencode/SyncEvent") {}
 
-    // idempotent: need to ignore any events already logged
+  export const layer = Layer.effect(
+    Service,
+    Effect.gen(function* () {
+      const state: State = {
+        projectors: undefined,
+        convert: noop,
+        bus: new EventEmitter<{ event: [{ def: Definition; event: Event }] }>(),
+      }
 
-    Database.transaction((tx) => {
-      projector(tx, event.data)
+      const process = Effect.fnUntraced(function* <Def extends Definition>(
+        def: Def,
+        event: Event<Def>,
+        options: { publish: boolean },
+      ) {
+        if (state.projectors == null) {
+          throw new Error("No projectors available. Call `SyncEvent.init` to install projectors")
+        }
 
-      if (Flag.OPENCODE_EXPERIMENTAL_WORKSPACES) {
-        tx.insert(EventSequenceTable)
-          .values({
-            aggregate_id: event.aggregateID,
-            seq: event.seq,
-          })
-          .onConflictDoUpdate({
-            target: EventSequenceTable.aggregate_id,
-            set: { seq: event.seq },
-          })
-          .run()
-        tx.insert(EventTable)
-          .values({
-            id: event.id,
-            seq: event.seq,
-            aggregate_id: event.aggregateID,
-            type: versionedType(def.type, def.version),
-            data: event.data as Record<string, unknown>,
-          })
-          .run()
-      }
+        const projector = state.projectors.get(def)
+        if (!projector) {
+          throw new Error(`Projector not found for event: ${def.type}`)
+        }
 
-      Database.effect(() => {
-        Bus.emit("event", {
-          def,
-          event,
-        })
+        Database.transaction((tx) => {
+          projector(tx, event.data)
 
-        if (options?.publish) {
-          const result = convertEvent(def.type, event.data)
-          if (result instanceof Promise) {
-            result.then((data) => {
-              ProjectBus.publish({ type: def.type, properties: def.schema }, data)
-            })
-          } else {
-            ProjectBus.publish({ type: def.type, properties: def.schema }, result)
+          if (Flag.OPENCODE_EXPERIMENTAL_WORKSPACES) {
+            tx.insert(EventSequenceTable)
+              .values({
+                aggregate_id: event.aggregateID,
+                seq: event.seq,
+              })
+              .onConflictDoUpdate({
+                target: EventSequenceTable.aggregate_id,
+                set: { seq: event.seq },
+              })
+              .run()
+            tx.insert(EventTable)
+              .values({
+                id: event.id,
+                seq: event.seq,
+                aggregate_id: event.aggregateID,
+                type: versionedType(def.type, def.version),
+                data: event.data as Record<string, unknown>,
+              })
+              .run()
           }
+
+          Database.effect(() => {
+            state.bus.emit("event", { def, event })
+
+            if (!options.publish) return
+
+            const result = state.convert(def.type, event.data)
+            if (result instanceof Promise) {
+              result.then((data) => {
+                ProjectBus.publish({ type: def.type, properties: def.schema }, data)
+              })
+              return
+            }
+
+            ProjectBus.publish({ type: def.type, properties: def.schema }, result)
+          })
+        })
+      })
+
+      const reset = Effect.fn("SyncEvent.reset")(() =>
+        Effect.sync(() => {
+          frozen = false
+          state.projectors = undefined
+          state.convert = noop
+        }),
+      )
+
+      const init = Effect.fn("SyncEvent.init")(
+        (input: { projectors: Array<[Definition, ProjectorFunc]>; convertEvent?: State["convert"] }) =>
+          Effect.sync(() => {
+            state.projectors = new Map(input.projectors)
+
+            for (const [type, version] of versions.entries()) {
+              const def = registry.get(versionedType(type, version))!
+              BusEvent.define(def.type, def.properties || def.schema)
+            }
+
+            frozen = true
+            state.convert = input.convertEvent || noop
+          }),
+      )
+
+      // TODO:
+      //
+      // * Support applying multiple events at one time. One transaction,
+      //   and it validets all the sequence ids
+      // * when loading events from db, apply zod validation to ensure shape
+
+      const replay = Effect.fn("SyncEvent.replay")(function* (
+        event: SerializedEvent,
+        options?: { republish: boolean },
+      ) {
+        const def = registry.get(event.type)
+        if (!def) {
+          throw new Error(`Unknown event type: ${event.type}`)
+        }
+
+        const row = Database.use((db) =>
+          db
+            .select({ seq: EventSequenceTable.seq })
+            .from(EventSequenceTable)
+            .where(eq(EventSequenceTable.aggregate_id, event.aggregateID))
+            .get(),
+        )
+
+        const latest = row?.seq ?? -1
+        if (event.seq <= latest) {
+          return
         }
+
+        const expected = latest + 1
+        if (event.seq !== expected) {
+          throw new Error(
+            `Sequence mismatch for aggregate "${event.aggregateID}": expected ${expected}, got ${event.seq}`,
+          )
+        }
+
+        yield* process(def, event, { publish: !!options?.republish })
       })
-    })
-  }
 
-  // TODO:
-  //
-  // * Support applying multiple events at one time. One transaction,
-  //   and it validets all the sequence ids
-  // * when loading events from db, apply zod validation to ensure shape
+      const run: Interface["run"] = Effect.fn("SyncEvent.run")(function* <Def extends Definition>(
+        def: Def,
+        data: Event<Def>["data"],
+      ) {
+        const agg = (data as Record<string, string>)[def.aggregate]
+        if (agg == null) {
+          throw new Error(`SyncEvent.run: "${def.aggregate}" required but not found: ${JSON.stringify(data)}`)
+        }
 
-  export function replay(event: SerializedEvent, options?: { republish: boolean }) {
-    const def = registry.get(event.type)
-    if (!def) {
-      throw new Error(`Unknown event type: ${event.type}`)
-    }
+        if (def.version !== versions.get(def.type)) {
+          throw new Error(`SyncEvent.run: running old versions of events is not allowed: ${def.type}`)
+        }
 
-    const row = Database.use((db) =>
-      db
-        .select({ seq: EventSequenceTable.seq })
-        .from(EventSequenceTable)
-        .where(eq(EventSequenceTable.aggregate_id, event.aggregateID))
-        .get(),
-    )
-
-    const latest = row?.seq ?? -1
-    if (event.seq <= latest) {
-      return
-    }
+        Database.transaction(
+          (tx) => {
+            const id = EventID.ascending()
+            const row = tx
+              .select({ seq: EventSequenceTable.seq })
+              .from(EventSequenceTable)
+              .where(eq(EventSequenceTable.aggregate_id, agg))
+              .get()
+            const seq = row?.seq != null ? row.seq + 1 : 0
+
+            const event = { id, seq, aggregateID: agg, data }
+            Effect.runSync(process(def, event, { publish: true }))
+          },
+          {
+            behavior: "immediate",
+          },
+        )
+      })
 
-    const expected = latest + 1
-    if (event.seq !== expected) {
-      throw new Error(`Sequence mismatch for aggregate "${event.aggregateID}": expected ${expected}, got ${event.seq}`)
-    }
+      const remove = Effect.fn("SyncEvent.remove")((aggregateID: string) =>
+        Effect.sync(() => {
+          Database.transaction((tx) => {
+            tx.delete(EventSequenceTable).where(eq(EventSequenceTable.aggregate_id, aggregateID)).run()
+            tx.delete(EventTable).where(eq(EventTable.aggregate_id, aggregateID)).run()
+          })
+        }),
+      )
+
+      const subscribeAll = Effect.fn("SyncEvent.subscribeAll")(
+        (handler: (event: { def: Definition; event: Event }) => void) =>
+          Effect.sync(() => {
+            state.bus.on("event", handler)
+            return () => state.bus.off("event", handler)
+          }),
+      )
+
+      return Service.of({ reset, init, replay, run, remove, subscribeAll })
+    }),
+  )
+
+  export const defaultLayer = layer
+
+  const { runSync } = makeRuntime(Service, defaultLayer)
 
-    process(def, event, { publish: !!options?.republish })
+  export function reset() {
+    return runSync((svc) => svc.reset())
   }
 
-  export function run<Def extends Definition>(def: Def, data: Event<Def>["data"]) {
-    const agg = (data as Record<string, string>)[def.aggregate]
-    // This should never happen: we've enforced it via typescript in
-    // the definition
-    if (agg == null) {
-      throw new Error(`SyncEvent.run: "${def.aggregate}" required but not found: ${JSON.stringify(data)}`)
-    }
+  export function init(input: { projectors: Array<[Definition, ProjectorFunc]>; convertEvent?: State["convert"] }) {
+    return runSync((svc) => svc.init(input))
+  }
 
-    if (def.version !== versions.get(def.type)) {
-      throw new Error(`SyncEvent.run: running old versions of events is not allowed: ${def.type}`)
-    }
+  export function replay(event: SerializedEvent, options?: { republish: boolean }) {
+    return runSync((svc) => svc.replay(event, options))
+  }
 
-    // Note that this is an "immediate" transaction which is critical.
-    // We need to make sure we can safely read and write with nothing
-    // else changing the data from under us
-    Database.transaction(
-      (tx) => {
-        const id = EventID.ascending()
-        const row = tx
-          .select({ seq: EventSequenceTable.seq })
-          .from(EventSequenceTable)
-          .where(eq(EventSequenceTable.aggregate_id, agg))
-          .get()
-        const seq = row?.seq != null ? row.seq + 1 : 0
-
-        const event = { id, seq, aggregateID: agg, data }
-        process(def, event, { publish: true })
-      },
-      {
-        behavior: "immediate",
-      },
-    )
+  export function run<Def extends Definition>(def: Def, data: Event<Def>["data"]) {
+    return runSync((svc) => svc.run(def, data))
   }
 
   export function remove(aggregateID: string) {
-    Database.transaction((tx) => {
-      tx.delete(EventSequenceTable).where(eq(EventSequenceTable.aggregate_id, aggregateID)).run()
-      tx.delete(EventTable).where(eq(EventTable.aggregate_id, aggregateID)).run()
-    })
+    return runSync((svc) => svc.remove(aggregateID))
   }
 
   export function subscribeAll(handler: (event: { def: Definition; event: Event }) => void) {
-    Bus.on("event", handler)
-    return () => Bus.off("event", handler)
+    return runSync((svc) => svc.subscribeAll(handler))
   }
 
   export function payloads() {