|
|
@@ -1,11 +1,10 @@
|
|
|
import { BusEvent } from "@/bus/bus-event"
|
|
|
import { Bus } from "@/bus"
|
|
|
-import { type IPty } from "bun-pty"
|
|
|
+import { type IPty } from "node-pty"
|
|
|
import z from "zod"
|
|
|
import { Identifier } from "../id/id"
|
|
|
import { Log } from "../util/log"
|
|
|
import { Instance } from "../project/instance"
|
|
|
-import { lazy } from "@opencode-ai/util/lazy"
|
|
|
import { Shell } from "@/shell/shell"
|
|
|
import { Plugin } from "@/plugin"
|
|
|
|
|
|
@@ -15,22 +14,27 @@ export namespace Pty {
|
|
|
const BUFFER_LIMIT = 1024 * 1024 * 2
|
|
|
const BUFFER_CHUNK = 64 * 1024
|
|
|
const encoder = new TextEncoder()
|
|
|
+ const decoder = new TextDecoder()
|
|
|
+ const FRAME_META = 0
|
|
|
+ const FRAME_OUTPUT = 1
|
|
|
+ const FRAME_INPUT = 2
|
|
|
+ const MAX_CONNECTION = 200
|
|
|
|
|
|
type Socket = {
|
|
|
readyState: number
|
|
|
- data?: unknown
|
|
|
send: (data: string | Uint8Array | ArrayBuffer) => void
|
|
|
close: (code?: number, reason?: string) => void
|
|
|
}
|
|
|
|
|
|
type Subscriber = {
|
|
|
id: number
|
|
|
- token: unknown
|
|
|
+ connection: string
|
|
|
}
|
|
|
|
|
|
const sockets = new WeakMap<object, number>()
|
|
|
const owners = new WeakMap<object, string>()
|
|
|
let socketCounter = 0
|
|
|
+ let connectionCounter = 0
|
|
|
|
|
|
const tagSocket = (ws: Socket) => {
|
|
|
if (!ws || typeof ws !== "object") return
|
|
|
@@ -39,33 +43,74 @@ export namespace Pty {
|
|
|
return next
|
|
|
}
|
|
|
|
|
|
- const token = (ws: Socket) => {
|
|
|
- const data = ws.data
|
|
|
- if (!data || typeof data !== "object") return
|
|
|
+ const connection = () => {
|
|
|
+ connectionCounter = (connectionCounter + 1) % Number.MAX_SAFE_INTEGER
|
|
|
+ return `${Date.now().toString(36)}-${connectionCounter.toString(36)}`
|
|
|
+ }
|
|
|
|
|
|
- const events = (data as { events?: unknown }).events
|
|
|
- if (events && typeof events === "object") return events
|
|
|
+ const normalizeConnection = (value?: string) => {
|
|
|
+ const next = typeof value === "string" ? value.trim() : ""
|
|
|
+ if (!next) return connection()
|
|
|
+ if (next.length > MAX_CONNECTION) return connection()
|
|
|
+ if (encoder.encode(next).length > 255) return connection()
|
|
|
+ return next
|
|
|
+ }
|
|
|
+
|
|
|
+ const output = (connection: string, data: string) => {
|
|
|
+ const channel = encoder.encode(connection)
|
|
|
+ const chunk = encoder.encode(data)
|
|
|
+ const out = new Uint8Array(2 + channel.length + chunk.length)
|
|
|
+ out[0] = FRAME_OUTPUT
|
|
|
+ out[1] = channel.length
|
|
|
+ out.set(channel, 2)
|
|
|
+ out.set(chunk, 2 + channel.length)
|
|
|
+ return out
|
|
|
+ }
|
|
|
|
|
|
- const url = (data as { url?: unknown }).url
|
|
|
- if (url && typeof url === "object") return url
|
|
|
+ const input = (message: string | Uint8Array | ArrayBuffer) => {
|
|
|
+ if (typeof message === "string") {
|
|
|
+ return { data: message }
|
|
|
+ }
|
|
|
|
|
|
- return data
|
|
|
+ const bytes = message instanceof Uint8Array ? message : new Uint8Array(message)
|
|
|
+ if (bytes[0] !== FRAME_INPUT) return
|
|
|
+ const size = bytes[1]
|
|
|
+ if (!Number.isSafeInteger(size) || size < 0) return
|
|
|
+ if (bytes.length < 2 + size) return
|
|
|
+ return {
|
|
|
+ connection: decoder.decode(bytes.subarray(2, 2 + size)),
|
|
|
+ data: decoder.decode(bytes.subarray(2 + size)),
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- // WebSocket control frame: 0x00 + UTF-8 JSON.
|
|
|
- const meta = (cursor: number) => {
|
|
|
- const json = JSON.stringify({ cursor })
|
|
|
+ // WebSocket control frame: 0x00 + UTF-8 JSON ({ cursor, connection }).
|
|
|
+ const meta = (cursor: number, connection: string) => {
|
|
|
+ const json = JSON.stringify({ cursor, connection })
|
|
|
const bytes = encoder.encode(json)
|
|
|
const out = new Uint8Array(bytes.length + 1)
|
|
|
- out[0] = 0
|
|
|
+ out[0] = FRAME_META
|
|
|
out.set(bytes, 1)
|
|
|
return out
|
|
|
}
|
|
|
|
|
|
- const pty = lazy(async () => {
|
|
|
- const { spawn } = await import("bun-pty")
|
|
|
- return spawn
|
|
|
- })
|
|
|
+ type Spawn = (file: string, args: string | string[], options: unknown) => IPty
|
|
|
+ let override: Spawn | undefined
|
|
|
+ let spawn: Spawn | undefined
|
|
|
+
|
|
|
+ const pty = async (): Promise<Spawn> => {
|
|
|
+ if (override) return override
|
|
|
+ if (spawn) return spawn
|
|
|
+ const mod = await import("node-pty")
|
|
|
+ const next = mod.spawn as Spawn
|
|
|
+ spawn = next
|
|
|
+ return next
|
|
|
+ }
|
|
|
+
|
|
|
+ export function setSpawn(input?: Spawn) {
|
|
|
+ override = input
|
|
|
+ if (input) return
|
|
|
+ spawn = undefined
|
|
|
+ }
|
|
|
|
|
|
export const Info = z
|
|
|
.object({
|
|
|
@@ -210,13 +255,8 @@ export namespace Pty {
|
|
|
continue
|
|
|
}
|
|
|
|
|
|
- if (sub.token !== undefined && token(ws) !== sub.token) {
|
|
|
- session.subscribers.delete(ws)
|
|
|
- continue
|
|
|
- }
|
|
|
-
|
|
|
try {
|
|
|
- ws.send(chunk)
|
|
|
+ ws.send(output(sub.connection, chunk))
|
|
|
} catch {
|
|
|
session.subscribers.delete(ws)
|
|
|
}
|
|
|
@@ -292,7 +332,7 @@ export namespace Pty {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- export function connect(id: string, ws: Socket, cursor?: number) {
|
|
|
+ export function connect(id: string, ws: Socket, cursor?: number, connectionID?: string) {
|
|
|
const session = state().get(id)
|
|
|
if (!session) {
|
|
|
ws.close()
|
|
|
@@ -312,7 +352,11 @@ export namespace Pty {
|
|
|
}
|
|
|
|
|
|
owners.set(ws, id)
|
|
|
- session.subscribers.set(ws, { id: socketId, token: token(ws) })
|
|
|
+ const sub = {
|
|
|
+ id: socketId,
|
|
|
+ connection: normalizeConnection(connectionID),
|
|
|
+ }
|
|
|
+ session.subscribers.set(ws, sub)
|
|
|
|
|
|
const cleanup = () => {
|
|
|
session.subscribers.delete(ws)
|
|
|
@@ -336,7 +380,7 @@ export namespace Pty {
|
|
|
if (data) {
|
|
|
try {
|
|
|
for (let i = 0; i < data.length; i += BUFFER_CHUNK) {
|
|
|
- ws.send(data.slice(i, i + BUFFER_CHUNK))
|
|
|
+ ws.send(output(sub.connection, data.slice(i, i + BUFFER_CHUNK)))
|
|
|
}
|
|
|
} catch {
|
|
|
cleanup()
|
|
|
@@ -346,15 +390,18 @@ export namespace Pty {
|
|
|
}
|
|
|
|
|
|
try {
|
|
|
- ws.send(meta(end))
|
|
|
+ ws.send(meta(end, sub.connection))
|
|
|
} catch {
|
|
|
cleanup()
|
|
|
ws.close()
|
|
|
return
|
|
|
}
|
|
|
return {
|
|
|
- onMessage: (message: string | ArrayBuffer) => {
|
|
|
- session.process.write(String(message))
|
|
|
+ onMessage: (message: string | Uint8Array | ArrayBuffer) => {
|
|
|
+ const next = input(message)
|
|
|
+ if (!next?.data) return
|
|
|
+ if (next.connection && next.connection !== sub.connection) return
|
|
|
+ session.process.write(next.data)
|
|
|
},
|
|
|
onClose: () => {
|
|
|
log.info("client disconnected from session", { id })
|