|
@@ -92,6 +92,7 @@
|
|
|
(s/def ::current-local->remote-files (s/coll-of ::path :kind set?))
|
|
|
(s/def ::current-remote->local-files (s/coll-of ::path :kind set?))
|
|
|
(s/def ::recent-remote->local-files (s/coll-of ::recent-remote->local-file-item :kind set?))
|
|
|
+(s/def ::recent-remote->local-files-map (s/map-of ::path ::recent-remote->local-file-item))
|
|
|
(s/def ::history-item (s/keys :req-un [::path ::time]))
|
|
|
(s/def ::history (s/coll-of ::history-item :kind seq?))
|
|
|
(s/def ::sync-state (s/keys :req-un [::current-syncing-graph-uuid
|
|
@@ -103,6 +104,7 @@
|
|
|
;; causes unreasonable information in the content of ::queued-local->remote-files,
|
|
|
;; use ::recent-remote->local-files to filter such events
|
|
|
::recent-remote->local-files
|
|
|
+ ::recent-remote->local-files-map
|
|
|
::history]))
|
|
|
|
|
|
;; diff
|
|
@@ -1493,14 +1495,15 @@
|
|
|
(.-updated? e) :update-filetxns
|
|
|
(.-deleted? e) :delete-filetxns
|
|
|
(.renamed? e) :rename-filetxns)) filetxns)
|
|
|
- update-file-items (map
|
|
|
+ update-file-items (mapv
|
|
|
(fn [filetxn]
|
|
|
(let [path (relative-path filetxn)]
|
|
|
{:remote->local-type :update
|
|
|
:checksum (-checksum filetxn)
|
|
|
:path path}))
|
|
|
update-filetxns)
|
|
|
- rename-file-items (mapcat
|
|
|
+ update-file-items {:update update-file-items}
|
|
|
+ rename-file-items (mapv
|
|
|
(fn [^FileTxn filetxn]
|
|
|
(let [to-path (relative-path filetxn)
|
|
|
from-path (.-from-path filetxn)]
|
|
@@ -1511,14 +1514,17 @@
|
|
|
:checksum nil
|
|
|
:path from-path}]))
|
|
|
rename-filetxns)
|
|
|
- delete-file-items (map
|
|
|
+ rename-file-items {:update (mapv first rename-file-items)
|
|
|
+ :delete (mapv second rename-file-items)}
|
|
|
+ delete-file-items (mapv
|
|
|
(fn [filetxn]
|
|
|
(let [path (relative-path filetxn)]
|
|
|
{:remote->local-type :delete
|
|
|
:checksum (-checksum filetxn)
|
|
|
:path path}))
|
|
|
- delete-filetxns)]
|
|
|
- (set (concat update-file-items rename-file-items delete-file-items))))
|
|
|
+ delete-filetxns)
|
|
|
+ delete-file-items {:delete delete-file-items}]
|
|
|
+ (merge-with concat update-file-items rename-file-items delete-file-items)))
|
|
|
|
|
|
(defn- apply-filetxns
|
|
|
[*sync-state graph-uuid base-path filetxns *paused]
|
|
@@ -1593,20 +1599,27 @@
|
|
|
(when (seq filetxns-partitions*)
|
|
|
(let [filetxns (first filetxns-partitions*)
|
|
|
paths (map relative-path filetxns)
|
|
|
- recent-remote->local-file-items (filetxns=>recent-remote->local-files filetxns)
|
|
|
- _ (when-not full-sync?
|
|
|
- (swap! *sync-state #(sync-state-reset-full-remote->local-files % recent-remote->local-file-items)))
|
|
|
+ {update-items :update delete-items :delete}
|
|
|
+ (filetxns=>recent-remote->local-files filetxns)
|
|
|
+ recent-remote->local-file-items (concat update-items delete-items)
|
|
|
+ _
|
|
|
+ (when-not full-sync? ; when full-sync, :full-remote->local-files has been set outside
|
|
|
+ (swap! *sync-state
|
|
|
+ #(sync-state-reset-full-remote->local-files
|
|
|
+ % (mapv (fn [e] (select-keys e [:path :checksum])) recent-remote->local-file-items))))
|
|
|
;; update recent-remote->local-files
|
|
|
- _ (swap! *sync-state sync-state--add-recent-remote->local-files
|
|
|
- recent-remote->local-file-items)
|
|
|
- _ (swap! *sync-state sync-state--add-current-remote->local-files paths)
|
|
|
+ _
|
|
|
+ (swap! *sync-state sync-state--add-recent-remote->local-files recent-remote->local-file-items)
|
|
|
+ _
|
|
|
+ (swap! *sync-state sync-state--add-current-remote->local-files paths)
|
|
|
r (<! (apply-filetxns *sync-state graph-uuid base-path filetxns *paused))
|
|
|
- _ (swap! *sync-state sync-state--remove-current-remote->local-files paths
|
|
|
- (not (instance? ExceptionInfo r)))]
|
|
|
- ;; remove these recent-remote->local-file-items 5s later
|
|
|
+ _
|
|
|
+ (swap! *sync-state sync-state--remove-current-remote->local-files paths (not (instance? ExceptionInfo r)))]
|
|
|
+ ;; remove these delete type recent-remote->local-file-items 5s later
|
|
|
+ ;; update type items still exist in recent-remote->local-files,
|
|
|
+ ;; because there's a checksum attr in update-items, won't conflict with another same file update event
|
|
|
(go (<! (timeout 5000))
|
|
|
- (swap! *sync-state sync-state--remove-recent-remote->local-files
|
|
|
- recent-remote->local-file-items))
|
|
|
+ (swap! *sync-state sync-state--remove-recent-remote->local-files delete-items))
|
|
|
(cond
|
|
|
(instance? ExceptionInfo r) r
|
|
|
@*paused {:pause true}
|
|
@@ -1706,7 +1719,7 @@
|
|
|
path (relative-path e)]
|
|
|
{:remote->local-type tp
|
|
|
:checksum (if (= tp :delete) nil
|
|
|
- (val (first (<! (get-local-files-checksum graph-uuid (.-dir e) [path])))))
|
|
|
+ (val (first (<! (get-local-files-checksum graph-uuid (.-dir e) [path])))))
|
|
|
:path path})))
|
|
|
|
|
|
(defn- distinct-file-change-events-xf
|
|
@@ -2089,15 +2102,16 @@
|
|
|
"create a new sync-state"
|
|
|
[]
|
|
|
{:post [(s/valid? ::sync-state %)]}
|
|
|
- {:current-syncing-graph-uuid nil
|
|
|
- :state ::starting
|
|
|
- :full-local->remote-files #{}
|
|
|
- :current-local->remote-files #{}
|
|
|
- :full-remote->local-files #{}
|
|
|
- :current-remote->local-files #{}
|
|
|
- :queued-local->remote-files #{}
|
|
|
- :recent-remote->local-files #{}
|
|
|
- :history '()})
|
|
|
+ {:current-syncing-graph-uuid nil
|
|
|
+ :state ::starting
|
|
|
+ :full-local->remote-files #{}
|
|
|
+ :current-local->remote-files #{}
|
|
|
+ :full-remote->local-files #{}
|
|
|
+ :current-remote->local-files #{}
|
|
|
+ :queued-local->remote-files #{}
|
|
|
+ :recent-remote->local-files #{}
|
|
|
+ :recent-remote->local-files-map {}
|
|
|
+ :history '()})
|
|
|
|
|
|
(defn- sync-state--update-current-syncing-graph-uuid
|
|
|
[sync-state graph-uuid]
|
|
@@ -2145,21 +2159,34 @@
|
|
|
[sync-state items]
|
|
|
{:pre [(s/valid? (s/coll-of ::recent-remote->local-file-item) items)]
|
|
|
:post [(s/valid? ::sync-state %)]}
|
|
|
- (update sync-state :recent-remote->local-files (partial apply conj) items))
|
|
|
+ (let [paths (mapv :path items)
|
|
|
+ items-to-delete (vals (select-keys (:recent-remote->local-files-map sync-state) paths))]
|
|
|
+ (-> sync-state
|
|
|
+ (update :recent-remote->local-files (partial apply disj) items-to-delete) ;delete same path items
|
|
|
+ (update :recent-remote->local-files (partial apply conj) items)
|
|
|
+ (update :recent-remote->local-files-map (partial apply conj) (mapv (juxt :path identity) items)))))
|
|
|
+
|
|
|
|
|
|
(defn sync-state--remove-recent-remote->local-files
|
|
|
[sync-state items]
|
|
|
- {:post [(s/valid? ::sync-state %)]}
|
|
|
- (update sync-state :recent-remote->local-files set/difference items))
|
|
|
+ {:pre [(s/valid? (s/coll-of ::recent-remote->local-file-item) items)]
|
|
|
+ :post [(s/valid? ::sync-state %)]}
|
|
|
+ (let [paths (mapv :path items)
|
|
|
+ items-to-delete (vals (apply select-keys (:recent-remote->local-files-map sync-state) paths))]
|
|
|
+ (-> sync-state
|
|
|
+ (update :recent-remote->local-files-map (partial apply dissoc) paths)
|
|
|
+ (update :recent-remote->local-files (partial apply disj) items-to-delete))))
|
|
|
|
|
|
(defn sync-state-reset-full-local->remote-files
|
|
|
[sync-state events]
|
|
|
- {:post [(s/valid? ::sync-state %)]}
|
|
|
+ {:pre [(s/valid? (s/coll-of #(instance? FileChangeEvent %)) events)]
|
|
|
+ :post [(s/valid? ::sync-state %)]}
|
|
|
(assoc sync-state :full-local->remote-files events))
|
|
|
|
|
|
(defn sync-state-reset-full-remote->local-files
|
|
|
[sync-state events]
|
|
|
- {:post [(s/valid? ::sync-state %)]}
|
|
|
+ {:pre [(s/valid? (s/coll-of (s/keys :req-un [::path ::checksum])) events)]
|
|
|
+ :post [(s/valid? ::sync-state %)]}
|
|
|
(assoc sync-state :full-remote->local-files events))
|
|
|
|
|
|
(defn- add-history-items
|
|
@@ -2313,8 +2340,7 @@
|
|
|
diff-remote-files (diff-file-metadata-sets remote-all-files-meta local-all-files-meta)
|
|
|
recent-10-days-range ((juxt #(tc/to-long (t/minus % (t/days 10))) #(tc/to-long %)) (t/today))
|
|
|
sorted-diff-remote-files
|
|
|
- (sort-by
|
|
|
- (sort-file-metadata-fn :recent-days-range recent-10-days-range) > diff-remote-files)
|
|
|
+ (sort-by (sort-file-metadata-fn :recent-days-range recent-10-days-range) > diff-remote-files)
|
|
|
remote-graph-info-or-ex (<! (<get-remote-graph remoteapi nil graph-uuid))
|
|
|
latest-txid (:TXId remote-graph-info-or-ex)]
|
|
|
(if (or (instance? ExceptionInfo remote-graph-info-or-ex) (nil? latest-txid))
|
|
@@ -2325,7 +2351,10 @@
|
|
|
{:stop true})
|
|
|
(do (println "[full-sync(remote->local)]" (count sorted-diff-remote-files) "files need to sync")
|
|
|
(let [filtered-files (filter-download-files-with-reserved-chars sorted-diff-remote-files)]
|
|
|
- (swap! *sync-state #(sync-state-reset-full-remote->local-files % sorted-diff-remote-files))
|
|
|
+ (swap! *sync-state #(sync-state-reset-full-remote->local-files % (mapv (fn [e]
|
|
|
+ {:path (:path e)
|
|
|
+ :checksum (:etag e)})
|
|
|
+ sorted-diff-remote-files)))
|
|
|
(<! (.sync-files-remote->local!
|
|
|
this (map (juxt relative-path -checksum)
|
|
|
filtered-files)
|