Przeglądaj źródła

refactor(session): effectify SessionSummary service (#20142)

Kit Langton 2 tygodni temu
rodzic
commit
954a6ca88e

+ 2 - 2
packages/opencode/src/server/routes/session.ts

@@ -436,13 +436,13 @@ export const SessionRoutes = lazy(() =>
       validator(
         "param",
         z.object({
-          sessionID: SessionSummary.diff.schema.shape.sessionID,
+          sessionID: SessionSummary.DiffInput.shape.sessionID,
         }),
       ),
       validator(
         "query",
         z.object({
-          messageID: SessionSummary.diff.schema.shape.messageID,
+          messageID: SessionSummary.DiffInput.shape.messageID,
         }),
       ),
       async (c) => {

+ 4 - 6
packages/opencode/src/session/processor.ts

@@ -294,12 +294,10 @@ export namespace SessionProcessor {
                 }
                 ctx.snapshot = undefined
               }
-              yield* Effect.promise(() =>
-                SessionSummary.summarize({
-                  sessionID: ctx.sessionID,
-                  messageID: ctx.assistantMessage.parentID,
-                }),
-              ).pipe(Effect.ignoreCause({ log: true, message: "session summary failed" }), Effect.forkDetach)
+              SessionSummary.summarize({
+                sessionID: ctx.sessionID,
+                messageID: ctx.assistantMessage.parentID,
+              })
               if (
                 !ctx.assistantMessage.summary &&
                 isOverflow({ cfg: yield* config.get(), tokens: usage.tokens, model: ctx.model })

+ 112 - 96
packages/opencode/src/session/summary.ts

@@ -1,14 +1,12 @@
-import { fn } from "@/util/fn"
 import z from "zod"
+import { Effect, Layer, ServiceMap } from "effect"
+import { makeRuntime } from "@/effect/run-service"
+import { Bus } from "@/bus"
+import { Snapshot } from "@/snapshot"
+import { Storage } from "@/storage/storage"
 import { Session } from "."
-
 import { MessageV2 } from "./message-v2"
 import { SessionID, MessageID } from "./schema"
-import { Snapshot } from "@/snapshot"
-
-import { Storage } from "@/storage/storage"
-import { Bus } from "@/bus"
-import { NotFoundError } from "@/storage/db"
 
 export namespace SessionSummary {
   function unquoteGitPath(input: string) {
@@ -67,103 +65,121 @@ export namespace SessionSummary {
     return Buffer.from(bytes).toString()
   }
 
-  export const summarize = fn(
-    z.object({
-      sessionID: SessionID.zod,
-      messageID: MessageID.zod,
-    }),
-    async (input) => {
-      await Session.messages({ sessionID: input.sessionID })
-        .then((all) =>
-          Promise.all([
-            summarizeSession({ sessionID: input.sessionID, messages: all }),
-            summarizeMessage({ messageID: input.messageID, messages: all }),
-          ]),
-        )
-        .catch((err) => {
-          if (NotFoundError.isInstance(err)) return
-          throw err
+  export interface Interface {
+    readonly summarize: (input: { sessionID: SessionID; messageID: MessageID }) => Effect.Effect<void>
+    readonly diff: (input: { sessionID: SessionID; messageID?: MessageID }) => Effect.Effect<Snapshot.FileDiff[]>
+    readonly computeDiff: (input: { messages: MessageV2.WithParts[] }) => Effect.Effect<Snapshot.FileDiff[]>
+  }
+
+  export class Service extends ServiceMap.Service<Service, Interface>()("@opencode/SessionSummary") {}
+
+  export const layer = Layer.effect(
+    Service,
+    Effect.gen(function* () {
+      const sessions = yield* Session.Service
+      const snapshot = yield* Snapshot.Service
+      const storage = yield* Storage.Service
+      const bus = yield* Bus.Service
+
+      const computeDiff = Effect.fn("SessionSummary.computeDiff")(function* (input: {
+        messages: MessageV2.WithParts[]
+      }) {
+        let from: string | undefined
+        let to: string | undefined
+        for (const item of input.messages) {
+          if (!from) {
+            for (const part of item.parts) {
+              if (part.type === "step-start" && part.snapshot) {
+                from = part.snapshot
+                break
+              }
+            }
+          }
+          for (const part of item.parts) {
+            if (part.type === "step-finish" && part.snapshot) to = part.snapshot
+          }
+        }
+        if (from && to) return yield* snapshot.diffFull(from, to)
+        return []
+      })
+
+      const summarize = Effect.fn("SessionSummary.summarize")(function* (input: {
+        sessionID: SessionID
+        messageID: MessageID
+      }) {
+        const all = yield* sessions.messages({ sessionID: input.sessionID })
+        if (!all.length) return
+
+        const diffs = yield* computeDiff({ messages: all })
+        yield* sessions.setSummary({
+          sessionID: input.sessionID,
+          summary: {
+            additions: diffs.reduce((sum, x) => sum + x.additions, 0),
+            deletions: diffs.reduce((sum, x) => sum + x.deletions, 0),
+            files: diffs.length,
+          },
         })
-    },
-  )
+        yield* storage.write(["session_diff", input.sessionID], diffs).pipe(Effect.ignore)
+        yield* bus.publish(Session.Event.Diff, { sessionID: input.sessionID, diff: diffs })
 
-  async function summarizeSession(input: { sessionID: SessionID; messages: MessageV2.WithParts[] }) {
-    const diffs = await computeDiff({ messages: input.messages })
-    await Session.setSummary({
-      sessionID: input.sessionID,
-      summary: {
-        additions: diffs.reduce((sum, x) => sum + x.additions, 0),
-        deletions: diffs.reduce((sum, x) => sum + x.deletions, 0),
-        files: diffs.length,
-      },
-    })
-    await Storage.write(["session_diff", input.sessionID], diffs)
-    Bus.publish(Session.Event.Diff, {
-      sessionID: input.sessionID,
-      diff: diffs,
-    })
-  }
+        const messages = all.filter(
+          (m) =>
+            m.info.id === input.messageID || (m.info.role === "assistant" && m.info.parentID === input.messageID),
+        )
+        const target = messages.find((m) => m.info.id === input.messageID)
+        if (!target || target.info.role !== "user") return
+        const msgDiffs = yield* computeDiff({ messages })
+        target.info.summary = { ...target.info.summary, diffs: msgDiffs }
+        yield* sessions.updateMessage(target.info)
+      })
 
-  async function summarizeMessage(input: { messageID: string; messages: MessageV2.WithParts[] }) {
-    const messages = input.messages.filter(
-      (m) => m.info.id === input.messageID || (m.info.role === "assistant" && m.info.parentID === input.messageID),
-    )
-    const msgWithParts = messages.find((m) => m.info.id === input.messageID)
-    if (!msgWithParts || msgWithParts.info.role !== "user") return
-    const userMsg = msgWithParts.info
-    const diffs = await computeDiff({ messages })
-    userMsg.summary = {
-      ...userMsg.summary,
-      diffs,
-    }
-    await Session.updateMessage(userMsg)
-  }
+      const diff = Effect.fn("SessionSummary.diff")(function* (input: {
+        sessionID: SessionID
+        messageID?: MessageID
+      }) {
+        const diffs = yield* storage.read<Snapshot.FileDiff[]>(["session_diff", input.sessionID]).pipe(
+          Effect.catch(() => Effect.succeed([] as Snapshot.FileDiff[])),
+        )
+        const next = diffs.map((item) => {
+          const file = unquoteGitPath(item.file)
+          if (file === item.file) return item
+          return { ...item, file }
+        })
+        const changed = next.some((item, i) => item.file !== diffs[i]?.file)
+        if (changed) yield* storage.write(["session_diff", input.sessionID], next).pipe(Effect.ignore)
+        return next
+      })
 
-  export const diff = fn(
-    z.object({
-      sessionID: SessionID.zod,
-      messageID: MessageID.zod.optional(),
+      return Service.of({ summarize, diff, computeDiff })
     }),
-    async (input) => {
-      const diffs = await Storage.read<Snapshot.FileDiff[]>(["session_diff", input.sessionID]).catch(() => [])
-      const next = diffs.map((item) => {
-        const file = unquoteGitPath(item.file)
-        if (file === item.file) return item
-        return {
-          ...item,
-          file,
-        }
-      })
-      const changed = next.some((item, i) => item.file !== diffs[i]?.file)
-      if (changed) Storage.write(["session_diff", input.sessionID], next).catch(() => {})
-      return next
-    },
   )
 
-  export async function computeDiff(input: { messages: MessageV2.WithParts[] }) {
-    let from: string | undefined
-    let to: string | undefined
-
-    // scan assistant messages to find earliest from and latest to
-    // snapshot
-    for (const item of input.messages) {
-      if (!from) {
-        for (const part of item.parts) {
-          if (part.type === "step-start" && part.snapshot) {
-            from = part.snapshot
-            break
-          }
-        }
-      }
+  export const defaultLayer = Layer.unwrap(
+    Effect.sync(() =>
+      layer.pipe(
+        Layer.provide(Session.defaultLayer),
+        Layer.provide(Snapshot.defaultLayer),
+        Layer.provide(Storage.defaultLayer),
+        Layer.provide(Bus.layer),
+      ),
+    ),
+  )
 
-      for (const part of item.parts) {
-        if (part.type === "step-finish" && part.snapshot) {
-          to = part.snapshot
-        }
-      }
-    }
+  const { runPromise } = makeRuntime(Service, defaultLayer)
 
-    if (from && to) return Snapshot.diffFull(from, to)
-    return []
+  export const summarize = (input: { sessionID: SessionID; messageID: MessageID }) =>
+    void runPromise((svc) => svc.summarize(input)).catch(() => {})
+
+  export const DiffInput = z.object({
+    sessionID: SessionID.zod,
+    messageID: MessageID.zod.optional(),
+  })
+
+  export async function diff(input: z.infer<typeof DiffInput>) {
+    return runPromise((svc) => svc.diff(input))
+  }
+
+  export async function computeDiff(input: { messages: MessageV2.WithParts[] }) {
+    return runPromise((svc) => svc.computeDiff(input))
   }
 }