agent.go 26 KB


  1. // Package agent is the core orchestration layer for Crush AI agents.
  2. //
  3. // It provides session-based AI agent functionality for managing
  4. // conversations, tool execution, and message handling. It coordinates
  5. // interactions between language models, messages, sessions, and tools while
  6. // handling features like automatic summarization, queuing, and token
  7. // management.
  8. package agent
  9. import (
  10. "cmp"
  11. "context"
  12. _ "embed"
  13. "errors"
  14. "fmt"
  15. "log/slog"
  16. "os"
  17. "strconv"
  18. "strings"
  19. "sync"
  20. "time"
  21. "charm.land/fantasy"
  22. "charm.land/fantasy/providers/anthropic"
  23. "charm.land/fantasy/providers/bedrock"
  24. "charm.land/fantasy/providers/google"
  25. "charm.land/fantasy/providers/openai"
  26. "charm.land/fantasy/providers/openrouter"
  27. "github.com/charmbracelet/catwalk/pkg/catwalk"
  28. "github.com/charmbracelet/crush/internal/agent/tools"
  29. "github.com/charmbracelet/crush/internal/config"
  30. "github.com/charmbracelet/crush/internal/csync"
  31. "github.com/charmbracelet/crush/internal/message"
  32. "github.com/charmbracelet/crush/internal/permission"
  33. "github.com/charmbracelet/crush/internal/session"
  34. "github.com/charmbracelet/crush/internal/stringext"
  35. )
  36. //go:embed templates/title.md
  37. var titlePrompt []byte
  38. //go:embed templates/summary.md
  39. var summaryPrompt []byte
  40. type SessionAgentCall struct {
  41. SessionID string
  42. Prompt string
  43. ProviderOptions fantasy.ProviderOptions
  44. Attachments []message.Attachment
  45. MaxOutputTokens int64
  46. Temperature *float64
  47. TopP *float64
  48. TopK *int64
  49. FrequencyPenalty *float64
  50. PresencePenalty *float64
  51. }
  52. type SessionAgent interface {
  53. Run(context.Context, SessionAgentCall) (*fantasy.AgentResult, error)
  54. SetModels(large Model, small Model)
  55. SetTools(tools []fantasy.AgentTool)
  56. Cancel(sessionID string)
  57. CancelAll()
  58. IsSessionBusy(sessionID string) bool
  59. IsBusy() bool
  60. QueuedPrompts(sessionID string) int
  61. ClearQueue(sessionID string)
  62. Summarize(context.Context, string, fantasy.ProviderOptions) error
  63. Model() Model
  64. }
  65. type Model struct {
  66. Model fantasy.LanguageModel
  67. CatwalkCfg catwalk.Model
  68. ModelCfg config.SelectedModel
  69. }
  70. type sessionAgent struct {
  71. largeModel Model
  72. smallModel Model
  73. systemPromptPrefix string
  74. systemPrompt string
  75. tools []fantasy.AgentTool
  76. sessions session.Service
  77. messages message.Service
  78. disableAutoSummarize bool
  79. isYolo bool
  80. messageQueue *csync.Map[string, []SessionAgentCall]
  81. activeRequests *csync.Map[string, context.CancelFunc]
  82. }
  83. type SessionAgentOptions struct {
  84. LargeModel Model
  85. SmallModel Model
  86. SystemPromptPrefix string
  87. SystemPrompt string
  88. DisableAutoSummarize bool
  89. IsYolo bool
  90. Sessions session.Service
  91. Messages message.Service
  92. Tools []fantasy.AgentTool
  93. }
  94. func NewSessionAgent(
  95. opts SessionAgentOptions,
  96. ) SessionAgent {
  97. return &sessionAgent{
  98. largeModel: opts.LargeModel,
  99. smallModel: opts.SmallModel,
  100. systemPromptPrefix: opts.SystemPromptPrefix,
  101. systemPrompt: opts.SystemPrompt,
  102. sessions: opts.Sessions,
  103. messages: opts.Messages,
  104. disableAutoSummarize: opts.DisableAutoSummarize,
  105. tools: opts.Tools,
  106. isYolo: opts.IsYolo,
  107. messageQueue: csync.NewMap[string, []SessionAgentCall](),
  108. activeRequests: csync.NewMap[string, context.CancelFunc](),
  109. }
  110. }
  111. func (a *sessionAgent) Run(ctx context.Context, call SessionAgentCall) (*fantasy.AgentResult, error) {
  112. if call.Prompt == "" {
  113. return nil, ErrEmptyPrompt
  114. }
  115. if call.SessionID == "" {
  116. return nil, ErrSessionMissing
  117. }
  118. // Queue the message if busy
  119. if a.IsSessionBusy(call.SessionID) {
  120. existing, ok := a.messageQueue.Get(call.SessionID)
  121. if !ok {
  122. existing = []SessionAgentCall{}
  123. }
  124. existing = append(existing, call)
  125. a.messageQueue.Set(call.SessionID, existing)
  126. return nil, nil
  127. }
  128. if len(a.tools) > 0 {
  129. // Add Anthropic caching to the last tool.
  130. a.tools[len(a.tools)-1].SetProviderOptions(a.getCacheControlOptions())
  131. }
  132. agent := fantasy.NewAgent(
  133. a.largeModel.Model,
  134. fantasy.WithSystemPrompt(a.systemPrompt),
  135. fantasy.WithTools(a.tools...),
  136. )
  137. sessionLock := sync.Mutex{}
  138. currentSession, err := a.sessions.Get(ctx, call.SessionID)
  139. if err != nil {
  140. return nil, fmt.Errorf("failed to get session: %w", err)
  141. }
  142. msgs, err := a.getSessionMessages(ctx, currentSession)
  143. if err != nil {
  144. return nil, fmt.Errorf("failed to get session messages: %w", err)
  145. }
  146. var wg sync.WaitGroup
  147. // Generate title if first message.
  148. if len(msgs) == 0 {
  149. wg.Go(func() {
  150. sessionLock.Lock()
  151. a.generateTitle(ctx, &currentSession, call.Prompt)
  152. sessionLock.Unlock()
  153. })
  154. }
  155. // Add the user message to the session.
  156. _, err = a.createUserMessage(ctx, call)
  157. if err != nil {
  158. return nil, err
  159. }
  160. // Add the session to the context.
  161. ctx = context.WithValue(ctx, tools.SessionIDContextKey, call.SessionID)
  162. genCtx, cancel := context.WithCancel(ctx)
  163. a.activeRequests.Set(call.SessionID, cancel)
  164. defer cancel()
  165. defer a.activeRequests.Del(call.SessionID)
  166. history, files := a.preparePrompt(msgs, call.Attachments...)
  167. startTime := time.Now()
  168. a.eventPromptSent(call.SessionID)
  169. var currentAssistant *message.Message
  170. var shouldSummarize bool
  171. result, err := agent.Stream(genCtx, fantasy.AgentStreamCall{
  172. Prompt: call.Prompt,
  173. Files: files,
  174. Messages: history,
  175. ProviderOptions: call.ProviderOptions,
  176. MaxOutputTokens: &call.MaxOutputTokens,
  177. TopP: call.TopP,
  178. Temperature: call.Temperature,
  179. PresencePenalty: call.PresencePenalty,
  180. TopK: call.TopK,
  181. FrequencyPenalty: call.FrequencyPenalty,
  182. // Before each step create a new assistant message.
  183. PrepareStep: func(callContext context.Context, options fantasy.PrepareStepFunctionOptions) (_ context.Context, prepared fantasy.PrepareStepResult, err error) {
  184. prepared.Messages = options.Messages
  185. // Reset all cached items.
  186. for i := range prepared.Messages {
  187. prepared.Messages[i].ProviderOptions = nil
  188. }
  189. queuedCalls, _ := a.messageQueue.Get(call.SessionID)
  190. a.messageQueue.Del(call.SessionID)
  191. for _, queued := range queuedCalls {
  192. userMessage, createErr := a.createUserMessage(callContext, queued)
  193. if createErr != nil {
  194. return callContext, prepared, createErr
  195. }
  196. prepared.Messages = append(prepared.Messages, userMessage.ToAIMessage()...)
  197. }
  198. lastSystemRoleInx := 0
  199. systemMessageUpdated := false
  200. for i, msg := range prepared.Messages {
  201. // Only add cache control to the last message.
  202. if msg.Role == fantasy.MessageRoleSystem {
  203. lastSystemRoleInx = i
  204. } else if !systemMessageUpdated {
  205. prepared.Messages[lastSystemRoleInx].ProviderOptions = a.getCacheControlOptions()
  206. systemMessageUpdated = true
  207. }
  208. // Than add cache control to the last 2 messages.
  209. if i > len(prepared.Messages)-3 {
  210. prepared.Messages[i].ProviderOptions = a.getCacheControlOptions()
  211. }
  212. }
  213. if a.systemPromptPrefix != "" {
  214. prepared.Messages = append([]fantasy.Message{fantasy.NewSystemMessage(a.systemPromptPrefix)}, prepared.Messages...)
  215. }
  216. var assistantMsg message.Message
  217. assistantMsg, err = a.messages.Create(callContext, call.SessionID, message.CreateMessageParams{
  218. Role: message.Assistant,
  219. Parts: []message.ContentPart{},
  220. Model: a.largeModel.ModelCfg.Model,
  221. Provider: a.largeModel.ModelCfg.Provider,
  222. })
  223. if err != nil {
  224. return callContext, prepared, err
  225. }
  226. callContext = context.WithValue(callContext, tools.MessageIDContextKey, assistantMsg.ID)
  227. currentAssistant = &assistantMsg
  228. return callContext, prepared, err
  229. },
  230. OnReasoningStart: func(id string, reasoning fantasy.ReasoningContent) error {
  231. currentAssistant.AppendReasoningContent(reasoning.Text)
  232. return a.messages.Update(genCtx, *currentAssistant)
  233. },
  234. OnReasoningDelta: func(id string, text string) error {
  235. currentAssistant.AppendReasoningContent(text)
  236. return a.messages.Update(genCtx, *currentAssistant)
  237. },
  238. OnReasoningEnd: func(id string, reasoning fantasy.ReasoningContent) error {
  239. // handle anthropic signature
  240. if anthropicData, ok := reasoning.ProviderMetadata[anthropic.Name]; ok {
  241. if reasoning, ok := anthropicData.(*anthropic.ReasoningOptionMetadata); ok {
  242. currentAssistant.AppendReasoningSignature(reasoning.Signature)
  243. }
  244. }
  245. if googleData, ok := reasoning.ProviderMetadata[google.Name]; ok {
  246. if reasoning, ok := googleData.(*google.ReasoningMetadata); ok {
  247. currentAssistant.AppendReasoningSignature(reasoning.Signature)
  248. }
  249. }
  250. if openaiData, ok := reasoning.ProviderMetadata[openai.Name]; ok {
  251. if reasoning, ok := openaiData.(*openai.ResponsesReasoningMetadata); ok {
  252. currentAssistant.SetReasoningResponsesData(reasoning)
  253. }
  254. }
  255. currentAssistant.FinishThinking()
  256. return a.messages.Update(genCtx, *currentAssistant)
  257. },
  258. OnTextDelta: func(id string, text string) error {
  259. // Strip leading newline from initial text content. This is is
  260. // particularly important in non-interactive mode where leading
  261. // newlines are very visible.
  262. if len(currentAssistant.Parts) == 0 {
  263. text = strings.TrimPrefix(text, "\n")
  264. }
  265. currentAssistant.AppendContent(text)
  266. return a.messages.Update(genCtx, *currentAssistant)
  267. },
  268. OnToolInputStart: func(id string, toolName string) error {
  269. toolCall := message.ToolCall{
  270. ID: id,
  271. Name: toolName,
  272. ProviderExecuted: false,
  273. Finished: false,
  274. }
  275. currentAssistant.AddToolCall(toolCall)
  276. return a.messages.Update(genCtx, *currentAssistant)
  277. },
  278. OnRetry: func(err *fantasy.ProviderError, delay time.Duration) {
  279. // TODO: implement
  280. },
  281. OnToolCall: func(tc fantasy.ToolCallContent) error {
  282. toolCall := message.ToolCall{
  283. ID: tc.ToolCallID,
  284. Name: tc.ToolName,
  285. Input: tc.Input,
  286. ProviderExecuted: false,
  287. Finished: true,
  288. }
  289. currentAssistant.AddToolCall(toolCall)
  290. return a.messages.Update(genCtx, *currentAssistant)
  291. },
  292. OnToolResult: func(result fantasy.ToolResultContent) error {
  293. var resultContent string
  294. isError := false
  295. switch result.Result.GetType() {
  296. case fantasy.ToolResultContentTypeText:
  297. r, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentText](result.Result)
  298. if ok {
  299. resultContent = r.Text
  300. }
  301. case fantasy.ToolResultContentTypeError:
  302. r, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentError](result.Result)
  303. if ok {
  304. isError = true
  305. resultContent = r.Error.Error()
  306. }
  307. case fantasy.ToolResultContentTypeMedia:
  308. // TODO: handle this message type
  309. }
  310. toolResult := message.ToolResult{
  311. ToolCallID: result.ToolCallID,
  312. Name: result.ToolName,
  313. Content: resultContent,
  314. IsError: isError,
  315. Metadata: result.ClientMetadata,
  316. }
  317. _, createMsgErr := a.messages.Create(genCtx, currentAssistant.SessionID, message.CreateMessageParams{
  318. Role: message.Tool,
  319. Parts: []message.ContentPart{
  320. toolResult,
  321. },
  322. })
  323. if createMsgErr != nil {
  324. return createMsgErr
  325. }
  326. return nil
  327. },
  328. OnStepFinish: func(stepResult fantasy.StepResult) error {
  329. finishReason := message.FinishReasonUnknown
  330. switch stepResult.FinishReason {
  331. case fantasy.FinishReasonLength:
  332. finishReason = message.FinishReasonMaxTokens
  333. case fantasy.FinishReasonStop:
  334. finishReason = message.FinishReasonEndTurn
  335. case fantasy.FinishReasonToolCalls:
  336. finishReason = message.FinishReasonToolUse
  337. }
  338. currentAssistant.AddFinish(finishReason, "", "")
  339. a.updateSessionUsage(a.largeModel, &currentSession, stepResult.Usage, a.openrouterCost(stepResult.ProviderMetadata))
  340. sessionLock.Lock()
  341. _, sessionErr := a.sessions.Save(genCtx, currentSession)
  342. sessionLock.Unlock()
  343. if sessionErr != nil {
  344. return sessionErr
  345. }
  346. return a.messages.Update(genCtx, *currentAssistant)
  347. },
  348. StopWhen: []fantasy.StopCondition{
  349. func(_ []fantasy.StepResult) bool {
  350. cw := int64(a.largeModel.CatwalkCfg.ContextWindow)
  351. tokens := currentSession.CompletionTokens + currentSession.PromptTokens
  352. remaining := cw - tokens
  353. var threshold int64
  354. if cw > 200_000 {
  355. threshold = 20_000
  356. } else {
  357. threshold = int64(float64(cw) * 0.2)
  358. }
  359. if (remaining <= threshold) && !a.disableAutoSummarize {
  360. shouldSummarize = true
  361. return true
  362. }
  363. return false
  364. },
  365. },
  366. })
  367. a.eventPromptResponded(call.SessionID, time.Since(startTime).Truncate(time.Second))
  368. if err != nil {
  369. isCancelErr := errors.Is(err, context.Canceled)
  370. isPermissionErr := errors.Is(err, permission.ErrorPermissionDenied)
  371. if currentAssistant == nil {
  372. return result, err
  373. }
  374. // Ensure we finish thinking on error to close the reasoning state.
  375. currentAssistant.FinishThinking()
  376. toolCalls := currentAssistant.ToolCalls()
  377. // INFO: we use the parent context here because the genCtx has been cancelled.
  378. msgs, createErr := a.messages.List(ctx, currentAssistant.SessionID)
  379. if createErr != nil {
  380. return nil, createErr
  381. }
  382. for _, tc := range toolCalls {
  383. if !tc.Finished {
  384. tc.Finished = true
  385. tc.Input = "{}"
  386. currentAssistant.AddToolCall(tc)
  387. updateErr := a.messages.Update(ctx, *currentAssistant)
  388. if updateErr != nil {
  389. return nil, updateErr
  390. }
  391. }
  392. found := false
  393. for _, msg := range msgs {
  394. if msg.Role == message.Tool {
  395. for _, tr := range msg.ToolResults() {
  396. if tr.ToolCallID == tc.ID {
  397. found = true
  398. break
  399. }
  400. }
  401. }
  402. if found {
  403. break
  404. }
  405. }
  406. if found {
  407. continue
  408. }
  409. content := "There was an error while executing the tool"
  410. if isCancelErr {
  411. content = "Tool execution canceled by user"
  412. } else if isPermissionErr {
  413. content = "User denied permission"
  414. }
  415. toolResult := message.ToolResult{
  416. ToolCallID: tc.ID,
  417. Name: tc.Name,
  418. Content: content,
  419. IsError: true,
  420. }
  421. _, createErr = a.messages.Create(context.Background(), currentAssistant.SessionID, message.CreateMessageParams{
  422. Role: message.Tool,
  423. Parts: []message.ContentPart{
  424. toolResult,
  425. },
  426. })
  427. if createErr != nil {
  428. return nil, createErr
  429. }
  430. }
  431. var fantasyErr *fantasy.Error
  432. var providerErr *fantasy.ProviderError
  433. const defaultTitle = "Provider Error"
  434. if isCancelErr {
  435. currentAssistant.AddFinish(message.FinishReasonCanceled, "User canceled request", "")
  436. } else if isPermissionErr {
  437. currentAssistant.AddFinish(message.FinishReasonPermissionDenied, "User denied permission", "")
  438. } else if errors.As(err, &providerErr) {
  439. currentAssistant.AddFinish(message.FinishReasonError, cmp.Or(stringext.Capitalize(providerErr.Title), defaultTitle), providerErr.Message)
  440. } else if errors.As(err, &fantasyErr) {
  441. currentAssistant.AddFinish(message.FinishReasonError, cmp.Or(stringext.Capitalize(fantasyErr.Title), defaultTitle), fantasyErr.Message)
  442. } else {
  443. currentAssistant.AddFinish(message.FinishReasonError, defaultTitle, err.Error())
  444. }
  445. // Note: we use the parent context here because the genCtx has been
  446. // cancelled.
  447. updateErr := a.messages.Update(ctx, *currentAssistant)
  448. if updateErr != nil {
  449. return nil, updateErr
  450. }
  451. return nil, err
  452. }
  453. wg.Wait()
  454. if shouldSummarize {
  455. a.activeRequests.Del(call.SessionID)
  456. if summarizeErr := a.Summarize(genCtx, call.SessionID, call.ProviderOptions); summarizeErr != nil {
  457. return nil, summarizeErr
  458. }
  459. // If the agent wasn't done...
  460. if len(currentAssistant.ToolCalls()) > 0 {
  461. existing, ok := a.messageQueue.Get(call.SessionID)
  462. if !ok {
  463. existing = []SessionAgentCall{}
  464. }
  465. call.Prompt = fmt.Sprintf("The previous session was interrupted because it got too long, the initial user request was: `%s`", call.Prompt)
  466. existing = append(existing, call)
  467. a.messageQueue.Set(call.SessionID, existing)
  468. }
  469. }
  470. // Release active request before processing queued messages.
  471. a.activeRequests.Del(call.SessionID)
  472. cancel()
  473. queuedMessages, ok := a.messageQueue.Get(call.SessionID)
  474. if !ok || len(queuedMessages) == 0 {
  475. return result, err
  476. }
  477. // There are queued messages restart the loop.
  478. firstQueuedMessage := queuedMessages[0]
  479. a.messageQueue.Set(call.SessionID, queuedMessages[1:])
  480. return a.Run(ctx, firstQueuedMessage)
  481. }
  482. func (a *sessionAgent) Summarize(ctx context.Context, sessionID string, opts fantasy.ProviderOptions) error {
  483. if a.IsSessionBusy(sessionID) {
  484. return ErrSessionBusy
  485. }
  486. currentSession, err := a.sessions.Get(ctx, sessionID)
  487. if err != nil {
  488. return fmt.Errorf("failed to get session: %w", err)
  489. }
  490. msgs, err := a.getSessionMessages(ctx, currentSession)
  491. if err != nil {
  492. return err
  493. }
  494. if len(msgs) == 0 {
  495. // Nothing to summarize.
  496. return nil
  497. }
  498. aiMsgs, _ := a.preparePrompt(msgs)
  499. genCtx, cancel := context.WithCancel(ctx)
  500. a.activeRequests.Set(sessionID, cancel)
  501. defer a.activeRequests.Del(sessionID)
  502. defer cancel()
  503. agent := fantasy.NewAgent(a.largeModel.Model,
  504. fantasy.WithSystemPrompt(string(summaryPrompt)),
  505. )
  506. summaryMessage, err := a.messages.Create(ctx, sessionID, message.CreateMessageParams{
  507. Role: message.Assistant,
  508. Model: a.largeModel.Model.Model(),
  509. Provider: a.largeModel.Model.Provider(),
  510. IsSummaryMessage: true,
  511. })
  512. if err != nil {
  513. return err
  514. }
  515. resp, err := agent.Stream(genCtx, fantasy.AgentStreamCall{
  516. Prompt: "Provide a detailed summary of our conversation above.",
  517. Messages: aiMsgs,
  518. ProviderOptions: opts,
  519. PrepareStep: func(callContext context.Context, options fantasy.PrepareStepFunctionOptions) (_ context.Context, prepared fantasy.PrepareStepResult, err error) {
  520. prepared.Messages = options.Messages
  521. if a.systemPromptPrefix != "" {
  522. prepared.Messages = append([]fantasy.Message{fantasy.NewSystemMessage(a.systemPromptPrefix)}, prepared.Messages...)
  523. }
  524. return callContext, prepared, nil
  525. },
  526. OnReasoningDelta: func(id string, text string) error {
  527. summaryMessage.AppendReasoningContent(text)
  528. return a.messages.Update(genCtx, summaryMessage)
  529. },
  530. OnReasoningEnd: func(id string, reasoning fantasy.ReasoningContent) error {
  531. // Handle anthropic signature.
  532. if anthropicData, ok := reasoning.ProviderMetadata["anthropic"]; ok {
  533. if signature, ok := anthropicData.(*anthropic.ReasoningOptionMetadata); ok && signature.Signature != "" {
  534. summaryMessage.AppendReasoningSignature(signature.Signature)
  535. }
  536. }
  537. summaryMessage.FinishThinking()
  538. return a.messages.Update(genCtx, summaryMessage)
  539. },
  540. OnTextDelta: func(id, text string) error {
  541. summaryMessage.AppendContent(text)
  542. return a.messages.Update(genCtx, summaryMessage)
  543. },
  544. })
  545. if err != nil {
  546. isCancelErr := errors.Is(err, context.Canceled)
  547. if isCancelErr {
  548. // User cancelled summarize we need to remove the summary message.
  549. deleteErr := a.messages.Delete(ctx, summaryMessage.ID)
  550. return deleteErr
  551. }
  552. return err
  553. }
  554. summaryMessage.AddFinish(message.FinishReasonEndTurn, "", "")
  555. err = a.messages.Update(genCtx, summaryMessage)
  556. if err != nil {
  557. return err
  558. }
  559. var openrouterCost *float64
  560. for _, step := range resp.Steps {
  561. stepCost := a.openrouterCost(step.ProviderMetadata)
  562. if stepCost != nil {
  563. newCost := *stepCost
  564. if openrouterCost != nil {
  565. newCost += *openrouterCost
  566. }
  567. openrouterCost = &newCost
  568. }
  569. }
  570. a.updateSessionUsage(a.largeModel, &currentSession, resp.TotalUsage, openrouterCost)
  571. // Just in case, get just the last usage info.
  572. usage := resp.Response.Usage
  573. currentSession.SummaryMessageID = summaryMessage.ID
  574. currentSession.CompletionTokens = usage.OutputTokens
  575. currentSession.PromptTokens = 0
  576. _, err = a.sessions.Save(genCtx, currentSession)
  577. return err
  578. }
  579. func (a *sessionAgent) getCacheControlOptions() fantasy.ProviderOptions {
  580. if t, _ := strconv.ParseBool(os.Getenv("CRUSH_DISABLE_ANTHROPIC_CACHE")); t {
  581. return fantasy.ProviderOptions{}
  582. }
  583. return fantasy.ProviderOptions{
  584. anthropic.Name: &anthropic.ProviderCacheControlOptions{
  585. CacheControl: anthropic.CacheControl{Type: "ephemeral"},
  586. },
  587. bedrock.Name: &anthropic.ProviderCacheControlOptions{
  588. CacheControl: anthropic.CacheControl{Type: "ephemeral"},
  589. },
  590. }
  591. }
  592. func (a *sessionAgent) createUserMessage(ctx context.Context, call SessionAgentCall) (message.Message, error) {
  593. var attachmentParts []message.ContentPart
  594. for _, attachment := range call.Attachments {
  595. attachmentParts = append(attachmentParts, message.BinaryContent{Path: attachment.FilePath, MIMEType: attachment.MimeType, Data: attachment.Content})
  596. }
  597. parts := []message.ContentPart{message.TextContent{Text: call.Prompt}}
  598. parts = append(parts, attachmentParts...)
  599. msg, err := a.messages.Create(ctx, call.SessionID, message.CreateMessageParams{
  600. Role: message.User,
  601. Parts: parts,
  602. })
  603. if err != nil {
  604. return message.Message{}, fmt.Errorf("failed to create user message: %w", err)
  605. }
  606. return msg, nil
  607. }
  608. func (a *sessionAgent) preparePrompt(msgs []message.Message, attachments ...message.Attachment) ([]fantasy.Message, []fantasy.FilePart) {
  609. var history []fantasy.Message
  610. for _, m := range msgs {
  611. if len(m.Parts) == 0 {
  612. continue
  613. }
  614. // Assistant message without content or tool calls (cancelled before it
  615. // returned anything).
  616. if m.Role == message.Assistant && len(m.ToolCalls()) == 0 && m.Content().Text == "" && m.ReasoningContent().String() == "" {
  617. continue
  618. }
  619. history = append(history, m.ToAIMessage()...)
  620. }
  621. var files []fantasy.FilePart
  622. for _, attachment := range attachments {
  623. files = append(files, fantasy.FilePart{
  624. Filename: attachment.FileName,
  625. Data: attachment.Content,
  626. MediaType: attachment.MimeType,
  627. })
  628. }
  629. return history, files
  630. }
  631. func (a *sessionAgent) getSessionMessages(ctx context.Context, session session.Session) ([]message.Message, error) {
  632. msgs, err := a.messages.List(ctx, session.ID)
  633. if err != nil {
  634. return nil, fmt.Errorf("failed to list messages: %w", err)
  635. }
  636. if session.SummaryMessageID != "" {
  637. summaryMsgInex := -1
  638. for i, msg := range msgs {
  639. if msg.ID == session.SummaryMessageID {
  640. summaryMsgInex = i
  641. break
  642. }
  643. }
  644. if summaryMsgInex != -1 {
  645. msgs = msgs[summaryMsgInex:]
  646. msgs[0].Role = message.User
  647. }
  648. }
  649. return msgs, nil
  650. }
  651. func (a *sessionAgent) generateTitle(ctx context.Context, session *session.Session, prompt string) {
  652. if prompt == "" {
  653. return
  654. }
  655. var maxOutput int64 = 40
  656. if a.smallModel.CatwalkCfg.CanReason {
  657. maxOutput = a.smallModel.CatwalkCfg.DefaultMaxTokens
  658. }
  659. agent := fantasy.NewAgent(a.smallModel.Model,
  660. fantasy.WithSystemPrompt(string(titlePrompt)+"\n /no_think"),
  661. fantasy.WithMaxOutputTokens(maxOutput),
  662. )
  663. resp, err := agent.Stream(ctx, fantasy.AgentStreamCall{
  664. Prompt: fmt.Sprintf("Generate a concise title for the following content:\n\n%s\n <think>\n\n</think>", prompt),
  665. PrepareStep: func(callContext context.Context, options fantasy.PrepareStepFunctionOptions) (_ context.Context, prepared fantasy.PrepareStepResult, err error) {
  666. prepared.Messages = options.Messages
  667. if a.systemPromptPrefix != "" {
  668. prepared.Messages = append([]fantasy.Message{fantasy.NewSystemMessage(a.systemPromptPrefix)}, prepared.Messages...)
  669. }
  670. return callContext, prepared, nil
  671. },
  672. })
  673. if err != nil {
  674. slog.Error("error generating title", "err", err)
  675. return
  676. }
  677. title := resp.Response.Content.Text()
  678. title = strings.ReplaceAll(title, "\n", " ")
  679. // Remove thinking tags if present.
  680. if idx := strings.Index(title, "</think>"); idx > 0 {
  681. title = title[idx+len("</think>"):]
  682. }
  683. title = strings.TrimSpace(title)
  684. if title == "" {
  685. slog.Warn("failed to generate title", "warn", "empty title")
  686. return
  687. }
  688. session.Title = title
  689. var openrouterCost *float64
  690. for _, step := range resp.Steps {
  691. stepCost := a.openrouterCost(step.ProviderMetadata)
  692. if stepCost != nil {
  693. newCost := *stepCost
  694. if openrouterCost != nil {
  695. newCost += *openrouterCost
  696. }
  697. openrouterCost = &newCost
  698. }
  699. }
  700. a.updateSessionUsage(a.smallModel, session, resp.TotalUsage, openrouterCost)
  701. _, saveErr := a.sessions.Save(ctx, *session)
  702. if saveErr != nil {
  703. slog.Error("failed to save session title & usage", "error", saveErr)
  704. return
  705. }
  706. }
  707. func (a *sessionAgent) openrouterCost(metadata fantasy.ProviderMetadata) *float64 {
  708. openrouterMetadata, ok := metadata[openrouter.Name]
  709. if !ok {
  710. return nil
  711. }
  712. opts, ok := openrouterMetadata.(*openrouter.ProviderMetadata)
  713. if !ok {
  714. return nil
  715. }
  716. return &opts.Usage.Cost
  717. }
  718. func (a *sessionAgent) updateSessionUsage(model Model, session *session.Session, usage fantasy.Usage, overrideCost *float64) {
  719. modelConfig := model.CatwalkCfg
  720. cost := modelConfig.CostPer1MInCached/1e6*float64(usage.CacheCreationTokens) +
  721. modelConfig.CostPer1MOutCached/1e6*float64(usage.CacheReadTokens) +
  722. modelConfig.CostPer1MIn/1e6*float64(usage.InputTokens) +
  723. modelConfig.CostPer1MOut/1e6*float64(usage.OutputTokens)
  724. a.eventTokensUsed(session.ID, model, usage, cost)
  725. if overrideCost != nil {
  726. session.Cost += *overrideCost
  727. } else {
  728. session.Cost += cost
  729. }
  730. session.CompletionTokens = usage.OutputTokens + usage.CacheReadTokens
  731. session.PromptTokens = usage.InputTokens + usage.CacheCreationTokens
  732. }
  733. func (a *sessionAgent) Cancel(sessionID string) {
  734. // Cancel regular requests.
  735. if cancel, ok := a.activeRequests.Take(sessionID); ok && cancel != nil {
  736. slog.Info("Request cancellation initiated", "session_id", sessionID)
  737. cancel()
  738. }
  739. // Also check for summarize requests.
  740. if cancel, ok := a.activeRequests.Take(sessionID + "-summarize"); ok && cancel != nil {
  741. slog.Info("Summarize cancellation initiated", "session_id", sessionID)
  742. cancel()
  743. }
  744. if a.QueuedPrompts(sessionID) > 0 {
  745. slog.Info("Clearing queued prompts", "session_id", sessionID)
  746. a.messageQueue.Del(sessionID)
  747. }
  748. }
  749. func (a *sessionAgent) ClearQueue(sessionID string) {
  750. if a.QueuedPrompts(sessionID) > 0 {
  751. slog.Info("Clearing queued prompts", "session_id", sessionID)
  752. a.messageQueue.Del(sessionID)
  753. }
  754. }
  755. func (a *sessionAgent) CancelAll() {
  756. if !a.IsBusy() {
  757. return
  758. }
  759. for key := range a.activeRequests.Seq2() {
  760. a.Cancel(key) // key is sessionID
  761. }
  762. timeout := time.After(5 * time.Second)
  763. for a.IsBusy() {
  764. select {
  765. case <-timeout:
  766. return
  767. default:
  768. time.Sleep(200 * time.Millisecond)
  769. }
  770. }
  771. }
  772. func (a *sessionAgent) IsBusy() bool {
  773. var busy bool
  774. for cancelFunc := range a.activeRequests.Seq() {
  775. if cancelFunc != nil {
  776. busy = true
  777. break
  778. }
  779. }
  780. return busy
  781. }
  782. func (a *sessionAgent) IsSessionBusy(sessionID string) bool {
  783. _, busy := a.activeRequests.Get(sessionID)
  784. return busy
  785. }
  786. func (a *sessionAgent) QueuedPrompts(sessionID string) int {
  787. l, ok := a.messageQueue.Get(sessionID)
  788. if !ok {
  789. return 0
  790. }
  791. return len(l)
  792. }
  793. func (a *sessionAgent) SetModels(large Model, small Model) {
  794. a.largeModel = large
  795. a.smallModel = small
  796. }
  797. func (a *sessionAgent) SetTools(tools []fantasy.AgentTool) {
  798. a.tools = tools
  799. }
  800. func (a *sessionAgent) Model() Model {
  801. return a.largeModel
  802. }