processor-effect.test.ts 27 KB

  1. import { NodeFileSystem } from "@effect/platform-node"
  2. import { expect } from "bun:test"
  3. import { Cause, Effect, Exit, Fiber, Layer } from "effect"
  4. import path from "path"
  5. import type { Agent } from "../../src/agent/agent"
  6. import { Agent as AgentSvc } from "../../src/agent/agent"
  7. import { Bus } from "../../src/bus"
  8. import { Config } from "../../src/config/config"
  9. import { Permission } from "../../src/permission"
  10. import { Plugin } from "../../src/plugin"
  11. import { Provider } from "../../src/provider/provider"
  12. import { ModelID, ProviderID } from "../../src/provider/schema"
  13. import { Session } from "../../src/session"
  14. import { LLM } from "../../src/session/llm"
  15. import { MessageV2 } from "../../src/session/message-v2"
  16. import { SessionProcessor } from "../../src/session/processor"
  17. import { MessageID, PartID, SessionID } from "../../src/session/schema"
  18. import { SessionStatus } from "../../src/session/status"
  19. import { SessionSummary } from "../../src/session/summary"
  20. import { Snapshot } from "../../src/snapshot"
  21. import { Log } from "../../src/util/log"
  22. import { SessionNetwork } from "../../src/session/network" // kilocode_change
  23. import * as CrossSpawnSpawner from "../../src/effect/cross-spawn-spawner"
  24. import { provideTmpdirServer } from "../fixture/fixture"
  25. import { testEffect } from "../lib/effect"
  26. import { raw, reply, TestLLMServer } from "../lib/llm-server"
// Silence log output while the tests run.
Log.init({ print: false })

// Stub summary service: summarize is a no-op and both diff variants return
// empty results, so processor tests never depend on real summarization.
const summary = Layer.succeed(
  SessionSummary.Service,
  SessionSummary.Service.of({
    summarize: () => Effect.void,
    diff: () => Effect.succeed([]),
    computeDiff: () => Effect.succeed([]),
  }),
)
// Provider/model reference attached to every message in these tests.
const ref = {
  providerID: ProviderID.make("test"),
  modelID: ModelID.make("test-model"),
}

// Base config: a single fake OpenAI-compatible provider ("test") exposing one
// tool-capable model. baseURL points at an unreachable port by default and is
// overridden per-test via providerCfg() with the mock LLM server's URL.
const cfg = {
  provider: {
    test: {
      name: "Test",
      id: "test",
      env: [],
      npm: "@ai-sdk/openai-compatible",
      models: {
        "test-model": {
          id: "test-model",
          name: "Test Model",
          attachment: false,
          reasoning: false,
          temperature: false,
          tool_call: true,
          release_date: "2025-01-01",
          limit: { context: 100000, output: 10000 },
          cost: { input: 0, output: 0 },
          options: {},
        },
      },
      options: {
        apiKey: "test-key",
        baseURL: "http://localhost:1/v1",
      },
    },
  },
}
  68. function providerCfg(url: string) {
  69. return {
  70. ...cfg,
  71. provider: {
  72. ...cfg.provider,
  73. test: {
  74. ...cfg.provider.test,
  75. options: {
  76. ...cfg.provider.test.options,
  77. baseURL: url,
  78. },
  79. },
  80. },
  81. }
  82. }
  83. function agent(): Agent.Info {
  84. return {
  85. name: "build",
  86. mode: "primary",
  87. options: {},
  88. permission: [{ permission: "*", pattern: "*", action: "allow" }],
  89. }
  90. }
  91. function defer<T>() {
  92. let resolve!: (value: T | PromiseLike<T>) => void
  93. const promise = new Promise<T>((done) => {
  94. resolve = done
  95. })
  96. return { promise, resolve }
  97. }
  98. const user = Effect.fn("TestSession.user")(function* (sessionID: SessionID, text: string) {
  99. const session = yield* Session.Service
  100. const msg = yield* session.updateMessage({
  101. id: MessageID.ascending(),
  102. role: "user",
  103. sessionID,
  104. agent: "build",
  105. model: ref,
  106. time: { created: Date.now() },
  107. })
  108. yield* session.updatePart({
  109. id: PartID.ascending(),
  110. messageID: msg.id,
  111. sessionID,
  112. type: "text",
  113. text,
  114. })
  115. return msg
  116. })
  117. const assistant = Effect.fn("TestSession.assistant")(function* (
  118. sessionID: SessionID,
  119. parentID: MessageID,
  120. root: string,
  121. ) {
  122. const session = yield* Session.Service
  123. const msg: MessageV2.Assistant = {
  124. id: MessageID.ascending(),
  125. role: "assistant",
  126. sessionID,
  127. mode: "build",
  128. agent: "build",
  129. path: { cwd: root, root },
  130. cost: 0,
  131. tokens: {
  132. total: 0,
  133. input: 0,
  134. output: 0,
  135. reasoning: 0,
  136. cache: { read: 0, write: 0 },
  137. },
  138. modelID: ref.modelID,
  139. providerID: ref.providerID,
  140. parentID,
  141. time: { created: Date.now() },
  142. finish: "end_turn",
  143. }
  144. yield* session.updateMessage(msg)
  145. return msg
  146. })
// Session status service backed by the shared event bus.
const status = SessionStatus.layer.pipe(Layer.provideMerge(Bus.layer))
// Platform-level services: real filesystem + process spawner.
const infra = Layer.mergeAll(NodeFileSystem.layer, CrossSpawnSpawner.defaultLayer)
// Full application service graph the processor depends on.
const deps = Layer.mergeAll(
  Session.defaultLayer,
  Snapshot.defaultLayer,
  AgentSvc.defaultLayer,
  Permission.defaultLayer,
  Plugin.defaultLayer,
  Config.defaultLayer,
  LLM.defaultLayer,
  Provider.defaultLayer,
  status,
).pipe(Layer.provideMerge(infra))
// Test environment: mock LLM server plus the SessionProcessor wired with the
// stubbed summary service.
const env = Layer.mergeAll(
  TestLLMServer.layer,
  SessionProcessor.layer.pipe(Layer.provide(summary), Layer.provideMerge(deps)),
)
const it = testEffect(env)
// Resolve the three services every test needs from the environment.
const boot = Effect.fn("test.boot")(function* () {
  const processors = yield* SessionProcessor.Service
  const session = yield* Session.Service
  const provider = yield* Provider.Service
  return { processors, session, provider }
})
  171. // ---------------------------------------------------------------------------
  172. // Tests
  173. // ---------------------------------------------------------------------------
  174. it.live("session.processor effect tests capture llm input cleanly", () =>
  175. provideTmpdirServer(
  176. ({ dir, llm }) =>
  177. Effect.gen(function* () {
  178. const { processors, session, provider } = yield* boot()
  179. yield* llm.text("hello")
  180. const chat = yield* session.create({})
  181. const parent = yield* user(chat.id, "hi")
  182. const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
  183. const mdl = yield* provider.getModel(ref.providerID, ref.modelID)
  184. const handle = yield* processors.create({
  185. assistantMessage: msg,
  186. sessionID: chat.id,
  187. model: mdl,
  188. })
  189. const input = {
  190. user: {
  191. id: parent.id,
  192. sessionID: chat.id,
  193. role: "user",
  194. time: parent.time,
  195. agent: parent.agent,
  196. model: { providerID: ref.providerID, modelID: ref.modelID },
  197. } satisfies MessageV2.User,
  198. sessionID: chat.id,
  199. model: mdl,
  200. agent: agent(),
  201. system: [],
  202. messages: [{ role: "user", content: "hi" }],
  203. tools: {},
  204. } satisfies LLM.StreamInput
  205. const value = yield* handle.process(input)
  206. const parts = MessageV2.parts(msg.id)
  207. const calls = yield* llm.calls
  208. expect(value).toBe("continue")
  209. expect(calls).toBe(1)
  210. expect(parts.some((part) => part.type === "text" && part.text === "hello")).toBe(true)
  211. }),
  212. { git: true, config: (url) => providerCfg(url) },
  213. ),
  214. )
it.live("session.processor effect tests preserve text start time", () =>
  provideTmpdirServer(
    ({ dir, llm }) =>
      Effect.gen(function* () {
        // Gate holds the stream open between the first text delta and the
        // final stop chunk so the test can observe the part mid-stream.
        const gate = defer<void>()
        const { processors, session, provider } = yield* boot()
        yield* llm.push(
          raw({
            head: [
              {
                id: "chatcmpl-test",
                object: "chat.completion.chunk",
                choices: [{ delta: { role: "assistant" } }],
              },
              {
                id: "chatcmpl-test",
                object: "chat.completion.chunk",
                choices: [{ delta: { content: "hello" } }],
              },
            ],
            wait: gate.promise,
            tail: [
              {
                id: "chatcmpl-test",
                object: "chat.completion.chunk",
                choices: [{ delta: {}, finish_reason: "stop" }],
              },
            ],
          }),
        )
        const chat = yield* session.create({})
        const parent = yield* user(chat.id, "hi")
        const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
        const mdl = yield* provider.getModel(ref.providerID, ref.modelID)
        const handle = yield* processors.create({
          assistantMessage: msg,
          sessionID: chat.id,
          model: mdl,
        })
        // Run the processor in a child fiber so we can poll while it streams.
        const run = yield* handle
          .process({
            user: {
              id: parent.id,
              sessionID: chat.id,
              role: "user",
              time: parent.time,
              agent: parent.agent,
              model: { providerID: ref.providerID, modelID: ref.modelID },
            } satisfies MessageV2.User,
            sessionID: chat.id,
            model: mdl,
            agent: agent(),
            system: [],
            messages: [{ role: "user", content: "hi" }],
            tools: {},
          })
          .pipe(Effect.forkChild)
        // Poll (up to 500ms) until the text part records a start time.
        yield* Effect.promise(async () => {
          const stop = Date.now() + 500
          while (Date.now() < stop) {
            const text = MessageV2.parts(msg.id).find((part): part is MessageV2.TextPart => part.type === "text")
            if (text?.time?.start) return
            await Bun.sleep(10)
          }
          throw new Error("timed out waiting for text part")
        })
        // Small delay so the eventual end timestamp is strictly after start.
        yield* Effect.sleep("20 millis")
        gate.resolve()
        const exit = yield* Fiber.await(run)
        const text = MessageV2.parts(msg.id).find((part): part is MessageV2.TextPart => part.type === "text")
        expect(Exit.isSuccess(exit)).toBe(true)
        expect(text?.text).toBe("hello")
        expect(text?.time?.start).toBeDefined()
        expect(text?.time?.end).toBeDefined()
        if (!text?.time?.start || !text.time.end) return
        expect(text.time.start).toBeLessThan(text.time.end)
      }),
    { git: true, config: (url) => providerCfg(url) },
  ),
)
it.live("session.processor effect tests stop after token overflow requests compaction", () =>
  provideTmpdirServer(
    ({ dir, llm }) =>
      Effect.gen(function* () {
        const { processors, session, provider } = yield* boot()
        // Reply reports 100 input tokens against the shrunken context below.
        yield* llm.text("after", { usage: { input: 100, output: 0 } })
        const chat = yield* session.create({})
        const parent = yield* user(chat.id, "compact")
        const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
        const base = yield* provider.getModel(ref.providerID, ref.modelID)
        // Shrink the context window so the reported usage overflows it.
        const mdl = { ...base, limit: { context: 20, output: 10 } }
        const handle = yield* processors.create({
          assistantMessage: msg,
          sessionID: chat.id,
          model: mdl,
        })
        const value = yield* handle.process({
          user: {
            id: parent.id,
            sessionID: chat.id,
            role: "user",
            time: parent.time,
            agent: parent.agent,
            model: { providerID: ref.providerID, modelID: ref.modelID },
          } satisfies MessageV2.User,
          sessionID: chat.id,
          model: mdl,
          agent: agent(),
          system: [],
          messages: [{ role: "user", content: "compact" }],
          tools: {},
        })
        const parts = MessageV2.parts(msg.id)
        // Overflow ends the turn asking for compaction, after the streamed
        // text and a step-finish part were still recorded.
        expect(value).toBe("compact")
        expect(parts.some((part) => part.type === "text" && part.text === "after")).toBe(true)
        expect(parts.some((part) => part.type === "step-finish")).toBe(true)
      }),
    { git: true, config: (url) => providerCfg(url) },
  ),
)
it.live("session.processor effect tests capture reasoning from http mock", () =>
  provideTmpdirServer(
    ({ dir, llm }) =>
      Effect.gen(function* () {
        const { processors, session, provider } = yield* boot()
        // One reply: reasoning "think" followed by text "done".
        yield* llm.push(reply().reason("think").text("done").stop())
        const chat = yield* session.create({})
        const parent = yield* user(chat.id, "reason")
        const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
        const mdl = yield* provider.getModel(ref.providerID, ref.modelID)
        const handle = yield* processors.create({
          assistantMessage: msg,
          sessionID: chat.id,
          model: mdl,
        })
        const value = yield* handle.process({
          user: {
            id: parent.id,
            sessionID: chat.id,
            role: "user",
            time: parent.time,
            agent: parent.agent,
            model: { providerID: ref.providerID, modelID: ref.modelID },
          } satisfies MessageV2.User,
          sessionID: chat.id,
          model: mdl,
          agent: agent(),
          system: [],
          messages: [{ role: "user", content: "reason" }],
          tools: {},
        })
        const parts = MessageV2.parts(msg.id)
        const reasoning = parts.find((part): part is MessageV2.ReasoningPart => part.type === "reasoning")
        const text = parts.find((part): part is MessageV2.TextPart => part.type === "text")
        // Reasoning and text deltas land in separate, correctly-typed parts.
        expect(value).toBe("continue")
        expect(yield* llm.calls).toBe(1)
        expect(reasoning?.text).toBe("think")
        expect(text?.text).toBe("done")
      }),
    { git: true, config: (url) => providerCfg(url) },
  ),
)
  377. it.live("session.processor effect tests reset reasoning state across retries", () =>
  378. provideTmpdirServer(
  379. ({ dir, llm }) =>
  380. Effect.gen(function* () {
  381. const { processors, session, provider } = yield* boot()
  382. // kilocode_change start — auto-reply to network reconnection prompts triggered by reset()
  383. const offAsk = Bus.subscribe(SessionNetwork.Event.Asked, (event) => {
  384. void SessionNetwork.reply({ requestID: event.properties.id })
  385. })
  386. // kilocode_change end
  387. yield* llm.push(reply().reason("one").reset(), reply().reason("two").stop())
  388. const chat = yield* session.create({})
  389. const parent = yield* user(chat.id, "reason")
  390. const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
  391. const mdl = yield* provider.getModel(ref.providerID, ref.modelID)
  392. const handle = yield* processors.create({
  393. assistantMessage: msg,
  394. sessionID: chat.id,
  395. model: mdl,
  396. })
  397. const value = yield* handle.process({
  398. user: {
  399. id: parent.id,
  400. sessionID: chat.id,
  401. role: "user",
  402. time: parent.time,
  403. agent: parent.agent,
  404. model: { providerID: ref.providerID, modelID: ref.modelID },
  405. } satisfies MessageV2.User,
  406. sessionID: chat.id,
  407. model: mdl,
  408. agent: agent(),
  409. system: [],
  410. messages: [{ role: "user", content: "reason" }],
  411. tools: {},
  412. })
  413. const parts = MessageV2.parts(msg.id)
  414. const reasoning = parts.filter((part): part is MessageV2.ReasoningPart => part.type === "reasoning")
  415. expect(value).toBe("continue")
  416. expect(yield* llm.calls).toBe(2)
  417. expect(reasoning.some((part) => part.text === "two")).toBe(true)
  418. expect(reasoning.some((part) => part.text === "onetwo")).toBe(false)
  419. offAsk() // kilocode_change — cleanup subscriber
  420. }),
  421. { git: true, config: (url) => providerCfg(url) },
  422. ),
  423. )
it.live("session.processor effect tests do not retry unknown json errors", () =>
  provideTmpdirServer(
    ({ dir, llm }) =>
      Effect.gen(function* () {
        const { processors, session, provider } = yield* boot()
        // 400 with an unrecognized error body: must not be retried.
        yield* llm.error(400, { error: { message: "no_kv_space" } })
        const chat = yield* session.create({})
        const parent = yield* user(chat.id, "json")
        const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
        const mdl = yield* provider.getModel(ref.providerID, ref.modelID)
        const handle = yield* processors.create({
          assistantMessage: msg,
          sessionID: chat.id,
          model: mdl,
        })
        const value = yield* handle.process({
          user: {
            id: parent.id,
            sessionID: chat.id,
            role: "user",
            time: parent.time,
            agent: parent.agent,
            model: { providerID: ref.providerID, modelID: ref.modelID },
          } satisfies MessageV2.User,
          sessionID: chat.id,
          model: mdl,
          agent: agent(),
          system: [],
          messages: [{ role: "user", content: "json" }],
          tools: {},
        })
        // One call only, processing stops, and the failure surfaces as APIError.
        expect(value).toBe("stop")
        expect(yield* llm.calls).toBe(1)
        expect(handle.message.error?.name).toBe("APIError")
      }),
    { git: true, config: (url) => providerCfg(url) },
  ),
)
it.live("session.processor effect tests retry recognized structured json errors", () =>
  provideTmpdirServer(
    ({ dir, llm }) =>
      Effect.gen(function* () {
        const { processors, session, provider } = yield* boot()
        // 429 with a recognized structured error body is retryable; the
        // queued second response then succeeds.
        yield* llm.error(429, { type: "error", error: { type: "too_many_requests" } })
        yield* llm.text("after")
        const chat = yield* session.create({})
        const parent = yield* user(chat.id, "retry json")
        const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
        const mdl = yield* provider.getModel(ref.providerID, ref.modelID)
        const handle = yield* processors.create({
          assistantMessage: msg,
          sessionID: chat.id,
          model: mdl,
        })
        const value = yield* handle.process({
          user: {
            id: parent.id,
            sessionID: chat.id,
            role: "user",
            time: parent.time,
            agent: parent.agent,
            model: { providerID: ref.providerID, modelID: ref.modelID },
          } satisfies MessageV2.User,
          sessionID: chat.id,
          model: mdl,
          agent: agent(),
          system: [],
          messages: [{ role: "user", content: "retry json" }],
          tools: {},
        })
        const parts = MessageV2.parts(msg.id)
        // Two calls (original + retry), successful text, no recorded error.
        expect(value).toBe("continue")
        expect(yield* llm.calls).toBe(2)
        expect(parts.some((part) => part.type === "text" && part.text === "after")).toBe(true)
        expect(handle.message.error).toBeUndefined()
      }),
    { git: true, config: (url) => providerCfg(url) },
  ),
)
it.live("session.processor effect tests publish retry status updates", () =>
  provideTmpdirServer(
    ({ dir, llm }) =>
      Effect.gen(function* () {
        const { processors, session, provider } = yield* boot()
        const bus = yield* Bus.Service
        // First call fails with 503 (retryable), second succeeds empty.
        yield* llm.error(503, { error: "boom" })
        yield* llm.text("")
        const chat = yield* session.create({})
        const parent = yield* user(chat.id, "retry")
        const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
        const mdl = yield* provider.getModel(ref.providerID, ref.modelID)
        // Collect retry attempt numbers published for this session.
        const states: number[] = []
        const off = yield* bus.subscribeCallback(SessionStatus.Event.Status, (evt) => {
          if (evt.properties.sessionID !== chat.id) return
          if (evt.properties.status.type === "retry") states.push(evt.properties.status.attempt)
        })
        const handle = yield* processors.create({
          assistantMessage: msg,
          sessionID: chat.id,
          model: mdl,
        })
        const value = yield* handle.process({
          user: {
            id: parent.id,
            sessionID: chat.id,
            role: "user",
            time: parent.time,
            agent: parent.agent,
            model: { providerID: ref.providerID, modelID: ref.modelID },
          } satisfies MessageV2.User,
          sessionID: chat.id,
          model: mdl,
          agent: agent(),
          system: [],
          messages: [{ role: "user", content: "retry" }],
          tools: {},
        })
        // Unsubscribe before asserting so the callback cannot leak.
        off()
        expect(value).toBe("continue")
        expect(yield* llm.calls).toBe(2)
        // Exactly one retry, reported as attempt 1.
        expect(states).toStrictEqual([1])
      }),
    { git: true, config: (url) => providerCfg(url) },
  ),
)
it.live("session.processor effect tests compact on structured context overflow", () =>
  provideTmpdirServer(
    ({ dir, llm }) =>
      Effect.gen(function* () {
        const { processors, session, provider } = yield* boot()
        // Structured context-overflow error from the provider should map to a
        // compaction request rather than a failure.
        yield* llm.error(400, { type: "error", error: { code: "context_length_exceeded" } })
        const chat = yield* session.create({})
        const parent = yield* user(chat.id, "compact json")
        const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
        const mdl = yield* provider.getModel(ref.providerID, ref.modelID)
        const handle = yield* processors.create({
          assistantMessage: msg,
          sessionID: chat.id,
          model: mdl,
        })
        const value = yield* handle.process({
          user: {
            id: parent.id,
            sessionID: chat.id,
            role: "user",
            time: parent.time,
            agent: parent.agent,
            model: { providerID: ref.providerID, modelID: ref.modelID },
          } satisfies MessageV2.User,
          sessionID: chat.id,
          model: mdl,
          agent: agent(),
          system: [],
          messages: [{ role: "user", content: "compact json" }],
          tools: {},
        })
        // No retry, outcome "compact", and no error recorded on the message.
        expect(value).toBe("compact")
        expect(yield* llm.calls).toBe(1)
        expect(handle.message.error).toBeUndefined()
      }),
    { git: true, config: (url) => providerCfg(url) },
  ),
)
  587. it.live("session.processor effect tests mark pending tools as aborted on cleanup", () =>
  588. provideTmpdirServer(
  589. ({ dir, llm }) =>
  590. Effect.gen(function* () {
  591. const { processors, session, provider } = yield* boot()
  592. yield* llm.toolHang("bash", { cmd: "pwd" })
  593. const chat = yield* session.create({})
  594. const parent = yield* user(chat.id, "tool abort")
  595. const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
  596. const mdl = yield* provider.getModel(ref.providerID, ref.modelID)
  597. const handle = yield* processors.create({
  598. assistantMessage: msg,
  599. sessionID: chat.id,
  600. model: mdl,
  601. })
  602. const run = yield* handle
  603. .process({
  604. user: {
  605. id: parent.id,
  606. sessionID: chat.id,
  607. role: "user",
  608. time: parent.time,
  609. agent: parent.agent,
  610. model: { providerID: ref.providerID, modelID: ref.modelID },
  611. } satisfies MessageV2.User,
  612. sessionID: chat.id,
  613. model: mdl,
  614. agent: agent(),
  615. system: [],
  616. messages: [{ role: "user", content: "tool abort" }],
  617. tools: {},
  618. })
  619. .pipe(Effect.forkChild)
  620. yield* llm.wait(1)
  621. yield* Effect.promise(async () => {
  622. const end = Date.now() + 500
  623. while (Date.now() < end) {
  624. const parts = await MessageV2.parts(msg.id)
  625. if (parts.some((part) => part.type === "tool")) return
  626. await Bun.sleep(10)
  627. }
  628. })
  629. yield* Fiber.interrupt(run)
  630. const exit = yield* Fiber.await(run)
  631. const parts = MessageV2.parts(msg.id)
  632. const call = parts.find((part): part is MessageV2.ToolPart => part.type === "tool")
  633. expect(Exit.isFailure(exit)).toBe(true)
  634. if (Exit.isFailure(exit)) {
  635. expect(Cause.hasInterruptsOnly(exit.cause)).toBe(true)
  636. }
  637. expect(yield* llm.calls).toBe(1)
  638. expect(call?.state.status).toBe("error")
  639. if (call?.state.status === "error") {
  640. expect(call.state.error).toBe("Tool execution aborted")
  641. expect(call.state.metadata?.interrupted).toBe(true)
  642. expect(call.state.time.end).toBeDefined()
  643. }
  644. }),
  645. { git: true, config: (url) => providerCfg(url) },
  646. ),
  647. )
it.live("session.processor effect tests record aborted errors and idle state", () =>
  provideTmpdirServer(
    ({ dir, llm }) =>
      Effect.gen(function* () {
        // Resolved once the session error event for this chat is observed.
        const seen = defer<void>()
        const { processors, session, provider } = yield* boot()
        const bus = yield* Bus.Service
        const sts = yield* SessionStatus.Service
        // The LLM never responds; the request hangs until interrupted.
        yield* llm.hang
        const chat = yield* session.create({})
        const parent = yield* user(chat.id, "abort")
        const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
        const mdl = yield* provider.getModel(ref.providerID, ref.modelID)
        const errs: string[] = []
        const off = yield* bus.subscribeCallback(Session.Event.Error, (evt) => {
          if (evt.properties.sessionID !== chat.id) return
          if (!evt.properties.error) return
          errs.push(evt.properties.error.name)
          seen.resolve()
        })
        const handle = yield* processors.create({
          assistantMessage: msg,
          sessionID: chat.id,
          model: mdl,
        })
        const run = yield* handle
          .process({
            user: {
              id: parent.id,
              sessionID: chat.id,
              role: "user",
              time: parent.time,
              agent: parent.agent,
              model: { providerID: ref.providerID, modelID: ref.modelID },
            } satisfies MessageV2.User,
            sessionID: chat.id,
            model: mdl,
            agent: agent(),
            system: [],
            messages: [{ role: "user", content: "abort" }],
            tools: {},
          })
          .pipe(Effect.forkChild)
        yield* llm.wait(1)
        yield* Fiber.interrupt(run)
        const exit = yield* Fiber.await(run)
        // NOTE(review): this wait has no timeout — if the error event is
        // never published the test hangs rather than failing; verify against
        // the runner's per-test timeout.
        yield* Effect.promise(() => seen.promise)
        const stored = MessageV2.get({ sessionID: chat.id, messageID: msg.id })
        const state = yield* sts.get(chat.id)
        off()
        expect(Exit.isFailure(exit)).toBe(true)
        if (Exit.isFailure(exit)) {
          expect(Cause.hasInterruptsOnly(exit.cause)).toBe(true)
        }
        // Abort is recorded on both the live handle and the stored message,
        // the session returns to idle, and the error event was broadcast.
        expect(handle.message.error?.name).toBe("MessageAbortedError")
        expect(stored.info.role).toBe("assistant")
        if (stored.info.role === "assistant") {
          expect(stored.info.error?.name).toBe("MessageAbortedError")
        }
        expect(state).toMatchObject({ type: "idle" })
        expect(errs).toContain("MessageAbortedError")
      }),
    { git: true, config: (url) => providerCfg(url) },
  ),
)
it.live("session.processor effect tests mark interruptions aborted without manual abort", () =>
  provideTmpdirServer(
    ({ dir, llm }) =>
      Effect.gen(function* () {
        const { processors, session, provider } = yield* boot()
        const sts = yield* SessionStatus.Service
        // The LLM never responds; the request hangs until interrupted.
        yield* llm.hang
        const chat = yield* session.create({})
        const parent = yield* user(chat.id, "interrupt")
        const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
        const mdl = yield* provider.getModel(ref.providerID, ref.modelID)
        const handle = yield* processors.create({
          assistantMessage: msg,
          sessionID: chat.id,
          model: mdl,
        })
        const run = yield* handle
          .process({
            user: {
              id: parent.id,
              sessionID: chat.id,
              role: "user",
              time: parent.time,
              agent: parent.agent,
              model: { providerID: ref.providerID, modelID: ref.modelID },
            } satisfies MessageV2.User,
            sessionID: chat.id,
            model: mdl,
            agent: agent(),
            system: [],
            messages: [{ role: "user", content: "interrupt" }],
            tools: {},
          })
          .pipe(Effect.forkChild)
        yield* llm.wait(1)
        // Plain fiber interruption (no explicit abort API) must still be
        // recorded as a MessageAbortedError and return the session to idle.
        yield* Fiber.interrupt(run)
        const exit = yield* Fiber.await(run)
        const stored = MessageV2.get({ sessionID: chat.id, messageID: msg.id })
        const state = yield* sts.get(chat.id)
        expect(Exit.isFailure(exit)).toBe(true)
        expect(handle.message.error?.name).toBe("MessageAbortedError")
        expect(stored.info.role).toBe("assistant")
        if (stored.info.role === "assistant") {
          expect(stored.info.error?.name).toBe("MessageAbortedError")
        }
        expect(state).toMatchObject({ type: "idle" })
      }),
    { git: true, config: (url) => providerCfg(url) },
  ),
)