Просмотр исходного кода

refactor(db-worker,wip): remove asset related <invoke-main-thread (2)

rcmerci 1 неделя назад
Родитель
Сommit
504f6fa3ce

+ 5 - 0
resources/js/worker.js

@@ -20,6 +20,11 @@ const createFS = () => new LightningFS(fsName);
 let fs = createFS();
 let pfs = fs.promises;
 
+if (typeof self !== 'undefined') {
+  self.fs = fs;
+  self.pfs = pfs;
+}
+
 if (detect() === 'Worker') {
   const portal = new MagicPortal(self);
   portal.set('fs', fs);

+ 5 - 0
resources/mobile/js/worker.js

@@ -20,6 +20,11 @@ const createFS = () => new LightningFS(fsName);
 let fs = createFS();
 let pfs = fs.promises;
 
+if (typeof self !== 'undefined') {
+  self.fs = fs;
+  self.pfs = pfs;
+}
+
 if (detect() === 'Worker') {
   const portal = new MagicPortal(self);
   portal.set('fs', fs);

+ 2 - 179
src/main/frontend/handler/assets.cljs

@@ -1,23 +1,15 @@
 (ns ^:no-doc frontend.handler.assets
-  (:require [cljs-http-missionary.client :as http]
-            [clojure.string :as string]
-            [frontend.common.crypt :as crypt]
-            [frontend.common.missionary :as c.m]
-            [frontend.common.thread-api :as thread-api :refer [def-thread-api]]
+  (:require [clojure.string :as string]
             [frontend.config :as config]
             [frontend.fs :as fs]
             [frontend.state :as state]
             [frontend.util :as util]
-            [lambdaisland.glogi :as log]
             [logseq.common.config :as common-config]
             [logseq.common.path :as path]
             [logseq.common.util :as common-util]
-            [logseq.db :as ldb]
             [logseq.db.frontend.asset :as db-asset]
             [medley.core :as medley]
-            [missionary.core :as m]
-            [promesa.core :as p])
-  (:import [missionary Cancelled]))
+            [promesa.core :as p]))
 
 (defn exceed-limit-size?
   "Asset size no more than 100M"
@@ -216,28 +208,6 @@
   (p/let [[repo-dir assets-dir] (ensure-assets-dir! (state/get-current-repo))]
     (path/path-join repo-dir assets-dir filename)))
 
-(defn <get-all-asset-file-paths
-  [repo]
-  (when-let [path (config/get-repo-assets-root repo)]
-    (p/catch (fs/readdir path {:path-only? true})
-             (constantly nil))))
-
-(defn <read-asset
-  "Throw if asset not found"
-  [repo asset-block-id asset-type]
-  (let [repo-dir (config/get-repo-dir repo)
-        file-path (path/path-join common-config/local-assets-dir
-                                  (str asset-block-id "." asset-type))]
-    (fs/read-file-raw repo-dir file-path {})))
-
-(defn <get-asset-file-metadata
-  [repo asset-block-id asset-type]
-  (-> (p/let [file (<read-asset repo asset-block-id asset-type)
-              blob (js/Blob. (array file) (clj->js {:type "image"}))
-              checksum (get-file-checksum blob)]
-        {:checksum checksum})
-      (p/catch (constantly nil))))
-
 (defn- asset-transfer-in-progress?
   [progress-entry]
   (let [{:keys [loaded total]} progress-entry]
@@ -268,150 +238,3 @@
        repo
        (:block/uuid asset-block))
       true)))
-
-(defn <write-asset
-  [repo asset-block-id asset-type data]
-  (let [asset-block-id-str (str asset-block-id)
-        file-name (str asset-block-id-str "." asset-type)]
-    (p/do!
-     (fs/write-asset-file! repo file-name data)
-     (state/update-state!
-      :assets/asset-file-write-finish
-      (fn [m] (assoc-in m [repo asset-block-id-str] (common-util/time-ms)))))))
-
-(comment
-  ;; en/decrypt assets
-  (def repo (state/get-current-repo))
-  (p/let [aes-key (crypt/<generate-aes-key)
-          asset (<read-asset repo "6903201e-9573-4914-ae88-7d3f1d095d1f" "png")
-          encrypted-asset (crypt/<encrypt-uint8array aes-key asset)
-          decrypted-asset (crypt/<decrypt-uint8array aes-key encrypted-asset)]
-    (def asset asset)
-    (def xxxx encrypted-asset)
-    (prn :decrypted (.-length decrypted-asset)
-         :origin (.-length asset))))
-
-(defn <unlink-asset
-  [repo asset-block-id asset-type]
-  (let [file-path (path/path-join (config/get-repo-dir repo)
-                                  common-config/local-assets-dir
-                                  (str asset-block-id "." asset-type))]
-    (p/catch (fs/unlink! repo file-path {}) (constantly nil))))
-
-(defn new-task--rtc-upload-asset
-  [repo aes-key asset-block-uuid-str asset-type checksum put-url & {:keys [extra-headers]}]
-  (assert (and asset-type checksum))
-  (m/sp
-    (let [asset-file (try (c.m/<? (<read-asset repo asset-block-uuid-str asset-type))
-                          (catch :default e
-                            (log/info :read-asset e)
-                            (throw (ex-info "read-asset failed" {:type :rtc.exception/read-asset-failed} e))))
-          asset-file (if aes-key
-                       (->uint8 asset-file)
-                       asset-file)
-          asset-file* (if (not aes-key)
-                        asset-file
-                        (ldb/write-transit-str
-                         (c.m/<? (crypt/<encrypt-uint8array aes-key asset-file))))
-          *progress-flow (atom nil)
-          headers (merge extra-headers
-                         {"x-amz-meta-checksum" checksum
-                          "x-amz-meta-type" asset-type})
-          http-task (http/put put-url {:headers headers
-                                       :body asset-file*
-                                       :with-credentials? false
-                                       :*progress-flow *progress-flow})]
-      (c.m/run-task :upload-asset-progress
-        (m/reduce (fn [_ v]
-                    (state/update-state!
-                     :rtc/asset-upload-download-progress
-                     (fn [m] (assoc-in m [repo asset-block-uuid-str] v))))
-                  @*progress-flow)
-        :succ (constantly nil))
-      (let [{:keys [status] :as r} (m/? http-task)]
-        (when-not (http/unexceptional-status? status)
-          (throw (ex-info "upload-asset failed"
-                          {:type :rtc.exception/upload-asset-failed :data (dissoc r :body)})))))))
-
-(defn new-task--rtc-download-asset
-  [repo aes-key asset-block-uuid-str asset-type get-url & {:keys [extra-headers]}]
-  (m/sp
-    (let [*progress-flow (atom nil)
-          http-task (http/get get-url {:with-credentials? false
-                                       :response-type :array-buffer
-                                       :headers extra-headers
-                                       :*progress-flow *progress-flow})
-          progress-canceler
-          (c.m/run-task :download-asset-progress
-            (m/reduce (fn [_ v]
-                        (state/update-state!
-                         :rtc/asset-upload-download-progress
-                         (fn [m] (assoc-in m [repo asset-block-uuid-str] v))))
-                      @*progress-flow)
-            :succ (constantly nil))]
-      (try
-        (let [{:keys [status body] :as r} (m/? http-task)]
-          (if-not (http/unexceptional-status? status)
-            (throw (ex-info "download asset failed"
-                            {:type :rtc.exception/download-asset-failed :data (dissoc r :body)}))
-            (let [asset-file
-                  (if (not aes-key)
-                    body
-                    (try
-                      (let [asset-file-untransited (ldb/read-transit-str (.decode (js/TextDecoder.) body))]
-                        (c.m/<? (crypt/<decrypt-uint8array aes-key asset-file-untransited)))
-                      (catch js/SyntaxError _
-                        body)
-                      (catch :default e
-                         ;; if decrypt failed, write origin-body
-                        (if (= "decrypt-uint8array" (ex-message e))
-                          body
-                          (throw e)))))]
-              (c.m/<? (<write-asset repo asset-block-uuid-str asset-type asset-file))
-              nil)))
-        (catch Cancelled e
-          (progress-canceler)
-          (throw e))))))
-
-(def-thread-api :thread-api/unlink-asset
-  [repo asset-block-id asset-type]
-  (<unlink-asset repo asset-block-id asset-type))
-
-(def-thread-api :thread-api/get-all-asset-file-paths
-  [repo]
-  (<get-all-asset-file-paths repo))
-
-(def-thread-api :thread-api/get-asset-file-metadata
-  [repo asset-block-id asset-type]
-  (<get-asset-file-metadata repo asset-block-id asset-type))
-
-(def-thread-api :thread-api/rtc-upload-asset
-  [repo exported-aes-key asset-block-uuid-str asset-type checksum put-url & {:as opts}]
-  (m/sp
-    (let [aes-key (when exported-aes-key (c.m/<? (crypt/<import-aes-key exported-aes-key)))]
-      (m/? (new-task--rtc-upload-asset repo aes-key asset-block-uuid-str asset-type checksum put-url opts)))))
-
-(def-thread-api :thread-api/rtc-download-asset
-  [repo exported-aes-key asset-block-uuid-str asset-type get-url & {:as opts}]
-  (m/sp
-    (let [aes-key (when exported-aes-key (c.m/<? (crypt/<import-aes-key exported-aes-key)))]
-      (m/? (new-task--rtc-download-asset repo aes-key asset-block-uuid-str asset-type get-url opts)))))
-
-(comment
-  ;; read asset
-  (p/let [repo "logseq_db_demo"
-          ;; Existing asset block's id
-          asset-block-id-str "672c5a1d-8171-4259-9f35-470c3c67e37f"
-          asset-type "png"
-          data (<read-asset repo asset-block-id-str asset-type)]
-    (js/console.dir data))
-
-  ;; write asset
-  (p/let [repo "logseq_db_demo"
-          ;; Existing asset block's id
-          asset-block-id-str "672c5a1d-8171-4259-9f35-470c3c67e37f"
-          asset-type "png"
-          data (<read-asset repo asset-block-id-str asset-type)
-          new-asset-id (random-uuid)
-          result (<write-asset repo new-asset-id asset-type data)]
-    (js/console.dir result)))

+ 12 - 0
src/main/frontend/handler/worker.cljs

@@ -23,6 +23,18 @@
   (let [state data]
     (state/pub-event! [:rtc/sync-state state])))
 
+(defmethod handle :rtc-asset-upload-download-progress [_ _worker {:keys [repo asset-id progress]}]
+  (when (and (seq repo) (seq asset-id) (map? progress))
+    (state/update-state!
+     :rtc/asset-upload-download-progress
+     (fn [m] (assoc-in m [repo asset-id] progress)))))
+
+(defmethod handle :asset-file-write-finish [_ _worker {:keys [repo asset-id ts]}]
+  (when (and (seq repo) (seq asset-id))
+    (state/update-state!
+     :assets/asset-file-write-finish
+     (fn [m] (assoc-in m [repo asset-id] (or ts (.now js/Date)))))))
+
 (defmethod handle :vector-search-sync-state [_ _worker data]
   (state/pub-event! [:vector-search/sync-state data]))
 

+ 32 - 0
src/main/frontend/worker/platform.cljs

@@ -75,6 +75,38 @@
     (f path text)
     (throw (ex-info "platform storage/write-text! missing" {:path path}))))
 
+(defn asset-read-bytes!
+  [platform repo file-name]
+  (if-let [f (get-in platform [:storage :asset-read-bytes!])]
+    (f repo file-name)
+    (throw (ex-info "platform storage/asset-read-bytes! missing"
+                    {:repo repo
+                     :file-name file-name}))))
+
+(defn asset-write-bytes!
+  [platform repo file-name payload]
+  (if-let [f (get-in platform [:storage :asset-write-bytes!])]
+    (f repo file-name payload)
+    (throw (ex-info "platform storage/asset-write-bytes! missing"
+                    {:repo repo
+                     :file-name file-name}))))
+
+(defn asset-stat
+  [platform repo file-name]
+  (if-let [f (get-in platform [:storage :asset-stat])]
+    (f repo file-name)
+    (throw (ex-info "platform storage/asset-stat missing"
+                    {:repo repo
+                     :file-name file-name}))))
+
+(defn asset-delete!
+  [platform repo file-name]
+  (if-let [f (get-in platform [:storage :asset-delete!])]
+    (f repo file-name)
+    (throw (ex-info "platform storage/asset-delete! missing"
+                    {:repo repo
+                     :file-name file-name}))))
+
 (defn save-secret-text!
   [platform key text]
   (if-let [f (get-in platform [:crypto :save-secret-text!])]

+ 66 - 0
src/main/frontend/worker/platform/browser.cljs

@@ -7,6 +7,8 @@
             [frontend.common.file.opfs :as opfs]
             [frontend.worker-common.util :as worker-util]
             [lambdaisland.glogi :as log]
+            [logseq.common.config :as common-config]
+            [logseq.common.path :as path]
             [promesa.core :as p]))
 
 (defn- iter->vec
@@ -104,6 +106,66 @@
   [path text]
   (opfs/<write-text! path text))
 
+(defn- browser-pfs
+  []
+  (or (some-> js/globalThis .-window .-pfs)
+      (some-> js/globalThis .-pfs)
+      (throw (ex-info "browser pfs is not available" {}))))
+
+(defn- graph-assets-dir
+  [repo]
+  (when-let [graph-name (some-> repo common-config/strip-leading-db-version-prefix)]
+    (str "/" graph-name "/assets")))
+
+(defn- ensure-pfs-dir!
+  [^js pfs dir]
+  (cond
+    (or (nil? dir) (= "" dir) (= "/" dir) (= "." dir))
+    (p/resolved nil)
+
+    :else
+    (-> (.stat pfs dir)
+        (p/then (constantly nil))
+        (p/catch
+         (fn [_]
+           (p/do!
+            (ensure-pfs-dir! pfs (path/parent dir))
+            (.mkdir pfs dir)))))))
+
+(defn- asset-path
+  [repo file-name]
+  (if-let [assets-dir (graph-assets-dir repo)]
+    (path/path-join assets-dir file-name)
+    (throw (ex-info "missing repo for browser asset path"
+                    {:repo repo
+                     :file-name file-name}))))
+
+(defn- asset-read-bytes!
+  [repo file-name]
+  (.readFile (browser-pfs) (asset-path repo file-name)))
+
+(defn- asset-write-bytes!
+  [repo file-name payload]
+  (let [^js pfs (browser-pfs)
+        file-path (asset-path repo file-name)]
+    (p/do!
+     (ensure-pfs-dir! pfs (path/parent file-path))
+     (.writeFile pfs file-path payload))))
+
+(defn- asset-stat
+  [repo file-name]
+  (let [^js pfs (browser-pfs)]
+    (-> (.stat pfs (asset-path repo file-name))
+        (p/then (fn [^js stat]
+                  {:size (.-size stat)
+                   :type (.-type stat)}))
+        (p/catch (constantly nil)))))
+
+(defn- asset-delete!
+  [repo file-name]
+  (-> (.unlink (browser-pfs) (asset-path repo file-name))
+      (p/catch (constantly nil))))
+
 (defn- websocket-connect
   [url]
   (js/WebSocket. url))
@@ -133,6 +195,10 @@
              :remove-vfs! remove-vfs!
              :read-text! read-text!
              :write-text! write-text!
+             :asset-read-bytes! asset-read-bytes!
+             :asset-write-bytes! asset-write-bytes!
+             :asset-stat asset-stat
+             :asset-delete! asset-delete!
              :transfer (fn [data transferables]
                          (Comlink/transfer data transferables))}
    :kv {:get kv-get

+ 44 - 1
src/main/frontend/worker/platform/node.cljs

@@ -260,6 +260,41 @@
             _ (ensure-dir! dir)]
       (fs/writeFile full-path text "utf8"))))
 
+(defn- asset-file-path
+  [data-dir repo file-name]
+  (node-path/join (repo-dir data-dir repo)
+                  common-config/local-assets-dir
+                  file-name))
+
+(defn- asset-read-bytes!
+  [data-dir repo file-name]
+  (fs/readFile (asset-file-path data-dir repo file-name)))
+
+(defn- asset-write-bytes!
+  [write-guard-fn data-dir repo file-name payload]
+  (let [full-path (asset-file-path data-dir repo file-name)
+        dir (node-path/dirname full-path)]
+    (p/let [_ (when write-guard-fn
+                (write-guard-fn))
+            _ (ensure-dir! dir)]
+      (fs/writeFile full-path (->buffer payload)))))
+
+(defn- asset-stat
+  [data-dir repo file-name]
+  (-> (fs/stat (asset-file-path data-dir repo file-name))
+      (p/then (fn [^js stat]
+                {:size (.-size stat)
+                 :is-file? (.isFile stat)}))
+      (p/catch (constantly nil))))
+
+(defn- asset-delete!
+  [write-guard-fn data-dir repo file-name]
+  (let [full-path (asset-file-path data-dir repo file-name)]
+    (p/let [_ (when write-guard-fn
+                (write-guard-fn))]
+      (-> (fs/rm full-path #js {:force true})
+          (p/catch (constantly nil))))))
+
 (defn- websocket-connect
   [url]
   (ws. url))
@@ -350,7 +385,15 @@
                 :import-db (fn [pool path data] (import-db write-guard-fn pool path data))
                 :remove-vfs! (fn [pool] (remove-vfs! write-guard-fn pool))
                 :read-text! (fn [path] (read-text! data-dir path))
-                :write-text! (fn [path text] (write-text! write-guard-fn data-dir path text))}
+                :write-text! (fn [path text] (write-text! write-guard-fn data-dir path text))
+                :asset-read-bytes! (fn [repo file-name]
+                                     (asset-read-bytes! data-dir repo file-name))
+                :asset-write-bytes! (fn [repo file-name payload]
+                                      (asset-write-bytes! write-guard-fn data-dir repo file-name payload))
+                :asset-stat (fn [repo file-name]
+                              (asset-stat data-dir repo file-name))
+                :asset-delete! (fn [repo file-name]
+                                 (asset-delete! write-guard-fn data-dir repo file-name))}
       :kv {:get (:get kv)
            :set! (:set! kv)}
       :broadcast {:post-message! (fn [type payload]

+ 194 - 35
src/main/frontend/worker/sync/assets.cljs

@@ -3,38 +3,149 @@
   (:require
    [datascript.core :as d]
    [frontend.common.crypt :as crypt]
+   [frontend.worker.platform :as platform]
+   [frontend.worker.shared-service :as shared-service]
    [frontend.worker.state :as worker-state]
    [frontend.worker.sync.auth :as sync-auth]
    [frontend.worker.sync.client-op :as client-op]
    [frontend.worker.sync.crypt :as sync-crypt]
    [frontend.worker.sync.large-title :as sync-large-title]
    [lambdaisland.glogi :as log]
+   [logseq.common.util :as common-util]
    [logseq.db :as ldb]
    [promesa.core :as p]))
 
 (def max-asset-size (* 100 1024 1024))
 
-(defn exported-graph-aes-key
+(defn graph-aes-key
   [repo graph-id fail-fast-f]
   (if (sync-crypt/graph-e2ee? repo)
     (p/let [aes-key (sync-crypt/<ensure-graph-aes-key repo graph-id)
             _ (when (nil? aes-key)
                 (fail-fast-f :db-sync/missing-field {:repo repo :field :aes-key}))]
-      (crypt/<export-aes-key aes-key))
+      aes-key)
     (p/resolved nil)))
 
+(defn- asset-file-name
+  [asset-uuid asset-type]
+  (str asset-uuid "." asset-type))
+
+(defn- ->uint8
+  [payload]
+  (cond
+    (instance? js/Uint8Array payload)
+    payload
+
+    (instance? js/ArrayBuffer payload)
+    (js/Uint8Array. payload)
+
+    (and (exists? js/ArrayBuffer)
+         (.isView js/ArrayBuffer payload))
+    (js/Uint8Array. (.-buffer payload) (.-byteOffset payload) (.-byteLength payload))
+
+    (array? payload)
+    (js/Uint8Array. payload)
+
+    (sequential? payload)
+    (js/Uint8Array. (clj->js payload))
+
+    (and (object? payload)
+         (= "Buffer" (aget payload "type"))
+         (array? (aget payload "data")))
+    (js/Uint8Array. (aget payload "data"))
+
+    :else
+    (throw (ex-info "unsupported binary payload"
+                    {:payload-type (str (type payload))}))))
+
+(defn- payload-size
+  [payload]
+  (cond
+    (string? payload) (count payload)
+    (some? (.-byteLength payload)) (.-byteLength payload)
+    (some? (.-length payload)) (.-length payload)
+    :else 0))
+
+(defn- notify-asset-progress!
+  [repo asset-id direction loaded total]
+  (shared-service/broadcast-to-clients!
+   :rtc-asset-upload-download-progress
+   {:repo repo
+    :asset-id asset-id
+    :progress {:direction direction
+               :loaded loaded
+               :total total}}))
+
+(defn- mark-asset-write-finish!
+  [repo asset-id]
+  (shared-service/broadcast-to-clients!
+   :asset-file-write-finish
+   {:repo repo
+    :asset-id asset-id
+    :ts (common-util/time-ms)}))
+
+(defn- <read-asset-bytes
+  [repo asset-id asset-type]
+  (platform/asset-read-bytes! (platform/current)
+                              repo
+                              (asset-file-name asset-id asset-type)))
+
+(defn- <write-asset-bytes!
+  [repo asset-id asset-type payload]
+  (p/let [_ (platform/asset-write-bytes! (platform/current)
+                                         repo
+                                         (asset-file-name asset-id asset-type)
+                                         payload)]
+    (mark-asset-write-finish! repo asset-id)
+    nil))
+
 (defn upload-remote-asset!
   [repo graph-id asset-uuid asset-type checksum]
   (let [base (sync-auth/http-base-url @worker-state/*db-sync-config)]
     (if (and (seq base) (seq graph-id) (seq asset-type) (seq checksum))
-      (p/let [exported-aes-key (exported-graph-aes-key
-                                repo graph-id
-                                (fn [tag data]
-                                  (throw (ex-info (name tag) data))))]
-        (worker-state/<invoke-main-thread :thread-api/rtc-upload-asset
-                                          repo exported-aes-key (str asset-uuid) asset-type checksum
-                                          (sync-large-title/asset-url base graph-id (str asset-uuid) asset-type)
-                                          {:extra-headers (sync-auth/auth-headers (worker-state/get-id-token))}))
+      (-> (p/let [aes-key (graph-aes-key
+                           repo graph-id
+                           (fn [tag data]
+                             (throw (ex-info (name tag) data))) )
+                  asset-id (str asset-uuid)
+                  put-url (sync-large-title/asset-url base graph-id asset-id asset-type)
+                  asset-file (try
+                               (<read-asset-bytes repo asset-id asset-type)
+                               (catch :default e
+                                 (log/info :read-asset e)
+                                 (throw (ex-info "read-asset failed"
+                                                 {:type :rtc.exception/read-asset-failed}
+                                                 e))))
+                  asset-file (if aes-key (->uint8 asset-file) asset-file)
+                  payload (if (not aes-key)
+                            asset-file
+                            (ldb/write-transit-str
+                             (crypt/<encrypt-uint8array aes-key asset-file)))
+                  total (payload-size payload)
+                  _ (notify-asset-progress! repo asset-id :upload 0 total)
+                  headers (merge (sync-auth/auth-headers (worker-state/get-id-token))
+                                 {"x-amz-meta-checksum" checksum
+                                  "x-amz-meta-type" asset-type})
+                  ^js resp (js/fetch put-url
+                                     (clj->js {:method "PUT"
+                                               :headers headers
+                                               :body payload}))
+                  status (.-status resp)
+                  _ (notify-asset-progress! repo asset-id :upload total total)]
+            (when-not (.-ok resp)
+              (throw (ex-info "upload-asset failed"
+                              {:type :rtc.exception/upload-asset-failed
+                               :data {:status status}})))
+            nil)
+          (p/catch
+           (fn [e]
+             (if (contains? #{:rtc.exception/read-asset-failed
+                              :rtc.exception/upload-asset-failed}
+                            (:type (ex-data e)))
+               (p/rejected e)
+               (p/rejected (ex-info "upload-asset failed"
+                                    {:type :rtc.exception/upload-asset-failed}
+                                    e))))))
       (p/rejected (ex-info "missing asset upload info"
                            {:repo repo
                             :asset-uuid asset-uuid
@@ -166,18 +277,60 @@
       :broadcast-rtc-state!-f broadcast-rtc-state!-f
       :fail-fast-f fail-fast-f})))
 
+(defn- parse-content-length
+  [^js resp]
+  (when-let [content-length (some-> (.-headers resp) (.get "content-length"))]
+    (let [length (js/parseInt content-length 10)]
+      (when (not (js/isNaN length))
+        length))))
+
 (defn download-remote-asset!
   [repo graph-id asset-uuid asset-type]
   (let [base (sync-auth/http-base-url @worker-state/*db-sync-config)]
     (if (and (seq base) (seq graph-id) (seq asset-type))
-      (p/let [exported-aes-key (exported-graph-aes-key
-                                repo graph-id
-                                (fn [tag data]
-                                  (throw (ex-info (name tag) data))))]
-        (worker-state/<invoke-main-thread :thread-api/rtc-download-asset
-                                          repo exported-aes-key (str asset-uuid) asset-type
-                                          (sync-large-title/asset-url base graph-id (str asset-uuid) asset-type)
-                                          {:extra-headers (sync-auth/auth-headers (worker-state/get-id-token))}))
+      (-> (p/let [aes-key (graph-aes-key
+                           repo graph-id
+                           (fn [tag data]
+                             (throw (ex-info (name tag) data))))
+                  asset-id (str asset-uuid)
+                  get-url (sync-large-title/asset-url base graph-id asset-id asset-type)
+                  headers (sync-auth/auth-headers (worker-state/get-id-token))
+                  request-opts (cond-> {:method "GET"}
+                                 (seq headers) (assoc :headers headers))
+                  ^js resp (js/fetch get-url
+                                     (clj->js request-opts))
+                  status (.-status resp)
+                  _ (when-not (.-ok resp)
+                      (throw (ex-info "download asset failed"
+                                      {:type :rtc.exception/download-asset-failed
+                                       :data {:status status}})))
+                  total (or (parse-content-length resp) 0)
+                  _ (notify-asset-progress! repo asset-id :download 0 total)
+                  body (.arrayBuffer resp)
+                  body-size (.-byteLength body)
+                  total' (if (pos? total) total body-size)
+                  _ (notify-asset-progress! repo asset-id :download body-size total')
+                  asset-file
+                  (if (not aes-key)
+                    body
+                    (try
+                      (let [asset-file-untransited (ldb/read-transit-str (.decode (js/TextDecoder.) body))]
+                        (crypt/<decrypt-uint8array aes-key asset-file-untransited))
+                      (catch js/SyntaxError _
+                        body)
+                      (catch :default e
+                        ;; if decrypt failed, write origin-body
+                        (if (= "decrypt-uint8array" (ex-message e))
+                          body
+                          (throw e)))))]
+            (<write-asset-bytes! repo asset-id asset-type asset-file))
+          (p/catch
+           (fn [e]
+             (if (= :rtc.exception/download-asset-failed (:type (ex-data e)))
+               (p/rejected e)
+               (p/rejected (ex-info "download asset failed"
+                                    {:type :rtc.exception/download-asset-failed}
+                                    e))))))
       (p/rejected (ex-info "missing asset download info"
                            {:repo repo
                             :asset-uuid asset-uuid
@@ -193,22 +346,28 @@
        client
        #(when-let [conn (worker-state/get-datascript-conn repo)]
           (when-let [ent (d/entity @conn [:block/uuid asset-uuid])]
-            (let [asset-type (:logseq.property.asset/type ent)]
-              (-> (p/let [meta (when (seq asset-type)
-                                 (worker-state/<invoke-main-thread
-                                  :thread-api/get-asset-file-metadata
-                                  repo (str asset-uuid) asset-type))]
-                    (when (and (seq asset-type)
-                               (:logseq.property.asset/remote-metadata ent)
-                               (nil? meta))
-                      (p/let [_ (download-remote-asset! repo graph-id asset-uuid asset-type)]
-                        (when (d/entity @conn [:block/uuid asset-uuid])
-                          (ldb/transact!
-                           conn
-                           [{:block/uuid asset-uuid
-                             :logseq.property.asset/remote-metadata nil}]
-                           {:persist-op? true}))
-                        (client-op/remove-asset-op repo asset-uuid)
-                        (broadcast-rtc-state!-f client))))
+            (let [asset-type (:logseq.property.asset/type ent)
+                  asset-id (str asset-uuid)
+                  should-download? (and (seq asset-type)
+                                        (:logseq.property.asset/remote-metadata ent))]
+              (-> (p/let [meta (when should-download?
+                                 (platform/asset-stat (platform/current)
+                                                      repo
+                                                      (asset-file-name asset-id asset-type)))
+                          missing-local? (and should-download? (nil? meta))
+                          _ (when missing-local?
+                              (download-remote-asset! repo graph-id asset-uuid asset-type))
+                          _ (when missing-local?
+                              (when-let [target-ent (d/entity @conn [:block/uuid asset-uuid])]
+                                (ldb/transact!
+                                 conn
+                                 [[:db/retract (:db/id target-ent)
+                                   :logseq.property.asset/remote-metadata]]
+                                 {:persist-op? true})))
+                          _ (when missing-local?
+                              (client-op/remove-asset-op repo asset-uuid))
+                          _ (when missing-local?
+                              (broadcast-rtc-state!-f client))]
+                    nil)
                   (p/catch (fn [e]
                              (js/console.error e)))))))))))