Browse Source

feat(sync): support stop from full-sync

rcmerci 3 năm trước cách đây
mục cha
commit
5ed1b9b478
1 tập tin đã thay đổi với 54 bổ sung33 xóa
  1. 54 33
      src/main/frontend/fs/sync.cljs

+ 54 - 33
src/main/frontend/fs/sync.cljs

@@ -14,7 +14,8 @@
             [frontend.state :as state]
             [frontend.state :as state]
             [frontend.util :as util]
             [frontend.util :as util]
             [frontend.util.persist-var :as persist-var]
             [frontend.util.persist-var :as persist-var]
-            [rum.core :as rum]))
+            [rum.core :as rum]
+            [cljs.spec.alpha :as s]))
 
 
 ;;; Commentary
 ;;; Commentary
 ;;; file-sync related local files/dirs:
 ;;; file-sync related local files/dirs:
@@ -246,6 +247,9 @@
          (= (.get-normalized-path o) (.get-normalized-path other))
          (= (.get-normalized-path o) (.get-normalized-path other))
          (= etag (.-etag other))))
          (= etag (.-etag other))))
 
 
+  IHash
+  (-hash [o] (hash {:size size :etag etag :path path}))
+
   IPrintWithWriter
   IPrintWithWriter
   (-pr-writer [coll w opts]
   (-pr-writer [coll w opts]
     (write-all w (str {:size size :etag etag :path path :remote? remote?}))))
     (write-all w (str {:size size :etag etag :path path :remote? remote?}))))
@@ -570,7 +574,7 @@
   (ratelimit [this from-chan] "get watched local file-change events from FROM-CHAN,
   (ratelimit [this from-chan] "get watched local file-change events from FROM-CHAN,
   return chan returning events with rate limited")
   return chan returning events with rate limited")
   (sync-local->remote! [this ^FileChangeEvent e])
   (sync-local->remote! [this ^FileChangeEvent e])
-  (sync-local->remote-all-files! [this] "compare all local files to remote ones, sync if not equal.
+  (sync-local->remote-all-files! [this stop-chan] "compare all local files to remote ones, sync if not equal.
   ensure local-txid = remote-txid before calling this func"))
   ensure local-txid = remote-txid before calling this func"))
 
 
 (deftype Remote->LocalSyncer [graph-uuid base-path repo *txid ^SyncState sync-state ^:mutable local->remote-syncer]
 (deftype Remote->LocalSyncer [graph-uuid base-path repo *txid ^SyncState sync-state ^:mutable local->remote-syncer]
@@ -598,9 +602,7 @@
           {:unknown r}
           {:unknown r}
           {:succ true}))))
           {:succ true}))))
 
 
-  (sync-remote->local-all-files! [this]
-    ;; TODO
-    ))
+  (sync-remote->local-all-files! [this] nil))
 
 
 
 
 (defn- file-changed?
 (defn- file-changed?
@@ -617,19 +619,27 @@
 (deftype Local->RemoteSyncer [graph-uuid base-path repo ^SyncState sync-state
 (deftype Local->RemoteSyncer [graph-uuid base-path repo ^SyncState sync-state
                               ^:mutable rate *txid ^:mutable remote->local-syncer stop-chan]
                               ^:mutable rate *txid ^:mutable remote->local-syncer stop-chan]
   Object
   Object
-  (filtered-chan [_ n]
+  (filter-file-change-events-fn [this]
+    (fn [^FileChangeEvent e] (and (instance? FileChangeEvent e)
+                                  (string/starts-with? (.-dir e) base-path)
+                                  (not (contains-path? (get-ignore-files this) (-relative-path e)))
+                                  (contains-path? (get-monitored-dirs this) (-relative-path e)))))
+
+  (filtered-chan [this n]
     "check base-path"
     "check base-path"
-    (chan n (filter (fn [^FileChangeEvent e] (string/starts-with? (.-dir e) base-path)))))
+    (chan n (filter (.filter-file-change-events-fn this))))
 
 
   (set-remote->local-syncer! [_ s] (set! remote->local-syncer s))
   (set-remote->local-syncer! [_ s] (set! remote->local-syncer s))
 
 
   ILocal->RemoteSync
   ILocal->RemoteSync
-  (get-ignore-files [_] #{#"logseq/graphs-txid.edn$" #"logseq/bak/.*" #"logseq/version-files/.*"})
-  (get-monitored-dirs [_] #{"assets/" "journals/" "logseq/" "pages/"})
+  (get-ignore-files [_] #{#"logseq/graphs-txid.edn$" #"logseq/bak/.*" #"version-files/.*" #"logseq/\.recycle/.*"
+                          #"\.DS_Store$"})
+  (get-monitored-dirs [_] #{#"^assets/" #"^journals/" #"^logseq/" #"^pages/"})
   (stop-local->remote! [_] (async/close! stop-chan))
   (stop-local->remote! [_] (async/close! stop-chan))
 
 
   (ratelimit [this from-chan]
   (ratelimit [this from-chan]
-    (let [c (.filtered-chan this 10000)]
+    (let [c (.filtered-chan this 10000)
+          filter-e-fn (.filter-file-change-events-fn this)]
       (go-loop [timeout-c (timeout rate)
       (go-loop [timeout-c (timeout rate)
                 tcoll (transient [])]
                 tcoll (transient [])]
         (let [{:keys [timeout ^FileChangeEvent e stop]}
         (let [{:keys [timeout ^FileChangeEvent e stop]}
@@ -647,11 +657,12 @@
 
 
             (some? e)
             (some? e)
             (do
             (do
-              (if (= "unlink" (.-type e))
-                (conj! tcoll e)
-                (if (<! (file-changed? graph-uuid (-relative-path e) base-path))
+              (when (filter-e-fn e)
+                (if (= "unlink" (.-type e))
                   (conj! tcoll e)
                   (conj! tcoll e)
-                  (prn "file unchanged" (-relative-path e))))
+                  (if (<! (file-changed? graph-uuid (-relative-path e) base-path))
+                    (conj! tcoll e)
+                    (prn "file unchanged" (-relative-path e)))))
               (recur timeout-c tcoll))
               (recur timeout-c tcoll))
 
 
             (nil? e)
             (nil? e)
@@ -700,8 +711,7 @@
                       (println "sync-local->remote unknown:" r*)
                       (println "sync-local->remote unknown:" r*)
                       {:unknown r*}))))))))))
                       {:unknown r*}))))))))))
 
 
-  ;; TODO: support stopping in the middle of processing
-  (sync-local->remote-all-files! [this]
+  (sync-local->remote-all-files! [this stop-chan]
     (go
     (go
       (let [remote-all-files-meta-c (get-remote-all-files-meta remoteapi graph-uuid)
       (let [remote-all-files-meta-c (get-remote-all-files-meta remoteapi graph-uuid)
             local-all-files-meta-c (get-local-all-files-meta rsapi graph-uuid base-path)
             local-all-files-meta-c (get-local-all-files-meta rsapi graph-uuid base-path)
@@ -709,21 +719,26 @@
             local-all-files-meta (<! local-all-files-meta-c)
             local-all-files-meta (<! local-all-files-meta-c)
             diff-local-files (set/difference local-all-files-meta remote-all-files-meta)
             diff-local-files (set/difference local-all-files-meta remote-all-files-meta)
             ignore-files (get-ignore-files this)
             ignore-files (get-ignore-files this)
+            monitored-dirs (get-monitored-dirs this)
             change-events (->> diff-local-files
             change-events (->> diff-local-files
                                (mapv
                                (mapv
                                 #(->FileChangeEvent "change" base-path (.get-normalized-path ^FileMetadata %) nil))
                                 #(->FileChangeEvent "change" base-path (.get-normalized-path ^FileMetadata %) nil))
-                               (filterv (complement
-                                         #(contains-path? ignore-files (-relative-path %)))))]
+                               (filterv #(let [relative-path (-relative-path %)]
+                                           (and (not (contains-path? ignore-files relative-path))
+                                                (contains-path? monitored-dirs relative-path)))))]
+        (println "[full-sync]" (count change-events) "files need to sync to remote")
         (loop [es change-events]
         (loop [es change-events]
-          (if-not es
+          (if (empty? es)
             {:succ true}
             {:succ true}
-            (let [e (first es)
-                  {:keys [succ need-sync-remote unknown] :as r} (<! (sync-local->remote! this e))]
-              (cond
-                succ
-                (recur (next es))
+            (if (async/poll! stop-chan)
+              {:stop true}
+              (let [e (first es)
+                    {:keys [succ need-sync-remote unknown] :as r} (<! (sync-local->remote! this e))]
+                (cond
+                  succ
+                  (recur (next es))
 
 
-                (or need-sync-remote unknown) r))))))))
+                  (or need-sync-remote unknown) r)))))))))
 
 
 (deftype SyncState [^:mutable state ^:mutable current-local->remote-files ^:mutable current-remote->local-files
 (deftype SyncState [^:mutable state ^:mutable current-local->remote-files ^:mutable current-remote->local-files
                     ^:mutable history]
                     ^:mutable history]
@@ -770,7 +785,8 @@
       (-pr-writer pr-map w opts))))
       (-pr-writer pr-map w opts))))
 
 
 
 
-(deftype SyncManager [graph-uuid base-path ^SyncState sync-state local->remote-syncer remote->local-syncer
+(deftype SyncManager [graph-uuid base-path ^SyncState sync-state
+                      ^Local->RemoteSyncer local->remote-syncer ^Remote->LocalSyncer remote->local-syncer
                       full-sync-chan stop-sync-chan remote->local-sync-chan local->remote-sync-chan
                       full-sync-chan stop-sync-chan remote->local-sync-chan local->remote-sync-chan
                       local-changes-chan ^:mutable ratelimit-local-changes-chan
                       local-changes-chan ^:mutable ratelimit-local-changes-chan
                       *txid ^:mutable state ^:mutable _remote-change-chan ^:mutable _*ws]
                       *txid ^:mutable state ^:mutable _remote-change-chan ^:mutable _*ws]
@@ -802,11 +818,9 @@
     (set! ratelimit-local-changes-chan (ratelimit local->remote-syncer local-changes-chan))
     (set! ratelimit-local-changes-chan (ratelimit local->remote-syncer local-changes-chan))
     (.schedule this ::idle))
     (.schedule this ::idle))
 
 
-
-
   (idle [this]
   (idle [this]
     (go
     (go
-      (let [{:keys [stop full-sync trigger-remote trigger-local remote local]}
+      (let [{:keys [stop full-sync trigger-remote trigger-local remote local trigger-full-sync]}
             (async/alt!
             (async/alt!
               stop-sync-chan {:stop true}
               stop-sync-chan {:stop true}
               full-sync-chan {:full-sync true}
               full-sync-chan {:full-sync true}
@@ -814,11 +828,12 @@
               local->remote-sync-chan {:trigger-local true}
               local->remote-sync-chan {:trigger-local true}
               _remote-change-chan ([v] (println "remote changes:" v) {:remote v})
               _remote-change-chan ([v] (println "remote changes:" v) {:remote v})
               ratelimit-local-changes-chan ([v] (println "local changes:" v) {:local v})
               ratelimit-local-changes-chan ([v] (println "local changes:" v) {:local v})
+              (timeout (* 20 60 1000)) {:trigger-full-sync true}
               :priority true)]
               :priority true)]
         (cond
         (cond
           stop
           stop
           (<! (.schedule this ::stop))
           (<! (.schedule this ::stop))
-          full-sync
+          (or full-sync trigger-full-sync)
           (<! (.schedule this ::full-sync))
           (<! (.schedule this ::full-sync))
           remote
           remote
           (<! (.schedule this ::remote->local remote))
           (<! (.schedule this ::remote->local remote))
@@ -827,13 +842,15 @@
 
 
   (full-sync [this]
   (full-sync [this]
     (go
     (go
-      (let [{:keys [succ need-sync-remote unknown]}
-            (<! (sync-local->remote-all-files! local->remote-syncer))]
+      (let [{:keys [succ need-sync-remote unknown stop]}
+            (<! (sync-local->remote-all-files! local->remote-syncer stop-sync-chan))]
         (cond
         (cond
           succ
           succ
           (.schedule this ::idle)
           (.schedule this ::idle)
           need-sync-remote
           need-sync-remote
           (.schedule this ::remote->local=>full-sync)
           (.schedule this ::remote->local=>full-sync)
+          stop
+          (.schedule this ::stop)
           unknown
           unknown
           (do
           (do
             (debug/pprint "full-sync" unknown)
             (debug/pprint "full-sync" unknown)
@@ -873,6 +890,7 @@
   IStoppable
   IStoppable
   (-stop! [this]
   (-stop! [this]
     (ws-stop! _*ws)
     (ws-stop! _*ws)
+    (offer! stop-sync-chan true)
     (stop-local->remote! local->remote-syncer)
     (stop-local->remote! local->remote-syncer)
     (debug/pprint ["stop sync-manager, graph-uuid" graph-uuid "base-path" base-path])
     (debug/pprint ["stop sync-manager, graph-uuid" graph-uuid "base-path" base-path])
     (.update-state! sync-state ::stop)
     (.update-state! sync-state ::stop)
@@ -928,7 +946,7 @@
 
 
   )
   )
 
 
-(def full-sync-chan (chan))
+(def full-sync-chan (chan 1))
 (def stop-sync-chan (chan 1))
 (def stop-sync-chan (chan 1))
 (def remote->local-sync-chan (chan))
 (def remote->local-sync-chan (chan))
 (def local->remote-sync-chan (chan))
 (def local->remote-sync-chan (chan))
@@ -950,11 +968,14 @@
     ;; drain `local-changes-chan`
     ;; drain `local-changes-chan`
     (->> (repeatedly #(poll! local-changes-chan))
     (->> (repeatedly #(poll! local-changes-chan))
          (take-while identity))
          (take-while identity))
+    (poll! stop-sync-chan)
 
 
     (.start sm)
     (.start sm)
     (state/set-file-sync-state-manager sync-state)
     (state/set-file-sync-state-manager sync-state)
     (state/set-file-sync-manager sm)
     (state/set-file-sync-manager sm)
 
 
+    (offer! full-sync-chan true)
+
     ;; watch :network/online?
     ;; watch :network/online?
     (add-watch (rum/cursor state/state :network/online?) "sync-manage"
     (add-watch (rum/cursor state/state :network/online?) "sync-manage"
                (fn [k r o n]
                (fn [k r o n]