opencode-relay.mjs 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328
  1. import { createServer } from 'node:http';
  2. const PORT = Number(process.env.PORT || 8787);
  3. const HOST = process.env.HOST || '0.0.0.0';
  4. const EXPO_PUSH_URL = 'https://exp.host/--/api/v2/push/send';
  5. /** @type {Map<string, {jobID: string, sessionID: string, opencodeBaseURL: string, relayBaseURL: string, expoPushToken: string, createdAt: number, done: boolean}>} */
  6. const jobs = new Map();
  7. /** @type {Map<string, {key: string, opencodeBaseURL: string, abortController: AbortController, sessions: Set<string>, running: boolean}>} */
  8. const streams = new Map();
  9. /** @type {Set<string>} */
  10. const dedupe = new Set();
  11. function json(res, status, body) {
  12. const value = JSON.stringify(body);
  13. res.writeHead(status, {
  14. 'Content-Type': 'application/json',
  15. 'Content-Length': Buffer.byteLength(value),
  16. });
  17. res.end(value);
  18. }
  19. async function readJSON(req) {
  20. let raw = '';
  21. for await (const chunk of req) {
  22. raw += chunk;
  23. if (raw.length > 1_000_000) {
  24. throw new Error('Payload too large');
  25. }
  26. }
  27. if (!raw.trim()) return {};
  28. return JSON.parse(raw);
  29. }
  30. function extractSessionID(event) {
  31. const properties = event?.properties ?? {};
  32. if (typeof properties.sessionID === 'string') return properties.sessionID;
  33. if (properties.info && typeof properties.info === 'object' && typeof properties.info.sessionID === 'string') {
  34. return properties.info.sessionID;
  35. }
  36. if (properties.part && typeof properties.part === 'object' && typeof properties.part.sessionID === 'string') {
  37. return properties.part.sessionID;
  38. }
  39. return null;
  40. }
  41. function classifyEvent(event) {
  42. const type = String(event?.type || '');
  43. const lower = type.toLowerCase();
  44. if (lower.includes('permission')) return 'permission';
  45. if (lower.includes('error')) return 'error';
  46. if (type === 'session.status') {
  47. const statusType = event?.properties?.status?.type;
  48. if (statusType === 'idle') return 'complete';
  49. }
  50. if (type === 'message.updated') {
  51. const info = event?.properties?.info;
  52. if (info && typeof info === 'object') {
  53. if (info.error) return 'error';
  54. if (info.role === 'assistant' && info.time && typeof info.time === 'object' && info.time.completed) {
  55. return 'complete';
  56. }
  57. }
  58. }
  59. return null;
  60. }
  61. function notificationBody(eventType) {
  62. if (eventType === 'complete') {
  63. return {
  64. title: 'Session complete',
  65. body: 'OpenCode finished your monitored prompt.',
  66. };
  67. }
  68. if (eventType === 'permission') {
  69. return {
  70. title: 'Action needed',
  71. body: 'OpenCode needs a permission decision.',
  72. };
  73. }
  74. return {
  75. title: 'Session error',
  76. body: 'OpenCode reported an error for your monitored session.',
  77. };
  78. }
  79. async function sendPush({ expoPushToken, eventType, sessionID, jobID }) {
  80. const dedupeKey = `${jobID}:${eventType}`;
  81. if (dedupe.has(dedupeKey)) return;
  82. dedupe.add(dedupeKey);
  83. const text = notificationBody(eventType);
  84. const payload = {
  85. to: expoPushToken,
  86. priority: 'high',
  87. _contentAvailable: true,
  88. data: {
  89. eventType,
  90. sessionID,
  91. jobID,
  92. title: text.title,
  93. body: text.body,
  94. dedupeKey,
  95. at: Date.now(),
  96. },
  97. };
  98. const response = await fetch(EXPO_PUSH_URL, {
  99. method: 'POST',
  100. headers: {
  101. 'Content-Type': 'application/json',
  102. Accept: 'application/json',
  103. },
  104. body: JSON.stringify(payload),
  105. });
  106. if (!response.ok) {
  107. const body = await response.text();
  108. throw new Error(`Push send failed (${response.status}): ${body || response.statusText}`);
  109. }
  110. }
  111. async function* parseSSE(readable) {
  112. const reader = readable.getReader();
  113. const decoder = new TextDecoder();
  114. let pending = '';
  115. try {
  116. while (true) {
  117. const next = await reader.read();
  118. if (next.done) break;
  119. pending += decoder.decode(next.value, { stream: true });
  120. const blocks = pending.split(/\r?\n\r?\n/);
  121. pending = blocks.pop() || '';
  122. for (const block of blocks) {
  123. const lines = block.split(/\r?\n/);
  124. const dataLines = [];
  125. for (const line of lines) {
  126. if (!line || line.startsWith(':')) continue;
  127. if (line.startsWith('data:')) {
  128. dataLines.push(line.slice(5).trimStart());
  129. }
  130. }
  131. if (dataLines.length > 0) {
  132. yield dataLines.join('\n');
  133. }
  134. }
  135. }
  136. } finally {
  137. reader.releaseLock();
  138. }
  139. }
  140. function cleanupStreamIfUnused(baseURL) {
  141. const key = baseURL.replace(/\/+$/, '');
  142. const entry = streams.get(key);
  143. if (!entry) return;
  144. const stillUsed = Array.from(jobs.values()).some((job) => !job.done && job.opencodeBaseURL === key);
  145. if (stillUsed) return;
  146. entry.abortController.abort();
  147. streams.delete(key);
  148. }
  149. async function runStream(baseURL) {
  150. const key = baseURL.replace(/\/+$/, '');
  151. if (streams.has(key)) return;
  152. const abortController = new AbortController();
  153. streams.set(key, {
  154. key,
  155. opencodeBaseURL: key,
  156. abortController,
  157. sessions: new Set(),
  158. running: true,
  159. });
  160. while (!abortController.signal.aborted) {
  161. try {
  162. const response = await fetch(`${key}/event`, {
  163. signal: abortController.signal,
  164. headers: {
  165. Accept: 'text/event-stream',
  166. 'Cache-Control': 'no-cache',
  167. },
  168. });
  169. if (!response.ok || !response.body) {
  170. throw new Error(`SSE connect failed (${response.status})`);
  171. }
  172. for await (const data of parseSSE(response.body)) {
  173. if (abortController.signal.aborted) break;
  174. let event;
  175. try {
  176. event = JSON.parse(data);
  177. } catch {
  178. continue;
  179. }
  180. const sessionID = extractSessionID(event);
  181. if (!sessionID) continue;
  182. const eventType = classifyEvent(event);
  183. if (!eventType) continue;
  184. const related = Array.from(jobs.values()).filter(
  185. (job) => !job.done && job.opencodeBaseURL === key && job.sessionID === sessionID,
  186. );
  187. if (related.length === 0) continue;
  188. await Promise.allSettled(
  189. related.map(async (job) => {
  190. await sendPush({
  191. expoPushToken: job.expoPushToken,
  192. eventType,
  193. sessionID,
  194. jobID: job.jobID,
  195. });
  196. if (eventType === 'complete' || eventType === 'error') {
  197. const current = jobs.get(job.jobID);
  198. if (current) current.done = true;
  199. }
  200. }),
  201. );
  202. }
  203. } catch (error) {
  204. if (abortController.signal.aborted) break;
  205. console.warn('[relay] SSE loop error:', error instanceof Error ? error.message : String(error));
  206. await new Promise((resolve) => setTimeout(resolve, 1200));
  207. }
  208. }
  209. }
  210. const server = createServer(async (req, res) => {
  211. if (!req.url || !req.method) {
  212. json(res, 400, { ok: false, error: 'Invalid request' });
  213. return;
  214. }
  215. if (req.url === '/health' && req.method === 'GET') {
  216. json(res, 200, {
  217. ok: true,
  218. activeJobs: Array.from(jobs.values()).filter((job) => !job.done).length,
  219. streams: streams.size,
  220. });
  221. return;
  222. }
  223. if (req.url === '/v1/monitor/start' && req.method === 'POST') {
  224. try {
  225. const body = await readJSON(req);
  226. const jobID = String(body.jobID || '').trim();
  227. const sessionID = String(body.sessionID || '').trim();
  228. const opencodeBaseURL = String(body.opencodeBaseURL || '').trim().replace(/\/+$/, '');
  229. const relayBaseURL = String(body.relayBaseURL || '').trim().replace(/\/+$/, '');
  230. const expoPushToken = String(body.expoPushToken || '').trim();
  231. if (!jobID || !sessionID || !opencodeBaseURL || !expoPushToken) {
  232. json(res, 400, { ok: false, error: 'Missing required fields' });
  233. return;
  234. }
  235. jobs.set(jobID, {
  236. jobID,
  237. sessionID,
  238. opencodeBaseURL,
  239. relayBaseURL,
  240. expoPushToken,
  241. createdAt: Date.now(),
  242. done: false,
  243. });
  244. runStream(opencodeBaseURL).catch((error) => {
  245. console.warn('[relay] runStream failed:', error instanceof Error ? error.message : String(error));
  246. });
  247. json(res, 200, { ok: true });
  248. return;
  249. } catch (error) {
  250. json(res, 500, { ok: false, error: error instanceof Error ? error.message : String(error) });
  251. return;
  252. }
  253. }
  254. if (req.url === '/v1/monitor/stop' && req.method === 'POST') {
  255. try {
  256. const body = await readJSON(req);
  257. const jobID = String(body.jobID || '').trim();
  258. const token = String(body.expoPushToken || '').trim();
  259. if (!jobID || !token) {
  260. json(res, 400, { ok: false, error: 'Missing required fields' });
  261. return;
  262. }
  263. const job = jobs.get(jobID);
  264. if (job && job.expoPushToken === token) {
  265. job.done = true;
  266. cleanupStreamIfUnused(job.opencodeBaseURL);
  267. }
  268. json(res, 200, { ok: true });
  269. return;
  270. } catch (error) {
  271. json(res, 500, { ok: false, error: error instanceof Error ? error.message : String(error) });
  272. return;
  273. }
  274. }
  275. json(res, 404, { ok: false, error: 'Not found' });
  276. });
  277. server.listen(PORT, HOST, () => {
  278. console.log(`[relay] listening on http://${HOST}:${PORT}`);
  279. });