core: reduce latency when loading shared sessions through event compaction

Dax Raad 2 months ago
parent
commit 3d99dc78db
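
In short: sync() now appends each incoming batch as one event object under a descending Identifier, and data() folds any events newer than the stored compaction snapshot into that snapshot, instead of re-reading every message and part object on each load. A rough sketch of the key layout the diffs below establish, using placeholder IDs that are not taken from the commit:

import { Identifier } from "@opencode-ai/util/identifier"

// Placeholder shareID and item IDs, for orientation only; the real keys are written in share.ts below.
const eventKey = ["share_event", "abc12345", Identifier.descending()]      // one Data[] batch per sync() call
const compactionKey = ["share_compaction", "abc12345"]                     // merged { event, data } snapshot read by data()
const legacyKey = ["share_data", "abc12345", "part", "msg_001", "prt_001"] // per-item layout still written by syncOld()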

+ 70 - 30
packages/enterprise/src/core/share.ts

@@ -1,8 +1,10 @@
 import { FileDiff, Message, Model, Part, Session, SessionStatus } from "@opencode-ai/sdk"
 import { fn } from "@opencode-ai/util/fn"
 import { iife } from "@opencode-ai/util/iife"
+import { Identifier } from "@opencode-ai/util/identifier"
 import z from "zod"
 import { Storage } from "./storage"
+import { Binary } from "@opencode-ai/util/binary"
 
 export namespace Share {
   export const Info = z.object({
@@ -37,15 +39,15 @@ export namespace Share {
   export type Data = z.infer<typeof Data>
 
   export const create = fn(z.object({ sessionID: z.string() }), async (body) => {
+    const isTest = process.env.NODE_ENV === "test" || body.sessionID.startsWith("test_")
     const info: Info = {
-      id: body.sessionID.slice(-8),
+      id: (isTest ? "test_" : "") + body.sessionID.slice(-8),
       sessionID: body.sessionID,
       secret: crypto.randomUUID(),
     }
     const exists = await get(info.id)
     if (exists) throw new Errors.AlreadyExists(info.id)
     await Storage.write(["share", info.id], info)
-    console.log("created share", info.id)
     return info
   })
 
@@ -58,35 +60,72 @@ export namespace Share {
     if (!share) throw new Errors.NotFound(body.id)
     if (share.secret !== body.secret) throw new Errors.InvalidSecret(body.id)
     await Storage.remove(["share", body.id])
-    const list = await Storage.list(["share_data", body.id])
+    const list = await Storage.list({ prefix: ["share_data", body.id] })
     for (const item of list) {
       await Storage.remove(item)
     }
   })
 
-  export async function data(id: string) {
-    let time = Date.now()
-    const list = await Storage.list(["share_data", id])
-    console.log("listing share data", Date.now() - time, list.length)
-    const promises = []
-    time = Date.now()
-    for (const item of list) {
-      promises.push(
-        iife(async () => {
-          const [, , type] = item
-          return {
-            type: type as any,
-            data: await Storage.read<any>(item),
-          } as Data
-        }),
-      )
+  export const sync = fn(
+    z.object({
+      share: Info.pick({ id: true, secret: true }),
+      data: Data.array(),
+    }),
+    async (input) => {
+      const share = await get(input.share.id)
+      if (!share) throw new Errors.NotFound(input.share.id)
+      if (share.secret !== input.share.secret) throw new Errors.InvalidSecret(input.share.id)
+      await Storage.write(["share_event", input.share.id, Identifier.descending()], input.data)
+    },
+  )
+
+  type Compaction = {
+    event?: string
+    data: Data[]
+  }
+
+  export async function data(shareID: string) {
+    const compaction: Compaction = (await Storage.read<Compaction>(["share_compaction", shareID])) ?? {
+      data: [],
+      event: undefined,
+    }
+
+    const list = await Storage.list({
+      prefix: ["share_event", shareID],
+      end: compaction.event,
+    }).then((x) => x.toReversed())
+
+    const data = await Promise.all(list.map(async (event) => await Storage.read<Data[]>(event))).then((x) => x.flat())
+    for (const item of data) {
+      if (!item) continue
+      const key = (item: Data) => {
+        switch (item.type) {
+          case "session":
+            return "session"
+          case "message":
+            return `message/${item.data.id}`
+          case "part":
+            return `${item.data.messageID}/${item.data.id}`
+          case "session_diff":
+            return "session_diff"
+          case "model":
+            return "model"
+        }
+      }
+      const id = key(item)
+      const result = Binary.search(compaction.data, id, key)
+      if (result.found) {
+        compaction.data[result.index] = item
+      } else {
+        compaction.data.splice(result.index, 0, item)
+      }
     }
-    const result = await Promise.all(promises)
-    console.log("read share data", Date.now() - time, result.length)
-    return result
+    compaction.event = list.at(-1)?.at(-1) ?? compaction.event // keep the previous checkpoint when no new events arrived
+    await Storage.write(["share_compaction", shareID], compaction)
+    return compaction.data
   }
 
-  export const sync = fn(
+  export const syncOld = fn(
     z.object({
       share: Info.pick({ id: true, secret: true }),
       data: Data.array(),
@@ -103,15 +142,16 @@ export namespace Share {
               case "session":
                 await Storage.write(["share_data", input.share.id, "session"], item.data)
                 break
-              case "message":
-                await Storage.write(["share_data", input.share.id, "message", item.data.id], item.data)
+              case "message": {
+                const data = item.data as Message
+                await Storage.write(["share_data", input.share.id, "message", data.id], item.data)
                 break
-              case "part":
-                await Storage.write(
-                  ["share_data", input.share.id, "part", item.data.messageID, item.data.id],
-                  item.data,
-                )
+              }
+              case "part": {
+                const data = item.data as Part
+                await Storage.write(["share_data", input.share.id, "part", data.messageID, data.id], item.data)
                 break
+              }
               case "session_diff":
                 await Storage.write(["share_data", input.share.id, "session_diff"], item.data)
                 break
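
Note on data() above: the merge relies on Binary.search from @opencode-ai/util/binary, whose source is not part of this diff. From the call site it is assumed to search a sorted array and return { found, index }, with index being the insertion point when the key is absent. A self-contained sketch of that presumed contract, not the actual util:

function binarySearch<T>(items: T[], id: string, key: (item: T) => string): { found: boolean; index: number } {
  let lo = 0
  let hi = items.length
  while (lo < hi) {
    const mid = (lo + hi) >>> 1
    const k = key(items[mid])
    if (k === id) return { found: true, index: mid }
    if (k < id) lo = mid + 1
    else hi = mid
  }
  // Not found: index is where splice() should insert to keep the array sorted by key.
  return { found: false, index: lo }
}

With that contract, the loop in data() is an upsert: events are applied oldest to newest, so when a key is already present the newer item replaces it in place, and otherwise it is spliced in at the insertion point so compaction.data stays sorted.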

+ 15 - 5
packages/enterprise/src/core/storage.ts

@@ -6,7 +6,7 @@ export namespace Storage {
     read(path: string): Promise<string | undefined>
     write(path: string, value: string): Promise<void>
     remove(path: string): Promise<void>
-    list(prefix: string): Promise<string[]>
+    list(options?: { prefix?: string; limit?: number; start?: string; end?: string }): Promise<string[]>
   }
 
   function createAdapter(client: AwsClient, endpoint: string, bucket: string): Adapter {
@@ -37,8 +37,14 @@ export namespace Storage {
         if (!response.ok) throw new Error(`Failed to remove ${path}: ${response.status}`)
       },
 
-      async list(prefix: string): Promise<string[]> {
+      async list(options?: { prefix?: string; limit?: number; start?: string; end?: string }): Promise<string[]> {
+        const prefix = options?.prefix || ""
         const params = new URLSearchParams({ "list-type": "2", prefix })
+        if (options?.limit) params.set("max-keys", options.limit.toString())
+        if (options?.start) {
+          const startPath = prefix + options.start + ".json"
+          params.set("start-after", startPath)
+        }
         const response = await client.fetch(`${base}?${params}`)
         if (!response.ok) throw new Error(`Failed to list ${prefix}: ${response.status}`)
         const xml = await response.text()
@@ -48,6 +54,10 @@ export namespace Storage {
         while ((match = regex.exec(xml)) !== null) {
           keys.push(match[1])
         }
+        if (options?.end) {
+          const endPath = prefix + options.end + ".json"
+          return keys.filter((key) => key <= endPath)
+        }
         return keys
       },
     }
@@ -98,9 +108,9 @@ export namespace Storage {
     return adapter().remove(resolve(key))
   }
 
-  export async function list(prefix: string[]) {
-    const p = prefix.join("/") + (prefix.length ? "/" : "")
-    const result = await adapter().list(p)
+  export async function list(options?: { prefix?: string[]; limit?: number; start?: string; end?: string }) {
+    const p = options?.prefix ? options.prefix.join("/") + (options.prefix.length ? "/" : "") : ""
+    const result = await adapter().list({ prefix: p, limit: options?.limit, start: options?.start, end: options?.end })
     return result.map((x) => x.replace(/\.json$/, "").split("/"))
   }
 

+ 37 - 0
packages/enterprise/test-debug.ts

@@ -0,0 +1,37 @@
+import { Share } from "./src/core/share"
+import { Storage } from "./src/core/storage"
+
+async function test() {
+  const shareInfo = await Share.create({ sessionID: "test-debug-" + Date.now() })
+
+  const batch1: Share.Data[] = [
+    { type: "part", data: { id: "part1", sessionID: "session1", messageID: "msg1", type: "text", text: "Hello" } },
+  ]
+
+  const batch2: Share.Data[] = [
+    { type: "part", data: { id: "part1", sessionID: "session1", messageID: "msg1", type: "text", text: "Hello Updated" } },
+  ]
+
+  await Share.sync({
+    share: { id: shareInfo.id, secret: shareInfo.secret },
+    data: batch1,
+  })
+
+  await Share.sync({
+    share: { id: shareInfo.id, secret: shareInfo.secret },
+    data: batch2,
+  })
+
+  const events = await Storage.list({ prefix: ["share_event", shareInfo.id] })
+  console.log("Events (raw):", events)
+  console.log("Events (reversed):", events.toReversed())
+  
+  for (const event of events.toReversed()) {
+    const data = await Storage.read(event)
+    console.log("Event data (reversed order):", event, data)
+  }
+
+  await Share.remove({ id: shareInfo.id, secret: shareInfo.secret })
+}
+
+test()

+ 269 - 0
packages/enterprise/test/core/share.test.ts

@@ -0,0 +1,269 @@
+import { describe, expect, test, afterAll } from "bun:test"
+import { Share } from "../../src/core/share"
+import { Storage } from "../../src/core/storage"
+import { Identifier } from "@opencode-ai/util/identifier"
+
+describe.concurrent("core.share", () => {
+  test("should create a share", async () => {
+    const sessionID = Identifier.descending()
+    const share = await Share.create({ sessionID })
+
+    expect(share.sessionID).toBe(sessionID)
+    expect(share.secret).toBeDefined()
+
+    await Share.remove({ id: share.id, secret: share.secret })
+  })
+
+  test("should sync data to a share", async () => {
+    const sessionID = Identifier.descending()
+    const share = await Share.create({ sessionID })
+
+    const data: Share.Data[] = [
+      {
+        type: "part",
+        data: { id: "part1", sessionID, messageID: "msg1", type: "text", text: "Hello" },
+      },
+    ]
+
+    await Share.sync({
+      share: { id: share.id, secret: share.secret },
+      data,
+    })
+
+    const events = await Storage.list({ prefix: ["share_event", share.id] })
+    expect(events.length).toBe(1)
+
+    await Share.remove({ id: share.id, secret: share.secret })
+  })
+
+  test("should sync multiple batches of data", async () => {
+    const sessionID = Identifier.descending()
+    const share = await Share.create({ sessionID })
+
+    const data1: Share.Data[] = [
+      {
+        type: "part",
+        data: { id: "part1", sessionID, messageID: "msg1", type: "text", text: "Hello" },
+      },
+    ]
+
+    const data2: Share.Data[] = [
+      {
+        type: "part",
+        data: { id: "part2", sessionID, messageID: "msg1", type: "text", text: "World" },
+      },
+    ]
+
+    await Share.sync({
+      share: { id: share.id, secret: share.secret },
+      data: data1,
+    })
+
+    await Share.sync({
+      share: { id: share.id, secret: share.secret },
+      data: data2,
+    })
+
+    const events = await Storage.list({ prefix: ["share_event", share.id] })
+    expect(events.length).toBe(2)
+
+    await Share.remove({ id: share.id, secret: share.secret })
+  })
+
+  test("should retrieve synced data", async () => {
+    const sessionID = Identifier.descending()
+    const share = await Share.create({ sessionID })
+
+    const data: Share.Data[] = [
+      {
+        type: "part",
+        data: { id: "part1", sessionID, messageID: "msg1", type: "text", text: "Hello" },
+      },
+      {
+        type: "part",
+        data: { id: "part2", sessionID, messageID: "msg1", type: "text", text: "World" },
+      },
+    ]
+
+    await Share.sync({
+      share: { id: share.id, secret: share.secret },
+      data,
+    })
+
+    const result = await Share.data(share.id)
+
+    expect(result.length).toBe(2)
+    expect(result[0].type).toBe("part")
+    expect(result[1].type).toBe("part")
+
+    await Share.remove({ id: share.id, secret: share.secret })
+  })
+
+  test("should retrieve data from multiple syncs", async () => {
+    const sessionID = Identifier.descending()
+    const share = await Share.create({ sessionID })
+
+    const data1: Share.Data[] = [
+      {
+        type: "part",
+        data: { id: "part1", sessionID, messageID: "msg1", type: "text", text: "Hello" },
+      },
+    ]
+
+    const data2: Share.Data[] = [
+      {
+        type: "part",
+        data: { id: "part2", sessionID, messageID: "msg2", type: "text", text: "World" },
+      },
+    ]
+
+    const data3: Share.Data[] = [
+      { type: "part", data: { id: "part3", sessionID, messageID: "msg3", type: "text", text: "!" } },
+    ]
+
+    await Share.sync({
+      share: { id: share.id, secret: share.secret },
+      data: data1,
+    })
+
+    await Share.sync({
+      share: { id: share.id, secret: share.secret },
+      data: data2,
+    })
+
+    await Share.sync({
+      share: { id: share.id, secret: share.secret },
+      data: data3,
+    })
+
+    const result = await Share.data(share.id)
+
+    expect(result.length).toBe(3)
+    const parts = result.filter((d) => d.type === "part")
+    expect(parts.length).toBe(3)
+
+    await Share.remove({ id: share.id, secret: share.secret })
+  })
+
+  test("should return latest data when syncing duplicate parts", async () => {
+    const sessionID = Identifier.descending()
+    const share = await Share.create({ sessionID })
+
+    const data1: Share.Data[] = [
+      {
+        type: "part",
+        data: { id: "part1", sessionID, messageID: "msg1", type: "text", text: "Hello" },
+      },
+    ]
+
+    const data2: Share.Data[] = [
+      {
+        type: "part",
+        data: { id: "part1", sessionID, messageID: "msg1", type: "text", text: "Hello Updated" },
+      },
+    ]
+
+    await Share.sync({
+      share: { id: share.id, secret: share.secret },
+      data: data1,
+    })
+
+    await Share.sync({
+      share: { id: share.id, secret: share.secret },
+      data: data2,
+    })
+
+    const result = await Share.data(share.id)
+
+    expect(result.length).toBe(1)
+    const [first] = result
+    expect(first.type).toBe("part")
+    expect(first.type === "part" && first.data.type === "text" && first.data.text).toBe("Hello Updated")
+
+    await Share.remove({ id: share.id, secret: share.secret })
+  })
+
+  test("should return empty array for share with no data", async () => {
+    const sessionID = Identifier.descending()
+    const share = await Share.create({ sessionID })
+
+    const result = await Share.data(share.id)
+
+    expect(result).toEqual([])
+
+    await Share.remove({ id: share.id, secret: share.secret })
+  })
+
+  test("should throw error for invalid secret", async () => {
+    const sessionID = Identifier.descending()
+    const share = await Share.create({ sessionID })
+
+    const data: Share.Data[] = [
+      {
+        type: "part",
+        data: { id: "part1", sessionID, messageID: "msg1", type: "text", text: "Test" },
+      },
+    ]
+
+    await expect(
+      Share.sync({
+        share: { id: share.id, secret: "invalid-secret" },
+        data,
+      }),
+    ).rejects.toThrow()
+
+    await Share.remove({ id: share.id, secret: share.secret })
+  })
+
+  test("should throw error for non-existent share", async () => {
+    const sessionID = Identifier.descending()
+    const data: Share.Data[] = [
+      {
+        type: "part",
+        data: { id: "part1", sessionID, messageID: "msg1", type: "text", text: "Test" },
+      },
+    ]
+
+    await expect(
+      Share.sync({
+        share: { id: "non-existent-id", secret: "some-secret" },
+        data,
+      }),
+    ).rejects.toThrow()
+  })
+
+  test("should handle different data types", async () => {
+    const sessionID = Identifier.descending()
+    const share = await Share.create({ sessionID })
+
+    const data: Share.Data[] = [
+      { type: "session", data: { id: sessionID, status: "running" } as any },
+      { type: "message", data: { id: "msg1", sessionID } as any },
+      {
+        type: "part",
+        data: { id: "part1", sessionID, messageID: "msg1", type: "text", text: "Hello" },
+      },
+    ]
+
+    await Share.sync({
+      share: { id: share.id, secret: share.secret },
+      data,
+    })
+
+    const result = await Share.data(share.id)
+
+    expect(result.length).toBe(3)
+    expect(result.some((d) => d.type === "session")).toBe(true)
+    expect(result.some((d) => d.type === "message")).toBe(true)
+    expect(result.some((d) => d.type === "part")).toBe(true)
+
+    await Share.remove({ id: share.id, secret: share.secret })
+  })
+
+  afterAll(async () => {
+    const files = await Storage.list()
+    for (const file of files) {
+      await Storage.remove(file)
+    }
+  })
+})

+ 67 - 0
packages/enterprise/test/core/storage.test.ts

@@ -0,0 +1,67 @@
+import { describe, expect, test, afterAll } from "bun:test"
+import { Storage } from "../../src/core/storage"
+
+describe("core.storage", () => {
+  test("should list files with start and end range", async () => {
+    await Storage.write(["test", "users", "user1"], { name: "user1" })
+    await Storage.write(["test", "users", "user2"], { name: "user2" })
+    await Storage.write(["test", "users", "user3"], { name: "user3" })
+    await Storage.write(["test", "users", "user4"], { name: "user4" })
+    await Storage.write(["test", "users", "user5"], { name: "user5" })
+
+    const result = await Storage.list({ prefix: ["test", "users"], start: "user2", end: "user4" })
+
+    expect(result).toEqual([
+      ["test", "users", "user3"],
+      ["test", "users", "user4"],
+    ])
+  })
+
+  test("should list files with start only", async () => {
+    const result = await Storage.list({ prefix: ["test", "users"], start: "user3" })
+
+    expect(result).toEqual([
+      ["test", "users", "user4"],
+      ["test", "users", "user5"],
+    ])
+  })
+
+  test("should list files with limit", async () => {
+    const result = await Storage.list({ prefix: ["test", "users"], limit: 3 })
+
+    expect(result).toEqual([
+      ["test", "users", "user1"],
+      ["test", "users", "user2"],
+      ["test", "users", "user3"],
+    ])
+  })
+
+  test("should list all files without prefix", async () => {
+    const result = await Storage.list()
+
+    expect(result.length).toBeGreaterThan(0)
+  })
+
+  test("should list all files with prefix", async () => {
+    const result = await Storage.list({ prefix: ["test", "users"] })
+
+    expect(result).toEqual([
+      ["test", "users", "user1"],
+      ["test", "users", "user2"],
+      ["test", "users", "user3"],
+      ["test", "users", "user4"],
+      ["test", "users", "user5"],
+    ])
+  })
+
+  afterAll(async () => {
+    const testFiles = await Storage.list({ prefix: ["test"] })
+
+    for (const file of testFiles) {
+      await Storage.remove(file)
+    }
+
+    const remainingFiles = await Storage.list({ prefix: ["test"] })
+    expect(remainingFiles).toEqual([])
+  })
+})

+ 48 - 0
packages/util/src/identifier.ts

@@ -0,0 +1,48 @@
+import { randomBytes } from "crypto"
+
+export namespace Identifier {
+  const LENGTH = 26
+
+  // State for monotonic ID generation
+  let lastTimestamp = 0
+  let counter = 0
+
+  export function ascending() {
+    return create(false)
+  }
+
+  export function descending() {
+    return create(true)
+  }
+
+  function randomBase62(length: number): string {
+    const chars = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
+    let result = ""
+    const bytes = randomBytes(length)
+    for (let i = 0; i < length; i++) {
+      result += chars[bytes[i] % 62]
+    }
+    return result
+  }
+
+  export function create(descending: boolean, timestamp?: number): string {
+    const currentTimestamp = timestamp ?? Date.now()
+
+    if (currentTimestamp !== lastTimestamp) {
+      lastTimestamp = currentTimestamp
+      counter = 0
+    }
+    counter++
+
+    let now = BigInt(currentTimestamp) * BigInt(0x1000) + BigInt(counter)
+
+    now = descending ? ~now : now
+
+    const timeBytes = Buffer.alloc(6)
+    for (let i = 0; i < 6; i++) {
+      timeBytes[i] = Number((now >> BigInt(40 - 8 * i)) & BigInt(0xff))
+    }
+
+    return timeBytes.toString("hex") + randomBase62(LENGTH - 12)
+  }
+}
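
Identifier.descending() is what makes the event log cheap to read: the bitwise NOT of the packed timestamp-plus-counter turns a later creation time into a lexicographically smaller hex prefix, so a plain ascending listing of share_event/<shareID> returns the newest batches first and data() only reverses the (typically short) slice of unseen events. A small illustrative check of that property:

import { Identifier } from "@opencode-ai/util/identifier"

// Two IDs created back to back: the later one packs a larger timestamp/counter,
// which the bitwise NOT maps to a smaller 6-byte prefix and hex string.
const first = Identifier.descending()
const second = Identifier.descending()
console.log(second < first) // expected: true, so newest events sort first in key order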