server.js 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479
  1. const fs = require("node:fs");
  2. const http = require("node:http");
  3. const path = require("node:path");
  4. const { URL } = require("node:url");
  5. const next = require("next");
  6. const { WebSocketServer } = require("ws");
  /**
   * Frame `type` values that end a response turn: once one of these has been
   * relayed to the WebSocket client, the bridge stops reading the SSE stream.
   */
  const RESPONSES_WS_TERMINAL_TYPES = new Set([
    "response.completed",
    "response.failed",
    "response.incomplete",
    "error",
  ]);
  /**
   * Lower-cased names of upgrade-request headers that are NOT copied onto the
   * loopback HTTP request (connection management and WebSocket handshake headers).
   */
  const LOOPBACK_BLOCKED_HEADERS = new Set([
    "connection",
    "content-length",
    "host",
    "sec-websocket-extensions",
    "sec-websocket-key",
    "sec-websocket-protocol",
    "sec-websocket-version",
    "upgrade",
  ]);
  /** Upper bound, in UTF-16 code units (64 Ki), on buffered SSE text. */
  const MAX_SSE_BUFFER_CHARS = 64 * 1024;
  /**
   * Reports whether the command-line switch `--<name>` is present in process.argv.
   *
   * @param {string} name - Switch name without the leading dashes.
   * @returns {boolean} True when an argument exactly equal to `--<name>` exists.
   */
  function getFlag(name) {
    const flag = `--${name}`;
    return process.argv.some((arg) => arg === flag);
  }
  /**
   * Reads the value of a command-line option from process.argv.
   *
   * Two spellings are accepted: `--name value` and `--name=value`. The first
   * matching argument wins.
   *
   * @param {string} name - Option name without the leading dashes.
   * @returns {string|undefined} The option value, or undefined when the option
   *   is absent, is the last argument, or is immediately followed by another
   *   `--switch` (which is a separate option, not this option's value).
   */
  function getFlagValue(name) {
    const flag = `--${name}`;
    const inlinePrefix = `${flag}=`;
    const args = process.argv;
    for (let index = 0; index < args.length; index += 1) {
      const arg = args[index];
      if (arg.startsWith(inlinePrefix)) {
        return arg.slice(inlinePrefix.length);
      }
      if (arg === flag) {
        const candidate = args[index + 1];
        // `--port --dev` must not yield "--dev" as the port: returning undefined
        // lets callers fall through to their environment/default handling.
        if (candidate === undefined || candidate.startsWith("--")) {
          return undefined;
        }
        return candidate;
      }
    }
    return undefined;
  }
  /**
   * Converts a loosely-typed setting into a base-10 integer.
   *
   * @param {*} value - Raw setting (typically a string from argv or the environment).
   * @param {*} fallback - Returned as-is when `value` is undefined, null, the
   *   empty string, or does not start with a parseable integer.
   * @returns {*} The parsed integer, or `fallback`.
   */
  function normalizePort(value, fallback) {
    const isMissing = value === undefined || value === null || value === "";
    if (isMissing) {
      return fallback;
    }
    const parsed = Number.parseInt(String(value), 10);
    if (Number.isNaN(parsed)) {
      return fallback;
    }
    return parsed;
  }
  /**
   * Builds the effective server options from explicit overrides, command-line
   * arguments and environment variables (in that order of precedence).
   *
   * - dev: `overrides.dev`, else true when `--dev` is passed or NODE_ENV is not "production".
   * - port: `overrides.port`, else `--port`, else PORT, else 13500 (dev) / 3000.
   * - hostname: `overrides.hostname`, else HOSTNAME, else HOST, else "0.0.0.0".
   * - keepAliveTimeout: integer parsed from KEEP_ALIVE_TIMEOUT, or undefined.
   * - dir: `overrides.dir`, else this file's directory.
   *
   * @param {object} [overrides] - Optional `dev`, `port`, `hostname`, `dir` values.
   * @returns {{dev: *, dir: string, port: *, hostname: string, keepAliveTimeout: (number|undefined)}}
   */
  function resolveOptions(overrides = {}) {
    let dev = overrides.dev;
    if (dev === undefined || dev === null) {
      dev = getFlag("dev") || process.env.NODE_ENV !== "production";
    }
    let port = overrides.port;
    if (port === undefined || port === null) {
      const requestedPort = getFlagValue("port") ?? process.env.PORT;
      port = normalizePort(requestedPort, dev ? 13500 : 3000);
    }
    const hostname = overrides.hostname ?? process.env.HOSTNAME ?? process.env.HOST ?? "0.0.0.0";
    const keepAliveTimeout = normalizePort(process.env.KEEP_ALIVE_TIMEOUT, undefined);
    const dir = overrides.dir ?? __dirname;
    return { dev, dir, port, hostname, keepAliveTimeout };
  }
  /**
   * Loads `<dir>/standalone-next-config.json` when it exists and publishes it
   * (re-serialized) through the `__NEXT_PRIVATE_STANDALONE_CONFIG` environment
   * variable.
   *
   * @param {string} dir - Directory expected to contain the config file.
   * @returns {object|null} The parsed config, or null when the file is absent.
   * @throws {SyntaxError} When the file exists but is not valid JSON.
   */
  function loadStandaloneNextConfig(dir) {
    const configFile = path.join(dir, "standalone-next-config.json");
    if (fs.existsSync(configFile)) {
      const rawConfig = fs.readFileSync(configFile, "utf8");
      const parsedConfig = JSON.parse(rawConfig);
      process.env.__NEXT_PRIVATE_STANDALONE_CONFIG = JSON.stringify(parsedConfig);
      return parsedConfig;
    }
    return null;
  }
  /**
   * Sends `payload` to the socket as one JSON text frame; does nothing unless
   * the socket is currently in its OPEN state.
   *
   * @param {object} socket - Object exposing `readyState`, `OPEN` and `send()`.
   * @param {*} payload - Value to serialize with JSON.stringify.
   * @returns {void}
   */
  function writeJsonFrame(socket, payload) {
    const isOpen = socket.readyState === socket.OPEN;
    if (isOpen) {
      socket.send(JSON.stringify(payload));
    }
  }
  /**
   * Sends an `{ type: "error", error: { code, message, ...extras } }` frame.
   *
   * @param {object} socket - Target socket (handed to writeJsonFrame).
   * @param {string} code - Machine-readable error code.
   * @param {string} message - Human-readable description.
   * @param {object} [extras] - Additional fields merged into `error`; because
   *   they are spread last, same-named keys replace `code` / `message`.
   * @returns {void}
   */
  function writeProtocolError(socket, code, message, extras = {}) {
    const error = { code, message, ...extras };
    writeJsonFrame(socket, { type: "error", error });
  }
  /**
   * Checks that a decoded client frame is a well-formed `response.create` request.
   *
   * @param {*} frame - Decoded JSON value received from the client.
   * @returns {object} The same `frame` object when it is valid.
   * @throws {Error} When the frame is not a plain object, has a different
   *   `type`, lacks a `response` object, or lacks a non-blank string `response.model`.
   */
  function validateCreateFrame(frame) {
    const isRecord = (candidate) =>
      Boolean(candidate) && typeof candidate === "object" && !Array.isArray(candidate);
    if (!isRecord(frame)) {
      throw new Error("Frame must be a JSON object");
    }
    const { type, response } = frame;
    if (type !== "response.create") {
      throw new Error("First frame must be response.create");
    }
    if (!isRecord(response)) {
      throw new Error("response.create must include a response object");
    }
    const hasModel = typeof response.model === "string" && response.model.trim() !== "";
    if (!hasModel) {
      throw new Error("response.create must include a non-empty response.model");
    }
    return frame;
  }
  /**
   * Derives the headers for the loopback HTTP request from the headers of the
   * original upgrade request.
   *
   * Headers named in LOOPBACK_BLOCKED_HEADERS are skipped, array values are
   * joined with ", ", non-string values are ignored, and `accept` /
   * `content-type` are always forced to the SSE / JSON values the bridge needs.
   *
   * @param {Object<string, (string|string[]|undefined)>} requestHeaders
   * @returns {Headers}
   */
  function buildLoopbackHeaders(requestHeaders) {
    const headers = new Headers();
    Object.entries(requestHeaders).forEach(([name, rawValue]) => {
      if (LOOPBACK_BLOCKED_HEADERS.has(name.toLowerCase())) {
        return;
      }
      if (Array.isArray(rawValue)) {
        headers.set(name, rawValue.join(", "));
        return;
      }
      if (typeof rawValue === "string") {
        headers.set(name, rawValue);
      }
    });
    headers.set("accept", "text/event-stream");
    headers.set("content-type", "application/json");
    return headers;
  }
  /**
   * Default transport for the bridge: POSTs the request body to
   * `<targetOrigin>/v1/responses`, keeping the query string of the original
   * upgrade request and forcing `stream: true` on the payload.
   *
   * @param {object} args
   * @param {object} args.request - Original upgrade request (`url`, `headers` are read).
   * @param {string} args.bodyText - JSON text of the request body.
   * @param {string} args.targetOrigin - Origin of the loopback HTTP server.
   * @param {AbortSignal} args.signal - Aborts the in-flight fetch.
   * @returns {Promise<Response>} The fetch response.
   * @throws {SyntaxError} When `bodyText` is not valid JSON.
   */
  async function defaultForwardResponsesRequest({ request, bodyText, targetOrigin, signal }) {
    const { search } = new URL(request.url || "/", targetOrigin);
    const upstreamPayload = JSON.parse(bodyText);
    upstreamPayload.stream = true;
    const endpoint = `${targetOrigin}/v1/responses${search}`;
    const init = {
      method: "POST",
      headers: buildLoopbackHeaders(request.headers),
      body: JSON.stringify(upstreamPayload),
      signal,
    };
    return fetch(endpoint, init);
  }
  /**
   * Splits buffered SSE text into complete event blocks plus the unfinished tail.
   *
   * CRLF line endings are normalized to LF first; blocks are separated by a
   * blank line ("\n\n").
   *
   * @param {string} buffer - Accumulated SSE text.
   * @returns {{blocks: string[], rest: string}} Complete blocks, and the text
   *   after the last separator (to be prepended to the next chunk).
   */
  function extractSseBlocks(buffer) {
    const segments = buffer.replace(/\r\n/g, "\n").split("\n\n");
    // split() always yields at least one element; the last one is the
    // not-yet-terminated remainder.
    const rest = segments.at(-1) || "";
    const blocks = segments.slice(0, -1);
    return { blocks, rest };
  }
  /**
   * Parses one SSE block into its event name and data payload.
   *
   * Trailing whitespace is stripped from every line. Blank lines, comment lines
   * (starting with ":") and fields other than `event:` / `data:` are ignored.
   * One leading space after `data:` is removed; multiple data lines are joined
   * with "\n". The event name defaults to "message".
   *
   * @param {string} block - Text of a single event block (LF-separated lines).
   * @returns {{event: string, data: string}}
   */
  function parseSseBlock(block) {
    let event = "message";
    const dataLines = [];
    const lines = block.split("\n").map((rawLine) => rawLine.trimEnd());
    for (const line of lines) {
      if (line === "" || line.startsWith(":")) {
        continue;
      }
      if (line.startsWith("event:")) {
        event = line.slice("event:".length).trim() || "message";
      } else if (line.startsWith("data:")) {
        const rawValue = line.slice("data:".length);
        dataLines.push(rawValue.startsWith(" ") ? rawValue.slice(1) : rawValue);
      }
    }
    return { event, data: dataLines.join("\n") };
  }
  /**
   * Converts a parsed SSE event into the JSON frame relayed to the WebSocket
   * client, or null when nothing should be relayed.
   *
   * - Empty data (comment-only / heartbeat blocks) and the `[DONE]` sentinel
   *   produce null, so keep-alives are not forwarded as bogus
   *   `{ type: "message", data: "" }` frames.
   * - JSON object data with a named (non-"message") event becomes
   *   `{ type: <event>, ...payload }`; a `type` key inside the payload is
   *   spread last and therefore takes precedence over the event name.
   * - JSON object data that carries its own string `type` is passed through.
   * - Any other JSON object is wrapped as `{ type: "message", data }`.
   * - Non-object JSON and non-JSON text are wrapped as
   *   `{ type: <event or "message">, data }`.
   *
   * @param {{event: string, data: string}} parsedEvent - Result of parsing one SSE block.
   * @returns {object|null} Frame to send, or null to skip the event.
   */
  function sseEventToWsFrame(parsedEvent) {
    const { event, data } = parsedEvent;
    if (data === "" || data === "[DONE]") {
      return null;
    }
    let payload;
    try {
      payload = JSON.parse(data);
    } catch {
      // Not JSON: relay the raw text rather than dropping the event.
      return { type: event || "message", data };
    }
    const isRecord = payload !== null && typeof payload === "object" && !Array.isArray(payload);
    if (!isRecord) {
      return { type: event || "message", data: payload };
    }
    if (event && event !== "message") {
      return { type: event, ...payload };
    }
    if (typeof payload.type === "string") {
      return payload;
    }
    return { type: "message", data: payload };
  }
  /**
   * Creates the per-connection handler that bridges WebSocket frames to the
   * HTTP `/v1/responses` endpoint and relays its SSE stream back as JSON frames.
   *
   * Protocol implemented by the returned handler:
   * - `response.create` frames start a turn (only one turn at a time per socket);
   *   `createFrame.response` is forwarded through `forwardResponsesRequest`.
   * - `response.cancel` aborts the turn in flight, or yields a
   *   `no_active_response` error when there is none.
   * - Upstream SSE events are converted to frames until a terminal frame type
   *   is relayed or the stream ends.
   * - Failures are reported as `error` frames, except failures caused by an
   *   abort (cancel or socket close), which are silent.
   * - While the socket is open it is pinged every 30 seconds.
   *
   * @param {object} options
   * @param {string} options.targetOrigin - Origin passed to the forwarder.
   * @param {Function} [options.forwardResponsesRequest] - Replacement transport
   *   called with `{ request, bodyText, targetOrigin, signal }`; must resolve to
   *   a fetch-style Response. Defaults to defaultForwardResponsesRequest.
   * @returns {function(object, object): void} `handleConnection(socket, request)`.
   */
  function createResponsesWebSocketConnectionHandler(options) {
    const forwardResponsesRequest = options.forwardResponsesRequest || defaultForwardResponsesRequest;

    // Relays a successful non-SSE upstream response as a single frame.
    async function relayBufferedResponse(socket, response) {
      // A response body can be consumed only once, so read the text a single
      // time and fall back to wrapping it when it is not JSON.
      const text = await response.text();
      let payload;
      try {
        payload = JSON.parse(text);
      } catch {
        payload = { body: text };
      }
      if (payload && payload.error) {
        writeProtocolError(
          socket,
          payload.error.code || "bridge_error",
          payload.error.message || "Loopback bridge returned an error payload"
        );
        return;
      }
      writeJsonFrame(socket, {
        type: "response.completed",
        response: payload,
      });
    }

    // Pumps the upstream SSE stream to the socket until a terminal frame has
    // been relayed or the stream ends (an unterminated trailing block is dropped).
    async function relaySseStream(socket, reader) {
      const decoder = new TextDecoder();
      let buffer = "";
      while (true) {
        const { done, value } = await reader.read();
        if (done) {
          return;
        }
        if (!value || value.byteLength === 0) {
          continue;
        }
        buffer += decoder.decode(value, { stream: true });
        const { blocks, rest } = extractSseBlocks(buffer);
        buffer = rest;
        let terminalSeen = false;
        for (const block of blocks) {
          const frame = sseEventToWsFrame(parseSseBlock(block));
          if (!frame) {
            continue;
          }
          writeJsonFrame(socket, frame);
          if (RESPONSES_WS_TERMINAL_TYPES.has(frame.type)) {
            terminalSeen = true;
          }
        }
        if (terminalSeen) {
          return;
        }
        // Only the unfinished remainder counts against the limit: complete
        // events were already flushed, so a large chunk made of many small
        // events must not trip the guard.
        if (buffer.length > MAX_SSE_BUFFER_CHARS) {
          throw new Error("Buffered SSE frame exceeded safety limit");
        }
      }
    }

    return function handleConnection(socket, request) {
      const state = {
        activeTurn: false,
        activeAbortController: null,
      };
      const pingInterval = setInterval(() => {
        if (socket.readyState === socket.OPEN) socket.ping();
      }, 30000);

      socket.on("message", async (raw) => {
        let parsedFrame;
        try {
          parsedFrame = JSON.parse(raw.toString());
        } catch {
          writeProtocolError(socket, "invalid_json", "WebSocket frame must be valid JSON");
          return;
        }
        // Valid JSON may still be null or a primitive; optional chaining keeps
        // that from throwing here so validateCreateFrame can reject it properly
        // instead of the listener ending in an unhandled rejection.
        if (parsedFrame?.type === "response.cancel") {
          if (!state.activeAbortController) {
            writeProtocolError(socket, "no_active_response", "No active response to cancel");
            return;
          }
          state.activeAbortController.abort();
          return;
        }
        let createFrame;
        try {
          createFrame = validateCreateFrame(parsedFrame);
        } catch (error) {
          writeProtocolError(
            socket,
            "invalid_request",
            error instanceof Error ? error.message : "Invalid response.create frame"
          );
          return;
        }
        if (state.activeTurn) {
          writeProtocolError(
            socket,
            "response_already_in_progress",
            "A response.create request is already in flight on this socket"
          );
          return;
        }
        // Keep a local reference: the close handler clears
        // state.activeAbortController while this turn may still be unwinding.
        const abortController = new AbortController();
        state.activeTurn = true;
        state.activeAbortController = abortController;
        let reader = null;
        try {
          const response = await forwardResponsesRequest({
            request,
            bodyText: JSON.stringify(createFrame.response),
            targetOrigin: options.targetOrigin,
            signal: abortController.signal,
          });
          if (!response.ok) {
            const text = await response.text().catch(() => "");
            writeProtocolError(
              socket,
              `http_${response.status}`,
              text || `Loopback bridge returned HTTP ${response.status}`
            );
            return;
          }
          const contentType = response.headers.get("content-type") || "";
          if (!contentType.includes("text/event-stream")) {
            await relayBufferedResponse(socket, response);
            return;
          }
          reader = response.body?.getReader() ?? null;
          if (!reader) {
            writeProtocolError(socket, "bridge_stream_missing", "Loopback bridge returned no SSE body");
            return;
          }
          await relaySseStream(socket, reader);
        } catch (error) {
          // Aborts (cancel frame or socket close) are expected and stay silent.
          if (!abortController.signal.aborted) {
            writeProtocolError(
              socket,
              "bridge_failed",
              error instanceof Error ? error.message : "Responses WebSocket bridge failed"
            );
          }
        } finally {
          // Reset only if this turn still owns the state (the close handler may
          // already have cleared it).
          if (state.activeAbortController === abortController) {
            state.activeAbortController = null;
            state.activeTurn = false;
          }
          if (reader) {
            try {
              // Release the upstream stream when we stop reading early (terminal
              // frame, error or abort).
              await reader.cancel();
            } catch {
              // Best effort: the stream may already be errored or closed.
            }
          }
        }
      });

      socket.on("close", () => {
        clearInterval(pingInterval);
        if (state.activeAbortController) {
          state.activeAbortController.abort();
        }
        state.activeAbortController = null;
        state.activeTurn = false;
      });

      socket.on("error", () => {
        socket.close();
      });
    };
  }
  /**
   * Creates the detached (`noServer`) WebSocketServer used for `/v1/responses`
   * upgrades and attaches the bridge connection handler to it.
   *
   * Per-message compression is disabled and incoming messages are capped at 1 MiB.
   *
   * @param {object} options - Passed through to createResponsesWebSocketConnectionHandler.
   * @returns {WebSocketServer} The configured server; the caller performs the
   *   HTTP upgrade and emits "connection".
   */
  function createResponsesUpgradeServer(options) {
    const serverConfig = {
      noServer: true,
      perMessageDeflate: false,
      maxPayload: 1 * 1024 * 1024,
    };
    const wss = new WebSocketServer(serverConfig);
    const onConnection = createResponsesWebSocketConnectionHandler(options);
    wss.on("connection", (socket, request) => {
      onConnection(socket, request);
    });
    return wss;
  }
  /**
   * Boots the Next.js app behind a Node HTTP server and attaches the
   * `/v1/responses` WebSocket bridge to it.
   *
   * Steps: resolve options, force NODE_ENV=production outside dev, load the
   * standalone Next config (non-dev only), prepare Next, start listening, then
   * register the upgrade handler that loops bridge traffic back to
   * `http://127.0.0.1:<actual port>`.
   *
   * @param {object} [overrides] - Forwarded to resolveOptions (`dev`, `port`,
   *   `hostname`, `dir`).
   * @returns {Promise<{app: object, server: object, wss: object, port: number,
   *   hostname: string, close: function(): Promise<void>}>} Handles for the
   *   running server plus an async `close()` that shuts everything down.
   * @throws Rejects when Next fails to prepare or the port cannot be bound.
   */
  async function startServer(overrides = {}) {
    const options = resolveOptions(overrides);
    if (!options.dev) {
      process.env.NODE_ENV = "production";
    }
    const standaloneConfig = options.dev ? null : loadStandaloneNextConfig(options.dir);
    const app = next({
      dev: options.dev,
      dir: options.dir,
      hostname: options.hostname,
      port: options.port,
      conf: standaloneConfig ?? undefined,
    });
    await app.prepare();
    const handle = app.getRequestHandler();
    const server = http.createServer((req, res) => {
      Promise.resolve(handle(req, res)).catch((error) => {
        console.error("[CCH] HTTP request handling failed", error);
        if (!res.headersSent) {
          res.statusCode = 500;
          res.end("Internal Server Error");
        }
      });
    });
    if (options.keepAliveTimeout !== undefined) {
      server.keepAliveTimeout = options.keepAliveTimeout;
    }
    await new Promise((resolve, reject) => {
      server.once("error", reject);
      server.listen(options.port, options.hostname, () => {
        // Listening succeeded: later "error" events must not reject a settled promise.
        server.off("error", reject);
        resolve();
      });
    });
    // Read the bound port back so a requested port of 0 (ephemeral) works.
    const address = server.address();
    const actualPort = typeof address === "object" && address ? address.port : options.port;
    const targetOrigin = `http://127.0.0.1:${actualPort}`;
    const wss = createResponsesUpgradeServer({ targetOrigin });
    server.on("upgrade", (req, socket, head) => {
      try {
        const requestUrl = new URL(req.url || "/", `http://${req.headers.host || options.hostname}`);
        // NOTE(review): every upgrade for another path is destroyed here;
        // confirm nothing else on this server relies on WebSocket upgrades.
        if (requestUrl.pathname !== "/v1/responses") {
          socket.destroy();
          return;
        }
        wss.handleUpgrade(req, socket, head, (ws) => {
          wss.emit("connection", ws, req);
        });
      } catch (error) {
        console.error("[CCH] WebSocket upgrade failed", error);
        socket.destroy();
      }
    });
    return {
      app,
      server,
      wss,
      port: actualPort,
      hostname: options.hostname,
      async close() {
        // wss.close() stops new connections but leaves existing ones open, and
        // both its callback and server.close() wait for them to finish. Ask
        // every connected client to close (1001 = going away) so shutdown
        // cannot hang on an idle WebSocket.
        for (const client of wss.clients) {
          client.close(1001, "Server shutting down");
        }
        await Promise.all([
          new Promise((resolve) => wss.close(() => resolve())),
          new Promise((resolve, reject) => server.close((error) => (error ? reject(error) : resolve()))),
        ]);
        if (typeof app.close === "function") {
          await app.close();
        }
      },
    };
  }
  409. if (require.main === module) {
  410. startServer()
  411. .then(({ hostname, port }) => {
  412. console.log(`[CCH] Server listening on http://${hostname}:${port}`);
  413. })
  414. .catch((error) => {
  415. console.error(error);
  416. process.exit(1);
  417. });
  418. }
  419. module.exports = {
  420. createResponsesWebSocketConnectionHandler,
  421. createResponsesUpgradeServer,
  422. startServer,
  423. };