| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328 |
- import { createServer } from 'node:http';
/**
 * A monitoring request registered through `/v1/monitor/start`.
 * @typedef {{jobID: string, sessionID: string, opencodeBaseURL: string, relayBaseURL: string, expoPushToken: string, createdAt: number, done: boolean}} MonitorJob
 */

/**
 * Bookkeeping for one event-stream subscription to an OpenCode server.
 * @typedef {{key: string, opencodeBaseURL: string, abortController: AbortController, sessions: Set<string>, running: boolean}} StreamEntry
 */

/** Port the HTTP server binds to; read from the PORT env var, otherwise 8787. */
const PORT = Number(process.env.PORT || 8787);

/** Interface the HTTP server binds to; read from the HOST env var, otherwise all interfaces. */
const HOST = process.env.HOST || '0.0.0.0';

/** Endpoint that push payloads are POSTed to. */
const EXPO_PUSH_URL = 'https://exp.host/--/api/v2/push/send';

/**
 * Registered jobs, keyed by jobID.
 * @type {Map<string, MonitorJob>}
 */
const jobs = new Map();

/**
 * Open event-stream subscriptions, keyed by base URL with trailing slashes removed.
 * @type {Map<string, StreamEntry>}
 */
const streams = new Map();

/**
 * `${jobID}:${eventType}` keys for which a push has already been attempted.
 * @type {Set<string>}
 */
const dedupe = new Set();
/**
 * Writes a JSON response and ends it.
 *
 * @param {import('node:http').ServerResponse} res - Response to write to.
 * @param {number} status - HTTP status code.
 * @param {unknown} body - Value serialized with JSON.stringify.
 */
function json(res, status, body) {
  const serialized = JSON.stringify(body);
  const headers = {
    'Content-Type': 'application/json',
    // Byte length, not string length, so multi-byte characters are counted correctly.
    'Content-Length': Buffer.byteLength(serialized),
  };
  res.writeHead(status, headers);
  res.end(serialized);
}
/**
 * Reads a request body to the end and parses it as JSON.
 *
 * Chunks are accumulated as bytes and decoded once, so a multi-byte UTF-8
 * character that straddles two chunks is decoded correctly, and the size
 * limit is enforced in bytes.
 *
 * @param {AsyncIterable<Buffer | Uint8Array | string>} req - Incoming request (or any async iterable of chunks).
 * @returns {Promise<any>} The parsed value, or `{}` when the body is empty or whitespace only.
 * @throws {Error} 'Payload too large' when the body exceeds 1,000,000 bytes.
 * @throws {SyntaxError} When the body is not valid JSON.
 */
async function readJSON(req) {
  const maxBytes = 1_000_000;
  const chunks = [];
  let totalBytes = 0;
  for await (const chunk of req) {
    // Chunks are strings only if an encoding was set on the stream; normalize to bytes.
    const bytes = typeof chunk === 'string' ? Buffer.from(chunk) : chunk;
    totalBytes += bytes.length;
    if (totalBytes > maxBytes) {
      throw new Error('Payload too large');
    }
    chunks.push(bytes);
  }
  const raw = Buffer.concat(chunks).toString('utf8');
  if (!raw.trim()) return {};
  return JSON.parse(raw);
}
/**
 * Finds the session ID carried by an event.
 *
 * Looks at `properties.sessionID` first, then `properties.info.sessionID`,
 * then `properties.part.sessionID`; only string values are accepted.
 *
 * @param {any} event - Parsed event object (may be null or undefined).
 * @returns {string | null} The session ID, or null when none is present.
 */
function extractSessionID(event) {
  const props = event?.properties ?? {};
  if (typeof props.sessionID === 'string') return props.sessionID;

  for (const field of ['info', 'part']) {
    const nested = props[field];
    const isObject = Boolean(nested) && typeof nested === 'object';
    if (isObject && typeof nested.sessionID === 'string') {
      return nested.sessionID;
    }
  }
  return null;
}
/**
 * Maps an event to the notification category it should trigger.
 *
 * - Any type containing "permission" (case-insensitive) is 'permission'.
 * - Any type containing "error" (case-insensitive) is 'error'.
 * - `session.status` with status type `idle` is 'complete'.
 * - `message.updated` is 'error' when `info.error` is set, or 'complete' when
 *   the message is from the assistant and has a truthy `time.completed`.
 *
 * @param {any} event - Parsed event object (may be null or undefined).
 * @returns {'permission' | 'error' | 'complete' | null} Category, or null when the event is not of interest.
 */
function classifyEvent(event) {
  const rawType = String(event?.type || '');
  const normalized = rawType.toLowerCase();

  if (normalized.includes('permission')) return 'permission';
  if (normalized.includes('error')) return 'error';

  switch (rawType) {
    case 'session.status':
      return event?.properties?.status?.type === 'idle' ? 'complete' : null;

    case 'message.updated': {
      const info = event?.properties?.info;
      if (!info || typeof info !== 'object') return null;
      if (info.error) return 'error';
      const finished =
        info.role === 'assistant' && info.time && typeof info.time === 'object' && info.time.completed;
      return finished ? 'complete' : null;
    }

    default:
      return null;
  }
}
/**
 * Returns the user-facing title and body for a notification category.
 *
 * @param {string} eventType - 'complete', 'permission', or anything else (treated as an error).
 * @returns {{title: string, body: string}} A fresh object on every call.
 */
function notificationBody(eventType) {
  switch (eventType) {
    case 'complete':
      return {
        title: 'Session complete',
        body: 'OpenCode finished your monitored prompt.',
      };
    case 'permission':
      return {
        title: 'Action needed',
        body: 'OpenCode needs a permission decision.',
      };
    default:
      // Every other category falls back to the error wording.
      return {
        title: 'Session error',
        body: 'OpenCode reported an error for your monitored session.',
      };
  }
}
/**
 * Sends one push notification for a monitored job, at most once per
 * (jobID, eventType) pair.
 *
 * The pair is recorded in `dedupe` before the request is issued so that
 * overlapping calls cannot both send. If the request fails, the record is
 * removed again so a later event can retry instead of being suppressed.
 *
 * @param {object} params
 * @param {string} params.expoPushToken - Recipient token, sent as the `to` field.
 * @param {string} params.eventType - Notification category; selects the title/body text.
 * @param {string} params.sessionID - Session the event belongs to (forwarded in `data`).
 * @param {string} params.jobID - Job the notification is for (forwarded in `data`).
 * @returns {Promise<void>} Resolves after a successful send, or immediately when the pair was already handled.
 * @throws {Error} When the request fails or the push endpoint answers with a non-2xx status.
 */
async function sendPush({ expoPushToken, eventType, sessionID, jobID }) {
  const dedupeKey = `${jobID}:${eventType}`;
  if (dedupe.has(dedupeKey)) return;
  // Reserve the key before the first await so concurrent calls are deduplicated too.
  dedupe.add(dedupeKey);

  try {
    const { title, body } = notificationBody(eventType);
    const payload = {
      to: expoPushToken,
      priority: 'high',
      _contentAvailable: true,
      data: {
        eventType,
        sessionID,
        jobID,
        title,
        body,
        dedupeKey,
        at: Date.now(),
      },
    };
    const response = await fetch(EXPO_PUSH_URL, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        Accept: 'application/json',
      },
      body: JSON.stringify(payload),
    });
    if (!response.ok) {
      const detail = await response.text();
      throw new Error(`Push send failed (${response.status}): ${detail || response.statusText}`);
    }
  } catch (error) {
    // Nothing was delivered: release the reservation so the next attempt is not skipped.
    dedupe.delete(dedupeKey);
    throw error;
  }
}
/**
 * Incrementally parses a server-sent-events byte stream and yields the data
 * payload of each event.
 *
 * Events are separated by a blank line (LF or CRLF line endings). Within an
 * event only `data:` lines are used; multiple data lines are joined with
 * "\n". Comment lines, other fields and events without data are ignored.
 * Exactly one leading space after `data:` is removed, as the event-stream
 * format prescribes, so payloads that intentionally start with whitespace
 * are preserved. A trailing event that is not terminated by a blank line
 * when the stream ends is discarded.
 *
 * @param {ReadableStream<Uint8Array>} readable - Stream to read from; it is locked while iterating.
 * @returns {AsyncGenerator<string, void, void>} The data payload of each event.
 */
async function* parseSSE(readable) {
  const reader = readable.getReader();
  const decoder = new TextDecoder();
  let pending = '';
  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      // stream: true keeps partial multi-byte sequences buffered inside the decoder.
      pending += decoder.decode(value, { stream: true });
      const blocks = pending.split(/\r?\n\r?\n/);
      // The last piece is an event still being received; keep it for the next chunk.
      pending = blocks.pop() ?? '';
      for (const block of blocks) {
        const dataLines = [];
        for (const line of block.split(/\r?\n/)) {
          if (!line.startsWith('data:')) continue;
          const fieldValue = line.slice(5);
          dataLines.push(fieldValue.startsWith(' ') ? fieldValue.slice(1) : fieldValue);
        }
        if (dataLines.length > 0) {
          yield dataLines.join('\n');
        }
      }
    }
  } finally {
    reader.releaseLock();
  }
}
/**
 * Aborts and forgets the event stream for a base URL once no unfinished job
 * refers to it any more.
 *
 * @param {string} baseURL - Base URL of the OpenCode server; trailing slashes are ignored.
 */
function cleanupStreamIfUnused(baseURL) {
  const streamKey = baseURL.replace(/\/+$/, '');
  const stream = streams.get(streamKey);
  if (!stream) return;

  // Keep the stream while at least one unfinished job still targets this server.
  for (const job of jobs.values()) {
    if (!job.done && job.opencodeBaseURL === streamKey) return;
  }

  stream.abortController.abort();
  streams.delete(streamKey);
}
/**
 * Subscribes to `<baseURL>/event` and turns matching events into push
 * notifications for the registered jobs, reconnecting until aborted.
 *
 * Only one subscription exists per base URL: if `streams` already has an
 * entry for it, the call returns immediately. The loop ends when the entry's
 * AbortController is aborted (see `cleanupStreamIfUnused`).
 *
 * For every event that carries a session ID and a notification category, a
 * push is sent to each unfinished job on this server with that session.
 * Jobs are marked done after a successfully sent 'complete' or 'error' push;
 * once that leaves no unfinished job for this server the stream is closed.
 *
 * @param {string} baseURL - Base URL of the OpenCode server; trailing slashes are ignored.
 * @returns {Promise<void>} Resolves when the subscription has been aborted or was already running.
 */
async function runStream(baseURL) {
  const key = baseURL.replace(/\/+$/, '');
  if (streams.has(key)) return;

  const reconnectDelayMs = 1200;
  const abortController = new AbortController();
  const { signal } = abortController;
  streams.set(key, {
    key,
    opencodeBaseURL: key,
    abortController,
    sessions: new Set(),
    running: true,
  });

  while (!signal.aborted) {
    try {
      const response = await fetch(`${key}/event`, {
        signal,
        headers: {
          Accept: 'text/event-stream',
          'Cache-Control': 'no-cache',
        },
      });
      if (!response.ok || !response.body) {
        throw new Error(`SSE connect failed (${response.status})`);
      }

      for await (const data of parseSSE(response.body)) {
        if (signal.aborted) break;

        let event;
        try {
          event = JSON.parse(data);
        } catch {
          // Payloads that are not JSON cannot be classified; skip them.
          continue;
        }

        const sessionID = extractSessionID(event);
        if (!sessionID) continue;
        const eventType = classifyEvent(event);
        if (!eventType) continue;

        const related = Array.from(jobs.values()).filter(
          (job) => !job.done && job.opencodeBaseURL === key && job.sessionID === sessionID,
        );
        if (related.length === 0) continue;

        const isTerminal = eventType === 'complete' || eventType === 'error';
        const outcomes = await Promise.allSettled(
          related.map(async (job) => {
            await sendPush({
              expoPushToken: job.expoPushToken,
              eventType,
              sessionID,
              jobID: job.jobID,
            });
            // Only reached when the push went through; a failed push leaves the job active.
            if (isTerminal) {
              const current = jobs.get(job.jobID);
              if (current) current.done = true;
            }
          }),
        );

        // One failing push must not stop the others, but it should not vanish silently either.
        for (const outcome of outcomes) {
          if (outcome.status === 'rejected') {
            const { reason } = outcome;
            console.warn('[relay] push failed:', reason instanceof Error ? reason.message : String(reason));
          }
        }

        // Release the connection as soon as the last job for this server has finished.
        if (isTerminal) cleanupStreamIfUnused(key);
      }
    } catch (error) {
      if (signal.aborted) break;
      console.warn('[relay] SSE loop error:', error instanceof Error ? error.message : String(error));
    }

    if (signal.aborted) break;
    // Wait before reconnecting, whether the stream failed or was closed cleanly,
    // so a server that keeps closing the connection is not hammered.
    await new Promise((resolve) => setTimeout(resolve, reconnectDelayMs));
  }
}
/**
 * Returns true when `value` parses as an absolute http: or https: URL.
 *
 * @param {string} value - Candidate URL.
 * @returns {boolean}
 */
function isHttpURL(value) {
  try {
    const { protocol } = new URL(value);
    return protocol === 'http:' || protocol === 'https:';
  } catch {
    return false;
  }
}

/**
 * Picks the HTTP status for an error raised while handling a request:
 * 400 for malformed JSON, 413 for an oversize body, 500 for anything else.
 *
 * @param {unknown} error - The caught value.
 * @returns {number}
 */
function statusForError(error) {
  if (error instanceof SyntaxError) return 400;
  if (error instanceof Error && error.message === 'Payload too large') return 413;
  return 500;
}

/**
 * Reads the request body and ensures it is a JSON object.
 * Answers 400 itself and returns null when it is not.
 *
 * @param {import('node:http').IncomingMessage} req
 * @param {import('node:http').ServerResponse} res
 * @returns {Promise<Record<string, unknown> | null>}
 */
async function readObjectBody(req, res) {
  const body = await readJSON(req);
  if (body === null || typeof body !== 'object') {
    json(res, 400, { ok: false, error: 'Request body must be a JSON object' });
    return null;
  }
  return body;
}

/**
 * POST /v1/monitor/start — registers (or replaces) a job and makes sure an
 * event stream to its OpenCode server is running.
 *
 * @param {import('node:http').IncomingMessage} req
 * @param {import('node:http').ServerResponse} res
 * @returns {Promise<void>}
 */
async function handleMonitorStart(req, res) {
  const body = await readObjectBody(req, res);
  if (!body) return;

  const jobID = String(body.jobID || '').trim();
  const sessionID = String(body.sessionID || '').trim();
  const opencodeBaseURL = String(body.opencodeBaseURL || '').trim().replace(/\/+$/, '');
  const relayBaseURL = String(body.relayBaseURL || '').trim().replace(/\/+$/, '');
  const expoPushToken = String(body.expoPushToken || '').trim();

  if (!jobID || !sessionID || !opencodeBaseURL || !expoPushToken) {
    json(res, 400, { ok: false, error: 'Missing required fields' });
    return;
  }
  // An unusable URL would otherwise start a reconnect loop that can never succeed.
  if (!isHttpURL(opencodeBaseURL)) {
    json(res, 400, { ok: false, error: 'opencodeBaseURL must be an http(s) URL' });
    return;
  }

  jobs.set(jobID, {
    jobID,
    sessionID,
    opencodeBaseURL,
    relayBaseURL,
    expoPushToken,
    createdAt: Date.now(),
    done: false,
  });

  // Deliberately not awaited: the stream runs for as long as it is needed.
  runStream(opencodeBaseURL).catch((error) => {
    console.warn('[relay] runStream failed:', error instanceof Error ? error.message : String(error));
  });

  json(res, 200, { ok: true });
}

/**
 * POST /v1/monitor/stop — marks a job done when the supplied push token
 * matches the one it was registered with, then closes the stream if unused.
 * Answers 200 whether or not a matching job was found.
 *
 * @param {import('node:http').IncomingMessage} req
 * @param {import('node:http').ServerResponse} res
 * @returns {Promise<void>}
 */
async function handleMonitorStop(req, res) {
  const body = await readObjectBody(req, res);
  if (!body) return;

  const jobID = String(body.jobID || '').trim();
  const token = String(body.expoPushToken || '').trim();
  if (!jobID || !token) {
    json(res, 400, { ok: false, error: 'Missing required fields' });
    return;
  }

  const job = jobs.get(jobID);
  if (job && job.expoPushToken === token) {
    job.done = true;
    cleanupStreamIfUnused(job.opencodeBaseURL);
  }
  json(res, 200, { ok: true });
}

/**
 * HTTP entry point of the relay.
 *
 * Routes:
 * - GET  /health            — counts of unfinished jobs and open streams
 * - POST /v1/monitor/start  — register a job
 * - POST /v1/monitor/stop   — finish a job
 * Anything else is answered with 404.
 */
const server = createServer(async (req, res) => {
  const { url, method } = req;
  if (!url || !method) {
    json(res, 400, { ok: false, error: 'Invalid request' });
    return;
  }

  try {
    if (url === '/health' && method === 'GET') {
      json(res, 200, {
        ok: true,
        activeJobs: Array.from(jobs.values()).filter((job) => !job.done).length,
        streams: streams.size,
      });
      return;
    }
    if (url === '/v1/monitor/start' && method === 'POST') {
      await handleMonitorStart(req, res);
      return;
    }
    if (url === '/v1/monitor/stop' && method === 'POST') {
      await handleMonitorStop(req, res);
      return;
    }
    json(res, 404, { ok: false, error: 'Not found' });
  } catch (error) {
    // Writing a second head would throw, so only report if nothing was sent yet.
    if (res.headersSent) {
      res.end();
      return;
    }
    json(res, statusForError(error), {
      ok: false,
      error: error instanceof Error ? error.message : String(error),
    });
  }
});

server.listen(PORT, HOST, () => {
  console.log(`[relay] listening on http://${HOST}:${PORT}`);
});
|