Browse Source

调整 ws 关闭的逻辑

Signed-off-by: allan716 <[email protected]>
allan716 3 years ago
parent
commit
6028179288
2 changed files with 21 additions and 5 deletions
  1. 16 4
      internal/backend/ws_helper/client.go
  2. 5 1
      internal/pkg/log_helper/log_hub.go

+ 16 - 4
internal/backend/ws_helper/client.go

@@ -3,12 +3,14 @@ package ws_helper
 import (
 import (
 	"bytes"
 	"bytes"
 	"encoding/json"
 	"encoding/json"
+	"errors"
 	"github.com/allanpk716/ChineseSubFinder/internal/pkg/common"
 	"github.com/allanpk716/ChineseSubFinder/internal/pkg/common"
 	"github.com/allanpk716/ChineseSubFinder/internal/pkg/log_helper"
 	"github.com/allanpk716/ChineseSubFinder/internal/pkg/log_helper"
 	"github.com/allanpk716/ChineseSubFinder/internal/types/backend/ws"
 	"github.com/allanpk716/ChineseSubFinder/internal/types/backend/ws"
 	"github.com/allanpk716/ChineseSubFinder/internal/types/log_hub"
 	"github.com/allanpk716/ChineseSubFinder/internal/types/log_hub"
 	"github.com/gorilla/websocket"
 	"github.com/gorilla/websocket"
 	"net/http"
 	"net/http"
+	"sync"
 	"time"
 	"time"
 )
 )
 
 
@@ -51,6 +53,13 @@ type Client struct {
 	sendLogLineIndex int             // 日志发送到那个位置了
 	sendLogLineIndex int             // 日志发送到那个位置了
 	authed           bool            // 是否已经通过认证
 	authed           bool            // 是否已经通过认证
 	send             chan []byte     // 发送给 client 的内容 bytes
 	send             chan []byte     // 发送给 client 的内容 bytes
+	closeOnce        sync.Once
+}
+
+func (c *Client) close() {
+	c.closeOnce.Do(func() {
+		_ = c.conn.Close()
+	})
 }
 }
 
 
 // 接收 Client 发送来的消息
 // 接收 Client 发送来的消息
@@ -65,7 +74,7 @@ func (c *Client) readPump() {
 	defer func() {
 	defer func() {
 		// 触发移除 client 的逻辑
 		// 触发移除 client 的逻辑
 		c.hub.unregister <- c
 		c.hub.unregister <- c
-		_ = c.conn.Close()
+		c.close()
 	}()
 	}()
 	var err error
 	var err error
 	var message []byte
 	var message []byte
@@ -157,7 +166,7 @@ func (c *Client) writePump() {
 		pingTicker.Stop()
 		pingTicker.Stop()
 		subScanJobStatusTicker.Stop()
 		subScanJobStatusTicker.Stop()
 		runningLogTicker.Stop()
 		runningLogTicker.Stop()
-		_ = c.conn.Close()
+		c.close()
 	}()
 	}()
 
 
 	for {
 	for {
@@ -179,7 +188,7 @@ func (c *Client) writePump() {
 				// The hub closed the channel.
 				// The hub closed the channel.
 				err = c.conn.WriteMessage(websocket.CloseMessage, []byte{})
 				err = c.conn.WriteMessage(websocket.CloseMessage, []byte{})
 				if err != nil {
 				if err != nil {
-					log_helper.GetLogger().Errorln("writePump close hub WriteMessage", err)
+					log_helper.GetLogger().Warningln("writePump close hub WriteMessage", err)
 				}
 				}
 				return
 				return
 			}
 			}
@@ -236,6 +245,9 @@ func (c *Client) writePump() {
 				continue
 				continue
 			}
 			}
 			nowRunningLog := log_helper.GetOnceLog4Running()
 			nowRunningLog := log_helper.GetOnceLog4Running()
+			if nowRunningLog == nil {
+				continue
+			}
 			// 找到日志,把当前已有的日志发送出去,然后记录发送到哪里了
 			// 找到日志,把当前已有的日志发送出去,然后记录发送到哪里了
 			// 这里需要考虑一次性的信息太多,超过发送的缓冲区,所以需要拆分发送
 			// 这里需要考虑一次性的信息太多,超过发送的缓冲区,所以需要拆分发送
 			outLogsBytes, err := RunningLogReply(nowRunningLog, c.sendLogLineIndex)
 			outLogsBytes, err := RunningLogReply(nowRunningLog, c.sendLogLineIndex)
@@ -276,7 +288,7 @@ func AuthReply(inType ws.AuthMessage) ([]byte, error) {
 func RunningLogReply(log *log_hub.OnceLog, iPreSendLines ...int) ([][]byte, error) {
 func RunningLogReply(log *log_hub.OnceLog, iPreSendLines ...int) ([][]byte, error) {
 
 
 	if log == nil {
 	if log == nil {
-		return nil, nil
+		return nil, errors.New("RunningLogReply input log is nil")
 	}
 	}
 
 
 	var outLogBytes = make([][]byte, 0)
 	var outLogBytes = make([][]byte, 0)

+ 5 - 1
internal/pkg/log_helper/log_hub.go

@@ -141,6 +141,10 @@ func GetOnceLog4Running() *log_hub.OnceLog {
 // GetSpiltOnceLog 拆分到一行一个,没有锁,所以需要考虑并发问题
 // GetSpiltOnceLog 拆分到一行一个,没有锁,所以需要考虑并发问题
 func GetSpiltOnceLog(log *log_hub.OnceLog) []*log_hub.OnceLog {
 func GetSpiltOnceLog(log *log_hub.OnceLog) []*log_hub.OnceLog {
 
 
+	if log == nil {
+		return nil
+	}
+
 	var outList = make([]*log_hub.OnceLog, len(log.LogLines))
 	var outList = make([]*log_hub.OnceLog, len(log.LogLines))
 	for i := 0; i < len(log.LogLines); i++ {
 	for i := 0; i < len(log.LogLines); i++ {
 		outList[i] = &log_hub.OnceLog{
 		outList[i] = &log_hub.OnceLog{
@@ -257,7 +261,7 @@ var (
 	onceLoggerFile      *os.File                     // 单次扫描保存 Log 文件的实例
 	onceLoggerFile      *os.File                     // 单次扫描保存 Log 文件的实例
 	onceLogs            = make([]log_hub.OnceLog, 0) // 本地缓存的多次,单次扫描的 Log 内容
 	onceLogs            = make([]log_hub.OnceLog, 0) // 本地缓存的多次,单次扫描的 Log 内容
 	onceLogsLock        sync.Mutex                   // 对应的锁
 	onceLogsLock        sync.Mutex                   // 对应的锁
-	onceLog4Running     *log_hub.OnceLog             // 当前正在扫描时候日志的日志内容实例,注意,开启任务不代表就在扫描
+	onceLog4Running     = log_hub.NewOnceLog(0)      // 当前正在扫描时候日志的日志内容实例,注意,开启任务不代表就在扫描
 	onceLog4RunningLock sync.Mutex                   // 对应的锁
 	onceLog4RunningLock sync.Mutex                   // 对应的锁
 )
 )