| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479 |
- const fs = require("node:fs");
- const http = require("node:http");
- const path = require("node:path");
- const { URL } = require("node:url");
- const next = require("next");
- const { WebSocketServer } = require("ws");
- const RESPONSES_WS_TERMINAL_TYPES = new Set([
- "response.completed",
- "response.failed",
- "response.incomplete",
- "error",
- ]);
- const LOOPBACK_BLOCKED_HEADERS = new Set([
- "connection",
- "content-length",
- "host",
- "sec-websocket-extensions",
- "sec-websocket-key",
- "sec-websocket-protocol",
- "sec-websocket-version",
- "upgrade",
- ]);
- const MAX_SSE_BUFFER_CHARS = 64 * 1024;
- function getFlag(name) {
- return process.argv.includes(`--${name}`);
- }
- function getFlagValue(name) {
- const index = process.argv.indexOf(`--${name}`);
- if (index === -1) return undefined;
- return process.argv[index + 1];
- }
- function normalizePort(value, fallback) {
- if (value === undefined || value === null || value === "") {
- return fallback;
- }
- const parsed = Number.parseInt(String(value), 10);
- return Number.isNaN(parsed) ? fallback : parsed;
- }
- function resolveOptions(overrides = {}) {
- const dev = overrides.dev ?? (getFlag("dev") || process.env.NODE_ENV !== "production");
- const fallbackPort = dev ? 13500 : 3000;
- const port = overrides.port ?? normalizePort(getFlagValue("port") ?? process.env.PORT, fallbackPort);
- const hostname = overrides.hostname ?? process.env.HOSTNAME ?? process.env.HOST ?? "0.0.0.0";
- const keepAliveTimeout = normalizePort(process.env.KEEP_ALIVE_TIMEOUT, undefined);
- return {
- dev,
- dir: overrides.dir ?? __dirname,
- port,
- hostname,
- keepAliveTimeout,
- };
- }
- function loadStandaloneNextConfig(dir) {
- const configPath = path.join(dir, "standalone-next-config.json");
- if (!fs.existsSync(configPath)) {
- return null;
- }
- const nextConfig = JSON.parse(fs.readFileSync(configPath, "utf8"));
- process.env.__NEXT_PRIVATE_STANDALONE_CONFIG = JSON.stringify(nextConfig);
- return nextConfig;
- }
- function writeJsonFrame(socket, payload) {
- if (socket.readyState !== socket.OPEN) {
- return;
- }
- socket.send(JSON.stringify(payload));
- }
- function writeProtocolError(socket, code, message, extras = {}) {
- writeJsonFrame(socket, {
- type: "error",
- error: {
- code,
- message,
- ...extras,
- },
- });
- }
- function validateCreateFrame(frame) {
- if (!frame || typeof frame !== "object" || Array.isArray(frame)) {
- throw new Error("Frame must be a JSON object");
- }
- if (frame.type !== "response.create") {
- throw new Error("First frame must be response.create");
- }
- if (!frame.response || typeof frame.response !== "object" || Array.isArray(frame.response)) {
- throw new Error("response.create must include a response object");
- }
- if (typeof frame.response.model !== "string" || frame.response.model.trim() === "") {
- throw new Error("response.create must include a non-empty response.model");
- }
- return frame;
- }
- function buildLoopbackHeaders(requestHeaders) {
- const headers = new Headers();
- for (const [key, value] of Object.entries(requestHeaders)) {
- if (LOOPBACK_BLOCKED_HEADERS.has(key.toLowerCase())) {
- continue;
- }
- if (Array.isArray(value)) {
- headers.set(key, value.join(", "));
- } else if (typeof value === "string") {
- headers.set(key, value);
- }
- }
- headers.set("accept", "text/event-stream");
- headers.set("content-type", "application/json");
- return headers;
- }
- async function defaultForwardResponsesRequest({ request, bodyText, targetOrigin, signal }) {
- const requestUrl = new URL(request.url || "/", targetOrigin);
- const payload = JSON.parse(bodyText);
- payload.stream = true;
- return fetch(`${targetOrigin}/v1/responses${requestUrl.search}`, {
- method: "POST",
- headers: buildLoopbackHeaders(request.headers),
- body: JSON.stringify(payload),
- signal,
- });
- }
- function extractSseBlocks(buffer) {
- const normalized = buffer.replace(/\r\n/g, "\n");
- const blocks = normalized.split("\n\n");
- const rest = blocks.pop() || "";
- return { blocks, rest };
- }
- function parseSseBlock(block) {
- const lines = block.split("\n");
- let event = "message";
- const dataLines = [];
- for (const rawLine of lines) {
- const line = rawLine.trimEnd();
- if (!line || line.startsWith(":")) {
- continue;
- }
- if (line.startsWith("event:")) {
- event = line.slice(6).trim() || "message";
- continue;
- }
- if (line.startsWith("data:")) {
- let value = line.slice(5);
- if (value.startsWith(" ")) {
- value = value.slice(1);
- }
- dataLines.push(value);
- }
- }
- return {
- event,
- data: dataLines.join("\n"),
- };
- }
- function sseEventToWsFrame(parsedEvent) {
- if (parsedEvent.data === "[DONE]") {
- return null;
- }
- try {
- const payload = JSON.parse(parsedEvent.data);
- if (payload && typeof payload === "object" && !Array.isArray(payload)) {
- if (parsedEvent.event && parsedEvent.event !== "message") {
- return {
- type: parsedEvent.event,
- ...payload,
- };
- }
- if (typeof payload.type === "string") {
- return payload;
- }
- return {
- type: "message",
- data: payload,
- };
- }
- return {
- type: parsedEvent.event || "message",
- data: payload,
- };
- } catch {
- return {
- type: parsedEvent.event || "message",
- data: parsedEvent.data,
- };
- }
- }
- function createResponsesWebSocketConnectionHandler(options) {
- const forwardResponsesRequest = options.forwardResponsesRequest || defaultForwardResponsesRequest;
- return function handleConnection(socket, request) {
- const state = {
- activeTurn: false,
- activeAbortController: null,
- };
- const pingInterval = setInterval(() => {
- if (socket.readyState === socket.OPEN) socket.ping();
- }, 30000);
- socket.on("close", () => clearInterval(pingInterval));
- socket.on("message", async (raw) => {
- let parsedFrame;
- try {
- parsedFrame = JSON.parse(raw.toString());
- } catch {
- writeProtocolError(socket, "invalid_json", "WebSocket frame must be valid JSON");
- return;
- }
- if (parsedFrame.type === "response.cancel") {
- if (!state.activeAbortController) {
- writeProtocolError(socket, "no_active_response", "No active response to cancel");
- return;
- }
- state.activeAbortController.abort();
- return;
- }
- let createFrame;
- try {
- createFrame = validateCreateFrame(parsedFrame);
- } catch (error) {
- writeProtocolError(
- socket,
- "invalid_request",
- error instanceof Error ? error.message : "Invalid response.create frame"
- );
- return;
- }
- if (state.activeTurn) {
- writeProtocolError(
- socket,
- "response_already_in_progress",
- "A response.create request is already in flight on this socket"
- );
- return;
- }
- state.activeTurn = true;
- state.activeAbortController = new AbortController();
- try {
- const response = await forwardResponsesRequest({
- request,
- bodyText: JSON.stringify(createFrame.response),
- targetOrigin: options.targetOrigin,
- signal: state.activeAbortController.signal,
- });
- if (!response.ok) {
- const text = await response.text().catch(() => "");
- writeProtocolError(
- socket,
- `http_${response.status}`,
- text || `Loopback bridge returned HTTP ${response.status}`
- );
- return;
- }
- const contentType = response.headers.get("content-type") || "";
- if (!contentType.includes("text/event-stream")) {
- const payload = await response.json().catch(async () => ({ body: await response.text() }));
- if (payload && payload.error) {
- writeProtocolError(
- socket,
- payload.error.code || "bridge_error",
- payload.error.message || "Loopback bridge returned an error payload"
- );
- return;
- }
- writeJsonFrame(socket, {
- type: "response.completed",
- response: payload,
- });
- return;
- }
- const reader = response.body?.getReader();
- if (!reader) {
- writeProtocolError(socket, "bridge_stream_missing", "Loopback bridge returned no SSE body");
- return;
- }
- const decoder = new TextDecoder();
- let buffer = "";
- let terminalSeen = false;
- while (true) {
- const { done, value } = await reader.read();
- if (done) {
- break;
- }
- if (!value || value.byteLength === 0) {
- continue;
- }
- buffer += decoder.decode(value, { stream: true });
- if (buffer.length > MAX_SSE_BUFFER_CHARS) {
- throw new Error("Buffered SSE frame exceeded safety limit");
- }
- const { blocks, rest } = extractSseBlocks(buffer);
- buffer = rest;
- for (const block of blocks) {
- const parsedEvent = parseSseBlock(block);
- const frame = sseEventToWsFrame(parsedEvent);
- if (!frame) {
- continue;
- }
- writeJsonFrame(socket, frame);
- if (RESPONSES_WS_TERMINAL_TYPES.has(frame.type)) {
- terminalSeen = true;
- }
- }
- if (terminalSeen) {
- break;
- }
- }
- } catch (error) {
- if (!state.activeAbortController.signal.aborted) {
- writeProtocolError(
- socket,
- "bridge_failed",
- error instanceof Error ? error.message : "Responses WebSocket bridge failed"
- );
- }
- } finally {
- state.activeTurn = false;
- state.activeAbortController = null;
- }
- });
- socket.on("close", () => {
- if (state.activeAbortController) {
- state.activeAbortController.abort();
- }
- state.activeAbortController = null;
- state.activeTurn = false;
- });
- socket.on("error", () => {
- socket.close();
- });
- };
- }
- function createResponsesUpgradeServer(options) {
- const wss = new WebSocketServer({
- noServer: true,
- perMessageDeflate: false,
- maxPayload: 1 * 1024 * 1024,
- });
- const handleConnection = createResponsesWebSocketConnectionHandler(options);
- wss.on("connection", (socket, request) => {
- handleConnection(socket, request);
- });
- return wss;
- }
- async function startServer(overrides = {}) {
- const options = resolveOptions(overrides);
- if (!options.dev) {
- process.env.NODE_ENV = "production";
- }
- const standaloneConfig = options.dev ? null : loadStandaloneNextConfig(options.dir);
- const app = next({
- dev: options.dev,
- dir: options.dir,
- hostname: options.hostname,
- port: options.port,
- conf: standaloneConfig ?? undefined,
- });
- await app.prepare();
- const handle = app.getRequestHandler();
- const server = http.createServer((req, res) => {
- Promise.resolve(handle(req, res)).catch((error) => {
- console.error("[CCH] HTTP request handling failed", error);
- if (!res.headersSent) {
- res.statusCode = 500;
- res.end("Internal Server Error");
- }
- });
- });
- if (options.keepAliveTimeout !== undefined) {
- server.keepAliveTimeout = options.keepAliveTimeout;
- }
- await new Promise((resolve, reject) => {
- server.once("error", reject);
- server.listen(options.port, options.hostname, () => {
- server.off("error", reject);
- resolve();
- });
- });
- const address = server.address();
- const actualPort = typeof address === "object" && address ? address.port : options.port;
- const targetOrigin = `http://127.0.0.1:${actualPort}`;
- const wss = createResponsesUpgradeServer({ targetOrigin });
- server.on("upgrade", (req, socket, head) => {
- try {
- const requestUrl = new URL(req.url || "/", `http://${req.headers.host || options.hostname}`);
- if (requestUrl.pathname !== "/v1/responses") {
- socket.destroy();
- return;
- }
- wss.handleUpgrade(req, socket, head, (ws) => {
- wss.emit("connection", ws, req);
- });
- } catch (error) {
- console.error("[CCH] WebSocket upgrade failed", error);
- socket.destroy();
- }
- });
- return {
- app,
- server,
- wss,
- port: actualPort,
- hostname: options.hostname,
- async close() {
- await Promise.all([
- new Promise((resolve) => wss.close(() => resolve())),
- new Promise((resolve, reject) => server.close((error) => (error ? reject(error) : resolve()))),
- ]);
- if (typeof app.close === "function") {
- await app.close();
- }
- },
- };
- }
- if (require.main === module) {
- startServer()
- .then(({ hostname, port }) => {
- console.log(`[CCH] Server listening on http://${hostname}:${port}`);
- })
- .catch((error) => {
- console.error(error);
- process.exit(1);
- });
- }
- module.exports = {
- createResponsesWebSocketConnectionHandler,
- createResponsesUpgradeServer,
- startServer,
- };
|