Просмотр исходного кода

🔧 fix(api_request): enhance ping keep-alive mechanism with error handling and timeout controls

CaIon 6 месяцев назад
Родитель
Сommit
136a46218b
1 измененных файлов с 63 добавлено и 15 удалено
  1. 63 15
      relay/channel/api_request.go

+ 63 - 15
relay/channel/api_request.go

@@ -109,6 +109,12 @@ func startPingKeepAlive(c *gin.Context, pingInterval time.Duration) context.Canc
 
 	gopool.Go(func() {
 		defer func() {
+			// 增加panic恢复处理
+			if r := recover(); r != nil {
+				if common2.DebugEnabled {
+					println("SSE ping goroutine panic recovered:", fmt.Sprintf("%v", r))
+				}
+			}
 			if common2.DebugEnabled {
 				println("SSE ping goroutine stopped.")
 			}
@@ -119,19 +125,32 @@ func startPingKeepAlive(c *gin.Context, pingInterval time.Duration) context.Canc
 		}
 
 		ticker := time.NewTicker(pingInterval)
-		// 退出时清理 ticker
-		defer ticker.Stop()
+		// 确保在任何情况下都清理ticker
+		defer func() {
+			ticker.Stop()
+			if common2.DebugEnabled {
+				println("SSE ping ticker stopped")
+			}
+		}()
 
 		var pingMutex sync.Mutex
 		if common2.DebugEnabled {
 			println("SSE ping goroutine started")
 		}
 
+		// 增加超时控制,防止goroutine长时间运行
+		maxPingDuration := 120 * time.Minute // 最大ping持续时间
+		pingTimeout := time.NewTimer(maxPingDuration)
+		defer pingTimeout.Stop()
+
 		for {
 			select {
 			// 发送 ping 数据
 			case <-ticker.C:
 				if err := sendPingData(c, &pingMutex); err != nil {
+					if common2.DebugEnabled {
+						println("SSE ping error, stopping goroutine:", err.Error())
+					}
 					return
 				}
 			// 收到退出信号
@@ -140,6 +159,12 @@ func startPingKeepAlive(c *gin.Context, pingInterval time.Duration) context.Canc
 			// request 结束
 			case <-c.Request.Context().Done():
 				return
+			// 超时保护,防止goroutine无限运行
+			case <-pingTimeout.C:
+				if common2.DebugEnabled {
+					println("SSE ping goroutine timeout, stopping")
+				}
+				return
 			}
 		}
 	})
@@ -148,19 +173,34 @@ func startPingKeepAlive(c *gin.Context, pingInterval time.Duration) context.Canc
 }
 
 func sendPingData(c *gin.Context, mutex *sync.Mutex) error {
-	mutex.Lock()
-	defer mutex.Unlock()
+	// 增加超时控制,防止锁死等待
+	done := make(chan error, 1)
+	go func() {
+		mutex.Lock()
+		defer mutex.Unlock()
 
-	err := helper.PingData(c)
-	if err != nil {
-		common2.LogError(c, "SSE ping error: "+err.Error())
-		return err
-	}
+		err := helper.PingData(c)
+		if err != nil {
+			common2.LogError(c, "SSE ping error: "+err.Error())
+			done <- err
+			return
+		}
 
-	if common2.DebugEnabled {
-		println("SSE ping data sent.")
+		if common2.DebugEnabled {
+			println("SSE ping data sent.")
+		}
+		done <- nil
+	}()
+
+	// 设置发送ping数据的超时时间
+	select {
+	case err := <-done:
+		return err
+	case <-time.After(10 * time.Second):
+		return errors.New("SSE ping data send timeout")
+	case <-c.Request.Context().Done():
+		return errors.New("request context cancelled during ping")
 	}
-	return nil
 }
 
 func doRequest(c *gin.Context, req *http.Request, info *common.RelayInfo) (*http.Response, error) {
@@ -175,15 +215,23 @@ func doRequest(c *gin.Context, req *http.Request, info *common.RelayInfo) (*http
 		client = service.GetHttpClient()
 	}
 
+	var stopPinger context.CancelFunc
 	if info.IsStream {
 		helper.SetEventStreamHeaders(c)
-
 		// 处理流式请求的 ping 保活
 		generalSettings := operation_setting.GetGeneralSetting()
 		if generalSettings.PingIntervalEnabled {
 			pingInterval := time.Duration(generalSettings.PingIntervalSeconds) * time.Second
-			stopPinger := startPingKeepAlive(c, pingInterval)
-			defer stopPinger()
+			stopPinger = startPingKeepAlive(c, pingInterval)
+			// 使用defer确保在任何情况下都能停止ping goroutine
+			defer func() {
+				if stopPinger != nil {
+					stopPinger()
+					if common2.DebugEnabled {
+						println("SSE ping goroutine stopped by defer")
+					}
+				}
+			}()
 		}
 	}