| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617 |
- import { Cause, Deferred, Effect, Layer, Context, Scope } from "effect"
- import * as Stream from "effect/Stream"
- import { Agent } from "@/agent/agent"
- import { Bus } from "@/bus"
- import { Config } from "@/config/config"
- import { Permission } from "@/permission"
- import { Plugin } from "@/plugin"
- import { Snapshot } from "@/snapshot"
- import { EffectLogger } from "@/effect/logger"
- import { Session } from "."
- import { LLM } from "./llm"
- import { MessageV2 } from "./message-v2"
- import { isOverflow } from "./overflow"
- import { PartID } from "./schema"
- import type { SessionID } from "./schema"
- import { SessionRetry } from "./retry"
- import { SessionStatus } from "./status"
- import { SessionSummary } from "./summary"
- import type { Provider } from "@/provider/provider"
- import { Question } from "@/question"
- import { errorMessage } from "@/util/error"
- import { isRecord } from "@/util/record"
- export namespace SessionProcessor {
- const DOOM_LOOP_THRESHOLD = 3
- const log = EffectLogger.create({ service: "session.processor" })
- export type Result = "compact" | "stop" | "continue"
- export type Event = LLM.Event
- export interface Handle {
- readonly message: MessageV2.Assistant
- readonly updateToolCall: (
- toolCallID: string,
- update: (part: MessageV2.ToolPart) => MessageV2.ToolPart,
- ) => Effect.Effect<MessageV2.ToolPart | undefined>
- readonly completeToolCall: (
- toolCallID: string,
- output: {
- title: string
- metadata: Record<string, any>
- output: string
- attachments?: MessageV2.FilePart[]
- },
- ) => Effect.Effect<void>
- readonly process: (streamInput: LLM.StreamInput) => Effect.Effect<Result>
- }
- type Input = {
- assistantMessage: MessageV2.Assistant
- sessionID: SessionID
- model: Provider.Model
- }
- export interface Interface {
- readonly create: (input: Input) => Effect.Effect<Handle>
- }
- type ToolCall = {
- partID: MessageV2.ToolPart["id"]
- messageID: MessageV2.ToolPart["messageID"]
- sessionID: MessageV2.ToolPart["sessionID"]
- done: Deferred.Deferred<void>
- }
- interface ProcessorContext extends Input {
- toolcalls: Record<string, ToolCall>
- shouldBreak: boolean
- snapshot: string | undefined
- blocked: boolean
- needsCompaction: boolean
- currentText: MessageV2.TextPart | undefined
- reasoningMap: Record<string, MessageV2.ReasoningPart>
- }
- type StreamEvent = Event
- export class Service extends Context.Service<Service, Interface>()("@opencode/SessionProcessor") {}
- export const layer: Layer.Layer<
- Service,
- never,
- | Session.Service
- | Config.Service
- | Bus.Service
- | Snapshot.Service
- | Agent.Service
- | LLM.Service
- | Permission.Service
- | Plugin.Service
- | SessionSummary.Service
- | SessionStatus.Service
- > = Layer.effect(
- Service,
- Effect.gen(function* () {
- const session = yield* Session.Service
- const config = yield* Config.Service
- const bus = yield* Bus.Service
- const snapshot = yield* Snapshot.Service
- const agents = yield* Agent.Service
- const llm = yield* LLM.Service
- const permission = yield* Permission.Service
- const plugin = yield* Plugin.Service
- const summary = yield* SessionSummary.Service
- const scope = yield* Scope.Scope
- const status = yield* SessionStatus.Service
- const create = Effect.fn("SessionProcessor.create")(function* (input: Input) {
- // Pre-capture snapshot before the LLM stream starts. The AI SDK
- // may execute tools internally before emitting start-step events,
- // so capturing inside the event handler can be too late.
- const initialSnapshot = yield* snapshot.track()
- const ctx: ProcessorContext = {
- assistantMessage: input.assistantMessage,
- sessionID: input.sessionID,
- model: input.model,
- toolcalls: {},
- shouldBreak: false,
- snapshot: initialSnapshot,
- blocked: false,
- needsCompaction: false,
- currentText: undefined,
- reasoningMap: {},
- }
- let aborted = false
- const slog = log.with({ sessionID: input.sessionID, messageID: input.assistantMessage.id })
- const parse = (e: unknown) =>
- MessageV2.fromError(e, {
- providerID: input.model.providerID,
- aborted,
- })
- const settleToolCall = Effect.fn("SessionProcessor.settleToolCall")(function* (toolCallID: string) {
- const done = ctx.toolcalls[toolCallID]?.done
- delete ctx.toolcalls[toolCallID]
- if (done) yield* Deferred.succeed(done, undefined).pipe(Effect.ignore)
- })
- const readToolCall = Effect.fn("SessionProcessor.readToolCall")(function* (toolCallID: string) {
- const call = ctx.toolcalls[toolCallID]
- if (!call) return
- const part = yield* session.getPart({
- partID: call.partID,
- messageID: call.messageID,
- sessionID: call.sessionID,
- })
- if (!part || part.type !== "tool") {
- delete ctx.toolcalls[toolCallID]
- return
- }
- return { call, part }
- })
- const updateToolCall = Effect.fn("SessionProcessor.updateToolCall")(function* (
- toolCallID: string,
- update: (part: MessageV2.ToolPart) => MessageV2.ToolPart,
- ) {
- const match = yield* readToolCall(toolCallID)
- if (!match) return
- const part = yield* session.updatePart(update(match.part))
- ctx.toolcalls[toolCallID] = {
- ...match.call,
- partID: part.id,
- messageID: part.messageID,
- sessionID: part.sessionID,
- }
- return part
- })
- const completeToolCall = Effect.fn("SessionProcessor.completeToolCall")(function* (
- toolCallID: string,
- output: {
- title: string
- metadata: Record<string, any>
- output: string
- attachments?: MessageV2.FilePart[]
- },
- ) {
- const match = yield* readToolCall(toolCallID)
- if (!match || match.part.state.status !== "running") return
- yield* session.updatePart({
- ...match.part,
- state: {
- status: "completed",
- input: match.part.state.input,
- output: output.output,
- metadata: output.metadata,
- title: output.title,
- time: { start: match.part.state.time.start, end: Date.now() },
- attachments: output.attachments,
- },
- })
- yield* settleToolCall(toolCallID)
- })
- const failToolCall = Effect.fn("SessionProcessor.failToolCall")(function* (toolCallID: string, error: unknown) {
- const match = yield* readToolCall(toolCallID)
- if (!match || match.part.state.status !== "running") return false
- yield* session.updatePart({
- ...match.part,
- state: {
- status: "error",
- input: match.part.state.input,
- error: errorMessage(error),
- time: { start: match.part.state.time.start, end: Date.now() },
- },
- })
- if (error instanceof Permission.RejectedError || error instanceof Question.RejectedError) {
- ctx.blocked = ctx.shouldBreak
- }
- yield* settleToolCall(toolCallID)
- return true
- })
- const handleEvent = Effect.fn("SessionProcessor.handleEvent")(function* (value: StreamEvent) {
- switch (value.type) {
- case "start":
- yield* status.set(ctx.sessionID, { type: "busy" })
- return
- case "reasoning-start":
- if (value.id in ctx.reasoningMap) return
- ctx.reasoningMap[value.id] = {
- id: PartID.ascending(),
- messageID: ctx.assistantMessage.id,
- sessionID: ctx.assistantMessage.sessionID,
- type: "reasoning",
- text: "",
- time: { start: Date.now() },
- metadata: value.providerMetadata,
- }
- yield* session.updatePart(ctx.reasoningMap[value.id])
- return
- case "reasoning-delta":
- if (!(value.id in ctx.reasoningMap)) return
- ctx.reasoningMap[value.id].text += value.text
- if (value.providerMetadata) ctx.reasoningMap[value.id].metadata = value.providerMetadata
- yield* session.updatePartDelta({
- sessionID: ctx.reasoningMap[value.id].sessionID,
- messageID: ctx.reasoningMap[value.id].messageID,
- partID: ctx.reasoningMap[value.id].id,
- field: "text",
- delta: value.text,
- })
- return
- case "reasoning-end":
- if (!(value.id in ctx.reasoningMap)) return
- ctx.reasoningMap[value.id].text = ctx.reasoningMap[value.id].text
- ctx.reasoningMap[value.id].time = { ...ctx.reasoningMap[value.id].time, end: Date.now() }
- if (value.providerMetadata) ctx.reasoningMap[value.id].metadata = value.providerMetadata
- yield* session.updatePart(ctx.reasoningMap[value.id])
- delete ctx.reasoningMap[value.id]
- return
- case "tool-input-start":
- if (ctx.assistantMessage.summary) {
- throw new Error(`Tool call not allowed while generating summary: ${value.toolName}`)
- }
- const part = yield* session.updatePart({
- id: ctx.toolcalls[value.id]?.partID ?? PartID.ascending(),
- messageID: ctx.assistantMessage.id,
- sessionID: ctx.assistantMessage.sessionID,
- type: "tool",
- tool: value.toolName,
- callID: value.id,
- state: { status: "pending", input: {}, raw: "" },
- metadata: value.providerExecuted ? { providerExecuted: true } : undefined,
- } satisfies MessageV2.ToolPart)
- ctx.toolcalls[value.id] = {
- done: yield* Deferred.make<void>(),
- partID: part.id,
- messageID: part.messageID,
- sessionID: part.sessionID,
- }
- return
- case "tool-input-delta":
- return
- case "tool-input-end":
- return
- case "tool-call": {
- if (ctx.assistantMessage.summary) {
- throw new Error(`Tool call not allowed while generating summary: ${value.toolName}`)
- }
- yield* updateToolCall(value.toolCallId, (match) => ({
- ...match,
- tool: value.toolName,
- state: {
- ...match.state,
- status: "running",
- input: value.input,
- time: { start: Date.now() },
- },
- metadata: match.metadata?.providerExecuted
- ? { ...value.providerMetadata, providerExecuted: true }
- : value.providerMetadata,
- }))
- const parts = MessageV2.parts(ctx.assistantMessage.id)
- const recentParts = parts.slice(-DOOM_LOOP_THRESHOLD)
- if (
- recentParts.length !== DOOM_LOOP_THRESHOLD ||
- !recentParts.every(
- (part) =>
- part.type === "tool" &&
- part.tool === value.toolName &&
- part.state.status !== "pending" &&
- JSON.stringify(part.state.input) === JSON.stringify(value.input),
- )
- ) {
- return
- }
- const agent = yield* agents.get(ctx.assistantMessage.agent)
- yield* permission.ask({
- permission: "doom_loop",
- patterns: [value.toolName],
- sessionID: ctx.assistantMessage.sessionID,
- metadata: { tool: value.toolName, input: value.input },
- always: [value.toolName],
- ruleset: agent.permission,
- })
- return
- }
- case "tool-result": {
- yield* completeToolCall(value.toolCallId, value.output)
- return
- }
- case "tool-error": {
- yield* failToolCall(value.toolCallId, value.error)
- return
- }
- case "error":
- throw value.error
- case "start-step":
- if (!ctx.snapshot) ctx.snapshot = yield* snapshot.track()
- yield* session.updatePart({
- id: PartID.ascending(),
- messageID: ctx.assistantMessage.id,
- sessionID: ctx.sessionID,
- snapshot: ctx.snapshot,
- type: "step-start",
- })
- return
- case "finish-step": {
- const usage = Session.getUsage({
- model: ctx.model,
- usage: value.usage,
- metadata: value.providerMetadata,
- })
- ctx.assistantMessage.finish = value.finishReason
- ctx.assistantMessage.cost += usage.cost
- ctx.assistantMessage.tokens = usage.tokens
- yield* session.updatePart({
- id: PartID.ascending(),
- reason: value.finishReason,
- snapshot: yield* snapshot.track(),
- messageID: ctx.assistantMessage.id,
- sessionID: ctx.assistantMessage.sessionID,
- type: "step-finish",
- tokens: usage.tokens,
- cost: usage.cost,
- })
- yield* session.updateMessage(ctx.assistantMessage)
- if (ctx.snapshot) {
- const patch = yield* snapshot.patch(ctx.snapshot)
- if (patch.files.length) {
- yield* session.updatePart({
- id: PartID.ascending(),
- messageID: ctx.assistantMessage.id,
- sessionID: ctx.sessionID,
- type: "patch",
- hash: patch.hash,
- files: patch.files,
- })
- }
- ctx.snapshot = undefined
- }
- yield* summary
- .summarize({
- sessionID: ctx.sessionID,
- messageID: ctx.assistantMessage.parentID,
- })
- .pipe(Effect.ignore, Effect.forkIn(scope))
- if (
- !ctx.assistantMessage.summary &&
- isOverflow({ cfg: yield* config.get(), tokens: usage.tokens, model: ctx.model })
- ) {
- ctx.needsCompaction = true
- }
- return
- }
- case "text-start":
- ctx.currentText = {
- id: PartID.ascending(),
- messageID: ctx.assistantMessage.id,
- sessionID: ctx.assistantMessage.sessionID,
- type: "text",
- text: "",
- time: { start: Date.now() },
- metadata: value.providerMetadata,
- }
- yield* session.updatePart(ctx.currentText)
- return
- case "text-delta":
- if (!ctx.currentText) return
- ctx.currentText.text += value.text
- if (value.providerMetadata) ctx.currentText.metadata = value.providerMetadata
- yield* session.updatePartDelta({
- sessionID: ctx.currentText.sessionID,
- messageID: ctx.currentText.messageID,
- partID: ctx.currentText.id,
- field: "text",
- delta: value.text,
- })
- return
- case "text-end":
- if (!ctx.currentText) return
- ctx.currentText.text = ctx.currentText.text
- ctx.currentText.text = (yield* plugin.trigger(
- "experimental.text.complete",
- {
- sessionID: ctx.sessionID,
- messageID: ctx.assistantMessage.id,
- partID: ctx.currentText.id,
- },
- { text: ctx.currentText.text },
- )).text
- {
- const end = Date.now()
- ctx.currentText.time = { start: ctx.currentText.time?.start ?? end, end }
- }
- if (value.providerMetadata) ctx.currentText.metadata = value.providerMetadata
- yield* session.updatePart(ctx.currentText)
- ctx.currentText = undefined
- return
- case "finish":
- return
- default:
- yield* slog.info("unhandled", { event: value.type, value })
- return
- }
- })
- const cleanup = Effect.fn("SessionProcessor.cleanup")(function* () {
- if (ctx.snapshot) {
- const patch = yield* snapshot.patch(ctx.snapshot)
- if (patch.files.length) {
- yield* session.updatePart({
- id: PartID.ascending(),
- messageID: ctx.assistantMessage.id,
- sessionID: ctx.sessionID,
- type: "patch",
- hash: patch.hash,
- files: patch.files,
- })
- }
- ctx.snapshot = undefined
- }
- if (ctx.currentText) {
- const end = Date.now()
- ctx.currentText.time = { start: ctx.currentText.time?.start ?? end, end }
- yield* session.updatePart(ctx.currentText)
- ctx.currentText = undefined
- }
- for (const part of Object.values(ctx.reasoningMap)) {
- const end = Date.now()
- yield* session.updatePart({
- ...part,
- time: { start: part.time.start ?? end, end },
- })
- }
- ctx.reasoningMap = {}
- yield* Effect.forEach(
- Object.values(ctx.toolcalls),
- (call) => Deferred.await(call.done).pipe(Effect.timeout("250 millis"), Effect.ignore),
- { concurrency: "unbounded" },
- )
- for (const toolCallID of Object.keys(ctx.toolcalls)) {
- const match = yield* readToolCall(toolCallID)
- if (!match) continue
- const part = match.part
- const end = Date.now()
- const metadata = "metadata" in part.state && isRecord(part.state.metadata) ? part.state.metadata : {}
- yield* session.updatePart({
- ...part,
- state: {
- ...part.state,
- status: "error",
- error: "Tool execution aborted",
- metadata: { ...metadata, interrupted: true },
- time: { start: "time" in part.state ? part.state.time.start : end, end },
- },
- })
- }
- ctx.toolcalls = {}
- ctx.assistantMessage.time.completed = Date.now()
- yield* session.updateMessage(ctx.assistantMessage)
- })
- const halt = Effect.fn("SessionProcessor.halt")(function* (e: unknown) {
- yield* slog.error("process", { error: errorMessage(e), stack: e instanceof Error ? e.stack : undefined })
- const error = parse(e)
- if (MessageV2.ContextOverflowError.isInstance(error)) {
- ctx.needsCompaction = true
- yield* bus.publish(Session.Event.Error, { sessionID: ctx.sessionID, error })
- return
- }
- ctx.assistantMessage.error = error
- yield* bus.publish(Session.Event.Error, {
- sessionID: ctx.assistantMessage.sessionID,
- error: ctx.assistantMessage.error,
- })
- yield* status.set(ctx.sessionID, { type: "idle" })
- })
- const process = Effect.fn("SessionProcessor.process")(function* (streamInput: LLM.StreamInput) {
- yield* slog.info("process")
- ctx.needsCompaction = false
- ctx.shouldBreak = (yield* config.get()).experimental?.continue_loop_on_deny !== true
- return yield* Effect.gen(function* () {
- yield* Effect.gen(function* () {
- ctx.currentText = undefined
- ctx.reasoningMap = {}
- const stream = llm.stream(streamInput)
- yield* stream.pipe(
- Stream.tap((event) => handleEvent(event)),
- Stream.takeUntil(() => ctx.needsCompaction),
- Stream.runDrain,
- )
- }).pipe(
- Effect.onInterrupt(() =>
- Effect.gen(function* () {
- aborted = true
- if (!ctx.assistantMessage.error) {
- yield* halt(new DOMException("Aborted", "AbortError"))
- }
- }),
- ),
- Effect.catchCauseIf(
- (cause) => !Cause.hasInterruptsOnly(cause),
- (cause) => Effect.fail(Cause.squash(cause)),
- ),
- Effect.retry(
- SessionRetry.policy({
- parse,
- set: (info) =>
- status.set(ctx.sessionID, {
- type: "retry",
- attempt: info.attempt,
- message: info.message,
- next: info.next,
- }),
- }),
- ),
- Effect.catch(halt),
- Effect.ensuring(cleanup()),
- )
- if (ctx.needsCompaction) return "compact"
- if (ctx.blocked || ctx.assistantMessage.error) return "stop"
- return "continue"
- })
- })
- return {
- get message() {
- return ctx.assistantMessage
- },
- updateToolCall,
- completeToolCall,
- process,
- } satisfies Handle
- })
- return Service.of({ create })
- }),
- )
- export const defaultLayer = Layer.suspend(() =>
- layer.pipe(
- Layer.provide(Session.defaultLayer),
- Layer.provide(Snapshot.defaultLayer),
- Layer.provide(Agent.defaultLayer),
- Layer.provide(LLM.defaultLayer),
- Layer.provide(Permission.defaultLayer),
- Layer.provide(Plugin.defaultLayer),
- Layer.provide(SessionSummary.defaultLayer),
- Layer.provide(SessionStatus.defaultLayer),
- Layer.provide(Bus.layer),
- Layer.provide(Config.defaultLayer),
- ),
- )
- }
|