api.ts 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208
  1. import { DurableObject } from "cloudflare:workers"
  2. import { randomUUID } from "node:crypto"
  3. type Env = {
  4. SYNC_SERVER: DurableObjectNamespace<SyncServer>
  5. Bucket: R2Bucket
  6. }
  7. export class SyncServer extends DurableObject<Env> {
  8. constructor(ctx: DurableObjectState, env: Env) {
  9. super(ctx, env)
  10. }
  11. async fetch() {
  12. console.log("SyncServer subscribe")
  13. const webSocketPair = new WebSocketPair()
  14. const [client, server] = Object.values(webSocketPair)
  15. this.ctx.acceptWebSocket(server)
  16. const data = await this.ctx.storage.list()
  17. Array.from(data.entries())
  18. .filter(([key, _]) => key.startsWith("session/"))
  19. .map(([key, content]) => server.send(JSON.stringify({ key, content })))
  20. return new Response(null, {
  21. status: 101,
  22. webSocket: client,
  23. })
  24. }
  25. async webSocketMessage(ws, message) {}
  26. async webSocketClose(ws, code, reason, wasClean) {
  27. ws.close(code, "Durable Object is closing WebSocket")
  28. }
  29. async publish(secret: string, key: string, content: any) {
  30. if (secret !== (await this.getSecret())) throw new Error("Invalid secret")
  31. const sessionID = await this.getSessionID()
  32. if (
  33. !key.startsWith(`session/info/${sessionID}`) &&
  34. !key.startsWith(`session/message/${sessionID}/`)
  35. )
  36. return new Response("Error: Invalid key", { status: 400 })
  37. // store message
  38. await this.env.Bucket.put(`share/${key}.json`, JSON.stringify(content), {
  39. httpMetadata: {
  40. contentType: "application/json",
  41. },
  42. })
  43. await this.ctx.storage.put(key, content)
  44. const clients = this.ctx.getWebSockets()
  45. console.log("SyncServer publish", key, "to", clients.length, "subscribers")
  46. for (const client of clients) {
  47. client.send(JSON.stringify({ key, content }))
  48. }
  49. }
  50. public async share(sessionID: string) {
  51. let secret = await this.getSecret()
  52. if (secret) return secret
  53. secret = randomUUID()
  54. await this.ctx.storage.put("secret", secret)
  55. await this.ctx.storage.put("sessionID", sessionID)
  56. return secret
  57. }
  58. public async getData() {
  59. const data = await this.ctx.storage.list()
  60. return Array.from(data.entries())
  61. .filter(([key, _]) => key.startsWith("session/"))
  62. .map(([key, content]) => ({ key, content }))
  63. }
  64. private async getSecret() {
  65. return this.ctx.storage.get<string>("secret")
  66. }
  67. private async getSessionID() {
  68. return this.ctx.storage.get<string>("sessionID")
  69. }
  70. async clear(secret: string) {
  71. await this.assertSecret(secret)
  72. await this.ctx.storage.deleteAll()
  73. }
  74. private async assertSecret(secret: string) {
  75. if (secret !== (await this.getSecret())) throw new Error("Invalid secret")
  76. }
  77. static shortName(id: string) {
  78. return id.substring(id.length - 8)
  79. }
  80. }
  81. export default {
  82. async fetch(request: Request, env: Env, ctx: ExecutionContext) {
  83. const url = new URL(request.url)
  84. const splits = url.pathname.split("/")
  85. const method = splits[1]
  86. if (request.method === "GET" && method === "") {
  87. return new Response("Hello, world!", {
  88. headers: { "Content-Type": "text/plain" },
  89. })
  90. }
  91. if (request.method === "POST" && method === "share_create") {
  92. const body = await request.json<any>()
  93. const sessionID = body.sessionID
  94. const short = SyncServer.shortName(sessionID)
  95. const id = env.SYNC_SERVER.idFromName(short)
  96. const stub = env.SYNC_SERVER.get(id)
  97. const secret = await stub.share(sessionID)
  98. return new Response(
  99. JSON.stringify({
  100. secret,
  101. url: "https://dev.opencode.ai/s/" + short,
  102. }),
  103. {
  104. headers: { "Content-Type": "application/json" },
  105. },
  106. )
  107. }
  108. if (request.method === "POST" && method === "share_delete") {
  109. const body = await request.json<any>()
  110. const sessionID = body.sessionID
  111. const secret = body.secret
  112. const id = env.SYNC_SERVER.idFromName(SyncServer.shortName(sessionID))
  113. const stub = env.SYNC_SERVER.get(id)
  114. await stub.clear(secret)
  115. return new Response(JSON.stringify({}), {
  116. headers: { "Content-Type": "application/json" },
  117. })
  118. }
  119. if (request.method === "POST" && method === "share_sync") {
  120. const body = await request.json<{
  121. sessionID: string
  122. secret: string
  123. key: string
  124. content: any
  125. }>()
  126. const name = SyncServer.shortName(body.sessionID)
  127. const id = env.SYNC_SERVER.idFromName(name)
  128. const stub = env.SYNC_SERVER.get(id)
  129. await stub.publish(body.secret, body.key, body.content)
  130. return new Response(JSON.stringify({}), {
  131. headers: { "Content-Type": "application/json" },
  132. })
  133. }
  134. if (request.method === "GET" && method === "share_poll") {
  135. const upgradeHeader = request.headers.get("Upgrade")
  136. if (!upgradeHeader || upgradeHeader !== "websocket") {
  137. return new Response("Error: Upgrade header is required", {
  138. status: 426,
  139. })
  140. }
  141. const id = url.searchParams.get("id")
  142. console.log("share_poll", id)
  143. if (!id)
  144. return new Response("Error: Share ID is required", { status: 400 })
  145. const stub = env.SYNC_SERVER.get(env.SYNC_SERVER.idFromName(id))
  146. return stub.fetch(request)
  147. }
  148. if (request.method === "GET" && method === "share_data") {
  149. const id = url.searchParams.get("id")
  150. console.log("share_data", id)
  151. if (!id)
  152. return new Response("Error: Share ID is required", { status: 400 })
  153. const stub = env.SYNC_SERVER.get(env.SYNC_SERVER.idFromName(id))
  154. const data = await stub.getData()
  155. let info
  156. const messages: Record<string, any> = {}
  157. data.forEach((d) => {
  158. const [root, type, ...splits] = d.key.split("/")
  159. if (root !== "session") return
  160. if (type === "info") {
  161. info = d.content
  162. return
  163. }
  164. if (type === "message") {
  165. const [, messageID] = splits
  166. messages[messageID] = d.content
  167. }
  168. })
  169. return new Response(
  170. JSON.stringify({
  171. info,
  172. messages,
  173. }),
  174. {
  175. headers: { "Content-Type": "application/json" },
  176. },
  177. )
  178. }
  179. },
  180. }