Ver Fonte

perf: throttle rtc-state-flow for postmessage

rcmerci há 10 meses atrás
pai
commit
e81ca25ad7

+ 5 - 0
src/main/frontend/common/missionary_util.cljs

@@ -68,6 +68,11 @@
              (catch Cancelled _
              (catch Cancelled _
                (m/amb)))))))
                (m/amb)))))))
 
 
+(defn throttle [dur-ms >in]
+  (m/ap
+    (let [x (m/?> (m/relieve {} >in))]
+      (m/amb x (do (m/? (m/sleep dur-ms)) (m/amb))))))
+
 (defn run-task
 (defn run-task
   "Return the canceler"
   "Return the canceler"
   [task key & {:keys [succ fail]}]
   [task key & {:keys [succ fail]}]

+ 2 - 1
src/main/frontend/worker/db_listener.cljs

@@ -98,7 +98,8 @@ generate asset-change events.")
     (d/unlisten! conn ::listen-db-changes!)
     (d/unlisten! conn ::listen-db-changes!)
     (prn :listen-db-changes! (keys handlers) :repo repo)
     (prn :listen-db-changes! (keys handlers) :repo repo)
     (d/listen! conn ::listen-db-changes!
     (d/listen! conn ::listen-db-changes!
-               (fn [{:keys [tx-data _db-before _db-after tx-meta] :as tx-report}]
+               (fn listen-db-changes!-inner
+                 [{:keys [tx-data _db-before _db-after tx-meta] :as tx-report}]
                  (let [tx-meta (merge (batch-tx/get-batch-opts) tx-meta)
                  (let [tx-meta (merge (batch-tx/get-batch-opts) tx-meta)
                        pipeline-replace? (:pipeline-replace? tx-meta)
                        pipeline-replace? (:pipeline-replace? tx-meta)
                        in-batch-tx-mode? (:batch-tx/batch-tx-mode? tx-meta)]
                        in-batch-tx-mode? (:batch-tx/batch-tx-mode? tx-meta)]

+ 18 - 4
src/main/frontend/worker/rtc/core.cljs

@@ -1,6 +1,7 @@
 (ns frontend.worker.rtc.core
 (ns frontend.worker.rtc.core
   "Main(use missionary) ns for rtc related fns"
   "Main(use missionary) ns for rtc related fns"
   (:require [frontend.common.missionary-util :as c.m]
   (:require [frontend.common.missionary-util :as c.m]
+            [frontend.worker.device :as worker-device]
             [frontend.worker.rtc.asset :as r.asset]
             [frontend.worker.rtc.asset :as r.asset]
             [frontend.worker.rtc.client :as r.client]
             [frontend.worker.rtc.client :as r.client]
             [frontend.worker.rtc.client-op :as client-op]
             [frontend.worker.rtc.client-op :as client-op]
@@ -16,8 +17,7 @@
             [logseq.common.config :as common-config]
             [logseq.common.config :as common-config]
             [logseq.db :as ldb]
             [logseq.db :as ldb]
             [malli.core :as ma]
             [malli.core :as ma]
-            [missionary.core :as m]
-            [frontend.worker.device :as worker-device])
+            [missionary.core :as m])
   (:import [missionary Cancelled]))
   (:import [missionary Cancelled]))
 
 
 (def ^:private rtc-state-schema
 (def ^:private rtc-state-schema
@@ -306,7 +306,7 @@
                                           :block-uuids [block-uuid]
                                           :block-uuids [block-uuid]
                                           :graph-uuid graph-uuid}))))
                                           :graph-uuid graph-uuid}))))
 
 
-(def ^:private create-get-state-flow
+(def ^:private create-get-state-flow*
   (let [rtc-loop-metadata-flow (m/watch *rtc-loop-metadata)]
   (let [rtc-loop-metadata-flow (m/watch *rtc-loop-metadata)]
     (m/ap
     (m/ap
       (let [{rtc-lock :*rtc-lock :keys [repo graph-uuid user-uuid rtc-state-flow *rtc-auto-push? *online-users]}
       (let [{rtc-lock :*rtc-lock :keys [repo graph-uuid user-uuid rtc-state-flow *rtc-auto-push? *online-users]}
@@ -331,6 +331,8 @@
               (rtc-log-and-state/create-remote-t-flow graph-uuid))))
               (rtc-log-and-state/create-remote-t-flow graph-uuid))))
           (catch Cancelled _))))))
           (catch Cancelled _))))))
 
 
+(def ^:private create-get-state-flow (c.m/throttle 300 create-get-state-flow*))
+
 (defn new-task--get-debug-state
 (defn new-task--get-debug-state
   []
   []
   (m/reduce {} nil (m/eduction (take 1) create-get-state-flow)))
   (m/reduce {} nil (m/eduction (take 1) create-get-state-flow)))
@@ -415,4 +417,16 @@
     (def a (atom 1))
     (def a (atom 1))
     (def f1 (m/watch a))
     (def f1 (m/watch a))
     (def f2 (create-pull-remote-updates-flow 5000 f1))
     (def f2 (create-pull-remote-updates-flow 5000 f1))
-    (def cancel (c.m/run-task (m/reduce (fn [_ v] (prn :v v)) f2) :xxx))))
+    (def cancel (c.m/run-task (m/reduce (fn [_ v] (prn :v v)) f2) :xxx)))
+
+  (defn sleep-emit [delays]
+    (m/ap (let [n (m/?> (m/seed delays))
+                r (m/? (m/sleep n n))]
+            (prn :xxx r (t/now))
+            r)))
+
+  (def cancel
+    ((->> (m/sample vector
+                    (m/latest identity (m/reductions {} 0  (sleep-emit [1000 1 2])))
+                    (sleep-emit [2000 3000 1000]))
+          (m/reduce (fn [_ v] (prn :v v)))) prn prn)))