index.ts 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232
  1. import { BusEvent } from "@/bus/bus-event"
  2. import { Bus } from "@/bus"
  3. import { type IPty } from "bun-pty"
  4. import z from "zod"
  5. import { Identifier } from "../id/id"
  6. import { Log } from "../util/log"
  7. import type { WSContext } from "hono/ws"
  8. import { Instance } from "../project/instance"
  9. import { lazy } from "@opencode-ai/util/lazy"
  10. import { Shell } from "@/shell/shell"
  11. export namespace Pty {
  12. const log = Log.create({ service: "pty" })
  13. const BUFFER_LIMIT = 1024 * 1024 * 2
  14. const BUFFER_CHUNK = 64 * 1024
  15. const pty = lazy(async () => {
  16. const { spawn } = await import("bun-pty")
  17. return spawn
  18. })
  19. export const Info = z
  20. .object({
  21. id: Identifier.schema("pty"),
  22. title: z.string(),
  23. command: z.string(),
  24. args: z.array(z.string()),
  25. cwd: z.string(),
  26. status: z.enum(["running", "exited"]),
  27. pid: z.number(),
  28. })
  29. .meta({ ref: "Pty" })
  30. export type Info = z.infer<typeof Info>
  31. export const CreateInput = z.object({
  32. command: z.string().optional(),
  33. args: z.array(z.string()).optional(),
  34. cwd: z.string().optional(),
  35. title: z.string().optional(),
  36. env: z.record(z.string(), z.string()).optional(),
  37. })
  38. export type CreateInput = z.infer<typeof CreateInput>
  39. export const UpdateInput = z.object({
  40. title: z.string().optional(),
  41. size: z
  42. .object({
  43. rows: z.number(),
  44. cols: z.number(),
  45. })
  46. .optional(),
  47. })
  48. export type UpdateInput = z.infer<typeof UpdateInput>
  49. export const Event = {
  50. Created: BusEvent.define("pty.created", z.object({ info: Info })),
  51. Updated: BusEvent.define("pty.updated", z.object({ info: Info })),
  52. Exited: BusEvent.define("pty.exited", z.object({ id: Identifier.schema("pty"), exitCode: z.number() })),
  53. Deleted: BusEvent.define("pty.deleted", z.object({ id: Identifier.schema("pty") })),
  54. }
  55. interface ActiveSession {
  56. info: Info
  57. process: IPty
  58. buffer: string
  59. subscribers: Set<WSContext>
  60. }
  61. const state = Instance.state(
  62. () => new Map<string, ActiveSession>(),
  63. async (sessions) => {
  64. for (const session of sessions.values()) {
  65. try {
  66. session.process.kill()
  67. } catch {}
  68. for (const ws of session.subscribers) {
  69. ws.close()
  70. }
  71. }
  72. sessions.clear()
  73. },
  74. )
  75. export function list() {
  76. return Array.from(state().values()).map((s) => s.info)
  77. }
  78. export function get(id: string) {
  79. return state().get(id)?.info
  80. }
  81. export async function create(input: CreateInput) {
  82. const id = Identifier.create("pty", false)
  83. const command = input.command || Shell.preferred()
  84. const args = input.args || []
  85. if (command.endsWith("sh")) {
  86. args.push("-l")
  87. }
  88. const cwd = input.cwd || Instance.directory
  89. const env = { ...process.env, ...input.env, TERM: "xterm-256color" } as Record<string, string>
  90. log.info("creating session", { id, cmd: command, args, cwd })
  91. const spawn = await pty()
  92. const ptyProcess = spawn(command, args, {
  93. name: "xterm-256color",
  94. cwd,
  95. env,
  96. })
  97. const info = {
  98. id,
  99. title: input.title || `Terminal ${id.slice(-4)}`,
  100. command,
  101. args,
  102. cwd,
  103. status: "running",
  104. pid: ptyProcess.pid,
  105. } as const
  106. const session: ActiveSession = {
  107. info,
  108. process: ptyProcess,
  109. buffer: "",
  110. subscribers: new Set(),
  111. }
  112. state().set(id, session)
  113. ptyProcess.onData((data) => {
  114. let open = false
  115. for (const ws of session.subscribers) {
  116. if (ws.readyState !== 1) {
  117. session.subscribers.delete(ws)
  118. continue
  119. }
  120. open = true
  121. ws.send(data)
  122. }
  123. if (open) return
  124. session.buffer += data
  125. if (session.buffer.length <= BUFFER_LIMIT) return
  126. session.buffer = session.buffer.slice(-BUFFER_LIMIT)
  127. })
  128. ptyProcess.onExit(({ exitCode }) => {
  129. log.info("session exited", { id, exitCode })
  130. session.info.status = "exited"
  131. Bus.publish(Event.Exited, { id, exitCode })
  132. for (const ws of session.subscribers) {
  133. ws.close()
  134. }
  135. state().delete(id)
  136. })
  137. Bus.publish(Event.Created, { info })
  138. return info
  139. }
  140. export async function update(id: string, input: UpdateInput) {
  141. const session = state().get(id)
  142. if (!session) return
  143. if (input.title) {
  144. session.info.title = input.title
  145. }
  146. if (input.size) {
  147. session.process.resize(input.size.cols, input.size.rows)
  148. }
  149. Bus.publish(Event.Updated, { info: session.info })
  150. return session.info
  151. }
  152. export async function remove(id: string) {
  153. const session = state().get(id)
  154. if (!session) return
  155. log.info("removing session", { id })
  156. try {
  157. session.process.kill()
  158. } catch {}
  159. for (const ws of session.subscribers) {
  160. ws.close()
  161. }
  162. state().delete(id)
  163. Bus.publish(Event.Deleted, { id })
  164. }
  165. export function resize(id: string, cols: number, rows: number) {
  166. const session = state().get(id)
  167. if (session && session.info.status === "running") {
  168. session.process.resize(cols, rows)
  169. }
  170. }
  171. export function write(id: string, data: string) {
  172. const session = state().get(id)
  173. if (session && session.info.status === "running") {
  174. session.process.write(data)
  175. }
  176. }
  177. export function connect(id: string, ws: WSContext) {
  178. const session = state().get(id)
  179. if (!session) {
  180. ws.close()
  181. return
  182. }
  183. log.info("client connected to session", { id })
  184. session.subscribers.add(ws)
  185. if (session.buffer) {
  186. const buffer = session.buffer.length <= BUFFER_LIMIT ? session.buffer : session.buffer.slice(-BUFFER_LIMIT)
  187. session.buffer = ""
  188. try {
  189. for (let i = 0; i < buffer.length; i += BUFFER_CHUNK) {
  190. ws.send(buffer.slice(i, i + BUFFER_CHUNK))
  191. }
  192. } catch {
  193. session.subscribers.delete(ws)
  194. session.buffer = buffer
  195. ws.close()
  196. return
  197. }
  198. }
  199. return {
  200. onMessage: (message: string | ArrayBuffer) => {
  201. session.process.write(String(message))
  202. },
  203. onClose: () => {
  204. log.info("client disconnected from session", { id })
  205. session.subscribers.delete(ws)
  206. },
  207. }
  208. }
  209. }