workspace.ts 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525
  1. import z from "zod"
  2. import { setTimeout as sleep } from "node:timers/promises"
  3. import { fn } from "@/util/fn"
  4. import { Database, asc, eq, inArray } from "@/storage/db"
  5. import { Project } from "@/project"
  6. import { BusEvent } from "@/bus/bus-event"
  7. import { GlobalBus } from "@/bus/global"
  8. import { SyncEvent } from "@/sync"
  9. import { EventTable } from "@/sync/event.sql"
  10. import { Flag } from "@/flag/flag"
  11. import { Log } from "@/util"
  12. import { Filesystem } from "@/util"
  13. import { ProjectID } from "@/project/schema"
  14. import { Slug } from "@opencode-ai/shared/util/slug"
  15. import { WorkspaceTable } from "./workspace.sql"
  16. import { getAdaptor } from "./adaptors"
  17. import { WorkspaceInfo } from "./types"
  18. import { WorkspaceID } from "./schema"
  19. import { parseSSE } from "./sse"
  20. import { Session } from "@/session"
  21. import { SessionTable } from "@/session/session.sql"
  22. import { SessionID } from "@/session/schema"
  23. import { errorData } from "@/util/error"
  24. import { AppRuntime } from "@/effect/app-runtime"
  25. import { EventSequenceTable } from "@/sync/event.sql"
  26. import { waitEvent } from "./util"
  27. export namespace Workspace {
  28. export const Info = WorkspaceInfo.meta({
  29. ref: "Workspace",
  30. })
  31. export type Info = z.infer<typeof Info>
  32. export const ConnectionStatus = z.object({
  33. workspaceID: WorkspaceID.zod,
  34. status: z.enum(["connected", "connecting", "disconnected", "error"]),
  35. error: z.string().optional(),
  36. })
  37. export type ConnectionStatus = z.infer<typeof ConnectionStatus>
  38. const Restore = z.object({
  39. workspaceID: WorkspaceID.zod,
  40. sessionID: SessionID.zod,
  41. total: z.number().int().min(0),
  42. step: z.number().int().min(0),
  43. })
  44. export const Event = {
  45. Ready: BusEvent.define(
  46. "workspace.ready",
  47. z.object({
  48. name: z.string(),
  49. }),
  50. ),
  51. Failed: BusEvent.define(
  52. "workspace.failed",
  53. z.object({
  54. message: z.string(),
  55. }),
  56. ),
  57. Restore: BusEvent.define("workspace.restore", Restore),
  58. Status: BusEvent.define("workspace.status", ConnectionStatus),
  59. }
  60. function fromRow(row: typeof WorkspaceTable.$inferSelect): Info {
  61. return {
  62. id: row.id,
  63. type: row.type,
  64. branch: row.branch,
  65. name: row.name,
  66. directory: row.directory,
  67. extra: row.extra,
  68. projectID: row.project_id,
  69. }
  70. }
  71. const CreateInput = z.object({
  72. id: WorkspaceID.zod.optional(),
  73. type: Info.shape.type,
  74. branch: Info.shape.branch,
  75. projectID: ProjectID.zod,
  76. extra: Info.shape.extra,
  77. })
  78. export const create = fn(CreateInput, async (input) => {
  79. const id = WorkspaceID.ascending(input.id)
  80. const adaptor = await getAdaptor(input.projectID, input.type)
  81. const config = await adaptor.configure({ ...input, id, name: Slug.create(), directory: null })
  82. const info: Info = {
  83. id,
  84. type: config.type,
  85. branch: config.branch ?? null,
  86. name: config.name ?? null,
  87. directory: config.directory ?? null,
  88. extra: config.extra ?? null,
  89. projectID: input.projectID,
  90. }
  91. Database.use((db) => {
  92. db.insert(WorkspaceTable)
  93. .values({
  94. id: info.id,
  95. type: info.type,
  96. branch: info.branch,
  97. name: info.name,
  98. directory: info.directory,
  99. extra: info.extra,
  100. project_id: info.projectID,
  101. })
  102. .run()
  103. })
  104. await adaptor.create(config)
  105. void startSync(info)
  106. await waitEvent({
  107. timeout: TIMEOUT,
  108. fn(event) {
  109. if (event.workspace === info.id && event.payload.type === Event.Status.type) {
  110. const { status } = event.payload.properties
  111. return status === "error" || status === "connected"
  112. }
  113. return false
  114. },
  115. })
  116. return info
  117. })
  118. const SessionRestoreInput = z.object({
  119. workspaceID: WorkspaceID.zod,
  120. sessionID: SessionID.zod,
  121. })
  122. export const sessionRestore = fn(SessionRestoreInput, async (input) => {
  123. log.info("session restore requested", {
  124. workspaceID: input.workspaceID,
  125. sessionID: input.sessionID,
  126. })
  127. try {
  128. const space = await get(input.workspaceID)
  129. if (!space) throw new Error(`Workspace not found: ${input.workspaceID}`)
  130. const adaptor = await getAdaptor(space.projectID, space.type)
  131. const target = await adaptor.target(space)
  132. // Need to switch the workspace of the session
  133. SyncEvent.run(Session.Event.Updated, {
  134. sessionID: input.sessionID,
  135. info: {
  136. workspaceID: input.workspaceID,
  137. },
  138. })
  139. const rows = Database.use((db) =>
  140. db
  141. .select({
  142. id: EventTable.id,
  143. aggregateID: EventTable.aggregate_id,
  144. seq: EventTable.seq,
  145. type: EventTable.type,
  146. data: EventTable.data,
  147. })
  148. .from(EventTable)
  149. .where(eq(EventTable.aggregate_id, input.sessionID))
  150. .orderBy(asc(EventTable.seq))
  151. .all(),
  152. )
  153. if (rows.length === 0) throw new Error(`No events found for session: ${input.sessionID}`)
  154. const all = rows
  155. const size = 10
  156. const sets = Array.from({ length: Math.ceil(all.length / size) }, (_, i) => all.slice(i * size, (i + 1) * size))
  157. const total = sets.length
  158. log.info("session restore prepared", {
  159. workspaceID: input.workspaceID,
  160. sessionID: input.sessionID,
  161. workspaceType: space.type,
  162. directory: space.directory,
  163. target: target.type === "remote" ? String(route(target.url, "/sync/replay")) : target.directory,
  164. events: all.length,
  165. batches: total,
  166. first: all[0]?.seq,
  167. last: all.at(-1)?.seq,
  168. })
  169. GlobalBus.emit("event", {
  170. directory: "global",
  171. workspace: input.workspaceID,
  172. payload: {
  173. type: Event.Restore.type,
  174. properties: {
  175. workspaceID: input.workspaceID,
  176. sessionID: input.sessionID,
  177. total,
  178. step: 0,
  179. },
  180. },
  181. })
  182. for (const [i, events] of sets.entries()) {
  183. log.info("session restore batch starting", {
  184. workspaceID: input.workspaceID,
  185. sessionID: input.sessionID,
  186. step: i + 1,
  187. total,
  188. events: events.length,
  189. first: events[0]?.seq,
  190. last: events.at(-1)?.seq,
  191. target: target.type === "remote" ? String(route(target.url, "/sync/replay")) : target.directory,
  192. })
  193. if (target.type === "local") {
  194. SyncEvent.replayAll(events)
  195. log.info("session restore batch replayed locally", {
  196. workspaceID: input.workspaceID,
  197. sessionID: input.sessionID,
  198. step: i + 1,
  199. total,
  200. events: events.length,
  201. })
  202. } else {
  203. const url = route(target.url, "/sync/replay")
  204. const headers = new Headers(target.headers)
  205. headers.set("content-type", "application/json")
  206. const res = await fetch(url, {
  207. method: "POST",
  208. headers,
  209. body: JSON.stringify({
  210. directory: space.directory ?? "",
  211. events,
  212. }),
  213. })
  214. if (!res.ok) {
  215. const body = await res.text()
  216. log.error("session restore batch failed", {
  217. workspaceID: input.workspaceID,
  218. sessionID: input.sessionID,
  219. step: i + 1,
  220. total,
  221. status: res.status,
  222. body,
  223. })
  224. throw new Error(
  225. `Failed to replay session ${input.sessionID} into workspace ${input.workspaceID}: HTTP ${res.status} ${body}`,
  226. )
  227. }
  228. log.info("session restore batch posted", {
  229. workspaceID: input.workspaceID,
  230. sessionID: input.sessionID,
  231. step: i + 1,
  232. total,
  233. status: res.status,
  234. })
  235. }
  236. GlobalBus.emit("event", {
  237. directory: "global",
  238. workspace: input.workspaceID,
  239. payload: {
  240. type: Event.Restore.type,
  241. properties: {
  242. workspaceID: input.workspaceID,
  243. sessionID: input.sessionID,
  244. total,
  245. step: i + 1,
  246. },
  247. },
  248. })
  249. }
  250. log.info("session restore complete", {
  251. workspaceID: input.workspaceID,
  252. sessionID: input.sessionID,
  253. batches: total,
  254. })
  255. return {
  256. total,
  257. }
  258. } catch (err) {
  259. log.error("session restore failed", {
  260. workspaceID: input.workspaceID,
  261. sessionID: input.sessionID,
  262. error: errorData(err),
  263. })
  264. throw err
  265. }
  266. })
  267. export function list(project: Project.Info) {
  268. const rows = Database.use((db) =>
  269. db.select().from(WorkspaceTable).where(eq(WorkspaceTable.project_id, project.id)).all(),
  270. )
  271. const spaces = rows.map(fromRow).sort((a, b) => a.id.localeCompare(b.id))
  272. for (const space of spaces) void startSync(space)
  273. return spaces
  274. }
  275. function lookup(id: WorkspaceID) {
  276. const row = Database.use((db) => db.select().from(WorkspaceTable).where(eq(WorkspaceTable.id, id)).get())
  277. if (!row) return
  278. return fromRow(row)
  279. }
  280. export const get = fn(WorkspaceID.zod, async (id) => {
  281. const space = lookup(id)
  282. if (!space) return
  283. void startSync(space)
  284. return space
  285. })
  286. export const remove = fn(WorkspaceID.zod, async (id) => {
  287. const sessions = Database.use((db) =>
  288. db.select({ id: SessionTable.id }).from(SessionTable).where(eq(SessionTable.workspace_id, id)).all(),
  289. )
  290. for (const session of sessions) {
  291. await AppRuntime.runPromise(Session.Service.use((svc) => svc.remove(session.id)))
  292. }
  293. const row = Database.use((db) => db.select().from(WorkspaceTable).where(eq(WorkspaceTable.id, id)).get())
  294. if (row) {
  295. stopSync(id)
  296. const info = fromRow(row)
  297. try {
  298. const adaptor = await getAdaptor(info.projectID, row.type)
  299. await adaptor.remove(info)
  300. } catch {
  301. log.error("adaptor not available when removing workspace", { type: row.type })
  302. }
  303. Database.use((db) => db.delete(WorkspaceTable).where(eq(WorkspaceTable.id, id)).run())
  304. return info
  305. }
  306. })
  307. const connections = new Map<WorkspaceID, ConnectionStatus>()
  308. const aborts = new Map<WorkspaceID, AbortController>()
  309. const TIMEOUT = 5000
  310. function setStatus(id: WorkspaceID, status: ConnectionStatus["status"], error?: string) {
  311. const prev = connections.get(id)
  312. if (prev?.status === status && prev?.error === error) return
  313. const next = { workspaceID: id, status, error }
  314. connections.set(id, next)
  315. if (status === "error") {
  316. aborts.delete(id)
  317. }
  318. GlobalBus.emit("event", {
  319. directory: "global",
  320. workspace: id,
  321. payload: {
  322. type: Event.Status.type,
  323. properties: next,
  324. },
  325. })
  326. }
  327. export function status(): ConnectionStatus[] {
  328. return [...connections.values()]
  329. }
  330. function synced(state: Record<string, number>) {
  331. const ids = Object.keys(state)
  332. if (ids.length === 0) return true
  333. const done = Object.fromEntries(
  334. Database.use((db) =>
  335. db
  336. .select({
  337. id: EventSequenceTable.aggregate_id,
  338. seq: EventSequenceTable.seq,
  339. })
  340. .from(EventSequenceTable)
  341. .where(inArray(EventSequenceTable.aggregate_id, ids))
  342. .all(),
  343. ).map((row) => [row.id, row.seq]),
  344. ) as Record<string, number>
  345. return ids.every((id) => {
  346. return (done[id] ?? -1) >= state[id]
  347. })
  348. }
  349. export async function isSyncing(workspaceID: WorkspaceID) {
  350. return aborts.has(workspaceID)
  351. }
  352. export async function waitForSync(workspaceID: WorkspaceID, state: Record<string, number>, signal?: AbortSignal) {
  353. if (synced(state)) return
  354. try {
  355. await waitEvent({
  356. timeout: TIMEOUT,
  357. signal,
  358. fn(event) {
  359. if (event.workspace !== workspaceID && event.payload.type !== "sync") {
  360. return false
  361. }
  362. return synced(state)
  363. },
  364. })
  365. } catch {
  366. if (signal?.aborted) throw signal.reason ?? new Error("Request aborted")
  367. throw new Error(`Timed out waiting for sync fence: ${JSON.stringify(state)}`)
  368. }
  369. }
  370. const log = Log.create({ service: "workspace-sync" })
  371. function route(url: string | URL, path: string) {
  372. const next = new URL(url)
  373. next.pathname = `${next.pathname.replace(/\/$/, "")}${path}`
  374. next.search = ""
  375. next.hash = ""
  376. return next
  377. }
  378. async function syncWorkspace(space: Info, signal: AbortSignal) {
  379. while (!signal.aborted) {
  380. log.info("connecting to global sync", { workspace: space.name })
  381. setStatus(space.id, "connecting")
  382. const adaptor = await getAdaptor(space.projectID, space.type)
  383. const target = await adaptor.target(space)
  384. if (target.type === "local") return
  385. const res = await fetch(route(target.url, "/global/event"), {
  386. method: "GET",
  387. headers: target.headers,
  388. signal,
  389. }).catch((err: unknown) => {
  390. setStatus(space.id, "error", err instanceof Error ? err.message : String(err))
  391. log.info("failed to connect to global sync", {
  392. workspace: space.name,
  393. error: err,
  394. })
  395. return undefined
  396. })
  397. if (!res || !res.ok || !res.body) {
  398. const error = !res ? "No response from global sync" : `Global sync HTTP ${res.status}`
  399. log.info("failed to connect to global sync", { workspace: space.name, error })
  400. setStatus(space.id, "error", error)
  401. await sleep(1000)
  402. continue
  403. }
  404. log.info("global sync connected", { workspace: space.name })
  405. setStatus(space.id, "connected")
  406. await parseSSE(res.body, signal, (evt: any) => {
  407. try {
  408. if (!("payload" in evt)) return
  409. if (evt.payload.type === "sync") {
  410. // This name -> type is temporary
  411. SyncEvent.replay({ ...evt.payload, type: evt.payload.name } as SyncEvent.SerializedEvent)
  412. }
  413. GlobalBus.emit("event", {
  414. directory: evt.directory,
  415. project: evt.project,
  416. workspace: space.id,
  417. payload: evt.payload,
  418. })
  419. } catch (err) {
  420. log.info("failed to replay global event", {
  421. workspaceID: space.id,
  422. error: err,
  423. })
  424. }
  425. })
  426. log.info("disconnected from global sync: " + space.id)
  427. setStatus(space.id, "disconnected")
  428. // TODO: Implement exponential backoff
  429. await sleep(1000)
  430. }
  431. }
  432. async function startSync(space: Info) {
  433. if (!Flag.OPENCODE_EXPERIMENTAL_WORKSPACES) return
  434. const adaptor = await getAdaptor(space.projectID, space.type)
  435. const target = await adaptor.target(space)
  436. if (target.type === "local") {
  437. void Filesystem.exists(target.directory).then((exists) => {
  438. setStatus(space.id, exists ? "connected" : "error", exists ? undefined : "directory does not exist")
  439. })
  440. return
  441. }
  442. if (aborts.has(space.id)) return true
  443. setStatus(space.id, "disconnected")
  444. const abort = new AbortController()
  445. aborts.set(space.id, abort)
  446. void syncWorkspace(space, abort.signal).catch((error) => {
  447. aborts.delete(space.id)
  448. setStatus(space.id, "error", String(error))
  449. log.warn("workspace listener failed", {
  450. workspaceID: space.id,
  451. error,
  452. })
  453. })
  454. }
  455. function stopSync(id: WorkspaceID) {
  456. aborts.get(id)?.abort()
  457. aborts.delete(id)
  458. connections.delete(id)
  459. }
  460. }