|
|
@@ -443,7 +443,7 @@ export namespace Session {
|
|
|
const result = await ReadTool.execute(args, {
|
|
|
sessionID: input.sessionID,
|
|
|
abort: abort.signal,
|
|
|
- messageID: "", // read tool doesn't use message ID
|
|
|
+ messageID: userMsg.id,
|
|
|
metadata: async () => {},
|
|
|
})
|
|
|
return [
|
|
|
@@ -577,20 +577,22 @@ export namespace Session {
|
|
|
await updateMessage(assistantMsg)
|
|
|
const tools: Record<string, AITool> = {}
|
|
|
|
|
|
+ const processor = createProcessor(assistantMsg, model.info)
|
|
|
+
|
|
|
for (const item of await Provider.tools(input.providerID)) {
|
|
|
if (mode.tools[item.id] === false) continue
|
|
|
+ if (session.parentID && item.id === "task") continue
|
|
|
tools[item.id] = tool({
|
|
|
id: item.id as any,
|
|
|
description: item.description,
|
|
|
inputSchema: item.parameters as ZodSchema,
|
|
|
- async execute(args) {
|
|
|
+ async execute(args, options) {
|
|
|
const result = await item.execute(args, {
|
|
|
sessionID: input.sessionID,
|
|
|
abort: abort.signal,
|
|
|
messageID: assistantMsg.id,
|
|
|
- metadata: async () => {
|
|
|
- /*
|
|
|
- const match = toolCalls[opts.toolCallId]
|
|
|
+ metadata: async (val) => {
|
|
|
+ const match = processor.partFromToolCall(options.toolCallId)
|
|
|
if (match && match.state.status === "running") {
|
|
|
await updatePart({
|
|
|
...match,
|
|
|
@@ -598,14 +600,13 @@ export namespace Session {
|
|
|
title: val.title,
|
|
|
metadata: val.metadata,
|
|
|
status: "running",
|
|
|
- input: args.input,
|
|
|
+ input: args,
|
|
|
time: {
|
|
|
start: Date.now(),
|
|
|
},
|
|
|
},
|
|
|
})
|
|
|
}
|
|
|
- */
|
|
|
},
|
|
|
})
|
|
|
return result
|
|
|
@@ -676,257 +677,260 @@ export namespace Session {
|
|
|
],
|
|
|
}),
|
|
|
})
|
|
|
- const result = await processStream(assistantMsg, model.info, stream)
|
|
|
+ const result = await processor.process(stream)
|
|
|
return result
|
|
|
}
|
|
|
|
|
|
- async function processStream(
|
|
|
- assistantMsg: MessageV2.Assistant,
|
|
|
- model: ModelsDev.Model,
|
|
|
- stream: StreamTextResult<Record<string, AITool>, never>,
|
|
|
- ) {
|
|
|
- try {
|
|
|
- let currentText: MessageV2.TextPart | undefined
|
|
|
- const toolCalls: Record<string, MessageV2.ToolPart> = {}
|
|
|
+ function createProcessor(assistantMsg: MessageV2.Assistant, model: ModelsDev.Model) {
|
|
|
+ const toolCalls: Record<string, MessageV2.ToolPart> = {}
|
|
|
+ return {
|
|
|
+ partFromToolCall(toolCallID: string) {
|
|
|
+ return toolCalls[toolCallID]
|
|
|
+ },
|
|
|
+ async process(stream: StreamTextResult<Record<string, AITool>, never>) {
|
|
|
+ try {
|
|
|
+ let currentText: MessageV2.TextPart | undefined
|
|
|
|
|
|
- for await (const value of stream.fullStream) {
|
|
|
- log.info("part", {
|
|
|
- type: value.type,
|
|
|
- })
|
|
|
- switch (value.type) {
|
|
|
- case "start":
|
|
|
- const snapshot = await Snapshot.create(assistantMsg.sessionID)
|
|
|
- if (snapshot)
|
|
|
- await updatePart({
|
|
|
- id: Identifier.ascending("part"),
|
|
|
- messageID: assistantMsg.id,
|
|
|
- sessionID: assistantMsg.sessionID,
|
|
|
- type: "snapshot",
|
|
|
- snapshot,
|
|
|
- })
|
|
|
- break
|
|
|
-
|
|
|
- case "tool-input-start":
|
|
|
- const part = await updatePart({
|
|
|
- id: Identifier.ascending("part"),
|
|
|
- messageID: assistantMsg.id,
|
|
|
- sessionID: assistantMsg.sessionID,
|
|
|
- type: "tool",
|
|
|
- tool: value.toolName,
|
|
|
- callID: value.id,
|
|
|
- state: {
|
|
|
- status: "pending",
|
|
|
- },
|
|
|
+ for await (const value of stream.fullStream) {
|
|
|
+ log.info("part", {
|
|
|
+ type: value.type,
|
|
|
})
|
|
|
- toolCalls[value.id] = part as MessageV2.ToolPart
|
|
|
- break
|
|
|
-
|
|
|
- case "tool-input-delta":
|
|
|
- break
|
|
|
-
|
|
|
- case "tool-call": {
|
|
|
- const match = toolCalls[value.toolCallId]
|
|
|
- if (match) {
|
|
|
- const part = await updatePart({
|
|
|
- ...match,
|
|
|
- state: {
|
|
|
- status: "running",
|
|
|
- input: value.input,
|
|
|
- time: {
|
|
|
- start: Date.now(),
|
|
|
- },
|
|
|
- },
|
|
|
- })
|
|
|
- toolCalls[value.toolCallId] = part as MessageV2.ToolPart
|
|
|
- }
|
|
|
- break
|
|
|
- }
|
|
|
- case "tool-result": {
|
|
|
- const match = toolCalls[value.toolCallId]
|
|
|
- if (match && match.state.status === "running") {
|
|
|
- await updatePart({
|
|
|
- ...match,
|
|
|
- state: {
|
|
|
- status: "completed",
|
|
|
- input: value.input,
|
|
|
- output: value.output.output,
|
|
|
- metadata: value.output.metadata,
|
|
|
- title: value.output.title,
|
|
|
- time: {
|
|
|
- start: match.state.time.start,
|
|
|
- end: Date.now(),
|
|
|
+ switch (value.type) {
|
|
|
+ case "start":
|
|
|
+ const snapshot = await Snapshot.create(assistantMsg.sessionID)
|
|
|
+ if (snapshot)
|
|
|
+ await updatePart({
|
|
|
+ id: Identifier.ascending("part"),
|
|
|
+ messageID: assistantMsg.id,
|
|
|
+ sessionID: assistantMsg.sessionID,
|
|
|
+ type: "snapshot",
|
|
|
+ snapshot,
|
|
|
+ })
|
|
|
+ break
|
|
|
+
|
|
|
+ case "tool-input-start":
|
|
|
+ const part = await updatePart({
|
|
|
+ id: Identifier.ascending("part"),
|
|
|
+ messageID: assistantMsg.id,
|
|
|
+ sessionID: assistantMsg.sessionID,
|
|
|
+ type: "tool",
|
|
|
+ tool: value.toolName,
|
|
|
+ callID: value.id,
|
|
|
+ state: {
|
|
|
+ status: "pending",
|
|
|
},
|
|
|
- },
|
|
|
- })
|
|
|
- delete toolCalls[value.toolCallId]
|
|
|
- const snapshot = await Snapshot.create(assistantMsg.sessionID)
|
|
|
- if (snapshot)
|
|
|
+ })
|
|
|
+ toolCalls[value.id] = part as MessageV2.ToolPart
|
|
|
+ break
|
|
|
+
|
|
|
+ case "tool-input-delta":
|
|
|
+ break
|
|
|
+
|
|
|
+ case "tool-call": {
|
|
|
+ const match = toolCalls[value.toolCallId]
|
|
|
+ if (match) {
|
|
|
+ const part = await updatePart({
|
|
|
+ ...match,
|
|
|
+ state: {
|
|
|
+ status: "running",
|
|
|
+ input: value.input,
|
|
|
+ time: {
|
|
|
+ start: Date.now(),
|
|
|
+ },
|
|
|
+ },
|
|
|
+ })
|
|
|
+ toolCalls[value.toolCallId] = part as MessageV2.ToolPart
|
|
|
+ }
|
|
|
+ break
|
|
|
+ }
|
|
|
+ case "tool-result": {
|
|
|
+ const match = toolCalls[value.toolCallId]
|
|
|
+ if (match && match.state.status === "running") {
|
|
|
+ await updatePart({
|
|
|
+ ...match,
|
|
|
+ state: {
|
|
|
+ status: "completed",
|
|
|
+ input: value.input,
|
|
|
+ output: value.output.output,
|
|
|
+ metadata: value.output.metadata,
|
|
|
+ title: value.output.title,
|
|
|
+ time: {
|
|
|
+ start: match.state.time.start,
|
|
|
+ end: Date.now(),
|
|
|
+ },
|
|
|
+ },
|
|
|
+ })
|
|
|
+ delete toolCalls[value.toolCallId]
|
|
|
+ const snapshot = await Snapshot.create(assistantMsg.sessionID)
|
|
|
+ if (snapshot)
|
|
|
+ await updatePart({
|
|
|
+ id: Identifier.ascending("part"),
|
|
|
+ messageID: assistantMsg.id,
|
|
|
+ sessionID: assistantMsg.sessionID,
|
|
|
+ type: "snapshot",
|
|
|
+ snapshot,
|
|
|
+ })
|
|
|
+ }
|
|
|
+ break
|
|
|
+ }
|
|
|
+
|
|
|
+ case "tool-error": {
|
|
|
+ const match = toolCalls[value.toolCallId]
|
|
|
+ if (match && match.state.status === "running") {
|
|
|
+ await updatePart({
|
|
|
+ ...match,
|
|
|
+ state: {
|
|
|
+ status: "error",
|
|
|
+ input: value.input,
|
|
|
+ error: (value.error as any).toString(),
|
|
|
+ time: {
|
|
|
+ start: match.state.time.start,
|
|
|
+ end: Date.now(),
|
|
|
+ },
|
|
|
+ },
|
|
|
+ })
|
|
|
+ delete toolCalls[value.toolCallId]
|
|
|
+ const snapshot = await Snapshot.create(assistantMsg.sessionID)
|
|
|
+ if (snapshot)
|
|
|
+ await updatePart({
|
|
|
+ id: Identifier.ascending("part"),
|
|
|
+ messageID: assistantMsg.id,
|
|
|
+ sessionID: assistantMsg.sessionID,
|
|
|
+ type: "snapshot",
|
|
|
+ snapshot,
|
|
|
+ })
|
|
|
+ }
|
|
|
+ break
|
|
|
+ }
|
|
|
+
|
|
|
+ case "error":
|
|
|
+ throw value.error
|
|
|
+
|
|
|
+ case "start-step":
|
|
|
await updatePart({
|
|
|
id: Identifier.ascending("part"),
|
|
|
messageID: assistantMsg.id,
|
|
|
sessionID: assistantMsg.sessionID,
|
|
|
- type: "snapshot",
|
|
|
- snapshot,
|
|
|
+ type: "step-start",
|
|
|
})
|
|
|
- }
|
|
|
- break
|
|
|
- }
|
|
|
+ break
|
|
|
|
|
|
- case "tool-error": {
|
|
|
- const match = toolCalls[value.toolCallId]
|
|
|
- if (match && match.state.status === "running") {
|
|
|
- await updatePart({
|
|
|
- ...match,
|
|
|
- state: {
|
|
|
- status: "error",
|
|
|
- input: value.input,
|
|
|
- error: (value.error as any).toString(),
|
|
|
- time: {
|
|
|
- start: match.state.time.start,
|
|
|
- end: Date.now(),
|
|
|
- },
|
|
|
- },
|
|
|
- })
|
|
|
- delete toolCalls[value.toolCallId]
|
|
|
- const snapshot = await Snapshot.create(assistantMsg.sessionID)
|
|
|
- if (snapshot)
|
|
|
+ case "finish-step":
|
|
|
+ const usage = getUsage(model, value.usage, value.providerMetadata)
|
|
|
+ assistantMsg.cost += usage.cost
|
|
|
+ assistantMsg.tokens = usage.tokens
|
|
|
await updatePart({
|
|
|
id: Identifier.ascending("part"),
|
|
|
messageID: assistantMsg.id,
|
|
|
sessionID: assistantMsg.sessionID,
|
|
|
- type: "snapshot",
|
|
|
- snapshot,
|
|
|
+ type: "step-finish",
|
|
|
+ tokens: usage.tokens,
|
|
|
+ cost: usage.cost,
|
|
|
})
|
|
|
- }
|
|
|
- break
|
|
|
- }
|
|
|
+ await updateMessage(assistantMsg)
|
|
|
+ break
|
|
|
|
|
|
- case "error":
|
|
|
- throw value.error
|
|
|
-
|
|
|
- case "start-step":
|
|
|
- await updatePart({
|
|
|
- id: Identifier.ascending("part"),
|
|
|
- messageID: assistantMsg.id,
|
|
|
- sessionID: assistantMsg.sessionID,
|
|
|
- type: "step-start",
|
|
|
- })
|
|
|
- break
|
|
|
-
|
|
|
- case "finish-step":
|
|
|
- const usage = getUsage(model, value.usage, value.providerMetadata)
|
|
|
- assistantMsg.cost += usage.cost
|
|
|
- assistantMsg.tokens = usage.tokens
|
|
|
- await updatePart({
|
|
|
- id: Identifier.ascending("part"),
|
|
|
- messageID: assistantMsg.id,
|
|
|
- sessionID: assistantMsg.sessionID,
|
|
|
- type: "step-finish",
|
|
|
- tokens: usage.tokens,
|
|
|
- cost: usage.cost,
|
|
|
- })
|
|
|
- await updateMessage(assistantMsg)
|
|
|
- break
|
|
|
-
|
|
|
- case "text-start":
|
|
|
- currentText = {
|
|
|
- id: Identifier.ascending("part"),
|
|
|
- messageID: assistantMsg.id,
|
|
|
- sessionID: assistantMsg.sessionID,
|
|
|
- type: "text",
|
|
|
- text: "",
|
|
|
- time: {
|
|
|
- start: Date.now(),
|
|
|
- },
|
|
|
- }
|
|
|
- break
|
|
|
+ case "text-start":
|
|
|
+ currentText = {
|
|
|
+ id: Identifier.ascending("part"),
|
|
|
+ messageID: assistantMsg.id,
|
|
|
+ sessionID: assistantMsg.sessionID,
|
|
|
+ type: "text",
|
|
|
+ text: "",
|
|
|
+ time: {
|
|
|
+ start: Date.now(),
|
|
|
+ },
|
|
|
+ }
|
|
|
+ break
|
|
|
|
|
|
- case "text":
|
|
|
- if (currentText) {
|
|
|
- currentText.text += value.text
|
|
|
- await updatePart(currentText)
|
|
|
- }
|
|
|
- break
|
|
|
+ case "text":
|
|
|
+ if (currentText) {
|
|
|
+ currentText.text += value.text
|
|
|
+ await updatePart(currentText)
|
|
|
+ }
|
|
|
+ break
|
|
|
|
|
|
- case "text-end":
|
|
|
- if (currentText && currentText.text) {
|
|
|
- currentText.time = {
|
|
|
- start: Date.now(),
|
|
|
- end: Date.now(),
|
|
|
- }
|
|
|
- await updatePart(currentText)
|
|
|
- }
|
|
|
- currentText = undefined
|
|
|
- break
|
|
|
+ case "text-end":
|
|
|
+ if (currentText && currentText.text) {
|
|
|
+ currentText.time = {
|
|
|
+ start: Date.now(),
|
|
|
+ end: Date.now(),
|
|
|
+ }
|
|
|
+ await updatePart(currentText)
|
|
|
+ }
|
|
|
+ currentText = undefined
|
|
|
+ break
|
|
|
|
|
|
- case "finish":
|
|
|
- assistantMsg.time.completed = Date.now()
|
|
|
- await updateMessage(assistantMsg)
|
|
|
- break
|
|
|
+ case "finish":
|
|
|
+ assistantMsg.time.completed = Date.now()
|
|
|
+ await updateMessage(assistantMsg)
|
|
|
+ break
|
|
|
|
|
|
- default:
|
|
|
- log.info("unhandled", {
|
|
|
- ...value,
|
|
|
+ default:
|
|
|
+ log.info("unhandled", {
|
|
|
+ ...value,
|
|
|
+ })
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (e) {
|
|
|
+ log.error("", {
|
|
|
+ error: e,
|
|
|
+ })
|
|
|
+ switch (true) {
|
|
|
+ case e instanceof DOMException && e.name === "AbortError":
|
|
|
+ assistantMsg.error = new MessageV2.AbortedError(
|
|
|
+ { message: e.message },
|
|
|
+ {
|
|
|
+ cause: e,
|
|
|
+ },
|
|
|
+ ).toObject()
|
|
|
+ break
|
|
|
+ case MessageV2.OutputLengthError.isInstance(e):
|
|
|
+ assistantMsg.error = e
|
|
|
+ break
|
|
|
+ case LoadAPIKeyError.isInstance(e):
|
|
|
+ assistantMsg.error = new MessageV2.AuthError(
|
|
|
+ {
|
|
|
+ providerID: model.id,
|
|
|
+ message: e.message,
|
|
|
+ },
|
|
|
+ { cause: e },
|
|
|
+ ).toObject()
|
|
|
+ break
|
|
|
+ case e instanceof Error:
|
|
|
+ assistantMsg.error = new NamedError.Unknown({ message: e.toString() }, { cause: e }).toObject()
|
|
|
+ break
|
|
|
+ default:
|
|
|
+ assistantMsg.error = new NamedError.Unknown({ message: JSON.stringify(e) }, { cause: e })
|
|
|
+ }
|
|
|
+ Bus.publish(Event.Error, {
|
|
|
+ sessionID: assistantMsg.sessionID,
|
|
|
+ error: assistantMsg.error,
|
|
|
+ })
|
|
|
+ }
|
|
|
+ const p = await parts(assistantMsg.sessionID, assistantMsg.id)
|
|
|
+ for (const part of p) {
|
|
|
+ if (part.type === "tool" && part.state.status !== "completed") {
|
|
|
+ updatePart({
|
|
|
+ ...part,
|
|
|
+ state: {
|
|
|
+ status: "error",
|
|
|
+ error: "Tool execution aborted",
|
|
|
+ time: {
|
|
|
+ start: Date.now(),
|
|
|
+ end: Date.now(),
|
|
|
+ },
|
|
|
+ input: {},
|
|
|
+ },
|
|
|
})
|
|
|
- continue
|
|
|
+ }
|
|
|
}
|
|
|
- }
|
|
|
- } catch (e) {
|
|
|
- log.error("", {
|
|
|
- error: e,
|
|
|
- })
|
|
|
- switch (true) {
|
|
|
- case e instanceof DOMException && e.name === "AbortError":
|
|
|
- assistantMsg.error = new MessageV2.AbortedError(
|
|
|
- { message: e.message },
|
|
|
- {
|
|
|
- cause: e,
|
|
|
- },
|
|
|
- ).toObject()
|
|
|
- break
|
|
|
- case MessageV2.OutputLengthError.isInstance(e):
|
|
|
- assistantMsg.error = e
|
|
|
- break
|
|
|
- case LoadAPIKeyError.isInstance(e):
|
|
|
- assistantMsg.error = new Provider.AuthError(
|
|
|
- {
|
|
|
- providerID: model.id,
|
|
|
- message: e.message,
|
|
|
- },
|
|
|
- { cause: e },
|
|
|
- ).toObject()
|
|
|
- break
|
|
|
- case e instanceof Error:
|
|
|
- assistantMsg.error = new NamedError.Unknown({ message: e.toString() }, { cause: e }).toObject()
|
|
|
- break
|
|
|
- default:
|
|
|
- assistantMsg.error = new NamedError.Unknown({ message: JSON.stringify(e) }, { cause: e })
|
|
|
- }
|
|
|
- Bus.publish(Event.Error, {
|
|
|
- sessionID: assistantMsg.sessionID,
|
|
|
- error: assistantMsg.error,
|
|
|
- })
|
|
|
- }
|
|
|
- const p = await parts(assistantMsg.sessionID, assistantMsg.id)
|
|
|
- for (const part of p) {
|
|
|
- if (part.type === "tool" && part.state.status !== "completed") {
|
|
|
- updatePart({
|
|
|
- ...part,
|
|
|
- state: {
|
|
|
- status: "error",
|
|
|
- error: "Tool execution aborted",
|
|
|
- time: {
|
|
|
- start: Date.now(),
|
|
|
- end: Date.now(),
|
|
|
- },
|
|
|
- input: {},
|
|
|
- },
|
|
|
- })
|
|
|
- }
|
|
|
+ assistantMsg.time.completed = Date.now()
|
|
|
+ await updateMessage(assistantMsg)
|
|
|
+ return { info: assistantMsg, parts: p }
|
|
|
+ },
|
|
|
}
|
|
|
- assistantMsg.time.completed = Date.now()
|
|
|
- await updateMessage(assistantMsg)
|
|
|
- return { info: assistantMsg, parts: p }
|
|
|
}
|
|
|
|
|
|
export async function revert(_input: { sessionID: string; messageID: string; part: number }) {
|
|
|
@@ -1006,6 +1010,7 @@ export namespace Session {
|
|
|
}
|
|
|
await updateMessage(next)
|
|
|
|
|
|
+ const processor = createProcessor(next, model.info)
|
|
|
const stream = streamText({
|
|
|
abortSignal: abort.signal,
|
|
|
model: model.language,
|
|
|
@@ -1029,7 +1034,7 @@ export namespace Session {
|
|
|
],
|
|
|
})
|
|
|
|
|
|
- const result = await processStream(next, model.info, stream)
|
|
|
+ const result = await processor.process(stream)
|
|
|
return result
|
|
|
}
|
|
|
|