Browse Source

fix: remote tx data

Tienson Qin 1 month ago
parent
commit
ceec5d10c6

+ 0 - 1
deps/outliner/src/logseq/outliner/property.cljs

@@ -804,7 +804,6 @@
 
           :else
           (let [tx-data (build-closed-value-tx @conn property resolved-value opts)]
-            (prn :debug :tx-data tx-data)
             (ldb/transact! conn tx-data {:outliner-op :save-block})
             (when (seq description)
               (if-let [desc-ent (and id (:logseq.property/description (d/entity db [:block/uuid id])))]

+ 7 - 8
src/main/frontend/worker/db_sync.cljs

@@ -305,9 +305,9 @@
                                          (or (= :db/retractEntity (first item))
                                              (contains? local-deleted-ids (get-lookup-id (last item))))))
                                keep-last-update)]
-    (when (not= tx-data sanitized-tx-data)
-      (prn :debug :tx-data tx-data)
-      (prn :debug :sanitized-tx-data sanitized-tx-data))
+    ;; (when (not= tx-data sanitized-tx-data)
+    ;;   (prn :debug :tx-data tx-data)
+    ;;   (prn :debug :sanitized-tx-data sanitized-tx-data))
     sanitized-tx-data))
 
 (defn- flush-pending!
@@ -329,7 +329,7 @@
                                    keep-last-update
                                    distinct)]
                   ;; (prn :debug :before-keep-last-update txs)
-                  (prn :debug :upload :tx-data tx-data)
+                  ;; (prn :debug :upload :tx-data tx-data)
                   (when (seq txs)
                     (reset! (:inflight client) tx-ids)
                     (send! ws {:type "tx/batch"
@@ -592,11 +592,10 @@
 (defn- apply-remote-tx!
   [repo client tx-data* & {:keys [local-tx remote-tx]}]
   (if-let [conn (worker-state/get-datascript-conn repo)]
-    (let [tx-data (keep-last-update tx-data*)
+    (let [tx-data (->> tx-data*
+                       (db-normalize/remove-retract-entity-ref @conn)
+                       keep-last-update)
           local-txs (pending-txs repo)
-          _ (prn :debug :repo repo
-                 :pending-txs-count (count local-txs)
-                 :pending-txs (map :tx-id local-txs))
           reversed-tx-data (get-reverse-tx-data local-txs)
           has-local-changes? (seq reversed-tx-data)
           *remote-tx-report (atom nil)

+ 44 - 29
src/test/frontend/worker/db_sync_sim_test.cljs

@@ -1,5 +1,6 @@
 (ns frontend.worker.db-sync-sim-test
   (:require [cljs.test :refer [deftest is testing]]
+            [clojure.data :as data]
             [datascript.core :as d]
             [frontend.worker.db-sync :as db-sync]
             [frontend.worker.handler.page :as worker-page]
@@ -79,15 +80,15 @@
     (worker-page/create! conn base-page-title :uuid base-uuid)))
 
 (defn- create-page! [conn title uuid]
-  (prn :debug :create-page :title title :uuid uuid)
+  ;; (prn :debug :create-page :title title :uuid uuid)
   (worker-page/create! conn title :uuid uuid))
 
 (defn- delete-page! [conn uuid]
-  (prn :debug :delete-page :title (:block/title (d/entity @conn [:block/uuid uuid])) :uuid uuid)
+  ;; (prn :debug :delete-page :title (:block/title (d/entity @conn [:block/uuid uuid])) :uuid uuid)
   (worker-page/delete! conn uuid))
 
 (defn- create-block! [conn parent title uuid]
-  (prn :debug :create-block :parent (:db/id parent) (:block/uuid parent) :parent-title (:block/title parent) :title title :uuid uuid)
+  ;; (prn :debug :create-block :parent (:db/id parent) (:block/uuid parent) :parent-title (:block/title parent) :title title :uuid uuid)
   (outliner-core/insert-blocks! conn
                                 [{:block/title title
                                   :block/uuid uuid}]
@@ -96,20 +97,20 @@
                                  :keep-uuid? true}))
 
 (defn- update-title! [conn uuid new-title]
-  (prn :debug :update-title uuid :from (:block/title (d/entity @conn [:block/uuid uuid])) :to new-title)
-  (d/transact! conn [[:db/add [:block/uuid uuid] :block/title new-title]]))
+  ;; (prn :debug :update-title uuid :from (:block/title (d/entity @conn [:block/uuid uuid])) :to new-title)
+  (ldb/transact! conn [[:db/add [:block/uuid uuid] :block/title new-title]]))
 
 (defn- move-block! [conn block parent]
   (let [block (d/entity @conn [:block/uuid (:block/uuid block)])
         parent (d/entity @conn [:block/uuid (:block/uuid parent)])]
     (when (and block parent)
-      (prn :debug :move (:db/id block) (:block/uuid block) (:block/title block)
-           :to (:db/id parent) (:block/uuid parent) (:block/title parent))
+      ;; (prn :debug :move (:db/id block) (:block/uuid block) (:block/title block)
+      ;;      :to (:db/id parent) (:block/uuid parent) (:block/title parent))
       (outliner-core/move-blocks! conn [block] parent {:sibling? false}))))
 
 (defn- delete-block! [conn uuid]
   (when-let [block (d/entity @conn [:block/uuid uuid])]
-    (prn :debug :delete-block! (:db/id block) (:block/uuid block) (:block/title block))
+    ;; (prn :debug :delete-block! (:db/id block) (:block/uuid block) (:block/title block))
     (outliner-core/delete-blocks! conn [block] {})))
 
 (defn- existing-entities
@@ -125,19 +126,25 @@
        (remove page?)))
 
 (defn- make-server []
-  (atom {:t 0 :txs []}))
+  (atom {:t 0
+         :txs []
+         :conn (db-test/create-conn)}))
 
 (defn- server-pull [server since]
   (let [{:keys [txs]} @server]
-    (filter (fn [{:keys [t]}] (> t since)) txs)))
+    (->> (filter (fn [{:keys [t]}] (> t since)) txs)
+         (mapcat :tx))))
 
 (defn- server-upload! [server t-before tx-data]
   (swap! server
-         (fn [{:keys [t txs] :as state}]
+         (fn [{:keys [t txs conn] :as state}]
            (if (not= t t-before)
              state
-             (let [next-t (inc t)]
-               (assoc state :t next-t :txs (conj txs {:t next-t :tx tx-data})))))))
+             (let [{:keys [db-before db-after tx-data]} (ldb/transact! conn tx-data)
+                   normalized-data (->> tx-data
+                                        (db-normalize/normalize-tx-data db-after db-before))
+                   next-t (inc t)]
+               (assoc state :t next-t :txs (conj txs {:t next-t :tx normalized-data})))))))
 
 (defn- build-upload-tx [conn pending]
   (let [txs (mapcat :tx pending)]
@@ -152,12 +159,11 @@
     (let [progress? (atom false)
           local-tx (or (client-op/get-local-tx repo) 0)
           server-t (:t @server)]
-      (prn :debug :repo repo :local-tx local-tx :server-t server-t)
+      ;; (prn :debug :repo repo :local-tx local-tx :server-t server-t)
       (when (< local-tx server-t)
-        (let [tx (->> (server-pull server local-tx)
-                      (mapcat :tx))]
-          (prn :debug :apply-remote-tx :repo repo
-               :tx tx)
+        (let [tx (server-pull server local-tx)]
+          ;; (prn :debug :apply-remote-tx :repo repo
+          ;;      :tx tx)
           (#'db-sync/apply-remote-tx! repo client tx)
           (client-op/update-local-tx repo server-t)
           (reset! progress? true)))
@@ -167,7 +173,7 @@
         (when (and (seq pending) (= local-tx' server-t'))
           (let [tx-data (build-upload-tx conn pending)
                 tx-ids (mapv :tx-id pending)]
-            (prn :debug :upload :repo repo :tx-data tx-data)
+            ;; (prn :debug :upload :repo repo :tx-data tx-data)
             (when (seq tx-data)
               (server-upload! server local-tx' tx-data)
               (#'db-sync/remove-pending-txs! repo tx-ids)
@@ -357,7 +363,7 @@
 (defn- run-ops! [rng {:keys [conn base-uuid state client]} steps history & {:keys [pick-op-opts]}]
   (dotimes [_ steps]
     (let [{:keys [f name]} (pick-op rng pick-op-opts)
-          _ (prn :debug :client (:repo client) :name name)
+          ;; _ (prn :debug :client (:repo client) :name name)
           result (case name
                    :create-page (f rng conn state)
                    :delete-page (f rng conn base-uuid state)
@@ -371,6 +377,7 @@
 
 (deftest two-clients-online-offline-sim-test
   (testing "db-sync convergence with online/offline client and random ops"
+    (prn :debug "run two-clients-online-offline-sim-test")
     (let [seed (or (env-seed) default-seed)
           rng (make-rng seed)
           base-uuid (random-uuid)
@@ -415,7 +422,9 @@
               (is (seq attrs-a)
                   (str "db empty seed=" seed " history=" (count @history))))))))))
 
+(defonce op-runs 500)
 (deftest three-clients-single-repo-sim-test
+  (prn :debug "run three-clients-single-repo-sim-test")
   (testing "db-sync convergence with three clients sharing one repo"
     (let [seed (or (env-seed) default-seed)
           rng (make-rng seed)
@@ -451,13 +460,11 @@
           (let [clients [{:repo repo-a :conn conn-a :client client-a :online? true}
                          {:repo repo-b :conn conn-b :client client-b :online? true}
                          {:repo repo-c :conn conn-c :client client-c :online? true}]
-                ;; FIXME:
-                run-ops-opts {:pick-op-opts {:disable-ops #{:move-block}}}
-                ;; run-ops-opts {}
-                ]
+                ;; run-ops-opts {:pick-op-opts {:disable-ops #{:move-block}}}
+                run-ops-opts {}]
             (prn :debug :phase-a)
             ;; Phase A: all online
-            (dotimes [_ 60]
+            (dotimes [_ op-runs]
               (let [client (rand-nth! rng clients)
                     state (get repo->state (:repo client))]
                 (run-ops! rng (assoc client :base-uuid base-uuid :state state) 1 history run-ops-opts)
@@ -468,12 +475,12 @@
             (let [clients-phase-b [{:repo repo-a :conn conn-a :client client-a :online? true}
                                    {:repo repo-b :conn conn-b :client client-b :online? true}
                                    {:repo repo-c :conn conn-c :client client-c :online? false}]]
-              (dotimes [_ 100]
+              (dotimes [_ op-runs]
                 (let [client (rand-nth! rng (subvec (vec clients-phase-b) 0 2))
                       state (get repo->state (:repo client))]
                   (run-ops! rng (assoc client :base-uuid base-uuid :state state) 1 history run-ops-opts)
                   (sync-loop! server clients-phase-b)))
-              (dotimes [_ 100]
+              (dotimes [_ op-runs]
                 (run-ops! rng {:client client-c :conn conn-c :base-uuid base-uuid :state state-c} 1 history run-ops-opts)))
 
             ;; Phase C: reconnect C
@@ -485,12 +492,12 @@
             (let [clients-phase-d [{:repo repo-a :conn conn-a :client client-a :online? false}
                                    {:repo repo-b :conn conn-b :client client-b :online? true}
                                    {:repo repo-c :conn conn-c :client client-c :online? true}]]
-              (dotimes [_ 100]
+              (dotimes [_ op-runs]
                 (let [client (rand-nth! rng (subvec (vec clients-phase-d) 1 3))
                       state (get repo->state (:repo client))]
                   (run-ops! rng (assoc client :base-uuid base-uuid :state state) 1 history run-ops-opts)
                   (sync-loop! server clients-phase-d)))
-              (dotimes [_ 100]
+              (dotimes [_ op-runs]
                 (run-ops! rng {:conn conn-a :base-uuid base-uuid :state state-a} 1 history run-ops-opts)))
 
             ;; Final sync
@@ -507,6 +514,14 @@
             (let [attrs-a (block-attr-map @conn-a)
                   attrs-b (block-attr-map @conn-b)
                   attrs-c (block-attr-map @conn-c)]
+              (when-not (= attrs-a attrs-b)
+                (let [[a b] (take 2 (data/diff attrs-a attrs-b))]
+                  (prn :debug :diff :attrs-a a
+                       :attrs-b b)))
+              (when-not (= attrs-a attrs-c)
+                (let [[a c] (take 2 (data/diff attrs-a attrs-c))]
+                  (prn :debug :diff :attrs-a a
+                       :attrs-c c)))
               (is (= attrs-a attrs-b)
                   (str "db mismatch A/B seed=" seed
                        " a=" (count attrs-a)