|
|
@@ -120,6 +120,11 @@
|
|
|
(when-not (number? value)
|
|
|
(fail-fast :db-sync/invalid-field (assoc context :value value))))
|
|
|
|
|
|
+(defn- require-non-negative [value context]
|
|
|
+ (require-number value context)
|
|
|
+ (when (neg? value)
|
|
|
+ (fail-fast :db-sync/invalid-field (assoc context :value value))))
|
|
|
+
|
|
|
(defn- require-seq [value context]
|
|
|
(when-not (sequential? value)
|
|
|
(fail-fast :db-sync/invalid-field (assoc context :value value))))
|
|
|
@@ -209,6 +214,12 @@
|
|
|
:logseq.property.asset/type
|
|
|
:logseq.property.asset/checksum})
|
|
|
|
|
|
+(defn- require-asset-field
|
|
|
+ [repo field value context]
|
|
|
+ (when (or (nil? value) (and (string? value) (string/blank? value)))
|
|
|
+ (fail-fast :db-sync/missing-field
|
|
|
+ (merge {:repo repo :field field :value value} context))))
|
|
|
+
|
|
|
(defn- asset-uuids-from-tx [db tx-data]
|
|
|
(->> tx-data
|
|
|
(keep (fn [datom]
|
|
|
@@ -404,7 +415,7 @@
|
|
|
remote-tx (:t message)]
|
|
|
(case (:type message)
|
|
|
"hello" (do
|
|
|
- (require-number remote-tx {:repo repo :type "hello"})
|
|
|
+ (require-non-negative remote-tx {:repo repo :type "hello"})
|
|
|
(when (> remote-tx local-tx)
|
|
|
(send! (:ws client) {:type "pull" :since local-tx}))
|
|
|
(enqueue-asset-sync! repo client)
|
|
|
@@ -412,7 +423,7 @@
|
|
|
(flush-pending! repo client))
|
|
|
;; Upload response
|
|
|
"tx/batch/ok" (do
|
|
|
- (require-number remote-tx {:repo repo :type "tx/batch/ok"})
|
|
|
+ (require-non-negative remote-tx {:repo repo :type "tx/batch/ok"})
|
|
|
(client-op/update-local-tx repo remote-tx)
|
|
|
(remove-pending-txs! repo @(:inflight client))
|
|
|
(reset! (:inflight client) [])
|
|
|
@@ -420,7 +431,7 @@
|
|
|
;; Download response
|
|
|
;; Merge batch txs to one tx, does it really work? We'll see
|
|
|
"pull/ok" (let [txs (:txs message)
|
|
|
- _ (require-number remote-tx {:repo repo :type "pull/ok"})
|
|
|
+ _ (require-non-negative remote-tx {:repo repo :type "pull/ok"})
|
|
|
_ (require-seq txs {:repo repo :type "pull/ok" :field :txs})
|
|
|
tx (mapcat (fn [data]
|
|
|
(parse-transit (:tx data) {:repo repo :type "pull/ok"}))
|
|
|
@@ -430,13 +441,15 @@
|
|
|
(client-op/update-local-tx repo remote-tx)
|
|
|
(flush-pending! repo client)))
|
|
|
"changed" (do
|
|
|
- (require-number remote-tx {:repo repo :type "changed"})
|
|
|
+ (require-non-negative remote-tx {:repo repo :type "changed"})
|
|
|
(when (< local-tx remote-tx)
|
|
|
(send! (:ws client) {:type "pull" :since local-tx})))
|
|
|
"tx/reject" (let [reason (:reason message)]
|
|
|
(when (nil? reason)
|
|
|
(fail-fast :db-sync/missing-field
|
|
|
{:repo repo :type "tx/reject" :field :reason}))
|
|
|
+ (when (contains? message :t)
|
|
|
+ (require-non-negative remote-tx {:repo repo :type "tx/reject"}))
|
|
|
(case reason
|
|
|
"stale"
|
|
|
(send! (:ws client) {:type "pull" :since local-tx})
|
|
|
@@ -551,7 +564,12 @@
|
|
|
|
|
|
(defn- process-asset-op!
|
|
|
[repo graph-id asset-op]
|
|
|
- (let [asset-uuid (:block/uuid asset-op)]
|
|
|
+ (let [asset-uuid (:block/uuid asset-op)
|
|
|
+ op-type (cond
|
|
|
+ (contains? asset-op :update-asset) :update-asset
|
|
|
+ (contains? asset-op :remove-asset) :remove-asset
|
|
|
+ :else :unknown)]
|
|
|
+ (require-asset-field repo :asset-uuid asset-uuid {:op op-type})
|
|
|
(cond
|
|
|
(contains? asset-op :update-asset)
|
|
|
(if-let [conn (worker-state/get-datascript-conn repo)]
|
|
|
@@ -559,12 +577,11 @@
|
|
|
asset-type (:logseq.property.asset/type ent)
|
|
|
checksum (:logseq.property.asset/checksum ent)
|
|
|
size (:logseq.property.asset/size ent 0)]
|
|
|
+ (require-asset-field repo :asset-type asset-type {:op :update-asset :asset-uuid asset-uuid})
|
|
|
+ (require-asset-field repo :checksum checksum {:op :update-asset
|
|
|
+ :asset-uuid asset-uuid
|
|
|
+ :asset-type asset-type})
|
|
|
(cond
|
|
|
- (or (nil? ent) (nil? asset-type) (nil? checksum))
|
|
|
- (do
|
|
|
- (client-op/remove-asset-op repo asset-uuid)
|
|
|
- (p/resolved nil))
|
|
|
-
|
|
|
(> size max-asset-size)
|
|
|
(do
|
|
|
(log/info :db-sync/asset-too-large {:repo repo
|
|
|
@@ -591,11 +608,11 @@
|
|
|
:rtc.exception/upload-asset-failed
|
|
|
nil
|
|
|
|
|
|
- (log/error :db-sync/asset-upload-failed
|
|
|
- {:repo repo
|
|
|
- :asset-uuid asset-uuid
|
|
|
- :error e})))))))
|
|
|
- (p/resolved nil))
|
|
|
+ (log/error :db-sync/asset-upload-failed
|
|
|
+ {:repo repo
|
|
|
+ :asset-uuid asset-uuid
|
|
|
+ :error e})))))))
|
|
|
+ (fail-fast :db-sync/missing-db {:repo repo :op :process-asset-op}))
|
|
|
|
|
|
(contains? asset-op :remove-asset)
|
|
|
(-> (p/let [conn (worker-state/get-datascript-conn repo)
|
|
|
@@ -603,9 +620,9 @@
|
|
|
asset-type (if (seq (:logseq.property.asset/type ent))
|
|
|
(:logseq.property.asset/type ent)
|
|
|
(asset-type-from-files repo asset-uuid))]
|
|
|
+ (require-asset-field repo :asset-type asset-type {:op :remove-asset :asset-uuid asset-uuid})
|
|
|
(p/do!
|
|
|
- (when (seq asset-type)
|
|
|
- (delete-remote-asset! repo graph-id asset-uuid asset-type))
|
|
|
+ (delete-remote-asset! repo graph-id asset-uuid asset-type)
|
|
|
(client-op/remove-asset-op repo asset-uuid)))
|
|
|
(p/catch (fn [e]
|
|
|
(log/error :db-sync/asset-delete-failed
|