Browse Source

Updated apply-remote-tx! to keep one-shot batch apply while

preserving correctness.

Changes in sync.cljs (src/main/frontend/worker/sync.cljs):

- Added batch flatten pipeline that:
- remaps per-batch tempids to unique ids,
- preserves tx-id position remapping,
- tracks newly created identities (:block/uuid / :db/ident),
- rewrites cross-batch lookup refs to tempids for :db/add so later adds in
the same merged transact can target newly-created entities.
- apply-remote-tx! batched branch now calls the new flattener once (no doseq
per batch).
- Kept rebase safety in local-changes path (conflict filtering + remote-
duplicate cleanup after sanitize).
- Fixed negative temp-id handling in canonical-entity-id to avoid d/entity
errors.
Tienson Qin 1 week ago
parent
commit
ba5e83c045
1 changed files with 151 additions and 7 deletions
  1. 151 7
      src/main/frontend/worker/sync.cljs

+ 151 - 7
src/main/frontend/worker/sync.cljs

@@ -500,6 +500,137 @@
              (= :block/uuid (first x)))
     (second x)))
 
+(defn- canonical-entity-id
+  [db e]
+  (cond
+    (vector? e) (or (get-lookup-id e) e)
+    (and (number? e) (not (neg? e))) (or (:block/uuid (d/entity db e)) e)
+    :else e))
+
+(defn- remote-updated-attr-keys
+  [db tx-data]
+  (->> tx-data
+       (keep (fn [item]
+               (when (and (vector? item)
+                          (>= (count item) 4)
+                          (contains? #{:db/add :db/retract} (first item)))
+                 [(canonical-entity-id db (second item))
+                  (nth item 2)])))
+       set))
+
+(defn- drop-remote-conflicted-local-tx
+  [db remote-updated-keys tx-data]
+  (if (seq remote-updated-keys)
+    (remove (fn [item]
+              (and (vector? item)
+                   (>= (count item) 4)
+                   (contains? #{:db/add :db/retract} (first item))
+                   (contains? remote-updated-keys
+                              [(canonical-entity-id db (second item))
+                               (nth item 2)])))
+            tx-data)
+    tx-data))
+
+(defn- remote-temp-id?
+  [x]
+  (or (and (integer? x) (neg? x))
+      (string? x)))
+
+(defn- remap-remote-batch-temp-ids
+  [batch-index tx-data]
+  (let [ops #{:db/add :db/retract :db/retractEntity}
+        entity-temp-ids (->> tx-data
+                             (keep (fn [item]
+                                     (when (and (vector? item)
+                                                (>= (count item) 2)
+                                                (contains? ops (first item))
+                                                (remote-temp-id? (second item)))
+                                       (second item))))
+                             distinct)
+        temp-id-map (when (seq entity-temp-ids)
+                      (zipmap entity-temp-ids
+                              (map-indexed (fn [idx _]
+                                             (str "remote-batch-" batch-index "-tempid-" idx))
+                                           entity-temp-ids)))]
+    (if (seq temp-id-map)
+      (mapv (fn [item]
+              (if (and (vector? item)
+                       (>= (count item) 2)
+                       (contains? ops (first item)))
+                (let [entity (second item)
+                      item' (if-let [entity' (get temp-id-map entity)]
+                              (assoc item 1 entity')
+                              item)]
+                  (cond-> item'
+                    (>= (count item') 4)
+                    (#(if-let [value' (get temp-id-map (nth % 3))]
+                        (assoc % 3 value')
+                        %))
+
+                    (>= (count item') 5)
+                    (#(if-let [tx' (get temp-id-map (nth % 4))]
+                        (assoc % 4 tx')
+                        %))))
+                item))
+            tx-data)
+      tx-data)))
+
+(defn- lookup-ref?
+  [x]
+  (and (vector? x)
+       (= 2 (count x))
+       (keyword? (first x))))
+
+(defn- created-lookup->temp-id
+  [tx-data]
+  (->> tx-data
+       (keep (fn [item]
+               (when (and (vector? item)
+                          (= :db/add (first item))
+                          (>= (count item) 4)
+                          (contains? #{:block/uuid :db/ident} (nth item 2))
+                          (remote-temp-id? (second item)))
+                 [[(nth item 2) (nth item 3)]
+                  (second item)])))
+       (into {})))
+
+(defn- resolve-lookup-refs
+  [lookup->temp-id tx-data]
+  (if (seq lookup->temp-id)
+    (mapv (fn [item]
+            (if (and (vector? item)
+                     (>= (count item) 2)
+                     (= :db/add (first item)))
+              (let [entity (second item)
+                    item' (if-let [entity' (and (lookup-ref? entity)
+                                                (get lookup->temp-id entity))]
+                            (assoc item 1 entity')
+                            item)]
+                (if (>= (count item') 4)
+                  (let [value (nth item' 3)]
+                    (if-let [value' (and (lookup-ref? value)
+                                         (get lookup->temp-id value))]
+                      (assoc item' 3 value')
+                      item'))
+                  item'))
+              item))
+          tx-data)
+    tx-data))
+
+(defn- flatten-batched-remote-tx-data
+  [tx-data*]
+  (loop [remaining (map-indexed vector tx-data*)
+         lookup->temp-id {}
+         acc []]
+    (if-let [[batch-index tx-data] (first remaining)]
+      (let [remapped-batch (remap-remote-batch-temp-ids batch-index tx-data)
+            lookup->temp-id (merge lookup->temp-id (created-lookup->temp-id remapped-batch))
+            resolved-batch (resolve-lookup-refs lookup->temp-id remapped-batch)]
+        (recur (rest remaining)
+               lookup->temp-id
+               (into acc resolved-batch)))
+      acc)))
+
 (defn- batched-remote-tx-data?
   [tx-data*]
   (and (seq tx-data*)
@@ -1035,11 +1166,25 @@
              ;; 3. rebase pending local txs
              rebase-tx-report (when (seq local-txs)
                                 (let [pending-tx-data (mapcat :tx local-txs)
-                                      rebased-tx-data (sanitize-tx-data
-                                                       (or (:db-after remote-tx-report)
-                                                           (:db-after reversed-tx-report))
-                                                       pending-tx-data
-                                                       (set (map :block/uuid local-deleted-blocks)))]
+                                      remote-db (or (:db-after remote-tx-report)
+                                                    (:db-after reversed-tx-report))
+                                      remote-updated-keys (remote-updated-attr-keys remote-db safe-remote-tx-data)
+                                      remote-tx-data-set (->> safe-remote-tx-data
+                                                              (map (fn [item]
+                                                                     (if (and (vector? item)
+                                                                              (= 5 (count item)))
+                                                                       (vec (butlast item))
+                                                                       item)))
+                                                              set)
+                                      pending-tx-data (drop-remote-conflicted-local-tx
+                                                       remote-db
+                                                       remote-updated-keys
+                                                       pending-tx-data)
+                                      rebased-tx-data (->> (sanitize-tx-data
+                                                            remote-db
+                                                            pending-tx-data
+                                                            (set (map :block/uuid local-deleted-blocks)))
+                                                           (remove remote-tx-data-set))]
                                   (when (seq rebased-tx-data)
                                     (ldb/transact! temp-conn rebased-tx-data (assoc tx-meta :op :rebase)))))
              ;; 4. fix tx data and delete nodes
@@ -1069,8 +1214,7 @@
 (defn- apply-remote-tx!
   [repo client tx-data*]
   (if (batched-remote-tx-data? tx-data*)
-    (doseq [tx-data tx-data*]
-      (apply-remote-tx! repo client tx-data))
+    (apply-remote-tx! repo client (flatten-batched-remote-tx-data tx-data*))
     (if-let [conn (worker-state/get-datascript-conn repo)]
       (let [tx-data (->> tx-data*
                          (db-normalize/remove-retract-entity-ref @conn)