sync.cljs 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414
  1. (ns frontend.handler.db-based.sync
  2. "DB-sync handler based on Cloudflare Durable Objects."
  3. (:require [clojure.string :as string]
  4. [frontend.config :as config]
  5. [frontend.db :as db]
  6. [frontend.handler.notification :as notification]
  7. [frontend.handler.repo :as repo-handler]
  8. [frontend.handler.user :as user-handler]
  9. [frontend.state :as state]
  10. [lambdaisland.glogi :as log]
  11. [logseq.db :as ldb]
  12. [logseq.db-sync.malli-schema :as db-sync-schema]
  13. [logseq.db.sqlite.util :as sqlite-util]
  14. [promesa.core :as p]))
  15. (defn- ws->http-base [ws-url]
  16. (when (string? ws-url)
  17. (let [base (cond
  18. (string/starts-with? ws-url "wss://")
  19. (str "https://" (subs ws-url (count "wss://")))
  20. (string/starts-with? ws-url "ws://")
  21. (str "http://" (subs ws-url (count "ws://")))
  22. :else ws-url)
  23. base (string/replace base #"/sync/%s$" "")]
  24. base)))
  25. (defn http-base []
  26. (or config/db-sync-http-base
  27. (ws->http-base config/db-sync-ws-url)))
  28. (def ^:private snapshot-text-decoder (js/TextDecoder.))
  29. (defn- ->uint8 [data]
  30. (cond
  31. (instance? js/Uint8Array data) data
  32. (instance? js/ArrayBuffer data) (js/Uint8Array. data)
  33. (string? data) (.encode (js/TextEncoder.) data)
  34. :else (js/Uint8Array. data)))
  35. (defn- decode-snapshot-rows [payload]
  36. (sqlite-util/read-transit-str (.decode snapshot-text-decoder (->uint8 payload))))
  37. (defn- frame-len [^js data offset]
  38. (let [view (js/DataView. (.-buffer data) offset 4)]
  39. (.getUint32 view 0 false)))
  40. (defn- concat-bytes
  41. [^js a ^js b]
  42. (cond
  43. (nil? a) b
  44. (nil? b) a
  45. :else
  46. (let [out (js/Uint8Array. (+ (.-byteLength a) (.-byteLength b)))]
  47. (.set out a 0)
  48. (.set out b (.-byteLength a))
  49. out)))
  50. (defn- parse-framed-chunk
  51. [buffer chunk]
  52. (let [data (concat-bytes buffer chunk)
  53. total (.-byteLength data)]
  54. (loop [offset 0
  55. rows []]
  56. (if (< (- total offset) 4)
  57. {:rows rows
  58. :buffer (when (< offset total)
  59. (.slice data offset total))}
  60. (let [len (frame-len data offset)
  61. next-offset (+ offset 4 len)]
  62. (if (<= next-offset total)
  63. (let [payload (.slice data (+ offset 4) next-offset)
  64. decoded (decode-snapshot-rows payload)]
  65. (recur next-offset (into rows decoded)))
  66. {:rows rows
  67. :buffer (.slice data offset total)}))))))
  68. (defn- finalize-framed-buffer
  69. [buffer]
  70. (if (or (nil? buffer) (zero? (.-byteLength buffer)))
  71. []
  72. (let [{:keys [rows buffer]} (parse-framed-chunk nil buffer)]
  73. (if (and (seq rows) (or (nil? buffer) (zero? (.-byteLength buffer))))
  74. rows
  75. (throw (ex-info "incomplete framed buffer" {:buffer buffer :rows rows}))))))
  76. (defn- auth-headers []
  77. (when-let [token (state/get-auth-id-token)]
  78. {"authorization" (str "Bearer " token)}))
  79. (defn- with-auth-headers [opts]
  80. (if-let [auth (auth-headers)]
  81. (assoc opts :headers (merge (or (:headers opts) {}) auth))
  82. opts))
  83. (declare fetch-json)
  84. (declare coerce-http-response)
  85. (defn fetch-json
  86. [url opts {:keys [response-schema error-schema] :or {error-schema :error}}]
  87. (p/let [resp (js/fetch url (clj->js (with-auth-headers opts)))
  88. text (.text resp)
  89. data (when (seq text) (js/JSON.parse text))]
  90. (if (.-ok resp)
  91. (let [body (js->clj data :keywordize-keys true)
  92. body (if response-schema
  93. (coerce-http-response response-schema body)
  94. body)]
  95. (if (or (nil? response-schema) body)
  96. body
  97. (throw (ex-info "db-sync invalid response"
  98. {:status (.-status resp)
  99. :url url
  100. :body body}))))
  101. (let [body (when data (js->clj data :keywordize-keys true))
  102. body (if error-schema
  103. (coerce-http-response error-schema body)
  104. body)]
  105. (throw (ex-info "db-sync request failed"
  106. {:status (.-status resp)
  107. :url url
  108. :body body}))))))
  109. (def ^:private invalid-coerce ::invalid-coerce)
  110. (defn- coerce
  111. [coercer value context]
  112. (try
  113. (coercer value)
  114. (catch :default e
  115. (log/error :db-sync/malli-coerce-failed (merge context {:error e :value value}))
  116. invalid-coerce)))
  117. (defn- coerce-http-request [schema-key body]
  118. (if-let [coercer (get db-sync-schema/http-request-coercers schema-key)]
  119. (let [coerced (coerce coercer body {:schema schema-key :dir :request})]
  120. (when-not (= coerced invalid-coerce)
  121. coerced))
  122. body))
  123. (defn- coerce-http-response [schema-key body]
  124. (if-let [coercer (get db-sync-schema/http-response-coercers schema-key)]
  125. (let [coerced (coerce coercer body {:schema schema-key :dir :response})]
  126. (when-not (= coerced invalid-coerce)
  127. coerced))
  128. body))
  129. (defn- graph-in-remote-list?
  130. [repo]
  131. (some #(= repo (:url %)) (state/get-rtc-graphs)))
  132. (defn <rtc-start!
  133. [repo & {:keys [_stop-before-start?] :as _opts}]
  134. (if (graph-in-remote-list? repo)
  135. (do
  136. (log/info :db-sync/start {:repo repo})
  137. (state/<invoke-db-worker :thread-api/db-sync-start repo))
  138. (do
  139. (log/info :db-sync/skip-start {:repo repo :reason :graph-not-in-remote-list})
  140. (p/resolved nil))))
  141. (defn <rtc-stop!
  142. []
  143. (log/info :db-sync/stop true)
  144. (state/<invoke-db-worker :thread-api/db-sync-stop))
  145. (defn <rtc-update-presence!
  146. [editing-block-uuid]
  147. (state/<invoke-db-worker :thread-api/db-sync-update-presence editing-block-uuid))
  148. (defn <rtc-get-users-info
  149. []
  150. (when-let [graph-uuid (ldb/get-graph-rtc-uuid (db/get-db))]
  151. (let [base (http-base)
  152. repo (state/get-current-repo)]
  153. (if base
  154. (p/let [_ (js/Promise. user-handler/task--ensure-id&access-token)
  155. resp (fetch-json (str base "/graphs/" graph-uuid "/members")
  156. {:method "GET"}
  157. {:response-schema :graph-members/list})
  158. members (:members resp)
  159. users (mapv (fn [{:keys [user-id role email username]}]
  160. (let [name (or username email user-id)
  161. user-type (some-> role keyword)]
  162. (cond-> {:user/uuid user-id
  163. :user/name name
  164. :graph<->user/user-type user-type}
  165. (string? email) (assoc :user/email email))))
  166. members)]
  167. (state/set-state! :rtc/users-info {repo users}))
  168. (p/resolved nil)))))
  169. (defn <rtc-create-graph!
  170. [repo]
  171. (let [schema-version (some-> (ldb/get-graph-schema-version (db/get-db)) :major str)
  172. base (http-base)]
  173. (if base
  174. (p/let [_ (js/Promise. user-handler/task--ensure-id&access-token)
  175. body (coerce-http-request :graphs/create
  176. {:graph-name (string/replace repo config/db-version-prefix "")
  177. :schema-version schema-version})
  178. result (if (nil? body)
  179. (p/rejected (ex-info "db-sync invalid create-graph body"
  180. {:repo repo}))
  181. (fetch-json (str base "/graphs")
  182. {:method "POST"
  183. :headers {"content-type" "application/json"}
  184. :body (js/JSON.stringify (clj->js body))}
  185. {:response-schema :graphs/create}))
  186. graph-id (:graph-id result)]
  187. (if graph-id
  188. (p/do!
  189. (ldb/transact! repo [(sqlite-util/kv :logseq.kv/db-type "db")
  190. (sqlite-util/kv :logseq.kv/graph-uuid (uuid graph-id))
  191. (sqlite-util/kv :logseq.kv/graph-rtc-e2ee? true)])
  192. graph-id)
  193. (p/rejected (ex-info "db-sync missing graph id in create response"
  194. {:type :db-sync/invalid-graph
  195. :response result}))))
  196. (p/rejected (ex-info "db-sync missing graph info"
  197. {:type :db-sync/invalid-graph
  198. :base base})))))
  199. (defn <rtc-delete-graph!
  200. [graph-uuid _schema-version]
  201. (let [base (http-base)]
  202. (if (and graph-uuid base)
  203. (p/let [_ (js/Promise. user-handler/task--ensure-id&access-token)]
  204. (fetch-json (str base "/graphs/" graph-uuid)
  205. {:method "DELETE"}
  206. {:response-schema :graphs/delete}))
  207. (p/rejected (ex-info "db-sync missing graph id"
  208. {:type :db-sync/invalid-graph
  209. :graph-uuid graph-uuid
  210. :base base})))))
  211. (defn <rtc-download-graph!
  212. [graph-name graph-uuid]
  213. (state/set-state! :rtc/downloading-graph-uuid graph-uuid)
  214. (let [base (http-base)]
  215. (-> (if (and graph-uuid base)
  216. (let [download-url* (atom nil)]
  217. (-> (p/let [_ (js/Promise. user-handler/task--ensure-id&access-token)
  218. graph (str config/db-version-prefix graph-name)
  219. pull-resp (fetch-json (str base "/sync/" graph-uuid "/pull")
  220. {:method "GET"}
  221. {:response-schema :sync/pull})
  222. remote-tx (:t pull-resp)
  223. _ (when-not (integer? remote-tx)
  224. (throw (ex-info "non-integer remote-tx when downloading graph"
  225. {:graph graph-name
  226. :remote-tx remote-tx})))
  227. download-resp (fetch-json (str base "/sync/" graph-uuid "/snapshot/download")
  228. {:method "GET"}
  229. {:response-schema :sync/snapshot-download})
  230. download-url (:url download-resp)
  231. _ (reset! download-url* download-url)
  232. _ (when-not (string? download-url)
  233. (throw (ex-info "missing snapshot download url"
  234. {:graph graph-name
  235. :response download-resp})))
  236. resp (js/fetch download-url (clj->js (with-auth-headers {:method "GET"})))
  237. total-bytes (when-let [raw (some-> resp .-headers (.get "content-length"))]
  238. (let [parsed (js/parseInt raw 10)]
  239. (when-not (js/isNaN parsed) parsed)))
  240. _ (state/pub-event!
  241. [:rtc/log {:type :rtc.log/download
  242. :sub-type :download-progress
  243. :graph-uuid graph-uuid
  244. :message (str "Start downloading graph snapshot, file size: " total-bytes)}])]
  245. (when-not (.-ok resp)
  246. (throw (ex-info "snapshot download failed"
  247. {:graph graph-name
  248. :status (.-status resp)})))
  249. (when-not (.-body resp)
  250. (throw (ex-info "snapshot download missing body"
  251. {:graph graph-name})))
  252. (p/let [reader (.getReader (.-body resp))]
  253. (p/loop [buffer nil
  254. total 0
  255. total-rows []
  256. loaded 0]
  257. (p/let [chunk (.read reader)]
  258. (if (.-done chunk)
  259. (let [rows (finalize-framed-buffer buffer)
  260. total' (+ total (count rows))
  261. total-rows' (into total-rows rows)]
  262. (state/pub-event!
  263. [:rtc/log {:type :rtc.log/download
  264. :sub-type :download-completed
  265. :graph-uuid graph-uuid
  266. :message "Graph snapshot downloaded"}])
  267. (when (seq total-rows')
  268. (state/<invoke-db-worker :thread-api/db-sync-import-kvs-rows
  269. graph total-rows' true graph-uuid remote-tx))
  270. total')
  271. (let [value (.-value chunk)
  272. loaded' (+ loaded (.-byteLength value))
  273. {:keys [rows buffer]} (parse-framed-chunk buffer value)
  274. total' (+ total (count rows))]
  275. (p/recur buffer total' (into total-rows rows) loaded')))))))
  276. (p/finally
  277. (fn []
  278. (when-let [download-url @download-url*]
  279. (js/fetch download-url (clj->js (with-auth-headers {:method "DELETE"}))))))))
  280. (p/rejected (ex-info "db-sync missing graph info"
  281. {:type :db-sync/invalid-graph
  282. :graph-uuid graph-uuid
  283. :base base})))
  284. (p/catch (fn [error]
  285. (throw error)))
  286. (p/finally
  287. (fn []
  288. (state/set-state! :rtc/downloading-graph-uuid nil))))))
  289. (defn <get-remote-graphs
  290. []
  291. (let [base (http-base)]
  292. (if-not base
  293. (p/resolved [])
  294. (-> (p/let [_ (state/set-state! :rtc/loading-graphs? true)
  295. _ (js/Promise. user-handler/task--ensure-id&access-token)
  296. resp (fetch-json (str base "/graphs")
  297. {:method "GET"}
  298. {:response-schema :graphs/list})
  299. graphs (:graphs resp)
  300. result (mapv (fn [graph]
  301. (merge
  302. {:url (str config/db-version-prefix (:graph-name graph))
  303. :GraphName (:graph-name graph)
  304. :GraphSchemaVersion (:schema-version graph)
  305. :GraphUUID (:graph-id graph)
  306. :rtc-graph? true
  307. :graph<->user-user-type (:role graph)
  308. :graph<->user-grant-by-user (:invited-by graph)}
  309. (dissoc graph :graph-id :graph-name :schema-version :role :invited-by)))
  310. graphs)]
  311. (state/set-state! :rtc/graphs result)
  312. (repo-handler/refresh-repos!)
  313. result)
  314. (p/finally
  315. (fn []
  316. (state/set-state! :rtc/loading-graphs? false)))))))
  317. (defn <rtc-invite-email
  318. [graph-uuid email]
  319. (let [base (http-base)
  320. graph-uuid (str graph-uuid)]
  321. (if (and base (string? graph-uuid) (string? email))
  322. (->
  323. (p/let [_ (js/Promise. user-handler/task--ensure-id&access-token)
  324. body (coerce-http-request :graph-members/create
  325. {:email email
  326. :role "member"})
  327. _ (when (nil? body)
  328. (throw (ex-info "db-sync invalid invite body"
  329. {:graph-uuid graph-uuid
  330. :email email})))
  331. _ (fetch-json (str base "/graphs/" graph-uuid "/members")
  332. {:method "POST"
  333. :headers {"content-type" "application/json"}
  334. :body (js/JSON.stringify (clj->js body))}
  335. {:response-schema :graph-members/create})
  336. repo (state/get-current-repo)
  337. e2ee? (ldb/get-graph-rtc-e2ee? (db/get-db))
  338. _ (when (and repo e2ee?)
  339. (state/<invoke-db-worker :thread-api/db-sync-grant-graph-access
  340. repo graph-uuid email))]
  341. (notification/show! "Invitation sent!" :success))
  342. (p/catch (fn [e]
  343. (notification/show! "Something wrong, please try again." :error)
  344. (log/error :db-sync/invite-email-failed
  345. {:error e
  346. :graph-uuid graph-uuid
  347. :email email}))))
  348. (p/rejected (ex-info "db-sync missing invite info"
  349. {:type :db-sync/invalid-invite
  350. :graph-uuid graph-uuid
  351. :email email
  352. :base base})))))
  353. (defn <rtc-remove-member!
  354. [graph-uuid member-id]
  355. (let [base (http-base)
  356. graph-uuid (some-> graph-uuid str)
  357. member-id (some-> member-id str)]
  358. (if (and base (string? graph-uuid) (string? member-id))
  359. (p/let [_ (js/Promise. user-handler/task--ensure-id&access-token)]
  360. (fetch-json (str base "/graphs/" graph-uuid "/members/" member-id)
  361. {:method "DELETE"}
  362. {:response-schema :graph-members/delete}))
  363. (p/rejected (ex-info "db-sync missing member info"
  364. {:type :db-sync/invalid-member
  365. :graph-uuid graph-uuid
  366. :member-id member-id
  367. :base base})))))
  368. (defn <rtc-leave-graph!
  369. [graph-uuid]
  370. (if-let [member-id (user-handler/user-uuid)]
  371. (<rtc-remove-member! graph-uuid member-id)
  372. (p/rejected (ex-info "db-sync missing user id"
  373. {:type :db-sync/invalid-member
  374. :graph-uuid graph-uuid}))))
  375. (defn <rtc-upload-graph! [repo _token _remote-graph-name]
  376. (p/let [graph-id (<rtc-create-graph! repo)]
  377. (when (nil? graph-id)
  378. (throw (ex-info "graph id doesn't exist when uploading to server" {:repo repo})))
  379. (p/do!
  380. (state/<invoke-db-worker :thread-api/db-sync-upload-graph repo)
  381. (<rtc-start! repo))))