Przeglądaj źródła

enhance(sync): ws ping, update sync init process

- go-loop websocket ping to keep ws alive
- remove ::remote->local=>local->remote state
- remove ::remote->local=>local->remote-full-sync state
- remote ::remote->local-full-sync=>local->remote-full-sync state
- when starting sync, first trigger remote->local, then local->remote-full-sync
rcmerci 3 lat temu
rodzic
commit
77a4facbc7
1 zmienionych plików z 192 dodań i 144 usunięć
  1. 192 144
      src/main/frontend/fs/sync.cljs

+ 192 - 144
src/main/frontend/fs/sync.cljs

@@ -33,7 +33,8 @@
 ;; files in these `get-monitored-dirs` dirs will be synchronized.
 ;;
 ;; sync strategy:
-;; - when toggle file-sync on, trigger a local->remote-full-sync first,
+;; - when toggle file-sync on,
+;;   trigger remote->local first, then local->remote-full-sync
 ;;   local->remote-full-sync will compare local-files with remote-files (by md5 & size),
 ;;   and upload new-added-files to remote server.
 ;; - if local->remote sync(normal-sync or full-sync) return :need-sync-remote,
@@ -58,12 +59,8 @@
                  ::remote->local
                  ;; local->remote full sync
                  ::local->remote-full-sync
-                 ;; exec remote->local, then local->remote
-                 ::remote->local=>local->remote
-                 ;; exec remote->local, then local->remote-full-sync
-                 ::remote->local=>local->remote-full-sync
-                 ;; exec remote->local-full-sync, then local->remote-full-sync
-                 ::remote->local-full-sync=>local->remote-full-sync
+                 ;; remote->local full sync
+                 ::remote->local-full-sync
                  ::stop})
 (s/def ::path string?)
 (s/def ::time t/date?)
@@ -117,6 +114,19 @@
   (persist-var/-reset-value! graphs-txid nil repo)
   (persist-var/persist-save graphs-txid))
 
+(defn- ws-ping-loop [ws]
+  (go-loop []
+    (let [state (.-readyState ws)]
+      ;; not closing or closed state
+      (when (not (contains? #{2 3} state))
+        (if (not= 1 state)
+          ;; when connecting, wait 1s
+          (do (<! (timeout 1000))
+              (recur))
+          (do (.send ws "PING")
+              (<! (timeout 30000))
+              (recur)))))))
+
 (defn- ws-stop! [*ws]
   (swap! *ws (fn [o] (assoc o :stop true)))
   (.close (:ws @*ws)))
@@ -124,6 +134,7 @@
 (defn- ws-listen!*
   [graph-uuid *ws remote-changes-chan]
   (reset! *ws {:ws (js/WebSocket. (util/format ws-addr graph-uuid)) :stop false})
+  (ws-ping-loop (:ws @*ws))
   ;; (set! (.-onopen (:ws @*ws)) #(println (util/format "ws opened: graph '%s'" graph-uuid %)))
   (set! (.-onclose (:ws @*ws)) (fn [_e]
                                  (when-not (true? (:stop @*ws))
@@ -133,13 +144,14 @@
                                      (ws-listen!* graph-uuid *ws remote-changes-chan)))))
   (set! (.-onmessage (:ws @*ws)) (fn [e]
                                    (let [data (js->clj (js/JSON.parse (.-data e)) :keywordize-keys true)]
-                                     (if-let [v (poll! remote-changes-chan)]
-                                       (let [last-txid (:txid v)
-                                             current-txid (:txid data)]
-                                         (if (> last-txid current-txid)
-                                           (offer! remote-changes-chan v)
-                                           (offer! remote-changes-chan data)))
-                                       (offer! remote-changes-chan data))))))
+                                     (when (some? (:txid data))
+                                       (if-let [v (poll! remote-changes-chan)]
+                                         (let [last-txid (:txid v)
+                                               current-txid (:txid data)]
+                                           (if (> last-txid current-txid)
+                                             (offer! remote-changes-chan v)
+                                             (offer! remote-changes-chan data)))
+                                         (offer! remote-changes-chan data)))))))
 
 (defn ws-listen!
   "return channal which output messages from server"
@@ -1164,144 +1176,181 @@
 
 ;;; ### put all stuff together
 
+(defn- drain-chan
+  "drop all stuffs in CH"
+  [ch]
+  (->> (repeatedly #(poll! ch))
+       (take-while identity)))
+
 (defrecord ^:large-vars/cleanup-todo
     SyncManager [graph-uuid base-path *sync-state
                  ^Local->RemoteSyncer local->remote-syncer ^Remote->LocalSyncer remote->local-syncer
                  full-sync-chan stop-sync-chan remote->local-sync-chan local->remote-sync-chan
                  local-changes-chan ^:mutable ratelimit-local-changes-chan
-                 *txid ^:mutable state ^:mutable _remote-change-chan ^:mutable _*ws ^:mutable stopped]
-  Object
-  (schedule [this next-state args]
-    {:pre [(s/valid? ::state next-state)]}
-    (println "[SyncManager" graph-uuid "]" (and state (name state)) "->" (and next-state (name next-state)))
-    (set! state next-state)
-    (swap! *sync-state sync-state--update-state next-state)
-    (go
-      (case state
-        ::idle
-        (<! (.idle this))
-        ::local->remote
-        (<! (.local->remote this args))
-        ::remote->local
-        (<! (.remote->local this nil args))
-        ::local->remote-full-sync
-        (<! (.full-sync this))
-        ::remote->local=>local->remote
-        (<! (.remote->local this ::local->remote args))
-        ::remote->local=>local->remote-full-sync
-        (<! (.remote->local this ::local->remote-full-sync args))
-        ::remote->local-full-sync=>local->remote-full-sync
-        (<! (.remote->local-full-sync this ::local->remote-full-sync))
-        ::stop
-        (-stop! this))))
-
-  (start [this]
-    (set! _*ws (atom nil))
-    (set! _remote-change-chan (ws-listen! graph-uuid _*ws))
-    (set! ratelimit-local-changes-chan (ratelimit local->remote-syncer local-changes-chan))
-    (.schedule this ::idle nil))
-
-  (idle [this]
-    (go
-      (let [{:keys [stop full-sync ;; trigger-remote trigger-local
-                    remote local trigger-full-sync]}
-            (async/alt!
-              stop-sync-chan {:stop true}
-              full-sync-chan {:full-sync true}
-              remote->local-sync-chan {:trigger-remote true}
-              local->remote-sync-chan {:trigger-local true}
-              _remote-change-chan ([v] (println "remote changes:" v) {:remote v})
-              ratelimit-local-changes-chan ([v] (println "local changes:" v) {:local v})
-              (timeout (* 20 60 1000)) {:trigger-full-sync true}
-              :priority true)]
-        (cond
-          stop
-          (<! (.schedule this ::stop nil))
-          (or full-sync trigger-full-sync)
-          (<! (.schedule this ::local->remote-full-sync nil))
-          remote
-          (<! (.schedule this ::remote->local {:remote remote}))
-          local
-          (<! (.schedule this ::local->remote {:local local}))
-          :else
-          (<! (.schedule this :idle nil))))))
+                 *txid ^:mutable state ^:mutable _remote-change-chan ^:mutable _*ws ^:mutable stopped
+                 ^:mutable ops-chan]
+    Object
+    (schedule [this next-state args]
+      {:pre [(s/valid? ::state next-state)]}
+      (println "[SyncManager" graph-uuid "]" (and state (name state)) "->" (and next-state (name next-state)))
+      (set! state next-state)
+      (swap! *sync-state sync-state--update-state next-state)
+      (go
+        (case state
+          ::idle
+          (<! (.idle this))
+          ::local->remote
+          (<! (.local->remote this args))
+          ::remote->local
+          (<! (.remote->local this nil args))
+          ::local->remote-full-sync
+          (<! (.full-sync this))
+          ::remote->local-full-sync
+          (<! (.remote->local-full-sync this nil))
+          ::stop
+          (-stop! this))))
+
+    (start [this]
+      (set! ops-chan (chan (async/dropping-buffer 10)))
+      (set! _*ws (atom nil))
+      (set! _remote-change-chan (ws-listen! graph-uuid _*ws))
+      (set! ratelimit-local-changes-chan (ratelimit local->remote-syncer local-changes-chan))
+      (go-loop []
+        (let [{:keys [stop remote->local local->remote-full-sync local->remote]}
+              (async/alt!
+                stop-sync-chan {:stop true}
+                remote->local-sync-chan {:remote->local true}
+                full-sync-chan {:local->remote-full-sync true}
+                _remote-change-chan ([v] (println "remote change:" v) {:remote->local v})
+                ratelimit-local-changes-chan ([v] (println "local changes:" v) {:local->remote v})
+                (timeout (* 20 60 1000)) {:local->remote-full-sync true}
+                :priority true)]
+          (cond
+            stop
+            (do
+              (drain-chan ops-chan)
+              (>! ops-chan {:stop true}))
+            remote->local
+            (let [txid
+                  (if (true? remote->local)
+                    {:txid (:TXId (<! (get-remote-graph remoteapi nil graph-uuid)))}
+                    remote->local)]
+              (when (some? txid)
+                (>! ops-chan {:remote->local txid}))
+              (recur))
+            local->remote
+            (do (>! ops-chan {:local->remote local->remote})
+                (recur))
+            local->remote-full-sync
+            (do (drain-chan ops-chan)
+                (>! ops-chan {:local->remote-full-sync true})
+                (recur)))))
+      (.schedule this ::idle nil))
+
+    (idle [this]
+      (go
+        (let [{:keys [stop remote->local local->remote local->remote-full-sync remote->local-full-sync]}
+              (<! ops-chan)]
+          (cond
+            stop
+            (<! (.schedule this ::stop nil))
+            remote->local
+            (<! (.schedule this ::remote->local {:remote remote->local}))
+            local->remote
+            (<! (.schedule this ::local->remote {:local local->remote}))
+            local->remote-full-sync
+            (<! (.schedule this ::local->remote-full-sync nil))
+            remote->local-full-sync
+            (<! (.schedule this ::remote->local-full-sync nil))
+            :else
+            (<! (.schedule this ::stop nil))))))
 
-  (full-sync [this]
-    (go
-      (let [{:keys [succ need-sync-remote unknown stop] :as r}
-            (<! (sync-local->remote-all-files! local->remote-syncer))]
-        (s/assert ::sync-local->remote-all-files!-result r)
-        (cond
-          succ
-          (.schedule this ::idle nil)
-          need-sync-remote
-          (.schedule this ::remote->local=>local->remote-full-sync nil)
-          stop
-          (.schedule this ::stop nil)
-          unknown
-          (do
-            (debug/pprint "full-sync" unknown)
-            (.schedule this ::idle nil))))))
-
-  (remote->local-full-sync [this next-state]
-    (go
-      (let [{:keys [succ unknown stop]}
-            (<! (sync-remote->local-all-files! remote->local-syncer))]
-        (cond
-          succ
-          (.schedule this next-state nil)
-          stop
-          (.schedule this ::stop nil)
-          unknown
-          (do
-            (debug/pprint "remote->local-full-sync" unknown)
-            (.schedule this ::idle nil))))))
-
-  (remote->local [this next-state {remote-val :remote :as args}]
-    (go
-      (if (some-> remote-val :txid (<= @*txid))
-        (.schedule this ::idle nil)
-        (let [{:keys [succ unknown stop need-remote->local-full-sync] :as r}
-              (<! (sync-remote->local! remote->local-syncer))]
-          (s/assert ::sync-remote->local!-result r)
+    (full-sync [this]
+      (go
+        (let [{:keys [succ need-sync-remote unknown stop] :as r}
+              (<! (sync-local->remote-all-files! local->remote-syncer))]
+          (s/assert ::sync-local->remote-all-files!-result r)
           (cond
-            need-remote->local-full-sync
-            (.schedule this ::remote->local-full-sync=>local->remote-full-sync nil)
             succ
-            (.schedule this (or next-state ::idle) args)
+            (.schedule this ::idle nil)
+            need-sync-remote
+            (do (drain-chan ops-chan)
+                (>! ops-chan {:remote->local true})
+                (>! ops-chan {:local->remote-full-sync true})
+                (.schedule this ::idle nil))
             stop
             (.schedule this ::stop nil)
             unknown
-            (do (prn "remote->local err" unknown)
-                (.schedule this ::idle nil)))))))
+            (do
+              (debug/pprint "full-sync" unknown)
+              (.schedule this ::idle nil))))))
 
-  (local->remote [this {^FileChangeEvents local-change :local}]
-    (assert (some? local-change) local-change)
-    (go
-      (let [{:keys [succ need-sync-remote unknown] :as r}
-            (<! (sync-local->remote! local->remote-syncer [local-change]))]
-        (s/assert ::sync-local->remote!-result r)
-        (cond
-          succ
+    (remote->local-full-sync [this _next-state]
+      (go
+        (let [{:keys [succ unknown stop]}
+              (<! (sync-remote->local-all-files! remote->local-syncer))]
+          (cond
+            succ
+            (.schedule this ::idle nil)
+            stop
+            (.schedule this ::stop nil)
+            unknown
+            (do
+              (debug/pprint "remote->local-full-sync" unknown)
+              (.schedule this ::idle nil))))))
+
+    (remote->local [this _next-state {remote-val :remote}]
+      (go
+        (if (some-> remote-val :txid (<= @*txid))
           (.schedule this ::idle nil)
+          (let [{:keys [succ unknown stop need-remote->local-full-sync] :as r}
+                (<! (sync-remote->local! remote->local-syncer))]
+            (s/assert ::sync-remote->local!-result r)
+            (cond
+              need-remote->local-full-sync
+              (do (drain-chan ops-chan)
+                  (>! ops-chan {:remote->local-full-sync true})
+                  (>! ops-chan {:local->remote-full-sync true})
+                  (.schedule this ::idle nil))
+              succ
+              (.schedule this ::idle nil)
+              stop
+              (.schedule this ::stop nil)
+              unknown
+              (do (prn "remote->local err" unknown)
+                  (.schedule this ::idle nil)))))))
 
-          need-sync-remote
-          (.schedule this ::remote->local=>local->remote nil)
-
-          unknown
-          (do
-            (debug/pprint "local->remote" unknown)
-            (.schedule this ::idle nil))))))
-  IStoppable
-  (-stop! [_]
-    (when-not stopped
-      (set! stopped true)
-      (ws-stop! _*ws)
-      (offer! stop-sync-chan true)
-      (stop-local->remote! local->remote-syncer)
-      (stop-remote->local! remote->local-syncer)
-      (debug/pprint ["stop sync-manager, graph-uuid" graph-uuid "base-path" base-path])
-      (swap! *sync-state sync-state--update-state ::stop))))
+    (local->remote [this {^FileChangeEvents local-change :local}]
+      (assert (some? local-change) local-change)
+      (go
+        (let [{:keys [succ need-sync-remote unknown] :as r}
+              (<! (sync-local->remote! local->remote-syncer [local-change]))]
+          (s/assert ::sync-local->remote!-result r)
+          (cond
+            succ
+            (.schedule this ::idle nil)
+
+            need-sync-remote
+            (do (drain-chan ops-chan)
+                (>! ops-chan {:remote->local true})
+                (>! ops-chan {:local->remote {:local local-change}})
+                (.schedule this ::idle nil))
+
+            unknown
+            (do
+              (debug/pprint "local->remote" unknown)
+              (.schedule this ::idle nil))))))
+    IStoppable
+    (-stop! [_]
+      (when-not stopped
+        (set! stopped true)
+        (ws-stop! _*ws)
+        (offer! stop-sync-chan true)
+        (async/close! ops-chan)
+        (stop-local->remote! local->remote-syncer)
+        (stop-remote->local! remote->local-syncer)
+        (debug/pprint ["stop sync-manager, graph-uuid" graph-uuid "base-path" base-path])
+        (swap! *sync-state sync-state--update-state ::stop))))
 
 (defn sync-manager [user-uuid graph-uuid base-path repo txid *sync-state full-sync-chan stop-sync-chan
                     remote->local-sync-chan local->remote-sync-chan local-changes-chan]
@@ -1318,11 +1367,11 @@
     (.set-local->remote-syncer! remote->local-syncer local->remote-syncer)
     (->SyncManager graph-uuid base-path *sync-state local->remote-syncer remote->local-syncer
                    full-sync-chan stop-sync-chan
-                   remote->local-sync-chan local->remote-sync-chan local-changes-chan nil *txid nil nil nil false)))
+                   remote->local-sync-chan local->remote-sync-chan local-changes-chan nil *txid nil nil nil false nil)))
 
 (def full-sync-chan (chan 1))
 (def stop-sync-chan (chan 1))
-(def remote->local-sync-chan (chan))
+(def remote->local-sync-chan (chan 1))
 (def local->remote-sync-chan (chan))
 
 (defn sync-stop []
@@ -1371,10 +1420,9 @@
               ;; set-env
               (set-env rsapi config/FILE-SYNC-PROD?)
 
-              ;; drain `local-changes-chan`
-              (->> (repeatedly #(poll! local-changes-chan))
-                   (take-while identity))
+              (drain-chan local-changes-chan)
               (poll! stop-sync-chan)
+              (poll! remote->local-sync-chan)
               ;; update global state when *sync-state changes
               (add-watch *sync-state ::update-global-state
                          (fn [_ _ _ n]
@@ -1382,7 +1430,7 @@
               (.start sm)
 
               (state/set-file-sync-manager sm)
-
+              (offer! remote->local-sync-chan true)
               (offer! full-sync-chan true)
 
               ;; watch :network/online?