瀏覽代碼

fix(core): use a queue to process events in event routes (#18259)

James Long 1 月之前
父節點
當前提交
0540751897

+ 2 - 2
packages/opencode/src/bus/index.ts

@@ -51,8 +51,8 @@ export namespace Bus {
     })
     const pending = []
     for (const key of [def.type, "*"]) {
-      const match = state().subscriptions.get(key)
-      for (const sub of match ?? []) {
+      const match = [...(state().subscriptions.get(key) ?? [])]
+      for (const sub of match) {
         pending.push(sub(payload))
       }
     }

+ 85 - 0
packages/opencode/src/server/routes/event.ts

@@ -0,0 +1,85 @@
+import { Hono } from "hono"
+import { describeRoute, resolver } from "hono-openapi"
+import { streamSSE } from "hono/streaming"
+import { Log } from "@/util/log"
+import { BusEvent } from "@/bus/bus-event"
+import { Bus } from "@/bus"
+import { lazy } from "../../util/lazy"
+import { AsyncQueue } from "../../util/queue"
+import { Instance } from "@/project/instance"
+
+const log = Log.create({ service: "server" })
+
+export const EventRoutes = lazy(() =>
+  new Hono().get(
+    "/event",
+    describeRoute({
+      summary: "Subscribe to events",
+      description: "Get events",
+      operationId: "event.subscribe",
+      responses: {
+        200: {
+          description: "Event stream",
+          content: {
+            "text/event-stream": {
+              schema: resolver(BusEvent.payloads()),
+            },
+          },
+        },
+      },
+    }),
+    async (c) => {
+      log.info("event connected")
+      c.header("X-Accel-Buffering", "no")
+      c.header("X-Content-Type-Options", "nosniff")
+      return streamSSE(c, async (stream) => {
+        const q = new AsyncQueue<string | null>()
+        let done = false
+
+        q.push(
+          JSON.stringify({
+            type: "server.connected",
+            properties: {},
+          }),
+        )
+
+        // Send heartbeat every 10s to prevent stalled proxy streams.
+        const heartbeat = setInterval(() => {
+          q.push(
+            JSON.stringify({
+              type: "server.heartbeat",
+              properties: {},
+            }),
+          )
+        }, 10_000)
+
+        const unsub = Bus.subscribeAll((event) => {
+          q.push(JSON.stringify(event))
+          if (event.type === Bus.InstanceDisposed.type) {
+            stop()
+          }
+        })
+
+        const stop = () => {
+          if (done) return
+          done = true
+          clearInterval(heartbeat)
+          unsub()
+          q.push(null)
+          log.info("event disconnected")
+        }
+
+        stream.onAbort(stop)
+
+        try {
+          for await (const data of q) {
+            if (data === null) return
+            await stream.writeSSE({ data })
+          }
+        } finally {
+          stop()
+        }
+      })
+    },
+  ),
+)

+ 34 - 20
packages/opencode/src/server/routes/global.ts

@@ -4,6 +4,7 @@ import { streamSSE } from "hono/streaming"
 import z from "zod"
 import { BusEvent } from "@/bus/bus-event"
 import { GlobalBus } from "@/bus/global"
+import { AsyncQueue } from "@/util/queue"
 import { Instance } from "../../project/instance"
 import { Installation } from "@/installation"
 import { Log } from "../../util/log"
@@ -69,41 +70,54 @@ export const GlobalRoutes = lazy(() =>
         c.header("X-Accel-Buffering", "no")
         c.header("X-Content-Type-Options", "nosniff")
         return streamSSE(c, async (stream) => {
-          stream.writeSSE({
-            data: JSON.stringify({
+          const q = new AsyncQueue<string | null>()
+          let done = false
+
+          q.push(
+            JSON.stringify({
               payload: {
                 type: "server.connected",
                 properties: {},
               },
             }),
-          })
-          async function handler(event: any) {
-            await stream.writeSSE({
-              data: JSON.stringify(event),
-            })
-          }
-          GlobalBus.on("event", handler)
+          )
 
           // Send heartbeat every 10s to prevent stalled proxy streams.
           const heartbeat = setInterval(() => {
-            stream.writeSSE({
-              data: JSON.stringify({
+            q.push(
+              JSON.stringify({
                 payload: {
                   type: "server.heartbeat",
                   properties: {},
                 },
               }),
-            })
+            )
           }, 10_000)
 
-          await new Promise<void>((resolve) => {
-            stream.onAbort(() => {
-              clearInterval(heartbeat)
-              GlobalBus.off("event", handler)
-              resolve()
-              log.info("global event disconnected")
-            })
-          })
+          async function handler(event: any) {
+            q.push(JSON.stringify(event))
+          }
+          GlobalBus.on("event", handler)
+
+          const stop = () => {
+            if (done) return
+            done = true
+            clearInterval(heartbeat)
+            GlobalBus.off("event", handler)
+            q.push(null)
+            log.info("event disconnected")
+          }
+
+          stream.onAbort(stop)
+
+          try {
+            for await (const data of q) {
+              if (data === null) return
+              await stream.writeSSE({ data })
+            }
+          } finally {
+            stop()
+          }
         })
       },
     )

+ 2 - 61
packages/opencode/src/server/server.ts

@@ -1,10 +1,7 @@
-import { BusEvent } from "@/bus/bus-event"
-import { Bus } from "@/bus"
 import { Log } from "../util/log"
 import { describeRoute, generateSpecs, validator, resolver, openAPIRouteHandler } from "hono-openapi"
 import { Hono } from "hono"
 import { cors } from "hono/cors"
-import { streamSSE } from "hono/streaming"
 import { proxy } from "hono/proxy"
 import { basicAuth } from "hono/basic-auth"
 import z from "zod"
@@ -34,6 +31,7 @@ import { FileRoutes } from "./routes/file"
 import { ConfigRoutes } from "./routes/config"
 import { ExperimentalRoutes } from "./routes/experimental"
 import { ProviderRoutes } from "./routes/provider"
+import { EventRoutes } from "./routes/event"
 import { InstanceBootstrap } from "../project/bootstrap"
 import { NotFoundError } from "../storage/db"
 import type { ContentfulStatusCode } from "hono/utils/http-status"
@@ -251,6 +249,7 @@ export namespace Server {
       .route("/question", QuestionRoutes())
       .route("/provider", ProviderRoutes())
       .route("/", FileRoutes())
+      .route("/", EventRoutes())
       .route("/mcp", McpRoutes())
       .route("/tui", TuiRoutes())
       .post(
@@ -498,64 +497,6 @@ export namespace Server {
           return c.json(await Format.status())
         },
       )
-      .get(
-        "/event",
-        describeRoute({
-          summary: "Subscribe to events",
-          description: "Get events",
-          operationId: "event.subscribe",
-          responses: {
-            200: {
-              description: "Event stream",
-              content: {
-                "text/event-stream": {
-                  schema: resolver(BusEvent.payloads()),
-                },
-              },
-            },
-          },
-        }),
-        async (c) => {
-          log.info("event connected")
-          c.header("X-Accel-Buffering", "no")
-          c.header("X-Content-Type-Options", "nosniff")
-          return streamSSE(c, async (stream) => {
-            stream.writeSSE({
-              data: JSON.stringify({
-                type: "server.connected",
-                properties: {},
-              }),
-            })
-            const unsub = Bus.subscribeAll(async (event) => {
-              await stream.writeSSE({
-                data: JSON.stringify(event),
-              })
-              if (event.type === Bus.InstanceDisposed.type) {
-                stream.close()
-              }
-            })
-
-            // Send heartbeat every 10s to prevent stalled proxy streams.
-            const heartbeat = setInterval(() => {
-              stream.writeSSE({
-                data: JSON.stringify({
-                  type: "server.heartbeat",
-                  properties: {},
-                }),
-              })
-            }, 10_000)
-
-            await new Promise<void>((resolve) => {
-              stream.onAbort(() => {
-                clearInterval(heartbeat)
-                unsub()
-                resolve()
-                log.info("event disconnected")
-              })
-            })
-          })
-        },
-      )
       .all("/*", async (c) => {
         const path = c.req.path