Przeglądaj źródła

fix: ensure linear sequential transactions for the db worker

Tienson Qin 2 lat temu
rodzic
commit
d5e62ae21f

+ 8 - 1
deps/db/src/logseq/db.cljs

@@ -54,6 +54,11 @@
 (defonce *request-id (atom 0))
 (defonce *request-id->response (atom {}))
 
+(defn request-finished?
+  "Whether any DB transaction request has been finished"
+  []
+  (empty? @*request-id->response))
+
 (defn get-deferred-response
   [request-id]
   (get @*request-id->response request-id))
@@ -63,7 +68,9 @@
    (transact! conn tx-data nil))
   ([conn tx-data tx-meta]
    (let [tx-data (common-util/fast-remove-nils tx-data)]
-     (when (seq tx-data)
+     ;; Ensure worker can handle the request sequentially (one by one)
+     ;; Because UI assumes that the in-memory db has all the data except the last one transaction
+     (when (and (seq tx-data) (request-finished?))
 
        ;; (prn :debug :transact)
        ;; (cljs.pprint/pprint tx-data)

+ 5 - 0
src/main/frontend/db_worker.cljs

@@ -368,6 +368,11 @@
    [this]
    (empty? @file/*writes))
 
+  (page-file-saved
+   [this request-id page-id]
+   (file/dissoc-request! request-id)
+   nil)
+
   (sync-app-state
    [this new-state-str]
    (let [new-state (edn/read-string new-state-str)]

+ 20 - 14
src/main/frontend/handler/events.cljs

@@ -77,7 +77,8 @@
             [frontend.db.rtc.debug-ui :as rtc-debug-ui]
             [frontend.modules.outliner.pipeline :as pipeline]
             [electron.ipc :as ipc]
-            [frontend.date :as date]))
+            [frontend.date :as date]
+            [logseq.db :as ldb]))
 
 ;; TODO: should we move all events here?
 
@@ -187,8 +188,7 @@
 
 (defmethod handle :graph/switch [[_ graph opts]]
   (let [^js sqlite @db-browser/*worker]
-    (p/let [writes-finished? (when sqlite (.file-writes-finished? sqlite))
-            writes-finished? (if (some? writes-finished?) writes-finished? true)]
+    (p/let [writes-finished? (when sqlite (.file-writes-finished? sqlite))]
       (if (or writes-finished? (:sync-graph/init? @state/state))
         (graph-switch-on-persisted graph opts)
         (notification/show!
@@ -363,10 +363,15 @@
 
 (defmethod handle :file/not-matched-from-disk [[_ path disk-content db-content]]
   (when-let [repo (state/get-current-repo)]
-    (when (and disk-content db-content
-               (not= (util/trim-safe disk-content) (util/trim-safe db-content)))
-      (state/set-modal! #(diff/local-file repo path disk-content db-content)
-                        {:label "diff__cp"}))))
+    (let [^js sqlite @db-browser/*worker]
+      (p/let [writes-finished? (when sqlite (.file-writes-finished? sqlite))
+              request-finished? (ldb/request-finished?)]
+        (prn :debug :writes-finished? writes-finished?
+             :request-finished? request-finished?)
+        (when (and request-finished? writes-finished? disk-content db-content
+                   (not= (util/trim-safe disk-content) (util/trim-safe db-content)))
+          (state/set-modal! #(diff/local-file repo path disk-content db-content)
+                            {:label "diff__cp"}))))))
 
 (defmethod handle :modal/display-file-version [[_ path content hash]]
   (state/set-modal! #(git-component/file-specific-version path hash content)))
@@ -945,13 +950,14 @@
   (swap! rtc-debug-ui/debug-state (fn [old] (merge old state))))
 
 ;; db-worker -> UI
-(defmethod handle :db/sync-changes [[_ data]]
-  (let [repo (state/get-current-repo)]
-    (pipeline/invoke-hooks data)
-
-    (ipc/ipc :db-transact repo (pr-str (:tx-data data)) (pr-str (:tx-meta data)))
-    (state/pub-event! [:search/transact-data repo (:search-indice data)])
-    nil))
+(defmethod handle :db/sync-changes [[_ {:keys [request-id] :as data}]]
+  (when request-id                      ; request-id could be nil sometimes
+    (let [repo (state/get-current-repo)]
+      (pipeline/invoke-hooks data)
+
+      (ipc/ipc :db-transact repo (pr-str (:tx-data data)) (pr-str (:tx-meta data)))
+      (state/pub-event! [:search/transact-data repo (:search-indice data)])
+      nil)))
 
 (defn run!
   []

+ 13 - 11
src/main/frontend/handler/worker.cljs

@@ -4,37 +4,39 @@
             [frontend.handler.file :as file-handler]
             [frontend.handler.notification :as notification]
             [clojure.edn :as edn]
-            [frontend.state :as state]))
+            [frontend.state :as state]
+            [promesa.core :as p]))
 
 (defmulti handle identity)
 
-(defmethod handle :write-files [_ data]
-  (let [{:keys [repo files]} (edn/read-string data)]
-    (file-handler/alter-files repo files {})))
+(defmethod handle :write-files [_ ^js worker data]
+  (let [{:keys [request-id page-id repo files]} (edn/read-string data)]
+    (p/let [_ (file-handler/alter-files repo files {})]
+      (.page-file-saved worker request-id page-id))))
 
-(defmethod handle :notification [_ data]
+(defmethod handle :notification [_ _worker data]
   (apply notification/show! (edn/read-string data)))
 
-(defmethod handle :add-repo [_ data]
+(defmethod handle :add-repo [_ _worker data]
   (state/add-repo! {:url (:repo (edn/read-string data))}))
 
-(defmethod handle :rtc-sync-state [_ data]
+(defmethod handle :rtc-sync-state [_ _worker data]
   (let [state (edn/read-string data)]
     (state/pub-event! [:rtc/sync-state state])))
 
-(defmethod handle :sync-db-changes [_ data]
+(defmethod handle :sync-db-changes [_ _worker data]
   (let [data (edn/read-string data)]
     (state/pub-event! [:db/sync-changes data])))
 
-(defmethod handle :default [_ data]
+(defmethod handle :default [_ _worker data]
   (prn :debug "Worker data not handled: " data))
 
 (defn handle-message!
-  [^js worker]
+  [^js worker wrapped-worker]
   (assert worker "worker doesn't exists")
   (set! (.-onmessage worker)
         (fn [event]
           (let [data (.-data event)]
             (when-not (= (.-type data) "RAW")
               (let [[e payload] (bean/->clj data)]
-                (handle (keyword e) payload)))))))
+                (handle (keyword e) wrapped-worker payload)))))))

+ 58 - 59
src/main/frontend/modules/outliner/pipeline.cljs

@@ -40,62 +40,61 @@
 
 (defn invoke-hooks
   [{:keys [request-id tx-meta tx-data deleted-block-uuids affected-keys blocks] :as opts}]
-  (let [{:keys [from-disk? new-graph? local-tx? undo? redo?]} tx-meta
-        repo (state/get-current-repo)
-        tx-report {:tx-meta tx-meta
-                   :tx-data tx-data}]
-
-    (prn :debug :worker-response :request-id request-id)
-
-    (let [conn (db/get-db repo false)
-          tx-report (d/transact! conn tx-data tx-meta)]
-      (when local-tx?
-        (let [tx-id (get-tx-id tx-report)]
-          (store-undo-data! (assoc opts :tx-id tx-id))))
-      (when-not (or undo? redo?)
-        (update-current-tx-editor-cursor! tx-report)))
-
-    (let [pages (set (keep #(when (= :block/name (:a %)) (:v %)) tx-data))]
-      (when (seq pages)
-        (mark-pages-as-loaded! repo pages)))
-
-    (when (= (:outliner-op tx-meta) :delete-page)
-      (state/pub-event! [:page/deleted repo (:deleted-page tx-meta) (:file-path tx-meta)]))
-
-    (when (= (:outliner-op tx-meta) :rename-page)
-      (state/pub-event! [:page/renamed repo (:data tx-meta)]))
-
-    (if (or from-disk? new-graph?)
-      (do
-        (react/clear-query-state!)
-        (ui-handler/re-render-root!))
-      (when-not (:graph/importing @state/state)
-        (react/refresh! repo tx-report affected-keys)
-
-        (when-let [state (:ui/restore-cursor-state @state/state)]
-          (when (or undo? redo?)
-            (restore-cursor-and-app-state! state undo?)
-            (state/set-state! :ui/restore-cursor-state nil)))
-
-        (state/set-state! :editor/start-pos nil)
-
-        (when (and state/lsp-enabled?
-                   (seq blocks)
-                   (<= (count blocks) 1000))
-          (state/pub-event! [:plugin/hook-db-tx
-                             {:blocks  blocks
-                              :deleted-block-uuids deleted-block-uuids
-                              :tx-data (:tx-data tx-report)
-                              :tx-meta (:tx-meta tx-report)}]))))
-
-    (when-let [deleting-block-id (:ui/deleting-block @state/state)]
-      (when (some (fn [datom] (and
-                               (= :block/uuid (:a datom))
-                               (= (:v datom) deleting-block-id)
-                               (true? (:added datom)))) tx-data) ; editing-block was added back (could be undo or from remote sync)
-        (state/set-state! :ui/deleting-block nil)))
-
-    (when-let [deferred (ldb/get-deferred-response request-id)]
-      (p/resolve! deferred {:tx-meta tx-meta
-                            :tx-data tx-data})
-      (swap! ldb/*request-id->response dissoc request-id))))
+  (when request-id
+    (let [{:keys [from-disk? new-graph? local-tx? undo? redo?]} tx-meta
+          repo (state/get-current-repo)
+          tx-report {:tx-meta tx-meta
+                     :tx-data tx-data}]
+
+      (let [conn (db/get-db repo false)
+            tx-report (d/transact! conn tx-data tx-meta)]
+        (when local-tx?
+          (let [tx-id (get-tx-id tx-report)]
+            (store-undo-data! (assoc opts :tx-id tx-id))))
+        (when-not (or undo? redo?)
+          (update-current-tx-editor-cursor! tx-report)))
+
+      (let [pages (set (keep #(when (= :block/name (:a %)) (:v %)) tx-data))]
+        (when (seq pages)
+          (mark-pages-as-loaded! repo pages)))
+
+      (when (= (:outliner-op tx-meta) :delete-page)
+        (state/pub-event! [:page/deleted repo (:deleted-page tx-meta) (:file-path tx-meta)]))
+
+      (when (= (:outliner-op tx-meta) :rename-page)
+        (state/pub-event! [:page/renamed repo (:data tx-meta)]))
+
+      (if (or from-disk? new-graph?)
+        (do
+          (react/clear-query-state!)
+          (ui-handler/re-render-root!))
+        (when-not (:graph/importing @state/state)
+          (react/refresh! repo tx-report affected-keys)
+
+          (when-let [state (:ui/restore-cursor-state @state/state)]
+            (when (or undo? redo?)
+              (restore-cursor-and-app-state! state undo?)
+              (state/set-state! :ui/restore-cursor-state nil)))
+
+          (state/set-state! :editor/start-pos nil)
+
+          (when (and state/lsp-enabled?
+                     (seq blocks)
+                     (<= (count blocks) 1000))
+            (state/pub-event! [:plugin/hook-db-tx
+                               {:blocks  blocks
+                                :deleted-block-uuids deleted-block-uuids
+                                :tx-data (:tx-data tx-report)
+                                :tx-meta (:tx-meta tx-report)}]))))
+
+      (when-let [deleting-block-id (:ui/deleting-block @state/state)]
+        (when (some (fn [datom] (and
+                                 (= :block/uuid (:a datom))
+                                 (= (:v datom) deleting-block-id)
+                                 (true? (:added datom)))) tx-data) ; editing-block was added back (could be undo or from remote sync)
+          (state/set-state! :ui/deleting-block nil)))
+
+      (when-let [deferred (ldb/get-deferred-response request-id)]
+        (p/resolve! deferred {:tx-meta tx-meta
+                              :tx-data tx-data})
+        (swap! ldb/*request-id->response dissoc request-id)))))

+ 1 - 1
src/main/frontend/persist_db/browser.cljs

@@ -68,7 +68,7 @@
                        "/static/js/db-worker.js")
           worker (js/Worker. (str worker-url "?electron=" (util/electron?)))
           wrapped-worker (Comlink/wrap worker)]
-      (worker-handler/handle-message! worker)
+      (worker-handler/handle-message! worker wrapped-worker)
       (reset! *worker wrapped-worker)
       (-> (p/let [_ (.init wrapped-worker config/RTC-WS-URL)
                   _ (.sync-app-state wrapped-worker

+ 1 - 1
src/main/frontend/worker/file.cljs

@@ -16,6 +16,7 @@
             [logseq.db.sqlite.util :as sqlite-util]))
 
 (def *writes file/*writes)
+(def dissoc-request! file/dissoc-request!)
 
 (defonce file-writes-chan
   (let [coercer (m/coercer [:catn
@@ -98,7 +99,6 @@
              page-id
              (not (:created-from-journal-template? tx-meta))
              (not (:delete-files? tx-meta)))
-    (swap! *writes conj page-id)
     (async/put! file-writes-chan [repo page-id (:outliner-op tx-meta) (tc/to-long (t/now))])))
 
 (defn <ratelimit-file-writes!

+ 18 - 4
src/main/frontend/worker/file/core.cljs

@@ -2,7 +2,6 @@
   "Save file to disk"
   (:require [clojure.string :as string]
             [frontend.worker.file.util :as wfu]
-            [frontend.worker.state :as worker-state]
             [logseq.graph-parser.property :as gp-property]
             [logseq.common.path :as path]
             [datascript.core :as d]
@@ -10,7 +9,18 @@
             [frontend.worker.date :as worker-date]
             [frontend.worker.util :as util]))
 
-(def *writes (atom #{}))
+(defonce *writes (atom {}))
+(defonce *request-id (atom 0))
+
+(defn conj-page-write!
+  [page-id]
+  (let [request-id (swap! *request-id inc)]
+    (swap! *writes assoc request-id page-id)
+    request-id))
+
+(defn dissoc-request!
+  [request-id]
+  (swap! *writes dissoc request-id))
 
 (defn- indented-block-content
   [content spaces-tabs]
@@ -155,8 +165,12 @@
         (when-not (and (string/blank? new-content) (not blocks-just-deleted?))
           (let [files [[file-path new-content]]]
             (when (seq files)
-              (util/post-message :write-files (pr-str {:repo repo :files files}))
-              (swap! *writes disj (:db/id page-block))))))
+              (let [page-id (:db/id page-block)
+                    request-id (conj-page-write! page-id)]
+                (util/post-message :write-files (pr-str {:request-id request-id
+                                                         :page-id page-id
+                                                         :repo repo
+                                                         :files files})))))))
       ;; In e2e tests, "card" page in db has no :file/path
       (js/console.error "File path from page-block is not valid" page-block tree))))
 

+ 2 - 2
src/main/frontend/worker/pipeline.cljs

@@ -75,6 +75,8 @@
       (if (or from-disk? new-graph?)
         {:tx-report tx-report}
         (let [{:keys [pages blocks]} (ds-report/get-blocks-and-pages tx-report)
+              _ (doseq [page pages]
+                  (file/sync-to-file repo (:db/id page) tx-meta))
               deleted-block-uuids (set (outliner-pipeline/filter-deleted-blocks (:tx-data tx-report)))
               replace-tx (concat
                           ;; block path refs
@@ -104,8 +106,6 @@
               final-tx-report (assoc tx-report' :tx-data full-tx-data)
               affected-query-keys (when-not (:importing? context)
                                     (worker-react/get-affected-queries-keys final-tx-report))]
-          (doseq [page pages]
-            (file/sync-to-file repo (:db/id page) tx-meta))
           {:tx-report final-tx-report
            :affected-keys affected-query-keys
            :deleted-block-uuids deleted-block-uuids