|
|
@@ -6,7 +6,7 @@ import { Identifier } from "../id/id"
|
|
|
import { LSP } from "../lsp"
|
|
|
import { Snapshot } from "@/snapshot"
|
|
|
import { fn } from "@/util/fn"
|
|
|
-import { Database, eq, desc } from "@/storage/db"
|
|
|
+import { Database, eq, desc, inArray } from "@/storage/db"
|
|
|
import { MessageTable, PartTable } from "./session.sql"
|
|
|
import { ProviderTransform } from "@/provider/transform"
|
|
|
import { STATUS_CODES } from "http"
|
|
|
@@ -608,27 +608,56 @@ export namespace MessageV2 {
|
|
|
}
|
|
|
|
|
|
export const stream = fn(Identifier.schema("session"), async function* (sessionID) {
|
|
|
- const rows = Database.use((db) =>
|
|
|
- db
|
|
|
- .select()
|
|
|
- .from(MessageTable)
|
|
|
- .where(eq(MessageTable.session_id, sessionID))
|
|
|
- .orderBy(desc(MessageTable.created_at))
|
|
|
- .all(),
|
|
|
- )
|
|
|
- for (const row of rows) {
|
|
|
- yield {
|
|
|
- info: row.data,
|
|
|
- parts: await parts(row.id),
|
|
|
+ const size = 50
|
|
|
+ let offset = 0
|
|
|
+ while (true) {
|
|
|
+ const rows = Database.use((db) =>
|
|
|
+ db
|
|
|
+ .select()
|
|
|
+ .from(MessageTable)
|
|
|
+ .where(eq(MessageTable.session_id, sessionID))
|
|
|
+ .orderBy(desc(MessageTable.created_at))
|
|
|
+ .limit(size)
|
|
|
+ .offset(offset)
|
|
|
+ .all(),
|
|
|
+ )
|
|
|
+ if (rows.length === 0) break
|
|
|
+
|
|
|
+ const ids = rows.map((row) => row.id)
|
|
|
+ const partsByMessage = new Map<string, MessageV2.Part[]>()
|
|
|
+ if (ids.length > 0) {
|
|
|
+ const partRows = Database.use((db) =>
|
|
|
+ db
|
|
|
+ .select()
|
|
|
+ .from(PartTable)
|
|
|
+ .where(inArray(PartTable.message_id, ids))
|
|
|
+ .orderBy(PartTable.message_id, PartTable.id)
|
|
|
+ .all(),
|
|
|
+ )
|
|
|
+ for (const row of partRows) {
|
|
|
+ const list = partsByMessage.get(row.message_id)
|
|
|
+ if (list) list.push(row.data)
|
|
|
+ else partsByMessage.set(row.message_id, [row.data])
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ for (const row of rows) {
|
|
|
+ yield {
|
|
|
+ info: row.data,
|
|
|
+ parts: partsByMessage.get(row.id) ?? [],
|
|
|
+ }
|
|
|
}
|
|
|
+
|
|
|
+ offset += rows.length
|
|
|
+ if (rows.length < size) break
|
|
|
}
|
|
|
})
|
|
|
|
|
|
export const parts = fn(Identifier.schema("message"), async (message_id) => {
|
|
|
- const rows = Database.use((db) => db.select().from(PartTable).where(eq(PartTable.message_id, message_id)).all())
|
|
|
- const result = rows.map((row) => row.data)
|
|
|
- result.sort((a, b) => (a.id > b.id ? 1 : -1))
|
|
|
- return result
|
|
|
+ const rows = Database.use((db) =>
|
|
|
+ db.select().from(PartTable).where(eq(PartTable.message_id, message_id)).orderBy(PartTable.id).all(),
|
|
|
+ )
|
|
|
+ return rows.map((row) => row.data)
|
|
|
})
|
|
|
|
|
|
export const get = fn(
|