import { BusEvent } from "@/bus/bus-event"
import z from "zod"
import { NamedError } from "@opencode-ai/util/error"
import { APICallError, convertToModelMessages, LoadAPIKeyError, type ModelMessage, type UIMessage } from "ai"
import { Identifier } from "../id/id"
import { LSP } from "../lsp"
import { Snapshot } from "@/snapshot"
import { fn } from "@/util/fn"
import { Storage } from "@/storage/storage"
import { ProviderTransform } from "@/provider/transform"
import { STATUS_CODES } from "http"
import { iife } from "@/util/iife"
import { type SystemError } from "bun"
import type { Provider } from "@/provider/provider"

/**
 * Schemas, types, bus events and helpers for session messages and their parts.
 */
export namespace MessageV2 {
  // ---------------------------------------------------------------------------
  // Error types
  // ---------------------------------------------------------------------------

  export const OutputLengthError = NamedError.create("MessageOutputLengthError", z.object({}))
  export const AbortedError = NamedError.create("MessageAbortedError", z.object({ message: z.string() }))
  export const AuthError = NamedError.create(
    "ProviderAuthError",
    z.object({
      providerID: z.string(),
      message: z.string(),
    }),
  )
  export const APIError = NamedError.create(
    "APIError",
    z.object({
      message: z.string(),
      statusCode: z.number().optional(),
      isRetryable: z.boolean(),
      responseHeaders: z.record(z.string(), z.string()).optional(),
      responseBody: z.string().optional(),
      metadata: z.record(z.string(), z.string()).optional(),
    }),
  )
  export type APIError = z.infer<typeof APIError.Schema>

  // ---------------------------------------------------------------------------
  // Part schemas
  // ---------------------------------------------------------------------------

  /** Fields shared by every part schema. */
  const PartBase = z.object({
    id: z.string(),
    sessionID: z.string(),
    messageID: z.string(),
  })

  export const SnapshotPart = PartBase.extend({
    type: z.literal("snapshot"),
    snapshot: z.string(),
  }).meta({
    ref: "SnapshotPart",
  })
  export type SnapshotPart = z.infer<typeof SnapshotPart>

  export const PatchPart = PartBase.extend({
    type: z.literal("patch"),
    hash: z.string(),
    files: z.string().array(),
  }).meta({
    ref: "PatchPart",
  })
  export type PatchPart = z.infer<typeof PatchPart>

  export const TextPart = PartBase.extend({
    type: z.literal("text"),
    text: z.string(),
    synthetic: z.boolean().optional(),
    ignored: z.boolean().optional(),
    time: z
      .object({
        start: z.number(),
        end: z.number().optional(),
      })
      .optional(),
    metadata: z.record(z.string(), z.any()).optional(),
  }).meta({
    ref: "TextPart",
  })
  export type TextPart = z.infer<typeof TextPart>

  export const ReasoningPart = PartBase.extend({
    type: z.literal("reasoning"),
    text: z.string(),
    metadata: z.record(z.string(), z.any()).optional(),
    time: z.object({
      start: z.number(),
      end: z.number().optional(),
    }),
  }).meta({
    ref: "ReasoningPart",
  })
  export type ReasoningPart = z.infer<typeof ReasoningPart>

  /** Fields shared by every file-part source variant. */
  const FilePartSourceBase = z.object({
    text: z
      .object({
        value: z.string(),
        start: z.number().int(),
        end: z.number().int(),
      })
      .meta({
        ref: "FilePartSourceText",
      }),
  })

  export const FileSource = FilePartSourceBase.extend({
    type: z.literal("file"),
    path: z.string(),
  }).meta({
    ref: "FileSource",
  })

  export const SymbolSource = FilePartSourceBase.extend({
    type: z.literal("symbol"),
    path: z.string(),
    range: LSP.Range,
    name: z.string(),
    kind: z.number().int(),
  }).meta({
    ref: "SymbolSource",
  })

  export const ResourceSource = FilePartSourceBase.extend({
    type: z.literal("resource"),
    clientName: z.string(),
    uri: z.string(),
  }).meta({
    ref: "ResourceSource",
  })

  export const FilePartSource = z.discriminatedUnion("type", [FileSource, SymbolSource, ResourceSource]).meta({
    ref: "FilePartSource",
  })

  export const FilePart = PartBase.extend({
    type: z.literal("file"),
    mime: z.string(),
    filename: z.string().optional(),
    url: z.string(),
    source: FilePartSource.optional(),
  }).meta({
    ref: "FilePart",
  })
  export type FilePart = z.infer<typeof FilePart>

  export const AgentPart = PartBase.extend({
    type: z.literal("agent"),
    name: z.string(),
    source: z
      .object({
        value: z.string(),
        start: z.number().int(),
        end: z.number().int(),
      })
      .optional(),
  }).meta({
    ref: "AgentPart",
  })
  export type AgentPart = z.infer<typeof AgentPart>

  export const CompactionPart = PartBase.extend({
    type: z.literal("compaction"),
    auto: z.boolean(),
  }).meta({
    ref: "CompactionPart",
  })
  export type CompactionPart = z.infer<typeof CompactionPart>

  export const SubtaskPart = PartBase.extend({
    type: z.literal("subtask"),
    prompt: z.string(),
    description: z.string(),
    agent: z.string(),
    model: z
      .object({
        providerID: z.string(),
        modelID: z.string(),
      })
      .optional(),
    command: z.string().optional(),
  }).meta({
    ref: "SubtaskPart",
  })
  export type SubtaskPart = z.infer<typeof SubtaskPart>

  export const RetryPart = PartBase.extend({
    type: z.literal("retry"),
    attempt: z.number(),
    error: APIError.Schema,
    time: z.object({
      created: z.number(),
    }),
  }).meta({
    ref: "RetryPart",
  })
  export type RetryPart = z.infer<typeof RetryPart>

  export const StepStartPart = PartBase.extend({
    type: z.literal("step-start"),
    snapshot: z.string().optional(),
  }).meta({
    ref: "StepStartPart",
  })
  export type StepStartPart = z.infer<typeof StepStartPart>

  export const StepFinishPart = PartBase.extend({
    type: z.literal("step-finish"),
    reason: z.string(),
    snapshot: z.string().optional(),
    cost: z.number(),
    tokens: z.object({
      input: z.number(),
      output: z.number(),
      reasoning: z.number(),
      cache: z.object({
        read: z.number(),
        write: z.number(),
      }),
    }),
  }).meta({
    ref: "StepFinishPart",
  })
  export type StepFinishPart = z.infer<typeof StepFinishPart>

  // ---------------------------------------------------------------------------
  // Tool state schemas (discriminated on `status`)
  // ---------------------------------------------------------------------------

  export const ToolStatePending = z
    .object({
      status: z.literal("pending"),
      input: z.record(z.string(), z.any()),
      raw: z.string(),
    })
    .meta({
      ref: "ToolStatePending",
    })
  export type ToolStatePending = z.infer<typeof ToolStatePending>

  export const ToolStateRunning = z
    .object({
      status: z.literal("running"),
      input: z.record(z.string(), z.any()),
      title: z.string().optional(),
      metadata: z.record(z.string(), z.any()).optional(),
      time: z.object({
        start: z.number(),
      }),
    })
    .meta({
      ref: "ToolStateRunning",
    })
  export type ToolStateRunning = z.infer<typeof ToolStateRunning>

  export const ToolStateCompleted = z
    .object({
      status: z.literal("completed"),
      input: z.record(z.string(), z.any()),
      output: z.string(),
      title: z.string(),
      metadata: z.record(z.string(), z.any()),
      time: z.object({
        start: z.number(),
        end: z.number(),
        compacted: z.number().optional(),
      }),
      attachments: FilePart.array().optional(),
    })
    .meta({
      ref: "ToolStateCompleted",
    })
  export type ToolStateCompleted = z.infer<typeof ToolStateCompleted>

  export const ToolStateError = z
    .object({
      status: z.literal("error"),
      input: z.record(z.string(), z.any()),
      error: z.string(),
      metadata: z.record(z.string(), z.any()).optional(),
      time: z.object({
        start: z.number(),
        end: z.number(),
      }),
    })
    .meta({
      ref: "ToolStateError",
    })
  export type ToolStateError = z.infer<typeof ToolStateError>

  export const ToolState = z
    .discriminatedUnion("status", [ToolStatePending, ToolStateRunning, ToolStateCompleted, ToolStateError])
    .meta({
      ref: "ToolState",
    })

  export const ToolPart = PartBase.extend({
    type: z.literal("tool"),
    callID: z.string(),
    tool: z.string(),
    state: ToolState,
    metadata: z.record(z.string(), z.any()).optional(),
  }).meta({
    ref: "ToolPart",
  })
  export type ToolPart = z.infer<typeof ToolPart>

  // ---------------------------------------------------------------------------
  // Message schemas (discriminated on `role`)
  // ---------------------------------------------------------------------------

  /** Fields shared by user and assistant messages. */
  const Base = z.object({
    id: z.string(),
    sessionID: z.string(),
  })

  export const User = Base.extend({
    role: z.literal("user"),
    time: z.object({
      created: z.number(),
    }),
    summary: z
      .object({
        title: z.string().optional(),
        body: z.string().optional(),
        diffs: Snapshot.FileDiff.array(),
      })
      .optional(),
    agent: z.string(),
    model: z.object({
      providerID: z.string(),
      modelID: z.string(),
    }),
    system: z.string().optional(),
    tools: z.record(z.string(), z.boolean()).optional(),
    variant: z.string().optional(),
  }).meta({
    ref: "UserMessage",
  })
  export type User = z.infer<typeof User>

  export const Part = z
    .discriminatedUnion("type", [
      TextPart,
      SubtaskPart,
      ReasoningPart,
      FilePart,
      ToolPart,
      StepStartPart,
      StepFinishPart,
      SnapshotPart,
      PatchPart,
      AgentPart,
      RetryPart,
      CompactionPart,
    ])
    .meta({
      ref: "Part",
    })
  export type Part = z.infer<typeof Part>

  export const Assistant = Base.extend({
    role: z.literal("assistant"),
    time: z.object({
      created: z.number(),
      completed: z.number().optional(),
    }),
    error: z
      .discriminatedUnion("name", [
        AuthError.Schema,
        NamedError.Unknown.Schema,
        OutputLengthError.Schema,
        AbortedError.Schema,
        APIError.Schema,
      ])
      .optional(),
    parentID: z.string(),
    modelID: z.string(),
    providerID: z.string(),
    /**
     * @deprecated
     */
    mode: z.string(),
    agent: z.string(),
    path: z.object({
      cwd: z.string(),
      root: z.string(),
    }),
    summary: z.boolean().optional(),
    cost: z.number(),
    tokens: z.object({
      input: z.number(),
      output: z.number(),
      reasoning: z.number(),
      cache: z.object({
        read: z.number(),
        write: z.number(),
      }),
    }),
    finish: z.string().optional(),
  }).meta({
    ref: "AssistantMessage",
  })
  export type Assistant = z.infer<typeof Assistant>

  export const Info = z.discriminatedUnion("role", [User, Assistant]).meta({
    ref: "Message",
  })
  export type Info = z.infer<typeof Info>

  /** Bus events published for message and part changes. */
  export const Event = {
    Updated: BusEvent.define(
      "message.updated",
      z.object({
        info: Info,
      }),
    ),
    Removed: BusEvent.define(
      "message.removed",
      z.object({
        sessionID: z.string(),
        messageID: z.string(),
      }),
    ),
    PartUpdated: BusEvent.define(
      "message.part.updated",
      z.object({
        part: Part,
        delta: z.string().optional(),
      }),
    ),
    PartRemoved: BusEvent.define(
      "message.part.removed",
      z.object({
        sessionID: z.string(),
        messageID: z.string(),
        partID: z.string(),
      }),
    ),
  }

  export const WithParts = z.object({
    info: Info,
    parts: z.array(Part),
  })
  export type WithParts = z.infer<typeof WithParts>

  /**
   * Converts stored messages (info + parts) into the `ModelMessage[]` form
   * produced by `convertToModelMessages`.
   *
   * - Messages with no parts are skipped.
   * - Assistant messages carrying an error are skipped, unless the error is an
   *   `AbortedError` and the message has at least one part that is neither
   *   `step-start` nor `reasoning`.
   * - Provider metadata is only forwarded when the message was produced by the
   *   same provider/model as `model`.
   * - Tool calls still `pending`/`running` are emitted as errored tool results.
   *
   * @param input Messages to convert, in the order they should be sent.
   * @param model Target model; decides metadata forwarding and media handling.
   * @returns Messages in the AI SDK model format.
   */
  export function toModelMessages(input: WithParts[], model: Provider.Model): ModelMessage[] {
    const result: UIMessage[] = []
    const toolNames = new Set<string>()

    // Track media from tool results that need to be injected as user messages
    // for providers that don't support media in tool results.
    //
    // OpenAI-compatible APIs only support string content in tool results, so we need
    // to extract media and inject as user messages. Other SDKs (anthropic, google,
    // bedrock) handle type: "content" with media parts natively.
    //
    // Only apply this workaround if the model actually supports image input -
    // otherwise there's no point extracting images.
    const supportsMediaInToolResults = (() => {
      if (model.api.npm === "@ai-sdk/anthropic") return true
      if (model.api.npm === "@ai-sdk/openai") return true
      if (model.api.npm === "@ai-sdk/amazon-bedrock") return true
      if (model.api.npm === "@ai-sdk/google-vertex/anthropic") return true
      if (model.api.npm === "@ai-sdk/google") {
        const id = model.api.id.toLowerCase()
        return id.includes("gemini-3") && !id.includes("gemini-2")
      }
      return false
    })()

    // Maps a stored tool output onto the tool-result shape handed to the SDK:
    // plain strings become text, `{ text, attachments }` objects become content
    // with media entries, anything else is passed through as JSON.
    const toModelOutput = (output: unknown) => {
      if (typeof output === "string") {
        return { type: "text", value: output }
      }

      // `typeof null === "object"`, so null must be excluded before property access.
      if (typeof output === "object" && output !== null) {
        const outputObject = output as {
          text: string
          attachments?: Array<{ mime: string; url: string }>
        }
        // Only data URLs that actually carry a payload after the comma are forwarded.
        const attachments = (outputObject.attachments ?? []).filter((attachment) => {
          return attachment.url.startsWith("data:") && attachment.url.includes(",")
        })

        return {
          type: "content",
          value: [
            { type: "text", text: outputObject.text },
            ...attachments.map((attachment) => ({
              type: "media",
              mediaType: attachment.mime,
              // Strip the `data:...,` prefix and keep only the payload.
              data: iife(() => {
                const commaIndex = attachment.url.indexOf(",")
                return commaIndex === -1 ? attachment.url : attachment.url.slice(commaIndex + 1)
              }),
            })),
          ],
        }
      }

      return { type: "json", value: output as never }
    }

    for (const msg of input) {
      if (msg.parts.length === 0) continue

      if (msg.info.role === "user") {
        const userMessage: UIMessage = {
          id: msg.info.id,
          role: "user",
          parts: [],
        }
        result.push(userMessage)
        for (const part of msg.parts) {
          if (part.type === "text" && !part.ignored)
            userMessage.parts.push({
              type: "text",
              text: part.text,
            })
          // text/plain and directory files are converted into text parts, ignore them
          if (part.type === "file" && part.mime !== "text/plain" && part.mime !== "application/x-directory")
            userMessage.parts.push({
              type: "file",
              url: part.url,
              mediaType: part.mime,
              filename: part.filename,
            })

          if (part.type === "compaction") {
            userMessage.parts.push({
              type: "text",
              text: "What did we do so far?",
            })
          }
          if (part.type === "subtask") {
            userMessage.parts.push({
              type: "text",
              text: "The following tool was executed by the user",
            })
          }
        }
      }

      if (msg.info.role === "assistant") {
        // Provider metadata is only meaningful to the model that produced it.
        const differentModel = `${model.providerID}/${model.id}` !== `${msg.info.providerID}/${msg.info.modelID}`
        const media: Array<{ mime: string; url: string }> = []

        // Skip errored messages, except aborted ones that already produced
        // something other than step-start/reasoning parts.
        if (
          msg.info.error &&
          !(
            MessageV2.AbortedError.isInstance(msg.info.error) &&
            msg.parts.some((part) => part.type !== "step-start" && part.type !== "reasoning")
          )
        ) {
          continue
        }

        const assistantMessage: UIMessage = {
          id: msg.info.id,
          role: "assistant",
          parts: [],
        }

        for (const part of msg.parts) {
          if (part.type === "text")
            assistantMessage.parts.push({
              type: "text",
              text: part.text,
              ...(differentModel ? {} : { providerMetadata: part.metadata }),
            })
          if (part.type === "step-start")
            assistantMessage.parts.push({
              type: "step-start",
            })
          if (part.type === "tool") {
            toolNames.add(part.tool)
            if (part.state.status === "completed") {
              // Compacted results keep the tool call but drop its content and attachments.
              const outputText = part.state.time.compacted ? "[Old tool result content cleared]" : part.state.output
              const attachments = part.state.time.compacted ? [] : (part.state.attachments ?? [])

              // For providers that don't support media in tool results, extract media files
              // (images, PDFs) to be sent as a separate user message
              const isMediaAttachment = (a: { mime: string }) =>
                a.mime.startsWith("image/") || a.mime === "application/pdf"
              const mediaAttachments = attachments.filter(isMediaAttachment)
              const nonMediaAttachments = attachments.filter((a) => !isMediaAttachment(a))
              if (!supportsMediaInToolResults && mediaAttachments.length > 0) {
                media.push(...mediaAttachments)
              }
              const finalAttachments = supportsMediaInToolResults ? attachments : nonMediaAttachments

              const output =
                finalAttachments.length > 0
                  ? {
                      text: outputText,
                      attachments: finalAttachments,
                    }
                  : outputText

              assistantMessage.parts.push({
                type: ("tool-" + part.tool) as `tool-${string}`,
                state: "output-available",
                toolCallId: part.callID,
                input: part.state.input,
                output,
                ...(differentModel ? {} : { callProviderMetadata: part.metadata }),
              })
            }
            if (part.state.status === "error")
              assistantMessage.parts.push({
                type: ("tool-" + part.tool) as `tool-${string}`,
                state: "output-error",
                toolCallId: part.callID,
                input: part.state.input,
                errorText: part.state.error,
                ...(differentModel ? {} : { callProviderMetadata: part.metadata }),
              })
            // Handle pending/running tool calls to prevent dangling tool_use blocks
            // Anthropic/Claude APIs require every tool_use to have a corresponding tool_result
            if (part.state.status === "pending" || part.state.status === "running")
              assistantMessage.parts.push({
                type: ("tool-" + part.tool) as `tool-${string}`,
                state: "output-error",
                toolCallId: part.callID,
                input: part.state.input,
                errorText: "[Tool execution was interrupted]",
                ...(differentModel ? {} : { callProviderMetadata: part.metadata }),
              })
          }
          if (part.type === "reasoning") {
            assistantMessage.parts.push({
              type: "reasoning",
              text: part.text,
              ...(differentModel ? {} : { providerMetadata: part.metadata }),
            })
          }
        }

        if (assistantMessage.parts.length > 0) {
          result.push(assistantMessage)
          // Inject pending media as a user message for providers that don't support
          // media (images, PDFs) in tool results
          if (media.length > 0) {
            result.push({
              id: Identifier.ascending("message"),
              role: "user",
              parts: [
                {
                  type: "text" as const,
                  text: "Attached image(s) from tool result:",
                },
                ...media.map((attachment) => ({
                  type: "file" as const,
                  url: attachment.url,
                  mediaType: attachment.mime,
                })),
              ],
            })
          }
        }
      }
    }

    // Every tool name seen above gets the same output mapper.
    const tools = Object.fromEntries(Array.from(toolNames).map((toolName) => [toolName, { toModelOutput }]))

    return convertToModelMessages(
      // Messages consisting solely of step-start parts carry no content.
      result.filter((msg) => msg.parts.some((part) => part.type !== "step-start")),
      {
        //@ts-expect-error (convertToModelMessages expects a ToolSet but only actually needs tools[name]?.toModelOutput)
        tools,
      },
    )
  }

  /**
   * Yields every stored message of a session together with its parts, walking
   * the `Storage.list` result from its last entry to its first.
   */
  export const stream = fn(Identifier.schema("session"), async function* (sessionID) {
    const list = await Array.fromAsync(await Storage.list(["message", sessionID]))
    for (let i = list.length - 1; i >= 0; i--) {
      yield await get({
        sessionID,
        messageID: list[i][2],
      })
    }
  })

  /** Loads all stored parts of a message, sorted ascending by part id. */
  export const parts = fn(Identifier.schema("message"), async (messageID) => {
    const result = [] as MessageV2.Part[]
    for (const item of await Storage.list(["part", messageID])) {
      const read = await Storage.read<MessageV2.Part>(item)
      result.push(read)
    }
    // Three-way comparison so equal ids compare as equal.
    result.sort((a, b) => (a.id < b.id ? -1 : a.id > b.id ? 1 : 0))
    return result
  })

  /** Loads a single message's info together with its parts. */
  export const get = fn(
    z.object({
      sessionID: Identifier.schema("session"),
      messageID: Identifier.schema("message"),
    }),
    async (input): Promise<WithParts> => {
      return {
        info: await Storage.read<MessageV2.Info>(["message", input.sessionID, input.messageID]),
        parts: await parts(input.messageID),
      }
    },
  )

  /**
   * Collects messages from `stream` up to and including the first user message
   * that has a `compaction` part and whose id was already seen as the
   * `parentID` of an assistant message with both `summary` and `finish` set.
   * The collected messages are returned in reverse of the order consumed.
   *
   * NOTE(review): presumably `stream` yields newest-first (as `stream` above
   * appears to), so the result is oldest-first; confirm against callers.
   */
  export async function filterCompacted(stream: AsyncIterable<MessageV2.WithParts>) {
    const result = [] as MessageV2.WithParts[]
    const completed = new Set<string>()
    for await (const msg of stream) {
      result.push(msg)
      if (
        msg.info.role === "user" &&
        completed.has(msg.info.id) &&
        msg.parts.some((part) => part.type === "compaction")
      )
        break
      if (msg.info.role === "assistant" && msg.info.summary && msg.info.finish) completed.add(msg.info.parentID)
    }
    result.reverse()
    return result
  }

  /** Retry policy for OpenAI providers: 404 is treated as retryable. */
  const isOpenAiErrorRetryable = (e: APICallError) => {
    const status = e.statusCode
    if (!status) return e.isRetryable
    // openai sometimes returns 404 for models that are actually available
    return status === 404 || e.isRetryable
  }

  /**
   * Normalizes an arbitrary thrown value into one of this module's named errors.
   *
   * @param e Value that was thrown.
   * @param ctx Provider context; `providerID` is attached to auth errors and
   *   selects the retry policy for API errors.
   * @returns The plain-object form of the matching named error. An `e` that
   *   already is an `OutputLengthError` is returned unchanged.
   */
  export function fromError(e: unknown, ctx: { providerID: string }) {
    switch (true) {
      case e instanceof DOMException && e.name === "AbortError":
        return new MessageV2.AbortedError(
          { message: e.message },
          {
            cause: e,
          },
        ).toObject()
      case MessageV2.OutputLengthError.isInstance(e):
        return e
      case LoadAPIKeyError.isInstance(e):
        return new MessageV2.AuthError(
          {
            providerID: ctx.providerID,
            message: e.message,
          },
          { cause: e },
        ).toObject()
      case (e as SystemError)?.code === "ECONNRESET":
        return new MessageV2.APIError(
          {
            message: "Connection reset by server",
            isRetryable: true,
            metadata: {
              code: (e as SystemError).code ?? "",
              syscall: (e as SystemError).syscall ?? "",
              message: (e as SystemError).message ?? "",
            },
          },
          { cause: e },
        ).toObject()
      case APICallError.isInstance(e): {
        const message = iife(() => {
          const msg = e.message
          if (msg === "") {
            // No message at all: fall back to the body, then the HTTP status text.
            if (e.responseBody) return e.responseBody
            if (e.statusCode) {
              const err = STATUS_CODES[e.statusCode]
              if (err) return err
            }
            return "Unknown error"
          }
          const transformed = ProviderTransform.error(ctx.providerID, e)
          if (transformed !== msg) {
            return transformed
          }
          // Only enrich the message when it is just the bare HTTP status text.
          if (!e.responseBody || (e.statusCode && msg !== STATUS_CODES[e.statusCode])) {
            return msg
          }

          try {
            const body = JSON.parse(e.responseBody)
            // try to extract common error message fields; the nested
            // `error.message` is checked before `error` so it stays reachable
            // when `error` is an object
            const errMsg = [body?.message, body?.error?.message, body?.error].find(
              (candidate) => typeof candidate === "string" && candidate !== "",
            )
            if (errMsg) {
              return `${msg}: ${errMsg}`
            }
          } catch {
            // Body is not JSON; fall through and append it verbatim.
          }

          return `${msg}: ${e.responseBody}`
        }).trim()

        const metadata = e.url ? { url: e.url } : undefined
        return new MessageV2.APIError(
          {
            message,
            statusCode: e.statusCode,
            isRetryable: ctx.providerID.startsWith("openai") ? isOpenAiErrorRetryable(e) : e.isRetryable,
            responseHeaders: e.responseHeaders,
            responseBody: e.responseBody,
            metadata,
          },
          { cause: e },
        ).toObject()
      }
      case e instanceof Error:
        return new NamedError.Unknown({ message: e.toString() }, { cause: e }).toObject()
      default:
        // `JSON.stringify` yields undefined for e.g. `undefined` or functions.
        return new NamedError.Unknown({ message: JSON.stringify(e) ?? String(e) }, { cause: e }).toObject()
    }
  }
}