|
|
@@ -1599,80 +1599,83 @@
|
|
|
(defn- <fetch-remote-and-update-local-files
|
|
|
[graph-uuid base-path relative-paths]
|
|
|
(go
|
|
|
- (let [fetched-file-rpaths (<! (<fetch-remote-files rsapi graph-uuid base-path relative-paths))]
|
|
|
- (p->c (p/all (->> fetched-file-rpaths
|
|
|
- (map (fn [rpath]
|
|
|
- (p/let [incoming-file (path/path-join "logseq/version-files/incoming" rpath)
|
|
|
- base-file (path/path-join "logseq/version-files/base" rpath)
|
|
|
- current-change-file rpath
|
|
|
- format (gp-util/get-format current-change-file)
|
|
|
- repo (state/get-current-repo)
|
|
|
- repo-dir (config/get-repo-dir repo)
|
|
|
- base-exists? (fs/file-exists? repo-dir base-file)
|
|
|
- _ (prn ::base-ex base-exists?)]
|
|
|
- (cond
|
|
|
- base-exists?
|
|
|
- (p/let [base-content (fs/read-file repo-dir base-file)
|
|
|
- current-content (-> (fs/read-file repo-dir current-change-file)
|
|
|
- (p/catch (fn [_] nil)))
|
|
|
- incoming-content (fs/read-file repo-dir incoming-file)]
|
|
|
- (if (= base-content current-content)
|
|
|
- (do
|
|
|
- (prn "base=current, write directly")
|
|
|
- (p/do!
|
|
|
- (fs/copy! repo
|
|
|
- (path/path-join repo-dir incoming-file)
|
|
|
- (path/path-join repo-dir current-change-file))
|
|
|
- (fs/copy! repo
|
|
|
- (path/path-join repo-dir incoming-file)
|
|
|
- (path/path-join repo-dir base-file))
|
|
|
- (file-handler/alter-file repo current-change-file incoming-content {:re-render-root? true
|
|
|
- :from-disk? true
|
|
|
- :fs/event :fs/remote-file-change})))
|
|
|
- (do
|
|
|
- (prn "base!=current, should do a 3-way merge")
|
|
|
- (prn ::cur
|
|
|
- current-content)
|
|
|
- (p/let [current-content (or current-content "")
|
|
|
- incoming-content (fs/read-file repo-dir incoming-file)
|
|
|
- merged-content (diff-merge/three-way-merge base-content incoming-content current-content format)]
|
|
|
- (prn ::merged-content merged-content)
|
|
|
- (when (seq merged-content)
|
|
|
- (p/do!
|
|
|
- (fs/write-file! repo repo-dir current-change-file merged-content {:skip-compare? true})
|
|
|
- (file-handler/alter-file repo current-change-file merged-content {:re-render-root? true
|
|
|
- :from-disk? true
|
|
|
- :fs/event :fs/remote-file-change})))))))
|
|
|
-
|
|
|
- :else
|
|
|
- (do
|
|
|
- (prn "no base, use empty content as base, avoid loosing data")
|
|
|
- (p/let [current-content (-> (fs/read-file repo-dir current-change-file)
|
|
|
- (p/catch (fn [_] nil)))
|
|
|
- current-content (or current-content "")
|
|
|
- incoming-content (fs/read-file repo-dir incoming-file)
|
|
|
- merged-content (diff-merge/three-way-merge "" current-content incoming-content format)]
|
|
|
- (if (= incoming-content merged-content)
|
|
|
- (p/do!
|
|
|
- (fs/copy! repo
|
|
|
- (path/path-join repo-dir incoming-file)
|
|
|
- (path/path-join repo-dir current-change-file))
|
|
|
- (fs/copy! repo
|
|
|
- (path/path-join repo-dir incoming-file)
|
|
|
- (path/path-join repo-dir base-file))
|
|
|
- (file-handler/alter-file repo current-change-file merged-content {:re-render-root? true
|
|
|
- :from-disk? true
|
|
|
- :fs/event :fs/remote-file-change}))
|
|
|
-
|
|
|
- ;; else
|
|
|
- (p/do!
|
|
|
- (fs/write-file! repo repo-dir current-change-file merged-content {:skip-compare? true})
|
|
|
- (file-handler/alter-file repo current-change-file merged-content {:re-render-root? true
|
|
|
- :from-disk? true
|
|
|
- :fs/event :fs/remote-file-change})
|
|
|
- (file-handler/alter-file repo current-change-file merged-content {:re-render-root? true
|
|
|
- :from-disk? true
|
|
|
- :fs/event :fs/remote-file-change})))))))))))))))
|
|
|
+ (let [fetched-file-rpaths-or-ex (<! (<fetch-remote-files rsapi graph-uuid base-path relative-paths))]
|
|
|
+ (if (instance? ExceptionInfo fetched-file-rpaths-or-ex)
|
|
|
+ fetched-file-rpaths-or-ex
|
|
|
+ (<!
|
|
|
+ (p->c (p/all (->> fetched-file-rpaths-or-ex
|
|
|
+ (map (fn [rpath]
|
|
|
+ (p/let [incoming-file (path/path-join "logseq/version-files/incoming" rpath)
|
|
|
+ base-file (path/path-join "logseq/version-files/base" rpath)
|
|
|
+ current-change-file rpath
|
|
|
+ format (gp-util/get-format current-change-file)
|
|
|
+ repo (state/get-current-repo)
|
|
|
+ repo-dir (config/get-repo-dir repo)
|
|
|
+ base-exists? (fs/file-exists? repo-dir base-file)
|
|
|
+ _ (prn ::base-ex base-exists?)]
|
|
|
+ (cond
|
|
|
+ base-exists?
|
|
|
+ (p/let [base-content (fs/read-file repo-dir base-file)
|
|
|
+ current-content (-> (fs/read-file repo-dir current-change-file)
|
|
|
+ (p/catch (fn [_] nil)))
|
|
|
+ incoming-content (fs/read-file repo-dir incoming-file)]
|
|
|
+ (if (= base-content current-content)
|
|
|
+ (do
|
|
|
+ (prn "base=current, write directly")
|
|
|
+ (p/do!
|
|
|
+ (fs/copy! repo
|
|
|
+ (path/path-join repo-dir incoming-file)
|
|
|
+ (path/path-join repo-dir current-change-file))
|
|
|
+ (fs/copy! repo
|
|
|
+ (path/path-join repo-dir incoming-file)
|
|
|
+ (path/path-join repo-dir base-file))
|
|
|
+ (file-handler/alter-file repo current-change-file incoming-content {:re-render-root? true
|
|
|
+ :from-disk? true
|
|
|
+ :fs/event :fs/remote-file-change})))
|
|
|
+ (do
|
|
|
+ (prn "base!=current, should do a 3-way merge")
|
|
|
+ (prn ::cur
|
|
|
+ current-content)
|
|
|
+ (p/let [current-content (or current-content "")
|
|
|
+ incoming-content (fs/read-file repo-dir incoming-file)
|
|
|
+ merged-content (diff-merge/three-way-merge base-content incoming-content current-content format)]
|
|
|
+ (prn ::merged-content merged-content)
|
|
|
+ (when (seq merged-content)
|
|
|
+ (p/do!
|
|
|
+ (fs/write-file! repo repo-dir current-change-file merged-content {:skip-compare? true})
|
|
|
+ (file-handler/alter-file repo current-change-file merged-content {:re-render-root? true
|
|
|
+ :from-disk? true
|
|
|
+ :fs/event :fs/remote-file-change})))))))
|
|
|
+
|
|
|
+ :else
|
|
|
+ (do
|
|
|
+ (prn "no base, use empty content as base, avoid loosing data")
|
|
|
+ (p/let [current-content (-> (fs/read-file repo-dir current-change-file)
|
|
|
+ (p/catch (fn [_] nil)))
|
|
|
+ current-content (or current-content "")
|
|
|
+ incoming-content (fs/read-file repo-dir incoming-file)
|
|
|
+ merged-content (diff-merge/three-way-merge "" current-content incoming-content format)]
|
|
|
+ (if (= incoming-content merged-content)
|
|
|
+ (p/do!
|
|
|
+ (fs/copy! repo
|
|
|
+ (path/path-join repo-dir incoming-file)
|
|
|
+ (path/path-join repo-dir current-change-file))
|
|
|
+ (fs/copy! repo
|
|
|
+ (path/path-join repo-dir incoming-file)
|
|
|
+ (path/path-join repo-dir base-file))
|
|
|
+ (file-handler/alter-file repo current-change-file merged-content {:re-render-root? true
|
|
|
+ :from-disk? true
|
|
|
+ :fs/event :fs/remote-file-change}))
|
|
|
+
|
|
|
+ ;; else
|
|
|
+ (p/do!
|
|
|
+ (fs/write-file! repo repo-dir current-change-file merged-content {:skip-compare? true})
|
|
|
+ (file-handler/alter-file repo current-change-file merged-content {:re-render-root? true
|
|
|
+ :from-disk? true
|
|
|
+ :fs/event :fs/remote-file-change})
|
|
|
+ (file-handler/alter-file repo current-change-file merged-content {:re-render-root? true
|
|
|
+ :from-disk? true
|
|
|
+ :fs/event :fs/remote-file-change})))))))))))))))))
|
|
|
|
|
|
(defn- apply-filetxns
|
|
|
[*sync-state graph-uuid base-path filetxns *paused]
|
|
|
@@ -2849,20 +2852,20 @@
|
|
|
;;; ### put all stuff together
|
|
|
|
|
|
(defrecord ^:large-vars/cleanup-todo
|
|
|
- SyncManager [user-uuid graph-uuid base-path *sync-state
|
|
|
- ^Local->RemoteSyncer local->remote-syncer ^Remote->LocalSyncer remote->local-syncer remoteapi
|
|
|
- ^:mutable ratelimit-local-changes-chan
|
|
|
- *txid *txid-for-get-deletion-log
|
|
|
- ^:mutable state ^:mutable remote-change-chan ^:mutable *ws *stopped? *paused?
|
|
|
- ^:mutable ops-chan ^:mutable app-awake-from-sleep-chan
|
|
|
+ SyncManager [user-uuid graph-uuid base-path *sync-state
|
|
|
+ ^Local->RemoteSyncer local->remote-syncer ^Remote->LocalSyncer remote->local-syncer remoteapi
|
|
|
+ ^:mutable ratelimit-local-changes-chan
|
|
|
+ *txid *txid-for-get-deletion-log
|
|
|
+ ^:mutable state ^:mutable remote-change-chan ^:mutable *ws *stopped? *paused?
|
|
|
+ ^:mutable ops-chan ^:mutable app-awake-from-sleep-chan
|
|
|
;; control chans
|
|
|
- private-full-sync-chan private-remote->local-sync-chan
|
|
|
- private-remote->local-full-sync-chan private-pause-resume-chan]
|
|
|
+ private-full-sync-chan private-remote->local-sync-chan
|
|
|
+ private-remote->local-full-sync-chan private-pause-resume-chan]
|
|
|
Object
|
|
|
(schedule [this next-state args reason]
|
|
|
{:pre [(s/valid? ::state next-state)]}
|
|
|
(println (str "[SyncManager " graph-uuid "]")
|
|
|
- (and state (name state)) "->" (and next-state (name next-state)) :reason reason :local-txid @*txid :now (tc/to-string (t/now)))
|
|
|
+ (and state (name state)) "->" (and next-state (name next-state)) :reason reason :local-txid @*txid :args args :now (tc/to-string (t/now)))
|
|
|
(set! state next-state)
|
|
|
(swap! *sync-state sync-state--update-state next-state)
|
|
|
(go
|
|
|
@@ -3060,7 +3063,7 @@
|
|
|
:epoch (tc/to-epoch (t/now))}})
|
|
|
(.schedule this ::idle nil nil))))))
|
|
|
|
|
|
- (remote->local-full-sync [this _]
|
|
|
+ (remote->local-full-sync [this {:keys [retry-count]}]
|
|
|
(go
|
|
|
(let [{:keys [succ unknown stop pause]}
|
|
|
(<! (<sync-remote->local-all-files! remote->local-syncer))]
|
|
|
@@ -3087,12 +3090,19 @@
|
|
|
:data {:graph-uuid graph-uuid
|
|
|
:exp unknown
|
|
|
:epoch (tc/to-epoch (t/now))}})
|
|
|
- (let [next-state (if (string/includes? (str (ex-cause unknown)) "404 Not Found")
|
|
|
- ;; TODO: this should never happen
|
|
|
- ::stop
|
|
|
- ;; if any other exception occurred, re-exec remote->local-full-sync
|
|
|
- ::remote->local-full-sync)]
|
|
|
- (.schedule this next-state nil nil)))))))
|
|
|
+ (let [next-state
|
|
|
+ (cond
|
|
|
+ (string/includes? (str (ex-cause unknown)) "404 Not Found")
|
|
|
+ ;; TODO: this should never happen
|
|
|
+ ::stop
|
|
|
+ (> retry-count 3)
|
|
|
+ ::stop
|
|
|
+
|
|
|
+ :else ;; if any other exception occurred, re-exec remote->local-full-sync
|
|
|
+ ::remote->local-full-sync)]
|
|
|
+ (.schedule this next-state
|
|
|
+ (when (= ::remote->local-full-sync next-state) {:retry-count (inc retry-count)})
|
|
|
+ nil)))))))
|
|
|
|
|
|
(remote->local [this _next-state {remote-val :remote}]
|
|
|
(go
|
|
|
@@ -3137,7 +3147,7 @@
|
|
|
(let [distincted-local-changes (distinct-file-change-events local-changes)
|
|
|
_ (swap! *sync-state #(sync-state-reset-full-local->remote-files % distincted-local-changes))
|
|
|
change-events-partitions
|
|
|
- (sequence (partition-file-change-events upload-batch-size) distincted-local-changes)
|
|
|
+ (sequence (partition-file-change-events upload-batch-size) distincted-local-changes)
|
|
|
_ (put-sync-event! {:event :start
|
|
|
:data {:type :local->remote
|
|
|
:graph-uuid graph-uuid
|