|
@@ -5,7 +5,9 @@ import { Database, eq } from "@/storage/db"
|
|
|
import { Project } from "@/project/project"
|
|
import { Project } from "@/project/project"
|
|
|
import { BusEvent } from "@/bus/bus-event"
|
|
import { BusEvent } from "@/bus/bus-event"
|
|
|
import { GlobalBus } from "@/bus/global"
|
|
import { GlobalBus } from "@/bus/global"
|
|
|
|
|
+import { SyncEvent } from "@/sync"
|
|
|
import { Log } from "@/util/log"
|
|
import { Log } from "@/util/log"
|
|
|
|
|
+import { Filesystem } from "@/util/filesystem"
|
|
|
import { ProjectID } from "@/project/schema"
|
|
import { ProjectID } from "@/project/schema"
|
|
|
import { WorkspaceTable } from "./workspace.sql"
|
|
import { WorkspaceTable } from "./workspace.sql"
|
|
|
import { getAdaptor } from "./adaptors"
|
|
import { getAdaptor } from "./adaptors"
|
|
@@ -14,6 +16,18 @@ import { WorkspaceID } from "./schema"
|
|
|
import { parseSSE } from "./sse"
|
|
import { parseSSE } from "./sse"
|
|
|
|
|
|
|
|
export namespace Workspace {
|
|
export namespace Workspace {
|
|
|
|
|
+ export const Info = WorkspaceInfo.meta({
|
|
|
|
|
+ ref: "Workspace",
|
|
|
|
|
+ })
|
|
|
|
|
+ export type Info = z.infer<typeof Info>
|
|
|
|
|
+
|
|
|
|
|
+ export const ConnectionStatus = z.object({
|
|
|
|
|
+ workspaceID: WorkspaceID.zod,
|
|
|
|
|
+ status: z.enum(["connected", "connecting", "disconnected", "error"]),
|
|
|
|
|
+ error: z.string().optional(),
|
|
|
|
|
+ })
|
|
|
|
|
+ export type ConnectionStatus = z.infer<typeof ConnectionStatus>
|
|
|
|
|
+
|
|
|
export const Event = {
|
|
export const Event = {
|
|
|
Ready: BusEvent.define(
|
|
Ready: BusEvent.define(
|
|
|
"workspace.ready",
|
|
"workspace.ready",
|
|
@@ -27,13 +41,9 @@ export namespace Workspace {
|
|
|
message: z.string(),
|
|
message: z.string(),
|
|
|
}),
|
|
}),
|
|
|
),
|
|
),
|
|
|
|
|
+ Status: BusEvent.define("workspace.status", ConnectionStatus),
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- export const Info = WorkspaceInfo.meta({
|
|
|
|
|
- ref: "Workspace",
|
|
|
|
|
- })
|
|
|
|
|
- export type Info = z.infer<typeof Info>
|
|
|
|
|
-
|
|
|
|
|
function fromRow(row: typeof WorkspaceTable.$inferSelect): Info {
|
|
function fromRow(row: typeof WorkspaceTable.$inferSelect): Info {
|
|
|
return {
|
|
return {
|
|
|
id: row.id,
|
|
id: row.id,
|
|
@@ -85,6 +95,9 @@ export namespace Workspace {
|
|
|
})
|
|
})
|
|
|
|
|
|
|
|
await adaptor.create(config)
|
|
await adaptor.create(config)
|
|
|
|
|
+
|
|
|
|
|
+ startSync(info)
|
|
|
|
|
+
|
|
|
return info
|
|
return info
|
|
|
})
|
|
})
|
|
|
|
|
|
|
@@ -92,18 +105,24 @@ export namespace Workspace {
|
|
|
const rows = Database.use((db) =>
|
|
const rows = Database.use((db) =>
|
|
|
db.select().from(WorkspaceTable).where(eq(WorkspaceTable.project_id, project.id)).all(),
|
|
db.select().from(WorkspaceTable).where(eq(WorkspaceTable.project_id, project.id)).all(),
|
|
|
)
|
|
)
|
|
|
- return rows.map(fromRow).sort((a, b) => a.id.localeCompare(b.id))
|
|
|
|
|
|
|
+ const spaces = rows.map(fromRow).sort((a, b) => a.id.localeCompare(b.id))
|
|
|
|
|
+ for (const space of spaces) startSync(space)
|
|
|
|
|
+ return spaces
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
export const get = fn(WorkspaceID.zod, async (id) => {
|
|
export const get = fn(WorkspaceID.zod, async (id) => {
|
|
|
const row = Database.use((db) => db.select().from(WorkspaceTable).where(eq(WorkspaceTable.id, id)).get())
|
|
const row = Database.use((db) => db.select().from(WorkspaceTable).where(eq(WorkspaceTable.id, id)).get())
|
|
|
if (!row) return
|
|
if (!row) return
|
|
|
- return fromRow(row)
|
|
|
|
|
|
|
+ const space = fromRow(row)
|
|
|
|
|
+ startSync(space)
|
|
|
|
|
+ return space
|
|
|
})
|
|
})
|
|
|
|
|
|
|
|
export const remove = fn(WorkspaceID.zod, async (id) => {
|
|
export const remove = fn(WorkspaceID.zod, async (id) => {
|
|
|
const row = Database.use((db) => db.select().from(WorkspaceTable).where(eq(WorkspaceTable.id, id)).get())
|
|
const row = Database.use((db) => db.select().from(WorkspaceTable).where(eq(WorkspaceTable.id, id)).get())
|
|
|
if (row) {
|
|
if (row) {
|
|
|
|
|
+ stopSync(id)
|
|
|
|
|
+
|
|
|
const info = fromRow(row)
|
|
const info = fromRow(row)
|
|
|
const adaptor = await getAdaptor(row.type)
|
|
const adaptor = await getAdaptor(row.type)
|
|
|
adaptor.remove(info)
|
|
adaptor.remove(info)
|
|
@@ -111,58 +130,100 @@ export namespace Workspace {
|
|
|
return info
|
|
return info
|
|
|
}
|
|
}
|
|
|
})
|
|
})
|
|
|
|
|
+
|
|
|
|
|
+ const connections = new Map<WorkspaceID, ConnectionStatus>()
|
|
|
|
|
+ const aborts = new Map<WorkspaceID, AbortController>()
|
|
|
|
|
+
|
|
|
|
|
+ function setStatus(id: WorkspaceID, status: ConnectionStatus["status"], error?: string) {
|
|
|
|
|
+ const prev = connections.get(id)
|
|
|
|
|
+ if (prev?.status === status && prev?.error === error) return
|
|
|
|
|
+ const next = { workspaceID: id, status, error }
|
|
|
|
|
+ connections.set(id, next)
|
|
|
|
|
+ GlobalBus.emit("event", {
|
|
|
|
|
+ directory: "global",
|
|
|
|
|
+ workspace: id,
|
|
|
|
|
+ payload: {
|
|
|
|
|
+ type: Event.Status.type,
|
|
|
|
|
+ properties: next,
|
|
|
|
|
+ },
|
|
|
|
|
+ })
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ export function status(): ConnectionStatus[] {
|
|
|
|
|
+ return [...connections.values()]
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
const log = Log.create({ service: "workspace-sync" })
|
|
const log = Log.create({ service: "workspace-sync" })
|
|
|
|
|
|
|
|
- async function workspaceEventLoop(space: Info, stop: AbortSignal) {
|
|
|
|
|
- while (!stop.aborted) {
|
|
|
|
|
- const adaptor = await getAdaptor(space.type)
|
|
|
|
|
- const target = await Promise.resolve(adaptor.target(space))
|
|
|
|
|
|
|
+ async function workspaceEventLoop(space: Info, signal: AbortSignal) {
|
|
|
|
|
+ log.info("starting sync: " + space.id)
|
|
|
|
|
|
|
|
- if (target.type === "local") {
|
|
|
|
|
- return
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ while (!signal.aborted) {
|
|
|
|
|
+ log.info("connecting to sync: " + space.id)
|
|
|
|
|
|
|
|
- const baseURL = String(target.url).replace(/\/?$/, "/")
|
|
|
|
|
|
|
+ setStatus(space.id, "connecting")
|
|
|
|
|
+ const adaptor = await getAdaptor(space.type)
|
|
|
|
|
+ const target = await adaptor.target(space)
|
|
|
|
|
+
|
|
|
|
|
+ if (target.type === "local") return
|
|
|
|
|
|
|
|
- const res = await fetch(new URL(baseURL + "/event"), {
|
|
|
|
|
- method: "GET",
|
|
|
|
|
- signal: stop,
|
|
|
|
|
|
|
+ const res = await fetch(target.url + "/sync/event", { method: "GET", signal }).catch((err: unknown) => {
|
|
|
|
|
+ setStatus(space.id, "error", String(err))
|
|
|
|
|
+ return undefined
|
|
|
})
|
|
})
|
|
|
|
|
+ if (!res || !res.ok || !res.body) {
|
|
|
|
|
+ log.info("failed to connect to sync: " + res?.status)
|
|
|
|
|
|
|
|
- if (!res.ok || !res.body) {
|
|
|
|
|
|
|
+ setStatus(space.id, "error", res ? `HTTP ${res.status}` : "no response")
|
|
|
await sleep(1000)
|
|
await sleep(1000)
|
|
|
continue
|
|
continue
|
|
|
}
|
|
}
|
|
|
-
|
|
|
|
|
- // await parseSSE(res.body, stop, (event) => {
|
|
|
|
|
- // GlobalBus.emit("event", {
|
|
|
|
|
- // directory: space.id,
|
|
|
|
|
- // payload: event,
|
|
|
|
|
- // })
|
|
|
|
|
- // })
|
|
|
|
|
-
|
|
|
|
|
- // Wait 250ms and retry if SSE connection fails
|
|
|
|
|
|
|
+ setStatus(space.id, "connected")
|
|
|
|
|
+ await parseSSE(res.body, signal, (evt) => {
|
|
|
|
|
+ const event = evt as SyncEvent.SerializedEvent
|
|
|
|
|
+
|
|
|
|
|
+ try {
|
|
|
|
|
+ if (!event.type.startsWith("server.")) {
|
|
|
|
|
+ SyncEvent.replay(event)
|
|
|
|
|
+ }
|
|
|
|
|
+ } catch (err) {
|
|
|
|
|
+ log.warn("failed to replay sync event", {
|
|
|
|
|
+ workspaceID: space.id,
|
|
|
|
|
+ error: err,
|
|
|
|
|
+ })
|
|
|
|
|
+ }
|
|
|
|
|
+ })
|
|
|
|
|
+ setStatus(space.id, "disconnected")
|
|
|
|
|
+ log.info("disconnected to sync: " + space.id)
|
|
|
await sleep(250)
|
|
await sleep(250)
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- export function startSyncing(project: Project.Info) {
|
|
|
|
|
- const stop = new AbortController()
|
|
|
|
|
- const spaces = list(project).filter((space) => space.type !== "worktree")
|
|
|
|
|
|
|
+ function startSync(space: Info) {
|
|
|
|
|
+ if (space.type === "worktree") {
|
|
|
|
|
+ void Filesystem.exists(space.directory!).then((exists) => {
|
|
|
|
|
+ setStatus(space.id, exists ? "connected" : "error", exists ? undefined : "directory does not exist")
|
|
|
|
|
+ })
|
|
|
|
|
+ return
|
|
|
|
|
+ }
|
|
|
|
|
|
|
|
- spaces.forEach((space) => {
|
|
|
|
|
- void workspaceEventLoop(space, stop.signal).catch((error) => {
|
|
|
|
|
- log.warn("workspace sync listener failed", {
|
|
|
|
|
- workspaceID: space.id,
|
|
|
|
|
- error,
|
|
|
|
|
- })
|
|
|
|
|
|
|
+ if (aborts.has(space.id)) return
|
|
|
|
|
+ const abort = new AbortController()
|
|
|
|
|
+ aborts.set(space.id, abort)
|
|
|
|
|
+ setStatus(space.id, "disconnected")
|
|
|
|
|
+
|
|
|
|
|
+ void workspaceEventLoop(space, abort.signal).catch((error) => {
|
|
|
|
|
+ setStatus(space.id, "error", String(error))
|
|
|
|
|
+ log.warn("workspace sync listener failed", {
|
|
|
|
|
+ workspaceID: space.id,
|
|
|
|
|
+ error,
|
|
|
})
|
|
})
|
|
|
})
|
|
})
|
|
|
|
|
+ }
|
|
|
|
|
|
|
|
- return {
|
|
|
|
|
- async stop() {
|
|
|
|
|
- stop.abort()
|
|
|
|
|
- },
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ function stopSync(id: WorkspaceID) {
|
|
|
|
|
+ aborts.get(id)?.abort()
|
|
|
|
|
+ aborts.delete(id)
|
|
|
|
|
+ connections.delete(id)
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|