Explorar el Código

重构代码

Signed-off-by: allan716 <[email protected]>
allan716 hace 3 años
padre
commit
855590121e

+ 4 - 1
cmd/chinesesubfinder/main.go

@@ -156,7 +156,10 @@ func main() {
 	nowPort := my_util.ReadCustomPortFile(loggerBase)
 	loggerBase.Infoln(fmt.Sprintf("WebUI will listen at 0.0.0.0:%d", nowPort))
 	// 支持在外部配置特殊的端口号,以防止本地本占用了无法使用
-	backend.StartBackEnd(fileDownloader, nowPort, cronHelper)
+	bend := backend.NewBackEnd(loggerBase, settings.GetSettings(), cronHelper, nowPort)
+	bend.Start()
+	//// 阻塞
+	//select {}
 }
 
 /*

+ 66 - 18
internal/backend/backend.go

@@ -1,21 +1,48 @@
 package backend
 
 import (
+	"context"
 	"fmt"
 	"io/ioutil"
 	"net/http"
+	"sync"
+	"time"
+
+	"github.com/allanpk716/ChineseSubFinder/pkg/settings"
 
 	"github.com/allanpk716/ChineseSubFinder/frontend/dist"
-	"github.com/allanpk716/ChineseSubFinder/internal/backend/routers"
-	"github.com/allanpk716/ChineseSubFinder/internal/backend/ws_helper"
-	"github.com/allanpk716/ChineseSubFinder/pkg/logic/cron_helper"
-	"github.com/allanpk716/ChineseSubFinder/pkg/logic/file_downloader"
 	"github.com/gin-contrib/cors"
 	"github.com/gin-gonic/gin"
+
+	"github.com/sirupsen/logrus"
+
+	"github.com/allanpk716/ChineseSubFinder/pkg/logic/cron_helper"
 )
 
-// StartBackEnd 开启后端的服务器
-func StartBackEnd(fileDownloader *file_downloader.FileDownloader, httpPort int, cronHelper *cron_helper.CronHelper) {
+type BackEnd struct {
+	logger     *logrus.Logger
+	settings   *settings.Settings
+	cronHelper *cron_helper.CronHelper
+	httpPort   int
+	running    bool
+	srv        *http.Server
+	locker     sync.Mutex
+}
+
+func NewBackEnd(logger *logrus.Logger, settings *settings.Settings, cronHelper *cron_helper.CronHelper, httpPort int) *BackEnd {
+	return &BackEnd{logger: logger, settings: settings, cronHelper: cronHelper, httpPort: httpPort}
+}
+
+func (b *BackEnd) Start() {
+
+	defer b.locker.Unlock()
+	b.locker.Lock()
+
+	if b.running == true {
+		b.logger.Warningln("Http Server is already running")
+		return
+	}
+	b.running = true
 
 	gin.SetMode(gin.ReleaseMode)
 	gin.DefaultWriter = ioutil.Discard
@@ -23,7 +50,7 @@ func StartBackEnd(fileDownloader *file_downloader.FileDownloader, httpPort int,
 	engine := gin.Default()
 	// 默认所有都通过
 	engine.Use(cors.Default())
-	v1Router := routers.InitRouter(fileDownloader, engine, cronHelper)
+	v1Router := InitRouter(b.logger, b.settings, engine, b.cronHelper)
 	defer func() {
 		v1Router.Close()
 	}()
@@ -42,19 +69,40 @@ func StartBackEnd(fileDownloader *file_downloader.FileDownloader, httpPort int,
 		c.Redirect(http.StatusMovedPermanently, "/")
 	})
 
-	hub := ws_helper.NewHub()
-	go hub.Run()
+	// listen and serve on 0.0.0.0:8080(default)
+	b.srv = &http.Server{
+		Addr:    fmt.Sprintf(":%d", b.httpPort),
+		Handler: engine,
+	}
+	//go func() {
+	b.logger.Infoln("Try Start Http Server At Port", b.httpPort)
+	if err := b.srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
+		b.logger.Errorln("Start Server Error:", err)
+	}
+	//}()
+}
+
+func (b *BackEnd) Stop() {
 	defer func() {
-		hub.Clear()
+		b.locker.Unlock()
 	}()
-	engine.GET("/ws", func(context *gin.Context) {
-		ws_helper.ServeWs(fileDownloader.Log, hub, context.Writer, context.Request)
-	})
+	b.locker.Lock()
 
-	// listen and serve on 0.0.0.0:8080(default)
-	fileDownloader.Log.Infoln("Try Start Server At Port", httpPort)
-	err := engine.Run(":" + fmt.Sprintf("%d", httpPort))
-	if err != nil {
-		fileDownloader.Log.Errorln("Start Server At Port", httpPort, "Error", err)
+	if b.running == false {
+		b.logger.Warningln("Http Server is not running")
+		return
+	}
+
+	b.running = false
+
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
+	if err := b.srv.Shutdown(ctx); err != nil {
+		b.logger.Errorln("Http Server Shutdown:", err)
+	}
+	select {
+	case <-ctx.Done():
+		b.logger.Warningln("timeout of 5 seconds.")
 	}
+	b.logger.Infoln("Http Server exiting")
 }

+ 15 - 7
internal/backend/routers/base_router.go → internal/backend/base_router.go

@@ -1,32 +1,40 @@
-package routers
+package backend
 
 import (
 	"fmt"
 	"net/http"
 
+	"github.com/allanpk716/ChineseSubFinder/pkg/settings"
+
+	"github.com/sirupsen/logrus"
+
 	"github.com/allanpk716/ChineseSubFinder/internal/backend/controllers/base"
 	v1 "github.com/allanpk716/ChineseSubFinder/internal/backend/controllers/v1"
 	"github.com/allanpk716/ChineseSubFinder/internal/backend/middle"
 	"github.com/allanpk716/ChineseSubFinder/pkg/logic/cron_helper"
-	"github.com/allanpk716/ChineseSubFinder/pkg/logic/file_downloader"
 	"github.com/gin-gonic/gin"
 )
 
-func InitRouter(fileDownloader *file_downloader.FileDownloader, router *gin.Engine, cronHelper *cron_helper.CronHelper) *v1.ControllerBase {
+func InitRouter(
+	log *logrus.Logger,
+	settings *settings.Settings, // 设置实例
+	router *gin.Engine,
+	cronHelper *cron_helper.CronHelper,
+) *v1.ControllerBase {
 
-	cbBase := base.NewControllerBase(fileDownloader)
-	cbV1 := v1.NewControllerBase(fileDownloader.Log, cronHelper)
+	cbBase := base.NewControllerBase(log)
+	cbV1 := v1.NewControllerBase(log, cronHelper)
 	// --------------------------------------------------
 	// 静态文件服务器
 	// 添加电影的
-	for i, path := range fileDownloader.Settings.CommonSettings.MoviePaths {
+	for i, path := range settings.CommonSettings.MoviePaths {
 
 		nowUrl := "/movie_dir_" + fmt.Sprintf("%d", i)
 		cbV1.SetPathUrlMapItem(path, nowUrl)
 		router.StaticFS(nowUrl, http.Dir(path))
 	}
 	// 添加连续剧的
-	for i, path := range fileDownloader.Settings.CommonSettings.SeriesPaths {
+	for i, path := range settings.CommonSettings.SeriesPaths {
 
 		nowUrl := "/series_dir_" + fmt.Sprintf("%d", i)
 		cbV1.SetPathUrlMapItem(path, nowUrl)

+ 13 - 2
internal/backend/controllers/base/controller_base.go

@@ -1,6 +1,11 @@
 package base
 
 import (
+	"github.com/allanpk716/ChineseSubFinder/pkg/cache_center"
+	"github.com/allanpk716/ChineseSubFinder/pkg/global_value"
+	"github.com/allanpk716/ChineseSubFinder/pkg/random_auth_key"
+	"github.com/allanpk716/ChineseSubFinder/pkg/settings"
+	"github.com/sirupsen/logrus"
 	"net/http"
 
 	"github.com/allanpk716/ChineseSubFinder/pkg/types/backend"
@@ -13,9 +18,15 @@ type ControllerBase struct {
 	fileDownloader *file_downloader.FileDownloader
 }
 
-func NewControllerBase(fileDownloader *file_downloader.FileDownloader) *ControllerBase {
+func NewControllerBase(loggerBase *logrus.Logger) *ControllerBase {
 	return &ControllerBase{
-		fileDownloader: fileDownloader,
+		fileDownloader: file_downloader.NewFileDownloader(
+			cache_center.NewCacheCenter("local_task_queue", settings.GetSettings(), loggerBase),
+			random_auth_key.AuthKey{
+				BaseKey:  global_value.BaseKey(),
+				AESKey16: global_value.AESKey16(),
+				AESIv16:  global_value.AESIv16(),
+			}),
 	}
 }
 

+ 0 - 263
internal/backend/ws_helper/client.go

@@ -1,263 +0,0 @@
-package ws_helper
-
-import (
-	"bytes"
-	"encoding/json"
-	"net/http"
-	"sync"
-	"time"
-
-	ws2 "github.com/allanpk716/ChineseSubFinder/pkg/types/backend/ws"
-
-	"github.com/allanpk716/ChineseSubFinder/pkg/common"
-	"github.com/gorilla/websocket"
-	"github.com/sirupsen/logrus"
-)
-
-const (
-	// Time allowed to write a message to the peer.
-	writeWait = 10 * time.Second
-
-	// Time allowed to read the next pong message from the peer.
-	pongWait = 60 * time.Second
-
-	// Send pings to peer with this period. Must be less than pongWait.
-	pingPeriod = (pongWait * 9) / 10
-
-	// Maximum message size allowed from peer.
-	maxMessageSize = 5 * 1024
-
-	// 发送 chan 的队列长度
-	bufSize = 5 * 1024
-
-	upGraderReadBufferSize = 5 * 1024
-
-	upGraderWriteBufferSize = 5 * 1024
-)
-
-var upGrader = websocket.Upgrader{
-	ReadBufferSize:  upGraderReadBufferSize,
-	WriteBufferSize: upGraderWriteBufferSize,
-	CheckOrigin: func(r *http.Request) bool {
-		return true
-	},
-}
-
-type Client struct {
-	log              *logrus.Logger
-	hub              *Hub
-	conn             *websocket.Conn // 与服务器连接实例
-	sendLogLineIndex int             // 日志发送到那个位置了
-	authed           bool            // 是否已经通过认证
-	send             chan []byte     // 发送给 client 的内容 bytes
-	closeOnce        sync.Once
-}
-
-func NewClient(log *logrus.Logger, hub *Hub, conn *websocket.Conn, sendLogLineIndex int, authed bool, send chan []byte) *Client {
-	return &Client{log: log, hub: hub, conn: conn, sendLogLineIndex: sendLogLineIndex, authed: authed, send: send}
-}
-
-func (c *Client) close() {
-	c.closeOnce.Do(func() {
-		c.hub.unregister <- c
-		_ = c.conn.Close()
-	})
-}
-
-// 接收 Client 发送来的消息
-func (c *Client) readPump() {
-
-	defer func() {
-		if err := recover(); err != nil {
-			c.log.Debugln("readPump.recover", err)
-		}
-	}()
-
-	defer func() {
-		// 触发移除 client 的逻辑
-		//c.hub.unregister <- c
-		c.close()
-	}()
-	var err error
-	var message []byte
-	c.conn.SetReadLimit(maxMessageSize)
-	err = c.conn.SetReadDeadline(time.Now().Add(pongWait))
-	if err != nil {
-		c.log.Debugln("readPump.SetReadDeadline", err)
-		return
-	}
-	c.conn.SetPongHandler(func(string) error {
-		return c.conn.SetReadDeadline(time.Now().Add(pongWait))
-	})
-	// 收取 client 发送过来的消息
-	for {
-		_, message, err = c.conn.ReadMessage()
-		if err != nil {
-			if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
-				c.log.Debugln("readPump.IsUnexpectedCloseError", err)
-			}
-			return
-		}
-
-		revMessage := ws2.BaseMessage{}
-		err = json.Unmarshal(message, &revMessage)
-		if err != nil {
-			c.log.Debugln("readPump.BaseMessage.parse", err)
-			return
-		}
-
-		if c.authed == false {
-			// 如果没有经过认证,那么第一条一定需要判断是认证的消息
-			if revMessage.Type != ws2.Auth.String() {
-				// 提掉线
-				return
-			}
-			// 认证
-			login := ws2.Login{}
-			err = json.Unmarshal([]byte(revMessage.Data), &login)
-			if err != nil {
-				c.log.Debugln("readPump.Login.parse", err)
-				return
-			}
-
-			if login.Token != common.GetAccessToken() {
-				// 登录 Token 不对
-				// 发送 token 失败的消息
-				outBytes, err := AuthReply(ws2.AuthError)
-				if err != nil {
-					c.log.Debugln("readPump.AuthReply", err)
-					return
-				}
-				c.send <- outBytes
-				// 直接退出可能会导致发送的队列没有清空,这里单独判断一条特殊的命令,收到 Write 线程就退出
-				c.send <- ws2.CloseThisConnect
-
-			} else {
-				// Token 通过
-				outBytes, err := AuthReply(ws2.AuthOk)
-				if err != nil {
-					c.log.Debugln("readPump.AuthReply", err)
-					return
-				}
-				c.send <- outBytes
-				c.authed = true
-			}
-
-		} else {
-			// 进过认证后的消息,无需再次带有 token 信息
-		}
-	}
-}
-
-// 向 Client 发送消息的队列
-func (c *Client) writePump() {
-
-	defer func() {
-		if err := recover(); err != nil {
-			c.log.Debugln("writePump.recover", err)
-		}
-	}()
-
-	// 心跳计时器
-	pingTicker := time.NewTicker(pingPeriod)
-	defer func() {
-		pingTicker.Stop()
-		c.close()
-	}()
-
-	for {
-		select {
-		case message, ok := <-c.send:
-
-			if bytes.Equal(message, ws2.CloseThisConnect) == true {
-				return
-			}
-
-			// 这里是需要发送给 client 的消息
-			// 当然首先还是得先把当前消息的发送超时,给确定下来
-			err := c.conn.SetWriteDeadline(time.Now().Add(writeWait))
-			if err != nil {
-				c.log.Debugln("writePump.SetWriteDeadline", err)
-				return
-			}
-			if ok == false {
-				// The hub closed the channel.
-				err = c.conn.WriteMessage(websocket.CloseMessage, []byte{})
-				if err != nil {
-					c.log.Debugln("writePump close hub WriteMessage", err)
-				}
-				return
-			}
-
-			w, err := c.conn.NextWriter(websocket.TextMessage)
-			if err != nil {
-				c.log.Debugln("writePump.NextWriter", err)
-				return
-			}
-			_, err = w.Write(message)
-			if err != nil {
-				c.log.Debugln("writePump.Write", err)
-				return
-			}
-
-			if err := w.Close(); err != nil {
-				c.log.Debugln("writePump.Close", err)
-				return
-			}
-		case <-pingTicker.C:
-			// 心跳相关,这里是定时器到了触发的间隔,设置发送下一条心跳的超时时间
-			if err := c.conn.SetWriteDeadline(time.Now().Add(writeWait)); err != nil {
-				c.log.Debugln("writePump.pingTicker.C.SetWriteDeadline", err)
-				return
-			}
-			// 然后发送心跳
-			if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
-				c.log.Debugln("writePump.pingTicker.C.WriteMessage", err)
-				return
-			}
-		}
-	}
-}
-
-// AuthReply 生成认证通过的回复数据
-func AuthReply(inType ws2.AuthMessage) ([]byte, error) {
-
-	var err error
-	var outData, outBytes []byte
-	outData, err = json.Marshal(&ws2.Reply{
-		Message: inType.String(),
-	})
-	if err != nil {
-		return nil, err
-	}
-
-	outBytes, err = ws2.NewBaseMessage(ws2.CommonReply.String(), string(outData)).Bytes()
-	if err != nil {
-		return nil, err
-	}
-
-	return outBytes, nil
-}
-
-// ServeWs 每个 Client 连接 ws 上线时触发
-func ServeWs(log *logrus.Logger, hub *Hub, w http.ResponseWriter, r *http.Request) {
-
-	conn, err := upGrader.Upgrade(w, r, nil)
-	if err != nil {
-		log.Errorln("ServeWs.Upgrade", err)
-		return
-	}
-
-	client := NewClient(
-		log,
-		hub,
-		conn,
-		0,
-		false,
-		make(chan []byte, bufSize),
-	)
-	client.hub.register <- client
-
-	go client.writePump()
-	go client.readPump()
-}

+ 0 - 58
internal/backend/ws_helper/hub.go

@@ -1,58 +0,0 @@
-package ws_helper
-
-// Hub maintains the set of active clients and broadcasts messages to the
-// clients.
-type Hub struct {
-	// Registered clients.
-	clients map[*Client]bool
-
-	// Inbound messages from the clients.
-	broadcast chan []byte
-
-	// Register requests from the clients.
-	register chan *Client
-
-	// Unregister requests from clients.
-	unregister chan *Client
-}
-
-func NewHub() *Hub {
-	return &Hub{
-		broadcast:  make(chan []byte),
-		register:   make(chan *Client),
-		unregister: make(chan *Client),
-		clients:    make(map[*Client]bool),
-	}
-}
-
-func (h *Hub) Run() {
-	for {
-		select {
-		case client := <-h.register:
-			// 上线后先注册,然后等待 client 发送认证的 token 来确认有效,才进行后续的通信
-			h.clients[client] = true
-		case client := <-h.unregister:
-			// 下线后删除实例
-			if _, ok := h.clients[client]; ok {
-				delete(h.clients, client)
-				close(client.send)
-			}
-		case message := <-h.broadcast:
-			// 向所有的 Client 广播
-			for client := range h.clients {
-				select {
-				case client.send <- message:
-				default:
-					close(client.send)
-					delete(h.clients, client)
-				}
-			}
-		}
-	}
-}
-
-func (h *Hub) Clear() {
-	// close channel
-	close(h.broadcast)
-	close(h.register)
-}

+ 1 - 1
pkg/settings/common_settings.go

@@ -7,7 +7,7 @@ type CommonSettings struct {
 	RunScanAtStartUp         bool     `json:"run_scan_at_start_up"`         // 完成引导设置后,下次运行程序就开始扫描
 	MoviePaths               []string `json:"movie_paths"`                  // 电影的目录
 	SeriesPaths              []string `json:"series_paths"`                 // 连续剧的目录
-	LocalStaticFilePort      string   `json:"local_static_file_port"`       // 本地静态文件的端口
+	LocalStaticFilePort      string   `json:"local_static_file_port"`       // 本地静态文件的端口,取消
 }
 
 func NewCommonSettings() *CommonSettings {