فهرست منبع

perf(app): use cursor session history loading (#17329)

Shoubhit Dash 1 ماه پیش
والد
کامیت
46ba9c8170

+ 41 - 8
packages/app/src/context/global-sync/session-prefetch.test.ts

@@ -5,6 +5,7 @@ import {
   getSessionPrefetch,
   runSessionPrefetch,
   setSessionPrefetch,
+  shouldSkipSessionPrefetch,
 } from "./session-prefetch"
 
 describe("session prefetch", () => {
@@ -16,11 +17,12 @@ describe("session prefetch", () => {
       directory: "/tmp/a",
       sessionID: "ses_1",
       limit: 200,
+      cursor: "abc",
       complete: false,
       at: 123,
     })
 
-    expect(getSessionPrefetch("/tmp/a", "ses_1")).toEqual({ limit: 200, complete: false, at: 123 })
+    expect(getSessionPrefetch("/tmp/a", "ses_1")).toEqual({ limit: 200, cursor: "abc", complete: false, at: 123 })
     expect(getSessionPrefetch("/tmp/b", "ses_1")).toBeUndefined()
 
     clearSessionPrefetch("/tmp/a", ["ses_1"])
@@ -38,26 +40,57 @@ describe("session prefetch", () => {
         sessionID: "ses_2",
         task: async () => {
           calls += 1
-          return { limit: 100, complete: true, at: 456 }
+          return { limit: 100, cursor: "next", complete: true, at: 456 }
         },
       })
 
     const [a, b] = await Promise.all([run(), run()])
 
     expect(calls).toBe(1)
-    expect(a).toEqual({ limit: 100, complete: true, at: 456 })
-    expect(b).toEqual({ limit: 100, complete: true, at: 456 })
+    expect(a).toEqual({ limit: 100, cursor: "next", complete: true, at: 456 })
+    expect(b).toEqual({ limit: 100, cursor: "next", complete: true, at: 456 })
   })
 
   test("clears a whole directory", () => {
-    setSessionPrefetch({ directory: "/tmp/d", sessionID: "ses_1", limit: 10, complete: true, at: 1 })
-    setSessionPrefetch({ directory: "/tmp/d", sessionID: "ses_2", limit: 20, complete: false, at: 2 })
-    setSessionPrefetch({ directory: "/tmp/e", sessionID: "ses_1", limit: 30, complete: true, at: 3 })
+    setSessionPrefetch({ directory: "/tmp/d", sessionID: "ses_1", limit: 10, cursor: "a", complete: true, at: 1 })
+    setSessionPrefetch({ directory: "/tmp/d", sessionID: "ses_2", limit: 20, cursor: "b", complete: false, at: 2 })
+    setSessionPrefetch({ directory: "/tmp/e", sessionID: "ses_1", limit: 30, cursor: "c", complete: true, at: 3 })
 
     clearSessionPrefetchDirectory("/tmp/d")
 
     expect(getSessionPrefetch("/tmp/d", "ses_1")).toBeUndefined()
     expect(getSessionPrefetch("/tmp/d", "ses_2")).toBeUndefined()
-    expect(getSessionPrefetch("/tmp/e", "ses_1")).toEqual({ limit: 30, complete: true, at: 3 })
+    expect(getSessionPrefetch("/tmp/e", "ses_1")).toEqual({ limit: 30, cursor: "c", complete: true, at: 3 })
+  })
+
+  test("refreshes stale first-page prefetched history", () => {
+    expect(
+      shouldSkipSessionPrefetch({
+        message: true,
+        info: { limit: 200, cursor: "x", complete: false, at: 1 },
+        chunk: 200,
+        now: 1 + 15_001,
+      }),
+    ).toBe(false)
+  })
+
+  test("keeps deeper or complete history cached", () => {
+    expect(
+      shouldSkipSessionPrefetch({
+        message: true,
+        info: { limit: 400, cursor: "x", complete: false, at: 1 },
+        chunk: 200,
+        now: 1 + 15_001,
+      }),
+    ).toBe(true)
+
+    expect(
+      shouldSkipSessionPrefetch({
+        message: true,
+        info: { limit: 120, complete: true, at: 1 },
+        chunk: 200,
+        now: 1 + 15_001,
+      }),
+    ).toBe(true)
   })
 })

+ 15 - 0
packages/app/src/context/global-sync/session-prefetch.ts

@@ -4,10 +4,23 @@ export const SESSION_PREFETCH_TTL = 15_000
 
 type Meta = {
   limit: number
+  cursor?: string
   complete: boolean
   at: number
 }
 
+export function shouldSkipSessionPrefetch(input: { message: boolean; info?: Meta; chunk: number; now?: number }) {
+  if (input.message) {
+    if (!input.info) return true
+    if (input.info.complete) return true
+    if (input.info.limit > input.chunk) return true
+  } else {
+    if (!input.info) return false
+  }
+
+  return (input.now ?? Date.now()) - input.info.at < SESSION_PREFETCH_TTL
+}
+
 const cache = new Map<string, Meta>()
 const inflight = new Map<string, Promise<Meta | undefined>>()
 const rev = new Map<string, number>()
@@ -53,11 +66,13 @@ export function setSessionPrefetch(input: {
   directory: string
   sessionID: string
   limit: number
+  cursor?: string
   complete: boolean
   at?: number
 }) {
   cache.set(key(input.directory, input.sessionID), {
     limit: input.limit,
+    cursor: input.cursor,
     complete: input.complete,
     at: input.at ?? Date.now(),
   })

+ 36 - 9
packages/app/src/context/sync.tsx

@@ -32,6 +32,12 @@ const keyFor = (directory: string, id: string) => `${directory}\n${id}`
 
 const cmp = (a: string, b: string) => (a < b ? -1 : a > b ? 1 : 0)
 
+function merge<T extends { id: string }>(a: readonly T[], b: readonly T[]) {
+  const map = new Map(a.map((item) => [item.id, item] as const))
+  for (const item of b) map.set(item.id, item)
+  return [...map.values()].sort((x, y) => cmp(x.id, y.id))
+}
+
 type OptimisticStore = {
   message: Record<string, Message[] | undefined>
   part: Record<string, Part[] | undefined>
@@ -119,6 +125,7 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({
     const seen = new Map<string, Set<string>>()
     const [meta, setMeta] = createStore({
       limit: {} as Record<string, number>,
+      cursor: {} as Record<string, string | undefined>,
       complete: {} as Record<string, boolean>,
       loading: {} as Record<string, boolean>,
     })
@@ -157,6 +164,7 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({
           for (const sessionID of sessionIDs) {
             const key = keyFor(directory, sessionID)
             delete draft.limit[key]
+            delete draft.cursor[key]
             delete draft.complete[key]
             delete draft.loading[key]
           }
@@ -187,17 +195,24 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({
       evict(directory, setStore, stale)
     }
 
-    const fetchMessages = async (input: { client: typeof sdk.client; sessionID: string; limit: number }) => {
+    const fetchMessages = async (input: {
+      client: typeof sdk.client
+      sessionID: string
+      limit: number
+      before?: string
+    }) => {
       const messages = await retry(() =>
-        input.client.session.messages({ sessionID: input.sessionID, limit: input.limit }),
+        input.client.session.messages({ sessionID: input.sessionID, limit: input.limit, before: input.before }),
       )
       const items = (messages.data ?? []).filter((x) => !!x?.info?.id)
       const session = items.map((x) => x.info).sort((a, b) => cmp(a.id, b.id))
       const part = items.map((message) => ({ id: message.info.id, part: sortParts(message.parts) }))
+      const cursor = messages.response.headers.get("x-next-cursor") ?? undefined
       return {
         session,
         part,
-        complete: session.length < input.limit,
+        cursor,
+        complete: !cursor,
       }
     }
 
@@ -209,6 +224,8 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({
       setStore: Setter
       sessionID: string
       limit: number
+      before?: string
+      mode?: "replace" | "prepend"
     }) => {
       const key = keyFor(input.directory, input.sessionID)
       if (meta.loading[key]) return
@@ -217,17 +234,22 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({
       await fetchMessages(input)
         .then((next) => {
           if (!tracked(input.directory, input.sessionID)) return
+          const [store] = globalSync.child(input.directory, { bootstrap: false })
+          const cached = input.mode === "prepend" ? (store.message[input.sessionID] ?? []) : []
+          const message = input.mode === "prepend" ? merge(cached, next.session) : next.session
           batch(() => {
-            input.setStore("message", input.sessionID, reconcile(next.session, { key: "id" }))
+            input.setStore("message", input.sessionID, reconcile(message, { key: "id" }))
             for (const p of next.part) {
               input.setStore("part", p.id, p.part)
             }
-            setMeta("limit", key, input.limit)
+            setMeta("limit", key, message.length)
+            setMeta("cursor", key, next.cursor)
             setMeta("complete", key, next.complete)
             setSessionPrefetch({
               directory: input.directory,
               sessionID: input.sessionID,
-              limit: input.limit,
+              limit: message.length,
+              cursor: next.cursor,
               complete: next.complete,
             })
           })
@@ -312,6 +334,7 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({
           if (seeded && store.message[sessionID] !== undefined && meta.limit[key] === undefined) {
             batch(() => {
               setMeta("limit", key, seeded.limit)
+              setMeta("cursor", key, seeded.cursor)
               setMeta("complete", key, seeded.complete)
               setMeta("loading", key, false)
             })
@@ -325,6 +348,7 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({
               if (seeded && store.message[sessionID] !== undefined && meta.limit[key] === undefined) {
                 batch(() => {
                   setMeta("limit", key, seeded.limit)
+                  setMeta("cursor", key, seeded.cursor)
                   setMeta("complete", key, seeded.complete)
                   setMeta("loading", key, false)
                 })
@@ -420,7 +444,7 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({
             if (store.message[sessionID] === undefined) return false
             if (meta.limit[key] === undefined) return false
             if (meta.complete[key]) return false
-            return true
+            return !!meta.cursor[key]
           },
           loading(sessionID: string) {
             const key = keyFor(sdk.directory, sessionID)
@@ -435,14 +459,17 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({
             const step = count ?? messagePageSize
             if (meta.loading[key]) return
             if (meta.complete[key]) return
+            const before = meta.cursor[key]
+            if (!before) return
 
-            const currentLimit = meta.limit[key] ?? messagePageSize
             await loadMessages({
               directory,
               client,
               setStore,
               sessionID,
-              limit: currentLimit + step,
+              limit: step,
+              before,
+              mode: "prepend",
             })
           },
         },

+ 10 - 6
packages/app/src/pages/layout.tsx

@@ -41,8 +41,8 @@ import {
   getSessionPrefetch,
   isSessionPrefetchCurrent,
   runSessionPrefetch,
-  SESSION_PREFETCH_TTL,
   setSessionPrefetch,
+  shouldSkipSessionPrefetch,
 } from "@/context/global-sync/session-prefetch"
 import { useNotification } from "@/context/notification"
 import { usePermission } from "@/context/permission"
@@ -770,9 +770,11 @@ export default function Layout(props: ParentProps) {
             const next = items.map((x) => x.info).filter((m): m is Message => !!m?.id)
             const sorted = mergeByID([], next)
             const stale = markPrefetched(directory, sessionID)
+            const cursor = messages.response.headers.get("x-next-cursor") ?? undefined
             const meta = {
-              limit: prefetchChunk,
-              complete: sorted.length < prefetchChunk,
+              limit: sorted.length,
+              cursor,
+              complete: !cursor,
               at: Date.now(),
             }
 
@@ -846,10 +848,12 @@ export default function Layout(props: ParentProps) {
 
     const [store] = globalSync.child(directory, { bootstrap: false })
     const cached = untrack(() => {
-      if (store.message[session.id] === undefined) return false
       const info = getSessionPrefetch(directory, session.id)
-      if (!info) return false
-      return Date.now() - info.at < SESSION_PREFETCH_TTL
+      return shouldSkipSessionPrefetch({
+        message: store.message[session.id] !== undefined,
+        info,
+        chunk: prefetchChunk,
+      })
     })
     if (cached) return