|
|
@@ -1,177 +1,301 @@
|
|
|
(ns frontend.worker.rtc.asset
|
|
|
"Fns to sync assets.
|
|
|
some notes:
|
|
|
- - has :logseq.property.asset/type
|
|
|
- - block/content, store the asset name
|
|
|
- - an asset-block not having :file/path indicates need to download asset from server
|
|
|
+ - has :logseq.property.asset/type, :logseq.property.asset/size, :logseq.property.asset/checksum
|
|
|
+ - block/title, store the asset name
|
|
|
- an asset-block not having :logseq.property.asset/remote-metadata
|
|
|
- indicates need to upload the asset to server
|
|
|
- - if an asset-block doesn't have both :file/path and :logseq.property.asset/remote-metadata,
|
|
|
- it means the other client hasn't uploaded the asset to server
|
|
|
-"
|
|
|
- (:require [cljs-http.client :as http]
|
|
|
+ indicates need to upload the asset to server"
|
|
|
+ (:require [clojure.set :as set]
|
|
|
[datascript.core :as d]
|
|
|
[frontend.common.missionary-util :as c.m]
|
|
|
+ [frontend.worker.rtc.client-op :as client-op]
|
|
|
[frontend.worker.rtc.log-and-state :as rtc-log-and-state]
|
|
|
[frontend.worker.rtc.ws-util :as ws-util]
|
|
|
+ [frontend.worker.state :as worker-state]
|
|
|
+ [logseq.common.path :as path]
|
|
|
+ [logseq.db :as ldb]
|
|
|
[malli.core :as ma]
|
|
|
[missionary.core :as m])
|
|
|
(:import [missionary Cancelled]))
|
|
|
|
|
|
-(defn get-all-asset-blocks
|
|
|
- [db]
|
|
|
- (->> (d/q
|
|
|
- '[:find (pull ?asset [*])
|
|
|
- :in $
|
|
|
- :where
|
|
|
- [?asset :block/uuid]
|
|
|
- [?asset :logseq.property.asset/type]]
|
|
|
- db)
|
|
|
- (apply concat)))
|
|
|
-
|
|
|
-(defn asset-block->upload+download-action
|
|
|
- [asset-block]
|
|
|
- (let [local-file-path (:file/path asset-block)
|
|
|
- remote-metadata (:logseq.property.asset/remote-metadata asset-block)]
|
|
|
- (cond
|
|
|
- (and local-file-path remote-metadata) nil
|
|
|
- (nil? local-file-path) :download
|
|
|
- (nil? remote-metadata) :upload)))
|
|
|
-
|
|
|
-(defn get-action->asset-blocks
|
|
|
- [db]
|
|
|
- (reduce
|
|
|
- (fn [action->asset-blocks asset-block]
|
|
|
- (if-let [action (asset-block->upload+download-action asset-block)]
|
|
|
- (update action->asset-blocks action (fnil conj #{}) asset-block)
|
|
|
- action->asset-blocks))
|
|
|
- {} (get-all-asset-blocks db)))
|
|
|
-
|
|
|
-(defn new-task--upload-assets
|
|
|
- [get-ws-create-task conn graph-uuid asset-uuids]
|
|
|
- {:pre [(every? uuid? asset-uuids)]}
|
|
|
+(defn- create-local-updates-check-flow
|
|
|
+ "Return a flow that emits value if need to push local-updates"
|
|
|
+ [repo *auto-push? interval-ms]
|
|
|
+ (let [auto-push-flow (m/watch *auto-push?)
|
|
|
+ clock-flow (c.m/clock interval-ms :clock)
|
|
|
+ merge-flow (m/latest vector auto-push-flow clock-flow)]
|
|
|
+ (m/eduction (filter first)
|
|
|
+ (map second)
|
|
|
+ (filter (fn [v] (when (pos? (client-op/get-unpushed-asset-ops-count repo)) v)))
|
|
|
+ merge-flow)))
|
|
|
+
|
|
|
+(def ^:private remote-asset-updates-schema
|
|
|
+ [:sequential
|
|
|
+ [:map {:closed true}
|
|
|
+ [:op [:enum :update-asset :remove-asset]]
|
|
|
+ [:block/uuid :uuid]
|
|
|
+ [:malli.core/default [:map-of :keyword :any]]]])
|
|
|
+
|
|
|
+(def ^:private *remote-asset-updates (atom nil :validator (ma/validator remote-asset-updates-schema)))
|
|
|
+(def ^:private remote-asset-updates-flow (m/buffer 10 (m/watch *remote-asset-updates)))
|
|
|
+
|
|
|
+(comment
|
|
|
+ (def cancel ((m/reduce (fn [_ v] (prn :v v)) remote-asset-updates-flow) prn prn)))
|
|
|
+
|
|
|
+(defn- new-task--get-asset-file-metadata
|
|
|
+ "Return nil if this asset not exist"
|
|
|
+ [repo block-uuid asset-type]
|
|
|
(m/sp
|
|
|
- (when (seq asset-uuids)
|
|
|
- (let [asset-uuid->url (->> (m/? (ws-util/send&recv get-ws-create-task
|
|
|
- {:action "get-assets-upload-urls"
|
|
|
- :graph-uuid graph-uuid
|
|
|
- :asset-uuid->metadata
|
|
|
- (into {}
|
|
|
- (map (fn [asset-uuid] [asset-uuid {"checksum" "TEST-CHECKSUM"}]))
|
|
|
- asset-uuids)}))
|
|
|
- :asset-uuid->url)]
|
|
|
- (doseq [[asset-uuid put-url] asset-uuid->url]
|
|
|
- (assert (uuid? asset-uuid) asset-uuid)
|
|
|
- (let [{:keys [status] :as r}
|
|
|
- (c.m/<? (http/put put-url {:headers {"x-amz-meta-checksum" "TEST-CHECKSUM"}
|
|
|
- :body (js/JSON.stringify
|
|
|
- (clj->js {:TEST-ASSET true
|
|
|
- :asset-uuid (str asset-uuid)
|
|
|
- :graph-uuid (str graph-uuid)}))
|
|
|
- :with-credentials? false}))]
|
|
|
- (if (not= 200 status)
|
|
|
- (prn :debug-failed-upload-asset {:resp r :asset-uuid asset-uuid :graph-uuid graph-uuid})
|
|
|
-
|
|
|
- (when (some? (d/entity @conn [:block/uuid asset-uuid]))
|
|
|
- (d/transact! conn [{:block/uuid asset-uuid
|
|
|
- :logseq.property.asset/remote-metadata {:checksum "TEST"}}])))))))))
|
|
|
-
|
|
|
-(defn new-task--download-assets
|
|
|
- [get-ws-create-task conn graph-uuid asset-uuids]
|
|
|
- {:pre [(every? uuid? asset-uuids)]}
|
|
|
+ (ldb/read-transit-str
|
|
|
+ (c.m/<?
|
|
|
+ (.get-asset-file-metadata ^js @worker-state/*main-thread repo (str block-uuid) asset-type)))))
|
|
|
+
|
|
|
+(defn- remote-block-ops=>remote-asset-ops
|
|
|
+ [db-before remove-ops]
|
|
|
+ (keep
|
|
|
+ (fn [remove-op]
|
|
|
+ (let [block-uuid (:block-uuid remove-op)]
|
|
|
+ (when-let [ent (d/entity db-before [:block/uuid block-uuid])]
|
|
|
+ (when-let [asset-type (:logseq.property.asset/type ent)]
|
|
|
+ {:op :remove-asset
|
|
|
+ :block/uuid block-uuid
|
|
|
+ :logseq.property.asset/type asset-type}))))
|
|
|
+ remove-ops))
|
|
|
+
|
|
|
+(defn emit-remote-asset-updates-from-block-ops
|
|
|
+ [db-before remove-ops]
|
|
|
+ (when-let [asset-update-ops
|
|
|
+ (not-empty (remote-block-ops=>remote-asset-ops db-before remove-ops))]
|
|
|
+ (reset! *remote-asset-updates asset-update-ops)))
|
|
|
+
|
|
|
+(defn new-task--emit-remote-asset-updates-from-push-asset-upload-updates
|
|
|
+ [repo db push-asset-upload-updates-message]
|
|
|
(m/sp
|
|
|
- (when (seq asset-uuids)
|
|
|
- (let [asset-uuid->url
|
|
|
- (->> (m/? (ws-util/send&recv get-ws-create-task {:action "get-assets-download-urls"
|
|
|
- :graph-uuid graph-uuid
|
|
|
- :asset-uuids asset-uuids}))
|
|
|
- :asset-uuid->url)]
|
|
|
- (doseq [[asset-uuid get-url] asset-uuid->url]
|
|
|
- (assert (uuid? asset-uuid) asset-uuid)
|
|
|
- (let [{:keys [status _body] :as r} (c.m/<? (http/get get-url {:with-credentials? false}))]
|
|
|
- (if (not= 200 status)
|
|
|
- (prn :debug-failed-download-asset {:resp r :asset-uuid asset-uuid :graph-uuid graph-uuid})
|
|
|
- (when (d/entity @conn [:block/uuid asset-uuid])
|
|
|
- (d/transact! conn [{:block/uuid asset-uuid
|
|
|
- :file/path "TEST-FILE-PATH"}])
|
|
|
- (prn :debug-succ-download-asset asset-uuid)))))))))
|
|
|
+ (let [{:keys [uploaded-assets]} push-asset-upload-updates-message]
|
|
|
+ (when-let [asset-update-ops
|
|
|
+ (->> uploaded-assets
|
|
|
+ (map
|
|
|
+ (fn [[asset-uuid remote-metadata]]
|
|
|
+ (m/sp
|
|
|
+ (let [ent (d/entity db [:block/uuid asset-uuid])
|
|
|
+ asset-type (:logseq.property.asset/type ent)
|
|
|
+ local-checksum (:logseq.property.asset/checksum ent)
|
|
|
+ remote-checksum (get remote-metadata "checksum")]
|
|
|
+ (when (or (and local-checksum remote-checksum
|
|
|
+ (not= local-checksum remote-checksum))
|
|
|
+ (and asset-type
|
|
|
+ (nil? (m/? (new-task--get-asset-file-metadata
|
|
|
+ repo asset-uuid asset-type)))))
|
|
|
+ {:op :update-asset
|
|
|
+ :block/uuid asset-uuid})))))
|
|
|
+ (apply m/join vector)
|
|
|
+ m/?
|
|
|
+ (remove nil?)
|
|
|
+ not-empty)]
|
|
|
+ (reset! *remote-asset-updates asset-update-ops)))))
|
|
|
+
|
|
|
+(defn- create-mixed-flow
|
|
|
+ "Return a flow that emits different events:
|
|
|
+ - `:local-update-check`: event to notify check if there're some new local-updates on assets
|
|
|
+ - `:remote-updates`: remote asset updates "
|
|
|
+ [repo *auto-push?]
|
|
|
+ (let [remote-update-flow (m/eduction
|
|
|
+ (map (fn [v] {:type :remote-updates :value v}))
|
|
|
+ remote-asset-updates-flow)
|
|
|
+ local-update-check-flow (m/eduction
|
|
|
+ (map (fn [v] {:type :local-update-check :value v}))
|
|
|
+ (create-local-updates-check-flow repo *auto-push? 2500))]
|
|
|
+ (c.m/mix remote-update-flow local-update-check-flow)))
|
|
|
|
|
|
(defonce ^:private *assets-sync-lock (atom nil))
|
|
|
(defn- holding-assets-sync-lock
|
|
|
"Use this to prevent multiple assets-sync loops at same time."
|
|
|
[started-dfv task]
|
|
|
(m/sp
|
|
|
- (when-not (compare-and-set! *assets-sync-lock nil true)
|
|
|
- (let [e (ex-info "Must not run multiple assets-sync loops"
|
|
|
- {:type :assets-sync.exception/lock-failed
|
|
|
- :missionary/retry true})]
|
|
|
- (started-dfv e)
|
|
|
- (throw e)))
|
|
|
- (try
|
|
|
- (m/? task)
|
|
|
- (finally
|
|
|
- (reset! *assets-sync-lock nil)))))
|
|
|
-
|
|
|
-(def ^:private asset-change-event-schema
|
|
|
- [:map-of
|
|
|
- [:enum :download :upload
|
|
|
- ;; Why don't need :delete event?
|
|
|
- ;; when remove-block-op sync to server, server will know this asset need to be deleted
|
|
|
- ;; :delete
|
|
|
- ]
|
|
|
- [:set :uuid]])
|
|
|
-
|
|
|
-(def ^:private asset-change-event-validator (ma/validator asset-change-event-schema))
|
|
|
-
|
|
|
-(defonce *global-asset-change-event (atom nil :validator asset-change-event-validator))
|
|
|
-
|
|
|
-(defonce ^:private global-asset-change-event-flow
|
|
|
- (m/buffer 20 (m/watch *global-asset-change-event)))
|
|
|
+ (when-not (compare-and-set! *assets-sync-lock nil true)
|
|
|
+ (let [e (ex-info "Must not run multiple assets-sync loops"
|
|
|
+ {:type :assets-sync.exception/lock-failed
|
|
|
+ :missionary/retry true})]
|
|
|
+ (started-dfv e)
|
|
|
+ (throw e)))
|
|
|
+ (try
|
|
|
+ (m/? task)
|
|
|
+ (finally
|
|
|
+ (reset! *assets-sync-lock nil)))))
|
|
|
+
|
|
|
+(defn- clean-asset-ops!
|
|
|
+ [repo all-asset-uuids handled-asset-uuids]
|
|
|
+ (doseq [asset-uuid (set/difference (set all-asset-uuids) (set handled-asset-uuids))]
|
|
|
+ (client-op/remove-asset-op repo asset-uuid)))
|
|
|
+
|
|
|
+(defn- new-task--push-local-asset-updates
|
|
|
+ [repo get-ws-create-task conn graph-uuid add-log-fn]
|
|
|
+ (m/sp
|
|
|
+ (when-let [asset-ops (not-empty (client-op/get-all-asset-ops repo))]
|
|
|
+ (let [upload-asset-uuids (keep
|
|
|
+ (fn [asset-op]
|
|
|
+ (when (contains? asset-op :update-asset)
|
|
|
+ (:block/uuid asset-op)))
|
|
|
+ asset-ops)
|
|
|
+ remove-asset-uuids (keep
|
|
|
+ (fn [asset-op]
|
|
|
+ (when (contains? asset-op :remove-asset)
|
|
|
+ (:block/uuid asset-op)))
|
|
|
+ asset-ops)
|
|
|
+ asset-uuid->asset-type+checksum
|
|
|
+ (into {}
|
|
|
+ (keep
|
|
|
+ (fn [asset-uuid]
|
|
|
+ (let [ent (d/entity @conn [:block/uuid asset-uuid])]
|
|
|
+ (when-let [tp (:logseq.property.asset/type ent)]
|
|
|
+ (when-let [checksum (:logseq.property.asset/checksum ent)]
|
|
|
+ [asset-uuid [tp checksum]])))))
|
|
|
+ upload-asset-uuids)
|
|
|
+ asset-uuid->url
|
|
|
+ (when (seq asset-uuid->asset-type+checksum)
|
|
|
+ (->> (m/? (ws-util/send&recv get-ws-create-task
|
|
|
+ {:action "get-assets-upload-urls"
|
|
|
+ :graph-uuid graph-uuid
|
|
|
+ :asset-uuid->metadata
|
|
|
+ (into {}
|
|
|
+ (map (fn [[asset-uuid [asset-type checksum]]]
|
|
|
+ [asset-uuid {"checksum" checksum "type" asset-type}]))
|
|
|
+ asset-uuid->asset-type+checksum)}))
|
|
|
+ :asset-uuid->url))]
|
|
|
+ (when (seq asset-uuid->url)
|
|
|
+ (add-log-fn :rtc.asset.log/upload-assets {:asset-uuids (keys asset-uuid->url)}))
|
|
|
+ (doseq [[asset-uuid put-url] asset-uuid->url]
|
|
|
+ (let [[asset-type checksum] (get asset-uuid->asset-type+checksum asset-uuid)
|
|
|
+ r (ldb/read-transit-str
|
|
|
+ (c.m/<?
|
|
|
+ (.rtc-upload-asset
|
|
|
+ ^js @worker-state/*main-thread
|
|
|
+ repo (str asset-uuid) asset-type checksum put-url)))]
|
|
|
+ (when (:ex-data r)
|
|
|
+ (throw (ex-info "upload asset failed" r)))
|
|
|
+ (d/transact! conn
|
|
|
+ [{:block/uuid asset-uuid
|
|
|
+ :logseq.property.asset/remote-metadata {:checksum checksum :type asset-type}}]
|
|
|
+ ;; Don't generate rtc ops again, (block-ops & asset-ops)
|
|
|
+ {:persist-op? false})
|
|
|
+ (client-op/remove-asset-op repo asset-uuid)))
|
|
|
+ (when (seq remove-asset-uuids)
|
|
|
+ (add-log-fn :rtc.asset.log/remove-assets {:asset-uuids remove-asset-uuids})
|
|
|
+ (m/? (ws-util/send&recv get-ws-create-task
|
|
|
+ {:action "delete-assets"
|
|
|
+ :graph-uuid graph-uuid
|
|
|
+ :asset-uuids remove-asset-uuids}))
|
|
|
+ (doseq [asset-uuid remove-asset-uuids]
|
|
|
+ (client-op/remove-asset-op repo asset-uuid)))
|
|
|
+ (clean-asset-ops! repo
|
|
|
+ (map :block/uuid asset-ops)
|
|
|
+ (concat (keys asset-uuid->url) remove-asset-uuids))))))
|
|
|
+
|
|
|
+(defn- new-task--pull-remote-asset-updates
|
|
|
+ [repo get-ws-create-task conn graph-uuid add-log-fn asset-update-ops]
|
|
|
+ (m/sp
|
|
|
+ (when (seq asset-update-ops)
|
|
|
+ (let [update-asset-uuids (keep (fn [op]
|
|
|
+ (when (= :update-asset (:op op))
|
|
|
+ (:block/uuid op)))
|
|
|
+ asset-update-ops)
|
|
|
+ remove-asset-uuid->asset-type
|
|
|
+ (into {} (keep (fn [op]
|
|
|
+ (when (= :remove-asset (:op op))
|
|
|
+ [(:block/uuid op) (:logseq.property.asset/type op)])))
|
|
|
+ asset-update-ops)
|
|
|
+ asset-uuid->asset-type (into {}
|
|
|
+ (keep (fn [asset-uuid]
|
|
|
+ (when-let [tp (:logseq.property.asset/type
|
|
|
+ (d/entity @conn [:block/uuid asset-uuid]))]
|
|
|
+ [asset-uuid tp])))
|
|
|
+ update-asset-uuids)
|
|
|
+ asset-uuid->url
|
|
|
+ (when (seq asset-uuid->asset-type)
|
|
|
+ (->> (m/? (ws-util/send&recv get-ws-create-task
|
|
|
+ {:action "get-assets-download-urls"
|
|
|
+ :graph-uuid graph-uuid
|
|
|
+ :asset-uuids (keys asset-uuid->asset-type)}))
|
|
|
+ :asset-uuid->url))]
|
|
|
+ (doseq [[asset-uuid asset-type] remove-asset-uuid->asset-type]
|
|
|
+ (c.m/<? (.unlinkAsset ^js @worker-state/*main-thread repo (str asset-uuid) asset-type)))
|
|
|
+ (when (seq asset-uuid->url)
|
|
|
+ (add-log-fn :rtc.asset.log/download-assets {:asset-uuids (keys asset-uuid->url)}))
|
|
|
+ (doseq [[asset-uuid get-url] asset-uuid->url]
|
|
|
+ (prn :start-download-asset asset-uuid)
|
|
|
+ (let [r (ldb/read-transit-str
|
|
|
+ (c.m/<?
|
|
|
+ (.rtc-download-asset
|
|
|
+ ^js @worker-state/*main-thread
|
|
|
+ repo (str asset-uuid) (get asset-uuid->asset-type asset-uuid) get-url)))]
|
|
|
+ (when-let [edata (:ex-data r)]
|
|
|
+ ;; if download-url return 404, ignore this asset
|
|
|
+ (when (not= 404 (:status (:data edata)))
|
|
|
+ (throw (ex-info "download asset failed" r))))))))))
|
|
|
+
|
|
|
+(defn- get-all-asset-blocks
|
|
|
+ [db]
|
|
|
+ (d/q '[:find [(pull ?b [:block/uuid
|
|
|
+ :logseq.property.asset/type
|
|
|
+ :logseq.property.asset/checksum])
|
|
|
+ ...]
|
|
|
+ :where
|
|
|
+ [?b :block/uuid]
|
|
|
+ [?b :logseq.property.asset/type]]
|
|
|
+ db))
|
|
|
+
|
|
|
+(defn- new-task--initial-download-missing-assets
|
|
|
+ [repo get-ws-create-task graph-uuid conn add-log-fn]
|
|
|
+ (m/sp
|
|
|
+ (let [local-all-asset-file-paths (ldb/read-transit-str
|
|
|
+ (c.m/<? (.get-all-asset-file-paths ^js @worker-state/*main-thread repo)))
|
|
|
+ local-all-asset-file-uuids (set (map (comp parse-uuid path/file-stem) local-all-asset-file-paths))
|
|
|
+ local-all-asset-uuids (set (map :block/uuid (get-all-asset-blocks @conn)))]
|
|
|
+ (when-let [asset-update-ops
|
|
|
+ (not-empty
|
|
|
+ (map (fn [asset-uuid] {:op :update-asset :block/uuid asset-uuid})
|
|
|
+ (set/difference local-all-asset-uuids local-all-asset-file-uuids)))]
|
|
|
+ (add-log-fn :rtc.asset.log/initial-download-missing-assets-count {:count (count asset-update-ops)})
|
|
|
+ (m/? (new-task--pull-remote-asset-updates
|
|
|
+ repo get-ws-create-task conn graph-uuid add-log-fn asset-update-ops))))))
|
|
|
|
|
|
(defn create-assets-sync-loop
|
|
|
- [get-ws-create-task graph-uuid conn]
|
|
|
+ [repo get-ws-create-task graph-uuid conn *auto-push?]
|
|
|
(let [started-dfv (m/dfv)
|
|
|
- asset-change-event-flow global-asset-change-event-flow
|
|
|
add-log-fn (fn [type message]
|
|
|
(assert (map? message) message)
|
|
|
- (rtc-log-and-state/rtc-log type (assoc message :graph-uuid graph-uuid)))]
|
|
|
+ (rtc-log-and-state/rtc-log type (assoc message :graph-uuid graph-uuid)))
|
|
|
+ mixed-flow (create-mixed-flow repo *auto-push?)]
|
|
|
{:onstarted-task started-dfv
|
|
|
:assets-sync-loop-task
|
|
|
(holding-assets-sync-lock
|
|
|
started-dfv
|
|
|
(m/sp
|
|
|
- (try
|
|
|
- (started-dfv true)
|
|
|
- (let [action->asset-blocks (get-action->asset-blocks @conn)]
|
|
|
- (m/?
|
|
|
- (m/join
|
|
|
- (constantly nil)
|
|
|
- (m/sp
|
|
|
- ;; init phase:
|
|
|
- ;; generate all asset-change-events from db
|
|
|
- (when (or (seq (action->asset-blocks :download))
|
|
|
- (seq (action->asset-blocks :upload)))
|
|
|
- (prn "init phase: generate all asset-change-events from db" action->asset-blocks))
|
|
|
- (m/? (new-task--download-assets
|
|
|
- get-ws-create-task conn graph-uuid (map :block/uuid (action->asset-blocks :download))))
|
|
|
- (m/? (new-task--upload-assets
|
|
|
- get-ws-create-task conn graph-uuid (map :block/uuid (action->asset-blocks :upload)))))
|
|
|
- (->>
|
|
|
- (let [{asset-uuids-to-download :download
|
|
|
- asset-uuids-to-upload :upload} (m/?> asset-change-event-flow)]
|
|
|
- (m/? (new-task--download-assets get-ws-create-task conn graph-uuid asset-uuids-to-download))
|
|
|
- (m/? (new-task--upload-assets get-ws-create-task conn graph-uuid asset-uuids-to-upload)))
|
|
|
- m/ap (m/reduce {} nil)))))
|
|
|
-
|
|
|
- (catch Cancelled e
|
|
|
- (add-log-fn :rtc.asset.log/cancelled {})
|
|
|
- (throw e)))))}))
|
|
|
+ (try
|
|
|
+ (started-dfv true)
|
|
|
+ (m/? (new-task--initial-download-missing-assets repo get-ws-create-task graph-uuid conn add-log-fn))
|
|
|
+ (->>
|
|
|
+ (let [event (m/?> mixed-flow)]
|
|
|
+ (case (:type event)
|
|
|
+ :remote-updates
|
|
|
+ (when-let [asset-update-ops (not-empty (:value event))]
|
|
|
+ (m/? (new-task--pull-remote-asset-updates
|
|
|
+ repo get-ws-create-task conn graph-uuid add-log-fn asset-update-ops)))
|
|
|
+ :local-update-check
|
|
|
+ (m/? (new-task--push-local-asset-updates
|
|
|
+ repo get-ws-create-task conn graph-uuid add-log-fn))))
|
|
|
+ m/ap
|
|
|
+ (m/reduce {} nil)
|
|
|
+ m/?)
|
|
|
+ (catch Cancelled e
|
|
|
+ (add-log-fn :rtc.asset.log/cancelled {})
|
|
|
+ (throw e)))))}))
|
|
|
|
|
|
(comment
|
|
|
(def x (atom 1))
|
|
|
(def f (m/ap
|
|
|
- (let [r (m/?> (m/buffer 10 (m/watch x)))]
|
|
|
- (m/? (m/sleep 2000))
|
|
|
- r)))
|
|
|
+ (let [r (m/?> (m/buffer 10 (m/watch x)))]
|
|
|
+ (m/? (m/sleep 2000))
|
|
|
+ r)))
|
|
|
|
|
|
(def cancel ((m/reduce (fn [r e] (prn :e e)) f) prn prn)))
|