Преглед изворни кода

fix(core): switch /event route to effect and implement queue

James Long пре 4 недеља
родитељ
комит
66ab7332de
2 измењених фајлова са 93 додато и 61 уклоњено
  1. 91 0
      packages/opencode/src/server/routes/event.ts
  2. 2 61
      packages/opencode/src/server/server.ts

+ 91 - 0
packages/opencode/src/server/routes/event.ts

@@ -0,0 +1,91 @@
+import { Hono } from "hono"
+import { describeRoute, resolver } from "hono-openapi"
+import { streamSSE } from "hono/streaming"
+import { Effect, Queue, Stream } from "effect"
+import { Log } from "@/util/log"
+import { BusEvent } from "@/bus/bus-event"
+import { Bus } from "@/bus"
+import { lazy } from "../../util/lazy"
+
+const log = Log.create({ service: "server" })
+
+export const EventRoutes = lazy(() =>
+  new Hono().get(
+    "/event",
+    describeRoute({
+      summary: "Subscribe to events",
+      description: "Get events",
+      operationId: "event.subscribe",
+      responses: {
+        200: {
+          description: "Event stream",
+          content: {
+            "text/event-stream": {
+              schema: resolver(BusEvent.payloads()),
+            },
+          },
+        },
+      },
+    }),
+    async (c) => {
+      log.info("event connected")
+      c.header("X-Accel-Buffering", "no")
+      c.header("X-Content-Type-Options", "nosniff")
+      return streamSSE(c, async (stream) => {
+        await Effect.runPromise(
+          Stream.callback<string>((q) =>
+            Effect.acquireRelease(
+              Effect.sync(() => {
+                stream.onAbort(() => {
+                  Queue.endUnsafe(q)
+                })
+
+                Queue.offerUnsafe(
+                  q,
+                  JSON.stringify({
+                    type: "server.connected",
+                    properties: {},
+                  }),
+                )
+
+                const unsub = Bus.subscribeAll((event) => {
+                  Queue.offerUnsafe(q, JSON.stringify(event))
+                  if (event.type === Bus.InstanceDisposed.type) {
+                    Queue.endUnsafe(q)
+                  }
+                })
+
+                // Send heartbeat every 10s to prevent stalled proxy streams.
+                const heartbeat = setInterval(() => {
+                  Queue.offerUnsafe(
+                    q,
+                    JSON.stringify({
+                      type: "server.heartbeat",
+                      properties: {},
+                    }),
+                  )
+                }, 10_000)
+
+                return { heartbeat, unsub }
+              }),
+              (x) =>
+                Effect.sync(() => {
+                  clearInterval(x.heartbeat)
+                  x.unsub()
+                  Queue.endUnsafe(q)
+                  log.info("event disconnected")
+                }),
+            ),
+          ).pipe(
+            Stream.runForEach((data) =>
+              Effect.tryPromise({
+                try: () => stream.writeSSE({ data }),
+                catch: () => {},
+              }),
+            ),
+          ),
+        )
+      })
+    },
+  ),
+)

+ 2 - 61
packages/opencode/src/server/server.ts

@@ -1,10 +1,7 @@
-import { BusEvent } from "@/bus/bus-event"
-import { Bus } from "@/bus"
 import { Log } from "../util/log"
 import { describeRoute, generateSpecs, validator, resolver, openAPIRouteHandler } from "hono-openapi"
 import { Hono } from "hono"
 import { cors } from "hono/cors"
-import { streamSSE } from "hono/streaming"
 import { proxy } from "hono/proxy"
 import { basicAuth } from "hono/basic-auth"
 import z from "zod"
@@ -34,6 +31,7 @@ import { FileRoutes } from "./routes/file"
 import { ConfigRoutes } from "./routes/config"
 import { ExperimentalRoutes } from "./routes/experimental"
 import { ProviderRoutes } from "./routes/provider"
+import { EventRoutes } from "./routes/event"
 import { InstanceBootstrap } from "../project/bootstrap"
 import { NotFoundError } from "../storage/db"
 import type { ContentfulStatusCode } from "hono/utils/http-status"
@@ -251,6 +249,7 @@ export namespace Server {
       .route("/question", QuestionRoutes())
       .route("/provider", ProviderRoutes())
       .route("/", FileRoutes())
+      .route("/", EventRoutes())
       .route("/mcp", McpRoutes())
       .route("/tui", TuiRoutes())
       .post(
@@ -498,64 +497,6 @@ export namespace Server {
           return c.json(await Format.status())
         },
       )
-      .get(
-        "/event",
-        describeRoute({
-          summary: "Subscribe to events",
-          description: "Get events",
-          operationId: "event.subscribe",
-          responses: {
-            200: {
-              description: "Event stream",
-              content: {
-                "text/event-stream": {
-                  schema: resolver(BusEvent.payloads()),
-                },
-              },
-            },
-          },
-        }),
-        async (c) => {
-          log.info("event connected")
-          c.header("X-Accel-Buffering", "no")
-          c.header("X-Content-Type-Options", "nosniff")
-          return streamSSE(c, async (stream) => {
-            stream.writeSSE({
-              data: JSON.stringify({
-                type: "server.connected",
-                properties: {},
-              }),
-            })
-            const unsub = Bus.subscribeAll(async (event) => {
-              await stream.writeSSE({
-                data: JSON.stringify(event),
-              })
-              if (event.type === Bus.InstanceDisposed.type) {
-                stream.close()
-              }
-            })
-
-            // Send heartbeat every 10s to prevent stalled proxy streams.
-            const heartbeat = setInterval(() => {
-              stream.writeSSE({
-                data: JSON.stringify({
-                  type: "server.heartbeat",
-                  properties: {},
-                }),
-              })
-            }, 10_000)
-
-            await new Promise<void>((resolve) => {
-              stream.onAbort(() => {
-                clearInterval(heartbeat)
-                unsub()
-                resolve()
-                log.info("event disconnected")
-              })
-            })
-          })
-        },
-      )
       .all("/*", async (c) => {
         const path = c.req.path