db_sync.cljs 16 KB


  1. (ns frontend.handler.db-based.db-sync
  2. "DB-sync handler based on Cloudflare Durable Objects."
  3. (:require [clojure.string :as string]
  4. [frontend.config :as config]
  5. [frontend.db :as db]
  6. [frontend.handler.notification :as notification]
  7. [frontend.handler.repo :as repo-handler]
  8. [frontend.handler.user :as user-handler]
  9. [frontend.state :as state]
  10. [lambdaisland.glogi :as log]
  11. [logseq.db :as ldb]
  12. [logseq.db-sync.malli-schema :as db-sync-schema]
  13. [logseq.db.sqlite.util :as sqlite-util]
  14. [promesa.core :as p]))
  15. (defn- ws->http-base [ws-url]
  16. (when (string? ws-url)
  17. (let [base (cond
  18. (string/starts-with? ws-url "wss://")
  19. (str "https://" (subs ws-url (count "wss://")))
  20. (string/starts-with? ws-url "ws://")
  21. (str "http://" (subs ws-url (count "ws://")))
  22. :else ws-url)
  23. base (string/replace base #"/sync/%s$" "")]
  24. base)))
  25. (defn- http-base []
  26. (or config/db-sync-http-base
  27. (ws->http-base config/db-sync-ws-url)))
  28. (def ^:private snapshot-text-decoder (js/TextDecoder.))
  29. (defn- ->uint8 [data]
  30. (cond
  31. (instance? js/Uint8Array data) data
  32. (instance? js/ArrayBuffer data) (js/Uint8Array. data)
  33. (string? data) (.encode (js/TextEncoder.) data)
  34. :else (js/Uint8Array. data)))
  35. (defn- decode-snapshot-rows [bytes]
  36. (sqlite-util/read-transit-str (.decode snapshot-text-decoder (->uint8 bytes))))
  37. (defn- snapshot-rows-e2ee?
  38. [rows]
  39. (boolean
  40. (some (fn [[_ content _]]
  41. (try
  42. (let [data (sqlite-util/read-transit-str content)]
  43. (and (map? data)
  44. (= :logseq.kv/graph-rtc-e2ee? (:db/ident data))))
  45. (catch :default _
  46. false)))
  47. rows)))
  48. (defn- frame-len [^js data offset]
  49. (let [view (js/DataView. (.-buffer data) offset 4)]
  50. (.getUint32 view 0 false)))
  51. (defn- concat-bytes
  52. [^js a ^js b]
  53. (cond
  54. (nil? a) b
  55. (nil? b) a
  56. :else
  57. (let [out (js/Uint8Array. (+ (.-byteLength a) (.-byteLength b)))]
  58. (.set out a 0)
  59. (.set out b (.-byteLength a))
  60. out)))
  61. (defn- parse-framed-chunk
  62. [buffer chunk]
  63. (let [data (concat-bytes buffer chunk)
  64. total (.-byteLength data)]
  65. (loop [offset 0
  66. rows []]
  67. (if (< (- total offset) 4)
  68. {:rows rows
  69. :buffer (when (< offset total)
  70. (.slice data offset total))}
  71. (let [len (frame-len data offset)
  72. next-offset (+ offset 4 len)]
  73. (if (<= next-offset total)
  74. (let [payload (.slice data (+ offset 4) next-offset)
  75. decoded (decode-snapshot-rows payload)]
  76. (recur next-offset (into rows decoded)))
  77. {:rows rows
  78. :buffer (.slice data offset total)}))))))
  79. (defn- finalize-framed-buffer
  80. [buffer]
  81. (if (or (nil? buffer) (zero? (.-byteLength buffer)))
  82. []
  83. (let [{:keys [rows buffer]} (parse-framed-chunk nil buffer)]
  84. (if (and (seq rows) (or (nil? buffer) (zero? (.-byteLength buffer))))
  85. rows
  86. (throw (ex-info "incomplete framed buffer" {:buffer buffer :rows rows}))))))
  87. (defn- auth-headers []
  88. (when-let [token (state/get-auth-id-token)]
  89. {"authorization" (str "Bearer " token)}))
  90. (defn- with-auth-headers [opts]
  91. (if-let [auth (auth-headers)]
  92. (assoc opts :headers (merge (or (:headers opts) {}) auth))
  93. opts))
  94. (declare fetch-json)
  95. (defn- fetch-graph-e2ee?
  96. [base graph-uuid]
  97. (if-not (and (string? base) (string? graph-uuid))
  98. false
  99. (p/let [resp (fetch-json (str base "/e2ee/graphs/" graph-uuid "/aes-key")
  100. {:method "GET"}
  101. {:response-schema :e2ee/graph-aes-key})
  102. encrypted-aes-key (:encrypted-aes-key resp)]
  103. (boolean (string? encrypted-aes-key)))))
  104. (declare coerce-http-response)
  105. (defn- fetch-json
  106. [url opts {:keys [response-schema error-schema] :or {error-schema :error}}]
  107. (p/let [resp (js/fetch url (clj->js (with-auth-headers opts)))
  108. text (.text resp)
  109. data (when (seq text) (js/JSON.parse text))]
  110. (if (.-ok resp)
  111. (let [body (js->clj data :keywordize-keys true)
  112. body (if response-schema
  113. (coerce-http-response response-schema body)
  114. body)]
  115. (if (or (nil? response-schema) body)
  116. body
  117. (throw (ex-info "db-sync invalid response"
  118. {:status (.-status resp)
  119. :url url
  120. :body body}))))
  121. (let [body (when data (js->clj data :keywordize-keys true))
  122. body (if error-schema
  123. (coerce-http-response error-schema body)
  124. body)]
  125. (throw (ex-info "db-sync request failed"
  126. {:status (.-status resp)
  127. :url url
  128. :body body}))))))
  129. (def ^:private invalid-coerce ::invalid-coerce)
  130. (defn- coerce
  131. [coercer value context]
  132. (try
  133. (coercer value)
  134. (catch :default e
  135. (log/error :db-sync/malli-coerce-failed (merge context {:error e :value value}))
  136. invalid-coerce)))
  137. (defn- coerce-http-request [schema-key body]
  138. (if-let [coercer (get db-sync-schema/http-request-coercers schema-key)]
  139. (let [coerced (coerce coercer body {:schema schema-key :dir :request})]
  140. (when-not (= coerced invalid-coerce)
  141. coerced))
  142. body))
  143. (defn- coerce-http-response [schema-key body]
  144. (if-let [coercer (get db-sync-schema/http-response-coercers schema-key)]
  145. (let [coerced (coerce coercer body {:schema schema-key :dir :response})]
  146. (when-not (= coerced invalid-coerce)
  147. coerced))
  148. body))
  149. (defn <rtc-start!
  150. [repo & {:keys [_stop-before-start?] :as _opts}]
  151. (log/info :db-sync/start {:repo repo})
  152. (state/<invoke-db-worker :thread-api/db-sync-start repo))
  153. (defn <rtc-stop!
  154. []
  155. (log/info :db-sync/stop true)
  156. (state/<invoke-db-worker :thread-api/db-sync-stop))
  157. (defn <rtc-update-presence!
  158. [editing-block-uuid]
  159. (state/<invoke-db-worker :thread-api/db-sync-update-presence editing-block-uuid))
  160. (defn <rtc-get-users-info
  161. []
  162. (when-let [graph-uuid (ldb/get-graph-rtc-uuid (db/get-db))]
  163. (let [base (http-base)
  164. repo (state/get-current-repo)]
  165. (if base
  166. (p/let [_ (js/Promise. user-handler/task--ensure-id&access-token)
  167. resp (fetch-json (str base "/graphs/" graph-uuid "/members")
  168. {:method "GET"}
  169. {:response-schema :graph-members/list})
  170. members (:members resp)
  171. users (mapv (fn [{:keys [user-id role email username]}]
  172. (let [name (or username email user-id)
  173. user-type (some-> role keyword)]
  174. (cond-> {:user/uuid user-id
  175. :user/name name
  176. :graph<->user/user-type user-type}
  177. (string? email) (assoc :user/email email))))
  178. members)]
  179. (state/set-state! :rtc/users-info {repo users}))
  180. (p/resolved nil)))))
  181. (defn <rtc-create-graph!
  182. [repo]
  183. (let [schema-version (some-> (ldb/get-graph-schema-version (db/get-db)) :major str)
  184. base (http-base)]
  185. (if base
  186. (p/let [_ (js/Promise. user-handler/task--ensure-id&access-token)
  187. body (coerce-http-request :graphs/create
  188. {:graph-name (string/replace repo config/db-version-prefix "")
  189. :schema-version schema-version})
  190. result (if (nil? body)
  191. (p/rejected (ex-info "db-sync invalid create-graph body"
  192. {:repo repo}))
  193. (fetch-json (str base "/graphs")
  194. {:method "POST"
  195. :headers {"content-type" "application/json"}
  196. :body (js/JSON.stringify (clj->js body))}
  197. {:response-schema :graphs/create}))
  198. graph-id (:graph-id result)]
  199. (if graph-id
  200. (p/do!
  201. (ldb/transact! repo [(sqlite-util/kv :logseq.kv/db-type "db")
  202. (sqlite-util/kv :logseq.kv/graph-uuid (uuid graph-id))
  203. (sqlite-util/kv :logseq.kv/graph-rtc-e2ee? true)])
  204. graph-id)
  205. (p/rejected (ex-info "db-sync missing graph id in create response"
  206. {:type :db-sync/invalid-graph
  207. :response result}))))
  208. (p/rejected (ex-info "db-sync missing graph info"
  209. {:type :db-sync/invalid-graph
  210. :base base})))))
  211. (defn <rtc-delete-graph!
  212. [graph-uuid _schema-version]
  213. (let [base (http-base)]
  214. (if (and graph-uuid base)
  215. (p/let [_ (js/Promise. user-handler/task--ensure-id&access-token)]
  216. (fetch-json (str base "/graphs/" graph-uuid)
  217. {:method "DELETE"}
  218. {:response-schema :graphs/delete}))
  219. (p/rejected (ex-info "db-sync missing graph id"
  220. {:type :db-sync/invalid-graph
  221. :graph-uuid graph-uuid
  222. :base base})))))
  223. (defn <rtc-download-graph!
  224. [graph-name graph-uuid _graph-schema-version]
  225. (state/set-state! :rtc/downloading-graph-uuid graph-uuid)
  226. (let [base (http-base)]
  227. (-> (if (and graph-uuid base)
  228. (let [download-url* (atom nil)]
  229. (-> (p/let [_ (js/Promise. user-handler/task--ensure-id&access-token)
  230. graph (str config/db-version-prefix graph-name)
  231. pull-resp (fetch-json (str base "/sync/" graph-uuid "/pull")
  232. {:method "GET"}
  233. {:response-schema :sync/pull})
  234. remote-tx (:t pull-resp)
  235. _ (when-not (integer? remote-tx)
  236. (throw (ex-info "non-integer remote-tx when downloading graph"
  237. {:graph graph-name
  238. :remote-tx remote-tx})))
  239. download-resp (fetch-json (str base "/sync/" graph-uuid "/snapshot/download")
  240. {:method "GET"}
  241. {:response-schema :sync/snapshot-download})
  242. download-url (:url download-resp)
  243. _ (reset! download-url* download-url)
  244. _ (when-not (string? download-url)
  245. (throw (ex-info "missing snapshot download url"
  246. {:graph graph-name
  247. :response download-resp})))
  248. e2ee? (fetch-graph-e2ee? base (str graph-uuid))
  249. resp (js/fetch download-url (clj->js (with-auth-headers {:method "GET"})))]
  250. (when-not (.-ok resp)
  251. (throw (ex-info "snapshot download failed"
  252. {:graph graph-name
  253. :status (.-status resp)})))
  254. (when-not (.-body resp)
  255. (throw (ex-info "snapshot download missing body"
  256. {:graph graph-name})))
  257. (p/let [reader (.getReader (.-body resp))]
  258. (p/loop [buffer nil
  259. total 0
  260. total-rows []]
  261. (p/let [chunk (.read reader)]
  262. (if (.-done chunk)
  263. (let [rows (finalize-framed-buffer buffer)
  264. total' (+ total (count rows))
  265. total-rows' (into total-rows rows)]
  266. (when (seq total-rows')
  267. (p/do!
  268. (state/<invoke-db-worker :thread-api/db-sync-import-kvs-rows
  269. graph total-rows' true graph-uuid e2ee?)
  270. (state/<invoke-db-worker :thread-api/db-sync-finalize-kvs-import graph remote-tx)))
  271. total')
  272. (let [value (.-value chunk)
  273. {:keys [rows buffer]} (parse-framed-chunk buffer value)
  274. total' (+ total (count rows))]
  275. (p/recur buffer total' (into total-rows rows))))))))
  276. (p/finally
  277. (fn []
  278. (when-let [download-url @download-url*]
  279. (js/fetch download-url (clj->js (with-auth-headers {:method "DELETE"}))))))))
  280. (p/rejected (ex-info "db-sync missing graph info"
  281. {:type :db-sync/invalid-graph
  282. :graph-uuid graph-uuid
  283. :base base})))
  284. (p/catch (fn [error]
  285. (throw error)))
  286. (p/finally
  287. (fn []
  288. (state/set-state! :rtc/downloading-graph-uuid nil))))))
  289. (defn <get-remote-graphs
  290. []
  291. (let [base (http-base)]
  292. (if-not base
  293. (p/resolved [])
  294. (-> (p/let [_ (state/set-state! :rtc/loading-graphs? true)
  295. _ (js/Promise. user-handler/task--ensure-id&access-token)
  296. resp (fetch-json (str base "/graphs")
  297. {:method "GET"}
  298. {:response-schema :graphs/list})
  299. graphs (:graphs resp)
  300. result (mapv (fn [graph]
  301. (merge
  302. {:url (str config/db-version-prefix (:graph-name graph))
  303. :GraphName (:graph-name graph)
  304. :GraphSchemaVersion (:schema-version graph)
  305. :GraphUUID (:graph-id graph)
  306. :rtc-graph? true
  307. :graph<->user-user-type (:role graph)
  308. :graph<->user-grant-by-user (:invited-by graph)}
  309. (dissoc graph :graph-id :graph-name :schema-version :role :invited-by)))
  310. graphs)]
  311. (state/set-state! :rtc/graphs result)
  312. (repo-handler/refresh-repos!)
  313. result)
  314. (p/finally
  315. (fn []
  316. (state/set-state! :rtc/loading-graphs? false)))))))
  317. (defn <rtc-invite-email
  318. [graph-uuid email]
  319. (let [base (http-base)
  320. graph-uuid (str graph-uuid)]
  321. (if (and base (string? graph-uuid) (string? email))
  322. (->
  323. (p/let [_ (js/Promise. user-handler/task--ensure-id&access-token)
  324. body (coerce-http-request :graph-members/create
  325. {:email email
  326. :role "member"})
  327. _ (when (nil? body)
  328. (throw (ex-info "db-sync invalid invite body"
  329. {:graph-uuid graph-uuid
  330. :email email})))
  331. _ (fetch-json (str base "/graphs/" graph-uuid "/members")
  332. {:method "POST"
  333. :headers {"content-type" "application/json"}
  334. :body (js/JSON.stringify (clj->js body))}
  335. {:response-schema :graph-members/create})
  336. repo (state/get-current-repo)
  337. e2ee? (ldb/get-graph-rtc-e2ee? (db/get-db))
  338. _ (when (and repo e2ee?)
  339. (state/<invoke-db-worker :thread-api/db-sync-grant-graph-access
  340. repo graph-uuid email))]
  341. (notification/show! "Invitation sent!" :success))
  342. (p/catch (fn [e]
  343. (notification/show! "Something wrong, please try again." :error)
  344. (log/error :db-sync/invite-email-failed
  345. {:error e
  346. :graph-uuid graph-uuid
  347. :email email}))))
  348. (p/rejected (ex-info "db-sync missing invite info"
  349. {:type :db-sync/invalid-invite
  350. :graph-uuid graph-uuid
  351. :email email
  352. :base base})))))