|
|
@@ -195,7 +195,6 @@
|
|
|
(defn- reverse-tx-data
|
|
|
[tx-data]
|
|
|
(->> tx-data
|
|
|
- remove-ignored-attrs
|
|
|
(map (fn [[e a v _t added]]
|
|
|
[(if added :db/retract :db/add) e a v]))))
|
|
|
|
|
|
@@ -283,65 +282,31 @@
|
|
|
[:db.fn/retractEntity [:db-sync/tx-id tx-id]])
|
|
|
tx-ids)))))
|
|
|
|
|
|
-(defn- normalize-entity-ref
|
|
|
- [ref]
|
|
|
- (cond
|
|
|
- (uuid? ref) [:block/uuid ref]
|
|
|
- (keyword? ref) [:db/ident ref]
|
|
|
- :else ref))
|
|
|
-
|
|
|
-(defn- entity-ref->entid
|
|
|
- [db ref]
|
|
|
- (cond
|
|
|
- (nil? ref) nil
|
|
|
- (number? ref) (when (pos? ref) ref)
|
|
|
- (uuid? ref) (d/entid db [:block/uuid ref])
|
|
|
- (vector? ref) (d/entid db ref)
|
|
|
- (keyword? ref) (d/entid db [:db/ident ref])
|
|
|
- :else nil))
|
|
|
-
|
|
|
-(defn- entity-ref-matches?
|
|
|
- [db target ref]
|
|
|
- (let [target-entid (entity-ref->entid db target)
|
|
|
- ref-entid (entity-ref->entid db ref)]
|
|
|
- (cond
|
|
|
- (and target-entid ref-entid) (= target-entid ref-entid)
|
|
|
- :else (= (normalize-entity-ref target) (normalize-entity-ref ref)))))
|
|
|
-
|
|
|
-(defn- drop-invalid-refs
|
|
|
- [deleted-ids tx-data]
|
|
|
- (if (seq deleted-ids)
|
|
|
- (->> tx-data
|
|
|
- (remove (fn [tx]
|
|
|
- (when-let [id (when (and (vector? tx) (= 4 (count tx)))
|
|
|
- (let [lookup (second tx)]
|
|
|
- (when (and (vector? lookup)
|
|
|
- (= :block/uuid (first lookup)))
|
|
|
- (second lookup))))]
|
|
|
- (contains? deleted-ids id)))))
|
|
|
- tx-data))
|
|
|
+(defn get-lookup-id
|
|
|
+ [x]
|
|
|
+ (when (and (vector? x)
|
|
|
+ (= 2 (count x))
|
|
|
+ (= :block/uuid (first x)))
|
|
|
+ (second x)))
|
|
|
|
|
|
(defn- keep-last-update
|
|
|
- [db tx-data]
|
|
|
- (let [properties (->> (distinct (map :a tx-data))
|
|
|
- (map #(d/entity db %)))
|
|
|
- property->many? (zipmap (map :db/ident properties)
|
|
|
- (map (fn [property] (= :db.cardinality/many (:db/cardinality property))) properties))]
|
|
|
- (->> tx-data
|
|
|
- (common-util/distinct-by-last-wins
|
|
|
- (fn [item]
|
|
|
- (if (and (vector? item) (= 4 (count item))
|
|
|
- (not (property->many? (nth item 2))))
|
|
|
- (take 3 item)
|
|
|
- item))))))
|
|
|
+ [tx-data]
|
|
|
+ (->> tx-data
|
|
|
+ (common-util/distinct-by-last-wins
|
|
|
+ (fn [item]
|
|
|
+ (if (and (vector? item) (= 4 (count item))
|
|
|
+ (contains? #{:block/updated-at :block/title :block/name :block/order} (nth item 2)))
|
|
|
+ (take 3 item)
|
|
|
+ item)))))
|
|
|
|
|
|
(defn- sanitize-tx-data
|
|
|
- [db deleted-ids tx-data]
|
|
|
- (let [tx-data (vec tx-data)
|
|
|
- sanitized-tx-data (->> tx-data
|
|
|
- ;; db-normalize/replace-attr-retract-with-retract-entity-v2
|
|
|
- (keep-last-update db)
|
|
|
- (drop-invalid-refs deleted-ids))]
|
|
|
+ [tx-data local-deleted-ids]
|
|
|
+ (let [sanitized-tx-data (->> tx-data
|
|
|
+ db-normalize/replace-attr-retract-with-retract-entity-v2
|
|
|
+ (remove (fn [item]
|
|
|
+ (= :db.fn/retractEntity (first item))
|
|
|
+ (contains? local-deleted-ids (get-lookup-id (last item)))))
|
|
|
+ keep-last-update)]
|
|
|
(when (not= tx-data sanitized-tx-data)
|
|
|
(log/info :db-sync/tx-sanitized
|
|
|
{:diff (data/diff tx-data sanitized-tx-data)}))
|
|
|
@@ -362,8 +327,10 @@
|
|
|
(let [tx-ids (mapv :tx-id batch)
|
|
|
txs (mapcat :tx batch)
|
|
|
tx-data (->> txs
|
|
|
- (keep-last-update @conn)
|
|
|
+ keep-last-update
|
|
|
distinct)]
|
|
|
+ ;; (prn :debug :before-keep-last-update txs)
|
|
|
+ ;; (prn :debug :upload :tx-data tx-data)
|
|
|
(when (seq txs)
|
|
|
(reset! (:inflight client) tx-ids)
|
|
|
(send! ws {:type "tx/batch"
|
|
|
@@ -562,116 +529,142 @@
|
|
|
(p/resolved nil)))))
|
|
|
|
|
|
(defn- get-local-deleted-blocks
|
|
|
- [db reversed-tx-report reversed-tx-data remote-tx-report]
|
|
|
+ [reversed-tx-report reversed-tx-data]
|
|
|
(when (seq reversed-tx-data)
|
|
|
(->>
|
|
|
(:tx-data reversed-tx-report)
|
|
|
(keep
|
|
|
(fn [[e a _v _t added]]
|
|
|
(when (and (= :block/uuid a) added
|
|
|
- (nil? (d/entity db e))
|
|
|
- (some? (d/entity (:db-after remote-tx-report) e)))
|
|
|
+ (nil? (d/entity (:db-before reversed-tx-report) e)))
|
|
|
(d/entity (:db-after reversed-tx-report) e))))
|
|
|
distinct)))
|
|
|
|
|
|
(defn- delete-nodes!
|
|
|
- [temp-conn local-deleted-blocks]
|
|
|
+ [temp-conn local-deleted-blocks tx-meta]
|
|
|
(when (seq local-deleted-blocks)
|
|
|
(let [pages (filter ldb/page? local-deleted-blocks)
|
|
|
- blocks (remove ldb/page? local-deleted-blocks)]
|
|
|
- ;; deleting pages first
|
|
|
- (doseq [page pages]
|
|
|
- (worker-page/delete! temp-conn (:block/uuid page) {}))
|
|
|
- (when (seq blocks)
|
|
|
+ blocks (->> local-deleted-blocks
|
|
|
+ (keep (fn [block]
|
|
|
+ (d/entity @temp-conn [:block/uuid (:block/uuid block)])))
|
|
|
+ (remove ldb/page?))]
|
|
|
+ (when (or (seq blocks) (seq pages))
|
|
|
(outliner-tx/transact!
|
|
|
- {:gen-undo-ops? false
|
|
|
- :outliner-op :delete-blocks
|
|
|
- :transact-opts {:conn temp-conn}}
|
|
|
- (outliner-core/delete-blocks! temp-conn blocks {}))))))
|
|
|
+ (merge tx-meta
|
|
|
+ {:outliner-op :delete-blocks
|
|
|
+ :transact-opts {:conn temp-conn}})
|
|
|
+ (when (seq blocks)
|
|
|
+ (outliner-core/delete-blocks! temp-conn blocks {}))
|
|
|
+ (doseq [page pages]
|
|
|
+ (worker-page/delete! temp-conn (:block/uuid page) {})))))))
|
|
|
|
|
|
(defn- fix-tx!
|
|
|
- [temp-conn remote-tx-report rebase-tx-report]
|
|
|
- (let [cycle-tx-report (sync-cycle/fix-cycle! temp-conn remote-tx-report rebase-tx-report)]
|
|
|
+ [temp-conn remote-tx-report rebase-tx-report tx-meta]
|
|
|
+ (let [cycle-tx-report (sync-cycle/fix-cycle! temp-conn remote-tx-report rebase-tx-report
|
|
|
+ {:tx-meta tx-meta})]
|
|
|
(sync-order/fix-duplicate-orders! temp-conn
|
|
|
(mapcat :tx-data [remote-tx-report
|
|
|
rebase-tx-report
|
|
|
- cycle-tx-report]))))
|
|
|
+ cycle-tx-report])
|
|
|
+ tx-meta)))
|
|
|
|
|
|
(defn- apply-remote-tx!
|
|
|
[repo client tx-data* & {:keys [local-tx remote-tx]}]
|
|
|
(if-let [conn (worker-state/get-datascript-conn repo)]
|
|
|
- (let [tx-data (keep-last-update @conn tx-data*)
|
|
|
+ (let [tx-data (keep-last-update tx-data*)
|
|
|
local-txs (pending-txs repo)
|
|
|
reversed-tx-data (->> local-txs
|
|
|
(mapcat :reversed-tx)
|
|
|
reverse)
|
|
|
has-local-changes? (seq reversed-tx-data)
|
|
|
- *rebased-tx-data (atom [])
|
|
|
*remote-tx-report (atom nil)
|
|
|
+ *remote-deleted-ids (atom #{})
|
|
|
tx-report
|
|
|
(ldb/transact-with-temp-conn!
|
|
|
conn
|
|
|
{:rtc-tx? true}
|
|
|
(fn [temp-conn _*batch-tx-data]
|
|
|
- (let [tx-meta {:rtc-tx? true}
|
|
|
+ (let [tx-meta {:rtc-tx? true
|
|
|
+ :temp-conn? true
|
|
|
+ :gen-undo-ops? false
|
|
|
+ :persist-op? false}
|
|
|
db @temp-conn
|
|
|
- ;; 1. reverse local pending txs
|
|
|
+
|
|
|
reversed-tx-report (when has-local-changes?
|
|
|
(ldb/transact! temp-conn reversed-tx-data tx-meta))
|
|
|
;; 2. transact remote tx-data
|
|
|
- remote-tx-report (ldb/transact! temp-conn tx-data tx-meta)
|
|
|
+ remote-deleted-blocks (->> tx-data
|
|
|
+ (keep (fn [item]
|
|
|
+ (when (= :db.fn/retractEntity (first item))
|
|
|
+ (d/entity db (second item))))))
|
|
|
+ remote-deleted-block-ids (set (map :block/uuid remote-deleted-blocks))
|
|
|
+ remote-tx-report (if has-local-changes?
|
|
|
+ (let [tx-data (->> tx-data
|
|
|
+ (remove (fn [item]
|
|
|
+ (or (= :db.fn/retractEntity (first item))
|
|
|
+ (contains? remote-deleted-block-ids (get-lookup-id (last item))))))
|
|
|
+ seq)]
|
|
|
+ (ldb/transact! temp-conn tx-data tx-meta))
|
|
|
+
|
|
|
+ (ldb/transact! temp-conn tx-data tx-meta))
|
|
|
_ (reset! *remote-tx-report remote-tx-report)
|
|
|
remote-received-tx-data (sync-compare/filter-received-tx-data remote-tx-report tx-data)
|
|
|
remote-applied-tx-data (sync-compare/filter-applied-tx-data remote-tx-report)]
|
|
|
- (when (not= remote-received-tx-data remote-applied-tx-data)
|
|
|
- (prn :diff-tx-data-mismatch
|
|
|
- (data/diff remote-received-tx-data remote-applied-tx-data))
|
|
|
- (fail-fast :db-sync/compare-tx-data-mismatch
|
|
|
- {:repo repo
|
|
|
- :tx-data tx-data
|
|
|
- :remote-received-tx-data remote-received-tx-data
|
|
|
- :remote-applied-tx-data remote-applied-tx-data
|
|
|
- :local-tx local-tx
|
|
|
- :remote-tx remote-tx
|
|
|
- :tempids (:tempids remote-tx-report)}))
|
|
|
+ ;; (when (not= remote-received-tx-data remote-applied-tx-data)
|
|
|
+ ;; (prn :diff-tx-data-mismatch
|
|
|
+ ;; (data/diff remote-received-tx-data remote-applied-tx-data))
|
|
|
+ ;; (fail-fast :db-sync/compare-tx-data-mismatch
|
|
|
+ ;; {:repo repo
|
|
|
+ ;; :tx-data tx-data
|
|
|
+ ;; :remote-received-tx-data remote-received-tx-data
|
|
|
+ ;; :remote-applied-tx-data remote-applied-tx-data
|
|
|
+ ;; :local-tx local-tx
|
|
|
+ ;; :remote-tx remote-tx
|
|
|
+ ;; :tempids (:tempids remote-tx-report)}))
|
|
|
|
|
|
(when has-local-changes?
|
|
|
- ;; 3. Remove nodes which parents have been deleted locally
|
|
|
- ;; We may improve the ux by restoring parent path later
|
|
|
- (let [local-deleted-blocks (get-local-deleted-blocks db reversed-tx-report reversed-tx-data remote-tx-report)
|
|
|
- remote-deleted-blocks (->> (outliner-pipeline/filter-deleted-blocks (:tx-data remote-tx-report))
|
|
|
- (map #(d/entity db (:db/id %))))
|
|
|
+ (let [local-deleted-blocks (get-local-deleted-blocks reversed-tx-report reversed-tx-data)
|
|
|
+ _ (when (seq remote-deleted-blocks)
|
|
|
+ (reset! *remote-deleted-ids (set (map :block/uuid remote-deleted-blocks))))
|
|
|
deleted-nodes (concat local-deleted-blocks remote-deleted-blocks)
|
|
|
- deleted-ids (set (keep :block/uuid deleted-nodes))]
|
|
|
- (delete-nodes! temp-conn local-deleted-blocks)
|
|
|
-
|
|
|
- ;; 4. rebase pending local txs
|
|
|
- (let [rebase-tx-report (when (seq local-txs)
|
|
|
- (let [pending-tx-data (mapcat :tx local-txs)
|
|
|
- rebased-tx-data (sanitize-tx-data @temp-conn deleted-ids pending-tx-data)]
|
|
|
- (prn :debug :pending-tx-data pending-tx-data
|
|
|
- :rebased-tx-data rebased-tx-data)
|
|
|
- (when (seq rebased-tx-data)
|
|
|
- (ldb/transact! temp-conn rebased-tx-data {:gen-undo-ops? false}))))]
|
|
|
- ;; 5. fix tx data
|
|
|
- (fix-tx! temp-conn remote-tx-report rebase-tx-report))))))
|
|
|
- {:listen-db (fn [{:keys [tx-data tx-meta]}]
|
|
|
- (when (and has-local-changes? (not (:rtc-tx? tx-meta)))
|
|
|
- (swap! *rebased-tx-data into tx-data)))})
|
|
|
+ deleted-ids (set (keep :block/uuid deleted-nodes))
|
|
|
+ ;; 3. rebase pending local txs
|
|
|
+ rebase-tx-report (when (seq local-txs)
|
|
|
+ (let [pending-tx-data (mapcat :tx local-txs)
|
|
|
+ rebased-tx-data (sanitize-tx-data pending-tx-data
|
|
|
+ (set (map :block/uuid local-deleted-blocks)))]
|
|
|
+ (when (seq rebased-tx-data)
|
|
|
+ (ldb/transact! temp-conn rebased-tx-data tx-meta))))]
|
|
|
+
|
|
|
+ ;; 4. fix tx data
|
|
|
+ (let [db @temp-conn
|
|
|
+ deleted-nodes (keep (fn [id] (d/entity db [:block/uuid id])) deleted-ids)]
|
|
|
+ (delete-nodes! temp-conn deleted-nodes tx-meta))
|
|
|
+ (fix-tx! temp-conn remote-tx-report rebase-tx-report tx-meta))))))
|
|
|
remote-tx-report @*remote-tx-report]
|
|
|
|
|
|
;; persist rebase tx to client ops
|
|
|
- (when-let [tx-data (seq @*rebased-tx-data)]
|
|
|
- (reset! *rebased-tx-data nil)
|
|
|
- (let [normalized-tx-data (normalize-tx-data (:db-after tx-report) (:db-after remote-tx-report) tx-data)
|
|
|
- reversed-datoms (reverse-tx-data tx-data)]
|
|
|
- (persist-local-tx! repo normalized-tx-data reversed-datoms {:op :rtc-rebase})))
|
|
|
+ (when has-local-changes?
|
|
|
+ (when-let [tx-data (:tx-data tx-report)]
|
|
|
+ (let [remote-tx-data-set (set tx-data*)
|
|
|
+ normalized (normalize-tx-data (:db-after tx-report) (:db-before remote-tx-report) tx-data)
|
|
|
+ normalized-tx-data (remove remote-tx-data-set normalized)
|
|
|
+ reversed-datoms (reverse-tx-data tx-data)]
|
|
|
+ ;; (prn :debug :normalized-tx-data normalized-tx-data)
|
|
|
+ ;; (prn :debug :remote-tx-data remote-tx-data-set)
|
|
|
+ ;; (prn :debug :diff (data/diff remote-tx-data-set
|
|
|
+ ;; (set normalized)))
|
|
|
+ (remove-pending-txs! repo (map :tx-id local-txs))
|
|
|
+ (when (seq normalized-tx-data)
|
|
|
+ (persist-local-tx! repo normalized-tx-data reversed-datoms {:op :rtc-rebase})))))
|
|
|
|
|
|
(when tx-report
|
|
|
(let [asset-uuids (asset-uuids-from-tx (:db-after remote-tx-report) (:tx-data remote-tx-report))]
|
|
|
(when (seq asset-uuids)
|
|
|
(enqueue-asset-downloads! repo client asset-uuids))))
|
|
|
|
|
|
+ (reset! (:inflight client) [])
|
|
|
+
|
|
|
(reset! *remote-tx-report nil))
|
|
|
(fail-fast :db-sync/missing-db {:repo repo :op :apply-remote-tx})))
|
|
|
|
|
|
@@ -835,7 +828,7 @@
|
|
|
tx-data' (remove (fn [d] (contains? #{:logseq.property.embedding/hnsw-label-updated-at :block/tx-id} (:a d))) tx-data)]
|
|
|
(when (and db (seq tx-data'))
|
|
|
(let [normalized (normalize-tx-data db-after db-before tx-data')
|
|
|
- reversed-datoms (reverse-tx-data tx-data')]
|
|
|
+ reversed-datoms (reverse-tx-data tx-data)]
|
|
|
(persist-local-tx! repo normalized reversed-datoms tx-meta)
|
|
|
(when-let [client @worker-state/*db-sync-client]
|
|
|
(when (= repo (:repo client))
|