|
|
@@ -8,13 +8,9 @@
|
|
|
[logseq.common.path :as path]
|
|
|
[logseq.common.util :as common-util]
|
|
|
[logseq.db :as ldb]
|
|
|
- [logseq.db-sync.cycle :as db-sync-cycle]
|
|
|
[logseq.db-sync.malli-schema :as db-sync-schema]
|
|
|
- [logseq.db-sync.parent-missing :as db-sync-parent-missing]
|
|
|
[logseq.db.common.normalize :as db-normalize]
|
|
|
[logseq.db.sqlite.util :as sqlite-util]
|
|
|
- [logseq.outliner.core :as outliner-core]
|
|
|
- [logseq.outliner.transaction :as outliner-tx]
|
|
|
[promesa.core :as p]))
|
|
|
|
|
|
(defonce *repo->latest-remote-tx (atom {}))
|
|
|
@@ -232,123 +228,24 @@
|
|
|
(some-> (:block/uuid ent) str)))))
|
|
|
(distinct)))
|
|
|
|
|
|
-(defn- reparent-cycle-block!
|
|
|
- [conn block]
|
|
|
- (when-let [page (:block/page block)]
|
|
|
- (outliner-tx/transact!
|
|
|
- {:transact-opts {:conn conn}
|
|
|
- :persist-op? true
|
|
|
- :gen-undo-ops? false
|
|
|
- :outliner-op :fix-cycle-parent}
|
|
|
- (outliner-core/move-blocks! conn [block] page {:sibling? false}))))
|
|
|
-
|
|
|
-(defn- fix-cycle-after-remote-tx!
|
|
|
- [conn db tx-data]
|
|
|
- ;; FIXME: replace `entity` with `eid`
|
|
|
- (when-let [{:keys [attr entity]} (and (seq tx-data)
|
|
|
- (db-sync-cycle/detect-cycle db tx-data))]
|
|
|
- (let [eid entity]
|
|
|
- (log/info :db-sync/remote-cycle-detected
|
|
|
- {:attr attr
|
|
|
- :eid eid})
|
|
|
- (when (= attr :block/parent)
|
|
|
- (when-let [block (d/entity db eid)]
|
|
|
- (reparent-cycle-block! conn block))))))
|
|
|
-
|
|
|
-(defn- reconcile-cycle! [repo attr server_values]
|
|
|
- (if-let [conn (worker-state/get-datascript-conn repo)]
|
|
|
- (let [db @conn
|
|
|
- tx-data (reduce
|
|
|
- (fn [acc [eid value]]
|
|
|
- (let [entity (d/entity db eid)
|
|
|
- ;; FIXME: extends cardinality/many
|
|
|
- current (:db/id (get entity attr))]
|
|
|
- (cond
|
|
|
- (nil? entity) acc
|
|
|
- (nil? value)
|
|
|
- (if current
|
|
|
- (conj acc [:db/retract eid attr current])
|
|
|
- acc)
|
|
|
- :else
|
|
|
- (conj acc [:db/add eid attr value]))))
|
|
|
- []
|
|
|
- server_values)]
|
|
|
- (log/info :db-sync/reconcile-cycle
|
|
|
- {:repo repo
|
|
|
- :attr attr
|
|
|
- :server-values (count server_values)
|
|
|
- :tx-count (count tx-data)
|
|
|
- :entity-titles (->> (keys server_values)
|
|
|
- (keep (fn [ref]
|
|
|
- (when-let [ent (d/entity db ref)]
|
|
|
- {:uuid (some-> (:block/uuid ent) str)
|
|
|
- :title (or (:block/title ent)
|
|
|
- (:block/name ent))})))
|
|
|
- (take 10))})
|
|
|
- (when (seq tx-data)
|
|
|
- (ldb/transact! conn tx-data {:rtc-tx? true})))
|
|
|
- (fail-fast :db-sync/missing-db {:repo repo :op :reconcile-cycle})))
|
|
|
-
|
|
|
-(defn- normalize-entity-ref
|
|
|
- [db entity]
|
|
|
- (cond
|
|
|
- (vector? entity) entity
|
|
|
- (number? entity) (when-let [ent (d/entity db entity)]
|
|
|
- (cond
|
|
|
- (:block/uuid ent) [:block/uuid (:block/uuid ent)]
|
|
|
- (:db/ident ent) [:db/ident (:db/ident ent)]
|
|
|
- :else nil))
|
|
|
- (uuid? entity) [:block/uuid entity]
|
|
|
- (keyword? entity) [:db/ident entity]
|
|
|
- :else nil))
|
|
|
-
|
|
|
-(defn- strip-cycle-attrs
|
|
|
- [db tx-data {:keys [attr entity-refs]}]
|
|
|
- (let [entity-refs (set entity-refs)]
|
|
|
- (->> tx-data
|
|
|
- (mapcat
|
|
|
- (fn [tx]
|
|
|
- (cond
|
|
|
- (and (vector? tx) (= attr (nth tx 2 nil)))
|
|
|
- (let [entity (nth tx 1 nil)
|
|
|
- entity-ref (normalize-entity-ref db entity)]
|
|
|
- (if (and entity-ref (contains? entity-refs entity-ref))
|
|
|
- []
|
|
|
- [tx]))
|
|
|
-
|
|
|
- (and (map? tx) (contains? tx attr))
|
|
|
- (let [entity (or (:db/id tx) (:block/uuid tx) (:db/ident tx))
|
|
|
- entity-ref (normalize-entity-ref db entity)]
|
|
|
- (if (and entity-ref (contains? entity-refs entity-ref))
|
|
|
- (let [tx' (dissoc tx attr)
|
|
|
- meaningful (seq (dissoc tx' :db/id :block/uuid :db/ident))]
|
|
|
- (if meaningful [tx'] []))
|
|
|
- [tx]))
|
|
|
-
|
|
|
- :else
|
|
|
- [tx]))))))
|
|
|
-
|
|
|
(defn- client-ops-conn [repo]
|
|
|
(worker-state/get-client-ops-conn repo))
|
|
|
|
|
|
-(defn- persist-local-tx! [repo tx-str tx-meta]
|
|
|
+(defn- persist-local-tx! [repo tx-str _tx-meta]
|
|
|
(when-let [conn (client-ops-conn repo)]
|
|
|
(let [tx-id (random-uuid)
|
|
|
now (.now js/Date)]
|
|
|
(ldb/transact! conn [{:db-sync/tx-id tx-id
|
|
|
:db-sync/tx tx-str
|
|
|
- :db-sync/created-at now
|
|
|
- :db-sync/fix? (:rtc/fix? tx-meta)}])
|
|
|
+ :db-sync/created-at now}])
|
|
|
tx-id)))
|
|
|
|
|
|
(defn- pending-txs
|
|
|
[repo limit]
|
|
|
(when-let [conn (client-ops-conn repo)]
|
|
|
(let [db @conn
|
|
|
- datoms (take limit (d/datoms db :avet :db-sync/created-at))
|
|
|
- fixs (d/datoms db :avet :db-sync/fix? true)
|
|
|
- full-datoms (distinct (concat datoms fixs))]
|
|
|
- (->> full-datoms
|
|
|
+ datoms (take limit (d/datoms db :avet :db-sync/created-at))]
|
|
|
+ (->> datoms
|
|
|
(map (fn [datom]
|
|
|
(d/entity db (:e datom))))
|
|
|
(keep (fn [ent]
|
|
|
@@ -398,60 +295,6 @@
|
|
|
:t_before (or (client-op/get-local-tx repo) 0)
|
|
|
:txs (sqlite-util/write-transit-str tx-data)})))))))))))
|
|
|
|
|
|
-(defn- pending-txs-by-ids
|
|
|
- [repo tx-ids]
|
|
|
- (if-let [conn (client-ops-conn repo)]
|
|
|
- (let [db @conn]
|
|
|
- (keep (fn [tx-id]
|
|
|
- (when-let [ent (d/entity db [:db-sync/tx-id tx-id])]
|
|
|
- (when-let [tx (:db-sync/tx ent)]
|
|
|
- {:tx-id tx-id
|
|
|
- :tx tx})))
|
|
|
- tx-ids))
|
|
|
- (fail-fast :db-sync/missing-db {:repo repo :op :pending-txs-by-ids})))
|
|
|
-
|
|
|
-(defn- requeue-non-parent-txs!
|
|
|
- [repo attr server_values entries]
|
|
|
- (let [db (some-> (worker-state/get-datascript-conn repo) deref)
|
|
|
- entity-refs (when (seq server_values) (set (keys server_values)))
|
|
|
- scoped? (and db attr (seq entity-refs))
|
|
|
- requeued (volatile! 0)
|
|
|
- stripped (volatile! 0)]
|
|
|
- (if-not scoped?
|
|
|
- (fail-fast :db-sync/missing-field
|
|
|
- {:repo repo
|
|
|
- :has-db? (boolean db)
|
|
|
- :attr attr
|
|
|
- :server-values (count server_values)
|
|
|
- :entries (count entries)})
|
|
|
- (do
|
|
|
- (doseq [{:keys [tx]} entries]
|
|
|
- (when (string? tx)
|
|
|
- (vswap! stripped inc)
|
|
|
- (let [tx-data (parse-transit tx {:repo repo :op :requeue-non-parent-txs})
|
|
|
- filtered (strip-cycle-attrs db tx-data {:attr attr :entity-refs entity-refs})]
|
|
|
- (when (seq filtered)
|
|
|
- (vswap! requeued inc)
|
|
|
- (persist-local-tx! repo (sqlite-util/write-transit-str filtered) {:rtc/fix? true})))))
|
|
|
- (log/info :db-sync/requeue-non-parent-txs
|
|
|
- {:repo repo
|
|
|
- :entries (count entries)
|
|
|
- :stripped @stripped
|
|
|
- :requeued @requeued})))))
|
|
|
-
|
|
|
-(defn- cycle-entity-titles
|
|
|
- [repo server_values]
|
|
|
- (if-let [conn (worker-state/get-datascript-conn repo)]
|
|
|
- (let [db @conn]
|
|
|
- (->> (keys server_values)
|
|
|
- (keep (fn [ref]
|
|
|
- (when-let [ent (d/entity db ref)]
|
|
|
- {:uuid (some-> (:block/uuid ent) str)
|
|
|
- :title (or (:block/title ent)
|
|
|
- (:block/name ent))})))
|
|
|
- (take 10)))
|
|
|
- (fail-fast :db-sync/missing-db {:repo repo :op :cycle-entity-titles})))
|
|
|
-
|
|
|
(defn- ensure-client-state! [repo]
|
|
|
(let [client {:repo repo
|
|
|
:send-queue (atom (p/resolved nil))
|
|
|
@@ -679,60 +522,15 @@
|
|
|
(defn- apply-remote-tx! [repo client tx-data]
|
|
|
(if-let [conn (worker-state/get-datascript-conn repo)]
|
|
|
(try
|
|
|
- (let [report' (d/with @conn tx-data)
|
|
|
- fix-tx-data (db-sync-parent-missing/fix-parent-missing! conn report')
|
|
|
- tx-report (ldb/transact! conn fix-tx-data {:rtc-tx? true})
|
|
|
+ (let [tx-report (ldb/transact! conn tx-data {:rtc-tx? true})
|
|
|
db-after (:db-after tx-report)
|
|
|
asset-uuids (asset-uuids-from-tx db-after (:tx-data tx-report))]
|
|
|
- ;; FIXME: cycle should be fixed before `ldb/transact!`
|
|
|
- (fix-cycle-after-remote-tx! conn db-after tx-data)
|
|
|
(when (seq asset-uuids)
|
|
|
(enqueue-asset-downloads! repo client asset-uuids)))
|
|
|
(catch :default e
|
|
|
(log/error :db-sync/apply-remote-tx-failed {:error e})))
|
|
|
(fail-fast :db-sync/missing-db {:repo repo :op :apply-remote-tx})))
|
|
|
|
|
|
-(defn- fix-cycle!
|
|
|
- [repo client message local-tx remote-tx]
|
|
|
- (when (nil? (:data message))
|
|
|
- (fail-fast :db-sync/missing-field
|
|
|
- {:repo repo :type "tx/reject" :field :data}))
|
|
|
- (let [{:keys [attr server_values]}
|
|
|
- (parse-transit (:data message) {:repo repo :type "tx/reject"})]
|
|
|
- (when (nil? attr)
|
|
|
- (fail-fast :db-sync/missing-field
|
|
|
- {:repo repo :type "tx/reject" :field :attr}))
|
|
|
- (when (nil? server_values)
|
|
|
- (fail-fast :db-sync/missing-field
|
|
|
- {:repo repo :type "tx/reject" :field :server_values}))
|
|
|
- ;; FIXME: fix cycle shouldn't re-trigger uploading
|
|
|
- (let [inflight-ids @(:inflight client)
|
|
|
- inflight-entries (pending-txs-by-ids repo inflight-ids)]
|
|
|
- (log/info :db-sync/tx-reject-cycle
|
|
|
- {:repo repo
|
|
|
- :attr attr
|
|
|
- :server-values (count server_values)
|
|
|
- :entity-titles (cycle-entity-titles repo server_values)
|
|
|
- :inflight-ids (count inflight-ids)
|
|
|
- :local-tx local-tx
|
|
|
- :remote-tx remote-tx})
|
|
|
- (reconcile-cycle! repo attr server_values)
|
|
|
- (remove-pending-txs! repo inflight-ids)
|
|
|
- (reset! (:inflight client) [])
|
|
|
- (requeue-non-parent-txs! repo attr server_values inflight-entries))
|
|
|
- (flush-pending! repo client)))
|
|
|
-
|
|
|
-(defn- fix-parent-missing!
|
|
|
- [repo client message]
|
|
|
- (if-let [conn (worker-state/get-datascript-conn repo)]
|
|
|
- (let [db @conn
|
|
|
- {:keys [eids]} (parse-transit (:data message) {:repo repo :type "tx/reject"})
|
|
|
- blocks (keep #(d/entity db %) eids)]
|
|
|
- (when (seq blocks)
|
|
|
- (db-sync-parent-missing/move-blocks-to-recycle! conn blocks))
|
|
|
- (flush-pending! repo client))
|
|
|
- (fail-fast :db-sync/missing-db {:repo repo :op :reconcile-cycle})))
|
|
|
-
|
|
|
(defn- handle-message! [repo client raw]
|
|
|
(let [message (-> raw parse-message coerce-ws-server-message)]
|
|
|
(when-not (map? message)
|
|
|
@@ -781,10 +579,6 @@
|
|
|
(case reason
|
|
|
"stale"
|
|
|
(send! (:ws client) {:type "pull" :since local-tx})
|
|
|
- "cycle"
|
|
|
- (fix-cycle! repo client message local-tx remote-tx)
|
|
|
- "parent-missing"
|
|
|
- (fix-parent-missing! repo client message)
|
|
|
|
|
|
(fail-fast :db-sync/invalid-field
|
|
|
{:repo repo :type "tx/reject" :reason reason})))
|
|
|
@@ -845,7 +639,7 @@
|
|
|
nil))))
|
|
|
|
|
|
(defn- connect! [repo client url]
|
|
|
- (when-let [current (:ws client)]
|
|
|
+ (when (:ws client)
|
|
|
(stop-client! client))
|
|
|
(let [ws (js/WebSocket. (append-token url (auth-token)))
|
|
|
updated (assoc client :ws ws)]
|