(ns frontend.handler.db-based.sync
  "DB-sync handler based on Cloudflare Durable Objects."
  (:require [clojure.string :as string]
            [frontend.config :as config]
            [frontend.db :as db]
            [frontend.handler.notification :as notification]
            [frontend.handler.repo :as repo-handler]
            [frontend.handler.user :as user-handler]
            [frontend.state :as state]
            [lambdaisland.glogi :as log]
            [logseq.db :as ldb]
            [logseq.db-sync.malli-schema :as db-sync-schema]
            [logseq.db.sqlite.util :as sqlite-util]
            [promesa.core :as p]))

(defn- ws->http-base
  "Derives an HTTP(S) base URL from the websocket URL `ws-url`.
  A wss:// prefix becomes https://, a ws:// prefix becomes http://, any other
  string is kept as is; a trailing /sync/%s segment is then removed.
  Returns nil when `ws-url` is not a string."
  [ws-url]
  (when (string? ws-url)
    (let [base (cond
                 (string/starts-with? ws-url "wss://")
                 (str "https://" (subs ws-url (count "wss://")))

                 (string/starts-with? ws-url "ws://")
                 (str "http://" (subs ws-url (count "ws://")))

                 :else ws-url)
          ;; The regex matches the literal characters %s at the end of the URL
          ;; (presumably a format placeholder in the configured ws URL).
          base (string/replace base #"/sync/%s$" "")]
      base)))

(defn http-base
  "Returns `config/db-sync-http-base` when it is truthy, otherwise the base
  derived from `config/db-sync-ws-url` via `ws->http-base`."
  []
  (or config/db-sync-http-base
      (ws->http-base config/db-sync-ws-url)))

;; Shared js/TextDecoder (constructed with no arguments) used to turn snapshot
;; frame payload bytes into text.
(def ^:private snapshot-text-decoder (js/TextDecoder.))

(defn- ->uint8
  "Coerces `data` to a js/Uint8Array: returned as is when it already is one,
  wrapped when it is an ArrayBuffer, encoded with js/TextEncoder when it is a
  string, and otherwise handed to the Uint8Array constructor."
  [data]
  (cond
    (instance? js/Uint8Array data) data
    (instance? js/ArrayBuffer data) (js/Uint8Array. data)
    (string? data) (.encode (js/TextEncoder.) data)
    :else (js/Uint8Array. data)))

(defn- decode-snapshot-rows
  "Decodes the bytes of `payload` to text with the shared decoder and reads
  that text with `sqlite-util/read-transit-str`."
  [payload]
  (sqlite-util/read-transit-str
   (.decode snapshot-text-decoder (->uint8 payload))))

(defn- frame-len
  "Reads the 4-byte big-endian unsigned integer stored at `offset` (relative
  to the start of the typed array `data`) and returns it as the frame length."
  [^js data offset]
  ;; DataView offsets are relative to the backing ArrayBuffer, not to the typed
  ;; array, so the array's own byteOffset has to be added. Without it a chunk
  ;; that is a view into a larger buffer would have its header read from the
  ;; wrong bytes. For arrays with byteOffset 0 nothing changes.
  (let [view (js/DataView. (.-buffer data) (+ (.-byteOffset data) offset) 4)]
    ;; littleEndian = false, i.e. big-endian.
    (.getUint32 view 0 false)))

(defn- concat-bytes
  "Concatenates byte arrays `a` and `b` into a fresh Uint8Array.
  When either argument is nil the other one is returned unchanged (no copy)."
  [^js a ^js b]
  (cond
    (nil? a) b
    (nil? b) a
    :else
    (let [out (js/Uint8Array. (+ (.-byteLength a) (.-byteLength b)))]
      (.set out a 0)
      (.set out b (.-byteLength a))
      out)))

(defn- parse-framed-chunk
  "Appends `chunk` to the leftover `buffer` and consumes every complete frame,
  where a frame is a 4-byte length header followed by that many payload bytes.
  Returns a map with
    :rows   - vector of rows decoded from all complete frames, in order
    :buffer - the unconsumed trailing bytes, or nil when nothing is left."
  [buffer chunk]
  (let [data (concat-bytes buffer chunk)
        total (.-byteLength data)]
    (loop [offset 0
           rows []]
      (if (< (- total offset) 4)
        ;; Not even a full length header left: keep the remainder (if any).
        {:rows rows
         :buffer (when (< offset total)
                   (.slice data offset total))}
        (let [len (frame-len data offset)
              next-offset (+ offset 4 len)]
          (if (<= next-offset total)
            (let [payload (.slice data (+ offset 4) next-offset)
                  decoded (decode-snapshot-rows payload)]
              (recur next-offset (into rows decoded)))
            ;; Header is present but the payload is incomplete: keep the whole
            ;; frame, header included, for the next call.
            {:rows rows
             :buffer (.slice data offset total)}))))))

(defn- finalize-framed-buffer
  "Handles the leftover `buffer` once no more chunks will arrive.
  Returns [] for a nil or empty buffer. Otherwise re-parses the buffer and
  returns its rows only when at least one row was decoded and no bytes remain;
  in every other case throws ex-info with message incomplete framed buffer."
  [buffer]
  (if (or (nil? buffer) (zero? (.-byteLength buffer)))
    []
    (let [{:keys [rows buffer]} (parse-framed-chunk nil buffer)]
      (if (and (seq rows)
               (or (nil? buffer) (zero? (.-byteLength buffer))))
        rows
        (throw (ex-info "incomplete framed buffer"
                        {:buffer buffer :rows rows}))))))

(defn- auth-headers
  "Returns an authorization header map carrying the token from
  `state/get-auth-id-token` as a Bearer token, or nil when there is no token."
  []
  (when-let [token (state/get-auth-id-token)]
    {"authorization" (str "Bearer " token)}))

(defn- with-auth-headers
  "Returns `opts` with the auth headers merged into :headers (the auth entry
  wins on a key clash). `opts` is returned untouched when no token exists."
  [opts]
  (if-let [auth (auth-headers)]
    (assoc opts :headers (merge (or (:headers opts) {}) auth))
    opts))

(declare fetch-json)
;; coerce-http-response is defined further down but used by fetch-json.
(declare coerce-http-response)

(defn fetch-json
  "Performs `js/fetch` on `url` with `opts` (auth headers merged in) and parses
  a non-empty response body as JSON. Returns a promise.

  Third argument options:
    :response-schema - key used to coerce the body of an ok response
    :error-schema    - key used to coerce the body of a failed response
                       (defaults to :error)

  Ok response: resolves to the keywordized body, coerced when
  `response-schema` is given; rejects with ex-info db-sync invalid response
  when a schema was given and coercion produced nil.
  Non-ok response: rejects with ex-info db-sync request failed whose data holds
  :status, :url and the (coerced) :body."
  [url opts {:keys [response-schema error-schema]
             :or {error-schema :error}}]
  (p/let [resp (js/fetch url (clj->js (with-auth-headers opts)))
          text (.text resp)
          ;; NOTE(review): JSON.parse runs before the status check, so a
          ;; non-empty non-JSON body throws here instead of reaching the
          ;; ex-info branches below.
          data (when (seq text) (js/JSON.parse text))]
    (if (.-ok resp)
      (let [body (js->clj data :keywordize-keys true)
            body (if response-schema
                   (coerce-http-response response-schema body)
                   body)]
        (if (or (nil? response-schema) body)
          body
          (throw (ex-info "db-sync invalid response"
                          {:status (.-status resp)
                           :url url
                           :body body}))))
      (let [body (when data (js->clj data :keywordize-keys true))
            body (if error-schema
                   (coerce-http-response error-schema body)
                   body)]
        (throw (ex-info "db-sync request failed"
                        {:status (.-status resp)
                         :url url
                         :body body}))))))

;; Sentinel returned by `coerce` when the coercer throws.
(def ^:private invalid-coerce ::invalid-coerce)

(defn- coerce
  "Applies `coercer` to `value`. When the coercer throws, logs
  :db-sync/malli-coerce-failed with `context`, the error and the value, and
  returns the `invalid-coerce` sentinel instead of rethrowing."
  [coercer value context]
  (try
    (coercer value)
    (catch :default e
      (log/error :db-sync/malli-coerce-failed
                 (merge context {:error e :value value}))
      invalid-coerce)))

(defn- coerce-http-request
  "Coerces request `body` with the coercer registered under `schema-key` in
  `db-sync-schema/http-request-coercers`. Returns the coerced body, nil when
  coercion failed, or `body` unchanged when no coercer is registered."
  [schema-key body]
  (if-let [coercer (get db-sync-schema/http-request-coercers schema-key)]
    (let [coerced (coerce coercer body {:schema schema-key :dir :request})]
      (when-not (= coerced invalid-coerce)
        coerced))
    body))

(defn- coerce-http-response
  "Coerces response `body` with the coercer registered under `schema-key` in
  `db-sync-schema/http-response-coercers`. Returns the coerced body, nil when
  coercion failed, or `body` unchanged when no coercer is registered."
  [schema-key body]
  (if-let [coercer (get db-sync-schema/http-response-coercers schema-key)]
    (let [coerced (coerce coercer body {:schema schema-key :dir :response})]
      (when-not (= coerced invalid-coerce)
        coerced))
    body))

(defn- graph-in-remote-list?
  "Returns true when some graph in `(state/get-rtc-graphs)` has a :url equal
  to `repo`, otherwise nil."
  [repo]
  (some #(= repo (:url %)) (state/get-rtc-graphs)))

;; NOTE(review): the head of the next definition (name, argument list and the
;; opening of its body) is not visible in this view, so the text is kept
;; exactly as found. The visible tail builds one user map per member
;; (:user/uuid, :user/name, :graph<->user/user-type, plus :user/email when the
;; email is a string) and stores the result under :rtc/users-info keyed by repo.
(defn role keyword)]
 (cond-> {:user/uuid user-id
          :user/name name
          :graph<->user/user-type user-type}
   (string? email)
   (assoc :user/email email))))
 members)]
 (state/set-state! :rtc/users-info {repo users}))
 (p/resolved nil)))))

;; NOTE(review): name, argument list and first bindings of the next definition
;; are not visible in this view; the text is kept exactly as found. The visible
;; part POSTs a :graphs/create body to <base>/graphs and, when the response
;; carries a :graph-id, transacts the db-type, graph-uuid and graph-rtc-e2ee?
;; kv entries into `repo` and resolves to the graph id. Every failure path
;; yields a rejected promise.
(defn (ldb/get-graph-schema-version (db/get-db)) :major str)
 base (http-base)]
 (if base
   (p/let [_ (js/Promise. user-handler/task--ensure-id&access-token)
           body (coerce-http-request :graphs/create
                                     {:graph-name (string/replace repo config/db-version-prefix "")
                                      :schema-version schema-version})
           result (if (nil? body)
                    (p/rejected (ex-info "db-sync invalid create-graph body"
                                         {:repo repo}))
                    (fetch-json (str base "/graphs")
                                {:method "POST"
                                 :headers {"content-type" "application/json"}
                                 :body (js/JSON.stringify (clj->js body))}
                                {:response-schema :graphs/create}))
           graph-id (:graph-id result)]
     (if graph-id
       (p/do!
        (ldb/transact! repo [(sqlite-util/kv :logseq.kv/db-type "db")
                             (sqlite-util/kv :logseq.kv/graph-uuid (uuid graph-id))
                             (sqlite-util/kv :logseq.kv/graph-rtc-e2ee? true)])
        graph-id)
       (p/rejected (ex-info "db-sync missing graph id in create response"
                            {:type :db-sync/invalid-graph
                             :response result}))))
   (p/rejected (ex-info "db-sync missing graph info"
                        {:type :db-sync/invalid-graph
                         :base base})))))

;; NOTE(review): name, argument list and parts of the body of the next
;; definition are not visible in this view; the text is kept exactly as found.
;; The visible part pulls the remote tx (must be an integer), asks for a
;; snapshot download URL, fetches it, and reads the response body stream chunk
;; by chunk, finishing with `finalize-framed-buffer` when the reader reports
;; done. :rtc/downloading-graph-uuid is reset to nil in the finally handler.
(defn (if (and graph-uuid base)
 (let [download-url* (atom nil)]
 (-> (p/let [_ (js/Promise. user-handler/task--ensure-id&access-token)
             graph (str config/db-version-prefix graph-name)
             pull-resp (fetch-json (str base "/sync/" graph-uuid "/pull")
                                   {:method "GET"}
                                   {:response-schema :sync/pull})
             remote-tx (:t pull-resp)
             _ (when-not (integer? remote-tx)
                 (throw (ex-info "non-integer remote-tx when downloading graph"
                                 {:graph graph-name :remote-tx remote-tx})))
             download-resp (fetch-json (str base "/sync/" graph-uuid "/snapshot/download")
                                       {:method "GET"}
                                       {:response-schema :sync/snapshot-download})
             download-url (:url download-resp)
             _ (reset! download-url* download-url)
             _ (when-not (string? download-url)
                 (throw (ex-info "missing snapshot download url"
                                 {:graph graph-name :response download-resp})))
             resp (js/fetch download-url (clj->js (with-auth-headers {:method "GET"})))
             ;; content-length parsed as a base-10 integer; nil when the
             ;; header is absent or not a number.
             total-bytes (when-let [raw (some-> resp .-headers (.get "content-length"))]
                           (let [parsed (js/parseInt raw 10)]
                             (when-not (js/isNaN parsed) parsed)))
             _ (state/pub-event!
                [:rtc/log {:type :rtc.log/download
                           :sub-type :download-progress
                           :graph-uuid graph-uuid
                           :message (str "Start downloading graph snapshot, file size: " total-bytes)}])]
 (when-not (.-ok resp)
   (throw (ex-info "snapshot download failed"
                   {:graph graph-name :status (.-status resp)})))
 (when-not (.-body resp)
   (throw (ex-info "snapshot download missing body"
                   {:graph graph-name})))
 (p/let [reader (.getReader (.-body resp))]
 (p/loop [buffer nil total 0 total-rows [] loaded 0]
 (p/let [chunk (.read reader)]
 (if (.-done chunk)
 (let [rows (finalize-framed-buffer buffer)
       total' (+ total (count rows))
       total-rows' (into total-rows rows)]
 (state/pub-event!
  [:rtc/log {:type :rtc.log/download
             :sub-type :download-completed
             :graph-uuid graph-uuid
             :message "Graph snapshot downloaded"}])
 (when (seq total-rows')
 (state/js (with-auth-headers {:method "DELETE"}))))))))
 (p/rejected (ex-info "db-sync missing graph info"
                      {:type :db-sync/invalid-graph
                       :graph-uuid graph-uuid
                       :base base})))
 ;; Rethrows the error unchanged.
 (p/catch (fn [error] (throw error)))
 (p/finally (fn [] (state/set-state! :rtc/downloading-graph-uuid nil))))))

;; NOTE(review): name, argument list and the opening of the body of the next
;; definition are not visible in this view; the text is kept exactly as found.
;; The visible part GETs <base>/graphs, maps every returned graph to the
;; :url / :GraphName / :GraphSchemaVersion / :GraphUUID shape, stores the
;; vector under :rtc/graphs, refreshes the repos and resolves to that vector.
;; :rtc/loading-graphs? is set to true up front and back to false in the
;; finally handler.
(defn (p/let [_ (state/set-state! :rtc/loading-graphs? true)
 _ (js/Promise. user-handler/task--ensure-id&access-token)
 resp (fetch-json (str base "/graphs")
                  {:method "GET"}
                  {:response-schema :graphs/list})
 graphs (:graphs resp)
 result (mapv (fn [graph]
                (merge {:url (str config/db-version-prefix (:graph-name graph))
                        :GraphName (:graph-name graph)
                        :GraphSchemaVersion (:schema-version graph)
                        :GraphUUID (:graph-id graph)
                        :rtc-graph? true
                        :graph<->user-user-type (:role graph)
                        :graph<->user-grant-by-user (:invited-by graph)}
                       (dissoc graph :graph-id :graph-name :schema-version :role :invited-by)))
              graphs)]
 (state/set-state! :rtc/graphs result)
 (repo-handler/refresh-repos!)
 result)
 (p/finally (fn [] (state/set-state! :rtc/loading-graphs? false)))))))

;; NOTE(review): the text below appears to span two definitions whose heads
;; (and part of the first body) are not visible in this view; it is kept
;; exactly as found. The first visible part POSTs a :graph-members/create body
;; with role "member" to <base>/graphs/<graph-uuid>/members. The second sends
;; a DELETE to <base>/graphs/<graph-uuid>/members/<member-id> when base,
;; graph-uuid and member-id are all present, and otherwise returns a rejected
;; promise of type :db-sync/invalid-member.
(defn (p/let [_ (js/Promise. user-handler/task--ensure-id&access-token)
 body (coerce-http-request :graph-members/create
                           {:email email :role "member"})
 _ (when (nil? body)
     (throw (ex-info "db-sync invalid invite body"
                     {:graph-uuid graph-uuid :email email})))
 _ (fetch-json (str base "/graphs/" graph-uuid "/members")
               {:method "POST"
                :headers {"content-type" "application/json"}
                :body (js/JSON.stringify (clj->js body))}
               {:response-schema :graph-members/create})
 repo (state/get-current-repo)
 e2ee? (ldb/get-graph-rtc-e2ee? (db/get-db))
 _ (when (and repo e2ee?)
 (state/ graph-uuid str)
 member-id (some-> member-id str)]
 (if (and base (string? graph-uuid) (string? member-id))
   (p/let [_ (js/Promise. user-handler/task--ensure-id&access-token)]
     (fetch-json (str base "/graphs/" graph-uuid "/members/" member-id)
                 {:method "DELETE"}
                 {:response-schema :graph-members/delete}))
   (p/rejected (ex-info "db-sync missing member info"
                        {:type :db-sync/invalid-member
                         :graph-uuid graph-uuid
                         :member-id member-id
                         :base base})))))

(defn