Răsfoiți Sursa

refactor: collect all worker db-listeners by 'defmethod listen-db-changes'

rcmerci 1 an în urmă
părinte
comite
d3a679485d

+ 2 - 5
src/main/frontend/db_worker.cljs

@@ -17,7 +17,7 @@
             [frontend.worker.handler.page :as worker-page]
             [frontend.worker.handler.page.rename :as worker-page-rename]
             [frontend.worker.rtc.core :as rtc-core]
-            [frontend.worker.rtc.db-listener :as rtc-db-listener]
+            [frontend.worker.rtc.db-listener]
             [frontend.worker.rtc.full-upload-download-graph :as rtc-updown]
             [frontend.worker.rtc.op-mem-layer :as op-mem-layer]
             [frontend.worker.rtc.snapshot :as rtc-snapshot]
@@ -176,9 +176,7 @@
             conn (sqlite-common-db/get-storage-conn storage schema)]
         (swap! *datascript-conns assoc repo conn)
         (p/let [_ (op-mem-layer/<init-load-from-indexeddb! repo)]
-          (rtc-db-listener/listen-to-db-changes! repo conn)
-          (db-listener/listen-db-changes! repo conn))
-        ))))
+          (db-listener/listen-db-changes! repo conn))))))
 
 (defn- iter->vec [iter]
   (when iter
@@ -588,7 +586,6 @@
          (try
            (let [state (<? (rtc-core/<init-state token false))
                  r (<? (rtc-updown/<async-upload-graph state repo conn remote-graph-name))]
-             (rtc-db-listener/listen-db-to-generate-ops repo conn)
              (p/resolve! d r))
            (catch :default e
              (worker-util/post-message :notification

+ 39 - 6
src/main/frontend/worker/db_listener.cljs

@@ -1,6 +1,12 @@
 (ns frontend.worker.db-listener
   "Db listeners for worker-db."
-  (:require [datascript.core :as d]))
+  (:require [cljs-bean.core :as bean]
+            [datascript.core :as d]
+            [frontend.worker.pipeline :as worker-pipeline]
+            [frontend.worker.search :as search]
+            [frontend.worker.state :as worker-state]
+            [frontend.worker.util :as worker-util]
+            [promesa.core :as p]))
 
 
 (defn- entity-datoms=>attr->datom
@@ -27,6 +33,31 @@
 (defmulti listen-db-changes
   (fn [listen-key & _] listen-key))
 
+(defmethod listen-db-changes :sync-db-to-main-thread
+  [_ {:keys [tx-meta repo conn] :as tx-report}]
+  (let [{:keys [pipeline-replace? from-disk?]} tx-meta
+                     result (worker-pipeline/invoke-hooks repo conn tx-report (worker-state/get-context))
+                     tx-report' (or (:tx-report result) tx-report)]
+                 (when-not pipeline-replace?
+                   (let [data (merge
+                               {:request-id (:request-id tx-meta)
+                                :repo repo
+                                :tx-data (:tx-data tx-report')
+                                :tx-meta tx-meta}
+                               (dissoc result :tx-report))]
+                     (worker-util/post-message :sync-db-changes data))
+
+                   (when-not from-disk?
+                     (p/do!
+                      (let [{:keys [blocks-to-remove-set blocks-to-add]} (search/sync-search-indice repo tx-report')
+                            ^js wo (worker-state/get-worker-object)]
+                        (when wo
+                          (when (seq blocks-to-remove-set)
+                            (.search-delete-blocks wo repo (bean/->js blocks-to-remove-set)))
+                          (when (seq blocks-to-add)
+                            (.search-upsert-blocks wo repo (bean/->js blocks-to-add))))))))))
+
+
 (defn listen-db-changes!
   [repo conn]
   (let [handlers (methods listen-db-changes)]
@@ -38,9 +69,11 @@
                        id->same-entity-datoms (group-by first datom-vec-coll)
                        id-order (distinct (map first datom-vec-coll))
                        same-entity-datoms-coll (map id->same-entity-datoms id-order)
-                       id->attr->datom (update-vals id->same-entity-datoms entity-datoms=>attr->datom)]
+                       id->attr->datom (update-vals id->same-entity-datoms entity-datoms=>attr->datom)
+                       args* (assoc args
+                                    :repo repo
+                                    :conn conn
+                                    :id->attr->datom id->attr->datom
+                                    :same-entity-datoms-coll same-entity-datoms-coll)]
                    (doseq [[k handler-fn] handlers]
-                     (handler-fn k (assoc args
-                                          :repo repo
-                                          :id->attr->datom id->attr->datom
-                                          :same-entity-datoms-coll same-entity-datoms-coll))))))))
+                     (handler-fn k args*)))))))

+ 33 - 109
src/main/frontend/worker/rtc/db_listener.cljs

@@ -5,34 +5,8 @@
             [clojure.data :as data]
             [clojure.set :as set]
             [datascript.core :as d]
-            [frontend.worker.rtc.op-mem-layer :as op-mem-layer]
-            [frontend.worker.state :as worker-state]
-            [frontend.worker.pipeline :as worker-pipeline]
-            [frontend.worker.search :as search]
-            [frontend.worker.util :as worker-util]
-            [promesa.core :as p]
-            [cljs-bean.core :as bean]))
-
-
-(defn- entity-datoms=>attr->datom
-  [entity-datoms]
-  (reduce
-   (fn [m datom]
-     (let [[_e a _v t add?] datom]
-       (if-let [[_e _a _v old-t old-add?] (get m a)]
-         (cond
-           (and (= old-t t)
-                (true? add?)
-                (false? old-add?))
-           (assoc m a datom)
-
-           (< old-t t)
-           (assoc m a datom)
-
-           :else
-           m)
-         (assoc m a datom))))
-   {} entity-datoms))
+            [frontend.worker.db-listener :as db-listener]
+            [frontend.worker.rtc.op-mem-layer :as op-mem-layer]))
 
 
 (defn- diff-value-of-set-type-attr
@@ -60,11 +34,11 @@
       (seq retract-uuids) (conj [:retract retract-uuids]))))
 
 (defn- entity-datoms=>ops
-  [db-before db-after entity-datoms]
-  (let [attr->datom (entity-datoms=>attr->datom entity-datoms)]
+  [db-before db-after id->attr->datom entity-datoms]
+  (let [e (ffirst entity-datoms)
+        attr->datom (id->attr->datom e)]
     (when (seq attr->datom)
       (let [updated-key-set (set (keys attr->datom))
-            e (some-> attr->datom first second first)
             {[_e _a block-uuid _t add1?] :block/uuid
              [_e _a _v _t add2?]         :block/name
              [_e _a _v _t add3?]         :block/parent
@@ -153,36 +127,32 @@
         ops*))))
 
 (defn- entity-datoms=>asset-op
-  [db-after entity-datoms]
-  (let [attr->datom (entity-datoms=>attr->datom entity-datoms)]
-    (when (seq attr->datom)
-      (let [e (some-> attr->datom first second first)
-            {[_e _a asset-uuid _t add1?] :asset/uuid
-             [_e _a asset-meta _t add2?] :asset/meta}
-            attr->datom
-            op (cond
-                 (or (and add1? asset-uuid)
-                     (and add2? asset-meta))
-                 [:update-asset]
-
-                 (and (not add1?) asset-uuid)
-                 [:remove-asset asset-uuid])]
-        (when op
-          (let [asset-uuid (some-> (d/entity db-after e) :asset/uuid str)]
-            (case (first op)
-              :update-asset (when asset-uuid ["update-asset" {:asset-uuid asset-uuid}])
-              :remove-asset ["remove-asset" {:asset-uuid (str (second op))}])))))))
+  [db-after id->attr->datom entity-datoms]
+  (when-let [e (ffirst entity-datoms)]
+    (let [attr->datom (id->attr->datom e)]
+      (when (seq attr->datom)
+        (let [{[_e _a asset-uuid _t add1?] :asset/uuid
+               [_e _a asset-meta _t add2?] :asset/meta}
+              attr->datom
+              op (cond
+                   (or (and add1? asset-uuid)
+                       (and add2? asset-meta))
+                   [:update-asset]
+
+                   (and (not add1?) asset-uuid)
+                   [:remove-asset asset-uuid])]
+          (when op
+            (let [asset-uuid (some-> (d/entity db-after e) :asset/uuid str)]
+              (case (first op)
+                :update-asset (when asset-uuid ["update-asset" {:asset-uuid asset-uuid}])
+                :remove-asset ["remove-asset" {:asset-uuid (str (second op))}]))))))))
 
 
 (defn generate-rtc-ops
-  [repo db-before db-after datoms]
-  (let [datom-vec-coll (map vec datoms)
-        id->same-entity-datoms (group-by first datom-vec-coll)
-        id-order (distinct (map first datom-vec-coll))
-        same-entity-datoms-coll (map id->same-entity-datoms id-order)
-        asset-ops (keep (partial entity-datoms=>asset-op db-after) same-entity-datoms-coll)
+  [repo db-before db-after same-entity-datoms-coll id->attr->datom]
+  (let [asset-ops (keep (partial entity-datoms=>asset-op db-after id->attr->datom) same-entity-datoms-coll)
         ops (when (empty asset-ops)
-              (mapcat (partial entity-datoms=>ops db-before db-after) same-entity-datoms-coll))
+              (mapcat (partial entity-datoms=>ops db-before db-after id->attr->datom) same-entity-datoms-coll))
         now-epoch*1000 (* 1000 (tc/to-long (t/now)))
         ops* (map-indexed (fn [idx op]
                             [(first op) (assoc (second op) :epoch (+ idx now-epoch*1000))]) ops)
@@ -195,55 +165,9 @@
       (op-mem-layer/add-asset-ops! repo asset-ops*))))
 
 
-
-(defn listen-db-to-generate-ops
-  [repo conn]
-  (d/listen! conn :gen-ops
-             (fn [{:keys [tx-data tx-meta db-before db-after]}]
-               (when (:persist-op? tx-meta true)
-                 (generate-rtc-ops repo db-before db-after tx-data)))))
-
-(comment
-  (defn listen-db-to-batch-txs
-   [conn]
-   (d/listen! conn :batch-txs
-              (fn [{:keys [tx-data]}]
-                (when (worker-state/batch-tx-mode?)
-                  (worker-state/conj-batch-txs! tx-data))))))
-
-(defn sync-db-to-main-thread
-  [repo conn]
-  (d/listen! conn :sync-db
-             (fn [{:keys [tx-meta] :as tx-report}]
-               (let [{:keys [pipeline-replace? from-disk?]} tx-meta
-                     result (worker-pipeline/invoke-hooks repo conn tx-report (worker-state/get-context))
-                     tx-report' (or (:tx-report result) tx-report)]
-                 (when-not pipeline-replace?
-                   (let [data (merge
-                               {:request-id (:request-id tx-meta)
-                                :repo repo
-                                :tx-data (:tx-data tx-report')
-                                :tx-meta tx-meta}
-                               (dissoc result :tx-report))]
-                     (worker-util/post-message :sync-db-changes data))
-
-                   (when-not from-disk?
-                     (p/do!
-                      (let [{:keys [blocks-to-remove-set blocks-to-add]} (search/sync-search-indice repo tx-report')
-                            ^js wo (worker-state/get-worker-object)]
-                        (when wo
-                          (when (seq blocks-to-remove-set)
-                            (.search-delete-blocks wo repo (bean/->js blocks-to-remove-set)))
-                          (when (seq blocks-to-add)
-                            (.search-upsert-blocks wo repo (bean/->js blocks-to-add))))))))))))
-
-(defn listen-to-db-changes!
-  [repo conn]
-  (d/unlisten! conn :gen-ops)
-  (d/unlisten! conn :sync-db)
-  (when (op-mem-layer/rtc-db-graph? repo)
-    (listen-db-to-generate-ops repo conn)
-    ;; (rtc-db-listener/listen-db-to-batch-txs conn)
-    )
-
-  (sync-db-to-main-thread repo conn))
+(defmethod db-listener/listen-db-changes :gen-rtc-ops
+  [_ {:keys [_tx-data tx-meta db-before db-after
+             repo id->attr->datom same-entity-datoms-coll]}]
+  (when (and (op-mem-layer/rtc-db-graph? repo)
+             (:persist-op? tx-meta true))
+    (generate-rtc-ops repo db-before db-after same-entity-datoms-coll id->attr->datom)))

+ 8 - 0
src/test/frontend/worker/rtc/db_listener_test.cljs

@@ -1,6 +1,7 @@
 (ns frontend.worker.rtc.db-listener-test
   (:require [cljs.test :as t :refer [deftest is testing]]
             [datascript.core :as d]
+            [frontend.worker.db-listener :as worker-db-listener]
             [frontend.worker.rtc.db-listener :as subject]
             [logseq.db.frontend.schema :as db-schema]))
 
@@ -8,6 +9,12 @@
 (def empty-db (d/empty-db db-schema/schema-for-db-based-graph))
 
 
+(defn- tx-data=>id->attr->datom
+  [tx-data]
+  (let [datom-vec-coll (map vec tx-data)
+        id->same-entity-datoms (group-by first datom-vec-coll)]
+    (update-vals id->same-entity-datoms #'worker-db-listener/entity-datoms=>attr->datom)))
+
 (deftest entity-datoms=>ops-test
   (testing "remove whiteboard page-block"
     (let [conn (d/conn-from-db empty-db)
@@ -22,4 +29,5 @@
       (is (= [["remove-page" {:block-uuid (str block-uuid)}]]
              (#'subject/entity-datoms=>ops (:db-before remove-whiteboard-page-block)
                                            (:db-after remove-whiteboard-page-block)
+                                           (tx-data=>id->attr->datom (:tx-data remove-whiteboard-page-block))
                                            (map vec (:tx-data remove-whiteboard-page-block))))))))

+ 20 - 11
src/test/frontend/worker/rtc/fixture.cljs

@@ -1,16 +1,17 @@
 (ns frontend.worker.rtc.fixture
-  (:require [cljs.test :as t]
-            [cljs.core.async :as async :refer [<! >! chan go]]
-            [frontend.worker.rtc.mock :as rtc-mock]
-            [frontend.worker.rtc.core :as rtc-core]
-            [frontend.worker.rtc.asset-sync :as asset-sync]
-            [frontend.test.helper :as test-helper]
+  (:require [cljs.core.async :as async :refer [<! >! chan go]]
+            [cljs.test :as t]
             [datascript.core :as d]
+            [frontend.db :as db]
             [frontend.db.conn :as conn]
+            [frontend.state :as state]
+            [frontend.test.helper :as test-helper]
+            [frontend.worker.db-listener :as worker-db-listener]
+            [frontend.worker.rtc.asset-sync :as asset-sync]
+            [frontend.worker.rtc.core :as rtc-core]
             [frontend.worker.rtc.db-listener :as db-listener]
-            [frontend.worker.rtc.op-mem-layer :as op-mem-layer]
-            [frontend.db :as db]
-            [frontend.state :as state]))
+            [frontend.worker.rtc.mock :as rtc-mock]
+            [frontend.worker.rtc.op-mem-layer :as op-mem-layer]))
 
 (def *test-rtc-state (atom nil))
 (def *test-asset-sync-state (atom nil))
@@ -94,8 +95,16 @@
       (d/listen! test-db-conn
                  ::gen-ops
                  (fn [{:keys [tx-data tx-meta db-before db-after]}]
-                   (when (:persist-op? tx-meta true)
-                     (db-listener/generate-rtc-ops test-helper/test-db-name-db-version db-before db-after tx-data)))))
+                   (let [datom-vec-coll (map vec tx-data)
+                         id->same-entity-datoms (group-by first datom-vec-coll)
+                         id-order (distinct (map first datom-vec-coll))
+                         same-entity-datoms-coll (map id->same-entity-datoms id-order)
+                         id->attr->datom (update-vals
+                                          id->same-entity-datoms
+                                          #'worker-db-listener/entity-datoms=>attr->datom)]
+                     (when (:persist-op? tx-meta true)
+                       (db-listener/generate-rtc-ops test-helper/test-db-name-db-version db-before db-after
+                                                     same-entity-datoms-coll id->attr->datom))))))
    :after
    #(when-let [test-db-conn (conn/get-db test-helper/test-db-name-db-version false)]
       (d/unlisten! test-db-conn ::gen-ops))})