|
@@ -641,57 +641,77 @@
|
|
|
|
|
|
|
|
(defn- rebase-apply-remote-tx! [repo client tx-data]
|
|
(defn- rebase-apply-remote-tx! [repo client tx-data]
|
|
|
(if-let [conn (worker-state/get-datascript-conn repo)]
|
|
(if-let [conn (worker-state/get-datascript-conn repo)]
|
|
|
- (try
|
|
|
|
|
- (let [local-txs (pending-txs repo)
|
|
|
|
|
- reversed-tx-data (->> local-txs
|
|
|
|
|
- (mapcat :reversed-tx)
|
|
|
|
|
- reverse
|
|
|
|
|
- db-normalize/replace-attr-retract-with-retract-entity-v2)
|
|
|
|
|
- tx-report (ldb/transact-with-temp-conn!
|
|
|
|
|
- conn
|
|
|
|
|
- {:rtc-tx? true}
|
|
|
|
|
- (fn [temp-conn _*batch-tx-data]
|
|
|
|
|
- (let [db @temp-conn
|
|
|
|
|
- ;; 1. rebase
|
|
|
|
|
- rebase (:db-after (d/with db reversed-tx-data))
|
|
|
|
|
- tx-report (d/with rebase tx-data)]
|
|
|
|
|
- ;; TODO: 2. ensure checksum matches between client & server
|
|
|
|
|
-
|
|
|
|
|
- ;; 3. fix data
|
|
|
|
|
- (let [deleted-blocks (outliner-pipeline/filter-deleted-blocks (:tx-data tx-report))
|
|
|
|
|
- tx-meta {:persist-op? false
|
|
|
|
|
- :gen-undo-ops? false}]
|
|
|
|
|
- (when (seq deleted-blocks)
|
|
|
|
|
- (let [nodes (map #(d/entity @temp-conn (:db/id %)) deleted-blocks)
|
|
|
|
|
- pages (filter ldb/page? nodes)
|
|
|
|
|
- blocks (remove ldb/page? nodes)]
|
|
|
|
|
- ;; deleting pages first
|
|
|
|
|
- (doseq [page pages]
|
|
|
|
|
- (worker-page/delete! temp-conn (:block/uuid page) tx-meta))
|
|
|
|
|
- (when (seq blocks)
|
|
|
|
|
- (outliner-tx/transact!
|
|
|
|
|
- (assoc tx-meta
|
|
|
|
|
- :outliner-op :delete-blocks
|
|
|
|
|
- :transact-opts {:conn temp-conn})
|
|
|
|
|
- (outliner-core/delete-blocks! temp-conn blocks {})))))
|
|
|
|
|
-
|
|
|
|
|
- ;; 4. apply remote tx-data
|
|
|
|
|
- (when (seq tx-data)
|
|
|
|
|
- (let [rtc-tx-data (sanitize-remote-tx-data @temp-conn tx-data)
|
|
|
|
|
- tx-report (ldb/transact! temp-conn rtc-tx-data)]
|
|
|
|
|
- (sync-order/fix-duplicate-orders! temp-conn (:tx-data tx-report))))))))]
|
|
|
|
|
-
|
|
|
|
|
- (when tx-report
|
|
|
|
|
- (let [db-after (:db-after tx-report)
|
|
|
|
|
- asset-uuids (asset-uuids-from-tx db-after (:tx-data tx-report))]
|
|
|
|
|
- (when (seq asset-uuids)
|
|
|
|
|
- (enqueue-asset-downloads! repo client asset-uuids))))
|
|
|
|
|
-
|
|
|
|
|
- ;; TODO: Remove all pending txs, insert the above one
|
|
|
|
|
- )
|
|
|
|
|
- (catch :default e
|
|
|
|
|
- (log/error :db-sync/apply-remote-tx-failed {:error e})
|
|
|
|
|
- (throw e)))
|
|
|
|
|
|
|
+ (let [local-txs (pending-txs repo)
|
|
|
|
|
+ pending-tx-ids (mapv :tx-id local-txs)
|
|
|
|
|
+ rebased-pending (atom nil)
|
|
|
|
|
+ reversed-tx-data (->> local-txs
|
|
|
|
|
+ (mapcat :reversed-tx)
|
|
|
|
|
+ reverse
|
|
|
|
|
+ db-normalize/replace-attr-retract-with-retract-entity-v2)
|
|
|
|
|
+ tx-report (ldb/transact-with-temp-conn!
|
|
|
|
|
+ conn
|
|
|
|
|
+ {:rtc-tx? true}
|
|
|
|
|
+ (fn [temp-conn _*batch-tx-data]
|
|
|
|
|
+ (let [db @temp-conn
|
|
|
|
|
+ ;; 1. reverse local pending txs
|
|
|
|
|
+ reversed-db (:db-after (d/with db reversed-tx-data))
|
|
|
|
|
+ tx-report (d/with reversed-db tx-data)]
|
|
|
|
|
+ ;; TODO: 2. ensure checksum matches between client & server
|
|
|
|
|
+
|
|
|
|
|
+ ;; 3. fix data
|
|
|
|
|
+ (let [deleted-blocks (outliner-pipeline/filter-deleted-blocks (:tx-data tx-report))
|
|
|
|
|
+ tx-meta {:persist-op? false
|
|
|
|
|
+ :gen-undo-ops? false}]
|
|
|
|
|
+ (when (seq deleted-blocks)
|
|
|
|
|
+ (let [nodes (map #(d/entity @temp-conn (:db/id %)) deleted-blocks)
|
|
|
|
|
+ pages (filter ldb/page? nodes)
|
|
|
|
|
+ blocks (remove ldb/page? nodes)]
|
|
|
|
|
+ ;; deleting pages first
|
|
|
|
|
+ (doseq [page pages]
|
|
|
|
|
+ (worker-page/delete! temp-conn (:block/uuid page) tx-meta))
|
|
|
|
|
+ (when (seq blocks)
|
|
|
|
|
+ (outliner-tx/transact!
|
|
|
|
|
+ (assoc tx-meta
|
|
|
|
|
+ :outliner-op :delete-blocks
|
|
|
|
|
+ :transact-opts {:conn temp-conn})
|
|
|
|
|
+ (outliner-core/delete-blocks! temp-conn blocks {})))))
|
|
|
|
|
+
|
|
|
|
|
+ ;; 4. apply remote tx-data
|
|
|
|
|
+ (let [rtc-tx-data (sanitize-remote-tx-data @temp-conn tx-data)
|
|
|
|
|
+ tx-report (ldb/transact! temp-conn rtc-tx-data)]
|
|
|
|
|
+ (sync-order/fix-duplicate-orders! temp-conn (:tx-data tx-report)))
|
|
|
|
|
+
|
|
|
|
|
+ ;; 5. rebase pending local txs for the new remote base
|
|
|
|
|
+ (when (seq local-txs)
|
|
|
|
|
+ (let [pending-tx-data (->> local-txs
|
|
|
|
|
+ (mapcat :tx)
|
|
|
|
|
+ keep-last-parent-update)
|
|
|
|
|
+ db (:db-after @temp-conn)
|
|
|
|
|
+ pending-tx-data (sanitize-remote-tx-data db pending-tx-data)]
|
|
|
|
|
+ (when (seq pending-tx-data)
|
|
|
|
|
+ (let [rebased-report (d/with (:db-after tx-report) pending-tx-data)
|
|
|
|
|
+ normalized (normalize-tx-data
|
|
|
|
|
+ (:db-after rebased-report)
|
|
|
|
|
+ (:db-after tx-report)
|
|
|
|
|
+ (:tx-data rebased-report))
|
|
|
|
|
+ reversed-datoms (map (fn [[e a v _t added]]
|
|
|
|
|
+ [(if added :db/retract :db/add) e a v])
|
|
|
|
|
+ (:tx-data rebased-report))]
|
|
|
|
|
+ (reset! rebased-pending {:normalized normalized
|
|
|
|
|
+ :reversed reversed-datoms})))))))))]
|
|
|
|
|
+
|
|
|
|
|
+ (when (seq local-txs)
|
|
|
|
|
+ (if-let [{:keys [normalized reversed]} @rebased-pending]
|
|
|
|
|
+ (when (seq normalized)
|
|
|
|
|
+ (when (persist-local-tx! repo normalized reversed {:persist-op? false})
|
|
|
|
|
+ (remove-pending-txs! repo pending-tx-ids)))
|
|
|
|
|
+ (log/info :db-sync/pending-txs-retained {:repo repo :reason :rebased-empty})))
|
|
|
|
|
+
|
|
|
|
|
+ (when tx-report
|
|
|
|
|
+ (let [db-after (:db-after tx-report)
|
|
|
|
|
+ asset-uuids (asset-uuids-from-tx db-after (:tx-data tx-report))]
|
|
|
|
|
+ (when (seq asset-uuids)
|
|
|
|
|
+ (enqueue-asset-downloads! repo client asset-uuids)))))
|
|
|
(fail-fast :db-sync/missing-db {:repo repo :op :apply-remote-tx})))
|
|
(fail-fast :db-sync/missing-db {:repo repo :op :apply-remote-tx})))
|
|
|
|
|
|
|
|
(defn- handle-message! [repo client raw]
|
|
(defn- handle-message! [repo client raw]
|
|
@@ -725,7 +745,7 @@
|
|
|
tx (mapcat (fn [data]
|
|
tx (mapcat (fn [data]
|
|
|
(parse-transit (:tx data) {:repo repo :type "pull/ok"}))
|
|
(parse-transit (:tx data) {:repo repo :type "pull/ok"}))
|
|
|
txs)]
|
|
txs)]
|
|
|
- (when tx
|
|
|
|
|
|
|
+ (when (seq tx)
|
|
|
(rebase-apply-remote-tx! repo client tx)
|
|
(rebase-apply-remote-tx! repo client tx)
|
|
|
(client-op/update-local-tx repo remote-tx)
|
|
(client-op/update-local-tx repo remote-tx)
|
|
|
(flush-pending! repo client)))
|
|
(flush-pending! repo client)))
|