Răsfoiți Sursa

fix(sync): preserve shared state and caller semantics

Kit Langton 1 săptămână în urmă
părinte
comite
c3eb2418ca

+ 30 - 30
packages/opencode/src/session/index.ts

@@ -366,11 +366,12 @@ export namespace Session {
   const db = <T>(fn: (d: Parameters<typeof Database.use>[0] extends (trx: infer D) => any ? D : never) => T) =>
     Effect.sync(() => Database.use(fn))
 
-  export const layer: Layer.Layer<Service, never, Bus.Service | Storage.Service> = Layer.effect(
+  export const layer: Layer.Layer<Service, never, Bus.Service | Storage.Service | SyncEvent.Service> = Layer.effect(
     Service,
     Effect.gen(function* () {
       const bus = yield* Bus.Service
       const storage = yield* Storage.Service
+      const sync = yield* SyncEvent.Service
 
       const createNext = Effect.fn("Session.createNext")(function* (input: {
         id?: SessionID
@@ -398,7 +399,7 @@ export namespace Session {
         }
         log.info("created", result)
 
-        yield* Effect.sync(() => SyncEvent.run(Event.Created, { sessionID: result.id, info: result }))
+        yield* sync.run(Event.Created, { sessionID: result.id, info: result })
 
         if (!Flag.OPENCODE_EXPERIMENTAL_WORKSPACES) {
           // This only exist for backwards compatibility. We should not be
@@ -446,10 +447,12 @@ export namespace Session {
             Effect.catchCause(() => Effect.succeed(false)),
           )
 
-          yield* Effect.sync(() => {
-            SyncEvent.run(Event.Deleted, { sessionID, info: session }, { publish: hasInstance })
-            SyncEvent.remove(sessionID)
-          })
+          if (hasInstance) {
+            yield* sync.run(Event.Deleted, { sessionID, info: session }, { publish: true })
+          } else {
+            yield* Effect.sync(() => SyncEvent.run(Event.Deleted, { sessionID, info: session }, { publish: false }))
+          }
+          yield* sync.remove(sessionID)
         } catch (e) {
           log.error(e)
         }
@@ -457,19 +460,17 @@ export namespace Session {
 
       const updateMessage = <T extends MessageV2.Info>(msg: T): Effect.Effect<T> =>
         Effect.gen(function* () {
-          yield* Effect.sync(() => SyncEvent.run(MessageV2.Event.Updated, { sessionID: msg.sessionID, info: msg }))
+          yield* sync.run(MessageV2.Event.Updated, { sessionID: msg.sessionID, info: msg })
           return msg
         }).pipe(Effect.withSpan("Session.updateMessage"))
 
       const updatePart = <T extends MessageV2.Part>(part: T): Effect.Effect<T> =>
         Effect.gen(function* () {
-          yield* Effect.sync(() =>
-            SyncEvent.run(MessageV2.Event.PartUpdated, {
-              sessionID: part.sessionID,
-              part: structuredClone(part),
-              time: Date.now(),
-            }),
-          )
+          yield* sync.run(MessageV2.Event.PartUpdated, {
+            sessionID: part.sessionID,
+            part: structuredClone(part),
+            time: Date.now(),
+          })
           return part
         }).pipe(Effect.withSpan("Session.updatePart"))
 
@@ -549,8 +550,7 @@ export namespace Session {
         return session
       })
 
-      const patch = (sessionID: SessionID, info: Patch) =>
-        Effect.sync(() => SyncEvent.run(Event.Updated, { sessionID, info }))
+      const patch = (sessionID: SessionID, info: Patch) => sync.run(Event.Updated, { sessionID, info })
 
       const touch = Effect.fn("Session.touch")(function* (sessionID: SessionID) {
         yield* patch(sessionID, { time: { updated: Date.now() } })
@@ -607,12 +607,10 @@ export namespace Session {
         sessionID: SessionID
         messageID: MessageID
       }) {
-        yield* Effect.sync(() =>
-          SyncEvent.run(MessageV2.Event.Removed, {
-            sessionID: input.sessionID,
-            messageID: input.messageID,
-          }),
-        )
+        yield* sync.run(MessageV2.Event.Removed, {
+          sessionID: input.sessionID,
+          messageID: input.messageID,
+        })
         return input.messageID
       })
 
@@ -621,13 +619,11 @@ export namespace Session {
         messageID: MessageID
         partID: PartID
       }) {
-        yield* Effect.sync(() =>
-          SyncEvent.run(MessageV2.Event.PartRemoved, {
-            sessionID: input.sessionID,
-            messageID: input.messageID,
-            partID: input.partID,
-          }),
-        )
+        yield* sync.run(MessageV2.Event.PartRemoved, {
+          sessionID: input.sessionID,
+          messageID: input.messageID,
+          partID: input.partID,
+        })
         return input.partID
       })
 
@@ -678,7 +674,11 @@ export namespace Session {
     }),
   )
 
-  export const defaultLayer = layer.pipe(Layer.provide(Bus.layer), Layer.provide(Storage.defaultLayer))
+  export const defaultLayer = layer.pipe(
+    Layer.provide(Bus.layer),
+    Layer.provide(Storage.defaultLayer),
+    Layer.provide(SyncEvent.defaultLayer),
+  )
 
   const { runPromise } = makeRuntime(Service, defaultLayer)
 

+ 4 - 2
packages/opencode/src/session/revert.ts

@@ -40,6 +40,7 @@ export namespace SessionRevert {
       const bus = yield* Bus.Service
       const summary = yield* SessionSummary.Service
       const state = yield* SessionRunState.Service
+      const sync = yield* SyncEvent.Service
 
       const revert = Effect.fn("SessionRevert.revert")(function* (input: RevertInput) {
         yield* state.assertNotBusy(input.sessionID)
@@ -123,7 +124,7 @@ export namespace SessionRevert {
           remove.push(msg)
         }
         for (const msg of remove) {
-          SyncEvent.run(MessageV2.Event.Removed, {
+          yield* sync.run(MessageV2.Event.Removed, {
             sessionID,
             messageID: msg.info.id,
           })
@@ -135,7 +136,7 @@ export namespace SessionRevert {
             const removeParts = target.parts.slice(idx)
             target.parts = target.parts.slice(0, idx)
             for (const part of removeParts) {
-              SyncEvent.run(MessageV2.Event.PartRemoved, {
+              yield* sync.run(MessageV2.Event.PartRemoved, {
                 sessionID,
                 messageID: target.info.id,
                 partID: part.id,
@@ -158,6 +159,7 @@ export namespace SessionRevert {
       Layer.provide(Storage.defaultLayer),
       Layer.provide(Bus.layer),
       Layer.provide(SessionSummary.defaultLayer),
+      Layer.provide(SyncEvent.defaultLayer),
     ),
   )
 

+ 4 - 4
packages/opencode/src/share/session.ts

@@ -24,20 +24,19 @@ export namespace SessionShare {
       const session = yield* Session.Service
       const shareNext = yield* ShareNext.Service
       const scope = yield* Scope.Scope
+      const sync = yield* SyncEvent.Service
 
       const share = Effect.fn("SessionShare.share")(function* (sessionID: SessionID) {
         const conf = yield* cfg.get()
         if (conf.share === "disabled") throw new Error("Sharing is disabled in configuration")
         const result = yield* shareNext.create(sessionID)
-        yield* Effect.sync(() =>
-          SyncEvent.run(Session.Event.Updated, { sessionID, info: { share: { url: result.url } } }),
-        )
+        yield* sync.run(Session.Event.Updated, { sessionID, info: { share: { url: result.url } } })
         return result
       })
 
       const unshare = Effect.fn("SessionShare.unshare")(function* (sessionID: SessionID) {
         yield* shareNext.remove(sessionID)
-        yield* Effect.sync(() => SyncEvent.run(Session.Event.Updated, { sessionID, info: { share: { url: null } } }))
+        yield* sync.run(Session.Event.Updated, { sessionID, info: { share: { url: null } } })
       })
 
       const create = Effect.fn("SessionShare.create")(function* (input?: Parameters<typeof Session.create>[0]) {
@@ -57,6 +56,7 @@ export namespace SessionShare {
     Layer.provide(ShareNext.defaultLayer),
     Layer.provide(Session.defaultLayer),
     Layer.provide(Config.defaultLayer),
+    Layer.provide(SyncEvent.defaultLayer),
   )
 
   const { runPromise } = makeRuntime(Service, defaultLayer)

+ 103 - 104
packages/opencode/src/sync/index.ts

@@ -2,8 +2,8 @@ import z from "zod"
 import type { ZodObject } from "zod"
 import { EventEmitter } from "events"
 import { Context, Effect, Layer } from "effect"
-import { makeRuntime } from "@/effect/run-service"
 import { InstanceState } from "@/effect/instance-state"
+import { makeRuntime } from "@/effect/run-service"
 import { Database, eq } from "@/storage/db"
 import { Bus as ProjectBus } from "@/bus"
 import { BusEvent } from "@/bus/bus-event"
@@ -42,6 +42,9 @@ export namespace SyncEvent {
   export const registry = new Map<string, Definition>()
   const versions = new Map<string, number>()
   let frozen = false
+  let projectors: Map<Definition, ProjectorFunc> | undefined
+  let convert: Convert = (_, data) => data as Record<string, unknown>
+  const bus = new EventEmitter<{ event: [Payload] }>()
 
   export interface Interface {
     readonly reset: () => Effect.Effect<void>
@@ -62,10 +65,6 @@ export namespace SyncEvent {
   export const layer = Layer.effect(
     Service,
     Effect.gen(function* () {
-      let projectors: Map<Definition, ProjectorFunc> | undefined
-      let convert: Convert = (_, data) => data as Record<string, unknown>
-      const bus = new EventEmitter<{ event: [Payload] }>()
-
       const reset = Effect.fn("SyncEvent.reset")(() =>
         Effect.sync(() => {
           frozen = false
@@ -94,13 +93,7 @@ export namespace SyncEvent {
         }),
       )
 
-      const process = Effect.fn("SyncEvent.process")(function* <Def extends Definition>(
-        def: Def,
-        event: Event<Def>,
-        options: { publish: boolean },
-      ) {
-        const ctx = yield* InstanceState.context
-
+      function process<Def extends Definition>(def: Def, event: Event<Def>, options: { publish: boolean }) {
         if (projectors == null) {
           throw new Error("No projectors available. Call `SyncEvent.init` to install projectors")
         }
@@ -110,57 +103,55 @@ export namespace SyncEvent {
           throw new Error(`Projector not found for event: ${def.type}`)
         }
 
-        yield* Effect.sync(() => {
-          // idempotent: need to ignore any events already logged
-          Database.transaction((tx) => {
-            projector(tx, event.data)
-
-            if (Flag.OPENCODE_EXPERIMENTAL_WORKSPACES) {
-              tx.insert(EventSequenceTable)
-                .values({
-                  aggregate_id: event.aggregateID,
-                  seq: event.seq,
-                })
-                .onConflictDoUpdate({
-                  target: EventSequenceTable.aggregate_id,
-                  set: { seq: event.seq },
-                })
-                .run()
-              tx.insert(EventTable)
-                .values({
-                  id: event.id,
-                  seq: event.seq,
-                  aggregate_id: event.aggregateID,
-                  type: versionedType(def.type, def.version),
-                  data: event.data as Record<string, unknown>,
-                })
-                .run()
-            }
-
-            Database.effect(() => {
-              Instance.restore(ctx, () => {
-                bus.emit("event", { def, event })
-
-                if (!options.publish) return
-
-                const result = convert(def.type, event.data)
-                if (result instanceof Promise) {
-                  void result.then((data) => {
-                    Instance.restore(ctx, () => {
-                      void ProjectBus.publish({ type: def.type, properties: def.schema }, data)
-                    })
-                  })
-                  return
-                }
+        // idempotent: need to ignore any events already logged
+        Database.transaction((tx) => {
+          projector(tx, event.data)
 
-                void ProjectBus.publish({ type: def.type, properties: def.schema }, result)
+          if (Flag.OPENCODE_EXPERIMENTAL_WORKSPACES) {
+            tx.insert(EventSequenceTable)
+              .values({
+                aggregate_id: event.aggregateID,
+                seq: event.seq,
               })
-            })
-          })
+              .onConflictDoUpdate({
+                target: EventSequenceTable.aggregate_id,
+                set: { seq: event.seq },
+              })
+              .run()
+            tx.insert(EventTable)
+              .values({
+                id: event.id,
+                seq: event.seq,
+                aggregate_id: event.aggregateID,
+                type: versionedType(def.type, def.version),
+                data: event.data as Record<string, unknown>,
+              })
+              .run()
+          }
+
+          Database.effect(
+            InstanceState.bind(() => {
+              bus.emit("event", { def, event })
+
+              if (!options.publish) return
+
+              const result = convert(def.type, event.data)
+              if (result instanceof Promise) {
+                void result.then(
+                  InstanceState.bind((data) => {
+                    void ProjectBus.publish({ type: def.type, properties: def.schema }, data)
+                  }),
+                )
+                return
+              }
+
+              void ProjectBus.publish({ type: def.type, properties: def.schema }, result)
+            }),
+          )
         })
-      })
+      }
 
-      const replay = Effect.fn("SyncEvent.replay")(function* (event: SerializedEvent, options?: { publish: boolean }) {
+      function replay(event: SerializedEvent, options?: { publish: boolean }) {
         const def = registry.get(event.type)
         if (!def) {
           throw new Error(`Unknown event type: ${event.type}`)
@@ -184,14 +175,10 @@ export namespace SyncEvent {
           )
         }
 
-        yield* process(def, event, { publish: !!options?.publish })
-      })
+        process(def, event, { publish: !!options?.publish })
+      }
 
-      const run = Effect.fn("SyncEvent.run")(function* <Def extends Definition>(
-        def: Def,
-        data: Event<Def>["data"],
-        options?: { publish?: boolean },
-      ) {
+      function run<Def extends Definition>(def: Def, data: Event<Def>["data"], options?: { publish?: boolean }) {
         const agg = (data as Record<string, string>)[def.aggregate]
         if (agg == null) {
           throw new Error(`SyncEvent.run: "${def.aggregate}" required but not found: ${JSON.stringify(data)}`)
@@ -203,45 +190,39 @@ export namespace SyncEvent {
 
         const publish = options?.publish ?? true
 
-        yield* Effect.sync(() => {
-          // Note that this is an "immediate" transaction which is critical.
-          // We need to make sure we can safely read and write with nothing
-          // else changing the data from under us
-          Database.transaction(
-            (tx) => {
-              const id = EventID.ascending()
-              const row = tx
-                .select({ seq: EventSequenceTable.seq })
-                .from(EventSequenceTable)
-                .where(eq(EventSequenceTable.aggregate_id, agg))
-                .get()
-              const seq = row?.seq != null ? row.seq + 1 : 0
-
-              const event = { id, seq, aggregateID: agg, data }
-              Effect.runSync(process(def, event, { publish }))
-            },
-            {
-              behavior: "immediate",
-            },
-          )
-        })
-      })
+        // Note that this is an "immediate" transaction which is critical.
+        // We need to make sure we can safely read and write with nothing
+        // else changing the data from under us
+        Database.transaction(
+          (tx) => {
+            const id = EventID.ascending()
+            const row = tx
+              .select({ seq: EventSequenceTable.seq })
+              .from(EventSequenceTable)
+              .where(eq(EventSequenceTable.aggregate_id, agg))
+              .get()
+            const seq = row?.seq != null ? row.seq + 1 : 0
+
+            const event = { id, seq, aggregateID: agg, data }
+            process(def, event, { publish })
+          },
+          {
+            behavior: "immediate",
+          },
+        )
+      }
 
-      const remove = Effect.fn("SyncEvent.remove")((aggregateID: string) =>
-        Effect.sync(() => {
-          Database.transaction((tx) => {
-            tx.delete(EventSequenceTable).where(eq(EventSequenceTable.aggregate_id, aggregateID)).run()
-            tx.delete(EventTable).where(eq(EventTable.aggregate_id, aggregateID)).run()
-          })
-        }),
-      )
+      function remove(aggregateID: string) {
+        Database.transaction((tx) => {
+          tx.delete(EventSequenceTable).where(eq(EventSequenceTable.aggregate_id, aggregateID)).run()
+          tx.delete(EventTable).where(eq(EventTable.aggregate_id, aggregateID)).run()
+        })
+      }
 
-      const subscribeAll = Effect.fn("SyncEvent.subscribeAll")((handler: (event: Payload) => void) =>
-        Effect.sync(() => {
-          bus.on("event", handler)
-          return () => bus.off("event", handler)
-        }),
-      )
+      function subscribeAll(handler: (event: Payload) => void) {
+        bus.on("event", handler)
+        return () => bus.off("event", handler)
+      }
 
       function payloads() {
         return z
@@ -266,7 +247,25 @@ export namespace SyncEvent {
           })
       }
 
-      return Service.of({ reset, init, replay, run, remove, subscribeAll, payloads })
+      return Service.of({
+        reset,
+        init,
+        replay: (event, options) =>
+          Effect.gen(function* () {
+            const ctx = yield* InstanceState.context
+            return yield* Effect.sync(() => Instance.restore(ctx, () => replay(event, options)))
+          }),
+        run: (def, data, options) =>
+          options?.publish === false
+            ? Effect.sync(() => run(def, data, options))
+            : Effect.gen(function* () {
+                const ctx = yield* InstanceState.context
+                return yield* Effect.sync(() => Instance.restore(ctx, () => run(def, data, options)))
+              }),
+        remove: (aggregateID) => Effect.sync(() => remove(aggregateID)),
+        subscribeAll: (handler) => Effect.sync(() => subscribeAll(handler)),
+        payloads,
+      })
     }),
   )