storage.ts 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. import { Log } from "../util/log"
  2. import { App } from "../app/app"
  3. import { Bus } from "../bus"
  4. import path from "path"
  5. import z from "zod"
  6. import fs from "fs/promises"
  7. import { MessageV2 } from "../session/message-v2"
  8. import { Identifier } from "../id/id"
  9. export namespace Storage {
  /** Logger tagged with the `storage` service name. */
  const log = Log.create({ service: "storage" })

  /** Bus events emitted by this namespace. */
  export const Event = {
    /** Carries the storage key and the content passed to `writeJSON`. */
    Write: Bus.event(
      "storage.write",
      z.object({
        key: z.string(),
        content: z.any(),
      }),
    ),
  }

  /** A single upgrade step; it receives the storage root directory. */
  type Migration = (dir: string) => Promise<void>
  /**
   * Ordered on-disk migrations. `state` persists the index of the next
   * migration to run, so new entries must be appended and existing entries
   * must never be reordered or removed.
   */
  const MIGRATIONS: Migration[] = [
    // 0: rewrite v1 message files (recognised by a `metadata` field) into the
    // v2 layout produced by `MessageV2.fromV1`.
    async (dir: string) => {
      try {
        const files = new Bun.Glob("session/message/*/*.json").scanSync({
          cwd: dir,
          absolute: true,
        })
        for (const file of files) {
          const content = await Bun.file(file).json()
          if (!content.metadata) continue
          log.info("migrating to v2 message", { file })
          try {
            const result = MessageV2.fromV1(content)
            await Bun.write(
              file,
              JSON.stringify(
                {
                  ...result.info,
                  parts: result.parts,
                },
                null,
                2,
              ),
            )
          } catch (e) {
            // Quarantine the file in a sibling "broken" tree that mirrors the
            // storage layout. The destination is derived from `dir` rather
            // than by substring replacement, so an unrelated "storage"
            // component earlier in the absolute path is never rewritten, and
            // the destination directory is created first so the rename can
            // actually succeed.
            const broken = path.join(path.dirname(dir), "broken", path.relative(dir, file))
            log.info("failed to migrate message, moving to broken", {
              file,
              broken,
              error: e instanceof Error ? e.message : String(e),
            })
            await fs.mkdir(path.dirname(broken), { recursive: true })
            await fs.rename(file, broken)
          }
        }
      } catch (e) {
        // Best-effort: an unreadable file or a failed quarantine aborts this
        // migration without failing startup, but leave a trace in the log.
        log.info("v2 message migration aborted", {
          error: e instanceof Error ? e.message : String(e),
        })
      }
    },
    // 1: split the embedded `parts` array of each message into one file per
    // part under session/part/<sessionID>/<messageID>/, then rewrite the
    // message file without `parts`.
    async (dir: string) => {
      const files = new Bun.Glob("session/message/*/*.json").scanSync({
        cwd: dir,
        absolute: true,
      })
      for (const file of files) {
        try {
          const { parts, ...info } = await Bun.file(file).json()
          if (!parts) continue
          for (const part of parts) {
            // Every part gets a fresh ascending id; for tool parts the
            // previous id is preserved as `callID`.
            const id = Identifier.ascending("part")
            await Bun.write(
              [dir, "session", "part", info.sessionID, info.id, id + ".json"].join("/"),
              JSON.stringify({
                ...part,
                id,
                sessionID: info.sessionID,
                messageID: info.id,
                ...(part.type === "tool" ? { callID: part.id } : {}),
              }),
            )
          }
          await Bun.write(file, JSON.stringify(info, null, 2))
        } catch (e) {
          // Best-effort per file: skip messages that cannot be processed,
          // but record which ones were skipped.
          log.info("failed to split message parts", {
            file,
            error: e instanceof Error ? e.message : String(e),
          })
        }
      }
    },
  ]
  /**
   * Storage state registered with `App.state` under the "storage" key.
   *
   * Initialisation ensures `<data>/storage` exists, runs every migration that
   * has not yet been recorded in the `migration` marker file, and resolves to
   * the storage root directory.
   */
  const state = App.state("storage", async () => {
    const app = App.info()
    const dir = path.normalize(path.join(app.path.data, "storage"))
    await fs.mkdir(dir, { recursive: true })

    // The marker holds the index of the next migration to run. A missing or
    // unparsable file counts as "nothing has run yet".
    const marker = path.join(dir, "migration")
    const stored = await Bun.file(marker)
      .json()
      .then((x) => parseInt(x, 10))
      .catch(() => 0)
    // Guard against a corrupt marker: NaN would silently skip every migration
    // and a negative value would index outside MIGRATIONS. Both restart from
    // 0; each migration skips files that are already in the new shape.
    const start = Number.isInteger(stored) && stored >= 0 ? stored : 0

    for (let index = start; index < MIGRATIONS.length; index++) {
      log.info("running migration", { index })
      const run = MIGRATIONS[index]
      await run(dir)
      // Persist progress after each step so an interrupted run resumes at the
      // first migration that has not completed.
      await Bun.write(marker, (index + 1).toString())
    }

    return {
      dir,
    }
  })
  /**
   * Deletes the JSON document stored under `key`.
   *
   * Best-effort: any failure from the unlink (for example, the file does not
   * exist) is ignored.
   */
  export async function remove(key: string) {
    const { dir } = await state()
    const file = path.join(dir, `${key}.json`)
    try {
      await fs.unlink(file)
    } catch {
      // intentionally ignored
    }
  }
  /**
   * Recursively deletes whatever lives at `key` beneath the storage root.
   *
   * Runs with `force: true`, and any remaining error is ignored as well.
   */
  export async function removeDir(key: string) {
    const { dir } = await state()
    const location = path.join(dir, key)
    try {
      await fs.rm(location, { recursive: true, force: true })
    } catch {
      // intentionally ignored
    }
  }
  /**
   * Reads and parses the JSON document stored under `key`.
   *
   * The result is cast to `T` without validation. The returned promise
   * rejects if the file cannot be read or parsed.
   */
  export async function readJSON<T>(key: string) {
    const { dir } = await state()
    const file = Bun.file(path.join(dir, `${key}.json`))
    return file.json() as Promise<T>
  }
  /**
   * Serialises `content` as pretty-printed JSON and stores it under `key`,
   * then publishes `Event.Write`.
   *
   * The data is written to a temporary sibling file first and renamed over
   * the target, so readers should not observe a partially written document.
   *
   * @param key storage key relative to the storage root, without `.json`
   * @param content value to serialise with `JSON.stringify`
   */
  export async function writeJSON<T>(key: string, content: T) {
    const { dir } = await state()
    const target = path.join(dir, key + ".json")
    // Timestamp plus a random suffix: two writes to the same key within one
    // millisecond must not share (and clobber) the same temp file.
    const tmp = `${target}${Date.now()}-${Math.random().toString(36).slice(2)}.tmp`
    await Bun.write(tmp, JSON.stringify(content, null, 2))
    try {
      await fs.rename(tmp, target)
    } catch (e) {
      // Keep the existing non-throwing contract, but make the failure visible
      // and remove the orphaned temp file. Cleanup is only needed on this
      // path; after a successful rename the temp file no longer exists.
      // NOTE(review): the write event below is still published when the
      // rename fails, as before. Confirm whether callers would prefer a throw.
      log.info("failed to move temp file into place", {
        key,
        error: e instanceof Error ? e.message : String(e),
      })
      await fs.unlink(tmp).catch(() => {})
    }
    Bus.publish(Event.Write, { key, content })
  }
  /** Matches every entry beneath the directory being scanned. */
  const glob = new Bun.Glob("**/*")

  /**
   * Yields the storage keys of all JSON documents found beneath `prefix`.
   *
   * Each key is `prefix` joined with the file's relative path, minus the
   * `.json` extension, so it can be passed to `readJSON` or `remove`. Files
   * without that extension (the `migration` marker, in-flight `.tmp` files)
   * are skipped rather than reported as truncated keys. Any scan error, such
   * as a missing directory, ends the iteration silently.
   *
   * @param prefix directory, relative to the storage root, to list
   */
  export async function* list(prefix: string) {
    const extension = ".json"
    const { dir } = await state()
    try {
      for await (const item of glob.scan({
        cwd: path.join(dir, prefix),
        onlyFiles: true,
      })) {
        // Only real documents map to a key; stripping a fixed number of
        // characters from other file names would produce bogus keys.
        if (!item.endsWith(extension)) continue
        yield path.join(prefix, item.slice(0, -extension.length))
      }
    } catch {
      return
    }
  }
  128. }