// processor.ts (22 KB)
  1. import { Cause, Deferred, Effect, Layer, Context, Scope } from "effect"
  2. import * as Stream from "effect/Stream"
  3. import { Agent } from "@/agent/agent"
  4. import { Bus } from "@/bus"
  5. import { Config } from "@/config/config"
  6. import { Permission } from "@/permission"
  7. import { Plugin } from "@/plugin"
  8. import { Snapshot } from "@/snapshot"
  9. import { EffectLogger } from "@/effect/logger"
  10. import { Session } from "."
  11. import { LLM } from "./llm"
  12. import { MessageV2 } from "./message-v2"
  13. import { isOverflow } from "./overflow"
  14. import { PartID } from "./schema"
  15. import type { SessionID } from "./schema"
  16. import { SessionRetry } from "./retry"
  17. import { SessionStatus } from "./status"
  18. import { SessionSummary } from "./summary"
  19. import type { Provider } from "@/provider/provider"
  20. import { Question } from "@/question"
  21. import { errorMessage } from "@/util/error"
  22. import { isRecord } from "@/util/record"
  23. export namespace SessionProcessor {
  /**
   * Number of trailing parts of the assistant message that must all be the same tool
   * invocation (same tool name and JSON-equal input) before the `doom_loop` permission is requested.
   */
  24. const DOOM_LOOP_THRESHOLD = 3
  /** Module logger, tagged with the `session.processor` service name. */
  25. const log = EffectLogger.create({ service: "session.processor" })
  /**
   * Outcome of `Handle.process`: `"compact"` when compaction was flagged, `"stop"` when the
   * run was blocked or the assistant message carries an error, `"continue"` otherwise.
   */
  26. export type Result = "compact" | "stop" | "continue"
  /** Stream event type consumed by the processor; an alias of `LLM.Event`. */
  27. export type Event = LLM.Event
  /**
   * Per-message processor handle produced by `Interface.create`.
   */
  28. export interface Handle {
  /** The assistant message this handle operates on. */
  29. readonly message: MessageV2.Assistant
  /**
   * Applies `update` to the stored tool part of a tracked tool call and persists the result.
   * Resolves to the persisted part, or `undefined` when the call is not tracked.
   */
  30. readonly updateToolCall: (
  31. toolCallID: string,
  32. update: (part: MessageV2.ToolPart) => MessageV2.ToolPart,
  33. ) => Effect.Effect<MessageV2.ToolPart | undefined>
  /**
   * Marks a tracked tool call as completed with the given output.
   * Does nothing unless the stored part is currently in the `running` state.
   */
  34. readonly completeToolCall: (
  35. toolCallID: string,
  36. output: {
  37. title: string
  38. metadata: Record<string, any>
  39. output: string
  40. attachments?: MessageV2.FilePart[]
  41. },
  42. ) => Effect.Effect<void>
  /** Drains one LLM stream for this message and reports how the caller should proceed. */
  43. readonly process: (streamInput: LLM.StreamInput) => Effect.Effect<Result>
  44. }
  /** Arguments for `Interface.create`. */
  45. type Input = {
  /** Assistant message that the processor mutates and persists while streaming. */
  46. assistantMessage: MessageV2.Assistant
  /** Session the message belongs to; used for status updates and published errors. */
  47. sessionID: SessionID
  /** Model in use; supplies the provider id for error parsing and feeds usage/overflow checks. */
  48. model: Provider.Model
  49. }
  /** Service contract exposed by `SessionProcessor.Service`. */
  50. export interface Interface {
  /** Builds a `Handle` bound to one assistant message. */
  51. readonly create: (input: Input) => Effect.Effect<Handle>
  52. }
  /** Bookkeeping entry for a tool call that has been started but not yet settled. */
  53. type ToolCall = {
  /** Coordinates of the persisted tool part (part, message, session). */
  54. partID: MessageV2.ToolPart["id"]
  55. messageID: MessageV2.ToolPart["messageID"]
  56. sessionID: MessageV2.ToolPart["sessionID"]
  /** Completed when the call is settled; `cleanup` waits on it briefly before aborting leftovers. */
  57. done: Deferred.Deferred<void>
  58. }
  /** Mutable state shared by all closures of a single processor handle. */
  59. interface ProcessorContext extends Input {
  /** In-flight tool calls, keyed by tool call id. */
  60. toolcalls: Record<string, ToolCall>
  /** Set at the start of `process`: true unless `experimental.continue_loop_on_deny` is `true` in config. */
  61. shouldBreak: boolean
  /** Snapshot id captured for the current step; cleared once its patch has been recorded. */
  62. snapshot: string | undefined
  /** Assigned from `shouldBreak` when a tool fails with a permission/question rejection. */
  63. blocked: boolean
  /** Set on token overflow or a context-overflow error; makes `process` return `"compact"`. */
  64. needsCompaction: boolean
  /** Text part currently being streamed, if any. */
  65. currentText: MessageV2.TextPart | undefined
  /** Open reasoning parts, keyed by the stream's reasoning id. */
  66. reasoningMap: Record<string, MessageV2.ReasoningPart>
  67. }
  /** Local alias for the event type accepted by `handleEvent`. */
  68. type StreamEvent = Event
  /** Context tag for the processor service, registered under `@opencode/SessionProcessor`. */
  69. export class Service extends Context.Service<Service, Interface>()("@opencode/SessionProcessor") {}
  70. export const layer: Layer.Layer<
  71. Service,
  72. never,
  73. | Session.Service
  74. | Config.Service
  75. | Bus.Service
  76. | Snapshot.Service
  77. | Agent.Service
  78. | LLM.Service
  79. | Permission.Service
  80. | Plugin.Service
  81. | SessionSummary.Service
  82. | SessionStatus.Service
  83. > = Layer.effect(
  84. Service,
  85. Effect.gen(function* () {
  86. const session = yield* Session.Service
  87. const config = yield* Config.Service
  88. const bus = yield* Bus.Service
  89. const snapshot = yield* Snapshot.Service
  90. const agents = yield* Agent.Service
  91. const llm = yield* LLM.Service
  92. const permission = yield* Permission.Service
  93. const plugin = yield* Plugin.Service
  94. const summary = yield* SessionSummary.Service
  95. const scope = yield* Scope.Scope
  96. const status = yield* SessionStatus.Service
  97. const create = Effect.fn("SessionProcessor.create")(function* (input: Input) {
  98. // Pre-capture snapshot before the LLM stream starts. The AI SDK
  99. // may execute tools internally before emitting start-step events,
  100. // so capturing inside the event handler can be too late.
  101. const initialSnapshot = yield* snapshot.track()
  102. const ctx: ProcessorContext = {
  103. assistantMessage: input.assistantMessage,
  104. sessionID: input.sessionID,
  105. model: input.model,
  106. toolcalls: {},
  107. shouldBreak: false,
  108. snapshot: initialSnapshot,
  109. blocked: false,
  110. needsCompaction: false,
  111. currentText: undefined,
  112. reasoningMap: {},
  113. }
  // Flipped to true by the interrupt handler in `process`; `parse` reads it at call time.
  let aborted = false

  // Logger scoped to this session/message pair.
  const slog = log.with({
    sessionID: input.sessionID,
    messageID: input.assistantMessage.id,
  })

  /** Converts an arbitrary failure value into a `MessageV2` error for this provider. */
  const parse = (e: unknown) => {
    return MessageV2.fromError(e, { providerID: input.model.providerID, aborted })
  }
  /**
   * Stops tracking a tool call and completes its `done` deferred, if it was tracked.
   * Failures while completing the deferred are ignored.
   */
  const settleToolCall = Effect.fn("SessionProcessor.settleToolCall")(function* (toolCallID: string) {
    const tracked = ctx.toolcalls[toolCallID]
    delete ctx.toolcalls[toolCallID]
    if (!tracked) return
    yield* Deferred.succeed(tracked.done, undefined).pipe(Effect.ignore)
  })
  /**
   * Loads the persisted tool part for a tracked tool call.
   *
   * @returns `{ call, part }` when the call is tracked and its stored part is a tool part;
   * otherwise `undefined`. A tracked call whose part is missing or not a tool part is dropped
   * from the tracking table.
   */
  const readToolCall = Effect.fn("SessionProcessor.readToolCall")(function* (toolCallID: string) {
    const tracked = ctx.toolcalls[toolCallID]
    if (!tracked) return

    const { partID, messageID, sessionID } = tracked
    const stored = yield* session.getPart({ partID, messageID, sessionID })
    if (stored && stored.type === "tool") {
      return { call: tracked, part: stored }
    }

    // The stored part is gone or is not a tool part: forget the call.
    delete ctx.toolcalls[toolCallID]
    return
  })
  /**
   * Runs `update` over the stored tool part of a tracked call, persists the result, and
   * refreshes the tracked part coordinates from the persisted part.
   *
   * @returns The persisted part, or `undefined` when the call cannot be read.
   */
  const updateToolCall = Effect.fn("SessionProcessor.updateToolCall")(function* (
    toolCallID: string,
    update: (part: MessageV2.ToolPart) => MessageV2.ToolPart,
  ) {
    const found = yield* readToolCall(toolCallID)
    if (!found) return

    const next = update(found.part)
    const saved = yield* session.updatePart(next)

    const { id: partID, messageID, sessionID } = saved
    ctx.toolcalls[toolCallID] = { ...found.call, partID, messageID, sessionID }
    return saved
  })
  /**
   * Moves a running tool call to the `completed` state with the supplied output and settles it.
   * Does nothing when the call cannot be read or its part is not in the `running` state.
   */
  const completeToolCall = Effect.fn("SessionProcessor.completeToolCall")(function* (
    toolCallID: string,
    output: {
      title: string
      metadata: Record<string, any>
      output: string
      attachments?: MessageV2.FilePart[]
    },
  ) {
    const found = yield* readToolCall(toolCallID)
    if (!found) return
    if (found.part.state.status !== "running") return

    const running = found.part.state
    yield* session.updatePart({
      ...found.part,
      state: {
        status: "completed",
        input: running.input,
        output: output.output,
        metadata: output.metadata,
        title: output.title,
        // Keep the original start time; stamp the end now.
        time: { start: running.time.start, end: Date.now() },
        attachments: output.attachments,
      },
    })

    yield* settleToolCall(toolCallID)
  })
  /**
   * Moves a running tool call to the `error` state and settles it.
   *
   * @returns `true` when the part was updated, `false` when the call could not be read or
   * was not in the `running` state.
   */
  const failToolCall = Effect.fn("SessionProcessor.failToolCall")(function* (toolCallID: string, error: unknown) {
    const found = yield* readToolCall(toolCallID)
    if (!found) return false
    if (found.part.state.status !== "running") return false

    const running = found.part.state
    yield* session.updatePart({
      ...found.part,
      state: {
        status: "error",
        input: running.input,
        error: errorMessage(error),
        time: { start: running.time.start, end: Date.now() },
      },
    })

    // A permission/question rejection blocks the run only when the loop is configured to break on deny.
    const rejected = error instanceof Permission.RejectedError || error instanceof Question.RejectedError
    if (rejected) {
      ctx.blocked = ctx.shouldBreak
    }

    yield* settleToolCall(toolCallID)
    return true
  })
  /**
   * Applies one LLM stream event to the processor state and persists the resulting
   * message/part changes through the session service.
   *
   * Throws when a tool call arrives while the assistant message is a summary, and rethrows
   * the payload of an `error` event so the surrounding retry/halt handling in `process` sees it.
   *
   * @param value - The stream event to handle.
   */
  const handleEvent = Effect.fn("SessionProcessor.handleEvent")(function* (value: StreamEvent) {
    switch (value.type) {
      case "start":
        yield* status.set(ctx.sessionID, { type: "busy" })
        return

      case "reasoning-start": {
        // A repeated start for an id that is already open is ignored.
        if (ctx.reasoningMap[value.id]) return
        const reasoning: MessageV2.ReasoningPart = {
          id: PartID.ascending(),
          messageID: ctx.assistantMessage.id,
          sessionID: ctx.assistantMessage.sessionID,
          type: "reasoning",
          text: "",
          time: { start: Date.now() },
          metadata: value.providerMetadata,
        }
        ctx.reasoningMap[value.id] = reasoning
        yield* session.updatePart(reasoning)
        return
      }

      case "reasoning-delta": {
        const reasoning = ctx.reasoningMap[value.id]
        if (!reasoning) return
        reasoning.text += value.text
        if (value.providerMetadata) reasoning.metadata = value.providerMetadata
        yield* session.updatePartDelta({
          sessionID: reasoning.sessionID,
          messageID: reasoning.messageID,
          partID: reasoning.id,
          field: "text",
          delta: value.text,
        })
        return
      }

      case "reasoning-end": {
        const reasoning = ctx.reasoningMap[value.id]
        if (!reasoning) return
        reasoning.time = { ...reasoning.time, end: Date.now() }
        if (value.providerMetadata) reasoning.metadata = value.providerMetadata
        yield* session.updatePart(reasoning)
        delete ctx.reasoningMap[value.id]
        return
      }

      case "tool-input-start": {
        if (ctx.assistantMessage.summary) {
          throw new Error(`Tool call not allowed while generating summary: ${value.toolName}`)
        }
        // Reuse the part id when this call id is already tracked, so a repeated start overwrites in place.
        const part = yield* session.updatePart({
          id: ctx.toolcalls[value.id]?.partID ?? PartID.ascending(),
          messageID: ctx.assistantMessage.id,
          sessionID: ctx.assistantMessage.sessionID,
          type: "tool",
          tool: value.toolName,
          callID: value.id,
          state: { status: "pending", input: {}, raw: "" },
          metadata: value.providerExecuted ? { providerExecuted: true } : undefined,
        } satisfies MessageV2.ToolPart)
        ctx.toolcalls[value.id] = {
          done: yield* Deferred.make<void>(),
          partID: part.id,
          messageID: part.messageID,
          sessionID: part.sessionID,
        }
        return
      }

      case "tool-input-delta":
        return

      case "tool-input-end":
        return

      case "tool-call": {
        if (ctx.assistantMessage.summary) {
          throw new Error(`Tool call not allowed while generating summary: ${value.toolName}`)
        }
        yield* updateToolCall(value.toolCallId, (match) => ({
          ...match,
          tool: value.toolName,
          state: {
            ...match.state,
            status: "running",
            input: value.input,
            time: { start: Date.now() },
          },
          metadata: match.metadata?.providerExecuted
            ? { ...value.providerMetadata, providerExecuted: true }
            : value.providerMetadata,
        }))

        // Doom-loop guard: ask for permission when the last DOOM_LOOP_THRESHOLD parts are all
        // non-pending invocations of this same tool with JSON-equal input.
        const parts = MessageV2.parts(ctx.assistantMessage.id)
        const recentParts = parts.slice(-DOOM_LOOP_THRESHOLD)
        if (
          recentParts.length !== DOOM_LOOP_THRESHOLD ||
          !recentParts.every(
            (part) =>
              part.type === "tool" &&
              part.tool === value.toolName &&
              part.state.status !== "pending" &&
              JSON.stringify(part.state.input) === JSON.stringify(value.input),
          )
        ) {
          return
        }

        const agent = yield* agents.get(ctx.assistantMessage.agent)
        yield* permission.ask({
          permission: "doom_loop",
          patterns: [value.toolName],
          sessionID: ctx.assistantMessage.sessionID,
          metadata: { tool: value.toolName, input: value.input },
          always: [value.toolName],
          ruleset: agent.permission,
        })
        return
      }

      case "tool-result": {
        yield* completeToolCall(value.toolCallId, value.output)
        return
      }

      case "tool-error": {
        yield* failToolCall(value.toolCallId, value.error)
        return
      }

      case "error":
        throw value.error

      case "start-step":
        // Usually already captured in `create`; re-track only after a previous step cleared it.
        if (!ctx.snapshot) ctx.snapshot = yield* snapshot.track()
        yield* session.updatePart({
          id: PartID.ascending(),
          messageID: ctx.assistantMessage.id,
          sessionID: ctx.sessionID,
          snapshot: ctx.snapshot,
          type: "step-start",
        })
        return

      case "finish-step": {
        const usage = Session.getUsage({
          model: ctx.model,
          usage: value.usage,
          metadata: value.providerMetadata,
        })
        ctx.assistantMessage.finish = value.finishReason
        ctx.assistantMessage.cost += usage.cost
        ctx.assistantMessage.tokens = usage.tokens
        yield* session.updatePart({
          id: PartID.ascending(),
          reason: value.finishReason,
          snapshot: yield* snapshot.track(),
          messageID: ctx.assistantMessage.id,
          sessionID: ctx.assistantMessage.sessionID,
          type: "step-finish",
          tokens: usage.tokens,
          cost: usage.cost,
        })
        yield* session.updateMessage(ctx.assistantMessage)

        // Record the file changes made since the step's snapshot, then clear the snapshot.
        if (ctx.snapshot) {
          const patch = yield* snapshot.patch(ctx.snapshot)
          if (patch.files.length) {
            yield* session.updatePart({
              id: PartID.ascending(),
              messageID: ctx.assistantMessage.id,
              sessionID: ctx.sessionID,
              type: "patch",
              hash: patch.hash,
              files: patch.files,
            })
          }
          ctx.snapshot = undefined
        }

        // Summarisation is forked into the layer scope and its failures are ignored.
        yield* summary
          .summarize({
            sessionID: ctx.sessionID,
            messageID: ctx.assistantMessage.parentID,
          })
          .pipe(Effect.ignore, Effect.forkIn(scope))

        if (
          !ctx.assistantMessage.summary &&
          isOverflow({ cfg: yield* config.get(), tokens: usage.tokens, model: ctx.model })
        ) {
          ctx.needsCompaction = true
        }
        return
      }

      case "text-start": {
        const text: MessageV2.TextPart = {
          id: PartID.ascending(),
          messageID: ctx.assistantMessage.id,
          sessionID: ctx.assistantMessage.sessionID,
          type: "text",
          text: "",
          time: { start: Date.now() },
          metadata: value.providerMetadata,
        }
        ctx.currentText = text
        yield* session.updatePart(text)
        return
      }

      case "text-delta": {
        const text = ctx.currentText
        if (!text) return
        text.text += value.text
        if (value.providerMetadata) text.metadata = value.providerMetadata
        yield* session.updatePartDelta({
          sessionID: text.sessionID,
          messageID: text.messageID,
          partID: text.id,
          field: "text",
          delta: value.text,
        })
        return
      }

      case "text-end": {
        const text = ctx.currentText
        if (!text) return
        // Plugins may rewrite the completed text before it is persisted.
        text.text = (yield* plugin.trigger(
          "experimental.text.complete",
          {
            sessionID: ctx.sessionID,
            messageID: ctx.assistantMessage.id,
            partID: text.id,
          },
          { text: text.text },
        )).text
        const end = Date.now()
        text.time = { start: text.time?.start ?? end, end }
        if (value.providerMetadata) text.metadata = value.providerMetadata
        yield* session.updatePart(text)
        ctx.currentText = undefined
        return
      }

      case "finish":
        return

      default:
        yield* slog.info("unhandled", { event: value.type, value })
        return
    }
  })
  /**
   * Finaliser for `process`. In order, it:
   * 1. records a patch part for any outstanding snapshot,
   * 2. closes an open text part and all open reasoning parts,
   * 3. waits up to 250 ms per tracked tool call for it to settle,
   * 4. marks tool calls that are still tracked as aborted errors,
   * 5. stamps the assistant message as completed and persists it.
   */
  const cleanup = Effect.fn("SessionProcessor.cleanup")(function* () {
    const pendingSnapshot = ctx.snapshot
    if (pendingSnapshot) {
      const { hash, files } = yield* snapshot.patch(pendingSnapshot)
      if (files.length) {
        yield* session.updatePart({
          id: PartID.ascending(),
          messageID: ctx.assistantMessage.id,
          sessionID: ctx.sessionID,
          type: "patch",
          hash,
          files,
        })
      }
      ctx.snapshot = undefined
    }

    const openText = ctx.currentText
    if (openText) {
      const now = Date.now()
      openText.time = { start: openText.time?.start ?? now, end: now }
      yield* session.updatePart(openText)
      ctx.currentText = undefined
    }

    for (const reasoning of Object.values(ctx.reasoningMap)) {
      const now = Date.now()
      yield* session.updatePart({
        ...reasoning,
        time: { start: reasoning.time.start ?? now, end: now },
      })
    }
    ctx.reasoningMap = {}

    // Give in-flight tool calls a short grace period to settle on their own.
    const awaitSettled = (call: ToolCall) =>
      Deferred.await(call.done).pipe(Effect.timeout("250 millis"), Effect.ignore)
    yield* Effect.forEach(Object.values(ctx.toolcalls), awaitSettled, { concurrency: "unbounded" })

    // Anything still tracked at this point is recorded as an interrupted tool call.
    for (const toolCallID of Object.keys(ctx.toolcalls)) {
      const found = yield* readToolCall(toolCallID)
      if (!found) continue

      const { part } = found
      const now = Date.now()
      const priorMetadata = "metadata" in part.state && isRecord(part.state.metadata) ? part.state.metadata : {}
      yield* session.updatePart({
        ...part,
        state: {
          ...part.state,
          status: "error",
          error: "Tool execution aborted",
          metadata: { ...priorMetadata, interrupted: true },
          time: { start: "time" in part.state ? part.state.time.start : now, end: now },
        },
      })
    }
    ctx.toolcalls = {}

    ctx.assistantMessage.time.completed = Date.now()
    yield* session.updateMessage(ctx.assistantMessage)
  })
  /**
   * Terminal error handler for `process`.
   *
   * Logs the failure and parses it via `parse`. A context-overflow error only flags
   * compaction and publishes the error event. Any other error is stored on the assistant
   * message, published, and the session status is set to idle.
   */
  const halt = Effect.fn("SessionProcessor.halt")(function* (e: unknown) {
    yield* slog.error("process", {
      error: errorMessage(e),
      stack: e instanceof Error ? e.stack : undefined,
    })

    const error = parse(e)

    if (!MessageV2.ContextOverflowError.isInstance(error)) {
      ctx.assistantMessage.error = error
      yield* bus.publish(Session.Event.Error, {
        sessionID: ctx.assistantMessage.sessionID,
        error: ctx.assistantMessage.error,
      })
      yield* status.set(ctx.sessionID, { type: "idle" })
      return
    }

    // Overflow is recoverable: request compaction instead of failing the message.
    ctx.needsCompaction = true
    yield* bus.publish(Session.Event.Error, { sessionID: ctx.sessionID, error })
  })
  /**
   * Drains one LLM stream for the bound assistant message, feeding each event to `handleEvent`.
   *
   * @param streamInput - Input forwarded unchanged to `llm.stream`.
   * @returns `"compact"` when compaction was flagged, `"stop"` when the run was blocked or the
   * message carries an error, `"continue"` otherwise.
   */
  492. const process = Effect.fn("SessionProcessor.process")(function* (streamInput: LLM.StreamInput) {
  493. yield* slog.info("process")
  // Reset per-run flags. `shouldBreak` is true unless config sets experimental.continue_loop_on_deny to true.
  494. ctx.needsCompaction = false
  495. ctx.shouldBreak = (yield* config.get()).experimental?.continue_loop_on_deny !== true
  496. return yield* Effect.gen(function* () {
  497. yield* Effect.gen(function* () {
  // Streaming state is reset inside the inner effect, so it is cleared again on each retry attempt.
  498. ctx.currentText = undefined
  499. ctx.reasoningMap = {}
  500. const stream = llm.stream(streamInput)
  501. yield* stream.pipe(
  502. Stream.tap((event) => handleEvent(event)),
  // Stop consuming once an event has flagged compaction.
  503. Stream.takeUntil(() => ctx.needsCompaction),
  504. Stream.runDrain,
  505. )
  506. }).pipe(
  // On interruption: mark the run as aborted and, unless an error is already recorded, halt with an AbortError.
  507. Effect.onInterrupt(() =>
  508. Effect.gen(function* () {
  509. aborted = true
  510. if (!ctx.assistantMessage.error) {
  511. yield* halt(new DOMException("Aborted", "AbortError"))
  512. }
  513. }),
  514. ),
  // Causes that are not interrupt-only are squashed and re-raised as a plain failure for the steps below.
  515. Effect.catchCauseIf(
  516. (cause) => !Cause.hasInterruptsOnly(cause),
  517. (cause) => Effect.fail(Cause.squash(cause)),
  518. ),
  // Retry according to SessionRetry.policy; `set` reports each attempt as a "retry" session status.
  519. Effect.retry(
  520. SessionRetry.policy({
  521. parse,
  522. set: (info) =>
  523. status.set(ctx.sessionID, {
  524. type: "retry",
  525. attempt: info.attempt,
  526. message: info.message,
  527. next: info.next,
  528. }),
  529. }),
  530. ),
  // Failures that remain after retrying are handled by `halt`; `cleanup` runs as a finaliser.
  531. Effect.catch(halt),
  532. Effect.ensuring(cleanup()),
  533. )
  // Map the final state to a result; compaction takes precedence over stop.
  534. if (ctx.needsCompaction) return "compact"
  535. if (ctx.blocked || ctx.assistantMessage.error) return "stop"
  536. return "continue"
  537. })
  538. })
  539. return {
  540. get message() {
  541. return ctx.assistantMessage
  542. },
  543. updateToolCall,
  544. completeToolCall,
  545. process,
  546. } satisfies Handle
  547. })
  548. return Service.of({ create })
  549. }),
  550. )
  /**
   * `layer` with its service dependencies provided from each module's default layer
   * (`Bus.layer` for the bus). Wrapped in `Layer.suspend`, so the composition is built lazily.
   */
  551. export const defaultLayer = Layer.suspend(() =>
  552. layer.pipe(
  553. Layer.provide(Session.defaultLayer),
  554. Layer.provide(Snapshot.defaultLayer),
  555. Layer.provide(Agent.defaultLayer),
  556. Layer.provide(LLM.defaultLayer),
  557. Layer.provide(Permission.defaultLayer),
  558. Layer.provide(Plugin.defaultLayer),
  559. Layer.provide(SessionSummary.defaultLayer),
  560. Layer.provide(SessionStatus.defaultLayer),
  561. Layer.provide(Bus.layer),
  562. Layer.provide(Config.defaultLayer),
  563. ),
  564. )
  565. }