db_sync.cljs 64 KB


  1. (ns frontend.worker.db-sync
  2. "Simple db-sync client based on promesa + WebSocket."
  3. (:require ["/frontend/idbkv" :as idb-keyval]
  4. [cljs-bean.core :as bean]
  5. [clojure.set :as set]
  6. [clojure.string :as string]
  7. [datascript.core :as d]
  8. [datascript.storage :refer [IStorage]]
  9. [frontend.common.crypt :as crypt]
  10. [frontend.worker-common.util :as worker-util]
  11. [frontend.worker.handler.page :as worker-page]
  12. [frontend.worker.rtc.client-op :as client-op]
  13. [frontend.worker.rtc.const :as rtc-const]
  14. [frontend.worker.shared-service :as shared-service]
  15. [frontend.worker.state :as worker-state]
  16. [lambdaisland.glogi :as log]
  17. [logseq.common.path :as path]
  18. [logseq.common.util :as common-util]
  19. [logseq.db :as ldb]
  20. [logseq.db-sync.cycle :as sync-cycle]
  21. [logseq.db-sync.malli-schema :as db-sync-schema]
  22. [logseq.db-sync.order :as sync-order]
  23. [logseq.db.common.normalize :as db-normalize]
  24. [logseq.db.common.sqlite :as common-sqlite]
  25. [logseq.db.sqlite.util :as sqlite-util]
  26. [logseq.outliner.core :as outliner-core]
  27. [logseq.outliner.transaction :as outliner-tx]
  28. [promesa.core :as p]))
  ;; Map of repo -> latest remote tx value; read by `sync-counts` and `flush-pending!`.
  29. (defonce *repo->latest-remote-tx (atom {}))
  ;; Map of repo -> graph AES key, filled in once a key has been decrypted or generated.
  30. (defonce ^:private *repo->aes-key (atom {}))
  ;; Lazily created idb-keyval store used by `<get-item` and `<set-item!`.
  ;; NOTE(review): the trailing 2 is presumably a store/db version -- confirm against idbkv.
  31. (defonce ^:private e2ee-store (delay (idb-keyval/newStore "localforage" "keyvaluepairs" 2)))
  (defn- current-client
    "Returns the active db-sync client when its :repo equals `repo`, else nil."
    [repo]
    (let [{client-repo :repo :as active} @worker-state/*db-sync-client]
      (when (= repo client-repo)
        active)))
  (defn- client-ops-conn
    "Looks up the client-ops connection for `repo` through worker-state."
    [repo]
    (worker-state/get-client-ops-conn repo))
  (defn- sync-counts
    "Collects the sync counters for `repo` into one map.

    Returns nil when the repo has no datascript conn. :pending-local is nil
    when there is no client-ops conn, and :pending-server is nil unless both
    the local and remote tx values are numbers."
    [repo]
    (when (worker-state/get-datascript-conn repo)
      (let [local-count (when-let [ops-conn (client-ops-conn repo)]
                          (count (d/datoms @ops-conn :avet :db-sync/created-at)))
            asset-count (client-op/get-unpushed-asset-ops-count repo)
            local-tx (client-op/get-local-tx repo)
            remote-tx (get @*repo->latest-remote-tx repo)
            comparable? (and (number? local-tx) (number? remote-tx))
            ;; never report a negative backlog when local is ahead of remote
            server-count (when comparable?
                           (max 0 (- remote-tx local-tx)))
            graph-uuid (client-op/get-graph-uuid repo)]
        {:pending-local local-count
         :pending-asset asset-count
         :pending-server server-count
         :local-tx local-tx
         :remote-tx remote-tx
         :graph-uuid graph-uuid})))
  (defn- normalize-online-users
    "Converts raw presence entries into a vector of :user/* maps.

    Entries whose :user-id is not a string are dropped. The display name falls
    back from :username to :name to the user id. :user/email and
    :user/editing-block-uuid are only attached when they are (non-blank) strings."
    [users]
    (into []
          (keep (fn [{:keys [user-id email username name editing-block-uuid]}]
                  (when (string? user-id)
                    (let [editing? (and (string? editing-block-uuid)
                                        (not (string/blank? editing-block-uuid)))]
                      (cond-> {:user/uuid user-id
                               :user/name (or username name user-id)}
                        (string? email) (assoc :user/email email)
                        editing? (assoc :user/editing-block-uuid editing-block-uuid))))))
          users))
  (defn- broadcast-rtc-state!
    "Broadcasts the client's websocket state, online users and sync counters
    to all clients as an :rtc-sync-state message. No-op when `client` is nil."
    [client]
    (when client
      (let [ws-state @(:ws-state client)
            users @(:online-users client)
            counts (sync-counts (:repo client))
            ;; missing counters are reported as 0
            count-or-zero (fn [k] (or (get counts k) 0))]
        (shared-service/broadcast-to-clients!
         :rtc-sync-state
         {:rtc-state {:ws-state ws-state}
          :rtc-lock (= :open ws-state)
          :online-users (or users [])
          :unpushed-block-update-count (count-or-zero :pending-local)
          :pending-asset-ops-count (count-or-zero :pending-asset)
          :pending-server-ops-count (count-or-zero :pending-server)
          :local-tx (:local-tx counts)
          :remote-tx (:remote-tx counts)
          :graph-uuid (:graph-uuid counts)}))))
  (defn- set-ws-state!
    "Stores `ws-state` in the client's :ws-state atom and re-broadcasts the rtc
    state. Does nothing when the client has no :ws-state atom."
    [client ws-state]
    (let [state-atom (:ws-state client)]
      (when state-atom
        (reset! state-atom ws-state)
        (broadcast-rtc-state! client))))
  (defn- update-online-users!
    "Replaces the client's online-user list with the normalized `users` and
    re-broadcasts the rtc state. Does nothing without an :online-users atom."
    [client users]
    (let [users-atom (:online-users client)]
      (when users-atom
        (reset! users-atom (normalize-online-users users))
        (broadcast-rtc-state! client))))
  (defn- enabled?
    "True only when the db-sync config has :enabled? set to exactly true."
    []
    (let [config @worker-state/*db-sync-config]
      (true? (get config :enabled?))))
  (defn- ws-base-url
    "Returns the :ws-url entry of the db-sync config (nil when absent)."
    []
    (get @worker-state/*db-sync-config :ws-url))
  (defn- http-base-url
    "Returns the HTTP base url for sync requests.

    Prefers the configured :http-base. Otherwise derives it from the websocket
    url by swapping wss:// -> https:// and ws:// -> http:// and stripping a
    trailing \"/sync/%s\" suffix. Returns nil when neither is configured."
    []
    (if-let [configured (:http-base @worker-state/*db-sync-config)]
      configured
      (when-let [ws-url (ws-base-url)]
        (let [swap-scheme (fn [ws-scheme http-scheme]
                            (str http-scheme (subs ws-url (count ws-scheme))))
              http-url (cond
                         (string/starts-with? ws-url "wss://") (swap-scheme "wss://" "https://")
                         (string/starts-with? ws-url "ws://") (swap-scheme "ws://" "http://")
                         :else ws-url)]
          (string/replace http-url #"/sync/%s$" "")))))
  (defn- auth-token
    "Returns the current id token from worker-state."
    []
    (worker-state/get-id-token))
  (defn- auth-headers
    "Builds the bearer authorization header map, or nil when there is no token."
    []
    (let [token (auth-token)]
      (when token
        {"authorization" (str "Bearer " token)})))
  (defn- with-auth-headers
    "Merges the auth headers into `opts` :headers (auth entries win on
    conflict). Returns `opts` untouched when there is no token."
    [opts]
    (let [auth (auth-headers)]
      (cond-> opts
        auth (assoc :headers (merge (or (:headers opts) {}) auth)))))
  ;; 100 MiB. `process-asset-op!` compares an asset's size against this limit.
  123. (def ^:private max-asset-size (* 100 1024 1024))
  ;; NOTE(review): presumably the number of kvs rows per snapshot upload request -- usage is outside this chunk.
  124. (def ^:private upload-kvs-batch-size 2000)
  ;; NOTE(review): presumably the content-type / content-encoding headers for snapshot uploads -- confirm at the call site.
  125. (def ^:private snapshot-content-type "application/transit+json")
  126. (def ^:private snapshot-content-encoding "gzip")
  ;; Shared TextEncoder instance.
  127. (def ^:private snapshot-text-encoder (js/TextEncoder.))
  ;; Reconnect backoff parameters consumed by `reconnect-delay-ms`:
  ;; base delay (doubled per attempt), upper bound, and exclusive jitter bound.
  128. (def ^:private reconnect-base-delay-ms 1000)
  129. (def ^:private reconnect-max-delay-ms 30000)
  130. (def ^:private reconnect-jitter-ms 250)
  (defn- format-ws-url
    "Builds the websocket url for `graph-id`.

    When `base` contains \"%s\" the placeholder is replaced by the graph id,
    otherwise the graph id is appended as a path segment (adding \"/\" only
    when `base` does not already end with one)."
    [base graph-id]
    (if (string/includes? base "%s")
      (string/replace base "%s" graph-id)
      (let [separator (if (string/ends-with? base "/") "" "/")]
        (str base separator graph-id))))
  (defn- append-token
    "Appends `token` as a url-encoded `token` query parameter, using \"&\" when
    `url` already has a query string. Returns `url` unchanged when `token` is
    not a string."
    [url token]
    (if-not (string? token)
      url
      (str url
           (if (string/includes? url "?") "&" "?")
           "token="
           (js/encodeURIComponent token))))
  (defn- get-graph-id
    "Returns the graph's rtc uuid as a string, or nil when the repo has no
    datascript conn or the db carries no rtc uuid."
    [repo]
    (when-let [conn (worker-state/get-datascript-conn repo)]
      (when-let [rtc-uuid (ldb/get-graph-rtc-uuid @conn)]
        (str rtc-uuid))))
  (defn- ensure-client-graph-uuid!
    "Records `graph-id` as the repo's graph uuid in client-op storage; skipped
    when `graph-id` is nil or empty."
    [repo graph-id]
    (when-not (empty? graph-id)
      (client-op/update-graph-uuid repo graph-id)))
  (defn- ready-state
    "Reads the `readyState` property of the websocket `ws`."
    [ws]
    (.-readyState ws))
  (defn- ws-open?
    "True when the websocket's ready state is 1 (WebSocket OPEN)."
    [ws]
    (-> ws ready-state (= 1)))
  ;; Sentinel returned by `coerce` when the coercer throws.
  (def ^:private invalid-coerce ::invalid-coerce)

  (defn- coerce
    "Applies `coercer` to `value`. When the coercer throws, logs the failure
    together with `context` and returns the `invalid-coerce` sentinel instead
    of propagating the error."
    [coercer value context]
    (try
      (coercer value)
      (catch :default e
        (let [details (merge context {:error e :value value})]
          (log/error :db-sync/malli-coerce-failed details)
          invalid-coerce))))
  (defn- coerce-ws-client-message
    "Coerces an outgoing websocket message against the client schema.
    Returns nil for a nil message or when coercion fails."
    [message]
    (when message
      (let [result (coerce db-sync-schema/ws-client-message-coercer
                           message
                           {:schema :ws/client})]
        (if (= result invalid-coerce)
          nil
          result))))
  (defn- coerce-ws-server-message
    "Coerces an incoming websocket message against the server schema.
    Returns nil for a nil message or when coercion fails."
    [message]
    (when message
      (let [result (coerce db-sync-schema/ws-server-message-coercer
                           message
                           {:schema :ws/server})]
        (if (= result invalid-coerce)
          nil
          result))))
  (defn- fail-fast
    "Logs `data` under `tag` as an error, then throws an ex-info whose message
    is the tag's name and whose ex-data is `data`. Never returns normally."
    [tag data]
    (log/error tag data)
    (let [message (name tag)]
      (throw (ex-info message data))))
  (defn- require-number
    "Fails fast with :db-sync/invalid-field unless `value` is a number."
    [value context]
    (if (number? value)
      nil
      (fail-fast :db-sync/invalid-field (assoc context :value value))))
  (defn- require-non-negative
    "Fails fast with :db-sync/invalid-field unless `value` is a number >= 0."
    [value context]
    ;; the number check must run first: `neg?` is only meaningful on numbers
    (require-number value context)
    (if (neg? value)
      (fail-fast :db-sync/invalid-field (assoc context :value value))
      nil))
  (defn- require-seq
    "Fails fast with :db-sync/invalid-field unless `value` is sequential."
    [value context]
    (if (sequential? value)
      nil
      (fail-fast :db-sync/invalid-field (assoc context :value value))))
  (defn- parse-transit
    "Reads `value` as a transit string. Any read error is converted into a
    :db-sync/response-parse-failed failure carrying `context` plus the error."
    [value context]
    (try
      (sqlite-util/read-transit-str value)
      (catch :default e
        (let [details (assoc context :error e)]
          (fail-fast :db-sync/response-parse-failed details)))))
  (defn- coerce-http-request
    "Coerces a request `body` with the coercer registered for `schema-key`.

    Returns `body` unchanged when no coercer is registered, the coerced body on
    success, and nil when coercion fails."
    [schema-key body]
    (let [coercer (get db-sync-schema/http-request-coercers schema-key)]
      (if-not coercer
        body
        (let [result (coerce coercer body {:schema schema-key :dir :request})]
          (when (not= result invalid-coerce)
            result)))))
  (defn- coerce-http-response
    "Coerces a response `body` with the coercer registered for `schema-key`.

    Returns `body` unchanged when no coercer is registered, the coerced body on
    success, and nil when coercion fails."
    [schema-key body]
    (let [coercer (get db-sync-schema/http-response-coercers schema-key)]
      (if-not coercer
        body
        (let [result (coerce coercer body {:schema schema-key :dir :response})]
          (when (not= result invalid-coerce)
            result)))))
  (defn- reconnect-delay-ms
    "Computes the reconnect delay for the given attempt number: the base delay
    doubled per attempt, capped at the maximum, plus a random jitter."
    [attempt]
    (let [backoff (* reconnect-base-delay-ms (js/Math.pow 2 attempt))
          capped (min reconnect-max-delay-ms backoff)]
      ;; jitter is added after the cap, so the result may exceed the maximum
      (+ capped (rand-int reconnect-jitter-ms))))
  (defn- clear-reconnect-timer!
    "Cancels the timer stored under :timer in the `reconnect` atom, if any, and
    clears the entry."
    [reconnect]
    (let [pending-timer (:timer @reconnect)]
      (when pending-timer
        (js/clearTimeout pending-timer)
        (swap! reconnect assoc :timer nil))))
  (defn- reset-reconnect!
    "Cancels any pending reconnect timer of `client` and resets its attempt
    counter to 0. No-op when the client has no :reconnect atom."
    [client]
    (let [reconnect-state (:reconnect client)]
      (when reconnect-state
        (clear-reconnect-timer! reconnect-state)
        (swap! reconnect-state assoc :attempt 0))))
  (defn- send!
    "Sends `message` as JSON over `ws` when the socket is open.

    The message is first validated against the client schema; an invalid
    message is logged and not sent. Nothing happens when the socket is not open."
    [ws message]
    (when (ws-open? ws)
      (let [valid-message (coerce-ws-client-message message)]
        (if valid-message
          (.send ws (js/JSON.stringify (clj->js valid-message)))
          (log/error :db-sync/ws-request-invalid {:message message})))))
  (defn update-presence!
    "Sends a \"presence\" message carrying `editing-block-uuid` over the current
    client's websocket. No-op when there is no client or no websocket."
    [editing-block-uuid]
    (let [client @worker-state/*db-sync-client]
      (when client
        (when-let [socket (:ws client)]
          (send! socket {:type "presence"
                         :editing-block-uuid editing-block-uuid})))))
  ;; Attributes whose datoms `remove-ignored-attrs` strips from tx-data:
  ;; the literals below plus two sets taken from rtc-const.
  ;; NOTE(review): `ignore-entities-when-init-upload` is also used as an entity
  ;; filter in `normalize-tx-data`; confirm it is meant to double as an attr set here.
  229. (def rtc-ignored-attrs
  230. (set/union
  231. #{:logseq.property.embedding/hnsw-label-updated-at
  232. :block/tx-id
  233. ;; FIXME: created-by-ref maybe not exist yet on server or client
  234. :logseq.property/created-by-ref}
  235. rtc-const/ignore-attrs-when-syncing
  236. rtc-const/ignore-entities-when-init-upload))
  (defn- remove-ignored-attrs
    "Lazily drops every datom whose attribute is in `rtc-ignored-attrs`."
    [tx-data]
    (let [ignored? (fn [datom] (contains? rtc-ignored-attrs (:a datom)))]
      (remove ignored? tx-data)))
  (defn- normalize-tx-data
    "Prepares local tx-data for syncing: strips ignored attributes, normalizes
    against the before/after dbs, then drops items whose entity position is in
    `rtc-const/ignore-entities-when-init-upload`."
    [db-after db-before tx-data]
    (let [synced-datoms (remove-ignored-attrs tx-data)
          normalized (db-normalize/normalize-tx-data db-after db-before synced-datoms)
          ignored-entity? (fn [[_op e]]
                            (contains? rtc-const/ignore-entities-when-init-upload e))]
      (remove ignored-entity? normalized)))
  (defn- reverse-tx-data
    "Turns each [e a v t added] item into its inverse operation:
    added items become :db/retract, the others :db/add. Returns a lazy seq of
    [op e a v t] vectors."
    [tx-data]
    (for [[e a v t added] tx-data
          :let [inverse-op (if added :db/retract :db/add)]]
      [inverse-op e a v t]))
  (defn- parse-message
    "Parses a raw JSON string into Clojure data with keyword keys.
    Returns nil when parsing fails."
    [raw]
    (try
      (-> raw
          js/JSON.parse
          (js->clj :keywordize-keys true))
      (catch :default _
        nil)))
  (defn- fetch-json
    "Performs an authenticated `fetch` and returns a promise of the JSON body
    converted to Clojure data with keyword keys.

    Options map:
      :response-schema  schema key used to coerce a successful body (optional)
      :error-schema     schema key used to coerce an error body (default :error)

    Rejects with ex-info \"db-sync invalid response\" when a successful body
    fails coercion, and with ex-info \"db-sync request failed\" for any non-ok
    response. Both carry :status, :url and :body in their ex-data."
    [url opts {:keys [response-schema error-schema] :or {error-schema :error}}]
    (p/let [resp (js/fetch url (clj->js (with-auth-headers opts)))
            text (.text resp)]
      (let [ok? (.-ok resp)
            status (.-status resp)
            data (when (seq text)
                   (if ok?
                     (js/JSON.parse text)
                     ;; Error bodies are not guaranteed to be JSON (e.g. an
                     ;; HTML page from a proxy). A parse failure here must not
                     ;; mask the HTTP status, so treat it as "no body".
                     (try
                       (js/JSON.parse text)
                       (catch :default _
                         nil))))]
        (if ok?
          (let [raw (js->clj data :keywordize-keys true)
                body (if response-schema
                       (coerce-http-response response-schema raw)
                       raw)]
            (if (or (nil? response-schema) body)
              body
              ;; body is falsy here: coercion of the response failed
              (throw (ex-info "db-sync invalid response"
                              {:status status
                               :url url
                               :body body}))))
          (let [raw (when data (js->clj data :keywordize-keys true))
                body (if error-schema
                       (coerce-http-response error-schema raw)
                       raw)]
            (throw (ex-info "db-sync request failed"
                            {:status status
                             :url url
                             :body body})))))))
  ;; Sentinel that `<decrypt-text-value` compares its decoded value against.
  282. (def ^:private invalid-transit ::invalid-transit)
  (defn- graph-e2ee?
    "True when the repo's db reports the graph as e2ee. Returns false for any
    non-true flag and nil when the repo has no datascript conn."
    [repo]
    (when-let [conn (worker-state/get-datascript-conn repo)]
      (-> @conn
          ldb/get-graph-rtc-e2ee?
          true?)))
  (defn- user-uuid
    "Returns the :sub claim of the current id token, or nil when there is no
    token or the token does not parse."
    []
    (when-some [token (worker-state/get-id-token)]
      (when-some [claims (worker-util/parse-jwt token)]
        (:sub claims))))
  (defn- graph-encrypted-aes-key-idb-key
    "Builds the IndexedDB key under which a graph's encrypted AES key is stored."
    [graph-id]
    (let [prefix "rtc-encrypted-aes-key###"]
      (str prefix graph-id)))
  (defn- <get-item
    "Reads key `k` from the e2ee store. Returns a promise of the stored value
    converted to Clojure data with keyword keys. Asserts that `k` is truthy."
    [k]
    (assert (and k @e2ee-store))
    (p/let [stored (idb-keyval/get k @e2ee-store)]
      (js->clj stored :keywordize-keys true)))
  (defn- <set-item!
    "Writes `value` under key `k` in the e2ee store and returns whatever
    `idb-keyval/set` returns. Asserts that `k` is truthy."
    [k value]
    (assert (and k @e2ee-store))
    (let [store @e2ee-store]
      (idb-keyval/set k value store)))
  (defn- e2ee-base
    "Base url for the e2ee endpoints; currently the same as `http-base-url`."
    []
    (http-base-url))
  (defn- <fetch-user-rsa-key-pair-raw
    "GETs `<base>/e2ee/user-keys` and returns a promise of the response body
    coerced with the :e2ee/user-keys schema."
    [base]
    (let [url (str base "/e2ee/user-keys")]
      (fetch-json url
                  {:method "GET"}
                  {:response-schema :e2ee/user-keys})))
  (defn- <upload-user-rsa-key-pair!
    "POSTs the user's public key and encrypted private key to
    `<base>/e2ee/user-keys`. Fails fast with :db-sync/invalid-field when the
    request body does not pass coercion."
    [base public-key encrypted-private-key]
    (let [payload (coerce-http-request :e2ee/user-keys
                                       {:public-key public-key
                                        :encrypted-private-key encrypted-private-key})]
      (if (nil? payload)
        (fail-fast :db-sync/invalid-field {:type :e2ee/user-keys :body payload})
        (fetch-json (str base "/e2ee/user-keys")
                    {:method "POST"
                     :headers {"content-type" "application/json"}
                     :body (js/JSON.stringify (clj->js payload))}
                    {:response-schema :e2ee/user-keys}))))
  (defn- <ensure-user-rsa-key-pair-raw
    "Returns a promise of the user's RSA key pair as transit strings
    ({:public-key .. :encrypted-private-key ..}).

    Uses the pair stored on the server when both fields are strings. Otherwise
    generates a new pair, asks the main thread for the e2ee password, encrypts
    the private key with it, uploads the pair and returns it. Fetch errors
    propagate to the caller."
    [base]
    (p/let [remote-pair (<fetch-user-rsa-key-pair-raw base)]
      (if (and (string? (:public-key remote-pair))
               (string? (:encrypted-private-key remote-pair)))
        remote-pair
        (p/let [{:keys [publicKey privateKey]} (crypt/<generate-rsa-key-pair)
                {:keys [password]} (worker-state/<invoke-main-thread :thread-api/request-e2ee-password)
                encrypted-private (crypt/<encrypt-private-key password privateKey)
                exported-public (crypt/<export-public-key publicKey)
                public-str (ldb/write-transit-str exported-public)
                encrypted-private-str (ldb/write-transit-str encrypted-private)
                _ (<upload-user-rsa-key-pair! base public-str encrypted-private-str)]
          {:public-key public-str
           :encrypted-private-key encrypted-private-str}))))
  (defn ensure-user-rsa-keys!
    "Ensures the current user has an RSA key pair on the server, creating one
    when missing. Fails fast with :db-sync/missing-field when no e2ee base url
    is configured."
    []
    (let [base (e2ee-base)]
      (if (string? base)
        (<ensure-user-rsa-key-pair-raw base)
        (fail-fast :db-sync/missing-field {:base base}))))
  (defn- <decrypt-private-key
    "Decodes the transit-encoded encrypted private key, has the main thread
    decrypt it, and returns a promise of the imported private key."
    [encrypted-private-key-str]
    (p/let [encrypted (ldb/read-transit-str encrypted-private-key-str)
            exported (worker-state/<invoke-main-thread
                      :thread-api/decrypt-user-e2ee-private-key
                      encrypted)]
      (crypt/<import-private-key exported)))
  (defn- <import-public-key
    "Decodes the transit-encoded exported public key and returns a promise of
    the imported key."
    [public-key-str]
    (p/let [exported-key (ldb/read-transit-str public-key-str)]
      (crypt/<import-public-key exported-key)))
  (defn- <fetch-user-public-key-by-email
    "GETs the public key of the user with `email` (url-encoded into the query
    string); the response is coerced with :e2ee/user-public-key."
    [base email]
    (let [query (str "?email=" (js/encodeURIComponent email))
          url (str base "/e2ee/user-public-key" query)]
      (fetch-json url
                  {:method "GET"}
                  {:response-schema :e2ee/user-public-key})))
  (defn- <fetch-graph-encrypted-aes-key-raw
    "GETs `<base>/e2ee/graphs/<graph-id>/aes-key`; the response is coerced with
    the :e2ee/graph-aes-key schema."
    [base graph-id]
    (let [url (str base "/e2ee/graphs/" graph-id "/aes-key")]
      (fetch-json url
                  {:method "GET"}
                  {:response-schema :e2ee/graph-aes-key})))
  (defn- <upsert-graph-encrypted-aes-key!
    "POSTs `encrypted-aes-key` to `<base>/e2ee/graphs/<graph-id>/aes-key`.
    Fails fast with :db-sync/invalid-field when the request body does not pass
    coercion."
    [base graph-id encrypted-aes-key]
    (let [payload (coerce-http-request :e2ee/graph-aes-key
                                       {:encrypted-aes-key encrypted-aes-key})]
      (if (nil? payload)
        (fail-fast :db-sync/invalid-field {:type :e2ee/graph-aes-key :body payload})
        (fetch-json (str base "/e2ee/graphs/" graph-id "/aes-key")
                    {:method "POST"
                     :headers {"content-type" "application/json"}
                     :body (js/JSON.stringify (clj->js payload))}
                    {:response-schema :e2ee/graph-aes-key}))))
  ;; Resolves the AES key for `repo`'s graph and caches it in `*repo->aes-key`.
  ;;
  ;; Resolves to nil when `graph-e2ee?` is not true for the repo, and to the
  ;; cached key when one exists. Otherwise the encrypted key is looked up in
  ;; the local e2ee store, then on the server; when neither has one, a fresh
  ;; key is generated, encrypted with the user's public key, uploaded and
  ;; stored locally.
  ;; Throws via `fail-fast` (:db-sync/missing-field) when the e2ee base url or
  ;; the user id is not a string.
  376. (defn- <ensure-graph-aes-key
  377. [repo graph-id]
  378. (if-not (graph-e2ee? repo)
  379. (p/resolved nil)
  380. (if-let [cached (get @*repo->aes-key repo)]
  381. (p/resolved cached)
  382. (let [base (e2ee-base)
  383. user-id (user-uuid)]
  384. (when-not (and (string? base) (string? user-id))
  385. (fail-fast :db-sync/missing-field {:base base :user-id user-id :graph-id graph-id}))
  ;; may create and upload a new RSA key pair when the user has none
  386. (p/let [{:keys [public-key encrypted-private-key]} (<ensure-user-rsa-key-pair-raw base)
  387. public-key' (when (string? public-key) (<import-public-key public-key))
  388. private-key' (when (string? encrypted-private-key) (<decrypt-private-key encrypted-private-key))
  ;; the local store is consulted first; the server only when nothing is stored locally
  389. local-encrypted (when graph-id
  390. (<get-item (graph-encrypted-aes-key-idb-key graph-id)))
  391. remote-encrypted (when (and (nil? local-encrypted) graph-id)
  392. (p/let [resp (<fetch-graph-encrypted-aes-key-raw base graph-id)]
  393. (when-let [encrypted-aes-key (:encrypted-aes-key resp)]
  394. (ldb/read-transit-str encrypted-aes-key))))
  395. encrypted-aes-key (or local-encrypted remote-encrypted)
  396. aes-key (if encrypted-aes-key
  397. (crypt/<decrypt-aes-key private-key' encrypted-aes-key)
  ;; no key anywhere: generate one, upload it as a transit string, store the decoded form locally
  ;; NOTE(review): this branch is also reached when graph-id is nil -- confirm callers always pass one.
  398. (p/let [aes-key (crypt/<generate-aes-key)
  399. encrypted (crypt/<encrypt-aes-key public-key' aes-key)
  400. encrypted-str (ldb/write-transit-str encrypted)
  401. _ (<upsert-graph-encrypted-aes-key! base graph-id encrypted-str)
  402. _ (<set-item! (graph-encrypted-aes-key-idb-key graph-id) encrypted)]
  403. aes-key))
  ;; a key that came from the server is written to the local store
  404. _ (when (and graph-id encrypted-aes-key (nil? local-encrypted))
  405. (<set-item! (graph-encrypted-aes-key-idb-key graph-id) encrypted-aes-key))]
  406. (swap! *repo->aes-key assoc repo aes-key)
  407. aes-key)))))
  ;; Resolves the AES key of graph `graph-id` and caches it in
  ;; `*repo->aes-key` under `repo`.
  ;;
  ;; Unlike `<ensure-graph-aes-key` this never generates keys: it fails fast
  ;; (:db-sync/missing-field) when the base url or graph id is not a string,
  ;; when the user has no RSA key pair on the server, or when no encrypted AES
  ;; key is found locally or remotely.
  408. (defn- <fetch-graph-aes-key-for-download
  409. [repo graph-id]
  410. (let [base (e2ee-base)]
  411. (when-not (and (string? base) (string? graph-id))
  412. (fail-fast :db-sync/missing-field {:base base :graph-id graph-id}))
  413. (p/let [{:keys [public-key encrypted-private-key]} (<fetch-user-rsa-key-pair-raw base)]
  414. (when-not (and (string? public-key) (string? encrypted-private-key))
  415. (fail-fast :db-sync/missing-field {:graph-id graph-id :field :user-rsa-key-pair}))
  ;; the local store is consulted first; the server only when nothing is stored locally
  416. (p/let [private-key (<decrypt-private-key encrypted-private-key)
  417. local-encrypted (<get-item (graph-encrypted-aes-key-idb-key graph-id))
  418. remote-encrypted (when (nil? local-encrypted)
  419. (p/let [resp (<fetch-graph-encrypted-aes-key-raw base graph-id)]
  420. (when-let [encrypted-aes-key (:encrypted-aes-key resp)]
  421. (ldb/read-transit-str encrypted-aes-key))))
  422. encrypted-aes-key (or local-encrypted remote-encrypted)]
  423. (when-not encrypted-aes-key
  424. (fail-fast :db-sync/missing-field {:graph-id graph-id :field :encrypted-aes-key}))
  ;; NOTE(review): the result of this <set-item! is not bound in a p/let, so it
  ;; is not awaited here (`<ensure-graph-aes-key` binds the equivalent call) -- confirm this is intended.
  425. (when (and encrypted-aes-key (nil? local-encrypted))
  426. (<set-item! (graph-encrypted-aes-key-idb-key graph-id) encrypted-aes-key))
  427. (p/let [aes-key (crypt/<decrypt-aes-key private-key encrypted-aes-key)]
  428. (swap! *repo->aes-key assoc repo aes-key)
  429. aes-key)))))
  ;; Grants the user with `target-email` access to an e2ee graph by encrypting
  ;; the graph AES key with that user's public key and POSTing it to the
  ;; grant-access endpoint. Resolves to nil, including when `graph-e2ee?` is
  ;; not true for the repo.
  ;; Fails fast (:db-sync/missing-field / :db-sync/invalid-field) when the base
  ;; url, the AES key or the target's public key is missing, or when the
  ;; request body does not pass coercion.
  430. (defn- <grant-graph-access!
  431. [repo graph-id target-email]
  432. (if-not (graph-e2ee? repo)
  433. (p/resolved nil)
  434. (let [base (e2ee-base)]
  435. (when-not (string? base)
  436. (fail-fast :db-sync/missing-field {:base base :graph-id graph-id}))
  437. (p/let [aes-key (<ensure-graph-aes-key repo graph-id)
  438. _ (when (nil? aes-key)
  439. (fail-fast :db-sync/missing-field {:repo repo :field :aes-key}))
  440. resp (<fetch-user-public-key-by-email base target-email)
  441. public-key-str (:public-key resp)]
  442. (if-not (string? public-key-str)
  443. (fail-fast :db-sync/missing-field {:repo repo :field :public-key :email target-email})
  ;; encrypt the graph key for the target user and send it as a transit string
  444. (p/let [public-key (<import-public-key public-key-str)
  445. encrypted (crypt/<encrypt-aes-key public-key aes-key)
  446. encrypted-str (ldb/write-transit-str encrypted)
  447. body (coerce-http-request :e2ee/grant-access
  448. {:target-user-email+encrypted-aes-key-coll
  449. [{:email target-email
  450. :encrypted-aes-key encrypted-str}]})
  451. _ (when (nil? body)
  452. (fail-fast :db-sync/invalid-field {:type :e2ee/grant-access :body body}))
  453. _ (fetch-json (str base "/e2ee/graphs/" graph-id "/grant-access")
  454. {:method "POST"
  455. :headers {"content-type" "application/json"}
  456. :body (js/JSON.stringify (clj->js body))}
  457. {:response-schema :e2ee/grant-access})]
  458. nil))))))
  (defn grant-graph-access!
    "Public entry point that delegates to `<grant-graph-access!`: shares the
    graph's AES key with the user identified by `target-email`."
    [repo graph-id target-email]
    (<grant-graph-access! repo graph-id target-email))
  (defn- <encrypt-text-value
    "Encrypts the string `value` with `aes-key`. The value is transit-encoded
    before encryption and the encrypted result is transit-encoded again.
    Returns a promise of that string. Asserts that `value` is a string."
    [aes-key value]
    (assert (string? value) (str "encrypting value should be a string, value: " value))
    (p/let [cipher (crypt/<encrypt-text aes-key (ldb/write-transit-str value))]
      (ldb/write-transit-str cipher)))
  (defn- <decrypt-text-value
    "Decrypts a transit-encoded encrypted string produced by
    `<encrypt-text-value`. Returns a promise of the decrypted value.

    A `value` that cannot be read as transit resolves to `value` unchanged.
    Asserts that `value` is a string."
    [aes-key value]
    (assert (string? value) (str "encrypted value should be a string, value: " value))
    ;; Map a failed transit read onto the `invalid-transit` sentinel so the
    ;; pass-through branch below can run. Without this the sentinel was never
    ;; produced and a malformed value threw synchronously out of a
    ;; promise-returning function.
    (let [decoded (try
                    (ldb/read-transit-str value)
                    (catch :default _
                      invalid-transit))]
      (if (= decoded invalid-transit)
        (p/resolved value)
        (p/let [decrypted (crypt/<decrypt-text-if-encrypted aes-key decoded)
                value' (ldb/read-transit-str decrypted)]
          value'))))
  (defn- encrypt-tx-item
    "Returns a promise of `item` with its value (index 3) encrypted when the
    item is a vector of at least 4 elements whose attribute (index 2) is in
    `rtc-const/encrypt-attr-set`. Any other item resolves unchanged."
    [aes-key item]
    (if (and (vector? item)
             (<= 4 (count item))
             (contains? rtc-const/encrypt-attr-set (nth item 2)))
      (p/let [cipher (<encrypt-text-value aes-key (nth item 3))]
        (assoc item 3 cipher))
      (p/resolved item)))
  (defn- decrypt-tx-item
    "Returns a promise of `item` with its value (index 3) decrypted when the
    item is a vector of at least 4 elements whose attribute (index 2) is in
    `rtc-const/encrypt-attr-set`. Any other item resolves unchanged."
    [aes-key item]
    (if (and (vector? item)
             (<= 4 (count item))
             (contains? rtc-const/encrypt-attr-set (nth item 2)))
      (p/let [plain (<decrypt-text-value aes-key (nth item 3))]
        (assoc item 3 plain))
      (p/resolved item)))
  (defn- <encrypt-tx-data
    "Encrypts every item of `tx-data` with `encrypt-tx-item`. Returns a promise
    of a vector, or nil (not a promise) when `tx-data` is empty."
    [aes-key tx-data]
    (when (seq tx-data)
      (let [pending (mapv #(encrypt-tx-item aes-key %) tx-data)]
        (p/let [encrypted-items (p/all pending)]
          (vec encrypted-items)))))
  (defn- <decrypt-tx-data
    "Decrypts every item of `tx-data` with `decrypt-tx-item`. Returns a promise
    of a vector, or nil (not a promise) when `tx-data` is empty."
    [aes-key tx-data]
    (when (seq tx-data)
      (let [pending (mapv #(decrypt-tx-item aes-key %) tx-data)]
        (p/let [decrypted-items (p/all pending)]
          (vec decrypted-items)))))
  (defn- <encrypt-keys-attrs
    "Encrypts the value of every [e a v t] entry in `keys` whose attribute is
    in `rtc-const/encrypt-attr-set`; other entries pass through. Returns a
    promise of all entries."
    [aes-key keys]
    (let [encrypt-entry (fn [[e a v t]]
                          (if-not (contains? rtc-const/encrypt-attr-set a)
                            [e a v t]
                            (p/let [cipher (<encrypt-text-value aes-key v)]
                              [e a cipher t])))]
      (p/all (mapv encrypt-entry keys))))
  (defn- <decrypt-keys-attrs
    "Decrypts the value of every [e a v t] entry in `keys` whose attribute is
    in `rtc-const/encrypt-attr-set`; other entries pass through. Returns a
    promise of all entries."
    [aes-key keys]
    (let [decrypt-entry (fn [[e a v t]]
                          (if-not (contains? rtc-const/encrypt-attr-set a)
                            (p/resolved [e a v t])
                            (p/let [plain (<decrypt-text-value aes-key v)]
                              [e a plain t])))]
      (p/all (mapv decrypt-entry keys))))
  ;; Encrypts the sensitive attribute values inside snapshot rows.
  ;; Each row is [addr content addresses] where `content` is a transit string.
  ;; When the decoded content is a map, its :keys are encrypted; otherwise the
  ;; decoded content is treated as a collection and `<encrypt-keys-attrs` is
  ;; applied to each of its elements. The content is re-encoded as transit.
  ;; Resolves to a vector of rows ([] for empty input); addr and addresses are
  ;; passed through unchanged.
  524. (defn- <encrypt-snapshot-rows
  525. [aes-key rows]
  526. (if-not (seq rows)
  527. (p/resolved [])
  528. (p/let [items (p/all
  529. (mapv (fn [[addr content addresses]]
  530. (let [data (ldb/read-transit-str content)]
  531. (p/let [keys' (if (map? data) ; node
  532. (<encrypt-keys-attrs aes-key (:keys data))
  533. ;; leaf
  534. (p/let [result (p/all (map #(<encrypt-keys-attrs aes-key %) data))]
  535. (vec result)))
  536. data' (if (map? data) (assoc data :keys keys') keys')
  537. content' (ldb/write-transit-str data')]
  538. [addr content' addresses])))
  539. rows))]
  540. (vec items))))
  ;; Inverse of `<encrypt-snapshot-rows`: decrypts the sensitive attribute
  ;; values inside snapshot rows.
  ;; Each row is [addr content addresses] where `content` is a transit string.
  ;; When the decoded content is a map, its :keys are decrypted; otherwise the
  ;; decoded content is treated as a collection and `<decrypt-keys-attrs` is
  ;; applied to each of its elements. The content is re-encoded as transit.
  ;; Resolves to a vector of rows ([] for empty input); addr and addresses are
  ;; passed through unchanged.
  541. (defn- <decrypt-snapshot-rows
  542. [aes-key rows]
  543. (if-not (seq rows)
  544. (p/resolved [])
  545. (p/let [items (p/all
  546. (mapv (fn [[addr content addresses]]
  547. (let [data (ldb/read-transit-str content)]
  548. (p/let [keys' (if (map? data) ; node
  549. (<decrypt-keys-attrs aes-key (:keys data))
  550. ;; leaf
  551. (p/let [result (p/all (map #(<decrypt-keys-attrs aes-key %) data))]
  552. (vec result)))
  553. data' (if (map? data) (assoc data :keys keys') keys')
  554. content' (ldb/write-transit-str data')]
  555. [addr content' addresses])))
  556. rows))]
  557. (vec items))))
  (defn <decrypt-kvs-rows
    "Fetches the AES key of graph `graph-id` and uses it to decrypt the
    snapshot `rows`. Rejects with :db-sync/missing-field when no AES key can
    be obtained."
    [repo graph-id rows]
    (p/let [aes-key (<fetch-graph-aes-key-for-download repo graph-id)]
      (when (nil? aes-key)
        (fail-fast :db-sync/missing-field {:repo repo :field :aes-key}))
      (<decrypt-snapshot-rows aes-key rows)))
  (defn- <encrypt-datoms
    "Encrypts the :v of every datom whose attribute is in
    `rtc-const/encrypt-attr-set`; other datoms pass through. Returns a promise
    of all datoms."
    [aes-key datoms]
    (let [encrypt-datom (fn [datom]
                          (if-not (contains? rtc-const/encrypt-attr-set (:a datom))
                            datom
                            (p/let [cipher (<encrypt-text-value aes-key (:v datom))]
                              (assoc datom :v cipher))))]
      (p/all (mapv encrypt-datom datoms))))
  (defn- upsert-addr-content!
    "Upserts every item of `data` into the kvs table of the sqlite `db` within
    one transaction. Each item is a bind object providing $addr, $content and
    $addresses."
    [^js db data]
    (let [upsert-sql (str "INSERT INTO kvs (addr, content, addresses) "
                          "values ($addr, $content, $addresses) "
                          "on conflict(addr) do update set content = $content, addresses = $addresses")]
      (.transaction
       db
       (fn [tx]
         (doseq [bind-params data]
           (.exec tx #js {:sql upsert-sql
                          :bind bind-params}))))))
  (defn- restore-data-from-addr
    "Loads the kvs row stored at `addr` and returns its transit-decoded
    content. When the row has addresses and the content is a map, the parsed
    addresses are attached under :addresses. Returns nil when no row exists."
    [^js db addr]
    (let [rows (.exec db #js {:sql "select content, addresses from kvs where addr = ?"
                              :bind #js [addr]
                              :rowMode "array"})]
      (when-let [row (first rows)]
        (let [[content raw-addresses] (bean/->clj row)
              addresses (when raw-addresses (js/JSON.parse raw-addresses))
              decoded (sqlite-util/transit-read content)]
          (cond-> decoded
            (and addresses (map? decoded)) (assoc :addresses addresses))))))
  (defn- new-temp-sqlite-storage
    "Creates a datascript IStorage backed by the kvs table of the sqlite `db`.

    On store, map-shaped data has its :addresses split off into a separate
    JSON column; on restore they are re-attached by `restore-data-from-addr`."
    [^js db]
    (let [->bind-params (fn [[addr data]]
                          (let [node? (map? data)
                                content (if node? (dissoc data :addresses) data)
                                addresses (when node?
                                            (when-let [addrs (:addresses data)]
                                              (js/JSON.stringify (bean/->js addrs))))]
                            #js {:$addr addr
                                 :$content (sqlite-util/transit-write content)
                                 :$addresses addresses}))]
      (reify IStorage
        (-store [_ addr+data-seq _delete-addrs]
          (upsert-addr-content! db (map ->bind-params addr+data-seq)))
        (-restore [_ addr]
          (restore-data-from-addr db addr)))))
  (defn- create-temp-sqlite-db
    "Opens an in-memory sqlite database with the kvs table created. Fails fast
    with :db-sync/missing-field when the sqlite module is not loaded."
    []
    (let [sqlite @worker-state/*sqlite]
      (if-not sqlite
        (fail-fast :db-sync/missing-field {:field :sqlite})
        (let [^js DB (.-DB ^js (.-oo1 sqlite))
              temp-db (new DB ":memory:" "c")]
          (common-sqlite/create-kvs-table! temp-db)
          temp-db))))
  (defn- <create-temp-sqlite-conn
    "Builds a datascript conn from `datoms` and `schema` whose storage is a
    fresh in-memory sqlite db. Returns a promise of {:db <sqlite db> :conn <conn>}."
    [schema datoms]
    (p/let [temp-db (create-temp-sqlite-db)
            temp-storage (new-temp-sqlite-storage temp-db)
            temp-conn (d/conn-from-datoms datoms schema {:storage temp-storage})]
      {:db temp-db
       :conn temp-conn}))
  (defn- cleanup-temp-sqlite!
    "Releases a temp conn/db pair created by `<create-temp-sqlite-conn`: resets
    the conn to nil and closes the sqlite db. Missing entries are skipped."
    [{:keys [db conn]}]
    (if conn
      (reset! conn nil)
      nil)
    (if db
      (.close db)
      nil))
  (defn- require-asset-field
    "Fails fast with :db-sync/missing-field when `value` is nil or a blank
    string. `context` is merged into the failure data."
    [repo field value context]
    (let [missing? (or (nil? value)
                       (and (string? value) (string/blank? value)))]
      (when missing?
        (fail-fast :db-sync/missing-field
                   (merge {:repo repo :field field :value value} context)))))
  (defn- asset-uuids-from-tx
    "Returns the distinct :block/uuid values of entities that received an added
    :logseq.property.asset/size datom in `tx-data`."
    [db tx-data]
    (let [size-added? (fn [datom]
                        (and (:added datom)
                             (= :logseq.property.asset/size (:a datom))))
          datom->uuid (fn [datom]
                        (when-let [entity (d/entity db (:e datom))]
                          (:block/uuid entity)))]
      (->> tx-data
           (filter size-added?)
           (keep datom->uuid)
           distinct)))
  (defn- persist-local-tx!
    "Stores a local transaction (normalized data plus its reversed datoms) in
    the client-ops db under a fresh random tx id, then re-broadcasts the rtc
    state. Returns the tx id, or nil when the repo has no client-ops conn."
    [repo normalized-tx-data reversed-datoms _tx-meta]
    (when-let [ops-conn (client-ops-conn repo)]
      (let [tx-id (random-uuid)
            created-at (.now js/Date)
            pending-entity {:db-sync/tx-id tx-id
                            :db-sync/normalized-tx-data normalized-tx-data
                            :db-sync/reversed-tx-data reversed-datoms
                            :db-sync/created-at created-at}]
        (ldb/transact! ops-conn [pending-entity])
        (when-let [client (current-client repo)]
          (broadcast-rtc-state! client))
        tx-id)))
  (defn- pending-txs
    "Returns a vector of pending local txs, in :db-sync/created-at index order,
    as maps of :tx-id, :tx and :reversed-tx. The optional :limit caps the
    number of entries. Returns nil when the repo has no client-ops conn."
    [repo & {:keys [limit]}]
    (when-let [ops-conn (client-ops-conn repo)]
      (let [db @ops-conn
            created-datoms (cond->> (d/datoms db :avet :db-sync/created-at)
                             limit (take limit))]
        (mapv (fn [datom]
                (let [entity (d/entity db (:e datom))]
                  {:tx-id (:db-sync/tx-id entity)
                   :tx (:db-sync/normalized-tx-data entity)
                   :reversed-tx (:db-sync/reversed-tx-data entity)}))
              created-datoms))))
  (defn- remove-pending-txs!
    "Retracts the pending-tx entities identified by `tx-ids` from the
    client-ops db and re-broadcasts the rtc state. No-op for empty `tx-ids` or
    when the repo has no client-ops conn."
    [repo tx-ids]
    (when (seq tx-ids)
      (when-let [ops-conn (client-ops-conn repo)]
        (let [retractions (mapv (fn [id]
                                  [:db/retractEntity [:db-sync/tx-id id]])
                                tx-ids)]
          (ldb/transact! ops-conn retractions)
          (when-let [client (current-client repo)]
            (broadcast-rtc-state! client))))))
  (defn get-lookup-id
    "Returns the uuid of a [:block/uuid <uuid>] lookup ref. Returns nil for
    anything that is not a 2-element vector starting with :block/uuid."
    [x]
    (when (and (vector? x) (= 2 (count x)))
      (let [[attr id] x]
        (when (= :block/uuid attr)
          id))))
  (defn- keep-last-update
    "De-duplicates `tx-data`, keeping the last occurrence of each item.

    5-element vector items on :block/updated-at, :block/title, :block/name or
    :block/order are keyed by their first three elements only, so a later
    update of the same attribute replaces an earlier one. All other items are
    keyed by the whole item."
    [tx-data]
    (let [last-wins-attrs #{:block/updated-at :block/title :block/name :block/order}
          dedupe-key (fn [item]
                       (if (and (vector? item)
                                (= 5 (count item))
                                (contains? last-wins-attrs (nth item 2)))
                         (take 3 item)
                         item))]
      (common-util/distinct-by-last-wins dedupe-key tx-data)))
  (defn- sanitize-tx-data
    "Cleans up `tx-data` before it is applied: rewrites attribute retractions
    through `replace-attr-retract-with-retract-entity-v2`, drops
    :db/retractEntity items and items whose last element is a lookup ref to a
    locally deleted block, then keeps only the last update per attribute."
    [db tx-data local-deleted-ids]
    (let [dropped? (fn [item]
                     (or (= :db/retractEntity (first item))
                         (contains? local-deleted-ids (get-lookup-id (last item)))))
          sanitized (->> tx-data
                         (db-normalize/replace-attr-retract-with-retract-entity-v2 db)
                         (remove dropped?)
                         keep-last-update)]
      ;; (when (not= tx-data sanitized-tx-data)
      ;;   (prn :debug :tx-data tx-data)
      ;;   (prn :debug :sanitized-tx-data sanitized-tx-data))
      sanitized))
  ;; Uploads the next batch of pending local txs over the client's websocket.
  ;;
  ;; A batch is only sent when all of these hold: the repo has a datascript
  ;; conn, the local tx (defaulting to 0) equals the latest known remote tx,
  ;; nothing is in flight, and the websocket is open while the worker is online.
  ;; Up to 50 pending txs are concatenated, normalized, encrypted when the
  ;; graph has an AES key, recorded as in flight and sent as one "tx/batch"
  ;; message. Errors are only logged to the console.
  709. (defn- flush-pending!
  710. [repo client]
  711. (let [inflight @(:inflight client)
  712. local-tx (or (client-op/get-local-tx repo) 0)
  713. remote-tx (get @*repo->latest-remote-tx repo)
  714. conn (worker-state/get-datascript-conn repo)]
  715. (when (and conn (= local-tx remote-tx)) ; rebase
  716. (when (empty? inflight)
  717. (when-let [ws (:ws client)]
  718. (when (and (ws-open? ws) (worker-state/online?))
  719. (let [batch (pending-txs repo {:limit 50})]
  720. (when (seq batch)
  721. (let [tx-ids (mapv :tx-id batch)
  722. txs (mapcat :tx batch)
  723. tx-data (->> txs
  724. (db-normalize/remove-retract-entity-ref @conn)
  725. keep-last-update
  726. distinct)]
  727. ;; (prn :debug :before-keep-last-update txs)
  728. ;; (prn :debug :upload :tx-data tx-data)
  ;; NOTE(review): the guard checks the raw `txs`, not the normalized `tx-data`,
  ;; so a batch whose tx-data normalizes to empty is still sent -- confirm intended.
  729. (when (seq txs)
  730. (->
  731. (p/let [aes-key (<ensure-graph-aes-key repo (:graph-id client))
  732. _ (when (and (graph-e2ee? repo) (nil? aes-key))
  733. (fail-fast :db-sync/missing-field {:repo repo :field :aes-key}))
  734. tx-data* (if aes-key
  735. (<encrypt-tx-data aes-key tx-data)
  736. tx-data)]
  ;; NOTE(review): inflight is only set here, after the async key lookup above;
  ;; a second call arriving in between would still see an empty inflight -- confirm callers serialize.
  737. (reset! (:inflight client) tx-ids)
  738. (send! ws {:type "tx/batch"
  739. :t-before local-tx
  740. :txs (sqlite-util/write-transit-str tx-data*)}))
  741. (p/catch (fn [error]
  742. (js/console.error error))))))))))))))
  (defn- ensure-client-state!
    "Creates a fresh client state map for `repo`, installs it as the current
    db-sync client (replacing any previous one) and returns it."
    [repo]
    (let [new-queue (fn [] (atom (p/resolved nil)))
          client {:repo repo
                  :send-queue (new-queue)
                  :asset-queue (new-queue)
                  :inflight (atom [])
                  :reconnect (atom {:attempt 0 :timer nil})
                  :online-users (atom [])
                  :ws-state (atom :closed)}]
      (reset! worker-state/*db-sync-client client)
      client))
  (defn- asset-url
    "Builds the remote url of an asset:
    `<base>/assets/<graph-id>/<asset-uuid>.<asset-type>`."
    [base graph-id asset-uuid asset-type]
    (let [file-name (str asset-uuid "." asset-type)]
      (str base "/assets/" graph-id "/" file-name)))
  (defn- enqueue-asset-task!
    "Appends `task` (a zero-arg fn returning a promise) to the client's asset
    queue so that asset tasks run one after another.

    Returns the new queue promise, which settles with the result of `task`.
    Returns nil when the client has no :asset-queue."
    [client task]
    (when-let [queue (:asset-queue client)]
      (swap! queue
             (fn [prev]
               (-> prev
                   ;; Recover from a failed predecessor first: chaining with
                   ;; `p/then` alone would skip `task` (and every later task)
                   ;; once a single task had rejected.
                   (p/catch (fn [_] nil))
                   (p/then (fn [_] (task))))))))
  (defn- upload-remote-asset!
    "Asks the main thread to upload an asset to its remote url, passing the
    auth headers along. Returns a rejected promise (\"missing asset upload
    info\") when the base url, graph id, asset type or checksum is empty."
    [repo graph-id asset-uuid asset-type checksum]
    (let [base (http-base-url)
          complete? (and (seq base) (seq graph-id) (seq asset-type) (seq checksum))]
      (if-not complete?
        (p/rejected (ex-info "missing asset upload info"
                             {:repo repo
                              :asset-uuid asset-uuid
                              :asset-type asset-type
                              :checksum checksum
                              :base base
                              :graph-id graph-id}))
        (worker-state/<invoke-main-thread :thread-api/rtc-upload-asset
                                          repo nil (str asset-uuid) asset-type checksum
                                          (asset-url base graph-id (str asset-uuid) asset-type)
                                          {:extra-headers (auth-headers)}))))
  (defn- download-remote-asset!
    "Asks the main thread to download an asset from its remote url, passing the
    auth headers along. Returns a rejected promise (\"missing asset download
    info\") when the base url, graph id or asset type is empty."
    [repo graph-id asset-uuid asset-type]
    (let [base (http-base-url)
          complete? (and (seq base) (seq graph-id) (seq asset-type))]
      (if-not complete?
        (p/rejected (ex-info "missing asset download info"
                             {:repo repo
                              :asset-uuid asset-uuid
                              :asset-type asset-type
                              :base base
                              :graph-id graph-id}))
        (worker-state/<invoke-main-thread :thread-api/rtc-download-asset
                                          repo nil (str asset-uuid) asset-type
                                          (asset-url base graph-id (str asset-uuid) asset-type)
                                          {:extra-headers (auth-headers)}))))
  (defn- process-asset-op!
    "Handles one pending asset op of `repo`.

    - `:update-asset`: reads type/checksum/size from the local entity. Assets
      larger than `max-asset-size` are logged and their op is removed without
      uploading. Otherwise the asset is uploaded; on success
      `:logseq.property.asset/remote-metadata` is written (if the entity still
      exists) and the op is removed.
    - `:remove-asset`: removes the op.
    - any other op resolves to nil.

    Required fields are checked through `require-asset-field`; a missing
    datascript conn goes through `fail-fast`."
    [repo graph-id asset-op]
    (let [asset-uuid (:block/uuid asset-op)
          op-type (cond
                    (contains? asset-op :update-asset) :update-asset
                    (contains? asset-op :remove-asset) :remove-asset
                    :else :unknown)
          ;; broadcast the rtc state when `repo` is still the active client
          notify! (fn []
                    (when-let [client (current-client repo)]
                      (broadcast-rtc-state! client)))
          ;; forget the op locally, then broadcast
          drop-op! (fn []
                     (client-op/remove-asset-op repo asset-uuid)
                     (notify!))]
      (require-asset-field repo :asset-uuid asset-uuid {:op op-type})
      (case op-type
        :update-asset
        (let [conn (worker-state/get-datascript-conn repo)]
          (if-not conn
            (fail-fast :db-sync/missing-db {:repo repo :op :process-asset-op})
            (let [ent (d/entity @conn [:block/uuid asset-uuid])
                  asset-type (:logseq.property.asset/type ent)
                  checksum (:logseq.property.asset/checksum ent)
                  size (:logseq.property.asset/size ent 0)]
              (require-asset-field repo :asset-type asset-type {:op :update-asset :asset-uuid asset-uuid})
              (require-asset-field repo :checksum checksum {:op :update-asset
                                                            :asset-uuid asset-uuid
                                                            :asset-type asset-type})
              (if (> size max-asset-size)
                (do
                  (log/info :db-sync/asset-too-large {:repo repo
                                                      :asset-uuid asset-uuid
                                                      :size size})
                  (drop-op!)
                  (p/resolved nil))
                (-> (upload-remote-asset! repo graph-id asset-uuid asset-type checksum)
                    (p/then (fn [_]
                              ;; the entity may have been removed while uploading
                              (when (d/entity @conn [:block/uuid asset-uuid])
                                (ldb/transact!
                                 conn
                                 [{:block/uuid asset-uuid
                                   :logseq.property.asset/remote-metadata {:checksum checksum :type asset-type}}]
                                 {:persist-op? false}))
                              (drop-op!)))
                    (p/catch (fn [e]
                               (case (:type (ex-data e))
                                 ;; the op is removed when the local file could not be read
                                 :rtc.exception/read-asset-failed
                                 (drop-op!)
                                 ;; the op is left in place on upload failure
                                 :rtc.exception/upload-asset-failed
                                 nil
                                 (log/error :db-sync/asset-upload-failed
                                            {:repo repo
                                             :asset-uuid asset-uuid
                                             :error e})))))))))

        :remove-asset
        (-> (client-op/remove-asset-op repo asset-uuid)
            (p/then (fn [_] (notify!)))
            (p/catch (fn [e]
                       (log/error :db-sync/asset-delete-failed
                                  {:repo repo
                                   :asset-uuid asset-uuid
                                   :error e}))))

        ;; :unknown
        (p/resolved nil))))
  (defn- process-asset-ops!
    "Processes every pending asset op of `repo` one after another.

    Resolves to nil; does nothing when the client has no graph id or there are
    no asset ops."
    [repo client]
    (let [graph-id (:graph-id client)
          asset-ops (not-empty (client-op/get-all-asset-ops repo))]
      (if-not (and (seq graph-id) asset-ops)
        (p/resolved nil)
        (p/loop [remaining asset-ops]
          (when (seq remaining)
            (p/do!
             (process-asset-op! repo graph-id (first remaining))
             (p/recur (rest remaining))))))))
  (defn- enqueue-asset-sync!
    "Queues a `process-asset-ops!` pass for `repo` on the client's asset queue."
    [repo client]
    (enqueue-asset-task! client
                         (fn []
                           (process-asset-ops! repo client))))
  (defn- enqueue-asset-downloads!
    "Queues a task on the client's asset queue that downloads each distinct asset
    in `asset-uuids` whose file metadata is not found locally.

    Assets without a `:logseq.property.asset/type` on the local entity are
    skipped. A failure for one asset (metadata lookup or download) is logged and
    does not stop the remaining downloads. Does nothing when `asset-uuids` is
    empty; the task resolves to nil when the client has no graph id."
    [repo client asset-uuids]
    (when (seq asset-uuids)
      (enqueue-asset-task!
       client
       (fn []
         (let [graph-id (:graph-id client)]
           (if (seq graph-id)
             (p/loop [uuids (distinct asset-uuids)]
               (if (empty? uuids)
                 nil
                 (let [asset-uuid (first uuids)
                       conn (worker-state/get-datascript-conn repo)
                       ent (when conn (d/entity @conn [:block/uuid asset-uuid]))
                       asset-type (:logseq.property.asset/type ent)]
                   (p/do!
                    (when (seq asset-type)
                      (-> (p/let [file-meta (worker-state/<invoke-main-thread
                                             :thread-api/get-asset-file-metadata
                                             repo (str asset-uuid) asset-type)]
                            ;; only download when no local file metadata exists
                            (when (nil? file-meta)
                              (download-remote-asset! repo graph-id asset-uuid asset-type)))
                          ;; keep going with the other assets on failure
                          (p/catch (fn [e]
                                     (log/error :db-sync/asset-download-failed
                                                {:repo repo
                                                 :asset-uuid asset-uuid
                                                 :error e})
                                     nil))))
                    (p/recur (rest uuids))))))
             (p/resolved nil)))))))
  (defn- enqueue-asset-initial-download!
    "Queues a task that downloads every asset having
    `:logseq.property.asset/remote-metadata` whose uuid does not match the file
    stem of any path returned by `:thread-api/get-all-asset-file-paths`.

    Downloads run sequentially; a failure is logged as
    `:db-sync/asset-initial-download-failed`. The task resolves to nil when the
    datascript conn or the client's graph id is missing."
    [repo client]
    (enqueue-asset-task!
     client
     (fn []
       (let [conn (worker-state/get-datascript-conn repo)]
         (if-not conn
           (p/resolved nil)
           (let [graph-id (:graph-id client)
                 remote-assets (d/q '[:find ?uuid ?type
                                      :where
                                      [?e :block/uuid ?uuid]
                                      [?e :logseq.property.asset/type ?type]
                                      [?e :logseq.property.asset/remote-metadata]]
                                    @conn)]
             (if-not (seq graph-id)
               (p/resolved nil)
               (-> (p/let [paths (worker-state/<invoke-main-thread
                                  :thread-api/get-all-asset-file-paths
                                  repo)]
                     (let [;; non-empty file stems of the locally present assets
                           local-stems (into #{}
                                             (comp (map path/file-stem)
                                                   (filter seq))
                                             paths)
                           missing (remove (fn [[uuid _type]]
                                             (contains? local-stems (str uuid)))
                                           remote-assets)]
                       (p/loop [entries missing]
                         (when (seq entries)
                           (let [[asset-uuid asset-type] (first entries)]
                             (p/do!
                              (download-remote-asset! repo graph-id asset-uuid asset-type)
                              (p/recur (rest entries))))))))
                   (p/catch (fn [e]
                              (log/error :db-sync/asset-initial-download-failed
                                         {:repo repo :error e})))))))))))
  (defn- get-local-deleted-blocks
    "Returns the distinct entities (from the report's `:db-after`) whose
    `:block/uuid` datom was added by the reverse transaction while no entity
    with that uuid existed in its `:db-before`.

    Returns nil when `reversed-tx-data` is empty."
    [reversed-tx-report reversed-tx-data]
    (when (seq reversed-tx-data)
      (let [{:keys [db-before db-after tx-data]} reversed-tx-report
            restored-entity (fn [[e a v _t added]]
                              (when (and (= :block/uuid a)
                                         added
                                         (nil? (d/entity db-before [:block/uuid v])))
                                (d/entity db-after e)))]
        (distinct (keep restored-entity tx-data)))))
  (defn- delete-nodes!
    "Deletes `deleted-nodes` on `temp-conn` inside one outliner transaction.

    Pages are deleted with `worker-page/delete!`. Non-page nodes are looked up
    again by `:block/uuid` in the current db of `temp-conn` and deleted with
    `outliner-core/delete-blocks!`. Does nothing when there is nothing to
    delete."
    [temp-conn deleted-nodes tx-meta]
    (when (seq deleted-nodes)
      (let [page-nodes (filter ldb/page? deleted-nodes)
            current-entity (fn [node]
                             (d/entity @temp-conn [:block/uuid (:block/uuid node)]))
            block-nodes (remove ldb/page? (keep current-entity deleted-nodes))]
        (when (or (seq block-nodes) (seq page-nodes))
          (outliner-tx/transact!
           (merge tx-meta
                  {:outliner-op :delete-blocks
                   :transact-opts {:conn temp-conn}})
           (when (seq block-nodes)
             (outliner-core/delete-blocks! temp-conn block-nodes {}))
           (doseq [page page-nodes]
             (worker-page/delete! temp-conn (:block/uuid page) {})))))))
  (defn- fix-tx!
    "Runs `sync-cycle/fix-cycle!` on `temp-conn`, then
    `sync-order/fix-duplicate-orders!` over the combined tx-data of the remote,
    rebase and cycle-fix reports."
    [temp-conn remote-tx-report rebase-tx-report tx-meta]
    (let [cycle-tx-report (sync-cycle/fix-cycle! temp-conn remote-tx-report rebase-tx-report
                                                 {:tx-meta tx-meta})
          reports [remote-tx-report rebase-tx-report cycle-tx-report]
          combined-tx-data (mapcat :tx-data reports)]
      (sync-order/fix-duplicate-orders! temp-conn combined-tx-data tx-meta)))
  (defn- get-reverse-tx-data
    "Builds the tx-data that undoes `local-txs`, newest first.

    Entities whose `:block/uuid` is retracted by the reversed ops are removed
    with a single `[:db/retractEntity id]`; every other op that has such an
    entity as `e` or `v` is dropped. The result is passed through
    `keep-last-update`."
    [local-txs]
    (let [reversed-ops (mapcat :reversed-tx (reverse local-txs))
          retracted-ids (into #{}
                              (keep (fn [[op e a _v _t]]
                                      (when (and (= op :db/retract) (= :block/uuid a))
                                        e)))
                              reversed-ops)
          touches-retracted? (fn [[_op e _a v]]
                               (or (contains? retracted-ids e)
                                   (contains? retracted-ids v)))
          surviving-ops (if (seq retracted-ids)
                          (remove touches-retracted? reversed-ops)
                          reversed-ops)
          retract-entity-ops (map (fn [id] [:db/retractEntity id]) retracted-ids)]
      ;; entity retractions come first, followed by the surviving ops
      (keep-last-update (concat retract-entity-ops surviving-ops))))
  (defn- apply-remote-tx!
    "Applies remote `tx-data*` to the datascript db of `repo`.

    With pending local changes, everything happens on a temp conn:
    1. reverse the pending local txs,
    2. transact the remote tx-data (entity retractions are held back),
    3. rebase the pending local txs on top,
    4. delete locally/remotely deleted nodes and run `fix-tx!`.
    The tx-data produced by steps 3-4 is then persisted as a new local tx
    (`{:op :rtc-rebase}`) and the old pending txs are removed.

    Without local changes the remote tx-data is transacted and the remotely
    deleted nodes are deleted.

    Afterwards asset downloads are enqueued for assets touched by the resulting
    tx, and the client's `:inflight` list is cleared. Returns nil; goes through
    `fail-fast` when the datascript conn is missing."
    [repo client tx-data*]
    (if-let [conn (worker-state/get-datascript-conn repo)]
      (let [tx-data (->> tx-data*
                         (db-normalize/remove-retract-entity-ref @conn)
                         keep-last-update)
            local-txs (pending-txs repo)
            reversed-tx-data (get-reverse-tx-data local-txs)
            has-local-changes? (seq reversed-tx-data)
            *remote-tx-report (atom nil)
            *reversed-tx-report (atom nil)
            *rebase-tx-data (atom [])
            db @conn
            remote-deleted-blocks (keep (fn [item]
                                          (when (= :db/retractEntity (first item))
                                            (d/entity db (second item))))
                                        tx-data)
            remote-deleted-block-ids (set (map :block/uuid remote-deleted-blocks))
            ;; remote tx-data without entity retractions and without datoms
            ;; whose value refers to a remotely deleted block
            safe-remote-tx-data (->> tx-data
                                     (remove (fn [item]
                                               (or (= :db/retractEntity (first item))
                                                   (contains? remote-deleted-block-ids
                                                              (get-lookup-id (last item))))))
                                     seq)
            temp-tx-meta {:rtc-tx? true
                          :temp-conn? true
                          :gen-undo-ops? false
                          :persist-op? false}
            tx-report
            (if has-local-changes?
              (ldb/transact-with-temp-conn!
               conn
               {:rtc-tx? true}
               (fn [temp-conn _*batch-tx-data]
                 (let [;; 1. reverse pending local txs
                       reversed-report (ldb/transact! temp-conn reversed-tx-data
                                                      (assoc temp-tx-meta :op :reverse))
                       _ (reset! *reversed-tx-report reversed-report)
                       ;; 2. transact remote tx-data
                       remote-report (ldb/transact! temp-conn safe-remote-tx-data
                                                    (assoc temp-tx-meta :op :transact-remote-tx-data))
                       _ (reset! *remote-tx-report remote-report)
                       local-deleted-blocks (get-local-deleted-blocks reversed-report reversed-tx-data)
                       deleted-ids (set (keep :block/uuid
                                              (concat local-deleted-blocks remote-deleted-blocks)))
                       ;; 3. rebase pending local txs
                       rebase-report (when (seq local-txs)
                                       (let [pending-tx-data (mapcat :tx local-txs)
                                             rebased-tx-data (sanitize-tx-data
                                                              (or (:db-after remote-report)
                                                                  (:db-after reversed-report))
                                                              pending-tx-data
                                                              (set (map :block/uuid local-deleted-blocks)))]
                                         (when (seq rebased-tx-data)
                                           (ldb/transact! temp-conn rebased-tx-data
                                                          (assoc temp-tx-meta :op :rebase)))))
                       ;; 4. delete nodes and fix tx data
                       temp-db @temp-conn
                       deleted-nodes (keep (fn [id] (d/entity temp-db [:block/uuid id])) deleted-ids)]
                   (delete-nodes! temp-conn deleted-nodes (assoc temp-tx-meta :op :delete-blocks))
                   (fix-tx! temp-conn remote-report rebase-report (assoc temp-tx-meta :op :fix))))
               ;; collect everything except the reverse and remote steps
               {:listen-db (fn [{:keys [tx-meta tx-data]}]
                             (when-not (contains? #{:reverse :transact-remote-tx-data} (:op tx-meta))
                               (swap! *rebase-tx-data into tx-data)))})
              (ldb/transact-with-temp-conn!
               conn
               {:rtc-tx? true}
               (fn [temp-conn]
                 (when (seq safe-remote-tx-data)
                   (d/transact! temp-conn safe-remote-tx-data {:rtc-tx? true}))
                 (let [deleted-nodes (keep (fn [id] (d/entity db [:block/uuid id]))
                                           remote-deleted-block-ids)]
                   (delete-nodes! temp-conn deleted-nodes
                                  (assoc temp-tx-meta :op :delete-blocks))))))
            remote-tx-report @*remote-tx-report]
        ;; persist rebase tx to client ops
        (when has-local-changes?
          (when-let [rebase-tx-data (seq @*rebase-tx-data)]
            (let [remote-tx-data-set (set tx-data*)
                  normalized (->> rebase-tx-data
                                  (normalize-tx-data (:db-after tx-report)
                                                     (or (:db-after remote-tx-report)
                                                         (:db-after @*reversed-tx-report)))
                                  keep-last-update
                                  (remove (fn [[op _e a]]
                                            (and (= op :db/retract)
                                                 (contains? #{:block/updated-at :block/created-at :block/title} a)))))
                  ;; drop ops already contained in the remote tx-data
                  normalized-tx-data (remove remote-tx-data-set normalized)
                  reversed-datoms (reverse-tx-data rebase-tx-data)]
              (when (seq normalized-tx-data)
                (persist-local-tx! repo normalized-tx-data reversed-datoms {:op :rtc-rebase}))))
          (remove-pending-txs! repo (map :tx-id local-txs)))
        (when tx-report
          (let [asset-uuids (asset-uuids-from-tx @conn (:tx-data tx-report))]
            (when (seq asset-uuids)
              (enqueue-asset-downloads! repo client asset-uuids))))
        (when-let [*inflight (:inflight client)]
          (reset! *inflight []))
        (reset! *remote-tx-report nil))
      (fail-fast :db-sync/missing-db {:repo repo :op :apply-remote-tx})))
  (defn- handle-message!
    "Parses one raw websocket message and dispatches on its `:type`.

    - \"hello\": broadcasts state, pulls when the server is ahead, enqueues asset
      sync and initial download, flushes pending txs.
    - \"online-users\": updates the online users list.
    - \"tx/batch/ok\": stores the new local tx, drops the in-flight txs, flushes.
    - \"pull/ok\": merges the received txs, decrypts them when an aes key is
      available, applies them and stores the new local tx.
    - \"changed\": pulls when the server is ahead.
    - \"tx/reject\": pulls again on reason \"stale\".

    Unparseable messages, unknown types and invalid fields go through
    `fail-fast`. When present, `:t` is recorded in `*repo->latest-remote-tx`."
    [repo client raw]
    (let [message (-> raw parse-message coerce-ws-server-message)]
      (when-not (map? message)
        (fail-fast :db-sync/response-parse-failed {:repo repo :raw raw}))
      (let [local-tx (or (client-op/get-local-tx repo) 0)
            remote-tx (:t message)
            msg-type (:type message)
            request-pull! (fn []
                            (send! (:ws client) {:type "pull" :since local-tx}))]
        (when remote-tx
          (swap! *repo->latest-remote-tx assoc repo remote-tx))
        (case msg-type
          "hello"
          (do
            (require-non-negative remote-tx {:repo repo :type "hello"})
            (broadcast-rtc-state! client)
            (when (> remote-tx local-tx)
              (request-pull!))
            (enqueue-asset-sync! repo client)
            (enqueue-asset-initial-download! repo client)
            (flush-pending! repo client))

          "online-users"
          (let [users (:online-users message)]
            (when (and (some? users) (not (sequential? users)))
              (fail-fast :db-sync/invalid-field
                         {:repo repo :type "online-users" :field :online-users}))
            (update-online-users! client (or users [])))

          ;; Upload response
          "tx/batch/ok"
          (do
            (require-non-negative remote-tx {:repo repo :type "tx/batch/ok"})
            (client-op/update-local-tx repo remote-tx)
            (broadcast-rtc-state! client)
            (remove-pending-txs! repo @(:inflight client))
            (reset! (:inflight client) [])
            (flush-pending! repo client))

          ;; Download response: all received txs are merged into one tx
          "pull/ok"
          (when (not= local-tx remote-tx)
            (require-non-negative remote-tx {:repo repo :type "pull/ok"})
            (let [txs (:txs message)
                  _ (require-seq txs {:repo repo :type "pull/ok" :field :txs})
                  parse-tx (fn [entry]
                             (parse-transit (:tx entry) {:repo repo :type "pull/ok"}))
                  merged-tx (distinct (apply concat (mapv parse-tx txs)))]
              (when (seq merged-tx)
                (p/let [aes-key (<ensure-graph-aes-key repo (:graph-id client))
                        _ (when (and (graph-e2ee? repo) (nil? aes-key))
                            (fail-fast :db-sync/missing-field {:repo repo :field :aes-key}))
                        decrypted-tx (if aes-key
                                       (<decrypt-tx-data aes-key merged-tx)
                                       (p/resolved merged-tx))]
                  (apply-remote-tx! repo client decrypted-tx)
                  (client-op/update-local-tx repo remote-tx)
                  (broadcast-rtc-state! client)
                  (flush-pending! repo client)))))

          "changed"
          (do
            (require-non-negative remote-tx {:repo repo :type "changed"})
            (broadcast-rtc-state! client)
            (when (< local-tx remote-tx)
              (request-pull!)))

          "tx/reject"
          (let [reason (:reason message)]
            (when (nil? reason)
              (fail-fast :db-sync/missing-field
                         {:repo repo :type "tx/reject" :field :reason}))
            (when (contains? message :t)
              (require-non-negative remote-tx {:repo repo :type "tx/reject"}))
            (if (= "stale" reason)
              (request-pull!)
              (fail-fast :db-sync/invalid-field
                         {:repo repo :type "tx/reject" :reason reason})))

          (fail-fast :db-sync/invalid-field
                     {:repo repo :type msg-type})))))
  (declare connect!)

  (defn- schedule-reconnect!
    "Schedules one reconnect attempt for `client` unless sync is disabled, the
    client has no `:reconnect` atom, or a timer is already pending.

    The delay comes from `reconnect-delay-ms` for the current attempt count.
    When the timer fires it reconnects only if sync is still enabled and the
    current client still has the same repo and graph id."
    [repo client url reason]
    (when-let [reconnect (and (enabled?) (:reconnect client))]
      (let [{:keys [attempt timer]} @reconnect]
        (when (nil? timer)
          (let [delay-ms (reconnect-delay-ms attempt)
                reconnect! (fn []
                             (swap! reconnect assoc :timer nil)
                             (when (enabled?)
                               (when-let [current @worker-state/*db-sync-client]
                                 (when (and (= (:repo current) repo)
                                            (= (:graph-id current) (:graph-id client)))
                                   (reset! worker-state/*db-sync-client
                                           (connect! repo current url))))))
                timeout-id (js/setTimeout reconnect! delay-ms)]
            (swap! reconnect assoc :timer timeout-id :attempt (inc attempt))
            (log/info :db-sync/ws-reconnect-scheduled
                      {:repo repo :delay delay-ms :attempt attempt :reason reason}))))))
  (defn- attach-ws-handlers!
    "Installs the `onmessage`, `onerror` and `onclose` handlers on `ws`.

    Messages go to `handle-message!`, errors are logged, and a close clears the
    online users, marks the websocket state `:closed` and schedules a
    reconnect."
    [repo client ws url]
    (let [on-message (fn [event]
                       (handle-message! repo client (.-data event)))
          on-error (fn [event]
                     (log/error :db-sync/ws-error {:repo repo :error event}))
          on-close (fn [_]
                     (log/info :db-sync/ws-closed {:repo repo})
                     (update-online-users! client [])
                     (set-ws-state! client :closed)
                     (schedule-reconnect! repo client url :close))]
      (set! (.-onmessage ws) on-message)
      (set! (.-onerror ws) on-error)
      (set! (.-onclose ws) on-close)))
  (defn- detach-ws-handlers!
    "Clears the `onclose`, `onerror`, `onmessage` and `onopen` handlers of `ws`."
    [ws]
    ;; The four assignments are independent; `onclose` is kept last so the
    ;; function still evaluates to that final assignment.
    (set! (.-onmessage ws) nil)
    (set! (.-onerror ws) nil)
    (set! (.-onopen ws) nil)
    (set! (.-onclose ws) nil))
  (defn- start-pull-loop!
    "Currently a no-op: returns `client` unchanged and ignores the websocket."
    [client _ws]
    client)
  (defn- stop-client!
    "Stops `client`: clears its reconnect timer and, when it has a websocket,
    detaches the handlers, clears the online users, marks the state `:closed`
    and closes the socket. Errors thrown by `.close` are ignored."
    [client]
    (let [{:keys [reconnect ws]} client]
      (when reconnect
        (clear-reconnect-timer! reconnect))
      (when ws
        (detach-ws-handlers! ws)
        (update-online-users! client [])
        (set-ws-state! client :closed)
        (try
          (.close ws)
          (catch :default _
            nil)))))
  (defn- connect!
    "Opens a new websocket to `url` (with the auth token appended) for `client`
    and returns the client with `:ws` set. An existing websocket on `client` is
    stopped first.

    On open it resets the reconnect state, marks the websocket `:open`, sends
    the \"hello\" message and enqueues asset sync and the initial asset
    download."
    [repo client url]
    (when (:ws client)
      (stop-client! client))
    (let [ws (js/WebSocket. (append-token url (auth-token)))
          updated (assoc client :ws ws)
          on-open (fn [_]
                    (reset-reconnect! updated)
                    (set-ws-state! updated :open)
                    (send! ws {:type "hello" :client repo})
                    (enqueue-asset-sync! repo updated)
                    (enqueue-asset-initial-download! repo updated))]
      (attach-ws-handlers! repo updated ws url)
      (set! (.-onopen ws) on-open)
      (start-pull-loop! updated ws)))
  (defn stop!
    "Stops the current db-sync client, if any, and clears
    `worker-state/*db-sync-client`. Always returns a promise resolved to nil."
    []
    (let [client @worker-state/*db-sync-client]
      (when client
        (stop-client! client)
        (reset! worker-state/*db-sync-client nil)))
    (p/resolved nil))
  (defn start!
    "Starts db-sync for `repo` and returns a promise resolved to nil.

    Does nothing when sync is disabled. Otherwise stops any running client and,
    if both the websocket base url and the graph id are available, creates a
    fresh client state, connects and stores the connected client. A missing
    base url or graph id is logged as `:db-sync/start-skipped`."
    [repo]
    (if (enabled?)
      (p/do!
       (stop!)
       (let [base (ws-base-url)
             graph-id (get-graph-id repo)]
         (if-not (and (string? base) (seq base) (seq graph-id))
           (do
             (log/info :db-sync/start-skipped {:repo repo :graph-id graph-id :base base})
             (p/resolved nil))
           (let [client (ensure-client-state! repo)
                 url (format-ws-url base graph-id)]
             (ensure-client-graph-uuid! repo graph-id)
             (reset! worker-state/*db-sync-client
                     (connect! repo (assoc client :graph-id graph-id) url))
             (p/resolved nil)))))
      (p/resolved nil)))
  (defn enqueue-local-tx!
    "Persists a local transaction of `repo` as a pending tx and schedules a flush
    on the client's `:send-queue`.

    Skipped for rtc transactions, when the datascript db is unavailable, when
    `tx-data` is empty, or when normalization yields nothing. The flush only
    runs if, at that time, the current client still belongs to `repo` and its
    websocket is open.

    A failing flush is logged and swallowed so the send queue always resolves;
    otherwise one rejection would make `p/then` skip every later flush chained
    onto the queue."
    [repo {:keys [tx-meta tx-data db-after db-before]}]
    (when-not (:rtc-tx? tx-meta)
      (let [conn (worker-state/get-datascript-conn repo)]
        (when (and (some-> conn deref) (seq tx-data))
          (let [normalized (normalize-tx-data db-after db-before tx-data)
                reversed-datoms (reverse-tx-data tx-data)]
            (when (seq normalized)
              (persist-local-tx! repo normalized reversed-datoms tx-meta)
              (when-let [client @worker-state/*db-sync-client]
                (when (= repo (:repo client))
                  (swap! (:send-queue client)
                         (fn [prev]
                           (-> prev
                               (p/then
                                (fn [_]
                                  ;; re-read the client: it may have been
                                  ;; replaced or stopped in the meantime
                                  (when-let [current @worker-state/*db-sync-client]
                                    (when (= repo (:repo current))
                                      (when-let [ws (:ws current)]
                                        (when (ws-open? ws)
                                          (flush-pending! repo current)))))))
                               (p/catch
                                (fn [error]
                                  (log/error :db-sync/send-queue-failed
                                             {:repo repo :error error})
                                  ;; resolve to nil so later flushes still run
                                  nil)))))))))))))
  (defn handle-local-tx!
    "Entry point for local transactions of `repo`.

    When sync is enabled, `tx-data` is non-empty, the tx is not an rtc tx and
    `:persist-op?` is not false, the tx is enqueued for upload and an asset sync
    is enqueued for the current client of `repo`."
    [repo {:keys [tx-data tx-meta] :as tx-report}]
    (let [syncable? (and (enabled?)
                         (seq tx-data)
                         (not (:rtc-tx? tx-meta))
                         (:persist-op? tx-meta true))]
      (when syncable?
        (enqueue-local-tx! repo tx-report)
        (let [client @worker-state/*db-sync-client]
          (when (and client (= repo (:repo client)))
            (enqueue-asset-sync! repo client))))))
  (defn- fetch-kvs-rows
    "Runs the paged `kvs` query on `db`: up to `limit` rows (addr, content,
    addresses) with `addr` greater than `last-addr`, ordered by `addr`, in
    array row mode. Returns whatever `.exec` returns."
    [db last-addr limit]
    (let [query #js {:sql "select addr, content, addresses from kvs where addr > ? order by addr asc limit ?"
                     :bind #js [last-addr limit]
                     :rowMode "array"}]
      (.exec db query)))
  (defn- normalize-snapshot-rows
    "Converts a JS array of row arrays into a vector of vectors."
    [rows]
    (into [] (map vec) (array-seq rows)))
  (defn- encode-snapshot-rows
    "Serializes `rows` to a transit string and encodes it with
    `snapshot-text-encoder`."
    [rows]
    (let [transit-str (sqlite-util/write-transit-str rows)]
      (.encode snapshot-text-encoder transit-str)))
  (defn- frame-bytes
    "Returns a new Uint8Array holding the byte length of `data` as a 4-byte
    big-endian unsigned integer, followed by the bytes of `data`."
    [^js data]
    (let [payload-len (.-byteLength data)
          framed (js/Uint8Array. (+ 4 payload-len))]
      ;; `false` = big-endian length prefix
      (doto (js/DataView. (.-buffer framed))
        (.setUint32 0 payload-len false))
      (.set framed data 4)
      framed))
  (defn- maybe-compress-stream
    "Pipes `stream` through a gzip `CompressionStream` when the runtime provides
    one; otherwise returns `stream` unchanged."
    [stream]
    (cond-> stream
      (exists? js/CompressionStream)
      (.pipeThrough (js/CompressionStream. "gzip"))))
  (defn- <buffer-stream
    "Reads `stream` to the end through a `js/Response` and returns a promise of
    the resulting ArrayBuffer."
    [stream]
    (p/let [response (js/Response. stream)]
      (.arrayBuffer response)))
  (defn- <snapshot-upload-body
    "Builds the upload body for one batch of snapshot `rows`.

    The rows are encoded and length-framed. When `CompressionStream` exists the
    frame is gzip-compressed and buffered, and the promise resolves to
    `{:body <ArrayBuffer> :encoding snapshot-content-encoding}`; otherwise it
    resolves to `{:body <frame> :encoding nil}`."
    [rows]
    (let [frame (-> rows encode-snapshot-rows frame-bytes)
          ;; single-chunk stream that emits the frame and closes
          stream (js/ReadableStream.
                  #js {:start (fn [controller]
                                (.enqueue controller frame)
                                (.close controller))})]
      (if (exists? js/CompressionStream)
        (p/let [buf (<buffer-stream (maybe-compress-stream stream))]
          {:body buf :encoding snapshot-content-encoding})
        (p/resolved {:body frame :encoding nil}))))
  (defn- set-graph-e2ee-enabled!
    "Transacts `:logseq.kv/graph-rtc-e2ee?` = true into the db of `repo`, when a
    datascript conn exists."
    [repo]
    (when-let [conn (worker-state/get-datascript-conn repo)]
      (let [e2ee-flag (ldb/kv :logseq.kv/graph-rtc-e2ee? true)]
        (ldb/transact! conn [e2ee-flag]))))
  (defn upload-graph!
    "Uploads a snapshot of the graph `repo` to the sync server.

    All datoms are encrypted and written to a temporary sqlite db, whose `kvs`
    rows are then POSTed in batches of `upload-kvs-batch-size` to
    `<base>/sync/<graph-id>/snapshot/upload` (the first batch with
    `reset=true`). After the last batch the local tx is reset to 0 and all
    existing assets are added as asset ops. The temp db is always cleaned up.

    Returns a promise resolving to `{:graph-id graph-id}` on success. Any error
    (including a missing base url, graph id or datascript conn) is printed with
    `js/console.error` instead of being propagated."
    [repo]
    (->
     (let [base (http-base-url)
           graph-id (get-graph-id repo)]
       (if-not (and (seq base) (seq graph-id))
         (p/rejected (ex-info "db-sync missing upload info"
                              {:repo repo :base base :graph-id graph-id}))
         (if-let [source-conn (worker-state/get-datascript-conn repo)]
           (p/let [aes-key (<ensure-graph-aes-key repo graph-id)
                   _ (when (and (graph-e2ee? repo) (nil? aes-key))
                       (fail-fast :db-sync/missing-field {:repo repo :field :aes-key}))]
             (set-graph-e2ee-enabled! repo)
             (ensure-client-graph-uuid! repo graph-id)
             (p/let [datoms (d/datoms @source-conn :eavt)
                     _ (prn :debug :datoms-count (count datoms) :time (js/Date.))
                     encrypted-datoms (<encrypt-datoms aes-key datoms)
                     _ (prn :debug :encrypted-datoms-count (count encrypted-datoms)
                            :time (js/Date.))
                     {:keys [db] :as temp} (<create-temp-sqlite-conn (d/schema @source-conn) encrypted-datoms)]
               (prn :debug :created-temp-conn :time (js/Date.))
               (->
                (p/loop [last-addr -1
                         first-batch? true]
                  (let [raw-rows (fetch-kvs-rows db last-addr upload-kvs-batch-size)]
                    (prn :debug :rows-count (count raw-rows))
                    (if (seq raw-rows)
                      ;; upload this batch, then continue after its highest addr
                      (let [max-addr (reduce max (map first raw-rows))
                            snapshot-rows (normalize-snapshot-rows raw-rows)
                            upload-url (str base "/sync/" graph-id "/snapshot/upload?reset="
                                            (if first-batch? "true" "false"))]
                        (p/let [{:keys [body encoding]} (<snapshot-upload-body snapshot-rows)
                                headers (cond-> {"content-type" snapshot-content-type}
                                          (string? encoding) (assoc "content-encoding" encoding))
                                _ (fetch-json upload-url
                                              {:method "POST"
                                               :headers headers
                                               :body body}
                                              {:response-schema :sync/snapshot-upload})]
                          (p/recur max-addr false)))
                      ;; no rows left: reset local sync bookkeeping and finish
                      (do
                        (client-op/remove-local-tx repo)
                        (client-op/update-local-tx repo 0)
                        (client-op/add-all-exists-asset-as-ops repo)
                        {:graph-id graph-id}))))
                (p/finally
                  (fn []
                    (prn :debug :cleanup-temp-db :time (js/Date.))
                    (cleanup-temp-sqlite! temp))))))
           (p/rejected (ex-info "db-sync missing datascript conn"
                                {:repo repo :graph-id graph-id})))))
     (p/catch (fn [error]
                (js/console.error error)))))