app.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296
  1. package app
  2. import (
  3. "context"
  4. "database/sql"
  5. "errors"
  6. "fmt"
  7. "log/slog"
  8. "maps"
  9. "sync"
  10. "time"
  11. tea "github.com/charmbracelet/bubbletea/v2"
  12. "github.com/charmbracelet/crush/internal/config"
  13. "github.com/charmbracelet/crush/internal/db"
  14. "github.com/charmbracelet/crush/internal/format"
  15. "github.com/charmbracelet/crush/internal/history"
  16. "github.com/charmbracelet/crush/internal/llm/agent"
  17. "github.com/charmbracelet/crush/internal/log"
  18. "github.com/charmbracelet/crush/internal/pubsub"
  19. "github.com/charmbracelet/crush/internal/lsp"
  20. "github.com/charmbracelet/crush/internal/message"
  21. "github.com/charmbracelet/crush/internal/permission"
  22. "github.com/charmbracelet/crush/internal/session"
  23. )
  24. type App struct {
  25. Sessions session.Service
  26. Messages message.Service
  27. History history.Service
  28. Permissions permission.Service
  29. CoderAgent agent.Service
  30. LSPClients map[string]*lsp.Client
  31. clientsMutex sync.RWMutex
  32. watcherCancelFuncs []context.CancelFunc
  33. cancelFuncsMutex sync.Mutex
  34. lspWatcherWG sync.WaitGroup
  35. config *config.Config
  36. serviceEventsWG *sync.WaitGroup
  37. eventsCtx context.Context
  38. events chan tea.Msg
  39. tuiWG *sync.WaitGroup
  40. // global context and cleanup functions
  41. globalCtx context.Context
  42. cleanupFuncs []func()
  43. }
  44. func New(ctx context.Context, conn *sql.DB, cfg *config.Config) (*App, error) {
  45. q := db.New(conn)
  46. sessions := session.NewService(q)
  47. messages := message.NewService(q)
  48. files := history.NewService(q, conn)
  49. app := &App{
  50. Sessions: sessions,
  51. Messages: messages,
  52. History: files,
  53. Permissions: permission.NewPermissionService(cfg.WorkingDir()),
  54. LSPClients: make(map[string]*lsp.Client),
  55. globalCtx: ctx,
  56. config: cfg,
  57. events: make(chan tea.Msg, 100),
  58. serviceEventsWG: &sync.WaitGroup{},
  59. tuiWG: &sync.WaitGroup{},
  60. }
  61. app.setupEvents()
  62. // Initialize LSP clients in the background
  63. go app.initLSPClients(ctx)
  64. // TODO: remove the concept of agent config most likely
  65. if cfg.IsConfigured() {
  66. if err := app.InitCoderAgent(); err != nil {
  67. return nil, fmt.Errorf("failed to initialize coder agent: %w", err)
  68. }
  69. } else {
  70. slog.Warn("No agent configuration found")
  71. }
  72. return app, nil
  73. }
  74. // RunNonInteractive handles the execution flow when a prompt is provided via CLI flag.
  75. func (a *App) RunNonInteractive(ctx context.Context, prompt string, outputFormat string, quiet bool) error {
  76. slog.Info("Running in non-interactive mode")
  77. // Start spinner if not in quiet mode
  78. var spinner *format.Spinner
  79. if !quiet {
  80. spinner = format.NewSpinner(ctx, "Generating")
  81. spinner.Start()
  82. defer spinner.Stop()
  83. }
  84. const maxPromptLengthForTitle = 100
  85. titlePrefix := "Non-interactive: "
  86. var titleSuffix string
  87. if len(prompt) > maxPromptLengthForTitle {
  88. titleSuffix = prompt[:maxPromptLengthForTitle] + "..."
  89. } else {
  90. titleSuffix = prompt
  91. }
  92. title := titlePrefix + titleSuffix
  93. sess, err := a.Sessions.Create(ctx, title)
  94. if err != nil {
  95. return fmt.Errorf("failed to create session for non-interactive mode: %w", err)
  96. }
  97. slog.Info("Created session for non-interactive run", "session_id", sess.ID)
  98. // Automatically approve all permission requests for this non-interactive session
  99. a.Permissions.AutoApproveSession(sess.ID)
  100. done, err := a.CoderAgent.Run(ctx, sess.ID, prompt)
  101. if err != nil {
  102. return fmt.Errorf("failed to start agent processing stream: %w", err)
  103. }
  104. result := <-done
  105. // Stop spinner before printing output
  106. if !quiet && spinner != nil {
  107. spinner.Stop()
  108. }
  109. if result.Error != nil {
  110. if errors.Is(result.Error, context.Canceled) || errors.Is(result.Error, agent.ErrRequestCancelled) {
  111. slog.Info("Agent processing cancelled", "session_id", sess.ID)
  112. return nil
  113. }
  114. return fmt.Errorf("agent processing failed: %w", result.Error)
  115. }
  116. // Get the text content from the response
  117. content := "No content available"
  118. if result.Message.Content().String() != "" {
  119. content = result.Message.Content().String()
  120. }
  121. out, err := format.FormatOutput(content, outputFormat)
  122. if err != nil {
  123. return err
  124. }
  125. fmt.Println(out)
  126. slog.Info("Non-interactive run completed", "session_id", sess.ID)
  127. return nil
  128. }
  129. func (app *App) UpdateAgentModel() error {
  130. return app.CoderAgent.UpdateModel()
  131. }
  132. func (app *App) setupEvents() {
  133. ctx, cancel := context.WithCancel(app.globalCtx)
  134. app.eventsCtx = ctx
  135. setupSubscriber(ctx, app.serviceEventsWG, "sessions", app.Sessions.Subscribe, app.events)
  136. setupSubscriber(ctx, app.serviceEventsWG, "messages", app.Messages.Subscribe, app.events)
  137. setupSubscriber(ctx, app.serviceEventsWG, "permissions", app.Permissions.Subscribe, app.events)
  138. setupSubscriber(ctx, app.serviceEventsWG, "history", app.History.Subscribe, app.events)
  139. cleanupFunc := func() {
  140. cancel()
  141. app.serviceEventsWG.Wait()
  142. }
  143. app.cleanupFuncs = append(app.cleanupFuncs, cleanupFunc)
  144. }
  145. func setupSubscriber[T any](
  146. ctx context.Context,
  147. wg *sync.WaitGroup,
  148. name string,
  149. subscriber func(context.Context) <-chan pubsub.Event[T],
  150. outputCh chan<- tea.Msg,
  151. ) {
  152. wg.Add(1)
  153. go func() {
  154. defer wg.Done()
  155. subCh := subscriber(ctx)
  156. for {
  157. select {
  158. case event, ok := <-subCh:
  159. if !ok {
  160. slog.Debug("subscription channel closed", "name", name)
  161. return
  162. }
  163. var msg tea.Msg = event
  164. select {
  165. case outputCh <- msg:
  166. case <-time.After(2 * time.Second):
  167. slog.Warn("message dropped due to slow consumer", "name", name)
  168. case <-ctx.Done():
  169. slog.Debug("subscription cancelled", "name", name)
  170. return
  171. }
  172. case <-ctx.Done():
  173. slog.Debug("subscription cancelled", "name", name)
  174. return
  175. }
  176. }
  177. }()
  178. }
  179. func (app *App) InitCoderAgent() error {
  180. coderAgentCfg := app.config.Agents["coder"]
  181. if coderAgentCfg.ID == "" {
  182. return fmt.Errorf("coder agent configuration is missing")
  183. }
  184. var err error
  185. app.CoderAgent, err = agent.NewAgent(
  186. coderAgentCfg,
  187. app.Permissions,
  188. app.Sessions,
  189. app.Messages,
  190. app.History,
  191. app.LSPClients,
  192. )
  193. if err != nil {
  194. slog.Error("Failed to create coder agent", "err", err)
  195. return err
  196. }
  197. setupSubscriber(app.eventsCtx, app.serviceEventsWG, "coderAgent", app.CoderAgent.Subscribe, app.events)
  198. return nil
  199. }
  200. func (app *App) Subscribe(program *tea.Program) {
  201. defer log.RecoverPanic("app.Subscribe", func() {
  202. slog.Info("TUI subscription panic - attempting graceful shutdown")
  203. program.Quit()
  204. })
  205. app.tuiWG.Add(1)
  206. tuiCtx, tuiCancel := context.WithCancel(app.globalCtx)
  207. app.cleanupFuncs = append(app.cleanupFuncs, func() {
  208. slog.Debug("Cancelling TUI message handler")
  209. tuiCancel()
  210. app.tuiWG.Wait()
  211. })
  212. defer app.tuiWG.Done()
  213. for {
  214. select {
  215. case <-tuiCtx.Done():
  216. slog.Debug("TUI message handler shutting down")
  217. return
  218. case msg, ok := <-app.events:
  219. if !ok {
  220. slog.Debug("TUI message channel closed")
  221. return
  222. }
  223. program.Send(msg)
  224. }
  225. }
  226. }
  227. // Shutdown performs a clean shutdown of the application
  228. func (app *App) Shutdown() {
  229. app.cancelFuncsMutex.Lock()
  230. for _, cancel := range app.watcherCancelFuncs {
  231. cancel()
  232. }
  233. app.cancelFuncsMutex.Unlock()
  234. app.lspWatcherWG.Wait()
  235. app.clientsMutex.RLock()
  236. clients := make(map[string]*lsp.Client, len(app.LSPClients))
  237. maps.Copy(clients, app.LSPClients)
  238. app.clientsMutex.RUnlock()
  239. for name, client := range clients {
  240. shutdownCtx, cancel := context.WithTimeout(app.globalCtx, 5*time.Second)
  241. if err := client.Shutdown(shutdownCtx); err != nil {
  242. slog.Error("Failed to shutdown LSP client", "name", name, "error", err)
  243. }
  244. cancel()
  245. }
  246. if app.CoderAgent != nil {
  247. app.CoderAgent.CancelAll()
  248. }
  249. for _, cleanup := range app.cleanupFuncs {
  250. if cleanup != nil {
  251. cleanup()
  252. }
  253. }
  254. }