Explorar el Código

fix: sequential writes

Tienson Qin hace 1 año
padre
commit
e79b928c6b

+ 14 - 9
deps/db/src/logseq/db.cljs

@@ -53,24 +53,34 @@
 
 (defonce *request-id (atom 0))
 (defonce requests (async/chan 1000))
-(defonce *request-id->response (atom {}))
+(defonce *unfinished-request-ids (atom #{}))
 
 (defn request-finished?
   "Whether any DB transaction request has been finished"
   []
-  (empty? @*request-id->response))
+  (empty? @*unfinished-request-ids))
 
 (async/go-loop []
   (when-let [{:keys [id request response]} (async/<! requests)]
     (let [result (async/<! (p->c (request)))]
       (p/resolve! response result)
-      (swap! *request-id->response dissoc id))
+      (swap! *unfinished-request-ids disj id))
     (recur)))
 
 (defn get-next-request-id
   []
   (swap! *request-id inc))
 
+(defn add-request!
+  [request-id request-f]
+  (let [resp (p/deferred)
+        new-request {:id request-id
+                     :request request-f
+                     :response resp}]
+    (swap! *unfinished-request-ids conj request-id)
+    (async/go (async/>! requests new-request))
+    resp))
+
 (defn transact!
   "`repo-or-conn`: repo for UI thread and conn for worker/node"
   ([repo-or-conn tx-data]
@@ -93,12 +103,7 @@
                         (assoc :request-id request-id))]
          (if sync?
            (f repo-or-conn tx-data tx-meta')
-           (let [resp (p/deferred)
-                 new-request {:id request-id
-                              :request #(f repo-or-conn tx-data tx-meta')
-                              :response resp}]
-             (async/go (async/>! requests new-request))
-             resp)))))))
+           (add-request! request-id #(f repo-or-conn tx-data tx-meta'))))))))
 
 (defn build-default-pages-tx
   []

+ 1 - 0
src/main/frontend/db.cljs

@@ -66,3 +66,4 @@
    (conn/start! repo option)))
 
 (def new-block-id ldb/new-block-id)
+(def request-finished? ldb/request-finished?)

+ 2 - 2
src/main/frontend/handler/events.cljs

@@ -196,13 +196,13 @@
   (let [^js sqlite @db-browser/*worker]
     (p/let [writes-finished? (when sqlite (.file-writes-finished? sqlite (state/get-current-repo)))
             request-finished? (ldb/request-finished?)]
-      (if (or (not request-finished?) (not writes-finished?)) ; TODO: test (:sync-graph/init? @state/state)
+      (if (not writes-finished?) ; TODO: test (:sync-graph/init? @state/state)
         (do
           (log/info :graph/switch (cond->
                                     {:request-finished? request-finished?
                                      :file-writes-finished? writes-finished?}
                                     (false? request-finished?)
-                                    (assoc :unfinished-requests? @ldb/*request-id->response)))
+                                    (assoc :unfinished-requests? @ldb/*unfinished-request-ids)))
           (notification/show!
             "Please wait seconds until all changes are saved for the current graph."
             :warning))

+ 2 - 1
src/main/frontend/handler/worker.cljs

@@ -45,7 +45,8 @@
   (set! (.-onmessage worker)
         (fn [event]
           (let [data (.-data event)]
-            (when-not (= (.-type data) "RAW")
+            (when-not (or (= (.-type data) "RAW")
+                          (= data "keepAlive"))
               ;; Log thrown exceptions from comlink
               ;; https://github.com/GoogleChromeLabs/comlink/blob/dffe9050f63b1b39f30213adeb1dd4b9ed7d2594/src/comlink.ts#L223-L236
               (if (and (= "HANDLER" (.-type data)) (= "throw" (.-name data)))

+ 24 - 20
src/main/frontend/modules/outliner/ui.cljc

@@ -7,23 +7,27 @@
 (defmacro transact!
   [opts & body]
   `(let [test?# frontend.util/node-test?]
-     (when (nil? @(:history/tx-before-editor-cursor @state/state))
-       (state/set-state! :history/tx-before-editor-cursor (state/get-current-edit-block-and-position)))
-     (let [ops# frontend.modules.outliner.op/*outliner-ops*]
-       (if ops#
-         (do ~@body)                    ; nested transact!
-         (binding [frontend.modules.outliner.op/*outliner-ops* (transient [])]
-           ~@body
-           (let [r# (persistent! frontend.modules.outliner.op/*outliner-ops*)
-                 worker# @state/*db-worker]
-             (if (and test?# (seq r#))
-               (logseq.outliner.op/apply-ops! (state/get-current-repo)
-                                              (db/get-db false)
-                                              r#
-                                              (state/get-date-formatter)
-                                              ~opts)
-               (when (and worker# (seq r#))
-                 (let [request-id# (state/get-worker-next-request-id)]
-                   (.apply-outliner-ops ^Object worker# (state/get-current-repo)
-                                        (pr-str r#)
-                                        (pr-str (assoc ~opts :request-id request-id#))))))))))))
+     (when (or test?# (db/request-finished?))
+       (when (nil? @(:history/tx-before-editor-cursor @state/state))
+         (state/set-state! :history/tx-before-editor-cursor (state/get-current-edit-block-and-position)))
+       (let [ops# frontend.modules.outliner.op/*outliner-ops*]
+         (if ops#
+           (do ~@body)                    ; nested transact!
+           (binding [frontend.modules.outliner.op/*outliner-ops* (transient [])]
+             ~@body
+             (let [r# (persistent! frontend.modules.outliner.op/*outliner-ops*)
+                   worker# @state/*db-worker]
+               (if (and test?# (seq r#))
+                 (logseq.outliner.op/apply-ops! (state/get-current-repo)
+                                                (db/get-db false)
+                                                r#
+                                                (state/get-date-formatter)
+                                                ~opts)
+                 (when (and worker# (seq r#))
+                   (let [request-id# (state/get-worker-next-request-id)
+                         request# #(.apply-outliner-ops ^Object worker# (state/get-current-repo)
+                                                        (pr-str r#)
+                                                        (pr-str (assoc ~opts :request-id request-id#)))
+                         response# (state/add-worker-request! request-id# request#)]
+
+                     response#))))))))))

+ 1 - 0
src/main/frontend/state.cljs

@@ -2376,3 +2376,4 @@ Similar to re-frame subscriptions"
   (update-state! :favorites/updated? inc))
 
 (def get-worker-next-request-id ldb/get-next-request-id)
+(def add-worker-request! ldb/add-request!)

+ 10 - 9
src/main/frontend/worker/rtc/db_listener.cljs

@@ -207,7 +207,7 @@
   [repo conn]
   (d/listen! conn :sync-db
              (fn [{:keys [tx-meta] :as tx-report}]
-               (let [{:keys [pipeline-replace?]} tx-meta
+               (let [{:keys [pipeline-replace? from-disk?]} tx-meta
                      result (worker-pipeline/invoke-hooks repo conn tx-report (worker-state/get-context))
                      tx-report' (or (:tx-report result) tx-report)]
                  (when-not pipeline-replace?
@@ -220,14 +220,15 @@
                                 (dissoc result :tx-report)))]
                      (worker-util/post-message :sync-db-changes data))
 
-                   (p/do!
-                    (let [{:keys [blocks-to-remove-set blocks-to-add]} (search/sync-search-indice repo tx-report')
-                          ^js wo (worker-state/get-worker-object)]
-                      (when wo
-                        (when (seq blocks-to-remove-set)
-                          (.search-delete-blocks wo repo (bean/->js blocks-to-remove-set)))
-                        (when (seq blocks-to-add)
-                          (.search-upsert-blocks wo repo (bean/->js blocks-to-add)))))))))))
+                   (when-not from-disk?
+                     (p/do!
+                      (let [{:keys [blocks-to-remove-set blocks-to-add]} (search/sync-search-indice repo tx-report')
+                            ^js wo (worker-state/get-worker-object)]
+                        (when wo
+                          (when (seq blocks-to-remove-set)
+                            (.search-delete-blocks wo repo (bean/->js blocks-to-remove-set)))
+                          (when (seq blocks-to-add)
+                            (.search-upsert-blocks wo repo (bean/->js blocks-to-add))))))))))))
 
 (defn listen-to-db-changes!
   [repo conn]