|
|
@@ -93,7 +93,11 @@
|
|
|
(defprotocol IRelativePath
|
|
|
(-relative-path [this]))
|
|
|
|
|
|
-;from-path, to-path is relative path
|
|
|
+(defprotocol IStoppable
|
|
|
+ (-stop! [this]))
|
|
|
+(defprotocol IStopped?
|
|
|
+ (-stopped? [this]))
|
|
|
+ ;from-path, to-path is relative path
|
|
|
(deftype FileTxn [from-path to-path updated deleted seq-id]
|
|
|
Object
|
|
|
(rename [_ to]
|
|
|
@@ -459,6 +463,7 @@
|
|
|
(.deleted? filetxn)
|
|
|
(delete-local-files rsapi graph-uuid base-path [(.-to-path filetxn)])))
|
|
|
|
|
|
+;;; TODO: support stop from processing
|
|
|
(defn- apply-filetxns [^SyncState sync-state graph-uuid base-path filetxns]
|
|
|
(go-loop [filetxns* filetxns]
|
|
|
(when (seq filetxns*)
|
|
|
@@ -471,21 +476,6 @@
|
|
|
r
|
|
|
(recur (next filetxns*)))))))
|
|
|
|
|
|
-(defn sync-remote-all-files!
|
|
|
- "pull all files' metadata and sync."
|
|
|
- [graph-uuid]
|
|
|
- (go
|
|
|
- (exception->
|
|
|
- (<! (get-remote-all-files-meta remoteapi graph-uuid))
|
|
|
- (as-> v (prn "get-remote-all-files-meta:") v))))
|
|
|
-
|
|
|
-(comment
|
|
|
- (reset! graphs-txid ["78c7362a-e085-4b8e-9a7b-27e1930fb94b" 0])
|
|
|
- (sync-remote->local!)
|
|
|
- graphs-txid
|
|
|
- )
|
|
|
-
|
|
|
-
|
|
|
(defmulti need-sync-remote? (fn [v] (cond
|
|
|
(= :max v)
|
|
|
:max
|
|
|
@@ -538,7 +528,12 @@
|
|
|
|
|
|
(defn file-watch-handler
|
|
|
[type {:keys [dir path _content stat] :as payload}]
|
|
|
- (go (>! local-changes-chan (->FileChangeEvent type dir path stat))))
|
|
|
+ (go
|
|
|
+ (when (some-> (state/get-file-sync-state-manager)
|
|
|
+ -stopped?
|
|
|
+ not)
|
|
|
+ (prn "file-watch-handler" path)
|
|
|
+ (>! local-changes-chan (->FileChangeEvent type dir path stat)))))
|
|
|
|
|
|
|
|
|
(defprotocol IRemote->LocalSync
|
|
|
@@ -548,6 +543,7 @@
|
|
|
(defprotocol ILocal->RemoteSync
|
|
|
(get-ignore-files [this] "ignored-files won't be synced to remote")
|
|
|
;; TODO: get-monitored-dirs
|
|
|
+ (stop-local->remote! [this])
|
|
|
(ratelimit [this from-chan] "get watched local file-change events from FROM-CHAN,
|
|
|
return chan returning events with rate limited")
|
|
|
(sync-local->remote! [this ^FileChangeEvent e])
|
|
|
@@ -596,7 +592,7 @@
|
|
|
(reduce #(when (re-find %2 path) (reduced true)) false regexps))
|
|
|
|
|
|
(deftype Local->RemoteSyncer [graph-uuid base-path repo ^SyncState sync-state
|
|
|
- ^:mutable rate *txid ^:mutable remote->local-syncer]
|
|
|
+ ^:mutable rate *txid ^:mutable remote->local-syncer stop-chan]
|
|
|
Object
|
|
|
(filtered-chan [_ n]
|
|
|
"check base-path"
|
|
|
@@ -607,14 +603,20 @@
|
|
|
ILocal->RemoteSync
|
|
|
(get-ignore-files [_] #{#"logseq/graphs-txid.edn$" #"logseq/bak/.*"})
|
|
|
|
|
|
+ (stop-local->remote! [_] (async/close! stop-chan))
|
|
|
+
|
|
|
(ratelimit [this from-chan]
|
|
|
(let [c (.filtered-chan this 10000)]
|
|
|
(go-loop [timeout-c (timeout rate)
|
|
|
tcoll (transient [])]
|
|
|
- (let [{:keys [timeout ^FileChangeEvent e]}
|
|
|
+ (let [{:keys [timeout ^FileChangeEvent e stop]}
|
|
|
(async/alt! timeout-c {:timeout true}
|
|
|
- from-chan ([e] {:e e}))]
|
|
|
+ from-chan ([e] {:e e})
|
|
|
+ stop-chan {:stop true})]
|
|
|
(cond
|
|
|
+ stop
|
|
|
+ (async/close! c)
|
|
|
+
|
|
|
timeout
|
|
|
(do
|
|
|
(<! (async/onto-chan! c (distinct (persistent! tcoll)) false))
|
|
|
@@ -673,6 +675,7 @@
|
|
|
(println "sync-local->remote unknown:" r*)
|
|
|
{:unknown r*}))))))))))
|
|
|
|
|
|
+ ;; TODO: support stopping in the middle of processing
|
|
|
(sync-local->remote-all-files! [this]
|
|
|
(go
|
|
|
(let [remote-all-files-meta-c (get-remote-all-files-meta remoteapi graph-uuid)
|
|
|
@@ -722,7 +725,8 @@
|
|
|
(set! current-remote->local-files #{})
|
|
|
(state/set-file-sync-downloading-files current-remote->local-files))
|
|
|
|
|
|
- (stopped? [_] (or (nil? state) (= ::stop state)))
|
|
|
+ IStopped?
|
|
|
+ (-stopped? [_] (or (nil? state) (= ::stop state)))
|
|
|
|
|
|
IPrintWithWriter
|
|
|
(-pr-writer [coll w opts]
|
|
|
@@ -735,10 +739,10 @@
|
|
|
(deftype SyncManager [graph-uuid base-path ^SyncState sync-state local->remote-syncer remote->local-syncer
|
|
|
full-sync-chan stop-sync-chan remote->local-sync-chan local->remote-sync-chan
|
|
|
local-changes-chan ^:mutable ratelimit-local-changes-chan
|
|
|
- *txid ^:mutable state ^:mutable _remote-change-chan ^:mutable _*ws ^:mutable _id]
|
|
|
+ *txid ^:mutable state ^:mutable _remote-change-chan ^:mutable _*ws]
|
|
|
Object
|
|
|
(schedule [this next-state & args]
|
|
|
- (println "[SyncManager" (str _id) "]" state "->" next-state)
|
|
|
+ (println "[SyncManager" graph-uuid "]" state "->" next-state)
|
|
|
(set! state next-state)
|
|
|
(.update-state! sync-state next-state)
|
|
|
(go
|
|
|
@@ -754,20 +758,15 @@
|
|
|
::remote->local=>full-sync
|
|
|
(<! (.remote->local this ::full-sync args))
|
|
|
::stop
|
|
|
- (.stop this))))
|
|
|
+ (-stop! this))))
|
|
|
|
|
|
(start [this]
|
|
|
- (set! _id (medley.core/random-uuid))
|
|
|
(set! _*ws (atom nil))
|
|
|
(set! _remote-change-chan (ws-listen! graph-uuid _*ws))
|
|
|
(set! ratelimit-local-changes-chan (ratelimit local->remote-syncer local-changes-chan))
|
|
|
(.schedule this ::idle))
|
|
|
|
|
|
- (stop [this]
|
|
|
- (ws-stop! _*ws)
|
|
|
- (debug/pprint ["stop sync-manager, graph-uuid" graph-uuid "base-path" base-path])
|
|
|
- (.update-state! sync-state ::stop)
|
|
|
- )
|
|
|
+
|
|
|
|
|
|
(idle [this]
|
|
|
(go
|
|
|
@@ -834,7 +833,15 @@
|
|
|
unknown
|
|
|
(do
|
|
|
(debug/pprint "local->remote" unknown)
|
|
|
- (.schedule this ::idle)))))))
|
|
|
+ (.schedule this ::idle))))))
|
|
|
+ IStoppable
|
|
|
+ (-stop! [this]
|
|
|
+ (ws-stop! _*ws)
|
|
|
+ (stop-local->remote! local->remote-syncer)
|
|
|
+ (debug/pprint ["stop sync-manager, graph-uuid" graph-uuid "base-path" base-path])
|
|
|
+ (.update-state! sync-state ::stop)
|
|
|
+ )
|
|
|
+ )
|
|
|
|
|
|
|
|
|
(defn sync-manager [graph-uuid base-path repo txid sync-state full-sync-chan stop-sync-chan
|
|
|
@@ -844,7 +851,7 @@
|
|
|
base-path
|
|
|
repo sync-state
|
|
|
20000
|
|
|
- *txid nil)
|
|
|
+ *txid nil (chan))
|
|
|
remote->local-syncer (->Remote->LocalSyncer graph-uuid
|
|
|
base-path
|
|
|
repo *txid sync-state nil)]
|
|
|
@@ -852,7 +859,7 @@
|
|
|
(.set-local->remote-syncer! remote->local-syncer local->remote-syncer)
|
|
|
(->SyncManager graph-uuid base-path sync-state local->remote-syncer remote->local-syncer
|
|
|
full-sync-chan stop-sync-chan
|
|
|
- remote->local-sync-chan local->remote-sync-chan local-changes-chan nil *txid nil nil nil nil)))
|
|
|
+ remote->local-sync-chan local->remote-sync-chan local-changes-chan nil *txid nil nil nil)))
|
|
|
|
|
|
(comment
|
|
|
(create-graph remoteapi "test3")
|
|
|
@@ -894,7 +901,7 @@
|
|
|
(defn sync-stop []
|
|
|
(when-let [sm (state/get-file-sync-manager)]
|
|
|
(println "stopping sync-manager")
|
|
|
- (.stop sm)))
|
|
|
+ (-stop! sm)))
|
|
|
|
|
|
(defn sync-start []
|
|
|
(let [graph-uuid (first @graphs-txid)
|