| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381 |
- import type { ModelsDev } from "@/provider/models"
- import { MessageV2 } from "./message-v2"
- import { type StreamTextResult, type Tool as AITool, APICallError } from "ai"
- import { Log } from "@/util/log"
- import { Identifier } from "@/id/id"
- import { Session } from "."
- import { Agent } from "@/agent/agent"
- import { Permission } from "@/permission"
- import { Snapshot } from "@/snapshot"
- import { SessionSummary } from "./summary"
- import { Bus } from "@/bus"
- import { SessionRetry } from "./retry"
- import { SessionStatus } from "./status"
/**
 * Consumes the streamed model response for a single assistant message and
 * persists what it sees (reasoning, text, tool calls, step markers, patches)
 * through `Session.updatePart` / `Session.updateMessage`.
 */
export namespace SessionProcessor {
  /** Number of trailing message parts inspected by the doom-loop check. */
  const DOOM_LOOP_THRESHOLD = 3
  const log = Log.create({ service: "session.processor" })

  /** The processor object returned by {@link create}. */
  export type Info = Awaited<ReturnType<typeof create>>
  /** The value `process` resolves to: `"stop"` or `"continue"`. */
  export type Result = Awaited<ReturnType<Info["process"]>>

  /**
   * Creates a processor bound to one assistant message.
   *
   * @param input.assistantMessage Message that is mutated (finish reason, cost,
   *   tokens, error, completion time) and saved while the stream is processed.
   * @param input.sessionID Session used for status updates, step/patch parts and summaries.
   * @param input.providerID Passed to `MessageV2.fromError` when a failure is converted.
   * @param input.model Model description handed to `Session.getUsage`.
   * @param input.abort Signal checked before each stream and before every stream event.
   * @returns An object exposing the message, a tool-part lookup and `process`.
   */
  export function create(input: {
    assistantMessage: MessageV2.Assistant
    sessionID: string
    providerID: string
    model: ModelsDev.Model
    abort: AbortSignal
  }) {
    // Tool parts that have started but not yet produced a result or an error, keyed by call id.
    const toolcalls: Record<string, MessageV2.ToolPart> = {}
    // Snapshot taken at "start-step"; consumed and cleared at "finish-step".
    let snapshot: string | undefined
    // Set once a tool fails with Permission.RejectedError; makes process() resolve to "stop".
    let blocked = false
    // Retry counter; it is never reset, so it keeps growing across retries.
    let attempt = 0

    const result = {
      /** The assistant message this processor writes to. */
      get message() {
        return input.assistantMessage
      },

      /** Returns the in-flight tool part for `toolCallID`, or `undefined` if none is tracked. */
      partFromToolCall(toolCallID: string) {
        return toolcalls[toolCallID]
      },

      /**
       * Runs `fn` to obtain a stream and handles every event on `fullStream`.
       * Retryable API errors and errors whose message contains "Overloaded" are
       * retried after `SessionRetry.delay`; any other error is stored on the
       * message and published as `Session.Event.Error`.
       *
       * @param fn Factory for the stream; invoked again for each retry.
       * @returns `"stop"` when a tool call was rejected by a permission check or
       *   the message ended with an error, otherwise `"continue"`.
       */
      async process(fn: () => StreamTextResult<Record<string, AITool>, never>) {
        log.info("process")
        while (true) {
          try {
            let currentText: MessageV2.TextPart | undefined
            const reasoningMap: Record<string, MessageV2.ReasoningPart> = {}

            // An abort that arrived during the retry back-off is swallowed by the
            // sleep below, so detect it here instead of opening another stream.
            input.abort.throwIfAborted()

            const stream = fn()
            for await (const value of stream.fullStream) {
              input.abort.throwIfAborted()
              switch (value.type) {
                case "start":
                  SessionStatus.set(input.sessionID, { type: "busy" })
                  break

                case "reasoning-start":
                  // Ignore a duplicate start for a reasoning block that is already open.
                  if (value.id in reasoningMap) {
                    continue
                  }
                  reasoningMap[value.id] = {
                    id: Identifier.ascending("part"),
                    messageID: input.assistantMessage.id,
                    sessionID: input.assistantMessage.sessionID,
                    type: "reasoning",
                    text: "",
                    time: {
                      start: Date.now(),
                    },
                    metadata: value.providerMetadata,
                  }
                  break

                case "reasoning-delta":
                  if (value.id in reasoningMap) {
                    const part = reasoningMap[value.id]
                    part.text += value.text
                    if (value.providerMetadata) part.metadata = value.providerMetadata
                    // Nothing is persisted while the accumulated text is still empty.
                    if (part.text) await Session.updatePart({ part, delta: value.text })
                  }
                  break

                case "reasoning-end":
                  if (value.id in reasoningMap) {
                    const part = reasoningMap[value.id]
                    part.text = part.text.trimEnd()
                    part.time = {
                      ...part.time,
                      end: Date.now(),
                    }
                    if (value.providerMetadata) part.metadata = value.providerMetadata
                    await Session.updatePart(part)
                    delete reasoningMap[value.id]
                  }
                  break

                case "tool-input-start": {
                  // Reuse the part id if this call id is already tracked.
                  const part = await Session.updatePart({
                    id: toolcalls[value.id]?.id ?? Identifier.ascending("part"),
                    messageID: input.assistantMessage.id,
                    sessionID: input.assistantMessage.sessionID,
                    type: "tool",
                    tool: value.toolName,
                    callID: value.id,
                    state: {
                      status: "pending",
                      input: {},
                      raw: "",
                    },
                  })
                  toolcalls[value.id] = part as MessageV2.ToolPart
                  break
                }

                case "tool-input-delta":
                  break

                case "tool-input-end":
                  break

                case "tool-call": {
                  const match = toolcalls[value.toolCallId]
                  if (match) {
                    const part = await Session.updatePart({
                      ...match,
                      tool: value.toolName,
                      state: {
                        status: "running",
                        input: value.input,
                        time: {
                          start: Date.now(),
                        },
                      },
                      metadata: value.providerMetadata,
                    })
                    toolcalls[value.toolCallId] = part as MessageV2.ToolPart

                    // Doom-loop check: the last DOOM_LOOP_THRESHOLD parts of this message
                    // are all non-pending calls to the same tool with identical input.
                    const parts = await MessageV2.parts(input.assistantMessage.id)
                    const recent = parts.slice(-DOOM_LOOP_THRESHOLD)
                    const serializedInput = JSON.stringify(value.input)
                    const repeated =
                      recent.length === DOOM_LOOP_THRESHOLD &&
                      recent.every(
                        (p) =>
                          p.type === "tool" &&
                          p.tool === value.toolName &&
                          p.state.status !== "pending" &&
                          JSON.stringify(p.state.input) === serializedInput,
                      )
                    if (repeated) {
                      const permission = await Agent.get(input.assistantMessage.mode).then((x) => x.permission)
                      if (permission.doom_loop === "ask") {
                        await Permission.ask({
                          type: "doom_loop",
                          pattern: value.toolName,
                          sessionID: input.assistantMessage.sessionID,
                          messageID: input.assistantMessage.id,
                          callID: value.toolCallId,
                          title: `Possible doom loop: "${value.toolName}" called ${DOOM_LOOP_THRESHOLD} times with identical arguments`,
                          metadata: {
                            tool: value.toolName,
                            input: value.input,
                          },
                        })
                      } else if (permission.doom_loop === "deny") {
                        throw new Permission.RejectedError(
                          input.assistantMessage.sessionID,
                          "doom_loop",
                          value.toolCallId,
                          {
                            tool: value.toolName,
                            input: value.input,
                          },
                          `You seem to be stuck in a doom loop, please stop repeating the same action`,
                        )
                      }
                    }
                  }
                  break
                }

                case "tool-result": {
                  const match = toolcalls[value.toolCallId]
                  if (match && match.state.status === "running") {
                    await Session.updatePart({
                      ...match,
                      state: {
                        status: "completed",
                        input: value.input,
                        output: value.output.output,
                        metadata: value.output.metadata,
                        title: value.output.title,
                        time: {
                          start: match.state.time.start,
                          end: Date.now(),
                        },
                        attachments: value.output.attachments,
                      },
                    })
                    delete toolcalls[value.toolCallId]
                  }
                  break
                }

                case "tool-error": {
                  const match = toolcalls[value.toolCallId]
                  if (match && match.state.status === "running") {
                    await Session.updatePart({
                      ...match,
                      state: {
                        status: "error",
                        input: value.input,
                        error: (value.error as any).toString(),
                        metadata: value.error instanceof Permission.RejectedError ? value.error.metadata : undefined,
                        time: {
                          start: match.state.time.start,
                          end: Date.now(),
                        },
                      },
                    })
                    // A permission rejection makes process() resolve to "stop".
                    if (value.error instanceof Permission.RejectedError) {
                      blocked = true
                    }
                    delete toolcalls[value.toolCallId]
                  }
                  break
                }

                case "error":
                  throw value.error

                case "start-step":
                  snapshot = await Snapshot.track()
                  await Session.updatePart({
                    id: Identifier.ascending("part"),
                    messageID: input.assistantMessage.id,
                    sessionID: input.sessionID,
                    snapshot,
                    type: "step-start",
                  })
                  break

                case "finish-step": {
                  const usage = Session.getUsage({
                    model: input.model,
                    usage: value.usage,
                    metadata: value.providerMetadata,
                  })
                  // Cost accumulates across steps; tokens and finish reason reflect the latest step.
                  input.assistantMessage.finish = value.finishReason
                  input.assistantMessage.cost += usage.cost
                  input.assistantMessage.tokens = usage.tokens
                  await Session.updatePart({
                    id: Identifier.ascending("part"),
                    reason: value.finishReason,
                    snapshot: await Snapshot.track(),
                    messageID: input.assistantMessage.id,
                    sessionID: input.assistantMessage.sessionID,
                    type: "step-finish",
                    tokens: usage.tokens,
                    cost: usage.cost,
                  })
                  await Session.updateMessage(input.assistantMessage)
                  if (snapshot) {
                    // Record a patch part only when the step-start snapshot reports changed files.
                    const patch = await Snapshot.patch(snapshot)
                    if (patch.files.length) {
                      await Session.updatePart({
                        id: Identifier.ascending("part"),
                        messageID: input.assistantMessage.id,
                        sessionID: input.sessionID,
                        type: "patch",
                        hash: patch.hash,
                        files: patch.files,
                      })
                    }
                    snapshot = undefined
                  }
                  // Not awaited: the summary result is not needed to keep processing the stream.
                  SessionSummary.summarize({
                    sessionID: input.sessionID,
                    messageID: input.assistantMessage.parentID,
                  })
                  break
                }

                case "text-start":
                  currentText = {
                    id: Identifier.ascending("part"),
                    messageID: input.assistantMessage.id,
                    sessionID: input.assistantMessage.sessionID,
                    type: "text",
                    text: "",
                    time: {
                      start: Date.now(),
                    },
                    metadata: value.providerMetadata,
                  }
                  break

                case "text-delta":
                  if (currentText) {
                    currentText.text += value.text
                    if (value.providerMetadata) currentText.metadata = value.providerMetadata
                    // Nothing is persisted while the accumulated text is still empty.
                    if (currentText.text)
                      await Session.updatePart({
                        part: currentText,
                        delta: value.text,
                      })
                  }
                  break

                case "text-end":
                  if (currentText) {
                    const endedAt = Date.now()
                    currentText.text = currentText.text.trimEnd()
                    // Keep the start recorded at "text-start" so the part reflects its real duration.
                    currentText.time = {
                      start: currentText.time?.start ?? endedAt,
                      end: endedAt,
                    }
                    if (value.providerMetadata) currentText.metadata = value.providerMetadata
                    await Session.updatePart(currentText)
                  }
                  currentText = undefined
                  break

                case "finish":
                  break

                default:
                  log.info("unhandled", {
                    ...value,
                  })
                  continue
              }
            }
          } catch (e) {
            log.error("process", {
              error: e,
            })
            const error = MessageV2.fromError(e, { providerID: input.providerID })
            // Read the message defensively: an error without a string message must not
            // make this handler itself throw.
            const rawMessage = error?.data?.message
            const message = typeof rawMessage === "string" ? rawMessage : ""
            const overloaded = message.includes("Overloaded")
            if ((error?.name === "APIError" && error.data.isRetryable) || overloaded) {
              attempt++
              const delay = SessionRetry.delay(attempt, error.name === "APIError" ? error : undefined)
              SessionStatus.set(input.sessionID, {
                type: "retry",
                attempt,
                message: overloaded ? "Provider is overloaded" : message,
                next: Date.now() + delay,
              })
              // A rejected sleep is ignored here; the abort check at the top of the
              // next iteration turns an abort into a regular, non-retryable error.
              await SessionRetry.sleep(delay, input.abort).catch(() => {})
              continue
            }
            input.assistantMessage.error = error
            Bus.publish(Session.Event.Error, {
              sessionID: input.assistantMessage.sessionID,
              error: input.assistantMessage.error,
            })
          }

          // Any tool part that never reached "completed" or "error" is closed out as aborted.
          const p = await MessageV2.parts(input.assistantMessage.id)
          for (const part of p) {
            if (part.type === "tool" && part.state.status !== "completed" && part.state.status !== "error") {
              const abortedAt = Date.now()
              await Session.updatePart({
                ...part,
                state: {
                  ...part.state,
                  status: "error",
                  error: "Tool execution aborted",
                  time: {
                    // Running parts keep their real start; pending parts carry no start time.
                    start: part.state.status === "running" ? part.state.time.start : abortedAt,
                    end: abortedAt,
                  },
                },
              })
            }
          }
          input.assistantMessage.time.completed = Date.now()
          await Session.updateMessage(input.assistantMessage)
          if (blocked) return "stop"
          if (input.assistantMessage.error) return "stop"
          return "continue"
        }
      },
    }
    return result
  }
}
|