|
|
@@ -33,7 +33,8 @@
|
|
|
;; files in these `get-monitored-dirs` dirs will be synchronized.
|
|
|
;;
|
|
|
;; sync strategy:
|
|
|
-;; - when toggle file-sync on, trigger a local->remote-full-sync first,
|
|
|
+;; - when toggle file-sync on,
|
|
|
+;; trigger remote->local first, then local->remote-full-sync
|
|
|
;; local->remote-full-sync will compare local-files with remote-files (by md5 & size),
|
|
|
;; and upload new-added-files to remote server.
|
|
|
;; - if local->remote sync(normal-sync or full-sync) return :need-sync-remote,
|
|
|
@@ -58,12 +59,8 @@
|
|
|
::remote->local
|
|
|
;; local->remote full sync
|
|
|
::local->remote-full-sync
|
|
|
- ;; exec remote->local, then local->remote
|
|
|
- ::remote->local=>local->remote
|
|
|
- ;; exec remote->local, then local->remote-full-sync
|
|
|
- ::remote->local=>local->remote-full-sync
|
|
|
- ;; exec remote->local-full-sync, then local->remote-full-sync
|
|
|
- ::remote->local-full-sync=>local->remote-full-sync
|
|
|
+ ;; remote->local full sync
|
|
|
+ ::remote->local-full-sync
|
|
|
::stop})
|
|
|
(s/def ::path string?)
|
|
|
(s/def ::time t/date?)
|
|
|
@@ -74,6 +71,7 @@
|
|
|
(s/def ::sync-state (s/keys :req-un [::state
|
|
|
::current-local->remote-files
|
|
|
::current-remote->local-files
|
|
|
+ ::queued-local->remote-files
|
|
|
::history]))
|
|
|
|
|
|
;; diff
|
|
|
@@ -109,7 +107,9 @@
|
|
|
|
|
|
(def graphs-txid (persist-var/persist-var nil "graphs-txid"))
|
|
|
|
|
|
-(defn update-graphs-txid! [latest-txid graph-uuid user-uuid repo]
|
|
|
+(defn update-graphs-txid!
|
|
|
+ [latest-txid graph-uuid user-uuid repo]
|
|
|
+ {:pre [(int? latest-txid) (>= latest-txid 0)]}
|
|
|
(persist-var/-reset-value! graphs-txid [user-uuid graph-uuid latest-txid] repo)
|
|
|
(persist-var/persist-save graphs-txid))
|
|
|
|
|
|
@@ -117,6 +117,19 @@
|
|
|
(persist-var/-reset-value! graphs-txid nil repo)
|
|
|
(persist-var/persist-save graphs-txid))
|
|
|
|
|
|
+(defn- ws-ping-loop [ws]
|
|
|
+ (go-loop []
|
|
|
+ (let [state (.-readyState ws)]
|
|
|
+ ;; not closing or closed state
|
|
|
+ (when (not (contains? #{2 3} state))
|
|
|
+ (if (not= 1 state)
|
|
|
+ ;; when connecting, wait 1s
|
|
|
+ (do (<! (timeout 1000))
|
|
|
+ (recur))
|
|
|
+ (do (.send ws "PING")
|
|
|
+ (<! (timeout 30000))
|
|
|
+ (recur)))))))
|
|
|
+
|
|
|
(defn- ws-stop! [*ws]
|
|
|
(swap! *ws (fn [o] (assoc o :stop true)))
|
|
|
(.close (:ws @*ws)))
|
|
|
@@ -124,6 +137,7 @@
|
|
|
(defn- ws-listen!*
|
|
|
[graph-uuid *ws remote-changes-chan]
|
|
|
(reset! *ws {:ws (js/WebSocket. (util/format ws-addr graph-uuid)) :stop false})
|
|
|
+ (ws-ping-loop (:ws @*ws))
|
|
|
;; (set! (.-onopen (:ws @*ws)) #(println (util/format "ws opened: graph '%s'" graph-uuid %)))
|
|
|
(set! (.-onclose (:ws @*ws)) (fn [_e]
|
|
|
(when-not (true? (:stop @*ws))
|
|
|
@@ -133,13 +147,14 @@
|
|
|
(ws-listen!* graph-uuid *ws remote-changes-chan)))))
|
|
|
(set! (.-onmessage (:ws @*ws)) (fn [e]
|
|
|
(let [data (js->clj (js/JSON.parse (.-data e)) :keywordize-keys true)]
|
|
|
- (if-let [v (poll! remote-changes-chan)]
|
|
|
- (let [last-txid (:txid v)
|
|
|
- current-txid (:txid data)]
|
|
|
- (if (> last-txid current-txid)
|
|
|
- (offer! remote-changes-chan v)
|
|
|
- (offer! remote-changes-chan data)))
|
|
|
- (offer! remote-changes-chan data))))))
|
|
|
+ (when (some? (:txid data))
|
|
|
+ (if-let [v (poll! remote-changes-chan)]
|
|
|
+ (let [last-txid (:txid v)
|
|
|
+ current-txid (:txid data)]
|
|
|
+ (if (> last-txid current-txid)
|
|
|
+ (offer! remote-changes-chan v)
|
|
|
+ (offer! remote-changes-chan data)))
|
|
|
+ (offer! remote-changes-chan data)))))))
|
|
|
|
|
|
(defn ws-listen!
|
|
|
"return channal which output messages from server"
|
|
|
@@ -825,6 +840,71 @@
|
|
|
not)
|
|
|
(go (>! local-changes-chan (->FileChangeEvent type dir path stat)))))
|
|
|
|
|
|
+;;; ### sync state
|
|
|
+
|
|
|
+
|
|
|
+(defn sync-state
|
|
|
+ "create a new sync-state"
|
|
|
+ []
|
|
|
+ {:post [(s/valid? ::sync-state %)]}
|
|
|
+ {:state ::idle
|
|
|
+ :current-local->remote-files #{}
|
|
|
+ :current-remote->local-files #{}
|
|
|
+ :queued-local->remote-files #{}
|
|
|
+ :history '()})
|
|
|
+
|
|
|
+(defn- sync-state--update-state
|
|
|
+ [sync-state next-state]
|
|
|
+ {:pre [(s/valid? ::state next-state)]
|
|
|
+ :post [(s/valid? ::sync-state %)]}
|
|
|
+ (assoc sync-state :state next-state))
|
|
|
+
|
|
|
+(defn sync-state--add-current-remote->local-files
|
|
|
+ [sync-state paths]
|
|
|
+ {:post [(s/valid? ::sync-state %)]}
|
|
|
+ (update sync-state :current-remote->local-files into paths))
|
|
|
+
|
|
|
+(defn sync-state--add-current-local->remote-files
|
|
|
+ [sync-state paths]
|
|
|
+ {:post [(s/valid? ::sync-state %)]}
|
|
|
+ (update sync-state :current-local->remote-files into paths))
|
|
|
+
|
|
|
+(defn sync-state--update-queued-local->remote-files
|
|
|
+ [sync-state paths]
|
|
|
+ {:post [(s/valid? ::sync-state %)]}
|
|
|
+ (update sync-state :queued-local->remote-files (fn [_ n] (set n)) paths))
|
|
|
+
|
|
|
+(defn- add-history-items
|
|
|
+ [history paths now]
|
|
|
+ (sequence
|
|
|
+ (comp
|
|
|
+ ;; only reserve the latest one of same-path-items
|
|
|
+ (dedupe-by :path)
|
|
|
+ ;; reserve the latest 20 history items
|
|
|
+ (take 20))
|
|
|
+ (into history
|
|
|
+ (map (fn [path] {:path path :time now}) paths))))
|
|
|
+
|
|
|
+(defn sync-state--remove-current-remote->local-files
|
|
|
+ [sync-state paths]
|
|
|
+ {:post [(s/valid? ::sync-state %)]}
|
|
|
+ (let [now (t/now)]
|
|
|
+ (-> sync-state
|
|
|
+ (update :current-remote->local-files set/difference paths)
|
|
|
+ (update :history add-history-items paths now))))
|
|
|
+
|
|
|
+(defn sync-state--remove-current-local->remote-files
|
|
|
+ [sync-state paths]
|
|
|
+ {:post [(s/valid? ::sync-state %)]}
|
|
|
+ (let [now (t/now)]
|
|
|
+ (-> sync-state
|
|
|
+ (update :current-local->remote-files set/difference paths)
|
|
|
+ (update :history add-history-items paths now))))
|
|
|
+
|
|
|
+(defn sync-state--stopped?
|
|
|
+ [sync-state]
|
|
|
+ (= ::stop (:state sync-state)))
|
|
|
+
|
|
|
;;; ### remote->local syncer & local->remote syncer
|
|
|
|
|
|
(defprotocol IRemote->LocalSync
|
|
|
@@ -941,32 +1021,26 @@
|
|
|
(defn- contains-path? [regexps path]
|
|
|
(reduce #(when (re-find %2 path) (reduced true)) false regexps))
|
|
|
|
|
|
-(defn- filter-local-changes
|
|
|
+(defn- filter-local-changes-pred
|
|
|
"filter local-change events:
|
|
|
- for 'unlink' event
|
|
|
- when related file exists on local dir, ignore this event
|
|
|
- for 'add' | 'change' event
|
|
|
- when related file's content is same as remote file, ignore it"
|
|
|
- [to-ch from-ch basepath graph-uuid]
|
|
|
- (async/pipeline-async
|
|
|
- 1 to-ch
|
|
|
- (fn [^FileChangeEvent e result]
|
|
|
- (go
|
|
|
- (case (.-type e)
|
|
|
- "unlink"
|
|
|
- (let [r (<! (get-local-files-meta rsapi "" basepath [(relative-path e)]))]
|
|
|
- (when (some-> r ex-cause ;; str (string/index-of "No such file or directory")
|
|
|
- )
|
|
|
- (>! result e)))
|
|
|
-
|
|
|
- ("add" "change")
|
|
|
- (let [path (relative-path e)]
|
|
|
- (when (and (<! (local-file-exists? path basepath))
|
|
|
- (<! (file-changed? graph-uuid path basepath)))
|
|
|
- (>! result e))))
|
|
|
- (async/close! result)))
|
|
|
- from-ch false))
|
|
|
-
|
|
|
+ [^FileChangeEvent e basepath graph-uuid]
|
|
|
+ (go
|
|
|
+ (let [r-path (relative-path e)]
|
|
|
+ (case (.-type e)
|
|
|
+ "unlink"
|
|
|
+ (let [r (<! (get-local-files-meta rsapi "" basepath [r-path]))]
|
|
|
+ ;; keep this e when it's not found
|
|
|
+ (some-> r ex-cause))
|
|
|
+
|
|
|
+ ("add" "change")
|
|
|
+ ;; 1. local file exists
|
|
|
+ ;; 2. compare with remote file, and changed
|
|
|
+ (and (<! (local-file-exists? r-path basepath))
|
|
|
+ (<! (file-changed? graph-uuid r-path basepath)))))))
|
|
|
|
|
|
(defrecord ^:large-vars/cleanup-todo
|
|
|
Local->RemoteSyncer [user-uuid graph-uuid base-path repo *sync-state
|
|
|
@@ -997,7 +1071,7 @@
|
|
|
(let [c (.filtered-chan this 10000)
|
|
|
filter-e-fn (.filter-file-change-events-fn this)]
|
|
|
(go-loop [timeout-c (timeout rate)
|
|
|
- tcoll (transient [])]
|
|
|
+ coll []]
|
|
|
(let [{:keys [timeout ^FileChangeEvent e stop]}
|
|
|
(async/alt! timeout-c {:timeout true}
|
|
|
from-chan ([e] {:e e})
|
|
|
@@ -1007,22 +1081,22 @@
|
|
|
(async/close! c)
|
|
|
|
|
|
timeout
|
|
|
- (let [from-c (chan 10000)]
|
|
|
- (<! (async/onto-chan! from-c (distinct (persistent! tcoll)) false))
|
|
|
- (filter-local-changes c from-c base-path graph-uuid)
|
|
|
- (async/close! from-c)
|
|
|
- (recur (async/timeout rate) (transient [])))
|
|
|
+ (do (async/onto-chan! c coll false)
|
|
|
+ (swap! *sync-state sync-state--update-queued-local->remote-files nil)
|
|
|
+ (recur (async/timeout rate) []))
|
|
|
|
|
|
(some? e)
|
|
|
- (do
|
|
|
- (when (filter-e-fn e)
|
|
|
- (conj! tcoll e))
|
|
|
- (recur timeout-c tcoll))
|
|
|
+ (if (and (filter-e-fn e)
|
|
|
+ (<! (filter-local-changes-pred e base-path graph-uuid)))
|
|
|
+ (let [coll* (distinct (conj coll e))]
|
|
|
+ (swap! *sync-state sync-state--update-queued-local->remote-files
|
|
|
+ (mapv relative-path coll*))
|
|
|
+ (recur timeout-c coll*))
|
|
|
+ (recur timeout-c coll))
|
|
|
|
|
|
(nil? e)
|
|
|
- (do
|
|
|
- (println "close ratelimit chan")
|
|
|
- (async/close! c)))))
|
|
|
+ (do (println "close ratelimit chan")
|
|
|
+ (async/close! c)))))
|
|
|
c))
|
|
|
|
|
|
|
|
|
@@ -1102,206 +1176,186 @@
|
|
|
(or need-sync-remote unknown) r)))))))))
|
|
|
|
|
|
|
|
|
-;;; ### sync state
|
|
|
-
|
|
|
-
|
|
|
-(defn sync-state
|
|
|
- "create a new sync-state"
|
|
|
- []
|
|
|
- {:post [(s/valid? ::sync-state %)]}
|
|
|
- {:state ::idle
|
|
|
- :current-local->remote-files #{}
|
|
|
- :current-remote->local-files #{}
|
|
|
- :history '()})
|
|
|
-
|
|
|
-(defn- sync-state--update-state
|
|
|
- [sync-state next-state]
|
|
|
- {:pre [(s/valid? ::state next-state)]
|
|
|
- :post [(s/valid? ::sync-state %)]}
|
|
|
- (assoc sync-state :state next-state))
|
|
|
-
|
|
|
-(defn sync-state--add-current-remote->local-files
|
|
|
- [sync-state paths]
|
|
|
- {:post [(s/valid? ::sync-state %)]}
|
|
|
- (update sync-state :current-remote->local-files into paths))
|
|
|
-
|
|
|
-(defn sync-state--add-current-local->remote-files
|
|
|
- [sync-state paths]
|
|
|
- {:post [(s/valid? ::sync-state %)]}
|
|
|
- (update sync-state :current-local->remote-files into paths))
|
|
|
-
|
|
|
-(defn- add-history-items
|
|
|
- [history paths now]
|
|
|
- (sequence
|
|
|
- (comp
|
|
|
- ;; only reserve the latest one of same-path-items
|
|
|
- (dedupe-by :path)
|
|
|
- ;; reserve the latest 20 history items
|
|
|
- (take 20))
|
|
|
- (into history
|
|
|
- (map (fn [path] {:path path :time now}) paths))))
|
|
|
-
|
|
|
-(defn sync-state--remove-current-remote->local-files
|
|
|
- [sync-state paths]
|
|
|
- {:post [(s/valid? ::sync-state %)]}
|
|
|
- (let [now (t/now)]
|
|
|
- (-> sync-state
|
|
|
- (update :current-remote->local-files set/difference paths)
|
|
|
- (update :history add-history-items paths now))))
|
|
|
-
|
|
|
-(defn sync-state--remove-current-local->remote-files
|
|
|
- [sync-state paths]
|
|
|
- {:post [(s/valid? ::sync-state %)]}
|
|
|
- (let [now (t/now)]
|
|
|
- (-> sync-state
|
|
|
- (update :current-local->remote-files set/difference paths)
|
|
|
- (update :history add-history-items paths now))))
|
|
|
|
|
|
-(defn sync-state--stopped?
|
|
|
- [sync-state]
|
|
|
- (= ::stop (:state sync-state)))
|
|
|
|
|
|
|
|
|
;;; ### put all stuff together
|
|
|
|
|
|
+(defn- drain-chan
|
|
|
+ "drop all stuffs in CH"
|
|
|
+ [ch]
|
|
|
+ (->> (repeatedly #(poll! ch))
|
|
|
+ (take-while identity)))
|
|
|
+
|
|
|
(defrecord ^:large-vars/cleanup-todo
|
|
|
SyncManager [graph-uuid base-path *sync-state
|
|
|
^Local->RemoteSyncer local->remote-syncer ^Remote->LocalSyncer remote->local-syncer
|
|
|
full-sync-chan stop-sync-chan remote->local-sync-chan local->remote-sync-chan
|
|
|
local-changes-chan ^:mutable ratelimit-local-changes-chan
|
|
|
- *txid ^:mutable state ^:mutable _remote-change-chan ^:mutable _*ws ^:mutable stopped]
|
|
|
- Object
|
|
|
- (schedule [this next-state args]
|
|
|
- {:pre [(s/valid? ::state next-state)]}
|
|
|
- (println "[SyncManager" graph-uuid "]" (and state (name state)) "->" (and next-state (name next-state)))
|
|
|
- (set! state next-state)
|
|
|
- (swap! *sync-state sync-state--update-state next-state)
|
|
|
- (go
|
|
|
- (case state
|
|
|
- ::idle
|
|
|
- (<! (.idle this))
|
|
|
- ::local->remote
|
|
|
- (<! (.local->remote this args))
|
|
|
- ::remote->local
|
|
|
- (<! (.remote->local this nil args))
|
|
|
- ::local->remote-full-sync
|
|
|
- (<! (.full-sync this))
|
|
|
- ::remote->local=>local->remote
|
|
|
- (<! (.remote->local this ::local->remote args))
|
|
|
- ::remote->local=>local->remote-full-sync
|
|
|
- (<! (.remote->local this ::local->remote-full-sync args))
|
|
|
- ::remote->local-full-sync=>local->remote-full-sync
|
|
|
- (<! (.remote->local-full-sync this ::local->remote-full-sync))
|
|
|
- ::stop
|
|
|
- (-stop! this))))
|
|
|
-
|
|
|
- (start [this]
|
|
|
- (set! _*ws (atom nil))
|
|
|
- (set! _remote-change-chan (ws-listen! graph-uuid _*ws))
|
|
|
- (set! ratelimit-local-changes-chan (ratelimit local->remote-syncer local-changes-chan))
|
|
|
- (.schedule this ::idle nil))
|
|
|
-
|
|
|
- (idle [this]
|
|
|
- (go
|
|
|
- (let [{:keys [stop full-sync ;; trigger-remote trigger-local
|
|
|
- remote local trigger-full-sync]}
|
|
|
- (async/alt!
|
|
|
- stop-sync-chan {:stop true}
|
|
|
- full-sync-chan {:full-sync true}
|
|
|
- remote->local-sync-chan {:trigger-remote true}
|
|
|
- local->remote-sync-chan {:trigger-local true}
|
|
|
- _remote-change-chan ([v] (println "remote changes:" v) {:remote v})
|
|
|
- ratelimit-local-changes-chan ([v] (println "local changes:" v) {:local v})
|
|
|
- (timeout (* 20 60 1000)) {:trigger-full-sync true}
|
|
|
- :priority true)]
|
|
|
- (cond
|
|
|
- stop
|
|
|
- (<! (.schedule this ::stop nil))
|
|
|
- (or full-sync trigger-full-sync)
|
|
|
- (<! (.schedule this ::local->remote-full-sync nil))
|
|
|
- remote
|
|
|
- (<! (.schedule this ::remote->local {:remote remote}))
|
|
|
- local
|
|
|
- (<! (.schedule this ::local->remote {:local local}))
|
|
|
- :else
|
|
|
- (<! (.schedule this :idle nil))))))
|
|
|
+ *txid ^:mutable state ^:mutable _remote-change-chan ^:mutable _*ws ^:mutable stopped
|
|
|
+ ^:mutable ops-chan]
|
|
|
+ Object
|
|
|
+ (schedule [this next-state args]
|
|
|
+ {:pre [(s/valid? ::state next-state)]}
|
|
|
+ (println "[SyncManager" graph-uuid "]" (and state (name state)) "->" (and next-state (name next-state)))
|
|
|
+ (set! state next-state)
|
|
|
+ (swap! *sync-state sync-state--update-state next-state)
|
|
|
+ (go
|
|
|
+ (case state
|
|
|
+ ::idle
|
|
|
+ (<! (.idle this))
|
|
|
+ ::local->remote
|
|
|
+ (<! (.local->remote this args))
|
|
|
+ ::remote->local
|
|
|
+ (<! (.remote->local this nil args))
|
|
|
+ ::local->remote-full-sync
|
|
|
+ (<! (.full-sync this))
|
|
|
+ ::remote->local-full-sync
|
|
|
+ (<! (.remote->local-full-sync this nil))
|
|
|
+ ::stop
|
|
|
+ (-stop! this))))
|
|
|
+
|
|
|
+ (start [this]
|
|
|
+ (set! ops-chan (chan (async/dropping-buffer 10)))
|
|
|
+ (set! _*ws (atom nil))
|
|
|
+ (set! _remote-change-chan (ws-listen! graph-uuid _*ws))
|
|
|
+ (set! ratelimit-local-changes-chan (ratelimit local->remote-syncer local-changes-chan))
|
|
|
+ (go-loop []
|
|
|
+ (let [{:keys [stop remote->local local->remote-full-sync local->remote]}
|
|
|
+ (async/alt!
|
|
|
+ stop-sync-chan {:stop true}
|
|
|
+ remote->local-sync-chan {:remote->local true}
|
|
|
+ full-sync-chan {:local->remote-full-sync true}
|
|
|
+ _remote-change-chan ([v] (println "remote change:" v) {:remote->local v})
|
|
|
+ ratelimit-local-changes-chan ([v] (println "local changes:" v) {:local->remote v})
|
|
|
+ (timeout (* 20 60 1000)) {:local->remote-full-sync true}
|
|
|
+ :priority true)]
|
|
|
+ (cond
|
|
|
+ stop
|
|
|
+ (do
|
|
|
+ (drain-chan ops-chan)
|
|
|
+ (>! ops-chan {:stop true}))
|
|
|
+ remote->local
|
|
|
+ (let [txid
|
|
|
+ (if (true? remote->local)
|
|
|
+ {:txid (:TXId (<! (get-remote-graph remoteapi nil graph-uuid)))}
|
|
|
+ remote->local)]
|
|
|
+ (when (some? txid)
|
|
|
+ (>! ops-chan {:remote->local txid}))
|
|
|
+ (recur))
|
|
|
+ local->remote
|
|
|
+ (do (>! ops-chan {:local->remote local->remote})
|
|
|
+ (recur))
|
|
|
+ local->remote-full-sync
|
|
|
+ (do (drain-chan ops-chan)
|
|
|
+ (>! ops-chan {:local->remote-full-sync true})
|
|
|
+ (recur)))))
|
|
|
+ (.schedule this ::idle nil))
|
|
|
+
|
|
|
+ (idle [this]
|
|
|
+ (go
|
|
|
+ (let [{:keys [stop remote->local local->remote local->remote-full-sync remote->local-full-sync]}
|
|
|
+ (<! ops-chan)]
|
|
|
+ (cond
|
|
|
+ stop
|
|
|
+ (<! (.schedule this ::stop nil))
|
|
|
+ remote->local
|
|
|
+ (<! (.schedule this ::remote->local {:remote remote->local}))
|
|
|
+ local->remote
|
|
|
+ (<! (.schedule this ::local->remote {:local local->remote}))
|
|
|
+ local->remote-full-sync
|
|
|
+ (<! (.schedule this ::local->remote-full-sync nil))
|
|
|
+ remote->local-full-sync
|
|
|
+ (<! (.schedule this ::remote->local-full-sync nil))
|
|
|
+ :else
|
|
|
+ (<! (.schedule this ::stop nil))))))
|
|
|
|
|
|
- (full-sync [this]
|
|
|
- (go
|
|
|
- (let [{:keys [succ need-sync-remote unknown stop] :as r}
|
|
|
- (<! (sync-local->remote-all-files! local->remote-syncer))]
|
|
|
- (s/assert ::sync-local->remote-all-files!-result r)
|
|
|
- (cond
|
|
|
- succ
|
|
|
- (.schedule this ::idle nil)
|
|
|
- need-sync-remote
|
|
|
- (.schedule this ::remote->local=>local->remote-full-sync nil)
|
|
|
- stop
|
|
|
- (.schedule this ::stop nil)
|
|
|
- unknown
|
|
|
- (do
|
|
|
- (debug/pprint "full-sync" unknown)
|
|
|
- (.schedule this ::idle nil))))))
|
|
|
-
|
|
|
- (remote->local-full-sync [this next-state]
|
|
|
- (go
|
|
|
- (let [{:keys [succ unknown stop]}
|
|
|
- (<! (sync-remote->local-all-files! remote->local-syncer))]
|
|
|
- (cond
|
|
|
- succ
|
|
|
- (.schedule this next-state nil)
|
|
|
- stop
|
|
|
- (.schedule this ::stop nil)
|
|
|
- unknown
|
|
|
- (do
|
|
|
- (debug/pprint "remote->local-full-sync" unknown)
|
|
|
- (.schedule this ::idle nil))))))
|
|
|
-
|
|
|
- (remote->local [this next-state {remote-val :remote :as args}]
|
|
|
- (go
|
|
|
- (if (some-> remote-val :txid (<= @*txid))
|
|
|
- (.schedule this ::idle nil)
|
|
|
- (let [{:keys [succ unknown stop need-remote->local-full-sync] :as r}
|
|
|
- (<! (sync-remote->local! remote->local-syncer))]
|
|
|
- (s/assert ::sync-remote->local!-result r)
|
|
|
+ (full-sync [this]
|
|
|
+ (go
|
|
|
+ (let [{:keys [succ need-sync-remote unknown stop] :as r}
|
|
|
+ (<! (sync-local->remote-all-files! local->remote-syncer))]
|
|
|
+ (s/assert ::sync-local->remote-all-files!-result r)
|
|
|
(cond
|
|
|
- need-remote->local-full-sync
|
|
|
- (.schedule this ::remote->local-full-sync=>local->remote-full-sync nil)
|
|
|
succ
|
|
|
- (.schedule this (or next-state ::idle) args)
|
|
|
+ (.schedule this ::idle nil)
|
|
|
+ need-sync-remote
|
|
|
+ (do (drain-chan ops-chan)
|
|
|
+ (>! ops-chan {:remote->local true})
|
|
|
+ (>! ops-chan {:local->remote-full-sync true})
|
|
|
+ (.schedule this ::idle nil))
|
|
|
stop
|
|
|
(.schedule this ::stop nil)
|
|
|
unknown
|
|
|
- (do (prn "remote->local err" unknown)
|
|
|
- (.schedule this ::idle nil)))))))
|
|
|
+ (do
|
|
|
+ (debug/pprint "full-sync" unknown)
|
|
|
+ (.schedule this ::idle nil))))))
|
|
|
|
|
|
- (local->remote [this {^FileChangeEvents local-change :local}]
|
|
|
- (assert (some? local-change) local-change)
|
|
|
- (go
|
|
|
- (let [{:keys [succ need-sync-remote unknown] :as r}
|
|
|
- (<! (sync-local->remote! local->remote-syncer [local-change]))]
|
|
|
- (s/assert ::sync-local->remote!-result r)
|
|
|
- (cond
|
|
|
- succ
|
|
|
+ (remote->local-full-sync [this _next-state]
|
|
|
+ (go
|
|
|
+ (let [{:keys [succ unknown stop]}
|
|
|
+ (<! (sync-remote->local-all-files! remote->local-syncer))]
|
|
|
+ (cond
|
|
|
+ succ
|
|
|
+ (.schedule this ::idle nil)
|
|
|
+ stop
|
|
|
+ (.schedule this ::stop nil)
|
|
|
+ unknown
|
|
|
+ (do
|
|
|
+ (debug/pprint "remote->local-full-sync" unknown)
|
|
|
+ (.schedule this ::idle nil))))))
|
|
|
+
|
|
|
+ (remote->local [this _next-state {remote-val :remote}]
|
|
|
+ (go
|
|
|
+ (if (some-> remote-val :txid (<= @*txid))
|
|
|
(.schedule this ::idle nil)
|
|
|
+ (let [{:keys [succ unknown stop need-remote->local-full-sync] :as r}
|
|
|
+ (<! (sync-remote->local! remote->local-syncer))]
|
|
|
+ (s/assert ::sync-remote->local!-result r)
|
|
|
+ (cond
|
|
|
+ need-remote->local-full-sync
|
|
|
+ (do (drain-chan ops-chan)
|
|
|
+ (>! ops-chan {:remote->local-full-sync true})
|
|
|
+ (>! ops-chan {:local->remote-full-sync true})
|
|
|
+ (.schedule this ::idle nil))
|
|
|
+ succ
|
|
|
+ (.schedule this ::idle nil)
|
|
|
+ stop
|
|
|
+ (.schedule this ::stop nil)
|
|
|
+ unknown
|
|
|
+ (do (prn "remote->local err" unknown)
|
|
|
+ (.schedule this ::idle nil)))))))
|
|
|
+
|
|
|
+ (local->remote [this {^FileChangeEvents local-change :local}]
|
|
|
+ (assert (some? local-change) local-change)
|
|
|
+ (go
|
|
|
+ (let [{:keys [succ need-sync-remote unknown] :as r}
|
|
|
+ (<! (sync-local->remote! local->remote-syncer [local-change]))]
|
|
|
+ (s/assert ::sync-local->remote!-result r)
|
|
|
+ (cond
|
|
|
+ succ
|
|
|
+ (.schedule this ::idle nil)
|
|
|
|
|
|
- need-sync-remote
|
|
|
- (.schedule this ::remote->local=>local->remote nil)
|
|
|
-
|
|
|
- unknown
|
|
|
- (do
|
|
|
- (debug/pprint "local->remote" unknown)
|
|
|
- (.schedule this ::idle nil))))))
|
|
|
- IStoppable
|
|
|
- (-stop! [_]
|
|
|
- (when-not stopped
|
|
|
- (set! stopped true)
|
|
|
- (ws-stop! _*ws)
|
|
|
- (offer! stop-sync-chan true)
|
|
|
- (stop-local->remote! local->remote-syncer)
|
|
|
- (stop-remote->local! remote->local-syncer)
|
|
|
- (debug/pprint ["stop sync-manager, graph-uuid" graph-uuid "base-path" base-path])
|
|
|
- (swap! *sync-state sync-state--update-state ::stop))))
|
|
|
+ need-sync-remote
|
|
|
+ (do (drain-chan ops-chan)
|
|
|
+ (>! ops-chan {:remote->local true})
|
|
|
+ (>! ops-chan {:local->remote {:local local-change}})
|
|
|
+ (.schedule this ::idle nil))
|
|
|
+
|
|
|
+ unknown
|
|
|
+ (do
|
|
|
+ (debug/pprint "local->remote" unknown)
|
|
|
+ (.schedule this ::idle nil))))))
|
|
|
+ IStoppable
|
|
|
+ (-stop! [_]
|
|
|
+ (when-not stopped
|
|
|
+ (set! stopped true)
|
|
|
+ (ws-stop! _*ws)
|
|
|
+ (offer! stop-sync-chan true)
|
|
|
+ (async/close! ops-chan)
|
|
|
+ (stop-local->remote! local->remote-syncer)
|
|
|
+ (stop-remote->local! remote->local-syncer)
|
|
|
+ (debug/pprint ["stop sync-manager, graph-uuid" graph-uuid "base-path" base-path])
|
|
|
+ (swap! *sync-state sync-state--update-state ::stop))))
|
|
|
|
|
|
(defn sync-manager [user-uuid graph-uuid base-path repo txid *sync-state full-sync-chan stop-sync-chan
|
|
|
remote->local-sync-chan local->remote-sync-chan local-changes-chan]
|
|
|
@@ -1318,11 +1372,11 @@
|
|
|
(.set-local->remote-syncer! remote->local-syncer local->remote-syncer)
|
|
|
(->SyncManager graph-uuid base-path *sync-state local->remote-syncer remote->local-syncer
|
|
|
full-sync-chan stop-sync-chan
|
|
|
- remote->local-sync-chan local->remote-sync-chan local-changes-chan nil *txid nil nil nil false)))
|
|
|
+ remote->local-sync-chan local->remote-sync-chan local-changes-chan nil *txid nil nil nil false nil)))
|
|
|
|
|
|
(def full-sync-chan (chan 1))
|
|
|
(def stop-sync-chan (chan 1))
|
|
|
-(def remote->local-sync-chan (chan))
|
|
|
+(def remote->local-sync-chan (chan 1))
|
|
|
(def local->remote-sync-chan (chan))
|
|
|
|
|
|
(defn sync-stop []
|
|
|
@@ -1371,10 +1425,9 @@
|
|
|
;; set-env
|
|
|
(set-env rsapi config/FILE-SYNC-PROD?)
|
|
|
|
|
|
- ;; drain `local-changes-chan`
|
|
|
- (->> (repeatedly #(poll! local-changes-chan))
|
|
|
- (take-while identity))
|
|
|
+ (drain-chan local-changes-chan)
|
|
|
(poll! stop-sync-chan)
|
|
|
+ (poll! remote->local-sync-chan)
|
|
|
;; update global state when *sync-state changes
|
|
|
(add-watch *sync-state ::update-global-state
|
|
|
(fn [_ _ _ n]
|
|
|
@@ -1382,7 +1435,7 @@
|
|
|
(.start sm)
|
|
|
|
|
|
(state/set-file-sync-manager sm)
|
|
|
-
|
|
|
+ (offer! remote->local-sync-chan true)
|
|
|
(offer! full-sync-chan true)
|
|
|
|
|
|
;; watch :network/online?
|