Browse Source

enhance(rtc): split ws messages, size limit=32kb

rcmerci 2 years ago
parent
commit
73a26f36d8

+ 35 - 5
src/main/frontend/db/rtc/core.cljs

@@ -529,17 +529,47 @@
     [update-page-ops remove-ops move-ops update-ops remove-page-ops]))
 
 
+(defn- <get-N-ops
+  [repo n]
+  (go
+    (let [{:keys [ops local-tx]} (<! (p->c (op/<get-ops&local-tx repo)))
+          ops (take n ops)
+          op-keys (map first ops)
+          ops (map second ops)
+          max-op-key (apply max op-keys)]
+      {:ops ops :op-keys op-keys :max-op-key max-op-key :local-tx local-tx})))
+
+(def ^:private size-30kb (* 30 1024))
+
+(defn- <gen-remote-ops-<30kb
+  [repo & n]
+  (go
+    (let [n (or n 100)
+          {:keys [ops local-tx op-keys max-op-key]} (<! (<get-N-ops repo n))
+          ops-for-remote (apply concat (local-ops->remote-ops repo ops nil))
+          ops-for-remote-str (-> ops-for-remote
+                                 rtc-const/data-to-ws-decoder
+                                 rtc-const/data-to-ws-encoder
+                                 clj->js
+                                 js/JSON.stringify)
+          size (.-size (js/Blob. [ops-for-remote-str]))]
+      (if (<= size size-30kb)
+        {:ops-for-remote ops-for-remote
+         :local-tx local-tx
+         :op-keys op-keys
+         :max-op-key max-op-key}
+        (let [n* (int (/ n (/ size size-30kb)))]
+          (assert (pos? n*) {:n* n :n n :size size})
+          (<! (<gen-remote-ops-<30kb repo n*)))))))
+
+
 (defn- <client-op-update-handler
   [state]
   {:pre [(some? @(:*graph-uuid state))
          (some? @(:*repo state))]}
   (go
     (let [repo @(:*repo state)
-          {:keys [ops local-tx]} (<! (p->c (op/<get-ops&local-tx repo)))
-          ops* (mapv second ops)
-          op-keys (mapv first ops)
-          max-op-key (apply max op-keys)
-          ops-for-remote (apply concat (local-ops->remote-ops repo ops* nil))
+          {:keys [ops-for-remote local-tx op-keys max-op-key]} (<! (<gen-remote-ops-<30kb repo))
           r (with-sub-data-from-ws state
               (<! (ws/<send! state {:req-id (get-req-id)
                                     :action "apply-ops" :graph-uuid @(:*graph-uuid state)

+ 21 - 2
src/main/frontend/db/rtc/db_listener.cljs

@@ -4,7 +4,8 @@
             [frontend.db :as db]
             [frontend.db.rtc.op :as op]
             [clojure.set :as set]
-            [clojure.data :as data]))
+            [clojure.data :as data]
+            [clojure.core.async :as async :refer [go <!]]))
 
 
 (defn- entity-datoms=>attr->datom
@@ -138,6 +139,22 @@
                        ops)]
         ops*))))
 
+(def ^:private *ops-pending-to-store (atom []))
+
+(remove-watch *ops-pending-to-store :add-ops)
+(add-watch *ops-pending-to-store :add-ops
+           (fn [_k r _o n]
+             (when (seq n)
+               ;; the following reset! will trigger another call of this fn
+               ;; the above `when` to avoid going forward
+               (go
+                 ;; another check on the value of this atom to ensure no 2-go-threads running for same value
+                 (when (seq @r)
+                   (reset! r [])
+                   (doseq [{:keys [ops repo]} n]
+                     (prn ::add-ops ops)
+                     (<! (op/<add-ops! repo ops))))))))
+
 (defn- generate-rtc-ops
   [repo db-before db-after datoms]
   (let [same-entity-datoms-coll (->> datoms
@@ -145,7 +162,9 @@
                                      (group-by first)
                                      vals)
         ops (mapcat (partial entity-datoms=>ops repo db-before db-after) same-entity-datoms-coll)]
-    (op/<add-ops! repo ops)))
+    (when (seq ops)
+      (swap! *ops-pending-to-store conj {:ops ops :repo repo})
+      (prn :*ops-pending-to-store  @*ops-pending-to-store))))
 
 
 (defn listen-db-to-generate-ops

+ 8 - 7
src/main/frontend/db/rtc/ops_idb_store.cljs

@@ -38,13 +38,14 @@
               (do (idb-keyval/set key* (clj->js op) store)
                   (p/recur (inc key*) other-ops)))))))))
 
-(def ^:private add-ops-ch (async/chan 100))
-(async/go-loop []
-  (if-let [[repo ops] (async/<! add-ops-ch)]
-    (do (prn :add-ops ops)
-        (async/<! (p->c (<add-ops*! repo ops)))
-        (recur))
-    (recur)))
+(defonce ^:private add-ops-ch (async/chan 100))
+(defonce #_:clj-kondo/ignore _add-ops-loop
+  (async/go-loop []
+    (if-let [[repo ops] (async/<! add-ops-ch)]
+      (do (prn :add-ops ops)
+          (async/<! (p->c (<add-ops*! repo ops)))
+          (recur))
+      (recur))))
 
 (defn <add-ops!
   [repo ops]