full_upload_download_graph.cljs 25 KB


  1. (ns frontend.worker.rtc.full-upload-download-graph
  2. "- upload local graph to remote
  3. - download remote graph"
  4. (:require [cljs-http-missionary.client :as http]
  5. [clojure.set :as set]
  6. [datascript.core :as d]
  7. [frontend.common.missionary :as c.m]
  8. [frontend.common.thread-api :as thread-api]
  9. [frontend.worker.crypt :as crypt]
  10. [frontend.worker.db-listener :as db-listener]
  11. [frontend.worker.db-metadata :as worker-db-metadata]
  12. [frontend.worker.rtc.client-op :as client-op]
  13. [frontend.worker.rtc.const :as rtc-const]
  14. [frontend.worker.rtc.log-and-state :as rtc-log-and-state]
  15. [frontend.worker.rtc.ws-util :as ws-util]
  16. [frontend.worker.shared-service :as shared-service]
  17. [frontend.worker.state :as worker-state]
  18. [frontend.worker.util :as worker-util]
  19. [logseq.db :as ldb]
  20. [logseq.db.frontend.malli-schema :as db-malli-schema]
  21. [logseq.db.frontend.schema :as db-schema]
  22. [logseq.db.sqlite.create-graph :as sqlite-create-graph]
  23. [logseq.db.sqlite.util :as sqlite-util]
  24. [logseq.outliner.pipeline :as outliner-pipeline]
  25. [malli.core :as ma]
  26. [malli.transform :as mt]
  27. [missionary.core :as m]
  28. [promesa.core :as p]))
  (def ^:private normalized-remote-block-schema
    "Malli schema for one block as it is stored on the remote.
    Remote blocks are formatted differently from the client's blocks, so the
    entries carry `:decode/custom` functions; build a coercer from this schema
    to decode them."
    (let [decode-other-attr
          ;; Decoder for every attribute that is not listed explicitly in the map.
          ;; Values carrying :db/id become db-id strings (used as temp ids),
          ;; string values are read with `ldb/read-transit-str`.
          (fn [x]
            (let [coll-of? (fn [pred] (and (coll? x) (every? pred x)))]
              (cond
                (coll-of? :db/id)  (map #(str (:db/id %)) x)
                (:db/id x)         (str (:db/id x))
                (string? x)        (ldb/read-transit-str x)
                (coll-of? string?) (map ldb/read-transit-str x)
                :else              x)))]
      [:map
       [:db/id [:string {:decode/custom str}]]
       [:db/ident {:optional true} :keyword]
       [:block/uuid {:optional true} [:uuid {:decode/custom ldb/read-transit-str}]]
       [:block/order {:optional true} db-malli-schema/block-order]
       [:db/cardinality {:optional true} :keyword]
       [:db/valueType {:optional true} :keyword]
       [:db/index {:optional true} :boolean]
       [:malli.core/default [:map-of :keyword [:any {:decode/custom decode-other-attr}]]]]))
  (def ^:private normalized-remote-blocks-coercer
    "Coercer for a sequence of remote blocks, built from
    `normalized-remote-block-schema` with the `:custom` decoders plus malli's
    string transformer."
    (let [remote-blocks-schema [:sequential normalized-remote-block-schema]
          custom+string-transformer (mt/transformer {:name :custom} mt/string-transformer)]
      (ma/coercer remote-blocks-schema custom+string-transformer)))
  (defn- schema->ref-type-attrs
    "Return the set of attribute names in `db-schema` whose :db/valueType is
    :db.type/ref."
    [db-schema]
    (let [ref-attr-name (fn [[attr-name attr-props]]
                          (when (= :db.type/ref (:db/valueType attr-props))
                            attr-name))]
      (into #{} (keep ref-attr-name) db-schema)))
  (defn- schema->card-many-attrs
    "Return the set of attribute names in `db-schema` whose :db/cardinality is
    :db.cardinality/many."
    [db-schema]
    (let [many-attr-name (fn [[attr-name attr-props]]
                           (when (= :db.cardinality/many (:db/cardinality attr-props))
                             attr-name))]
      (into #{} (keep many-attr-name) db-schema)))
  (defn- export-as-blocks
    "Turn every entity in `db` into a block map keyed by attribute.
    - :db/id is the entity id as a string
    - ref attribute values are stringified entity ids
    - all other values are written with `ldb/write-transit-str`, except
      :db/ident and :block/order which are read back at the end
    - cardinality-many attributes accumulate their values with `conj`
    Attributes in `ignore-attr-set` are dropped; an entity whose :db/ident is in
    `ignore-entity-set` is dropped entirely.
    Throws ex-info when a :block/parent value is not a positive integer."
    [db & {:keys [ignore-attr-set ignore-entity-set]}]
    (let [schema (d/schema db)
          many-attrs (schema->card-many-attrs schema)
          ref-attrs (schema->ref-type-attrs schema)
          encode-value (fn [attr v]
                         (if (contains? ref-attrs attr)
                           (str v)
                           (ldb/write-transit-str v)))
          add-datom (fn [block datom]
                      (let [attr (:a datom)
                            v (:v datom)]
                        ;; validation runs before any ignore rule is applied
                        (when (and (= :block/parent attr)
                                   (not (pos-int? v)))
                          (throw (ex-info "invalid block data" {:datom datom})))
                        (cond
                          (contains? ignore-attr-set attr)
                          block

                          ;; ignored entity: abandon the whole block
                          (and (keyword-identical? :db/ident attr)
                               (contains? ignore-entity-set v))
                          (reduced nil)

                          (contains? many-attrs attr)
                          (update block attr conj (encode-value attr v))

                          :else
                          (assoc block attr (encode-value attr v)))))
          entity-datoms->block (fn [entity-datoms]
                                 (when (seq entity-datoms)
                                   (reduce add-datom
                                           {:db/id (str (:e (first entity-datoms)))}
                                           entity-datoms)))
          restore-plain-attrs (fn [block]
                                (cond-> block
                                  (:db/ident block) (update :db/ident ldb/read-transit-str)
                                  (:block/order block) (update :block/order ldb/read-transit-str)))]
      (->> (d/datoms db :eavt)
           (partition-by :e)
           (keep entity-datoms->block)
           (map restore-plain-attrs))))
  (defn- remove-rtc-data-in-conn!
    "Reset the client-op conn of `repo` and, when a datascript conn exists for
    it, retract the graph-uuid, graph-local-tx and remote-schema-version kv
    entities."
    [repo]
    (client-op/reset-client-op-conn repo)
    (when-let [conn (worker-state/get-datascript-conn repo)]
      (let [rtc-kv-idents [:logseq.kv/graph-uuid
                           :logseq.kv/graph-local-tx
                           :logseq.kv/remote-schema-version]]
        (d/transact! conn (mapv (fn [ident] [:db/retractEntity ident]) rtc-kv-idents)))))
  (defn new-task--upload-graph
    "Return a task that uploads the graph in `conn` to the remote as a new graph
    named `remote-graph-name`.

    Steps:
    1. fetch a presigned put-url while exporting all blocks as a transit string
    2. PUT the exported data to that url
    3. send the \"upload-graph\" request
    4. on success, store graph-uuid / local-tx / remote-schema-version in `conn`,
       update client-op state and store the generated AES key

    Returns {:graph-uuid graph-uuid} on success.
    Throws ex-info when the PUT response status is not 2xx, or when the
    upload-graph response carries no :graph-uuid."
    [get-ws-create-task repo conn remote-graph-name major-schema-version]
    (m/sp
      (rtc-log-and-state/rtc-log :rtc.log/upload {:sub-type :fetching-presigned-put-url
                                                  :message "fetching presigned put-url"})
      (let [[{:keys [url key]} all-blocks-str]
            (m/?
             (m/join
              vector
              (ws-util/send&recv get-ws-create-task {:action "presign-put-temp-s3-obj"})
              (m/sp
                (let [all-blocks (export-as-blocks
                                  @conn
                                  :ignore-attr-set rtc-const/ignore-attrs-when-init-upload
                                  :ignore-entity-set rtc-const/ignore-entities-when-init-upload)]
                  (ldb/write-transit-str all-blocks)))))]
        (rtc-log-and-state/rtc-log :rtc.log/upload {:sub-type :upload-data
                                                    :message "uploading data"})
        ;; Fail here when the data did not reach S3, instead of asking the server
        ;; to create a graph from an object that was never stored.
        (let [{:keys [status] :as put-resp}
              (m/? (http/put url {:body all-blocks-str :with-credentials? false}))]
          (when-not (and (number? status) (<= 200 status 299))
            (throw (ex-info "upload graph data to s3 failed" {:resp put-resp}))))
        (rtc-log-and-state/rtc-log :rtc.log/upload {:sub-type :request-upload-graph
                                                    :message "requesting upload-graph"})
        (let [aes-key (c.m/<? (crypt/<gen-aes-key))
              aes-key-jwk (ldb/write-transit-str (c.m/<? (crypt/<export-key aes-key)))
              upload-resp
              (m/? (ws-util/send&recv get-ws-create-task {:action "upload-graph"
                                                          :s3-key key
                                                          :schema-version (str major-schema-version)
                                                          :graph-name remote-graph-name}))]
          (if-let [graph-uuid (:graph-uuid upload-resp)]
            (let [schema-version (ldb/get-graph-schema-version @conn)]
              (ldb/transact! conn
                             [(ldb/kv :logseq.kv/graph-uuid graph-uuid)
                              (ldb/kv :logseq.kv/graph-local-tx "0")
                              (ldb/kv :logseq.kv/remote-schema-version schema-version)])
              (client-op/update-graph-uuid repo graph-uuid)
              (client-op/remove-local-tx repo)
              (client-op/add-all-exists-asset-as-ops repo)
              (crypt/store-graph-keys-jwk repo aes-key-jwk)
              (when-not rtc-const/RTC-E2E-TEST
                (c.m/<? (worker-db-metadata/<store repo (pr-str {:kv/value graph-uuid}))))
              (rtc-log-and-state/rtc-log :rtc.log/upload {:sub-type :upload-completed
                                                          :message "upload-graph completed"})
              {:graph-uuid graph-uuid})
            (throw (ex-info "upload-graph failed" {:upload-resp upload-resp})))))))
  (defn page-of-block
    "Walk up the :block/parent chain of `block` and return the first ancestor
    that has a :block/name, or nil when there is none.

    `id->block-map` is called with a parent id and returns the parent block
    (or nil when unknown).

    Deliberately not memoized at top level: a global `memoize` keyed on
    `id->block-map` would keep every map (and all of its blocks) alive for as
    long as the namespace is loaded. The walk also stops when a parent id
    repeats, so a cyclic parent chain returns nil instead of recursing forever."
    [id->block-map block]
    (loop [current block
           seen-parent-ids #{}]
      (when-let [parent-id (:block/parent current)]
        (when-not (contains? seen-parent-ids parent-id)
          (when-let [parent (id->block-map parent-id)]
            (if (:block/name parent)
              parent
              (recur parent (conj seen-parent-ids parent-id))))))))
  (defn- fill-block-fields
    "Return `blocks` as a vector where each block without a :block/name gets
    :block/page set to the :db/id of the page found by `page-of-block`.
    Blocks for which no page id is found are returned unchanged."
    [blocks]
    (let [id->block (into {} (map (juxt :db/id identity)) blocks)
          non-page-blocks (into #{} (remove :block/name) blocks)
          block-id->page-id (reduce
                             (fn [acc block]
                               (assoc acc
                                      (:db/id block)
                                      (:db/id (page-of-block id->block block))))
                             {}
                             non-page-blocks)
          with-page (fn [block]
                      (let [page-id (get block-id->page-id (:db/id block))]
                        (cond-> block
                          page-id (assoc :block/page page-id))))]
      (mapv with-page blocks)))
  (defn- blocks->card-one-attrs
    "Return the set of :db/ident values of those `blocks` that have a :db/ident
    and whose :db/cardinality is :db.cardinality/one."
    [blocks]
    (into #{}
          (comp (filter :db/ident)
                (filter #(= :db.cardinality/one (:db/cardinality %)))
                (map :db/ident))
          blocks))
  (defn- convert-card-one-value-from-value-coll
    "For every key of `block` that is also in `card-one-attrs`, replace a
    sequential or set value with its first element; other values and other keys
    are left as they are."
    [card-one-attrs block]
    (let [unwrap (fn [v]
                   (if (or (sequential? v) (set? v))
                     (first v)
                     v))
          attrs-present (set/intersection (set (keys block)) card-one-attrs)
          unwrapped (into {}
                          (map (fn [attr] [attr (unwrap (get block attr))]))
                          attrs-present)]
      (merge block unwrapped)))
  (defn- transact-remote-schema-version!
    "When `repo` has a datascript conn and its db has a :logseq.kv/schema-version
    value, transact that value as :logseq.kv/remote-schema-version."
    [repo]
    (when-let [conn (worker-state/get-datascript-conn repo)]
      (let [schema-version-entity (d/entity @conn :logseq.kv/schema-version)
            tx-meta {:rtc-download-graph? true
                     :gen-undo-ops? false
                     :persist-op? false}]
        (when-let [schema-version (:kv/value schema-version-entity)]
          (d/transact! conn
                       [(ldb/kv :logseq.kv/remote-schema-version schema-version)]
                       tx-meta)))))
  (defn- transact-block-refs!
    "Rebuild :block/refs for every entity that has a :block/uuid in the db of
    `repo` and transact the result. Does nothing when `repo` has no datascript
    conn."
    [repo]
    (when-let [conn (worker-state/get-datascript-conn repo)]
      (let [;; One snapshot for the datoms, the entities and the ref rebuild, so
            ;; the lazily realized tx-data never mixes two db values.
            db @conn
            ;; get all the block datoms
            datoms (d/datoms db :avet :block/uuid)
            refs-tx (keep
                     (fn [datom]
                       (let [block (d/entity db (:e datom))
                             refs (outliner-pipeline/db-rebuild-block-refs db block)]
                         (when (seq refs)
                           {:db/id (:db/id block)
                            :block/refs refs})))
                     datoms)]
        (ldb/transact! conn refs-tx (cond-> {:outliner-op :rtc-download-rebuild-block-refs}
                                      rtc-const/RTC-E2E-TEST (assoc :frontend.worker.pipeline/skip-store-conn true))))))
  (defn- block->schema-map
    "Return a map of :db/ident plus whichever of :db/valueType, :db/cardinality
    and :db/index are truthy in `block`. Returns nil when `block` has no
    :db/ident or none of those three attributes is truthy."
    [block]
    (let [ident (:db/ident block)
          schema-attrs (into {}
                             (keep (fn [attr]
                                     (when-let [v (get block attr)]
                                       [attr v])))
                             [:db/valueType :db/cardinality :db/index])]
      (when (and ident (seq schema-attrs))
        (assoc schema-attrs :db/ident ident))))
  (defn- blocks->schema-blocks+normal-blocks
    "Split `blocks` into [schema-blocks normal-blocks], both vectors.
    schema-blocks holds the result of `block->schema-map` for every block that
    yields one. normal-blocks holds every block, with :db/valueType,
    :db/cardinality and :db/index removed from those that yielded a schema map."
    [blocks]
    (loop [remaining (seq blocks)
           schema-blocks []
           normal-blocks []]
      (if-not remaining
        [schema-blocks normal-blocks]
        (let [block (first remaining)
              schema-block (block->schema-map block)]
          (recur (next remaining)
                 (cond-> schema-blocks
                   schema-block (conj schema-block))
                 (conj normal-blocks
                       (cond-> block
                         schema-block (dissoc :db/valueType :db/cardinality :db/index))))))))
  (defn- create-graph-for-rtc-test
    "Test-only way to build a graph: setting up the db-worker machinery is
    complex when only the rtc logic is under test.
    Creates a fresh conn, registers it for `repo`, transacts the initial db
    data, then `init-tx-data`, then `other-tx-data`, and finally writes the
    remote schema version and rebuilds block refs."
    [repo init-tx-data other-tx-data]
    (let [conn (d/create-conn db-schema/schema)
          db-initial-data (sqlite-create-graph/build-db-initial-data "")
          skip-store {:frontend.worker.pipeline/skip-store-conn rtc-const/RTC-E2E-TEST}
          download-tx-meta (merge {:rtc-download-graph? true
                                   :gen-undo-ops? false
                                   :persist-op? false}
                                  skip-store)]
      (swap! worker-state/*datascript-conns assoc repo conn)
      (d/transact! conn db-initial-data (assoc skip-store :initial-db? true))
      (db-listener/listen-db-changes! repo conn)
      ;; only transact db schema, skip validation to avoid warning
      (d/transact! conn init-tx-data
                   (assoc download-tx-meta :frontend.worker.pipeline/skip-validate-db? true))
      (d/transact! conn other-tx-data download-tx-meta)
      (transact-remote-schema-version! repo)
      (transact-block-refs! repo)))
  (defn- blocks-resolve-temp-id
    "Build tx-data from `blocks`, replacing string temp ids in ref attributes.
    The result is the concatenation of:
    - one map per block id holding :block/uuid (and :db/ident when present)
    - one map per block, without :db/id, where the value of every ref attribute
      (an ident from `schema-blocks` with :db/valueType :db.type/ref) is
      rewritten: a known id becomes the target's :db/ident, or else
      [:block/uuid uuid]; a sequential value is rewritten element-wise only
      when every element is a known id; anything else is kept as is."
    [schema-blocks blocks]
    (let [id->uuid (into {} (map (juxt :db/id :block/uuid)) blocks)
          id->ident (into {} (map (juxt :db/id :db/ident)) blocks)
          known-id? (fn [v]
                      (and (string? v)
                           (or (get id->ident v) (get id->uuid v))))
          id->lookup (fn [id]
                       (or (get id->ident id)
                           [:block/uuid (get id->uuid id)]))
          ref-attrs (into #{}
                          (keep (fn [schema-block]
                                  (when (= :db.type/ref (:db/valueType schema-block))
                                    (:db/ident schema-block))))
                          schema-blocks)
          resolve-ref-value (fn [v]
                              (cond
                                (known-id? v) (id->lookup v)
                                (and (sequential? v) (every? known-id? v)) (map id->lookup v)
                                :else v))
          resolve-entry (fn [[k v]]
                          [k (if (contains? ref-attrs k)
                               (resolve-ref-value v)
                               v)])
          id-tx-data (map (fn [id]
                            (let [ident (id->ident id)]
                              (cond-> {:block/uuid (id->uuid id)}
                                ident (assoc :db/ident ident))))
                          (map :db/id blocks))
          blocks-tx-data (map (fn [block]
                                (into {} (map resolve-entry) (dissoc block :db/id)))
                              blocks)]
      (concat id-tx-data blocks-tx-data)))
  (defn- remote-all-blocks=>client-blocks
    "Convert the :blocks of `all-blocks` into client blocks:
    unwrap card-one values, run the remote-blocks coercer, drop :client/schema,
    drop blocks whose :db/ident is in `ignore-entity-set`, drop attributes in
    `ignore-attr-set`, and finally fill :block/page via `fill-block-fields`."
    [all-blocks ignore-attr-set ignore-entity-set]
    (let [{:keys [blocks]} all-blocks
          card-one-attrs (blocks->card-one-attrs blocks)
          unwrapped-blocks (worker-util/profile :convert-card-one-value-from-value-coll
                                                (map #(convert-card-one-value-from-value-coll card-one-attrs %) blocks))
          coerced-blocks (worker-util/profile :normalize-remote-blocks
                                              (normalized-remote-blocks-coercer unwrapped-blocks))
          ignored-entity? (fn [block] (contains? ignore-entity-set (:db/ident block)))
          drop-ignored-attrs (fn [block]
                               (into {}
                                     (remove (fn [entry] (contains? ignore-attr-set (first entry))))
                                     block))
          cleaned-blocks (sequence
                          (comp
                           ;;TODO: remove this
                           ;;client/schema already converted to :db/cardinality, :db/valueType by remote,
                           ;;and :client/schema should be removed by remote too
                           (map #(dissoc % :client/schema))
                           (remove ignored-entity?)
                           (map drop-ignored-attrs))
                          coerced-blocks)]
      (fill-block-fields cleaned-blocks)))
  (defn- remote-all-blocks->tx-data+t
    "Return a map with
      :remote-t     - the :t of `remote-all-blocks`
      :init-tx-data - the db-type kv followed by the schema blocks; this has to
                      be transacted first
      :tx-data      - all other blocks with temp ids resolved, followed by the
                      graph-uuid kv"
    [remote-all-blocks graph-uuid]
    (let [client-blocks (remote-all-blocks=>client-blocks
                         remote-all-blocks
                         rtc-const/ignore-attrs-when-init-download
                         rtc-const/ignore-entities-when-init-download)
          [schema-blocks normal-blocks] (blocks->schema-blocks+normal-blocks client-blocks)
          graph-uuid-kv (ldb/kv :logseq.kv/graph-uuid graph-uuid)
          db-type-kv (ldb/kv :logseq.kv/db-type "db")]
      {:remote-t (:t remote-all-blocks)
       :init-tx-data (cons db-type-kv schema-blocks)
       :tx-data (concat (blocks-resolve-temp-id schema-blocks normal-blocks)
                        [graph-uuid-kv])}))
  (defn- new-task--transact-remote-all-blocks!
    "Return a task that writes the downloaded `all-blocks` into the db of `repo`.
    The tx-data is computed when this function is called; running the task
    records the remote t, transacts init-tx-data and then tx-data (through the
    test helper when RTC-E2E-TEST is set, otherwise through the thread apis),
    and finally broadcasts :add-repo to clients."
    [all-blocks repo graph-uuid]
    (let [{:keys [remote-t init-tx-data tx-data]}
          (remote-all-blocks->tx-data+t all-blocks graph-uuid)
          download-tx-meta {:rtc-download-graph? true
                            :gen-undo-ops? false
                            :persist-op? false}
          ;; only transact db schema, skip validation to avoid warning
          init-tx-meta (assoc download-tx-meta :frontend.worker.pipeline/skip-validate-db? true)
          thread-api-fn (fn [api-key] (@thread-api/*thread-apis api-key))]
      (m/sp
        (client-op/update-local-tx repo remote-t)
        (rtc-log-and-state/update-local-t graph-uuid remote-t)
        (rtc-log-and-state/update-remote-t graph-uuid remote-t)
        (if rtc-const/RTC-E2E-TEST
          (create-graph-for-rtc-test repo init-tx-data tx-data)
          (c.m/<?
           (p/do!
            ((thread-api-fn :thread-api/create-or-open-db) repo {:close-other-db? false})
            ((thread-api-fn :thread-api/export-db) repo)
            ((thread-api-fn :thread-api/transact)
             repo init-tx-data init-tx-meta (worker-state/get-context))
            ((thread-api-fn :thread-api/transact)
             repo tx-data download-tx-meta (worker-state/get-context))
            (transact-remote-schema-version! repo)
            (transact-block-refs! repo))))
        (shared-service/broadcast-to-clients! :add-repo {:repo repo}))))
  374. ;;;;;;;;;;;;;;;;;;;;;;;;;;
  375. ;; async download-graph ;;
  376. ;;;;;;;;;;;;;;;;;;;;;;;;;;
  (defn new-task--request-download-graph
    "Log the request, then return a task that sends the \"download-graph\"
    action and yields the :download-info-uuid of the response."
    [get-ws-create-task graph-uuid schema-version]
    (let [log-data {:sub-type :request-download-graph
                    :message "requesting download graph"
                    :graph-uuid graph-uuid
                    :schema-version schema-version}
          request {:action "download-graph"
                   :graph-uuid graph-uuid
                   :schema-version (str schema-version)}]
      (rtc-log-and-state/rtc-log :rtc.log/download log-data)
      (m/join :download-info-uuid
              (ws-util/send&recv get-ws-create-task request))))
  (comment
    ;; Kept for reference only: this form is inside `comment` and is never evaluated.
    (defn new-task--download-info-list
      [get-ws-create-task graph-uuid schema-version]
      (let [request {:action "download-info-list"
                     :graph-uuid graph-uuid
                     :schema-version (str schema-version)}]
        (m/join :download-info-list
                (ws-util/send&recv get-ws-create-task request)))))
  (defn new-task--wait-download-info-ready
    "Return a task that polls \"download-info-list\" every 3000 ms until the
    entry matching `download-info-uuid` has a :download-info-s3-url, and yields
    that entry. Yields :timeout when `timeout-ms` elapses first."
    [get-ws-create-task download-info-uuid graph-uuid schema-version timeout-ms]
    (let [request {:action "download-info-list"
                   :graph-uuid graph-uuid
                   :schema-version (str schema-version)}
          ready? (fn [download-info]
                   (and (= download-info-uuid (:download-info-uuid download-info))
                        (:download-info-s3-url download-info)))
          polling-task (m/sp
                         (rtc-log-and-state/rtc-log :rtc.log/download {:sub-type :wait-remote-graph-data-ready
                                                                       :message "waiting for the remote to prepare the data"
                                                                       :graph-uuid graph-uuid})
                         (loop []
                           (m/? (m/sleep 3000))
                           (let [resp (m/? (ws-util/send&recv get-ws-create-task request))
                                 download-info-list (:download-info-list resp)]
                             (if-let [found-download-info (first (filter ready? download-info-list))]
                               found-download-info
                               (recur)))))]
      (m/timeout polling-task timeout-ms :timeout)))
  (defn new-task--download-graph-from-s3
    "Return a task that downloads graph data from `s3-url` and transacts it into
    the local db named after `graph-name`.
    `graph-uuid` may be a uuid or a string; a string is parsed with `parse-uuid`.
    Throws ex-info when the HTTP status is not 200. Yields nil on success.

    The rtc-downloading-graph flag is set while the data is being transacted and
    is always reset afterwards, including when transacting fails or the task is
    cancelled, so a failed download cannot leave the flag stuck at true."
    [graph-uuid graph-name s3-url]
    (let [graph-uuid (if (string? graph-uuid) (parse-uuid graph-uuid) graph-uuid)]
      (m/sp
        (rtc-log-and-state/rtc-log :rtc.log/download {:sub-type :downloading-graph-data
                                                      :message "downloading graph data"
                                                      :graph-uuid graph-uuid})
        (let [{:keys [status body] :as r} (m/? (http/get s3-url {:with-credentials? false}))
              repo (str sqlite-util/db-version-prefix graph-name)]
          (if (not= 200 status)
            (throw (ex-info "download-graph from s3 failed" {:resp r}))
            (do
              (rtc-log-and-state/rtc-log :rtc.log/download {:sub-type :transact-graph-data-to-db
                                                            :message "transacting graph data to local db"
                                                            :graph-uuid graph-uuid})
              (let [all-blocks (ldb/read-transit-str body)]
                (worker-state/set-rtc-downloading-graph! true)
                (try
                  (m/? (new-task--transact-remote-all-blocks! all-blocks repo graph-uuid))
                  (client-op/update-graph-uuid repo graph-uuid)
                  (when-not rtc-const/RTC-E2E-TEST
                    (c.m/<? (worker-db-metadata/<store repo (pr-str {:kv/value graph-uuid}))))
                  (finally
                    ;; reset on success, failure and cancellation alike
                    (worker-state/set-rtc-downloading-graph! false)))
                (rtc-log-and-state/rtc-log :rtc.log/download {:sub-type :download-completed
                                                              :message "download completed"
                                                              :graph-uuid graph-uuid})
                nil)))))))
  (defn new-task--branch-graph
    "Return a task that uploads the graph in `conn` as a branch of the remote
    graph `graph-uuid`.

    Steps:
    1. remove the existing rtc data of `repo`
    2. fetch a presigned put-url while exporting all blocks as a transit string
    3. PUT the exported data to that url
    4. send the \"branch-graph\" request
    5. on success, store graph-uuid / local-tx / remote-schema-version in `conn`,
       update client-op state and store the generated AES key

    Yields nil on success.
    Throws ex-info when the PUT response status is not 2xx, or when the
    branch-graph response carries no :graph-uuid."
    [get-ws-create-task repo conn graph-uuid major-schema-version]
    (m/sp
      (rtc-log-and-state/rtc-log :rtc.log/branch-graph {:sub-type :fetching-presigned-put-url
                                                        :message "fetching presigned put-url"})
      (remove-rtc-data-in-conn! repo)
      (let [[{:keys [url key]} all-blocks-str]
            (m/?
             (m/join
              vector
              (ws-util/send&recv get-ws-create-task {:action "presign-put-temp-s3-obj"})
              (m/sp
                (let [all-blocks (export-as-blocks
                                  @conn
                                  :ignore-attr-set rtc-const/ignore-attrs-when-init-upload
                                  :ignore-entity-set rtc-const/ignore-entities-when-init-upload)]
                  (ldb/write-transit-str all-blocks)))))]
        (rtc-log-and-state/rtc-log :rtc.log/branch-graph {:sub-type :upload-data
                                                          :message "uploading data"})
        ;; Fail here when the data did not reach S3, instead of asking the server
        ;; to branch from an object that was never stored.
        (let [{:keys [status] :as put-resp}
              (m/? (http/put url {:body all-blocks-str :with-credentials? false}))]
          (when-not (and (number? status) (<= 200 status 299))
            (throw (ex-info "upload graph data to s3 failed" {:resp put-resp}))))
        (rtc-log-and-state/rtc-log :rtc.log/branch-graph {:sub-type :request-branch-graph
                                                          :message "requesting branch-graph"})
        (let [aes-key (c.m/<? (crypt/<gen-aes-key))
              aes-key-jwk (ldb/write-transit-str (c.m/<? (crypt/<export-key aes-key)))
              resp (m/? (ws-util/send&recv get-ws-create-task {:action "branch-graph"
                                                               :s3-key key
                                                               :schema-version (str major-schema-version)
                                                               :graph-uuid graph-uuid}))]
          (if-let [graph-uuid (:graph-uuid resp)]
            (let [schema-version (ldb/get-graph-schema-version @conn)]
              (ldb/transact! conn
                             [(ldb/kv :logseq.kv/graph-uuid graph-uuid)
                              (ldb/kv :logseq.kv/graph-local-tx "0")
                              (ldb/kv :logseq.kv/remote-schema-version schema-version)])
              (client-op/update-graph-uuid repo graph-uuid)
              (client-op/remove-local-tx repo)
              (client-op/add-all-exists-asset-as-ops repo)
              (crypt/store-graph-keys-jwk repo aes-key-jwk)
              (c.m/<? (worker-db-metadata/<store repo (pr-str {:kv/value graph-uuid})))
              (rtc-log-and-state/rtc-log :rtc.log/branch-graph {:sub-type :completed
                                                                :message "branch-graph completed"})
              nil)
            (throw (ex-info "branch-graph failed" {:upload-resp resp})))))))