浏览代码

refactor(rtc): split core by adding ns remote, client, exception

and re-impl using missionary
rcmerci 1 年之前
父节点
当前提交
75f69ffb48

+ 42 - 17
deps/common/src/logseq/common/missionary_util.cljs

@@ -1,27 +1,52 @@
 (ns logseq.common.missionary-util
   "Utils based on missionary."
+  (:import [missionary Cancelled])
   (:require [missionary.core :as m]))
 
-
-
-(def ^:private retry-sentinel (js-obj))
-
 (def delays (reductions * 1000 (repeat 2)))
 
+(def ^:private retry-sentinel (js-obj))
 (defn backoff
   "Retry task when it throw exception `(get ex-data :missionary/retry)`"
   [delays task]
   (m/sp
-   (loop [[delay & delays] (seq delays)]
-     (let [r (try
-               (m/? task)
-               (catch :default e
-                 (if (and (some-> e ex-data :missionary/retry)
-                          (pos-int? delay))
-                   (do (m/? (m/sleep delay))
-                       (println :missionary/retry "after" delay "ms (" (ex-message e) ")")
-                       retry-sentinel)
-                   (throw e))))]
-       (if (identical? r retry-sentinel)
-         (recur delays)
-         r)))))
+    (loop [[delay & delays] (seq delays)]
+      (let [r (try
+                (m/? task)
+                (catch :default e
+                  (if (and (some-> e ex-data :missionary/retry)
+                           (pos-int? delay))
+                    (do (m/? (m/sleep delay))
+                        (println :missionary/retry "after" delay "ms (" (ex-message e) ")")
+                        retry-sentinel)
+                    (throw e))))]
+        (if (identical? r retry-sentinel)
+          (recur delays)
+          r)))))
+
+(defn mix
+  "Return a flow which is mixed by `flows`"
+  [& flows]
+  (m/ap (m/?> (m/?> (count flows) (m/seed flows)))))
+
+(defn clock
+  "Return a flow that emits `value` every `interval-ms`."
+  ([interval-ms]
+   (clock interval-ms nil))
+  ([interval-ms value]
+   (->>
+    (m/ap
+      (loop []
+        (m/amb
+         (m/? (m/sleep interval-ms value))
+         (recur))))
+    (m/reductions {} value)
+    (m/latest identity))))
+
+(defn debounce
+  [duration-ms flow]
+  (m/ap
+    (let [x (m/?< flow)]
+      (try (m/? (m/sleep duration-ms x))
+           (catch Cancelled _
+             (m/amb))))))

+ 336 - 0
src/main/frontend/worker/rtc/client.cljs

@@ -0,0 +1,336 @@
+(ns frontend.worker.rtc.client
+  "Fns about push local updates"
+  (:require [missionary.core :as m]
+            [frontend.worker.rtc.op-mem-layer :as op-mem-layer]
+            [frontend.worker.rtc.const :as rtc-const]
+            [datascript.core :as d]
+            [cognitect.transit :as transit]
+            [frontend.worker.rtc.ws2 :as ws]
+            [clojure.set :as set]
+            [frontend.worker.rtc.exception :as r.ex]
+            [frontend.worker.rtc.remote-update :as r.remote-update]))
+
+
+(def ^:private transit-w (transit/writer :json))
+
+
+(defn- handle-remote-ex
+  [resp]
+  (if-let [e ({:graph-not-exist r.ex/ex-remote-graph-not-exist
+               :graph-not-ready r.ex/ex-remote-graph-not-ready}
+              (:type (:ex-data resp)))]
+    (throw e)
+    resp))
+
+(defn send&recv
+  "Return a task: throw exception if recv ex-data response"
+  [get-mws-task message]
+  (m/sp
+   (handle-remote-ex
+    (m/? (ws/send&recv get-mws-task message)))))
+
+(defn- register-graph-updates
+  [get-mws-task graph-uuid]
+  (send&recv get-mws-task {:action "register-graph-updates"
+                           :graph-uuid graph-uuid}))
+
+(defn ensure-register-graph-updates
+  "Return a task: get or create a mws(missionary wrapped websocket).
+  see also `ws/get-mws-create`.
+  But ensure `register-graph-updates` has been sent"
+  [get-mws-task graph-uuid]
+  (assert (some? graph-uuid))
+  (let [*sent (atom {})]
+    (m/sp
+      (let [mws (m/? get-mws-task)]
+        (when (contains? @*sent mws)
+          (swap! *sent mws false))
+        (when (not (@*sent mws))
+          (m/? (register-graph-updates (m/sp mws) graph-uuid))
+          (swap! *sent mws true))
+        mws))))
+
+
+(defn- remove-non-exist-block-uuids-in-add-retract-map
+  [conn add-retract-map]
+  (let [{:keys [add retract]} add-retract-map
+        add* (->> add
+                  (map (fn [x] [:block/uuid x]))
+                  (d/pull-many @conn [:block/uuid])
+                  (keep :block/uuid))]
+    (cond-> {}
+      (seq add*) (assoc :add add*)
+      (seq retract) (assoc :retract retract))))
+
+(defn- ->pos
+  [left-uuid parent-uuid]
+  (cond
+    (or (nil? left-uuid) (nil? parent-uuid)) :no-order
+    (not= left-uuid parent-uuid)             :sibling
+    :else                                    :child))
+
+(defmulti ^:private local-block-ops->remote-ops-aux (fn [tp & _] tp))
+
+(defmethod local-block-ops->remote-ops-aux :move-op
+  [_ & {:keys [parent-uuid left-uuid block-uuid *remote-ops *depend-on-block-uuid-set]}]
+  (when parent-uuid
+    (let [target-uuid (or left-uuid parent-uuid)
+          pos         (->pos left-uuid parent-uuid)]
+      (swap! *remote-ops conj [:move {:block-uuid block-uuid :target-uuid target-uuid :pos pos}])
+      (swap! *depend-on-block-uuid-set conj target-uuid))))
+
+
+(defmethod local-block-ops->remote-ops-aux :update-op
+  [_ & {:keys [conn user-uuid block update-op left-uuid parent-uuid *remote-ops]}]
+  (let [block-uuid          (:block/uuid block)
+        attr-map            (:updated-attrs (second update-op))
+        attr-alias-map      (when (contains? attr-map :alias)
+                              (remove-non-exist-block-uuids-in-add-retract-map conn (:alias attr-map)))
+        attr-tags-map       (when (contains? attr-map :tags)
+                              (remove-non-exist-block-uuids-in-add-retract-map conn (:tags attr-map)))
+        attr-type-map       (when (contains? attr-map :type)
+                              (let [{:keys [add retract]} (:type attr-map)
+                                    current-type-value    (set (:block/type block))
+                                    add                   (set/intersection add current-type-value)
+                                    retract               (set/difference retract current-type-value)]
+                                (cond-> {}
+                                  (seq add)     (assoc :add add)
+                                  (seq retract) (assoc :retract retract))))
+        attr-properties-map (when (contains? attr-map :properties)
+                              (let [{:keys [add retract]} (:properties attr-map)
+                                    properties            (:block/properties block)
+                                    add*                  (into []
+                                                                (update-vals (select-keys properties add)
+                                                                             (partial transit/write transit-w)))]
+                                (cond-> {}
+                                  (seq add*)    (assoc :add add*)
+                                  (seq retract) (assoc :retract retract))))
+        target-uuid         (or left-uuid parent-uuid)
+        pos                 (->pos left-uuid parent-uuid)]
+    (swap! *remote-ops conj
+           [:update
+            (cond-> {:block-uuid block-uuid}
+              (:block/journal-day block)    (assoc :journal-day (:block/journal-day block))
+              (:block/updated-at block)     (assoc :updated-at (:block/updated-at block))
+              (:block/created-at block)     (assoc :created-at (:block/created-at block))
+              (= (:block/updated-at block)
+                 (:block/created-at block)) (assoc :created-by user-uuid)
+              (contains? attr-map :schema)  (assoc :schema
+                                                   (transit/write transit-w (:block/schema block)))
+              attr-alias-map                (assoc :alias attr-alias-map)
+              attr-type-map                 (assoc :type attr-type-map)
+              attr-tags-map                 (assoc :tags attr-tags-map)
+              attr-properties-map           (assoc :properties attr-properties-map)
+              (and (contains? attr-map :content)
+                   (:block/raw-content block))
+              (assoc :content (:block/raw-content block))
+              (and (contains? attr-map :link)
+                   (:block/uuid (:block/link block)))
+              (assoc :link (:block/uuid (:block/link block)))
+              target-uuid                   (assoc :target-uuid target-uuid :pos pos))])))
+
+(defmethod local-block-ops->remote-ops-aux :update-page-op
+  [_ & {:keys [conn block-uuid *remote-ops]}]
+  (when-let [{page-name :block/name original-name :block/original-name}
+             (d/entity @conn [:block/uuid block-uuid])]
+    (swap! *remote-ops conj
+           [:update-page {:block-uuid block-uuid
+                          :page-name page-name
+                          :original-name (or original-name page-name)}])))
+
+(defmethod local-block-ops->remote-ops-aux :remove-op
+  [_ & {:keys [conn remove-op *remote-ops]}]
+  (when-let [block-uuid (:block-uuid (second remove-op))]
+    (when (nil? (d/entity @conn [:block/uuid block-uuid]))
+      (swap! *remote-ops conj [:remove {:block-uuids [block-uuid]}]))))
+
+(defmethod local-block-ops->remote-ops-aux :remove-page-op
+  [_ & {:keys [conn remove-page-op *remote-ops]}]
+  (when-let [block-uuid (:block-uuid (second remove-page-op))]
+    (when (nil? (d/entity @conn [:block/uuid block-uuid]))
+      (swap! *remote-ops conj [:remove-page {:block-uuid block-uuid}]))))
+
+(defn- local-block-ops->remote-ops
+  [repo conn user-uuid block-ops]
+  (let [*depend-on-block-uuid-set (atom #{})
+        *remote-ops (atom [])
+        {move-op :move remove-op :remove update-op :update update-page-op :update-page remove-page-op :remove-page}
+        block-ops]
+    (when-let [block-uuid
+               (some (comp :block-uuid second) [move-op update-op update-page-op])]
+      (when-let [block (d/entity @conn [:block/uuid block-uuid])]
+        (let [left-uuid (some-> block :block/left :block/uuid)
+              parent-uuid (some-> block :block/parent :block/uuid)]
+          (when parent-uuid ; whiteboard blocks don't have :block/left
+            ;; remote-move-op
+            (when move-op
+              (local-block-ops->remote-ops-aux :move-op
+                                               :parent-uuid parent-uuid
+                                               :left-uuid left-uuid
+                                               :block-uuid block-uuid
+                                               :*remote-ops *remote-ops
+                                               :*depend-on-block-uuid-set *depend-on-block-uuid-set)))
+          ;; remote-update-op
+          (when update-op
+            (local-block-ops->remote-ops-aux :update-op
+                                             :repo repo
+                                             :user-uuid user-uuid
+                                             :conn conn
+                                             :block block
+                                             :update-op update-op
+                                             :parent-uuid parent-uuid
+                                             :left-uuid left-uuid
+                                             :*remote-ops *remote-ops
+                                             :created-by user-uuid)))
+        ;; remote-update-page-op
+        (when update-page-op
+          (local-block-ops->remote-ops-aux :update-page-op
+                                           :repo repo
+                                           :conn conn
+                                           :block-uuid block-uuid
+                                           :*remote-ops *remote-ops))))
+    ;; remote-remove-op
+    (when remove-op
+      (local-block-ops->remote-ops-aux :remove-op
+                                       :repo repo
+                                       :conn conn
+                                       :remove-op remove-op
+                                       :*remote-ops *remote-ops))
+
+    ;; remote-remove-page-op
+    (when remove-page-op
+      (local-block-ops->remote-ops-aux :remove-page-op
+                                       :repo repo
+                                       :conn conn
+                                       :remove-page-op remove-page-op
+                                       :*remote-ops *remote-ops))
+
+    {:remote-ops @*remote-ops
+     :depend-on-block-uuids @*depend-on-block-uuid-set}))
+
+(defn- gen-block-uuid->remote-ops
+  [repo conn user-uuid & {:keys [n] :or {n 50}}]
+  (loop [current-handling-block-ops nil
+         current-handling-block-uuid nil
+         depend-on-block-uuid-coll nil
+         r {}]
+    (cond
+      (and (empty? current-handling-block-ops)
+           (empty? depend-on-block-uuid-coll)
+           (>= (count r) n))
+      r
+
+      (and (empty? current-handling-block-ops)
+           (empty? depend-on-block-uuid-coll))
+      (if-let [{min-epoch-block-ops :ops block-uuid :block-uuid} (op-mem-layer/get-min-epoch-block-ops repo)]
+        (do (assert (not (contains? r block-uuid)) {:r r :block-uuid block-uuid})
+            (op-mem-layer/remove-block-ops! repo block-uuid)
+            (recur min-epoch-block-ops block-uuid depend-on-block-uuid-coll r))
+        ;; finish
+        r)
+
+      (and (empty? current-handling-block-ops)
+           (seq depend-on-block-uuid-coll))
+      (let [[block-uuid & other-block-uuids] depend-on-block-uuid-coll
+            block-ops (op-mem-layer/get-block-ops repo block-uuid)]
+        (op-mem-layer/remove-block-ops! repo block-uuid)
+        (recur block-ops block-uuid other-block-uuids r))
+
+      (seq current-handling-block-ops)
+      (let [{:keys [remote-ops depend-on-block-uuids]}
+            (local-block-ops->remote-ops repo conn user-uuid current-handling-block-ops)]
+        (recur nil nil
+               (set/union (set depend-on-block-uuid-coll)
+                          (op-mem-layer/intersection-block-uuids repo depend-on-block-uuids))
+               (assoc r current-handling-block-uuid (into {} remote-ops)))))))
+
+(defn- merge-remove-remove-ops
+  [remote-remove-ops]
+  (when-let [block-uuids (->> remote-remove-ops
+                              (mapcat (fn [[_ {:keys [block-uuids]}]] block-uuids))
+                              distinct
+                              seq)]
+    [[:remove {:block-uuids block-uuids}]]))
+
+(defn- sort-remote-ops
+  [block-uuid->remote-ops]
+  (let [block-uuid->dep-uuid
+        (into {}
+              (keep (fn [[block-uuid remote-ops]]
+                      (when-let [move-op (get remote-ops :move)]
+                        [block-uuid (:target-uuid move-op)])))
+              block-uuid->remote-ops)
+        all-move-uuids (set (keys block-uuid->dep-uuid))
+        sorted-uuids
+        (loop [r []
+               rest-uuids all-move-uuids
+               uuid (first rest-uuids)]
+          (if-not uuid
+            r
+            (let [dep-uuid (block-uuid->dep-uuid uuid)]
+              (if-let [next-uuid (get rest-uuids dep-uuid)]
+                (recur r rest-uuids next-uuid)
+                (let [rest-uuids* (disj rest-uuids uuid)]
+                  (recur (conj r uuid) rest-uuids* (first rest-uuids*)))))))
+        sorted-move-ops (keep
+                         (fn [block-uuid]
+                           (some->> (get-in block-uuid->remote-ops [block-uuid :move])
+                                    (vector :move)))
+                         sorted-uuids)
+        remove-ops (merge-remove-remove-ops
+                    (keep
+                     (fn [[_ remote-ops]]
+                       (some->> (:remove remote-ops) (vector :remove)))
+                     block-uuid->remote-ops))
+        update-ops (keep
+                    (fn [[_ remote-ops]]
+                      (some->> (:update remote-ops) (vector :update)))
+                    block-uuid->remote-ops)
+        update-page-ops (keep
+                         (fn [[_ remote-ops]]
+                           (some->> (:update-page remote-ops) (vector :update-page)))
+                         block-uuid->remote-ops)
+        remove-page-ops (keep
+                         (fn [[_ remote-ops]]
+                           (some->> (:remove-page remote-ops) (vector :remove-page)))
+                         block-uuid->remote-ops)]
+    (concat update-page-ops remove-ops sorted-move-ops update-ops remove-page-ops)))
+
+(defn create-push-local-ops-task
+  "Return a task: push local updates"
+  [repo conn user-uuid graph-uuid date-formatter get-mws-task add-log-fn]
+  (m/sp
+    (when-let [ops-for-remote (rtc-const/to-ws-ops-decoder
+                               (sort-remote-ops
+                                (gen-block-uuid->remote-ops repo conn user-uuid)))]
+      (op-mem-layer/new-branch! repo)
+      (let [local-tx (op-mem-layer/get-local-tx repo)
+            r (m/? (send&recv get-mws-task {:action "apply-ops" :graph-uuid graph-uuid
+                                            :ops ops-for-remote :t-before (or local-tx 1)}))]
+        (if-let [remote-ex (:ex-data r)]
+          (do (add-log-fn remote-ex)
+              (case (:type remote-ex)
+              ;; - :graph-lock-failed
+              ;;   conflict-update remote-graph, keep these local-pending-ops
+              ;;   and try to send ops later
+                :graph-lock-failed
+                (do (op-mem-layer/rollback! repo)
+                    nil)
+              ;; - :graph-lock-missing
+              ;;   this case means something wrong in remote-graph data,
+              ;;   nothing to do at client-side
+                :graph-lock-missing
+                (do (op-mem-layer/rollback! repo)
+                    (throw r.ex/ex-remote-graph-lock-missing))
+              ;; TODO: support read s3-obj when websocket return specific data
+                :get-s3-object-failed
+                (do (op-mem-layer/rollback! repo)
+                    nil)
+              ;; else
+                (do (op-mem-layer/rollback! repo)
+                    (throw (ex-info "Unavailable" {:remote-ex remote-ex})))))
+
+          (do (assert (pos? (:t r)) r)
+              (op-mem-layer/commit! repo)
+              (r.remote-update/apply-remote-update repo conn date-formatter r add-log-fn)
+              (add-log-fn {:type ::push-client-updates :remote-t (:t r)})))))))

+ 2 - 1
src/main/frontend/worker/rtc/core.cljs

@@ -30,7 +30,8 @@
             [logseq.outliner.transaction :as outliner-tx]
             [malli.core :as m]
             [malli.util :as mu]
-            [frontend.worker.rtc.ws2]))
+            [frontend.worker.rtc.ws2]
+            [frontend.worker.rtc.core2]))
 
 ;;                     +-------------+
 ;;                     |             |

+ 119 - 0
src/main/frontend/worker/rtc/core2.cljs

@@ -0,0 +1,119 @@
+(ns frontend.worker.rtc.core2
+  "Main(use missionary) ns for rtc related fns"
+  (:require [frontend.worker.rtc.client :as r.client]
+            [frontend.worker.rtc.remote-update :as r.remote-update]
+            [frontend.worker.rtc.ws2 :as ws]
+            [frontend.worker.state :as worker-state]
+            [goog.string :as gstring]
+            [logseq.common.missionary-util :as c.m]
+            [malli.core :as ma]
+            [missionary.core :as m]))
+
+(def ^:private rtc-state-schema
+  [:map
+   [:ws-state [:enum :open :connecting :cancelled]]])
+(def ^:private rtc-state-validator (ma/validator rtc-state-schema))
+
+(defn- get-ws-url
+  [token]
+  (gstring/format @worker-state/*rtc-ws-url token))
+
+(def ^:private sentinel (js-obj))
+(defn get-remote-updates
+  "Return a flow: receive messages from mws, and filter messages with :req-id=`push-updates`."
+  [get-mws-task]
+  (m/stream
+   (m/ap
+     (loop []
+       (let [mws (m/? get-mws-task)
+             x (try
+                 (m/?> (m/eduction
+                        (filter (fn [data] (= "push-updates" (:req-id data))))
+                        (ws/recv-flow mws)))
+                 (catch js/CloseEvent _
+                   sentinel))]
+         (if (identical? x sentinel)
+           (recur)
+           (m/amb x (recur))))))))
+
+(defn- create-local-updates-check-flow
+  "Return a flow"
+  [*auto-push? interval-ms]
+  (let [auto-push-flow (m/watch *auto-push?)
+        clock-flow (c.m/clock interval-ms :clock)
+        merge-flow (m/latest vector auto-push-flow clock-flow)]
+    (m/eduction (filter first)
+                (map second)
+                merge-flow)))
+
+(comment
+  (def *push (atom true))
+  (def f (create-local-updates-check-flow *push 2000))
+  (def cancel ((m/reduce (fn [_ v] (prn :v v) v) f) #(js/console.log :s %) #(js/console.log :f %)))
+  (reset! *push not)
+  (cancel))
+
+(defn- create-mixed-flow
+  "Return a flow that emits all kinds of events:
+  `:remote-update`: remote-updates data from server
+  `:local-update-check`: event to notify to check if there're some new local-updates, then push to remote."
+  [get-mws-task *auto-push?]
+  (let [remote-updates-flow (m/eduction
+                             (map (fn [data] {:type :remote-update :value data}))
+                             (get-remote-updates get-mws-task))
+        local-updates-check-flow (m/eduction
+                                  (map (fn [data] {:type :local-update-check :value data}))
+                                  (create-local-updates-check-flow *auto-push? 2000))]
+    (c.m/mix remote-updates-flow local-updates-check-flow)))
+
+(defn- wrap-set-rtc-ws-state
+  "Return a task"
+  [get-mws-task set-state-fn]
+  (m/sp
+    (let [mws (m/? (m/race
+                    (m/sp (m/? (m/sleep 100))
+                          (set-state-fn :ws-state :connecting)
+                          (m/? m/never))
+                    get-mws-task))]
+      (set-state-fn :ws-state :open)
+      mws)))
+
+(def send&recv r.client/send&recv)
+
+(defn create-rtc-loop
+  "Return a map with [:rtc-log-flow :*rtc-state :rtc-loop-task :*rtc-auto-push?]
+  TODO: auto refresh token if needed"
+  [user-uuid graph-uuid repo conn date-formatter token & {:keys [auto-push?] :or {auto-push? true}}]
+  (let [ws-url       (get-ws-url token)
+        *auto-push?  (atom auto-push?)
+        *log         (atom nil)
+        add-log-fn   #(reset! *log [(js/Date.) %])
+        rtc-log-flow (m/buffer 100 (m/watch *log))
+        *rtc-state   (atom {} :validator rtc-state-validator)
+        set-state-fn (fn [k v] (swap! *rtc-state assoc k v))
+        get-mws-task (wrap-set-rtc-ws-state
+                      (r.client/ensure-register-graph-updates
+                       (ws/get-mws-create ws-url)
+                       graph-uuid)
+                      set-state-fn)
+        mixed-flow   (create-mixed-flow get-mws-task *auto-push?)]
+    {:rtc-log-flow    rtc-log-flow
+     :*rtc-state      *rtc-state
+     :*rtc-auto-push? *auto-push?
+     :rtc-loop-task
+     (m/sp
+       ;; init run to open a ws
+       (m/? get-mws-task)
+       (->>
+        (let [event (m/?> mixed-flow)]
+          (case (:type event)
+            :remote-update
+            (r.remote-update/apply-remote-update repo conn date-formatter event add-log-fn)
+
+            :local-update-check
+            (m/? (r.client/create-push-local-ops-task
+                  repo conn user-uuid graph-uuid date-formatter
+                  get-mws-task add-log-fn))))
+        (m/ap)
+        (m/reduce {})
+        (m/?)))}))

+ 11 - 0
src/main/frontend/worker/rtc/exception.cljs

@@ -0,0 +1,11 @@
+(ns frontend.worker.rtc.exception
+  "Exception list")
+
+(def ex-remote-graph-not-exist
+  (ex-info "remote graph not exist" {:type ::remote-graph-not-exist}))
+
+(def ex-remote-graph-not-ready
+  (ex-info "remote graph still creating" {:type ::remote-graph-not-ready}))
+
+(def ex-remote-graph-lock-missing
+  (ex-info "remote graph lock missing(server error)" {:type ::remote-graph-lock-missing}))

+ 561 - 0
src/main/frontend/worker/rtc/remote_update.cljs

@@ -0,0 +1,561 @@
+(ns frontend.worker.rtc.remote-update
+  "Fns about applying remote updates"
+  (:require [cljs-time.coerce :as tc]
+            [cljs-time.core :as t]
+            [clojure.set :as set]
+            [clojure.string :as string]
+            [cognitect.transit :as transit]
+            [datascript.core :as d]
+            [frontend.schema-register :as sr]
+            [frontend.worker.batch-tx :as batch-tx]
+            [frontend.worker.handler.page :as worker-page]
+            [frontend.worker.handler.page.rename :as worker-page-rename]
+            [frontend.worker.rtc.const :as rtc-const]
+            [frontend.worker.rtc.op-mem-layer :as op-mem-layer]
+            [frontend.worker.state :as worker-state]
+            [frontend.worker.util :as worker-util]
+            [logseq.common.util :as common-util]
+            [logseq.db :as ldb]
+            [logseq.db.frontend.content :as db-content]
+            [logseq.db.frontend.property :as db-property]
+            [logseq.graph-parser.whiteboard :as gp-whiteboard]
+            [logseq.outliner.core :as outliner-core]
+            [logseq.outliner.transaction :as outliner-tx]))
+
+(sr/defkeyword ::need-pull-remote-data
+  "remote-update's :remote-t-before > :local-tx,
+   so need to pull earlier remote-data from websocket.")
+
+(def ^:private transit-r (transit/reader :json))
+
+(defmulti ^:private transact-db! (fn [action & _args] action))
+
+(defmethod transact-db! :delete-blocks [_ & args]
+  (outliner-tx/transact!
+   {:persist-op? false
+    :gen-undo-ops? false
+    :outliner-op :delete-blocks
+    :transact-opts {:repo (first args)
+                    :conn (second args)}}
+   (apply outliner-core/delete-blocks! args)))
+
+(defmethod transact-db! :move-blocks [_ & args]
+  (outliner-tx/transact!
+   {:persist-op? false
+    :gen-undo-ops? false
+    :outliner-op :move-blocks
+    :transact-opts {:repo (first args)
+                    :conn (second args)}}
+   (apply outliner-core/move-blocks! args)))
+
+(defmethod transact-db! :move-blocks&persist-op [_ & args]
+  (outliner-tx/transact!
+   {:persist-op? true
+    :gen-undo-ops? false
+    :outliner-op :move-blocks
+    :transact-opts {:repo (first args)
+                    :conn (second args)}}
+   (apply outliner-core/move-blocks! args)))
+
+(defmethod transact-db! :insert-blocks [_ & args]
+  (outliner-tx/transact!
+   {:persist-op? false
+    :gen-undo-ops? false
+    :outliner-op :insert-blocks
+    :transact-opts {:repo (first args)
+                    :conn (second args)}}
+   (apply outliner-core/insert-blocks! args)))
+
+(defmethod transact-db! :insert-no-order-blocks [_ conn block-uuids]
+  (ldb/transact! conn
+                 (mapv (fn [block-uuid]
+                         ;; add block/content block/format to satisfy the normal-block schema
+                         {:block/uuid block-uuid
+                          ;; NOTE: block without :block/left
+                          ;; must be `logseq.db.frontend.malli-schema.closed-value-block`
+                          :block/type #{"closed value"}})
+                       block-uuids)
+                 {:persist-op? false
+                  :gen-undo-ops? false}))
+
+(defmethod transact-db! :save-block [_ & args]
+  (outliner-tx/transact!
+   {:persist-op? false
+    :gen-undo-ops? false
+    :outliner-op :save-block
+    :transact-opts {:repo (first args)
+                    :conn (second args)}}
+   (apply outliner-core/save-block! args)))
+
+(defmethod transact-db! :delete-whiteboard-blocks [_ conn block-uuids]
+  (ldb/transact! conn
+                 (mapv (fn [block-uuid] [:db/retractEntity [:block/uuid block-uuid]]) block-uuids)
+                 {:persist-op? false
+                  :gen-undo-ops? false}))
+
+(defmethod transact-db! :upsert-whiteboard-block [_ conn blocks]
+  (ldb/transact! conn blocks {:persist-op? false
+                              :gen-undo-ops? false}))
+
+(defn- whiteboard-page-block?
+  [block]
+  (contains? (set (:block/type block)) "whiteboard"))
+
+(defn- group-remote-remove-ops-by-whiteboard-block
+  "return {true [<whiteboard-block-ops>], false [<other-ops>]}"
+  [db remote-remove-ops]
+  (group-by (fn [{:keys [block-uuid]}]
+              (boolean
+               (when-let [block (d/entity db [:block/uuid block-uuid])]
+                 (whiteboard-page-block? (:block/parent block)))))
+            remote-remove-ops))
+
+(defn- apply-remote-remove-ops-helper
+  [conn remove-ops]
+  (let [block-uuid->entity (into {}
+                                 (keep
+                                  (fn [op]
+                                    (when-let [block-uuid (:block-uuid op)]
+                                      (when-let [ent (d/entity @conn [:block/uuid block-uuid])]
+                                        [block-uuid ent])))
+                                  remove-ops))
+        block-uuid-set (set (keys block-uuid->entity))
+        block-uuids-need-move
+        (set
+         (mapcat
+          (fn [[_block-uuid ent]]
+            (set/difference (set (map :block/uuid (:block/_parent ent))) block-uuid-set))
+          block-uuid->entity))]
+    {:block-uuids-need-move block-uuids-need-move
+     :block-uuids-to-remove block-uuid-set}))
+
+(defn- apply-remote-remove-ops
+  [repo conn date-formatter remove-ops]
+  (let [{whiteboard-block-ops true other-ops false} (group-remote-remove-ops-by-whiteboard-block @conn remove-ops)]
+    (transact-db! :delete-whiteboard-blocks conn (map :block-uuid whiteboard-block-ops))
+
+    (let [{:keys [block-uuids-need-move block-uuids-to-remove]}
+          (apply-remote-remove-ops-helper conn other-ops)]
+      ;; move to page-block's first child
+      (doseq [block-uuid block-uuids-need-move]
+        (transact-db! :move-blocks&persist-op
+                      repo conn
+                      [(d/entity @conn [:block/uuid block-uuid])]
+                      (d/entity @conn (:db/id (:block/page (d/entity @conn [:block/uuid block-uuid]))))
+                      false))
+      (doseq [block-uuid block-uuids-to-remove]
+        (transact-db! :delete-blocks
+                      repo conn date-formatter
+                      [(d/entity @conn [:block/uuid block-uuid])]
+                      {})))))
+
+(defn- insert-or-move-block
+  [repo conn block-uuid remote-parents remote-left-uuid move? op-value]
+  (when (seq remote-parents)
+    (let [first-remote-parent (first remote-parents)
+          local-parent (d/entity @conn [:block/uuid first-remote-parent])
+          whiteboard-page-block? (whiteboard-page-block? local-parent)
+          ;; when insert blocks in whiteboard, local-left is ignored
+          ;; remote-left-uuid is nil when it's :no-order block
+          local-left (when-not whiteboard-page-block?
+                       (when remote-left-uuid
+                         (d/entity @conn [:block/uuid remote-left-uuid])))
+          b (d/entity @conn [:block/uuid block-uuid])]
+      (case [whiteboard-page-block? (some? local-parent) (some? local-left) (some? remote-left-uuid)]
+        [false false true true]
+        (if move?
+          (transact-db! :move-blocks repo conn [b] local-left true)
+          (transact-db! :insert-blocks repo conn
+                        [{:block/uuid block-uuid
+                          :block/content ""
+                          :block/format :markdown}]
+                        local-left {:sibling? true :keep-uuid? true}))
+
+        [false true true true]
+        (let [sibling? (not= (:block/uuid local-parent) (:block/uuid local-left))]
+          (if move?
+            (transact-db! :move-blocks repo conn [b] local-left sibling?)
+            (transact-db! :insert-blocks repo conn
+                          [{:block/uuid block-uuid :block/content ""
+                            :block/format :markdown}]
+                          local-left {:sibling? sibling? :keep-uuid? true})))
+
+        [false true false true]
+        (if move?
+          (transact-db! :move-blocks repo conn [b] local-parent false)
+          (transact-db! :insert-blocks repo conn
+                        [{:block/uuid block-uuid :block/content ""
+                          :block/format :markdown}]
+                        local-parent {:sibling? false :keep-uuid? true}))
+
+        [false true false false]
+        (if move?
+          (transact-db! :move-blocks repo conn [b] local-parent false)
+          (transact-db! :insert-no-order-blocks conn [block-uuid]))
+
+        ;; Don't need to insert-whiteboard-block here,
+        ;; will do :upsert-whiteboard-block in `update-block-attrs`
+        ([true true false true] [true true false false])
+        (when (nil? (:properties op-value))
+          ;; when :properties is nil, this block should be treat as normal block
+          (if move?
+            (transact-db! :move-blocks repo conn [b] local-parent false)
+            (transact-db! :insert-blocks repo conn [{:block/uuid block-uuid :block/content "" :block/format :markdown}]
+                          local-parent {:sibling? false :keep-uuid? true})))
+        ([true true true true] [true true true false])
+        (when (nil? (:properties op-value))
+          (let [sibling? (not= (:block/uuid local-parent) (:block/uuid local-left))]
+            (if move?
+              (transact-db! :move-blocks repo conn [b] local-left sibling?)
+              (transact-db! :insert-blocks repo conn [{:block/uuid block-uuid :block/content "" :block/format :markdown}]
+                            local-left {:sibling? sibling? :keep-uuid? true}))))
+
+        (throw (ex-info "Don't know where to insert" {:block-uuid block-uuid :remote-parents remote-parents
+                                                      :remote-left remote-left-uuid}))))))
+
+(defn- move-ops-map->sorted-move-ops
+  [move-ops-map]
+  (let [uuid->dep-uuids (into {} (map (fn [[uuid env]] [uuid (set (conj (:parents env) (:left env)))]) move-ops-map))
+        all-uuids (set (keys move-ops-map))
+        sorted-uuids
+        (loop [r []
+               rest-uuids all-uuids
+               uuid (first rest-uuids)]
+          (if-not uuid
+            r
+            (let [dep-uuids (uuid->dep-uuids uuid)]
+              (if-let [next-uuid (first (set/intersection dep-uuids rest-uuids))]
+                (recur r rest-uuids next-uuid)
+                (let [rest-uuids* (disj rest-uuids uuid)]
+                  (recur (conj r uuid) rest-uuids* (first rest-uuids*)))))))]
+    (mapv move-ops-map sorted-uuids)))
+
+(defn- apply-remote-remove-page-ops
+  [repo conn remove-page-ops]
+  (doseq [op remove-page-ops]
+    (when-let [page-name (:block/name (d/entity @conn [:block/uuid (:block-uuid op)]))]
+      (worker-page/delete! repo conn page-name {:persist-op? false}))))
+
+(defn- filter-remote-data-by-local-unpushed-ops
+  "when remote-data request client to move/update/remove/... blocks,
+  these updates maybe not needed, because this client just updated some of these blocks,
+  so we need to filter these just-updated blocks out, according to the unpushed-local-ops"
+  [affected-blocks-map local-unpushed-ops]
+  ;; (assert (op-mem-layer/ops-coercer local-unpushed-ops) local-unpushed-ops)
+  (reduce
+   (fn [affected-blocks-map local-op]
+     (case (first local-op)
+       "move"
+       (let [block-uuid (:block-uuid (second local-op))
+             remote-op (get affected-blocks-map block-uuid)]
+         (case (:op remote-op)
+           :remove (dissoc affected-blocks-map (:block-uuid remote-op))
+           :move (dissoc affected-blocks-map (:self remote-op))
+           ;; default
+           affected-blocks-map))
+
+       "update"
+       (let [block-uuid (:block-uuid (second local-op))
+             local-updated-attr-set (set (keys (:updated-attrs (second local-op))))]
+         (if-let [remote-op (get affected-blocks-map block-uuid)]
+           (assoc affected-blocks-map block-uuid
+                  (if (#{:update-attrs :move} (:op remote-op))
+                    (apply dissoc remote-op local-updated-attr-set)
+                    remote-op))
+           affected-blocks-map))
+       ;;else
+       affected-blocks-map))
+   affected-blocks-map local-unpushed-ops))
+
+(defn- affected-blocks->diff-type-ops
+  [repo affected-blocks]
+  (let [unpushed-ops (op-mem-layer/get-all-ops repo)
+        affected-blocks-map* (if unpushed-ops
+                               (filter-remote-data-by-local-unpushed-ops
+                                affected-blocks unpushed-ops)
+                               affected-blocks)
+        {remove-ops-map :remove move-ops-map :move update-ops-map :update-attrs
+         update-page-ops-map :update-page remove-page-ops-map :remove-page}
+        (update-vals
+         (group-by (fn [[_ env]] (get env :op)) affected-blocks-map*)
+         (partial into {}))]
+    {:remove-ops-map remove-ops-map
+     :move-ops-map move-ops-map
+     :update-ops-map update-ops-map
+     :update-page-ops-map update-page-ops-map
+     :remove-page-ops-map remove-page-ops-map}))
+
+(defn- empty-page?
+  "1. page has no child-block
+  2. page has child-blocks and all these blocks only have empty :block/content"
+  [page-entity]
+  (not
+   (when-let [children-blocks (and page-entity
+                                   (seq (map #(into {} %) (:block/_parent page-entity))))]
+     (some
+      (fn [block]
+        (not= {:block/content ""}
+              (-> (apply dissoc block [:block/tx-id
+                                       :block/uuid
+                                       :block/updated-at
+                                       :block/left
+                                       :block/created-at
+                                       :block/format
+                                       :db/id
+                                       :block/parent
+                                       :block/page
+                                       :block/path-refs])
+                  (update :block/content string/trim))))
+      children-blocks))))
+
+(defn- check-block-pos
+  "NOTE: some blocks don't have :block/left (e.g. whiteboard blocks)"
+  [db block-uuid remote-parents remote-left-uuid]
+  (let [local-b (d/entity db [:block/uuid block-uuid])
+        remote-parent-uuid (first remote-parents)]
+    (cond
+      (nil? local-b)
+      :not-exist
+
+      (not= [remote-left-uuid remote-parent-uuid]
+            [(:block/uuid (:block/left local-b)) (:block/uuid (:block/parent local-b))])
+      :wrong-pos
+
+      :else nil)))
+
+(defn- upsert-whiteboard-block
+  [repo conn {:keys [parents properties] :as _op-value}]
+  (let [db @conn
+        first-remote-parent (first parents)]
+    (when-let [local-parent (d/entity db [:block/uuid first-remote-parent])]
+      (let [page-name (:block/name local-parent)
+            properties* (transit/read transit-r properties)
+            shape-property-id (db-property/get-pid repo db :logseq.property.tldraw/shape)
+            shape (and (map? properties*)
+                       (get properties* shape-property-id))]
+        (assert (some? page-name) local-parent)
+        (assert (some? shape) properties*)
+        (transact-db! :upsert-whiteboard-block conn [(gp-whiteboard/shape->block repo db shape page-name)])))))
+
+(defn- need-update-block?
+  [conn block-uuid op-value]
+  (let [ent (d/entity @conn [:block/uuid block-uuid])]
+    (worker-util/profile
+     :need-update-block?
+     (let [r (some (fn [[k v]]
+                     (case k
+                       :content     (not= v (:block/raw-content ent))
+                       :updated-at  (not= v (:block/updated-at ent))
+                       :created-at  (not= v (:block/created-at ent))
+                       :alias       (not= (set v) (set (map :block/uuid (:block/alias ent))))
+                       :type        (not= (set v) (set (:block/type ent)))
+                       :schema      (not= (transit/read transit-r v) (:block/schema ent))
+                       :tags        (not= (set v) (set (map :block/uuid (:block/tags ent))))
+                       :properties  (not= (transit/read transit-r v) (:block/properties ent))
+                       :link        (not= v (:block/uuid (:block/link ent)))
+                       :journal-day (not= v (:block/journal-day ent))
+                       false))
+                   op-value)]
+       (prn :need-update-block? r)
+       r))))
+
+(defn- update-block-attrs
+  [repo conn date-formatter block-uuid {:keys [parents properties _content] :as op-value}]
+  (let [key-set (set/intersection
+                 (conj rtc-const/general-attr-set :content)
+                 (set (keys op-value)))]
+    (when (seq key-set)
+      (let [first-remote-parent (first parents)
+            local-parent (d/entity @conn [:block/uuid first-remote-parent])
+            whiteboard-page-block? (whiteboard-page-block? local-parent)]
+        (cond
+          (and whiteboard-page-block? properties)
+          (upsert-whiteboard-block repo conn op-value)
+
+          (need-update-block? conn block-uuid op-value)
+          (let [b-ent (d/entity @conn [:block/uuid block-uuid])
+                db-id (:db/id b-ent)
+                new-block
+                (cond-> b-ent
+                  (and (contains? key-set :content)
+                       (not= (:content op-value)
+                             (:block/raw-content b-ent)))
+                  (assoc :block/content
+                         (db-content/db-special-id-ref->page @conn (:content op-value)))
+
+                  (contains? key-set :updated-at)     (assoc :block/updated-at (:updated-at op-value))
+                  (contains? key-set :created-at)     (assoc :block/created-at (:created-at op-value))
+                  (contains? key-set :alias)          (assoc :block/alias (some->> (seq (:alias op-value))
+                                                                                   (map (partial vector :block/uuid))
+                                                                                   (d/pull-many @conn [:db/id])
+                                                                                   (keep :db/id)))
+                  (contains? key-set :type)           (assoc :block/type (:type op-value))
+                  (and (contains? key-set :schema)
+                       (some? (:schema op-value)))
+                  (assoc :block/schema (transit/read transit-r (:schema op-value)))
+
+                  (contains? key-set :tags)           (assoc :block/tags (some->> (seq (:tags op-value))
+                                                                                  (map (partial vector :block/uuid))
+                                                                                  (d/pull-many @conn [:db/id])
+                                                                                  (keep :db/id)))
+                  (contains? key-set :properties)     (assoc :block/properties
+                                                             (transit/read transit-r (:properties op-value)))
+                  (and (contains? key-set :link)
+                       (some? (:link op-value)))
+                  (assoc :block/link (some->> (:link op-value)
+                                              (vector :block/uuid)
+                                              (d/pull @conn [:db/id])
+                                              :db/id))
+
+                  (and (contains? key-set :journal-day)
+                       (some? (:journal-day op-value)))
+                  (assoc :block/journal-day (:journal-day op-value)
+                         :block/journal? true))
+                *other-tx-data (atom [])]
+            ;; 'save-block' dont handle card-many attrs well?
+            (when (contains? key-set :alias)
+              (swap! *other-tx-data conj [:db/retract db-id :block/alias]))
+            (when (contains? key-set :tags)
+              (swap! *other-tx-data conj [:db/retract db-id :block/tags]))
+            (when (contains? key-set :type)
+              (swap! *other-tx-data conj [:db/retract db-id :block/type]))
+            (when (and (contains? key-set :link) (nil? (:link op-value)))
+              (swap! *other-tx-data conj [:db/retract db-id :block/link]))
+            (when (and (contains? key-set :schema) (nil? (:schema op-value)))
+              (swap! *other-tx-data conj [:db/retract db-id :block/schema]))
+            (when (and (contains? key-set :properties) (nil? (:properties op-value)))
+              (swap! *other-tx-data conj [:db/retract db-id :block/properties]))
+            (when (and (contains? key-set :journal-day) (nil? (:journal-day op-value)))
+              (swap! *other-tx-data conj
+                     [:db/retract db-id :block/journal-day]
+                     [:db/retract db-id :block/journal?]))
+            (when (seq @*other-tx-data)
+              (ldb/transact! conn @*other-tx-data {:persist-op? false
+                                                   :gen-undo-ops? false}))
+            (transact-db! :save-block repo conn date-formatter new-block)))))))
+
+(defn- apply-remote-update-ops
+  [repo conn date-formatter update-ops]
+  (doseq [{:keys [parents left self] :as op-value} update-ops]
+    (when (and parents left)
+      (let [r (check-block-pos @conn self parents left)]
+        (case r
+          :not-exist
+          (insert-or-move-block repo conn self parents left false op-value)
+          :wrong-pos
+          (insert-or-move-block repo conn self parents left true op-value)
+          nil)))
+    (update-block-attrs repo conn date-formatter self op-value)))
+
+(defn- move-all-blocks-to-another-page
+  [repo conn from-page-name to-page-name]
+  (let [blocks (ldb/get-page-blocks @conn from-page-name {})
+        from-page-block (some-> (first blocks) :block/page)
+        target-page-block (d/entity @conn [:block/name to-page-name])]
+    (when (and (seq blocks) target-page-block)
+      (let [blocks* (ldb/sort-by-left blocks from-page-block)]
+        (outliner-tx/transact!
+         {:persist-op? true
+          :gen-undo-ops? false
+          :transact-opts {:repo repo
+                          :conn conn}}
+         (outliner-core/move-blocks! repo conn blocks* target-page-block false))))))
+
+(defn- apply-remote-move-ops
+  [repo conn date-formatter sorted-move-ops]
+  (doseq [{:keys [parents left self] :as op-value} sorted-move-ops]
+    (let [r (check-block-pos @conn self parents left)]
+      (case r
+        :not-exist
+        (insert-or-move-block repo conn self parents left false op-value)
+        :wrong-pos
+        (insert-or-move-block repo conn self parents left true op-value)
+        nil                             ; do nothing
+        nil)
+      (update-block-attrs repo conn date-formatter self op-value))))
+
+(defn- apply-remote-update-page-ops
+  [repo conn date-formatter update-page-ops]
+  (let [config (worker-state/get-config repo)]
+    (doseq [{:keys [self page-name original-name] :as op-value} update-page-ops]
+      (let [old-page-original-name (:block/original-name (d/entity @conn [:block/uuid self]))
+            exist-page (d/entity @conn [:block/name page-name])
+            create-opts {:create-first-block? false
+                         :uuid self :persist-op? false}]
+        (cond
+          ;; same name but different uuid, and local-existed-page is empty(`empty-page?`)
+          ;; just remove local-existed-page
+          (and exist-page
+               (not= (:block/uuid exist-page) self)
+               (empty-page? exist-page))
+          (do (worker-page/delete! repo conn page-name {:persist-op? false})
+              (worker-page/create! repo conn config original-name create-opts))
+
+          ;; same name but different uuid
+          ;; remote page has same block/name as local's, but they don't have same block/uuid.
+          ;; 1. rename local page's name to '<origin-name>-<ms-epoch>-Conflict'
+          ;; 2. create page, name=<origin-name>, uuid=remote-uuid
+          (and exist-page
+               (not= (:block/uuid exist-page) self))
+          (let [conflict-page-name (common-util/format "%s-%s-CONFLICT" original-name (tc/to-long (t/now)))]
+            (worker-page-rename/rename! repo conn config original-name conflict-page-name {:persist-op? false})
+            (worker-page/create! repo conn config original-name create-opts)
+            (move-all-blocks-to-another-page repo conn conflict-page-name original-name))
+
+          ;; a client-page has same uuid as remote but different page-names,
+          ;; then we need to rename the client-page to remote-page-name
+          (and old-page-original-name (not= old-page-original-name original-name))
+          (worker-page-rename/rename! repo conn config old-page-original-name original-name {:persist-op? false})
+
+          ;; no such page, name=remote-page-name, OR, uuid=remote-block-uuid
+          ;; just create-page
+          :else
+          (worker-page/create! repo conn config original-name create-opts))
+
+        (update-block-attrs repo conn date-formatter self op-value)))))
+
+(defn apply-remote-update
+  "Apply remote-update(`remote-update-event`)"
+  [repo conn date-formatter remote-update-event add-log-fn]
+  (let [remote-update-data (:value remote-update-event)]
+    (assert (rtc-const/data-from-ws-validator remote-update-data) remote-update-data)
+    (let [remote-t (:t remote-update-data)
+          remote-t-before (:t-before remote-update-data)
+          local-tx (op-mem-layer/get-local-tx repo)]
+      (cond
+        (not (and (pos? remote-t)
+                  (pos? remote-t-before)))
+        (throw (ex-info "invalid remote-data" {:data remote-update-data}))
+
+        (<= remote-t local-tx)
+        (add-log-fn {:type ::skip :remote-t remote-t :local-t local-tx})
+
+        (< local-tx remote-t-before)
+        (do (add-log-fn {:type ::need-pull-remote-data :remote-t remote-t :local-t local-tx})
+            (throw (ex-info "need pull earlier remote-data"
+                            {:type ::need-pull-remote-data
+                             :local-tx local-tx})))
+
+        (<= remote-t-before local-tx remote-t)
+        (let [affected-blocks-map (:affected-blocks remote-update-data)
+              {:keys [remove-ops-map move-ops-map update-ops-map update-page-ops-map remove-page-ops-map]}
+              (affected-blocks->diff-type-ops repo affected-blocks-map)
+              remove-ops (vals remove-ops-map)
+              sorted-move-ops (move-ops-map->sorted-move-ops move-ops-map)
+              update-ops (vals update-ops-map)
+              update-page-ops (vals update-page-ops-map)
+              remove-page-ops (vals remove-page-ops-map)]
+
+          (batch-tx/with-batch-tx-mode conn {:rtc-tx? true}
+            (js/console.groupCollapsed "rtc/apply-remote-ops-log")
+            (worker-util/profile :apply-remote-update-page-ops (apply-remote-update-page-ops repo conn date-formatter update-page-ops))
+            (worker-util/profile :apply-remote-remove-ops (apply-remote-remove-ops repo conn date-formatter remove-ops))
+            (worker-util/profile :apply-remote-move-ops (apply-remote-move-ops repo conn date-formatter sorted-move-ops))
+            (worker-util/profile :apply-remote-update-ops (apply-remote-update-ops repo conn date-formatter update-ops))
+            (worker-util/profile :apply-remote-remove-page-ops (apply-remote-remove-page-ops repo conn remove-page-ops))
+            (js/console.groupEnd))
+
+          (op-mem-layer/update-local-tx! repo remote-t))
+        :else (throw (ex-info "unreachable" {:remote-t remote-t
+                                             :remote-t-before remote-t-before
+                                             :local-t local-tx}))))))

+ 25 - 32
src/main/frontend/worker/rtc/ws2.cljs

@@ -2,7 +2,6 @@
   "Websocket wrapped by missionary.
   based on
   https://github.com/ReilySiegel/missionary-websocket/blob/master/src/com/reilysiegel/missionary/websocket.cljs"
-  {:clj-kondo/ignore true}
   (:require [frontend.worker.rtc.const :as rtc-const]
             [logseq.common.missionary-util :as c.m]
             [missionary.core :as m]))
@@ -54,35 +53,32 @@
 (defn- create-mws*
   [url]
   (m/sp
-   (if-let [[mbx ws close-dfv] (m/? (open-ws-task url))]
-     {:raw-ws ws
-      :send (fn [data]
-              (m/sp
-               (handle-close
-                (m/?
-                 (m/race close-dfv
-                         (m/sp (while (< 4096 (.-bufferedAmount ws))
-                                 (m/? (m/sleep 50)))
-                               (.send ws data)))))))
-      :recv-flow
-      (m/stream
-       (m/ap
-        (loop []
-          (m/amb
-           (handle-close
-            (m/? (m/race close-dfv mbx)))
-           (recur)))))}
-     (throw (ex-info "open ws timeout(10s)" {:missionary/retry true})))))
-
-
+    (if-let [[mbx ws close-dfv] (m/? (open-ws-task url))]
+      {:raw-ws ws
+       :send (fn [data]
+               (m/sp
+                 (handle-close
+                  (m/?
+                   (m/race close-dfv
+                           (m/sp (while (< 4096 (.-bufferedAmount ws))
+                                   (m/? (m/sleep 50)))
+                                 (.send ws data)))))))
+       :recv-flow
+       (m/stream
+        (m/ap
+          (loop []
+            (m/amb
+             (handle-close
+              (m/? (m/race close-dfv mbx)))
+             (recur)))))}
+      (throw (ex-info "open ws timeout(10s)" {:missionary/retry true})))))
 
 (defn- closed?
   [m-ws]
   (contains? #{:closing :closed} (get-state (:raw-ws m-ws))))
 
-
 (defn get-mws-create
-  "Returns a task :get a mws(missionary-websocket), creating one if needed.
+  "Returns a task to get a mws(missionary-websocket), creating one if needed.
   Always try to produce NOT-closed websocket.
   When failed to open websocket, retry with backoff.
   TODO: retry ASAP once network condition changed"
@@ -109,10 +105,10 @@
         (if (and m-ws (not (closed? m-ws)))
           m-ws
           (m/? backoff-create-ws-task))))))
-
-(defn close
-  [m-ws]
-  (.close (:raw-ws m-ws)))
+(comment
+  (defn close
+    [m-ws]
+    (.close (:raw-ws m-ws))))
 
 (defn send
   "Returns a task: send message and return mws"
@@ -159,7 +155,4 @@
                    (m/? (send&recv get-mws-task {:action "list-graphs"} :timeout-ms 1000))
                    (m/? (send&recv get-mws-task {:action "list-graphs"})))
                  #(prn :s %) #(js/console.log :f %)))
-    (cancel)
-    )
-
-)
+    (cancel)))