Selaa lähdekoodia

fix(rtc): fix assets-sync-loop stopped, fix read-asset err behaviour

- ensure rtc-loop stops when assets-sync-loop stopped
- ensure read-file/read-file-raw have same behaviour on web and electron
- handle exceptions between ui-thread <-> worker-thread
- ignore upload-asset if not exist at local
- ignore download-asset if not exist at remote
rcmerci 5 päivää sitten
vanhempi
sitoutus
285eb612a1

+ 2 - 0
deps/db/src/logseq/db/frontend/property.cljs

@@ -516,6 +516,8 @@
                                              {:type :map
                                               :hide? true
                                               :public? false}
+                                             :properties
+                                             {:logseq.property/description "Metadata of asset in remote storage"}
                                              :rtc property-ignore-rtc}
      :logseq.property.asset/resize-metadata {:title "Asset resize metadata"
                                              :schema {:type :map

+ 9 - 6
src/main/frontend/fs/memory_fs.cljs

@@ -74,11 +74,14 @@
 (defn- read-file-aux
   [dir path {:keys [text?]
              :as options}]
-  (p/let [fpath (path/url-to-path (path/path-join dir path))
-          result (js/window.pfs.readFile fpath (clj->js options))]
-    (if text?
-      (.toString ^js result)
-      result)))
+  (-> (p/let [fpath (path/url-to-path (path/path-join dir path))
+              result (js/window.pfs.readFile fpath (clj->js options))]
+        (if text?
+          (.toString ^js result)
+          result))
+      (p/catch
+          ;; ensure throw ex-info both on web & electron, not string or js/Error
+          (fn [e] (ex-info (str e) {})))))
 
 (defrecord MemoryFs []
   protocol/Fs
@@ -101,7 +104,7 @@
                       (mapv #(path/path-join "memory://" %) rpaths)))
             (p/catch (fn [error]
                        (println "(memory-fs)Readdir error: " error)
-                       (p/rejected error)))))))
+                       (p/rejected (ex-info (str error) {} error))))))))
 
   (unlink! [_this _repo path opts]
     (when js/window.pfs

+ 12 - 7
src/main/frontend/fs/node.cljs

@@ -65,6 +65,10 @@
           result (bean/->clj result)]
     result))
 
+(defn- wrap-throw-ex-info
+  [p]
+  (p/catch p (fn [e] (throw (ex-info (str e) {})))))
+
 (defrecord Node []
   protocol/Fs
   (mkdir! [_this dir]
@@ -77,9 +81,10 @@
   (mkdir-recur! [_this dir]
     (ipc/ipc "mkdir-recur" dir))
 
-  (readdir [_this dir]                   ; recursive
-    (p/then (ipc/ipc "readdir" dir)
-            bean/->clj))
+  (readdir [_this dir]                  ; recursive
+    (wrap-throw-ex-info
+     (p/then (ipc/ipc "readdir" dir)
+             bean/->clj)))
 
   (unlink! [_this repo path _opts]
     (ipc/ipc "unlink"
@@ -93,19 +98,19 @@
     (let [path (if (nil? dir)
                  path
                  (path/path-join dir path))]
-      (ipc/ipc "readFile" path)))
+      (wrap-throw-ex-info (ipc/ipc "readFile" path))))
 
   (read-file-raw [_this dir path _options]
     (let [path (if (nil? dir)
                  path
                  (path/path-join dir path))]
-      (ipc/ipc "readFileRaw" path)))
+      (wrap-throw-ex-info (ipc/ipc "readFileRaw" path))))
 
   (write-file! [this repo dir path content opts]
     (p/let [fpath (path/path-join dir path)
             stat (p/catch
-                  (protocol/stat this fpath)
-                  (fn [_e] :not-found))
+                     (protocol/stat this fpath)
+                     (fn [_e] :not-found))
             parent-dir (path/parent fpath)
             _ (protocol/mkdir-recur! this parent-dir)]
       (write-file-impl! repo dir path content opts stat)))

+ 10 - 4
src/main/frontend/handler/assets.cljs

@@ -8,6 +8,7 @@
             [frontend.fs :as fs]
             [frontend.state :as state]
             [frontend.util :as util]
+            [lambdaisland.glogi :as log]
             [logseq.common.config :as common-config]
             [logseq.common.path :as path]
             [logseq.common.util :as common-util]
@@ -222,6 +223,7 @@
              (constantly nil))))
 
 (defn <read-asset
+  "Throw if asset not found"
   [repo asset-block-id asset-type]
   (let [repo-dir (config/get-repo-dir repo)
         file-path (path/path-join common-config/local-assets-dir
@@ -271,7 +273,10 @@
   [repo aes-key asset-block-uuid-str asset-type checksum put-url]
   (assert (and asset-type checksum))
   (m/sp
-    (let [asset-file (c.m/<? (<read-asset repo asset-block-uuid-str asset-type))
+    (let [asset-file (try (c.m/<? (<read-asset repo asset-block-uuid-str asset-type))
+                          (catch :default e
+                            (log/info :read-asset e)
+                            (throw (ex-info "read-asset failed" {:type :rtc.exception/read-asset-failed} e))))
           asset-file* (if (not aes-key)
                         asset-file
                         (ldb/write-transit-str
@@ -291,7 +296,8 @@
         :succ (constantly nil))
       (let [{:keys [status] :as r} (m/? http-task)]
         (when-not (http/unexceptional-status? status)
-          {:ex-data {:type :rtc.exception/upload-asset-failed :data (dissoc r :body)}})))))
+          (throw (ex-info "upload-asset failed"
+                          {:type :rtc.exception/upload-asset-failed :data (dissoc r :body)})))))))
 
 (defn new-task--rtc-download-asset
   [repo aes-key asset-block-uuid-str asset-type get-url]
@@ -311,7 +317,8 @@
       (try
         (let [{:keys [status body] :as r} (m/? http-task)]
           (if-not (http/unexceptional-status? status)
-            {:ex-data {:type :rtc.exception/download-asset-failed :data (dissoc r :body)}}
+            (throw (ex-info "download asset failed"
+                            {:type :rtc.exception/download-asset-failed :data (dissoc r :body)}))
             (let [asset-file
                   (if (not aes-key)
                     body
@@ -327,7 +334,6 @@
                           (throw e)))))]
               (c.m/<? (<write-asset repo asset-block-uuid-str asset-type asset-file))
               nil)))
-
         (catch Cancelled e
           (progress-canceler)
           (throw e))))))

+ 45 - 24
src/main/frontend/worker/rtc/asset.cljs

@@ -126,14 +126,17 @@
       (m/?
        (->> (fn [[asset-uuid url]]
               (m/sp
-                (let [r (c.m/<?
-                         (worker-state/<invoke-main-thread :thread-api/rtc-download-asset
-                                                           repo exported-aes-key (str asset-uuid)
-                                                           (get asset-uuid->asset-type asset-uuid) url))]
-                  (when-let [edata (:ex-data r)]
-                    ;; if download-url return 404, ignore this asset
-                    (when (not= 404 (:status (:data edata)))
-                      (throw (ex-info "download asset failed" r)))))))
+                (try
+                  (c.m/<?
+                   (worker-state/<invoke-main-thread :thread-api/rtc-download-asset
+                                                     repo exported-aes-key (str asset-uuid)
+                                                     (get asset-uuid->asset-type asset-uuid) url))
+                  (catch :default e
+                    (when-let [edata (ex-data e)]
+                      ;; if download-url return 404, ignore this asset
+                      (when (not= 404 (:status (:data edata)))
+                        (throw (ex-info "download asset error(not= 404)" e)))) ()))))
+
             (c.m/concurrent-exec-flow 5 (m/seed asset-uuid->url))
             (m/reduce (constantly nil)))))))
 
@@ -146,20 +149,31 @@
        (->> (fn [[asset-uuid url]]
               (m/sp
                 (let [[asset-type checksum] (get asset-uuid->asset-metadata asset-uuid)
-                      r (c.m/<?
-                         (worker-state/<invoke-main-thread :thread-api/rtc-upload-asset
-                                                           repo exported-aes-key (str asset-uuid)
-                                                           asset-type checksum url))]
-                  (when (:ex-data r)
-                    (throw (ex-info "upload asset failed" r)))
-                  ;; asset might be deleted by the user before uploaded successfully
-                  (when (d/entity @conn [:block/uuid asset-uuid])
-                    (ldb/transact! conn
-                                   [{:block/uuid asset-uuid
-                                     :logseq.property.asset/remote-metadata {:checksum checksum :type asset-type}}]
-                                   ;; Don't generate rtc ops again, (block-ops & asset-ops)
-                                   {:persist-op? false}))
-                  (client-op/remove-asset-op repo asset-uuid))))
+                      r (try
+                          (c.m/<?
+                           (worker-state/<invoke-main-thread :thread-api/rtc-upload-asset
+                                                             repo exported-aes-key (str asset-uuid)
+                                                             asset-type checksum url))
+                          nil
+                          (catch :default e e))]
+                  (case (:type (ex-data r))
+                    :rtc.exception/read-asset-failed ;asset not found, ignore
+                    (client-op/remove-asset-op repo asset-uuid)
+
+                    :rtc.exception/upload-asset-failed  ;upload to remote failed, maybe try later
+                    nil
+
+                    ;; else
+                    (do
+                      ;; asset might be deleted by the user before uploaded successfully
+                      (when (d/entity @conn [:block/uuid asset-uuid])
+                        (ldb/transact!
+                         conn
+                         [{:block/uuid asset-uuid
+                           :logseq.property.asset/remote-metadata {:checksum checksum :type asset-type}}]
+                         ;; Don't generate rtc ops again, (block-ops & asset-ops)
+                         {:persist-op? false}))
+                      (client-op/remove-asset-op repo asset-uuid))))))
             (c.m/concurrent-exec-flow 3 (m/seed asset-uuid->url))
             (m/reduce (constantly nil)))))))
 
@@ -267,7 +281,8 @@
   (d/q '[:find [(pull ?b [:block/uuid
                           :logseq.property.asset/type
                           :logseq.property.asset/size
-                          :logseq.property.asset/checksum])
+                          :logseq.property.asset/checksum
+                          :logseq.property.asset/remote-metadata])
                 ...]
          :where
          [?b :block/uuid]
@@ -280,7 +295,13 @@
     (let [local-all-asset-file-paths
           (c.m/<? (worker-state/<invoke-main-thread :thread-api/get-all-asset-file-paths repo))
           local-all-asset-file-uuids (set (map (comp parse-uuid path/file-stem) local-all-asset-file-paths))
-          local-all-asset-uuids (set (map :block/uuid (get-all-asset-blocks @conn)))]
+          local-all-asset-uuids (into
+                                 #{}
+                                 ;; Only if the asset-block contains :logseq.property.asset/remote-metadata
+                                 ;; does the asset exist remotely.
+                                 (comp (filter :logseq.property.asset/remote-metadata)
+                                       (map :block/uuid))
+                                 (get-all-asset-blocks @conn))]
       (when-let [asset-update-ops
                  (not-empty
                   (map (fn [asset-uuid] {:op :update-asset :block/uuid asset-uuid})

+ 22 - 7
src/main/frontend/worker/rtc/core.cljs

@@ -116,8 +116,9 @@
   `:local-update-check`: event to notify to check if there're some new local-updates, then push to remote.
   `:online-users-updated`: online users info updated
   `:pull-remote-updates`: pull remote updates
-  `:inject-users-info`: notify server to inject users-info into the graph"
-  [repo get-ws-create-task *auto-push? *online-users]
+  `:inject-users-info`: notify server to inject users-info into the graph
+  `:assets-sync-loop-stopped`: assets-sync-loop stopped, rtc-loop should stop as well"
+  [repo get-ws-create-task *auto-push? *online-users *assets-sync-loop-stopped?]
   (let [remote-updates-flow (m/eduction
                              (map (fn [data]
                                     (case (:req-id data)
@@ -129,7 +130,14 @@
                                   (map (fn [data] {:type :local-update-check :value data}))
                                   (r.throttle/create-local-updates-check-flow repo *auto-push? 2000))
         inject-user-info-flow (create-inject-users-info-flow repo (m/watch *online-users))
-        mix-flow (c.m/mix remote-updates-flow local-updates-check-flow inject-user-info-flow)]
+        assets-sync-loop-stopped-flow (m/eduction
+                                       (keep (fn [v] (when v {:type :assets-sync-loop-stopped})))
+                                       (take 1)
+                                       (m/watch *assets-sync-loop-stopped?))
+        mix-flow (c.m/mix remote-updates-flow
+                          local-updates-check-flow
+                          inject-user-info-flow
+                          assets-sync-loop-stopped-flow)]
     (c.m/mix mix-flow (create-pull-remote-updates-flow 60000 mix-flow))))
 
 (defn- create-ws-state-flow
@@ -200,7 +208,7 @@
         (reset! *aes-key aes-key)))))
 
 (declare new-task--inject-users-info)
-(defn- create-rtc-loop
+(defn- ^:large-vars/cleanup-todo create-rtc-loop
   "Return a map with [:rtc-state-flow :rtc-loop-task :*rtc-auto-push? :onstarted-task]
   TODO: auto refresh token if needed"
   [graph-uuid schema-version repo conn date-formatter token user-uuid
@@ -214,6 +222,7 @@
         *assets-sync-loop-canceler (atom nil)
         *server-schema-version     (atom nil)
         *aes-key                   (atom nil)
+        *assets-sync-loop-stopped  (atom nil)
         started-dfv                (m/dfv)
         add-log-fn                 (fn [type message]
                                      (assert (map? message) message)
@@ -226,7 +235,7 @@
         {:keys [assets-sync-loop-task]}
         (r.asset/create-assets-sync-loop
          repo get-ws-create-task graph-uuid major-schema-version conn *auto-push? *aes-key)
-        mixed-flow                 (create-mixed-flow repo get-ws-create-task *auto-push? *online-users)]
+        mixed-flow (create-mixed-flow repo get-ws-create-task *auto-push? *online-users *assets-sync-loop-stopped)]
     (assert (some? *current-ws))
     {:rtc-state-flow       (create-rtc-state-flow (create-ws-state-flow *current-ws))
      :*rtc-auto-push?      *auto-push?
@@ -249,7 +258,9 @@
           (reset! *assets-sync-loop-canceler
                   (c.m/run-task :assets-sync-loop-task
                     assets-sync-loop-task
-                    :fail #(log/info :assets-sync-loop-task-stopped %)))
+                    :fail (fn [e]
+                            (log/info :assets-sync-loop-task-stopped e)
+                            (reset! *assets-sync-loop-stopped true))))
           (->>
            (let [event (m/?> mixed-flow)]
              (case (:type event)
@@ -285,7 +296,11 @@
                      add-log-fn))
 
                :inject-users-info
-               (m/? (new-task--inject-users-info token graph-uuid major-schema-version))))
+               (m/? (new-task--inject-users-info token graph-uuid major-schema-version))
+
+               :assets-sync-loop-stopped
+               ;; assets-sync-loop stopped, then we should stop the whole rtc-loop
+               (throw (ex-info "assets-sync-loop-stopped" {}))))
            (m/ap)
            (m/reduce {} nil)
            (m/?))

+ 5 - 1
src/main/frontend/worker/rtc/exception.cljs

@@ -34,7 +34,11 @@ the server will put it to s3 and return its presigned-url to clients."}
   :rtc.exception/fetch-user-rsa-public-key-error {:doc "Failed to fetch user RSA public-key from server"}
   :rtc.exception/fetch-graph-aes-key-error {:doc "Failed to fetch graph AES key from server"}
   :rtc.exception/not-found-user-rsa-key-pair {:doc "user rsa-key-pair not found"}
-  :rtc.exception/not-found-graph-aes-key {:doc "graph aes-key not found"})
+  :rtc.exception/not-found-graph-aes-key {:doc "graph aes-key not found"}
+
+  :rtc.exception/read-asset-failed {:doc "read asset from fs failed, maybe not exists"}
+  :rtc.exception/upload-asset-failed {:doc "upload asset failed"}
+  :rtc.exception/download-asset-failed {:doc "download asset failed"})
 
 (def ex-remote-graph-lock-missing
   (ex-info "remote graph lock missing(server internal error)"