Browse Source

enhance(rtc): support incrementally update tx-range from remote

When there is a significant tx gap between the local-graph and the
remote-graph, updating all tx-diffs at once causes excessive server
load.
We now use an incremental update strategy,
limiting the tx-range to a maximum of 2,000 per batch
rcmerci 1 week ago
parent
commit
f17d562ca2

+ 29 - 19
src/main/frontend/worker/rtc/client.cljs

@@ -34,7 +34,7 @@
             (throw (ex-info (:ex-message apply-ops-resp) (:ex-data apply-ops-resp)))
             ;;else
             (throw (ex-info "Unavailable3" {:remote-ex remote-ex}))))
-      (do (assert (pos? (:t apply-ops-resp)) apply-ops-resp)
+      (do (assert (and (pos? (:t apply-ops-resp)) (pos? (:t-query-end apply-ops-resp))) apply-ops-resp)
           (m/?
            (r.remote-update/task--apply-remote-update
             graph-uuid repo conn date-formatter {:type :remote-update :value apply-ops-resp} aes-key add-log-fn))))))
@@ -46,15 +46,19 @@
           get-graph-skeleton? (or (nil? @*last-calibrate-t)
                                   (< 500 (- t-before @*last-calibrate-t)))]
       (try
-        (let [{remote-t :t
+        (let [{_remote-t :t
+               remote-t-query-end :t-query-end
                server-schema-version :server-schema-version
                server-builtin-db-idents :server-builtin-db-idents
                :as resp}
-              (m/? (ws-util/send&recv get-ws-create-task {:action "init-request"
-                                                          :graph-uuid graph-uuid
-                                                          :schema-version (str major-schema-version)
-                                                          :t-before t-before
-                                                          :get-graph-skeleton get-graph-skeleton?}))]
+              (m/? (ws-util/send&recv get-ws-create-task
+                                      {:action "init-request"
+                                       :graph-uuid graph-uuid
+                                       :schema-version (str major-schema-version)
+                                       :api-version "20251124"
+                                       :t-before t-before
+                                       :get-graph-skeleton get-graph-skeleton?}
+                                      :timeout-ms 30000))]
           (if-let [remote-ex (:ex-data resp)]
             (do
               (add-log-fn :rtc.log/init-request remote-ex)
@@ -66,11 +70,11 @@
             (do
               (when server-schema-version
                 (reset! *server-schema-version server-schema-version)
-                (reset! *last-calibrate-t remote-t))
-              (when remote-t
-                (rtc-log-and-state/update-remote-t graph-uuid remote-t)
+                (reset! *last-calibrate-t remote-t-query-end))
+              (when remote-t-query-end
+                (rtc-log-and-state/update-remote-t graph-uuid remote-t-query-end)
                 (when (not t-before)
-                  (client-op/update-local-tx repo remote-t)))
+                  (client-op/update-local-tx repo remote-t-query-end)))
               (when (and server-schema-version server-builtin-db-idents)
                 (r.skeleton/calibrate-graph-skeleton server-schema-version server-builtin-db-idents @conn))
               resp)))
@@ -488,10 +492,13 @@
                                 ops-for-remote)
               r (try
                   (let [message (cond-> {:action "apply-ops"
-                                         :graph-uuid graph-uuid :schema-version (str major-schema-version)
-                                         :ops ops-for-remote* :t-before local-tx}
+                                         :graph-uuid graph-uuid
+                                         :schema-version (str major-schema-version)
+                                         :api-version "20251124"
+                                         :ops ops-for-remote*
+                                         :t-before local-tx}
                                   (true? @*remote-profile?) (assoc :profile true))
-                        r (m/? (ws-util/send&recv get-ws-create-task message))]
+                        r (m/? (ws-util/send&recv get-ws-create-task message :timeout-ms 30000))]
                     (r.throttle/add-rtc-api-call-record! message)
                     r)
                   (catch :default e
@@ -518,19 +525,22 @@
                   (do (rollback repo block-ops-map-coll update-kv-value-ops-map-coll rename-db-ident-ops-map-coll)
                       (throw (ex-info "Unavailable1" {:remote-ex remote-ex})))))
 
-            (do (assert (pos? (:t r)) r)
+            (do (assert (and (pos? (:t r)) (pos? (:t-query-end r))) r)
                 (m/?
                  (r.remote-update/task--apply-remote-update
                   graph-uuid repo conn date-formatter {:type :remote-update :value r} aes-key add-log-fn))
-                (add-log-fn :rtc.log/push-local-update {:remote-t (:t r)}))))))))
+                (add-log-fn :rtc.log/push-local-update {:remote-t (:t r) :remote-t-query-end (:t-query-end r)}))))))))
 
 (defn new-task--pull-remote-data
   [repo conn graph-uuid major-schema-version date-formatter get-ws-create-task aes-key add-log-fn]
   (m/sp
     (let [local-tx (client-op/get-local-tx repo)
           message {:action "apply-ops"
-                   :graph-uuid graph-uuid :schema-version (str major-schema-version)
-                   :ops [] :t-before (or local-tx 1)}
-          r (m/? (ws-util/send&recv get-ws-create-task message))]
+                   :graph-uuid graph-uuid
+                   :schema-version (str major-schema-version)
+                   :api-version "20251124"
+                   :ops []
+                   :t-before local-tx}
+          r (m/? (ws-util/send&recv get-ws-create-task message :timeout-ms 30000))]
       (r.throttle/add-rtc-api-call-record! message)
       (m/? (task--apply-remote-updates-from-apply-ops r graph-uuid repo conn date-formatter aes-key add-log-fn)))))

+ 12 - 6
src/main/frontend/worker/rtc/core.cljs

@@ -254,21 +254,27 @@
            (let [event (m/?> mixed-flow)]
              (case (:type event)
                (:remote-update :remote-asset-block-update)
-
                (try
                  (m/? (r.remote-update/task--apply-remote-update
                        graph-uuid repo conn date-formatter event @*aes-key add-log-fn))
                  (catch :default e
-                   (if (= ::r.remote-update/need-pull-remote-data (:type (ex-data e)))
+                   (if (= :rtc.exception/local-graph-too-old (:type (ex-data e)))
                      (m/? (r.client/new-task--pull-remote-data
                            repo conn graph-uuid major-schema-version date-formatter get-ws-create-task @*aes-key
                            add-log-fn))
-                     (throw (r.ex/e->ex-info e)))))
+                     (throw e))))
 
                :local-update-check
-               (m/? (r.client/new-task--push-local-ops
-                     repo conn graph-uuid major-schema-version date-formatter
-                     get-ws-create-task *remote-profile? @*aes-key add-log-fn))
+               (try
+                 (m/? (r.client/new-task--push-local-ops
+                       repo conn graph-uuid major-schema-version date-formatter
+                       get-ws-create-task *remote-profile? @*aes-key add-log-fn))
+                 (catch :default e
+                   (if (= :rtc.exception/local-graph-too-old (:type (ex-data e)))
+                     (m/? (r.client/new-task--pull-remote-data
+                           repo conn graph-uuid major-schema-version date-formatter get-ws-create-task @*aes-key
+                           add-log-fn))
+                     (throw e))))
 
                :online-users-updated
                (reset! *online-users (:online-users (:value event)))

+ 3 - 15
src/main/frontend/worker/rtc/exception.cljs

@@ -20,6 +20,9 @@ Trying to start rtc loop but there's already one running, need to cancel that on
 graph doesn't have :logseq.kv/remote-schema-version value"}
   :rtc.exception/major-schema-version-mismatched {:doc "Local exception.
 local-schema-version, remote-schema-version, app-schema-version are not equal, cannot start rtc"}
+  :rtc.exception/local-graph-too-old {:doc "Local exception.
+Local graph's tx is too old, need to pull earlier remote-data first"}
+
   :rtc.exception/get-s3-object-failed {:doc "Failed to fetch response from s3.
 When response from remote is too huge(> 32KB),
 the server will put it to s3 and return its presigned-url to clients."}
@@ -33,15 +36,6 @@ the server will put it to s3 and return its presigned-url to clients."}
   :rtc.exception/not-found-user-rsa-key-pair {:doc "user rsa-key-pair not found"}
   :rtc.exception/not-found-graph-aes-key {:doc "graph aes-key not found"})
 
-(def ex-ws-already-disconnected
-  (ex-info "websocket conn is already disconnected" {:type :rtc.exception/ws-already-disconnected}))
-
-(def ex-remote-graph-not-exist
-  (ex-info "remote graph not exist" {:type :rtc.exception/remote-graph-not-exist}))
-
-(def ex-remote-graph-not-ready
-  (ex-info "remote graph still creating" {:type :rtc.exception/remote-graph-not-ready}))
-
 (def ex-remote-graph-lock-missing
   (ex-info "remote graph lock missing(server internal error)"
            {:type :rtc.exception/remote-graph-lock-missing}))
@@ -49,12 +43,6 @@ the server will put it to s3 and return its presigned-url to clients."}
 (def ex-local-not-rtc-graph
   (ex-info "RTC is not supported for this local-graph" {:type :rtc.exception/not-rtc-graph}))
 
-(def ex-bad-request-body
-  (ex-info "bad request body" {:type :rtc.exception/bad-request-body}))
-
-(def ex-not-allowed
-  (ex-info "not allowed" {:type :rtc.exception/not-allowed}))
-
 (def ex-unknown-server-error
   (ex-info "Unknown server error" {:type :rtc.exception/unknown-server-error}))
 

+ 3 - 0
src/main/frontend/worker/rtc/malli_schema.cljs

@@ -132,6 +132,7 @@
   [:map
    [:t :int]
    [:t-before :int]
+   [:t-query-end :int]
    [:affected-blocks
     [:map-of :uuid
      [:multi {:dispatch :op :decode/string #(update % :op keyword)}
@@ -282,6 +283,7 @@
       ["init-request"
        [:map
         [:graph-uuid :uuid]
+        [:api-version :string]
         [:schema-version db-schema/major-schema-version-string-schema]
         [:t-before :int]
         [:get-graph-skeleton :boolean]]]
@@ -294,6 +296,7 @@
         [:map
          [:req-id :string]
          [:action :string]
+         [:api-version :string]
          [:profile {:optional true} :boolean]
          [:graph-uuid :uuid]
          [:schema-version db-schema/major-schema-version-string-schema]

+ 32 - 30
src/main/frontend/worker/rtc/remote_update.cljs

@@ -16,7 +16,6 @@
             [frontend.worker.state :as worker-state]
             [lambdaisland.glogi :as log]
             [logseq.clj-fractional-indexing :as index]
-            [logseq.common.defkeywords :refer [defkeywords]]
             [logseq.common.util :as common-util]
             [logseq.db :as ldb]
             [logseq.db.common.property-util :as db-property-util]
@@ -27,11 +26,6 @@
             [logseq.outliner.transaction :as outliner-tx]
             [missionary.core :as m]))
 
-(defkeywords
-  ::need-pull-remote-data {:doc "
-remote-update's :remote-t-before > :local-tx,
-so need to pull earlier remote-data from websocket."})
-
 (defmulti ^:private transact-db! (fn [action & _args] action))
 
 (defn- block-reuse-db-id
@@ -607,32 +601,33 @@ so need to pull earlier remote-data from websocket."})
   [aes-key encrypt-attr-set remote-update-data]
   (assert aes-key)
   (m/sp
-   (let [{affected-blocks-map :affected-blocks refed-blocks :refed-blocks} remote-update-data
-         affected-blocks-map'
-         (loop [[[block-uuid affected-block] & rest-affected-blocks] affected-blocks-map
-                affected-blocks-map-result {}]
-           (if-not block-uuid
-             affected-blocks-map-result
-             (let [affected-block' (c.m/<? (crypt/<decrypt-map aes-key encrypt-attr-set affected-block))]
-               (recur rest-affected-blocks (assoc affected-blocks-map-result block-uuid affected-block')))))
-         refed-blocks'
-         (loop [[refed-block & rest-refed-blocks] refed-blocks
-                refed-blocks-result []]
-           (if-not refed-block
-             refed-blocks-result
-             (let [refed-block' (c.m/<? (crypt/<decrypt-map aes-key encrypt-attr-set refed-block))]
-               (recur rest-refed-blocks (conj refed-blocks-result refed-block')))))]
-     (assoc remote-update-data
-            :affected-blocks affected-blocks-map'
-            :refed-blocks refed-blocks'))))
+    (let [{affected-blocks-map :affected-blocks refed-blocks :refed-blocks} remote-update-data
+          affected-blocks-map'
+          (loop [[[block-uuid affected-block] & rest-affected-blocks] affected-blocks-map
+                 affected-blocks-map-result {}]
+            (if-not block-uuid
+              affected-blocks-map-result
+              (let [affected-block' (c.m/<? (crypt/<decrypt-map aes-key encrypt-attr-set affected-block))]
+                (recur rest-affected-blocks (assoc affected-blocks-map-result block-uuid affected-block')))))
+          refed-blocks'
+          (loop [[refed-block & rest-refed-blocks] refed-blocks
+                 refed-blocks-result []]
+            (if-not refed-block
+              refed-blocks-result
+              (let [refed-block' (c.m/<? (crypt/<decrypt-map aes-key encrypt-attr-set refed-block))]
+                (recur rest-refed-blocks (conj refed-blocks-result refed-block')))))]
+      (assoc remote-update-data
+             :affected-blocks affected-blocks-map'
+             :refed-blocks refed-blocks'))))
 
 (defn apply-remote-update-check
   "If the check passes, return true"
   [repo remote-update-event add-log-fn]
   (let [remote-update-data (:value remote-update-event)]
     (assert (rtc-schema/data-from-ws-validator remote-update-data) remote-update-data)
-    (let [remote-t (:t remote-update-data)
-          remote-t-before (:t-before remote-update-data)
+    (let [{remote-latest-t :t
+           remote-t-before :t-before
+           remote-t :t-query-end} remote-update-data
           local-tx (client-op/get-local-tx repo)]
       (cond
         (not (and (pos? remote-t)
@@ -640,21 +635,28 @@ so need to pull earlier remote-data from websocket."})
         (throw (ex-info "invalid remote-data" {:data remote-update-data}))
 
         (<= remote-t local-tx)
-        (do (add-log-fn :rtc.log/apply-remote-update {:sub-type :skip :remote-t remote-t :local-t local-tx})
+        (do (add-log-fn :rtc.log/apply-remote-update
+                        {:sub-type :skip
+                         :remote-t remote-t
+                         :remote-latest-t remote-latest-t
+                         :local-t local-tx})
             false)
 
         (< local-tx remote-t-before)
         (do (add-log-fn :rtc.log/apply-remote-update {:sub-type :need-pull-remote-data
-                                                      :remote-t remote-t :local-t local-tx
+                                                      :remote-latest-t remote-latest-t
+                                                      :remote-t remote-t
+                                                      :local-t local-tx
                                                       :remote-t-before remote-t-before})
             (throw (ex-info "need pull earlier remote-data"
-                            {:type ::need-pull-remote-data
+                            {:type :rtc.exception/local-graph-too-old
                              :local-tx local-tx})))
 
         (<= remote-t-before local-tx remote-t) true
 
         :else (throw (ex-info "unreachable" {:remote-t remote-t
                                              :remote-t-before remote-t-before
+                                             :remote-latest-t remote-latest-t
                                              :local-t local-tx}))))))
 
 (defn task--apply-remote-update
@@ -668,7 +670,7 @@ so need to pull earlier remote-data from websocket."})
                                        aes-key rtc-const/encrypt-attr-set
                                        remote-update-data))
                                  remote-update-data)
-            remote-t (:t remote-update-data)
+            remote-t (:t-query-end remote-update-data)
             {affected-blocks-map :affected-blocks refed-blocks :refed-blocks} remote-update-data
             {:keys [remove-ops-map move-ops-map update-ops-map update-page-ops-map remove-page-ops-map]}
             (affected-blocks->diff-type-ops repo affected-blocks-map)

+ 21 - 16
src/main/frontend/worker/rtc/ws_util.cljs

@@ -3,7 +3,6 @@
   (:require [cljs-http-missionary.client :as http]
             [frontend.worker-common.util :as worker-util]
             [frontend.worker.rtc.db :as rtc-db]
-            [frontend.worker.rtc.exception :as r.ex]
             [frontend.worker.rtc.malli-schema :as rtc-schema]
             [frontend.worker.rtc.ws :as ws]
             [frontend.worker.state :as worker-state]
@@ -11,17 +10,26 @@
             [logseq.graph-parser.utf8 :as utf8]
             [missionary.core :as m]))
 
+(def ^:private remote-e-type->ex-info
+  {:ws-conn-already-disconnected
+   (ex-info "websocket conn is already disconnected" {:type :rtc.exception/ws-already-disconnected})
+   :graph-not-exist
+   (ex-info "remote graph not exist" {:type :rtc.exception/remote-graph-not-exist})
+   :graph-not-ready
+   (ex-info "remote graph still creating" {:type :rtc.exception/remote-graph-not-ready})
+   :bad-request-body
+   (ex-info "bad request body" {:type :rtc.exception/bad-request-body})
+   :not-allowed
+   (ex-info "not allowed" {:type :rtc.exception/not-allowed})
+   :client-graph-too-old
+   (ex-info "local graph too old" {:type :rtc.exception/local-graph-too-old})})
+
 (defn- handle-remote-ex
   [resp]
   (when (= :graph-not-exist (:type (:ex-data resp)))
     (rtc-db/remove-rtc-data-in-conn! (worker-state/get-current-repo))
     (worker-util/post-message :remote-graph-gone []))
-  (if-let [e ({:ws-conn-already-disconnected r.ex/ex-ws-already-disconnected
-               :graph-not-exist r.ex/ex-remote-graph-not-exist
-               :graph-not-ready r.ex/ex-remote-graph-not-ready
-               :bad-request-body r.ex/ex-bad-request-body
-               :not-allowed r.ex/ex-not-allowed}
-              (:type (:ex-data resp)))]
+  (if-let [e (get remote-e-type->ex-info (:type (:ex-data resp)))]
     (throw e)
     resp))
 
@@ -31,8 +39,9 @@
   {:pre [(= "apply-ops" (:action message))]}
   (m/sp
     (let [decoded-message (rtc-schema/data-to-ws-coercer (assoc message :req-id "temp-id"))
-          message-str (js/JSON.stringify (clj->js (select-keys (rtc-schema/data-to-ws-encoder decoded-message)
-                                                               ["graph-uuid" "ops" "t-before" "schema-version"])))
+          message-str (js/JSON.stringify
+                       (clj->js (select-keys (rtc-schema/data-to-ws-encoder decoded-message)
+                                             ["graph-uuid" "ops" "t-before" "schema-version" "api-version"])))
           len (.-length (utf8/encode message-str))]
       (when (< 100000 len)
         (let [{:keys [url key]} (m/? (ws/send&recv ws {:action "presign-put-temp-s3-obj"}))
@@ -46,16 +55,12 @@
   This function will attempt to reconnect and retry once after the ws closed(js/CloseEvent).
   For huge apply-ops request(>100KB),
   - upload its request message to s3 first,
-    then add `s3-key` key to request message map
-  For huge apply-ops request(> 400 ops)
-  - adjust its timeout to 20s"
-  [get-ws-create-task message]
+    then add `s3-key` key to request message map"
+  [get-ws-create-task message & {:keys [timeout-ms] :or {timeout-ms 10000}}]
   (let [task--helper
         (m/sp
           (let [ws (m/? get-ws-create-task)
-                opts (when (and (= "apply-ops" (:action message))
-                                (< 400 (count (:ops message))))
-                       {:timeout-ms 20000})
+                opts {:timeout-ms timeout-ms}
                 s3-key (when (= "apply-ops" (:action message))
                          (m/? (put-apply-ops-message-on-s3-if-too-huge ws message)))
                 message* (if s3-key