Kaynağa Gözat

enhance(rtc): several fixes&enhancements for rtc:

- incorrect use of m/?> (with recur) in get-remote-updates
- sort graph-list in debug-ui
- coerce data when get data from s3
rcmerci 1 yıl önce
ebeveyn
işleme
b6b4979fb4

+ 1 - 1
src/main/frontend/db/rtc/debug_ui.cljs

@@ -175,7 +175,7 @@
           {:placeholder "Select a graph-uuid"}))
         (shui/select-content
          (shui/select-group
-          (for [{:keys [graph-uuid graph-status]} (:remote-graphs state)]
+          (for [{:keys [graph-uuid graph-status]} (sort-by :graph-uuid (:remote-graphs state))]
             (shui/select-item {:value graph-uuid :disabled (some? graph-status)} graph-uuid)))))
 
        [:b "+"]

+ 13 - 13
src/main/frontend/worker/rtc/core.cljs

@@ -29,19 +29,19 @@
 (defn- get-remote-updates
   "Return a flow: receive messages from ws, and filter messages with :req-id=`push-updates`."
   [get-ws-create-task]
-  (m/stream
-   (m/ap
-     (loop []
-       (let [ws (m/? get-ws-create-task)
-             x (try
-                 (m/?> (m/eduction
-                        (filter (fn [data] (= "push-updates" (:req-id data))))
-                        (ws/recv-flow ws)))
-                 (catch js/CloseEvent _
-                   sentinel))]
-         (if (identical? x sentinel)
-           (recur)
-           (m/amb x (recur))))))))
+  (m/ap
+   (loop []
+     (let [ws (m/? get-ws-create-task)
+           x (try
+               (m/?> (m/eduction
+                      (filter (fn [data] (= "push-updates" (:req-id data))))
+                      (map (fn [data] (prn :get-remote-updates (:t data)) data))
+                      (ws/recv-flow ws)))
+               (catch js/CloseEvent _
+                 sentinel))]
+       (if (identical? x sentinel)
+         (recur)
+         x)))))
 
 (defn- create-local-updates-check-flow
   "Return a flow: emit if need to push local-updates"

+ 14 - 25
src/main/frontend/worker/rtc/remote_update.cljs

@@ -6,7 +6,6 @@
             [frontend.schema-register :as sr]
             [frontend.worker.batch-tx :as batch-tx]
             [frontend.worker.handler.page :as worker-page]
-            [frontend.worker.handler.page.db-based.rename :as worker-page-rename]
             [frontend.worker.rtc.const :as rtc-const]
             [frontend.worker.rtc.op-mem-layer :as op-mem-layer]
             [frontend.worker.state :as worker-state]
@@ -345,7 +344,7 @@
           [[:db/add e k remote-v*]]))
 
       [false true]
-      (let [_ (assert (coll? remote-v) {:remote-v remote-v :a k :e e})
+      (let [_ (assert (or (nil? remote-v) (coll? remote-v)) {:remote-v remote-v :a k :e e})
             remote-v* (set (map ldb/read-transit-str remote-v))
             [local-only remote-only] (data/diff (set local-v) remote-v*)]
         (cond-> []
@@ -389,18 +388,19 @@
                                          ;; else
                                          v)])))
                              (into {}))]
-    (diff-block-map->tx-data db (:db/id ent) local-block-map op-value)))
+    (diff-block-map->tx-data db (:db/id ent) local-block-map (select-keys op-value watched-attrs))))
 
 (defn- update-block-attrs
-  [repo conn block-uuid {:keys [parents _content] :as op-value}]
-  (let [first-remote-parent (first parents)
-        local-parent (d/entity @conn [:block/uuid first-remote-parent])
-        whiteboard-page-block? (whiteboard-page-block? local-parent)]
-    (if whiteboard-page-block?
-      (upsert-whiteboard-block repo conn op-value)
-      (when-let [tx-data (seq (remote-op-value->tx-data @conn block-uuid op-value))]
-        (ldb/transact! conn tx-data {:persist-op? false
-                                     :gen-undo-ops? false})))))
+  [repo conn block-uuid {:keys [parents] :as op-value}]
+  (when (some (fn [k] (= "block" (namespace k))) (keys op-value)) ; there exists some :block/xxx attrs
+    (let [first-remote-parent (first parents)
+          local-parent (d/entity @conn [:block/uuid first-remote-parent])
+          whiteboard-page-block? (whiteboard-page-block? local-parent)]
+      (if whiteboard-page-block?
+        (upsert-whiteboard-block repo conn op-value)
+        (when-let [tx-data (seq (remote-op-value->tx-data @conn block-uuid op-value))]
+          (ldb/transact! conn tx-data {:persist-op? false
+                                       :gen-undo-ops? false}))))))
 
 (defn- apply-remote-update-ops
   [repo conn update-ops]
@@ -434,20 +434,9 @@
     (doseq [{:keys [self _page-name]
              original-name :block/original-name
              :as op-value} update-page-ops]
-      (let [old-page-original-name (:block/original-name (d/entity @conn [:block/uuid self]))
-            create-opts {:create-first-block? false
+      (let [create-opts {:create-first-block? false
                          :uuid self :persist-op? false}]
-        (cond
-          ;; a client-page has same uuid as remote but different page-names,
-          ;; then we need to rename the client-page to remote-page-name
-          (and old-page-original-name (not= old-page-original-name original-name))
-          (worker-page-rename/rename! repo conn config old-page-original-name original-name {:persist-op? false})
-
-          ;; no such page, name=remote-page-name, OR, uuid=remote-block-uuid
-          ;; just create-page
-          :else
-          (worker-page/create! repo conn config original-name create-opts))
-
+        (worker-page/create! repo conn config (ldb/read-transit-str original-name) create-opts)
         (update-block-attrs repo conn self op-value)))))
 
 (defn apply-remote-update

+ 8 - 7
src/main/frontend/worker/rtc/ws.cljs

@@ -127,18 +127,18 @@
     (.close (:raw-ws m-ws))))
 
 (defn send
-  "Returns a task: send message and return mws"
+  "Returns a task: send message"
   [mws message]
   (m/sp
     (let [decoded-message (rtc-const/data-to-ws-coercer message)
           message-str (js/JSON.stringify (clj->js (rtc-const/data-to-ws-encoder decoded-message)))]
-      (m/? ((:send mws) message-str))
-      mws)))
+      (m/? ((:send mws) message-str)))))
 
 (defn recv-flow
   [m-ws]
   (m/eduction
    (map #(js->clj (js/JSON.parse %) :keywordize-keys true))
+   ;; (map (fn [x] (prn :recv (:req-id x) (tc/to-string (t/now))) x))
    (map rtc-const/data-from-ws-coercer)
    (:recv-flow m-ws)))
 
@@ -149,8 +149,8 @@
   {:pre [(pos-int? timeout-ms)
          (some? (:req-id message))]}
   (m/sp
-    (let [mws (m/? (send mws message))
-          req-id (:req-id message)
+    (m/? (send mws message))
+    (let [req-id (:req-id message)
           result (m/?
                   (m/timeout
                    (m/reduce
@@ -160,7 +160,8 @@
                     (recv-flow mws))
                    timeout-ms))]
       (when-not result
-        (throw (ex-info (str "recv timeout (" timeout-ms "ms)") {:missionary/retry true})))
+        (throw (ex-info (str "recv timeout (" timeout-ms "ms)") {:missionary/retry true
+                                                                 :message message})))
       result)))
 
 (defn send&recv
@@ -176,7 +177,7 @@
       (if-let [s3-presign-url (:s3-presign-url resp)]
         (let [{:keys [status body]} (m/? (c.m/<! (http/get s3-presign-url {:with-credentials? false})))]
           (if (http/unexceptional-status? status)
-            (js->clj (js/JSON.parse body) :keywordize-keys true)
+            (rtc-const/data-from-ws-coercer (js->clj (js/JSON.parse body) :keywordize-keys true))
             {:req-id req-id
              :ex-message "get s3 object failed"
              :ex-data {:type :rtc.exception/get-s3-object-failed :status status :body body}}))