background.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242
  1. package shell
  2. import (
  3. "bytes"
  4. "context"
  5. "fmt"
  6. "slices"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. "github.com/charmbracelet/crush/internal/csync"
  11. )
const (
	// MaxBackgroundJobs is the maximum number of background jobs the manager
	// tracks at once. Start refuses to create a new job once this many jobs
	// are registered.
	MaxBackgroundJobs = 50
	// CompletedJobRetentionMinutes is how long, in minutes, a completed job is
	// kept before Cleanup removes it (8 hours).
	CompletedJobRetentionMinutes = 8 * 60
)
  18. // syncBuffer is a thread-safe wrapper around bytes.Buffer.
  19. type syncBuffer struct {
  20. buf bytes.Buffer
  21. mu sync.RWMutex
  22. }
  23. func (sb *syncBuffer) Write(p []byte) (n int, err error) {
  24. sb.mu.Lock()
  25. defer sb.mu.Unlock()
  26. return sb.buf.Write(p)
  27. }
  28. func (sb *syncBuffer) WriteString(s string) (n int, err error) {
  29. sb.mu.Lock()
  30. defer sb.mu.Unlock()
  31. return sb.buf.WriteString(s)
  32. }
  33. func (sb *syncBuffer) String() string {
  34. sb.mu.RLock()
  35. defer sb.mu.RUnlock()
  36. return sb.buf.String()
  37. }
  38. // BackgroundShell represents a shell running in the background.
  39. type BackgroundShell struct {
  40. ID string
  41. Command string
  42. Description string
  43. Shell *Shell
  44. WorkingDir string
  45. ctx context.Context
  46. cancel context.CancelFunc
  47. stdout *syncBuffer
  48. stderr *syncBuffer
  49. done chan struct{}
  50. exitErr error
  51. completedAt int64 // Unix timestamp when job completed (0 if still running)
  52. }
// BackgroundShellManager manages background shell instances.
type BackgroundShellManager struct {
	// shells holds every tracked background shell, keyed by its ID.
	shells *csync.Map[string, *BackgroundShell]
}
var (
	// backgroundManager is the package-level singleton returned by
	// GetBackgroundShellManager.
	backgroundManager *BackgroundShellManager
	// backgroundManagerOnce ensures backgroundManager is created only once.
	backgroundManagerOnce sync.Once
	// idCounter is incremented for every started job and is the source of
	// job IDs.
	idCounter atomic.Uint64
)
  62. // newBackgroundShellManager creates a new BackgroundShellManager instance.
  63. func newBackgroundShellManager() *BackgroundShellManager {
  64. return &BackgroundShellManager{
  65. shells: csync.NewMap[string, *BackgroundShell](),
  66. }
  67. }
  68. // GetBackgroundShellManager returns the singleton background shell manager.
  69. func GetBackgroundShellManager() *BackgroundShellManager {
  70. backgroundManagerOnce.Do(func() {
  71. backgroundManager = newBackgroundShellManager()
  72. })
  73. return backgroundManager
  74. }
  75. // Start creates and starts a new background shell with the given command.
  76. func (m *BackgroundShellManager) Start(ctx context.Context, workingDir string, blockFuncs []BlockFunc, command string, description string) (*BackgroundShell, error) {
  77. // Check job limit
  78. if m.shells.Len() >= MaxBackgroundJobs {
  79. return nil, fmt.Errorf("maximum number of background jobs (%d) reached. Please terminate or wait for some jobs to complete", MaxBackgroundJobs)
  80. }
  81. id := fmt.Sprintf("%03X", idCounter.Add(1))
  82. shell := NewShell(&Options{
  83. WorkingDir: workingDir,
  84. BlockFuncs: blockFuncs,
  85. })
  86. shellCtx, cancel := context.WithCancel(ctx)
  87. bgShell := &BackgroundShell{
  88. ID: id,
  89. Command: command,
  90. Description: description,
  91. WorkingDir: workingDir,
  92. Shell: shell,
  93. ctx: shellCtx,
  94. cancel: cancel,
  95. stdout: &syncBuffer{},
  96. stderr: &syncBuffer{},
  97. done: make(chan struct{}),
  98. }
  99. m.shells.Set(id, bgShell)
  100. go func() {
  101. defer close(bgShell.done)
  102. err := shell.ExecStream(shellCtx, command, bgShell.stdout, bgShell.stderr)
  103. bgShell.exitErr = err
  104. atomic.StoreInt64(&bgShell.completedAt, time.Now().Unix())
  105. }()
  106. return bgShell, nil
  107. }
  108. // Get retrieves a background shell by ID.
  109. func (m *BackgroundShellManager) Get(id string) (*BackgroundShell, bool) {
  110. return m.shells.Get(id)
  111. }
  112. // Remove removes a background shell from the manager without terminating it.
  113. // This is useful when a shell has already completed and you just want to clean up tracking.
  114. func (m *BackgroundShellManager) Remove(id string) error {
  115. _, ok := m.shells.Take(id)
  116. if !ok {
  117. return fmt.Errorf("background shell not found: %s", id)
  118. }
  119. return nil
  120. }
  121. // Kill terminates a background shell by ID.
  122. func (m *BackgroundShellManager) Kill(id string) error {
  123. shell, ok := m.shells.Take(id)
  124. if !ok {
  125. return fmt.Errorf("background shell not found: %s", id)
  126. }
  127. shell.cancel()
  128. <-shell.done
  129. return nil
  130. }
// BackgroundShellInfo contains information about a background shell.
type BackgroundShellInfo struct {
	// ID is the identifier of the background shell.
	ID string
	// Command is the command line of the background shell.
	Command string
	// Description is the description of the background shell.
	Description string
}
  137. // List returns all background shell IDs.
  138. func (m *BackgroundShellManager) List() []string {
  139. ids := make([]string, 0, m.shells.Len())
  140. for id := range m.shells.Seq2() {
  141. ids = append(ids, id)
  142. }
  143. return ids
  144. }
  145. // Cleanup removes completed jobs that have been finished for more than the retention period
  146. func (m *BackgroundShellManager) Cleanup() int {
  147. now := time.Now().Unix()
  148. retentionSeconds := int64(CompletedJobRetentionMinutes * 60)
  149. var toRemove []string
  150. for shell := range m.shells.Seq() {
  151. completedAt := atomic.LoadInt64(&shell.completedAt)
  152. if completedAt > 0 && now-completedAt > retentionSeconds {
  153. toRemove = append(toRemove, shell.ID)
  154. }
  155. }
  156. for _, id := range toRemove {
  157. m.Remove(id)
  158. }
  159. return len(toRemove)
  160. }
  161. // KillAll terminates all background shells.
  162. func (m *BackgroundShellManager) KillAll() {
  163. shells := slices.Collect(m.shells.Seq())
  164. m.shells.Reset(map[string]*BackgroundShell{})
  165. done := make(chan struct{}, 1)
  166. go func() {
  167. var wg sync.WaitGroup
  168. for _, shell := range shells {
  169. wg.Go(func() {
  170. shell.cancel()
  171. <-shell.done
  172. })
  173. }
  174. wg.Wait()
  175. done <- struct{}{}
  176. }()
  177. select {
  178. case <-done:
  179. return
  180. case <-time.After(time.Second * 5):
  181. return
  182. }
  183. }
  184. // GetOutput returns the current output of a background shell.
  185. func (bs *BackgroundShell) GetOutput() (stdout string, stderr string, done bool, err error) {
  186. select {
  187. case <-bs.done:
  188. return bs.stdout.String(), bs.stderr.String(), true, bs.exitErr
  189. default:
  190. return bs.stdout.String(), bs.stderr.String(), false, nil
  191. }
  192. }
  193. // IsDone checks if the background shell has finished execution.
  194. func (bs *BackgroundShell) IsDone() bool {
  195. select {
  196. case <-bs.done:
  197. return true
  198. default:
  199. return false
  200. }
  201. }
// Wait blocks until the background shell completes, i.e. until its done
// channel has been closed. It returns immediately if the shell has already
// finished.
func (bs *BackgroundShell) Wait() {
	<-bs.done
}