asset.cljs 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314
  1. (ns frontend.worker.rtc.asset
  2. "Fns to sync assets.
  3. some notes:
  4. - has :logseq.property.asset/type, :logseq.property.asset/size, :logseq.property.asset/checksum
  5. - block/title, store the asset name
  6. - an asset-block not having :logseq.property.asset/remote-metadata
  7. indicates need to upload the asset to server"
  8. (:require [clojure.set :as set]
  9. [datascript.core :as d]
  10. [frontend.common.missionary :as c.m]
  11. [frontend.worker.rtc.client-op :as client-op]
  12. [frontend.worker.rtc.log-and-state :as rtc-log-and-state]
  13. [frontend.worker.rtc.ws-util :as ws-util]
  14. [frontend.worker.state :as worker-state]
  15. [logseq.common.path :as path]
  16. [malli.core :as ma]
  17. [missionary.core :as m])
  18. (:import [missionary Cancelled]))
  19. (defn- create-local-updates-check-flow
  20. "Return a flow that emits value if need to push local-updates"
  21. [repo *auto-push? interval-ms]
  22. (let [auto-push-flow (m/watch *auto-push?)
  23. clock-flow (c.m/clock interval-ms :clock)
  24. merge-flow (m/latest vector auto-push-flow clock-flow)]
  25. (m/eduction (filter first)
  26. (map second)
  27. (filter (fn [v] (when (pos? (client-op/get-unpushed-asset-ops-count repo)) v)))
  28. merge-flow)))
  29. (def ^:private remote-asset-updates-schema
  30. [:sequential
  31. [:map {:closed true}
  32. [:op [:enum :update-asset :remove-asset]]
  33. [:block/uuid :uuid]
  34. [:malli.core/default [:map-of :keyword :any]]]])
  35. (def ^:private *remote-asset-updates (atom nil :validator (ma/validator remote-asset-updates-schema)))
  36. (def ^:private remote-asset-updates-flow (m/buffer 10 (m/watch *remote-asset-updates)))
  37. (comment
  38. (def cancel ((m/reduce (fn [_ v] (prn :v v)) remote-asset-updates-flow) prn prn)))
  39. (defn- new-task--get-asset-file-metadata
  40. "Return nil if this asset not exist"
  41. [repo block-uuid asset-type]
  42. (m/sp
  43. (c.m/<?
  44. (worker-state/<invoke-main-thread :thread-api/get-asset-file-metadata
  45. repo (str block-uuid) asset-type))))
  46. (defn- remote-block-ops=>remote-asset-ops
  47. [db-before remove-ops]
  48. (keep
  49. (fn [remove-op]
  50. (let [block-uuid (:block-uuid remove-op)]
  51. (when-let [ent (d/entity db-before [:block/uuid block-uuid])]
  52. (when-let [asset-type (:logseq.property.asset/type ent)]
  53. {:op :remove-asset
  54. :block/uuid block-uuid
  55. :logseq.property.asset/type asset-type}))))
  56. remove-ops))
  57. (defn emit-remote-asset-updates-from-block-ops
  58. [db-before remove-ops]
  59. (when-let [asset-update-ops
  60. (not-empty (remote-block-ops=>remote-asset-ops db-before remove-ops))]
  61. (reset! *remote-asset-updates asset-update-ops)))
  62. (defn new-task--emit-remote-asset-updates-from-push-asset-upload-updates
  63. [repo db push-asset-upload-updates-message]
  64. (m/sp
  65. (let [{:keys [uploaded-assets]} push-asset-upload-updates-message]
  66. (when-let [asset-update-ops
  67. (->> uploaded-assets
  68. (map
  69. (fn [[asset-uuid remote-metadata]]
  70. (m/sp
  71. (let [ent (d/entity db [:block/uuid asset-uuid])
  72. asset-type (:logseq.property.asset/type ent)
  73. local-checksum (:logseq.property.asset/checksum ent)
  74. remote-checksum (get remote-metadata "checksum")]
  75. (when (or (and local-checksum remote-checksum
  76. (not= local-checksum remote-checksum))
  77. (and asset-type
  78. (nil? (m/? (new-task--get-asset-file-metadata
  79. repo asset-uuid asset-type)))))
  80. {:op :update-asset
  81. :block/uuid asset-uuid})))))
  82. (apply m/join vector)
  83. m/?
  84. (remove nil?)
  85. not-empty)]
  86. (reset! *remote-asset-updates asset-update-ops)))))
  87. (defn- create-mixed-flow
  88. "Return a flow that emits different events:
  89. - `:local-update-check`: event to notify check if there're some new local-updates on assets
  90. - `:remote-updates`: remote asset updates "
  91. [repo *auto-push?]
  92. (let [remote-update-flow (m/eduction
  93. (map (fn [v] {:type :remote-updates :value v}))
  94. remote-asset-updates-flow)
  95. local-update-check-flow (m/eduction
  96. (map (fn [v] {:type :local-update-check :value v}))
  97. (create-local-updates-check-flow repo *auto-push? 2500))]
  98. (c.m/mix remote-update-flow local-update-check-flow)))
  99. (defonce ^:private *assets-sync-lock (atom nil))
  100. (defn- holding-assets-sync-lock
  101. "Use this to prevent multiple assets-sync loops at same time."
  102. [started-dfv task]
  103. (m/sp
  104. (when-not (compare-and-set! *assets-sync-lock nil true)
  105. (let [e (ex-info "Must not run multiple assets-sync loops"
  106. {:type :assets-sync.exception/lock-failed
  107. :missionary/retry true})]
  108. (started-dfv e)
  109. (throw e)))
  110. (try
  111. (m/? task)
  112. (finally
  113. (reset! *assets-sync-lock nil)))))
  114. (defn- clean-asset-ops!
  115. [repo all-asset-uuids handled-asset-uuids]
  116. (doseq [asset-uuid (set/difference (set all-asset-uuids) (set handled-asset-uuids))]
  117. (client-op/remove-asset-op repo asset-uuid)))
  118. (defn- new-task--concurrent-download-assets
  119. "Concurrently download assets with limited max concurrent count"
  120. [repo asset-uuid->url asset-uuid->asset-type]
  121. (->> (fn [[asset-uuid url]]
  122. (m/sp
  123. (let [r (c.m/<?
  124. (worker-state/<invoke-main-thread :thread-api/rtc-download-asset
  125. repo (str asset-uuid)
  126. (get asset-uuid->asset-type asset-uuid) url))]
  127. (when-let [edata (:ex-data r)]
  128. ;; if download-url return 404, ignore this asset
  129. (when (not= 404 (:status (:data edata)))
  130. (throw (ex-info "download asset failed" r)))))))
  131. (c.m/concurrent-exec-flow 5 (m/seed asset-uuid->url))
  132. (m/reduce (constantly nil))))
  133. (defn- new-task--concurrent-upload-assets
  134. "Concurrently upload assets with limited max concurrent count"
  135. [repo conn asset-uuid->url asset-uuid->asset-type+checksum]
  136. (->> (fn [[asset-uuid url]]
  137. (m/sp
  138. (let [[asset-type checksum] (get asset-uuid->asset-type+checksum asset-uuid)
  139. r (c.m/<?
  140. (worker-state/<invoke-main-thread :thread-api/rtc-upload-asset
  141. repo (str asset-uuid) asset-type checksum url))]
  142. (when (:ex-data r)
  143. (throw (ex-info "upload asset failed" r)))
  144. (d/transact! conn
  145. [{:block/uuid asset-uuid
  146. :logseq.property.asset/remote-metadata {:checksum checksum :type asset-type}}]
  147. ;; Don't generate rtc ops again, (block-ops & asset-ops)
  148. {:persist-op? false})
  149. (client-op/remove-asset-op repo asset-uuid))))
  150. (c.m/concurrent-exec-flow 3 (m/seed asset-uuid->url))
  151. (m/reduce (constantly nil))))
  152. (defn- new-task--push-local-asset-updates
  153. [repo get-ws-create-task conn graph-uuid major-schema-version add-log-fn]
  154. (m/sp
  155. (when-let [asset-ops (not-empty (client-op/get-all-asset-ops repo))]
  156. (let [upload-asset-uuids (keep
  157. (fn [asset-op]
  158. (when (contains? asset-op :update-asset)
  159. (:block/uuid asset-op)))
  160. asset-ops)
  161. remove-asset-uuids (keep
  162. (fn [asset-op]
  163. (when (contains? asset-op :remove-asset)
  164. (:block/uuid asset-op)))
  165. asset-ops)
  166. asset-uuid->asset-type+checksum
  167. (into {}
  168. (keep
  169. (fn [asset-uuid]
  170. (let [ent (d/entity @conn [:block/uuid asset-uuid])]
  171. (when-let [tp (:logseq.property.asset/type ent)]
  172. (when-let [checksum (:logseq.property.asset/checksum ent)]
  173. [asset-uuid [tp checksum]])))))
  174. upload-asset-uuids)
  175. asset-uuid->url
  176. (when (seq asset-uuid->asset-type+checksum)
  177. (->> (m/? (ws-util/send&recv get-ws-create-task
  178. {:action "get-assets-upload-urls"
  179. :graph-uuid graph-uuid
  180. :asset-uuid->metadata
  181. (into {}
  182. (map (fn [[asset-uuid [asset-type checksum]]]
  183. [asset-uuid {"checksum" checksum "type" asset-type}]))
  184. asset-uuid->asset-type+checksum)}))
  185. :asset-uuid->url))]
  186. (when (seq asset-uuid->url)
  187. (add-log-fn :rtc.asset.log/upload-assets {:asset-uuids (keys asset-uuid->url)}))
  188. (m/? (new-task--concurrent-upload-assets repo conn asset-uuid->url asset-uuid->asset-type+checksum))
  189. (when (seq remove-asset-uuids)
  190. (add-log-fn :rtc.asset.log/remove-assets {:asset-uuids remove-asset-uuids})
  191. (m/? (ws-util/send&recv get-ws-create-task
  192. {:action "delete-assets"
  193. :graph-uuid graph-uuid
  194. :schema-version (str major-schema-version)
  195. :asset-uuids remove-asset-uuids}))
  196. (doseq [asset-uuid remove-asset-uuids]
  197. (client-op/remove-asset-op repo asset-uuid)))
  198. (clean-asset-ops! repo
  199. (map :block/uuid asset-ops)
  200. (concat (keys asset-uuid->url) remove-asset-uuids))))))
  201. (defn- new-task--pull-remote-asset-updates
  202. [repo get-ws-create-task conn graph-uuid add-log-fn asset-update-ops]
  203. (m/sp
  204. (when (seq asset-update-ops)
  205. (let [update-asset-uuids (keep (fn [op]
  206. (when (= :update-asset (:op op))
  207. (:block/uuid op)))
  208. asset-update-ops)
  209. remove-asset-uuid->asset-type
  210. (into {} (keep (fn [op]
  211. (when (= :remove-asset (:op op))
  212. [(:block/uuid op) (:logseq.property.asset/type op)])))
  213. asset-update-ops)
  214. asset-uuid->asset-type (into {}
  215. (keep (fn [asset-uuid]
  216. (when-let [tp (:logseq.property.asset/type
  217. (d/entity @conn [:block/uuid asset-uuid]))]
  218. [asset-uuid tp])))
  219. update-asset-uuids)
  220. asset-uuid->url
  221. (when (seq asset-uuid->asset-type)
  222. (->> (m/? (ws-util/send&recv get-ws-create-task
  223. {:action "get-assets-download-urls"
  224. :graph-uuid graph-uuid
  225. :asset-uuids (keys asset-uuid->asset-type)}))
  226. :asset-uuid->url))]
  227. (doseq [[asset-uuid asset-type] remove-asset-uuid->asset-type]
  228. (c.m/<? (worker-state/<invoke-main-thread :thread-api/unlink-asset
  229. repo (str asset-uuid) asset-type)))
  230. (when (seq asset-uuid->url)
  231. (add-log-fn :rtc.asset.log/download-assets {:asset-uuids (keys asset-uuid->url)}))
  232. (m/? (new-task--concurrent-download-assets repo asset-uuid->url asset-uuid->asset-type))))))
  233. (defn- get-all-asset-blocks
  234. [db]
  235. (d/q '[:find [(pull ?b [:block/uuid
  236. :logseq.property.asset/type
  237. :logseq.property.asset/checksum])
  238. ...]
  239. :where
  240. [?b :block/uuid]
  241. [?b :logseq.property.asset/type]]
  242. db))
  243. (defn- new-task--initial-download-missing-assets
  244. [repo get-ws-create-task graph-uuid conn add-log-fn]
  245. (m/sp
  246. (let [local-all-asset-file-paths
  247. (c.m/<? (worker-state/<invoke-main-thread :thread-api/get-all-asset-file-paths repo))
  248. local-all-asset-file-uuids (set (map (comp parse-uuid path/file-stem) local-all-asset-file-paths))
  249. local-all-asset-uuids (set (map :block/uuid (get-all-asset-blocks @conn)))]
  250. (when-let [asset-update-ops
  251. (not-empty
  252. (map (fn [asset-uuid] {:op :update-asset :block/uuid asset-uuid})
  253. (set/difference local-all-asset-uuids local-all-asset-file-uuids)))]
  254. (add-log-fn :rtc.asset.log/initial-download-missing-assets {:count (count asset-update-ops)})
  255. (m/? (new-task--pull-remote-asset-updates
  256. repo get-ws-create-task conn graph-uuid add-log-fn asset-update-ops))))))
  257. (defn create-assets-sync-loop
  258. [repo get-ws-create-task graph-uuid major-schema-version conn *auto-push?]
  259. (let [started-dfv (m/dfv)
  260. add-log-fn (fn [type message]
  261. (assert (map? message) message)
  262. (rtc-log-and-state/rtc-log type (assoc message :graph-uuid graph-uuid)))
  263. mixed-flow (create-mixed-flow repo *auto-push?)]
  264. {:onstarted-task started-dfv
  265. :assets-sync-loop-task
  266. (holding-assets-sync-lock
  267. started-dfv
  268. (m/sp
  269. (try
  270. (started-dfv true)
  271. (m/? (new-task--initial-download-missing-assets repo get-ws-create-task graph-uuid conn add-log-fn))
  272. (->>
  273. (let [event (m/?> mixed-flow)]
  274. (case (:type event)
  275. :remote-updates
  276. (when-let [asset-update-ops (not-empty (:value event))]
  277. (m/? (new-task--pull-remote-asset-updates
  278. repo get-ws-create-task conn graph-uuid add-log-fn asset-update-ops)))
  279. :local-update-check
  280. (m/? (new-task--push-local-asset-updates
  281. repo get-ws-create-task conn graph-uuid major-schema-version add-log-fn))))
  282. m/ap
  283. (m/reduce {} nil)
  284. m/?)
  285. (catch Cancelled e
  286. (add-log-fn :rtc.asset.log/cancelled {})
  287. (throw e)))))}))
  288. (comment
  289. (def x (atom 1))
  290. (def f (m/ap
  291. (let [r (m/?> (m/buffer 10 (m/watch x)))]
  292. (m/? (m/sleep 2000))
  293. r)))
  294. (def cancel ((m/reduce (fn [r e] (prn :e e)) f) prn prn)))