|
@@ -619,6 +619,7 @@
|
|
|
|
|
|
(def local-changes-chan (chan 100))
|
|
|
|
|
|
+;;; type = "change" | "add" | "unlink"
|
|
|
(deftype FileChangeEvent [type dir path stat]
|
|
|
IRelativePath
|
|
|
(-relative-path [_] (remove-dir-prefix dir path))
|
|
@@ -633,6 +634,20 @@
|
|
|
(-pr-writer [_ w _opts]
|
|
|
(write-all w (str {:type type :base-path dir :path path}))))
|
|
|
|
|
|
+(defn- partition-file-change-events
|
|
|
+ "return transducer.
|
|
|
+ partition `FileChangeEvent`s, at most N file-change-events in each partition.
|
|
|
+ only one type in a partition."
|
|
|
+ [n]
|
|
|
+ (comp
|
|
|
+ (partition-by (fn [^FileChangeEvent e]
|
|
|
+ (case (.-type e)
|
|
|
+ ("add" "change") :add-or-change
|
|
|
+ "unlink" :unlink)))
|
|
|
+ (map #(partition-all n %))
|
|
|
+ cat))
|
|
|
+
|
|
|
+
|
|
|
(defn file-watch-handler
|
|
|
[type {:keys [dir path _content stat] :as _payload}]
|
|
|
(go
|
|
@@ -652,7 +667,7 @@
|
|
|
(stop-local->remote! [this])
|
|
|
(ratelimit [this from-chan] "get watched local file-change events from FROM-CHAN,
|
|
|
return chan returning events with rate limited")
|
|
|
- (sync-local->remote! [this ^FileChangeEvent e])
|
|
|
+ (sync-local->remote! [this es] "es is a sequence of `FileChangeEvent`, all items have same type.")
|
|
|
(sync-local->remote-all-files! [this stop-chan] "compare all local files to remote ones, sync if not equal.
|
|
|
ensure local-txid = remote-txid before calling this func"))
|
|
|
|
|
@@ -756,44 +771,42 @@
|
|
|
(async/close! c)))))
|
|
|
c))
|
|
|
|
|
|
- (sync-local->remote! [this ^FileChangeEvent e]
|
|
|
- (let [type (.-type e)]
|
|
|
- (if (contains-path? (get-ignore-files this) (relative-path e))
|
|
|
- (go {:succ true}) ; ignore
|
|
|
- (do
|
|
|
- (prn "sync-local->remote!" e)
|
|
|
- (let [path* (relative-path e)
|
|
|
- r
|
|
|
- (cond
|
|
|
- (or (= "add" type) (= "change" type))
|
|
|
- (update-remote-file rsapi graph-uuid base-path path* @*txid)
|
|
|
-
|
|
|
- (= "unlink" type)
|
|
|
- (delete-remote-files rsapi graph-uuid base-path [path*] @*txid)
|
|
|
-
|
|
|
- ;; (= "rename" type)
|
|
|
- ;; (rename-local-file)
|
|
|
- )]
|
|
|
- (go
|
|
|
- (let [_ (.add-current-local->remote-files! sync-state [path*])
|
|
|
- r* (<! r)
|
|
|
- _ (.remove-current-local->remote-files! sync-state [path*])]
|
|
|
- (cond
|
|
|
- (need-sync-remote? r*)
|
|
|
- {:need-sync-remote true}
|
|
|
-
|
|
|
- (number? r*) ; succ
|
|
|
- (do
|
|
|
- (println "sync-local->remote! update txid" r*)
|
|
|
- ;; persist txid
|
|
|
- (update-graphs-txid! r* graph-uuid repo)
|
|
|
- (reset! *txid r*)
|
|
|
- {:succ true})
|
|
|
-
|
|
|
- :else
|
|
|
- (do
|
|
|
- (println "sync-local->remote unknown:" r*)
|
|
|
- {:unknown r*})))))))))
|
|
|
+ (sync-local->remote! [this es]
|
|
|
+ (if (empty? es)
|
|
|
+ {:succ true}
|
|
|
+ (let [type (.-type ^FileChangeEvent (first es))
|
|
|
+ ignore-files (get-ignore-files this)
|
|
|
+ es->paths-xf (comp
|
|
|
+ (map #(relative-path %))
|
|
|
+ (filter #(not (contains-path? ignore-files %))))
|
|
|
+ paths (sequence es->paths-xf es)]
|
|
|
+ (println "sync-local->remote" paths)
|
|
|
+ (let [r (case type
|
|
|
+ ("add" "change")
|
|
|
+ (update-remote-files rsapi graph-uuid base-path paths @*txid)
|
|
|
+
|
|
|
+ "unlink"
|
|
|
+ (delete-remote-files rsapi graph-uuid base-path paths @*txid))]
|
|
|
+ (go
|
|
|
+ (let [_ (.add-current-local->remote-files! sync-state paths)
|
|
|
+ r* (<! r)
|
|
|
+ _ (.remove-current-local->remote-files! sync-state paths)]
|
|
|
+ (cond
|
|
|
+ (need-sync-remote? r*)
|
|
|
+ {:need-sync-remote true}
|
|
|
+
|
|
|
+ (number? r*) ; succ
|
|
|
+ (do
|
|
|
+ (println "sync-local->remote! update txid" r*)
|
|
|
+ ;; persist txid
|
|
|
+ (update-graphs-txid! r* graph-uuid repo)
|
|
|
+ (reset! *txid r*)
|
|
|
+ {:succ true})
|
|
|
+
|
|
|
+ :else
|
|
|
+ (do
|
|
|
+ (println "sync-local->remote unknown:" r*)
|
|
|
+ {:unknown r*}))))))))
|
|
|
|
|
|
(sync-local->remote-all-files! [this stop-chan]
|
|
|
(go
|
|
@@ -804,23 +817,29 @@
|
|
|
diff-local-files (set/difference local-all-files-meta remote-all-files-meta)
|
|
|
ignore-files (get-ignore-files this)
|
|
|
monitored-dirs (get-monitored-dirs this)
|
|
|
- change-events (->> diff-local-files
|
|
|
- (mapv
|
|
|
- #(->FileChangeEvent "change" base-path (.get-normalized-path ^FileMetadata %) nil))
|
|
|
- (filterv #(let [path (relative-path %)]
|
|
|
- (and (not (contains-path? ignore-files path))
|
|
|
- (contains-path? monitored-dirs path)))))]
|
|
|
- (println "[full-sync]" (count change-events) "files need to sync to remote")
|
|
|
- (loop [es change-events]
|
|
|
- (if (empty? es)
|
|
|
+ change-events-partitions
|
|
|
+ (sequence
|
|
|
+ (comp
|
|
|
+ ;; convert to FileChangeEvent
|
|
|
+ (map #(->FileChangeEvent "change" base-path (.get-normalized-path ^FileMetadata %) nil))
|
|
|
+ ;; filter ignore-files & monitored-dirs
|
|
|
+ (filter #(let [path (relative-path %)]
|
|
|
+ (and (not (contains-path? ignore-files path))
|
|
|
+ (contains-path? monitored-dirs path))))
|
|
|
+ ;; partition FileChangeEvents
|
|
|
+ (partition-file-change-events 5))
|
|
|
+ diff-local-files)]
|
|
|
+ (println "[full-sync]" (count (flatten change-events-partitions)) "files need to sync to remote")
|
|
|
+ (loop [es-partitions change-events-partitions]
|
|
|
+ (if (empty? es-partitions)
|
|
|
{:succ true}
|
|
|
(if (async/poll! stop-chan)
|
|
|
{:stop true}
|
|
|
- (let [e (first es)
|
|
|
- {:keys [succ need-sync-remote unknown] :as r} (<! (sync-local->remote! this e))]
|
|
|
+ (let [{:keys [succ need-sync-remote unknown] :as r}
|
|
|
+ (<! (sync-local->remote! this (first es-partitions)))]
|
|
|
(cond
|
|
|
succ
|
|
|
- (recur (next es))
|
|
|
+ (recur (next es-partitions))
|
|
|
|
|
|
(or need-sync-remote unknown) r)))))))))
|
|
|
|
|
@@ -960,7 +979,7 @@
|
|
|
(assert (some? local-change))
|
|
|
(go
|
|
|
(let [{:keys [succ need-sync-remote unknown]}
|
|
|
- (<! (sync-local->remote! local->remote-syncer local-change))]
|
|
|
+ (<! (sync-local->remote! local->remote-syncer [local-change]))]
|
|
|
(cond
|
|
|
succ
|
|
|
(.schedule this ::idle)
|