asset.cljs 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. (ns frontend.worker.rtc.asset
  2. "Fns to sync assets.
  3. some notes:
  4. - block/type contains \"asset\"
  5. - block/content, store the asset name
  6. - an asset-block not having :file/path indicates need to download asset from server
  7. - an asset-block not having :logseq.property.asset/remote-metadata
  8. indicates need to upload the asset to server
  9. - if an asset-block doesn't have both :file/path and :logseq.property.asset/remote-metadata,
  10. it means the other client hasn't uploaded the asset to server
  11. "
  12. (:require [cljs-http.client :as http]
  13. [datascript.core :as d]
  14. [frontend.common.missionary-util :as c.m]
  15. [frontend.worker.rtc.log-and-state :as rtc-log-and-state]
  16. [frontend.worker.rtc.ws-util :as ws-util]
  17. [malli.core :as ma]
  18. [missionary.core :as m])
  19. (:import [missionary Cancelled]))
  20. (defn get-all-asset-blocks
  21. [db]
  22. (->> (d/q
  23. '[:find (pull ?asset [*])
  24. :in $
  25. :where
  26. [?asset :block/uuid]
  27. [?asset :block/type "asset"]]
  28. db)
  29. (apply concat)))
  30. (defn asset-block->upload+download-action
  31. [asset-block]
  32. (let [local-file-path (:file/path asset-block)
  33. remote-metadata (:logseq.property.asset/remote-metadata asset-block)]
  34. (cond
  35. (and local-file-path remote-metadata) nil
  36. (nil? local-file-path) :download
  37. (nil? remote-metadata) :upload)))
  38. (defn get-action->asset-blocks
  39. [db]
  40. (reduce
  41. (fn [action->asset-blocks asset-block]
  42. (if-let [action (asset-block->upload+download-action asset-block)]
  43. (update action->asset-blocks action (fnil conj #{}) asset-block)
  44. action->asset-blocks))
  45. {} (get-all-asset-blocks db)))
  46. (defn new-task--upload-assets
  47. [get-ws-create-task conn graph-uuid asset-uuids]
  48. {:pre [(every? uuid? asset-uuids)]}
  49. (m/sp
  50. (when (seq asset-uuids)
  51. (let [asset-uuid->url (->> (m/? (ws-util/send&recv get-ws-create-task
  52. {:action "get-assets-upload-urls"
  53. :graph-uuid graph-uuid
  54. :asset-uuid->metadata
  55. (into {}
  56. (map (fn [asset-uuid] [asset-uuid {"checksum" "TEST-CHECKSUM"}]))
  57. asset-uuids)}))
  58. :asset-uuid->url)]
  59. (doseq [[asset-uuid put-url] asset-uuid->url]
  60. (assert (uuid? asset-uuid) asset-uuid)
  61. (let [{:keys [status] :as r}
  62. (c.m/<? (http/put put-url {:headers {"x-amz-meta-checksum" "TEST-CHECKSUM"}
  63. :body (js/JSON.stringify
  64. (clj->js {:TEST-ASSET true
  65. :asset-uuid (str asset-uuid)
  66. :graph-uuid (str graph-uuid)}))
  67. :with-credentials? false}))]
  68. (if (not= 200 status)
  69. (prn :debug-failed-upload-asset {:resp r :asset-uuid asset-uuid :graph-uuid graph-uuid})
  70. (when (some? (d/entity @conn [:block/uuid asset-uuid]))
  71. (d/transact! conn [{:block/uuid asset-uuid
  72. :logseq.property.asset/remote-metadata {:checksum "TEST"}}])))))))))
  73. (defn new-task--download-assets
  74. [get-ws-create-task conn graph-uuid asset-uuids]
  75. {:pre [(every? uuid? asset-uuids)]}
  76. (m/sp
  77. (when (seq asset-uuids)
  78. (let [asset-uuid->url
  79. (->> (m/? (ws-util/send&recv get-ws-create-task {:action "get-assets-download-urls"
  80. :graph-uuid graph-uuid
  81. :asset-uuids asset-uuids}))
  82. :asset-uuid->url)]
  83. (doseq [[asset-uuid get-url] asset-uuid->url]
  84. (assert (uuid? asset-uuid) asset-uuid)
  85. (let [{:keys [status _body] :as r} (c.m/<? (http/get get-url {:with-credentials? false}))]
  86. (if (not= 200 status)
  87. (prn :debug-failed-download-asset {:resp r :asset-uuid asset-uuid :graph-uuid graph-uuid})
  88. (when (d/entity @conn [:block/uuid asset-uuid])
  89. (d/transact! conn [{:block/uuid asset-uuid
  90. :file/path "TEST-FILE-PATH"}])
  91. (prn :debug-succ-download-asset asset-uuid)))))))))
  92. (defonce ^:private *assets-sync-lock (atom nil))
  93. (defn- holding-assets-sync-lock
  94. "Use this to prevent multiple assets-sync loops at same time."
  95. [started-dfv task]
  96. (m/sp
  97. (when-not (compare-and-set! *assets-sync-lock nil true)
  98. (let [e (ex-info "Must not run multiple assets-sync loops"
  99. {:type :assets-sync.exception/lock-failed
  100. :missionary/retry true})]
  101. (started-dfv e)
  102. (throw e)))
  103. (try
  104. (m/? task)
  105. (finally
  106. (reset! *assets-sync-lock nil)))))
;; Malli schema for one asset-change event: a map from an action
;; (:download or :upload) to the set of asset uuids the action applies to.
(def ^:private asset-change-event-schema
  [:map-of
   [:enum :download :upload
    ;; Why is no :delete event needed?
    ;; When the remove-block-op is synced to the server, the server already
    ;; knows that this asset needs to be deleted.
    ;; :delete
    ]
   [:set :uuid]])

;; Predicate compiled from the schema above.
(def ^:private asset-change-event-validator (ma/validator asset-change-event-schema))

;; Atom holding the most recent asset-change event; the compiled validator is
;; installed as the atom's :validator.
;; NOTE(review): the initial value nil is not a map, so it would not satisfy
;; the schema — presumably the validator is not applied to the initial value;
;; confirm.
(defonce *global-asset-change-event (atom nil :validator asset-change-event-validator))

;; Flow built by watching the atom above, wrapped in (m/buffer 20 ...).
(defonce ^:private global-asset-change-event-flow
  (m/buffer 20 (m/watch *global-asset-change-event)))
  119. (defn create-assets-sync-loop
  120. [get-ws-create-task graph-uuid conn]
  121. (let [started-dfv (m/dfv)
  122. asset-change-event-flow global-asset-change-event-flow
  123. add-log-fn (fn [type message]
  124. (assert (map? message) message)
  125. (rtc-log-and-state/rtc-log type (assoc message :graph-uuid graph-uuid)))]
  126. {:onstarted-task started-dfv
  127. :assets-sync-loop-task
  128. (holding-assets-sync-lock
  129. started-dfv
  130. (m/sp
  131. (try
  132. (started-dfv true)
  133. (let [action->asset-blocks (get-action->asset-blocks @conn)]
  134. (m/?
  135. (m/join
  136. (constantly nil)
  137. (m/sp
  138. ;; init phase:
  139. ;; generate all asset-change-events from db
  140. (when (or (seq (action->asset-blocks :download))
  141. (seq (action->asset-blocks :upload)))
  142. (prn "init phase: generate all asset-change-events from db" action->asset-blocks))
  143. (m/? (new-task--download-assets
  144. get-ws-create-task conn graph-uuid (map :block/uuid (action->asset-blocks :download))))
  145. (m/? (new-task--upload-assets
  146. get-ws-create-task conn graph-uuid (map :block/uuid (action->asset-blocks :upload)))))
  147. (->>
  148. (let [{asset-uuids-to-download :download
  149. asset-uuids-to-upload :upload} (m/?> asset-change-event-flow)]
  150. (m/? (new-task--download-assets get-ws-create-task conn graph-uuid asset-uuids-to-download))
  151. (m/? (new-task--upload-assets get-ws-create-task conn graph-uuid asset-uuids-to-upload)))
  152. m/ap (m/reduce {} nil)))))
  153. (catch Cancelled e
  154. (add-log-fn :rtc.asset.log/cancelled {})
  155. (throw e)))))}))
;; REPL scratch, never evaluated when the namespace loads: an experiment with
;; watching an atom through a buffered flow and reducing over it.
(comment
  ;; atom to mutate by hand from the REPL
  (def x (atom 1))
  ;; flow that takes each watched value of `x` through a buffer of 10,
  ;; waits on (m/sleep 2000), then emits the value
  (def f (m/ap
           (let [r (m/?> (m/buffer 10 (m/watch x)))]
             (m/? (m/sleep 2000))
             r)))
  ;; start the reduction with `prn` as both callbacks; each emitted value is
  ;; printed as :e <value>. The value returned by starting the task is kept
  ;; in `cancel`.
  (def cancel ((m/reduce (fn [r e] (prn :e e)) f) prn prn)))