Browse Source

feat(sync): update local->remote logic

rcmerci 4 years ago
parent
commit
b185eeb0a2
2 changed files with 109 additions and 88 deletions
  1. 3 1
      src/main/electron/listener.cljs
  2. 106 87
      src/main/frontend/fs/sync.cljs

+ 3 - 1
src/main/electron/listener.cljs

@@ -5,6 +5,7 @@
             [frontend.handler.ui :as ui-handler]
             [cljs-bean.core :as bean]
             [frontend.fs.watcher-handler :as watcher-handler]
+            [frontend.fs.sync :as sync]
             [frontend.db :as db]
             [datascript.core :as d]
             [electron.ipc :as ipc]
@@ -36,7 +37,8 @@
   (js/window.apis.on "file-watcher"
                      (fn [data]
                        (let [{:keys [type payload]} (bean/->clj data)]
-                         (watcher-handler/handle-changed! type payload))))
+                         (watcher-handler/handle-changed! type payload)
+                         (sync/file-watch-handler type payload))))
 
   (js/window.apis.on "notification"
                      (fn [data]

+ 106 - 87
src/main/frontend/fs/sync.cljs

@@ -4,45 +4,40 @@
             [cljs-http.client :as http]
             [frontend.util.persist-var :as persist-var]
             [clojure.string :as string]
-            [frontend.state :as state]))
+            [frontend.state :as state]
+            [frontend.config :as config]))
 
 (def ws-addr "wss://og96xf1si7.execute-api.us-east-2.amazonaws.com/production?graphuuid=%s")
 
-
 (def local-changes-chan (chan 100))
 
 (def remote-changes-chan (chan 1))
 
 (def graphs-txid (persist-var/persist-var nil "graphs-txid"))
 
-(def ws-listen-graphs (atom #{}))
-
-(defn ws-listen! [graph-uuid]
-  (let [ws (js/WebSocket. (util/format ws-addr graph-uuid))]
-    (set! (.-onopen ws) #(println (util/format "ws opened: graph '%s'" graph-uuid %)))
-    (set! (.-onclose ws) (fn [e]
-                           (println (util/format "ws close: graph '%s'" graph-uuid e))
-                           (when (contains? @ws-listen-graphs graph-uuid)
-                             (go
-                               (timeout 1000)
-                               (println "re-connecting graph" graph-uuid)
-                               (ws-listen! graph-uuid)))))
-    (set! (.-onmessage ws) (fn [e]
-                             (let [data (js->clj (js/JSON.parse (.-data e)) :keywordize-keys true)]
-                               (if-let [v (poll! remote-changes-chan)]
-                                 (let [last-txid (:txid v)
-                                       current-txid (:txid data)]
-                                   (if (> last-txid current-txid)
-                                     (offer! remote-changes-chan v)
-                                     (offer! remote-changes-chan data)))
-                                 (offer! remote-changes-chan data)))))
-    ws))
-
-(comment
-  (reset! ws-listen-graphs (conj @ws-listen-graphs "78c7362a-e085-4b8e-9a7b-27e1930fb94b"))
-  (def ws (ws-listen! "78c7362a-e085-4b8e-9a7b-27e1930fb94b"))
-  )
-
+(defn- ws-stop! [*ws]
+  (swap! *ws (fn [o] (assoc o :stop true)))
+  (.close (:ws @*ws)))
+
+(defn ws-listen! [graph-uuid *ws]
+  (reset! *ws {:ws (js/WebSocket. (util/format ws-addr graph-uuid)) :stop false})
+  (set! (.-onopen (:ws @*ws)) #(println (util/format "ws opened: graph '%s'" graph-uuid %)))
+  (set! (.-onclose (:ws @*ws)) (fn [e]
+                                 (println (util/format "ws close: graph '%s'" graph-uuid e))
+                                 (when-not (true? (:stop @*ws))
+                                   (go
+                                     (timeout 1000)
+                                     (println "re-connecting graph" graph-uuid)
+                                     (ws-listen! graph-uuid *ws)))))
+  (set! (.-onmessage (:ws @*ws)) (fn [e]
+                                   (let [data (js->clj (js/JSON.parse (.-data e)) :keywordize-keys true)]
+                                     (if-let [v (poll! remote-changes-chan)]
+                                       (let [last-txid (:txid v)
+                                             current-txid (:txid data)]
+                                         (if (> last-txid current-txid)
+                                           (offer! remote-changes-chan v)
+                                           (offer! remote-changes-chan data)))
+                                       (offer! remote-changes-chan data))))))
 
 (defn- get-json-body [body]
   (or (and (map? body) body)
@@ -70,11 +65,8 @@
             (= 401 (get-in resp [:resp :status]))
             (= "Unauthorized" (:message (get-json-body (get-in resp [:resp :body])))))
          (do
-           (if (> retry-count 5)
-             (throw (js/Error. (str "retry count > 5, api-name:" api-name)))
-             (do
-               (println "will retry after" (* 1000 retry-count) "ms")
-               (<! (timeout (* 1000 retry-count)))))
+           (println "will retry after" (min 60000 (* 1000 retry-count)) "ms")
+           (<! (timeout (min 60000 (* 1000 retry-count))))
            (let [token (refresh-token-fn)]
              (<! (request api-name body token refresh-token-fn (inc retry-count)))))
          (:resp resp))))))
@@ -164,12 +156,12 @@
 (set! (.-EMPTY FileTxnSet) (FileTxnSet. {} 0))
 
 (defprotocol IRSAPI
-  (get-local-files-meta [this filepaths] "get local files' metadata: file-size, md5")
-  (rename-local-file [this from to])
-  (update-local-file [this filepath] "remote -> local")
-  (delete-local-file [this filepath])
-  (update-remote-file [this filepath] "local -> remote")
-  (delete-remote-file [this filepath]))
+  (get-local-files-meta [this graph-uuid filepaths] "get local files' metadata: file-size, md5")
+  (rename-local-file [this graph-uuid from to])
+  (update-local-file [this graph-uuid filepath] "remote -> local")
+  (delete-local-file [this graph-uuid filepath])
+  (update-remote-file [this graph-uuid filepath] "local -> remote")
+  (delete-remote-file [this graph-uuid filepath]))
 
 (defprotocol IRemoteAPI
   (get-remote-all-files-meta [this graph-uuid] "get remote all files' metadata")
@@ -178,17 +170,17 @@
 
 (deftype MockRSAPI []
   IRSAPI
-  (get-local-files-meta [this filepaths]
+  (get-local-files-meta [this graph-uuid filepaths]
     (into {} (map (fn [p] [p {:size 0 :md5 0}])) filepaths))
-  (rename-local-file [_ from to]
+  (rename-local-file [_ graph-uuid from to]
     (println "rename local file:" from "->" to))
-  (update-local-file [_ filepath]
+  (update-local-file [_ graph-uuid filepath]
     (println "update local file:" filepath))
-  (delete-local-file [_ filepath]
+  (delete-local-file [_ graph-uuid filepath]
     (println "delete local file:" filepath))
-  (update-remote-file [_ filepath]
+  (update-remote-file [_ graph-uuid filepath]
     (println "update remote file:" filepath))
-  (delete-remote-file [_ filepath]
+  (delete-remote-file [_ graph-uuid filepath]
     (println "delete remote file:" filepath)))
 
 (def rsapi (->MockRSAPI))
@@ -214,6 +206,7 @@
 
 (def remoteapi (->RemoteAPI nil))
 
+
 (defn- update-txn [^FileTxnSet filetxnset txn]
   (let [{:keys [TXType TXContent]} txn]
     (case TXType
@@ -232,37 +225,38 @@
 (defn update-txns [filetxnset txns]
   (reduce update-txn filetxnset txns))
 
-(defn- apply-filetxn [^FileTxn filetxn]
+(defn- apply-filetxn [graph-uuid ^FileTxn filetxn]
   (when (.renamed? filetxn)
-    (rename-local-file rsapi (.-from-path filetxn) (.-to-path filetxn)))
+    (rename-local-file rsapi graph-uuid (.-from-path filetxn) (.-to-path filetxn)))
   (when (.updated? filetxn)
-    (update-local-file rsapi (.-to-path filetxn)))
+    (update-local-file rsapi graph-uuid (.-to-path filetxn)))
   (when (.deleted? filetxn)
-    (delete-local-file rsapi (.-to-path filetxn))))
+    (delete-local-file rsapi graph-uuid (.-to-path filetxn))))
 
-(defn- apply-filetxns [filetxns]
+(defn- apply-filetxns [graph-uuid filetxns]
   (doseq [filetxn filetxns]
-    (apply-filetxn filetxn)))
+    (apply-filetxn graph-uuid filetxn)))
 
 (defn sync-remote-all-files! [graph-uuid]
   "pull all files' metadata and sync."
-  (println "sync remote all files"))
+  (println "sync remote all files" graph-uuid))
 
 (defn current-graph-uuid-and-txid []
   @graphs-txid)
 
-(defn sync-remote! []
+(defn sync-remote! [graph-uuid-expect]
   (go
     (let [[graph-uuid txid] (current-graph-uuid-and-txid)]
-      (if (some? txid)
-        (try
-          (let [[diff-txns latest-txid] (<! (get-diff remoteapi graph-uuid txid))
-                filetxnset (update-txns (.-EMPTY FileTxnSet) diff-txns)
-                repo (state/get-current-repo)]
-            (apply-filetxns filetxnset)
-            (.reset_value! graphs-txid [graph-uuid latest-txid] repo)
-            (persist-var/persist-save graphs-txid)))
-        (sync-remote-all-files! graph-uuid)))))
+      (when (or (nil? graph-uuid) (= graph-uuid graph-uuid-expect))
+        (if (some? txid)
+          (try
+            (let [[diff-txns latest-txid] (<! (get-diff remoteapi graph-uuid txid))
+                  filetxnset (update-txns (.-EMPTY FileTxnSet) diff-txns)
+                  repo (state/get-current-repo)]
+              (apply-filetxns graph-uuid filetxnset)
+              (.reset_value! graphs-txid [graph-uuid latest-txid] repo)
+              (persist-var/persist-save graphs-txid)))
+          (sync-remote-all-files! graph-uuid-expect))))))
 
 (comment
   (reset! graphs-txid ["78c7362a-e085-4b8e-9a7b-27e1930fb94b" 0])
@@ -278,35 +272,60 @@
         (> remote-txid local-txid))))
 
 
+(defn- remove-dir-prefix [dir path]
+  (string/replace path (js/RegExp. (str "^" dir)) ""))
+
+(defn file-watch-handler
+  [type {:keys [dir path content stat] :as payload}]
+  (prn "file-watch-handler" type (:path payload) (get-in payload [:stat :mtime]))
+  (go (>! local-changes-chan {:type type :payload payload})))
+
+(defn sync-local! [graph-uuid type {:keys [dir path content stat] :as payload}]
+  (prn "sync local:" type path)
+  (when (and (some? path)
+             (not (string/ends-with? path "logseq/graphs-txid.edn")))
+    (cond
+      (or (= "add" type) (= "change" type))
+      (update-remote-file rsapi graph-uuid path)
+
+      (= "unlink" type)
+      (delete-remote-file rsapi graph-uuid path)
+
+      ;; (= "rename" type)
+      ;; (rename-local-file)
+      )))
+
+
 (def stop-sync-loop (chan 1))
 (defn sync-loop!
-  []
-  (go-loop []
-    (let [{:keys [stop remote local]}
-          (async/alt!
-            stop-sync-loop {:stop true}
-            remote-changes-chan ([v] (println "remote changes:" v) {:remote v})
-            local-changes-chan ([v] (println "local changes:" v) {:local v})
-            :priority true)]
-      (cond
-        remote
-        (if (need-sync-remote? remote)
-          (sync-remote!))
-
-        local                           ;TODO sync local
-        nil)
-      (when-not stop
-        (println "recur sync loop")
-        (recur))
-      (println "sync loop stop")
-      )))
+  [graph-uuid]
+  (let [*ws (atom nil)]; reset *ws to false to stop re-connect websocket
+    (ws-listen! graph-uuid *ws)
+    (go-loop []
+      (let [{:keys [stop remote local]}
+            (async/alt!
+              stop-sync-loop {:stop true}
+              remote-changes-chan ([v] (println "remote changes:" v) {:remote v})
+              local-changes-chan ([v] {:local v})
+              :priority true)]
+        (cond
+          remote
+          (if (need-sync-remote? remote)
+            (sync-remote! graph-uuid))
+
+          local
+          (sync-local! graph-uuid (:type local) (:payload local)))
+        (when-not stop
+          (println "recur sync loop")
+          (recur))
+        (ws-stop! *ws)
+        (println "sync loop stop")))))
 
 (comment
   (reset! graphs-txid ["78c7362a-e085-4b8e-9a7b-27e1930fb94b" 0])
-  (reset! ws-listen-graphs (conj @ws-listen-graphs "78c7362a-e085-4b8e-9a7b-27e1930fb94b"))
-  (def ws (ws-listen! "78c7362a-e085-4b8e-9a7b-27e1930fb94b"))
-  (sync-loop!)
+  (sync-loop! "78c7362a-e085-4b8e-9a7b-27e1930fb94b")
 
   ;; stop
   (offer! stop-sync-loop 1)
+  (poll! stop-sync-loop)
   )