|
|
@@ -48,8 +48,6 @@ type Service interface {
|
|
|
IsBusy() bool
|
|
|
Update(agentName config.AgentName, modelID models.ModelID) (models.Model, error)
|
|
|
CompactSession(ctx context.Context, sessionID string) error
|
|
|
- PauseSession(sessionID string) error
|
|
|
- ResumeSession(sessionID string) error
|
|
|
}
|
|
|
|
|
|
type agent struct {
|
|
|
@@ -62,7 +60,6 @@ type agent struct {
|
|
|
titleProvider provider.Provider
|
|
|
|
|
|
activeRequests sync.Map
|
|
|
- pauseLock sync.RWMutex // Lock for pausing message processing
|
|
|
}
|
|
|
|
|
|
func NewAgent(
|
|
|
@@ -420,13 +417,9 @@ func (a *agent) processEvent(ctx context.Context, sessionID string, assistantMsg
|
|
|
case <-ctx.Done():
|
|
|
return ctx.Err()
|
|
|
default:
|
|
|
- // Continue processing.
|
|
|
+ // Continue processing
|
|
|
}
|
|
|
|
|
|
- // Check if session is paused - use RLock to allow concurrent reads but block during pause
|
|
|
- a.pauseLock.RLock()
|
|
|
- defer a.pauseLock.RUnlock()
|
|
|
-
|
|
|
switch event.Type {
|
|
|
case provider.EventThinkingDelta:
|
|
|
assistantMsg.AppendReasoningContent(event.Content)
|
|
|
@@ -482,7 +475,7 @@ func (a *agent) GetUsage(ctx context.Context, sessionID string) (*int64, error)
|
|
|
return &usage, nil
|
|
|
}
|
|
|
|
|
|
-func (a *agent) TrackUsage(ctx context.Context, sessionID string, model models.Model, usage provider.TokenUsage) error {
|
|
|
+func (a *agent) TrackUsage(ctx context.Context, sessionID string, model models.Model, usage provider.TokenUsage) error { //nolint:lll
|
|
|
sess, err := a.sessions.Get(ctx, sessionID)
|
|
|
if err != nil {
|
|
|
return fmt.Errorf("failed to get session: %w", err)
|
|
|
@@ -523,36 +516,10 @@ func (a *agent) Update(agentName config.AgentName, modelID models.ModelID) (mode
|
|
|
return a.provider.Model(), nil
|
|
|
}
|
|
|
|
|
|
-// PauseSession pauses message processing for a specific session
|
|
|
-// This should be called before performing operations that require exclusive access
|
|
|
-func (a *agent) PauseSession(sessionID string) error {
|
|
|
- if !a.IsSessionBusy(sessionID) {
|
|
|
- return nil // Session is not active, no need to pause
|
|
|
- }
|
|
|
-
|
|
|
- status.Info(fmt.Sprintf("Pausing session: %s", sessionID))
|
|
|
- a.pauseLock.Lock() // Acquire write lock to block new operations
|
|
|
- return nil
|
|
|
-}
|
|
|
-
|
|
|
-// ResumeSession resumes message processing for a session
|
|
|
-// This should be called after completing operations that required exclusive access
|
|
|
-func (a *agent) ResumeSession(sessionID string) error {
|
|
|
- status.Info(fmt.Sprintf("Resuming session: %s", sessionID))
|
|
|
- a.pauseLock.Unlock() // Release write lock to allow operations to continue
|
|
|
- return nil
|
|
|
-}
|
|
|
-
|
|
|
func (a *agent) CompactSession(ctx context.Context, sessionID string) error {
|
|
|
// Check if the session is busy
|
|
|
if a.IsSessionBusy(sessionID) {
|
|
|
- // Pause the session before compaction
|
|
|
- if err := a.PauseSession(sessionID); err != nil {
|
|
|
- return fmt.Errorf("failed to pause session: %w", err)
|
|
|
- }
|
|
|
- // Make sure to resume the session when we're done
|
|
|
- defer a.ResumeSession(sessionID)
|
|
|
- status.Info(fmt.Sprintf("Session %s paused for compaction", sessionID))
|
|
|
+ return ErrSessionBusy
|
|
|
}
|
|
|
|
|
|
// Create a cancellable context
|