background.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245
  1. package shell
  2. import (
  3. "bytes"
  4. "context"
  5. "fmt"
  6. "slices"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. "github.com/charmbracelet/crush/internal/csync"
  11. )
  12. const (
  13. // MaxBackgroundJobs is the maximum number of concurrent background jobs allowed
  14. MaxBackgroundJobs = 50
  15. // CompletedJobRetentionMinutes is how long to keep completed jobs before auto-cleanup (8 hours)
  16. CompletedJobRetentionMinutes = 8 * 60
  17. )
  18. // syncBuffer is a thread-safe wrapper around bytes.Buffer.
  19. type syncBuffer struct {
  20. buf bytes.Buffer
  21. mu sync.RWMutex
  22. }
  23. func (sb *syncBuffer) Write(p []byte) (n int, err error) {
  24. sb.mu.Lock()
  25. defer sb.mu.Unlock()
  26. return sb.buf.Write(p)
  27. }
  28. func (sb *syncBuffer) WriteString(s string) (n int, err error) {
  29. sb.mu.Lock()
  30. defer sb.mu.Unlock()
  31. return sb.buf.WriteString(s)
  32. }
  33. func (sb *syncBuffer) String() string {
  34. sb.mu.RLock()
  35. defer sb.mu.RUnlock()
  36. return sb.buf.String()
  37. }
  38. // BackgroundShell represents a shell running in the background.
  39. type BackgroundShell struct {
  40. ID string
  41. Command string
  42. Description string
  43. Shell *Shell
  44. WorkingDir string
  45. ctx context.Context
  46. cancel context.CancelFunc
  47. stdout *syncBuffer
  48. stderr *syncBuffer
  49. done chan struct{}
  50. exitErr error
  51. completedAt atomic.Int64 // Unix timestamp when job completed (0 if still running)
  52. }
  53. // BackgroundShellManager manages background shell instances.
  54. type BackgroundShellManager struct {
  55. shells *csync.Map[string, *BackgroundShell]
  56. }
  57. var (
  58. backgroundManager *BackgroundShellManager
  59. backgroundManagerOnce sync.Once
  60. idCounter atomic.Uint64
  61. )
  62. // newBackgroundShellManager creates a new BackgroundShellManager instance.
  63. func newBackgroundShellManager() *BackgroundShellManager {
  64. return &BackgroundShellManager{
  65. shells: csync.NewMap[string, *BackgroundShell](),
  66. }
  67. }
  68. // GetBackgroundShellManager returns the singleton background shell manager.
  69. func GetBackgroundShellManager() *BackgroundShellManager {
  70. backgroundManagerOnce.Do(func() {
  71. backgroundManager = newBackgroundShellManager()
  72. })
  73. return backgroundManager
  74. }
  75. // Start creates and starts a new background shell with the given command.
  76. func (m *BackgroundShellManager) Start(ctx context.Context, workingDir string, blockFuncs []BlockFunc, command string, description string) (*BackgroundShell, error) {
  77. // Check job limit
  78. if m.shells.Len() >= MaxBackgroundJobs {
  79. return nil, fmt.Errorf("maximum number of background jobs (%d) reached. Please terminate or wait for some jobs to complete", MaxBackgroundJobs)
  80. }
  81. id := fmt.Sprintf("%03X", idCounter.Add(1))
  82. shell := NewShell(&Options{
  83. WorkingDir: workingDir,
  84. BlockFuncs: blockFuncs,
  85. })
  86. shellCtx, cancel := context.WithCancel(ctx)
  87. bgShell := &BackgroundShell{
  88. ID: id,
  89. Command: command,
  90. Description: description,
  91. WorkingDir: workingDir,
  92. Shell: shell,
  93. ctx: shellCtx,
  94. cancel: cancel,
  95. stdout: &syncBuffer{},
  96. stderr: &syncBuffer{},
  97. done: make(chan struct{}),
  98. }
  99. m.shells.Set(id, bgShell)
  100. go func() {
  101. defer close(bgShell.done)
  102. err := shell.ExecStream(shellCtx, command, bgShell.stdout, bgShell.stderr)
  103. bgShell.exitErr = err
  104. bgShell.completedAt.Store(time.Now().Unix())
  105. }()
  106. return bgShell, nil
  107. }
  108. // Get retrieves a background shell by ID.
  109. func (m *BackgroundShellManager) Get(id string) (*BackgroundShell, bool) {
  110. return m.shells.Get(id)
  111. }
  112. // Remove removes a background shell from the manager without terminating it.
  113. // This is useful when a shell has already completed and you just want to clean up tracking.
  114. func (m *BackgroundShellManager) Remove(id string) error {
  115. _, ok := m.shells.Take(id)
  116. if !ok {
  117. return fmt.Errorf("background shell not found: %s", id)
  118. }
  119. return nil
  120. }
  121. // Kill terminates a background shell by ID.
  122. func (m *BackgroundShellManager) Kill(id string) error {
  123. shell, ok := m.shells.Take(id)
  124. if !ok {
  125. return fmt.Errorf("background shell not found: %s", id)
  126. }
  127. shell.cancel()
  128. <-shell.done
  129. return nil
  130. }
  131. // BackgroundShellInfo contains information about a background shell.
  132. type BackgroundShellInfo struct {
  133. ID string
  134. Command string
  135. Description string
  136. }
  137. // List returns all background shell IDs.
  138. func (m *BackgroundShellManager) List() []string {
  139. ids := make([]string, 0, m.shells.Len())
  140. for id := range m.shells.Seq2() {
  141. ids = append(ids, id)
  142. }
  143. return ids
  144. }
  145. // Cleanup removes completed jobs that have been finished for more than the retention period
  146. func (m *BackgroundShellManager) Cleanup() int {
  147. now := time.Now().Unix()
  148. retentionSeconds := int64(CompletedJobRetentionMinutes * 60)
  149. var toRemove []string
  150. for shell := range m.shells.Seq() {
  151. completedAt := shell.completedAt.Load()
  152. if completedAt > 0 && now-completedAt > retentionSeconds {
  153. toRemove = append(toRemove, shell.ID)
  154. }
  155. }
  156. for _, id := range toRemove {
  157. m.Remove(id)
  158. }
  159. return len(toRemove)
  160. }
  161. // KillAll terminates all background shells. The provided context bounds how
  162. // long the function waits for each shell to exit.
  163. func (m *BackgroundShellManager) KillAll(ctx context.Context) {
  164. shells := slices.Collect(m.shells.Seq())
  165. m.shells.Reset(map[string]*BackgroundShell{})
  166. var wg sync.WaitGroup
  167. for _, shell := range shells {
  168. wg.Go(func() {
  169. shell.cancel()
  170. select {
  171. case <-shell.done:
  172. case <-ctx.Done():
  173. }
  174. })
  175. }
  176. wg.Wait()
  177. }
  178. // GetOutput returns the current output of a background shell.
  179. func (bs *BackgroundShell) GetOutput() (stdout string, stderr string, done bool, err error) {
  180. select {
  181. case <-bs.done:
  182. return bs.stdout.String(), bs.stderr.String(), true, bs.exitErr
  183. default:
  184. return bs.stdout.String(), bs.stderr.String(), false, nil
  185. }
  186. }
  187. // IsDone checks if the background shell has finished execution.
  188. func (bs *BackgroundShell) IsDone() bool {
  189. select {
  190. case <-bs.done:
  191. return true
  192. default:
  193. return false
  194. }
  195. }
  196. // Wait blocks until the background shell completes.
  197. func (bs *BackgroundShell) Wait() {
  198. <-bs.done
  199. }
  200. func (bs *BackgroundShell) WaitContext(ctx context.Context) bool {
  201. select {
  202. case <-bs.done:
  203. return true
  204. case <-ctx.Done():
  205. return false
  206. }
  207. }