|
|
@@ -7,6 +7,7 @@
|
|
|
indicates need to upload the asset to server"
|
|
|
(:require [clojure.set :as set]
|
|
|
[datascript.core :as d]
|
|
|
+ [frontend.common.crypt :as crypt]
|
|
|
[frontend.common.missionary :as c.m]
|
|
|
[frontend.worker.rtc.client-op :as client-op]
|
|
|
[frontend.worker.rtc.exception :as r.ex]
|
|
|
@@ -119,44 +120,52 @@
|
|
|
|
|
|
(defn- new-task--concurrent-download-assets
|
|
|
"Concurrently download assets with limited max concurrent count"
|
|
|
- [repo asset-uuid->url asset-uuid->asset-type]
|
|
|
- (->> (fn [[asset-uuid url]]
|
|
|
- (m/sp
|
|
|
- (let [r (c.m/<?
|
|
|
- (worker-state/<invoke-main-thread :thread-api/rtc-download-asset
|
|
|
- repo (str asset-uuid)
|
|
|
- (get asset-uuid->asset-type asset-uuid) url))]
|
|
|
- (when-let [edata (:ex-data r)]
|
|
|
- ;; if download-url return 404, ignore this asset
|
|
|
- (when (not= 404 (:status (:data edata)))
|
|
|
- (throw (ex-info "download asset failed" r)))))))
|
|
|
- (c.m/concurrent-exec-flow 5 (m/seed asset-uuid->url))
|
|
|
- (m/reduce (constantly nil))))
|
|
|
+ [repo aes-key asset-uuid->url asset-uuid->asset-type]
|
|
|
+ (m/sp
|
|
|
+ (let [exported-aes-key (when aes-key (c.m/<? (crypt/<export-aes-key aes-key)))]
|
|
|
+ (m/?
|
|
|
+ (->> (fn [[asset-uuid url]]
|
|
|
+ (m/sp
|
|
|
+ (let [r (c.m/<?
|
|
|
+ (worker-state/<invoke-main-thread :thread-api/rtc-download-asset
|
|
|
+ repo exported-aes-key (str asset-uuid)
|
|
|
+ (get asset-uuid->asset-type asset-uuid) url))]
|
|
|
+ (when-let [edata (:ex-data r)]
|
|
|
+ ;; if download-url return 404, ignore this asset
|
|
|
+ (when (not= 404 (:status (:data edata)))
|
|
|
+ (throw (ex-info "download asset failed" r)))))))
|
|
|
+ (c.m/concurrent-exec-flow 5 (m/seed asset-uuid->url))
|
|
|
+ (m/reduce (constantly nil)))))))
|
|
|
|
|
|
(defn- new-task--concurrent-upload-assets
|
|
|
"Concurrently upload assets with limited max concurrent count"
|
|
|
- [repo conn asset-uuid->url asset-uuid->asset-metadata]
|
|
|
- (->> (fn [[asset-uuid url]]
|
|
|
- (m/sp
|
|
|
- (let [[asset-type checksum] (get asset-uuid->asset-metadata asset-uuid)
|
|
|
- r (c.m/<?
|
|
|
- (worker-state/<invoke-main-thread :thread-api/rtc-upload-asset
|
|
|
- repo (str asset-uuid) asset-type checksum url))]
|
|
|
- (when (:ex-data r)
|
|
|
- (throw (ex-info "upload asset failed" r)))
|
|
|
- ;; asset might be deleted by the user before uploaded successfully
|
|
|
- (when (d/entity @conn [:block/uuid asset-uuid])
|
|
|
- (ldb/transact! conn
|
|
|
- [{:block/uuid asset-uuid
|
|
|
- :logseq.property.asset/remote-metadata {:checksum checksum :type asset-type}}]
|
|
|
- ;; Don't generate rtc ops again, (block-ops & asset-ops)
|
|
|
- {:persist-op? false}))
|
|
|
- (client-op/remove-asset-op repo asset-uuid))))
|
|
|
- (c.m/concurrent-exec-flow 3 (m/seed asset-uuid->url))
|
|
|
- (m/reduce (constantly nil))))
|
|
|
+ [repo conn aes-key asset-uuid->url asset-uuid->asset-metadata]
|
|
|
+ (m/sp
|
|
|
+ (let [exported-aes-key (when aes-key (c.m/<? (crypt/<export-aes-key aes-key)))]
|
|
|
+ (m/?
|
|
|
+ (->> (fn [[asset-uuid url]]
|
|
|
+ (m/sp
|
|
|
+ (let [[asset-type checksum] (get asset-uuid->asset-metadata asset-uuid)
|
|
|
+ _ (prn :xxx exported-aes-key)
|
|
|
+ r (c.m/<?
|
|
|
+ (worker-state/<invoke-main-thread :thread-api/rtc-upload-asset
|
|
|
+ repo exported-aes-key (str asset-uuid)
|
|
|
+ asset-type checksum url))]
|
|
|
+ (when (:ex-data r)
|
|
|
+ (throw (ex-info "upload asset failed" r)))
|
|
|
+ ;; asset might be deleted by the user before uploaded successfully
|
|
|
+ (when (d/entity @conn [:block/uuid asset-uuid])
|
|
|
+ (ldb/transact! conn
|
|
|
+ [{:block/uuid asset-uuid
|
|
|
+ :logseq.property.asset/remote-metadata {:checksum checksum :type asset-type}}]
|
|
|
+ ;; Don't generate rtc ops again, (block-ops & asset-ops)
|
|
|
+ {:persist-op? false}))
|
|
|
+ (client-op/remove-asset-op repo asset-uuid))))
|
|
|
+ (c.m/concurrent-exec-flow 3 (m/seed asset-uuid->url))
|
|
|
+ (m/reduce (constantly nil)))))))
|
|
|
|
|
|
(defn- new-task--push-local-asset-updates
|
|
|
- [repo get-ws-create-task conn graph-uuid major-schema-version add-log-fn]
|
|
|
+ [repo get-ws-create-task conn graph-uuid major-schema-version aes-key add-log-fn]
|
|
|
(m/sp
|
|
|
(when-let [asset-ops (not-empty (client-op/get-all-asset-ops repo))]
|
|
|
(let [upload-asset-uuids (keep
|
|
|
@@ -198,7 +207,7 @@
|
|
|
:asset-uuid->url))]
|
|
|
(when (seq asset-uuid->url)
|
|
|
(add-log-fn :rtc.asset.log/upload-assets {:asset-uuids (keys asset-uuid->url)}))
|
|
|
- (m/? (new-task--concurrent-upload-assets repo conn asset-uuid->url asset-uuid->asset-metadata))
|
|
|
+ (m/? (new-task--concurrent-upload-assets repo conn aes-key asset-uuid->url asset-uuid->asset-metadata))
|
|
|
(when (seq remove-asset-uuids)
|
|
|
(add-log-fn :rtc.asset.log/remove-assets {:asset-uuids remove-asset-uuids})
|
|
|
(m/? (ws-util/send&recv get-ws-create-task
|
|
|
@@ -213,7 +222,7 @@
|
|
|
(concat (keys asset-uuid->url) remove-asset-uuids))))))
|
|
|
|
|
|
(defn- new-task--pull-remote-asset-updates
|
|
|
- [repo get-ws-create-task conn graph-uuid add-log-fn asset-update-ops]
|
|
|
+ [repo get-ws-create-task conn graph-uuid aes-key add-log-fn asset-update-ops]
|
|
|
(m/sp
|
|
|
(when (seq asset-update-ops)
|
|
|
(let [update-asset-uuids (keep (fn [op]
|
|
|
@@ -252,7 +261,7 @@
|
|
|
repo (str asset-uuid) asset-type)))
|
|
|
(when (seq asset-uuid->url)
|
|
|
(add-log-fn :rtc.asset.log/download-assets {:asset-uuids (keys asset-uuid->url)}))
|
|
|
- (m/? (new-task--concurrent-download-assets repo asset-uuid->url asset-uuid->asset-type))))))
|
|
|
+ (m/? (new-task--concurrent-download-assets repo aes-key asset-uuid->url asset-uuid->asset-type))))))
|
|
|
|
|
|
(defn- get-all-asset-blocks
|
|
|
[db]
|
|
|
@@ -267,7 +276,7 @@
|
|
|
db))
|
|
|
|
|
|
(defn- new-task--initial-download-missing-assets
|
|
|
- [repo get-ws-create-task graph-uuid conn add-log-fn]
|
|
|
+ [repo get-ws-create-task graph-uuid conn aes-key add-log-fn]
|
|
|
(m/sp
|
|
|
(let [local-all-asset-file-paths
|
|
|
(c.m/<? (worker-state/<invoke-main-thread :thread-api/get-all-asset-file-paths repo))
|
|
|
@@ -279,10 +288,10 @@
|
|
|
(set/difference local-all-asset-uuids local-all-asset-file-uuids)))]
|
|
|
(add-log-fn :rtc.asset.log/initial-download-missing-assets {:count (count asset-update-ops)})
|
|
|
(m/? (new-task--pull-remote-asset-updates
|
|
|
- repo get-ws-create-task conn graph-uuid add-log-fn asset-update-ops))))))
|
|
|
+ repo get-ws-create-task conn graph-uuid aes-key add-log-fn asset-update-ops))))))
|
|
|
|
|
|
(defn create-assets-sync-loop
|
|
|
- [repo get-ws-create-task graph-uuid major-schema-version conn *auto-push?]
|
|
|
+ [repo get-ws-create-task graph-uuid major-schema-version conn *auto-push? *aes-key]
|
|
|
(let [started-dfv (m/dfv)
|
|
|
add-log-fn (fn [type message]
|
|
|
(assert (map? message) message)
|
|
|
@@ -295,18 +304,21 @@
|
|
|
(m/sp
|
|
|
(try
|
|
|
(log/info :rtc-asset :loop-starting)
|
|
|
+ ;; check aes-key exists
|
|
|
+ (when (ldb/get-graph-rtc-e2ee? @conn) (assert @*aes-key))
|
|
|
(started-dfv true)
|
|
|
- (m/? (new-task--initial-download-missing-assets repo get-ws-create-task graph-uuid conn add-log-fn))
|
|
|
+ (m/? (new-task--initial-download-missing-assets
|
|
|
+ repo get-ws-create-task graph-uuid conn @*aes-key add-log-fn))
|
|
|
(->>
|
|
|
(let [event (m/?> mixed-flow)]
|
|
|
(case (:type event)
|
|
|
:remote-updates
|
|
|
(when-let [asset-update-ops (not-empty (:value event))]
|
|
|
(m/? (new-task--pull-remote-asset-updates
|
|
|
- repo get-ws-create-task conn graph-uuid add-log-fn asset-update-ops)))
|
|
|
+ repo get-ws-create-task conn graph-uuid @*aes-key add-log-fn asset-update-ops)))
|
|
|
:local-update-check
|
|
|
(m/? (new-task--push-local-asset-updates
|
|
|
- repo get-ws-create-task conn graph-uuid major-schema-version add-log-fn))))
|
|
|
+ repo get-ws-create-task conn graph-uuid major-schema-version @*aes-key add-log-fn))))
|
|
|
m/ap
|
|
|
(m/reduce {} nil)
|
|
|
m/?)
|