| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242 |
- package shell
- import (
- "bytes"
- "context"
- "fmt"
- "slices"
- "sync"
- "sync/atomic"
- "time"
- "github.com/charmbracelet/crush/internal/csync"
- )
- const (
- // MaxBackgroundJobs is the maximum number of concurrent background jobs allowed
- MaxBackgroundJobs = 50
- // CompletedJobRetentionMinutes is how long to keep completed jobs before auto-cleanup (8 hours)
- CompletedJobRetentionMinutes = 8 * 60
- )
- // syncBuffer is a thread-safe wrapper around bytes.Buffer.
- type syncBuffer struct {
- buf bytes.Buffer
- mu sync.RWMutex
- }
- func (sb *syncBuffer) Write(p []byte) (n int, err error) {
- sb.mu.Lock()
- defer sb.mu.Unlock()
- return sb.buf.Write(p)
- }
- func (sb *syncBuffer) WriteString(s string) (n int, err error) {
- sb.mu.Lock()
- defer sb.mu.Unlock()
- return sb.buf.WriteString(s)
- }
- func (sb *syncBuffer) String() string {
- sb.mu.RLock()
- defer sb.mu.RUnlock()
- return sb.buf.String()
- }
- // BackgroundShell represents a shell running in the background.
- type BackgroundShell struct {
- ID string
- Command string
- Description string
- Shell *Shell
- WorkingDir string
- ctx context.Context
- cancel context.CancelFunc
- stdout *syncBuffer
- stderr *syncBuffer
- done chan struct{}
- exitErr error
- completedAt int64 // Unix timestamp when job completed (0 if still running)
- }
- // BackgroundShellManager manages background shell instances.
- type BackgroundShellManager struct {
- shells *csync.Map[string, *BackgroundShell]
- }
- var (
- backgroundManager *BackgroundShellManager
- backgroundManagerOnce sync.Once
- idCounter atomic.Uint64
- )
- // newBackgroundShellManager creates a new BackgroundShellManager instance.
- func newBackgroundShellManager() *BackgroundShellManager {
- return &BackgroundShellManager{
- shells: csync.NewMap[string, *BackgroundShell](),
- }
- }
- // GetBackgroundShellManager returns the singleton background shell manager.
- func GetBackgroundShellManager() *BackgroundShellManager {
- backgroundManagerOnce.Do(func() {
- backgroundManager = newBackgroundShellManager()
- })
- return backgroundManager
- }
- // Start creates and starts a new background shell with the given command.
- func (m *BackgroundShellManager) Start(ctx context.Context, workingDir string, blockFuncs []BlockFunc, command string, description string) (*BackgroundShell, error) {
- // Check job limit
- if m.shells.Len() >= MaxBackgroundJobs {
- return nil, fmt.Errorf("maximum number of background jobs (%d) reached. Please terminate or wait for some jobs to complete", MaxBackgroundJobs)
- }
- id := fmt.Sprintf("%03X", idCounter.Add(1))
- shell := NewShell(&Options{
- WorkingDir: workingDir,
- BlockFuncs: blockFuncs,
- })
- shellCtx, cancel := context.WithCancel(ctx)
- bgShell := &BackgroundShell{
- ID: id,
- Command: command,
- Description: description,
- WorkingDir: workingDir,
- Shell: shell,
- ctx: shellCtx,
- cancel: cancel,
- stdout: &syncBuffer{},
- stderr: &syncBuffer{},
- done: make(chan struct{}),
- }
- m.shells.Set(id, bgShell)
- go func() {
- defer close(bgShell.done)
- err := shell.ExecStream(shellCtx, command, bgShell.stdout, bgShell.stderr)
- bgShell.exitErr = err
- atomic.StoreInt64(&bgShell.completedAt, time.Now().Unix())
- }()
- return bgShell, nil
- }
- // Get retrieves a background shell by ID.
- func (m *BackgroundShellManager) Get(id string) (*BackgroundShell, bool) {
- return m.shells.Get(id)
- }
- // Remove removes a background shell from the manager without terminating it.
- // This is useful when a shell has already completed and you just want to clean up tracking.
- func (m *BackgroundShellManager) Remove(id string) error {
- _, ok := m.shells.Take(id)
- if !ok {
- return fmt.Errorf("background shell not found: %s", id)
- }
- return nil
- }
- // Kill terminates a background shell by ID.
- func (m *BackgroundShellManager) Kill(id string) error {
- shell, ok := m.shells.Take(id)
- if !ok {
- return fmt.Errorf("background shell not found: %s", id)
- }
- shell.cancel()
- <-shell.done
- return nil
- }
- // BackgroundShellInfo contains information about a background shell.
- type BackgroundShellInfo struct {
- ID string
- Command string
- Description string
- }
- // List returns all background shell IDs.
- func (m *BackgroundShellManager) List() []string {
- ids := make([]string, 0, m.shells.Len())
- for id := range m.shells.Seq2() {
- ids = append(ids, id)
- }
- return ids
- }
- // Cleanup removes completed jobs that have been finished for more than the retention period
- func (m *BackgroundShellManager) Cleanup() int {
- now := time.Now().Unix()
- retentionSeconds := int64(CompletedJobRetentionMinutes * 60)
- var toRemove []string
- for shell := range m.shells.Seq() {
- completedAt := atomic.LoadInt64(&shell.completedAt)
- if completedAt > 0 && now-completedAt > retentionSeconds {
- toRemove = append(toRemove, shell.ID)
- }
- }
- for _, id := range toRemove {
- m.Remove(id)
- }
- return len(toRemove)
- }
- // KillAll terminates all background shells.
- func (m *BackgroundShellManager) KillAll() {
- shells := slices.Collect(m.shells.Seq())
- m.shells.Reset(map[string]*BackgroundShell{})
- done := make(chan struct{}, 1)
- go func() {
- var wg sync.WaitGroup
- for _, shell := range shells {
- wg.Go(func() {
- shell.cancel()
- <-shell.done
- })
- }
- wg.Wait()
- done <- struct{}{}
- }()
- select {
- case <-done:
- return
- case <-time.After(time.Second * 5):
- return
- }
- }
- // GetOutput returns the current output of a background shell.
- func (bs *BackgroundShell) GetOutput() (stdout string, stderr string, done bool, err error) {
- select {
- case <-bs.done:
- return bs.stdout.String(), bs.stderr.String(), true, bs.exitErr
- default:
- return bs.stdout.String(), bs.stderr.String(), false, nil
- }
- }
- // IsDone checks if the background shell has finished execution.
- func (bs *BackgroundShell) IsDone() bool {
- select {
- case <-bs.done:
- return true
- default:
- return false
- }
- }
- // Wait blocks until the background shell completes.
- func (bs *BackgroundShell) Wait() {
- <-bs.done
- }
|