Browse Source

fix: upload e2ee graph

Tienson Qin 1 month ago
parent
commit
046da8f6ad
2 changed files with 162 additions and 37 deletions
  1. 129 37
      src/main/frontend/worker/db_sync.cljs
  2. 33 0
      src/test/frontend/worker/db_sync_test.cljs

+ 129 - 37
src/main/frontend/worker/db_sync.cljs

@@ -1,9 +1,11 @@
 (ns frontend.worker.db-sync
   "Simple db-sync client based on promesa + WebSocket."
   (:require ["/frontend/idbkv" :as idb-keyval]
+            [cljs-bean.core :as bean]
             [clojure.set :as set]
             [clojure.string :as string]
             [datascript.core :as d]
+            [datascript.storage :refer [IStorage]]
             [frontend.common.crypt :as crypt]
             [frontend.worker-common.util :as worker-util]
             [frontend.worker.handler.page :as worker-page]
@@ -19,6 +21,7 @@
             [logseq.db-sync.malli-schema :as db-sync-schema]
             [logseq.db-sync.order :as sync-order]
             [logseq.db.common.normalize :as db-normalize]
+            [logseq.db.common.sqlite :as common-sqlite]
             [logseq.db.sqlite.util :as sqlite-util]
             [logseq.outliner.core :as outliner-core]
             [logseq.outliner.transaction :as outliner-tx]
@@ -639,6 +642,82 @@
               (fail-fast :db-sync/missing-field {:repo repo :field :aes-key}))]
     (<decrypt-snapshot-rows aes-key rows)))
 
+(defn- <encrypt-datoms
+  [aes-key datoms]
+  (p/all
+   (mapv (fn [d]
+           (if (contains? rtc-const/encrypt-attr-set (:a d))
+             (p/let [v' (<encrypt-text-value aes-key (:v d))]
+               (assoc d :v v'))
+             d))
+         datoms)))
+
+(defn- upsert-addr-content!
+  [^js db data]
+  (.transaction
+   db
+   (fn [tx]
+     (doseq [item data]
+       (.exec tx #js {:sql (str "INSERT INTO kvs (addr, content, addresses) "
+                                "values ($addr, $content, $addresses) "
+                                "on conflict(addr) do update set content = $content, addresses = $addresses")
+                      :bind item})))))
+
+(defn- restore-data-from-addr
+  [^js db addr]
+  (when-let [result (-> (.exec db #js {:sql "select content, addresses from kvs where addr = ?"
+                                       :bind #js [addr]
+                                       :rowMode "array"})
+                        first)]
+    (let [[content addresses] (bean/->clj result)
+          addresses (when addresses (js/JSON.parse addresses))
+          data (sqlite-util/transit-read content)]
+      (if (and addresses (map? data))
+        (assoc data :addresses addresses)
+        data))))
+
+(defn- new-temp-sqlite-storage
+  [^js db]
+  (reify IStorage
+    (-store [_ addr+data-seq _delete-addrs]
+      (let [data (map
+                  (fn [[addr data]]
+                    (let [data' (if (map? data) (dissoc data :addresses) data)
+                          addresses (when (map? data)
+                                      (when-let [addresses (:addresses data)]
+                                        (js/JSON.stringify (bean/->js addresses))))]
+                      #js {:$addr addr
+                           :$content (sqlite-util/transit-write data')
+                           :$addresses addresses}))
+                  addr+data-seq)]
+        (upsert-addr-content! db data)))
+    (-restore [_ addr]
+      (restore-data-from-addr db addr))))
+
+(defn- create-temp-sqlite-db
+  []
+  (if-let [sqlite @worker-state/*sqlite]
+    (let [^js DB (.-DB ^js (.-oo1 sqlite))
+          db (new DB ":memory:" "c")]
+      (common-sqlite/create-kvs-table! db)
+      db)
+    (fail-fast :db-sync/missing-field {:field :sqlite})))
+
+(defn- <create-temp-sqlite-conn
+  [schema datoms]
+  (p/let [db (create-temp-sqlite-db)
+          storage (new-temp-sqlite-storage db)
+          conn (d/conn-from-datoms datoms schema {:storage storage})]
+    {:db db
+     :conn conn}))
+
+(defn- cleanup-temp-sqlite!
+  [{:keys [db conn]}]
+  (when conn
+    (reset! conn nil))
+  (when db
+    (.close db)))
+
 (defn- require-asset-field
   [repo field value context]
   (when (or (nil? value) (and (string? value) (string/blank? value)))
@@ -1385,40 +1464,53 @@
 
 (defn upload-graph!
   [repo]
-  (let [base (http-base-url)
-        graph-id (get-graph-id repo)]
-    (if (and (seq base) (seq graph-id))
-      (if-let [db (worker-state/get-sqlite-conn repo :db)]
-        (p/let [aes-key (<ensure-graph-aes-key repo graph-id)
-                _ (when (and (graph-e2ee? repo) (nil? aes-key))
-                    (fail-fast :db-sync/missing-field {:repo repo :field :aes-key}))]
-          (set-graph-e2ee-enabled! repo)
-          (ensure-client-graph-uuid! repo graph-id)
-          (p/loop [last-addr -1
-                   first-batch? true]
-            (let [rows (fetch-kvs-rows db last-addr upload-kvs-batch-size)]
-              (if (empty? rows)
-                (do
-                  (client-op/remove-local-tx repo)
-                  (client-op/update-local-tx repo 0)
-                  (client-op/add-all-exists-asset-as-ops repo)
-                  {:graph-id graph-id})
-                (let [max-addr (apply max (map first rows))
-                      rows (normalize-snapshot-rows rows)
-                      upload-url (str base "/sync/" graph-id "/snapshot/upload?reset=" (if first-batch? "true" "false"))]
-                  (p/let [rows* (if aes-key
-                                  (<encrypt-snapshot-rows aes-key rows)
-                                  (p/resolved rows))
-                          {:keys [body encoding]} (<snapshot-upload-body rows*)
-                          headers (cond-> {"content-type" snapshot-content-type}
-                                    (string? encoding) (assoc "content-encoding" encoding))
-                          _ (fetch-json upload-url
-                                        {:method "POST"
-                                         :headers headers
-                                         :body body}
-                                        {:response-schema :sync/snapshot-upload})]
-                    (p/recur max-addr false)))))))
-        (p/rejected (ex-info "db-sync missing sqlite db"
-                             {:repo repo :graph-id graph-id})))
-      (p/rejected (ex-info "db-sync missing upload info"
-                           {:repo repo :base base :graph-id graph-id})))))
+  (->
+   (let [base (http-base-url)
+         graph-id (get-graph-id repo)]
+     (if (and (seq base) (seq graph-id))
+       (if-let [source-conn (worker-state/get-datascript-conn repo)]
+         (p/let [aes-key (<ensure-graph-aes-key repo graph-id)
+                 _ (when (and (graph-e2ee? repo) (nil? aes-key))
+                     (fail-fast :db-sync/missing-field {:repo repo :field :aes-key}))]
+           (set-graph-e2ee-enabled! repo)
+           (ensure-client-graph-uuid! repo graph-id)
+           (p/let [datoms (d/datoms @source-conn :eavt)
+                   _ (prn :debug :datoms-count (count datoms) :time (js/Date.))
+                   encrypted-datoms (<encrypt-datoms aes-key datoms)
+                   _ (prn :debug :encrypted-datoms-count (count encrypted-datoms)
+                          :time (js/Date.))
+                   {:keys [db] :as temp} (<create-temp-sqlite-conn (d/schema @source-conn) encrypted-datoms)]
+             (prn :debug :created-temp-conn :time (js/Date.))
+             (->
+              (p/loop [last-addr -1
+                       first-batch? true]
+                (let [rows (fetch-kvs-rows db last-addr upload-kvs-batch-size)]
+                  (prn :debug :rows-count (count rows))
+                  (if (empty? rows)
+                    (do
+                      (client-op/remove-local-tx repo)
+                      (client-op/update-local-tx repo 0)
+                      (client-op/add-all-exists-asset-as-ops repo)
+                      {:graph-id graph-id})
+                    (let [max-addr (apply max (map first rows))
+                          rows (normalize-snapshot-rows rows)
+                          upload-url (str base "/sync/" graph-id "/snapshot/upload?reset=" (if first-batch? "true" "false"))]
+                      (p/let [{:keys [body encoding]} (<snapshot-upload-body rows)
+                              headers (cond-> {"content-type" snapshot-content-type}
+                                        (string? encoding) (assoc "content-encoding" encoding))
+                              _ (fetch-json upload-url
+                                            {:method "POST"
+                                             :headers headers
+                                             :body body}
+                                            {:response-schema :sync/snapshot-upload})]
+                        (p/recur max-addr false))))))
+              (p/finally
+                (fn []
+                  (prn :debug :cleanup-temp-db :time (js/Date.))
+                  (cleanup-temp-sqlite! temp))))))
+         (p/rejected (ex-info "db-sync missing datascript conn"
+                              {:repo repo :graph-id graph-id})))
+       (p/rejected (ex-info "db-sync missing upload info"
+                            {:repo repo :base base :graph-id graph-id}))))
+   (p/catch (fn [error]
+              (js/console.error error)))))

+ 33 - 0
src/test/frontend/worker/db_sync_test.cljs

@@ -103,6 +103,39 @@
                         (is false (str e))
                         (done))))))
 
+(deftest encrypt-datoms-test
+  (async done
+         (let [conn (db-test/create-conn-with-blocks
+                     {:pages-and-blocks
+                      [{:page {:block/title "page 1"}
+                        :blocks [{:block/title "parent"}]}]})
+               datoms (vec (d/datoms @conn :eavt))
+               title-datom (first (filter (fn [datom] (= :block/title (:a datom))) datoms))
+               name-datom (first (filter (fn [datom] (= :block/name (:a datom))) datoms))
+               uuid-datom (first (filter (fn [datom] (= :block/uuid (:a datom))) datoms))]
+           (-> (p/let [aes-key (crypt/<generate-aes-key)
+                       tx-data (#'db-sync/<encrypt-datoms aes-key datoms)
+                       title-tx (first (filter (fn [item]
+                                                 (and (= (:e title-datom) (nth item 1))
+                                                      (= :block/title (nth item 2))))
+                                               tx-data))
+                       name-tx (first (filter (fn [item]
+                                                (and (= (:e name-datom) (nth item 1))
+                                                     (= :block/name (nth item 2))))
+                                              tx-data))
+                       uuid-tx (first (filter (fn [item]
+                                                (and (= (:e uuid-datom) (nth item 1))
+                                                     (= :block/uuid (nth item 2))))
+                                              tx-data))]
+                 (is (string? (nth title-tx 3)))
+                 (is (string? (nth name-tx 3)))
+                 (is (not= (:v title-datom) (nth title-tx 3)))
+                 (is (= (:v uuid-datom) (nth uuid-tx 3)))
+                 (done))
+               (p/catch (fn [e]
+                          (is false (str e))
+                          (done)))))))
+
 (deftest ensure-user-rsa-keys-test
   (async done
          (let [upload-called (atom nil)]