// storage.ts
  1. import { Log } from "../util/log"
  2. import path from "path"
  3. import fs from "fs/promises"
  4. import { Global } from "../global"
  5. import { Filesystem } from "../util/filesystem"
  6. import { lazy } from "../util/lazy"
  7. import { Lock } from "../util/lock"
  8. import { $ } from "bun"
  9. import { NamedError } from "@opencode-ai/util/error"
  10. import z from "zod"
  11. export namespace Storage {
  /** A storage upgrade step: receives the storage directory and resolves once its work is done. */
  type Migration = (dir: string) => Promise<void>

  /** Logger tagged with the `storage` service name. */
  const log = Log.create({ service: "storage" })

  /**
   * Named error raised by storage operations when the backing file is missing
   * (see `withErrorHandling`, which maps `ENOENT` to it).
   * Its payload carries a single `message` string.
   */
  export const NotFoundError = NamedError.create("NotFoundError", z.object({ message: z.string() }))
  /**
   * Looks for the worktree of a legacy project by reading `path.root` from its stored
   * messages, stopping at the first truthy value.
   *
   * The result starts as "/" and is overwritten by every message inspected, so it stays
   * "/" when the project has no messages and may be falsy when messages exist but none
   * records a root.
   */
  async function detectLegacyWorktree(legacyProjectDir: string) {
    let worktree = "/"
    const messageFiles = new Bun.Glob("storage/session/message/*/*.json").scan({
      cwd: legacyProjectDir,
      absolute: true,
    })
    for await (const msgFile of messageFiles) {
      const json = await Bun.file(msgFile).json()
      worktree = json.path?.root
      if (worktree) break
    }
    return worktree
  }

  /**
   * Runs `git rev-list --max-parents=0 --all` inside `worktree` and returns the first
   * non-empty output line after trimming and default sorting, or `undefined` when the
   * command printed nothing (it runs with `nothrow`, so a failing git does not throw).
   */
  async function firstRootCommit(worktree: string) {
    const output = await $`git rev-list --max-parents=0 --all`.quiet().nothrow().cwd(worktree).text()
    const [id] = output
      .split("\n")
      .filter(Boolean)
      .map((line) => line.trim())
      .toSorted()
    return id
  }

  /**
   * Copies every session of a legacy project, plus each session's messages and each
   * message's parts, into `session/<projectID>/`, `message/<sessionID>/` and
   * `part/<messageID>/` under `dir`.
   */
  async function copyLegacySessions(dir: string, legacyProjectDir: string, projectID: string) {
    // Every scan is rooted at the legacy project directory and yields absolute paths.
    const scanLegacy = (pattern: string) => new Bun.Glob(pattern).scan({ cwd: legacyProjectDir, absolute: true })

    for await (const sessionFile of scanLegacy("storage/session/info/*.json")) {
      const sessionDest = path.join(dir, "session", projectID, path.basename(sessionFile))
      log.info("copying", { sessionFile, dest: sessionDest })
      const session = await Bun.file(sessionFile).json()
      await Bun.write(sessionDest, JSON.stringify(session))

      log.info(`migrating messages for session ${session.id}`)
      for await (const msgFile of scanLegacy(`storage/session/message/${session.id}/*.json`)) {
        const messageDest = path.join(dir, "message", session.id, path.basename(msgFile))
        log.info("copying", { msgFile, dest: messageDest })
        const message = await Bun.file(msgFile).json()
        await Bun.write(messageDest, JSON.stringify(message))

        log.info(`migrating parts for message ${message.id}`)
        for await (const partFile of scanLegacy(`storage/session/part/${session.id}/${message.id}/*.json`)) {
          const partDest = path.join(dir, "part", message.id, path.basename(partFile))
          // The part is parsed before the copy is logged, so an unreadable part fails without a "copying" entry.
          const part = await Bun.file(partFile).json()
          log.info("copying", { partFile, dest: partDest })
          await Bun.write(partDest, JSON.stringify(part))
        }
      }
    }
  }

  /**
   * Migration 0: imports the legacy per-project layout found in the sibling `../project`
   * directory of `dir`.
   *
   * Each entry other than "global" is re-keyed by the root commit of its git worktree:
   * a project record is written to `project/<id>.json` and its sessions, messages and
   * parts are copied over. Entries are skipped when no usable worktree or root commit
   * can be determined; the "global" entry is only logged.
   */
  async function migrateLegacyProjects(dir: string) {
    const project = path.resolve(dir, "../project")
    if (!(await Filesystem.isDir(project))) return

    for await (const projectDir of new Bun.Glob("*").scan({ cwd: project, onlyFiles: false })) {
      log.info(`migrating project ${projectDir}`)
      if (projectDir === "global") continue

      const fullProjectDir = path.join(project, projectDir)
      const worktree = await detectLegacyWorktree(fullProjectDir)
      if (!worktree) continue
      if (!(await Filesystem.isDir(worktree))) continue

      const id = await firstRootCommit(worktree)
      if (!id) continue

      const projectRecord = {
        id,
        vcs: "git",
        worktree,
        time: {
          created: Date.now(),
          initialized: Date.now(),
        },
      }
      await Bun.write(path.join(dir, "project", id + ".json"), JSON.stringify(projectRecord))

      log.info(`migrating sessions for project ${id}`)
      await copyLegacySessions(dir, fullProjectDir, id)
    }
  }

  /**
   * Migration 1: moves each session's `summary.diffs` into `session_diff/<sessionID>.json`
   * and rewrites the session file with a summary holding only the summed `additions`
   * and `deletions`. Sessions without a `projectID` or without diffs are left untouched.
   */
  async function extractSessionDiffs(dir: string) {
    for await (const item of new Bun.Glob("session/*/*.json").scan({ cwd: dir, absolute: true })) {
      const session = await Bun.file(item).json()
      if (!session.projectID || !session.summary?.diffs) continue

      const diffs = session.summary.diffs
      const diffFile = path.join(dir, "session_diff", session.id + ".json")
      await Bun.file(diffFile).write(JSON.stringify(diffs))

      // The session is rewritten at the path derived from its own projectID and id, not at `item`.
      const sessionFile = path.join(dir, "session", session.projectID, session.id + ".json")
      const slimmed = {
        ...session,
        summary: {
          additions: diffs.reduce((sum: any, x: any) => sum + x.additions, 0),
          deletions: diffs.reduce((sum: any, x: any) => sum + x.deletions, 0),
        },
      }
      await Bun.file(sessionFile).write(JSON.stringify(slimmed))
    }
  }

  /**
   * Ordered list of storage migrations. A migration's position in this array is the
   * index that `state` logs and persists in the `migration` marker file, so entries
   * must only ever be appended.
   */
  const MIGRATIONS: Migration[] = [migrateLegacyProjects, extractSessionDiffs]
  /**
   * Resolves the storage directory (`<Global.Path.data>/storage`), first running every
   * migration whose index is at or beyond the number stored in the `migration` marker file.
   *
   * Wrapped in `lazy`, so presumably the work happens on first call and the result is
   * reused afterwards — confirm against `../util/lazy`.
   *
   * Migrations are best-effort: a failing migration is logged (with its error) and the
   * marker still advances, so it is not retried on the next start.
   *
   * @returns an object holding the storage directory as `dir`
   */
  const state = lazy(async () => {
    const dir = path.join(Global.Path.data, "storage")
    const marker = path.join(dir, "migration")

    // A missing or unparseable marker means nothing has been applied yet.
    const stored: number = await Bun.file(marker)
      .json()
      .then((x) => Number.parseInt(String(x), 10))
      .catch(() => 0)

    // A marker that parses to NaN would silently skip every migration, and a negative one
    // would index outside MIGRATIONS; treat both like a missing marker and start from 0.
    const start = Number.isInteger(stored) && stored >= 0 ? stored : 0

    for (const [index, migrate] of MIGRATIONS.entries()) {
      if (index < start) continue
      log.info("running migration", { index })
      await migrate(dir).catch((error) => log.error("failed to run migration", { error, index }))
      // Persist progress after each step so an interrupted run resumes at the next migration.
      await Bun.write(marker, (index + 1).toString())
    }

    return { dir }
  })
  /**
   * Deletes the JSON document stored under `key`.
   *
   * The key segments are joined beneath the storage directory and suffixed with `.json`.
   * Any failure of the unlink itself (including a missing file) is ignored.
   *
   * @param key path segments identifying the document
   */
  export async function remove(key: string[]) {
    const { dir } = await state()
    const file = `${path.join(dir, ...key)}.json`
    const discard = () => {}
    return withErrorHandling(async () => {
      await fs.unlink(file).catch(discard)
    })
  }
  /**
   * Reads and parses the JSON document stored under `key`.
   *
   * `Lock.read` is acquired for the file path and released when the callback's scope
   * exits (via `using`). The parsed value is returned as `T` without validation.
   *
   * @param key path segments identifying the document
   * @returns the parsed document
   * @throws NotFoundError when the read fails with `ENOENT`
   */
  export async function read<T>(key: string[]) {
    const { dir } = await state()
    const file = `${path.join(dir, ...key)}.json`
    return withErrorHandling(async () => {
      using _ = await Lock.read(file)
      return (await Bun.file(file).json()) as T
    })
  }
  /**
   * Loads the JSON document stored under `key`, lets `fn` mutate it in place, then
   * writes it back pretty-printed with two-space indentation.
   *
   * The whole read-modify-write cycle runs while `Lock.write` is held for the file path.
   * `fn` is called synchronously and its return value is ignored.
   *
   * @param key path segments identifying the document
   * @param fn mutator applied to the parsed document
   * @returns the document after mutation
   * @throws NotFoundError when the document does not exist (`ENOENT`)
   */
  export async function update<T>(key: string[], fn: (draft: T) => void) {
    const { dir } = await state()
    const file = `${path.join(dir, ...key)}.json`
    return withErrorHandling(async () => {
      using _ = await Lock.write(file)
      const draft = await Bun.file(file).json()
      fn(draft)
      const serialized = JSON.stringify(draft, null, 2)
      await Bun.write(file, serialized)
      return draft as T
    })
  }
  /**
   * Stores `content` as the JSON document under `key`, pretty-printed with two-space
   * indentation, while `Lock.write` is held for the file path.
   *
   * @param key path segments identifying the document
   * @param content value to serialize
   */
  export async function write<T>(key: string[], content: T) {
    const { dir } = await state()
    const file = `${path.join(dir, ...key)}.json`
    return withErrorHandling(async () => {
      using _ = await Lock.write(file)
      const serialized = JSON.stringify(content, null, 2)
      await Bun.write(file, serialized)
    })
  }
  /**
   * Runs `body` and translates a rejected `Error` whose `code` is `"ENOENT"` into a
   * `NotFoundError`; every other rejection (including non-`Error` values) is rethrown
   * unchanged.
   *
   * @param body the storage operation to run
   * @returns whatever `body` resolves to
   * @throws NotFoundError when `body` rejects with an `ENOENT` error
   */
  async function withErrorHandling<T>(body: () => Promise<T>) {
    const translate = (cause: unknown): never => {
      const missing = cause instanceof Error && (cause as NodeJS.ErrnoException).code === "ENOENT"
      if (!missing) throw cause
      throw new NotFoundError({ message: `Resource not found: ${(cause as NodeJS.ErrnoException).path}` })
    }
    return body().catch(translate)
  }
  /**
   * Matches stored documents at any depth. Restricted to `.json` so that other files in
   * the storage tree (for example the `migration` marker written by `state`) are never
   * turned into bogus keys by the extension stripping in `list`.
   */
  const glob = new Bun.Glob("**/*.json")

  /**
   * Lists the keys of all JSON documents stored beneath `prefix`.
   *
   * Each key is `prefix` followed by the file's relative path split on `path.sep`, with
   * the `.json` extension removed. The result is sorted with the default array sort.
   * Any failure while scanning (such as a missing directory) yields an empty list.
   *
   * @param prefix path segments of the directory to list
   * @returns sorted keys, each usable with `read`, `update`, `write` and `remove`
   */
  export async function list(prefix: string[]) {
    const dir = await state().then((x) => x.dir)
    try {
      const files = await Array.fromAsync(
        glob.scan({
          cwd: path.join(dir, ...prefix),
          onlyFiles: true,
        }),
      )
      const result = files.map((file) => [...prefix, ...file.slice(0, -".json".length).split(path.sep)])
      result.sort()
      return result
    } catch {
      return []
    }
  }
  212. }