Browse Source

enhance(asset-sync): concurrent upload assets

rcmerci 1 year ago
parent
commit
ff757e4443
1 changed files with 41 additions and 33 deletions
  1. 41 33
      src/main/frontend/worker/rtc/asset.cljs

+ 41 - 33
src/main/frontend/worker/rtc/asset.cljs

@@ -128,6 +128,45 @@
   (doseq [asset-uuid (set/difference (set all-asset-uuids) (set handled-asset-uuids))]
     (client-op/remove-asset-op repo asset-uuid)))
 
+(defn- new-task--concurrent-download-assets
+  "Concurrently download assets with limited max concurrent count"
+  [repo asset-uuid->url asset-uuid->asset-type]
+  (->> (fn [[asset-uuid url]]
+         (m/sp
+           (let [r (ldb/read-transit-str
+                    (c.m/<?
+                     (.rtc-download-asset
+                      ^js @worker-state/*main-thread
+                      repo (str asset-uuid) (get asset-uuid->asset-type asset-uuid) url)))]
+             (when-let [edata (:ex-data r)]
+               ;; if download-url return 404, ignore this asset
+               (when (not= 404 (:status (:data edata)))
+                 (throw (ex-info "download asset failed" r)))))))
+       (c.m/concurrent-exec-flow 5 (m/seed asset-uuid->url))
+       (m/reduce (constantly nil))))
+
+(defn- new-task--concurrent-upload-assets
+  "Concurrently upload assets with limited max concurrent count"
+  [repo conn asset-uuid->url asset-uuid->asset-type+checksum]
+  (->> (fn [[asset-uuid url]]
+         (m/sp
+           (let [[asset-type checksum] (get asset-uuid->asset-type+checksum asset-uuid)
+                 r (ldb/read-transit-str
+                    (c.m/<?
+                     (.rtc-upload-asset
+                      ^js @worker-state/*main-thread
+                      repo (str asset-uuid) asset-type checksum url)))]
+             (when (:ex-data r)
+               (throw (ex-info "upload asset failed" r)))
+             (d/transact! conn
+                          [{:block/uuid asset-uuid
+                            :logseq.property.asset/remote-metadata {:checksum checksum :type asset-type}}]
+                       ;; Don't generate rtc ops again, (block-ops & asset-ops)
+                          {:persist-op? false})
+             (client-op/remove-asset-op repo asset-uuid))))
+       (c.m/concurrent-exec-flow 3 (m/seed asset-uuid->url))
+       (m/reduce (constantly nil))))
+
 (defn- new-task--push-local-asset-updates
   [repo get-ws-create-task conn graph-uuid add-log-fn]
   (m/sp
@@ -164,21 +203,7 @@
                    :asset-uuid->url))]
         (when (seq asset-uuid->url)
           (add-log-fn :rtc.asset.log/upload-assets {:asset-uuids (keys asset-uuid->url)}))
-        (doseq [[asset-uuid put-url] asset-uuid->url]
-          (let [[asset-type checksum] (get asset-uuid->asset-type+checksum asset-uuid)
-                r (ldb/read-transit-str
-                   (c.m/<?
-                    (.rtc-upload-asset
-                     ^js @worker-state/*main-thread
-                     repo (str asset-uuid) asset-type checksum put-url)))]
-            (when (:ex-data r)
-              (throw (ex-info "upload asset failed" r)))
-            (d/transact! conn
-                         [{:block/uuid asset-uuid
-                           :logseq.property.asset/remote-metadata {:checksum checksum :type asset-type}}]
-                         ;; Don't generate rtc ops again, (block-ops & asset-ops)
-                         {:persist-op? false})
-            (client-op/remove-asset-op repo asset-uuid)))
+        (m/? (new-task--concurrent-upload-assets repo conn asset-uuid->url asset-uuid->asset-type+checksum))
         (when (seq remove-asset-uuids)
           (add-log-fn :rtc.asset.log/remove-assets {:asset-uuids remove-asset-uuids})
           (m/? (ws-util/send&recv get-ws-create-task
@@ -191,23 +216,6 @@
                           (map :block/uuid asset-ops)
                           (concat (keys asset-uuid->url) remove-asset-uuids))))))
 
-(defn- new-task--throttle-download-assets
-  "Concurrent download assets with limited max concurrent count"
-  [repo asset-uuid->url asset-uuid->asset-type]
-  (->> (fn [[asset-uuid url]]
-         (m/sp
-           (let [r (ldb/read-transit-str
-                    (c.m/<?
-                     (.rtc-download-asset
-                      ^js @worker-state/*main-thread
-                      repo (str asset-uuid) (get asset-uuid->asset-type asset-uuid) url)))]
-             (when-let [edata (:ex-data r)]
-               ;; if download-url return 404, ignore this asset
-               (when (not= 404 (:status (:data edata)))
-                 (throw (ex-info "download asset failed" r)))))))
-       (c.m/concurrent-exec-flow 5 (m/seed asset-uuid->url))
-       (m/reduce (constantly nil))))
-
 (defn- new-task--pull-remote-asset-updates
   [repo get-ws-create-task conn graph-uuid add-log-fn asset-update-ops]
   (m/sp
@@ -238,7 +246,7 @@
           (c.m/<? (.unlinkAsset ^js @worker-state/*main-thread repo (str asset-uuid) asset-type)))
         (when (seq asset-uuid->url)
           (add-log-fn :rtc.asset.log/download-assets {:asset-uuids (keys asset-uuid->url)}))
-        (m/? (new-task--throttle-download-assets repo asset-uuid->url asset-uuid->asset-type))))))
+        (m/? (new-task--concurrent-download-assets repo asset-uuid->url asset-uuid->asset-type))))))
 
 (defn- get-all-asset-blocks
   [db]