|
|
@@ -804,19 +804,23 @@
|
|
|
(defn- process-asset-ops!
|
|
|
[repo client]
|
|
|
(let [graph-id (:graph-id client)
|
|
|
- asset-ops (not-empty (client-op/get-all-asset-ops repo))]
|
|
|
+ asset-ops (not-empty (client-op/get-all-asset-ops repo))
|
|
|
+ parallelism 10]
|
|
|
(if (and (seq graph-id) asset-ops)
|
|
|
- (p/loop [ops asset-ops]
|
|
|
- (if (empty? ops)
|
|
|
- nil
|
|
|
- (p/do!
|
|
|
- (-> (process-asset-op! repo graph-id (first ops))
|
|
|
- (p/catch (fn [e]
|
|
|
- (log/error :db-sync/asset-op-failed
|
|
|
- {:repo repo
|
|
|
- :asset-uuid (:block/uuid (first ops))
|
|
|
- :error e}))))
|
|
|
- (p/recur (rest ops)))))
|
|
|
+ (let [queue (atom (vec asset-ops))
|
|
|
+ asset-worker (fn worker []
|
|
|
+ (if-let [asset-op (first (swap! queue subvec 1))]
|
|
|
+ (-> (process-asset-op! repo graph-id asset-op)
|
|
|
+ (p/then (fn [_] (worker)))
|
|
|
+ (p/catch (fn [e]
|
|
|
+ (log/error :db-sync/asset-op-failed
|
|
|
+ {:repo repo
|
|
|
+ :asset-uuid (:block/uuid asset-op)
|
|
|
+ :error e}))))
|
|
|
+ (p/resolved nil)))]
|
|
|
+ (->> (range (min parallelism (count asset-ops)))
|
|
|
+ (mapv (fn [_] (asset-worker)))
|
|
|
+ (p/all)))
|
|
|
(p/resolved nil))))
|
|
|
|
|
|
(defn- enqueue-asset-sync! [repo client]
|