Ver Fonte

refactor(rtc): add frontend.worker.rtc.client-op to replace op-mem-layer

rcmerci há 1 ano atrás
pai
commit
b93d067aa5

+ 41 - 20
src/main/frontend/db_worker.cljs

@@ -17,9 +17,9 @@
             [frontend.worker.handler.page.db-based.rename :as db-worker-page-rename]
             [frontend.worker.handler.page.file-based.rename :as file-worker-page-rename]
             [frontend.worker.rtc.asset-db-listener]
+            [frontend.worker.rtc.client-op :as client-op]
             [frontend.worker.rtc.core :as rtc-core]
             [frontend.worker.rtc.db-listener]
-            [frontend.worker.rtc.op-mem-layer :as op-mem-layer]
             [frontend.worker.search :as search]
             [frontend.worker.state :as worker-state] ;; [frontend.worker.undo-redo :as undo-redo]
             [frontend.worker.undo-redo2 :as undo-redo]
@@ -38,6 +38,7 @@
 (defonce *sqlite worker-state/*sqlite)
 (defonce *sqlite-conns worker-state/*sqlite-conns)
 (defonce *datascript-conns worker-state/*datascript-conns)
+(defonce *client-ops-conns worker-state/*client-ops-conns)
 (defonce *opfs-pools worker-state/*opfs-pools)
 (defonce *publishing? (atom false))
 
@@ -82,8 +83,8 @@
 
 (defn upsert-addr-content!
   "Upsert addr+data-seq"
-  [repo data delete-addrs]
-  (let [^Object db (worker-state/get-sqlite-conn repo)]
+  [repo data delete-addrs & {:keys [client-ops-db?] :or {client-ops-db? false}}]
+  (let [^Object db (worker-state/get-sqlite-conn repo (if client-ops-db? :client-ops :db))]
     (assert (some? db) "sqlite db not exists")
     (.transaction db (fn [tx]
                        (doseq [item data]
@@ -95,8 +96,8 @@
                                         :bind #js [addr]}))))))
 
 (defn restore-data-from-addr
-  [repo addr]
-  (let [^Object db (worker-state/get-sqlite-conn repo)]
+  [repo addr & {:keys [client-ops-db?] :or {client-ops-db? false}}]
+  (let [^Object db (worker-state/get-sqlite-conn repo (if client-ops-db? :client-ops :db))]
     (assert (some? db) "sqlite db not exists")
     (when-let [content (-> (.exec db #js {:sql "select content from kvs where addr = ?"
                                           :bind #js [addr]
@@ -126,56 +127,77 @@
     (-restore [_ addr]
       (restore-data-from-addr repo addr))))
 
+(defn new-sqlite-client-ops-storage
+  [repo]
+  (reify IStorage
+    (-store [_ addr+data-seq delete-addrs]
+      (let [data (map
+                  (fn [[addr data]]
+                    #js {:$addr addr
+                         :$content (sqlite-util/transit-write data)})
+                  addr+data-seq)]
+        (upsert-addr-content! repo data delete-addrs :client-ops-db? true)))
+
+    (-restore [_ addr]
+      (restore-data-from-addr repo addr :client-ops-db? true))))
+
 (defn- close-db-aux!
-  [repo ^Object db ^Object search]
+  [repo ^Object db ^Object search ^Object client-ops]
   (swap! *sqlite-conns dissoc repo)
   (swap! *datascript-conns dissoc repo)
+  (swap! *client-ops-conns dissoc repo)
   (when db (.close db))
   (when search (.close search))
+  (when client-ops (.close client-ops))
   (when-let [^js pool (worker-state/get-opfs-pool repo)]
     (.releaseAccessHandles pool))
   (swap! *opfs-pools dissoc repo))
 
 (defn- close-other-dbs!
   [repo]
-  (doseq [[r {:keys [db search]}] @*sqlite-conns]
+  (doseq [[r {:keys [db search client-ops]}] @*sqlite-conns]
     (when-not (= repo r)
-      (close-db-aux! r db search))))
+      (close-db-aux! r db search client-ops))))
 
 (defn close-db!
   [repo]
-  (let [{:keys [db search]} (@*sqlite-conns repo)]
-    (close-db-aux! repo db search)))
+  (let [{:keys [db search client-ops]} (@*sqlite-conns repo)]
+    (close-db-aux! repo db search client-ops)))
 
-(defn- get-db-and-search-db
+(defn- get-dbs
   [repo]
   (if @*publishing?
     (p/let [^object DB (.-DB ^object (.-oo1 ^object @*sqlite))
             db (new DB "/db.sqlite" "c")
             search-db (new DB "/search-db.sqlite" "c")]
-      [db search-db])
+      [db search-db nil])
     (p/let [^js pool (<get-opfs-pool repo)
             capacity (.getCapacity pool)
             _ (when (zero? capacity)   ; file handle already releases since pool will be initialized only once
                 (.acquireAccessHandles pool))
             db (new (.-OpfsSAHPoolDb pool) repo-path)
-            search-db (new (.-OpfsSAHPoolDb pool) (str "search" repo-path))]
-      [db search-db])))
+            search-db (new (.-OpfsSAHPoolDb pool) (str "search" repo-path))
+            client-ops-db (new (.-OpfsSAHPoolDb pool) (str "client-ops-" repo-path))]
+      [db search-db client-ops-db])))
 
 (defn- create-or-open-db!
   [repo {:keys [config]}]
   (when-not (worker-state/get-sqlite-conn repo)
-    (p/let [[db search-db] (get-db-and-search-db repo)
-            storage (new-sqlite-storage repo {})]
+    (p/let [[db search-db client-ops-db] (get-dbs repo)
+            storage (new-sqlite-storage repo {})
+            client-ops-storage (new-sqlite-client-ops-storage repo)]
       (swap! *sqlite-conns assoc repo {:db db
-                                       :search search-db})
+                                       :search search-db
+                                       :client-ops client-ops-db})
       (.exec db "PRAGMA locking_mode=exclusive")
       (sqlite-common-db/create-kvs-table! db)
       (search/create-tables-and-triggers! search-db)
       (let [schema (sqlite-util/get-schema repo)
             conn (sqlite-common-db/get-storage-conn storage schema)
+            client-ops-conn (sqlite-common-db/get-storage-conn storage client-op/schema-in-db)
             initial-data-exists? (d/entity @conn :logseq.class/Root)]
         (swap! *datascript-conns assoc repo conn)
+        (swap! *client-ops-conns assoc repo client-ops-conn)
         (when (and config (not initial-data-exists?))
           (let [initial-data (sqlite-create-graph/build-db-initial-data config)]
             (d/transact! conn initial-data {:initial-db? true})))
@@ -185,8 +207,7 @@
 
         (db-migrate/migrate conn)
 
-        (p/let [_ (op-mem-layer/<init-load-from-indexeddb2! repo)]
-          (db-listener/listen-db-changes! repo conn))))))
+        (db-listener/listen-db-changes! repo conn)))))
 
 (defn- iter->vec [iter]
   (when iter
@@ -255,7 +276,7 @@
 
 (defn- get-search-db
   [repo]
-  (worker-state/get-sqlite-conn repo {:search? true}))
+  (worker-state/get-sqlite-conn repo :search))
 
 (defn- with-write-transit-str
   [p]

+ 2 - 2
src/main/frontend/worker/rtc/asset_db_listener.cljs

@@ -4,7 +4,7 @@
             [frontend.schema-register :as sr]
             [frontend.worker.db-listener :as db-listener]
             [frontend.worker.rtc.asset :as r.asset]
-            [frontend.worker.rtc.op-mem-layer :as op-mem-layer]))
+            [frontend.worker.rtc.client-op :as client-op]))
 
 (defn entity-datoms=>action+asset-uuid
   [db-after entity-datoms]
@@ -33,6 +33,6 @@
 (defmethod db-listener/listen-db-changes :gen-asset-change-events
   [_ {:keys [_tx-data tx-meta _db-before db-after
              repo _id->attr->datom _e->a->add?->v->t same-entity-datoms-coll]}]
-  (when (and (op-mem-layer/rtc-db-graph? repo)
+  (when (and (client-op/rtc-db-graph? repo)
              (:generate-asset-change-events? tx-meta true))
     (generate-asset-change-events db-after same-entity-datoms-coll)))

+ 56 - 77
src/main/frontend/worker/rtc/client.cljs

@@ -1,13 +1,12 @@
 (ns frontend.worker.rtc.client
   "Fns about push local updates"
-  (:require [clojure.set :as set]
-            [clojure.string :as string]
+  (:require [clojure.string :as string]
             [datascript.core :as d]
             [frontend.common.missionary-util :as c.m]
+            [frontend.worker.rtc.client-op :as client-op]
             [frontend.worker.rtc.const :as rtc-const]
             [frontend.worker.rtc.exception :as r.ex]
             [frontend.worker.rtc.log-and-state :as rtc-log-and-state]
-            [frontend.worker.rtc.op-mem-layer :as op-mem-layer]
             [frontend.worker.rtc.remote-update :as r.remote-update]
             [frontend.worker.rtc.skeleton :as r.skeleton]
             [frontend.worker.rtc.ws :as ws]
@@ -22,8 +21,8 @@
             (m/? (ws-util/send&recv get-ws-create-task {:action "register-graph-updates"
                                                         :graph-uuid graph-uuid}))]
         (rtc-log-and-state/update-remote-t graph-uuid remote-t)
-        (when-not (op-mem-layer/get-local-tx repo)
-          (op-mem-layer/update-local-tx! repo remote-t)))
+        (when-not (client-op/get-local-tx repo)
+          (client-op/update-local-tx repo remote-t)))
       (catch :default e
         (if (= :rtc.exception/remote-graph-not-ready (:type (ex-data e)))
           (throw (ex-info "remote graph is still creating" {:missionary/retry true} e))
@@ -60,8 +59,8 @@
           (m/? (c.m/backoff
                 (take 5 (drop 2 c.m/delays)) ;retry 5 times if remote-graph is creating (4000 8000 16000 32000 64000)
                 (register-graph-updates get-ws-create-task graph-uuid repo)))
-          (let [t (op-mem-layer/get-local-tx repo)]
-            (when (and (zero? (op-mem-layer/get-unpushed-block-update-count repo))
+          (let [t (client-op/get-local-tx repo)]
+            (when (and (zero? (client-op/get-unpushed-ops-count repo))
                        (or (nil? @*last-calibrate-t)
                            (< 500 (- t @*last-calibrate-t))))
               (m/? (r.skeleton/new-task--calibrate-graph-skeleton get-ws-create-task graph-uuid conn t))
@@ -241,40 +240,13 @@
      :depend-on-block-uuids @*depend-on-block-uuid-set}))
 
 (defn- gen-block-uuid->remote-ops
-  [repo conn & {:keys [n] :or {n 50}}]
-  (loop [current-handling-block-ops nil
-         current-handling-block-uuid nil
-         depend-on-block-uuid-coll nil
-         r {}]
-    (cond
-      (and (empty? current-handling-block-ops)
-           (empty? depend-on-block-uuid-coll)
-           (>= (count r) n))
-      r
-
-      (and (empty? current-handling-block-ops)
-           (empty? depend-on-block-uuid-coll))
-      (if-let [{min-t-block-ops :ops block-uuid :block-uuid} (op-mem-layer/get-min-t-block-ops repo)]
-        (do (assert (not (contains? r block-uuid)) {:r r :block-uuid block-uuid})
-            (op-mem-layer/remove-block-ops! repo block-uuid)
-            (recur min-t-block-ops block-uuid depend-on-block-uuid-coll r))
-        ;; finish
-        r)
-
-      (and (empty? current-handling-block-ops)
-           (seq depend-on-block-uuid-coll))
-      (let [[block-uuid & other-block-uuids] depend-on-block-uuid-coll
-            block-ops (op-mem-layer/get-block-ops repo block-uuid)]
-        (op-mem-layer/remove-block-ops! repo block-uuid)
-        (recur block-ops block-uuid other-block-uuids r))
-
-      (seq current-handling-block-ops)
-      (let [{:keys [remote-ops depend-on-block-uuids]}
-            (local-block-ops->remote-ops @conn current-handling-block-ops)]
-        (recur nil nil
-               (set/union (set depend-on-block-uuid-coll)
-                          (op-mem-layer/intersection-block-uuids repo depend-on-block-uuids))
-               (assoc r current-handling-block-uuid (into {} remote-ops)))))))
+  [db block-ops-map-coll]
+  (into {}
+        (map
+         (fn [block-ops-map]
+           [(:block/uuid block-ops-map)
+            (:remote-ops (local-block-ops->remote-ops db block-ops-map))]))
+        block-ops-map-coll))
 
 (defn- merge-remove-remove-ops
   [remote-remove-ops]
@@ -333,52 +305,59 @@
                          block-uuid->remote-ops)]
     (concat update-schema-ops update-page-ops remove-ops sorted-move-ops update-ops remove-page-ops)))
 
+(defn- rollback
+  [repo block-ops-map-coll]
+  (let [ops (map (fn [m]
+                   (keep (fn [[k op]]
+                           (when (not= :block/uuid k)
+                             op))
+                         m))
+                 block-ops-map-coll)]
+    (client-op/add-ops repo ops)
+    nil))
+
 (defn new-task--push-local-ops
   "Return a task: push local updates"
   [repo conn graph-uuid date-formatter get-ws-create-task add-log-fn]
   (m/sp
-    (op-mem-layer/new-branch! repo)
-    (if-let [remote-ops (not-empty (gen-block-uuid->remote-ops repo conn))]
-      (when-let [ops-for-remote (rtc-const/to-ws-ops-decoder
-                                 (sort-remote-ops
-                                  remote-ops))]
-        (let [local-tx (op-mem-layer/get-local-tx repo)
-              r (m/? (ws-util/send&recv get-ws-create-task {:action "apply-ops" :graph-uuid graph-uuid
-                                                            :ops ops-for-remote :t-before (or local-tx 1)}))]
-          (if-let [remote-ex (:ex-data r)]
-            (do (add-log-fn :rtc.log/push-local-update remote-ex)
-                (case (:type remote-ex)
-                  ;; - :graph-lock-failed
-                  ;;   conflict-update remote-graph, keep these local-pending-ops
-                  ;;   and try to send ops later
-                  :graph-lock-failed
-                  (do (op-mem-layer/rollback! repo)
-                      nil)
-                  ;; - :graph-lock-missing
-                  ;;   this case means something wrong in remote-graph data,
-                  ;;   nothing to do at client-side
-                  :graph-lock-missing
-                  (do (op-mem-layer/rollback! repo)
-                      (throw r.ex/ex-remote-graph-lock-missing))
+    (let [block-ops-map-coll (client-op/get&remove-all-ops repo)]
+      (when-let [block-uuid->remote-ops (not-empty (gen-block-uuid->remote-ops @conn block-ops-map-coll))]
+        (when-let [ops-for-remote (rtc-const/to-ws-ops-decoder
+                                   (sort-remote-ops
+                                    block-uuid->remote-ops))]
+          (let [local-tx (client-op/get-local-tx repo)
+                r (m/? (ws-util/send&recv get-ws-create-task {:action "apply-ops" :graph-uuid graph-uuid
+                                                              :ops ops-for-remote :t-before (or local-tx 1)}))]
+            (if-let [remote-ex (:ex-data r)]
+              (do (add-log-fn :rtc.log/push-local-update remote-ex)
+                  (case (:type remote-ex)
+                    ;; - :graph-lock-failed
+                    ;;   conflict-update remote-graph, keep these local-pending-ops
+                    ;;   and try to send ops later
+                    :graph-lock-failed
+                    (rollback repo block-ops-map-coll)
+                    ;; - :graph-lock-missing
+                    ;;   this case means something wrong in remote-graph data,
+                    ;;   nothing to do at client-side
+                    :graph-lock-missing
+                    (do (rollback repo block-ops-map-coll)
+                        (throw r.ex/ex-remote-graph-lock-missing))
 
-                  :rtc.exception/get-s3-object-failed
-                  (do (op-mem-layer/rollback! repo)
-                      nil)
-                  ;; else
-                  (do (op-mem-layer/rollback! repo)
-                      (throw (ex-info "Unavailable" {:remote-ex remote-ex})))))
+                    :rtc.exception/get-s3-object-failed
+                    (rollback repo block-ops-map-coll)
+                    ;; else
+                    (do (rollback repo block-ops-map-coll)
+                        (throw (ex-info "Unavailable" {:remote-ex remote-ex})))))
 
-            (do (assert (pos? (:t r)) r)
-                (op-mem-layer/commit! repo)
-                (r.remote-update/apply-remote-update
-                 graph-uuid repo conn date-formatter {:type :remote-update :value r} add-log-fn)
-                (add-log-fn :rtc.log/push-local-update {:remote-t (:t r)})))))
-      (op-mem-layer/rollback! repo))))
+              (do (assert (pos? (:t r)) r)
+                  (r.remote-update/apply-remote-update
+                   graph-uuid repo conn date-formatter {:type :remote-update :value r} add-log-fn)
+                  (add-log-fn :rtc.log/push-local-update {:remote-t (:t r)})))))))))
 
 (defn new-task--pull-remote-data
   [repo conn graph-uuid date-formatter get-ws-create-task add-log-fn]
   (m/sp
-    (let [local-tx (op-mem-layer/get-local-tx repo)
+    (let [local-tx (client-op/get-local-tx repo)
           r (m/? (ws-util/send&recv get-ws-create-task {:action "apply-ops" :graph-uuid graph-uuid
                                                         :ops [] :t-before (or local-tx 1)}))]
       (if-let [remote-ex (:ex-data r)]

+ 220 - 0
src/main/frontend/worker/rtc/client_op.cljs

@@ -0,0 +1,220 @@
+(ns frontend.worker.rtc.client-op
+  "Store client-ops in a persisted datascript"
+  (:require [datascript.core :as d]
+            [frontend.worker.rtc.const :as rtc-const]
+            [frontend.worker.state :as worker-state]
+            [logseq.db.sqlite.util :as sqlite-util]
+            [malli.core :as ma]
+            [malli.transform :as mt]
+            [missionary.core :as m]))
+
+(def op-schema
+  [:multi {:dispatch first}
+   [:move
+    [:catn
+     [:op :keyword]
+     [:t :int]
+     [:value [:map
+              [:block-uuid :uuid]]]]]
+   [:remove
+    [:catn
+     [:op :keyword]
+     [:t :int]
+     [:value [:map
+              [:block-uuid :uuid]]]]]
+   [:update-page
+    [:catn
+     [:op :keyword]
+     [:t :int]
+     [:value [:map
+              [:block-uuid :uuid]]]]]
+   [:remove-page
+    [:catn
+     [:op :keyword]
+     [:t :int]
+     [:value [:map
+              [:block-uuid :uuid]]]]]
+   [:update
+    [:catn
+     [:op :keyword]
+     [:t :int]
+     [:value [:map
+              [:block-uuid :uuid]
+              [:av-coll [:sequential rtc-const/av-schema]]]]]]])
+
+(def ops-schema [:sequential op-schema])
+(def ops-coercer (ma/coercer ops-schema mt/json-transformer nil
+                             #(do (prn ::bad-ops (:value %))
+                                  (ma/-fail! ::ops-schema %))))
+
+(def schema-in-db
+  {:block/uuid {:db/unique :db.unique/identity}
+   :local-tx {:db/index true}
+   :graph-uuid {:db/index true}})
+
+(defn update-graph-uuid
+  [repo graph-uuid]
+  {:pre [(some? graph-uuid)]}
+  (when-let [conn (worker-state/get-client-ops-conn repo)]
+    (assert (nil? (first (d/datoms @conn :avet :graph-uuid))))
+    (d/transact! conn [[:db/add "e" :graph-uuid graph-uuid]])))
+
+(defn update-local-tx
+  [repo t]
+  {:pre [(some? t)]}
+  (when-let [conn (worker-state/get-client-ops-conn repo)]
+    (let [tx-data
+          (if-let [datom (first (d/datoms @conn :avet :local-tx))]
+            [:db/add (:e datom) :local-tx t]
+            [:db/add "e" :local-tx t])]
+      (d/transact! conn [tx-data]))))
+
+(defn get-local-tx
+  [repo]
+  (when-let [conn (worker-state/get-client-ops-conn repo)]
+    (:v (first (d/datoms @conn :avet :local-tx)))))
+
+(defn- merge-update-ops
+  [update-op1 update-op2]
+  {:pre [(= :update (first update-op1))
+         (= :update (first update-op2))
+         (= (:block-uuid (last update-op1))
+            (:block-uuid (last update-op2)))]}
+  (let [t1 (second update-op1)
+        t2 (second update-op2)]
+    (if (> t1 t2)
+      (merge-update-ops update-op2 update-op1)
+      (let [{av-coll1 :av-coll block-uuid :block-uuid} (last update-op1)
+            {av-coll2 :av-coll} (last update-op1)]
+        [:update t2
+         {:block-uuid block-uuid
+          :av-coll (concat av-coll1 av-coll2)}]))))
+
+(defn add-ops*
+  [conn ops]
+  (let [ops (ops-coercer ops)]
+    (letfn [(already-removed? [remove-op t]
+              (some-> remove-op second (> t)))
+            (add-after-remove? [move-op t]
+              (some-> move-op second (> t)))]
+      (doseq [op ops]
+        (let [[op-type t value] op
+              {:keys [block-uuid]} value
+              exist-block-ops-entity (d/entity @conn [:block/uuid block-uuid])
+              e (:db/id exist-block-ops-entity)
+              tx-data
+              (case op-type
+                :move
+                (let [remove-op (get exist-block-ops-entity :remove)]
+                  (when-not (already-removed? remove-op t)
+                    (cond-> [{:block/uuid block-uuid
+                              :move op}]
+                      remove-op (conj [:db.fn/retractAttribute e :remove]))))
+                :update
+                (let [remove-op (get exist-block-ops-entity :remove)]
+                  (when-not (already-removed? remove-op t)
+                    (let [origin-update-op (get exist-block-ops-entity :update)
+                          op* (if origin-update-op (merge-update-ops origin-update-op op) op)]
+                      (cond-> [{:block/uuid block-uuid
+                                :update op*}]
+                        remove-op (conj [:db.fn/retractAttribute e :remove])))))
+                :remove
+                (let [move-op (get exist-block-ops-entity :move)]
+                  (when-not (add-after-remove? move-op t)
+                    (cond-> [{:block/uuid block-uuid
+                              :remove op}]
+                      move-op (conj [:db.fn/retractAttribute e :move]))))
+                :update-page
+                (let [remove-page-op (get exist-block-ops-entity :remove-page)]
+                  (when-not (already-removed? remove-page-op t)
+                    (cond-> [{:block/uuid block-uuid
+                              :update-page op}]
+                      remove-page-op (conj [:db.fn/retractAttribute e :remove-page]))))
+                :remove-page
+                (let [update-page-op (get exist-block-ops-entity :update-page)]
+                  (when-not (add-after-remove? update-page-op t)
+                    (cond-> [{:block/uuid block-uuid
+                              :remove-page op}]
+                      update-page-op (conj [:db.fn/retractAttribute e :update-page])))))]
+          (when (seq tx-data)
+            (d/transact! conn tx-data)))))))
+
+(defn add-ops
+  [repo ops]
+  (let [conn (worker-state/get-client-ops-conn repo)]
+    (assert (some? conn) repo)
+    (add-ops* conn ops)))
+
+(defn- get-all-op-datoms
+  [conn]
+  (->> (d/datoms @conn :eavt)
+       (remove (fn [datom] (contains? #{:graph-uuid :local-tx} (:a datom))))
+       (group-by :e)))
+
+(defn get-all-ops*
+  [conn]
+  (let [e->datoms (get-all-op-datoms conn)]
+    (map (fn [same-ent-datoms]
+           (into {} (map (juxt :a :v)) same-ent-datoms))
+         (vals e->datoms))))
+
+(defn get&remove-all-ops*
+  [conn]
+  (let [e->datoms (get-all-op-datoms conn)
+        retract-all-tx-data (map (fn [e] [:db.fn/retractEntity e]) (keys e->datoms))]
+    (d/transact! conn retract-all-tx-data)
+    (map (fn [same-ent-datoms]
+           (into {} (map (juxt :a :v)) same-ent-datoms))
+         (vals e->datoms))))
+
+(defn get-all-ops
+  "Return coll of
+  {:block/uuid ...
+   :update ...
+   :move ...
+   ...}"
+  [repo]
+  (when-let [conn (worker-state/get-client-ops-conn repo)]
+    (mapcat
+     (fn [m]
+       (keep (fn [[k v]]
+               (when (not= :block/uuid k) v))
+             m))
+     (get-all-ops* conn))))
+
+(defn get&remove-all-ops
+  "Return coll of
+  {:block/uuid ...
+   :update ...
+   :move ...
+   ...}"
+  [repo]
+  (when-let [conn (worker-state/get-client-ops-conn repo)]
+    (get&remove-all-ops* conn)))
+
+(defn get-unpushed-ops-count
+  [repo]
+  (when-let [conn (worker-state/get-client-ops-conn repo)]
+    (count (get-all-op-datoms conn))))
+
+(defn rtc-db-graph?
+  "Is db-graph & RTC enabled"
+  [repo]
+  (and (sqlite-util/db-based-graph? repo)
+       (or (exists? js/process)
+           (some? (get-local-tx repo)))))
+
+(defn create-pending-ops-count-flow
+  [repo]
+  (when-let [conn (worker-state/get-client-ops-conn repo)]
+    (letfn [(datom-count [db]
+              (count (d/datoms db :avet :block/uuid)))]
+      (m/relieve
+       (m/observe
+        (fn ctor [emit!]
+          (d/listen! conn :create-pending-ops-count-flow
+                     (fn [{:keys [db-after]}]
+                       (emit! (datom-count db-after))))
+          (emit! (datom-count @conn))
+          (fn dtor []
+            (d/unlisten! conn :create-pending-ops-count-flow))))))))

+ 3 - 3
src/main/frontend/worker/rtc/core.cljs

@@ -3,10 +3,10 @@
   (:require [frontend.common.missionary-util :as c.m]
             [frontend.worker.rtc.asset :as r.asset]
             [frontend.worker.rtc.client :as r.client]
+            [frontend.worker.rtc.client-op :as client-op]
             [frontend.worker.rtc.exception :as r.ex]
             [frontend.worker.rtc.full-upload-download-graph :as r.upload-download]
             [frontend.worker.rtc.log-and-state :as rtc-log-and-state]
-            [frontend.worker.rtc.op-mem-layer :as op-mem-layer]
             [frontend.worker.rtc.remote-update :as r.remote-update]
             [frontend.worker.rtc.skeleton]
             [frontend.worker.rtc.ws :as ws]
@@ -49,7 +49,7 @@
         merge-flow (m/latest vector auto-push-flow clock-flow)]
     (m/eduction (filter first)
                 (map second)
-                (filter (fn [v] (when (pos? (op-mem-layer/get-unpushed-block-update-count repo)) v)))
+                (filter (fn [v] (when (pos? (client-op/get-unpushed-ops-count repo)) v)))
                 merge-flow)))
 
 (defn- create-mixed-flow
@@ -305,7 +305,7 @@
                  :auto-push? rtc-auto-push?
                  :online-users online-users})
               rtc-state-flow (m/watch *rtc-auto-push?) (m/watch *rtc-lock) (m/watch *online-users)
-              (op-mem-layer/create-pending-ops-count-flow repo)
+              (client-op/create-pending-ops-count-flow repo)
               (rtc-log-and-state/create-local-t-flow graph-uuid)
               (rtc-log-and-state/create-remote-t-flow graph-uuid))))
           (catch Cancelled _))))))

+ 3 - 3
src/main/frontend/worker/rtc/db_listener.cljs

@@ -4,7 +4,7 @@
             [datascript.core :as d]
             [frontend.schema-register :include-macros true :as sr]
             [frontend.worker.db-listener :as db-listener]
-            [frontend.worker.rtc.op-mem-layer :as op-mem-layer]
+            [frontend.worker.rtc.client-op :as client-op]
             [logseq.db :as ldb]))
 
 (defn- latest-add?->v->t
@@ -117,7 +117,7 @@
   (let [ops (mapcat (partial entity-datoms=>ops db-before db-after e->a->v->add?->t)
                     same-entity-datoms-coll)]
     (when (seq ops)
-      (op-mem-layer/add-ops! repo ops))))
+      (client-op/add-ops repo ops))))
 
 (sr/defkeyword :persist-op?
   "tx-meta option, generate rtc ops when not nil (default true)")
@@ -125,6 +125,6 @@
 (defmethod db-listener/listen-db-changes :gen-rtc-ops
   [_ {:keys [_tx-data tx-meta db-before db-after
              repo _id->attr->datom e->a->add?->v->t same-entity-datoms-coll]}]
-  (when (and (op-mem-layer/rtc-db-graph? repo)
+  (when (and (client-op/rtc-db-graph? repo)
              (:persist-op? tx-meta true))
     (generate-rtc-ops repo db-before db-after same-entity-datoms-coll e->a->add?->v->t)))

+ 4 - 8
src/main/frontend/worker/rtc/full_upload_download_graph.cljs

@@ -5,8 +5,8 @@
             [clojure.set :as set]
             [datascript.core :as d]
             [frontend.common.missionary-util :as c.m]
+            [frontend.worker.rtc.client-op :as client-op]
             [frontend.worker.rtc.log-and-state :as rtc-log-and-state]
-            [frontend.worker.rtc.op-mem-layer :as op-mem-layer]
             [frontend.worker.rtc.ws-util :as ws-util]
             [frontend.worker.state :as worker-state]
             [frontend.worker.util :as worker-util]
@@ -134,9 +134,7 @@
                            [{:db/ident :logseq.kv/graph-uuid :kv/value graph-uuid}
                             {:db/ident :logseq.kv/graph-local-tx :kv/value "0"}])
             (m/? (c.m/await-promise (.storeMetadata worker-obj repo (pr-str {:kv/value graph-uuid}))))
-            (op-mem-layer/init-empty-ops-store! repo)
-            (op-mem-layer/update-graph-uuid! repo graph-uuid)
-            (m/? (op-mem-layer/new-task--sync-to-idb repo))
+            (client-op/update-graph-uuid repo graph-uuid)
             (rtc-log-and-state/rtc-log :rtc.log/upload {:sub-type :upload-completed
                                                         :message "upload-graph completed"})
             nil)
@@ -246,7 +244,7 @@
                              schema-blocks)
         ^js worker-obj (:worker/object @worker-state/*state)]
     (m/sp
-      (op-mem-layer/update-local-tx! repo t)
+      (client-op/update-local-tx repo t)
       (rtc-log-and-state/update-local-t graph-uuid t)
       (rtc-log-and-state/update-remote-t graph-uuid t)
       (m/?
@@ -324,10 +322,8 @@
                                                         :graph-uuid graph-uuid})
           (let [all-blocks (ldb/read-transit-str body)]
             (worker-state/set-rtc-downloading-graph! true)
-            (op-mem-layer/init-empty-ops-store! repo)
             (m/? (new-task--transact-remote-all-blocks all-blocks repo graph-uuid))
-            (op-mem-layer/update-graph-uuid! repo graph-uuid)
-            (m/? (op-mem-layer/new-task--sync-to-idb repo))
+            (client-op/update-graph-uuid repo graph-uuid)
             (m/? (c.m/await-promise (.storeMetadata worker-obj repo (pr-str {:kv/value graph-uuid}))))
             (worker-state/set-rtc-downloading-graph! false)
             (rtc-log-and-state/rtc-log :rtc.log/download {:sub-type :download-completed

+ 5 - 5
src/main/frontend/worker/rtc/remote_update.cljs

@@ -6,9 +6,9 @@
             [datascript.core :as d]
             [frontend.schema-register :as sr]
             [frontend.worker.handler.page :as worker-page]
+            [frontend.worker.rtc.client-op :as client-op]
             [frontend.worker.rtc.const :as rtc-const]
             [frontend.worker.rtc.log-and-state :as rtc-log-and-state]
-            [frontend.worker.rtc.op-mem-layer :as op-mem-layer]
             [frontend.worker.state :as worker-state]
             [frontend.worker.util :as worker-util]
             [logseq.clj-fractional-indexing :as index]
@@ -252,7 +252,7 @@
   these updates maybe not needed or need to update, because this client just updated some of these blocks,
   so we need to update these remote-data by local-ops"
   [affected-blocks-map local-unpushed-ops]
-  (assert (op-mem-layer/ops-coercer local-unpushed-ops) local-unpushed-ops)
+  (assert (client-op/ops-coercer local-unpushed-ops) local-unpushed-ops)
   (reduce
    (fn [affected-blocks-map local-op]
      (let [local-op-value (last local-op)]
@@ -283,7 +283,7 @@
 
 (defn- affected-blocks->diff-type-ops
   [repo affected-blocks]
-  (let [unpushed-ops (op-mem-layer/get-all-ops repo)
+  (let [unpushed-ops (client-op/get-all-ops repo)
         affected-blocks-map* (if unpushed-ops
                                (update-remote-data-by-local-unpushed-ops
                                 affected-blocks unpushed-ops)
@@ -530,7 +530,7 @@
     (assert (rtc-const/data-from-ws-validator remote-update-data) remote-update-data)
     (let [remote-t (:t remote-update-data)
           remote-t-before (:t-before remote-update-data)
-          local-tx (op-mem-layer/get-local-tx repo)]
+          local-tx (client-op/get-local-tx repo)]
       (rtc-log-and-state/update-remote-t graph-uuid remote-t)
       (cond
         (not (and (pos? remote-t)
@@ -569,7 +569,7 @@
           (worker-util/profile :apply-remote-remove-ops (apply-remote-remove-ops repo conn date-formatter remove-ops))
           (js/console.groupEnd)
 
-          (op-mem-layer/update-local-tx! repo remote-t)
+          (client-op/update-local-tx repo remote-t)
           (rtc-log-and-state/update-local-t graph-uuid remote-t))
         :else (throw (ex-info "unreachable" {:remote-t remote-t
                                              :remote-t-before remote-t-before

+ 13 - 6
src/main/frontend/worker/state.cljs

@@ -32,24 +32,31 @@
 (defonce *rtc-ws-url (atom nil))
 
 (defonce *sqlite (atom nil))
-;; repo -> {:db conn :search conn}
+;; repo -> {:db conn :search conn :client-ops conn}
 (defonce *sqlite-conns (atom nil))
 ;; repo -> conn
 (defonce *datascript-conns (atom nil))
+
+;; repo -> conn
+(defonce *client-ops-conns (atom nil))
+
 ;; repo -> pool
 (defonce *opfs-pools (atom nil))
 
 (defn get-sqlite-conn
-  [repo & {:keys [search?]
-           :or {search? false}
-           :as _opts}]
-  (let [k (if search? :search :db)]
-    (get-in @*sqlite-conns [repo k])))
+  ([repo] (get-sqlite-conn repo :db))
+  ([repo which-db]
+   (assert (contains? #{:db :search :client-ops} which-db) which-db)
+   (get-in @*sqlite-conns [repo which-db])))
 
 (defn get-datascript-conn
   [repo]
   (get @*datascript-conns repo))
 
+(defn get-client-ops-conn
+  [repo]
+  (get @*client-ops-conns repo))
+
 (defn get-opfs-pool
   [repo]
   (get @*opfs-pools repo))