Przeglądaj źródła

effectify Pty service (#18572)

Kit Langton 4 tygodni temu
rodzic
commit
13bac9c91a

+ 279 - 203
packages/opencode/src/pty/index.ts

@@ -1,13 +1,16 @@
 import { BusEvent } from "@/bus/bus-event"
 import { Bus } from "@/bus"
+import { InstanceState } from "@/effect/instance-state"
+import { makeRunPromise } from "@/effect/run-service"
+import { Instance } from "@/project/instance"
 import { type IPty } from "bun-pty"
 import z from "zod"
 import { Log } from "../util/log"
-import { Instance } from "../project/instance"
 import { lazy } from "@opencode-ai/util/lazy"
 import { Shell } from "@/shell/shell"
 import { Plugin } from "@/plugin"
 import { PtyID } from "./schema"
+import { Effect, Layer, ServiceMap } from "effect"
 
 export namespace Pty {
   const log = Log.create({ service: "pty" })
@@ -23,6 +26,20 @@ export namespace Pty {
     close: (code?: number, reason?: string) => void
   }
 
+  type Active = {
+    info: Info
+    process: IPty
+    buffer: string
+    bufferCursor: number
+    cursor: number
+    subscribers: Map<unknown, Socket>
+  }
+
+  type State = {
+    dir: string
+    sessions: Map<PtyID, Active>
+  }
+
   // WebSocket control frame: 0x00 + UTF-8 JSON.
   const meta = (cursor: number) => {
     const json = JSON.stringify({ cursor })
@@ -81,241 +98,300 @@ export namespace Pty {
     Deleted: BusEvent.define("pty.deleted", z.object({ id: PtyID.zod })),
   }
 
-  interface ActiveSession {
-    info: Info
-    process: IPty
-    buffer: string
-    bufferCursor: number
-    cursor: number
-    subscribers: Map<unknown, Socket>
+  export interface Interface {
+    readonly list: () => Effect.Effect<Info[]>
+    readonly get: (id: PtyID) => Effect.Effect<Info | undefined>
+    readonly create: (input: CreateInput) => Effect.Effect<Info>
+    readonly update: (id: PtyID, input: UpdateInput) => Effect.Effect<Info | undefined>
+    readonly remove: (id: PtyID) => Effect.Effect<void>
+    readonly resize: (id: PtyID, cols: number, rows: number) => Effect.Effect<void>
+    readonly write: (id: PtyID, data: string) => Effect.Effect<void>
+    readonly connect: (
+      id: PtyID,
+      ws: Socket,
+      cursor?: number,
+    ) => Effect.Effect<{ onMessage: (message: string | ArrayBuffer) => void; onClose: () => void } | undefined>
   }
 
-  const state = Instance.state(
-    () => new Map<PtyID, ActiveSession>(),
-    async (sessions) => {
-      for (const session of sessions.values()) {
+  export class Service extends ServiceMap.Service<Service, Interface>()("@opencode/Pty") {}
+
+  export const layer = Layer.effect(
+    Service,
+    Effect.gen(function* () {
+      function teardown(session: Active) {
         try {
           session.process.kill()
         } catch {}
         for (const [key, ws] of session.subscribers.entries()) {
           try {
             if (ws.data === key) ws.close()
-          } catch {
-            // ignore
-          }
+          } catch {}
         }
+        session.subscribers.clear()
       }
-      sessions.clear()
-    },
-  )
 
-  export function list() {
-    return Array.from(state().values()).map((s) => s.info)
-  }
+      const cache = yield* InstanceState.make<State>(
+        Effect.fn("Pty.state")(function* (ctx) {
+          const state = {
+            dir: ctx.directory,
+            sessions: new Map<PtyID, Active>(),
+          }
 
-  export function get(id: PtyID) {
-    return state().get(id)?.info
-  }
+          yield* Effect.addFinalizer(() =>
+            Effect.sync(() => {
+              for (const session of state.sessions.values()) {
+                teardown(session)
+              }
+              state.sessions.clear()
+            }),
+          )
+
+          return state
+        }),
+      )
+
+      const remove = Effect.fn("Pty.remove")(function* (id: PtyID) {
+        const state = yield* InstanceState.get(cache)
+        const session = state.sessions.get(id)
+        if (!session) return
+        state.sessions.delete(id)
+        log.info("removing session", { id })
+        teardown(session)
+        void Bus.publish(Event.Deleted, { id: session.info.id })
+      })
 
-  export async function create(input: CreateInput) {
-    const id = PtyID.ascending()
-    const command = input.command || Shell.preferred()
-    const args = input.args || []
-    if (command.endsWith("sh")) {
-      args.push("-l")
-    }
-
-    const cwd = input.cwd || Instance.directory
-    const shellEnv = await Plugin.trigger("shell.env", { cwd }, { env: {} })
-    const env = {
-      ...process.env,
-      ...input.env,
-      ...shellEnv.env,
-      TERM: "xterm-256color",
-      OPENCODE_TERMINAL: "1",
-    } as Record<string, string>
-
-    if (process.platform === "win32") {
-      env.LC_ALL = "C.UTF-8"
-      env.LC_CTYPE = "C.UTF-8"
-      env.LANG = "C.UTF-8"
-    }
-    log.info("creating session", { id, cmd: command, args, cwd })
-
-    const spawn = await pty()
-    const ptyProcess = spawn(command, args, {
-      name: "xterm-256color",
-      cwd,
-      env,
-    })
+      const list = Effect.fn("Pty.list")(function* () {
+        const state = yield* InstanceState.get(cache)
+        return Array.from(state.sessions.values()).map((session) => session.info)
+      })
 
-    const info = {
-      id,
-      title: input.title || `Terminal ${id.slice(-4)}`,
-      command,
-      args,
-      cwd,
-      status: "running",
-      pid: ptyProcess.pid,
-    } as const
-    const session: ActiveSession = {
-      info,
-      process: ptyProcess,
-      buffer: "",
-      bufferCursor: 0,
-      cursor: 0,
-      subscribers: new Map(),
-    }
-    state().set(id, session)
-    ptyProcess.onData(
-      Instance.bind((chunk) => {
-        session.cursor += chunk.length
+      const get = Effect.fn("Pty.get")(function* (id: PtyID) {
+        const state = yield* InstanceState.get(cache)
+        return state.sessions.get(id)?.info
+      })
 
-        for (const [key, ws] of session.subscribers.entries()) {
-          if (ws.readyState !== 1) {
-            session.subscribers.delete(key)
-            continue
+      const create = Effect.fn("Pty.create")(function* (input: CreateInput) {
+        const state = yield* InstanceState.get(cache)
+        return yield* Effect.promise(async () => {
+          const id = PtyID.ascending()
+          const command = input.command || Shell.preferred()
+          const args = input.args || []
+          if (command.endsWith("sh")) {
+            args.push("-l")
           }
 
-          if (ws.data !== key) {
-            session.subscribers.delete(key)
-            continue
+          const cwd = input.cwd || state.dir
+          const shellEnv = await Plugin.trigger("shell.env", { cwd }, { env: {} })
+          const env = {
+            ...process.env,
+            ...input.env,
+            ...shellEnv.env,
+            TERM: "xterm-256color",
+            OPENCODE_TERMINAL: "1",
+          } as Record<string, string>
+
+          if (process.platform === "win32") {
+            env.LC_ALL = "C.UTF-8"
+            env.LC_CTYPE = "C.UTF-8"
+            env.LANG = "C.UTF-8"
           }
+          log.info("creating session", { id, cmd: command, args, cwd })
+
+          const spawn = await pty()
+          const proc = spawn(command, args, {
+            name: "xterm-256color",
+            cwd,
+            env,
+          })
+
+          const info = {
+            id,
+            title: input.title || `Terminal ${id.slice(-4)}`,
+            command,
+            args,
+            cwd,
+            status: "running",
+            pid: proc.pid,
+          } as const
+          const session: Active = {
+            info,
+            process: proc,
+            buffer: "",
+            bufferCursor: 0,
+            cursor: 0,
+            subscribers: new Map(),
+          }
+          state.sessions.set(id, session)
+          proc.onData(
+            Instance.bind((chunk) => {
+              session.cursor += chunk.length
+
+              for (const [key, ws] of session.subscribers.entries()) {
+                if (ws.readyState !== 1) {
+                  session.subscribers.delete(key)
+                  continue
+                }
+                if (ws.data !== key) {
+                  session.subscribers.delete(key)
+                  continue
+                }
+                try {
+                  ws.send(chunk)
+                } catch {
+                  session.subscribers.delete(key)
+                }
+              }
+
+              session.buffer += chunk
+              if (session.buffer.length <= BUFFER_LIMIT) return
+              const excess = session.buffer.length - BUFFER_LIMIT
+              session.buffer = session.buffer.slice(excess)
+              session.bufferCursor += excess
+            }),
+          )
+          proc.onExit(
+            Instance.bind(({ exitCode }) => {
+              if (session.info.status === "exited") return
+              log.info("session exited", { id, exitCode })
+              session.info.status = "exited"
+              void Bus.publish(Event.Exited, { id, exitCode })
+              Effect.runFork(remove(id))
+            }),
+          )
+          await Bus.publish(Event.Created, { info })
+          return info
+        })
+      })
+
+      const update = Effect.fn("Pty.update")(function* (id: PtyID, input: UpdateInput) {
+        const state = yield* InstanceState.get(cache)
+        const session = state.sessions.get(id)
+        if (!session) return
+        if (input.title) {
+          session.info.title = input.title
+        }
+        if (input.size) {
+          session.process.resize(input.size.cols, input.size.rows)
+        }
+        yield* Effect.promise(() => Bus.publish(Event.Updated, { info: session.info }))
+        return session.info
+      })
 
+      const resize = Effect.fn("Pty.resize")(function* (id: PtyID, cols: number, rows: number) {
+        const state = yield* InstanceState.get(cache)
+        const session = state.sessions.get(id)
+        if (session && session.info.status === "running") {
+          session.process.resize(cols, rows)
+        }
+      })
+
+      const write = Effect.fn("Pty.write")(function* (id: PtyID, data: string) {
+        const state = yield* InstanceState.get(cache)
+        const session = state.sessions.get(id)
+        if (session && session.info.status === "running") {
+          session.process.write(data)
+        }
+      })
+
+      const connect = Effect.fn("Pty.connect")(function* (id: PtyID, ws: Socket, cursor?: number) {
+        const state = yield* InstanceState.get(cache)
+        const session = state.sessions.get(id)
+        if (!session) {
+          ws.close()
+          return
+        }
+        log.info("client connected to session", { id })
+
+        // Use ws.data as the unique key for this connection lifecycle.
+        // If ws.data is undefined, fallback to ws object.
+        const key = ws.data && typeof ws.data === "object" ? ws.data : ws
+        // Optionally cleanup if the key somehow exists
+        session.subscribers.delete(key)
+        session.subscribers.set(key, ws)
+
+        const cleanup = () => {
+          session.subscribers.delete(key)
+        }
+
+        const start = session.bufferCursor
+        const end = session.cursor
+        const from =
+          cursor === -1 ? end : typeof cursor === "number" && Number.isSafeInteger(cursor) ? Math.max(0, cursor) : 0
+
+        const data = (() => {
+          if (!session.buffer) return ""
+          if (from >= end) return ""
+          const offset = Math.max(0, from - start)
+          if (offset >= session.buffer.length) return ""
+          return session.buffer.slice(offset)
+        })()
+
+        if (data) {
           try {
-            ws.send(chunk)
+            for (let i = 0; i < data.length; i += BUFFER_CHUNK) {
+              ws.send(data.slice(i, i + BUFFER_CHUNK))
+            }
           } catch {
-            session.subscribers.delete(key)
+            cleanup()
+            ws.close()
+            return
           }
         }
 
-        session.buffer += chunk
-        if (session.buffer.length <= BUFFER_LIMIT) return
-        const excess = session.buffer.length - BUFFER_LIMIT
-        session.buffer = session.buffer.slice(excess)
-        session.bufferCursor += excess
-      }),
-    )
-    ptyProcess.onExit(
-      Instance.bind(({ exitCode }) => {
-        if (session.info.status === "exited") return
-        log.info("session exited", { id, exitCode })
-        session.info.status = "exited"
-        Bus.publish(Event.Exited, { id, exitCode })
-        remove(id)
-      }),
-    )
-    Bus.publish(Event.Created, { info })
-    return info
+        try {
+          ws.send(meta(end))
+        } catch {
+          cleanup()
+          ws.close()
+          return
+        }
+
+        return {
+          onMessage: (message: string | ArrayBuffer) => {
+            session.process.write(String(message))
+          },
+          onClose: () => {
+            log.info("client disconnected from session", { id })
+            cleanup()
+          },
+        }
+      })
+
+      return Service.of({ list, get, create, update, remove, resize, write, connect })
+    }),
+  )
+
+  const runPromise = makeRunPromise(Service, layer)
+
+  export async function list() {
+    return runPromise((svc) => svc.list())
   }
 
-  export async function update(id: PtyID, input: UpdateInput) {
-    const session = state().get(id)
-    if (!session) return
-    if (input.title) {
-      session.info.title = input.title
-    }
-    if (input.size) {
-      session.process.resize(input.size.cols, input.size.rows)
-    }
-    Bus.publish(Event.Updated, { info: session.info })
-    return session.info
+  export async function get(id: PtyID) {
+    return runPromise((svc) => svc.get(id))
   }
 
-  export async function remove(id: PtyID) {
-    const session = state().get(id)
-    if (!session) return
-    state().delete(id)
-    log.info("removing session", { id })
-    try {
-      session.process.kill()
-    } catch {}
-    for (const [key, ws] of session.subscribers.entries()) {
-      try {
-        if (ws.data === key) ws.close()
-      } catch {
-        // ignore
-      }
-    }
-    session.subscribers.clear()
-    Bus.publish(Event.Deleted, { id: session.info.id })
+  export async function resize(id: PtyID, cols: number, rows: number) {
+    return runPromise((svc) => svc.resize(id, cols, rows))
   }
 
-  export function resize(id: PtyID, cols: number, rows: number) {
-    const session = state().get(id)
-    if (session && session.info.status === "running") {
-      session.process.resize(cols, rows)
-    }
+  export async function write(id: PtyID, data: string) {
+    return runPromise((svc) => svc.write(id, data))
   }
 
-  export function write(id: PtyID, data: string) {
-    const session = state().get(id)
-    if (session && session.info.status === "running") {
-      session.process.write(data)
-    }
+  export async function connect(id: PtyID, ws: Socket, cursor?: number) {
+    return runPromise((svc) => svc.connect(id, ws, cursor))
   }
 
-  export function connect(id: PtyID, ws: Socket, cursor?: number) {
-    const session = state().get(id)
-    if (!session) {
-      ws.close()
-      return
-    }
-    log.info("client connected to session", { id })
-
-    // Use ws.data as the unique key for this connection lifecycle.
-    // If ws.data is undefined, fallback to ws object.
-    const connectionKey = ws.data && typeof ws.data === "object" ? ws.data : ws
-
-    // Optionally cleanup if the key somehow exists
-    session.subscribers.delete(connectionKey)
-    session.subscribers.set(connectionKey, ws)
-
-    const cleanup = () => {
-      session.subscribers.delete(connectionKey)
-    }
-
-    const start = session.bufferCursor
-    const end = session.cursor
-
-    const from =
-      cursor === -1 ? end : typeof cursor === "number" && Number.isSafeInteger(cursor) ? Math.max(0, cursor) : 0
-
-    const data = (() => {
-      if (!session.buffer) return ""
-      if (from >= end) return ""
-      const offset = Math.max(0, from - start)
-      if (offset >= session.buffer.length) return ""
-      return session.buffer.slice(offset)
-    })()
-
-    if (data) {
-      try {
-        for (let i = 0; i < data.length; i += BUFFER_CHUNK) {
-          ws.send(data.slice(i, i + BUFFER_CHUNK))
-        }
-      } catch {
-        cleanup()
-        ws.close()
-        return
-      }
-    }
-
-    try {
-      ws.send(meta(end))
-    } catch {
-      cleanup()
-      ws.close()
-      return
-    }
-    return {
-      onMessage: (message: string | ArrayBuffer) => {
-        session.process.write(String(message))
-      },
-      onClose: () => {
-        log.info("client disconnected from session", { id })
-        cleanup()
-      },
-    }
+  export async function create(input: CreateInput) {
+    return runPromise((svc) => svc.create(input))
+  }
+
+  export async function update(id: PtyID, input: UpdateInput) {
+    return runPromise((svc) => svc.update(id, input))
+  }
+
+  export async function remove(id: PtyID) {
+    return runPromise((svc) => svc.remove(id))
   }
 }

+ 17 - 7
packages/opencode/src/server/routes/pty.ts

@@ -28,7 +28,7 @@ export const PtyRoutes = lazy(() =>
         },
       }),
       async (c) => {
-        return c.json(Pty.list())
+        return c.json(await Pty.list())
       },
     )
     .post(
@@ -75,7 +75,7 @@ export const PtyRoutes = lazy(() =>
       }),
       validator("param", z.object({ ptyID: PtyID.zod })),
       async (c) => {
-        const info = Pty.get(c.req.valid("param").ptyID)
+        const info = await Pty.get(c.req.valid("param").ptyID)
         if (!info) {
           throw new NotFoundError({ message: "Session not found" })
         }
@@ -150,7 +150,7 @@ export const PtyRoutes = lazy(() =>
         },
       }),
       validator("param", z.object({ ptyID: PtyID.zod })),
-      upgradeWebSocket((c) => {
+      upgradeWebSocket(async (c) => {
         const id = PtyID.zod.parse(c.req.param("ptyID"))
         const cursor = (() => {
           const value = c.req.query("cursor")
@@ -159,8 +159,8 @@ export const PtyRoutes = lazy(() =>
           if (!Number.isSafeInteger(parsed) || parsed < -1) return
           return parsed
         })()
-        let handler: ReturnType<typeof Pty.connect>
-        if (!Pty.get(id)) throw new Error("Session not found")
+        let handler: Awaited<ReturnType<typeof Pty.connect>>
+        if (!(await Pty.get(id))) throw new Error("Session not found")
 
         type Socket = {
           readyState: number
@@ -176,17 +176,27 @@ export const PtyRoutes = lazy(() =>
           return typeof (value as { readyState?: unknown }).readyState === "number"
         }
 
+        const pending: string[] = []
+        let ready = false
+
         return {
-          onOpen(_event, ws) {
+          async onOpen(_event, ws) {
             const socket = ws.raw
             if (!isSocket(socket)) {
               ws.close()
               return
             }
-            handler = Pty.connect(id, socket, cursor)
+            handler = await Pty.connect(id, socket, cursor)
+            ready = true
+            for (const msg of pending) handler?.onMessage(msg)
+            pending.length = 0
           },
           onMessage(event) {
             if (typeof event.data !== "string") return
+            if (!ready) {
+              pending.push(event.data)
+              return
+            }
             handler?.onMessage(event.data)
           },
           onClose() {