full_upload_download_graph.cljs 24 KB


  1. (ns frontend.worker.rtc.full-upload-download-graph
  2. "- upload local graph to remote
  3. - download remote graph"
  4. (:require [cljs-http-missionary.client :as http]
  5. [clojure.set :as set]
  6. [datascript.core :as d]
  7. [frontend.common.missionary :as c.m]
  8. [frontend.common.thread-api :as thread-api]
  9. [frontend.worker.crypt :as crypt]
  10. [frontend.worker.db-listener :as db-listener]
  11. [frontend.worker.db-metadata :as worker-db-metadata]
  12. [frontend.worker.rtc.client-op :as client-op]
  13. [frontend.worker.rtc.const :as rtc-const]
  14. [frontend.worker.rtc.log-and-state :as rtc-log-and-state]
  15. [frontend.worker.rtc.ws-util :as ws-util]
  16. [frontend.worker.state :as worker-state]
  17. [frontend.worker.util :as worker-util]
  18. [logseq.db :as ldb]
  19. [logseq.db.frontend.malli-schema :as db-malli-schema]
  20. [logseq.db.frontend.schema :as db-schema]
  21. [logseq.db.sqlite.create-graph :as sqlite-create-graph]
  22. [logseq.db.sqlite.util :as sqlite-util]
  23. [logseq.outliner.pipeline :as outliner-pipeline]
  24. [malli.core :as ma]
  25. [malli.transform :as mt]
  26. [missionary.core :as m]
  27. [promesa.core :as p]))
  (def ^:private normalized-remote-block-schema
    "Malli schema describing one block as it is stored on the remote.
    The remote format differs from the client's; the `:decode/custom` entries
    say how each field is converted. Decode with a coercer that uses the
    `:custom` transformer."
    (let [coll-of? (fn [pred x]
                     (and (coll? x) (every? pred x)))
          ;; Decoder for every attribute that is not listed explicitly below.
          ;; Branch order matters: an empty collection is taken by the first branch.
          decode-other-attr
          (fn [x]
            (cond
              ;; collection of entity maps -> collection of db-id strings (used as temp-ids)
              (coll-of? :db/id x)
              (map (comp str :db/id) x)

              ;; single entity map -> db-id string (used as temp-id)
              (:db/id x)
              (str (:db/id x))

              ;; single transit-encoded value
              (string? x)
              (ldb/read-transit-str x)

              ;; collection of transit-encoded values
              (coll-of? string? x)
              (map ldb/read-transit-str x)

              :else
              x))]
      [:map
       [:db/id [:string {:decode/custom str}]]
       [:db/ident {:optional true} :keyword]
       [:block/uuid {:optional true} [:uuid {:decode/custom ldb/read-transit-str}]]
       [:block/order {:optional true} db-malli-schema/block-order]
       [:db/cardinality {:optional true} :keyword]
       [:db/valueType {:optional true} :keyword]
       [:db/index {:optional true} :boolean]
       [:malli.core/default [:map-of :keyword
                             [:any {:decode/custom decode-other-attr}]]]]))
  (def ^:private normalized-remote-blocks-coercer
    "Function built by `ma/coercer` for a sequential collection of
    `normalized-remote-block-schema` maps, using the `:custom` decoders
    together with malli's string transformer."
    (let [blocks-schema [:sequential normalized-remote-block-schema]
          transformer (mt/transformer {:name :custom} mt/string-transformer)]
      (ma/coercer blocks-schema transformer)))
  (defn- schema->ref-type-attrs
    "Return the set of attribute names in `db-schema` (a map of attr -> attr props)
    whose `:db/valueType` is `:db.type/ref`."
    [db-schema]
    (into #{}
          (comp
           (filter (fn [[_ props]] (= :db.type/ref (:db/valueType props))))
           ;; `keep` rather than `map` so a nil attribute name is dropped
           (keep first))
          db-schema))
  (defn- schema->card-many-attrs
    "Return the set of attribute names in `db-schema` (a map of attr -> attr props)
    whose `:db/cardinality` is `:db.cardinality/many`."
    [db-schema]
    (into #{}
          (comp
           (filter (fn [[_ props]] (= :db.cardinality/many (:db/cardinality props))))
           ;; `keep` rather than `map` so a nil attribute name is dropped
           (keep first))
          db-schema))
  (defn- export-as-blocks
    "Convert every entity of `db` into one block map, walking the :eavt datoms.
    - `:db/id` is the entity id as a string
    - ref attribute values are written as id strings, all other values as
      transit strings (`:db/ident` and `:block/order` are decoded back afterwards)
    - cardinality-many attributes collect their values with `conj`
    - attributes in `ignore-attr-set` are skipped
    - an entity whose `:db/ident` is in `ignore-entity-set` is dropped entirely
    Throws ex-info \"invalid block data\" when a `:block/parent` value is not a
    positive integer."
    [db & {:keys [ignore-attr-set ignore-entity-set]}]
    (let [all-datoms (d/datoms db :eavt)
          schema (d/schema db)
          many-attrs (schema->card-many-attrs schema)
          ref-attrs (schema->ref-type-attrs schema)
          encode-value (fn [attr v]
                         (if (contains? ref-attrs attr)
                           (str v)
                           (ldb/write-transit-str v)))
          add-datom (fn [block datom]
                      (let [attr (:a datom)
                            v (:v datom)]
                        ;; validated before any ignore rule is applied
                        (when (and (= :block/parent attr)
                                   (not (pos-int? v)))
                          (throw (ex-info "invalid block data" {:datom datom})))
                        (cond
                          (contains? ignore-attr-set attr)
                          block

                          ;; ignored entity: abort the fold, the whole entity is dropped
                          (and (keyword-identical? :db/ident attr)
                               (contains? ignore-entity-set v))
                          (reduced nil)

                          (contains? many-attrs attr)
                          (update block attr conj (encode-value attr v))

                          :else
                          (assoc block attr (encode-value attr v)))))
          entity-datoms->block (fn [entity-datoms]
                                 (when (seq entity-datoms)
                                   (reduce add-datom
                                           {:db/id (str (:e (first entity-datoms)))}
                                           entity-datoms)))
          decode-plain-attrs (fn [block]
                               (cond-> block
                                 (:db/ident block) (update :db/ident ldb/read-transit-str)
                                 (:block/order block) (update :block/order ldb/read-transit-str)))]
      (->> all-datoms
           (partition-by :e)
           (keep entity-datoms->block)
           (map decode-plain-attrs))))
  (defn- remove-rtc-data-in-conn!
    "Reset the client-op conn of `repo` and, when `repo` has a datascript conn,
    retract the RTC-related kv entities (graph-uuid, graph-local-tx,
    remote-schema-version) from it."
    [repo]
    (client-op/reset-client-op-conn repo)
    (when-let [conn (worker-state/get-datascript-conn repo)]
      (let [rtc-kv-idents [:logseq.kv/graph-uuid
                           :logseq.kv/graph-local-tx
                           :logseq.kv/remote-schema-version]
            retract-tx (mapv (fn [ident] [:db/retractEntity ident]) rtc-kv-idents)]
        (d/transact! conn retract-tx))))
  (defn new-task--upload-graph
    "Return a task that uploads the graph held by `conn` to the remote as a new
    graph named `remote-graph-name`.
    Steps: request a presigned put-url while serializing all blocks, PUT the
    serialized blocks to that url, request \"upload-graph\" for the uploaded
    object, then record the returned graph-uuid locally.
    Resolves to `{:graph-uuid ...}`; throws ex-info \"upload-graph failed\" when
    the response has no `:graph-uuid`."
    [get-ws-create-task repo conn remote-graph-name major-schema-version]
    (let [log! (fn [sub-type message]
                 (rtc-log-and-state/rtc-log :rtc.log/upload {:sub-type sub-type
                                                             :message message}))]
      (m/sp
        (log! :fetching-presigned-put-url "fetching presigned put-url")
        (let [presign-task (ws-util/send&recv get-ws-create-task {:action "presign-put-temp-s3-obj"})
              serialize-task (m/sp
                               (ldb/write-transit-str
                                (export-as-blocks
                                 @conn
                                 :ignore-attr-set rtc-const/ignore-attrs-when-init-upload
                                 :ignore-entity-set rtc-const/ignore-entities-when-init-upload)))
              ;; both tasks run under one join; results arrive as [presign-resp blocks-str]
              [{put-url :url s3-key :key} all-blocks-str] (m/? (m/join vector presign-task serialize-task))]
          (log! :upload-data "uploading data")
          (m/? (http/put put-url {:body all-blocks-str :with-credentials? false}))
          (log! :request-upload-graph "requesting upload-graph")
          (let [aes-key (c.m/<? (crypt/<gen-aes-key))
                aes-key-jwk (ldb/write-transit-str (c.m/<? (crypt/<export-key aes-key)))
                upload-resp (m/? (ws-util/send&recv get-ws-create-task
                                                    {:action "upload-graph"
                                                     :s3-key s3-key
                                                     :schema-version (str major-schema-version)
                                                     :graph-name remote-graph-name}))
                graph-uuid (:graph-uuid upload-resp)]
            (when-not graph-uuid
              (throw (ex-info "upload-graph failed" {:upload-resp upload-resp})))
            (ldb/transact! conn
                           [(ldb/kv :logseq.kv/graph-uuid graph-uuid)
                            (ldb/kv :logseq.kv/graph-local-tx "0")
                            (ldb/kv :logseq.kv/remote-schema-version
                                    (ldb/get-graph-schema-version @conn))])
            (client-op/update-graph-uuid repo graph-uuid)
            (client-op/remove-local-tx repo)
            (client-op/add-all-exists-asset-as-ops repo)
            (crypt/store-graph-keys-jwk repo aes-key-jwk)
            (when-not rtc-const/RTC-E2E-TEST
              (c.m/<? (worker-db-metadata/<store repo (pr-str {:kv/value graph-uuid}))))
            (log! :upload-completed "upload-graph completed")
            {:graph-uuid graph-uuid})))))
  (defn page-of-block
    "Follow the `:block/parent` chain of `block` through `id->block-map` and
    return the first ancestor that has `:block/name` (its page).
    Returns nil when `block` has no parent, when a parent id is missing from
    `id->block-map`, or when the parent chain loops back on itself.

    Deliberately not memoized: a global memoize cache keyed on `id->block-map`
    would keep every block map ever passed in reachable for the lifetime of
    the worker."
    [id->block-map block]
    (loop [current block
           ;; parent ids already followed, to stop on cyclic parent chains
           visited #{}]
      (when-let [parent-id (:block/parent current)]
        (when-not (contains? visited parent-id)
          (when-let [parent (id->block-map parent-id)]
            (if (:block/name parent)
              parent
              (recur parent (conj visited parent-id))))))))
  (defn- fill-block-fields
    "Return `blocks` as a vector where every block that has no `:block/name`
    and whose page can be found via `page-of-block` gets `:block/page` set to
    that page's `:db/id`. All other blocks are returned unchanged."
    [blocks]
    (let [id->block (into {} (map (juxt :db/id identity)) blocks)
          non-page-blocks (remove :block/name blocks)
          id->page-id (reduce (fn [acc block]
                                (assoc acc
                                       (:db/id block)
                                       (:db/id (page-of-block id->block block))))
                              {}
                              non-page-blocks)]
      (mapv (fn [block]
              (if-let [page-id (get id->page-id (:db/id block))]
                (assoc block :block/page page-id)
                block))
            blocks)))
  (defn- blocks->card-one-attrs
    "Return the set of `:db/ident` values of the blocks whose `:db/cardinality`
    is `:db.cardinality/one`. Blocks without a `:db/ident` are ignored."
    [blocks]
    (into #{}
          (comp
           (filter (fn [block]
                     (and (:db/ident block)
                          (= :db.cardinality/one (:db/cardinality block)))))
           (map :db/ident))
          blocks))
  (defn- convert-card-one-value-from-value-coll
    "For every key of `block` that is in `card-one-attrs`, replace a sequential
    or set value by its first element. Other keys and non-collection values
    are left as they are."
    [card-one-attrs block]
    (let [unwrap (fn [v]
                   (if (or (sequential? v) (set? v))
                     (first v)
                     v))]
      ;; `(or block {})` keeps the result a map even for a nil block
      (into (or block {})
            (comp
             (filter (fn [[attr _]] (contains? card-one-attrs attr)))
             (map (fn [[attr v]] [attr (unwrap v)])))
            block)))
  (defn- transact-remote-schema-version!
    "When `repo` has a datascript conn and its db has a
    `:logseq.kv/schema-version` value, transact that value as
    `:logseq.kv/remote-schema-version`. Does nothing otherwise."
    [repo]
    (when-let [conn (worker-state/get-datascript-conn repo)]
      (when-let [local-schema-version (-> @conn
                                          (d/entity :logseq.kv/schema-version)
                                          :kv/value)]
        (let [tx-meta {:rtc-download-graph? true
                       :gen-undo-ops? false
                       :persist-op? false}]
          (d/transact! conn
                       [(ldb/kv :logseq.kv/remote-schema-version local-schema-version)]
                       tx-meta)))))
  (defn- transact-block-refs!
    "Rebuild `:block/refs` for every entity of `repo` that has a `:block/uuid`
    and transact the non-empty results. Does nothing when `repo` has no
    datascript conn.

    All lookups use one db snapshot taken up front. The refs tx-data is a lazy
    seq that is only realized inside `ldb/transact!`, so dereferencing the conn
    again per datom could read a different db value than the one the datoms
    came from."
    [repo]
    (when-let [conn (worker-state/get-datascript-conn repo)]
      (let [db @conn
            ;; every entity that has a :block/uuid
            block-datoms (d/datoms db :avet :block/uuid)
            refs-tx (keep
                     (fn [datom]
                       (let [block (d/entity db (:e datom))
                             refs (outliner-pipeline/db-rebuild-block-refs db block)]
                         (when (seq refs)
                           {:db/id (:db/id block)
                            :block/refs refs})))
                     block-datoms)]
        (ldb/transact! conn refs-tx (cond-> {:outliner-op :rtc-download-rebuild-block-refs}
                                      rtc-const/RTC-E2E-TEST (assoc :frontend.worker.pipeline/skip-store-conn true))))))
  (defn- block->schema-map
    "Return a map with `:db/ident` plus whichever of `:db/valueType`,
    `:db/cardinality` and `:db/index` have a truthy value in `block`.
    Returns nil when `block` has no `:db/ident` or none of those three
    attributes is truthy."
    [block]
    (let [ident (:db/ident block)
          schema-attrs (into {}
                             (filter (fn [[_ v]] v))
                             (select-keys block [:db/valueType :db/cardinality :db/index]))]
      (when (and ident (seq schema-attrs))
        (assoc schema-attrs :db/ident ident))))
  (defn- blocks->schema-blocks+normal-blocks
    "Split `blocks` into `[schema-blocks normal-blocks]`.
    schema-blocks - the `block->schema-map` result of every block that has one
    normal-blocks - all blocks, in order; blocks that produced a schema-block
                    have `:db/valueType`, `:db/cardinality` and `:db/index` removed"
    [blocks]
    (let [schema-blocks (into [] (keep block->schema-map) blocks)
          normal-blocks (mapv (fn [block]
                                (if (block->schema-map block)
                                  (dissoc block :db/valueType :db/cardinality :db/index)
                                  block))
                              blocks)]
      [schema-blocks normal-blocks]))
  (defn- create-graph-for-rtc-test
    "Build an in-memory datascript conn for `repo` without the db-worker setup,
    for testing rtc-related logic only.
    Registers the conn in `worker-state/*datascript-conns`, transacts the
    initial db data, starts listening to db changes, transacts `init-tx-data`
    (validation skipped) and then `other-tx-data`, and finally transacts the
    remote schema version and the rebuilt block refs."
    [repo init-tx-data other-tx-data]
    (let [conn (d/create-conn db-schema/schema)
          db-initial-data (sqlite-create-graph/build-db-initial-data "")
          skip-store-meta {:frontend.worker.pipeline/skip-store-conn rtc-const/RTC-E2E-TEST}
          download-tx-meta (assoc skip-store-meta
                                  :rtc-download-graph? true
                                  :gen-undo-ops? false
                                  :persist-op? false)]
      (swap! worker-state/*datascript-conns assoc repo conn)
      (d/transact! conn db-initial-data (assoc skip-store-meta :initial-db? true))
      (db-listener/listen-db-changes! repo conn)
      ;; only transact db schema, skip validation to avoid warning
      (d/transact! conn init-tx-data
                   (assoc download-tx-meta :frontend.worker.pipeline/skip-validate-db? true))
      (d/transact! conn other-tx-data download-tx-meta)
      (transact-remote-schema-version! repo)
      (transact-block-refs! repo)))
  (defn- blocks-resolve-temp-id
    "Build tx-data from `blocks`, whose `:db/id` values are temp-id strings.
    Returns the concatenation of
    - one map per block holding only its `:block/uuid` (and `:db/ident` when
      present)
    - one map per block without `:db/id`, in which values of ref attributes
      that are known temp-ids are replaced by the target's `:db/ident`, or by
      `[:block/uuid uuid]` when the target has no ident.
    Ref attributes are the `:db/ident`s of `schema-blocks` whose
    `:db/valueType` is `:db.type/ref`."
    [schema-blocks blocks]
    (let [id->uuid (into {} (map (juxt :db/id :block/uuid)) blocks)
          id->ident (into {} (map (juxt :db/id :db/ident)) blocks)
          ref-attrs (into #{}
                          (comp
                           (filter (fn [schema-block]
                                     (= :db.type/ref (:db/valueType schema-block))))
                           (keep :db/ident))
                          schema-blocks)
          known-temp-id? (fn [v]
                           (and (string? v)
                                (or (get id->ident v) (get id->uuid v))))
          temp-id->ref (fn [id]
                         (or (get id->ident id)
                             [:block/uuid (get id->uuid id)]))
          resolve-value (fn [attr v]
                          (cond
                            (not (contains? ref-attrs attr))
                            v

                            (known-temp-id? v)
                            (temp-id->ref v)

                            (and (sequential? v) (every? known-temp-id? v))
                            (map temp-id->ref v)

                            ;; ref value that does not match a known temp-id stays as is
                            :else
                            v))
          identity-tx-data (map (fn [block]
                                  (let [id (:db/id block)
                                        ident (get id->ident id)]
                                    (cond-> {:block/uuid (get id->uuid id)}
                                      ident (assoc :db/ident ident))))
                                blocks)
          attrs-tx-data (map (fn [block]
                               (reduce-kv (fn [acc attr v]
                                            (assoc acc attr (resolve-value attr v)))
                                          {}
                                          (dissoc block :db/id)))
                             blocks)]
      (concat identity-tx-data attrs-tx-data)))
  (defn- remote-all-blocks=>client-blocks
    "Convert the `:blocks` of `all-blocks` (the remote payload) into client
    block maps:
    1. unwrap collection values of cardinality-one attributes
    2. decode with `normalized-remote-blocks-coercer`
    3. drop `:client/schema`, blocks whose `:db/ident` is in
       `ignore-entity-set`, and attributes in `ignore-attr-set`
    4. fill in `:block/page` via `fill-block-fields`"
    [all-blocks ignore-attr-set ignore-entity-set]
    (let [remote-blocks (:blocks all-blocks)
          card-one-attrs (blocks->card-one-attrs remote-blocks)
          ;; eager (mapv): a lazy seq would leave this profiled step without
          ;; any work in it, the conversion would run inside the next step
          unwrapped-blocks (worker-util/profile :convert-card-one-value-from-value-coll
                             (mapv (partial convert-card-one-value-from-value-coll card-one-attrs)
                                   remote-blocks))
          coerced-blocks (worker-util/profile :normalize-remote-blocks
                           (normalized-remote-blocks-coercer unwrapped-blocks))
          cleaned-blocks (sequence
                          (comp
                           ;;TODO: remove this
                           ;;client/schema already converted to :db/cardinality, :db/valueType by remote,
                           ;;and :client/schema should be removed by remote too
                           (map #(dissoc % :client/schema))
                           (remove (fn [block] (contains? ignore-entity-set (:db/ident block))))
                           (map (fn [block]
                                  (into {} (remove (comp (partial contains? ignore-attr-set) first)) block))))
                          coerced-blocks)]
      (fill-block-fields cleaned-blocks)))
  (defn- remote-all-blocks->tx-data+t
    "Return
    {:remote-t ...
     :init-tx-data ...
     :tx-data ...}
    remote-t     - the `:t` of `remote-all-blocks`
    init-tx-data - the db-type kv followed by the schema blocks; needs to be
                   transacted first
    tx-data      - all other blocks with temp-ids resolved, followed by the
                   graph-uuid kv"
    [remote-all-blocks graph-uuid]
    (let [client-blocks (remote-all-blocks=>client-blocks
                         remote-all-blocks
                         rtc-const/ignore-attrs-when-init-download
                         rtc-const/ignore-entities-when-init-download)
          [schema-blocks normal-blocks] (blocks->schema-blocks+normal-blocks client-blocks)
          resolved-blocks (blocks-resolve-temp-id schema-blocks normal-blocks)
          graph-uuid-kv (ldb/kv :logseq.kv/graph-uuid graph-uuid)
          db-type-kv (ldb/kv :logseq.kv/db-type "db")]
      {:remote-t (:t remote-all-blocks)
       :init-tx-data (cons db-type-kv schema-blocks)
       :tx-data (concat resolved-blocks [graph-uuid-kv])}))
  (defn- new-task--transact-remote-all-blocks!
    "Return a task that writes the downloaded `all-blocks` into the db of `repo`.
    The tx-data is computed right away, when this fn is called; the task itself
    records `remote-t` as local tx / local-t / remote-t, then either builds a
    test graph (when `rtc-const/RTC-E2E-TEST`) or goes through the thread-apis
    to open the db and transact init-tx-data followed by tx-data, and finally
    posts an `:add-repo` message."
    [all-blocks repo graph-uuid]
    (let [{:keys [remote-t init-tx-data tx-data]} (remote-all-blocks->tx-data+t all-blocks graph-uuid)
          download-tx-meta {:rtc-download-graph? true
                            :gen-undo-ops? false
                            :persist-op? false}
          ;; only transact db schema, skip validation to avoid warning
          init-tx-meta (assoc download-tx-meta :frontend.worker.pipeline/skip-validate-db? true)
          ;; the thread-api fn is looked up at call time, not when the task is created
          call-thread-api (fn [api-k & args]
                            (apply (@thread-api/*thread-apis api-k) args))]
      (m/sp
        (client-op/update-local-tx repo remote-t)
        (rtc-log-and-state/update-local-t graph-uuid remote-t)
        (rtc-log-and-state/update-remote-t graph-uuid remote-t)
        (if rtc-const/RTC-E2E-TEST
          (create-graph-for-rtc-test repo init-tx-data tx-data)
          (c.m/<?
           (p/do!
            (call-thread-api :thread-api/create-or-open-db repo {:close-other-db? false})
            (call-thread-api :thread-api/export-db repo)
            (call-thread-api :thread-api/transact repo init-tx-data init-tx-meta (worker-state/get-context))
            (call-thread-api :thread-api/transact repo tx-data download-tx-meta (worker-state/get-context))
            (transact-remote-schema-version! repo)
            (transact-block-refs! repo))))
        (worker-util/post-message :add-repo {:repo repo}))))
  373. ;;;;;;;;;;;;;;;;;;;;;;;;;;
  374. ;; async download-graph ;;
  375. ;;;;;;;;;;;;;;;;;;;;;;;;;;
  (defn new-task--request-download-graph
    "Return a task that sends a \"download-graph\" request for `graph-uuid` at
    `schema-version` and resolves to the `:download-info-uuid` of the response.
    The :request-download-graph log entry is written when the task runs, not
    when it is created, so a task that is built but never run logs nothing."
    [get-ws-create-task graph-uuid schema-version]
    (m/sp
      (rtc-log-and-state/rtc-log :rtc.log/download {:sub-type :request-download-graph
                                                    :message "requesting download graph"
                                                    :graph-uuid graph-uuid
                                                    :schema-version schema-version})
      (:download-info-uuid
       (m/? (ws-util/send&recv get-ws-create-task {:action "download-graph"
                                                   :graph-uuid graph-uuid
                                                   :schema-version (str schema-version)})))))
  (defn new-task--download-info-list
    "Return a task that sends a \"download-info-list\" request for `graph-uuid`
    at `schema-version` and resolves to the `:download-info-list` of the
    response."
    [get-ws-create-task graph-uuid schema-version]
    (let [request {:action "download-info-list"
                   :graph-uuid graph-uuid
                   :schema-version (str schema-version)}
          request-task (ws-util/send&recv get-ws-create-task request)]
      (m/sp
        (:download-info-list (m/? request-task)))))
  (defn new-task--wait-download-info-ready
    "Return a task that polls the remote's download-info-list until the entry
    with `download-info-uuid` has a `:download-info-s3-url`, and resolves to
    that entry. It sleeps 3000 ms before each request. Resolves to `:timeout`
    when nothing is found within `timeout-ms`."
    [get-ws-create-task download-info-uuid graph-uuid schema-version timeout-ms]
    (let [ready? (fn [download-info]
                   (and (= download-info-uuid (:download-info-uuid download-info))
                        (:download-info-s3-url download-info)))
          list-request {:action "download-info-list"
                        :graph-uuid graph-uuid
                        :schema-version (str schema-version)}
          polling-task (m/sp
                         (rtc-log-and-state/rtc-log :rtc.log/download {:sub-type :wait-remote-graph-data-ready
                                                                       :message "waiting for the remote to prepare the data"
                                                                       :graph-uuid graph-uuid})
                         (loop []
                           (m/? (m/sleep 3000))
                           (let [resp (m/? (ws-util/send&recv get-ws-create-task list-request))
                                 ready-info (first (filter ready? (:download-info-list resp)))]
                             (if ready-info
                               ready-info
                               (recur)))))]
      (m/timeout polling-task timeout-ms :timeout)))
  (defn new-task--download-graph-from-s3
    "Return a task that downloads the graph data at `s3-url`, transacts it into
    the local db of the repo derived from `graph-name`, and records
    `graph-uuid` for that repo. Resolves to nil.
    Throws ex-info \"download-graph from s3 failed\" when the response status
    is not 200.
    The rtc-downloading-graph flag is set while the data is being transacted
    and is always reset afterwards, including when transacting or storing the
    metadata throws."
    [graph-uuid graph-name s3-url]
    (m/sp
      (rtc-log-and-state/rtc-log :rtc.log/download {:sub-type :downloading-graph-data
                                                    :message "downloading graph data"
                                                    :graph-uuid graph-uuid})
      (let [{:keys [status body] :as r} (m/? (http/get s3-url {:with-credentials? false}))
            repo (str sqlite-util/db-version-prefix graph-name)]
        (when (not= 200 status)
          (throw (ex-info "download-graph from s3 failed" {:resp r})))
        (rtc-log-and-state/rtc-log :rtc.log/download {:sub-type :transact-graph-data-to-db
                                                      :message "transacting graph data to local db"
                                                      :graph-uuid graph-uuid})
        (let [all-blocks (ldb/read-transit-str body)]
          (worker-state/set-rtc-downloading-graph! true)
          (try
            (m/? (new-task--transact-remote-all-blocks! all-blocks repo graph-uuid))
            (client-op/update-graph-uuid repo graph-uuid)
            (when-not rtc-const/RTC-E2E-TEST
              (c.m/<? (worker-db-metadata/<store repo (pr-str {:kv/value graph-uuid}))))
            (finally
              ;; never leave the flag stuck at true after a failed download
              (worker-state/set-rtc-downloading-graph! false)))
          (rtc-log-and-state/rtc-log :rtc.log/download {:sub-type :download-completed
                                                        :message "download completed"
                                                        :graph-uuid graph-uuid})
          nil))))
  (defn new-task--branch-graph
    "Return a task that uploads the graph held by `conn` as a branch of the
    remote graph `graph-uuid`.
    Steps: remove the existing rtc data of `repo`, request a presigned put-url
    while serializing all blocks, PUT the serialized blocks to that url,
    request \"branch-graph\" for the uploaded object, then record the returned
    graph-uuid locally. Resolves to nil; throws ex-info \"branch-graph failed\"
    when the response has no `:graph-uuid`.
    As in `new-task--upload-graph`, the db-metadata store is skipped when
    `rtc-const/RTC-E2E-TEST` is set."
    [get-ws-create-task repo conn graph-uuid major-schema-version]
    (let [log! (fn [sub-type message]
                 (rtc-log-and-state/rtc-log :rtc.log/branch-graph {:sub-type sub-type
                                                                   :message message}))]
      (m/sp
        (log! :fetching-presigned-put-url "fetching presigned put-url")
        (remove-rtc-data-in-conn! repo)
        (let [[{:keys [url key]} all-blocks-str]
              (m/?
               (m/join
                vector
                (ws-util/send&recv get-ws-create-task {:action "presign-put-temp-s3-obj"})
                (m/sp
                  (let [all-blocks (export-as-blocks
                                    @conn
                                    :ignore-attr-set rtc-const/ignore-attrs-when-init-upload
                                    :ignore-entity-set rtc-const/ignore-entities-when-init-upload)]
                    (ldb/write-transit-str all-blocks)))))]
          (log! :upload-data "uploading data")
          (m/? (http/put url {:body all-blocks-str :with-credentials? false}))
          (log! :request-branch-graph "requesting branch-graph")
          (let [aes-key (c.m/<? (crypt/<gen-aes-key))
                aes-key-jwk (ldb/write-transit-str (c.m/<? (crypt/<export-key aes-key)))
                resp (m/? (ws-util/send&recv get-ws-create-task {:action "branch-graph"
                                                                 :s3-key key
                                                                 :schema-version (str major-schema-version)
                                                                 :graph-uuid graph-uuid}))]
            ;; from here on `graph-uuid` is the uuid returned by the remote
            (if-let [graph-uuid (:graph-uuid resp)]
              (let [schema-version (ldb/get-graph-schema-version @conn)]
                (ldb/transact! conn
                               [(ldb/kv :logseq.kv/graph-uuid graph-uuid)
                                (ldb/kv :logseq.kv/graph-local-tx "0")
                                (ldb/kv :logseq.kv/remote-schema-version schema-version)])
                (client-op/update-graph-uuid repo graph-uuid)
                (client-op/remove-local-tx repo)
                (client-op/add-all-exists-asset-as-ops repo)
                (crypt/store-graph-keys-jwk repo aes-key-jwk)
                (when-not rtc-const/RTC-E2E-TEST
                  (c.m/<? (worker-db-metadata/<store repo (pr-str {:kv/value graph-uuid}))))
                (log! :completed "branch-graph completed")
                nil)
              (throw (ex-info "branch-graph failed" {:upload-resp resp}))))))))
  482. (throw (ex-info "branch-graph failed" {:upload-resp resp})))))))