Browse Source

Merge branch 'feat/db' into refactor/narrow-gap-between-page-and-block

Tienson Qin 1 year ago
parent
commit
09cba5fa28

+ 2 - 0
.carve/ignore

@@ -88,3 +88,5 @@ frontend.worker.rtc.asset-sync/<loop-for-assets-sync
 frontend.worker.rtc.hash/hash-blocks
 ;; Repl fn
 frontend.rum/use-atom-in
+;; missionary utils
+frontend.common.missionary-util/<!

+ 4 - 0
resources/css/shui.css

@@ -281,6 +281,10 @@ div[data-radix-popper-content-wrapper] {
   &[data-side=bottom] {
     max-height: calc(var(--radix-dropdown-menu-content-available-height) - 20px);
   }
+
+  &.text-popover-foreground {
+    color: inherit;
+  }
 }
 
 .ui__popover-content {

+ 2 - 2
src/main/frontend/components/block.cljs

@@ -720,6 +720,7 @@
         *timer1 (rum/use-ref nil)                           ;; hide
         *el-popup (rum/use-ref nil)
         [visible? set-visible!] (rum/use-state nil)
+        ;; set-visible! (fn debug-visible [v] (js/console.warn "debug: visible" v) (set-visible! v))
         _  #_:clj-kondo/ignore (rum/defc preview-render []
                                  (rum/use-effect!
                                    (fn []
@@ -744,8 +745,7 @@
                                                            (js/clearTimeout timer1)))
                                        :on-mouse-leave (fn []
                                                          ;; check the top popup whether is the preview popup
-                                                         (when (= "ls-preview-popup"
-                                                                 (some-> (shui-popups/get-last-popup) :content-props :class))
+                                                         (when (ui/last-shui-preview-popup?)
                                                            (rum/set-ref! *timer1
                                                              (js/setTimeout #(set-visible! false) 500))))}
                                       (let [page-cp (state/get-page-blocks-cp)]

+ 5 - 5
src/main/frontend/components/property.cljs

@@ -204,7 +204,7 @@
                    (shui/dialog-close!)
                    (and block (= type :checkbox))
                    (p/do!
-                    (shui/popup-hide-all!)
+                    (ui/hide-popups-until-preview-popup!)
                     (pv/<add-property! block (:db/ident property) false {:exit-edit? true}))
                    (and block (= type :default)
                         (not (seq (:property/closed-values property))))
@@ -469,7 +469,7 @@
 
             (= :checkbox type)
             (p/do!
-             (shui/popup-hide-all!)
+             (ui/hide-popups-until-preview-popup!)
              (shui/dialog-close!)
              (pv/<add-property! block (:db/ident property) false {:exit-edit? true}))
 
@@ -495,7 +495,7 @@
       state
       :on-hide (fn [_state _e type]
                  (when (= type :esc)
-                   (shui/popup-hide-all!)
+                   (shui/popup-hide!)
                    (shui/dialog-close!)
                    (when-let [^js input (state/get-input)]
                      (.focus input)))))))
@@ -627,8 +627,8 @@
            (-> (when-not config/publishing?
                  {:on-click #(shui/popup-show! (.-target %) content-fn {:as-dropdown? true :auto-focus? true})})
              (assoc :class "flex items-center"))
-           (if-let [color (some-> icon :color)]
-             [:span.flex.items-center {:style {:color (or color "inherit")}}
+           (if icon
+             [:span.flex.items-center {:style {:color (or (some-> icon :color) "inherit")}}
               (icon-component/icon icon {:size 15})]
              (property-icon property nil)))))
 

+ 2 - 3
src/main/frontend/components/property/value.cljs

@@ -110,7 +110,7 @@
           (<create-new-block! block (db/entity property-id) property-value' {:edit-block? false})
           (property-handler/set-block-property! repo (:block/uuid block) property-id property-value')))
       (when exit-edit?
-        (shui/popup-hide-all!)
+        (ui/hide-popups-until-preview-popup!)
         (shui/dialog-close!))
       (when-not (or many? checkbox?)
         (when-let [input (state/get-input)]
@@ -146,7 +146,6 @@
                            (.focus)) 0)
                  state)
    :will-unmount (fn [state]
-                   (shui/popup-hide!)
                    (shui/dialog-close!)
                    (state/set-editor-action! nil)
                    state)}
@@ -168,7 +167,7 @@
                  (when (fn? on-change)
                    (on-change (db/get-case-page journal)))
                  (shui/popup-hide! id)
-                 (shui/popup-hide!)
+                 (ui/hide-popups-until-preview-popup!)
                  (shui/dialog-close!))))))]
     (shui/calendar
      (cond->

+ 42 - 20
src/main/frontend/db_worker.cljs

@@ -17,9 +17,9 @@
             [frontend.worker.handler.page.db-based.rename :as db-worker-page-rename]
             [frontend.worker.handler.page.file-based.rename :as file-worker-page-rename]
             [frontend.worker.rtc.asset-db-listener]
+            [frontend.worker.rtc.client-op :as client-op]
             [frontend.worker.rtc.core :as rtc-core]
             [frontend.worker.rtc.db-listener]
-            [frontend.worker.rtc.op-mem-layer :as op-mem-layer]
             [frontend.worker.search :as search]
             [frontend.worker.state :as worker-state] ;; [frontend.worker.undo-redo :as undo-redo]
             [frontend.worker.undo-redo2 :as undo-redo]
@@ -38,6 +38,7 @@
 (defonce *sqlite worker-state/*sqlite)
 (defonce *sqlite-conns worker-state/*sqlite-conns)
 (defonce *datascript-conns worker-state/*datascript-conns)
+(defonce *client-ops-conns worker-state/*client-ops-conns)
 (defonce *opfs-pools worker-state/*opfs-pools)
 (defonce *publishing? (atom false))
 
@@ -82,8 +83,8 @@
 
 (defn upsert-addr-content!
   "Upsert addr+data-seq"
-  [repo data delete-addrs]
-  (let [^Object db (worker-state/get-sqlite-conn repo)]
+  [repo data delete-addrs & {:keys [client-ops-db?] :or {client-ops-db? false}}]
+  (let [^Object db (worker-state/get-sqlite-conn repo (if client-ops-db? :client-ops :db))]
     (assert (some? db) "sqlite db not exists")
     (.transaction db (fn [tx]
                        (doseq [item data]
@@ -95,8 +96,8 @@
                                         :bind #js [addr]}))))))
 
 (defn restore-data-from-addr
-  [repo addr]
-  (let [^Object db (worker-state/get-sqlite-conn repo)]
+  [repo addr & {:keys [client-ops-db?] :or {client-ops-db? false}}]
+  (let [^Object db (worker-state/get-sqlite-conn repo (if client-ops-db? :client-ops :db))]
     (assert (some? db) "sqlite db not exists")
     (when-let [content (-> (.exec db #js {:sql "select content from kvs where addr = ?"
                                           :bind #js [addr]
@@ -126,56 +127,78 @@
     (-restore [_ addr]
       (restore-data-from-addr repo addr))))
 
+(defn new-sqlite-client-ops-storage
+  [repo]
+  (reify IStorage
+    (-store [_ addr+data-seq delete-addrs]
+      (let [data (map
+                  (fn [[addr data]]
+                    #js {:$addr addr
+                         :$content (sqlite-util/transit-write data)})
+                  addr+data-seq)]
+        (upsert-addr-content! repo data delete-addrs :client-ops-db? true)))
+
+    (-restore [_ addr]
+      (restore-data-from-addr repo addr :client-ops-db? true))))
+
 (defn- close-db-aux!
-  [repo ^Object db ^Object search]
+  [repo ^Object db ^Object search ^Object client-ops]
   (swap! *sqlite-conns dissoc repo)
   (swap! *datascript-conns dissoc repo)
+  (swap! *client-ops-conns dissoc repo)
   (when db (.close db))
   (when search (.close search))
+  (when client-ops (.close client-ops))
   (when-let [^js pool (worker-state/get-opfs-pool repo)]
     (.releaseAccessHandles pool))
   (swap! *opfs-pools dissoc repo))
 
 (defn- close-other-dbs!
   [repo]
-  (doseq [[r {:keys [db search]}] @*sqlite-conns]
+  (doseq [[r {:keys [db search client-ops]}] @*sqlite-conns]
     (when-not (= repo r)
-      (close-db-aux! r db search))))
+      (close-db-aux! r db search client-ops))))
 
 (defn close-db!
   [repo]
-  (let [{:keys [db search]} (@*sqlite-conns repo)]
-    (close-db-aux! repo db search)))
+  (let [{:keys [db search client-ops]} (@*sqlite-conns repo)]
+    (close-db-aux! repo db search client-ops)))
 
-(defn- get-db-and-search-db
+(defn- get-dbs
   [repo]
   (if @*publishing?
     (p/let [^object DB (.-DB ^object (.-oo1 ^object @*sqlite))
             db (new DB "/db.sqlite" "c")
             search-db (new DB "/search-db.sqlite" "c")]
-      [db search-db])
+      [db search-db nil])
     (p/let [^js pool (<get-opfs-pool repo)
             capacity (.getCapacity pool)
             _ (when (zero? capacity)   ; file handle already releases since pool will be initialized only once
                 (.acquireAccessHandles pool))
             db (new (.-OpfsSAHPoolDb pool) repo-path)
-            search-db (new (.-OpfsSAHPoolDb pool) (str "search" repo-path))]
-      [db search-db])))
+            search-db (new (.-OpfsSAHPoolDb pool) (str "search" repo-path))
+            client-ops-db (new (.-OpfsSAHPoolDb pool) (str "client-ops-" repo-path))]
+      [db search-db client-ops-db])))
 
 (defn- create-or-open-db!
   [repo {:keys [config]}]
   (when-not (worker-state/get-sqlite-conn repo)
-    (p/let [[db search-db] (get-db-and-search-db repo)
-            storage (new-sqlite-storage repo {})]
+    (p/let [[db search-db client-ops-db] (get-dbs repo)
+            storage (new-sqlite-storage repo {})
+            client-ops-storage (new-sqlite-client-ops-storage repo)]
       (swap! *sqlite-conns assoc repo {:db db
-                                       :search search-db})
+                                       :search search-db
+                                       :client-ops client-ops-db})
       (.exec db "PRAGMA locking_mode=exclusive")
       (sqlite-common-db/create-kvs-table! db)
+      (sqlite-common-db/create-kvs-table! client-ops-db)
       (search/create-tables-and-triggers! search-db)
       (let [schema (sqlite-util/get-schema repo)
             conn (sqlite-common-db/get-storage-conn storage schema)
+            client-ops-conn (sqlite-common-db/get-storage-conn client-ops-storage client-op/schema-in-db)
             initial-data-exists? (d/entity @conn :logseq.class/Root)]
         (swap! *datascript-conns assoc repo conn)
+        (swap! *client-ops-conns assoc repo client-ops-conn)
         (when (and config (not initial-data-exists?))
           (let [initial-data (sqlite-create-graph/build-db-initial-data config)]
             (d/transact! conn initial-data {:initial-db? true})))
@@ -185,8 +208,7 @@
 
         (db-migrate/migrate conn)
 
-        (p/let [_ (op-mem-layer/<init-load-from-indexeddb2! repo)]
-          (db-listener/listen-db-changes! repo conn))))))
+        (db-listener/listen-db-changes! repo conn)))))
 
 (defn- iter->vec [iter]
   (when iter
@@ -255,7 +277,7 @@
 
 (defn- get-search-db
   [repo]
-  (worker-state/get-sqlite-conn repo {:search? true}))
+  (worker-state/get-sqlite-conn repo :search))
 
 (defn- with-write-transit-str
   [p]

+ 12 - 0
src/main/frontend/ui.cljs

@@ -32,6 +32,7 @@
             [goog.object :as gobj]
             [lambdaisland.glogi :as log]
             [logseq.shui.icon.v2 :as shui.icon.v2]
+            [logseq.shui.popup.core :as shui-popups]
             [medley.core :as medley]
             [promesa.core :as p]
             [rum.core :as rum]
@@ -54,6 +55,17 @@
 
 (defonce icon-size (if (mobile-util/native-platform?) 26 20))
 
+(defn shui-popups? [] (some-> (shui-popups/get-popups) (count) (> 0)))
+(defn last-shui-preview-popup?
+  []
+  (= "ls-preview-popup"
+    (some-> (shui-popups/get-last-popup) :content-props :class)))
+(defn hide-popups-until-preview-popup!
+  []
+  (while (and (shui-popups?)
+           (not (last-shui-preview-popup?)))
+    (shui/popup-hide!)))
+
 (def built-in-colors
   ["yellow"
    "red"

+ 2 - 2
src/main/frontend/worker/rtc/asset_db_listener.cljs

@@ -4,7 +4,7 @@
             [frontend.schema-register :as sr]
             [frontend.worker.db-listener :as db-listener]
             [frontend.worker.rtc.asset :as r.asset]
-            [frontend.worker.rtc.op-mem-layer :as op-mem-layer]))
+            [frontend.worker.rtc.client-op :as client-op]))
 
 (defn entity-datoms=>action+asset-uuid
   [db-after entity-datoms]
@@ -33,6 +33,6 @@
 (defmethod db-listener/listen-db-changes :gen-asset-change-events
   [_ {:keys [_tx-data tx-meta _db-before db-after
              repo _id->attr->datom _e->a->add?->v->t same-entity-datoms-coll]}]
-  (when (and (op-mem-layer/rtc-db-graph? repo)
+  (when (and (client-op/rtc-db-graph? repo)
              (:generate-asset-change-events? tx-meta true))
     (generate-asset-change-events db-after same-entity-datoms-coll)))

+ 56 - 77
src/main/frontend/worker/rtc/client.cljs

@@ -1,13 +1,12 @@
 (ns frontend.worker.rtc.client
   "Fns about push local updates"
-  (:require [clojure.set :as set]
-            [clojure.string :as string]
+  (:require [clojure.string :as string]
             [datascript.core :as d]
             [frontend.common.missionary-util :as c.m]
+            [frontend.worker.rtc.client-op :as client-op]
             [frontend.worker.rtc.const :as rtc-const]
             [frontend.worker.rtc.exception :as r.ex]
             [frontend.worker.rtc.log-and-state :as rtc-log-and-state]
-            [frontend.worker.rtc.op-mem-layer :as op-mem-layer]
             [frontend.worker.rtc.remote-update :as r.remote-update]
             [frontend.worker.rtc.skeleton :as r.skeleton]
             [frontend.worker.rtc.ws :as ws]
@@ -22,8 +21,8 @@
             (m/? (ws-util/send&recv get-ws-create-task {:action "register-graph-updates"
                                                         :graph-uuid graph-uuid}))]
         (rtc-log-and-state/update-remote-t graph-uuid remote-t)
-        (when-not (op-mem-layer/get-local-tx repo)
-          (op-mem-layer/update-local-tx! repo remote-t)))
+        (when-not (client-op/get-local-tx repo)
+          (client-op/update-local-tx repo remote-t)))
       (catch :default e
         (if (= :rtc.exception/remote-graph-not-ready (:type (ex-data e)))
           (throw (ex-info "remote graph is still creating" {:missionary/retry true} e))
@@ -60,8 +59,8 @@
           (m/? (c.m/backoff
                 (take 5 (drop 2 c.m/delays)) ;retry 5 times if remote-graph is creating (4000 8000 16000 32000 64000)
                 (register-graph-updates get-ws-create-task graph-uuid repo)))
-          (let [t (op-mem-layer/get-local-tx repo)]
-            (when (and (zero? (op-mem-layer/get-unpushed-block-update-count repo))
+          (let [t (client-op/get-local-tx repo)]
+            (when (and (zero? (client-op/get-unpushed-ops-count repo))
                        (or (nil? @*last-calibrate-t)
                            (< 500 (- t @*last-calibrate-t))))
               (m/? (r.skeleton/new-task--calibrate-graph-skeleton get-ws-create-task graph-uuid conn t))
@@ -241,40 +240,13 @@
      :depend-on-block-uuids @*depend-on-block-uuid-set}))
 
 (defn- gen-block-uuid->remote-ops
-  [repo conn & {:keys [n] :or {n 50}}]
-  (loop [current-handling-block-ops nil
-         current-handling-block-uuid nil
-         depend-on-block-uuid-coll nil
-         r {}]
-    (cond
-      (and (empty? current-handling-block-ops)
-           (empty? depend-on-block-uuid-coll)
-           (>= (count r) n))
-      r
-
-      (and (empty? current-handling-block-ops)
-           (empty? depend-on-block-uuid-coll))
-      (if-let [{min-t-block-ops :ops block-uuid :block-uuid} (op-mem-layer/get-min-t-block-ops repo)]
-        (do (assert (not (contains? r block-uuid)) {:r r :block-uuid block-uuid})
-            (op-mem-layer/remove-block-ops! repo block-uuid)
-            (recur min-t-block-ops block-uuid depend-on-block-uuid-coll r))
-        ;; finish
-        r)
-
-      (and (empty? current-handling-block-ops)
-           (seq depend-on-block-uuid-coll))
-      (let [[block-uuid & other-block-uuids] depend-on-block-uuid-coll
-            block-ops (op-mem-layer/get-block-ops repo block-uuid)]
-        (op-mem-layer/remove-block-ops! repo block-uuid)
-        (recur block-ops block-uuid other-block-uuids r))
-
-      (seq current-handling-block-ops)
-      (let [{:keys [remote-ops depend-on-block-uuids]}
-            (local-block-ops->remote-ops @conn current-handling-block-ops)]
-        (recur nil nil
-               (set/union (set depend-on-block-uuid-coll)
-                          (op-mem-layer/intersection-block-uuids repo depend-on-block-uuids))
-               (assoc r current-handling-block-uuid (into {} remote-ops)))))))
+  [db block-ops-map-coll]
+  (into {}
+        (map
+         (fn [block-ops-map]
+           [(:block/uuid block-ops-map)
+            (:remote-ops (local-block-ops->remote-ops db block-ops-map))]))
+        block-ops-map-coll))
 
 (defn- merge-remove-remove-ops
   [remote-remove-ops]
@@ -333,52 +305,59 @@
                          block-uuid->remote-ops)]
     (concat update-schema-ops update-page-ops remove-ops sorted-move-ops update-ops remove-page-ops)))
 
+(defn- rollback
+  [repo block-ops-map-coll]
+  (let [ops (map (fn [m]
+                   (keep (fn [[k op]]
+                           (when (not= :block/uuid k)
+                             op))
+                         m))
+                 block-ops-map-coll)]
+    (client-op/add-ops repo ops)
+    nil))
+
 (defn new-task--push-local-ops
   "Return a task: push local updates"
   [repo conn graph-uuid date-formatter get-ws-create-task add-log-fn]
   (m/sp
-    (op-mem-layer/new-branch! repo)
-    (if-let [remote-ops (not-empty (gen-block-uuid->remote-ops repo conn))]
-      (when-let [ops-for-remote (rtc-const/to-ws-ops-decoder
-                                 (sort-remote-ops
-                                  remote-ops))]
-        (let [local-tx (op-mem-layer/get-local-tx repo)
-              r (m/? (ws-util/send&recv get-ws-create-task {:action "apply-ops" :graph-uuid graph-uuid
-                                                            :ops ops-for-remote :t-before (or local-tx 1)}))]
-          (if-let [remote-ex (:ex-data r)]
-            (do (add-log-fn :rtc.log/push-local-update remote-ex)
-                (case (:type remote-ex)
-                  ;; - :graph-lock-failed
-                  ;;   conflict-update remote-graph, keep these local-pending-ops
-                  ;;   and try to send ops later
-                  :graph-lock-failed
-                  (do (op-mem-layer/rollback! repo)
-                      nil)
-                  ;; - :graph-lock-missing
-                  ;;   this case means something wrong in remote-graph data,
-                  ;;   nothing to do at client-side
-                  :graph-lock-missing
-                  (do (op-mem-layer/rollback! repo)
-                      (throw r.ex/ex-remote-graph-lock-missing))
+    (let [block-ops-map-coll (client-op/get&remove-all-ops repo)]
+      (when-let [block-uuid->remote-ops (not-empty (gen-block-uuid->remote-ops @conn block-ops-map-coll))]
+        (when-let [ops-for-remote (rtc-const/to-ws-ops-decoder
+                                   (sort-remote-ops
+                                    block-uuid->remote-ops))]
+          (let [local-tx (client-op/get-local-tx repo)
+                r (m/? (ws-util/send&recv get-ws-create-task {:action "apply-ops" :graph-uuid graph-uuid
+                                                              :ops ops-for-remote :t-before (or local-tx 1)}))]
+            (if-let [remote-ex (:ex-data r)]
+              (do (add-log-fn :rtc.log/push-local-update remote-ex)
+                  (case (:type remote-ex)
+                    ;; - :graph-lock-failed
+                    ;;   conflict-update remote-graph, keep these local-pending-ops
+                    ;;   and try to send ops later
+                    :graph-lock-failed
+                    (rollback repo block-ops-map-coll)
+                    ;; - :graph-lock-missing
+                    ;;   this case means something wrong in remote-graph data,
+                    ;;   nothing to do at client-side
+                    :graph-lock-missing
+                    (do (rollback repo block-ops-map-coll)
+                        (throw r.ex/ex-remote-graph-lock-missing))
 
-                  :rtc.exception/get-s3-object-failed
-                  (do (op-mem-layer/rollback! repo)
-                      nil)
-                  ;; else
-                  (do (op-mem-layer/rollback! repo)
-                      (throw (ex-info "Unavailable" {:remote-ex remote-ex})))))
+                    :rtc.exception/get-s3-object-failed
+                    (rollback repo block-ops-map-coll)
+                    ;; else
+                    (do (rollback repo block-ops-map-coll)
+                        (throw (ex-info "Unavailable" {:remote-ex remote-ex})))))
 
-            (do (assert (pos? (:t r)) r)
-                (op-mem-layer/commit! repo)
-                (r.remote-update/apply-remote-update
-                 graph-uuid repo conn date-formatter {:type :remote-update :value r} add-log-fn)
-                (add-log-fn :rtc.log/push-local-update {:remote-t (:t r)})))))
-      (op-mem-layer/rollback! repo))))
+              (do (assert (pos? (:t r)) r)
+                  (r.remote-update/apply-remote-update
+                   graph-uuid repo conn date-formatter {:type :remote-update :value r} add-log-fn)
+                  (add-log-fn :rtc.log/push-local-update {:remote-t (:t r)})))))))))
 
 (defn new-task--pull-remote-data
   [repo conn graph-uuid date-formatter get-ws-create-task add-log-fn]
   (m/sp
-    (let [local-tx (op-mem-layer/get-local-tx repo)
+    (let [local-tx (client-op/get-local-tx repo)
           r (m/? (ws-util/send&recv get-ws-create-task {:action "apply-ops" :graph-uuid graph-uuid
                                                         :ops [] :t-before (or local-tx 1)}))]
       (if-let [remote-ex (:ex-data r)]

+ 220 - 0
src/main/frontend/worker/rtc/client_op.cljs

@@ -0,0 +1,220 @@
+(ns frontend.worker.rtc.client-op
+  "Store client-ops in a persisted datascript"
+  (:require [datascript.core :as d]
+            [frontend.worker.rtc.const :as rtc-const]
+            [frontend.worker.state :as worker-state]
+            [logseq.db.sqlite.util :as sqlite-util]
+            [malli.core :as ma]
+            [malli.transform :as mt]
+            [missionary.core :as m]))
+
+(def op-schema
+  [:multi {:dispatch first}
+   [:move
+    [:catn
+     [:op :keyword]
+     [:t :int]
+     [:value [:map
+              [:block-uuid :uuid]]]]]
+   [:remove
+    [:catn
+     [:op :keyword]
+     [:t :int]
+     [:value [:map
+              [:block-uuid :uuid]]]]]
+   [:update-page
+    [:catn
+     [:op :keyword]
+     [:t :int]
+     [:value [:map
+              [:block-uuid :uuid]]]]]
+   [:remove-page
+    [:catn
+     [:op :keyword]
+     [:t :int]
+     [:value [:map
+              [:block-uuid :uuid]]]]]
+   [:update
+    [:catn
+     [:op :keyword]
+     [:t :int]
+     [:value [:map
+              [:block-uuid :uuid]
+              [:av-coll [:sequential rtc-const/av-schema]]]]]]])
+
+(def ops-schema [:sequential op-schema])
+(def ops-coercer (ma/coercer ops-schema mt/json-transformer nil
+                             #(do (prn ::bad-ops (:value %))
+                                  (ma/-fail! ::ops-schema %))))
+
+(def schema-in-db
+  {:block/uuid {:db/unique :db.unique/identity}
+   :local-tx {:db/index true}
+   :graph-uuid {:db/index true}})
+
+(defn update-graph-uuid
+  [repo graph-uuid]
+  {:pre [(some? graph-uuid)]}
+  (when-let [conn (worker-state/get-client-ops-conn repo)]
+    (assert (nil? (first (d/datoms @conn :avet :graph-uuid))))
+    (d/transact! conn [[:db/add "e" :graph-uuid graph-uuid]])))
+
+(defn update-local-tx
+  [repo t]
+  {:pre [(some? t)]}
+  (when-let [conn (worker-state/get-client-ops-conn repo)]
+    (let [tx-data
+          (if-let [datom (first (d/datoms @conn :avet :local-tx))]
+            [:db/add (:e datom) :local-tx t]
+            [:db/add "e" :local-tx t])]
+      (d/transact! conn [tx-data]))))
+
+(defn get-local-tx
+  [repo]
+  (when-let [conn (worker-state/get-client-ops-conn repo)]
+    (:v (first (d/datoms @conn :avet :local-tx)))))
+
+(defn- merge-update-ops
+  [update-op1 update-op2]
+  {:pre [(= :update (first update-op1))
+         (= :update (first update-op2))
+         (= (:block-uuid (last update-op1))
+            (:block-uuid (last update-op2)))]}
+  (let [t1 (second update-op1)
+        t2 (second update-op2)]
+    (if (> t1 t2)
+      (merge-update-ops update-op2 update-op1)
+      (let [{av-coll1 :av-coll block-uuid :block-uuid} (last update-op1)
+            {av-coll2 :av-coll} (last update-op1)]
+        [:update t2
+         {:block-uuid block-uuid
+          :av-coll (concat av-coll1 av-coll2)}]))))
+
+(defn add-ops*
+  [conn ops]
+  (let [ops (ops-coercer ops)]
+    (letfn [(already-removed? [remove-op t]
+              (some-> remove-op second (> t)))
+            (add-after-remove? [move-op t]
+              (some-> move-op second (> t)))]
+      (doseq [op ops]
+        (let [[op-type t value] op
+              {:keys [block-uuid]} value
+              exist-block-ops-entity (d/entity @conn [:block/uuid block-uuid])
+              e (:db/id exist-block-ops-entity)
+              tx-data
+              (case op-type
+                :move
+                (let [remove-op (get exist-block-ops-entity :remove)]
+                  (when-not (already-removed? remove-op t)
+                    (cond-> [{:block/uuid block-uuid
+                              :move op}]
+                      remove-op (conj [:db.fn/retractAttribute e :remove]))))
+                :update
+                (let [remove-op (get exist-block-ops-entity :remove)]
+                  (when-not (already-removed? remove-op t)
+                    (let [origin-update-op (get exist-block-ops-entity :update)
+                          op* (if origin-update-op (merge-update-ops origin-update-op op) op)]
+                      (cond-> [{:block/uuid block-uuid
+                                :update op*}]
+                        remove-op (conj [:db.fn/retractAttribute e :remove])))))
+                :remove
+                (let [move-op (get exist-block-ops-entity :move)]
+                  (when-not (add-after-remove? move-op t)
+                    (cond-> [{:block/uuid block-uuid
+                              :remove op}]
+                      move-op (conj [:db.fn/retractAttribute e :move]))))
+                :update-page
+                (let [remove-page-op (get exist-block-ops-entity :remove-page)]
+                  (when-not (already-removed? remove-page-op t)
+                    (cond-> [{:block/uuid block-uuid
+                              :update-page op}]
+                      remove-page-op (conj [:db.fn/retractAttribute e :remove-page]))))
+                :remove-page
+                (let [update-page-op (get exist-block-ops-entity :update-page)]
+                  (when-not (add-after-remove? update-page-op t)
+                    (cond-> [{:block/uuid block-uuid
+                              :remove-page op}]
+                      update-page-op (conj [:db.fn/retractAttribute e :update-page])))))]
+          (when (seq tx-data)
+            (d/transact! conn tx-data)))))))
+
+(defn add-ops
+  [repo ops]
+  (let [conn (worker-state/get-client-ops-conn repo)]
+    (assert (some? conn) repo)
+    (add-ops* conn ops)))
+
+(defn- get-all-op-datoms
+  [conn]
+  (->> (d/datoms @conn :eavt)
+       (remove (fn [datom] (contains? #{:graph-uuid :local-tx} (:a datom))))
+       (group-by :e)))
+
+(defn get-all-ops*
+  [conn]
+  (let [e->datoms (get-all-op-datoms conn)]
+    (map (fn [same-ent-datoms]
+           (into {} (map (juxt :a :v)) same-ent-datoms))
+         (vals e->datoms))))
+
+(defn get&remove-all-ops*
+  [conn]
+  (let [e->datoms (get-all-op-datoms conn)
+        retract-all-tx-data (map (fn [e] [:db.fn/retractEntity e]) (keys e->datoms))]
+    (d/transact! conn retract-all-tx-data)
+    (map (fn [same-ent-datoms]
+           (into {} (map (juxt :a :v)) same-ent-datoms))
+         (vals e->datoms))))
+
+(defn get-all-ops
+  "Return coll of
+  {:block/uuid ...
+   :update ...
+   :move ...
+   ...}"
+  [repo]
+  (when-let [conn (worker-state/get-client-ops-conn repo)]
+    (mapcat
+     (fn [m]
+       (keep (fn [[k v]]
+               (when (not= :block/uuid k) v))
+             m))
+     (get-all-ops* conn))))
+
+(defn get&remove-all-ops
+  "Return coll of
+  {:block/uuid ...
+   :update ...
+   :move ...
+   ...}"
+  [repo]
+  (when-let [conn (worker-state/get-client-ops-conn repo)]
+    (get&remove-all-ops* conn)))
+
+(defn get-unpushed-ops-count
+  [repo]
+  (when-let [conn (worker-state/get-client-ops-conn repo)]
+    (count (get-all-op-datoms conn))))
+
+(defn rtc-db-graph?
+  "Is db-graph & RTC enabled"
+  [repo]
+  (and (sqlite-util/db-based-graph? repo)
+       (or (exists? js/process)
+           (some? (get-local-tx repo)))))
+
+(defn create-pending-ops-count-flow
+  [repo]
+  (when-let [conn (worker-state/get-client-ops-conn repo)]
+    (letfn [(datom-count [db]
+              (count (d/datoms db :avet :block/uuid)))]
+      (m/relieve
+       (m/observe
+        (fn ctor [emit!]
+          (d/listen! conn :create-pending-ops-count-flow
+                     (fn [{:keys [db-after]}]
+                       (emit! (datom-count db-after))))
+          (emit! (datom-count @conn))
+          (fn dtor []
+            (d/unlisten! conn :create-pending-ops-count-flow))))))))

+ 3 - 1
src/main/frontend/worker/rtc/const.cljs

@@ -59,7 +59,9 @@
       [:db/cardinality {:optional true} :keyword]
       [:db/index {:optional true} :boolean]]]]])
 
-(def to-ws-ops-validator (m/validator [:sequential to-ws-op-schema]))
+(comment
+  (def to-ws-ops-validator (m/validator [:sequential to-ws-op-schema])))
+
 (def to-ws-ops-decoder (m/decoder [:sequential to-ws-op-schema] mt/string-transformer))
 
 (def ^:private extra-attr-map-schema

+ 3 - 3
src/main/frontend/worker/rtc/core.cljs

@@ -3,10 +3,10 @@
   (:require [frontend.common.missionary-util :as c.m]
             [frontend.worker.rtc.asset :as r.asset]
             [frontend.worker.rtc.client :as r.client]
+            [frontend.worker.rtc.client-op :as client-op]
             [frontend.worker.rtc.exception :as r.ex]
             [frontend.worker.rtc.full-upload-download-graph :as r.upload-download]
             [frontend.worker.rtc.log-and-state :as rtc-log-and-state]
-            [frontend.worker.rtc.op-mem-layer :as op-mem-layer]
             [frontend.worker.rtc.remote-update :as r.remote-update]
             [frontend.worker.rtc.skeleton]
             [frontend.worker.rtc.ws :as ws]
@@ -49,7 +49,7 @@
         merge-flow (m/latest vector auto-push-flow clock-flow)]
     (m/eduction (filter first)
                 (map second)
-                (filter (fn [v] (when (pos? (op-mem-layer/get-unpushed-block-update-count repo)) v)))
+                (filter (fn [v] (when (pos? (client-op/get-unpushed-ops-count repo)) v)))
                 merge-flow)))
 
 (defn- create-mixed-flow
@@ -305,7 +305,7 @@
                  :auto-push? rtc-auto-push?
                  :online-users online-users})
               rtc-state-flow (m/watch *rtc-auto-push?) (m/watch *rtc-lock) (m/watch *online-users)
-              (op-mem-layer/create-pending-ops-count-flow repo)
+              (client-op/create-pending-ops-count-flow repo)
               (rtc-log-and-state/create-local-t-flow graph-uuid)
               (rtc-log-and-state/create-remote-t-flow graph-uuid))))
           (catch Cancelled _))))))

+ 3 - 3
src/main/frontend/worker/rtc/db_listener.cljs

@@ -4,7 +4,7 @@
             [datascript.core :as d]
             [frontend.schema-register :include-macros true :as sr]
             [frontend.worker.db-listener :as db-listener]
-            [frontend.worker.rtc.op-mem-layer :as op-mem-layer]
+            [frontend.worker.rtc.client-op :as client-op]
             [logseq.db :as ldb]))
 
 (defn- latest-add?->v->t
@@ -117,7 +117,7 @@
   (let [ops (mapcat (partial entity-datoms=>ops db-before db-after e->a->v->add?->t)
                     same-entity-datoms-coll)]
     (when (seq ops)
-      (op-mem-layer/add-ops! repo ops))))
+      (client-op/add-ops repo ops))))
 
 (sr/defkeyword :persist-op?
   "tx-meta option, generate rtc ops when not nil (default true)")
@@ -125,6 +125,6 @@
 (defmethod db-listener/listen-db-changes :gen-rtc-ops
   [_ {:keys [_tx-data tx-meta db-before db-after
              repo _id->attr->datom e->a->add?->v->t same-entity-datoms-coll]}]
-  (when (and (op-mem-layer/rtc-db-graph? repo)
+  (when (and (client-op/rtc-db-graph? repo)
              (:persist-op? tx-meta true))
     (generate-rtc-ops repo db-before db-after same-entity-datoms-coll e->a->add?->v->t)))

+ 4 - 8
src/main/frontend/worker/rtc/full_upload_download_graph.cljs

@@ -5,8 +5,8 @@
             [clojure.set :as set]
             [datascript.core :as d]
             [frontend.common.missionary-util :as c.m]
+            [frontend.worker.rtc.client-op :as client-op]
             [frontend.worker.rtc.log-and-state :as rtc-log-and-state]
-            [frontend.worker.rtc.op-mem-layer :as op-mem-layer]
             [frontend.worker.rtc.ws-util :as ws-util]
             [frontend.worker.state :as worker-state]
             [frontend.worker.util :as worker-util]
@@ -134,9 +134,7 @@
                            [{:db/ident :logseq.kv/graph-uuid :kv/value graph-uuid}
                             {:db/ident :logseq.kv/graph-local-tx :kv/value "0"}])
             (m/? (c.m/await-promise (.storeMetadata worker-obj repo (pr-str {:kv/value graph-uuid}))))
-            (op-mem-layer/init-empty-ops-store! repo)
-            (op-mem-layer/update-graph-uuid! repo graph-uuid)
-            (m/? (op-mem-layer/new-task--sync-to-idb repo))
+            (client-op/update-graph-uuid repo graph-uuid)
             (rtc-log-and-state/rtc-log :rtc.log/upload {:sub-type :upload-completed
                                                         :message "upload-graph completed"})
             nil)
@@ -246,7 +244,7 @@
                              schema-blocks)
         ^js worker-obj (:worker/object @worker-state/*state)]
     (m/sp
-      (op-mem-layer/update-local-tx! repo t)
+      (client-op/update-local-tx repo t)
       (rtc-log-and-state/update-local-t graph-uuid t)
       (rtc-log-and-state/update-remote-t graph-uuid t)
       (m/?
@@ -324,10 +322,8 @@
                                                         :graph-uuid graph-uuid})
           (let [all-blocks (ldb/read-transit-str body)]
             (worker-state/set-rtc-downloading-graph! true)
-            (op-mem-layer/init-empty-ops-store! repo)
             (m/? (new-task--transact-remote-all-blocks all-blocks repo graph-uuid))
-            (op-mem-layer/update-graph-uuid! repo graph-uuid)
-            (m/? (op-mem-layer/new-task--sync-to-idb repo))
+            (client-op/update-graph-uuid repo graph-uuid)
             (m/? (c.m/await-promise (.storeMetadata worker-obj repo (pr-str {:kv/value graph-uuid}))))
             (worker-state/set-rtc-downloading-graph! false)
             (rtc-log-and-state/rtc-log :rtc.log/download {:sub-type :download-completed

+ 0 - 33
src/main/frontend/worker/rtc/op_idb_layer.cljs

@@ -1,33 +0,0 @@
-(ns frontend.worker.rtc.op-idb-layer
-  "Fns to read/write client-ops from/into indexeddb."
-  (:require ["/frontend/idbkv" :as idb-keyval]
-            [promesa.core :as p]
-            [logseq.db.sqlite.util :as sqlite-util]
-            [logseq.db :as ldb]))
-
-(def stores (atom {}))
-
-(defn- ensure-store
-  "Return nil when 'repo' is not a db-graph"
-  [repo]
-  {:pre [(some? repo)]}
-  (when (sqlite-util/db-based-graph? repo)
-    (if-let [s (@stores repo)]
-      s
-      (do (swap! stores assoc repo (idb-keyval/newStore (str "rtc-ops-" repo) "ops"))
-          (@stores repo)))))
-
-(defn <reset2!
-  [repo v]
-  (p/do!
-   (when-let [store (ensure-store repo)]
-     (let [v (ldb/write-transit-str v)]
-       (idb-keyval/set "v" v store)))))
-
-(defn <read2
-  [repo]
-  (p/do!
-   (when-let [store (ensure-store repo)]
-     (p/let [v (idb-keyval/get "v" store)]
-       (when v
-         (ldb/read-transit-str v))))))

+ 0 - 402
src/main/frontend/worker/rtc/op_mem_layer.cljs

@@ -1,402 +0,0 @@
-(ns frontend.worker.rtc.op-mem-layer
-  "Store client-ops in memory.
-  And sync these ops to indexedDb automatically."
-  (:require [clojure.set :as set]
-            [frontend.common.missionary-util :as c.m]
-            [frontend.worker.rtc.const :as rtc-const]
-            [frontend.worker.rtc.op-idb-layer :as op-idb-layer]
-            [frontend.worker.state :as worker-state]
-            [logseq.db :as ldb]
-            [logseq.db.sqlite.util :as sqlite-util]
-            [malli.core :as ma]
-            [malli.transform :as mt]
-            [missionary.core :as m]
-            [promesa.core :as p])
-  (:import [missionary Cancelled]))
-
-(def op-schema
-  [:multi {:dispatch first}
-   [:move
-    [:catn
-     [:op :keyword]
-     [:t :int]
-     [:value [:map
-              [:block-uuid :uuid]]]]]
-   [:remove
-    [:catn
-     [:op :keyword]
-     [:t :int]
-     [:value [:map
-              [:block-uuid :uuid]]]]]
-   [:update-page
-    [:catn
-     [:op :keyword]
-     [:t :int]
-     [:value [:map
-              [:block-uuid :uuid]]]]]
-   [:remove-page
-    [:catn
-     [:op :keyword]
-     [:t :int]
-     [:value [:map
-              [:block-uuid :uuid]]]]]
-   [:update
-    [:catn
-     [:op :keyword]
-     [:t :int]
-     [:value [:map
-              [:block-uuid :uuid]
-              [:av-coll [:sequential rtc-const/av-schema]]]]]]
-
-   [:update-asset
-    [:catn
-     [:op :keyword]
-     [:t :int]
-     [:value [:map
-              [:asset-uuid :uuid]]]]]
-   [:remove-asset
-    [:catn
-     [:op :keyword]
-     [:t :int]
-     [:value [:map
-              [:asset-uuid :uuid]]]]]])
-
-(def ops-schema [:sequential op-schema])
-
-(def ops-coercer (ma/coercer ops-schema mt/json-transformer nil
-                             #(do (prn ::bad-ops (:value %))
-                                  (ma/-fail! ::ops-schema %))))
-
-(def ops-store-value-schema
-  [:map
-   [:graph-uuid {:optional true} :string]
-   [:local-tx {:optional true} :int]
-   [:block-uuid->ops [:map-of :uuid
-                      [:map-of [:enum :move :remove :update :update-page :remove-page] :any]]]
-   ;; TODO: remove :asset-uuid->ops
-   [:asset-uuid->ops [:map-of :uuid
-                      [:map-of [:enum :update-asset :remove-asset] :any]]]
-   [:t+block-uuid-sorted-set [:set [:cat :int :uuid]]]])
-
-(def ops-store-schema
-  [:map-of :string                      ; repo-name
-   [:map
-    [:current-branch ops-store-value-schema]
-    [:old-branch {:optional true} [:maybe ops-store-value-schema]]]])
-
-(def ops-store-schema-coercer (ma/coercer ops-store-schema nil nil #(ma/-fail! ::ops-store-schema %)))
-
-(defonce *ops-store (atom {} :validator ops-store-schema-coercer))
-
-(defn- merge-update-ops
-  [update-op1 update-op2]
-  {:pre [(= :update (first update-op1))
-         (= :update (first update-op2))
-         (= (:block-uuid (last update-op1))
-            (:block-uuid (last update-op2)))]}
-  (let [t1 (second update-op1)
-        t2 (second update-op2)]
-    (if (> t1 t2)
-      (merge-update-ops update-op2 update-op1)
-      (let [{av-coll1 :av-coll block-uuid :block-uuid} (last update-op1)
-            av-coll2 (:av-coll (last update-op2))]
-        [:update t2
-         {:block-uuid block-uuid
-          :av-coll (concat av-coll1 av-coll2)}]))))
-
-(defn- block-uuid->min-t
-  [block-uuid->ops block-uuid]
-  (some->> (block-uuid->ops block-uuid)
-           vals
-           (map second)
-           seq
-           (apply min)))
-
-(defn- update-t+block-uuid-sorted-set
-  [t+block-uuid-sorted-set old-block-uuid->ops block-uuid->ops block-uuid]
-  (let [origin-min-t (block-uuid->min-t old-block-uuid->ops block-uuid)
-        min-t (block-uuid->min-t block-uuid->ops block-uuid)]
-    (cond-> t+block-uuid-sorted-set
-      origin-min-t (disj [origin-min-t block-uuid])
-      true (conj [min-t block-uuid]))))
-
-(defn ^:large-vars/cleanup-todo add-ops-aux
-  [ops block-uuid->ops t+block-uuid-sorted-set]
-  (loop [block-uuid->ops block-uuid->ops
-         t+block-uuid-sorted-set t+block-uuid-sorted-set
-         [op & others] ops]
-    (if-not op
-      {:block-uuid->ops block-uuid->ops
-       :t+block-uuid-sorted-set t+block-uuid-sorted-set}
-      (let [[op-type t value] op
-            {:keys [block-uuid]} value
-            exist-ops (some-> block-uuid block-uuid->ops)]
-        (case op-type
-          :move
-          (let [already-removed? (some-> (get exist-ops :remove) second (> t))]
-            (if already-removed?
-              (recur block-uuid->ops t+block-uuid-sorted-set others)
-              (let [block-uuid->ops* (-> block-uuid->ops
-                                         (assoc-in [block-uuid :move] op)
-                                         (update block-uuid dissoc :remove))
-                    t+block-uuid-sorted-set*
-                    (update-t+block-uuid-sorted-set t+block-uuid-sorted-set
-                                                    block-uuid->ops
-                                                    block-uuid->ops*
-                                                    block-uuid)]
-                (recur block-uuid->ops* t+block-uuid-sorted-set* others))))
-          :update
-          (let [already-removed? (some-> (get exist-ops :remove) second (> t))]
-            (if already-removed?
-              (recur block-uuid->ops t+block-uuid-sorted-set others)
-              (let [origin-update-op (get-in block-uuid->ops [block-uuid :update])
-                    op* (if origin-update-op (merge-update-ops origin-update-op op) op)
-                    block-uuid->ops* (-> block-uuid->ops
-                                         (assoc-in [block-uuid :update] op*)
-                                         (update block-uuid dissoc :remove))
-                    t+block-uuid-sorted-set*
-                    (update-t+block-uuid-sorted-set t+block-uuid-sorted-set
-                                                    block-uuid->ops
-                                                    block-uuid->ops*
-                                                    block-uuid)]
-                (recur block-uuid->ops* t+block-uuid-sorted-set* others))))
-          :remove
-          (let [add-after-remove? (some-> (get exist-ops :move) second (> t))]
-            (if add-after-remove?
-              (recur block-uuid->ops t+block-uuid-sorted-set others)
-              (let [block-uuid->ops* (assoc block-uuid->ops block-uuid {:remove op})
-                    t+block-uuid-sorted-set*
-                    (update-t+block-uuid-sorted-set t+block-uuid-sorted-set
-                                                    block-uuid->ops
-                                                    block-uuid->ops*
-                                                    block-uuid)]
-                (recur block-uuid->ops* t+block-uuid-sorted-set* others))))
-          :update-page
-          (let [already-removed? (some-> (get exist-ops :remove-page) second (> t))]
-            (if already-removed?
-              (recur block-uuid->ops t+block-uuid-sorted-set others)
-              (let [block-uuid->ops* (-> block-uuid->ops
-                                         (assoc-in [block-uuid :update-page] op)
-                                         (update block-uuid dissoc :remove-page))
-                    t+block-uuid-sorted-set*
-                    (update-t+block-uuid-sorted-set t+block-uuid-sorted-set
-                                                    block-uuid->ops
-                                                    block-uuid->ops*
-                                                    block-uuid)]
-                (recur block-uuid->ops* t+block-uuid-sorted-set* others))))
-          :remove-page
-          (let [add-after-remove? (some-> (get exist-ops :update-page) second (> t))]
-            (if add-after-remove?
-              (recur block-uuid->ops t+block-uuid-sorted-set others)
-              (let [block-uuid->ops* (assoc block-uuid->ops block-uuid {:remove-page op})
-                    t+block-uuid-sorted-set*
-                    (update-t+block-uuid-sorted-set t+block-uuid-sorted-set
-                                                    block-uuid->ops
-                                                    block-uuid->ops*
-                                                    block-uuid)]
-                (recur block-uuid->ops* t+block-uuid-sorted-set* others)))))))))
-
-(def ^:private sorted-set-by-t (sorted-set-by (fn [[t1 x] [t2 y]]
-                                                (let [r (compare t1 t2)]
-                                                  (if (not= r 0)
-                                                    r
-                                                    (compare x y))))))
-
-(def ^:private empty-ops-store-value {:current-branch {:block-uuid->ops {}
-                                                       :asset-uuid->ops {}
-                                                       :t+block-uuid-sorted-set sorted-set-by-t}})
-
-(defn init-empty-ops-store!
-  [repo]
-  (swap! *ops-store assoc repo empty-ops-store-value))
-
-(defn remove-ops-store!
-  [repo]
-  (swap! *ops-store dissoc repo))
-
-(defn add-ops!
-  [repo ops]
-  (assert (contains? (@*ops-store repo) :current-branch) (@*ops-store repo))
-  (let [ops (ops-coercer ops)
-        {{old-branch-block-uuid->ops :block-uuid->ops
-          old-t+block-uuid-sorted-set :t+block-uuid-sorted-set
-          :as old-branch} :old-branch
-         {:keys [block-uuid->ops t+block-uuid-sorted-set]} :current-branch}
-        (get @*ops-store repo)
-        {:keys [block-uuid->ops t+block-uuid-sorted-set]}
-        (add-ops-aux ops block-uuid->ops t+block-uuid-sorted-set)
-        {old-branch-block-uuid->ops :block-uuid->ops old-t+block-uuid-sorted-set :t+block-uuid-sorted-set}
-        (when old-branch
-          (add-ops-aux ops old-branch-block-uuid->ops old-t+block-uuid-sorted-set))]
-    (swap! *ops-store update repo
-           (fn [{:keys [current-branch old-branch]}]
-             (cond-> {:current-branch
-                      (assoc current-branch
-                             :block-uuid->ops block-uuid->ops
-                             :t+block-uuid-sorted-set t+block-uuid-sorted-set)}
-               old-branch
-               (assoc :old-branch
-                      (assoc old-branch
-                             :block-uuid->ops old-branch-block-uuid->ops
-                             :t+block-uuid-sorted-set old-t+block-uuid-sorted-set)))))))
-
-(defn update-local-tx!
-  [repo t]
-  (assert (contains? (@*ops-store repo) :current-branch))
-  (swap! *ops-store update-in [repo :current-branch] assoc :local-tx t))
-
-(defn update-graph-uuid!
-  [repo graph-uuid]
-  (assert (contains? (@*ops-store repo) :current-branch))
-  (swap! *ops-store update repo
-         (fn [{:keys [current-branch old-branch]}]
-           (cond-> {:current-branch (assoc current-branch :graph-uuid graph-uuid)}
-             old-branch (assoc :old-branch (assoc old-branch :graph-uuid graph-uuid))))))
-
-(defn new-branch!
-  "Make a copy of current repo-ops-store, and also store in `*ops-store`.
-  The following `add-ops` apply on both old-branch and new-branch(current).
-  use `rollback` to replace current-branch with old-branch.
-  use `commit` to remove old-branch."
-  [repo]
-  (let [{:keys [current-branch]} (get @*ops-store repo)]
-    (assert (some? current-branch) repo)
-    (swap! *ops-store assoc-in [repo :old-branch] current-branch)))
-
-(defn rollback!
-  [repo]
-  (when-let [old-branch (get-in @*ops-store [repo :old-branch])]
-    (assert (some? old-branch))
-    (swap! *ops-store assoc repo {:current-branch old-branch})))
-
-(defn commit!
-  [repo]
-  (swap! *ops-store update repo dissoc :old-branch))
-
-(defn get-min-t-block-ops
-  [repo]
-  (let [repo-ops-store (get @*ops-store repo)
-        {:keys [t+block-uuid-sorted-set block-uuid->ops]} (:current-branch repo-ops-store)]
-    (assert (contains? repo-ops-store :current-branch) repo)
-    (when-let [[t block-uuid] (first t+block-uuid-sorted-set)]
-      (if (contains? block-uuid->ops block-uuid)
-        {:block-uuid block-uuid
-         :ops (block-uuid->ops block-uuid)}
-
-        (throw (ex-info "unavailable" {:t t :block-uuid block-uuid :block-uuid->ops block-uuid->ops}))
-        ;; if not found, remove item in :t+block-uuid-sorted-set and retry
-        ;; (do (swap! *ops-store update-in [repo :current-branch] assoc
-        ;;            :t+block-uuid-sorted-set (disj t+block-uuid-sorted-set [t block-uuid]))
-        ;;     (get-min-t-block-ops repo))
-        ))))
-
-(defn get-block-ops
-  [repo block-uuid]
-  (let [repo-ops-store (get @*ops-store repo)
-        {:keys [block-uuid->ops]} (:current-branch repo-ops-store)]
-    (assert (contains? repo-ops-store :current-branch) repo)
-    (block-uuid->ops block-uuid)))
-
-(defn get-all-ops
-  [repo]
-  (some->> (get @*ops-store repo)
-           :current-branch
-           :block-uuid->ops
-           vals
-           (mapcat vals)))
-
-(defn get-local-tx
-  [repo]
-  (some-> (get @*ops-store repo)
-          :current-branch
-          :local-tx))
-
-(defn get-unpushed-block-update-count
-  [repo]
-  (or
-   (some-> (get @*ops-store repo)
-           :current-branch
-           :block-uuid->ops
-           keys
-           count)
-   0))
-
-(comment
-  (defn get-unpushed-asset-update-count
-    [repo]
-    (some-> (get @*ops-store repo)
-            :current-branch
-            :asset-uuid->ops
-            keys
-            count)))
-
-(defn intersection-block-uuids
-  [repo block-uuid-coll]
-  (some->> (get @*ops-store repo)
-           :current-branch
-           :block-uuid->ops
-           keys
-           set
-           (set/intersection (set block-uuid-coll))))
-
-(defn remove-block-ops!
-  [repo block-uuid]
-  {:pre [(uuid? block-uuid)]}
-  (let [repo-ops-store (get @*ops-store repo)
-        {:keys [t+block-uuid-sorted-set block-uuid->ops]} (:current-branch repo-ops-store)]
-    (assert (contains? repo-ops-store :current-branch) repo)
-    (let [min-t (block-uuid->min-t block-uuid->ops block-uuid)]
-      (swap! *ops-store update-in [repo :current-branch] assoc
-             :block-uuid->ops (dissoc block-uuid->ops block-uuid)
-             :t+block-uuid-sorted-set (disj t+block-uuid-sorted-set [min-t block-uuid])))))
-
-(defn <init-load-from-indexeddb2!
-  [repo]
-  (p/let [v (op-idb-layer/<read2 repo)]
-    (when v
-      (let [v (assoc v
-                     :t+block-uuid-sorted-set
-                     (apply conj sorted-set-by-t (:t+block-uuid-sorted-set v)))]
-        (swap! *ops-store assoc repo {:current-branch v})
-        (prn ::<init-load-from-indexeddb! repo)))))
-
-(defn new-task--sync-to-idb
-  [repo]
-  (m/sp
-    (when-let [v (:current-branch (@*ops-store repo))]
-      (m/? (c.m/await-promise (op-idb-layer/<reset2! repo v))))))
-
-(defn- new-task--sync-to-idb-loop
-  []
-  (m/sp
-    (let [*v-hash (atom nil)]
-      (loop []
-        (m/? (m/sleep 3000))
-        (let [repo (worker-state/get-current-repo)
-              conn (worker-state/get-datascript-conn repo)]
-          (when (and repo conn
-                     (ldb/db-based-graph? @conn))
-            (when-let [v (:current-branch (@*ops-store repo))]
-              (let [v-hash (hash v)]
-                (when (not= v-hash @*v-hash)
-                  (m/? (c.m/await-promise (op-idb-layer/<reset2! repo v)))
-                  (reset! *v-hash v-hash))))))
-        (recur)))))
-
-#_:clj-kondo/ignore
-(defonce _sync-loop-canceler (c.m/run-task (new-task--sync-to-idb-loop) ::sync-to-idb-loop))
-
-(defn rtc-db-graph?
-  "Is db-graph & RTC enabled"
-  [repo]
-  (and (sqlite-util/db-based-graph? repo)
-       (or (exists? js/process)
-           (some? (get-local-tx repo)))))
-
-(defn create-pending-ops-count-flow
-  [repo]
-  (m/ap
-    (m/?< (m/watch *ops-store))
-    (try
-      (get-unpushed-block-update-count repo)
-      (catch Cancelled _))))

+ 5 - 5
src/main/frontend/worker/rtc/remote_update.cljs

@@ -6,9 +6,9 @@
             [datascript.core :as d]
             [frontend.schema-register :as sr]
             [frontend.worker.handler.page :as worker-page]
+            [frontend.worker.rtc.client-op :as client-op]
             [frontend.worker.rtc.const :as rtc-const]
             [frontend.worker.rtc.log-and-state :as rtc-log-and-state]
-            [frontend.worker.rtc.op-mem-layer :as op-mem-layer]
             [frontend.worker.state :as worker-state]
             [frontend.worker.util :as worker-util]
             [logseq.clj-fractional-indexing :as index]
@@ -252,7 +252,7 @@
   these updates maybe not needed or need to update, because this client just updated some of these blocks,
   so we need to update these remote-data by local-ops"
   [affected-blocks-map local-unpushed-ops]
-  (assert (op-mem-layer/ops-coercer local-unpushed-ops) local-unpushed-ops)
+  (assert (client-op/ops-coercer local-unpushed-ops) local-unpushed-ops)
   (reduce
    (fn [affected-blocks-map local-op]
      (let [local-op-value (last local-op)]
@@ -283,7 +283,7 @@
 
 (defn- affected-blocks->diff-type-ops
   [repo affected-blocks]
-  (let [unpushed-ops (op-mem-layer/get-all-ops repo)
+  (let [unpushed-ops (client-op/get-all-ops repo)
         affected-blocks-map* (if unpushed-ops
                                (update-remote-data-by-local-unpushed-ops
                                 affected-blocks unpushed-ops)
@@ -530,7 +530,7 @@
     (assert (rtc-const/data-from-ws-validator remote-update-data) remote-update-data)
     (let [remote-t (:t remote-update-data)
           remote-t-before (:t-before remote-update-data)
-          local-tx (op-mem-layer/get-local-tx repo)]
+          local-tx (client-op/get-local-tx repo)]
       (rtc-log-and-state/update-remote-t graph-uuid remote-t)
       (cond
         (not (and (pos? remote-t)
@@ -569,7 +569,7 @@
           (worker-util/profile :apply-remote-remove-ops (apply-remote-remove-ops repo conn date-formatter remove-ops))
           (js/console.groupEnd)
 
-          (op-mem-layer/update-local-tx! repo remote-t)
+          (client-op/update-local-tx repo remote-t)
           (rtc-log-and-state/update-local-t graph-uuid remote-t))
         :else (throw (ex-info "unreachable" {:remote-t remote-t
                                              :remote-t-before remote-t-before

+ 13 - 6
src/main/frontend/worker/state.cljs

@@ -32,24 +32,31 @@
 (defonce *rtc-ws-url (atom nil))
 
 (defonce *sqlite (atom nil))
-;; repo -> {:db conn :search conn}
+;; repo -> {:db conn :search conn :client-ops conn}
 (defonce *sqlite-conns (atom nil))
 ;; repo -> conn
 (defonce *datascript-conns (atom nil))
+
+;; repo -> conn
+(defonce *client-ops-conns (atom nil))
+
 ;; repo -> pool
 (defonce *opfs-pools (atom nil))
 
 (defn get-sqlite-conn
-  [repo & {:keys [search?]
-           :or {search? false}
-           :as _opts}]
-  (let [k (if search? :search :db)]
-    (get-in @*sqlite-conns [repo k])))
+  ([repo] (get-sqlite-conn repo :db))
+  ([repo which-db]
+   (assert (contains? #{:db :search :client-ops} which-db) which-db)
+   (get-in @*sqlite-conns [repo which-db])))
 
 (defn get-datascript-conn
   [repo]
   (get @*datascript-conns repo))
 
+(defn get-client-ops-conn
+  [repo]
+  (get @*client-ops-conns repo))
+
 (defn get-opfs-pool
   [repo]
   (get @*opfs-pools repo))

+ 57 - 1
src/test/frontend/worker/rtc/db_listener_test.cljs

@@ -1,9 +1,22 @@
 (ns frontend.worker.rtc.db-listener-test
   (:require [cljs.test :as t :refer [deftest is testing]]
             [datascript.core :as d]
+            [frontend.db.conn :as conn]
+            [frontend.state :as state]
+            [frontend.test.helper :as test-helper]
             [frontend.worker.db-listener :as worker-db-listener]
+            [frontend.worker.handler.page :as worker-page]
+            [frontend.worker.rtc.client-op :as client-op]
             [frontend.worker.rtc.db-listener :as subject]
-            [logseq.db.frontend.schema :as db-schema]))
+            [frontend.worker.rtc.fixture :as r.fixture]
+            [frontend.worker.state :as worker-state]
+            [logseq.db.frontend.schema :as db-schema]
+            [logseq.outliner.batch-tx :as batch-tx]
+            [logseq.outliner.core :as outliner-core]))
+
+(t/use-fixtures :each
+  test-helper/db-based-start-and-destroy-db-map-fixture
+  r.fixture/listen-test-db-to-gen-rtc-ops-fixture)
 
 (def empty-db (d/empty-db db-schema/schema-for-db-based-graph))
 
@@ -99,3 +112,46 @@
                              (:av-coll op-value)
                              (assoc :av-coll (map #(take 2 %) (:av-coll op-value))))])
                 ops))))))
+
+(deftest listen-db-changes-and-validate-generated-rtc-ops
+  (letfn [(ops-coll=>block-uuid->op-types [ops-coll]
+            (into {}
+                  (map (fn [m]
+                         [(:block/uuid m) (set (keys (dissoc m :block/uuid)))]))
+                  ops-coll))]
+    (let [repo (state/get-current-repo)
+          conn (conn/get-db repo false)
+          [page-uuid block-uuid1 block-uuid2] (repeatedly random-uuid)]
+      (testing "add page"
+        (worker-page/create! repo conn (worker-state/get-config repo)
+                             "TEST-PAGE"
+                             {:uuid page-uuid
+                              :create-first-block? false})
+        (is (some? (d/pull @conn '[*] [:block/uuid page-uuid])))
+        (is (= {page-uuid #{:update-page :update}}
+               (ops-coll=>block-uuid->op-types (client-op/get&remove-all-ops repo)))))
+      (testing "add blocks to this page"
+        (let [target-entity (d/entity @conn [:block/uuid page-uuid])]
+          (batch-tx/with-batch-tx-mode conn
+            {:persist-op? true}
+            (outliner-core/insert-blocks! repo conn [{:block/uuid block-uuid1
+                                                      :block/title "block1"
+                                                      :block/format :markdown}
+                                                     {:block/uuid block-uuid2
+                                                      :block/title "block2"
+                                                      :block/format :markdown}]
+                                          target-entity
+                                          {:sibling? false :keep-uuid? true}))
+          (is (=
+               {block-uuid1 #{:move :update}
+                block-uuid2 #{:move :update}}
+               (ops-coll=>block-uuid->op-types (client-op/get&remove-all-ops repo))))))
+
+      (testing "delete a block"
+        (batch-tx/with-batch-tx-mode conn
+          {:persist-op? true}
+          (outliner-core/delete-blocks! repo conn nil [(d/entity @conn [:block/uuid block-uuid1])] {}))
+
+        (is (=
+             {block-uuid1 #{:remove}}
+             (ops-coll=>block-uuid->op-types (client-op/get&remove-all-ops repo))))))))

+ 7 - 8
src/test/frontend/worker/rtc/fixture.cljs

@@ -3,19 +3,18 @@
             [frontend.db.conn :as conn]
             [frontend.test.helper :as test-helper]
             [frontend.worker.db-listener :as worker-db-listener]
-            [frontend.worker.rtc.op-mem-layer :as op-mem-layer]))
+            [frontend.worker.rtc.client-op :as client-op]
+            [frontend.worker.state :as worker-state]))
 
 (def listen-test-db-to-gen-rtc-ops-fixture
   {:before
    #(let [test-db-conn (conn/get-db test-helper/test-db-name-db-version false)]
       (assert (some? test-db-conn))
       (worker-db-listener/listen-db-changes! test-helper/test-db-name-db-version test-db-conn
-                                             {:handler-keys [:gen-rtc-ops]}))
+                                             {:handler-keys [:gen-rtc-ops]})
+      (swap! worker-state/*client-ops-conns
+             assoc test-helper/test-db-name-db-version (d/create-conn client-op/schema-in-db)))
    :after
    #(when-let [test-db-conn (conn/get-db test-helper/test-db-name-db-version false)]
-      (d/unlisten! test-db-conn :frontend.worker.db-listener/listen-db-changes!))})
-
-(def clear-op-mem-stores-fixture
-  {:before #(do (op-mem-layer/remove-ops-store! test-helper/test-db-name-db-version)
-                (op-mem-layer/init-empty-ops-store! test-helper/test-db-name-db-version))
-   :after #(op-mem-layer/remove-ops-store! test-helper/test-db-name-db-version)})
+      (d/unlisten! test-db-conn :frontend.worker.db-listener/listen-db-changes!)
+      (swap! worker-state/*client-ops-conns dissoc test-helper/test-db-name-db-version))})

+ 0 - 149
src/test/frontend/worker/rtc/rtc_fns_test.cljs

@@ -6,8 +6,6 @@
             [frontend.test.helper :as test-helper]
             [frontend.worker.rtc.const :as rtc-const]
             [frontend.worker.rtc.remote-update :as r.remote]
-            [frontend.worker.rtc.client :as r.client]
-            [frontend.worker.rtc.op-mem-layer :as op-mem-layer]
             [frontend.worker.state :as worker-state]
             [logseq.common.config :as common-config]
             [logseq.db :as ldb]
@@ -82,45 +80,6 @@
                :block/title "update content"}}
              r)))))
 
-(deftest gen-remote-ops-test
-  (let [repo (state/get-current-repo)
-        conn (conn/get-db repo false)
-        [uuid1 uuid2 uuid3 uuid4] (repeatedly random-uuid)
-        opts {:persist-op? false
-              :transact-opts {:repo repo
-                              :conn conn}}]
-    (test-helper/create-page! "gen-remote-ops-test" {:redirect? false :create-first-block? false :uuid uuid1})
-    (outliner-tx/transact!
-     opts
-     (outliner-core/insert-blocks!
-      repo
-      conn
-      [{:block/uuid uuid2 :block/title "uuid2-block"}
-       {:block/uuid uuid3 :block/title "uuid3-block"
-        :block/parent [:block/uuid uuid1]}
-       {:block/uuid uuid4 :block/title "uuid4-block"
-        :block/parent [:block/uuid uuid1]}]
-      (ldb/get-page @conn  "gen-remote-ops-test")
-      {:sibling? true :keep-uuid? true}))
-
-    (op-mem-layer/init-empty-ops-store! repo)
-    (op-mem-layer/add-ops! repo [[:move 1 {:block-uuid uuid2}]
-                                 [:move 2 {:block-uuid uuid4}]
-                                 [:move 3 {:block-uuid uuid3}]
-                                 [:update 4 {:block-uuid uuid4
-                                             :av-coll [[:block/title (ldb/write-transit-str "uuid4-block") 4 true]]}]])
-    (let [_ (op-mem-layer/new-branch! repo)
-          r1 (#'r.client/gen-block-uuid->remote-ops repo conn :n 1)
-          _ (op-mem-layer/rollback! repo)
-          r2 (#'r.client/gen-block-uuid->remote-ops repo conn :n 3)]
-      (is (= {uuid2 [:move]}
-             (update-vals r1 keys)))
-      (is (= {uuid2 [:move]
-              uuid3 [:move]
-              uuid4 [:move :update]}
-             (update-vals r2 keys))))
-    (op-mem-layer/remove-ops-store! repo)))
-
 (deftest ^:fix-me apply-remote-move-ops-test
   (let [repo (state/get-current-repo)
         conn (conn/get-db repo false)
@@ -191,114 +150,6 @@
           (is (= uuid1-client (:block/uuid (:block/left (d/entity @conn [:block/uuid uuid2-remote])))))
           (is (= uuid2-remote (:block/uuid (:block/left (d/entity @conn [:block/uuid uuid1-remote]))))))))))
 
-;; (deftest ^:large-vars/cleanup-todo apply-remote-update-ops-test
-;;   (let [repo (state/get-current-repo)
-;;         conn (conn/get-db repo false)
-;;         opts {:persist-op? false
-;;               :transact-opts {:repo repo
-;;                               :conn conn}}
-;;         date-formatter (common-config/get-date-formatter (worker-state/get-config repo))
-;;         page-name "apply-remote-update-ops-test"
-;;         [page-uuid
-;;          uuid1-client uuid2-client
-;;          uuid1-remote
-;;          uuid1-not-exist
-;;          tag1-uuid] (repeatedly random-uuid)]
-;;     (test-helper/create-page! page-name {:redirect? false :create-first-block? false :uuid page-uuid})
-;;     (outliner-tx/transact!
-;;      opts
-;;      (outliner-core/insert-blocks!
-;;       repo
-;;       conn
-;;       [{:block/uuid uuid1-client :block/title "uuid1-client"
-;;         :block/left [:block/uuid page-uuid]
-;;         :block/parent [:block/uuid page-uuid]}
-;;        {:block/uuid uuid2-client :block/title "uuid2-client"
-;;         :block/left [:block/uuid uuid1-client]
-;;         :block/parent [:block/uuid page-uuid]}]
-;;       (ldb/get-page @conn page-name)
-;;       {:sibling? true :keep-uuid? true}))
-;;     (testing "apply-remote-update-ops-test1"
-;;       (let [data-from-ws {:req-id "req-id"
-;;                           :t 1 ;; not used
-;;                           :t-before 0
-;;                           :affected-blocks
-;;                           {uuid1-remote {:op :update-attrs
-;;                                          :self uuid1-remote
-;;                                          :parents [uuid1-client]
-;;                                          :left uuid1-client
-;;                                          :content "uuid2-remote"
-;;                                          :created-at 1
-;;                                          :link uuid1-client
-;;                                          :type ["property"]}}}
-;;             update-ops (vals
-;;                         (:update-ops-map
-;;                          (#'r.remote/affected-blocks->diff-type-ops repo (:affected-blocks data-from-ws))))]
-;;         (is (rtc-const/data-from-ws-validator data-from-ws))
-;;         (#'r.remote/apply-remote-update-ops repo conn date-formatter update-ops)
-;;         (let [page-blocks (ldb/get-page-blocks @conn (:db/id (ldb/get-page @conn page-name)) {})]
-;;           (is (= #{uuid1-client uuid2-client uuid1-remote} (set (map :block/uuid page-blocks))))
-;;           (is (= [uuid1-client #{"property"}]
-;;                  ((juxt (comp :block/uuid :block/link) :block/type) (d/entity @conn [:block/uuid uuid1-remote])))))))
-
-;;     (testing "apply-remote-update-ops-test2"
-;;       (let [data-from-ws {:req-id "req-id"
-;;                           :t 1
-;;                           :t-before 0
-;;                           :affected-blocks
-;;                           {uuid1-remote {:op :update-attrs
-;;                                          :self uuid1-remote
-;;                                          :parents [uuid1-client]
-;;                                          :left uuid1-client
-;;                                          :content "uuid2-remote"
-;;                                          :created-at 1
-;;                                          :link nil
-;;                                          :type nil}}}
-;;             update-ops (vals
-;;                         (:update-ops-map
-;;                          (#'r.remote/affected-blocks->diff-type-ops repo (:affected-blocks data-from-ws))))]
-;;         (is (rtc-const/data-from-ws-validator data-from-ws))
-;;         (#'r.remote/apply-remote-update-ops repo conn date-formatter update-ops)
-;;         (let [page-blocks (ldb/get-page-blocks @conn (:db/id (ldb/get-page @conn page-name)) {})]
-;;           (is (= #{uuid1-client uuid2-client uuid1-remote} (set (map :block/uuid page-blocks))))
-;;           (is (= [nil nil] ((juxt :block/link :block/type) (d/entity @conn [:block/uuid uuid1-remote])))))))
-;;     (testing "apply-remote-update-ops-test3"
-;;       (let [data-from-ws {:req-id "req-id"
-;;                           :t 1 :t-before 0
-;;                           :affected-blocks
-;;                           {uuid1-remote {:op :update-attrs
-;;                                          :self uuid1-remote
-;;                                          :parents [uuid2-client]
-;;                                          :left uuid2-client
-;;                                          :link uuid1-not-exist}}}
-;;             update-ops (vals
-;;                         (:update-ops-map
-;;                          (#'r.remote/affected-blocks->diff-type-ops repo (:affected-blocks data-from-ws))))]
-;;         (is (rtc-const/data-from-ws-validator data-from-ws))
-;;         (#'r.remote/apply-remote-update-ops repo conn date-formatter update-ops)
-;;         (let [page-blocks (ldb/get-page-blocks @conn (:db/id (ldb/get-page @conn page-name)) {})]
-;;           (is (= #{uuid1-client uuid2-client uuid1-remote} (set (map :block/uuid page-blocks))))
-;;           (is (= [nil nil] ((juxt :block/link :block/type) (d/entity @conn [:block/uuid uuid1-remote])))))))
-;;     (testing "update-attr :block/tags"
-;;       (let [data-from-ws {:req-id "req-id"
-;;                           :t 1 :t-before 0
-;;                           :affected-blocks
-;;                           {uuid1-remote {:op :update-attrs
-;;                                          :self uuid1-remote
-;;                                          :parents [uuid2-client]
-;;                                          :left uuid2-client
-;;                                          :tags [tag1-uuid]}}}
-;;             update-ops (vals
-;;                         (:update-ops-map
-;;                          (#'r.remote/affected-blocks->diff-type-ops repo (:affected-blocks data-from-ws))))]
-;;         (d/transact! conn [{:block/uuid tag1-uuid
-;;                             :block/type #{"class"},
-;;                             :block/name "task",
-;;                             :block/title "Task"}])
-;;         (is (rtc-const/data-from-ws-validator data-from-ws))
-;;         (#'r.remote/apply-remote-update-ops repo conn date-formatter update-ops)
-;;         (is (= #{tag1-uuid} (set (map :block/uuid (:block/tags (d/entity @conn [:block/uuid uuid1-remote]))))))))))
-
 (deftest ^:fix-me apply-remote-remove-ops-test
   (let [repo (state/get-current-repo)
         conn (conn/get-db repo false)