| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167 |
- import { DurableObject } from "cloudflare:workers"
- import {
- DurableObjectNamespace,
- ExecutionContext,
- } from "@cloudflare/workers-types"
- import { createHash } from "node:crypto"
- import path from "node:path"
- import { Resource } from "sst"
- type Bindings = {
- SYNC_SERVER: DurableObjectNamespace<WebSocketHibernationServer>
- }
- export class SyncServer extends DurableObject {
- private files: Map<string, string> = new Map()
- constructor(ctx, env) {
- super(ctx, env)
- this.ctx.blockConcurrencyWhile(async () => {
- this.files = await this.ctx.storage.list()
- })
- }
- async publish(filename: string, content: string) {
- console.log(
- "SyncServer publish",
- filename,
- content,
- "to",
- this.ctx.getWebSockets().length,
- "subscribers",
- )
- this.files.set(filename, content)
- await this.ctx.storage.put(filename, content)
- this.ctx.getWebSockets().forEach((client) => {
- client.send(JSON.stringify({ filename, content }))
- })
- }
- async webSocketMessage(ws, message) {
- if (message === "load_history") {
- }
- }
- async webSocketClose(ws, code, reason, wasClean) {
- ws.close(code, "Durable Object is closing WebSocket")
- }
- async fetch(req: Request) {
- console.log("SyncServer subscribe")
- // Creates two ends of a WebSocket connection.
- const webSocketPair = new WebSocketPair()
- const [client, server] = Object.values(webSocketPair)
- this.ctx.acceptWebSocket(server)
- setTimeout(() => {
- this.files.forEach((content, filename) =>
- server.send(JSON.stringify({ filename, content })),
- )
- }, 0)
- return new Response(null, {
- status: 101,
- webSocket: client,
- })
- }
- }
- export default {
- async fetch(request: Request, env: Bindings, ctx: ExecutionContext) {
- const url = new URL(request.url)
- if (request.method === "GET" && url.pathname === "/") {
- return new Response("Hello, world!", {
- headers: { "Content-Type": "text/plain" },
- })
- }
- if (request.method === "POST" && url.pathname.endsWith("/share_create")) {
- const body = await request.json()
- const sessionID = body.session_id
- const shareID = createHash("sha256").update(sessionID).digest("hex")
- const infoFile = `${shareID}/info/${sessionID}.json`
- const ret = await Resource.Bucket.get(infoFile)
- if (ret)
- return new Response("Error: Session already sharing", { status: 400 })
- await Resource.Bucket.put(infoFile, "")
- return new Response(JSON.stringify({ share_id: shareID }), {
- headers: { "Content-Type": "application/json" },
- })
- }
- if (request.method === "POST" && url.pathname.endsWith("/share_delete")) {
- const body = await request.json()
- const sessionID = body.session_id
- const shareID = body.share_id
- const infoFile = `${shareID}/info/${sessionID}.json`
- await Resource.Bucket.delete(infoFile)
- return new Response(JSON.stringify({}), {
- headers: { "Content-Type": "application/json" },
- })
- }
- if (request.method === "POST" && url.pathname.endsWith("/share_sync")) {
- const body = await request.json()
- const sessionID = body.session_id
- const shareID = body.share_id
- const filename = body.filename
- const content = body.content
- // validate filename
- if (!filename.startsWith("info/") && !filename.startsWith("message/"))
- return new Response("Error: Invalid filename", { status: 400 })
- const infoFile = `${shareID}/info/${sessionID}.json`
- const ret = await Resource.Bucket.get(infoFile)
- if (!ret)
- return new Response("Error: Session not shared", { status: 400 })
- // send message to server
- const id = env.SYNC_SERVER.idFromName(sessionID)
- const stub = env.SYNC_SERVER.get(id)
- await stub.publish(filename, content)
- // store message
- await Resource.Bucket.put(`${shareID}/${filename}`, content)
- return new Response(JSON.stringify({}), {
- headers: { "Content-Type": "application/json" },
- })
- }
- if (request.method === "GET" && url.pathname.endsWith("/share_poll")) {
- // Expect to receive a WebSocket Upgrade request.
- // If there is one, accept the request and return a WebSocket Response.
- const upgradeHeader = request.headers.get("Upgrade")
- if (!upgradeHeader || upgradeHeader !== "websocket") {
- return new Response("Error: Upgrade header is required", {
- status: 426,
- })
- }
- // get query parameters
- const shareID = url.searchParams.get("share_id")
- if (!shareID)
- return new Response("Error: Share ID is required", { status: 400 })
- // Get session ID
- const listRet = await Resource.Bucket.list({
- prefix: `${shareID}/info/`,
- delimiter: "/",
- })
- if (listRet.objects.length === 0)
- return new Response("Error: Session not shared", { status: 400 })
- if (listRet.objects.length > 1)
- return new Response("Error: Multiple sessions found", { status: 400 })
- const sessionID = path.parse(listRet.objects[0].key).name
- // subscribe to server
- const id = env.SYNC_SERVER.idFromName(sessionID)
- const stub = env.SYNC_SERVER.get(id)
- return stub.fetch(request)
- }
- },
- }
|