浏览代码

refactor: compute pipeline tx-data before d/transact!

Tienson Qin 5 天之前
父节点
当前提交
999618a406

+ 38 - 24
deps/db/src/logseq/db.cljs

@@ -35,6 +35,7 @@
 
 (defonce *transact-fn (atom nil))
 (defonce *transact-invalid-callback (atom nil))
+(defonce *transact-pipeline-fn (atom nil))
 
 (defn register-transact-fn!
   [f]
@@ -42,6 +43,9 @@
 (defn register-transact-invalid-callback-fn!
   [f]
   (when f (reset! *transact-invalid-callback f)))
+(defn register-transact-pipeline-fn!
+  [f]
+  (when f (reset! *transact-pipeline-fn f)))
 
 (defn- remove-temp-block-data
   [tx-data]
@@ -86,34 +90,44 @@
                   (:a d)))
      datoms)))
 
+(defn- throw-if-page-has-block-parent!
+  [db tx-data]
+  (when (some (fn [d] (and (:added d)
+                           (= :block/parent (:a d))
+                           (entity-util/page? (d/entity db (:e d)))
+                           (not (entity-util/page? (d/entity db (:v d)))))) tx-data)
+    (throw (ex-info "Page can't have block as parent"
+                    {:tx-data tx-data}))))
+
 (defn- transact-sync
   [repo-or-conn tx-data tx-meta]
   (try
     (let [conn repo-or-conn
           db @conn
-          db-based? (entity-plus/db-based-graph? db)
-          [validate-result tx-report] (if (and db-based?
-                                               (not (:pipeline-replace? tx-meta))
-                                               (not (:reset-conn! tx-meta))
-                                               (not (:skip-validate-db? tx-meta false))
-                                               (not (:logseq.graph-parser.exporter/new-graph? tx-meta)))
-                                        (let [tx-report (d/with db tx-data tx-meta)]
-                                          [(db-validate/validate-tx-report tx-report nil) tx-report])
-                                        [true nil])]
-      (if validate-result
-        (if (and tx-report (seq (:tx-data tx-report)))
-          ;; perf enhancement: avoid repeated call on `d/with`
-          (do
-            (reset! conn (:db-after tx-report))
-            (dc/store-after-transact! conn tx-report)
-            (dc/run-callbacks conn tx-report)
-            tx-report)
-          (d/transact! conn tx-data tx-meta))
-        (do
-          ;; notify ui
-          (when-let [f @*transact-invalid-callback]
-            (f tx-report))
-          (throw (ex-info "DB write with invalid data" {:tx-data tx-data})))))
+          db-based? (entity-plus/db-based-graph? db)]
+      (if (and db-based?
+               (not (:reset-conn! tx-meta))
+               (not (:initial-db? tx-meta))
+               (not (:skip-validate-db? tx-meta false))
+               (not (:logseq.graph-parser.exporter/new-graph? tx-meta)))
+        (let [tx-report* (d/with db tx-data tx-meta)
+              pipeline-f @*transact-pipeline-fn
+              tx-report (if-let [f pipeline-f] (f tx-report*) tx-report*)
+              _ (throw-if-page-has-block-parent! (:db-after tx-report) (:tx-data tx-report))
+              validate-result (db-validate/validate-tx-report tx-report nil)]
+          (if validate-result
+            (when (and tx-report (seq (:tx-data tx-report)))
+              ;; perf enhancement: avoid repeated call on `d/with`
+              (reset! conn (:db-after tx-report))
+              (dc/store-after-transact! conn tx-report)
+              (dc/run-callbacks conn tx-report))
+            (do
+              ;; notify ui
+              (when-let [f @*transact-invalid-callback]
+                (f tx-report))
+              (throw (ex-info "DB write with invalid data" {:tx-data tx-data}))))
+          tx-report)
+        (d/transact! conn tx-data tx-meta)))
     (catch :default e
       (js/console.trace)
       (prn :debug :transact-failed :tx-meta tx-meta :tx-data tx-data)
@@ -145,7 +159,7 @@
 
      ;; Ensure worker can handle the request sequentially (one by one)
      ;; Because UI assumes that the in-memory db has all the data except the last one transaction
-     (when (or (seq tx-data) (:db-persist? tx-meta))
+     (when (seq tx-data)
 
        ;; (prn :debug :transact :sync? (= d/transact! (or @*transact-fn d/transact!)) :tx-meta tx-meta)
        ;; (cljs.pprint/pprint tx-data)

+ 1 - 2
deps/graph-parser/script/db_import.cljs

@@ -95,8 +95,7 @@
       (println (some-> (get-in m [:ex-data :error]) .-stack)))
     (when debug
       (when-let [matching-tx (seq (filter #(and (get-in m [:ex-data :path])
-                                                (or (= (get-in % [:tx-meta ::gp-exporter/path]) (get-in m [:ex-data :path]))
-                                                    (= (get-in % [:tx-meta ::outliner-pipeline/original-tx-meta ::gp-exporter/path]) (get-in m [:ex-data :path]))))
+                                                (= (get-in % [:tx-meta ::gp-exporter/path]) (get-in m [:ex-data :path])))
                                           @tx-queue))]
         (println (str "\n" (count matching-tx)) "Tx Maps for failing path:")
         (pprint/pprint matching-tx))))

+ 32 - 31
deps/outliner/src/logseq/outliner/commands.cljs

@@ -149,15 +149,16 @@
         (tc/to-long next-time)))))
 
 (defn- compute-reschedule-property-tx
-  [conn db entity property-ident]
-  (let [frequency (or (db-property/property-value-content (:logseq.property.repeat/recur-frequency entity))
-                      (let [property (d/entity db :logseq.property.repeat/recur-frequency)
-                            default-value-block (db-property-build/build-property-value-block property property 1)
-                            default-value-tx-data [default-value-block
-                                                   {:db/id (:db/id property)
-                                                    :logseq.property/default-value [:block/uuid (:block/uuid default-value-block)]}]]
-                        (ldb/transact! conn default-value-tx-data)
-                        1))
+  [db entity property-ident]
+  (let [[frequency default-value-tx-data]
+        (or [(db-property/property-value-content (:logseq.property.repeat/recur-frequency entity))
+             nil]
+            (let [property (d/entity db :logseq.property.repeat/recur-frequency)
+                  default-value-block (db-property-build/build-property-value-block property property 1)
+                  default-value-tx-data [default-value-block
+                                         {:db/id (:db/id property)
+                                          :logseq.property/default-value [:block/uuid (:block/uuid default-value-block)]}]]
+              [1 default-value-tx-data]))
         unit (:logseq.property.repeat/recur-unit entity)
         property (d/entity db property-ident)
         date? (= :date (:logseq.property/type property))
@@ -175,11 +176,12 @@
                                               (outliner-page/create db title {})))
               value (if date? [:block/uuid page-uuid] next-time-long)]
           (concat
+           default-value-tx-data
            tx-data
            (when value
              [[:db/add (:db/id entity) property-ident value]])))))))
 
-(defmethod handle-command :reschedule [_ conn db entity _datoms]
+(defmethod handle-command :reschedule [_ db entity _datoms]
   (let [property-ident (or (:db/ident (:logseq.property.repeat/temporal-property entity))
                            :logseq.property/scheduled)
         other-property-idents (cond
@@ -193,14 +195,14 @@
 
                                 :else
                                 (filter (fn [p] (get entity p)) [:logseq.property/deadline :logseq.property/scheduled]))]
-    (mapcat #(compute-reschedule-property-tx conn db entity %) (distinct (cons property-ident other-property-idents)))))
+    (mapcat #(compute-reschedule-property-tx db entity %) (distinct (cons property-ident other-property-idents)))))
 
-(defmethod handle-command :set-property [_ _db _conn entity _datoms property value]
+(defmethod handle-command :set-property [_ _db entity _datoms property value]
   (let [property' (get-property entity property)
         value' (get-value entity property value)]
     [[:db/add (:db/id entity) property' value']]))
 
-(defmethod handle-command :record-property-history [_ _conn db entity datoms]
+(defmethod handle-command :record-property-history [_ db entity datoms]
   (let [changes (keep (fn [d]
                         (let [property (d/entity db (:a d))]
                           (when (and (true? (get property :logseq.property/enable-history?))
@@ -218,7 +220,7 @@
            :logseq.property.history/property (:db/id property)})))
      changes)))
 
-(defmethod handle-command :default [command _conn _db entity datoms]
+(defmethod handle-command :default [command _db entity datoms]
   (throw (ex-info "Unhandled command"
                   {:command command
                    :entity entity
@@ -226,23 +228,22 @@
 
 (defn execute-command
   "Build tx-data"
-  [conn db entity datoms [_command {:keys [actions]}]]
+  [db entity datoms [_command {:keys [actions]}]]
   (mapcat (fn [action]
-            (apply handle-command (first action) conn db entity datoms (rest action))) actions))
+            (apply handle-command (first action) db entity datoms (rest action))) actions))
 
 (defn run-commands
-  [conn {:keys [tx-data db-after]}]
-  (let [db db-after]
-    (mapcat (fn [[e datoms]]
-              (let [entity (d/entity db e)
-                    commands (filter (fn [[_command {:keys [entity-conditions tx-conditions]}]]
-                                       (and
-                                        (if (seq entity-conditions)
-                                          (every? #(satisfy-condition? db entity % nil) entity-conditions)
-                                          true)
-                                        (every? #(satisfy-condition? db entity % datoms) tx-conditions))) @*commands)]
-                (mapcat
-                 (fn [command]
-                   (execute-command conn db entity datoms command))
-                 commands)))
-            (group-by :e tx-data))))
+  [{:keys [tx-data db-after]}]
+  (mapcat (fn [[e datoms]]
+            (let [entity (d/entity db-after e)
+                  commands (filter (fn [[_command {:keys [entity-conditions tx-conditions]}]]
+                                     (and
+                                      (if (seq entity-conditions)
+                                        (every? #(satisfy-condition? db-after entity % nil) entity-conditions)
+                                        true)
+                                      (every? #(satisfy-condition? db-after entity % datoms) tx-conditions))) @*commands)]
+              (mapcat
+               (fn [command]
+                 (execute-command db-after entity datoms command))
+               commands)))
+          (group-by :e tx-data)))

+ 1 - 2
deps/outliner/src/logseq/outliner/core.cljs

@@ -1109,8 +1109,7 @@
     (let [result (apply f args)]
       (when result
         (let [tx-meta (assoc (:tx-meta result)
-                             :outliner-op outliner-op
-                             :skip-store? true)]
+                             :outliner-op outliner-op)]
           (ldb/transact! (second args) (:tx-data result) tx-meta)))
       result)
     (catch :default e

+ 1 - 2
deps/outliner/src/logseq/outliner/db_pipeline.cljs

@@ -12,8 +12,7 @@
   "Modified copy of frontend.worker.pipeline/invoke-hooks that handles new DB graphs but
    doesn't handle updating DB graphs well yet e.g. doesn't handle :block/tx-id"
   [conn tx-report]
-  (when (not (get-in tx-report [:tx-meta :pipeline-replace?]))
-    ;; TODO: Handle block edits with separate :block/refs rebuild as deleting property values is buggy
+  (when-not (:pipeline-replace? (:tx-meta tx-report))
     (outliner-pipeline/transact-new-db-graph-refs conn tx-report)))
 
 (defn ^:api add-listener

+ 2 - 2
deps/outliner/src/logseq/outliner/pipeline.cljs

@@ -146,6 +146,6 @@
   [conn tx-report]
   (let [{:keys [blocks]} (ds-report/get-blocks-and-pages tx-report)
         refs-tx-report (when-let [refs-tx (and (seq blocks) (rebuild-block-refs-tx tx-report blocks))]
-                         (ldb/transact! conn refs-tx {:pipeline-replace? true
-                                                      ::original-tx-meta (:tx-meta tx-report)}))]
+                         (ldb/transact! conn refs-tx (-> (:tx-meta tx-report)
+                                                         (assoc :pipeline-replace? true))))]
     refs-tx-report))

+ 1 - 1
src/main/frontend/handler/whiteboard.cljs

@@ -149,7 +149,7 @@
        :deleted-shapes deleted-shapes
        :new-shapes created-shapes
        :metadata {:whiteboard/transact? true
-                  :pipeline-replace? replace?}})))
+                  :whiteboard/replace? replace?}})))
 
 (defonce *last-shapes-nonce (atom {}))
 

+ 1 - 2
src/main/frontend/modules/outliner/pipeline.cljs

@@ -28,8 +28,7 @@
   [{:keys [repo tx-meta tx-data deleted-block-uuids deleted-assets affected-keys blocks]}]
   ;; (prn :debug
   ;;      :tx-meta tx-meta
-  ;;      ;; :tx-data tx-data
-  ;;      )
+  ;;      :tx-data tx-data)
   (let [{:keys [from-disk? new-graph? initial-pages? end?]} tx-meta
         tx-report {:tx-meta tx-meta
                    :tx-data tx-data}]

+ 1 - 2
src/main/frontend/undo_redo.cljs

@@ -269,8 +269,7 @@
           (when (seq tx-data)
             (let [reversed-tx-data (get-reversed-datoms conn undo? data tx-meta)
                   tx-meta' (-> tx-meta
-                               (dissoc :pipeline-replace?
-                                       :batch-tx/batch-tx-mode?)
+                               (dissoc :batch-tx/batch-tx-mode?)
                                (assoc
                                 :gen-undo-ops? false
                                 :undo? undo?

+ 28 - 32
src/main/frontend/worker/db_listener.cljs

@@ -35,7 +35,7 @@
 
         (when-not from-disk?
           (p/do!
-         ;; Sync SQLite search
+           ;; Sync SQLite search
            (let [{:keys [blocks-to-remove-set blocks-to-add]} (search/sync-search-indice repo tx-report')]
              (when (seq blocks-to-remove-set)
                ((@thread-api/*thread-apis :thread-api/search-delete-blocks) repo blocks-to-remove-set))
@@ -65,9 +65,7 @@
                                       (map (fn [id] [:db/add id :logseq.property.embedding/hnsw-label-updated-at 0])))
           tx-data (concat remove-old-hnsw-tx-data mark-embedding-tx-data)]
       (when (seq tx-data)
-        (ldb/transact! conn tx-data
-                       {:skip-refresh? true
-                        :pipeline-replace? true})))))
+        (ldb/transact! conn tx-data {})))))
 
 (defn listen-db-changes!
   [repo conn & {:keys [handler-keys]}]
@@ -90,39 +88,37 @@
                    (remove-old-embeddings-and-reset-new-updates! conn tx-data tx-meta)
 
                    (let [tx-meta (merge (batch-tx/get-batch-opts) tx-meta)
-                         pipeline-replace? (:pipeline-replace? tx-meta)
                          in-batch-tx-mode? (:batch-tx/batch-tx-mode? tx-meta)]
-                     (when-not pipeline-replace?
-                       (when in-batch-tx-mode?
-                         (batch-tx/set-batch-opts (dissoc tx-meta :pipeline-replace?)))
-                       (cond
-                         (and in-batch-tx-mode?
-                              (not (:batch-tx/exit? tx-meta)))
+                     (when in-batch-tx-mode?
+                       (batch-tx/set-batch-opts tx-meta))
+                     (cond
+                       (and in-batch-tx-mode?
+                            (not (:batch-tx/exit? tx-meta)))
                          ;; still in batch mode
-                         (vswap! *batch-all-txs into tx-data)
+                       (vswap! *batch-all-txs into tx-data)
 
-                         in-batch-tx-mode?
+                       in-batch-tx-mode?
                          ;; exit batch mode
-                         (when-let [tx-data (not-empty (get-batch-txs))]
-                           (vreset! *batch-all-txs [])
-                           (let [db-before (batch-tx/get-batch-db-before)
-                                 tx-meta (dissoc tx-meta :batch-tx/batch-tx-mode? :batch-tx/exit?)
-                                 tx-report (assoc tx-report
-                                                  :tx-data tx-data
-                                                  :db-before db-before
-                                                  :tx-meta tx-meta)
-                                 tx-report' (if sync-db-to-main-thread?
-                                              (sync-db-to-main-thread repo conn tx-report)
-                                              tx-report)
-                                 opt (assoc (additional-args (:tx-data tx-report')) :repo repo)]
-                             (doseq [[k handler-fn] handlers]
-                               (handler-fn k opt tx-report'))))
-
-                         (seq tx-data)
-                         ;; raw transact
-                         (let [tx-report' (if sync-db-to-main-thread?
+                       (when-let [tx-data (not-empty (get-batch-txs))]
+                         (vreset! *batch-all-txs [])
+                         (let [db-before (batch-tx/get-batch-db-before)
+                               tx-meta (dissoc tx-meta :batch-tx/batch-tx-mode? :batch-tx/exit?)
+                               tx-report (assoc tx-report
+                                                :tx-data tx-data
+                                                :db-before db-before
+                                                :tx-meta tx-meta)
+                               tx-report' (if sync-db-to-main-thread?
                                             (sync-db-to-main-thread repo conn tx-report)
                                             tx-report)
                                opt (assoc (additional-args (:tx-data tx-report')) :repo repo)]
                            (doseq [[k handler-fn] handlers]
-                             (handler-fn k opt tx-report')))))))))))
+                             (handler-fn k opt tx-report'))))
+
+                       (seq tx-data)
+                         ;; raw transact
+                       (let [tx-report' (if sync-db-to-main-thread?
+                                          (sync-db-to-main-thread repo conn tx-report)
+                                          tx-report)
+                             opt (assoc (additional-args (:tx-data tx-report')) :repo repo)]
+                         (doseq [[k handler-fn] handlers]
+                           (handler-fn k opt tx-report'))))))))))

+ 4 - 0
src/main/frontend/worker/db_worker.cljs

@@ -25,6 +25,7 @@
             [frontend.worker.file.reset :as file-reset]
             [frontend.worker.handler.page :as worker-page]
             [frontend.worker.handler.page.file-based.rename :as file-worker-page-rename]
+            [frontend.worker.pipeline :as worker-pipeline]
             [frontend.worker.rtc.asset-db-listener]
             [frontend.worker.rtc.client-op :as client-op]
             [frontend.worker.rtc.core :as rtc.core]
@@ -274,6 +275,9 @@
       (common-sqlite/create-kvs-table! db)
       (when-not @*publishing? (common-sqlite/create-kvs-table! client-ops-db))
       (search/create-tables-and-triggers! search-db)
+      (ldb/register-transact-pipeline-fn!
+       (fn [tx-report]
+         (worker-pipeline/transact-pipeline repo tx-report)))
       (let [schema (ldb/get-schema repo)
             conn (common-sqlite/get-storage-conn storage schema)
             _ (db-fix/check-and-fix-schema! repo conn)

+ 61 - 83
src/main/frontend/worker/pipeline.cljs

@@ -10,11 +10,11 @@
             [logseq.common.util.page-ref :as page-ref]
             [logseq.common.uuid :as common-uuid]
             [logseq.db :as ldb]
+            [logseq.db.common.entity-plus :as entity-plus]
             [logseq.db.common.order :as db-order]
             [logseq.db.common.sqlite :as common-sqlite]
             [logseq.db.frontend.class :as db-class]
             [logseq.db.sqlite.export :as sqlite-export]
-            [logseq.db.sqlite.util :as sqlite-util]
             [logseq.graph-parser.exporter :as gp-exporter]
             [logseq.outliner.commands :as commands]
             [logseq.outliner.core :as outliner-core]
@@ -169,9 +169,9 @@
        (apply concat)))))
 
 (defn- toggle-page-and-block
-  [conn {:keys [db-before db-after tx-data tx-meta]}]
+  [db {:keys [db-before db-after tx-data tx-meta]}]
   (when-not (rtc-tx-or-download-graph? tx-meta)
-    (let [page-tag (d/entity @conn :logseq.class/Page)
+    (let [page-tag (d/entity db :logseq.class/Page)
           library-page (ldb/get-library-page db-after)]
       (mapcat
        (fn [datom]
@@ -320,15 +320,16 @@
           (nil? created-by-ent) (cons created-by-block))))))
 
 (defn- compute-extra-tx-data
-  [repo conn tx-report]
+  [repo tx-report]
   (let [{:keys [db-before db-after tx-data tx-meta]} tx-report
+        db db-after
         fix-page-tags-tx-data (fix-page-tags tx-report)
         fix-inline-page-tx-data (fix-inline-built-in-page-classes tx-report)
         toggle-page-and-block-tx-data (when (empty? fix-inline-page-tx-data)
-                                        (toggle-page-and-block conn tx-report))
+                                        (toggle-page-and-block db tx-report))
         display-blocks-tx-data (add-missing-properties-to-typed-display-blocks db-after tx-data tx-meta)
         commands-tx (when-not (or (:undo? tx-meta) (:redo? tx-meta) (rtc-tx-or-download-graph? tx-meta))
-                      (commands/run-commands conn tx-report))
+                      (commands/run-commands tx-report))
         insert-templates-tx (when-not (rtc-tx-or-download-graph? tx-meta)
                               (insert-tag-templates repo tx-report))
         created-by-tx (add-created-by-ref-hook db-before db-after tx-data tx-meta)]
@@ -340,92 +341,71 @@
             fix-page-tags-tx-data
             fix-inline-page-tx-data)))
 
-(defn- reverse-tx!
-  [conn tx-data]
-  (let [reversed-tx-data (map (fn [[e a v _tx add?]]
-                                (let [op (if add? :db/retract :db/add)]
-                                  [op e a v])) tx-data)]
-    (d/transact! conn reversed-tx-data {:revert-tx-data? true
-                                        :gen-undo-ops? false})))
-
-(defn- undo-tx-data-if-disallowed!
-  [conn {:keys [tx-data tx-meta]}]
-  (when-not (:rtc-download-graph? tx-meta)
-    (let [db @conn
-          page-has-block-parent? (some (fn [d] (and (:added d)
-                                                    (= :block/parent (:a d))
-                                                    (ldb/page? (d/entity db (:e d)))
-                                                    (not (ldb/page? (d/entity db (:v d)))))) tx-data)]
-      ;; TODO: add other cases that need to be undo
-      (when page-has-block-parent?
-        (reverse-tx! conn tx-data)
-        (throw (ex-info "Page can't have block as parent"
-                        {:type :notification
-                         :payload {:message "Page can't have block as parent"
-                                   :type :warning}
-                         :tx-data tx-data}))))))
+(defn transact-pipeline
+  "Compute extra tx-data and block/refs, should ensure it's a pure function and
+  doesn't call `d/transact!` or `ldb/transact!`."
+  [repo {:keys [db-after tx-meta] :as tx-report}]
+  (let [db-based? (entity-plus/db-based-graph? db-after)
+        extra-tx-data (when db-based?
+                        (compute-extra-tx-data repo tx-report))
+        tx-report* (if (seq extra-tx-data)
+                     (let [result (d/with db-after extra-tx-data)]
+                       (assoc tx-report
+                              :tx-data (concat (:tx-data tx-report) (:tx-data result))
+                              :db-after (:db-after result)))
+                     tx-report)
+        {:keys [pages blocks]} (ds-report/get-blocks-and-pages tx-report*)
+        deleted-blocks (outliner-pipeline/filter-deleted-blocks (:tx-data tx-report*))
+        deleted-block-ids (set (map :db/id deleted-blocks))
+        blocks' (remove (fn [b] (deleted-block-ids (:db/id b))) blocks)
+        block-refs (when (seq blocks')
+                     (rebuild-block-refs repo tx-report* blocks'))
+        tx-id-data (let [db-after (:db-after tx-report*)
+                         updated-blocks (remove (fn [b] (contains? deleted-block-ids (:db/id b)))
+                                                (concat pages blocks))
+                         tx-id (get-in tx-report* [:tempids :db/current-tx])]
+                     (keep (fn [b]
+                             (when-let [db-id (:db/id b)]
+                               (when (:block/uuid (d/entity db-after db-id))
+                                 {:db/id db-id
+                                  :block/tx-id tx-id}))) updated-blocks))
+        block-refs-tx-id-data (concat block-refs tx-id-data)
+        replace-tx-report (when (seq block-refs-tx-id-data)
+                            (d/with (:db-after tx-report*) block-refs-tx-id-data))
+        tx-report' (or replace-tx-report tx-report*)
+        full-tx-data (concat (:tx-data tx-report*)
+                             (:tx-data replace-tx-report))]
+    (assoc tx-report'
+           :tx-data full-tx-data
+           :tx-meta tx-meta
+           :db-before (:db-before tx-report)
+           :db-after (or (:db-after tx-report')
+                         (:db-after tx-report)))))
 
 (defn- invoke-hooks-default
   [repo conn {:keys [tx-meta] :as tx-report} context]
-  ;; Notice: don't catch `undo-tx-data-if-disallowed!` since we want it failed immediately
-  (undo-tx-data-if-disallowed! conn tx-report)
   (try
-    (let [extra-tx-data (when (sqlite-util/db-based-graph? repo)
-                          (compute-extra-tx-data repo conn tx-report))
-          tx-report* (if (seq extra-tx-data)
-                       (let [result (ldb/transact! conn extra-tx-data {:pipeline-replace? true
-                                                                       :skip-store? true})]
-                         (assoc tx-report
-                                :tx-data (concat (:tx-data tx-report) (:tx-data result))
-                                :db-after (:db-after result)))
-                       tx-report)
-          {:keys [pages blocks]} (ds-report/get-blocks-and-pages tx-report*)
+    (let [{:keys [pages blocks]} (ds-report/get-blocks-and-pages tx-report)
+          deleted-blocks (outliner-pipeline/filter-deleted-blocks (:tx-data tx-report))
           _ (when (common-sqlite/local-file-based-graph? repo)
               (let [page-ids (distinct (map :db/id pages))]
                 (doseq [page-id page-ids]
                   (when (d/entity @conn page-id)
                     (file/sync-to-file repo page-id tx-meta)))))
-          deleted-blocks (outliner-pipeline/filter-deleted-blocks (:tx-data tx-report*))
-          deleted-block-ids (set (map :db/id deleted-blocks))
           deleted-block-uuids (set (map :block/uuid deleted-blocks))
+          deleted-block-ids (set (map :db/id deleted-blocks))
           _ (when (seq deleted-block-uuids)
               (swap! worker-state/*deleted-block-uuid->db-id merge
                      (zipmap (map :block/uuid deleted-blocks)
                              (map :db/id deleted-blocks))))
           deleted-assets (keep (fn [id]
-                                 (let [e (d/entity (:db-before tx-report*) id)]
+                                 (let [e (d/entity (:db-before tx-report) id)]
                                    (when (ldb/asset? e)
                                      {:block/uuid (:block/uuid e)
                                       :ext (:logseq.property.asset/type e)}))) deleted-block-ids)
-          blocks' (remove (fn [b] (deleted-block-ids (:db/id b))) blocks)
-          block-refs (when (seq blocks')
-                       (rebuild-block-refs repo tx-report* blocks'))
-          tx-id-data (let [db-after (:db-after tx-report*)
-                           updated-blocks (remove (fn [b] (contains? deleted-block-ids (:db/id b)))
-                                                  (concat pages blocks))
-                           tx-id (get-in tx-report* [:tempids :db/current-tx])]
-                       (keep (fn [b]
-                               (when-let [db-id (:db/id b)]
-                                 (when (:block/uuid (d/entity db-after db-id))
-                                   {:db/id db-id
-                                    :block/tx-id tx-id}))) updated-blocks))
-          block-refs-tx-id-data (concat block-refs tx-id-data)
-          replace-tx-report (when (seq block-refs-tx-id-data)
-                              (ldb/transact! conn block-refs-tx-id-data {:pipeline-replace? true
-                                                                         ;; Ensure db persisted
-                                                                         :db-persist? true}))
-          tx-report' (or replace-tx-report tx-report*)
-          full-tx-data (concat (:tx-data tx-report*)
-                               (:tx-data replace-tx-report))
-          final-tx-report (assoc tx-report'
-                                 :tx-data full-tx-data
-                                 :tx-meta tx-meta
-                                 :db-before (:db-before tx-report)
-                                 :db-after (or (:db-after tx-report')
-                                               (:db-after tx-report)))
           affected-query-keys (when-not (or (:importing? context) (:rtc-download-graph? tx-meta))
-                                (worker-react/get-affected-queries-keys final-tx-report))]
-      {:tx-report final-tx-report
+                                (worker-react/get-affected-queries-keys tx-report))]
+      {:tx-report tx-report
        :affected-keys affected-query-keys
        :deleted-block-uuids deleted-block-uuids
        :deleted-assets deleted-assets
@@ -437,15 +417,13 @@
 
 (defn invoke-hooks
   [repo conn {:keys [tx-meta] :as tx-report} context]
-  (when-not (or (:pipeline-replace? tx-meta)
-                (:revert-tx-data? tx-meta))
-    (let [{:keys [from-disk? new-graph?]} tx-meta]
-      (cond
-        (or from-disk? new-graph?)
-        {:tx-report tx-report}
+  (let [{:keys [from-disk? new-graph?]} tx-meta]
+    (cond
+      (or from-disk? new-graph?)
+      {:tx-report tx-report}
 
-        (or (::gp-exporter/new-graph? tx-meta) (::sqlite-export/imported-data? tx-meta))
-        (invoke-hooks-for-imported-graph conn tx-report)
+      (or (::gp-exporter/new-graph? tx-meta) (::sqlite-export/imported-data? tx-meta))
+      (invoke-hooks-for-imported-graph conn tx-report)
 
-        :else
-        (invoke-hooks-default repo conn tx-report context)))))
+      :else
+      (invoke-hooks-default repo conn tx-report context))))

+ 4 - 1
src/test/frontend/test/helper.cljs

@@ -13,6 +13,7 @@
             [frontend.state :as state]
             [frontend.worker.handler.page :as worker-page]
             [frontend.worker.pipeline :as worker-pipeline]
+            [logseq.db :as ldb]
             [logseq.db.common.order :as db-order]
             [logseq.db.sqlite.build :as sqlite-build]
             [logseq.db.sqlite.create-graph :as sqlite-create-graph]
@@ -33,9 +34,11 @@
         test-db' (if db-graph? test-db-name-db-version test-db-name)]
     (state/set-current-repo! test-db')
     (conn/start! test-db' opts)
+    (ldb/register-transact-pipeline-fn!
+     (fn [tx-report]
+       (worker-pipeline/transact-pipeline test-db' tx-report)))
     (let [conn (conn/get-db test-db' false)]
       (when db-graph?
-        (db-pipeline/add-listener conn)
         (d/transact! conn (sqlite-create-graph/build-db-initial-data "")))
       (d/listen! conn ::listen-db-changes!
                  (fn [tx-report]