| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525 |
- import z from "zod"
- import { setTimeout as sleep } from "node:timers/promises"
- import { fn } from "@/util/fn"
- import { Database, asc, eq, inArray } from "@/storage/db"
- import { Project } from "@/project"
- import { BusEvent } from "@/bus/bus-event"
- import { GlobalBus } from "@/bus/global"
- import { SyncEvent } from "@/sync"
- import { EventTable } from "@/sync/event.sql"
- import { Flag } from "@/flag/flag"
- import { Log } from "@/util"
- import { Filesystem } from "@/util"
- import { ProjectID } from "@/project/schema"
- import { Slug } from "@opencode-ai/shared/util/slug"
- import { WorkspaceTable } from "./workspace.sql"
- import { getAdaptor } from "./adaptors"
- import { WorkspaceInfo } from "./types"
- import { WorkspaceID } from "./schema"
- import { parseSSE } from "./sse"
- import { Session } from "@/session"
- import { SessionTable } from "@/session/session.sql"
- import { SessionID } from "@/session/schema"
- import { errorData } from "@/util/error"
- import { AppRuntime } from "@/effect/app-runtime"
- import { EventSequenceTable } from "@/sync/event.sql"
- import { waitEvent } from "./util"
- export namespace Workspace {
/** Zod schema for a workspace record, tagged with the `Workspace` ref metadata. */
export const Info = WorkspaceInfo.meta({ ref: "Workspace" })
/** Workspace record type inferred from the {@link Info} schema. */
export type Info = z.infer<typeof Info>
/** The connection states a workspace sync link can report. */
const ConnectionState = z.enum(["connected", "connecting", "disconnected", "error"])
/**
 * Schema for the sync connection status of a single workspace.
 * `error` is optional and carries a human-readable failure message.
 */
export const ConnectionStatus = z.object({
  workspaceID: WorkspaceID.zod,
  status: ConnectionState,
  error: z.string().optional(),
})
/** Connection status type inferred from the {@link ConnectionStatus} schema. */
export type ConnectionStatus = z.infer<typeof ConnectionStatus>
/** Builds a schema for a non-negative integer counter. */
const counter = () => z.number().int().min(0)
/**
 * Payload schema for session-restore progress events: `step` batches out of
 * `total` have been replayed for `sessionID` into `workspaceID`.
 */
const Restore = z.object({
  workspaceID: WorkspaceID.zod,
  sessionID: SessionID.zod,
  total: counter(),
  step: counter(),
})
/** Bus events published for workspaces. */
export const Event = {
  /** `workspace.ready` — carries the workspace `name`. */
  Ready: BusEvent.define("workspace.ready", z.object({ name: z.string() })),
  /** `workspace.failed` — carries a failure `message`. */
  Failed: BusEvent.define("workspace.failed", z.object({ message: z.string() })),
  /** `workspace.restore` — session restore progress (see the `Restore` schema). */
  Restore: BusEvent.define("workspace.restore", Restore),
  /** `workspace.status` — sync connection status changes. */
  Status: BusEvent.define("workspace.status", ConnectionStatus),
}
/**
 * Converts a `WorkspaceTable` row into the public `Info` shape.
 * Only the listed columns are copied; `project_id` is exposed as `projectID`.
 */
function fromRow(row: typeof WorkspaceTable.$inferSelect): Info {
  const { id, type, branch, name, directory, extra, project_id: projectID } = row
  return { id, type, branch, name, directory, extra, projectID }
}
/**
 * Input schema for `create`. `id` is optional; `type`, `branch` and `extra`
 * reuse the corresponding field schemas from `Info`.
 */
const CreateInput = (() => {
  const { type, branch, extra } = Info.shape
  return z.object({
    id: WorkspaceID.zod.optional(),
    type,
    branch,
    projectID: ProjectID.zod,
    extra,
  })
})()
/**
 * Creates a workspace: asks the adaptor to configure it, stores the row,
 * lets the adaptor create it, kicks off sync, and then waits for the first
 * "connected" or "error" status event for the new workspace.
 *
 * @returns the stored workspace info.
 */
export const create = fn(CreateInput, async (input) => {
  const { projectID } = input
  const id = WorkspaceID.ascending(input.id)
  const adaptor = await getAdaptor(projectID, input.type)
  const config = await adaptor.configure({ ...input, id, name: Slug.create(), directory: null })

  // Fields the adaptor left unset are stored as null.
  const info: Info = {
    id,
    type: config.type,
    branch: config.branch ?? null,
    name: config.name ?? null,
    directory: config.directory ?? null,
    extra: config.extra ?? null,
    projectID,
  }

  const row = {
    id: info.id,
    type: info.type,
    branch: info.branch,
    name: info.name,
    directory: info.directory,
    extra: info.extra,
    project_id: info.projectID,
  }
  Database.use((db) => {
    db.insert(WorkspaceTable).values(row).run()
  })

  await adaptor.create(config)

  // Sync runs in the background; the wait below observes its first outcome.
  void startSync(info)

  await waitEvent({
    timeout: TIMEOUT,
    fn(event) {
      if (event.workspace !== info.id) return false
      if (event.payload.type !== Event.Status.type) return false
      const { status } = event.payload.properties
      return status === "error" || status === "connected"
    },
  })

  return info
})
/** Input schema for `sessionRestore`: the session to move and its destination workspace. */
const SessionRestoreInput = z.object({ workspaceID: WorkspaceID.zod, sessionID: SessionID.zod })
/**
 * Restores a session into a workspace by replaying the session's stored
 * events against the workspace target, in batches of 10.
 *
 * Local targets are replayed through `SyncEvent.replayAll`; remote targets
 * receive each batch as a POST to `<target url>/sync/replay`. A
 * `workspace.restore` event is emitted with `step: 0` before the first batch
 * and again after every completed batch.
 *
 * @returns `{ total }` — the number of batches replayed.
 * @throws if the workspace does not exist, the session has no events, or a
 *   remote batch responds with a non-OK status. Every failure is logged and
 *   rethrown.
 */
export const sessionRestore = fn(SessionRestoreInput, async (input) => {
  const { workspaceID, sessionID } = input
  const BATCH_SIZE = 10

  // Broadcasts restore progress on the global bus.
  const progress = (step: number, total: number) => {
    GlobalBus.emit("event", {
      directory: "global",
      workspace: workspaceID,
      payload: {
        type: Event.Restore.type,
        properties: { workspaceID, sessionID, total, step },
      },
    })
  }

  log.info("session restore requested", { workspaceID, sessionID })

  try {
    const space = await get(workspaceID)
    if (!space) throw new Error(`Workspace not found: ${input.workspaceID}`)

    const adaptor = await getAdaptor(space.projectID, space.type)
    const target = await adaptor.target(space)

    // Need to switch the workspace of the session
    SyncEvent.run(Session.Event.Updated, {
      sessionID,
      info: { workspaceID },
    })

    const rows = Database.use((db) =>
      db
        .select({
          id: EventTable.id,
          aggregateID: EventTable.aggregate_id,
          seq: EventTable.seq,
          type: EventTable.type,
          data: EventTable.data,
        })
        .from(EventTable)
        .where(eq(EventTable.aggregate_id, sessionID))
        .orderBy(asc(EventTable.seq))
        .all(),
    )
    if (rows.length === 0) throw new Error(`No events found for session: ${input.sessionID}`)

    // Split the ordered events into consecutive fixed-size batches.
    const batches: (typeof rows)[] = []
    for (let start = 0; start < rows.length; start += BATCH_SIZE) {
      batches.push(rows.slice(start, start + BATCH_SIZE))
    }
    const total = batches.length

    // Human-readable destination, used only for logging.
    const destination = target.type === "remote" ? String(route(target.url, "/sync/replay")) : target.directory

    log.info("session restore prepared", {
      workspaceID,
      sessionID,
      workspaceType: space.type,
      directory: space.directory,
      target: destination,
      events: rows.length,
      batches: total,
      first: rows[0]?.seq,
      last: rows.at(-1)?.seq,
    })

    progress(0, total)

    for (const [index, events] of batches.entries()) {
      const step = index + 1

      log.info("session restore batch starting", {
        workspaceID,
        sessionID,
        step,
        total,
        events: events.length,
        first: events[0]?.seq,
        last: events.at(-1)?.seq,
        target: destination,
      })

      if (target.type === "local") {
        SyncEvent.replayAll(events)
        log.info("session restore batch replayed locally", {
          workspaceID,
          sessionID,
          step,
          total,
          events: events.length,
        })
      } else {
        const url = route(target.url, "/sync/replay")
        const headers = new Headers(target.headers)
        headers.set("content-type", "application/json")

        const payload = JSON.stringify({
          directory: space.directory ?? "",
          events,
        })
        const res = await fetch(url, { method: "POST", headers, body: payload })

        if (!res.ok) {
          const body = await res.text()
          log.error("session restore batch failed", {
            workspaceID,
            sessionID,
            step,
            total,
            status: res.status,
            body,
          })
          throw new Error(
            `Failed to replay session ${input.sessionID} into workspace ${input.workspaceID}: HTTP ${res.status} ${body}`,
          )
        }

        log.info("session restore batch posted", {
          workspaceID,
          sessionID,
          step,
          total,
          status: res.status,
        })
      }

      progress(step, total)
    }

    log.info("session restore complete", { workspaceID, sessionID, batches: total })
    return { total }
  } catch (err) {
    log.error("session restore failed", {
      workspaceID,
      sessionID,
      error: errorData(err),
    })
    throw err
  }
})
/**
 * Returns every workspace of `project`, ordered by id via `localeCompare`,
 * and starts background sync for each one.
 */
export function list(project: Project.Info) {
  const spaces = Database.use((db) =>
    db.select().from(WorkspaceTable).where(eq(WorkspaceTable.project_id, project.id)).all(),
  ).map(fromRow)
  spaces.sort((left, right) => left.id.localeCompare(right.id))
  spaces.forEach((space) => {
    void startSync(space)
  })
  return spaces
}
/** Reads one workspace by id; returns `undefined` when no row matches. */
function lookup(id: WorkspaceID) {
  const row = Database.use((db) => db.select().from(WorkspaceTable).where(eq(WorkspaceTable.id, id)).get())
  return row ? fromRow(row) : undefined
}
/**
 * Fetches a workspace by id and, when it exists, starts its background sync.
 *
 * @returns the workspace info, or `undefined` if the id is unknown.
 */
export const get = fn(WorkspaceID.zod, async (id) => {
  const space = lookup(id)
  if (space) void startSync(space)
  return space
})
/**
 * Removes a workspace together with every session attached to it.
 *
 * Sessions are removed first, one at a time. If the workspace row exists, its
 * sync is stopped, the adaptor is asked to tear it down, and the row is
 * deleted. Adaptor teardown is best effort: a failure is logged with its
 * cause and the row is deleted regardless.
 *
 * @returns the removed workspace info, or `undefined` when no such workspace exists.
 */
export const remove = fn(WorkspaceID.zod, async (id) => {
  const sessions = Database.use((db) =>
    db.select({ id: SessionTable.id }).from(SessionTable).where(eq(SessionTable.workspace_id, id)).all(),
  )
  for (const session of sessions) {
    await AppRuntime.runPromise(Session.Service.use((svc) => svc.remove(session.id)))
  }

  // Reuse the shared row lookup instead of repeating the query here.
  const info = lookup(id)
  if (!info) return

  stopSync(id)
  try {
    const adaptor = await getAdaptor(info.projectID, info.type)
    await adaptor.remove(info)
  } catch (err) {
    // Keep going so the row is still removed, but record why teardown failed.
    log.error("adaptor not available when removing workspace", {
      type: info.type,
      error: errorData(err),
    })
  }

  Database.use((db) => db.delete(WorkspaceTable).where(eq(WorkspaceTable.id, id)).run())
  return info
})
/** How long, in milliseconds, event waits in this module run before giving up. */
const TIMEOUT = 5_000
/** Abort controllers of the running remote sync loops, keyed by workspace id. */
const aborts = new Map<WorkspaceID, AbortController>()
/** Last reported connection status per workspace id. */
const connections = new Map<WorkspaceID, ConnectionStatus>()
/**
 * Records a workspace's connection status and publishes a `workspace.status`
 * event on the global bus. Nothing happens when both `status` and `error`
 * equal the previously recorded values.
 *
 * An "error" status also removes the workspace's abort controller from
 * `aborts`.
 * NOTE(review): the controller is dropped without being aborted, so a sync
 * loop that is still retrying keeps running untracked — confirm this is intended.
 */
function setStatus(id: WorkspaceID, status: ConnectionStatus["status"], error?: string) {
  const current = connections.get(id)
  const unchanged = current?.status === status && current?.error === error
  if (unchanged) return

  const next = { workspaceID: id, status, error }
  connections.set(id, next)
  if (status === "error") aborts.delete(id)

  GlobalBus.emit("event", {
    directory: "global",
    workspace: id,
    payload: { type: Event.Status.type, properties: next },
  })
}
/** Returns a snapshot array of the last recorded status of every tracked workspace. */
export function status(): ConnectionStatus[] {
  return Array.from(connections.values())
}
/**
 * Reports whether the stored event sequences have caught up with `state`.
 *
 * @param state map of aggregate id to the sequence number that must be reached.
 * @returns `true` when `state` is empty, or when every listed aggregate has a
 *   stored sequence greater than or equal to the requested one. An aggregate
 *   with no stored sequence counts as -1.
 */
function synced(state: Record<string, number>) {
  const ids = Object.keys(state)
  if (ids.length === 0) return true

  const rows = Database.use((db) =>
    db
      .select({
        id: EventSequenceTable.aggregate_id,
        seq: EventSequenceTable.seq,
      })
      .from(EventSequenceTable)
      .where(inArray(EventSequenceTable.aggregate_id, ids))
      .all(),
  )
  const applied = new Map<string, number>(rows.map((row) => [row.id, row.seq]))

  return ids.every((id) => (applied.get(id) ?? -1) >= state[id])
}
/** Resolves to `true` while an abort controller is registered for the workspace. */
export async function isSyncing(workspaceID: WorkspaceID) {
  const controller = aborts.get(workspaceID)
  return controller !== undefined
}
/**
 * Waits until the stored event sequences reach `state` (see `synced`).
 * Returns immediately when they already do; otherwise re-checks on incoming
 * bus events until `TIMEOUT` elapses.
 *
 * @param workspaceID workspace whose events trigger a re-check.
 * @param state map of aggregate id to the sequence number to wait for.
 * @param signal optional signal that cancels the wait.
 * @throws `signal.reason` (or a generic "Request aborted" error) when the
 *   signal is aborted; otherwise a timeout error that includes `state`.
 */
export async function waitForSync(workspaceID: WorkspaceID, state: Record<string, number>, signal?: AbortSignal) {
  if (synced(state)) return

  try {
    await waitEvent({
      timeout: TIMEOUT,
      signal,
      fn(event) {
        // Re-check on any event from this workspace OR any "sync" event.
        // NOTE(review): this mirrors the original `&&` filter; it may have been
        // meant to require both conditions — confirm before tightening.
        const relevant = event.workspace === workspaceID || event.payload.type === "sync"
        return relevant ? synced(state) : false
      },
    })
  } catch {
    if (signal?.aborted) throw signal.reason ?? new Error("Request aborted")
    throw new Error(`Timed out waiting for sync fence: ${JSON.stringify(state)}`)
  }
}
/** Logger used throughout this namespace, tagged with the `workspace-sync` service. */
const log = Log.create({
  service: "workspace-sync",
})
/**
 * Builds an endpoint URL by appending `path` to the pathname of `url`.
 * One trailing slash on the base pathname is dropped before appending, and
 * any query string and fragment are cleared. The input URL is not mutated.
 *
 * @param url base URL of the target.
 * @param path path to append; expected to start with `/`.
 * @returns a new `URL` pointing at the endpoint.
 */
function route(url: string | URL, path: string) {
  const endpoint = new URL(url)
  const base = endpoint.pathname.endsWith("/") ? endpoint.pathname.slice(0, -1) : endpoint.pathname
  endpoint.pathname = base + path
  endpoint.search = ""
  endpoint.hash = ""
  return endpoint
}
/**
 * Keeps a remote workspace connected to its `/global/event` stream until
 * `signal` is aborted, reconnecting one second after every failure or
 * disconnect.
 *
 * Each received event that has a `payload` is re-emitted on the global bus
 * under this workspace's id; payloads of type "sync" are first replayed
 * through `SyncEvent.replay`. Returns immediately for local targets.
 *
 * Status reporting:
 * - a connection failure is reported once, with the most specific message
 *   available (the thrown error's message, or the HTTP status);
 * - once `signal` is aborted no further status is recorded, so a status
 *   entry cleared when sync is stopped is not re-created by this loop.
 *
 * @param space workspace to sync.
 * @param signal aborts the in-flight request and ends the loop.
 */
async function syncWorkspace(space: Info, signal: AbortSignal) {
  while (!signal.aborted) {
    log.info("connecting to global sync", { workspace: space.name })
    setStatus(space.id, "connecting")

    const adaptor = await getAdaptor(space.projectID, space.type)
    const target = await adaptor.target(space)

    if (target.type === "local") return

    const res = await fetch(route(target.url, "/global/event"), {
      method: "GET",
      headers: target.headers,
      signal,
    }).catch((err: unknown) => {
      // A rejection caused by our own abort is a shutdown, not a connection error.
      if (signal.aborted) return undefined
      setStatus(space.id, "error", err instanceof Error ? err.message : String(err))
      log.info("failed to connect to global sync", {
        workspace: space.name,
        error: err,
      })
      return undefined
    })

    if (signal.aborted) return

    if (!res) {
      // Already logged and reported above with the specific error; just back off.
      await sleep(1000)
      continue
    }

    if (!res.ok || !res.body) {
      const error = `Global sync HTTP ${res.status}`
      log.info("failed to connect to global sync", { workspace: space.name, error })
      setStatus(space.id, "error", error)
      await sleep(1000)
      continue
    }

    log.info("global sync connected", { workspace: space.name })
    setStatus(space.id, "connected")

    // The callback shape is dictated by parseSSE; events arrive untyped.
    await parseSSE(res.body, signal, (evt: any) => {
      try {
        if (!("payload" in evt)) return

        if (evt.payload.type === "sync") {
          // This name -> type is temporary
          SyncEvent.replay({ ...evt.payload, type: evt.payload.name } as SyncEvent.SerializedEvent)
        }

        GlobalBus.emit("event", {
          directory: evt.directory,
          project: evt.project,
          workspace: space.id,
          payload: evt.payload,
        })
      } catch (err) {
        // One bad event must not tear down the whole stream.
        log.info("failed to replay global event", {
          workspaceID: space.id,
          error: err,
        })
      }
    })

    log.info("disconnected from global sync: " + space.id)

    // Stopped on purpose: leave the status untouched and exit.
    if (signal.aborted) return

    setStatus(space.id, "disconnected")

    // TODO: Implement exponential backoff
    await sleep(1000)
  }
}
/**
 * Starts background sync for a workspace. Does nothing unless
 * `Flag.OPENCODE_EXPERIMENTAL_WORKSPACES` is set.
 *
 * - Local targets: only checks that the target directory exists and records
 *   "connected" or "error" accordingly.
 * - Remote targets: registers an abort controller and launches
 *   `syncWorkspace` unless a controller is already registered for the id.
 *
 * Callers invoke this fire-and-forget (`void startSync(...)`), so it never
 * rejects: a setup failure (adaptor lookup, target resolution, directory
 * check) is logged and recorded as an "error" status instead.
 */
async function startSync(space: Info) {
  if (!Flag.OPENCODE_EXPERIMENTAL_WORKSPACES) return

  try {
    const adaptor = await getAdaptor(space.projectID, space.type)
    const target = await adaptor.target(space)

    if (target.type === "local") {
      const exists = await Filesystem.exists(target.directory)
      setStatus(space.id, exists ? "connected" : "error", exists ? undefined : "directory does not exist")
      return
    }

    // A sync loop is already registered for this workspace.
    if (aborts.has(space.id)) return

    setStatus(space.id, "disconnected")

    const abort = new AbortController()
    aborts.set(space.id, abort)

    void syncWorkspace(space, abort.signal).catch((error) => {
      // Only unregister our own controller; a newer loop may have replaced it.
      if (aborts.get(space.id) === abort) aborts.delete(space.id)
      // A loop that ended because it was aborted is a normal shutdown.
      if (abort.signal.aborted) return

      setStatus(space.id, "error", String(error))

      log.warn("workspace listener failed", {
        workspaceID: space.id,
        error,
      })
    })
  } catch (error) {
    setStatus(space.id, "error", error instanceof Error ? error.message : String(error))
    log.warn("workspace sync failed to start", {
      workspaceID: space.id,
      error,
    })
  }
}
/**
 * Stops syncing a workspace: aborts its controller if one is registered,
 * then forgets both the controller and the recorded connection status.
 */
function stopSync(id: WorkspaceID) {
  const controller = aborts.get(id)
  if (controller) controller.abort()
  aborts.delete(id)
  connections.delete(id)
}
- }
|