|
|
@@ -71,6 +71,7 @@
|
|
|
(s/def ::sync-state (s/keys :req-un [::state
|
|
|
::current-local->remote-files
|
|
|
::current-remote->local-files
|
|
|
+ ::queued-local->remote-files
|
|
|
::history]))
|
|
|
|
|
|
;; diff
|
|
|
@@ -106,7 +107,9 @@
|
|
|
|
|
|
(def graphs-txid (persist-var/persist-var nil "graphs-txid"))
|
|
|
|
|
|
-(defn update-graphs-txid! [latest-txid graph-uuid user-uuid repo]
|
|
|
+(defn update-graphs-txid!
|
|
|
+ [latest-txid graph-uuid user-uuid repo]
|
|
|
+ {:pre [(int? latest-txid) (>= latest-txid 0)]}
|
|
|
(persist-var/-reset-value! graphs-txid [user-uuid graph-uuid latest-txid] repo)
|
|
|
(persist-var/persist-save graphs-txid))
|
|
|
|
|
|
@@ -837,6 +840,71 @@
|
|
|
not)
|
|
|
(go (>! local-changes-chan (->FileChangeEvent type dir path stat)))))
|
|
|
|
|
|
+;;; ### sync state
|
|
|
+
|
|
|
+
|
|
|
+(defn sync-state
|
|
|
+ "create a new sync-state"
|
|
|
+ []
|
|
|
+ {:post [(s/valid? ::sync-state %)]}
|
|
|
+ {:state ::idle
|
|
|
+ :current-local->remote-files #{}
|
|
|
+ :current-remote->local-files #{}
|
|
|
+ :queued-local->remote-files #{}
|
|
|
+ :history '()})
|
|
|
+
|
|
|
+(defn- sync-state--update-state
|
|
|
+ [sync-state next-state]
|
|
|
+ {:pre [(s/valid? ::state next-state)]
|
|
|
+ :post [(s/valid? ::sync-state %)]}
|
|
|
+ (assoc sync-state :state next-state))
|
|
|
+
|
|
|
+(defn sync-state--add-current-remote->local-files
|
|
|
+ [sync-state paths]
|
|
|
+ {:post [(s/valid? ::sync-state %)]}
|
|
|
+ (update sync-state :current-remote->local-files into paths))
|
|
|
+
|
|
|
+(defn sync-state--add-current-local->remote-files
|
|
|
+ [sync-state paths]
|
|
|
+ {:post [(s/valid? ::sync-state %)]}
|
|
|
+ (update sync-state :current-local->remote-files into paths))
|
|
|
+
|
|
|
+(defn sync-state--update-queued-local->remote-files
|
|
|
+ [sync-state paths]
|
|
|
+ {:post [(s/valid? ::sync-state %)]}
|
|
|
+ (update sync-state :queued-local->remote-files (fn [_ n] (set n)) paths))
|
|
|
+
|
|
|
+(defn- add-history-items
|
|
|
+ [history paths now]
|
|
|
+ (sequence
|
|
|
+ (comp
|
|
|
+ ;; only reserve the latest one of same-path-items
|
|
|
+ (dedupe-by :path)
|
|
|
+ ;; reserve the latest 20 history items
|
|
|
+ (take 20))
|
|
|
+ (into history
|
|
|
+ (map (fn [path] {:path path :time now}) paths))))
|
|
|
+
|
|
|
+(defn sync-state--remove-current-remote->local-files
|
|
|
+ [sync-state paths]
|
|
|
+ {:post [(s/valid? ::sync-state %)]}
|
|
|
+ (let [now (t/now)]
|
|
|
+ (-> sync-state
|
|
|
+ (update :current-remote->local-files set/difference paths)
|
|
|
+ (update :history add-history-items paths now))))
|
|
|
+
|
|
|
+(defn sync-state--remove-current-local->remote-files
|
|
|
+ [sync-state paths]
|
|
|
+ {:post [(s/valid? ::sync-state %)]}
|
|
|
+ (let [now (t/now)]
|
|
|
+ (-> sync-state
|
|
|
+ (update :current-local->remote-files set/difference paths)
|
|
|
+ (update :history add-history-items paths now))))
|
|
|
+
|
|
|
+(defn sync-state--stopped?
|
|
|
+ [sync-state]
|
|
|
+ (= ::stop (:state sync-state)))
|
|
|
+
|
|
|
;;; ### remote->local syncer & local->remote syncer
|
|
|
|
|
|
(defprotocol IRemote->LocalSync
|
|
|
@@ -953,32 +1021,26 @@
|
|
|
(defn- contains-path? [regexps path]
|
|
|
(reduce #(when (re-find %2 path) (reduced true)) false regexps))
|
|
|
|
|
|
-(defn- filter-local-changes
|
|
|
+(defn- filter-local-changes-pred
|
|
|
"filter local-change events:
|
|
|
- for 'unlink' event
|
|
|
- when related file exists on local dir, ignore this event
|
|
|
- for 'add' | 'change' event
|
|
|
- when related file's content is same as remote file, ignore it"
|
|
|
- [to-ch from-ch basepath graph-uuid]
|
|
|
- (async/pipeline-async
|
|
|
- 1 to-ch
|
|
|
- (fn [^FileChangeEvent e result]
|
|
|
- (go
|
|
|
- (case (.-type e)
|
|
|
- "unlink"
|
|
|
- (let [r (<! (get-local-files-meta rsapi "" basepath [(relative-path e)]))]
|
|
|
- (when (some-> r ex-cause ;; str (string/index-of "No such file or directory")
|
|
|
- )
|
|
|
- (>! result e)))
|
|
|
-
|
|
|
- ("add" "change")
|
|
|
- (let [path (relative-path e)]
|
|
|
- (when (and (<! (local-file-exists? path basepath))
|
|
|
- (<! (file-changed? graph-uuid path basepath)))
|
|
|
- (>! result e))))
|
|
|
- (async/close! result)))
|
|
|
- from-ch false))
|
|
|
-
|
|
|
+ [^FileChangeEvent e basepath graph-uuid]
|
|
|
+ (go
|
|
|
+ (let [r-path (relative-path e)]
|
|
|
+ (case (.-type e)
|
|
|
+ "unlink"
|
|
|
+ (let [r (<! (get-local-files-meta rsapi "" basepath [r-path]))]
|
|
|
+ ;; keep this e when it's not found
|
|
|
+ (some-> r ex-cause))
|
|
|
+
|
|
|
+ ("add" "change")
|
|
|
+ ;; 1. local file exists
|
|
|
+ ;; 2. compare with remote file, and changed
|
|
|
+ (and (<! (local-file-exists? r-path basepath))
|
|
|
+ (<! (file-changed? graph-uuid r-path basepath)))))))
|
|
|
|
|
|
(defrecord ^:large-vars/cleanup-todo
|
|
|
Local->RemoteSyncer [user-uuid graph-uuid base-path repo *sync-state
|
|
|
@@ -1009,7 +1071,7 @@
|
|
|
(let [c (.filtered-chan this 10000)
|
|
|
filter-e-fn (.filter-file-change-events-fn this)]
|
|
|
(go-loop [timeout-c (timeout rate)
|
|
|
- tcoll (transient [])]
|
|
|
+ coll []]
|
|
|
(let [{:keys [timeout ^FileChangeEvent e stop]}
|
|
|
(async/alt! timeout-c {:timeout true}
|
|
|
from-chan ([e] {:e e})
|
|
|
@@ -1019,22 +1081,22 @@
|
|
|
(async/close! c)
|
|
|
|
|
|
timeout
|
|
|
- (let [from-c (chan 10000)]
|
|
|
- (<! (async/onto-chan! from-c (distinct (persistent! tcoll)) false))
|
|
|
- (filter-local-changes c from-c base-path graph-uuid)
|
|
|
- (async/close! from-c)
|
|
|
- (recur (async/timeout rate) (transient [])))
|
|
|
+ (do (async/onto-chan! c coll false)
|
|
|
+ (swap! *sync-state sync-state--update-queued-local->remote-files nil)
|
|
|
+ (recur (async/timeout rate) []))
|
|
|
|
|
|
(some? e)
|
|
|
- (do
|
|
|
- (when (filter-e-fn e)
|
|
|
- (conj! tcoll e))
|
|
|
- (recur timeout-c tcoll))
|
|
|
+ (if (and (filter-e-fn e)
|
|
|
+ (<! (filter-local-changes-pred e base-path graph-uuid)))
|
|
|
+ (let [coll* (distinct (conj coll e))]
|
|
|
+ (swap! *sync-state sync-state--update-queued-local->remote-files
|
|
|
+ (mapv relative-path coll*))
|
|
|
+ (recur timeout-c coll*))
|
|
|
+ (recur timeout-c coll))
|
|
|
|
|
|
(nil? e)
|
|
|
- (do
|
|
|
- (println "close ratelimit chan")
|
|
|
- (async/close! c)))))
|
|
|
+ (do (println "close ratelimit chan")
|
|
|
+ (async/close! c)))))
|
|
|
c))
|
|
|
|
|
|
|
|
|
@@ -1114,64 +1176,7 @@
|
|
|
(or need-sync-remote unknown) r)))))))))
|
|
|
|
|
|
|
|
|
-;;; ### sync state
|
|
|
-
|
|
|
-
|
|
|
-(defn sync-state
|
|
|
- "create a new sync-state"
|
|
|
- []
|
|
|
- {:post [(s/valid? ::sync-state %)]}
|
|
|
- {:state ::idle
|
|
|
- :current-local->remote-files #{}
|
|
|
- :current-remote->local-files #{}
|
|
|
- :history '()})
|
|
|
-
|
|
|
-(defn- sync-state--update-state
|
|
|
- [sync-state next-state]
|
|
|
- {:pre [(s/valid? ::state next-state)]
|
|
|
- :post [(s/valid? ::sync-state %)]}
|
|
|
- (assoc sync-state :state next-state))
|
|
|
-
|
|
|
-(defn sync-state--add-current-remote->local-files
|
|
|
- [sync-state paths]
|
|
|
- {:post [(s/valid? ::sync-state %)]}
|
|
|
- (update sync-state :current-remote->local-files into paths))
|
|
|
-
|
|
|
-(defn sync-state--add-current-local->remote-files
|
|
|
- [sync-state paths]
|
|
|
- {:post [(s/valid? ::sync-state %)]}
|
|
|
- (update sync-state :current-local->remote-files into paths))
|
|
|
-
|
|
|
-(defn- add-history-items
|
|
|
- [history paths now]
|
|
|
- (sequence
|
|
|
- (comp
|
|
|
- ;; only reserve the latest one of same-path-items
|
|
|
- (dedupe-by :path)
|
|
|
- ;; reserve the latest 20 history items
|
|
|
- (take 20))
|
|
|
- (into history
|
|
|
- (map (fn [path] {:path path :time now}) paths))))
|
|
|
-
|
|
|
-(defn sync-state--remove-current-remote->local-files
|
|
|
- [sync-state paths]
|
|
|
- {:post [(s/valid? ::sync-state %)]}
|
|
|
- (let [now (t/now)]
|
|
|
- (-> sync-state
|
|
|
- (update :current-remote->local-files set/difference paths)
|
|
|
- (update :history add-history-items paths now))))
|
|
|
-
|
|
|
-(defn sync-state--remove-current-local->remote-files
|
|
|
- [sync-state paths]
|
|
|
- {:post [(s/valid? ::sync-state %)]}
|
|
|
- (let [now (t/now)]
|
|
|
- (-> sync-state
|
|
|
- (update :current-local->remote-files set/difference paths)
|
|
|
- (update :history add-history-items paths now))))
|
|
|
|
|
|
-(defn sync-state--stopped?
|
|
|
- [sync-state]
|
|
|
- (= ::stop (:state sync-state)))
|
|
|
|
|
|
|
|
|
;;; ### put all stuff together
|