Browse Source

feat: shutdown batch update summary need retry when cache not cleand (#190)

zijiren 7 months ago
parent
commit
de0b417655
2 changed files with 42 additions and 8 deletions
  1. 8 5
      core/main.go
  2. 34 3
      core/model/batch.go

+ 8 - 5
core/main.go

@@ -310,12 +310,12 @@ func main() {
 
 	<-ctx.Done()
 
-	shutdownCtx, cancel := context.WithTimeout(context.Background(), 600*time.Second)
-	defer cancel()
+	shutdownSrvCtx, shutdownSrvCancel := context.WithTimeout(context.Background(), 600*time.Second)
+	defer shutdownSrvCancel()
 
 	log.Info("shutting down http server...")
 	log.Info("max wait time: 600s")
-	if err := srv.Shutdown(shutdownCtx); err != nil {
+	if err := srv.Shutdown(shutdownSrvCtx); err != nil {
 		log.Error("server forced to shutdown: " + err.Error())
 	} else {
 		log.Info("server shutdown successfully")
@@ -329,8 +329,11 @@ func main() {
 	log.Info("shutting down sync services...")
 	wg.Wait()
 
-	log.Info("shutting down batch processor...")
-	model.ProcessBatchUpdatesSummary()
+	log.Info("shutting down batch summary...")
+	log.Info("max wait time: 600s")
+	cleanCtx, cleanCancel := context.WithTimeout(context.Background(), 600*time.Second)
+	defer cleanCancel()
+	model.CleanBatchUpdatesSummary(cleanCtx)
 
 	log.Info("server exiting")
 }

+ 34 - 3
core/model/batch.go

@@ -12,7 +12,7 @@ import (
 	"github.com/shopspring/decimal"
 )
 
-type BatchUpdateData struct {
+type batchUpdateData struct {
 	Groups         map[string]*GroupUpdate
 	Tokens         map[int]*TokenUpdate
 	Channels       map[int]*ChannelUpdate
@@ -21,6 +21,21 @@ type BatchUpdateData struct {
 	sync.Mutex
 }
 
+func (b *batchUpdateData) IsClean() bool {
+	b.Lock()
+	defer b.Unlock()
+
+	return b.isCleanLocked()
+}
+
+func (b *batchUpdateData) isCleanLocked() bool {
+	return len(b.Groups) == 0 &&
+		len(b.Tokens) == 0 &&
+		len(b.Channels) == 0 &&
+		len(b.Summaries) == 0 &&
+		len(b.GroupSummaries) == 0
+}
+
 type GroupUpdate struct {
 	Amount decimal.Decimal
 	Count  int
@@ -54,10 +69,10 @@ func groupSummaryUniqueKey(unique GroupSummaryUnique) string {
 	return fmt.Sprintf("%s:%s:%s:%d", unique.GroupID, unique.TokenName, unique.Model, unique.HourTimestamp)
 }
 
-var batchData BatchUpdateData
+var batchData batchUpdateData
 
 func init() {
-	batchData = BatchUpdateData{
+	batchData = batchUpdateData{
 		Groups:         make(map[string]*GroupUpdate),
 		Tokens:         make(map[int]*TokenUpdate),
 		Channels:       make(map[int]*ChannelUpdate),
@@ -83,6 +98,22 @@ func StartBatchProcessorSummary(ctx context.Context, wg *sync.WaitGroup) {
 	}
 }
 
+func CleanBatchUpdatesSummary(ctx context.Context) {
+	for {
+		select {
+		case <-ctx.Done():
+			ProcessBatchUpdatesSummary()
+			return
+		default:
+			if batchData.IsClean() {
+				return
+			}
+		}
+		ProcessBatchUpdatesSummary()
+		time.Sleep(time.Second * 1)
+	}
+}
+
 func ProcessBatchUpdatesSummary() {
 	batchData.Lock()
 	defer batchData.Unlock()