Browse Source

refactor(pty): remove async facade exports (#22305)

Kit Langton 5 ngày trước cách đây
mục cha
commit
6825b0bbc7

+ 0 - 31
packages/opencode/src/pty/index.ts

@@ -1,7 +1,6 @@
 import { BusEvent } from "@/bus/bus-event"
 import { Bus } from "@/bus"
 import { InstanceState } from "@/effect/instance-state"
-import { makeRuntime } from "@/effect/run-service"
 import { Instance } from "@/project/instance"
 import type { Proc } from "#pty"
 import z from "zod"
@@ -361,34 +360,4 @@ export namespace Pty {
   )
 
   export const defaultLayer = layer.pipe(Layer.provide(Bus.layer), Layer.provide(Plugin.defaultLayer))
-
-  const { runPromise } = makeRuntime(Service, defaultLayer)
-
-  export async function list() {
-    return runPromise((svc) => svc.list())
-  }
-
-  export async function get(id: PtyID) {
-    return runPromise((svc) => svc.get(id))
-  }
-
-  export async function write(id: PtyID, data: string) {
-    return runPromise((svc) => svc.write(id, data))
-  }
-
-  export async function connect(id: PtyID, ws: Socket, cursor?: number) {
-    return runPromise((svc) => svc.connect(id, ws, cursor))
-  }
-
-  export async function create(input: CreateInput) {
-    return runPromise((svc) => svc.create(input))
-  }
-
-  export async function update(id: PtyID, input: UpdateInput) {
-    return runPromise((svc) => svc.update(id, input))
-  }
-
-  export async function remove(id: PtyID) {
-    return runPromise((svc) => svc.remove(id))
-  }
 }

+ 56 - 8
packages/opencode/src/server/instance/pty.ts

@@ -1,7 +1,9 @@
 import { Hono, type MiddlewareHandler } from "hono"
 import { describeRoute, validator, resolver } from "hono-openapi"
 import type { UpgradeWebSocket } from "hono/ws"
+import { Effect } from "effect"
 import z from "zod"
+import { AppRuntime } from "@/effect/app-runtime"
 import { Pty } from "@/pty"
 import { PtyID } from "@/pty/schema"
 import { NotFoundError } from "../../storage/db"
@@ -27,7 +29,14 @@ export function PtyRoutes(upgradeWebSocket: UpgradeWebSocket) {
         },
       }),
       async (c) => {
-        return c.json(await Pty.list())
+        return c.json(
+          await AppRuntime.runPromise(
+            Effect.gen(function* () {
+              const pty = yield* Pty.Service
+              return yield* pty.list()
+            }),
+          ),
+        )
       },
     )
     .post(
@@ -50,7 +59,12 @@ export function PtyRoutes(upgradeWebSocket: UpgradeWebSocket) {
       }),
       validator("json", Pty.CreateInput),
       async (c) => {
-        const info = await Pty.create(c.req.valid("json"))
+        const info = await AppRuntime.runPromise(
+          Effect.gen(function* () {
+            const pty = yield* Pty.Service
+            return yield* pty.create(c.req.valid("json"))
+          }),
+        )
         return c.json(info)
       },
     )
@@ -74,7 +88,12 @@ export function PtyRoutes(upgradeWebSocket: UpgradeWebSocket) {
       }),
       validator("param", z.object({ ptyID: PtyID.zod })),
       async (c) => {
-        const info = await Pty.get(c.req.valid("param").ptyID)
+        const info = await AppRuntime.runPromise(
+          Effect.gen(function* () {
+            const pty = yield* Pty.Service
+            return yield* pty.get(c.req.valid("param").ptyID)
+          }),
+        )
         if (!info) {
           throw new NotFoundError({ message: "Session not found" })
         }
@@ -102,7 +121,12 @@ export function PtyRoutes(upgradeWebSocket: UpgradeWebSocket) {
       validator("param", z.object({ ptyID: PtyID.zod })),
       validator("json", Pty.UpdateInput),
       async (c) => {
-        const info = await Pty.update(c.req.valid("param").ptyID, c.req.valid("json"))
+        const info = await AppRuntime.runPromise(
+          Effect.gen(function* () {
+            const pty = yield* Pty.Service
+            return yield* pty.update(c.req.valid("param").ptyID, c.req.valid("json"))
+          }),
+        )
         return c.json(info)
       },
     )
@@ -126,7 +150,12 @@ export function PtyRoutes(upgradeWebSocket: UpgradeWebSocket) {
       }),
       validator("param", z.object({ ptyID: PtyID.zod })),
       async (c) => {
-        await Pty.remove(c.req.valid("param").ptyID)
+        await AppRuntime.runPromise(
+          Effect.gen(function* () {
+            const pty = yield* Pty.Service
+            yield* pty.remove(c.req.valid("param").ptyID)
+          }),
+        )
         return c.json(true)
       },
     )
@@ -150,6 +179,11 @@ export function PtyRoutes(upgradeWebSocket: UpgradeWebSocket) {
       }),
       validator("param", z.object({ ptyID: PtyID.zod })),
       upgradeWebSocket(async (c) => {
+        type Handler = {
+          onMessage: (message: string | ArrayBuffer) => void
+          onClose: () => void
+        }
+
         const id = PtyID.zod.parse(c.req.param("ptyID"))
         const cursor = (() => {
           const value = c.req.query("cursor")
@@ -158,8 +192,17 @@ export function PtyRoutes(upgradeWebSocket: UpgradeWebSocket) {
           if (!Number.isSafeInteger(parsed) || parsed < -1) return
           return parsed
         })()
-        let handler: Awaited<ReturnType<typeof Pty.connect>>
-        if (!(await Pty.get(id))) throw new Error("Session not found")
+        let handler: Handler | undefined
+        if (
+          !(await AppRuntime.runPromise(
+            Effect.gen(function* () {
+              const pty = yield* Pty.Service
+              return yield* pty.get(id)
+            }),
+          ))
+        ) {
+          throw new Error("Session not found")
+        }
 
         type Socket = {
           readyState: number
@@ -185,7 +228,12 @@ export function PtyRoutes(upgradeWebSocket: UpgradeWebSocket) {
               ws.close()
               return
             }
-            handler = await Pty.connect(id, socket, cursor)
+            handler = await AppRuntime.runPromise(
+              Effect.gen(function* () {
+                const pty = yield* Pty.Service
+                return yield* pty.connect(id, socket, cursor)
+              }),
+            )
             ready = true
             for (const msg of pending) handler?.onMessage(msg)
             pending.length = 0

+ 115 - 110
packages/opencode/test/pty/pty-output-isolation.test.ts

@@ -1,4 +1,6 @@
 import { describe, expect, test } from "bun:test"
+import { AppRuntime } from "../../src/effect/app-runtime"
+import { Effect } from "effect"
 import { Instance } from "../../src/project/instance"
 import { Pty } from "../../src/pty"
 import { tmpdir } from "../fixture/fixture"
@@ -10,48 +12,48 @@ describe("pty", () => {
 
     await Instance.provide({
       directory: dir.path,
-      fn: async () => {
-        const a = await Pty.create({ command: "cat", title: "a" })
-        const b = await Pty.create({ command: "cat", title: "b" })
-        try {
-          const outA: string[] = []
-          const outB: string[] = []
-
-          const ws = {
-            readyState: 1,
-            data: { events: { connection: "a" } },
-            send: (data: unknown) => {
-              outA.push(typeof data === "string" ? data : Buffer.from(data as Uint8Array).toString("utf8"))
-            },
-            close: () => {
-              // no-op (simulate abrupt drop)
-            },
-          }
-
-          // Connect "a" first with ws.
-          Pty.connect(a.id, ws as any)
-
-          // Now "reuse" the same ws object for another connection.
-          ws.data = { events: { connection: "b" } }
-          ws.send = (data: unknown) => {
-            outB.push(typeof data === "string" ? data : Buffer.from(data as Uint8Array).toString("utf8"))
-          }
-          Pty.connect(b.id, ws as any)
-
-          // Clear connect metadata writes.
-          outA.length = 0
-          outB.length = 0
-
-          // Output from a must never show up in b.
-          Pty.write(a.id, "AAA\n")
-          await sleep(100)
-
-          expect(outB.join("")).not.toContain("AAA")
-        } finally {
-          await Pty.remove(a.id)
-          await Pty.remove(b.id)
-        }
-      },
+      fn: () =>
+        AppRuntime.runPromise(
+          Effect.gen(function* () {
+            const pty = yield* Pty.Service
+            const a = yield* pty.create({ command: "cat", title: "a" })
+            const b = yield* pty.create({ command: "cat", title: "b" })
+            try {
+              const outA: string[] = []
+              const outB: string[] = []
+
+              const ws = {
+                readyState: 1,
+                data: { events: { connection: "a" } },
+                send: (data: unknown) => {
+                  outA.push(typeof data === "string" ? data : Buffer.from(data as Uint8Array).toString("utf8"))
+                },
+                close: () => {
+                  // no-op (simulate abrupt drop)
+                },
+              }
+
+              yield* pty.connect(a.id, ws as any)
+
+              ws.data = { events: { connection: "b" } }
+              ws.send = (data: unknown) => {
+                outB.push(typeof data === "string" ? data : Buffer.from(data as Uint8Array).toString("utf8"))
+              }
+              yield* pty.connect(b.id, ws as any)
+
+              outA.length = 0
+              outB.length = 0
+
+              yield* pty.write(a.id, "AAA\n")
+              yield* Effect.promise(() => sleep(100))
+
+              expect(outB.join("")).not.toContain("AAA")
+            } finally {
+              yield* pty.remove(a.id)
+              yield* pty.remove(b.id)
+            }
+          }),
+        ),
     })
   })
 
@@ -60,42 +62,43 @@ describe("pty", () => {
 
     await Instance.provide({
       directory: dir.path,
-      fn: async () => {
-        const a = await Pty.create({ command: "cat", title: "a" })
-        try {
-          const outA: string[] = []
-          const outB: string[] = []
-
-          const ws = {
-            readyState: 1,
-            data: { events: { connection: "a" } },
-            send: (data: unknown) => {
-              outA.push(typeof data === "string" ? data : Buffer.from(data as Uint8Array).toString("utf8"))
-            },
-            close: () => {
-              // no-op (simulate abrupt drop)
-            },
-          }
-
-          // Connect "a" first.
-          Pty.connect(a.id, ws as any)
-          outA.length = 0
-
-          // Simulate Bun reusing the same websocket object for another
-          // connection before the next onOpen calls Pty.connect.
-          ws.data = { events: { connection: "b" } }
-          ws.send = (data: unknown) => {
-            outB.push(typeof data === "string" ? data : Buffer.from(data as Uint8Array).toString("utf8"))
-          }
-
-          Pty.write(a.id, "AAA\n")
-          await sleep(100)
-
-          expect(outB.join("")).not.toContain("AAA")
-        } finally {
-          await Pty.remove(a.id)
-        }
-      },
+      fn: () =>
+        AppRuntime.runPromise(
+          Effect.gen(function* () {
+            const pty = yield* Pty.Service
+            const a = yield* pty.create({ command: "cat", title: "a" })
+            try {
+              const outA: string[] = []
+              const outB: string[] = []
+
+              const ws = {
+                readyState: 1,
+                data: { events: { connection: "a" } },
+                send: (data: unknown) => {
+                  outA.push(typeof data === "string" ? data : Buffer.from(data as Uint8Array).toString("utf8"))
+                },
+                close: () => {
+                  // no-op (simulate abrupt drop)
+                },
+              }
+
+              yield* pty.connect(a.id, ws as any)
+              outA.length = 0
+
+              ws.data = { events: { connection: "b" } }
+              ws.send = (data: unknown) => {
+                outB.push(typeof data === "string" ? data : Buffer.from(data as Uint8Array).toString("utf8"))
+              }
+
+              yield* pty.write(a.id, "AAA\n")
+              yield* Effect.promise(() => sleep(100))
+
+              expect(outB.join("")).not.toContain("AAA")
+            } finally {
+              yield* pty.remove(a.id)
+            }
+          }),
+        ),
     })
   })
 
@@ -104,38 +107,40 @@ describe("pty", () => {
 
     await Instance.provide({
       directory: dir.path,
-      fn: async () => {
-        const a = await Pty.create({ command: "cat", title: "a" })
-        try {
-          const out: string[] = []
-
-          const ctx = { connId: 1 }
-          const ws = {
-            readyState: 1,
-            data: ctx,
-            send: (data: unknown) => {
-              out.push(typeof data === "string" ? data : Buffer.from(data as Uint8Array).toString("utf8"))
-            },
-            close: () => {
-              // no-op
-            },
-          }
-
-          Pty.connect(a.id, ws as any)
-          out.length = 0
-
-          // Mutating fields on ws.data should not look like a new
-          // connection lifecycle when the object identity stays stable.
-          ctx.connId = 2
-
-          Pty.write(a.id, "AAA\n")
-          await sleep(100)
-
-          expect(out.join("")).toContain("AAA")
-        } finally {
-          await Pty.remove(a.id)
-        }
-      },
+      fn: () =>
+        AppRuntime.runPromise(
+          Effect.gen(function* () {
+            const pty = yield* Pty.Service
+            const a = yield* pty.create({ command: "cat", title: "a" })
+            try {
+              const out: string[] = []
+
+              const ctx = { connId: 1 }
+              const ws = {
+                readyState: 1,
+                data: ctx,
+                send: (data: unknown) => {
+                  out.push(typeof data === "string" ? data : Buffer.from(data as Uint8Array).toString("utf8"))
+                },
+                close: () => {
+                  // no-op
+                },
+              }
+
+              yield* pty.connect(a.id, ws as any)
+              out.length = 0
+
+              ctx.connId = 2
+
+              yield* pty.write(a.id, "AAA\n")
+              yield* Effect.promise(() => sleep(100))
+
+              expect(out.join("")).toContain("AAA")
+            } finally {
+              yield* pty.remove(a.id)
+            }
+          }),
+        ),
     })
   })
 })

+ 54 - 44
packages/opencode/test/pty/pty-session.test.ts

@@ -1,5 +1,7 @@
 import { describe, expect, test } from "bun:test"
+import { AppRuntime } from "../../src/effect/app-runtime"
 import { Bus } from "../../src/bus"
+import { Effect } from "effect"
 import { Instance } from "../../src/project/instance"
 import { Pty } from "../../src/pty"
 import type { PtyID } from "../../src/pty/schema"
@@ -27,33 +29,37 @@ describe("pty", () => {
 
     await Instance.provide({
       directory: dir.path,
-      fn: async () => {
-        const log: Array<{ type: "created" | "exited" | "deleted"; id: PtyID }> = []
-        const off = [
-          Bus.subscribe(Pty.Event.Created, (evt) => log.push({ type: "created", id: evt.properties.info.id })),
-          Bus.subscribe(Pty.Event.Exited, (evt) => log.push({ type: "exited", id: evt.properties.id })),
-          Bus.subscribe(Pty.Event.Deleted, (evt) => log.push({ type: "deleted", id: evt.properties.id })),
-        ]
+      fn: () =>
+        AppRuntime.runPromise(
+          Effect.gen(function* () {
+            const pty = yield* Pty.Service
+            const log: Array<{ type: "created" | "exited" | "deleted"; id: PtyID }> = []
+            const off = [
+              Bus.subscribe(Pty.Event.Created, (evt) => log.push({ type: "created", id: evt.properties.info.id })),
+              Bus.subscribe(Pty.Event.Exited, (evt) => log.push({ type: "exited", id: evt.properties.id })),
+              Bus.subscribe(Pty.Event.Deleted, (evt) => log.push({ type: "deleted", id: evt.properties.id })),
+            ]
 
-        let id: PtyID | undefined
-        try {
-          const info = await Pty.create({
-            command: "/usr/bin/env",
-            args: ["sh", "-c", "sleep 0.1"],
-            title: "sleep",
-          })
-          id = info.id
+            let id: PtyID | undefined
+            try {
+              const info = yield* pty.create({
+                command: "/usr/bin/env",
+                args: ["sh", "-c", "sleep 0.1"],
+                title: "sleep",
+              })
+              id = info.id
 
-          await wait(() => pick(log, id!).includes("exited"))
+              yield* Effect.promise(() => wait(() => pick(log, id!).includes("exited")))
 
-          await Pty.remove(id)
-          await wait(() => pick(log, id!).length >= 3)
-          expect(pick(log, id!)).toEqual(["created", "exited", "deleted"])
-        } finally {
-          off.forEach((x) => x())
-          if (id) await Pty.remove(id)
-        }
-      },
+              yield* pty.remove(id)
+              yield* Effect.promise(() => wait(() => pick(log, id!).length >= 3))
+              expect(pick(log, id!)).toEqual(["created", "exited", "deleted"])
+            } finally {
+              off.forEach((x) => x())
+              if (id) yield* pty.remove(id)
+            }
+          }),
+        ),
     })
   })
 
@@ -64,29 +70,33 @@ describe("pty", () => {
 
     await Instance.provide({
       directory: dir.path,
-      fn: async () => {
-        const log: Array<{ type: "created" | "exited" | "deleted"; id: PtyID }> = []
-        const off = [
-          Bus.subscribe(Pty.Event.Created, (evt) => log.push({ type: "created", id: evt.properties.info.id })),
-          Bus.subscribe(Pty.Event.Exited, (evt) => log.push({ type: "exited", id: evt.properties.id })),
-          Bus.subscribe(Pty.Event.Deleted, (evt) => log.push({ type: "deleted", id: evt.properties.id })),
-        ]
+      fn: () =>
+        AppRuntime.runPromise(
+          Effect.gen(function* () {
+            const pty = yield* Pty.Service
+            const log: Array<{ type: "created" | "exited" | "deleted"; id: PtyID }> = []
+            const off = [
+              Bus.subscribe(Pty.Event.Created, (evt) => log.push({ type: "created", id: evt.properties.info.id })),
+              Bus.subscribe(Pty.Event.Exited, (evt) => log.push({ type: "exited", id: evt.properties.id })),
+              Bus.subscribe(Pty.Event.Deleted, (evt) => log.push({ type: "deleted", id: evt.properties.id })),
+            ]
 
-        let id: PtyID | undefined
-        try {
-          const info = await Pty.create({ command: "/bin/sh", title: "sh" })
-          id = info.id
+            let id: PtyID | undefined
+            try {
+              const info = yield* pty.create({ command: "/bin/sh", title: "sh" })
+              id = info.id
 
-          await sleep(100)
+              yield* Effect.promise(() => sleep(100))
 
-          await Pty.remove(id)
-          await wait(() => pick(log, id!).length >= 3)
-          expect(pick(log, id!)).toEqual(["created", "exited", "deleted"])
-        } finally {
-          off.forEach((x) => x())
-          if (id) await Pty.remove(id)
-        }
-      },
+              yield* pty.remove(id)
+              yield* Effect.promise(() => wait(() => pick(log, id!).length >= 3))
+              expect(pick(log, id!)).toEqual(["created", "exited", "deleted"])
+            } finally {
+              off.forEach((x) => x())
+              if (id) yield* pty.remove(id)
+            }
+          }),
+        ),
     })
   })
 })

+ 26 - 16
packages/opencode/test/pty/pty-shell.test.ts

@@ -1,4 +1,6 @@
 import { describe, expect, test } from "bun:test"
+import { AppRuntime } from "../../src/effect/app-runtime"
+import { Effect } from "effect"
 import { Instance } from "../../src/project/instance"
 import { Pty } from "../../src/pty"
 import { Shell } from "../../src/shell/shell"
@@ -17,14 +19,18 @@ describe("pty shell args", () => {
         await using dir = await tmpdir()
         await Instance.provide({
           directory: dir.path,
-          fn: async () => {
-            const info = await Pty.create({ command: ps, title: "pwsh" })
-            try {
-              expect(info.args).toEqual([])
-            } finally {
-              await Pty.remove(info.id)
-            }
-          },
+          fn: () =>
+            AppRuntime.runPromise(
+              Effect.gen(function* () {
+                const pty = yield* Pty.Service
+                const info = yield* pty.create({ command: ps, title: "pwsh" })
+                try {
+                  expect(info.args).toEqual([])
+                } finally {
+                  yield* pty.remove(info.id)
+                }
+              }),
+            ),
         })
       },
       { timeout: 30000 },
@@ -43,14 +49,18 @@ describe("pty shell args", () => {
         await using dir = await tmpdir()
         await Instance.provide({
           directory: dir.path,
-          fn: async () => {
-            const info = await Pty.create({ command: bash, title: "bash" })
-            try {
-              expect(info.args).toEqual(["-l"])
-            } finally {
-              await Pty.remove(info.id)
-            }
-          },
+          fn: () =>
+            AppRuntime.runPromise(
+              Effect.gen(function* () {
+                const pty = yield* Pty.Service
+                const info = yield* pty.create({ command: bash, title: "bash" })
+                try {
+                  expect(info.args).toEqual(["-l"])
+                } finally {
+                  yield* pty.remove(info.id)
+                }
+              }),
+            ),
         })
       },
       { timeout: 30000 },