storage.ts 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. import { Log } from "../util/log"
  2. import { App } from "../app/app"
  3. import { Bus } from "../bus"
  4. import path from "path"
  5. import z from "zod"
  6. import fs from "fs/promises"
  7. import { MessageV2 } from "../session/message-v2"
  8. import { Identifier } from "../id/id"
  9. export namespace Storage {
  /** Logger for this namespace, created with `service: "storage"`. */
  const log = Log.create({ service: "storage" })

  /** Payload schema of the write event: the storage key and the content that was written. */
  const writePayload = z.object({
    key: z.string(),
    content: z.any(),
  })

  /** Bus events defined by the storage namespace. */
  export const Event = {
    /** Published by `writeJSON` under the name "storage.write". */
    Write: Bus.event("storage.write", writePayload),
  }
  /**
   * A one-shot, in-place upgrade of the files below the storage root.
   * @param dir Path of the storage root directory.
   */
  type Migration = (dir: string) => Promise<void>

  /** Glob, relative to the storage root, matching every stored message file. */
  const MESSAGE_FILES = "session/message/*/*.json"

  /** Iterates the absolute paths of all message files below `dir`. */
  function messageFiles(dir: string): Iterable<string> {
    return new Bun.Glob(MESSAGE_FILES).scanSync({
      cwd: dir,
      absolute: true,
    })
  }

  /** Renders a caught value as a short string suitable for a log field. */
  function describeError(error: unknown): string {
    return error instanceof Error ? error.message : String(error)
  }

  /**
   * Moves `file` out of the storage root into a sibling "broken" directory,
   * keeping its path relative to the root.
   *
   * The target is derived from `dir` rather than by substituting the first
   * "storage" substring of the absolute path, which could rewrite a parent
   * directory instead. The target directory is created first because
   * `fs.rename` does not create missing parents.
   */
  async function quarantine(dir: string, file: string): Promise<void> {
    const target = path.join(path.dirname(dir), "broken", path.relative(dir, file))
    await fs.mkdir(path.dirname(target), { recursive: true })
    await fs.rename(file, target)
  }

  /**
   * Migration 0: rewrites messages that still carry a `metadata` field into the
   * shape produced by `MessageV2.fromV1`, stored as `{ ...info, parts }`.
   * Messages that fail conversion are quarantined; unreadable files are skipped.
   * The migration is best-effort and never rejects.
   */
  async function migrateMessagesToV2(dir: string): Promise<void> {
    try {
      for (const file of messageFiles(dir)) {
        // A single unreadable file must not abort the remaining files.
        const content = await Bun.file(file)
          .json()
          .catch((error: unknown) => {
            log.info("skipping unreadable message file", { file, error: describeError(error) })
            return undefined
          })
        if (!content?.metadata) continue
        log.info("migrating to v2 message", { file })
        try {
          const result = MessageV2.fromV1(content)
          await Bun.write(
            file,
            JSON.stringify(
              {
                ...result.info,
                parts: result.parts,
              },
              null,
              2,
            ),
          )
        } catch (error) {
          log.info("quarantining message that failed v2 migration", { file, error: describeError(error) })
          await quarantine(dir, file).catch((reason: unknown) => {
            log.info("failed to quarantine message", { file, error: describeError(reason) })
          })
        }
      }
    } catch (error) {
      // Scanning itself failed; keep the original best-effort contract but leave a trace.
      log.info("v2 message migration aborted", { error: describeError(error) })
    }
  }

  /**
   * Migration 1: moves each entry of a message's `parts` array into its own file
   * at `session/part/<sessionID>/<messageID>/<partID>.json`, then rewrites the
   * message without `parts`. Each part receives a fresh ascending "part" id;
   * for parts of type "tool" the previous id is preserved as `callID`.
   * Failures are logged per file and do not stop the migration.
   */
  async function splitPartsIntoFiles(dir: string): Promise<void> {
    for (const file of messageFiles(dir)) {
      try {
        const { parts, ...info } = await Bun.file(file).json()
        if (!parts) continue
        for (const part of parts) {
          const id = Identifier.ascending("part")
          await Bun.write(
            path.join(dir, "session", "part", info.sessionID, info.id, id + ".json"),
            JSON.stringify({
              ...part,
              id,
              sessionID: info.sessionID,
              messageID: info.id,
              ...(part.type === "tool" ? { callID: part.id } : {}),
            }),
          )
        }
        // Only drop `parts` from the message once every part file has been written.
        await Bun.write(file, JSON.stringify(info, null, 2))
      } catch (error) {
        log.info("failed to split parts out of message", { file, error: describeError(error) })
      }
    }
  }

  /**
   * Migration 2: sets `mode` to "build" on assistant messages that have no mode.
   * Failures are logged per file and do not stop the migration.
   */
  async function addDefaultMode(dir: string): Promise<void> {
    for (const file of messageFiles(dir)) {
      try {
        const content = await Bun.file(file).json()
        if (content.role === "assistant" && !content.mode) {
          log.info("adding mode field to message", { file })
          content.mode = "build"
          await Bun.write(file, JSON.stringify(content, null, 2))
        }
      } catch (error) {
        log.info("failed to add mode field to message", { file, error: describeError(error) })
      }
    }
  }

  /**
   * Ordered list of storage migrations. Order is significant: the number of
   * migrations already applied is persisted in the `migration` marker file, so
   * new migrations must only ever be appended.
   */
  const MIGRATIONS: Migration[] = [migrateMessagesToV2, splitPartsIntoFiles, addDefaultMode]
  /**
   * Storage state registered with `App.state` under the key "storage".
   *
   * The initialiser resolves the storage root (`<app data path>/storage`),
   * creates it if needed, and runs every migration that has not been applied
   * yet. The count of applied migrations is kept in the `migration` marker file
   * inside the root and is updated after each migration, so an interrupted run
   * resumes where it stopped.
   *
   * Resolves to `{ dir }`, the normalized storage root path.
   */
  const state = App.state("storage", async () => {
    const dir = path.normalize(path.join(App.info().path.data, "storage"))
    await fs.mkdir(dir, { recursive: true })

    const marker = path.join(dir, "migration")
    let applied = 0
    try {
      const recorded = Number.parseInt(String(await Bun.file(marker).json()), 10)
      // A NaN marker would make the loop condition false forever (migrations
      // silently skipped) and a negative one would index outside MIGRATIONS,
      // so anything but a positive integer restarts from the first migration.
      if (Number.isInteger(recorded) && recorded > 0) applied = recorded
    } catch {
      // Missing or unparsable marker: nothing has been applied yet.
    }

    for (let index = applied; index < MIGRATIONS.length; index++) {
      log.info("running migration", { index })
      const run = MIGRATIONS[index]
      await run(dir)
      await Bun.write(marker, (index + 1).toString())
    }

    return {
      dir,
    }
  })
  /**
   * Deletes the JSON document stored under `key` (`<storage root>/<key>.json`).
   * Any failure while unlinking is ignored.
   * @param key Storage key, without the ".json" extension.
   */
  export async function remove(key: string) {
    const { dir } = await state()
    const file = path.join(dir, key + ".json")
    try {
      await fs.unlink(file)
    } catch {
      // Deliberately ignored: removal is best-effort.
    }
  }
  /**
   * Recursively deletes the directory `<storage root>/<key>` and everything in it.
   * Uses `force: true` and additionally ignores any error raised by the removal.
   * @param key Storage key prefix naming the directory to remove.
   */
  export async function removeDir(key: string) {
    const { dir } = await state()
    const directory = path.join(dir, key)
    try {
      await fs.rm(directory, { recursive: true, force: true })
    } catch {
      // Deliberately ignored: removal is best-effort.
    }
  }
  /**
   * Reads and parses the JSON document stored under `key`.
   * The parsed value is not validated; `T` is asserted by the caller.
   * Rejects if the file cannot be read or parsed.
   * @param key Storage key, without the ".json" extension.
   * @returns The parsed document, typed as `T`.
   */
  export async function readJSON<T>(key: string) {
    const { dir } = await state()
    const file = Bun.file(path.join(dir, `${key}.json`))
    return file.json() as Promise<T>
  }
  /**
   * Serialises `content` as pretty-printed JSON and stores it under `key`.
   *
   * The data is first written to a temporary sibling file and then renamed over
   * the target, so readers never observe a partially written document. On
   * success an `Event.Write` is published with the key and content.
   *
   * If writing or renaming fails, the temporary file is removed and the error is
   * rethrown; no event is published, so subscribers are never told about a
   * write that did not happen.
   *
   * @param key Storage key, without the ".json" extension.
   * @param content Value to serialise with `JSON.stringify`.
   */
  export async function writeJSON<T>(key: string, content: T) {
    const { dir } = await state()
    const target = path.join(dir, key + ".json")
    // Timestamp alone is not unique for concurrent writes to the same key
    // within one millisecond; the random suffix keeps temp files distinct.
    const tmp = `${target}${Date.now()}.${Math.random().toString(36).slice(2)}.tmp`
    try {
      await Bun.write(tmp, JSON.stringify(content, null, 2))
      await fs.rename(tmp, target)
    } catch (error) {
      await fs.unlink(tmp).catch(() => {})
      throw error
    }
    Bus.publish(Event.Write, { key, content })
  }
  /** Extension of every stored document. */
  const JSON_EXTENSION = ".json"

  /**
   * Matches stored JSON documents at any depth. Restricting the pattern to
   * `*.json` keeps other files in the storage tree (the `migration` marker,
   * in-flight `*.tmp` files from `writeJSON`) out of listings.
   */
  const glob = new Bun.Glob("**/*" + JSON_EXTENSION)

  /**
   * Lists the keys of all JSON documents stored below `prefix`.
   *
   * @param prefix Storage key prefix (a directory relative to the storage root).
   * @returns Sorted keys, each being `prefix` joined with the file's relative
   *   path minus the ".json" extension. Returns an empty array if scanning
   *   fails for any reason (for example, when the directory does not exist).
   */
  export async function list(prefix: string) {
    const { dir } = await state()
    try {
      const keys: string[] = []
      const matches = glob.scan({
        cwd: path.join(dir, prefix),
        onlyFiles: true,
      })
      for await (const item of matches) {
        keys.push(path.join(prefix, item.slice(0, -JSON_EXTENSION.length)))
      }
      keys.sort()
      return keys
    } catch {
      return []
    }
  }
  145. }