소스 검색

fix: parent missing

Tienson Qin 1 주 전
부모
커밋
63357c43ba

+ 2 - 1
deps/db-sync/deps.edn

@@ -11,7 +11,8 @@
   com.lambdaisland/glogi                {:git/url "https://github.com/lambdaisland/glogi"
                                          :git/sha "30328a045141717aadbbb693465aed55f0904976"}
   logseq/common                         {:local/root "../common"}
-  logseq/db                             {:local/root "../db"}}
+  logseq/db                             {:local/root "../db"}
+  logseq/outliner                       {:local/root "../outliner"}}
  :aliases
  {:cljs {:extra-deps {org.clojure/tools.namespace      {:mvn/version "0.2.11"}
                       cider/cider-nrepl                {:mvn/version "0.55.1"}

+ 56 - 0
deps/db-sync/src/logseq/db_sync/parent_missing.cljs

@@ -0,0 +1,56 @@
+(ns logseq.db-sync.parent-missing
+  (:require [datascript.core :as d]
+            [logseq.common.config :as common-config]
+            [logseq.db :as ldb]
+            [logseq.db.sqlite.create-graph :as sqlite-create-graph]
+            [logseq.db.sqlite.util :as sqlite-util]
+            [logseq.outliner.core :as outliner-core]
+            [logseq.outliner.transaction :as outliner-tx]))
+
+(defn- ensure-recycle-page!
+  [conn]
+  (let [db @conn]
+    (or (ldb/get-built-in-page db common-config/recycle-page-name)
+        (let [page (-> (sqlite-util/build-new-page common-config/recycle-page-name)
+                       sqlite-create-graph/mark-block-as-built-in)
+              {:keys [db-after]} (ldb/transact! conn [page] {:db-sync/recycle-page? true
+                                                             :outliner-op :create-page
+                                                             :rtc/fix? true})]
+          (d/entity db-after [:block/uuid (:block/uuid page)])))))
+
+(defn get-missing-parent-datoms
+  [{:keys [db-after tx-data]}]
+  (->> tx-data
+       ;; block still exists while its parent has been gone
+       (filter (fn [d]
+                 (and (= :block/parent (:a d))
+                      (nil? (d/entity db-after (:v d)))
+                      (let [block (d/entity db-after (:e d))]
+                        (and (some? block)
+                             (not (ldb/page? block)))))))))
+
+(defn move-blocks-to-recycle!
+  [conn blocks]
+  (let [recycle-page (ensure-recycle-page! conn)]
+    (outliner-tx/transact!
+     {:persist-op? true
+      :gen-undo-ops? false
+      :outliner-op :fix-missing-parent
+      :rtc/fix? true
+      :transact-opts {:conn conn}}
+     (outliner-core/move-blocks! conn blocks recycle-page {:sibling? false}))))
+
+(defn fix-parent-missing!
+  [conn tx-report]
+  (let [tx-data (:tx-data tx-report)]
+    (if-let [missing-parent-datoms (seq (get-missing-parent-datoms tx-report))]
+      (let [blocks (map (fn [d]
+                          (d/entity (:db-after tx-report) (:e d)))
+                        missing-parent-datoms)
+            block-ids (set (map :db/id blocks))]
+        (move-blocks-to-recycle! conn blocks)
+        (remove
+         (fn [d]
+           (and (contains? block-ids (:e d)) (= (:a d) :block/parent) (false? (:added d))))
+         tx-data))
+      tx-data)))

+ 22 - 4
deps/db-sync/src/logseq/db_sync/storage.cljs

@@ -1,10 +1,12 @@
 (ns logseq.db-sync.storage
   (:require [cljs-bean.core :as bean]
             [clojure.string :as string]
+            [datascrip.core :as d]
             [datascript.storage :refer [IStorage]]
+            [logseq.db-sync.common :as common]
+            [logseq.db.common.normalize :as db-normalize]
             [logseq.db.common.sqlite :as common-sqlite]
-            [logseq.db.frontend.schema :as db-schema]
-            [logseq.db-sync.common :as common]))
+            [logseq.db.frontend.schema :as db-schema]))
 
 (defn init-schema! [sql]
   (common/sql-exec sql "create table if not exists kvs (addr INTEGER primary key, content TEXT, addresses JSON)")
@@ -109,8 +111,24 @@
     (-restore [_ addr]
       (restore-data-from-addr sql addr))))
 
+(defn- append-tx-for-tx-report
+  [sql {:keys [db-after db-before tx-data]}]
+  (let [new-t (next-t! sql)
+        created-at (common/now-ms)
+        normalized-data (db-normalize/normalize-tx-data db-after db-before tx-data)
+        tx-str (common/write-transit normalized-data)]
+    (append-tx! sql new-t tx-str created-at)))
+
+(defn- listen-db-updates!
+  [sql conn]
+  (d/listen conn ::listen-db-updates
+            (fn [tx-report]
+              (append-tx-for-tx-report sql tx-report))))
+
 (defn open-conn [sql]
   (init-schema! sql)
   (let [storage (new-sqlite-storage sql)
-        schema db-schema/schema]
-    (common-sqlite/get-storage-conn storage schema)))
+        schema db-schema/schema
+        conn (common-sqlite/get-storage-conn storage schema)]
+    (listen-db-updates! sql conn)
+    conn))

+ 23 - 22
deps/db-sync/src/logseq/db_sync/worker.cljs

@@ -9,6 +9,7 @@
             [logseq.db-sync.common :as common :refer [cors-headers]]
             [logseq.db-sync.cycle :as cycle]
             [logseq.db-sync.malli-schema :as db-sync-schema]
+            [logseq.db-sync.parent-missing :as db-sync-parent-missing]
             [logseq.db-sync.protocol :as protocol]
             [logseq.db-sync.storage :as storage]
             [logseq.db-sync.worker-core :as worker-core]
@@ -341,10 +342,14 @@
             {:attr attr
              :server_values server-values})}))
 
-(defn- fix-tx-data
-  [db tx-data]
-  (->> tx-data
-       (worker-core/fix-duplicate-orders db)))
+(defn- missing-parent-reject-response [db datoms]
+  (log/info :db-sync/missing-parent-reject
+            {:datoms datoms})
+  {:type "tx/reject"
+   :reason "missing-parent"
+   :data (common/write-transit
+          {:eids (map (fn [d] (let [block (d/entity db (:e d))]
+                                [:block/uuid (:block/uuid block)])) datoms)})})
 
 (defn- apply-tx! [^js self sender txs]
   (let [sql (.-sql self)
@@ -355,24 +360,20 @@
           db @conn
           tx-report (d/with db tx-data)
           db' (:db-after tx-report)
-          order-fixed (fix-tx-data db' tx-data)
-          cycle-info (cycle/detect-cycle db' order-fixed)]
-      (if cycle-info
-        (do
-          (log/info :db-sync/cycle-detected
-                    {:attr (:attr cycle-info)
-                     :entity (:entity cycle-info)
-                     :entity-title (entity-title db (:entity cycle-info))
-                     :tx-count (count order-fixed)})
-          (cycle-reject-response db order-fixed cycle-info))
-        (let [{:keys [tx-data db-before db-after]} (ldb/transact! conn order-fixed)
-              normalized-data (db-normalize/normalize-tx-data db-after db-before tx-data)
-              new-t (storage/next-t! sql)
-              created-at (common/now-ms)
-              tx-str (common/write-transit normalized-data)]
-          (storage/append-tx! sql new-t tx-str created-at)
-          (broadcast! self sender {:type "changed" :t new-t})
-          new-t)))))
+          missing-parent-datoms (db-sync-parent-missing/get-missing-parent-datoms tx-report)]
+      (if (seq missing-parent-datoms)
+        (missing-parent-reject-response db' missing-parent-datoms)
+        ;; TODO: move fix order to client to keep worker thin
+        (let [order-fixed (worker-core/fix-duplicate-orders db' tx-data)
+              cycle-info (cycle/detect-cycle db' order-fixed)]
+          (if cycle-info
+            (cycle-reject-response db order-fixed cycle-info)
+            (do
+              (ldb/transact! conn order-fixed)
+              (let [new-t (storage/get-t sql)]
+              ;; FIXME: no need to broadcast if client tx is less than remote tx
+                (broadcast! self sender {:type "changed" :t new-t})
+                new-t))))))))
 
 (defn- handle-tx-batch! [^js self sender txs t-before]
   (let [current-t (t-now self)]

+ 85 - 97
src/main/frontend/worker/db_sync.cljs

@@ -5,19 +5,20 @@
             [frontend.worker.rtc.client-op :as client-op]
             [frontend.worker.state :as worker-state]
             [lambdaisland.glogi :as log]
-            [logseq.common.config :as common-config]
             [logseq.common.path :as path]
             [logseq.common.util :as common-util]
             [logseq.db :as ldb]
             [logseq.db-sync.cycle :as db-sync-cycle]
             [logseq.db-sync.malli-schema :as db-sync-schema]
+            [logseq.db-sync.parent-missing :as db-sync-parent-missing]
             [logseq.db.common.normalize :as db-normalize]
-            [logseq.db.sqlite.create-graph :as sqlite-create-graph]
             [logseq.db.sqlite.util :as sqlite-util]
             [logseq.outliner.core :as outliner-core]
             [logseq.outliner.transaction :as outliner-tx]
             [promesa.core :as p]))
 
+(defonce *repo->latest-remote-tx (atom {}))
+
 (defn- enabled?
   []
   (true? (:enabled? @worker-state/*db-sync-config)))
@@ -231,46 +232,6 @@
                    (some-> (:block/uuid ent) str)))))
        (distinct)))
 
-(defn- ensure-recycle-page!
-  [conn]
-  (let [db @conn]
-    (or (ldb/get-built-in-page db common-config/recycle-page-name)
-        (let [page (-> (sqlite-util/build-new-page common-config/recycle-page-name)
-                       sqlite-create-graph/mark-block-as-built-in)
-              {:keys [db-after]} (ldb/transact! conn [page] {:db-sync/recycle-page? true
-                                                             :outliner-op :create-page})]
-          (d/entity db-after [:block/uuid (:block/uuid page)])))))
-
-(defn- get-missing-parent-datoms
-  [{:keys [db-after tx-data]}]
-  (->> tx-data
-       ;; block still exists while its parent has been gone
-       (filter (fn [d]
-                 (and (= :block/parent (:a d))
-                      (nil? (d/entity db-after (:v d)))
-                      (some? (d/entity db-after (:e d))))))))
-
-(defn- move-missing-parents
-  [conn tx-report]
-  (let [tx-data (:tx-data tx-report)]
-    (if-let [missing-parent-datoms (seq (get-missing-parent-datoms tx-report))]
-      (let [blocks (map (fn [d]
-                          (d/entity (:db-after tx-report) (:e d)))
-                        missing-parent-datoms)
-            block-ids (set (map :db/id blocks))
-            recycle-page (ensure-recycle-page! conn)
-            _ (outliner-tx/transact!
-               {:persist-op? true
-                :gen-undo-ops? false
-                :outliner-op :fix-missing-parent
-                :transact-opts {:conn conn}}
-               (outliner-core/move-blocks! conn blocks recycle-page {:sibling? false}))]
-        (remove
-         (fn [d]
-           (and (contains? block-ids (:e d)) (= (:a d) :block/parent) (false? (:added d))))
-         tx-data))
-      tx-data)))
-
 (defn- reparent-cycle-block!
   [conn block]
   (when-let [page (:block/page block)]
@@ -370,28 +331,30 @@
 (defn- client-ops-conn [repo]
   (worker-state/get-client-ops-conn repo))
 
-(defn- persist-local-tx! [repo tx-str]
+(defn- persist-local-tx! [repo tx-str tx-meta]
   (when-let [conn (client-ops-conn repo)]
     (let [tx-id (random-uuid)
           now (.now js/Date)]
       (ldb/transact! conn [{:db-sync/tx-id tx-id
                             :db-sync/tx tx-str
-                            :db-sync/created-at now}])
+                            :db-sync/created-at now
+                            :db-sync/fix? (:rtc/fix? tx-meta)}])
       tx-id)))
 
 (defn- pending-txs
   [repo limit]
   (when-let [conn (client-ops-conn repo)]
     (let [db @conn
-          datoms (d/datoms db :avet :db-sync/created-at)]
-      (->> datoms
+          datoms (take limit (d/datoms db :avet :db-sync/created-at))
+          fixs (d/datoms db :avet :db-sync/fix? true)
+          full-datoms (distinct (concat datoms fixs))]
+      (->> full-datoms
            (map (fn [datom]
                   (d/entity db (:e datom))))
            (keep (fn [ent]
-                   (when-let [tx-id (:db-sync/tx-id ent)]
+                   (let [tx-id (:db-sync/tx-id ent)]
                      {:tx-id tx-id
                       :tx (:db-sync/tx ent)})))
-           (take limit)
            (vec)))))
 
 (defn- remove-pending-txs!
@@ -415,22 +378,25 @@
 
 (defn- flush-pending!
   [repo client]
-  (let [inflight @(:inflight client)]
-    (when (empty? inflight)
-      (when-let [ws (:ws client)]
-        (when (ws-open? ws)
-          (let [batch (pending-txs repo 50)]
-            (when (seq batch)
-              (let [tx-ids (mapv :tx-id batch)
-                    txs (mapv :tx batch)
-                    tx-data (->> txs
-                                 (mapcat sqlite-util/read-transit-str)
-                                 keep-last-parent-update)]
-                (when (seq txs)
-                  (reset! (:inflight client) tx-ids)
-                  (send! ws {:type "tx/batch"
-                             :t_before (or (client-op/get-local-tx repo) 0)
-                             :txs (sqlite-util/write-transit-str tx-data)}))))))))))
+  (let [inflight @(:inflight client)
+        local-tx (or (client-op/get-local-tx repo) 0)
+        remote-tx 0]
+    (when (= local-tx remote-tx)        ; rebase
+      (when (empty? inflight)
+        (when-let [ws (:ws client)]
+          (when (ws-open? ws)
+            (let [batch (pending-txs repo 50)]
+              (when (seq batch)
+                (let [tx-ids (mapv :tx-id batch)
+                      txs (mapv :tx batch)
+                      tx-data (->> txs
+                                   (mapcat sqlite-util/read-transit-str)
+                                   keep-last-parent-update)]
+                  (when (seq txs)
+                    (reset! (:inflight client) tx-ids)
+                    (send! ws {:type "tx/batch"
+                               :t_before (or (client-op/get-local-tx repo) 0)
+                               :txs (sqlite-util/write-transit-str tx-data)})))))))))))
 
 (defn- pending-txs-by-ids
   [repo tx-ids]
@@ -466,7 +432,7 @@
                   filtered (strip-cycle-attrs db tx-data {:attr attr :entity-refs entity-refs})]
               (when (seq filtered)
                 (vswap! requeued inc)
-                (persist-local-tx! repo (sqlite-util/write-transit-str filtered))))))
+                (persist-local-tx! repo (sqlite-util/write-transit-str filtered) {:rtc/fix? true})))))
         (log/info :db-sync/requeue-non-parent-txs
                   {:repo repo
                    :entries (count entries)
@@ -714,10 +680,11 @@
   (if-let [conn (worker-state/get-datascript-conn repo)]
     (try
       (let [report' (d/with @conn tx-data)
-            fix-tx-data (move-missing-parents conn report')
+            fix-tx-data (db-sync-parent-missing/fix-parent-missing! conn report')
             tx-report (ldb/transact! conn fix-tx-data {:rtc-tx? true})
             db-after (:db-after tx-report)
             asset-uuids (asset-uuids-from-tx db-after (:tx-data tx-report))]
+        ;; FIXME: cycle should be fixed before `ldb/transact!`
         (fix-cycle-after-remote-tx! conn db-after tx-data)
         (when (seq asset-uuids)
           (enqueue-asset-downloads! repo client asset-uuids)))
@@ -725,12 +692,55 @@
         (log/error :db-sync/apply-remote-tx-failed {:error e})))
     (fail-fast :db-sync/missing-db {:repo repo :op :apply-remote-tx})))
 
+(defn- fix-cycle!
+  [repo client message local-tx remote-tx]
+  (when (nil? (:data message))
+    (fail-fast :db-sync/missing-field
+               {:repo repo :type "tx/reject" :field :data}))
+  (let [{:keys [attr server_values]}
+        (parse-transit (:data message) {:repo repo :type "tx/reject"})]
+    (when (nil? attr)
+      (fail-fast :db-sync/missing-field
+                 {:repo repo :type "tx/reject" :field :attr}))
+    (when (nil? server_values)
+      (fail-fast :db-sync/missing-field
+                 {:repo repo :type "tx/reject" :field :server_values}))
+    ;; FIXME: fix cycle shouldn't re-trigger uploading
+    (let [inflight-ids @(:inflight client)
+          inflight-entries (pending-txs-by-ids repo inflight-ids)]
+      (log/info :db-sync/tx-reject-cycle
+                {:repo repo
+                 :attr attr
+                 :server-values (count server_values)
+                 :entity-titles (cycle-entity-titles repo server_values)
+                 :inflight-ids (count inflight-ids)
+                 :local-tx local-tx
+                 :remote-tx remote-tx})
+      (reconcile-cycle! repo attr server_values)
+      (remove-pending-txs! repo inflight-ids)
+      (reset! (:inflight client) [])
+      (requeue-non-parent-txs! repo attr server_values inflight-entries))
+    (flush-pending! repo client)))
+
+(defn- fix-parent-missing!
+  [repo client message]
+  (if-let [conn (worker-state/get-datascript-conn repo)]
+    (let [db @conn
+          {:keys [eids]} (parse-transit (:data message) {:repo repo :type "tx/reject"})
+          blocks (keep #(d/entity db %) eids)]
+      (when (seq blocks)
+        (db-sync-parent-missing/move-blocks-to-recycle! conn blocks))
+      (flush-pending! repo client))
+    (fail-fast :db-sync/missing-db {:repo repo :op :reconcile-cycle})))
+
 (defn- handle-message! [repo client raw]
   (let [message (-> raw parse-message coerce-ws-server-message)]
     (when-not (map? message)
       (fail-fast :db-sync/response-parse-failed {:repo repo :raw raw}))
     (let [local-tx (or (client-op/get-local-tx repo) 0)
           remote-tx (:t message)]
+      (when remote-tx (swap! *repo->latest-remote-tx assoc repo remote-tx))
+
       (case (:type message)
         "hello" (do
                   (require-non-negative remote-tx {:repo repo :type "hello"})
@@ -772,34 +782,10 @@
                         "stale"
                         (send! (:ws client) {:type "pull" :since local-tx})
                         "cycle"
-                        (do
-                          (when (nil? (:data message))
-                            (fail-fast :db-sync/missing-field
-                                       {:repo repo :type "tx/reject" :field :data}))
-                          (let [{:keys [attr server_values]}
-                                (parse-transit (:data message) {:repo repo :type "tx/reject"})]
-                            (when (nil? attr)
-                              (fail-fast :db-sync/missing-field
-                                         {:repo repo :type "tx/reject" :field :attr}))
-                            (when (nil? server_values)
-                              (fail-fast :db-sync/missing-field
-                                         {:repo repo :type "tx/reject" :field :server_values}))
-                            ;; FIXME: fix cycle shouldn't re-trigger uploading
-                            (let [inflight-ids @(:inflight client)
-                                  inflight-entries (pending-txs-by-ids repo inflight-ids)]
-                              (log/info :db-sync/tx-reject-cycle
-                                        {:repo repo
-                                         :attr attr
-                                         :server-values (count server_values)
-                                         :entity-titles (cycle-entity-titles repo server_values)
-                                         :inflight-ids (count inflight-ids)
-                                         :local-tx local-tx
-                                         :remote-tx remote-tx})
-                              (reconcile-cycle! repo attr server_values)
-                              (remove-pending-txs! repo inflight-ids)
-                              (reset! (:inflight client) [])
-                              (requeue-non-parent-txs! repo attr server_values inflight-entries))
-                            (flush-pending! repo client)))
+                        (fix-cycle! repo client message local-tx remote-tx)
+                        "parent-missing"
+                        (fix-parent-missing! repo client message)
+
                         (fail-fast :db-sync/invalid-field
                                    {:repo repo :type "tx/reject" :reason reason})))
         (fail-fast :db-sync/invalid-field
@@ -900,7 +886,7 @@
            (p/resolved nil)))))))
 
 (defn enqueue-local-tx!
-  [repo {:keys [tx-data db-after db-before]}]
+  [repo {:keys [tx-meta tx-data db-after db-before]}]
   (let [conn (worker-state/get-datascript-conn repo)
         db (some-> conn deref)
         ;; FIXME: all ignored properties
@@ -908,7 +894,7 @@
     (when (and db (seq tx-data'))
       (let [normalized (normalize-tx-data db-after db-before tx-data')
             tx-str (sqlite-util/write-transit-str normalized)]
-        (persist-local-tx! repo tx-str)
+        (persist-local-tx! repo tx-str tx-meta)
         (when-let [client @worker-state/*db-sync-client]
           (when (= repo (:repo client))
             (let [send-queue (:send-queue client)]
@@ -924,7 +910,9 @@
 
 (defn handle-local-tx!
   [repo {:keys [tx-data tx-meta] :as tx-report}]
-  (when (and (enabled?) (seq tx-data) (not (:rtc-tx? tx-meta)))
+  (when (and (enabled?) (seq tx-data)
+             (not (:rtc-tx? tx-meta))
+             (:persist-op? tx-meta true))
     (enqueue-local-tx! repo tx-report)
     (when-let [client @worker-state/*db-sync-client]
       (when (= repo (:repo client))

+ 2 - 1
src/main/frontend/worker/rtc/client_op.cljs

@@ -99,7 +99,8 @@
    :local-tx {:db/index true}
    :graph-uuid {:db/index true}
    :db-sync/tx-id {:db/unique :db.unique/identity}
-   :db-sync/created-at {:db/index true}})
+   :db-sync/created-at {:db/index true}
+   :db-sync/fix? {:db/index true}})
 
 (defn update-graph-uuid
   [repo graph-uuid]