Browse Source

feat(sync): add Local->RemoteSyncer

rcmerci 3 years ago
parent
commit
0bb99e4269
1 changed files with 59 additions and 22 deletions
  1. 59 22
      src/main/frontend/fs/sync.cljs

+ 59 - 22
src/main/frontend/fs/sync.cljs

@@ -11,7 +11,6 @@
 
 (def ws-addr "wss://og96xf1si7.execute-api.us-east-2.amazonaws.com/production?graphuuid=%s")
 
-(def local-changes-chan (chan 100))
 
 (def remote-changes-chan (chan 1))
 
@@ -197,7 +196,7 @@
     (or token (.refresh-token this)))
   (refresh-token [_]
     ;; TODO
-    (set! token "<access-token>")
+    (set! token "<id-token>")
     token)
   (request [this api-name body]
     (let [c (chan)]
@@ -321,42 +320,78 @@ else: return err resp"
     (or (nil? local-txid)
         (> remote-txid local-txid))))
 
-
 (defn- remove-dir-prefix [dir path]
   (string/replace path (js/RegExp. (str "^" dir)) ""))
 
+(def local-changes-chan (chan 100))
+
+
+(deftype FileChangeEvent [type dir path stat])
+
 (defn file-watch-handler
   [type {:keys [dir path content stat] :as payload}]
   (prn "file-watch-handler" type (:path payload) (get-in payload [:stat :mtime]))
-  (go (>! local-changes-chan {:type type :payload payload})))
+  (go (>! local-changes-chan (->FileChangeEvent type dir path stat))))
 
-(defn sync-local! [graph-uuid type {:keys [dir path content stat] :as payload}]
-  (prn "sync local:" type path)
-  (when (and (some? path)
-             (not (string/ends-with? path "logseq/graphs-txid.edn")))
-    (cond
-      (or (= "add" type) (= "change" type))
-      (update-remote-file rsapi graph-uuid path)
 
-      (= "unlink" type)
-      (delete-remote-file rsapi graph-uuid path)
+(defprotocol ILocal->RemoteSync
+  (ratelimit [this from-chan] "get watched local file-change events from FROM-CHAN,
+  return chan returning events with rate limited")
+  (sync-local->remote! [this ^FileChangeEvent e]))
 
-      ;; (= "rename" type)
-      ;; (rename-local-file)
-      )))
+(deftype Local->RemoteSyncer [graph-uuid dir-prefix ^:mutable rate]
+  Object
+  (filter-dir-prefix-chan [_ n]
+    (chan n (filter (fn [^FileChangeEvent e]
+                      (string/starts-with? dir-prefix (.-dir e))))))
+
+  ILocal->RemoteSync
+  (ratelimit [this from-chan]
+    (let [c (.filter-dir-prefix-chan this 10000)]
+      (go-loop [timeout-c (timeout rate)
+                tcoll (transient [])]
+        (->
+         (let [{:keys [timeout e]}
+               (async/alt! timeout-c {:timeout true}
+                           from-chan ([e] {:e e}))]
+           (if timeout
+             (do
+               (<! (async/onto-chan! c (persistent! tcoll)))
+               (recur (timeout rate) (transient [])))
+             (do
+               (conj! tcoll e)
+               (recur timeout-c tcoll))))))
+      c))
+
+  (sync-local->remote! [_ ^FileChangeEvent e]
+    (let [type (.-type e)
+          path (.-path e)]
+      (when (and (some? path)
+                 (not (string/ends-with? path "logseq/graphs-txid.edn")))
+        (cond
+          (or (= "add" type) (= "change" type))
+          (update-remote-file rsapi graph-uuid path)
+
+          (= "unlink" type)
+          (delete-remote-file rsapi graph-uuid path)
+
+          ;; (= "rename" type)
+          ;; (rename-local-file)
+          )))))
 
 
 (def stop-sync-loop (chan 1))
 (defn sync-loop!
-  [graph-uuid]
-  (let [*ws (atom nil)] ; reset *ws to false to stop re-connect websocket
+  [graph-uuid local->remote-syncer]
+  (let [*ws (atom nil)  ; reset *ws to false to stop re-connect websocket
+        local-chan (ratelimit local->remote-syncer local-changes-chan)]
     (ws-listen! graph-uuid *ws)
     (go-loop []
       (let [{:keys [stop remote local]}
             (async/alt!
               stop-sync-loop {:stop true}
               remote-changes-chan ([v] (println "remote changes:" v) {:remote v})
-              local-changes-chan ([v] {:local v})
+              local-chan ([v] {:local v})
               :priority true)]
         (cond
           remote
@@ -366,7 +401,7 @@ else: return err resp"
                 (offer! stop-sync-loop 1))))
 
           local
-          (sync-local! graph-uuid (:type local) (:payload local)))
+          (sync-local->remote! local->remote-syncer local))
         (when-not stop
           (println "recur sync loop")
           (recur))
@@ -374,8 +409,10 @@ else: return err resp"
         (println "sync loop stop")))))
 
 (comment
-  (reset! graphs-txid ["78c7362a-e085-4b8e-9a7b-27e1930fb94b" 0])
-  (sync-loop! "78c7362a-e085-4b8e-9a7b-27e1930fb94b")
+  (def graph-uuid "78c7362a-e085-4b8e-9a7b-27e1930fb94b")
+  (reset! graphs-txid [graph-uuid 0])
+  (def local->remote-syncer (->Local->RemoteSyncer graph-uuid (config/get-repo-dir (state/get-current-repo)) 10000))
+  (sync-loop! graph-uuid local->remote-syncer)
 
   ;; stop
   (offer! stop-sync-loop 1)