share-next.ts 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
  1. import { Bus } from "@/bus"
  2. import { Config } from "@/config/config"
  3. import { ulid } from "ulid"
  4. import { Provider } from "@/provider/provider"
  5. import { Session } from "@/session"
  6. import { MessageV2 } from "@/session/message-v2"
  7. import { Storage } from "@/storage/storage"
  8. import { Log } from "@/util/log"
  9. import type * as SDK from "@opencode-ai/sdk"
  /**
   * Keeps a remote copy of shared sessions up to date.
   *
   * When the config has an `enterprise` section, `init` subscribes to session,
   * message, part and diff bus events and forwards them to
   * `${enterprise.url}/api/share/...`. Updates are batched per session and
   * flushed in a single request after a short delay.
   */
  export namespace ShareNext {
    const log = Log.create({ service: "share-next" })

    /** How long updates for one session are collected before being posted. */
    const SYNC_DELAY_MS = 1000

    /**
     * Resolves the base URL of the enterprise share service.
     *
     * @throws Error when the config has no `enterprise` section.
     */
    async function enterpriseURL() {
      const config = await Config.get()
      if (!config.enterprise) throw new Error("enterprise sharing is not configured")
      return config.enterprise.url
    }

    /**
     * Subscribes to the bus events that feed the share sync queue.
     * Does nothing when the config has no `enterprise` section.
     */
    export async function init() {
      const config = await Config.get()
      if (!config.enterprise) return

      Bus.subscribe(Session.Event.Updated, async (evt) => {
        const session = evt.properties.info
        await sync(session.id, [{ type: "session", data: session }])
      })

      Bus.subscribe(MessageV2.Event.Updated, async (evt) => {
        const message = evt.properties.info
        await sync(message.sessionID, [{ type: "message", data: message }])
        // Only user messages carry the model they were sent to.
        if (message.role !== "user") return
        const model = await Provider.getModel(message.model.providerID, message.model.modelID)
        await sync(message.sessionID, [{ type: "model", data: [model] }])
      })

      Bus.subscribe(MessageV2.Event.PartUpdated, async (evt) => {
        const part = evt.properties.part
        await sync(part.sessionID, [{ type: "part", data: part }])
      })

      Bus.subscribe(Session.Event.Diff, async (evt) => {
        await sync(evt.properties.sessionID, [{ type: "session_diff", data: evt.properties.diff }])
      })
    }

    /**
     * Registers a share for the session with the enterprise service, stores the
     * returned record under `["session_share", sessionID]` and starts a full
     * sync in the background.
     *
     * @param sessionID Session to share.
     * @returns The share record (`id`, `url`, `secret`) returned by the service.
     * @throws Error when enterprise sharing is not configured or the service
     *   answers with a non-2xx status.
     */
    export async function create(sessionID: string) {
      log.info("creating share", { sessionID })
      const url = await enterpriseURL()
      const response = await fetch(`${url}/api/share`, {
        method: "POST",
        headers: {
          "Content-Type": "application/json",
        },
        body: JSON.stringify({ sessionID }),
      })
      // Do not persist an error payload as if it were a share record.
      if (!response.ok) {
        throw new Error(`failed to create share: ${response.status} ${response.statusText}`)
      }
      const result = (await response.json()) as { id: string; url: string; secret: string }
      await Storage.write(["session_share", sessionID], result)
      // Deliberately not awaited so the caller gets the share right away;
      // failures are logged instead of surfacing as unhandled rejections.
      void fullSync(sessionID).catch((error: unknown) => {
        log.error("full sync failed", { sessionID, error })
      })
      return result
    }

    /** Reads the stored share record for a session. */
    function get(sessionID: string) {
      return Storage.read<{
        id: string
        secret: string
        url: string
      }>(["session_share", sessionID])
    }

    /** One unit of data pushed to the share service, tagged by `type`. */
    type Data =
      | {
          type: "session"
          data: SDK.Session
        }
      | {
          type: "message"
          data: SDK.Message
        }
      | {
          type: "part"
          data: SDK.Part
        }
      | {
          type: "session_diff"
          data: SDK.FileDiff[]
        }
      | {
          type: "model"
          data: SDK.Model[]
        }

    /** Pending updates per session: the flush timer and the items to send. */
    const queue = new Map<string, { timeout: NodeJS.Timeout; data: Map<string, Data> }>()

    /**
     * Builds the queue key for an item so that a newer update of the same
     * entity replaces the older one instead of being sent twice.
     *
     * NOTE(review): assumes `SDK.Message` has `id` and `SDK.Part` has `id` and
     * `messageID` — confirm against the SDK types.
     */
    function key(item: Data): string {
      switch (item.type) {
        case "session":
          return "session"
        case "message":
          return `message/${item.data.id}`
        case "part":
          return `part/${item.data.messageID}/${item.data.id}`
        case "session_diff":
          return "session_diff"
        case "model":
          // Model batches have no stable identity here, so every batch is kept.
          return `model/${ulid()}`
      }
    }

    /**
     * Queues items for the session. The first call for a session arms a timer;
     * items queued before it fires are sent together in one request.
     *
     * @param sessionID Session the items belong to.
     * @param data Items to queue.
     */
    async function sync(sessionID: string, data: Data[]) {
      let pending = queue.get(sessionID)
      if (!pending) {
        pending = {
          timeout: setTimeout(() => void flush(sessionID), SYNC_DELAY_MS),
          data: new Map<string, Data>(),
        }
        queue.set(sessionID, pending)
      }
      for (const item of data) {
        pending.data.set(key(item), item)
      }
    }

    /**
     * Posts everything queued for the session to the share service.
     * Never rejects: it runs from a timer, so failures are logged instead.
     */
    async function flush(sessionID: string) {
      const pending = queue.get(sessionID)
      if (!pending) return
      // Drop the entry first so updates arriving during the request start a new batch.
      queue.delete(sessionID)
      try {
        const url = await enterpriseURL()
        const share = await get(sessionID)
        if (!share) return
        const response = await fetch(`${url}/api/share/${share.id}/sync`, {
          method: "POST",
          headers: {
            "Content-Type": "application/json",
          },
          body: JSON.stringify({
            secret: share.secret,
            data: Array.from(pending.data.values()),
          }),
        })
        if (!response.ok) {
          log.error("sync failed", { sessionID, status: response.status })
        }
      } catch (error: unknown) {
        log.error("sync failed", { sessionID, error })
      }
    }

    /**
     * Deletes the session's share from the enterprise service and removes the
     * local share record. Does nothing when the session has no share record.
     *
     * @param sessionID Session whose share should be removed.
     * @throws Error when enterprise sharing is not configured.
     */
    export async function remove(sessionID: string) {
      log.info("removing share", { sessionID })
      const url = await enterpriseURL()
      const share = await get(sessionID)
      if (!share) return
      // Updates still waiting in the queue are pointless once the share is gone.
      const pending = queue.get(sessionID)
      if (pending) {
        clearTimeout(pending.timeout)
        queue.delete(sessionID)
      }
      await fetch(`${url}/api/share/${share.id}`, {
        method: "DELETE",
        headers: {
          "Content-Type": "application/json",
        },
        body: JSON.stringify({
          secret: share.secret,
        }),
      })
      // The record is stored under the session id (see `create` and `get`),
      // not under the share id.
      await Storage.remove(["session_share", sessionID])
    }

    /**
     * Queues the complete current state of a session: the session itself, all
     * messages and their parts, the diff, and the models used by user messages.
     */
    async function fullSync(sessionID: string) {
      log.info("full sync", { sessionID })
      const session = await Session.get(sessionID)
      const diffs = await Session.diff(sessionID)
      const messages = await Array.fromAsync(MessageV2.stream(sessionID))
      const models = await Promise.all(
        messages
          .filter((m) => m.info.role === "user")
          .map((m) => (m.info as SDK.UserMessage).model)
          .map((m) => Provider.getModel(m.providerID, m.modelID)),
      )
      await sync(sessionID, [
        {
          type: "session",
          data: session,
        },
        ...messages.map((x) => ({
          type: "message" as const,
          data: x.info,
        })),
        ...messages.flatMap((x) => x.parts.map((y) => ({ type: "part" as const, data: y }))),
        {
          type: "session_diff",
          data: diffs,
        },
        {
          type: "model",
          data: models,
        },
      ])
    }
  }