// compaction.ts (8.1 KB)

  1. import { BusEvent } from "@/bus/bus-event"
  2. import { Bus } from "@/bus"
  3. import { wrapLanguageModel, type ModelMessage } from "ai"
  4. import { Session } from "."
  5. import { Identifier } from "../id/id"
  6. import { Instance } from "../project/instance"
  7. import { Provider } from "../provider/provider"
  8. import { MessageV2 } from "./message-v2"
  9. import { SystemPrompt } from "./system"
  10. import z from "zod"
  11. import { SessionPrompt } from "./prompt"
  12. import { Flag } from "../flag/flag"
  13. import { Token } from "../util/token"
  14. import { Config } from "../config/config"
  15. import { Log } from "../util/log"
  16. import { ProviderTransform } from "@/provider/transform"
  17. import { SessionProcessor } from "./processor"
  18. import { fn } from "@/util/fn"
  19. import { mergeDeep, pipe } from "remeda"
  20. export namespace SessionCompaction {
  /** Logger tagged with the `session.compaction` service name. */
  const log = Log.create({ service: "session.compaction" })

  /**
   * Bus events emitted by this namespace.
   *
   * `Compacted` carries the ID of the session whose history was summarized;
   * `process` publishes it once a compaction pass ends without an error.
   */
  export const Event = {
    Compacted: BusEvent.define("session.compacted", z.object({ sessionID: z.string() })),
  }
  /**
   * Reports whether a message's token usage no longer fits in the model's
   * context window once room for the response has been set aside.
   *
   * @param input.tokens Token usage of an assistant message; `input`,
   *   `cache.read` and `output` are summed.
   * @param input.model Model whose `limit.context` and `limit.output` are used.
   * @returns `true` when the summed usage exceeds `context - reserved output`.
   *   Always `false` when auto-compaction is disabled through
   *   `Flag.OPENCODE_DISABLE_AUTOCOMPACT` or when the context limit is `0`.
   */
  export function isOverflow(input: { tokens: MessageV2.Assistant["tokens"]; model: Provider.Model }) {
    if (Flag.OPENCODE_DISABLE_AUTOCOMPACT) return false

    const { tokens, model } = input
    const window = model.limit.context
    if (window === 0) return false

    // `||` is intentional: an output limit of 0 must fall back to the global
    // cap instead of reserving nothing.
    const reserved = Math.min(model.limit.output, SessionPrompt.OUTPUT_TOKEN_MAX) || SessionPrompt.OUTPUT_TOKEN_MAX
    const used = tokens.input + tokens.cache.read + tokens.output
    return used > window - reserved
  }
  /** Pruning is only committed when more than this many estimated tokens would be reclaimed. */
  export const PRUNE_MINIMUM = 20_000
  /** Estimated tokens of the most recent tool output that are kept untouched. */
  export const PRUNE_PROTECT = 40_000

  /**
   * Marks old tool outputs of a session as compacted so they can be dropped
   * from future context.
   *
   * Walks the history from newest to oldest. Everything newer than the
   * second-most-recent user message is skipped. From there on, completed tool
   * parts are tallied by estimated output size; once the running total passes
   * `PRUNE_PROTECT`, each further part becomes a pruning candidate. The scan
   * ends at an assistant summary message or at a tool part that was already
   * compacted. Candidates are only stamped and persisted when together they
   * exceed `PRUNE_MINIMUM`.
   *
   * Does nothing when `Flag.OPENCODE_DISABLE_PRUNE` is set.
   *
   * @param input.sessionID Session whose messages are scanned.
   */
  export async function prune(input: { sessionID: string }) {
    if (Flag.OPENCODE_DISABLE_PRUNE) return
    log.info("pruning")

    const history = await Session.messages({ sessionID: input.sessionID })
    const candidates = []
    let seenTokens = 0
    let reclaimable = 0
    let userTurns = 0

    scan: for (let m = history.length - 1; m >= 0; m -= 1) {
      const message = history[m]
      if (message.info.role === "user") userTurns += 1
      if (userTurns < 2) continue
      // An earlier summary marks the boundary of a previous compaction.
      if (message.info.role === "assistant" && message.info.summary) break scan

      for (let p = message.parts.length - 1; p >= 0; p -= 1) {
        const part = message.parts[p]
        if (part.type !== "tool") continue
        if (part.state.status !== "completed") continue
        // Everything older than an already-compacted part was handled before.
        if (part.state.time.compacted) break scan

        const size = Token.estimate(part.state.output)
        seenTokens += size
        if (seenTokens > PRUNE_PROTECT) {
          reclaimable += size
          candidates.push(part)
        }
      }
    }

    log.info("found", { pruned: reclaimable, total: seenTokens })

    if (reclaimable > PRUNE_MINIMUM) {
      for (const part of candidates) {
        if (part.state.status === "completed") {
          part.state.time.compacted = Date.now()
          await Session.updatePart(part)
        }
      }
      log.info("pruned", { count: candidates.length })
    }
  }
  /**
   * Runs one compaction pass: asks the model to summarize the given
   * conversation and stores the answer as an assistant message flagged
   * `summary: true`.
   *
   * Steps:
   * 1. Resolve config, model and language model, and build the compaction
   *    system prompt.
   * 2. Persist an empty assistant summary message and hand it to a
   *    `SessionProcessor`.
   * 3. Stream the summary using the filtered history followed by a fixed
   *    "summarize" user instruction.
   * 4. When the processor reports `"continue"` and `input.auto` is set, append
   *    a synthetic user message asking the agent to carry on.
   *
   * @param input.parentID ID stored as `parentID` on the summary message.
   * @param input.messages Conversation to summarize.
   * @param input.sessionID Session the summary belongs to.
   * @param input.model Provider/model pair used for summarizing.
   * @param input.agent Agent name, stored as `mode` on the summary and as
   *   `agent` on the follow-up user message.
   * @param input.abort Signal forwarded to the processor and the request.
   * @param input.auto Whether to append the synthetic "continue" message.
   * @returns `"stop"` when the processed message carries an error; otherwise
   *   `"continue"`, after publishing `Event.Compacted`.
   */
  export async function process(input: {
    parentID: string
    messages: MessageV2.WithParts[]
    sessionID: string
    model: {
      providerID: string
      modelID: string
    }
    agent: string
    abort: AbortSignal
    auto: boolean
  }) {
    const config = await Config.get()
    const resolved = await Provider.getModel(input.model.providerID, input.model.modelID)
    const language = await Provider.getLanguage(resolved)
    const systemMessages = [...SystemPrompt.compaction(resolved.providerID)].map(
      (text): ModelMessage => ({
        role: "system",
        content: text,
      }),
    )

    // Placeholder assistant message that will receive the streamed summary.
    const summary = (await Session.updateMessage({
      id: Identifier.ascending("message"),
      role: "assistant",
      parentID: input.parentID,
      sessionID: input.sessionID,
      mode: input.agent,
      summary: true,
      path: {
        cwd: Instance.directory,
        root: Instance.worktree,
      },
      cost: 0,
      tokens: {
        output: 0,
        input: 0,
        reasoning: 0,
        cache: { read: 0, write: 0 },
      },
      modelID: input.model.modelID,
      providerID: resolved.providerID,
      time: {
        created: Date.now(),
      },
    })) as MessageV2.Assistant

    const processor = SessionProcessor.create({
      assistantMessage: summary,
      sessionID: input.sessionID,
      model: resolved,
      abort: input.abort,
    })

    // Model-specific defaults first, then the model's own options on top.
    const providerOptions = ProviderTransform.providerOptions(
      resolved,
      pipe({}, mergeDeep(ProviderTransform.options(resolved, input.sessionID)), mergeDeep(resolved.options)),
    )

    // Drop assistant messages that failed, except aborted ones that still
    // produced something besides step-start/reasoning parts.
    const history = input.messages.filter((entry) => {
      const info = entry.info
      if (info.role !== "assistant") return true
      if (info.error === undefined) return true
      return (
        MessageV2.AbortedError.isInstance(info.error) &&
        entry.parts.some((part) => !(part.type === "step-start" || part.type === "reasoning"))
      )
    })
    const conversation = MessageV2.toModelMessage(history)

    const summarizer = wrapLanguageModel({
      model: language,
      middleware: [
        {
          transformParams: async (args) => {
            if (args.type === "stream") {
              // NOTE(review): suppression has no stated reason; presumably the prompt
              // type differs from what ProviderTransform.message returns. Confirm.
              // @ts-expect-error
              args.params.prompt = ProviderTransform.message(args.params.prompt, resolved)
            }
            return args.params
          },
        },
      ],
    })

    const outcome = await processor.process({
      onError: (error) => {
        log.error("stream error", { error })
      },
      // set to 0, we handle loop
      maxRetries: 0,
      providerOptions,
      headers: resolved.headers,
      abortSignal: input.abort,
      tools: resolved.capabilities.toolcall ? {} : undefined,
      messages: [
        ...systemMessages,
        ...conversation,
        {
          role: "user",
          content: [
            {
              type: "text",
              text: "Summarize our conversation above. This summary will be the only context available when the conversation continues, so preserve critical information including: what was accomplished, current work in progress, files involved, next steps, and any key user requests or constraints. Be concise but detailed enough that work can continue seamlessly.",
            },
          ],
        },
      ],
      model: summarizer,
      experimental_telemetry: {
        isEnabled: config.experimental?.openTelemetry,
        metadata: {
          userId: config.username ?? "unknown",
          sessionId: input.sessionID,
        },
      },
    })

    if (input.auto && outcome === "continue") {
      // Synthetic user turn so the agent resumes work after an automatic compaction.
      const followUp = await Session.updateMessage({
        id: Identifier.ascending("message"),
        role: "user",
        sessionID: input.sessionID,
        time: {
          created: Date.now(),
        },
        agent: input.agent,
        model: input.model,
      })
      await Session.updatePart({
        id: Identifier.ascending("part"),
        messageID: followUp.id,
        sessionID: input.sessionID,
        type: "text",
        synthetic: true,
        text: "Continue if you have next steps",
        time: {
          start: Date.now(),
          end: Date.now(),
        },
      })
    }

    if (processor.message.error) return "stop"
    Bus.publish(Event.Compacted, { sessionID: input.sessionID })
    return "continue"
  }
  /**
   * Records a compaction request in a session: stores a new user message and
   * attaches a single part of type `"compaction"` to it.
   *
   * Input is validated against the zod schema passed to `fn`.
   *
   * @param input.sessionID Session to add the request to.
   * @param input.agent Agent name stored on the user message.
   * @param input.model Provider/model pair stored on the user message.
   * @param input.auto Stored on the compaction part as `auto`.
   */
  export const create = fn(
    z.object({
      sessionID: Identifier.schema("session"),
      agent: z.string(),
      model: z.object({
        providerID: z.string(),
        modelID: z.string(),
      }),
      auto: z.boolean(),
    }),
    async ({ sessionID, agent, model, auto }) => {
      const request = await Session.updateMessage({
        id: Identifier.ascending("message"),
        role: "user",
        model,
        sessionID,
        agent,
        time: {
          created: Date.now(),
        },
      })

      await Session.updatePart({
        id: Identifier.ascending("part"),
        messageID: request.id,
        sessionID: request.sessionID,
        type: "compaction",
        auto,
      })
    },
  )
  254. }