Просмотр исходного кода

refactor(sync): migrate SyncEvent to service layer

Kit Langton 1 неделя назад
Родитель
Сommit
af7ada9a7a
1 измененных файлов с 237 добавлено и 162 удалено
  1. 237 162
      packages/opencode/src/sync/index.ts

+ 237 - 162
packages/opencode/src/sync/index.ts

@@ -1,6 +1,8 @@
 import z from "zod"
 import type { ZodObject } from "zod"
 import { EventEmitter } from "events"
+import { Context, Effect, Layer } from "effect"
+import { makeRuntime } from "@/effect/run-service"
 import { Database, eq } from "@/storage/db"
 import { Bus as ProjectBus } from "@/bus"
 import { BusEvent } from "@/bus/bus-event"
@@ -9,6 +11,8 @@ import { EventID } from "./schema"
 import { Flag } from "@/flag/flag"
 
 export namespace SyncEvent {
+  type Convert = (type: string, event: Event["data"]) => Promise<Record<string, unknown>> | Record<string, unknown>
+
   export type Definition = {
     type: string
     version: number
@@ -30,38 +34,244 @@ export namespace SyncEvent {
   export type SerializedEvent<Def extends Definition = Definition> = Event<Def> & { type: string }
 
   type ProjectorFunc = (db: Database.TxOrDb, data: unknown) => void
+  type Init = { projectors: Array<[Definition, ProjectorFunc]>; convertEvent?: Convert }
+  type Payload = { def: Definition; event: Event }
 
   export const registry = new Map<string, Definition>()
-  let projectors: Map<Definition, ProjectorFunc> | undefined
   const versions = new Map<string, number>()
   let frozen = false
-  let convertEvent: (type: string, event: Event["data"]) => Promise<Record<string, unknown>> | Record<string, unknown>
-
-  const Bus = new EventEmitter<{ event: [{ def: Definition; event: Event }] }>()
 
-  export function reset() {
-    frozen = false
-    projectors = undefined
-    convertEvent = (_, data) => data
+  export interface Interface {
+    readonly reset: () => Effect.Effect<void>
+    readonly init: (input: Init) => Effect.Effect<void>
+    readonly replay: (event: SerializedEvent, options?: { publish: boolean }) => Effect.Effect<void>
+    readonly run: <Def extends Definition>(
+      def: Def,
+      data: Event<Def>["data"],
+      options?: { publish?: boolean },
+    ) => Effect.Effect<void>
+    readonly remove: (aggregateID: string) => Effect.Effect<void>
+    readonly subscribeAll: (handler: (event: Payload) => void) => Effect.Effect<() => void>
+    readonly payloads: () => z.ZodTypeAny
   }
 
-  export function init(input: { projectors: Array<[Definition, ProjectorFunc]>; convertEvent?: typeof convertEvent }) {
-    projectors = new Map(input.projectors)
+  export class Service extends Context.Service<Service, Interface>()("@opencode/SyncEvent") {}
+
+  export const layer = Layer.effect(
+    Service,
+    Effect.gen(function* () {
+      let projectors: Map<Definition, ProjectorFunc> | undefined
+      let convert: Convert = (_, data) => data as Record<string, unknown>
+      const bus = new EventEmitter<{ event: [Payload] }>()
+
+      const reset = Effect.fn("SyncEvent.reset")(() =>
+        Effect.sync(() => {
+          frozen = false
+          projectors = undefined
+          convert = (_, data) => data as Record<string, unknown>
+        }),
+      )
+
+      const init = Effect.fn("SyncEvent.init")((input: Init) =>
+        Effect.sync(() => {
+          projectors = new Map(input.projectors)
+
+          // Install all the latest event defs to the bus. We only ever emit
+          // latest versions from code, and keep around old versions for
+          // replaying. Replaying does not go through the bus, and it
+          // simplifies the bus to only use unversioned latest events
+          for (const [type, version] of versions.entries()) {
+            const def = registry.get(versionedType(type, version))!
+            BusEvent.define(def.type, def.properties || def.schema)
+          }
 
-    // Install all the latest event defs to the bus. We only ever emit
-    // latest versions from code, and keep around old versions for
-    // replaying. Replaying does not go through the bus, and it
-    // simplifies the bus to only use unversioned latest events
-    for (let [type, version] of versions.entries()) {
-      let def = registry.get(versionedType(type, version))!
+          // Freeze the system so it clearly errors if events are defined
+          // after `init` which would cause bugs
+          frozen = true
+          convert = input.convertEvent || ((_, data) => data as Record<string, unknown>)
+        }),
+      )
 
-      BusEvent.define(def.type, def.properties || def.schema)
-    }
+      const process = Effect.fn("SyncEvent.process")(function* <Def extends Definition>(
+        def: Def,
+        event: Event<Def>,
+        options: { publish: boolean },
+      ) {
+        if (projectors == null) {
+          throw new Error("No projectors available. Call `SyncEvent.init` to install projectors")
+        }
+
+        const projector = projectors.get(def)
+        if (!projector) {
+          throw new Error(`Projector not found for event: ${def.type}`)
+        }
+
+        yield* Effect.sync(() => {
+          // idempotent: need to ignore any events already logged
+          Database.transaction((tx) => {
+            projector(tx, event.data)
+
+            if (Flag.OPENCODE_EXPERIMENTAL_WORKSPACES) {
+              tx.insert(EventSequenceTable)
+                .values({
+                  aggregate_id: event.aggregateID,
+                  seq: event.seq,
+                })
+                .onConflictDoUpdate({
+                  target: EventSequenceTable.aggregate_id,
+                  set: { seq: event.seq },
+                })
+                .run()
+              tx.insert(EventTable)
+                .values({
+                  id: event.id,
+                  seq: event.seq,
+                  aggregate_id: event.aggregateID,
+                  type: versionedType(def.type, def.version),
+                  data: event.data as Record<string, unknown>,
+                })
+                .run()
+            }
+
+            Database.effect(() => {
+              bus.emit("event", { def, event })
+
+              if (!options.publish) return
+
+              const result = convert(def.type, event.data)
+              if (result instanceof Promise) {
+                void result.then((data) => {
+                  void ProjectBus.publish({ type: def.type, properties: def.schema }, data)
+                })
+                return
+              }
+
+              void ProjectBus.publish({ type: def.type, properties: def.schema }, result)
+            })
+          })
+        })
+      })
+
+      const replay = Effect.fn("SyncEvent.replay")(function* (event: SerializedEvent, options?: { publish: boolean }) {
+        const def = registry.get(event.type)
+        if (!def) {
+          throw new Error(`Unknown event type: ${event.type}`)
+        }
+
+        const row = Database.use((db) =>
+          db
+            .select({ seq: EventSequenceTable.seq })
+            .from(EventSequenceTable)
+            .where(eq(EventSequenceTable.aggregate_id, event.aggregateID))
+            .get(),
+        )
+
+        const latest = row?.seq ?? -1
+        if (event.seq <= latest) return
+
+        const expected = latest + 1
+        if (event.seq !== expected) {
+          throw new Error(
+            `Sequence mismatch for aggregate "${event.aggregateID}": expected ${expected}, got ${event.seq}`,
+          )
+        }
+
+        yield* process(def, event, { publish: !!options?.publish })
+      })
+
+      const run = Effect.fn("SyncEvent.run")(function* <Def extends Definition>(
+        def: Def,
+        data: Event<Def>["data"],
+        options?: { publish?: boolean },
+      ) {
+        const agg = (data as Record<string, string>)[def.aggregate]
+        if (agg == null) {
+          throw new Error(`SyncEvent.run: "${def.aggregate}" required but not found: ${JSON.stringify(data)}`)
+        }
+
+        if (def.version !== versions.get(def.type)) {
+          throw new Error(`SyncEvent.run: running old versions of events is not allowed: ${def.type}`)
+        }
+
+        const publish = options?.publish ?? true
+
+        yield* Effect.sync(() => {
+          // Note that this is an "immediate" transaction which is critical.
+          // We need to make sure we can safely read and write with nothing
+          // else changing the data from under us
+          Database.transaction(
+            (tx) => {
+              const id = EventID.ascending()
+              const row = tx
+                .select({ seq: EventSequenceTable.seq })
+                .from(EventSequenceTable)
+                .where(eq(EventSequenceTable.aggregate_id, agg))
+                .get()
+              const seq = row?.seq != null ? row.seq + 1 : 0
+
+              const event = { id, seq, aggregateID: agg, data }
+              Effect.runSync(process(def, event, { publish }))
+            },
+            {
+              behavior: "immediate",
+            },
+          )
+        })
+      })
+
+      const remove = Effect.fn("SyncEvent.remove")((aggregateID: string) =>
+        Effect.sync(() => {
+          Database.transaction((tx) => {
+            tx.delete(EventSequenceTable).where(eq(EventSequenceTable.aggregate_id, aggregateID)).run()
+            tx.delete(EventTable).where(eq(EventTable.aggregate_id, aggregateID)).run()
+          })
+        }),
+      )
+
+      const subscribeAll = Effect.fn("SyncEvent.subscribeAll")((handler: (event: Payload) => void) =>
+        Effect.sync(() => {
+          bus.on("event", handler)
+          return () => bus.off("event", handler)
+        }),
+      )
+
+      function payloads() {
+        return z
+          .union(
+            registry
+              .entries()
+              .map(([type, def]) => {
+                return z
+                  .object({
+                    type: z.literal(type),
+                    aggregate: z.literal(def.aggregate),
+                    data: def.schema,
+                  })
+                  .meta({
+                    ref: "SyncEvent" + "." + def.type,
+                  })
+              })
+              .toArray() as any,
+          )
+          .meta({
+            ref: "SyncEvent",
+          })
+      }
+
+      return Service.of({ reset, init, replay, run, remove, subscribeAll, payloads })
+    }),
+  )
 
-    // Freeze the system so it clearly errors if events are defined
-    // after `init` which would cause bugs
-    frozen = true
-    convertEvent = input.convertEvent || ((_, data) => data)
+  export const defaultLayer = layer
+
+  const { runSync } = makeRuntime(Service, defaultLayer)
+
+  export function reset() {
+    return runSync((svc) => svc.reset())
+  }
+
+  export function init(input: Init) {
+    return runSync((svc) => svc.init(input))
   }
 
   export function versionedType<A extends string>(type: A): A
@@ -102,63 +312,6 @@ export namespace SyncEvent {
     return [def, func as ProjectorFunc]
   }
 
-  function process<Def extends Definition>(def: Def, event: Event<Def>, options: { publish: boolean }) {
-    if (projectors == null) {
-      throw new Error("No projectors available. Call `SyncEvent.init` to install projectors")
-    }
-
-    const projector = projectors.get(def)
-    if (!projector) {
-      throw new Error(`Projector not found for event: ${def.type}`)
-    }
-
-    // idempotent: need to ignore any events already logged
-
-    Database.transaction((tx) => {
-      projector(tx, event.data)
-
-      if (Flag.OPENCODE_EXPERIMENTAL_WORKSPACES) {
-        tx.insert(EventSequenceTable)
-          .values({
-            aggregate_id: event.aggregateID,
-            seq: event.seq,
-          })
-          .onConflictDoUpdate({
-            target: EventSequenceTable.aggregate_id,
-            set: { seq: event.seq },
-          })
-          .run()
-        tx.insert(EventTable)
-          .values({
-            id: event.id,
-            seq: event.seq,
-            aggregate_id: event.aggregateID,
-            type: versionedType(def.type, def.version),
-            data: event.data as Record<string, unknown>,
-          })
-          .run()
-      }
-
-      Database.effect(() => {
-        Bus.emit("event", {
-          def,
-          event,
-        })
-
-        if (options?.publish) {
-          const result = convertEvent(def.type, event.data)
-          if (result instanceof Promise) {
-            result.then((data) => {
-              ProjectBus.publish({ type: def.type, properties: def.schema }, data)
-            })
-          } else {
-            ProjectBus.publish({ type: def.type, properties: def.schema }, result)
-          }
-        }
-      })
-    })
-  }
-
   // TODO:
   //
   // * Support applying multiple events at one time. One transaction,
@@ -166,100 +319,22 @@ export namespace SyncEvent {
   // * when loading events from db, apply zod validation to ensure shape
 
   export function replay(event: SerializedEvent, options?: { publish: boolean }) {
-    const def = registry.get(event.type)
-    if (!def) {
-      throw new Error(`Unknown event type: ${event.type}`)
-    }
-
-    const row = Database.use((db) =>
-      db
-        .select({ seq: EventSequenceTable.seq })
-        .from(EventSequenceTable)
-        .where(eq(EventSequenceTable.aggregate_id, event.aggregateID))
-        .get(),
-    )
-
-    const latest = row?.seq ?? -1
-    if (event.seq <= latest) {
-      return
-    }
-
-    const expected = latest + 1
-    if (event.seq !== expected) {
-      throw new Error(`Sequence mismatch for aggregate "${event.aggregateID}": expected ${expected}, got ${event.seq}`)
-    }
-
-    process(def, event, { publish: !!options?.publish })
+    return runSync((svc) => svc.replay(event, options))
   }
 
   export function run<Def extends Definition>(def: Def, data: Event<Def>["data"], options?: { publish?: boolean }) {
-    const agg = (data as Record<string, string>)[def.aggregate]
-    // This should never happen: we've enforced it via typescript in
-    // the definition
-    if (agg == null) {
-      throw new Error(`SyncEvent.run: "${def.aggregate}" required but not found: ${JSON.stringify(data)}`)
-    }
-
-    if (def.version !== versions.get(def.type)) {
-      throw new Error(`SyncEvent.run: running old versions of events is not allowed: ${def.type}`)
-    }
-
-    const { publish = true } = options || {}
-
-    // Note that this is an "immediate" transaction which is critical.
-    // We need to make sure we can safely read and write with nothing
-    // else changing the data from under us
-    Database.transaction(
-      (tx) => {
-        const id = EventID.ascending()
-        const row = tx
-          .select({ seq: EventSequenceTable.seq })
-          .from(EventSequenceTable)
-          .where(eq(EventSequenceTable.aggregate_id, agg))
-          .get()
-        const seq = row?.seq != null ? row.seq + 1 : 0
-
-        const event = { id, seq, aggregateID: agg, data }
-        process(def, event, { publish })
-      },
-      {
-        behavior: "immediate",
-      },
-    )
+    return runSync((svc) => svc.run(def, data, options))
   }
 
   export function remove(aggregateID: string) {
-    Database.transaction((tx) => {
-      tx.delete(EventSequenceTable).where(eq(EventSequenceTable.aggregate_id, aggregateID)).run()
-      tx.delete(EventTable).where(eq(EventTable.aggregate_id, aggregateID)).run()
-    })
+    return runSync((svc) => svc.remove(aggregateID))
   }
 
   export function subscribeAll(handler: (event: { def: Definition; event: Event }) => void) {
-    Bus.on("event", handler)
-    return () => Bus.off("event", handler)
+    return runSync((svc) => svc.subscribeAll(handler))
   }
 
   export function payloads() {
-    return z
-      .union(
-        registry
-          .entries()
-          .map(([type, def]) => {
-            return z
-              .object({
-                type: z.literal(type),
-                aggregate: z.literal(def.aggregate),
-                data: def.schema,
-              })
-              .meta({
-                ref: "SyncEvent" + "." + def.type,
-              })
-          })
-          .toArray() as any,
-      )
-      .meta({
-        ref: "SyncEvent",
-      })
+    return runSync((svc) => Effect.sync(() => svc.payloads()))
   }
 }