Ver Fonte

effectify Pty service, add runSyncInstance helper

Kit Langton há 1 mês atrás
pai
commit
6a3cb06c6c

+ 1 - 1
packages/opencode/specs/effect-migration.md

@@ -128,7 +128,7 @@ Still open and likely worth migrating:
 
 - [ ] `Plugin`
 - [ ] `ToolRegistry`
-- [ ] `Pty`
+- [x] `Pty`
 - [ ] `Worktree`
 - [ ] `Installation`
 - [ ] `Bus`

+ 3 - 0
packages/opencode/src/effect/instances.ts

@@ -4,6 +4,7 @@ import { FileTime } from "@/file/time"
 import { FileWatcher } from "@/file/watcher"
 import { Format } from "@/format"
 import { PermissionNext } from "@/permission"
+import { Pty } from "@/pty"
 import { Instance } from "@/project/instance"
 import { Vcs } from "@/project/vcs"
 import { ProviderAuth } from "@/provider/auth"
@@ -24,6 +25,7 @@ export type InstanceServices =
   | FileTime.Service
   | Format.Service
   | File.Service
+  | Pty.Service
   | Skill.Service
   | Snapshot.Service
 
@@ -44,6 +46,7 @@ function lookup(_key: string) {
     Layer.fresh(FileTime.layer).pipe(Layer.orDie),
     Layer.fresh(Format.layer),
     Layer.fresh(File.layer),
+    Layer.fresh(Pty.layer),
     Layer.fresh(Skill.defaultLayer),
     Layer.fresh(Snapshot.defaultLayer),
   ).pipe(Layer.provide(ctx))

+ 4 - 0
packages/opencode/src/effect/runtime.ts

@@ -18,6 +18,10 @@ export function runPromiseInstance<A, E>(effect: Effect.Effect<A, E, InstanceSer
   return runtime.runPromise(effect.pipe(Effect.provide(Instances.get(Instance.directory))))
 }
 
+export function runSyncInstance<A, E>(effect: Effect.Effect<A, E, InstanceServices>) {
+  return runtime.runSync(effect.pipe(Effect.provide(Instances.get(Instance.directory))))
+}
+
 export function disposeRuntime() {
   return runtime.dispose()
 }

+ 250 - 196
packages/opencode/src/pty/index.ts

@@ -1,13 +1,15 @@
 import { BusEvent } from "@/bus/bus-event"
 import { Bus } from "@/bus"
+import { InstanceContext } from "@/effect/instance-context"
+import { runPromiseInstance, runSyncInstance } from "@/effect/runtime"
 import { type IPty } from "bun-pty"
 import z from "zod"
 import { Log } from "../util/log"
-import { Instance } from "../project/instance"
 import { lazy } from "@opencode-ai/util/lazy"
 import { Shell } from "@/shell/shell"
 import { Plugin } from "@/plugin"
 import { PtyID } from "./schema"
+import { Effect, Layer, ServiceMap } from "effect"
 
 export namespace Pty {
   const log = Log.create({ service: "pty" })
@@ -90,232 +92,284 @@ export namespace Pty {
     subscribers: Map<unknown, Socket>
   }
 
-  const state = Instance.state(
-    () => new Map<PtyID, ActiveSession>(),
-    async (sessions) => {
-      for (const session of sessions.values()) {
+  export interface Interface {
+    readonly list: () => Effect.Effect<Info[]>
+    readonly get: (id: PtyID) => Effect.Effect<Info | undefined>
+    readonly create: (input: CreateInput) => Effect.Effect<Info>
+    readonly update: (id: PtyID, input: UpdateInput) => Effect.Effect<Info | undefined>
+    readonly remove: (id: PtyID) => Effect.Effect<void>
+    readonly resize: (id: PtyID, cols: number, rows: number) => Effect.Effect<void>
+    readonly write: (id: PtyID, data: string) => Effect.Effect<void>
+    readonly connect: (
+      id: PtyID,
+      ws: Socket,
+      cursor?: number,
+    ) => Effect.Effect<{ onMessage: (message: string | ArrayBuffer) => void; onClose: () => void } | undefined>
+  }
+
+  export class Service extends ServiceMap.Service<Service, Interface>()("@opencode/Pty") {}
+
+  export const layer = Layer.effect(
+    Service,
+    Effect.gen(function* () {
+      const instance = yield* InstanceContext
+      const sessions = new Map<PtyID, ActiveSession>()
+
+      yield* Effect.addFinalizer(() =>
+        Effect.sync(() => {
+          for (const session of sessions.values()) {
+            try {
+              session.process.kill()
+            } catch {}
+            for (const [key, ws] of session.subscribers.entries()) {
+              try {
+                if (ws.data === key) ws.close()
+              } catch {}
+            }
+          }
+          sessions.clear()
+        }),
+      )
+
+      const removeSession = (id: PtyID) => {
+        const session = sessions.get(id)
+        if (!session) return
+        sessions.delete(id)
+        log.info("removing session", { id })
         try {
           session.process.kill()
         } catch {}
         for (const [key, ws] of session.subscribers.entries()) {
           try {
             if (ws.data === key) ws.close()
-          } catch {
-            // ignore
-          }
+          } catch {}
         }
+        session.subscribers.clear()
+        Bus.publish(Event.Deleted, { id: session.info.id })
       }
-      sessions.clear()
-    },
-  )
-
-  export function list() {
-    return Array.from(state().values()).map((s) => s.info)
-  }
-
-  export function get(id: PtyID) {
-    return state().get(id)?.info
-  }
 
-  export async function create(input: CreateInput) {
-    const id = PtyID.ascending()
-    const command = input.command || Shell.preferred()
-    const args = input.args || []
-    if (command.endsWith("sh")) {
-      args.push("-l")
-    }
-
-    const cwd = input.cwd || Instance.directory
-    const shellEnv = await Plugin.trigger("shell.env", { cwd }, { env: {} })
-    const env = {
-      ...process.env,
-      ...input.env,
-      ...shellEnv.env,
-      TERM: "xterm-256color",
-      OPENCODE_TERMINAL: "1",
-    } as Record<string, string>
-
-    if (process.platform === "win32") {
-      env.LC_ALL = "C.UTF-8"
-      env.LC_CTYPE = "C.UTF-8"
-      env.LANG = "C.UTF-8"
-    }
-    log.info("creating session", { id, cmd: command, args, cwd })
-
-    const spawn = await pty()
-    const ptyProcess = spawn(command, args, {
-      name: "xterm-256color",
-      cwd,
-      env,
-    })
+      const list = Effect.fn("Pty.list")(function* () {
+        return Array.from(sessions.values()).map((s) => s.info)
+      })
 
-    const info = {
-      id,
-      title: input.title || `Terminal ${id.slice(-4)}`,
-      command,
-      args,
-      cwd,
-      status: "running",
-      pid: ptyProcess.pid,
-    } as const
-    const session: ActiveSession = {
-      info,
-      process: ptyProcess,
-      buffer: "",
-      bufferCursor: 0,
-      cursor: 0,
-      subscribers: new Map(),
-    }
-    state().set(id, session)
-    ptyProcess.onData(
-      Instance.bind((chunk) => {
-        session.cursor += chunk.length
+      const get = Effect.fn("Pty.get")(function* (id: PtyID) {
+        return sessions.get(id)?.info
+      })
 
-        for (const [key, ws] of session.subscribers.entries()) {
-          if (ws.readyState !== 1) {
-            session.subscribers.delete(key)
-            continue
+      const create = Effect.fn("Pty.create")(function* (input: CreateInput) {
+        return yield* Effect.promise(async () => {
+          const id = PtyID.ascending()
+          const command = input.command || Shell.preferred()
+          const args = input.args || []
+          if (command.endsWith("sh")) {
+            args.push("-l")
           }
 
-          if (ws.data !== key) {
-            session.subscribers.delete(key)
-            continue
+          const cwd = input.cwd || instance.directory
+          const shellEnv = await Plugin.trigger("shell.env", { cwd }, { env: {} })
+          const env = {
+            ...process.env,
+            ...input.env,
+            ...shellEnv.env,
+            TERM: "xterm-256color",
+            OPENCODE_TERMINAL: "1",
+          } as Record<string, string>
+
+          if (process.platform === "win32") {
+            env.LC_ALL = "C.UTF-8"
+            env.LC_CTYPE = "C.UTF-8"
+            env.LANG = "C.UTF-8"
+          }
+          log.info("creating session", { id, cmd: command, args, cwd })
+
+          const spawn = await pty()
+          const ptyProcess = spawn(command, args, {
+            name: "xterm-256color",
+            cwd,
+            env,
+          })
+
+          const info = {
+            id,
+            title: input.title || `Terminal ${id.slice(-4)}`,
+            command,
+            args,
+            cwd,
+            status: "running",
+            pid: ptyProcess.pid,
+          } as const
+          const session: ActiveSession = {
+            info,
+            process: ptyProcess,
+            buffer: "",
+            bufferCursor: 0,
+            cursor: 0,
+            subscribers: new Map(),
           }
+          sessions.set(id, session)
+          ptyProcess.onData((chunk) => {
+            session.cursor += chunk.length
+
+            for (const [key, ws] of session.subscribers.entries()) {
+              if (ws.readyState !== 1) {
+                session.subscribers.delete(key)
+                continue
+              }
+              if (ws.data !== key) {
+                session.subscribers.delete(key)
+                continue
+              }
+              try {
+                ws.send(chunk)
+              } catch {
+                session.subscribers.delete(key)
+              }
+            }
+
+            session.buffer += chunk
+            if (session.buffer.length <= BUFFER_LIMIT) return
+            const excess = session.buffer.length - BUFFER_LIMIT
+            session.buffer = session.buffer.slice(excess)
+            session.bufferCursor += excess
+          })
+          ptyProcess.onExit(({ exitCode }) => {
+            if (session.info.status === "exited") return
+            log.info("session exited", { id, exitCode })
+            session.info.status = "exited"
+            Bus.publish(Event.Exited, { id, exitCode })
+            removeSession(id)
+          })
+          Bus.publish(Event.Created, { info })
+          return info
+        })
+      })
+
+      const update = Effect.fn("Pty.update")(function* (id: PtyID, input: UpdateInput) {
+        const session = sessions.get(id)
+        if (!session) return
+        if (input.title) {
+          session.info.title = input.title
+        }
+        if (input.size) {
+          session.process.resize(input.size.cols, input.size.rows)
+        }
+        Bus.publish(Event.Updated, { info: session.info })
+        return session.info
+      })
+
+      const remove = Effect.fn("Pty.remove")(function* (id: PtyID) {
+        removeSession(id)
+      })
+
+      const resize = Effect.fn("Pty.resize")(function* (id: PtyID, cols: number, rows: number) {
+        const session = sessions.get(id)
+        if (session && session.info.status === "running") {
+          session.process.resize(cols, rows)
+        }
+      })
+
+      const write = Effect.fn("Pty.write")(function* (id: PtyID, data: string) {
+        const session = sessions.get(id)
+        if (session && session.info.status === "running") {
+          session.process.write(data)
+        }
+      })
 
+      const connect = Effect.fn("Pty.connect")(function* (id: PtyID, ws: Socket, cursor?: number) {
+        const session = sessions.get(id)
+        if (!session) {
+          ws.close()
+          return
+        }
+        log.info("client connected to session", { id })
+
+        const connectionKey = ws.data && typeof ws.data === "object" ? ws.data : ws
+        session.subscribers.delete(connectionKey)
+        session.subscribers.set(connectionKey, ws)
+
+        const cleanup = () => {
+          session.subscribers.delete(connectionKey)
+        }
+
+        const start = session.bufferCursor
+        const end = session.cursor
+
+        const from =
+          cursor === -1 ? end : typeof cursor === "number" && Number.isSafeInteger(cursor) ? Math.max(0, cursor) : 0
+
+        const data = (() => {
+          if (!session.buffer) return ""
+          if (from >= end) return ""
+          const offset = Math.max(0, from - start)
+          if (offset >= session.buffer.length) return ""
+          return session.buffer.slice(offset)
+        })()
+
+        if (data) {
           try {
-            ws.send(chunk)
+            for (let i = 0; i < data.length; i += BUFFER_CHUNK) {
+              ws.send(data.slice(i, i + BUFFER_CHUNK))
+            }
           } catch {
-            session.subscribers.delete(key)
+            cleanup()
+            ws.close()
+            return
           }
         }
 
-        session.buffer += chunk
-        if (session.buffer.length <= BUFFER_LIMIT) return
-        const excess = session.buffer.length - BUFFER_LIMIT
-        session.buffer = session.buffer.slice(excess)
-        session.bufferCursor += excess
-      }),
-    )
-    ptyProcess.onExit(
-      Instance.bind(({ exitCode }) => {
-        if (session.info.status === "exited") return
-        log.info("session exited", { id, exitCode })
-        session.info.status = "exited"
-        Bus.publish(Event.Exited, { id, exitCode })
-        remove(id)
-      }),
-    )
-    Bus.publish(Event.Created, { info })
-    return info
-  }
+        try {
+          ws.send(meta(end))
+        } catch {
+          cleanup()
+          ws.close()
+          return
+        }
+        return {
+          onMessage: (message: string | ArrayBuffer) => {
+            session.process.write(String(message))
+          },
+          onClose: () => {
+            log.info("client disconnected from session", { id })
+            cleanup()
+          },
+        }
+      })
 
-  export async function update(id: PtyID, input: UpdateInput) {
-    const session = state().get(id)
-    if (!session) return
-    if (input.title) {
-      session.info.title = input.title
-    }
-    if (input.size) {
-      session.process.resize(input.size.cols, input.size.rows)
-    }
-    Bus.publish(Event.Updated, { info: session.info })
-    return session.info
+      return Service.of({ list, get, create, update, remove, resize, write, connect })
+    }),
+  )
+
+  // Sync facades
+  export function list() {
+    return runSyncInstance(Service.use((svc) => svc.list()))
   }
 
-  export async function remove(id: PtyID) {
-    const session = state().get(id)
-    if (!session) return
-    state().delete(id)
-    log.info("removing session", { id })
-    try {
-      session.process.kill()
-    } catch {}
-    for (const [key, ws] of session.subscribers.entries()) {
-      try {
-        if (ws.data === key) ws.close()
-      } catch {
-        // ignore
-      }
-    }
-    session.subscribers.clear()
-    Bus.publish(Event.Deleted, { id: session.info.id })
+  export function get(id: PtyID) {
+    return runSyncInstance(Service.use((svc) => svc.get(id)))
   }
 
   export function resize(id: PtyID, cols: number, rows: number) {
-    const session = state().get(id)
-    if (session && session.info.status === "running") {
-      session.process.resize(cols, rows)
-    }
+    runSyncInstance(Service.use((svc) => svc.resize(id, cols, rows)))
   }
 
   export function write(id: PtyID, data: string) {
-    const session = state().get(id)
-    if (session && session.info.status === "running") {
-      session.process.write(data)
-    }
+    runSyncInstance(Service.use((svc) => svc.write(id, data)))
   }
 
   export function connect(id: PtyID, ws: Socket, cursor?: number) {
-    const session = state().get(id)
-    if (!session) {
-      ws.close()
-      return
-    }
-    log.info("client connected to session", { id })
-
-    // Use ws.data as the unique key for this connection lifecycle.
-    // If ws.data is undefined, fallback to ws object.
-    const connectionKey = ws.data && typeof ws.data === "object" ? ws.data : ws
-
-    // Optionally cleanup if the key somehow exists
-    session.subscribers.delete(connectionKey)
-    session.subscribers.set(connectionKey, ws)
-
-    const cleanup = () => {
-      session.subscribers.delete(connectionKey)
-    }
-
-    const start = session.bufferCursor
-    const end = session.cursor
-
-    const from =
-      cursor === -1 ? end : typeof cursor === "number" && Number.isSafeInteger(cursor) ? Math.max(0, cursor) : 0
-
-    const data = (() => {
-      if (!session.buffer) return ""
-      if (from >= end) return ""
-      const offset = Math.max(0, from - start)
-      if (offset >= session.buffer.length) return ""
-      return session.buffer.slice(offset)
-    })()
-
-    if (data) {
-      try {
-        for (let i = 0; i < data.length; i += BUFFER_CHUNK) {
-          ws.send(data.slice(i, i + BUFFER_CHUNK))
-        }
-      } catch {
-        cleanup()
-        ws.close()
-        return
-      }
-    }
-
-    try {
-      ws.send(meta(end))
-    } catch {
-      cleanup()
-      ws.close()
-      return
-    }
-    return {
-      onMessage: (message: string | ArrayBuffer) => {
-        session.process.write(String(message))
-      },
-      onClose: () => {
-        log.info("client disconnected from session", { id })
-        cleanup()
-      },
-    }
+    return runSyncInstance(Service.use((svc) => svc.connect(id, ws, cursor)))
+  }
+
+  // Async facades
+  export async function create(input: CreateInput) {
+    return runPromiseInstance(Service.use((svc) => svc.create(input)))
+  }
+
+  export async function update(id: PtyID, input: UpdateInput) {
+    return runPromiseInstance(Service.use((svc) => svc.update(id, input)))
+  }
+
+  export async function remove(id: PtyID) {
+    return runPromiseInstance(Service.use((svc) => svc.remove(id)))
   }
 }