Просмотр исходного кода

enhance(sync): use util/ratelimit

rcmerci 3 лет назад
Родитель
Сommit
028860dcc0
2 измененных файлов с 64 добавлено и 93 удалено
  1. 21 44
      src/main/frontend/fs/sync.cljs
  2. 43 49
      src/main/frontend/util.cljc

+ 21 - 44
src/main/frontend/fs/sync.cljs

@@ -1355,39 +1355,22 @@
       (vreset! *stopped true))
 
     (ratelimit [this from-chan]
-      (let [c (.filtered-chan this 10000)
-            filter-e-fn (.filter-file-change-events-fn this)]
-        (go-loop [timeout-c (timeout rate)
-                  coll []]
-          (let [{:keys [timeout ^FileChangeEvent e stop]}
-                (async/alt! timeout-c {:timeout true}
-                            from-chan ([e] {:e e})
-                            stop-chan {:stop true})]
-            (cond
-              stop
-              (async/close! c)
-
-              timeout
-              (do (async/onto-chan! c coll false)
-                  (swap! *sync-state sync-state-reset-queued-local->remote-files)
-                  (recur (async/timeout rate) []))
-
-              (some? e)
-              (if (filter-e-fn e)
+      (let [filter-e-fn (.filter-file-change-events-fn this)]
+        (util/ratelimit
+         from-chan rate
+         :filter-fn
+         (fn [e]
+           (and (filter-e-fn e)
                 (let [e-path [(relative-path e)]]
                   (swap! *sync-state sync-state--add-queued-local->remote-files e-path)
-                  (if (<! (filter-local-changes-pred e base-path graph-uuid))
-                    (let [coll* (distinct (conj coll e))]
-                      (recur timeout-c coll*))
-                    (do (swap! *sync-state sync-state--remove-queued-local->remote-files e-path)
-                        (recur timeout-c coll))))
-                (recur timeout-c coll))
-
-              (nil? e)
-              (do (println "close ratelimit chan")
-                  (async/close! c)))))
-        c))
-
+                  (go
+                    (let [v (<! (filter-local-changes-pred e base-path graph-uuid))]
+                      (when-not v
+                        (swap! *sync-state sync-state--remove-queued-local->remote-files e-path))
+                      v)))))
+         :flush-fn #(swap! *sync-state sync-state-reset-queued-local->remote-files)
+         :stop-ch stop-chan
+         :distinct-coll? true)))
 
     (sync-local->remote! [this es]
       (if (empty? es)
@@ -1478,12 +1461,6 @@
 
 ;;; ### put all stuff together
 
-(defn- drain-chan
-  "drop all stuffs in CH"
-  [ch]
-  (->> (repeatedly #(poll! ch))
-       (take-while identity)))
-
 (defrecord ^:large-vars/cleanup-todo
     SyncManager [graph-uuid base-path *sync-state
                  ^Local->RemoteSyncer local->remote-syncer ^Remote->LocalSyncer remote->local-syncer remoteapi
@@ -1531,10 +1508,10 @@
                 :priority true)]
           (cond
             stop
-            (do (drain-chan ops-chan)
+            (do (util/drain-chan ops-chan)
                 (>! ops-chan {:stop true}))
             remote->local-full-sync
-            (do (drain-chan ops-chan)
+            (do (util/drain-chan ops-chan)
                 (>! ops-chan {:remote->local-full-sync true})
                 (recur))
             remote->local
@@ -1549,7 +1526,7 @@
             (do (>! ops-chan {:local->remote local->remote})
                 (recur))
             local->remote-full-sync
-            (do (drain-chan ops-chan)
+            (do (util/drain-chan ops-chan)
                 (>! ops-chan {:local->remote-full-sync true})
                 (recur)))))
       (.schedule this ::need-password nil))
@@ -1587,7 +1564,7 @@
             succ
             (.schedule this ::idle nil)
             need-sync-remote
-            (do (drain-chan ops-chan)
+            (do (util/drain-chan ops-chan)
                 (>! ops-chan {:remote->local true})
                 (>! ops-chan {:local->remote-full-sync true})
                 (.schedule this ::idle nil))
@@ -1621,7 +1598,7 @@
             (s/assert ::sync-remote->local!-result r)
             (cond
               need-remote->local-full-sync
-              (do (drain-chan ops-chan)
+              (do (util/drain-chan ops-chan)
                   (>! ops-chan {:remote->local-full-sync true})
                   (>! ops-chan {:local->remote-full-sync true})
                   (.schedule this ::idle nil))
@@ -1644,7 +1621,7 @@
             (.schedule this ::idle nil)
 
             need-sync-remote
-            (do (drain-chan ops-chan)
+            (do (util/drain-chan ops-chan)
                 (>! ops-chan {:remote->local true})
                 (>! ops-chan {:local->remote local-change})
                 (.schedule this ::idle nil))
@@ -1744,7 +1721,7 @@
                ;;      actually, each file corresponds to a file-change-event,
                ;;      we need to ignore all of them.
                (<! (timeout 5000))
-               (drain-chan local-changes-chan)
+               (util/drain-chan local-changes-chan)
                (poll! stop-sync-chan)
                (poll! remote->local-sync-chan)
                (poll! remote->local-full-sync-chan)

+ 43 - 49
src/main/frontend/util.cljc

@@ -18,9 +18,10 @@
             [goog.string :as gstring]
             [goog.userAgent]
             [promesa.core :as p]
-            [rum.core :as rum]))
+            [rum.core :as rum]
+            [clojure.core.async :as async]
+            [cljs.core.async.impl.channels :refer [ManyToManyChannel]]))
   (:require
-   [clojure.core.async :as async]
    [clojure.pprint]
    [clojure.string :as string]
    [clojure.walk :as walk]))
@@ -1072,60 +1073,53 @@
 
 (defn keyname [key] (str (namespace key) "/" (name key)))
 
-(defn batch [in max-time handler buf-atom]
-  (async/go-loop [buf buf-atom t (async/timeout max-time)]
-    (let [[v p] (async/alts! [in t])]
-      (cond
-        (or (= p t) (nil? v))
-        (let [timeout (async/timeout max-time)]
-          (handler @buf)
-          (reset! buf [])
-          (recur buf timeout))
-
-        :else
-        (do (swap! buf conj v)
-            (recur buf t))))))
-
-(defn ratelimit
-  "return a channel CH,
+#?(:cljs
+   (defn ratelimit
+     "return a channel CH,
   ratelimit flush items in in-ch every max-duration(ms),
   opts:
   - :filter-fn filter item before putting items into returned CH, (filter-fn item)
+               will poll it when its return value is channel,
   - :flush-fn exec flush-fn when time to flush, (flush-fn item-coll)
   - :stop-ch stop go-loop when stop-ch closed
   - :distinct-coll? distinct coll when put into CH
   - :chan-buffer buffer of return CH, default use (async/chan 1000)"
-  [in-ch max-duration & {:keys [filter-fn flush-fn stop-ch distinct-coll? chan-buffer]}]
-  (let [ch (if chan-buffer (async/chan chan-buffer) (async/chan 1000))
-        stop-ch* (or stop-ch (async/chan))]
-    (async/go-loop [timeout-ch (async/timeout max-duration) coll []]
-      (let [{:keys [timeout e stop]}
-            (async/alt! timeout-ch {:timeout true}
-                        in-ch ([e] {:e e})
-                        stop-ch* {:stop true})]
-        (cond
-          timeout
-          (do (async/onto-chan! ch coll false)
-              (flush-fn coll)
-              (recur (async/timeout max-duration) []))
-
-          (some? e)
-          (if (filter-fn e)
-            (recur timeout-ch (cond-> (conj coll e) distinct-coll? distinct))
-            (recur timeout-ch coll))
-
-          (or stop
-              ;; got nil from in-ch, means in-ch is closed
-              ;; so we stop the whole go-loop
-              (nil? e))
-          (async/close! ch))))
-    ch))
-
-(defn drain-chan
-  "drop all stuffs in CH, and return all of them"
-  [ch]
-  (->> (repeatedly #(async/poll! ch))
-       (take-while identity)))
+     [in-ch max-duration & {:keys [filter-fn flush-fn stop-ch distinct-coll? chan-buffer]}]
+     (let [ch (if chan-buffer (async/chan chan-buffer) (async/chan 1000))
+           stop-ch* (or stop-ch (async/chan))]
+       (async/go-loop [timeout-ch (async/timeout max-duration) coll []]
+         (let [{:keys [timeout e stop]}
+               (async/alt! timeout-ch {:timeout true}
+                           in-ch ([e] {:e e})
+                           stop-ch* {:stop true})]
+           (cond
+             timeout
+             (do (async/onto-chan! ch coll false)
+                 (flush-fn coll)
+                 (recur (async/timeout max-duration) []))
+
+             (some? e)
+             (let [filter-v (filter-fn e)
+                   filter-v* (if (instance? ManyToManyChannel filter-v)
+                               (async/<! filter-v)
+                               filter-v)]
+               (if filter-v*
+                 (recur timeout-ch (cond-> (conj coll e) distinct-coll? distinct))
+                 (recur timeout-ch coll)))
+
+             (or stop
+                 ;; got nil from in-ch, means in-ch is closed
+                 ;; so we stop the whole go-loop
+                 (nil? e))
+             (async/close! ch))))
+       ch)))
+
+#?(:cljs
+   (defn drain-chan
+     "drop all stuffs in CH, and return all of them"
+     [ch]
+     (->> (repeatedly #(async/poll! ch))
+          (take-while identity))))
 
 #?(:cljs
    (defn trace!