Jelajahi Sumber

enhance(rtc): validate graph skeleton data when start-rtc

rcmerci 1 tahun lalu
induk
melakukan
9ab79bfe38

+ 15 - 24
src/main/frontend/worker/rtc/client.cljs

@@ -8,31 +8,17 @@
             [frontend.worker.rtc.exception :as r.ex]
             [frontend.worker.rtc.op-mem-layer :as op-mem-layer]
             [frontend.worker.rtc.remote-update :as r.remote-update]
-            [frontend.worker.rtc.ws :as ws]
+            [frontend.worker.rtc.skeleton :as r.skeleton]
+            [frontend.worker.rtc.ws-util :as ws-util]
             [missionary.core :as m]))
 
-(defn- handle-remote-ex
-  [resp]
-  (if-let [e ({:graph-not-exist r.ex/ex-remote-graph-not-exist
-               :graph-not-ready r.ex/ex-remote-graph-not-ready}
-              (:type (:ex-data resp)))]
-    (throw e)
-    resp))
-
-(defn send&recv
-  "Return a task: throw exception if recv ex-data response"
-  [get-ws-create-task message]
-  (m/sp
-    (let [ws (m/? get-ws-create-task)]
-      (handle-remote-ex (m/? (ws/send&recv ws message))))))
-
 (defn- register-graph-updates
   [get-ws-create-task graph-uuid repo]
   (m/sp
     (try
       (let [{:keys [t]}
-            (m/? (send&recv get-ws-create-task {:action "register-graph-updates"
-                                                :graph-uuid graph-uuid}))]
+            (m/? (ws-util/send&recv get-ws-create-task {:action "register-graph-updates"
+                                                        :graph-uuid graph-uuid}))]
         (when-not (op-mem-layer/get-local-tx repo)
           (op-mem-layer/update-local-tx! repo t)))
       (catch :default e
@@ -43,8 +29,8 @@
 (defn- ensure-register-graph-updates*
   "Return a task: get or create a mws(missionary wrapped websocket).
   see also `ws/get-mws-create`.
-  But ensure `register-graph-updates` has been sent"
-  [get-ws-create-task graph-uuid repo]
+  But ensure `register-graph-updates` and `calibrate-graph-skeleton` has been sent"
+  [get-ws-create-task graph-uuid repo conn *last-calibrate-t]
   (assert (some? graph-uuid))
   (let [*sent (atom {}) ;; ws->bool
         ]
@@ -56,6 +42,11 @@
           (m/? (c.m/backoff
                 (take 5 (drop 2 c.m/delays))     ;retry 5 times if remote-graph is creating (4000 8000 16000 32000 64000)
                 (register-graph-updates get-ws-create-task graph-uuid repo)))
+          (let [t (op-mem-layer/get-local-tx repo)]
+            (when (or (nil? @*last-calibrate-t)
+                      (< 500 (- t @*last-calibrate-t)))
+              (m/? (r.skeleton/new-task--calibrate-graph-skeleton get-ws-create-task graph-uuid conn t))
+              (reset! *last-calibrate-t t)))
           (swap! *sent assoc ws true))
         ws))))
 
@@ -332,8 +323,8 @@
                                  (sort-remote-ops
                                   remote-ops))]
         (let [local-tx (op-mem-layer/get-local-tx repo)
-              r (m/? (send&recv get-ws-create-task {:action "apply-ops" :graph-uuid graph-uuid
-                                                    :ops ops-for-remote :t-before (or local-tx 1)}))]
+              r (m/? (ws-util/send&recv get-ws-create-task {:action "apply-ops" :graph-uuid graph-uuid
+                                                            :ops ops-for-remote :t-before (or local-tx 1)}))]
           (if-let [remote-ex (:ex-data r)]
             (do (add-log-fn remote-ex)
                 (case (:type remote-ex)
@@ -368,8 +359,8 @@
   [repo conn graph-uuid date-formatter get-ws-create-task add-log-fn]
   (m/sp
     (let [local-tx (op-mem-layer/get-local-tx repo)
-          r (m/? (send&recv get-ws-create-task {:action "apply-ops" :graph-uuid graph-uuid
-                                                :ops [] :t-before (or local-tx 1)}))]
+          r (m/? (ws-util/send&recv get-ws-create-task {:action "apply-ops" :graph-uuid graph-uuid
+                                                        :ops [] :t-before (or local-tx 1)}))]
       (if-let [remote-ex (:ex-data r)]
         (do (add-log-fn remote-ex)
             (case (:type remote-ex)

+ 16 - 1
src/main/frontend/worker/rtc/const.cljs

@@ -75,6 +75,7 @@
    [:t-before {:optional true} :int]
    [:failed-ops {:optional true} [:sequential to-ws-op-schema]]
    [:s3-presign-url {:optional true} :string]
+   [:diff-data {:optional true} [:map-of :keyword :any]]
    [:refed-blocks {:optional true}
     [:maybe
      [:sequential
@@ -208,7 +209,21 @@
                                  [:map
                                   [:asset-uuid :uuid]
                                   [:asset-name :string]]]]
-      [:delete {:optional true} [:sequential :uuid]]]]]))
+      [:delete {:optional true} [:sequential :uuid]]]]
+    ["calibrate-graph-skeleton"
+     [:map
+      [:req-id :string]
+      [:action :string]
+      [:graph-uuid :string]
+      [:t :int]
+      [:db-ident-blocks [:sequential
+                         [:map
+                          [:block/uuid :uuid]
+                          [:db/ident :keyword]
+                          [:block/parent {:optional true} :uuid]
+                          [:block/type {:optional true} [:set :string]]
+                          [:block/order {:optional true} :string]
+                          [:block/content {:optional true} :string]]]]]]]))
 (def data-to-ws-encoder (m/encoder data-to-ws-schema (mt/transformer
                                                       mt/string-transformer
                                                       (mt/key-transformer {:encode m/-keyword->string}))))

+ 35 - 31
src/main/frontend/worker/rtc/core.cljs

@@ -6,7 +6,9 @@
             [frontend.worker.rtc.full-upload-download-graph :as r.upload-download]
             [frontend.worker.rtc.op-mem-layer :as op-mem-layer]
             [frontend.worker.rtc.remote-update :as r.remote-update]
+            [frontend.worker.rtc.skeleton]
             [frontend.worker.rtc.ws :as ws]
+            [frontend.worker.rtc.ws-util :as ws-util]
             [frontend.worker.state :as worker-state]
             [frontend.worker.util :as worker-util]
             [goog.string :as gstring]
@@ -30,18 +32,18 @@
   "Return a flow: receive messages from ws, and filter messages with :req-id=`push-updates`."
   [get-ws-create-task]
   (m/ap
-   (loop []
-     (let [ws (m/? get-ws-create-task)
-           x (try
-               (m/?> (m/eduction
-                      (filter (fn [data] (= "push-updates" (:req-id data))))
-                      (map (fn [data] (prn :get-remote-updates (:t data)) data))
-                      (ws/recv-flow ws)))
-               (catch js/CloseEvent _
-                 sentinel))]
-       (if (identical? x sentinel)
-         (recur)
-         x)))))
+    (loop []
+      (let [ws (m/? get-ws-create-task)
+            x (try
+                (m/?> (m/eduction
+                       (filter (fn [data] (= "push-updates" (:req-id data))))
+                       (map (fn [data] (prn :get-remote-updates (:t data)) data))
+                       (ws/recv-flow ws)))
+                (catch js/CloseEvent _
+                  sentinel))]
+        (if (identical? x sentinel)
+          (recur)
+          x)))))
 
 (defn- create-local-updates-check-flow
   "Return a flow: emit if need to push local-updates"
@@ -136,11 +138,13 @@
   (let [ws-url              (or debug-ws-url (get-ws-url token))
         *auto-push?         (atom auto-push?)
         *log                (atom nil)
+        *last-calibrate-t   (atom nil)
         started-dfv         (m/dfv)
         add-log-fn          #(reset! *log [(js/Date.) %])
         {:keys [*current-ws get-ws-create-task]}
         (new-task--get-ws-create--memoized ws-url)
-        get-ws-create-task  (r.client/ensure-register-graph-updates get-ws-create-task graph-uuid repo)
+        get-ws-create-task  (r.client/ensure-register-graph-updates
+                             get-ws-create-task graph-uuid repo conn *last-calibrate-t)
         mixed-flow          (create-mixed-flow repo get-ws-create-task *auto-push?)]
     (assert (some? *current-ws))
     {:rtc-log-flow    (m/buffer 100 (m/watch *log))
@@ -230,7 +234,7 @@
   [token]
   (let [{:keys [get-ws-create-task]} (new-task--get-ws-create--memoized (get-ws-url token))]
     (m/join :graphs
-            (r.client/send&recv get-ws-create-task {:action "list-graphs"}))))
+            (ws-util/send&recv get-ws-create-task {:action "list-graphs"}))))
 
 (defn new-task--delete-graph
   "Return a task that return true if succeed"
@@ -238,8 +242,8 @@
   (let [{:keys [get-ws-create-task]} (new-task--get-ws-create--memoized (get-ws-url token))]
     (m/sp
       (let [{:keys [ex-data]}
-            (m/? (r.client/send&recv get-ws-create-task
-                                     {:action "delete-graph" :graph-uuid graph-uuid}))]
+            (m/? (ws-util/send&recv get-ws-create-task
+                                    {:action "delete-graph" :graph-uuid graph-uuid}))]
         (when ex-data (prn ::delete-graph-failed graph-uuid ex-data))
         (boolean (nil? ex-data))))))
 
@@ -248,26 +252,26 @@
   [token graph-uuid]
   (let [{:keys [get-ws-create-task]} (new-task--get-ws-create--memoized (get-ws-url token))]
     (m/join :users
-            (r.client/send&recv get-ws-create-task
-                                {:action "get-users-info" :graph-uuid graph-uuid}))))
+            (ws-util/send&recv get-ws-create-task
+                               {:action "get-users-info" :graph-uuid graph-uuid}))))
 
 (defn new-task--grant-access-to-others
   [token graph-uuid & {:keys [target-user-uuids target-user-emails]}]
   (let [{:keys [get-ws-create-task]} (new-task--get-ws-create--memoized (get-ws-url token))]
-    (r.client/send&recv get-ws-create-task
-                        (cond-> {:action "grant-access"
-                                 :graph-uuid graph-uuid}
-                          target-user-uuids (assoc :target-user-uuids target-user-uuids)
-                          target-user-emails (assoc :target-user-emails target-user-emails)))))
+    (ws-util/send&recv get-ws-create-task
+                       (cond-> {:action "grant-access"
+                                :graph-uuid graph-uuid}
+                         target-user-uuids (assoc :target-user-uuids target-user-uuids)
+                         target-user-emails (assoc :target-user-emails target-user-emails)))))
 
 (defn new-task--get-block-content-versions
   "Return a task that return map [:ex-data :ex-message :versions]"
   [token graph-uuid block-uuid]
   (let [{:keys [get-ws-create-task]} (new-task--get-ws-create--memoized (get-ws-url token))]
-    (m/join :versions (r.client/send&recv get-ws-create-task
-                                          {:action "query-block-content-versions"
-                                           :block-uuids [block-uuid]
-                                           :graph-uuid graph-uuid}))))
+    (m/join :versions (ws-util/send&recv get-ws-create-task
+                                         {:action "query-block-content-versions"
+                                          :block-uuids [block-uuid]
+                                          :graph-uuid graph-uuid}))))
 
 (defn- create-get-debug-state-flow
   []
@@ -300,14 +304,14 @@
   [token graph-uuid]
   (let [{:keys [get-ws-create-task]} (new-task--get-ws-create--memoized (get-ws-url token))]
     (m/join #(select-keys % [:snapshot-uuid :graph-uuid])
-            (r.client/send&recv get-ws-create-task {:action "snapshot-graph"
-                                                    :graph-uuid graph-uuid}))))
+            (ws-util/send&recv get-ws-create-task {:action "snapshot-graph"
+                                                   :graph-uuid graph-uuid}))))
 (defn new-task--snapshot-list
   [token graph-uuid]
   (let [{:keys [get-ws-create-task]} (new-task--get-ws-create--memoized (get-ws-url token))]
     (m/join :snapshot-list
-            (r.client/send&recv get-ws-create-task {:action "snapshot-list"
-                                                    :graph-uuid graph-uuid}))))
+            (ws-util/send&recv get-ws-create-task {:action "snapshot-list"
+                                                   :graph-uuid graph-uuid}))))
 
 (defn new-task--upload-graph
   [token repo remote-graph-name]

+ 2 - 0
src/main/frontend/worker/rtc/exception.cljs

@@ -27,6 +27,8 @@ Trying to start rtc loop but there's already one running, need to cancel that on
 When response from remote is too huge(> 32KB),
 the server will put it to s3 and return its presigned-url to clients.")
 
+(sr/defkeyword :rtc.exception/different-graph-skeleton
+  "remote graph skeleton data is different from local's.")
 
 (def ex-remote-graph-not-exist
   (ex-info "remote graph not exist" {:type :rtc.exception/remote-graph-not-exist}))

+ 11 - 11
src/main/frontend/worker/rtc/full_upload_download_graph.cljs

@@ -5,8 +5,8 @@
             [clojure.set :as set]
             [datascript.core :as d]
             [frontend.common.missionary-util :as c.m]
-            [frontend.worker.rtc.client :as r.client]
             [frontend.worker.rtc.op-mem-layer :as op-mem-layer]
+            [frontend.worker.rtc.ws-util :as ws-util]
             [frontend.worker.state :as worker-state]
             [frontend.worker.util :as worker-util]
             [logseq.db :as ldb]
@@ -113,15 +113,15 @@
           (m/?
            (m/join
             vector
-            (r.client/send&recv get-ws-create-task {:action "presign-put-temp-s3-obj"})
+            (ws-util/send&recv get-ws-create-task {:action "presign-put-temp-s3-obj"})
             (m/sp
               (let [all-blocks (export-as-blocks @conn)]
                 (ldb/write-transit-str all-blocks)))))]
       (m/? (c.m/<! (http/put url {:body all-blocks-str :with-credentials? false})))
       (let [upload-resp
-            (m/? (r.client/send&recv get-ws-create-task {:action "upload-graph"
-                                                         :s3-key key
-                                                         :graph-name remote-graph-name}))]
+            (m/? (ws-util/send&recv get-ws-create-task {:action "upload-graph"
+                                                        :s3-key key
+                                                        :graph-name remote-graph-name}))]
         (if-let [graph-uuid (:graph-uuid upload-resp)]
           (let [^js worker-obj (:worker/object @worker-state/*state)]
             (ldb/transact! conn
@@ -257,14 +257,14 @@
 (defn new-task--request-download-graph
   [get-ws-create-task graph-uuid]
   (m/join :download-info-uuid
-          (r.client/send&recv get-ws-create-task {:action "download-graph"
-                                                  :graph-uuid graph-uuid})))
+          (ws-util/send&recv get-ws-create-task {:action "download-graph"
+                                                 :graph-uuid graph-uuid})))
 
 (defn new-task--download-info-list
   [get-ws-create-task graph-uuid]
   (m/join :download-info-list
-          (r.client/send&recv get-ws-create-task {:action "download-info-list"
-                                                  :graph-uuid graph-uuid})))
+          (ws-util/send&recv get-ws-create-task {:action "download-info-list"
+                                                 :graph-uuid graph-uuid})))
 
 (defn new-task--wait-download-info-ready
   [get-ws-create-task download-info-uuid graph-uuid timeout-ms]
@@ -273,8 +273,8 @@
      (loop []
        (m/? (m/sleep 3000))
        (let [{:keys [download-info-list]}
-             (m/? (r.client/send&recv get-ws-create-task {:action "download-info-list"
-                                                          :graph-uuid graph-uuid}))]
+             (m/? (ws-util/send&recv get-ws-create-task {:action "download-info-list"
+                                                         :graph-uuid graph-uuid}))]
          (if-let [found-download-info
                   (some
                    (fn [download-info]

+ 43 - 0
src/main/frontend/worker/rtc/skeleton.cljs

@@ -0,0 +1,43 @@
+(ns frontend.worker.rtc.skeleton
+  "Validate skeleton data between server and client"
+  (:require [datascript.core :as d]
+            [frontend.worker.rtc.ws-util :as ws-util]
+            [missionary.core :as m]))
+
+(defn- get-all-db-ident-blocks
+  [db]
+  (let [db-ident-coll (map :v (d/datoms db :avet :db/ident))
+        db-ident-blocks (->> db-ident-coll
+                             (d/pull-many db [:db/ident
+                                              :block/uuid
+                                              {:block/parent [:block/uuid]}
+                                              :block/order
+                                              :block/type
+                                              :block/content])
+                             (filter :block/uuid))]
+    (map
+     (fn [block]
+       (cond-> block
+         (:block/parent block) (update :block/parent :block/uuid)))
+     db-ident-blocks)))
+
+(defn new-task--calibrate-graph-skeleton
+  [get-ws-create-task graph-uuid conn t]
+  (m/sp
+    (let [db @conn
+          db-ident-blocks (get-all-db-ident-blocks db)
+          r (m/? (ws-util/send&recv get-ws-create-task
+                                    {:action "calibrate-graph-skeleton"
+                                     :graph-uuid graph-uuid
+                                     :t t
+                                     :db-ident-blocks db-ident-blocks}))]
+      (if-let [remote-ex (:ex-data r)]
+        (case (:type remote-ex)
+          :t-not-matched nil
+        ;;else
+          (throw (ex-info "Unavailable" {:remote-ex remote-ex})))
+        (let [diff-data (:diff-data r)]
+          (when (seq diff-data)
+            (throw (ex-info "different graph skeleton between server and client"
+                            {:type :rtc.exception/different-graph-skeleton
+                             :diff-data diff-data}))))))))

+ 20 - 0
src/main/frontend/worker/rtc/ws_util.cljs

@@ -0,0 +1,20 @@
+(ns frontend.worker.rtc.ws-util
+  "Add RTC related logic to the function based on ws."
+  (:require [missionary.core :as m]
+            [frontend.worker.rtc.exception :as r.ex]
+            [frontend.worker.rtc.ws :as ws]))
+
+(defn- handle-remote-ex
+  [resp]
+  (if-let [e ({:graph-not-exist r.ex/ex-remote-graph-not-exist
+               :graph-not-ready r.ex/ex-remote-graph-not-ready}
+              (:type (:ex-data resp)))]
+    (throw e)
+    resp))
+
+(defn send&recv
+  "Return a task: throw exception if recv ex-data response"
+  [get-ws-create-task message]
+  (m/sp
+    (let [ws (m/? get-ws-create-task)]
+      (handle-remote-ex (m/? (ws/send&recv ws message))))))