storage.ts 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. import { Log } from "../util/log"
  2. import path from "path"
  3. import fs from "fs/promises"
  4. import { Global } from "../global"
  5. import { lazy } from "../util/lazy"
  6. import { Lock } from "../util/lock"
  7. import { $ } from "bun"
  8. import { NamedError } from "@opencode-ai/util/error"
  9. import z from "zod"
  10. export namespace Storage {
  /** Shape of a storage migration: an async step that is handed the storage directory. */
  type Migration = (dir: string) => Promise<void>

  /** Logger used by everything in this namespace, tagged with the "storage" service. */
  const log = Log.create({ service: "storage" })

  /**
   * Named error carrying a single string `message`.
   * `withErrorHandling` throws it when a file-system call fails with `ENOENT`.
   */
  export const NotFoundError = NamedError.create("NotFoundError", z.object({ message: z.string() }))
  /**
   * Ordered storage migrations. `state` runs every entry whose index is at or
   * above the number persisted in the `migration` marker file, so the position
   * of an entry in this array is what identifies it.
   */
  const MIGRATIONS: Migration[] = [
    // Migration 0: copy project, session, message and part JSON files out of
    // the legacy `../project/<name>/storage/session/...` tree into `dir`.
    async (dir) => {
      const project = path.resolve(dir, "../project")
      // `fs.exists` returns a promise, so it has to be awaited; negating the
      // promise itself is always false and the guard would never fire.
      if (!(await fs.exists(project))) return
      for await (const projectDir of new Bun.Glob("*").scan({
        cwd: project,
        onlyFiles: false,
      })) {
        log.info(`migrating project ${projectDir}`)
        let projectID = projectDir
        const fullProjectDir = path.join(project, projectDir)
        let worktree = "/"
        if (projectID !== "global") {
          // Recover the worktree from the first message that records `path.root`.
          for await (const msgFile of new Bun.Glob("storage/session/message/*/*.json").scan({
            cwd: fullProjectDir,
            absolute: true,
          })) {
            const json = await Bun.file(msgFile).json()
            worktree = json.path?.root
            if (worktree) break
          }
          if (!worktree) continue
          if (!(await fs.exists(worktree))) continue
          // The new project id is the first (sorted) root commit of the repository
          // at `worktree`; `.nothrow()` turns a git failure into empty output.
          const [id] = await $`git rev-list --max-parents=0 --all`
            .quiet()
            .nothrow()
            .cwd(worktree)
            .text()
            .then((x) =>
              x
                .split("\n")
                .filter(Boolean)
                .map((x) => x.trim())
                .toSorted(),
            )
          if (!id) continue
          projectID = id
          await Bun.write(
            path.join(dir, "project", projectID + ".json"),
            JSON.stringify({
              id,
              vcs: "git",
              worktree,
              time: {
                created: Date.now(),
                initialized: Date.now(),
              },
            }),
          )
          log.info(`migrating sessions for project ${projectID}`)
          for await (const sessionFile of new Bun.Glob("storage/session/info/*.json").scan({
            cwd: fullProjectDir,
            absolute: true,
          })) {
            const dest = path.join(dir, "session", projectID, path.basename(sessionFile))
            log.info("copying", {
              sessionFile,
              dest,
            })
            const session = await Bun.file(sessionFile).json()
            await Bun.write(dest, JSON.stringify(session))
            log.info(`migrating messages for session ${session.id}`)
            for await (const msgFile of new Bun.Glob(`storage/session/message/${session.id}/*.json`).scan({
              cwd: fullProjectDir,
              absolute: true,
            })) {
              const dest = path.join(dir, "message", session.id, path.basename(msgFile))
              log.info("copying", {
                msgFile,
                dest,
              })
              const message = await Bun.file(msgFile).json()
              await Bun.write(dest, JSON.stringify(message))
              log.info(`migrating parts for message ${message.id}`)
              for await (const partFile of new Bun.Glob(`storage/session/part/${session.id}/${message.id}/*.json`).scan(
                {
                  cwd: fullProjectDir,
                  absolute: true,
                },
              )) {
                const dest = path.join(dir, "part", message.id, path.basename(partFile))
                const part = await Bun.file(partFile).json()
                log.info("copying", {
                  partFile,
                  dest,
                })
                await Bun.write(dest, JSON.stringify(part))
              }
            }
          }
        }
      }
    },
    // Migration 1: move `summary.diffs` of every session into its own
    // `session_diff/<id>.json` file and replace the summary with line totals.
    async (dir) => {
      for await (const item of new Bun.Glob("session/*/*.json").scan({
        cwd: dir,
        absolute: true,
      })) {
        const session = await Bun.file(item).json()
        // Sessions without a project id or without stored diffs are left untouched.
        if (!session.projectID) continue
        if (!session.summary?.diffs) continue
        const { diffs } = session.summary
        await Bun.file(path.join(dir, "session_diff", session.id + ".json")).write(JSON.stringify(diffs))
        await Bun.file(path.join(dir, "session", session.projectID, session.id + ".json")).write(
          JSON.stringify({
            ...session,
            summary: {
              additions: diffs.reduce((sum: any, x: any) => sum + x.additions, 0),
              deletions: diffs.reduce((sum: any, x: any) => sum + x.deletions, 0),
            },
          }),
        )
      }
    },
  ]
  /**
   * Lazily resolves the storage directory, running any pending migrations first.
   *
   * The number of migrations already applied is read from the `migration`
   * marker file inside the storage directory; a missing or unreadable marker
   * counts as 0. After each migration the marker is rewritten with the next
   * index, whether or not that migration succeeded (failures are logged only).
   *
   * @returns an object holding the storage directory as `dir`
   */
  const state = lazy(async () => {
    const dir = path.join(Global.Path.data, "storage")
    const marker = path.join(dir, "migration")
    const applied = await Bun.file(marker)
      .json()
      .then((x) => parseInt(x, 10))
      .catch(() => 0)
    // A marker that parses to NaN would make the loop condition false forever,
    // and a negative one would index outside MIGRATIONS; treat both like a
    // missing marker and start from the first migration.
    const start = Number.isInteger(applied) && applied >= 0 ? applied : 0
    for (let index = start; index < MIGRATIONS.length; index++) {
      log.info("running migration", { index })
      const migration = MIGRATIONS[index]
      // Best effort: a failing migration must not block startup, but keep the
      // cause in the log so the failure can be diagnosed.
      await migration(dir).catch((error) => log.error("failed to run migration", { error, index }))
      await Bun.write(marker, (index + 1).toString())
    }
    return {
      dir,
    }
  })
  /**
   * Deletes the JSON file addressed by `key` (path segments below the storage
   * directory, without the `.json` extension).
   *
   * Any failure of the unlink itself is ignored, so removing a key that does
   * not exist resolves normally.
   */
  export async function remove(key: string[]) {
    const { dir } = await state()
    const file = `${path.join(dir, ...key)}.json`
    return withErrorHandling(async () => {
      try {
        await fs.unlink(file)
      } catch {
        // deliberately ignored
      }
    })
  }
  /**
   * Reads and parses the JSON file addressed by `key`.
   *
   * The handle returned by `Lock.read` is held through the `using` declaration
   * until the file has been parsed.
   *
   * @returns the parsed content, cast to `T` without validation
   * @throws NotFoundError when the underlying failure has code `ENOENT`
   */
  export async function read<T>(key: string[]) {
    const { dir } = await state()
    const file = `${path.join(dir, ...key)}.json`
    return withErrorHandling(async () => {
      using _ = await Lock.read(file)
      return (await Bun.file(file).json()) as T
    })
  }
  /**
   * Loads the JSON file addressed by `key`, lets `fn` mutate the parsed value
   * in place, and writes it back pretty-printed with two-space indentation.
   *
   * The handle returned by `Lock.write` is held through the `using`
   * declaration for the whole read-modify-write sequence.
   *
   * @param fn synchronous mutator; its return value is ignored
   * @returns the mutated content, cast to `T` without validation
   * @throws NotFoundError when the underlying failure has code `ENOENT`
   */
  export async function update<T>(key: string[], fn: (draft: T) => void) {
    const { dir } = await state()
    const file = `${path.join(dir, ...key)}.json`
    return withErrorHandling(async () => {
      using _ = await Lock.write(file)
      const draft = await Bun.file(file).json()
      fn(draft)
      const serialized = JSON.stringify(draft, null, 2)
      await Bun.write(file, serialized)
      return draft as T
    })
  }
  /**
   * Serializes `content` as pretty-printed JSON (two-space indentation) into
   * the file addressed by `key`, replacing whatever was stored there.
   *
   * The handle returned by `Lock.write` is held through the `using`
   * declaration while the file is written.
   */
  export async function write<T>(key: string[], content: T) {
    const { dir } = await state()
    const file = `${path.join(dir, ...key)}.json`
    return withErrorHandling(async () => {
      using _ = await Lock.write(file)
      const serialized = JSON.stringify(content, null, 2)
      await Bun.write(file, serialized)
    })
  }
  /**
   * Runs `body` and translates "file not found" failures.
   *
   * A rejection that is an `Error` with code `ENOENT` is replaced by a
   * `NotFoundError` naming the missing path; every other rejection, including
   * non-`Error` values, is rethrown unchanged.
   */
  async function withErrorHandling<T>(body: () => Promise<T>) {
    const translate = (cause: unknown): never => {
      if (cause instanceof Error) {
        const fsError = cause as NodeJS.ErrnoException
        if (fsError.code === "ENOENT") {
          throw new NotFoundError({ message: `Resource not found: ${fsError.path}` })
        }
      }
      throw cause
    }
    return body().catch(translate)
  }
  /** Matches every entry below the directory being scanned. */
  const glob = new Bun.Glob("**/*")

  /**
   * Lists the keys stored below `prefix`.
   *
   * Each key is `prefix` followed by the path segments of a `.json` file found
   * under that directory, with the extension removed. Files without a `.json`
   * extension (for example the `migration` marker in the storage root) are not
   * storage entries and are skipped rather than having their names truncated.
   *
   * @returns the keys in sorted order, or an empty array when scanning fails
   *          (for instance because the directory does not exist)
   */
  export async function list(prefix: string[]) {
    const extension = ".json"
    const dir = await state().then((x) => x.dir)
    try {
      const files = await Array.fromAsync(
        glob.scan({
          cwd: path.join(dir, ...prefix),
          onlyFiles: true,
        }),
      )
      const result = files
        .filter((file) => file.endsWith(extension))
        .map((file) => [...prefix, ...file.slice(0, -extension.length).split(path.sep)])
      result.sort()
      return result
    } catch {
      return []
    }
  }
  211. }