processor-effect.test.ts 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842
  1. import { NodeFileSystem } from "@effect/platform-node"
  2. import { expect } from "bun:test"
  3. import { Cause, Effect, Exit, Fiber, Layer } from "effect"
  4. import path from "path"
  5. import type { Agent } from "../../src/agent/agent"
  6. import { Agent as AgentSvc } from "../../src/agent/agent"
  7. import { Bus } from "../../src/bus"
  8. import { Config } from "../../src/config/config"
  9. import { Permission } from "../../src/permission"
  10. import { Plugin } from "../../src/plugin"
  11. import { Provider } from "../../src/provider"
  12. import { ModelID, ProviderID } from "../../src/provider/schema"
  13. import { Session } from "../../src/session"
  14. import { LLM } from "../../src/session/llm"
  15. import { MessageV2 } from "../../src/session/message-v2"
  16. import { SessionProcessor } from "../../src/session/processor"
  17. import { MessageID, PartID, SessionID } from "../../src/session/schema"
  18. import { SessionStatus } from "../../src/session/status"
  19. import { SessionSummary } from "../../src/session/summary"
  20. import { Snapshot } from "../../src/snapshot"
  21. import { Log } from "../../src/util"
  22. import * as CrossSpawnSpawner from "../../src/effect/cross-spawn-spawner"
  23. import { provideTmpdirServer } from "../fixture/fixture"
  24. import { testEffect } from "../lib/effect"
  25. import { raw, reply, TestLLMServer } from "../lib/llm-server"
  26. void Log.init({ print: false })
  27. const summary = Layer.succeed(
  28. SessionSummary.Service,
  29. SessionSummary.Service.of({
  30. summarize: () => Effect.void,
  31. diff: () => Effect.succeed([]),
  32. computeDiff: () => Effect.succeed([]),
  33. }),
  34. )
  35. const ref = {
  36. providerID: ProviderID.make("test"),
  37. modelID: ModelID.make("test-model"),
  38. }
  39. const cfg = {
  40. provider: {
  41. test: {
  42. name: "Test",
  43. id: "test",
  44. env: [],
  45. npm: "@ai-sdk/openai-compatible",
  46. models: {
  47. "test-model": {
  48. id: "test-model",
  49. name: "Test Model",
  50. attachment: false,
  51. reasoning: false,
  52. temperature: false,
  53. tool_call: true,
  54. release_date: "2025-01-01",
  55. limit: { context: 100000, output: 10000 },
  56. cost: { input: 0, output: 0 },
  57. options: {},
  58. },
  59. },
  60. options: {
  61. apiKey: "test-key",
  62. baseURL: "http://localhost:1/v1",
  63. },
  64. },
  65. },
  66. }
  67. function providerCfg(url: string) {
  68. return {
  69. ...cfg,
  70. provider: {
  71. ...cfg.provider,
  72. test: {
  73. ...cfg.provider.test,
  74. options: {
  75. ...cfg.provider.test.options,
  76. baseURL: url,
  77. },
  78. },
  79. },
  80. }
  81. }
  82. function agent(): Agent.Info {
  83. return {
  84. name: "build",
  85. mode: "primary",
  86. options: {},
  87. permission: [{ permission: "*", pattern: "*", action: "allow" }],
  88. }
  89. }
  90. function defer<T>() {
  91. let resolve!: (value: T | PromiseLike<T>) => void
  92. const promise = new Promise<T>((done) => {
  93. resolve = done
  94. })
  95. return { promise, resolve }
  96. }
  97. const user = Effect.fn("TestSession.user")(function* (sessionID: SessionID, text: string) {
  98. const session = yield* Session.Service
  99. const msg = yield* session.updateMessage({
  100. id: MessageID.ascending(),
  101. role: "user",
  102. sessionID,
  103. agent: "build",
  104. model: ref,
  105. time: { created: Date.now() },
  106. })
  107. yield* session.updatePart({
  108. id: PartID.ascending(),
  109. messageID: msg.id,
  110. sessionID,
  111. type: "text",
  112. text,
  113. })
  114. return msg
  115. })
  116. const assistant = Effect.fn("TestSession.assistant")(function* (
  117. sessionID: SessionID,
  118. parentID: MessageID,
  119. root: string,
  120. ) {
  121. const session = yield* Session.Service
  122. const msg: MessageV2.Assistant = {
  123. id: MessageID.ascending(),
  124. role: "assistant",
  125. sessionID,
  126. mode: "build",
  127. agent: "build",
  128. path: { cwd: root, root },
  129. cost: 0,
  130. tokens: {
  131. total: 0,
  132. input: 0,
  133. output: 0,
  134. reasoning: 0,
  135. cache: { read: 0, write: 0 },
  136. },
  137. modelID: ref.modelID,
  138. providerID: ref.providerID,
  139. parentID,
  140. time: { created: Date.now() },
  141. finish: "end_turn",
  142. }
  143. yield* session.updateMessage(msg)
  144. return msg
  145. })
  146. const status = SessionStatus.layer.pipe(Layer.provideMerge(Bus.layer))
  147. const infra = Layer.mergeAll(NodeFileSystem.layer, CrossSpawnSpawner.defaultLayer)
  148. const deps = Layer.mergeAll(
  149. Session.defaultLayer,
  150. Snapshot.defaultLayer,
  151. AgentSvc.defaultLayer,
  152. Permission.defaultLayer,
  153. Plugin.defaultLayer,
  154. Config.defaultLayer,
  155. LLM.defaultLayer,
  156. Provider.defaultLayer,
  157. status,
  158. ).pipe(Layer.provideMerge(infra))
  159. const env = Layer.mergeAll(
  160. TestLLMServer.layer,
  161. SessionProcessor.layer.pipe(Layer.provide(summary), Layer.provideMerge(deps)),
  162. )
  163. const it = testEffect(env)
  164. const boot = Effect.fn("test.boot")(function* () {
  165. const processors = yield* SessionProcessor.Service
  166. const session = yield* Session.Service
  167. const provider = yield* Provider.Service
  168. return { processors, session, provider }
  169. })
  170. // ---------------------------------------------------------------------------
  171. // Tests
  172. // ---------------------------------------------------------------------------
  173. it.live("session.processor effect tests capture llm input cleanly", () =>
  174. provideTmpdirServer(
  175. ({ dir, llm }) =>
  176. Effect.gen(function* () {
  177. const { processors, session, provider } = yield* boot()
  178. yield* llm.text("hello")
  179. const chat = yield* session.create({})
  180. const parent = yield* user(chat.id, "hi")
  181. const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
  182. const mdl = yield* provider.getModel(ref.providerID, ref.modelID)
  183. const handle = yield* processors.create({
  184. assistantMessage: msg,
  185. sessionID: chat.id,
  186. model: mdl,
  187. })
  188. const input = {
  189. user: {
  190. id: parent.id,
  191. sessionID: chat.id,
  192. role: "user",
  193. time: parent.time,
  194. agent: parent.agent,
  195. model: { providerID: ref.providerID, modelID: ref.modelID },
  196. } satisfies MessageV2.User,
  197. sessionID: chat.id,
  198. model: mdl,
  199. agent: agent(),
  200. system: [],
  201. messages: [{ role: "user", content: "hi" }],
  202. tools: {},
  203. } satisfies LLM.StreamInput
  204. const value = yield* handle.process(input)
  205. const parts = MessageV2.parts(msg.id)
  206. const calls = yield* llm.calls
  207. expect(value).toBe("continue")
  208. expect(calls).toBe(1)
  209. expect(parts.some((part) => part.type === "text" && part.text === "hello")).toBe(true)
  210. }),
  211. { git: true, config: (url) => providerCfg(url) },
  212. ),
  213. )
  214. it.live("session.processor effect tests preserve text start time", () =>
  215. provideTmpdirServer(
  216. ({ dir, llm }) =>
  217. Effect.gen(function* () {
  218. const gate = defer<void>()
  219. const { processors, session, provider } = yield* boot()
  220. yield* llm.push(
  221. raw({
  222. head: [
  223. {
  224. id: "chatcmpl-test",
  225. object: "chat.completion.chunk",
  226. choices: [{ delta: { role: "assistant" } }],
  227. },
  228. {
  229. id: "chatcmpl-test",
  230. object: "chat.completion.chunk",
  231. choices: [{ delta: { content: "hello" } }],
  232. },
  233. ],
  234. wait: gate.promise,
  235. tail: [
  236. {
  237. id: "chatcmpl-test",
  238. object: "chat.completion.chunk",
  239. choices: [{ delta: {}, finish_reason: "stop" }],
  240. },
  241. ],
  242. }),
  243. )
  244. const chat = yield* session.create({})
  245. const parent = yield* user(chat.id, "hi")
  246. const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
  247. const mdl = yield* provider.getModel(ref.providerID, ref.modelID)
  248. const handle = yield* processors.create({
  249. assistantMessage: msg,
  250. sessionID: chat.id,
  251. model: mdl,
  252. })
  253. const run = yield* handle
  254. .process({
  255. user: {
  256. id: parent.id,
  257. sessionID: chat.id,
  258. role: "user",
  259. time: parent.time,
  260. agent: parent.agent,
  261. model: { providerID: ref.providerID, modelID: ref.modelID },
  262. } satisfies MessageV2.User,
  263. sessionID: chat.id,
  264. model: mdl,
  265. agent: agent(),
  266. system: [],
  267. messages: [{ role: "user", content: "hi" }],
  268. tools: {},
  269. })
  270. .pipe(Effect.forkChild)
  271. yield* Effect.promise(async () => {
  272. const stop = Date.now() + 500
  273. while (Date.now() < stop) {
  274. const text = MessageV2.parts(msg.id).find((part): part is MessageV2.TextPart => part.type === "text")
  275. if (text?.time?.start) return
  276. await Bun.sleep(10)
  277. }
  278. throw new Error("timed out waiting for text part")
  279. })
  280. yield* Effect.sleep("20 millis")
  281. gate.resolve()
  282. const exit = yield* Fiber.await(run)
  283. const text = MessageV2.parts(msg.id).find((part): part is MessageV2.TextPart => part.type === "text")
  284. expect(Exit.isSuccess(exit)).toBe(true)
  285. expect(text?.text).toBe("hello")
  286. expect(text?.time?.start).toBeDefined()
  287. expect(text?.time?.end).toBeDefined()
  288. if (!text?.time?.start || !text.time.end) return
  289. expect(text.time.start).toBeLessThan(text.time.end)
  290. }),
  291. { git: true, config: (url) => providerCfg(url) },
  292. ),
  293. )
  294. it.live("session.processor effect tests stop after token overflow requests compaction", () =>
  295. provideTmpdirServer(
  296. ({ dir, llm }) =>
  297. Effect.gen(function* () {
  298. const { processors, session, provider } = yield* boot()
  299. yield* llm.text("after", { usage: { input: 100, output: 0 } })
  300. const chat = yield* session.create({})
  301. const parent = yield* user(chat.id, "compact")
  302. const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
  303. const base = yield* provider.getModel(ref.providerID, ref.modelID)
  304. const mdl = { ...base, limit: { context: 20, output: 10 } }
  305. const handle = yield* processors.create({
  306. assistantMessage: msg,
  307. sessionID: chat.id,
  308. model: mdl,
  309. })
  310. const value = yield* handle.process({
  311. user: {
  312. id: parent.id,
  313. sessionID: chat.id,
  314. role: "user",
  315. time: parent.time,
  316. agent: parent.agent,
  317. model: { providerID: ref.providerID, modelID: ref.modelID },
  318. } satisfies MessageV2.User,
  319. sessionID: chat.id,
  320. model: mdl,
  321. agent: agent(),
  322. system: [],
  323. messages: [{ role: "user", content: "compact" }],
  324. tools: {},
  325. })
  326. const parts = MessageV2.parts(msg.id)
  327. expect(value).toBe("compact")
  328. expect(parts.some((part) => part.type === "text" && part.text === "after")).toBe(true)
  329. expect(parts.some((part) => part.type === "step-finish")).toBe(true)
  330. }),
  331. { git: true, config: (url) => providerCfg(url) },
  332. ),
  333. )
  334. it.live("session.processor effect tests capture reasoning from http mock", () =>
  335. provideTmpdirServer(
  336. ({ dir, llm }) =>
  337. Effect.gen(function* () {
  338. const { processors, session, provider } = yield* boot()
  339. yield* llm.push(reply().reason("think").text("done").stop())
  340. const chat = yield* session.create({})
  341. const parent = yield* user(chat.id, "reason")
  342. const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
  343. const mdl = yield* provider.getModel(ref.providerID, ref.modelID)
  344. const handle = yield* processors.create({
  345. assistantMessage: msg,
  346. sessionID: chat.id,
  347. model: mdl,
  348. })
  349. const value = yield* handle.process({
  350. user: {
  351. id: parent.id,
  352. sessionID: chat.id,
  353. role: "user",
  354. time: parent.time,
  355. agent: parent.agent,
  356. model: { providerID: ref.providerID, modelID: ref.modelID },
  357. } satisfies MessageV2.User,
  358. sessionID: chat.id,
  359. model: mdl,
  360. agent: agent(),
  361. system: [],
  362. messages: [{ role: "user", content: "reason" }],
  363. tools: {},
  364. })
  365. const parts = MessageV2.parts(msg.id)
  366. const reasoning = parts.find((part): part is MessageV2.ReasoningPart => part.type === "reasoning")
  367. const text = parts.find((part): part is MessageV2.TextPart => part.type === "text")
  368. expect(value).toBe("continue")
  369. expect(yield* llm.calls).toBe(1)
  370. expect(reasoning?.text).toBe("think")
  371. expect(text?.text).toBe("done")
  372. }),
  373. { git: true, config: (url) => providerCfg(url) },
  374. ),
  375. )
  376. it.live("session.processor effect tests reset reasoning state across retries", () =>
  377. provideTmpdirServer(
  378. ({ dir, llm }) =>
  379. Effect.gen(function* () {
  380. const { processors, session, provider } = yield* boot()
  381. yield* llm.push(reply().reason("one").reset(), reply().reason("two").stop())
  382. const chat = yield* session.create({})
  383. const parent = yield* user(chat.id, "reason")
  384. const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
  385. const mdl = yield* provider.getModel(ref.providerID, ref.modelID)
  386. const handle = yield* processors.create({
  387. assistantMessage: msg,
  388. sessionID: chat.id,
  389. model: mdl,
  390. })
  391. const value = yield* handle.process({
  392. user: {
  393. id: parent.id,
  394. sessionID: chat.id,
  395. role: "user",
  396. time: parent.time,
  397. agent: parent.agent,
  398. model: { providerID: ref.providerID, modelID: ref.modelID },
  399. } satisfies MessageV2.User,
  400. sessionID: chat.id,
  401. model: mdl,
  402. agent: agent(),
  403. system: [],
  404. messages: [{ role: "user", content: "reason" }],
  405. tools: {},
  406. })
  407. const parts = MessageV2.parts(msg.id)
  408. const reasoning = parts.filter((part): part is MessageV2.ReasoningPart => part.type === "reasoning")
  409. expect(value).toBe("continue")
  410. expect(yield* llm.calls).toBe(2)
  411. expect(reasoning.some((part) => part.text === "two")).toBe(true)
  412. expect(reasoning.some((part) => part.text === "onetwo")).toBe(false)
  413. }),
  414. { git: true, config: (url) => providerCfg(url) },
  415. ),
  416. )
  417. it.live("session.processor effect tests do not retry unknown json errors", () =>
  418. provideTmpdirServer(
  419. ({ dir, llm }) =>
  420. Effect.gen(function* () {
  421. const { processors, session, provider } = yield* boot()
  422. yield* llm.error(400, { error: { message: "no_kv_space" } })
  423. const chat = yield* session.create({})
  424. const parent = yield* user(chat.id, "json")
  425. const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
  426. const mdl = yield* provider.getModel(ref.providerID, ref.modelID)
  427. const handle = yield* processors.create({
  428. assistantMessage: msg,
  429. sessionID: chat.id,
  430. model: mdl,
  431. })
  432. const value = yield* handle.process({
  433. user: {
  434. id: parent.id,
  435. sessionID: chat.id,
  436. role: "user",
  437. time: parent.time,
  438. agent: parent.agent,
  439. model: { providerID: ref.providerID, modelID: ref.modelID },
  440. } satisfies MessageV2.User,
  441. sessionID: chat.id,
  442. model: mdl,
  443. agent: agent(),
  444. system: [],
  445. messages: [{ role: "user", content: "json" }],
  446. tools: {},
  447. })
  448. expect(value).toBe("stop")
  449. expect(yield* llm.calls).toBe(1)
  450. expect(handle.message.error?.name).toBe("APIError")
  451. }),
  452. { git: true, config: (url) => providerCfg(url) },
  453. ),
  454. )
  455. it.live("session.processor effect tests retry recognized structured json errors", () =>
  456. provideTmpdirServer(
  457. ({ dir, llm }) =>
  458. Effect.gen(function* () {
  459. const { processors, session, provider } = yield* boot()
  460. yield* llm.error(429, { type: "error", error: { type: "too_many_requests" } })
  461. yield* llm.text("after")
  462. const chat = yield* session.create({})
  463. const parent = yield* user(chat.id, "retry json")
  464. const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
  465. const mdl = yield* provider.getModel(ref.providerID, ref.modelID)
  466. const handle = yield* processors.create({
  467. assistantMessage: msg,
  468. sessionID: chat.id,
  469. model: mdl,
  470. })
  471. const value = yield* handle.process({
  472. user: {
  473. id: parent.id,
  474. sessionID: chat.id,
  475. role: "user",
  476. time: parent.time,
  477. agent: parent.agent,
  478. model: { providerID: ref.providerID, modelID: ref.modelID },
  479. } satisfies MessageV2.User,
  480. sessionID: chat.id,
  481. model: mdl,
  482. agent: agent(),
  483. system: [],
  484. messages: [{ role: "user", content: "retry json" }],
  485. tools: {},
  486. })
  487. const parts = MessageV2.parts(msg.id)
  488. expect(value).toBe("continue")
  489. expect(yield* llm.calls).toBe(2)
  490. expect(parts.some((part) => part.type === "text" && part.text === "after")).toBe(true)
  491. expect(handle.message.error).toBeUndefined()
  492. }),
  493. { git: true, config: (url) => providerCfg(url) },
  494. ),
  495. )
  496. it.live("session.processor effect tests publish retry status updates", () =>
  497. provideTmpdirServer(
  498. ({ dir, llm }) =>
  499. Effect.gen(function* () {
  500. const { processors, session, provider } = yield* boot()
  501. const bus = yield* Bus.Service
  502. yield* llm.error(503, { error: "boom" })
  503. yield* llm.text("")
  504. const chat = yield* session.create({})
  505. const parent = yield* user(chat.id, "retry")
  506. const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
  507. const mdl = yield* provider.getModel(ref.providerID, ref.modelID)
  508. const states: number[] = []
  509. const off = yield* bus.subscribeCallback(SessionStatus.Event.Status, (evt) => {
  510. if (evt.properties.sessionID !== chat.id) return
  511. if (evt.properties.status.type === "retry") states.push(evt.properties.status.attempt)
  512. })
  513. const handle = yield* processors.create({
  514. assistantMessage: msg,
  515. sessionID: chat.id,
  516. model: mdl,
  517. })
  518. const value = yield* handle.process({
  519. user: {
  520. id: parent.id,
  521. sessionID: chat.id,
  522. role: "user",
  523. time: parent.time,
  524. agent: parent.agent,
  525. model: { providerID: ref.providerID, modelID: ref.modelID },
  526. } satisfies MessageV2.User,
  527. sessionID: chat.id,
  528. model: mdl,
  529. agent: agent(),
  530. system: [],
  531. messages: [{ role: "user", content: "retry" }],
  532. tools: {},
  533. })
  534. off()
  535. expect(value).toBe("continue")
  536. expect(yield* llm.calls).toBe(2)
  537. expect(states).toStrictEqual([1])
  538. }),
  539. { git: true, config: (url) => providerCfg(url) },
  540. ),
  541. )
  542. it.live("session.processor effect tests compact on structured context overflow", () =>
  543. provideTmpdirServer(
  544. ({ dir, llm }) =>
  545. Effect.gen(function* () {
  546. const { processors, session, provider } = yield* boot()
  547. yield* llm.error(400, { type: "error", error: { code: "context_length_exceeded" } })
  548. const chat = yield* session.create({})
  549. const parent = yield* user(chat.id, "compact json")
  550. const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
  551. const mdl = yield* provider.getModel(ref.providerID, ref.modelID)
  552. const handle = yield* processors.create({
  553. assistantMessage: msg,
  554. sessionID: chat.id,
  555. model: mdl,
  556. })
  557. const value = yield* handle.process({
  558. user: {
  559. id: parent.id,
  560. sessionID: chat.id,
  561. role: "user",
  562. time: parent.time,
  563. agent: parent.agent,
  564. model: { providerID: ref.providerID, modelID: ref.modelID },
  565. } satisfies MessageV2.User,
  566. sessionID: chat.id,
  567. model: mdl,
  568. agent: agent(),
  569. system: [],
  570. messages: [{ role: "user", content: "compact json" }],
  571. tools: {},
  572. })
  573. expect(value).toBe("compact")
  574. expect(yield* llm.calls).toBe(1)
  575. expect(handle.message.error).toBeUndefined()
  576. }),
  577. { git: true, config: (url) => providerCfg(url) },
  578. ),
  579. )
  580. it.live("session.processor effect tests mark pending tools as aborted on cleanup", () =>
  581. provideTmpdirServer(
  582. ({ dir, llm }) =>
  583. Effect.gen(function* () {
  584. const { processors, session, provider } = yield* boot()
  585. yield* llm.toolHang("bash", { cmd: "pwd" })
  586. const chat = yield* session.create({})
  587. const parent = yield* user(chat.id, "tool abort")
  588. const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
  589. const mdl = yield* provider.getModel(ref.providerID, ref.modelID)
  590. const handle = yield* processors.create({
  591. assistantMessage: msg,
  592. sessionID: chat.id,
  593. model: mdl,
  594. })
  595. const run = yield* handle
  596. .process({
  597. user: {
  598. id: parent.id,
  599. sessionID: chat.id,
  600. role: "user",
  601. time: parent.time,
  602. agent: parent.agent,
  603. model: { providerID: ref.providerID, modelID: ref.modelID },
  604. } satisfies MessageV2.User,
  605. sessionID: chat.id,
  606. model: mdl,
  607. agent: agent(),
  608. system: [],
  609. messages: [{ role: "user", content: "tool abort" }],
  610. tools: {},
  611. })
  612. .pipe(Effect.forkChild)
  613. yield* llm.wait(1)
  614. yield* Effect.promise(async () => {
  615. const end = Date.now() + 500
  616. while (Date.now() < end) {
  617. const parts = await MessageV2.parts(msg.id)
  618. if (parts.some((part) => part.type === "tool")) return
  619. await Bun.sleep(10)
  620. }
  621. })
  622. yield* Fiber.interrupt(run)
  623. const exit = yield* Fiber.await(run)
  624. const parts = MessageV2.parts(msg.id)
  625. const call = parts.find((part): part is MessageV2.ToolPart => part.type === "tool")
  626. expect(Exit.isFailure(exit)).toBe(true)
  627. if (Exit.isFailure(exit)) {
  628. expect(Cause.hasInterruptsOnly(exit.cause)).toBe(true)
  629. }
  630. expect(yield* llm.calls).toBe(1)
  631. expect(call?.state.status).toBe("error")
  632. if (call?.state.status === "error") {
  633. expect(call.state.error).toBe("Tool execution aborted")
  634. expect(call.state.metadata?.interrupted).toBe(true)
  635. expect(call.state.time.end).toBeDefined()
  636. }
  637. }),
  638. { git: true, config: (url) => providerCfg(url) },
  639. ),
  640. )
  641. it.live("session.processor effect tests record aborted errors and idle state", () =>
  642. provideTmpdirServer(
  643. ({ dir, llm }) =>
  644. Effect.gen(function* () {
  645. const seen = defer<void>()
  646. const { processors, session, provider } = yield* boot()
  647. const bus = yield* Bus.Service
  648. const sts = yield* SessionStatus.Service
  649. yield* llm.hang
  650. const chat = yield* session.create({})
  651. const parent = yield* user(chat.id, "abort")
  652. const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
  653. const mdl = yield* provider.getModel(ref.providerID, ref.modelID)
  654. const errs: string[] = []
  655. const off = yield* bus.subscribeCallback(Session.Event.Error, (evt) => {
  656. if (evt.properties.sessionID !== chat.id) return
  657. if (!evt.properties.error) return
  658. errs.push(evt.properties.error.name)
  659. seen.resolve()
  660. })
  661. const handle = yield* processors.create({
  662. assistantMessage: msg,
  663. sessionID: chat.id,
  664. model: mdl,
  665. })
  666. const run = yield* handle
  667. .process({
  668. user: {
  669. id: parent.id,
  670. sessionID: chat.id,
  671. role: "user",
  672. time: parent.time,
  673. agent: parent.agent,
  674. model: { providerID: ref.providerID, modelID: ref.modelID },
  675. } satisfies MessageV2.User,
  676. sessionID: chat.id,
  677. model: mdl,
  678. agent: agent(),
  679. system: [],
  680. messages: [{ role: "user", content: "abort" }],
  681. tools: {},
  682. })
  683. .pipe(Effect.forkChild)
  684. yield* llm.wait(1)
  685. yield* Fiber.interrupt(run)
  686. const exit = yield* Fiber.await(run)
  687. yield* Effect.promise(() => seen.promise)
  688. const stored = MessageV2.get({ sessionID: chat.id, messageID: msg.id })
  689. const state = yield* sts.get(chat.id)
  690. off()
  691. expect(Exit.isFailure(exit)).toBe(true)
  692. if (Exit.isFailure(exit)) {
  693. expect(Cause.hasInterruptsOnly(exit.cause)).toBe(true)
  694. }
  695. expect(handle.message.error?.name).toBe("MessageAbortedError")
  696. expect(stored.info.role).toBe("assistant")
  697. if (stored.info.role === "assistant") {
  698. expect(stored.info.error?.name).toBe("MessageAbortedError")
  699. }
  700. expect(state).toMatchObject({ type: "idle" })
  701. expect(errs).toContain("MessageAbortedError")
  702. }),
  703. { git: true, config: (url) => providerCfg(url) },
  704. ),
  705. )
  706. it.live("session.processor effect tests mark interruptions aborted without manual abort", () =>
  707. provideTmpdirServer(
  708. ({ dir, llm }) =>
  709. Effect.gen(function* () {
  710. const { processors, session, provider } = yield* boot()
  711. const sts = yield* SessionStatus.Service
  712. yield* llm.hang
  713. const chat = yield* session.create({})
  714. const parent = yield* user(chat.id, "interrupt")
  715. const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
  716. const mdl = yield* provider.getModel(ref.providerID, ref.modelID)
  717. const handle = yield* processors.create({
  718. assistantMessage: msg,
  719. sessionID: chat.id,
  720. model: mdl,
  721. })
  722. const run = yield* handle
  723. .process({
  724. user: {
  725. id: parent.id,
  726. sessionID: chat.id,
  727. role: "user",
  728. time: parent.time,
  729. agent: parent.agent,
  730. model: { providerID: ref.providerID, modelID: ref.modelID },
  731. } satisfies MessageV2.User,
  732. sessionID: chat.id,
  733. model: mdl,
  734. agent: agent(),
  735. system: [],
  736. messages: [{ role: "user", content: "interrupt" }],
  737. tools: {},
  738. })
  739. .pipe(Effect.forkChild)
  740. yield* llm.wait(1)
  741. yield* Fiber.interrupt(run)
  742. const exit = yield* Fiber.await(run)
  743. const stored = MessageV2.get({ sessionID: chat.id, messageID: msg.id })
  744. const state = yield* sts.get(chat.id)
  745. expect(Exit.isFailure(exit)).toBe(true)
  746. expect(handle.message.error?.name).toBe("MessageAbortedError")
  747. expect(stored.info.role).toBe("assistant")
  748. if (stored.info.role === "assistant") {
  749. expect(stored.info.error?.name).toBe("MessageAbortedError")
  750. }
  751. expect(state).toMatchObject({ type: "idle" })
  752. }),
  753. { git: true, config: (url) => providerCfg(url) },
  754. ),
  755. )