|
|
@@ -1782,7 +1782,9 @@
|
|
|
;; add 2 simulated file-watcher events
|
|
|
(>! ch (->FileChangeEvent "unlink" repo-dir (:old-path rename-event*) nil nil))
|
|
|
(>! ch (->FileChangeEvent "add" repo-dir (:new-path rename-event*)
|
|
|
- {:mtime (tc/to-long (t/now))} "fake-checksum"))
|
|
|
+ {:mtime (tc/to-long (t/now))
|
|
|
+ :size 1 ; add a fake size
|
|
|
+ } "fake-checksum"))
|
|
|
(recur))
|
|
|
local-change
|
|
|
(cond
|
|
|
@@ -2023,11 +2025,6 @@
|
|
|
(chan 1))
|
|
|
(def full-sync-mult (async/mult full-sync-chan))
|
|
|
|
|
|
-(def stop-sync-chan
|
|
|
- "offer `true` to this chan will stop current `SyncManager`"
|
|
|
- (chan 1))
|
|
|
-(def stop-sync-mult (async/mult stop-sync-chan))
|
|
|
-
|
|
|
(def remote->local-sync-chan
|
|
|
"offer `true` to this chan will trigger a remote->local sync"
|
|
|
(chan 1))
|
|
|
@@ -2659,7 +2656,7 @@
|
|
|
*txid ^:mutable state ^:mutable remote-change-chan ^:mutable *ws *stopped? *paused?
|
|
|
^:mutable ops-chan
|
|
|
;; control chans
|
|
|
- private-full-sync-chan private-stop-sync-chan private-remote->local-sync-chan
|
|
|
+ private-full-sync-chan private-remote->local-sync-chan
|
|
|
private-remote->local-full-sync-chan private-pause-resume-chan]
|
|
|
Object
|
|
|
(schedule [this next-state args reason]
|
|
|
@@ -2694,30 +2691,30 @@
|
|
|
(set! ratelimit-local-changes-chan (<ratelimit local->remote-syncer local-changes-revised-chan))
|
|
|
(setup-local->remote! local->remote-syncer)
|
|
|
(async/tap full-sync-mult private-full-sync-chan)
|
|
|
- (async/tap stop-sync-mult private-stop-sync-chan)
|
|
|
(async/tap remote->local-sync-mult private-remote->local-sync-chan)
|
|
|
(async/tap remote->local-full-sync-mult private-remote->local-full-sync-chan)
|
|
|
(async/tap pause-resume-mult private-pause-resume-chan)
|
|
|
(go-loop []
|
|
|
- (let [{:keys [stop remote->local remote->local-full-sync local->remote-full-sync local->remote resume pause]}
|
|
|
+ (let [{:keys [remote->local remote->local-full-sync local->remote-full-sync local->remote resume pause stop]}
|
|
|
(async/alt!
|
|
|
- private-stop-sync-chan {:stop true}
|
|
|
private-remote->local-full-sync-chan {:remote->local-full-sync true}
|
|
|
private-remote->local-sync-chan {:remote->local true}
|
|
|
private-full-sync-chan {:local->remote-full-sync true}
|
|
|
private-pause-resume-chan ([v] (if v {:resume true} {:pause true}))
|
|
|
remote-change-chan ([v] (println "remote change:" v) {:remote->local v})
|
|
|
ratelimit-local-changes-chan ([v]
|
|
|
- (let [rest-v (util/drain-chan ratelimit-local-changes-chan)
|
|
|
- vs (cons v rest-v)]
|
|
|
- (println "local changes:" vs)
|
|
|
- {:local->remote vs}))
|
|
|
+ (if (nil? v)
|
|
|
+ {:stop true}
|
|
|
+ (let [rest-v (util/drain-chan ratelimit-local-changes-chan)
|
|
|
+ vs (cons v rest-v)]
|
|
|
+ (println "local changes:" vs)
|
|
|
+ {:local->remote vs})))
|
|
|
(timeout (* 20 60 1000)) {:local->remote-full-sync true}
|
|
|
:priority true)]
|
|
|
(cond
|
|
|
stop
|
|
|
- (do (util/drain-chan ops-chan)
|
|
|
- (>! ops-chan {:stop true}))
|
|
|
+ nil
|
|
|
+
|
|
|
remote->local-full-sync
|
|
|
(do (util/drain-chan ops-chan)
|
|
|
(>! ops-chan {:remote->local-full-sync true})
|
|
|
@@ -2772,8 +2769,9 @@
|
|
|
:data {:graph-uuid graph-uuid
|
|
|
:epoch (tc/to-epoch (t/now))}})
|
|
|
(go-loop []
|
|
|
- (let [{:keys [resume]} (<! ops-chan)]
|
|
|
- (if resume
|
|
|
+ (let [{:keys [resume] :as result} (<! ops-chan)]
|
|
|
+ (cond
|
|
|
+ resume
|
|
|
(let [{:keys [remote->local remote->local-full-sync local->remote local->remote-full-sync] :as resume-state}
|
|
|
(get @*resume-state graph-uuid)]
|
|
|
(resume-state--reset graph-uuid)
|
|
|
@@ -2795,6 +2793,11 @@
|
|
|
:resume-state resume-state
|
|
|
:epoch (tc/to-epoch (t/now))}})
|
|
|
(<! (.schedule this ::idle nil :resume)))
|
|
|
+
|
|
|
+ (nil? result)
|
|
|
+ (<! (.schedule this ::stop nil nil))
|
|
|
+
|
|
|
+ :else
|
|
|
(recur)))))
|
|
|
|
|
|
(idle [this]
|
|
|
@@ -2802,7 +2805,7 @@
|
|
|
(let [{:keys [stop remote->local local->remote local->remote-full-sync remote->local-full-sync pause] :as result}
|
|
|
(<! ops-chan)]
|
|
|
(cond
|
|
|
- stop
|
|
|
+ (or stop (nil? result))
|
|
|
(<! (.schedule this ::stop nil nil))
|
|
|
remote->local
|
|
|
(<! (.schedule this ::remote->local {:remote remote->local} {:remote-changed remote->local}))
|
|
|
@@ -2816,10 +2819,11 @@
|
|
|
(<! (.schedule this ::pause nil nil))
|
|
|
:else
|
|
|
(do
|
|
|
- (state/pub-event! [:instrument {:type :sync/wrong-ops-chan-when-idle
|
|
|
- :payload {:ops-chan-result result
|
|
|
- :user-id user-uuid
|
|
|
- :graph-id graph-uuid}}])
|
|
|
+ (state/pub-event! [:capture-error {:error (js/Error. "sync/wrong-ops-chan-when-idle")
|
|
|
+ :payload {:type :sync/wrong-ops-chan-when-idle
|
|
|
+ :ops-chan-result result
|
|
|
+ :user-id user-uuid
|
|
|
+ :graph-id graph-uuid}}])
|
|
|
nil)))))
|
|
|
|
|
|
(full-sync [this]
|
|
|
@@ -2849,11 +2853,11 @@
|
|
|
(.schedule this ::stop nil nil)
|
|
|
unknown
|
|
|
(do
|
|
|
- (state/pub-event! [:instrument {:type :sync/unknown
|
|
|
- :payload {:error unknown
|
|
|
- :event :local->remote-full-sync-failed
|
|
|
- :user-id user-uuid
|
|
|
- :graph-uuid graph-uuid}}])
|
|
|
+ (state/pub-event! [:capture-error {:error unknown
|
|
|
+ :payload {:type :sync/unknown
|
|
|
+ :event :local->remote-full-sync-failed
|
|
|
+ :user-id user-uuid
|
|
|
+ :graph-uuid graph-uuid}}])
|
|
|
(put-sync-event! {:event :local->remote-full-sync-failed
|
|
|
:data {:graph-uuid graph-uuid
|
|
|
:epoch (tc/to-epoch (t/now))}})
|
|
|
@@ -2877,18 +2881,18 @@
|
|
|
(.schedule this ::pause nil nil))
|
|
|
unknown
|
|
|
(do
|
|
|
- (state/pub-event! [:instrument {:type :sync/unknown
|
|
|
- :payload {:event :remote->local-full-sync-failed
|
|
|
- :graph-uuid graph-uuid
|
|
|
- :user-id user-uuid
|
|
|
- :error unknown}}])
|
|
|
+ (state/pub-event! [:capture-error {:error unknown
|
|
|
+ :payload {:event :remote->local-full-sync-failed
|
|
|
+ :type :sync/unknown
|
|
|
+ :graph-uuid graph-uuid
|
|
|
+ :user-id user-uuid}}])
|
|
|
(put-sync-event! {:event :remote->local-full-sync-failed
|
|
|
:data {:graph-uuid graph-uuid
|
|
|
:exp unknown
|
|
|
:epoch (tc/to-epoch (t/now))}})
|
|
|
(let [next-state (if (string/includes? (str (ex-cause unknown)) "404 Not Found")
|
|
|
;; TODO: this should never happen
|
|
|
- ::pause
|
|
|
+ ::stop
|
|
|
;; if any other exception occurred, re-exec remote->local-full-sync
|
|
|
::remote->local-full-sync)]
|
|
|
(.schedule this next-state nil nil)))))))
|
|
|
@@ -2922,11 +2926,11 @@
|
|
|
(.schedule this ::pause nil nil))
|
|
|
unknown
|
|
|
(do (prn "remote->local err" unknown)
|
|
|
- (state/pub-event! [:instrument {:type :sync/unknown
|
|
|
- :payload {:event :remote->local
|
|
|
- :user-id user-uuid
|
|
|
- :graph-uuid graph-uuid
|
|
|
- :error unknown}}])
|
|
|
+ (state/pub-event! [:capture-error {:error unknown
|
|
|
+ :payload {:type :sync/unknown
|
|
|
+ :event :remote->local
|
|
|
+ :user-id user-uuid
|
|
|
+ :graph-uuid graph-uuid}}])
|
|
|
(.schedule this ::idle nil nil)))))))
|
|
|
|
|
|
(local->remote [this {local-changes :local}]
|
|
|
@@ -2986,11 +2990,11 @@
|
|
|
unknown
|
|
|
(do
|
|
|
(debug/pprint "local->remote" unknown)
|
|
|
- (state/pub-event! [:instrument {:type :sync/unknown
|
|
|
- :payload {:event :local->remote
|
|
|
- :user-id user-uuid
|
|
|
- :graph-uuid graph-uuid
|
|
|
- :error unknown}}])
|
|
|
+ (state/pub-event! [:capture-error {:error unknown
|
|
|
+ :payload {:event :local->remote
|
|
|
+ :type :sync/unknown
|
|
|
+ :user-id user-uuid
|
|
|
+ :graph-uuid graph-uuid}}])
|
|
|
(.schedule this ::idle nil nil))))))
|
|
|
IStoppable
|
|
|
(-stop! [_]
|
|
|
@@ -2998,9 +3002,7 @@
|
|
|
(when-not @*stopped?
|
|
|
(vreset! *stopped? true)
|
|
|
(ws-stop! *ws)
|
|
|
- (offer! private-stop-sync-chan true)
|
|
|
(async/untap full-sync-mult private-full-sync-chan)
|
|
|
- (async/untap stop-sync-mult private-stop-sync-chan)
|
|
|
(async/untap remote->local-sync-mult private-remote->local-sync-chan)
|
|
|
(async/untap remote->local-full-sync-mult private-remote->local-full-sync-chan)
|
|
|
(async/untap pause-resume-mult private-pause-resume-chan)
|
|
|
@@ -3008,14 +3010,9 @@
|
|
|
(stop-local->remote! local->remote-syncer)
|
|
|
(stop-remote->local! remote->local-syncer)
|
|
|
(<! (<rsapi-cancel-all-requests))
|
|
|
- (debug/pprint ["stop sync-manager, graph-uuid" graph-uuid "base-path" base-path])
|
|
|
(swap! *sync-state sync-state--update-state ::stop)
|
|
|
- (loop []
|
|
|
- (if (not= ::stop state)
|
|
|
- (do
|
|
|
- (<! (timeout 100))
|
|
|
- (recur))
|
|
|
- (reset! current-sm-graph-uuid nil))))))
|
|
|
+ (reset! current-sm-graph-uuid nil)
|
|
|
+ (debug/pprint ["stop sync-manager, graph-uuid" graph-uuid "base-path" base-path]))))
|
|
|
|
|
|
IStopped?
|
|
|
(-stopped? [_]
|
|
|
@@ -3041,7 +3038,7 @@
|
|
|
(.set-local->remote-syncer! remote->local-syncer local->remote-syncer)
|
|
|
(swap! *sync-state sync-state--update-current-syncing-graph-uuid graph-uuid)
|
|
|
(->SyncManager user-uuid graph-uuid base-path *sync-state local->remote-syncer remote->local-syncer remoteapi-with-stop
|
|
|
- nil *txid nil nil nil *stopped? *paused? nil (chan 1) (chan 1) (chan 1) (chan 1) (chan 1))))
|
|
|
+ nil *txid nil nil nil *stopped? *paused? nil (chan 1) (chan 1) (chan 1) (chan 1))))
|
|
|
|
|
|
(defn sync-manager-singleton
|
|
|
[user-uuid graph-uuid base-path repo txid *sync-state]
|