api.ts 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. import { DurableObject } from "cloudflare:workers"
  2. import {
  3. DurableObjectNamespace,
  4. ExecutionContext,
  5. } from "@cloudflare/workers-types"
  6. import { createHash } from "node:crypto"
  7. import path from "node:path"
  8. import { Resource } from "sst"
  /**
   * Environment bindings available to the worker.
   *
   * `SYNC_SERVER` is the Durable Object namespace the fetch handler uses to
   * obtain `SyncServer` stubs; typing it with `SyncServer` is what makes
   * calls such as `stub.publish(...)` type-check.
   */
  type Bindings = {
    SYNC_SERVER: DurableObjectNamespace<SyncServer>
  }
  /**
   * Durable Object that fans published files out to WebSocket subscribers.
   *
   * Each published file is kept in an in-memory map and written to the
   * object's storage. The constructor reloads the map from storage, and every
   * new subscriber is sent all known files shortly after connecting.
   */
  export class SyncServer extends DurableObject {
    /** Latest content of every published file, keyed by filename. */
    private files: Map<string, string> = new Map()

    /**
     * Forwards the runtime-provided arguments to the base class and reloads
     * previously stored files.
     *
     * The parameter types are derived from the base class so no extra type
     * import is needed.
     */
    constructor(...args: ConstructorParameters<typeof DurableObject>) {
      super(...args)
      // A constructor cannot await; blockConcurrencyWhile is used so the load
      // from storage runs before the object goes on to handle requests.
      void this.ctx.blockConcurrencyWhile(async () => {
        this.files = await this.ctx.storage.list<string>()
      })
    }

    /**
     * Records a file, persists it, and broadcasts it to all connected sockets.
     *
     * @param filename Key under which the content is stored and announced.
     * @param content  File content sent to subscribers as-is.
     */
    async publish(filename: string, content: string): Promise<void> {
      console.log(
        "SyncServer publish",
        filename,
        content,
        "to",
        this.ctx.getWebSockets().length,
        "subscribers",
      )
      this.files.set(filename, content)
      await this.ctx.storage.put(filename, content)

      const payload = JSON.stringify({ filename, content })
      for (const client of this.ctx.getWebSockets()) {
        try {
          client.send(payload)
        } catch (err) {
          // One broken socket must not prevent delivery to the others.
          console.error("SyncServer publish: failed to send to a subscriber", err)
        }
      }
    }

    /**
     * Handles a message received from a subscriber.
     *
     * Currently a no-op: the "load_history" command is recognised but nothing
     * is replayed here, because `fetch` already pushes all files on connect.
     */
    async webSocketMessage(
      ws: WebSocket,
      message: string | ArrayBuffer,
    ): Promise<void> {
      if (message === "load_history") {
        // Placeholder kept from the original implementation.
      }
    }

    /**
     * Completes the close handshake when a subscriber disconnects.
     *
     * @param ws   The socket that was closed by the peer.
     * @param code Close code reported for the socket; echoed back.
     */
    async webSocketClose(
      ws: WebSocket,
      code: number,
      reason: string,
      wasClean: boolean,
    ): Promise<void> {
      try {
        ws.close(code, "Durable Object is closing WebSocket")
      } catch (err) {
        // close() can reject codes that may not be sent in a close frame
        // (e.g. 1006 for an abnormal closure); the socket is gone either way.
        console.error("SyncServer: failed to close WebSocket", code, err)
      }
    }

    /**
     * Accepts a WebSocket subscription and returns the client end.
     *
     * @returns A 101 response carrying the client side of the socket pair.
     */
    async fetch(req: Request): Promise<Response> {
      console.log("SyncServer subscribe")
      // Creates two ends of a WebSocket connection.
      const pair = new WebSocketPair()
      const client = pair[0]
      const server = pair[1]
      this.ctx.acceptWebSocket(server)

      // Deferred to a later task so the 101 response is returned before the
      // existing files are pushed to the new subscriber.
      setTimeout(() => {
        try {
          for (const [filename, content] of this.files) {
            server.send(JSON.stringify({ filename, content }))
          }
        } catch (err) {
          // The subscriber may already have disconnected; avoid an uncaught
          // exception inside the timer callback.
          console.error("SyncServer subscribe: failed to send history", err)
        }
      }, 0)

      return new Response(null, {
        status: 101,
        webSocket: client,
      })
    }
  }
  /**
   * Reads the request body as a JSON object.
   *
   * @returns The parsed object, or `undefined` when the body is not valid
   *          JSON or is not a plain object.
   */
  async function readJsonObject(
    request: Request,
  ): Promise<Record<string, unknown> | undefined> {
    let parsed: unknown
    try {
      parsed = await request.json()
    } catch {
      return undefined
    }
    if (typeof parsed !== "object" || parsed === null || Array.isArray(parsed))
      return undefined
    return parsed as Record<string, unknown>
  }

  /** Narrows an untrusted JSON value to a non-empty string. */
  function isNonEmptyString(value: unknown): value is string {
    return typeof value === "string" && value.length > 0
  }

  /** Builds a plain-text 400 response with the given message. */
  function badRequest(message: string): Response {
    return new Response(message, { status: 400 })
  }

  /** Builds a 200 response whose body is the JSON encoding of `payload`. */
  function jsonResponse(payload: unknown): Response {
    return new Response(JSON.stringify(payload), {
      headers: { "Content-Type": "application/json" },
    })
  }

  /**
   * Worker entry point.
   *
   * Routes:
   * - `GET /`               plain-text greeting.
   * - `POST …/share_create` registers a session and returns its share id
   *                         (the SHA-256 hex digest of the session id).
   * - `POST …/share_delete` removes the session's info file from the bucket.
   * - `POST …/share_sync`   publishes a file to subscribers and stores it.
   * - `GET …/share_poll`    upgrades to a WebSocket subscribed to the session.
   *
   * Malformed or incomplete request bodies are answered with 400, and any
   * request that matches no route with 404.
   */
  const worker = {
    async fetch(
      request: Request,
      env: Bindings,
      ctx: ExecutionContext,
    ): Promise<Response> {
      const url = new URL(request.url)
      const method = request.method
      const route = url.pathname

      if (method === "GET" && route === "/") {
        return new Response("Hello, world!", {
          headers: { "Content-Type": "text/plain" },
        })
      }

      if (method === "POST" && route.endsWith("/share_create")) {
        const body = await readJsonObject(request)
        if (!body) return badRequest("Error: Invalid JSON body")
        const sessionID = body.session_id
        if (!isNonEmptyString(sessionID))
          return badRequest("Error: Session ID is required")

        const shareID = createHash("sha256").update(sessionID).digest("hex")
        const infoFile = `${shareID}/info/${sessionID}.json`
        const existing = await Resource.Bucket.get(infoFile)
        if (existing) return badRequest("Error: Session already sharing")

        // An empty info file marks the session as shared.
        await Resource.Bucket.put(infoFile, "")
        return jsonResponse({ share_id: shareID })
      }

      if (method === "POST" && route.endsWith("/share_delete")) {
        const body = await readJsonObject(request)
        if (!body) return badRequest("Error: Invalid JSON body")
        const sessionID = body.session_id
        const shareID = body.share_id
        if (!isNonEmptyString(sessionID))
          return badRequest("Error: Session ID is required")
        if (!isNonEmptyString(shareID))
          return badRequest("Error: Share ID is required")

        await Resource.Bucket.delete(`${shareID}/info/${sessionID}.json`)
        return jsonResponse({})
      }

      if (method === "POST" && route.endsWith("/share_sync")) {
        const body = await readJsonObject(request)
        if (!body) return badRequest("Error: Invalid JSON body")
        const sessionID = body.session_id
        const shareID = body.share_id
        const filename = body.filename
        const content = body.content
        if (!isNonEmptyString(sessionID))
          return badRequest("Error: Session ID is required")
        if (!isNonEmptyString(shareID))
          return badRequest("Error: Share ID is required")

        // validate filename
        if (
          typeof filename !== "string" ||
          (!filename.startsWith("info/") && !filename.startsWith("message/"))
        )
          return badRequest("Error: Invalid filename")
        if (typeof content !== "string")
          return badRequest("Error: Content is required")

        // Only sessions registered through share_create may sync.
        const infoFile = `${shareID}/info/${sessionID}.json`
        const registered = await Resource.Bucket.get(infoFile)
        if (!registered) return badRequest("Error: Session not shared")

        // send message to server
        const id = env.SYNC_SERVER.idFromName(sessionID)
        const stub = env.SYNC_SERVER.get(id)
        await stub.publish(filename, content)

        // store message
        await Resource.Bucket.put(`${shareID}/${filename}`, content)
        return jsonResponse({})
      }

      if (method === "GET" && route.endsWith("/share_poll")) {
        // Expect to receive a WebSocket Upgrade request.
        // If there is one, accept the request and return a WebSocket Response.
        // The header value is compared case-insensitively.
        const upgradeHeader = request.headers.get("Upgrade")
        if (upgradeHeader?.toLowerCase() !== "websocket") {
          return new Response("Error: Upgrade header is required", {
            status: 426,
          })
        }

        // get query parameters
        const shareID = url.searchParams.get("share_id")
        if (!shareID) return badRequest("Error: Share ID is required")

        // Get session ID: it is the base name of the single info file stored
        // under the share's info/ prefix.
        const listRet = await Resource.Bucket.list({
          prefix: `${shareID}/info/`,
          delimiter: "/",
        })
        if (listRet.objects.length === 0)
          return badRequest("Error: Session not shared")
        if (listRet.objects.length > 1)
          return badRequest("Error: Multiple sessions found")
        const sessionID = path.parse(listRet.objects[0].key).name

        // subscribe to server
        const id = env.SYNC_SERVER.idFromName(sessionID)
        const stub = env.SYNC_SERVER.get(id)
        return stub.fetch(request)
      }

      // No route matched: always answer with a Response rather than undefined.
      return new Response("Error: Not found", { status: 404 })
    },
  }

  export default worker